From: niu Date: Tue, 27 Jan 2004 01:49:56 +0000 (+0000) Subject: * update b_localprc from HEAD X-Git-Tag: v1_7_100~1^120~12 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=c391ddf0dccb542078cf5d95978d4f34ad9d4527;p=fs%2Flustre-release.git * update b_localprc from HEAD --- diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 43926c9..96749cd 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -26,6 +26,9 @@ #include "qswnal.h" +EP_STATUSBLK kqswnal_rpc_success; +EP_STATUSBLK kqswnal_rpc_failed; + /* * LIB functions follow * @@ -128,9 +131,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx) void kqswnal_unmap_tx (kqswnal_tx_t *ktx) { +#if MULTIRAIL_EKC + int i; +#endif + if (ktx->ktx_nmappedpages == 0) return; - + +#if MULTIRAIL_EKC + CDEBUG(D_NET, "%p unloading %d frags starting at %d\n", + ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag); + + for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++) + ep_dvma_unload(kqswnal_data.kqn_ep, + kqswnal_data.kqn_ep_tx_nmh, + &ktx->ktx_frags[i]); +#else CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n", ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages); @@ -138,9 +154,11 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <= kqswnal_data.kqn_eptxdmahandle->NumDvmaPages); - elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState, + elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, ktx->ktx_basepage, ktx->ktx_nmappedpages); + +#endif ktx->ktx_nmappedpages = 0; } @@ -152,12 +170,24 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; char *ptr; +#if MULTIRAIL_EKC + EP_RAILMASK railmask; + int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx, + EP_RAILMASK_ALL, + kqswnal_nid2elanid(ktx->ktx_nid)); + if (rail < 0) { + CERROR("No rails available for "LPX64"\n", ktx->ktx_nid); + return (-ENETDOWN); + } + railmask = 1 << rail; +#endif LASSERT (nmapped <= maxmapped); + LASSERT (nfrags >= ktx->ktx_firsttmpfrag); LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); - + do { int fraglen = kiov->kiov_len; @@ -188,25 +218,40 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) "%p[%d] loading %p for %d, page %d, %d total\n", ktx, nfrags, ptr, fraglen, basepage, nmapped); - elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, +#if MULTIRAIL_EKC + ep_dvma_load(kqswnal_data.kqn_ep, NULL, + ptr, fraglen, + kqswnal_data.kqn_ep_tx_nmh, basepage, + &railmask, &ktx->ktx_frags[nfrags]); + + if (nfrags == ktx->ktx_firsttmpfrag || + !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags])) { + /* new frag if this is the first or can't merge */ + nfrags++; + } +#else + elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, ptr, fraglen, - basepage, &ktx->ktx_frags.iov[nfrags].Base); - - kunmap (kiov->kiov_page); - - /* keep in loop for failure case */ - ktx->ktx_nmappedpages = nmapped; + basepage, &ktx->ktx_frags[nfrags].Base); if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) + ktx->ktx_frags[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_frags.iov[nfrags - 1].Len += 
fraglen; + ktx->ktx_frags[nfrags - 1].Len += fraglen; else { - ktx->ktx_frags.iov[nfrags].Len = fraglen; + ktx->ktx_frags[nfrags].Len = fraglen; nfrags++; /* new frag */ } +#endif + + kunmap (kiov->kiov_page); + + /* keep in loop for failure case */ + ktx->ktx_nmappedpages = nmapped; basepage++; kiov++; @@ -232,8 +277,20 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; - +#if MULTIRAIL_EKC + EP_RAILMASK railmask; + int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx, + EP_RAILMASK_ALL, + kqswnal_nid2elanid(ktx->ktx_nid)); + + if (rail < 0) { + CERROR("No rails available for "LPX64"\n", ktx->ktx_nid); + return (-ENETDOWN); + } + railmask = 1 << rail; +#endif LASSERT (nmapped <= maxmapped); + LASSERT (nfrags >= ktx->ktx_firsttmpfrag); LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -263,22 +320,38 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) ktx, nfrags, iov->iov_base, fraglen, basepage, npages, nmapped); - elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, +#if MULTIRAIL_EKC + ep_dvma_load(kqswnal_data.kqn_ep, NULL, + iov->iov_base, fraglen, + kqswnal_data.kqn_ep_tx_nmh, basepage, + &railmask, &ktx->ktx_frags[nfrags]); + + if (nfrags == ktx->ktx_firsttmpfrag || + !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags])) { + /* new frag if this is the first or can't merge */ + nfrags++; + } +#else + elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, iov->iov_base, fraglen, - basepage, &ktx->ktx_frags.iov[nfrags].Base); - /* keep in loop for failure case */ - ktx->ktx_nmappedpages = nmapped; + basepage, &ktx->ktx_frags[nfrags].Base); if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) + ktx->ktx_frags[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags[nfrags - 1].Len += fraglen; else { - ktx->ktx_frags.iov[nfrags].Len = fraglen; + ktx->ktx_frags[nfrags].Len = fraglen; nfrags++; /* new frag */ } +#endif + + /* keep in loop for failure case */ + ktx->ktx_nmappedpages = nmapped; basepage += npages; iov++; @@ -424,7 +497,6 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) break; case KTX_GETTING: /* Peer has DMA-ed direct? */ - LASSERT (KQSW_OPTIMIZE_GETS); msg = (lib_msg_t *)ktx->ktx_args[1]; repmsg = NULL; @@ -455,8 +527,8 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status); - if (status != EP_SUCCESS) - { + if (status != EP_SUCCESS) { + CERROR ("Tx completion to "LPX64" failed: %d\n", ktx->ktx_nid, status); @@ -466,8 +538,11 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) } else if (ktx->ktx_state == KTX_GETTING) { /* RPC completed OK; what did our peer put in the status * block? 
*/ - LASSERT (KQSW_OPTIMIZE_GETS); +#if MULTIRAIL_EKC + status = ep_txd_statusblk(txd)->Data[0]; +#else status = ep_txd_statusblk(txd)->Status; +#endif } else { status = 0; } @@ -488,21 +563,38 @@ kqswnal_launch (kqswnal_tx_t *ktx) LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { - LASSERT (KQSW_OPTIMIZE_GETS); + /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The + * other frags are the GET sink which we obviously don't + * send here :) */ +#if MULTIRAIL_EKC + rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, 1); +#else rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, ktx->ktx_port, attr, kqswnal_txhandler, - ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag); + ktx, NULL, ktx->ktx_frags, 1); +#endif } else { +#if MULTIRAIL_EKC + rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, ktx->ktx_nfrag); +#else rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, - ktx->ktx_port, attr, kqswnal_txhandler, - ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag); + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + ktx->ktx_frags, ktx->ktx_nfrag); +#endif } switch (rc) { - case ESUCCESS: /* success */ + case EP_SUCCESS: /* success */ return (0); - case ENOMEM: /* can't allocate ep txd => queue for later */ + case EP_ENOMEM: /* can't allocate ep txd => queue for later */ LASSERT (in_interrupt()); spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); @@ -516,7 +608,7 @@ kqswnal_launch (kqswnal_tx_t *ktx) default: /* fatal error */ CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc); kqswnal_notify_peer_down(ktx); - return (rc); + return (-EHOSTUNREACH); } } @@ -589,6 +681,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr) } /* end of print_hdr() */ +#if !MULTIRAIL_EKC void kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) { @@ -648,6 +741,7 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, CERROR ("DATAVEC too small\n"); return (-E2BIG); } +#endif int kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, @@ -656,14 +750,17 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; char *buffer = (char *)page_address(krx->krx_pages[0]); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); - EP_IOVEC eiov[EP_MAXFRAG]; - EP_STATUSBLK blk; int rc; - - LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed); +#if MULTIRAIL_EKC + int i; +#else + EP_DATAVEC datav[EP_MAXFRAG]; + int ndatav; +#endif + LASSERT (krx->krx_rpc_reply_needed); LASSERT ((iov == NULL) != (kiov == NULL)); - /* see .*_pack_k?iov comment regarding endian-ness */ + /* see kqswnal_sendmsg comment regarding endian-ness */ if (buffer + krx->krx_nob < (char *)(rmd + 1)) { /* msg too small to discover rmd size */ CERROR ("Incoming message [%d] too small for RMD (%d needed)\n", @@ -671,16 +768,16 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (-EINVAL); } - if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) { + if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) { /* rmd doesn't fit in the incoming message */ CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n", - krx->krx_nob, rmd->kqrmd_neiov, - (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer)); + krx->krx_nob, rmd->kqrmd_nfrag, + (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer)); return (-EINVAL); } - /* Ghastly hack part 1, uses the existing procedures to map 
the source data... */ - ktx->ktx_nfrag = 0; + /* Map the source data... */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; if (kiov != NULL) rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); else @@ -691,32 +788,61 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (rc); } - /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */ - memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0])); - - rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav, - ktx->ktx_nfrag, eiov, - rmd->kqrmd_neiov, rmd->kqrmd_eiov); - if (rc < 0) { - CERROR ("Can't create datavec: %d\n", rc); - return (rc); +#if MULTIRAIL_EKC + if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) { + CERROR("Can't cope with unequal # frags: %d local %d remote\n", + ktx->ktx_nfrag, rmd->kqrmd_nfrag); + return (-EINVAL); } - ktx->ktx_nfrag = rc; - - memset (&blk, 0, sizeof (blk)); /* zero blk.Status */ + + for (i = 0; i < rmd->kqrmd_nfrag; i++) + if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) { + CERROR("Can't cope with unequal frags %d(%d):" + " %d local %d remote\n", + i, rmd->kqrmd_nfrag, + ktx->ktx_frags[i].nmd_len, + rmd->kqrmd_frag[i].nmd_len); + return (-EINVAL); + } +#else + ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav, + ktx->ktx_nfrag, ktx->ktx_frags, + rmd->kqrmd_nfrag, rmd->kqrmd_frag); + if (ndatav < 0) { + CERROR ("Can't create datavec: %d\n", ndatav); + return (ndatav); + } +#endif - /* Our caller will start to race with kqswnal_rpc_complete... */ + /* Our caller will start to race with kqswnal_dma_reply_complete... */ LASSERT (atomic_read (&krx->krx_refcount) == 1); atomic_set (&krx->krx_refcount, 2); - rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx, - &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag); - if (rc == ESUCCESS) +#if MULTIRAIL_EKC + rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, + &kqswnal_rpc_success, + ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag); + if (rc == EP_SUCCESS) + return (0); + + /* Well we tried... */ + krx->krx_rpc_reply_needed = 0; +#else + rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx, + &kqswnal_rpc_success, datav, ndatav); + if (rc == EP_SUCCESS) return (0); + /* "old" EKC destroys rxd on failed completion */ + krx->krx_rxd = NULL; +#endif + + CERROR("can't complete RPC: %d\n", rc); + /* reset refcount back to 1: we're not going to be racing with - * kqswnal_rely_complete. */ + * kqswnal_dma_reply_complete. 
*/ atomic_set (&krx->krx_refcount, 1); + return (-ECONNABORTED); } @@ -785,12 +911,12 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_NOSPACE); } + ktx->ktx_nid = targetnid; ktx->ktx_args[0] = private; ktx->ktx_args[1] = libmsg; -#if KQSW_OPTIMIZE_GETS if (type == PTL_MSG_REPLY && - ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) { + ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { if (nid != targetnid || kqswnal_nid2elanid(nid) != ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) { @@ -798,7 +924,7 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return(PTL_FAIL); + return (PTL_FAIL); } /* peer expects RPC completion with GET data */ @@ -806,13 +932,12 @@ kqswnal_sendmsg (nal_cb_t *nal, payload_niov, payload_iov, payload_kiov, payload_nob); if (rc == 0) - return (0); + return (PTL_OK); CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); kqswnal_put_idle_tx (ktx); return (PTL_FAIL); } -#endif memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; @@ -838,15 +963,8 @@ kqswnal_sendmsg (nal_cb_t *nal, memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); #endif - /* Set up first frag from pre-mapped buffer (it's at least the - * portals header) */ - ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE; - ktx->ktx_nfrag = 1; - ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */ - -#if KQSW_OPTIMIZE_GETS - if (type == PTL_MSG_GET && /* doing a GET */ + if (kqswnal_data.kqn_optimized_gets && + type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ lib_md_t *md = libmsg->md; kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE); @@ -856,8 +974,8 @@ kqswnal_sendmsg (nal_cb_t *nal, * * First I set up ktx as if it was going to send this * payload, (it needs to map it anyway). This fills - * ktx_frags.iov[1] and onward with the network addresses - * of the get sink frags. I copy these into ktx_buffer, + * ktx_frags[1] and onward with the network addresses + * of the GET sink frags. I copy these into ktx_buffer, * immediately after the header, and send that as my GET * message. 
* @@ -865,6 +983,9 @@ kqswnal_sendmsg (nal_cb_t *nal, * When EKC copes with different endian nodes, I'll fix * this (and eat my hat :) */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_GETTING; + if ((libmsg->md->options & PTL_MD_KIOV) != 0) rc = kqswnal_map_tx_kiov (ktx, md->length, md->md_niov, md->md_iov.kiov); @@ -877,46 +998,73 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_FAIL); } - rmd->kqrmd_neiov = ktx->ktx_nfrag - 1; - memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1], - rmd->kqrmd_neiov * sizeof (EP_IOVEC)); + rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; - ktx->ktx_nfrag = 1; - ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t, - kqrmd_eiov[rmd->kqrmd_neiov]); - payload_nob = ktx->ktx_frags.iov[0].Len; - ktx->ktx_state = KTX_GETTING; - } else + payload_nob = offsetof(kqswnal_remotemd_t, + kqrmd_frag[rmd->kqrmd_nfrag]); + LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE); + +#if MULTIRAIL_EKC + memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1], + rmd->kqrmd_nfrag * sizeof(EP_NMD)); + + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE + payload_nob); +#else + memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1], + rmd->kqrmd_nfrag * sizeof(EP_IOVEC)); + + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob; +#endif + } else if (payload_nob <= KQSW_TX_MAXCONTIG) { + + /* small message: single frag copied into the pre-mapped buffer */ + + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_SENDING; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE + payload_nob); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob; #endif - if (payload_nob > 0) { /* got some payload (something more to do) */ - /* make a single contiguous message? */ - if (payload_nob <= KQSW_TX_MAXCONTIG) { - /* copy payload to ktx_buffer, immediately after hdr */ + if (payload_nob > 0) { if (payload_kiov != NULL) lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_kiov, payload_nob); else lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_iov, payload_nob); - /* first frag includes payload */ - ktx->ktx_frags.iov[0].Len += payload_nob; - } else { - if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, - payload_niov, payload_kiov); - else - rc = kqswnal_map_tx_iov (ktx, payload_nob, - payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - } - } + } + } else { - ktx->ktx_nid = targetnid; + /* large message: multiple frags: first is hdr in pre-mapped buffer */ + + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_SENDING; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + if (payload_kiov != NULL) + rc = kqswnal_map_tx_kiov (ktx, payload_nob, + payload_niov, payload_kiov); + else + rc = kqswnal_map_tx_iov (ktx, payload_nob, + payload_niov, payload_iov); + if (rc != 0) { + kqswnal_put_idle_tx (ktx); + return (PTL_FAIL); + } + } + ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; + EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); if (rc != 0) { /* failed? 
*/ @@ -962,8 +1110,6 @@ kqswnal_send_pages (nal_cb_t *nal, payload_niov, NULL, payload_kiov, payload_nob)); } -int kqswnal_fwd_copy_contig = 0; - void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { @@ -984,7 +1130,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, FALSE); + ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1005,20 +1151,31 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) goto failed; } - if ((kqswnal_fwd_copy_contig || niov > 1) && + ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; + ktx->ktx_nid = nid; + ktx->ktx_state = KTX_FORWARDING; + ktx->ktx_args[0] = fwd; + + if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && nob <= KQSW_TX_BUFFER_SIZE) { - /* send from ktx's pre-allocated/mapped contiguous buffer? */ + /* send from ktx's pre-mapped contiguous buffer? */ lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); - ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ - ktx->ktx_frags.iov[0].Len = nob; - ktx->ktx_nfrag = 1; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, nob); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = nob; +#endif + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; } else { /* zero copy */ - ktx->ktx_nfrag = 0; /* no frags mapped yet */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); if (rc != 0) goto failed; @@ -1026,12 +1183,6 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; } - ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */ - ktx->ktx_args[0] = fwd; - rc = kqswnal_launch (ktx); if (rc == 0) return; @@ -1064,7 +1215,7 @@ kqswnal_fwd_callback (void *arg, int error) } void -kqswnal_reply_complete (EP_RXD *rxd) +kqswnal_dma_reply_complete (EP_RXD *rxd) { int status = ep_rxd_status(rxd); kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); @@ -1075,9 +1226,10 @@ kqswnal_reply_complete (EP_RXD *rxd) "rxd %p, ktx %p, status %d\n", rxd, ktx, status); LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); - krx->krx_rpc_completed = 1; - kqswnal_requeue_rx (krx); + krx->krx_rpc_reply_needed = 0; + kqswnal_rx_done (krx); lib_finalize (&kqswnal_lib, NULL, msg); kqswnal_put_idle_tx (ktx); @@ -1093,67 +1245,76 @@ kqswnal_rpc_complete (EP_RXD *rxd) "rxd %p, krx %p, status %d\n", rxd, krx, status); LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); - krx->krx_rpc_completed = 1; + krx->krx_rpc_reply_needed = 0; kqswnal_requeue_rx (krx); } void -kqswnal_requeue_rx (kqswnal_rx_t *krx) +kqswnal_requeue_rx (kqswnal_rx_t *krx) { - EP_STATUSBLK blk; - int rc; + int rc; - LASSERT (atomic_read (&krx->krx_refcount) > 0); - if (!atomic_dec_and_test (&krx->krx_refcount)) - return; + LASSERT (atomic_read(&krx->krx_refcount) == 0); - if (!ep_rxd_isrpc(krx->krx_rxd) || - krx->krx_rpc_completed) { + if (krx->krx_rpc_reply_needed) { - /* don't actually requeue on shutdown */ - if (kqswnal_data.kqn_shuttingdown) + /* We failed to complete the peer's optimized GET (e.g. we + * couldn't map the source buffers). 
We complete the + * peer's EKC rpc now with failure. */ +#if MULTIRAIL_EKC + rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx, + &kqswnal_rpc_failed, NULL, NULL, 0); + if (rc == EP_SUCCESS) return; - ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, - krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); - return; - } - - /* Sender wanted an RPC, but we didn't complete it (we must have - * dropped the sender's message). We complete it now with - * failure... */ - memset (&blk, 0, sizeof (blk)); - blk.Status = -ECONNREFUSED; - - atomic_set (&krx->krx_refcount, 1); + CERROR("can't complete RPC: %d\n", rc); +#else + if (krx->krx_rxd != NULL) { + /* We didn't try (and fail) to complete earlier... */ + rc = ep_complete_rpc(krx->krx_rxd, + kqswnal_rpc_complete, krx, + &kqswnal_rpc_failed, NULL, 0); + if (rc == EP_SUCCESS) + return; + + CERROR("can't complete RPC: %d\n", rc); + } + + /* NB the old ep_complete_rpc() frees rxd on failure, so we + * have to requeue from scratch here, unless we're shutting + * down */ + if (kqswnal_data.kqn_shuttingdown) + return; - rc = ep_complete_rpc (krx->krx_rxd, - kqswnal_rpc_complete, krx, - &blk, NULL, 0); - if (rc == ESUCCESS) { - /* callback will call me again to requeue, having set - * krx_rpc_completed... */ + rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, + krx->krx_elanbuffer, + krx->krx_npages * PAGE_SIZE, 0); + LASSERT (rc == EP_SUCCESS); + /* We don't handle failure here; it's incredibly rare + * (never reported?) and only happens with "old" EKC */ return; +#endif } - CERROR("can't complete RPC: %d\n", rc); - - /* we don't actually requeue on shutdown */ - if (kqswnal_data.kqn_shuttingdown) - return; - - /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue - * from scratch here... 
*/ - rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, - krx->krx_elanaddr, - krx->krx_npages * PAGE_SIZE, 0); - - LASSERT (rc == ESUCCESS); - /* This needs to be fixed by ep_complete_rpc NOT freeing - * krx->krx_rxd on failure so we can just ep_requeue_receive() */ +#if MULTIRAIL_EKC + if (kqswnal_data.kqn_shuttingdown) { + /* free EKC rxd on shutdown */ + ep_complete_receive(krx->krx_rxd); + } else { + /* repost receive */ + ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx, + &krx->krx_elanbuffer, 0); + } +#else + /* don't actually requeue on shutdown */ + if (!kqswnal_data.kqn_shuttingdown) + ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx, + krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE); +#endif } - + void kqswnal_rx (kqswnal_rx_t *krx) { @@ -1162,9 +1323,12 @@ kqswnal_rx (kqswnal_rx_t *krx) int nob; int niov; + LASSERT (atomic_read(&krx->krx_refcount) == 0); + if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */ - /* NB krx requeued when lib_parse() calls back kqswnal_recv */ + atomic_set(&krx->krx_refcount, 1); lib_parse (&kqswnal_lib, hdr, krx); + kqswnal_rx_done(krx); return; } @@ -1212,18 +1376,27 @@ kqswnal_rxhandler(EP_RXD *rxd) krx->krx_rxd = rxd; krx->krx_nob = nob; - LASSERT (atomic_read (&krx->krx_refcount) == 0); - atomic_set (&krx->krx_refcount, 1); - krx->krx_rpc_completed = 0; +#if MULTIRAIL_EKC + krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd); +#else + krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd); +#endif /* must receive a whole header to be able to parse */ if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) { /* receives complete with failure when receiver is removed */ +#if MULTIRAIL_EKC + if (status == EP_SHUTDOWN) + LASSERT (kqswnal_data.kqn_shuttingdown); + else + CERROR("receive status failed with status %d nob %d\n", + ep_rxd_status(rxd), nob); +#else if (!kqswnal_data.kqn_shuttingdown) CERROR("receive status failed with status %d nob %d\n", ep_rxd_status(rxd), nob); - +#endif kqswnal_requeue_rx (krx); return; } @@ -1417,8 +1590,6 @@ kqswnal_recvmsg (nal_cb_t *nal, #endif lib_finalize(nal, private, libmsg); - kqswnal_requeue_rx (krx); - return (rlen); } @@ -1455,6 +1626,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); + atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1473,6 +1645,7 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; + int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1480,9 +1653,21 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); - while (!kqswnal_data.kqn_shuttingdown) + for (;;) { - did_something = FALSE; + if (kqswnal_data.kqn_shuttingdown != shuttingdown) { + + if (kqswnal_data.kqn_shuttingdown == 2) + break; + + /* During stage 1 of shutdown we are still responsive + * to receives */ + + atomic_dec (&kqswnal_data.kqn_nthreads_running); + shuttingdown = kqswnal_data.kqn_shuttingdown; + } + + did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) { @@ -1494,11 +1679,12 @@ kqswnal_scheduler (void *arg) kqswnal_rx (krx); - did_something = TRUE; + did_something = 1; spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!shuttingdown && + !list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1514,11 +1700,12 @@ kqswnal_scheduler (void *arg) kqswnal_tx_done 
(ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1713,7 @@
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1726,7 @@
                 if (!did_something) {
                         rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                       kqswnal_data.kqn_shuttingdown ||
+                                                       kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                        !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                        !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                        !list_empty(&kqswnal_data.kqn_delayedfwds));
diff --git a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c
index 43926c9..96749cd 100644
--- a/lustre/portals/knals/qswnal/qswnal_cb.c
+++ b/lustre/portals/knals/qswnal/qswnal_cb.c
@@ -26,6 +26,9 @@
 
 #include "qswnal.h"
 
+EP_STATUSBLK kqswnal_rpc_success;
+EP_STATUSBLK kqswnal_rpc_failed;
+
 /*
  *  LIB functions follow
  *
@@ -128,9 +131,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-        
+
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
@@ -138,9 +154,11 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
@@ -152,12 +170,24 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
         do {
                 int  fraglen = kiov->kiov_len;
 
@@ -188,25 +218,40 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                         "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load 
(kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, ptr, fraglen, - basepage, &ktx->ktx_frags.iov[nfrags].Base); - - kunmap (kiov->kiov_page); - - /* keep in loop for failure case */ - ktx->ktx_nmappedpages = nmapped; + basepage, &ktx->ktx_frags[nfrags].Base); if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) + ktx->ktx_frags[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags[nfrags - 1].Len += fraglen; else { - ktx->ktx_frags.iov[nfrags].Len = fraglen; + ktx->ktx_frags[nfrags].Len = fraglen; nfrags++; /* new frag */ } +#endif + + kunmap (kiov->kiov_page); + + /* keep in loop for failure case */ + ktx->ktx_nmappedpages = nmapped; basepage++; kiov++; @@ -232,8 +277,20 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; uint32_t basepage = ktx->ktx_basepage + nmapped; - +#if MULTIRAIL_EKC + EP_RAILMASK railmask; + int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx, + EP_RAILMASK_ALL, + kqswnal_nid2elanid(ktx->ktx_nid)); + + if (rail < 0) { + CERROR("No rails available for "LPX64"\n", ktx->ktx_nid); + return (-ENETDOWN); + } + railmask = 1 << rail; +#endif LASSERT (nmapped <= maxmapped); + LASSERT (nfrags >= ktx->ktx_firsttmpfrag); LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); @@ -263,22 +320,38 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) ktx, nfrags, iov->iov_base, fraglen, basepage, npages, nmapped); - elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, +#if MULTIRAIL_EKC + ep_dvma_load(kqswnal_data.kqn_ep, NULL, + iov->iov_base, fraglen, + kqswnal_data.kqn_ep_tx_nmh, basepage, + &railmask, &ktx->ktx_frags[nfrags]); + + if (nfrags == ktx->ktx_firsttmpfrag || + !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags])) { + /* new frag if this is the first or can't merge */ + nfrags++; + } +#else + elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, iov->iov_base, fraglen, - basepage, &ktx->ktx_frags.iov[nfrags].Base); - /* keep in loop for failure case */ - ktx->ktx_nmappedpages = nmapped; + basepage, &ktx->ktx_frags[nfrags].Base); if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len)) + ktx->ktx_frags[nfrags].Base == /* contiguous with this one */ + (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len)) /* just extend previous */ - ktx->ktx_frags.iov[nfrags - 1].Len += fraglen; + ktx->ktx_frags[nfrags - 1].Len += fraglen; else { - ktx->ktx_frags.iov[nfrags].Len = fraglen; + ktx->ktx_frags[nfrags].Len = fraglen; nfrags++; /* new frag */ } +#endif + + /* keep in loop for failure case */ + ktx->ktx_nmappedpages = nmapped; basepage += npages; iov++; @@ -424,7 +497,6 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) break; case KTX_GETTING: /* Peer has DMA-ed direct? 
*/ - LASSERT (KQSW_OPTIMIZE_GETS); msg = (lib_msg_t *)ktx->ktx_args[1]; repmsg = NULL; @@ -455,8 +527,8 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status); - if (status != EP_SUCCESS) - { + if (status != EP_SUCCESS) { + CERROR ("Tx completion to "LPX64" failed: %d\n", ktx->ktx_nid, status); @@ -466,8 +538,11 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) } else if (ktx->ktx_state == KTX_GETTING) { /* RPC completed OK; what did our peer put in the status * block? */ - LASSERT (KQSW_OPTIMIZE_GETS); +#if MULTIRAIL_EKC + status = ep_txd_statusblk(txd)->Data[0]; +#else status = ep_txd_statusblk(txd)->Status; +#endif } else { status = 0; } @@ -488,21 +563,38 @@ kqswnal_launch (kqswnal_tx_t *ktx) LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { - LASSERT (KQSW_OPTIMIZE_GETS); + /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The + * other frags are the GET sink which we obviously don't + * send here :) */ +#if MULTIRAIL_EKC + rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, 1); +#else rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, ktx->ktx_port, attr, kqswnal_txhandler, - ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag); + ktx, NULL, ktx->ktx_frags, 1); +#endif } else { +#if MULTIRAIL_EKC + rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, ktx->ktx_nfrag); +#else rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, - ktx->ktx_port, attr, kqswnal_txhandler, - ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag); + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + ktx->ktx_frags, ktx->ktx_nfrag); +#endif } switch (rc) { - case ESUCCESS: /* success */ + case EP_SUCCESS: /* success */ return (0); - case ENOMEM: /* can't allocate ep txd => queue for later */ + case EP_ENOMEM: /* can't allocate ep txd => queue for later */ LASSERT (in_interrupt()); spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); @@ -516,7 +608,7 @@ kqswnal_launch (kqswnal_tx_t *ktx) default: /* fatal error */ CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc); kqswnal_notify_peer_down(ktx); - return (rc); + return (-EHOSTUNREACH); } } @@ -589,6 +681,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr) } /* end of print_hdr() */ +#if !MULTIRAIL_EKC void kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) { @@ -648,6 +741,7 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, CERROR ("DATAVEC too small\n"); return (-E2BIG); } +#endif int kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, @@ -656,14 +750,17 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; char *buffer = (char *)page_address(krx->krx_pages[0]); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); - EP_IOVEC eiov[EP_MAXFRAG]; - EP_STATUSBLK blk; int rc; - - LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed); +#if MULTIRAIL_EKC + int i; +#else + EP_DATAVEC datav[EP_MAXFRAG]; + int ndatav; +#endif + LASSERT (krx->krx_rpc_reply_needed); LASSERT ((iov == NULL) != (kiov == NULL)); - /* see .*_pack_k?iov comment regarding endian-ness */ + /* see kqswnal_sendmsg comment regarding endian-ness */ if (buffer + krx->krx_nob < (char *)(rmd + 1)) { /* msg too small to discover rmd size */ CERROR ("Incoming message [%d] too small for RMD (%d needed)\n", @@ -671,16 +768,16 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (-EINVAL); } - 
if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) { + if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) { /* rmd doesn't fit in the incoming message */ CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n", - krx->krx_nob, rmd->kqrmd_neiov, - (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer)); + krx->krx_nob, rmd->kqrmd_nfrag, + (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer)); return (-EINVAL); } - /* Ghastly hack part 1, uses the existing procedures to map the source data... */ - ktx->ktx_nfrag = 0; + /* Map the source data... */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; if (kiov != NULL) rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); else @@ -691,32 +788,61 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (rc); } - /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */ - memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0])); - - rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav, - ktx->ktx_nfrag, eiov, - rmd->kqrmd_neiov, rmd->kqrmd_eiov); - if (rc < 0) { - CERROR ("Can't create datavec: %d\n", rc); - return (rc); +#if MULTIRAIL_EKC + if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) { + CERROR("Can't cope with unequal # frags: %d local %d remote\n", + ktx->ktx_nfrag, rmd->kqrmd_nfrag); + return (-EINVAL); } - ktx->ktx_nfrag = rc; - - memset (&blk, 0, sizeof (blk)); /* zero blk.Status */ + + for (i = 0; i < rmd->kqrmd_nfrag; i++) + if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) { + CERROR("Can't cope with unequal frags %d(%d):" + " %d local %d remote\n", + i, rmd->kqrmd_nfrag, + ktx->ktx_frags[i].nmd_len, + rmd->kqrmd_frag[i].nmd_len); + return (-EINVAL); + } +#else + ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav, + ktx->ktx_nfrag, ktx->ktx_frags, + rmd->kqrmd_nfrag, rmd->kqrmd_frag); + if (ndatav < 0) { + CERROR ("Can't create datavec: %d\n", ndatav); + return (ndatav); + } +#endif - /* Our caller will start to race with kqswnal_rpc_complete... */ + /* Our caller will start to race with kqswnal_dma_reply_complete... */ LASSERT (atomic_read (&krx->krx_refcount) == 1); atomic_set (&krx->krx_refcount, 2); - rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx, - &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag); - if (rc == ESUCCESS) +#if MULTIRAIL_EKC + rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, + &kqswnal_rpc_success, + ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag); + if (rc == EP_SUCCESS) + return (0); + + /* Well we tried... */ + krx->krx_rpc_reply_needed = 0; +#else + rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx, + &kqswnal_rpc_success, datav, ndatav); + if (rc == EP_SUCCESS) return (0); + /* "old" EKC destroys rxd on failed completion */ + krx->krx_rxd = NULL; +#endif + + CERROR("can't complete RPC: %d\n", rc); + /* reset refcount back to 1: we're not going to be racing with - * kqswnal_rely_complete. */ + * kqswnal_dma_reply_complete. 
*/ atomic_set (&krx->krx_refcount, 1); + return (-ECONNABORTED); } @@ -785,12 +911,12 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_NOSPACE); } + ktx->ktx_nid = targetnid; ktx->ktx_args[0] = private; ktx->ktx_args[1] = libmsg; -#if KQSW_OPTIMIZE_GETS if (type == PTL_MSG_REPLY && - ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) { + ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { if (nid != targetnid || kqswnal_nid2elanid(nid) != ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) { @@ -798,7 +924,7 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return(PTL_FAIL); + return (PTL_FAIL); } /* peer expects RPC completion with GET data */ @@ -806,13 +932,12 @@ kqswnal_sendmsg (nal_cb_t *nal, payload_niov, payload_iov, payload_kiov, payload_nob); if (rc == 0) - return (0); + return (PTL_OK); CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); kqswnal_put_idle_tx (ktx); return (PTL_FAIL); } -#endif memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; @@ -838,15 +963,8 @@ kqswnal_sendmsg (nal_cb_t *nal, memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); #endif - /* Set up first frag from pre-mapped buffer (it's at least the - * portals header) */ - ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE; - ktx->ktx_nfrag = 1; - ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */ - -#if KQSW_OPTIMIZE_GETS - if (type == PTL_MSG_GET && /* doing a GET */ + if (kqswnal_data.kqn_optimized_gets && + type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ lib_md_t *md = libmsg->md; kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE); @@ -856,8 +974,8 @@ kqswnal_sendmsg (nal_cb_t *nal, * * First I set up ktx as if it was going to send this * payload, (it needs to map it anyway). This fills - * ktx_frags.iov[1] and onward with the network addresses - * of the get sink frags. I copy these into ktx_buffer, + * ktx_frags[1] and onward with the network addresses + * of the GET sink frags. I copy these into ktx_buffer, * immediately after the header, and send that as my GET * message. 
* @@ -865,6 +983,9 @@ kqswnal_sendmsg (nal_cb_t *nal, * When EKC copes with different endian nodes, I'll fix * this (and eat my hat :) */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_GETTING; + if ((libmsg->md->options & PTL_MD_KIOV) != 0) rc = kqswnal_map_tx_kiov (ktx, md->length, md->md_niov, md->md_iov.kiov); @@ -877,46 +998,73 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_FAIL); } - rmd->kqrmd_neiov = ktx->ktx_nfrag - 1; - memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1], - rmd->kqrmd_neiov * sizeof (EP_IOVEC)); + rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; - ktx->ktx_nfrag = 1; - ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t, - kqrmd_eiov[rmd->kqrmd_neiov]); - payload_nob = ktx->ktx_frags.iov[0].Len; - ktx->ktx_state = KTX_GETTING; - } else + payload_nob = offsetof(kqswnal_remotemd_t, + kqrmd_frag[rmd->kqrmd_nfrag]); + LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE); + +#if MULTIRAIL_EKC + memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1], + rmd->kqrmd_nfrag * sizeof(EP_NMD)); + + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE + payload_nob); +#else + memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1], + rmd->kqrmd_nfrag * sizeof(EP_IOVEC)); + + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob; +#endif + } else if (payload_nob <= KQSW_TX_MAXCONTIG) { + + /* small message: single frag copied into the pre-mapped buffer */ + + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_SENDING; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE + payload_nob); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob; #endif - if (payload_nob > 0) { /* got some payload (something more to do) */ - /* make a single contiguous message? */ - if (payload_nob <= KQSW_TX_MAXCONTIG) { - /* copy payload to ktx_buffer, immediately after hdr */ + if (payload_nob > 0) { if (payload_kiov != NULL) lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_kiov, payload_nob); else lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, payload_niov, payload_iov, payload_nob); - /* first frag includes payload */ - ktx->ktx_frags.iov[0].Len += payload_nob; - } else { - if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, - payload_niov, payload_kiov); - else - rc = kqswnal_map_tx_iov (ktx, payload_nob, - payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - } - } + } + } else { - ktx->ktx_nid = targetnid; + /* large message: multiple frags: first is hdr in pre-mapped buffer */ + + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + ktx->ktx_state = KTX_SENDING; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + if (payload_kiov != NULL) + rc = kqswnal_map_tx_kiov (ktx, payload_nob, + payload_niov, payload_kiov); + else + rc = kqswnal_map_tx_iov (ktx, payload_nob, + payload_niov, payload_iov); + if (rc != 0) { + kqswnal_put_idle_tx (ktx); + return (PTL_FAIL); + } + } + ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; + EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); if (rc != 0) { /* failed? 
*/ @@ -962,8 +1110,6 @@ kqswnal_send_pages (nal_cb_t *nal, payload_niov, NULL, payload_kiov, payload_nob)); } -int kqswnal_fwd_copy_contig = 0; - void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { @@ -984,7 +1130,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, FALSE); + ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1005,20 +1151,31 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) goto failed; } - if ((kqswnal_fwd_copy_contig || niov > 1) && + ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; + ktx->ktx_nid = nid; + ktx->ktx_state = KTX_FORWARDING; + ktx->ktx_args[0] = fwd; + + if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && nob <= KQSW_TX_BUFFER_SIZE) { - /* send from ktx's pre-allocated/mapped contiguous buffer? */ + /* send from ktx's pre-mapped contiguous buffer? */ lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); - ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ - ktx->ktx_frags.iov[0].Len = nob; - ktx->ktx_nfrag = 1; +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, nob); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = nob; +#endif + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; } else { /* zero copy */ - ktx->ktx_nfrag = 0; /* no frags mapped yet */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); if (rc != 0) goto failed; @@ -1026,12 +1183,6 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; } - ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */ - ktx->ktx_args[0] = fwd; - rc = kqswnal_launch (ktx); if (rc == 0) return; @@ -1064,7 +1215,7 @@ kqswnal_fwd_callback (void *arg, int error) } void -kqswnal_reply_complete (EP_RXD *rxd) +kqswnal_dma_reply_complete (EP_RXD *rxd) { int status = ep_rxd_status(rxd); kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); @@ -1075,9 +1226,10 @@ kqswnal_reply_complete (EP_RXD *rxd) "rxd %p, ktx %p, status %d\n", rxd, ktx, status); LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); - krx->krx_rpc_completed = 1; - kqswnal_requeue_rx (krx); + krx->krx_rpc_reply_needed = 0; + kqswnal_rx_done (krx); lib_finalize (&kqswnal_lib, NULL, msg); kqswnal_put_idle_tx (ktx); @@ -1093,67 +1245,76 @@ kqswnal_rpc_complete (EP_RXD *rxd) "rxd %p, krx %p, status %d\n", rxd, krx, status); LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); - krx->krx_rpc_completed = 1; + krx->krx_rpc_reply_needed = 0; kqswnal_requeue_rx (krx); } void -kqswnal_requeue_rx (kqswnal_rx_t *krx) +kqswnal_requeue_rx (kqswnal_rx_t *krx) { - EP_STATUSBLK blk; - int rc; + int rc; - LASSERT (atomic_read (&krx->krx_refcount) > 0); - if (!atomic_dec_and_test (&krx->krx_refcount)) - return; + LASSERT (atomic_read(&krx->krx_refcount) == 0); - if (!ep_rxd_isrpc(krx->krx_rxd) || - krx->krx_rpc_completed) { + if (krx->krx_rpc_reply_needed) { - /* don't actually requeue on shutdown */ - if (kqswnal_data.kqn_shuttingdown) + /* We failed to complete the peer's optimized GET (e.g. we + * couldn't map the source buffers). 
We complete the + * peer's EKC rpc now with failure. */ +#if MULTIRAIL_EKC + rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx, + &kqswnal_rpc_failed, NULL, NULL, 0); + if (rc == EP_SUCCESS) return; - ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx, - krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE); - return; - } - - /* Sender wanted an RPC, but we didn't complete it (we must have - * dropped the sender's message). We complete it now with - * failure... */ - memset (&blk, 0, sizeof (blk)); - blk.Status = -ECONNREFUSED; - - atomic_set (&krx->krx_refcount, 1); + CERROR("can't complete RPC: %d\n", rc); +#else + if (krx->krx_rxd != NULL) { + /* We didn't try (and fail) to complete earlier... */ + rc = ep_complete_rpc(krx->krx_rxd, + kqswnal_rpc_complete, krx, + &kqswnal_rpc_failed, NULL, 0); + if (rc == EP_SUCCESS) + return; + + CERROR("can't complete RPC: %d\n", rc); + } + + /* NB the old ep_complete_rpc() frees rxd on failure, so we + * have to requeue from scratch here, unless we're shutting + * down */ + if (kqswnal_data.kqn_shuttingdown) + return; - rc = ep_complete_rpc (krx->krx_rxd, - kqswnal_rpc_complete, krx, - &blk, NULL, 0); - if (rc == ESUCCESS) { - /* callback will call me again to requeue, having set - * krx_rpc_completed... */ + rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, + krx->krx_elanbuffer, + krx->krx_npages * PAGE_SIZE, 0); + LASSERT (rc == EP_SUCCESS); + /* We don't handle failure here; it's incredibly rare + * (never reported?) and only happens with "old" EKC */ return; +#endif } - CERROR("can't complete RPC: %d\n", rc); - - /* we don't actually requeue on shutdown */ - if (kqswnal_data.kqn_shuttingdown) - return; - - /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue - * from scratch here... 
*/ - rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx, - krx->krx_elanaddr, - krx->krx_npages * PAGE_SIZE, 0); - - LASSERT (rc == ESUCCESS); - /* This needs to be fixed by ep_complete_rpc NOT freeing - * krx->krx_rxd on failure so we can just ep_requeue_receive() */ +#if MULTIRAIL_EKC + if (kqswnal_data.kqn_shuttingdown) { + /* free EKC rxd on shutdown */ + ep_complete_receive(krx->krx_rxd); + } else { + /* repost receive */ + ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx, + &krx->krx_elanbuffer, 0); + } +#else + /* don't actually requeue on shutdown */ + if (!kqswnal_data.kqn_shuttingdown) + ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx, + krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE); +#endif } - + void kqswnal_rx (kqswnal_rx_t *krx) { @@ -1162,9 +1323,12 @@ kqswnal_rx (kqswnal_rx_t *krx) int nob; int niov; + LASSERT (atomic_read(&krx->krx_refcount) == 0); + if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */ - /* NB krx requeued when lib_parse() calls back kqswnal_recv */ + atomic_set(&krx->krx_refcount, 1); lib_parse (&kqswnal_lib, hdr, krx); + kqswnal_rx_done(krx); return; } @@ -1212,18 +1376,27 @@ kqswnal_rxhandler(EP_RXD *rxd) krx->krx_rxd = rxd; krx->krx_nob = nob; - LASSERT (atomic_read (&krx->krx_refcount) == 0); - atomic_set (&krx->krx_refcount, 1); - krx->krx_rpc_completed = 0; +#if MULTIRAIL_EKC + krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd); +#else + krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd); +#endif /* must receive a whole header to be able to parse */ if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) { /* receives complete with failure when receiver is removed */ +#if MULTIRAIL_EKC + if (status == EP_SHUTDOWN) + LASSERT (kqswnal_data.kqn_shuttingdown); + else + CERROR("receive status failed with status %d nob %d\n", + ep_rxd_status(rxd), nob); +#else if (!kqswnal_data.kqn_shuttingdown) CERROR("receive status failed with status %d nob %d\n", ep_rxd_status(rxd), nob); - +#endif kqswnal_requeue_rx (krx); return; } @@ -1417,8 +1590,6 @@ kqswnal_recvmsg (nal_cb_t *nal, #endif lib_finalize(nal, private, libmsg); - kqswnal_requeue_rx (krx); - return (rlen); } @@ -1455,6 +1626,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); + atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1473,6 +1645,7 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; + int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1480,9 +1653,21 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); - while (!kqswnal_data.kqn_shuttingdown) + for (;;) { - did_something = FALSE; + if (kqswnal_data.kqn_shuttingdown != shuttingdown) { + + if (kqswnal_data.kqn_shuttingdown == 2) + break; + + /* During stage 1 of shutdown we are still responsive + * to receives */ + + atomic_dec (&kqswnal_data.kqn_nthreads_running); + shuttingdown = kqswnal_data.kqn_shuttingdown; + } + + did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) { @@ -1494,11 +1679,12 @@ kqswnal_scheduler (void *arg) kqswnal_rx (krx); - did_something = TRUE; + did_something = 1; spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!shuttingdown && + !list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1514,11 +1700,12 @@ kqswnal_scheduler (void *arg) kqswnal_tx_done 
(ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1713,7 @@
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1726,7 @@
                 if (!did_something) {
                         rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                       kqswnal_data.kqn_shuttingdown ||
+                                                       kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                        !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                        !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                        !list_empty(&kqswnal_data.kqn_delayedfwds));
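
Note on the fragment mapping patched in kqswnal_map_tx_kiov()/kqswnal_map_tx_iov() above: both the old elan3 path and the new MULTIRAIL_EKC path coalesce each newly mapped fragment into the previous one whenever the two regions are contiguous in DMA space (the elan3 path compares Base + Len directly; the EKC path delegates the test to ep_nmd_merge()). That is what keeps the fragment count within the EP_MAXFRAG limit asserted at the top of both functions. What follows is a minimal stand-alone sketch of that coalescing pattern only, not the driver code itself: frag_t is a hypothetical stand-in for EP_IOVEC, and the addresses are illustrative rather than real DVMA mappings.

/* Sketch of the "extend previous frag if contiguous, else start a new
 * one" accumulation used by kqswnal_map_tx_{iov,kiov}().  frag_t is a
 * hypothetical stand-in for EP_IOVEC; addresses are made up. */
#include <stdio.h>

typedef struct {
        unsigned long Base;             /* start of mapped region */
        unsigned long Len;              /* length in bytes */
} frag_t;

#define MAXFRAG 16                      /* plays the role of EP_MAXFRAG */

/* Append [base, base + len) to the frag list, merging with the tail
 * when the new region starts exactly where the previous one ends.
 * Returns the new fragment count, or -1 when the list is full. */
static int
add_frag (frag_t *frags, int nfrags, unsigned long base, unsigned long len)
{
        if (nfrags > 0 &&                       /* previous frag mapped */
            base == frags[nfrags - 1].Base +    /* contiguous with this one */
                    frags[nfrags - 1].Len) {
                frags[nfrags - 1].Len += len;   /* just extend previous */
                return (nfrags);
        }

        if (nfrags == MAXFRAG)                  /* no room for a new frag */
                return (-1);

        frags[nfrags].Base = base;              /* new frag */
        frags[nfrags].Len  = len;
        return (nfrags + 1);
}

int
main (void)
{
        frag_t frags[MAXFRAG];
        int    n = 0;

        n = add_frag (frags, n, 0x1000, 0x800); /* first frag */
        n = add_frag (frags, n, 0x1800, 0x800); /* contiguous: merged */
        n = add_frag (frags, n, 0x4000, 0x100); /* gap: new frag */

        printf ("%d frags, frag[0].Len = %#lx\n", n, frags[0].Len);
        return (0);
}

Run as written this prints "2 frags, frag[0].Len = 0x1000": the two adjacent regions collapse into one descriptor and only the discontiguous region costs an extra slot, which is why a long run of physically adjacent pages still fits within EP_MAXFRAG when handed to ep_transmit_message().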