X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fqswlnd%2Fqswlnd_cb.c;h=eec1a6b0c26c0e987f4d73a83f83a76d595bc239;hp=3b47a251d9e874262c4dae45cdbf50c9aeb42fa9;hb=59071a8334bbc1a3a6d31565b7474063438d1f43;hpb=96ec6856f91f7f9031cfce4273c714d72cfe59ae diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 3b47a25..eec1a6b 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -1,13 +1,11 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * - * Copyright (C) 2002 Cluster File Systems, Inc. - * Author: Eric Barton + * Copyright (c) 2012, Intel Corporation. * - * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL) - * W. Marcus Miller - Based on ksocknal + * Author: Eric Barton * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ + * This file is part of Portals, http://www.lustre.org * * Portals is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public @@ -21,145 +19,90 @@ * You should have received a copy of the GNU General Public License * along with Portals; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - * */ -#include "qswnal.h" - -atomic_t kqswnal_packets_launched; -atomic_t kqswnal_packets_transmitted; -atomic_t kqswnal_packets_received; - - -/* - * LIB functions follow - * - */ -static int -kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, - size_t len) -{ - CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n", - nal->ni.nid, len, src_addr, dst_addr ); - memcpy( dst_addr, src_addr, len ); - - return (0); -} - -static int -kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, - size_t len) -{ - CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n", - nal->ni.nid, len, src_addr, dst_addr ); - memcpy( dst_addr, src_addr, len ); - - return (0); -} - -static void * -kqswnal_malloc(nal_cb_t *nal, size_t len) -{ - void *buf; - - PORTAL_ALLOC(buf, len); - return (buf); -} - -static void -kqswnal_free(nal_cb_t *nal, void *buf, size_t len) -{ - PORTAL_FREE(buf, len); -} - -static void -kqswnal_printf (nal_cb_t * nal, const char *fmt, ...) 
-{ - va_list ap; - char msg[256]; - - va_start (ap, fmt); - vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */ - va_end (ap); - - msg[sizeof (msg) - 1] = 0; /* ensure terminated */ +#include "qswlnd.h" - CDEBUG (D_NET, "%s", msg); -} - - -static void -kqswnal_cli(nal_cb_t *nal, unsigned long *flags) -{ - kqswnal_data_t *data= nal->nal_data; - - spin_lock_irqsave(&data->kqn_statelock, *flags); -} - - -static void -kqswnal_sti(nal_cb_t *nal, unsigned long *flags) +void +kqswnal_notify_peer_down(kqswnal_tx_t *ktx) { - kqswnal_data_t *data= nal->nal_data; + time_t then; - spin_unlock_irqrestore(&data->kqn_statelock, *flags); -} + then = cfs_time_current_sec() - + cfs_duration_sec(cfs_time_current() - + ktx->ktx_launchtime); - -static int -kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist) -{ - if (nid == nal->ni.nid) - *dist = 0; /* it's me */ - else if (kqswnal_nid2elanid (nid) >= 0) - *dist = 1; /* it's my peer */ - else - *dist = 2; /* via router */ - return (0); + lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then); } void kqswnal_unmap_tx (kqswnal_tx_t *ktx) { + int i; + + ktx->ktx_rail = -1; /* unset rail */ + if (ktx->ktx_nmappedpages == 0) return; + + CDEBUG(D_NET, "%p unloading %d frags starting at %d\n", + ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag); - CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n", - ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages); - - LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages); - LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <= - kqswnal_data.kqn_eptxdmahandle->NumDvmaPages); + for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++) + ep_dvma_unload(kqswnal_data.kqn_ep, + kqswnal_data.kqn_ep_tx_nmh, + &ktx->ktx_frags[i]); - elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState, - kqswnal_data.kqn_eptxdmahandle, - ktx->ktx_basepage, ktx->ktx_nmappedpages); ktx->ktx_nmappedpages = 0; } int -kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) +kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, + unsigned int niov, lnet_kiov_t *kiov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; - uint32_t basepage = ktx->ktx_basepage + nmapped; + __u32 basepage = ktx->ktx_basepage + nmapped; char *ptr; - + + EP_RAILMASK railmask; + int rail; + + if (ktx->ktx_rail < 0) + ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx, + EP_RAILMASK_ALL, + kqswnal_nid2elanid(ktx->ktx_nid)); + rail = ktx->ktx_rail; + if (rail < 0) { + CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid)); + return (-ENETDOWN); + } + railmask = 1 << rail; + LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags >= ktx->ktx_firsttmpfrag); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); - + + /* skip complete frags before 'offset' */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = kiov->kiov_len; + int fraglen = kiov->kiov_len - offset; - /* nob exactly spans the iovs */ - LASSERT (fraglen <= nob); - /* each frag fits in a page */ + /* each page frag is contained in one page */ LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE); + if (fraglen > nob) + fraglen = nob; + nmapped++; if (nmapped > maxmapped) { CERROR("Can't map message in %d pages (max %d)\n", @@ -167,79 +110,152 @@ kqswnal_map_tx_kiov (kqswnal_tx_t 
*ktx, int nob, int niov, ptl_kiov_t *kiov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } /* XXX this is really crap, but we'll have to kmap until * EKC has a page (rather than vaddr) mapping interface */ - ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; CDEBUG(D_NET, "%p[%d] loading %p for %d, page %d, %d total\n", ktx, nfrags, ptr, fraglen, basepage, nmapped); - elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, - kqswnal_data.kqn_eptxdmahandle, - ptr, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); + ep_dvma_load(kqswnal_data.kqn_ep, NULL, + ptr, fraglen, + kqswnal_data.kqn_ep_tx_nmh, basepage, + &railmask, &ktx->ktx_frags[nfrags]); + + if (nfrags == ktx->ktx_firsttmpfrag || + !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags])) { + /* new frag if this is the first or can't merge */ + nfrags++; + } kunmap (kiov->kiov_page); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; - if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) - /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; - else { - ktx->ktx_iov[nfrags].Len = fraglen; - nfrags++; /* new frag */ - } - basepage++; kiov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } +#if KQSW_CKSUM +__u32 +kqswnal_csum_kiov (__u32 csum, int offset, int nob, + unsigned int niov, lnet_kiov_t *kiov) +{ + char *ptr; + + if (nob == 0) + return csum; + + LASSERT (niov > 0); + LASSERT (nob > 0); + + /* skip complete frags before 'offset' */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + + do { + int fraglen = kiov->kiov_len - offset; + + /* each page frag is contained in one page */ + LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE); + + if (fraglen > nob) + fraglen = nob; + + ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; + + csum = kqswnal_csum(csum, ptr, fraglen); + + kunmap (kiov->kiov_page); + + kiov++; + niov--; + nob -= fraglen; + offset = 0; + + /* iov must not run out before end of data */ + LASSERT (nob == 0 || niov > 0); + + } while (nob > 0); + + return csum; +} +#endif + int -kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) +kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, + unsigned int niov, struct iovec *iov) { - int nfrags = ktx->ktx_niov; - const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]); + int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; int maxmapped = ktx->ktx_npages; - uint32_t basepage = ktx->ktx_basepage + nmapped; + __u32 basepage = ktx->ktx_basepage + nmapped; + + EP_RAILMASK railmask; + int rail; + + if (ktx->ktx_rail < 0) + ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx, + EP_RAILMASK_ALL, + kqswnal_nid2elanid(ktx->ktx_nid)); + rail = ktx->ktx_rail; + if (rail < 0) { + CERROR("No rails available for %s\n", 
libcfs_nid2str(ktx->ktx_nid)); + return (-ENETDOWN); + } + railmask = 1 << rail; LASSERT (nmapped <= maxmapped); - LASSERT (nfrags <= maxfrags); + LASSERT (nfrags >= ktx->ktx_firsttmpfrag); + LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); + /* skip complete frags before offset */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = iov->iov_len; - long npages = kqswnal_pages_spanned (iov->iov_base, fraglen); - - /* nob exactly spans the iovs */ - LASSERT (fraglen <= nob); + int fraglen = iov->iov_len - offset; + long npages; + if (fraglen > nob) + fraglen = nob; + npages = kqswnal_pages_spanned (iov->iov_base, fraglen); + nmapped += npages; if (nmapped > maxmapped) { CERROR("Can't map message in %d pages (max %d)\n", @@ -247,170 +263,231 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) return (-EMSGSIZE); } - if (nfrags == maxfrags) { + if (nfrags == EP_MAXFRAG) { CERROR("Message too fragmented in Elan VM (max %d frags)\n", - maxfrags); + EP_MAXFRAG); return (-EMSGSIZE); } CDEBUG(D_NET, "%p[%d] loading %p for %d, pages %d for %ld, %d total\n", - ktx, nfrags, iov->iov_base, fraglen, basepage, npages, - nmapped); + ktx, nfrags, iov->iov_base + offset, fraglen, + basepage, npages, nmapped); + + ep_dvma_load(kqswnal_data.kqn_ep, NULL, + iov->iov_base + offset, fraglen, + kqswnal_data.kqn_ep_tx_nmh, basepage, + &railmask, &ktx->ktx_frags[nfrags]); + + if (nfrags == ktx->ktx_firsttmpfrag || + !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags - 1], + &ktx->ktx_frags[nfrags])) { + /* new frag if this is the first or can't merge */ + nfrags++; + } - elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, - kqswnal_data.kqn_eptxdmahandle, - iov->iov_base, fraglen, - basepage, &ktx->ktx_iov[nfrags].Base); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; - if (nfrags > 0 && /* previous frag mapped */ - ktx->ktx_iov[nfrags].Base == /* contiguous with this one */ - (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len)) - /* just extend previous */ - ktx->ktx_iov[nfrags - 1].Len += fraglen; - else { - ktx->ktx_iov[nfrags].Len = fraglen; - nfrags++; /* new frag */ - } - basepage += npages; iov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); } while (nob > 0); - ktx->ktx_niov = nfrags; + ktx->ktx_nfrag = nfrags; CDEBUG (D_NET, "%p got %d frags over %d pages\n", - ktx, ktx->ktx_niov, ktx->ktx_nmappedpages); + ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages); return (0); } -void -kqswnal_put_idle_tx (kqswnal_tx_t *ktx) +#if KQSW_CKSUM +__u32 +kqswnal_csum_iov (__u32 csum, int offset, int nob, + unsigned int niov, struct iovec *iov) { - kpr_fwd_desc_t *fwd = NULL; - struct list_head *idle = ktx->ktx_idle; - unsigned long flags; - - kqswnal_unmap_tx (ktx); /* release temporary mappings */ - ktx->ktx_state = KTX_IDLE; + if (nob == 0) + return csum; + + LASSERT (niov > 0); + LASSERT (nob > 0); - spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); + /* skip complete frags before offset */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + + do { + int fraglen = iov->iov_len - offset; + + if (fraglen > nob) + fraglen = nob; - list_add (&ktx->ktx_list, idle); + csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen); - /* reserved for non-blocking tx */ - if (idle == &kqswnal_data.kqn_nblk_idletxds) { - 
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); - return; - } + iov++; + niov--; + nob -= fraglen; + offset = 0; - /* anything blocking for a tx descriptor? */ - if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ - { - CDEBUG(D_NET,"wakeup fwd\n"); + /* iov must not run out before end of data */ + LASSERT (nob == 0 || niov > 0); - fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next, - kpr_fwd_desc_t, kprfd_list); - list_del (&fwd->kprfd_list); - } + } while (nob > 0); - if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq)) /* process? */ - { - /* local sender waiting for tx desc */ - CDEBUG(D_NET,"wakeup process\n"); - wake_up (&kqswnal_data.kqn_idletxd_waitq); - } + return csum; +} +#endif - spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); +void +kqswnal_put_idle_tx (kqswnal_tx_t *ktx) +{ + unsigned long flags; - if (fwd == NULL) - return; + kqswnal_unmap_tx(ktx); /* release temporary mappings */ + ktx->ktx_state = KTX_IDLE; - /* schedule packet for forwarding again */ - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags); - list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds); - if (waitqueue_active (&kqswnal_data.kqn_sched_waitq)) - wake_up (&kqswnal_data.kqn_sched_waitq); + cfs_list_del(&ktx->ktx_list); /* take off active list */ + cfs_list_add(&ktx->ktx_list, &kqswnal_data.kqn_idletxds); - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags); } kqswnal_tx_t * -kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) +kqswnal_get_idle_tx (void) { - unsigned long flags; - kqswnal_tx_t *ktx = NULL; - - for (;;) { - spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); - - /* "normal" descriptor is free */ - if (!list_empty (&kqswnal_data.kqn_idletxds)) { - ktx = list_entry (kqswnal_data.kqn_idletxds.next, - kqswnal_tx_t, ktx_list); - list_del (&ktx->ktx_list); - break; - } - - /* "normal" descriptor pool is empty */ - - if (fwd != NULL) { /* forwarded packet => queue for idle txd */ - CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); - list_add_tail (&fwd->kprfd_list, - &kqswnal_data.kqn_idletxd_fwdq); - break; - } + unsigned long flags; + kqswnal_tx_t *ktx; - /* doing a local transmit */ - if (!may_block) { - if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) { - CERROR ("intr tx desc pool exhausted\n"); - break; - } + spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags); - ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next, - kqswnal_tx_t, ktx_list); - list_del (&ktx->ktx_list); - break; - } + if (kqswnal_data.kqn_shuttingdown || + cfs_list_empty(&kqswnal_data.kqn_idletxds)) { + spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags); - /* block for idle tx */ + return NULL; + } - spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); + ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, + ktx_list); + cfs_list_del (&ktx->ktx_list); - CDEBUG (D_NET, "blocking for tx desc\n"); - wait_event (kqswnal_data.kqn_idletxd_waitq, - !list_empty (&kqswnal_data.kqn_idletxds)); - } + cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds); + ktx->ktx_launcher = current->pid; + cfs_atomic_inc(&kqswnal_data.kqn_pending_txs); - spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); + spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags); /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */ - LASSERT (ktx == NULL || 
ktx->ktx_nmappedpages == 0); + LASSERT (ktx->ktx_nmappedpages == 0); return (ktx); } void -kqswnal_tx_done (kqswnal_tx_t *ktx, int error) +kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx) { + lnet_msg_t *lnetmsg0 = NULL; + lnet_msg_t *lnetmsg1 = NULL; + int status0 = 0; + int status1 = 0; + kqswnal_rx_t *krx; + + LASSERT (!cfs_in_interrupt()); + + if (ktx->ktx_status == -EHOSTDOWN) + kqswnal_notify_peer_down(ktx); + switch (ktx->ktx_state) { - case KTX_FORWARDING: /* router asked me to forward this packet */ - kpr_fwd_done (&kqswnal_data.kqn_router, - (kpr_fwd_desc_t *)ktx->ktx_args[0], error); + case KTX_RDMA_FETCH: /* optimized PUT/REPLY handled */ + krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1]; + status0 = ktx->ktx_status; +#if KQSW_CKSUM + if (status0 == 0) { /* RDMA succeeded */ + kqswnal_msg_t *msg; + __u32 csum; + + msg = (kqswnal_msg_t *) + page_address(krx->krx_kiov[0].kiov_page); + + csum = (lnetmsg0->msg_kiov != NULL) ? + kqswnal_csum_kiov(krx->krx_cksum, + lnetmsg0->msg_offset, + lnetmsg0->msg_wanted, + lnetmsg0->msg_niov, + lnetmsg0->msg_kiov) : + kqswnal_csum_iov(krx->krx_cksum, + lnetmsg0->msg_offset, + lnetmsg0->msg_wanted, + lnetmsg0->msg_niov, + lnetmsg0->msg_iov); + + /* Can only check csum if I got it all */ + if (lnetmsg0->msg_wanted == lnetmsg0->msg_len && + csum != msg->kqm_cksum) { + ktx->ktx_status = -EIO; + krx->krx_rpc_reply.msg.status = -EIO; + CERROR("RDMA checksum failed %u(%u) from %s\n", + csum, msg->kqm_cksum, + libcfs_nid2str(kqswnal_rx_nid(krx))); + } + } +#endif + LASSERT (krx->krx_state == KRX_COMPLETING); + kqswnal_rx_decref (krx); + break; + + case KTX_RDMA_STORE: /* optimized GET handled */ + case KTX_PUTTING: /* optimized PUT sent */ + case KTX_SENDING: /* normal send */ + lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1]; + status0 = ktx->ktx_status; break; - case KTX_SENDING: /* packet sourced locally */ - lib_finalize (&kqswnal_lib, ktx->ktx_args[0], - (lib_msg_t *)ktx->ktx_args[1]); + case KTX_GETTING: /* optimized GET sent & payload received */ + /* Complete the GET with success since we can't avoid + * delivering a REPLY event; we committed to it when we + * launched the GET */ + lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1]; + status0 = 0; + lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2]; + status1 = ktx->ktx_status; +#if KQSW_CKSUM + if (status1 == 0) { /* RDMA succeeded */ + lnet_msg_t *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1]; + lnet_libmd_t *md = lnetmsg0->msg_md; + __u32 csum; + + csum = ((md->md_options & LNET_MD_KIOV) != 0) ? 
+ kqswnal_csum_kiov(~0, 0, + md->md_length, + md->md_niov, + md->md_iov.kiov) : + kqswnal_csum_iov(~0, 0, + md->md_length, + md->md_niov, + md->md_iov.iov); + + if (csum != ktx->ktx_cksum) { + CERROR("RDMA checksum failed %u(%u) from %s\n", + csum, ktx->ktx_cksum, + libcfs_nid2str(ktx->ktx_nid)); + status1 = -EIO; + } + } +#endif break; default: @@ -418,76 +495,206 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) } kqswnal_put_idle_tx (ktx); + + lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0); + if (lnetmsg1 != NULL) + lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1); +} + +void +kqswnal_tx_done (kqswnal_tx_t *ktx, int status) +{ + unsigned long flags; + + ktx->ktx_status = status; + + if (!cfs_in_interrupt()) { + kqswnal_tx_done_in_thread_context(ktx); + return; + } + + /* Complete the send in thread context */ + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); + + cfs_list_add_tail(&ktx->ktx_schedlist, + &kqswnal_data.kqn_donetxds); + wake_up(&kqswnal_data.kqn_sched_waitq); + + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); } static void kqswnal_txhandler(EP_TXD *txd, void *arg, int status) { - kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg; + kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg; + kqswnal_rpc_reply_t *reply; LASSERT (txd != NULL); LASSERT (ktx != NULL); CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status); - if (status == EP_SUCCESS) - atomic_inc (&kqswnal_packets_transmitted); + if (status != EP_SUCCESS) { - if (status != EP_SUCCESS) - { - CERROR ("kqswnal: Transmit failed with %d\n", status); - status = -EIO; + CNETERR("Tx completion to %s failed: %d\n", + libcfs_nid2str(ktx->ktx_nid), status); + + status = -EHOSTDOWN; + + } else switch (ktx->ktx_state) { + + case KTX_GETTING: + case KTX_PUTTING: + /* RPC complete! */ + reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd); + if (reply->msg.magic == 0) { /* "old" peer */ + status = reply->msg.status; + break; + } + + if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) { + if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) { + CERROR("%s unexpected rpc reply magic %08x\n", + libcfs_nid2str(ktx->ktx_nid), + reply->msg.magic); + status = -EPROTO; + break; + } + + __swab32s(&reply->msg.status); + __swab32s(&reply->msg.version); + + if (ktx->ktx_state == KTX_GETTING) { + __swab32s(&reply->msg.u.get.len); + __swab32s(&reply->msg.u.get.cksum); + } + } + + status = reply->msg.status; + if (status != 0) { + CERROR("%s RPC status %08x\n", + libcfs_nid2str(ktx->ktx_nid), status); + break; + } + + if (ktx->ktx_state == KTX_GETTING) { + lnet_set_reply_msg_len(kqswnal_data.kqn_ni, + (lnet_msg_t *)ktx->ktx_args[2], + reply->msg.u.get.len); +#if KQSW_CKSUM + ktx->ktx_cksum = reply->msg.u.get.cksum; +#endif + } + break; + + case KTX_SENDING: + status = 0; + break; + + default: + LBUG(); + break; } - kqswnal_tx_done (ktx, status); + kqswnal_tx_done(ktx, status); } int kqswnal_launch (kqswnal_tx_t *ktx) { /* Don't block for transmit descriptor if we're in interrupt context */ - int attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0; + int attr = cfs_in_interrupt() ? 
(EP_NO_SLEEP | EP_NO_ALLOC) : 0; int dest = kqswnal_nid2elanid (ktx->ktx_nid); - long flags; + unsigned long flags; int rc; - + + ktx->ktx_launchtime = cfs_time_current(); + + if (kqswnal_data.kqn_shuttingdown) + return (-ESHUTDOWN); + LASSERT (dest >= 0); /* must be a peer */ - rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest, - ktx->ktx_port, attr, kqswnal_txhandler, - ktx, ktx->ktx_iov, ktx->ktx_niov); - if (rc == 0) - atomic_inc (&kqswnal_packets_launched); - if (rc != ENOMEM) - return (rc); + if (ktx->ktx_nmappedpages != 0) + attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail); + + switch (ktx->ktx_state) { + case KTX_GETTING: + case KTX_PUTTING: + if (the_lnet.ln_testprotocompat != 0) { + kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer; + + /* single-shot proto test: + * Future version queries will use an RPC, so I'll + * co-opt one of the existing ones */ + LNET_LOCK(); + if ((the_lnet.ln_testprotocompat & 1) != 0) { + msg->kqm_version++; + the_lnet.ln_testprotocompat &= ~1; + } + if ((the_lnet.ln_testprotocompat & 2) != 0) { + msg->kqm_magic = LNET_PROTO_MAGIC; + the_lnet.ln_testprotocompat &= ~2; + } + LNET_UNLOCK(); + } + + /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t. + * The other frags are the payload, awaiting RDMA */ + rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, 1); + break; - /* can't allocate ep txd => queue for later */ + case KTX_SENDING: + rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest, + ktx->ktx_port, attr, + kqswnal_txhandler, ktx, + NULL, ktx->ktx_frags, ktx->ktx_nfrag); + break; - LASSERT (in_interrupt()); /* not called by thread (not looping) */ + default: + LBUG(); + rc = -EINVAL; /* no compiler warning please */ + break; + } - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + switch (rc) { + case EP_SUCCESS: /* success */ + return (0); - list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_delayedtxds); - if (waitqueue_active (&kqswnal_data.kqn_sched_waitq)) - wake_up (&kqswnal_data.kqn_sched_waitq); + case EP_ENOMEM: /* can't allocate ep txd => queue for later */ + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + cfs_list_add_tail(&ktx->ktx_schedlist, + &kqswnal_data.kqn_delayedtxds); + wake_up(&kqswnal_data.kqn_sched_waitq); - return (0); -} + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, + flags); + return (0); + default: /* fatal error */ + CNETERR ("Tx to %s failed: %d\n", + libcfs_nid2str(ktx->ktx_nid), rc); + kqswnal_notify_peer_down(ktx); + return (-EHOSTUNREACH); + } +} +#if 0 static char * -hdr_type_string (ptl_hdr_t *hdr) +hdr_type_string (lnet_hdr_t *hdr) { switch (hdr->type) { - case PTL_MSG_ACK: + case LNET_MSG_ACK: return ("ACK"); - case PTL_MSG_PUT: + case LNET_MSG_PUT: return ("PUT"); - case PTL_MSG_GET: + case LNET_MSG_GET: return ("GET"); - case PTL_MSG_REPLY: + case LNET_MSG_REPLY: return ("REPLY"); default: return (""); @@ -495,637 +702,974 @@ hdr_type_string (ptl_hdr_t *hdr) } static void -kqswnal_cerror_hdr(ptl_hdr_t * hdr) +kqswnal_cerror_hdr(lnet_hdr_t * hdr) { char *type_str = hdr_type_string (hdr); - CERROR("P3 Header at %p of type %s\n", hdr, type_str); - CERROR(" From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid), - NTOH__u32(hdr->src_pid)); - CERROR(" To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid), - NTOH__u32(hdr->dest_pid)); + CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str, + le32_to_cpu(hdr->payload_length)); + CERROR(" From 
nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid), + le32_to_cpu(hdr->src_pid)); + CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid), + le32_to_cpu(hdr->dest_pid)); - switch (NTOH__u32(hdr->type)) { - case PTL_MSG_PUT: + switch (le32_to_cpu(hdr->type)) { + case LNET_MSG_PUT: CERROR(" Ptl index %d, ack md "LPX64"."LPX64", " "match bits "LPX64"\n", - NTOH__u32 (hdr->msg.put.ptl_index), + le32_to_cpu(hdr->msg.put.ptl_index), hdr->msg.put.ack_wmd.wh_interface_cookie, hdr->msg.put.ack_wmd.wh_object_cookie, - NTOH__u64 (hdr->msg.put.match_bits)); - CERROR(" Length %d, offset %d, hdr data "LPX64"\n", - NTOH__u32(PTL_HDR_LENGTH(hdr)), - NTOH__u32(hdr->msg.put.offset), + le64_to_cpu(hdr->msg.put.match_bits)); + CERROR(" offset %d, hdr data "LPX64"\n", + le32_to_cpu(hdr->msg.put.offset), hdr->msg.put.hdr_data); break; - case PTL_MSG_GET: + case LNET_MSG_GET: CERROR(" Ptl index %d, return md "LPX64"."LPX64", " "match bits "LPX64"\n", - NTOH__u32 (hdr->msg.get.ptl_index), + le32_to_cpu(hdr->msg.get.ptl_index), hdr->msg.get.return_wmd.wh_interface_cookie, hdr->msg.get.return_wmd.wh_object_cookie, hdr->msg.get.match_bits); CERROR(" Length %d, src offset %d\n", - NTOH__u32 (hdr->msg.get.sink_length), - NTOH__u32 (hdr->msg.get.src_offset)); + le32_to_cpu(hdr->msg.get.sink_length), + le32_to_cpu(hdr->msg.get.src_offset)); break; - case PTL_MSG_ACK: + case LNET_MSG_ACK: CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n", hdr->msg.ack.dst_wmd.wh_interface_cookie, hdr->msg.ack.dst_wmd.wh_object_cookie, - NTOH__u32 (hdr->msg.ack.mlength)); + le32_to_cpu(hdr->msg.ack.mlength)); break; - case PTL_MSG_REPLY: - CERROR(" dst md "LPX64"."LPX64", length %d\n", + case LNET_MSG_REPLY: + CERROR(" dst md "LPX64"."LPX64"\n", hdr->msg.reply.dst_wmd.wh_interface_cookie, - hdr->msg.reply.dst_wmd.wh_object_cookie, - NTOH__u32 (PTL_HDR_LENGTH(hdr))); + hdr->msg.reply.dst_wmd.wh_object_cookie); } } /* end of print_hdr() */ +#endif -static int -kqswnal_sendmsg (nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int payload_niov, - struct iovec *payload_iov, - ptl_kiov_t *payload_kiov, - size_t payload_nob) +int +kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag, + int nrfrag, EP_NMD *rfrag) { - kqswnal_tx_t *ktx; - int rc; - ptl_nid_t gatewaynid; -#if KQSW_CHECKSUM - int i; - kqsw_csum_t csum; - int sumnob; -#endif + int i; + + if (nlfrag != nrfrag) { + CERROR("Can't cope with unequal # frags: %d local %d remote\n", + nlfrag, nrfrag); + return (-EINVAL); + } + + for (i = 0; i < nlfrag; i++) + if (lfrag[i].nmd_len != rfrag[i].nmd_len) { + CERROR("Can't cope with unequal frags %d(%d):" + " %d local %d remote\n", + i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len); + return (-EINVAL); + } - /* NB, the return code from this procedure is ignored. - * If we can't send, we must still complete with lib_finalize(). - * We'll have to wait for 3.2 to return an error event. - */ + return (0); +} - CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64 - " pid %u\n", payload_nob, payload_niov, nid, pid); +kqswnal_remotemd_t * +kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx) +{ + /* Check that the RMD sent after the "raw" LNET header in a + * portals-compatible QSWLND message is OK */ + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); + kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t)); + + /* Note RDMA addresses are sent in native endian-ness in the "old" + * portals protocol so no swabbing... 
*/ + + if (buffer + krx->krx_nob < (char *)(rmd + 1)) { + /* msg too small to discover rmd size */ + CERROR ("Incoming message [%d] too small for RMD (%d needed)\n", + krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer)); + return (NULL); + } - LASSERT (payload_nob == 0 || payload_niov > 0); - LASSERT (payload_niov <= PTL_MD_MAX_IOV); + if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) { + /* rmd doesn't fit in the incoming message */ + CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n", + krx->krx_nob, rmd->kqrmd_nfrag, + (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer)); + return (NULL); + } - /* It must be OK to kmap() if required */ - LASSERT (payload_kiov == NULL || !in_interrupt ()); - /* payload is either all vaddrs or all pages */ - LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); + return (rmd); +} + +void +kqswnal_rdma_store_complete (EP_RXD *rxd) +{ + int status = ep_rxd_status(rxd); + kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, ktx %p, status %d\n", rxd, ktx, status); + + LASSERT (ktx->ktx_state == KTX_RDMA_STORE); + LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); + + krx->krx_rpc_reply_needed = 0; + kqswnal_rx_decref (krx); + + /* free ktx & finalize() its lnet_msg_t */ + kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED); +} + +void +kqswnal_rdma_fetch_complete (EP_RXD *rxd) +{ + /* Completed fetching the PUT/REPLY data */ + int status = ep_rxd_status(rxd); + kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, ktx %p, status %d\n", rxd, ktx, status); + + LASSERT (ktx->ktx_state == KTX_RDMA_FETCH); + LASSERT (krx->krx_rxd == rxd); + /* RPC completes with failure by default */ + LASSERT (krx->krx_rpc_reply_needed); + LASSERT (krx->krx_rpc_reply.msg.status != 0); + + if (status == EP_SUCCESS) { + krx->krx_rpc_reply.msg.status = 0; + status = 0; + } else { + /* Abandon RPC since get failed */ + krx->krx_rpc_reply_needed = 0; + status = -ECONNABORTED; + } + + /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */ + LASSERT (krx->krx_state == KRX_PARSE); + krx->krx_state = KRX_COMPLETING; + + /* free ktx & finalize() its lnet_msg_t */ + kqswnal_tx_done(ktx, status); +} + +int +kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg, + int type, kqswnal_remotemd_t *rmd, + unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int len) +{ + kqswnal_tx_t *ktx; + int eprc; + int rc; + + /* Not both mapped and paged payload */ + LASSERT (iov == NULL || kiov == NULL); + /* RPC completes with failure by default */ + LASSERT (krx->krx_rpc_reply_needed); + LASSERT (krx->krx_rpc_reply.msg.status != 0); + + if (len == 0) { + /* data got truncated to nothing. 
*/ + lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0); + /* Let kqswnal_rx_done() complete the RPC with success */ + krx->krx_rpc_reply.msg.status = 0; + return (0); + } - if (payload_nob > KQSW_MAXPAYLOAD) { - CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", - payload_nob, KQSW_MAXPAYLOAD); - lib_finalize (&kqswnal_lib, private, cookie); - return (-1); + /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not + actually sending a portals message with it */ + ktx = kqswnal_get_idle_tx(); + if (ktx == NULL) { + CERROR ("Can't get txd for RDMA with %s\n", + libcfs_nid2str(kqswnal_rx_nid(krx))); + return (-ENOMEM); } - if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */ - rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid); - if (rc != 0) { - CERROR("Can't route to "LPX64": router error %d\n", - nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); - return (-1); + ktx->ktx_state = type; + ktx->ktx_nid = kqswnal_rx_nid(krx); + ktx->ktx_args[0] = krx; + ktx->ktx_args[1] = lntmsg; + + LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0); + /* Take an extra ref for the completion callback */ + cfs_atomic_inc(&krx->krx_refcount); + + /* Map on the rail the RPC prefers */ + ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx, + ep_rxd_railmask(krx->krx_rxd)); + + /* Start mapping at offset 0 (we're not mapping any headers) */ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; + + if (kiov != NULL) + rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov); + else + rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov); + + if (rc != 0) { + CERROR ("Can't map local RDMA data: %d\n", rc); + goto out; + } + + rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags, + rmd->kqrmd_nfrag, rmd->kqrmd_frag); + if (rc != 0) { + CERROR ("Incompatible RDMA descriptors\n"); + goto out; + } + + switch (type) { + default: + LBUG(); + + case KTX_RDMA_STORE: + krx->krx_rpc_reply.msg.status = 0; + krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC; + krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION; + krx->krx_rpc_reply.msg.u.get.len = len; +#if KQSW_CKSUM + krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ? + kqswnal_csum_kiov(~0, offset, len, niov, kiov) : + kqswnal_csum_iov(~0, offset, len, niov, iov); + if (*kqswnal_tunables.kqn_inject_csum_error == 4) { + krx->krx_rpc_reply.msg.u.get.cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; + } +#endif + eprc = ep_complete_rpc(krx->krx_rxd, + kqswnal_rdma_store_complete, ktx, + &krx->krx_rpc_reply.ep_statusblk, + ktx->ktx_frags, rmd->kqrmd_frag, + rmd->kqrmd_nfrag); + if (eprc != EP_SUCCESS) { + CERROR("can't complete RPC: %d\n", eprc); + /* don't re-attempt RPC completion */ + krx->krx_rpc_reply_needed = 0; + rc = -ECONNABORTED; } - if (kqswnal_nid2elanid (gatewaynid) < 0) { - CERROR("Bad gateway "LPX64" for "LPX64"\n", - gatewaynid, nid); - lib_finalize (&kqswnal_lib, private, cookie); - return (-1); + break; + + case KTX_RDMA_FETCH: + eprc = ep_rpc_get (krx->krx_rxd, + kqswnal_rdma_fetch_complete, ktx, + rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag); + if (eprc != EP_SUCCESS) { + CERROR("ep_rpc_get failed: %d\n", eprc); + /* Don't attempt RPC completion: + * EKC nuked it when the get failed */ + krx->krx_rpc_reply_needed = 0; + rc = -ECONNABORTED; } - nid = gatewaynid; + break; } - /* I may not block for a transmit descriptor if I might block the - * receiver, or an interrupt handler. 
*/ - ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK || - type == PTL_MSG_REPLY || - in_interrupt())); - if (ktx == NULL) { - kqswnal_cerror_hdr (hdr); - lib_finalize (&kqswnal_lib, private, cookie); + out: + if (rc != 0) { + kqswnal_rx_decref(krx); /* drop callback's ref */ + kqswnal_put_idle_tx (ktx); } - memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ + cfs_atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc); +} -#if KQSW_CHECKSUM - csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr)); - memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum)); - for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) { - if (payload_kiov != NULL) { - ptl_kiov_t *kiov = &payload_kiov[i]; - char *addr = ((char *)kmap (kiov->kiov_page)) + - kiov->kiov_offset; - - csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len)); - sumnob -= kiov->kiov_len; - } else { - struct iovec *iov = &payload_iov[i]; +int +kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) +{ + lnet_hdr_t *hdr = &lntmsg->msg_hdr; + int type = lntmsg->msg_type; + lnet_process_id_t target = lntmsg->msg_target; + int target_is_router = lntmsg->msg_target_is_router; + int routing = lntmsg->msg_routing; + unsigned int payload_niov = lntmsg->msg_niov; + struct iovec *payload_iov = lntmsg->msg_iov; + lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; + unsigned int payload_offset = lntmsg->msg_offset; + unsigned int payload_nob = lntmsg->msg_len; + int nob; + kqswnal_tx_t *ktx; + int rc; + + /* NB 1. hdr is in network byte order */ + /* 2. 'private' depends on the message type */ + + CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n", + payload_nob, payload_niov, libcfs_id2str(target)); - csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len)); - sumnob -= iov->iov_len; - } + LASSERT (payload_nob == 0 || payload_niov > 0); + LASSERT (payload_niov <= LNET_MAX_IOV); + + /* It must be OK to kmap() if required */ + LASSERT (payload_kiov == NULL || !cfs_in_interrupt ()); + /* payload is either all vaddrs or all pages */ + LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); + + if (kqswnal_nid2elanid (target.nid) < 0) { + CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid)); + return -EIO; } - memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); -#endif - /* Set up first frag from pre-mapped buffer (it's at least the - * portals header) */ - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; - ktx->ktx_iov[0].Len = KQSW_HDR_SIZE; - ktx->ktx_niov = 1; + /* I may not block for a transmit descriptor if I might block the + * router, receiver, or an interrupt handler. */ + ktx = kqswnal_get_idle_tx(); + if (ktx == NULL) { + CERROR ("Can't get txd for msg type %d for %s\n", + type, libcfs_nid2str(target.nid)); + return (-ENOMEM); + } - if (payload_nob > 0) { /* got some payload (something more to do) */ - /* make a single contiguous message? */ - if (payload_nob <= KQSW_TX_MAXCONTIG) { - /* copy payload to ktx_buffer, immediately after hdr */ - if (payload_kiov != NULL) - lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_kiov, payload_nob); + ktx->ktx_state = KTX_SENDING; + ktx->ktx_nid = target.nid; + ktx->ktx_args[0] = private; + ktx->ktx_args[1] = lntmsg; + ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */ + + /* The first frag will be the pre-mapped buffer. 
*/ + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; + + if ((!target_is_router && /* target.nid is final dest */ + !routing && /* I'm the source */ + type == LNET_MSG_GET && /* optimize GET? */ + *kqswnal_tunables.kqn_optimized_gets != 0 && + lntmsg->msg_md->md_length >= + *kqswnal_tunables.kqn_optimized_gets) || + ((type == LNET_MSG_PUT || /* optimize PUT? */ + type == LNET_MSG_REPLY) && /* optimize REPLY? */ + *kqswnal_tunables.kqn_optimized_puts != 0 && + payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) { + lnet_libmd_t *md = lntmsg->msg_md; + kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer; + lnet_hdr_t *mhdr; + kqswnal_remotemd_t *rmd; + + /* Optimised path: I send over the Elan vaddrs of the local + * buffers, and my peer DMAs directly to/from them. + * + * First I set up ktx as if it was going to send this + * payload, (it needs to map it anyway). This fills + * ktx_frags[1] and onward with the network addresses + * of the buffer frags. */ + + /* Send an RDMA message */ + msg->kqm_magic = LNET_PROTO_QSW_MAGIC; + msg->kqm_version = QSWLND_PROTO_VERSION; + msg->kqm_type = QSWLND_MSG_RDMA; + + mhdr = &msg->kqm_u.rdma.kqrm_hdr; + rmd = &msg->kqm_u.rdma.kqrm_rmd; + + *mhdr = *hdr; + nob = (((char *)rmd) - ktx->ktx_buffer); + + if (type == LNET_MSG_GET) { + if ((md->md_options & LNET_MD_KIOV) != 0) + rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length, + md->md_niov, md->md_iov.kiov); else - lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_iov, payload_nob); - /* first frag includes payload */ - ktx->ktx_iov[0].Len += payload_nob; + rc = kqswnal_map_tx_iov (ktx, 0, md->md_length, + md->md_niov, md->md_iov.iov); + ktx->ktx_state = KTX_GETTING; } else { if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, - payload_niov, payload_kiov); + rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob, + payload_niov, payload_kiov); else - rc = kqswnal_map_tx_iov (ktx, payload_nob, - payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - lib_finalize (&kqswnal_lib, private, cookie); - return (-1); + rc = kqswnal_map_tx_iov(ktx, 0, payload_nob, + payload_niov, payload_iov); + ktx->ktx_state = KTX_PUTTING; + } + + if (rc != 0) + goto out; + + rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; + nob += offsetof(kqswnal_remotemd_t, + kqrmd_frag[rmd->kqrmd_nfrag]); + LASSERT (nob <= KQSW_TX_BUFFER_SIZE); + + memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1], + rmd->kqrmd_nfrag * sizeof(EP_NMD)); + + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob); +#if KQSW_CKSUM + msg->kqm_nob = nob + payload_nob; + msg->kqm_cksum = 0; + msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob); +#endif + if (type == LNET_MSG_GET) { + /* Allocate reply message now while I'm in thread context */ + ktx->ktx_args[2] = lnet_create_reply_msg ( + kqswnal_data.kqn_ni, lntmsg); + if (ktx->ktx_args[2] == NULL) + goto out; + + /* NB finalizing the REPLY message is my + * responsibility now, whatever happens. */ +#if KQSW_CKSUM + if (*kqswnal_tunables.kqn_inject_csum_error == 3) { + msg->kqm_cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; + } + + } else if (payload_kiov != NULL) { + /* must checksum payload after header so receiver can + * compute partial header cksum before swab. 
Sadly + * this causes 2 rounds of kmap */ + msg->kqm_cksum = + kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob, + payload_niov, payload_kiov); + if (*kqswnal_tunables.kqn_inject_csum_error == 2) { + msg->kqm_cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; } - } + } else { + msg->kqm_cksum = + kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob, + payload_niov, payload_iov); + if (*kqswnal_tunables.kqn_inject_csum_error == 2) { + msg->kqm_cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; + } +#endif + } + + } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) { + lnet_hdr_t *mhdr; + char *payload; + kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer; + + /* single frag copied into the pre-mapped buffer */ + msg->kqm_magic = LNET_PROTO_QSW_MAGIC; + msg->kqm_version = QSWLND_PROTO_VERSION; + msg->kqm_type = QSWLND_MSG_IMMEDIATE; + + mhdr = &msg->kqm_u.immediate.kqim_hdr; + payload = msg->kqm_u.immediate.kqim_payload; + + *mhdr = *hdr; + nob = (payload - ktx->ktx_buffer) + payload_nob; + + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob); + + if (payload_kiov != NULL) + lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0, + payload_niov, payload_kiov, + payload_offset, payload_nob); + else + lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0, + payload_niov, payload_iov, + payload_offset, payload_nob); +#if KQSW_CKSUM + msg->kqm_nob = nob; + msg->kqm_cksum = 0; + msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob); + if (*kqswnal_tunables.kqn_inject_csum_error == 1) { + msg->kqm_cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; + } +#endif + } else { + lnet_hdr_t *mhdr; + kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer; + + /* multiple frags: first is hdr in pre-mapped buffer */ + msg->kqm_magic = LNET_PROTO_QSW_MAGIC; + msg->kqm_version = QSWLND_PROTO_VERSION; + msg->kqm_type = QSWLND_MSG_IMMEDIATE; + + mhdr = &msg->kqm_u.immediate.kqim_hdr; + nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload); + + *mhdr = *hdr; + + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob); + + if (payload_kiov != NULL) + rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, + payload_niov, payload_kiov); + else + rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, + payload_niov, payload_iov); + if (rc != 0) + goto out; + +#if KQSW_CKSUM + msg->kqm_nob = nob + payload_nob; + msg->kqm_cksum = 0; + msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob); + + msg->kqm_cksum = (payload_kiov != NULL) ? + kqswnal_csum_kiov(msg->kqm_cksum, + payload_offset, payload_nob, + payload_niov, payload_kiov) : + kqswnal_csum_iov(msg->kqm_cksum, + payload_offset, payload_nob, + payload_niov, payload_iov); + + if (*kqswnal_tunables.kqn_inject_csum_error == 1) { + msg->kqm_cksum++; + *kqswnal_tunables.kqn_inject_csum_error = 0; + } +#endif + nob += payload_nob; } - ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */ - ktx->ktx_args[0] = private; - ktx->ktx_args[1] = cookie; + ktx->ktx_port = (nob <= KQSW_SMALLMSG) ? + EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); - if (rc != 0) { /* failed? */ - CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc); - lib_finalize (&kqswnal_lib, private, cookie); - return (-1); - } - CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid); - return (0); + out: + CDEBUG_LIMIT(rc == 0? 
D_NET :D_NETERROR, "%s %d bytes to %s%s: rc %d\n", + routing ? (rc == 0 ? "Routed" : "Failed to route") : + (rc == 0 ? "Sent" : "Failed to send"), + nob, libcfs_nid2str(target.nid), + target_is_router ? "(router)" : "", rc); + + if (rc != 0) { + lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2]; + int state = ktx->ktx_state; + + kqswnal_put_idle_tx (ktx); + + if (state == KTX_GETTING && repmsg != NULL) { + /* We committed to reply, but there was a problem + * launching the GET. We can't avoid delivering a + * REPLY event since we committed above, so we + * pretend the GET succeeded but the REPLY + * failed. */ + rc = 0; + lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0); + lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO); + } + + } + + cfs_atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc == 0 ? 0 : -EIO); } -static int -kqswnal_send (nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int payload_niov, - struct iovec *payload_iov, - size_t payload_nob) +void +kqswnal_requeue_rx (kqswnal_rx_t *krx) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, - payload_niov, payload_iov, NULL, payload_nob)); + LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0); + LASSERT (!krx->krx_rpc_reply_needed); + + krx->krx_state = KRX_POSTED; + + if (kqswnal_data.kqn_shuttingdown) { + /* free EKC rxd on shutdown */ + ep_complete_receive(krx->krx_rxd); + } else { + /* repost receive */ + ep_requeue_receive(krx->krx_rxd, + kqswnal_rxhandler, krx, + &krx->krx_elanbuffer, 0); + } } -static int -kqswnal_send_pages (nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int payload_niov, - ptl_kiov_t *payload_kiov, - size_t payload_nob) +void +kqswnal_rpc_complete (EP_RXD *rxd) { - return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid, - payload_niov, NULL, payload_kiov, payload_nob)); -} + int status = ep_rxd_status(rxd); + kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd); + + CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR, + "rxd %p, krx %p, status %d\n", rxd, krx, status); -int kqswnal_fwd_copy_contig = 0; + LASSERT (krx->krx_rxd == rxd); + LASSERT (krx->krx_rpc_reply_needed); + + krx->krx_rpc_reply_needed = 0; + kqswnal_requeue_rx (krx); +} void -kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) +kqswnal_rx_done (kqswnal_rx_t *krx) { - int rc; - kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; - int niov = fwd->kprfd_niov; - int nob = fwd->kprfd_nob; - ptl_nid_t nid = fwd->kprfd_gateway_nid; - -#if KQSW_CHECKSUM - CERROR ("checksums for forwarded packets not implemented\n"); - LBUG (); -#endif - /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", - fwd, nid, niov, nob); + int rc; - LASSERT (niov > 0); - - ktx = kqswnal_get_idle_tx (fwd, FALSE); - if (ktx == NULL) /* can't get txd right now */ - return; /* fwd will be scheduled when tx desc freed */ + LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0); - if (nid == kqswnal_lib.ni.nid) /* gateway is me */ - nid = fwd->kprfd_target_nid; /* target is final dest */ + if (krx->krx_rpc_reply_needed) { + /* We've not completed the peer's RPC yet... 
*/ + krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC; + krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION; - if (kqswnal_nid2elanid (nid) < 0) { - CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid); - rc = -EHOSTUNREACH; - goto failed; - } + LASSERT (!cfs_in_interrupt()); - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + rc = ep_complete_rpc(krx->krx_rxd, + kqswnal_rpc_complete, krx, + &krx->krx_rpc_reply.ep_statusblk, + NULL, NULL, 0); + if (rc == EP_SUCCESS) + return; - if ((kqswnal_fwd_copy_contig || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) - { - /* send from ktx's pre-allocated/mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); - ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ - ktx->ktx_iov[0].Len = nob; - ktx->ktx_niov = 1; + CERROR("can't complete RPC: %d\n", rc); + krx->krx_rpc_reply_needed = 0; } - else - { - /* zero copy */ - ktx->ktx_niov = 0; /* no frags mapped yet */ - rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); - if (rc != 0) - goto failed; + + kqswnal_requeue_rx(krx); +} + +void +kqswnal_parse (kqswnal_rx_t *krx) +{ + lnet_ni_t *ni = kqswnal_data.kqn_ni; + kqswnal_msg_t *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page); + lnet_nid_t fromnid = kqswnal_rx_nid(krx); + int swab; + int n; + int i; + int nob; + int rc; + + LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1); + + if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) { + CERROR("Short message %d received from %s\n", + krx->krx_nob, libcfs_nid2str(fromnid)); + goto done; } - ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ? - EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE; - ktx->ktx_nid = nid; - ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */ - ktx->ktx_args[0] = fwd; + swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC); - rc = kqswnal_launch (ktx); - if (rc == 0) - return; + if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) { +#if KQSW_CKSUM + __u32 csum0; + __u32 csum1; + + /* csum byte array before swab */ + csum1 = msg->kqm_cksum; + msg->kqm_cksum = 0; + csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob, + krx->krx_npages, krx->krx_kiov); + msg->kqm_cksum = csum1; +#endif - failed: - LASSERT (rc != 0); - CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); + if (swab) { + __swab16s(&msg->kqm_version); + __swab16s(&msg->kqm_type); +#if KQSW_CKSUM + __swab32s(&msg->kqm_cksum); + __swab32s(&msg->kqm_nob); +#endif + } - kqswnal_put_idle_tx (ktx); - /* complete now (with failure) */ - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); -} + if (msg->kqm_version != QSWLND_PROTO_VERSION) { + /* Future protocol version compatibility support! + * The next qswlnd-specific protocol rev will first + * send an RPC to check version. + * 1.4.6 and 1.4.7.early reply with a status + * block containing its current version. 
+ * Later versions send a failure (-ve) status + + * magic/version */ + + if (!krx->krx_rpc_reply_needed) { + CERROR("Unexpected version %d from %s\n", + msg->kqm_version, libcfs_nid2str(fromnid)); + goto done; + } -void -kqswnal_fwd_callback (void *arg, int error) -{ - kqswnal_rx_t *krx = (kqswnal_rx_t *)arg; + LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO); + goto done; + } - /* The router has finished forwarding this packet */ + switch (msg->kqm_type) { + default: + CERROR("Bad request type %x from %s\n", + msg->kqm_type, libcfs_nid2str(fromnid)); + goto done; + + case QSWLND_MSG_IMMEDIATE: + if (krx->krx_rpc_reply_needed) { + /* Should have been a simple message */ + CERROR("IMMEDIATE sent as RPC from %s\n", + libcfs_nid2str(fromnid)); + goto done; + } - if (error != 0) - { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload); + if (krx->krx_nob < nob) { + CERROR("Short IMMEDIATE %d(%d) from %s\n", + krx->krx_nob, nob, libcfs_nid2str(fromnid)); + goto done; + } - CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", - NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); - } +#if KQSW_CKSUM + if (csum0 != msg->kqm_cksum) { + CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n", + csum0, msg->kqm_cksum, libcfs_nid2str(fromnid)); + CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob); + goto done; + } +#endif + rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr, + fromnid, krx, 0); + if (rc < 0) + goto done; + return; - kqswnal_requeue_rx (krx); -} + case QSWLND_MSG_RDMA: + if (!krx->krx_rpc_reply_needed) { + /* Should have been a simple message */ + CERROR("RDMA sent as simple message from %s\n", + libcfs_nid2str(fromnid)); + goto done; + } -void -kqswnal_rx (kqswnal_rx_t *krx) -{ - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); - ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); - int nob; - int niov; + nob = offsetof(kqswnal_msg_t, + kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]); + if (krx->krx_nob < nob) { + CERROR("Short RDMA message %d(%d) from %s\n", + krx->krx_nob, nob, libcfs_nid2str(fromnid)); + goto done; + } - if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */ - /* NB krx requeued when lib_parse() calls back kqswnal_recv */ - lib_parse (&kqswnal_lib, hdr, krx); - return; - } + if (swab) + __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag); + + n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag; + nob = offsetof(kqswnal_msg_t, + kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]); + + if (krx->krx_nob < nob) { + CERROR("short RDMA message %d(%d) from %s\n", + krx->krx_nob, nob, libcfs_nid2str(fromnid)); + goto done; + } + + if (swab) { + for (i = 0; i < n; i++) { + EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i]; + + __swab32s(&nmd->nmd_addr); + __swab32s(&nmd->nmd_len); + __swab32s(&nmd->nmd_attr); + } + } -#if KQSW_CHECKSUM - CERROR ("checksums for forwarded packets not implemented\n"); - LBUG (); +#if KQSW_CKSUM + krx->krx_cksum = csum0; /* stash checksum so far */ #endif - if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */ - { - CERROR("dropping packet from "LPX64" for "LPX64 - ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid); - kqswnal_requeue_rx (krx); - return; + rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr, + fromnid, krx, 1); + if (rc < 0) + goto done; + return; + } + /* Not Reached */ } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < 
krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); - } + if (msg->kqm_magic == LNET_PROTO_MAGIC || + msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) { + /* Future protocol version compatibility support! + * When LNET unifies protocols over all LNDs, the first thing a + * peer will send will be a version query RPC. + * 1.4.6 and 1.4.7.early reply with a status block containing + * LNET_PROTO_QSW_MAGIC.. + * Later versions send a failure (-ve) status + + * magic/version */ + + if (!krx->krx_rpc_reply_needed) { + CERROR("Unexpected magic %08x from %s\n", + msg->kqm_magic, libcfs_nid2str(fromnid)); + goto done; + } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, - kqswnal_fwd_callback, krx); + LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO); + goto done; + } - kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); + CERROR("Unrecognised magic %08x from %s\n", + msg->kqm_magic, libcfs_nid2str(fromnid)); + done: + kqswnal_rx_decref(krx); } /* Receive Interrupt Handler: posts to schedulers */ void kqswnal_rxhandler(EP_RXD *rxd) { - long flags; + unsigned long flags; int nob = ep_rxd_len (rxd); int status = ep_rxd_status (rxd); kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd); - CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n", rxd, krx, nob, status); LASSERT (krx != NULL); - + LASSERT (krx->krx_state == KRX_POSTED); + + krx->krx_state = KRX_PARSE; krx->krx_rxd = rxd; krx->krx_nob = nob; - /* must receive a whole header to be able to parse */ - if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) - { - /* receives complete with failure when receiver is removed */ - if (kqswnal_data.kqn_shuttingdown) - return; + /* RPC reply iff rpc request received without error */ + krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) && + (status == EP_SUCCESS || + status == EP_MSG_TOO_BIG); - CERROR("receive status failed with status %d nob %d\n", - ep_rxd_status(rxd), nob); - kqswnal_requeue_rx (krx); + /* Default to failure if an RPC reply is requested but not handled */ + krx->krx_rpc_reply.msg.status = -EPROTO; + cfs_atomic_set (&krx->krx_refcount, 1); + + if (status != EP_SUCCESS) { + /* receives complete with failure when receiver is removed */ + if (status == EP_SHUTDOWN) + LASSERT (kqswnal_data.kqn_shuttingdown); + else + CERROR("receive status failed with status %d nob %d\n", + ep_rxd_status(rxd), nob); + kqswnal_rx_decref(krx); return; } - atomic_inc (&kqswnal_packets_received); + if (!cfs_in_interrupt()) { + kqswnal_parse(krx); + return; + } - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); - list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds); - if (waitqueue_active (&kqswnal_data.kqn_sched_waitq)) - wake_up (&kqswnal_data.kqn_sched_waitq); + cfs_list_add_tail(&krx->krx_list, &kqswnal_data.kqn_readyrxds); + wake_up(&kqswnal_data.kqn_sched_waitq); - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); } -#if KQSW_CHECKSUM -void -kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) +int +kqswnal_recv (lnet_ni_t *ni, + void *private, + lnet_msg_t *lntmsg, + int delayed, + unsigned int niov, + struct iovec *iov, + lnet_kiov_t *kiov, + unsigned int offset, + unsigned int mlen, + unsigned int rlen) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); - - CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 
- ", dpid %d, spid %d, type %d\n", - ishdr ? "Header" : "Payload", krx, - NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid) - NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid), - NTOH__u32(hdr->type)); - - switch (NTOH__u32 (hdr->type)) - { - case PTL_MSG_ACK: - CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64 - " len %u\n", - NTOH__u32(hdr->msg.ack.mlength), - hdr->msg.ack.dst_wmd.handle_cookie, - hdr->msg.ack.dst_wmd.handle_idx, - NTOH__u64(hdr->msg.ack.match_bits), - NTOH__u32(hdr->msg.ack.length)); - break; - case PTL_MSG_PUT: - CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64 - " len %u off %u data "LPX64"\n", - NTOH__u32(hdr->msg.put.ptl_index), - hdr->msg.put.ack_wmd.handle_cookie, - hdr->msg.put.ack_wmd.handle_idx, - NTOH__u64(hdr->msg.put.match_bits), - NTOH__u32(hdr->msg.put.length), - NTOH__u32(hdr->msg.put.offset), - hdr->msg.put.hdr_data); - break; - case PTL_MSG_GET: - CERROR ("GET: <>\n"); - break; - case PTL_MSG_REPLY: - CERROR ("REPLY: <>\n"); - break; - default: - CERROR ("TYPE?: <>\n"); - } -} -#endif + kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + lnet_nid_t fromnid; + kqswnal_msg_t *msg; + lnet_hdr_t *hdr; + kqswnal_remotemd_t *rmd; + int msg_offset; + int rc; + + LASSERT (!cfs_in_interrupt ()); /* OK to map */ + /* Either all pages or all vaddrs */ + LASSERT (!(kiov != NULL && iov != NULL)); -static int -kqswnal_recvmsg (nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - unsigned int niov, - struct iovec *iov, - ptl_kiov_t *kiov, - size_t mlen, - size_t rlen) -{ - kqswnal_rx_t *krx = (kqswnal_rx_t *)private; - int page; - char *page_ptr; - int page_nob; - char *iov_ptr; - int iov_nob; - int frag; -#if KQSW_CHECKSUM - kqsw_csum_t senders_csum; - kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); - size_t csum_len = mlen; - int csum_frags = 0; - int csum_nob = 0; - static atomic_t csum_counter; - int csum_verbose = (atomic_read(&csum_counter)%1000001) == 0; - - atomic_inc (&csum_counter); - - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); - if (senders_csum != hdr_csum) - kqswnal_csum_error (krx, 1); -#endif - CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen); + fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd)); + msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page); - /* What was actually received must be >= payload. - * This is an LASSERT, as lib_finalize() doesn't have a completion status. 
*/ - LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen); - LASSERT (mlen <= rlen); + if (krx->krx_rpc_reply_needed) { + /* optimized (rdma) request sent as RPC */ - /* It must be OK to kmap() if required */ - LASSERT (kiov == NULL || !in_interrupt ()); - /* Either all pages or all vaddrs */ - LASSERT (!(kiov != NULL && iov != NULL)); - - if (mlen != 0) - { - page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; - page_nob = PAGE_SIZE - KQSW_HDR_SIZE; + LASSERT (msg->kqm_type == QSWLND_MSG_RDMA); + hdr = &msg->kqm_u.rdma.kqrm_hdr; + rmd = &msg->kqm_u.rdma.kqrm_rmd; - LASSERT (niov > 0); - if (kiov != NULL) { - iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; - iov_nob = kiov->kiov_len; - } else { - iov_ptr = iov->iov_base; - iov_nob = iov->iov_len; - } + /* NB header is still in wire byte order */ - for (;;) - { - /* We expect the iov to exactly match mlen */ - LASSERT (iov_nob <= mlen); - - frag = MIN (page_nob, iov_nob); - memcpy (iov_ptr, page_ptr, frag); -#if KQSW_CHECKSUM - payload_csum = kqsw_csum (payload_csum, iov_ptr, frag); - csum_nob += frag; - csum_frags++; -#endif - mlen -= frag; - if (mlen == 0) + switch (le32_to_cpu(hdr->type)) { + case LNET_MSG_PUT: + case LNET_MSG_REPLY: + /* This is an optimized PUT/REPLY */ + rc = kqswnal_rdma(krx, lntmsg, + KTX_RDMA_FETCH, rmd, + niov, iov, kiov, offset, mlen); break; - page_nob -= frag; - if (page_nob != 0) - page_ptr += frag; - else - { - page++; - LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); - page_nob = PAGE_SIZE; - } + case LNET_MSG_GET: +#if KQSW_CKSUM + if (krx->krx_cksum != msg->kqm_cksum) { + CERROR("Bad GET checksum %08x(%08x) from %s\n", + krx->krx_cksum, msg->kqm_cksum, + libcfs_nid2str(fromnid)); + rc = -EIO; + break; + } +#endif + if (lntmsg == NULL) { + /* No buffer match: my decref will + * complete the RPC with failure */ + rc = 0; + } else { + /* Matched something! 
*/ + rc = kqswnal_rdma(krx, lntmsg, + KTX_RDMA_STORE, rmd, + lntmsg->msg_niov, + lntmsg->msg_iov, + lntmsg->msg_kiov, + lntmsg->msg_offset, + lntmsg->msg_len); + } + break; - iov_nob -= frag; - if (iov_nob != 0) - iov_ptr += frag; - else if (kiov != NULL) { - kunmap (kiov->kiov_page); - kiov++; - niov--; - LASSERT (niov > 0); - iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; - iov_nob = kiov->kiov_len; - } else { - iov++; - niov--; - LASSERT (niov > 0); - iov_ptr = iov->iov_base; - iov_nob = iov->iov_len; - } + default: + CERROR("Bad RPC type %d\n", + le32_to_cpu(hdr->type)); + rc = -EPROTO; + break; } - if (kiov != NULL) - kunmap (kiov->kiov_page); + kqswnal_rx_decref(krx); + return rc; } -#if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); - - if (csum_len != rlen) - CERROR("Unable to checksum data in user's buffer\n"); - else if (senders_csum != payload_csum) - kqswnal_csum_error (krx, 0); - - if (csum_verbose) - CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, " - "csum_nob %d\n", - hdr_csum, payload_csum, csum_frags, csum_nob); -#endif - lib_finalize(nal, private, cookie); - - kqswnal_requeue_rx (krx); - - return (rlen); -} + LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE); + msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload); + + if (krx->krx_nob < msg_offset + rlen) { + CERROR("Bad message size from %s: have %d, need %d + %d\n", + libcfs_nid2str(fromnid), krx->krx_nob, + msg_offset, rlen); + kqswnal_rx_decref(krx); + return -EPROTO; + } -static int -kqswnal_recv(nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - unsigned int niov, - struct iovec *iov, - size_t mlen, - size_t rlen) -{ - return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen)); -} + if (kiov != NULL) + lnet_copy_kiov2kiov(niov, kiov, offset, + krx->krx_npages, krx->krx_kiov, + msg_offset, mlen); + else + lnet_copy_kiov2iov(niov, iov, offset, + krx->krx_npages, krx->krx_kiov, + msg_offset, mlen); -static int -kqswnal_recv_pages (nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - unsigned int niov, - ptl_kiov_t *kiov, - size_t mlen, - size_t rlen) -{ - return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen)); + lnet_finalize(ni, lntmsg, 0); + kqswnal_rx_decref(krx); + return 0; } int -kqswnal_thread_start (int (*fn)(void *arg), void *arg) +kqswnal_thread_start(int (*fn)(void *arg), void *arg, char *name) { - long pid = kernel_thread (fn, arg, 0); + struct task_struct *task = cfs_thread_run(fn, arg, name); - if (pid < 0) - return ((int)pid); + if (IS_ERR(task)) + return PTR_ERR(task); - atomic_inc (&kqswnal_data.kqn_nthreads); - return (0); + cfs_atomic_inc(&kqswnal_data.kqn_nthreads); + return 0; } void kqswnal_thread_fini (void) { - atomic_dec (&kqswnal_data.kqn_nthreads); + cfs_atomic_dec (&kqswnal_data.kqn_nthreads); } int @@ -1133,107 +1677,102 @@ kqswnal_scheduler (void *arg) { kqswnal_rx_t *krx; kqswnal_tx_t *ktx; - kpr_fwd_desc_t *fwd; - long flags; + unsigned long flags; int rc; int counter = 0; int did_something; - kportal_daemonize ("kqswnal_sched"); - kportal_blockallsigs (); - - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + cfs_block_allsigs (); - while (!kqswnal_data.kqn_shuttingdown) + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); + + for (;;) { - did_something = FALSE; + did_something = 0; - if (!list_empty (&kqswnal_data.kqn_readyrxds)) + if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds)) { - krx 
= list_entry(kqswnal_data.kqn_readyrxds.next, - kqswnal_rx_t, krx_list); - list_del (&krx->krx_list); - spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, - flags); - - kqswnal_rx (krx); - - did_something = TRUE; - spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); + krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next, + kqswnal_rx_t, krx_list); + cfs_list_del (&krx->krx_list); + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, + flags); + + LASSERT (krx->krx_state == KRX_PARSE); + kqswnal_parse (krx); + + did_something = 1; + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, + flags); } - if (!list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!cfs_list_empty (&kqswnal_data.kqn_donetxds)) { - ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, - kqswnal_tx_t, ktx_list); - list_del (&ktx->ktx_list); - spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, - flags); + ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next, + kqswnal_tx_t, ktx_schedlist); + cfs_list_del_init (&ktx->ktx_schedlist); + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, + flags); - rc = kqswnal_launch (ktx); - if (rc != 0) /* failed: ktx_nid down? */ - { - CERROR("Failed delayed transmit to "LPX64 - ": %d\n", ktx->ktx_nid, rc); - kqswnal_tx_done (ktx, rc); - } + kqswnal_tx_done_in_thread_context(ktx); - did_something = TRUE; - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + did_something = 1; + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, + flags); } - if (!list_empty (&kqswnal_data.kqn_delayedfwds)) + if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds)) { - fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list); - list_del (&fwd->kprfd_list); - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next, + kqswnal_tx_t, ktx_schedlist); + cfs_list_del_init (&ktx->ktx_schedlist); + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, + flags); - kqswnal_fwd_packet (NULL, fwd); + rc = kqswnal_launch (ktx); + if (rc != 0) { + CERROR("Failed delayed transmit to %s: %d\n", + libcfs_nid2str(ktx->ktx_nid), rc); + kqswnal_tx_done (ktx, rc); + } + cfs_atomic_dec (&kqswnal_data.kqn_pending_txs); - did_something = TRUE; - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); + did_something = 1; + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, + flags); } - /* nothing to do or hogging CPU */ + /* nothing to do or hogging CPU */ if (!did_something || counter++ == KQSW_RESCHED) { - spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, - flags); + spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, + flags); counter = 0; if (!did_something) { - rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq, - kqswnal_data.kqn_shuttingdown || - !list_empty(&kqswnal_data.kqn_readyrxds) || - !list_empty(&kqswnal_data.kqn_delayedtxds) || - !list_empty(&kqswnal_data.kqn_delayedfwds)); - LASSERT (rc == 0); - } else if (current->need_resched) - schedule (); - - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); - } - } - - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); - - kqswnal_thread_fini (); - return (0); + if (kqswnal_data.kqn_shuttingdown == 2) { + /* We only exit in stage 2 of shutdown + * when there's nothing left to do */ + break; + } + rc = wait_event_interruptible_exclusive ( + kqswnal_data.kqn_sched_waitq, + kqswnal_data.kqn_shuttingdown == 2 || + !cfs_list_empty(&kqswnal_data. \ + kqn_readyrxds) || + !cfs_list_empty(&kqswnal_data. \ + kqn_donetxds) || + !cfs_list_empty(&kqswnal_data. 
\ + kqn_delayedtxds)); + LASSERT (rc == 0); + } else if (need_resched()) + schedule (); + + spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, + flags); + } + } + + kqswnal_thread_fini (); + return 0; } - -nal_cb_t kqswnal_lib = -{ - nal_data: &kqswnal_data, /* NAL private data */ - cb_send: kqswnal_send, - cb_send_pages: kqswnal_send_pages, - cb_recv: kqswnal_recv, - cb_recv_pages: kqswnal_recv_pages, - cb_read: kqswnal_read, - cb_write: kqswnal_write, - cb_malloc: kqswnal_malloc, - cb_free: kqswnal_free, - cb_printf: kqswnal_printf, - cb_cli: kqswnal_cli, - cb_sti: kqswnal_sti, - cb_dist: kqswnal_dist -};
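
Minimal sketch (not the LNet API): the page-vector to iovec copy that the old kqswnal_recvmsg() open-coded with kmap()/memcpy() and that the patched kqswnal_recv() delegates to lnet_copy_kiov2iov(). The demo_* names, struct demo_kiov, and the simplified argument list are illustrative stand-ins under that assumption, not the real helpers.

/*
 * Copy 'nob' bytes from a vector of page fragments into a vector of
 * kernel iovecs, starting 'offset' bytes into the source vector.
 */
#include <linux/highmem.h>
#include <linux/string.h>
#include <linux/uio.h>

struct demo_kiov {                      /* shaped like lnet_kiov_t */
        struct page   *kiov_page;
        unsigned int   kiov_len;
        unsigned int   kiov_offset;
};

static void
demo_copy_kiov2iov(unsigned int niov, struct iovec *iov,
                   unsigned int nkiov, struct demo_kiov *kiov,
                   unsigned int offset, unsigned int nob)
{
        unsigned int iov_off = 0;       /* progress within current iovec */
        unsigned int frag;
        char        *src;

        /* skip complete source frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                nkiov--;
        }

        while (nob > 0) {
                frag = kiov->kiov_len - offset;
                if (frag > nob)
                        frag = nob;
                if (frag > iov->iov_len - iov_off)
                        frag = iov->iov_len - iov_off;

                /* page frags must be mapped before they can be addressed;
                 * this is only legal outside interrupt context, which is
                 * why kqswnal_recv() asserts !cfs_in_interrupt() */
                src = kmap(kiov->kiov_page);
                memcpy((char *)iov->iov_base + iov_off,
                       src + kiov->kiov_offset + offset, frag);
                kunmap(kiov->kiov_page);

                nob -= frag;

                offset += frag;
                if (offset == kiov->kiov_len) {         /* source frag done */
                        kiov++;
                        nkiov--;
                        offset = 0;
                }

                iov_off += frag;
                if (iov_off == iov->iov_len) {          /* dest iovec done */
                        iov++;
                        niov--;
                        iov_off = 0;
                }

                /* defensive: don't run off either vector */
                if (nob > 0 && (nkiov == 0 || niov == 0))
                        break;
        }
}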
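
Minimal sketch (generic names, not the qswlnd symbols) of the scheduler pattern visible in the hunks above: producers queue work on a spinlock-protected list, possibly from interrupt context, and a kernel thread drains the list, sleeping on a wait queue when idle and exiting only once shutdown leaves nothing queued.

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

struct demo_work {
        struct list_head  dw_list;
        void            (*dw_fn)(struct demo_work *);
};

static LIST_HEAD(demo_ready);
static DEFINE_SPINLOCK(demo_lock);
static DECLARE_WAIT_QUEUE_HEAD(demo_waitq);
static int demo_shutdown;

/* producer side: safe to call from an interrupt handler */
static void demo_enqueue(struct demo_work *work)
{
        unsigned long flags;

        spin_lock_irqsave(&demo_lock, flags);
        list_add_tail(&work->dw_list, &demo_ready);
        wake_up(&demo_waitq);
        spin_unlock_irqrestore(&demo_lock, flags);
}

/* consumer side: drain the list, sleep when there is nothing to do */
static int demo_scheduler(void *arg)
{
        struct demo_work *work;
        unsigned long     flags;
        int               rc;

        spin_lock_irqsave(&demo_lock, flags);

        while (!demo_shutdown || !list_empty(&demo_ready)) {
                if (list_empty(&demo_ready)) {
                        spin_unlock_irqrestore(&demo_lock, flags);
                        rc = wait_event_interruptible(demo_waitq,
                                                      demo_shutdown ||
                                                      !list_empty(&demo_ready));
                        (void)rc;       /* the real thread blocks signals */
                        spin_lock_irqsave(&demo_lock, flags);
                        continue;
                }

                work = list_entry(demo_ready.next, struct demo_work, dw_list);
                list_del(&work->dw_list);

                /* handle the item without holding the lock */
                spin_unlock_irqrestore(&demo_lock, flags);
                work->dw_fn(work);
                spin_lock_irqsave(&demo_lock, flags);
        }

        spin_unlock_irqrestore(&demo_lock, flags);
        return 0;
}

Such a thread would be started with something like kthread_run(demo_scheduler, NULL, "demo_sched"), which is essentially what the patched kqswnal_thread_start() does through cfs_thread_run().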
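
Minimal sketch of the rx descriptor refcounting the patch introduces: kqswnal_rxhandler() starts each descriptor at refcount 1, every path that finishes with it calls kqswnal_rx_decref(), and the final drop is presumably what requeues the buffer (the old code called kqswnal_requeue_rx() directly). demo_rx and demo_rx_done are stand-ins, not the qswlnd types.

#include <linux/atomic.h>
#include <linux/bug.h>

struct demo_rx {
        atomic_t drx_refcount;
        /* ... receive pages, EP rx descriptor, parse state ... */
};

static void demo_rx_done(struct demo_rx *rx)
{
        /* last reference dropped: repost the receive buffer here */
}

static void demo_rx_start(struct demo_rx *rx)
{
        atomic_set(&rx->drx_refcount, 1);       /* handler's own reference */
}

static void demo_rx_addref(struct demo_rx *rx)
{
        atomic_inc(&rx->drx_refcount);          /* e.g. while an RDMA is in flight */
}

static void demo_rx_decref(struct demo_rx *rx)
{
        BUG_ON(atomic_read(&rx->drx_refcount) <= 0);
        if (atomic_dec_and_test(&rx->drx_refcount))
                demo_rx_done(rx);
}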