From cb16ca43c56e9a24eb5581c9c7ccf024e6e9853d Mon Sep 17 00:00:00 2001 From: eeb Date: Wed, 30 Jul 2003 04:13:55 +0000 Subject: [PATCH] * Optimized qswnal GETs use EKC rpcs to "teleport" payload (as far as portals is concerned). * allowed qswnal to have knowledge of lib_msg_t. * lib_fake_reply_msg() written to allow qswnal to create replies and deliver them in the normal fashion (with lib_finalize()). --- lnet/include/lnet/lib-lnet.h | 2 + lnet/include/lnet/lib-p30.h | 2 + lnet/klnds/qswlnd/qswlnd.c | 8 +- lnet/klnds/qswlnd/qswlnd.h | 27 +- lnet/klnds/qswlnd/qswlnd_cb.c | 476 +++++++++++++++++++++++++------ lnet/libcfs/module.c | 1 + lnet/lnet/lib-move.c | 62 ++++ lustre/portals/include/portals/lib-p30.h | 2 + lustre/portals/knals/qswnal/qswnal.c | 8 +- lustre/portals/knals/qswnal/qswnal.h | 27 +- lustre/portals/knals/qswnal/qswnal_cb.c | 476 +++++++++++++++++++++++++------ lustre/portals/libcfs/module.c | 1 + lustre/portals/portals/lib-move.c | 62 ++++ 13 files changed, 964 insertions(+), 190 deletions(-) diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index b623b93..2401f22 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -361,6 +361,8 @@ extern char *dispatch_name(int index); */ extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, + lib_md_t *getmd); extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); diff --git a/lnet/include/lnet/lib-p30.h b/lnet/include/lnet/lib-p30.h index b623b93..2401f22 100644 --- a/lnet/include/lnet/lib-p30.h +++ b/lnet/include/lnet/lib-p30.h @@ -361,6 +361,8 @@ extern char *dispatch_name(int index); */ extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, + lib_md_t *getmd); extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); diff --git a/lnet/klnds/qswlnd/qswlnd.c b/lnet/klnds/qswlnd/qswlnd.c index 1a8fb74..7cfc80e 100644 --- a/lnet/klnds/qswlnd/qswlnd.c +++ b/lnet/klnds/qswlnd/qswlnd.c @@ -383,12 +383,14 @@ kqswnal_initialise (void) } /**********************************************************************/ - /* Reserve Elan address space for transmit buffers */ + /* Reserve Elan address space for transmit descriptors NB we may + * either send the contents of associated buffers immediately, or + * map them for the peer to suck/blow... 
*/ dmareq.Waitfn = DDI_DMA_SLEEP; dmareq.ElanAddr = (E3_Addr) 0; dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN; - dmareq.Perm = ELAN_PERM_REMOTEREAD; + dmareq.Perm = ELAN_PERM_REMOTEWRITE; rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState, KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS), @@ -552,7 +554,7 @@ kqswnal_initialise (void) rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE, 0); - if (rc != 0) + if (rc != ESUCCESS) { CERROR ("failed ep_queue_receive %d\n", rc); kqswnal_finalise (); diff --git a/lnet/klnds/qswlnd/qswlnd.h b/lnet/klnds/qswlnd/qswlnd.h index ef19ace..5cbbb9a 100644 --- a/lnet/klnds/qswlnd/qswlnd.h +++ b/lnet/klnds/qswlnd/qswlnd.h @@ -75,6 +75,8 @@ #include #include +#define KQSW_OPTIMIZE_GETS 1 + #define KQSW_CHECKSUM 0 #if KQSW_CHECKSUM typedef unsigned long kqsw_csum_t; @@ -131,6 +133,12 @@ typedef unsigned long kqsw_csum_t; #define KQSW_NRXMSGBYTES_LARGE (KQSW_NRXMSGPAGES_LARGE * PAGE_SIZE) /* biggest complete packet we can receive (or transmit) */ +/* Remote memory descriptor */ +typedef struct +{ + __u32 kqrmd_neiov; /* # frags */ + EP_IOVEC kqrmd_eiov[0]; /* actual frags */ +} kqswnal_remotemd_t; typedef struct { @@ -140,6 +148,8 @@ typedef struct E3_Addr krx_elanaddr; /* Elan address of buffer (contiguous in elan vm) */ int krx_npages; /* # pages in receive buffer */ int krx_nob; /* Number Of Bytes received into buffer */ + atomic_t krx_refcount; /* who's using me? */ + int krx_rpc_completed; /* I completed peer's RPC */ kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */ struct page *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */ struct iovec krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */ @@ -153,18 +163,22 @@ typedef struct uint32_t ktx_basepage; /* page offset in reserved elan tx vaddrs for mapping pages */ int ktx_npages; /* pages reserved for mapping messages */ int ktx_nmappedpages; /* # pages mapped for current message */ - EP_IOVEC ktx_iov[EP_MAXFRAG]; /* msg frags (elan vaddrs) */ - int ktx_niov; /* # message frags */ int ktx_port; /* destination ep port */ ptl_nid_t ktx_nid; /* destination node */ void *ktx_args[2]; /* completion passthru */ E3_Addr ktx_ebuffer; /* elan address of ktx_buffer */ char *ktx_buffer; /* pre-allocated contiguous buffer for hdr + small payloads */ + int ktx_nfrag; /* # message frags */ + union { + EP_IOVEC iov[EP_MAXFRAG]; /* msg frags (elan vaddrs) */ + EP_DATAVEC datav[EP_MAXFRAG]; /* DMA frags (eolan vaddrs) */ + } ktx_frags; } kqswnal_tx_t; #define KTX_IDLE 0 /* MUST BE ZERO (so zeroed ktx is idle) */ #define KTX_SENDING 1 /* local send */ #define KTX_FORWARDING 2 /* routing a packet */ +#define KTX_GETTING 3 /* local optimised get */ typedef struct { @@ -217,6 +231,8 @@ extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg); extern void kqswnal_rxhandler(EP_RXD *rxd); extern int kqswnal_scheduler (void *); extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd); +extern void kqswnal_reply_complete (EP_RXD *rxd); +extern void kqswnal_requeue_rx (kqswnal_rx_t *krx); static inline ptl_nid_t kqswnal_elanid2nid (int elanid) @@ -235,13 +251,6 @@ kqswnal_nid2elanid (ptl_nid_t nid) return (nid - kqswnal_data.kqn_nid_offset); } -static inline void -kqswnal_requeue_rx (kqswnal_rx_t *krx) -{ - ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, - krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); -} - static inline int kqswnal_pages_spanned (void *base, int nob) { diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c 
b/lnet/klnds/qswlnd/qswlnd_cb.c index 35c7c6c..7032f6b 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -26,11 +26,6 @@ #include "qswnal.h" -atomic_t kqswnal_packets_launched; -atomic_t kqswnal_packets_transmitted; -atomic_t kqswnal_packets_received; - - /* * LIB functions follow * @@ -125,7 +120,7 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) return; CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n", - ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages); LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages); LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <= @@ -140,15 +135,14 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) int kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; char *ptr; LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -167,9 +161,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } @@ -185,7 +179,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, ptr, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); + basepage, &ktx->ktx_frags.iov[nfrags].Base); kunmap (kiov->kiov_page); @@ -193,12 +187,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) ktx->ktx_nmappedpages = nmapped; if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) + ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; else { - ktx->ktx_iov[nfrags].Len = fraglen; + ktx->ktx_frags.iov[nfrags].Len = fraglen; nfrags++; /* new frag */ } @@ -212,9 +206,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } @@ -222,14 +216,13 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) int kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -247,9 +240,9 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == 
EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } @@ -261,17 +254,17 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, iov->iov_base, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); + basepage, &ktx->ktx_frags.iov[nfrags].Base); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) + ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; else { - ktx->ktx_iov[nfrags].Len = fraglen; + ktx->ktx_frags.iov[nfrags].Len = fraglen; nfrags++; /* new frag */ } @@ -285,13 +278,14 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } + void kqswnal_put_idle_tx (kqswnal_tx_t *ktx) { @@ -396,12 +390,16 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */ LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0); + return (ktx); } void kqswnal_tx_done (kqswnal_tx_t *ktx, int error) { + lib_msg_t *msg; + lib_msg_t *repmsg; + switch (ktx->ktx_state) { case KTX_FORWARDING: /* router asked me to forward this packet */ kpr_fwd_done (&kqswnal_data.kqn_router, @@ -413,6 +411,21 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) (lib_msg_t *)ktx->ktx_args[1]); break; + case KTX_GETTING: /* Peer has DMA-ed direct? */ + LASSERT (KQSW_OPTIMIZE_GETS); + msg = (lib_msg_t *)ktx->ktx_args[1]; + repmsg = NULL; + + if (error == 0) + repmsg = lib_fake_reply_msg (&kqswnal_lib, + ktx->ktx_nid, msg->md); + + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg); + + if (repmsg != NULL) + lib_finalize (&kqswnal_lib, NULL, repmsg); + break; + default: LASSERT (0); } @@ -430,13 +443,18 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status); - if (status == EP_SUCCESS) - atomic_inc (&kqswnal_packets_transmitted); - if (status != EP_SUCCESS) { CERROR ("kqswnal: Transmit failed with %d\n", status); status = -EIO; + + } else if (ktx->ktx_state == KTX_GETTING) { + /* RPC completed OK; what did our peer put in the status + * block? 
*/ + LASSERT (KQSW_OPTIMIZE_GETS); + status = ep_txd_statusblk(txd)->Status; + } else { + status = 0; } kqswnal_tx_done (ktx, status); @@ -452,11 +470,16 @@ kqswnal_launch (kqswnal_tx_t *ktx) int rc; LASSERT (dest >= 0); /* must be a peer */ - rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, - ktx->ktx_port, attr, kqswnal_txhandler, - ktx, ktx->ktx_iov, ktx->ktx_niov); - if (rc == 0) - atomic_inc (&kqswnal_packets_launched); + if (ktx->ktx_state == KTX_GETTING) { + LASSERT (KQSW_OPTIMIZE_GETS); + rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, kqswnal_txhandler, + ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag); + } else { + rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, kqswnal_txhandler, + ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag); + } if (rc != ENOMEM) return (rc); @@ -547,10 +570,141 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr) } /* end of print_hdr() */ +void +kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) +{ + int i; + + CDEBUG (how, "%s: %d\n", str, n); + for (i = 0; i < n; i++) { + CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len); + } +} + +int +kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, + int nsrc, EP_IOVEC *src, + int ndst, EP_IOVEC *dst) +{ + int count; + int nob; + + LASSERT (ndv > 0); + LASSERT (nsrc > 0); + LASSERT (ndst > 0); + + for (count = 0; count < ndv; count++, dv++) { + + if (nsrc == 0 || ndst == 0) { + if (nsrc != ndst) { + /* For now I'll barf on any left over entries */ + CERROR ("mismatched src and dst iovs\n"); + return (-EINVAL); + } + return (count); + } + + nob = (src->Len < dst->Len) ? src->Len : dst->Len; + dv->Len = nob; + dv->Source = src->Base; + dv->Dest = dst->Base; + + if (nob >= src->Len) { + src++; + nsrc--; + } else { + src->Len -= nob; + src->Base += nob; + } + + if (nob >= dst->Len) { + dst++; + ndst--; + } else { + src->Len -= nob; + src->Base += nob; + } + } + + CERROR ("DATAVEC too small\n"); + return (-E2BIG); +} + +int +kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, + struct iovec *iov, ptl_kiov_t *kiov, int nob) +{ + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + char *buffer = (char *)page_address(krx->krx_pages[0]); + kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); + EP_IOVEC eiov[EP_MAXFRAG]; + EP_STATUSBLK blk; + int rc; + + LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed); + LASSERT ((iov == NULL) != (kiov == NULL)); + + /* see .*_pack_k?iov comment regarding endian-ness */ + if (buffer + krx->krx_nob < (char *)(rmd + 1)) { + /* msg too small to discover rmd size */ + CERROR ("Incoming message [%d] too small for RMD (%d needed)\n", + krx->krx_nob, ((char *)(rmd + 1)) - buffer); + return (-EINVAL); + } + + if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) { + /* rmd doesn't fit in the incoming message */ + CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n", + krx->krx_nob, rmd->kqrmd_neiov, + ((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer); + return (-EINVAL); + } + + /* Ghastly hack part 1, uses the existing procedures to map the source data... */ + ktx->ktx_nfrag = 0; + if (kiov != NULL) + rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); + else + rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov); + + if (rc != 0) { + CERROR ("Can't map source data: %d\n", rc); + return (rc); + } + + /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... 
*/ + memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0])); + + rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav, + ktx->ktx_nfrag, eiov, + rmd->kqrmd_neiov, rmd->kqrmd_eiov); + if (rc < 0) { + CERROR ("Can't create datavec: %d\n", rc); + return (rc); + } + ktx->ktx_nfrag = rc; + + memset (&blk, 0, sizeof (blk)); /* zero blk.Status */ + + /* Our caller will start to race with kqswnal_rpc_complete... */ + LASSERT (atomic_read (&krx->krx_refcount) == 1); + atomic_set (&krx->krx_refcount, 2); + + rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx, + &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag); + if (rc == ESUCCESS) + return (0); + + /* reset refcount back to 1: we're not going to be racing with + * kqswnal_rely_complete. */ + atomic_set (&krx->krx_refcount, 1); + return (-ECONNABORTED); +} + static int kqswnal_sendmsg (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -562,7 +716,7 @@ kqswnal_sendmsg (nal_cb_t *nal, { kqswnal_tx_t *ktx; int rc; - ptl_nid_t gatewaynid; + ptl_nid_t targetnid; #if KQSW_CHECKSUM int i; kqsw_csum_t csum; @@ -588,25 +742,25 @@ kqswnal_sendmsg (nal_cb_t *nal, if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } + targetnid = nid; if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */ - rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid); + rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &targetnid); if (rc != 0) { CERROR("Can't route to "LPX64": router error %d\n", nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - if (kqswnal_nid2elanid (gatewaynid) < 0) { + if (kqswnal_nid2elanid (targetnid) < 0) { CERROR("Bad gateway "LPX64" for "LPX64"\n", - gatewaynid, nid); - lib_finalize (&kqswnal_lib, private, cookie); + targetnid, nid); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - nid = gatewaynid; } /* I may not block for a transmit descriptor if I might block the @@ -616,10 +770,30 @@ kqswnal_sendmsg (nal_cb_t *nal, in_interrupt())); if (ktx == NULL) { kqswnal_cerror_hdr (hdr); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } + ktx->ktx_args[0] = private; + ktx->ktx_args[1] = libmsg; + +#if KQSW_OPTIMIZE_GETS + if (type == PTL_MSG_REPLY && + ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) { + /* peer expects RPC completion with GET data */ + rc = kqswnal_dma_reply (ktx, + payload_niov, payload_iov, + payload_kiov, payload_nob); + if (rc == 0) + return (0); + + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); + return (-1); + } +#endif + memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ #if KQSW_CHECKSUM @@ -642,13 +816,59 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); #endif - + /* Set up first frag from pre-mapped buffer (it's at least the * portals header) */ - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; - ktx->ktx_iov[0].Len = KQSW_HDR_SIZE; - ktx->ktx_niov = 1; + ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE; + ktx->ktx_nfrag = 1; + ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on 
completion */ + +#if KQSW_OPTIMIZE_GETS + if (type == PTL_MSG_GET && /* doing a GET */ + nid == targetnid) { /* not forwarding */ + lib_md_t *md = libmsg->md; + kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE); + + /* Optimised path: I send over the Elan vaddrs of the get + * sink buffers, and my peer DMAs directly into them. + * + * First I set up ktx as if it was going to send this + * payload, (it needs to map it anyway). This fills + * ktx_frags.iov[1] and onward with the network addresses + * of the get sink frags. I copy these into ktx_buffer, + * immediately after the header, and send that as my GET + * message. + * + * Note that the addresses are sent in native endian-ness. + * When EKC copes with different endian nodes, I'll fix + * this (and eat my hat :) */ + + if ((libmsg->md->options & PTL_MD_KIOV) != 0) + rc = kqswnal_map_tx_kiov (ktx, md->length, + md->md_niov, md->md_iov.kiov); + else + rc = kqswnal_map_tx_iov (ktx, md->length, + md->md_niov, md->md_iov.iov); + + if (rc < 0) { + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); + return (-1); + } + rmd->kqrmd_neiov = ktx->ktx_nfrag - 1; + memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1], + rmd->kqrmd_neiov * sizeof (EP_IOVEC)); + + ktx->ktx_nfrag = 1; + ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t, + kqrmd_eiov[rmd->kqrmd_neiov]); + ktx->ktx_state = KTX_GETTING; + payload_nob = rc; + + } else +#endif if (payload_nob > 0) { /* got some payload (something more to do) */ /* make a single contiguous message? */ if (payload_nob <= KQSW_TX_MAXCONTIG) { @@ -660,7 +880,7 @@ kqswnal_sendmsg (nal_cb_t *nal, lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_iov, payload_nob); /* first frag includes payload */ - ktx->ktx_iov[0].Len += payload_nob; + ktx->ktx_frags.iov[0].Len += payload_nob; } else { if (payload_kiov != NULL) rc = kqswnal_map_tx_kiov (ktx, payload_nob, @@ -670,34 +890,32 @@ kqswnal_sendmsg (nal_cb_t *nal, payload_niov, payload_iov); if (rc != 0) { kqswnal_put_idle_tx (ktx); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } } } - ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? + ktx->ktx_nid = targetnid; + ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */ - ktx->ktx_args[0] = private; - ktx->ktx_args[1] = cookie; rc = kqswnal_launch (ktx); if (rc != 0) { /* failed? 
*/ - CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); + CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid); + CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, targetnid); return (0); } static int kqswnal_send (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -706,14 +924,14 @@ kqswnal_send (nal_cb_t *nal, struct iovec *payload_iov, size_t payload_nob) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, + return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, payload_niov, payload_iov, NULL, payload_nob)); } static int kqswnal_send_pages (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -722,7 +940,7 @@ kqswnal_send_pages (nal_cb_t *nal, ptl_kiov_t *payload_kiov, size_t payload_nob) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, + return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, payload_niov, NULL, payload_kiov, payload_nob)); } @@ -774,14 +992,14 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { /* send from ktx's pre-allocated/mapped contiguous buffer? */ lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ - ktx->ktx_iov[0].Len = nob; - ktx->ktx_niov = 1; + ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ + ktx->ktx_frags.iov[0].Len = nob; + ktx->ktx_nfrag = 1; } else { /* zero copy */ - ktx->ktx_niov = 0; /* no frags mapped yet */ + ktx->ktx_nfrag = 0; /* no frags mapped yet */ rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); if (rc != 0) goto failed; @@ -825,6 +1043,97 @@ kqswnal_fwd_callback (void *arg, int error) } void +kqswnal_reply_complete (EP_RXD *rxd) +{ + int status = ep_rxd_status(rxd); + kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1]; + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, ktx %p, status %d\n", rxd, ktx, status); + + LASSERT (krx->krx_rxd == rxd); + + krx->krx_rpc_completed = 1; + kqswnal_requeue_rx (krx); + + lib_finalize (&kqswnal_lib, NULL, msg); + kqswnal_put_idle_tx (ktx); +} + +void +kqswnal_rpc_complete (EP_RXD *rxd) +{ + int status = ep_rxd_status(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd); + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, krx %p, status %d\n", rxd, krx, status); + + LASSERT (krx->krx_rxd == rxd); + + krx->krx_rpc_completed = 1; + kqswnal_requeue_rx (krx); +} + +void +kqswnal_requeue_rx (kqswnal_rx_t *krx) +{ + EP_STATUSBLK blk; + int rc; + + LASSERT (atomic_read (&krx->krx_refcount) > 0); + if (!atomic_dec_and_test (&krx->krx_refcount)) + return; + + if (!ep_rxd_isrpc(krx->krx_rxd) || + krx->krx_rpc_completed) { + + /* don't actually requeue on shutdown */ + if (kqswnal_data.kqn_shuttingdown) + return; + + ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, + krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); + return; + } + + /* Sender wanted an RPC, but we didn't complete it (we must have + * dropped the sender's message). We complete it now with + * failure... 
*/ + memset (&blk, 0, sizeof (blk)); + blk.Status = -ECONNREFUSED; + + atomic_set (&krx->krx_refcount, 1); + + rc = ep_complete_rpc (krx->krx_rxd, + kqswnal_rpc_complete, krx, + &blk, NULL, 0); + if (rc == ESUCCESS) { + /* callback will call me again to requeue, having set + * krx_rpc_completed... */ + return; + } + + CERROR("can't complete RPC: %d\n", rc); + + /* we don't actually requeue on shutdown */ + if (kqswnal_data.kqn_shuttingdown) + return; + + /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue + * from scratch here... */ + rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, + krx->krx_elanaddr, + krx->krx_npages * PAGE_SIZE, 0); + + LASSERT (rc == ESUCCESS); + /* This needs to be fixed by ep_complete_rpc NOT freeing + * krx->krx_rxd on failure so we can just ep_requeue_receive() */ +} + +void kqswnal_rx (kqswnal_rx_t *krx) { ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); @@ -846,6 +1155,7 @@ kqswnal_rx (kqswnal_rx_t *krx) { CERROR("dropping packet from "LPX64" for "LPX64 ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid); + kqswnal_requeue_rx (krx); return; } @@ -881,16 +1191,18 @@ kqswnal_rxhandler(EP_RXD *rxd) krx->krx_rxd = rxd; krx->krx_nob = nob; - + LASSERT (atomic_read (&krx->krx_refcount) == 0); + atomic_set (&krx->krx_refcount, 1); + krx->krx_rpc_completed = 0; + /* must receive a whole header to be able to parse */ if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) { /* receives complete with failure when receiver is removed */ - if (kqswnal_data.kqn_shuttingdown) - return; + if (!kqswnal_data.kqn_shuttingdown) + CERROR("receive status failed with status %d nob %d\n", + ep_rxd_status(rxd), nob); - CERROR("receive status failed with status %d nob %d\n", - ep_rxd_status(rxd), nob); kqswnal_requeue_rx (krx); return; } @@ -900,8 +1212,6 @@ kqswnal_rxhandler(EP_RXD *rxd) return; } - atomic_inc (&kqswnal_packets_received); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds); @@ -961,7 +1271,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) static int kqswnal_recvmsg (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, @@ -1085,7 +1395,7 @@ kqswnal_recvmsg (nal_cb_t *nal, "csum_nob %d\n", hdr_csum, payload_csum, csum_frags, csum_nob); #endif - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, libmsg); kqswnal_requeue_rx (krx); @@ -1095,25 +1405,25 @@ kqswnal_recvmsg (nal_cb_t *nal, static int kqswnal_recv(nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen)); + return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen)); } static int kqswnal_recv_pages (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen)); + return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen)); } int diff --git a/lnet/libcfs/module.c b/lnet/libcfs/module.c index e8eb290..9bb7e00 100644 --- a/lnet/libcfs/module.c +++ b/lnet/libcfs/module.c @@ -559,6 +559,7 @@ EXPORT_SYMBOL(lib_copy_kiov2buf); EXPORT_SYMBOL(lib_copy_buf2kiov); EXPORT_SYMBOL(lib_finalize); EXPORT_SYMBOL(lib_parse); +EXPORT_SYMBOL(lib_fake_reply_msg); 
EXPORT_SYMBOL(lib_init); EXPORT_SYMBOL(lib_fini); EXPORT_SYMBOL(portal_kmemory); diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 02f8b60..23527ce 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -1183,6 +1183,68 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) return ret->rc = PTL_OK; } +lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, + lib_md_t *getmd) +{ + /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This + * returns a msg the NAL can pass to lib_finalize() so that a REPLY + * event still occurs. + * + * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid. + * This can only be guaranteed while a lib_msg_t holds a reference + * on it (ie. pending > 0), so best call this before the + * lib_finalize() of the original GET. */ + + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + unsigned long flags; + + state_lock(nal, &flags); + + LASSERT (getmd->pending > 0); + + if (getmd->threshold == 0) { + CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n", + peer_nid, getmd); + goto drop; + } + + LASSERT (getmd->offset == 0); + + CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd); + + msg = get_new_msg (nal, getmd); + if (msg == NULL) { + CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n", + peer_nid, getmd); + goto drop; + } + + if (getmd->eq) { + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = peer_nid; + msg->ev.initiator.pid = 0; /* XXX FIXME!!! */ + msg->ev.rlength = msg->ev.mlength = getmd->length; + msg->ev.offset = 0; + + lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); + } + + ni->counters.recv_count++; + ni->counters.recv_length += getmd->length; + + state_unlock(nal, &flags); + + return msg; + + drop: + nal->ni.counters.drop_count++; + nal->ni.counters.drop_length += getmd->length; + + state_unlock (nal, &flags); + + return NULL; +} int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) { diff --git a/lustre/portals/include/portals/lib-p30.h b/lustre/portals/include/portals/lib-p30.h index b623b93..2401f22 100644 --- a/lustre/portals/include/portals/lib-p30.h +++ b/lustre/portals/include/portals/lib-p30.h @@ -361,6 +361,8 @@ extern char *dispatch_name(int index); */ extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, + lib_md_t *getmd); extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); diff --git a/lustre/portals/knals/qswnal/qswnal.c b/lustre/portals/knals/qswnal/qswnal.c index 1a8fb74..7cfc80e 100644 --- a/lustre/portals/knals/qswnal/qswnal.c +++ b/lustre/portals/knals/qswnal/qswnal.c @@ -383,12 +383,14 @@ kqswnal_initialise (void) } /**********************************************************************/ - /* Reserve Elan address space for transmit buffers */ + /* Reserve Elan address space for transmit descriptors NB we may + * either send the contents of associated buffers immediately, or + * map them for the peer to suck/blow... 
*/ dmareq.Waitfn = DDI_DMA_SLEEP; dmareq.ElanAddr = (E3_Addr) 0; dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN; - dmareq.Perm = ELAN_PERM_REMOTEREAD; + dmareq.Perm = ELAN_PERM_REMOTEWRITE; rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState, KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS), @@ -552,7 +554,7 @@ kqswnal_initialise (void) rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE, 0); - if (rc != 0) + if (rc != ESUCCESS) { CERROR ("failed ep_queue_receive %d\n", rc); kqswnal_finalise (); diff --git a/lustre/portals/knals/qswnal/qswnal.h b/lustre/portals/knals/qswnal/qswnal.h index ef19ace..5cbbb9a 100644 --- a/lustre/portals/knals/qswnal/qswnal.h +++ b/lustre/portals/knals/qswnal/qswnal.h @@ -75,6 +75,8 @@ #include #include +#define KQSW_OPTIMIZE_GETS 1 + #define KQSW_CHECKSUM 0 #if KQSW_CHECKSUM typedef unsigned long kqsw_csum_t; @@ -131,6 +133,12 @@ typedef unsigned long kqsw_csum_t; #define KQSW_NRXMSGBYTES_LARGE (KQSW_NRXMSGPAGES_LARGE * PAGE_SIZE) /* biggest complete packet we can receive (or transmit) */ +/* Remote memory descriptor */ +typedef struct +{ + __u32 kqrmd_neiov; /* # frags */ + EP_IOVEC kqrmd_eiov[0]; /* actual frags */ +} kqswnal_remotemd_t; typedef struct { @@ -140,6 +148,8 @@ typedef struct E3_Addr krx_elanaddr; /* Elan address of buffer (contiguous in elan vm) */ int krx_npages; /* # pages in receive buffer */ int krx_nob; /* Number Of Bytes received into buffer */ + atomic_t krx_refcount; /* who's using me? */ + int krx_rpc_completed; /* I completed peer's RPC */ kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */ struct page *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */ struct iovec krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */ @@ -153,18 +163,22 @@ typedef struct uint32_t ktx_basepage; /* page offset in reserved elan tx vaddrs for mapping pages */ int ktx_npages; /* pages reserved for mapping messages */ int ktx_nmappedpages; /* # pages mapped for current message */ - EP_IOVEC ktx_iov[EP_MAXFRAG]; /* msg frags (elan vaddrs) */ - int ktx_niov; /* # message frags */ int ktx_port; /* destination ep port */ ptl_nid_t ktx_nid; /* destination node */ void *ktx_args[2]; /* completion passthru */ E3_Addr ktx_ebuffer; /* elan address of ktx_buffer */ char *ktx_buffer; /* pre-allocated contiguous buffer for hdr + small payloads */ + int ktx_nfrag; /* # message frags */ + union { + EP_IOVEC iov[EP_MAXFRAG]; /* msg frags (elan vaddrs) */ + EP_DATAVEC datav[EP_MAXFRAG]; /* DMA frags (eolan vaddrs) */ + } ktx_frags; } kqswnal_tx_t; #define KTX_IDLE 0 /* MUST BE ZERO (so zeroed ktx is idle) */ #define KTX_SENDING 1 /* local send */ #define KTX_FORWARDING 2 /* routing a packet */ +#define KTX_GETTING 3 /* local optimised get */ typedef struct { @@ -217,6 +231,8 @@ extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg); extern void kqswnal_rxhandler(EP_RXD *rxd); extern int kqswnal_scheduler (void *); extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd); +extern void kqswnal_reply_complete (EP_RXD *rxd); +extern void kqswnal_requeue_rx (kqswnal_rx_t *krx); static inline ptl_nid_t kqswnal_elanid2nid (int elanid) @@ -235,13 +251,6 @@ kqswnal_nid2elanid (ptl_nid_t nid) return (nid - kqswnal_data.kqn_nid_offset); } -static inline void -kqswnal_requeue_rx (kqswnal_rx_t *krx) -{ - ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, - krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); -} - static inline int kqswnal_pages_spanned (void *base, int nob) { diff --git 
a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c index 35c7c6c..7032f6b 100644 --- a/lustre/portals/knals/qswnal/qswnal_cb.c +++ b/lustre/portals/knals/qswnal/qswnal_cb.c @@ -26,11 +26,6 @@ #include "qswnal.h" -atomic_t kqswnal_packets_launched; -atomic_t kqswnal_packets_transmitted; -atomic_t kqswnal_packets_received; - - /* * LIB functions follow * @@ -125,7 +120,7 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) return; CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n", - ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages); LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages); LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <= @@ -140,15 +135,14 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) int kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; char *ptr; LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -167,9 +161,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } @@ -185,7 +179,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, ptr, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); + basepage, &ktx->ktx_frags.iov[nfrags].Base); kunmap (kiov->kiov_page); @@ -193,12 +187,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) ktx->ktx_nmappedpages = nmapped; if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) + ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; else { - ktx->ktx_iov[nfrags].Len = fraglen; + ktx->ktx_frags.iov[nfrags].Len = fraglen; nfrags++; /* new frag */ } @@ -212,9 +206,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } @@ -222,14 +216,13 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) int kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -247,9 +240,9 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec 
*iov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } @@ -261,17 +254,17 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, iov->iov_base, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); + basepage, &ktx->ktx_frags.iov[nfrags].Base); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) + ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; else { - ktx->ktx_iov[nfrags].Len = fraglen; + ktx->ktx_frags.iov[nfrags].Len = fraglen; nfrags++; /* new frag */ } @@ -285,13 +278,14 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } + void kqswnal_put_idle_tx (kqswnal_tx_t *ktx) { @@ -396,12 +390,16 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */ LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0); + return (ktx); } void kqswnal_tx_done (kqswnal_tx_t *ktx, int error) { + lib_msg_t *msg; + lib_msg_t *repmsg; + switch (ktx->ktx_state) { case KTX_FORWARDING: /* router asked me to forward this packet */ kpr_fwd_done (&kqswnal_data.kqn_router, @@ -413,6 +411,21 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) (lib_msg_t *)ktx->ktx_args[1]); break; + case KTX_GETTING: /* Peer has DMA-ed direct? */ + LASSERT (KQSW_OPTIMIZE_GETS); + msg = (lib_msg_t *)ktx->ktx_args[1]; + repmsg = NULL; + + if (error == 0) + repmsg = lib_fake_reply_msg (&kqswnal_lib, + ktx->ktx_nid, msg->md); + + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg); + + if (repmsg != NULL) + lib_finalize (&kqswnal_lib, NULL, repmsg); + break; + default: LASSERT (0); } @@ -430,13 +443,18 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status); - if (status == EP_SUCCESS) - atomic_inc (&kqswnal_packets_transmitted); - if (status != EP_SUCCESS) { CERROR ("kqswnal: Transmit failed with %d\n", status); status = -EIO; + + } else if (ktx->ktx_state == KTX_GETTING) { + /* RPC completed OK; what did our peer put in the status + * block? 
*/ + LASSERT (KQSW_OPTIMIZE_GETS); + status = ep_txd_statusblk(txd)->Status; + } else { + status = 0; } kqswnal_tx_done (ktx, status); @@ -452,11 +470,16 @@ kqswnal_launch (kqswnal_tx_t *ktx) int rc; LASSERT (dest >= 0); /* must be a peer */ - rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, - ktx->ktx_port, attr, kqswnal_txhandler, - ktx, ktx->ktx_iov, ktx->ktx_niov); - if (rc == 0) - atomic_inc (&kqswnal_packets_launched); + if (ktx->ktx_state == KTX_GETTING) { + LASSERT (KQSW_OPTIMIZE_GETS); + rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, kqswnal_txhandler, + ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag); + } else { + rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, kqswnal_txhandler, + ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag); + } if (rc != ENOMEM) return (rc); @@ -547,10 +570,141 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr) } /* end of print_hdr() */ +void +kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) +{ + int i; + + CDEBUG (how, "%s: %d\n", str, n); + for (i = 0; i < n; i++) { + CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len); + } +} + +int +kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, + int nsrc, EP_IOVEC *src, + int ndst, EP_IOVEC *dst) +{ + int count; + int nob; + + LASSERT (ndv > 0); + LASSERT (nsrc > 0); + LASSERT (ndst > 0); + + for (count = 0; count < ndv; count++, dv++) { + + if (nsrc == 0 || ndst == 0) { + if (nsrc != ndst) { + /* For now I'll barf on any left over entries */ + CERROR ("mismatched src and dst iovs\n"); + return (-EINVAL); + } + return (count); + } + + nob = (src->Len < dst->Len) ? src->Len : dst->Len; + dv->Len = nob; + dv->Source = src->Base; + dv->Dest = dst->Base; + + if (nob >= src->Len) { + src++; + nsrc--; + } else { + src->Len -= nob; + src->Base += nob; + } + + if (nob >= dst->Len) { + dst++; + ndst--; + } else { + src->Len -= nob; + src->Base += nob; + } + } + + CERROR ("DATAVEC too small\n"); + return (-E2BIG); +} + +int +kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, + struct iovec *iov, ptl_kiov_t *kiov, int nob) +{ + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + char *buffer = (char *)page_address(krx->krx_pages[0]); + kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); + EP_IOVEC eiov[EP_MAXFRAG]; + EP_STATUSBLK blk; + int rc; + + LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed); + LASSERT ((iov == NULL) != (kiov == NULL)); + + /* see .*_pack_k?iov comment regarding endian-ness */ + if (buffer + krx->krx_nob < (char *)(rmd + 1)) { + /* msg too small to discover rmd size */ + CERROR ("Incoming message [%d] too small for RMD (%d needed)\n", + krx->krx_nob, ((char *)(rmd + 1)) - buffer); + return (-EINVAL); + } + + if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) { + /* rmd doesn't fit in the incoming message */ + CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n", + krx->krx_nob, rmd->kqrmd_neiov, + ((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer); + return (-EINVAL); + } + + /* Ghastly hack part 1, uses the existing procedures to map the source data... */ + ktx->ktx_nfrag = 0; + if (kiov != NULL) + rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); + else + rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov); + + if (rc != 0) { + CERROR ("Can't map source data: %d\n", rc); + return (rc); + } + + /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... 
*/ + memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0])); + + rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav, + ktx->ktx_nfrag, eiov, + rmd->kqrmd_neiov, rmd->kqrmd_eiov); + if (rc < 0) { + CERROR ("Can't create datavec: %d\n", rc); + return (rc); + } + ktx->ktx_nfrag = rc; + + memset (&blk, 0, sizeof (blk)); /* zero blk.Status */ + + /* Our caller will start to race with kqswnal_rpc_complete... */ + LASSERT (atomic_read (&krx->krx_refcount) == 1); + atomic_set (&krx->krx_refcount, 2); + + rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx, + &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag); + if (rc == ESUCCESS) + return (0); + + /* reset refcount back to 1: we're not going to be racing with + * kqswnal_rely_complete. */ + atomic_set (&krx->krx_refcount, 1); + return (-ECONNABORTED); +} + static int kqswnal_sendmsg (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -562,7 +716,7 @@ kqswnal_sendmsg (nal_cb_t *nal, { kqswnal_tx_t *ktx; int rc; - ptl_nid_t gatewaynid; + ptl_nid_t targetnid; #if KQSW_CHECKSUM int i; kqsw_csum_t csum; @@ -588,25 +742,25 @@ kqswnal_sendmsg (nal_cb_t *nal, if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } + targetnid = nid; if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */ - rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid); + rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &targetnid); if (rc != 0) { CERROR("Can't route to "LPX64": router error %d\n", nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - if (kqswnal_nid2elanid (gatewaynid) < 0) { + if (kqswnal_nid2elanid (targetnid) < 0) { CERROR("Bad gateway "LPX64" for "LPX64"\n", - gatewaynid, nid); - lib_finalize (&kqswnal_lib, private, cookie); + targetnid, nid); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - nid = gatewaynid; } /* I may not block for a transmit descriptor if I might block the @@ -616,10 +770,30 @@ kqswnal_sendmsg (nal_cb_t *nal, in_interrupt())); if (ktx == NULL) { kqswnal_cerror_hdr (hdr); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } + ktx->ktx_args[0] = private; + ktx->ktx_args[1] = libmsg; + +#if KQSW_OPTIMIZE_GETS + if (type == PTL_MSG_REPLY && + ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) { + /* peer expects RPC completion with GET data */ + rc = kqswnal_dma_reply (ktx, + payload_niov, payload_iov, + payload_kiov, payload_nob); + if (rc == 0) + return (0); + + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); + return (-1); + } +#endif + memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ #if KQSW_CHECKSUM @@ -642,13 +816,59 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); #endif - + /* Set up first frag from pre-mapped buffer (it's at least the * portals header) */ - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; - ktx->ktx_iov[0].Len = KQSW_HDR_SIZE; - ktx->ktx_niov = 1; + ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE; + ktx->ktx_nfrag = 1; + ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on 
completion */ + +#if KQSW_OPTIMIZE_GETS + if (type == PTL_MSG_GET && /* doing a GET */ + nid == targetnid) { /* not forwarding */ + lib_md_t *md = libmsg->md; + kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE); + + /* Optimised path: I send over the Elan vaddrs of the get + * sink buffers, and my peer DMAs directly into them. + * + * First I set up ktx as if it was going to send this + * payload, (it needs to map it anyway). This fills + * ktx_frags.iov[1] and onward with the network addresses + * of the get sink frags. I copy these into ktx_buffer, + * immediately after the header, and send that as my GET + * message. + * + * Note that the addresses are sent in native endian-ness. + * When EKC copes with different endian nodes, I'll fix + * this (and eat my hat :) */ + + if ((libmsg->md->options & PTL_MD_KIOV) != 0) + rc = kqswnal_map_tx_kiov (ktx, md->length, + md->md_niov, md->md_iov.kiov); + else + rc = kqswnal_map_tx_iov (ktx, md->length, + md->md_niov, md->md_iov.iov); + + if (rc < 0) { + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); + return (-1); + } + rmd->kqrmd_neiov = ktx->ktx_nfrag - 1; + memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1], + rmd->kqrmd_neiov * sizeof (EP_IOVEC)); + + ktx->ktx_nfrag = 1; + ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t, + kqrmd_eiov[rmd->kqrmd_neiov]); + ktx->ktx_state = KTX_GETTING; + payload_nob = rc; + + } else +#endif if (payload_nob > 0) { /* got some payload (something more to do) */ /* make a single contiguous message? */ if (payload_nob <= KQSW_TX_MAXCONTIG) { @@ -660,7 +880,7 @@ kqswnal_sendmsg (nal_cb_t *nal, lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_iov, payload_nob); /* first frag includes payload */ - ktx->ktx_iov[0].Len += payload_nob; + ktx->ktx_frags.iov[0].Len += payload_nob; } else { if (payload_kiov != NULL) rc = kqswnal_map_tx_kiov (ktx, payload_nob, @@ -670,34 +890,32 @@ kqswnal_sendmsg (nal_cb_t *nal, payload_niov, payload_iov); if (rc != 0) { kqswnal_put_idle_tx (ktx); - lib_finalize (&kqswnal_lib, private, cookie); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } } } - ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? + ktx->ktx_nid = targetnid; + ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */ - ktx->ktx_args[0] = private; - ktx->ktx_args[1] = cookie; rc = kqswnal_launch (ktx); if (rc != 0) { /* failed? 
*/ - CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); + CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + kqswnal_put_idle_tx (ktx); + lib_finalize (&kqswnal_lib, private, libmsg); return (-1); } - CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid); + CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, targetnid); return (0); } static int kqswnal_send (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -706,14 +924,14 @@ kqswnal_send (nal_cb_t *nal, struct iovec *payload_iov, size_t payload_nob) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, + return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, payload_niov, payload_iov, NULL, payload_nob)); } static int kqswnal_send_pages (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, @@ -722,7 +940,7 @@ kqswnal_send_pages (nal_cb_t *nal, ptl_kiov_t *payload_kiov, size_t payload_nob) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, + return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, payload_niov, NULL, payload_kiov, payload_nob)); } @@ -774,14 +992,14 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { /* send from ktx's pre-allocated/mapped contiguous buffer? */ lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ - ktx->ktx_iov[0].Len = nob; - ktx->ktx_niov = 1; + ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ + ktx->ktx_frags.iov[0].Len = nob; + ktx->ktx_nfrag = 1; } else { /* zero copy */ - ktx->ktx_niov = 0; /* no frags mapped yet */ + ktx->ktx_nfrag = 0; /* no frags mapped yet */ rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); if (rc != 0) goto failed; @@ -825,6 +1043,97 @@ kqswnal_fwd_callback (void *arg, int error) } void +kqswnal_reply_complete (EP_RXD *rxd) +{ + int status = ep_rxd_status(rxd); + kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1]; + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, ktx %p, status %d\n", rxd, ktx, status); + + LASSERT (krx->krx_rxd == rxd); + + krx->krx_rpc_completed = 1; + kqswnal_requeue_rx (krx); + + lib_finalize (&kqswnal_lib, NULL, msg); + kqswnal_put_idle_tx (ktx); +} + +void +kqswnal_rpc_complete (EP_RXD *rxd) +{ + int status = ep_rxd_status(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd); + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, krx %p, status %d\n", rxd, krx, status); + + LASSERT (krx->krx_rxd == rxd); + + krx->krx_rpc_completed = 1; + kqswnal_requeue_rx (krx); +} + +void +kqswnal_requeue_rx (kqswnal_rx_t *krx) +{ + EP_STATUSBLK blk; + int rc; + + LASSERT (atomic_read (&krx->krx_refcount) > 0); + if (!atomic_dec_and_test (&krx->krx_refcount)) + return; + + if (!ep_rxd_isrpc(krx->krx_rxd) || + krx->krx_rpc_completed) { + + /* don't actually requeue on shutdown */ + if (kqswnal_data.kqn_shuttingdown) + return; + + ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, + krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); + return; + } + + /* Sender wanted an RPC, but we didn't complete it (we must have + * dropped the sender's message). We complete it now with + * failure... 
*/ + memset (&blk, 0, sizeof (blk)); + blk.Status = -ECONNREFUSED; + + atomic_set (&krx->krx_refcount, 1); + + rc = ep_complete_rpc (krx->krx_rxd, + kqswnal_rpc_complete, krx, + &blk, NULL, 0); + if (rc == ESUCCESS) { + /* callback will call me again to requeue, having set + * krx_rpc_completed... */ + return; + } + + CERROR("can't complete RPC: %d\n", rc); + + /* we don't actually requeue on shutdown */ + if (kqswnal_data.kqn_shuttingdown) + return; + + /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue + * from scratch here... */ + rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, + krx->krx_elanaddr, + krx->krx_npages * PAGE_SIZE, 0); + + LASSERT (rc == ESUCCESS); + /* This needs to be fixed by ep_complete_rpc NOT freeing + * krx->krx_rxd on failure so we can just ep_requeue_receive() */ +} + +void kqswnal_rx (kqswnal_rx_t *krx) { ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); @@ -846,6 +1155,7 @@ kqswnal_rx (kqswnal_rx_t *krx) { CERROR("dropping packet from "LPX64" for "LPX64 ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid); + kqswnal_requeue_rx (krx); return; } @@ -881,16 +1191,18 @@ kqswnal_rxhandler(EP_RXD *rxd) krx->krx_rxd = rxd; krx->krx_nob = nob; - + LASSERT (atomic_read (&krx->krx_refcount) == 0); + atomic_set (&krx->krx_refcount, 1); + krx->krx_rpc_completed = 0; + /* must receive a whole header to be able to parse */ if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) { /* receives complete with failure when receiver is removed */ - if (kqswnal_data.kqn_shuttingdown) - return; + if (!kqswnal_data.kqn_shuttingdown) + CERROR("receive status failed with status %d nob %d\n", + ep_rxd_status(rxd), nob); - CERROR("receive status failed with status %d nob %d\n", - ep_rxd_status(rxd), nob); kqswnal_requeue_rx (krx); return; } @@ -900,8 +1212,6 @@ kqswnal_rxhandler(EP_RXD *rxd) return; } - atomic_inc (&kqswnal_packets_received); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds); @@ -961,7 +1271,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) static int kqswnal_recvmsg (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, @@ -1085,7 +1395,7 @@ kqswnal_recvmsg (nal_cb_t *nal, "csum_nob %d\n", hdr_csum, payload_csum, csum_frags, csum_nob); #endif - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, libmsg); kqswnal_requeue_rx (krx); @@ -1095,25 +1405,25 @@ kqswnal_recvmsg (nal_cb_t *nal, static int kqswnal_recv(nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen)); + return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen)); } static int kqswnal_recv_pages (nal_cb_t *nal, void *private, - lib_msg_t *cookie, + lib_msg_t *libmsg, unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen)); + return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen)); } int diff --git a/lustre/portals/libcfs/module.c b/lustre/portals/libcfs/module.c index e8eb290..9bb7e00 100644 --- a/lustre/portals/libcfs/module.c +++ b/lustre/portals/libcfs/module.c @@ -559,6 +559,7 @@ EXPORT_SYMBOL(lib_copy_kiov2buf); EXPORT_SYMBOL(lib_copy_buf2kiov); EXPORT_SYMBOL(lib_finalize); EXPORT_SYMBOL(lib_parse); 
+EXPORT_SYMBOL(lib_fake_reply_msg); EXPORT_SYMBOL(lib_init); EXPORT_SYMBOL(lib_fini); EXPORT_SYMBOL(portal_kmemory); diff --git a/lustre/portals/portals/lib-move.c b/lustre/portals/portals/lib-move.c index 02f8b60..23527ce 100644 --- a/lustre/portals/portals/lib-move.c +++ b/lustre/portals/portals/lib-move.c @@ -1183,6 +1183,68 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) return ret->rc = PTL_OK; } +lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, + lib_md_t *getmd) +{ + /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This + * returns a msg the NAL can pass to lib_finalize() so that a REPLY + * event still occurs. + * + * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid. + * This can only be guaranteed while a lib_msg_t holds a reference + * on it (ie. pending > 0), so best call this before the + * lib_finalize() of the original GET. */ + + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + unsigned long flags; + + state_lock(nal, &flags); + + LASSERT (getmd->pending > 0); + + if (getmd->threshold == 0) { + CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n", + peer_nid, getmd); + goto drop; + } + + LASSERT (getmd->offset == 0); + + CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd); + + msg = get_new_msg (nal, getmd); + if (msg == NULL) { + CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n", + peer_nid, getmd); + goto drop; + } + + if (getmd->eq) { + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = peer_nid; + msg->ev.initiator.pid = 0; /* XXX FIXME!!! */ + msg->ev.rlength = msg->ev.mlength = getmd->length; + msg->ev.offset = 0; + + lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); + } + + ni->counters.recv_count++; + ni->counters.recv_length += getmd->length; + + state_unlock(nal, &flags); + + return msg; + + drop: + nal->ni.counters.drop_count++; + nal->ni.counters.drop_length += getmd->length; + + state_unlock (nal, &flags); + + return NULL; +} int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) { -- 1.8.3.1
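Notes on the optimized GET path introduced by this patch: when a GET is sent to a directly-reachable peer, the sender maps the GET sink MD into Elan space, appends a kqswnal_remotemd_t (a frag count plus an EP_IOVEC array) immediately after the portals header, and launches the message as an EKC RPC via ep_transmit_rpc(). The receiving qswnal, on seeing a PTL_MSG_REPLY for an RPC rxd, calls kqswnal_dma_reply(), which pairs its source frags with the sender's sink frags into an EP_DATAVEC and hands them to ep_complete_rpc(), so the payload is DMAed straight into the sink buffers and no REPLY message ever crosses the wire. When the RPC completes, the sender reads the peer's status block in kqswnal_txhandler(), and kqswnal_tx_done() uses the new lib_fake_reply_msg() to synthesize the REPLY that portals would otherwise have expected, delivering both the GET and the fake REPLY through lib_finalize().

The fragment below is a minimal sketch (not part of the patch) of that sender-side completion pattern for any NAL that lets the peer DMA GET data directly. It mirrors the KTX_GETTING case in kqswnal_tx_done(); my_nal_get_done and my_nal_lib are hypothetical names, and the header include is assumed to be whichever lib-p30.h declares lib_fake_reply_msg() in this tree.

#include <portals/lib-p30.h>   /* lib_fake_reply_msg(), lib_finalize() */

/* Hypothetical completion hook: called when the peer's direct DMA into
 * the GET sink MD has finished with result 'error'. */
static void
my_nal_get_done (nal_cb_t *my_nal_lib, void *private,
                 lib_msg_t *getmsg, ptl_nid_t peer_nid, int error)
{
        lib_msg_t *repmsg = NULL;

        /* Build the fake REPLY while the original GET message still
         * holds a reference on the MD (md->pending > 0), i.e. before
         * finalizing the GET. */
        if (error == 0)
                repmsg = lib_fake_reply_msg (my_nal_lib, peer_nid,
                                             getmsg->md);

        /* Complete the original GET send... */
        lib_finalize (my_nal_lib, private, getmsg);

        /* ...then deliver the REPLY event as if a reply message had
         * arrived in the normal fashion. */
        if (repmsg != NULL)
                lib_finalize (my_nal_lib, NULL, repmsg);
}

Ordering matters here: as the CAVEAT EMPTOR comment in lib_fake_reply_msg() notes, the GET MD is only guaranteed valid while a lib_msg_t still holds a reference on it, so the fake REPLY must be constructed before the lib_finalize() of the original GET.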