/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002 Cluster File Systems, Inc.
- * Author: Eric Barton <eric@bartonsoftware.com>
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
*
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
+ * Author: Eric Barton <eric@bartonsoftware.com>
*
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ * This file is part of Portals, http://www.lustre.org
*
* Portals is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* You should have received a copy of the GNU General Public License
* along with Portals; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
*/
-#include "qswnal.h"
-
-/*
- * LIB functions follow
- *
- */
-static int
-kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
-{
- if (nid == nal->libnal_ni.ni_pid.nid)
- *dist = 0; /* it's me */
- else if (kqswnal_nid2elanid (nid) >= 0)
- *dist = 1; /* it's my peer */
- else
- *dist = 2; /* via router */
- return (0);
-}
+#include "qswlnd.h"
+/* Report this tx's peer as down to LNET, back-dating the failure to
+ * the wall-clock second at which the tx was launched (ktx_launchtime
+ * is a cfs_time_current() stamp taken at launch, so the elapsed
+ * duration is converted to seconds and subtracted from "now"). */
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
- struct timeval now;
 time_t then;
- do_gettimeofday (&now);
- then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
+ then = cfs_time_current_sec() -
+ cfs_duration_sec(cfs_time_current() -
+ ktx->ktx_launchtime);
- kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
+ lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
-#if MULTIRAIL_EKC
int i;
ktx->ktx_rail = -1; /* unset rail */
-#endif
if (ktx->ktx_nmappedpages == 0)
return;
-#if MULTIRAIL_EKC
CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
ep_dvma_unload(kqswnal_data.kqn_ep,
kqswnal_data.kqn_ep_tx_nmh,
&ktx->ktx_frags[i]);
-#else
- CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
- ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
-
- LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
- LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
- kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
- elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
- kqswnal_data.kqn_eptxdmahandle,
- ktx->ktx_basepage, ktx->ktx_nmappedpages);
-#endif
ktx->ktx_nmappedpages = 0;
}
int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
+ unsigned int niov, lnet_kiov_t *kiov)
{
int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
- uint32_t basepage = ktx->ktx_basepage + nmapped;
+ __u32 basepage = ktx->ktx_basepage + nmapped;
char *ptr;
-#if MULTIRAIL_EKC
+
EP_RAILMASK railmask;
int rail;
kqswnal_nid2elanid(ktx->ktx_nid));
rail = ktx->ktx_rail;
if (rail < 0) {
- CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
return (-ENETDOWN);
}
railmask = 1 << rail;
-#endif
+
LASSERT (nmapped <= maxmapped);
LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
"%p[%d] loading %p for %d, page %d, %d total\n",
ktx, nfrags, ptr, fraglen, basepage, nmapped);
-#if MULTIRAIL_EKC
ep_dvma_load(kqswnal_data.kqn_ep, NULL,
ptr, fraglen,
kqswnal_data.kqn_ep_tx_nmh, basepage,
/* new frag if this is the first or can't merge */
nfrags++;
}
-#else
- elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
- kqswnal_data.kqn_eptxdmahandle,
- ptr, fraglen,
- basepage, &ktx->ktx_frags[nfrags].Base);
-
- if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
- /* just extend previous */
- ktx->ktx_frags[nfrags - 1].Len += fraglen;
- else {
- ktx->ktx_frags[nfrags].Len = fraglen;
- nfrags++; /* new frag */
- }
-#endif
kunmap (kiov->kiov_page);
return (0);
}
+#if KQSW_CKSUM
+/* Fold 'nob' bytes of a paged (kiov) payload, starting 'offset' bytes
+ * into the fragment list, into the running checksum 'csum' via
+ * kqswnal_csum() and return the updated value.  Fragments wholly
+ * before 'offset' are skipped; each page is kmap'd only while its
+ * bytes are being summed.  The requested range must lie within the
+ * niov fragments supplied (asserted). */
+__u32
+kqswnal_csum_kiov (__u32 csum, int offset, int nob,
+ unsigned int niov, lnet_kiov_t *kiov)
+{
+ char *ptr;
+
+ /* nothing to sum: return the accumulator unchanged */
+ if (nob == 0)
+ return csum;
+
+ LASSERT (niov > 0);
+ LASSERT (nob > 0);
+
+ /* skip complete frags before 'offset' */
+ while (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ kiov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+
+ do {
+ int fraglen = kiov->kiov_len - offset;
+
+ /* each page frag is contained in one page */
+ LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
+
+ if (fraglen > nob)
+ fraglen = nob;
+
+ ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
+
+ csum = kqswnal_csum(csum, ptr, fraglen);
+
+ kunmap (kiov->kiov_page);
+
+ /* advance to the next frag; only the first frag can start
+ * at a non-zero offset */
+ kiov++;
+ niov--;
+ nob -= fraglen;
+ offset = 0;
+
+ /* iov must not run out before end of data */
+ LASSERT (nob == 0 || niov > 0);
+
+ } while (nob > 0);
+
+ return csum;
+}
+#endif
+
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
- int niov, struct iovec *iov)
+ unsigned int niov, struct iovec *iov)
{
int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
- uint32_t basepage = ktx->ktx_basepage + nmapped;
-#if MULTIRAIL_EKC
+ __u32 basepage = ktx->ktx_basepage + nmapped;
+
EP_RAILMASK railmask;
int rail;
kqswnal_nid2elanid(ktx->ktx_nid));
rail = ktx->ktx_rail;
if (rail < 0) {
- CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
return (-ENETDOWN);
}
railmask = 1 << rail;
-#endif
+
LASSERT (nmapped <= maxmapped);
LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
ktx, nfrags, iov->iov_base + offset, fraglen,
basepage, npages, nmapped);
-#if MULTIRAIL_EKC
ep_dvma_load(kqswnal_data.kqn_ep, NULL,
iov->iov_base + offset, fraglen,
kqswnal_data.kqn_ep_tx_nmh, basepage,
/* new frag if this is the first or can't merge */
nfrags++;
}
-#else
- elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
- kqswnal_data.kqn_eptxdmahandle,
- iov->iov_base + offset, fraglen,
- basepage, &ktx->ktx_frags[nfrags].Base);
-
- if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
- /* just extend previous */
- ktx->ktx_frags[nfrags - 1].Len += fraglen;
- else {
- ktx->ktx_frags[nfrags].Len = fraglen;
- nfrags++; /* new frag */
- }
-#endif
/* keep in loop for failure case */
ktx->ktx_nmappedpages = nmapped;
return (0);
}
+#if KQSW_CKSUM
+/* Virtual-address twin of kqswnal_csum_kiov(): fold 'nob' bytes of an
+ * iovec payload, starting 'offset' bytes into the vector, into the
+ * running checksum 'csum' via kqswnal_csum() and return the updated
+ * value.  Fragments wholly before 'offset' are skipped; no mapping is
+ * needed since iov_base addresses are directly usable.  The requested
+ * range must lie within the niov fragments supplied (asserted). */
+__u32
+kqswnal_csum_iov (__u32 csum, int offset, int nob,
+ unsigned int niov, struct iovec *iov)
+{
+ /* nothing to sum: return the accumulator unchanged */
+ if (nob == 0)
+ return csum;
+
+ LASSERT (niov > 0);
+ LASSERT (nob > 0);
+
+ /* skip complete frags before offset */
+ while (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ iov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+
+ do {
+ int fraglen = iov->iov_len - offset;
+
+ if (fraglen > nob)
+ fraglen = nob;
+
+ csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);
+
+ /* advance to the next frag; only the first frag can start
+ * at a non-zero offset */
+ iov++;
+ niov--;
+ nob -= fraglen;
+ offset = 0;
+
+ /* iov must not run out before end of data */
+ LASSERT (nob == 0 || niov > 0);
+
+ } while (nob > 0);
+
+ return csum;
+}
+#endif
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
- kpr_fwd_desc_t *fwd = NULL;
unsigned long flags;
kqswnal_unmap_tx (ktx); /* release temporary mappings */
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
list_del (&ktx->ktx_list); /* take off active list */
-
- if (ktx->ktx_isnblk) {
- /* reserved for non-blocking tx */
- list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
- spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
- return;
- }
-
list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
- /* anything blocking for a tx descriptor? */
- if (!kqswnal_data.kqn_shuttingdown &&
- !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
- {
- CDEBUG(D_NET,"wakeup fwd\n");
-
- fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
- kpr_fwd_desc_t, kprfd_list);
- list_del (&fwd->kprfd_list);
- }
-
- wake_up (&kqswnal_data.kqn_idletxd_waitq);
-
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
-
- if (fwd == NULL)
- return;
-
- /* schedule packet for forwarding again */
- spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
-
- list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
- wake_up (&kqswnal_data.kqn_sched_waitq);
-
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
kqswnal_tx_t *
-kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
+kqswnal_get_idle_tx (void)
{
unsigned long flags;
- kqswnal_tx_t *ktx = NULL;
-
- for (;;) {
- spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
-
- if (kqswnal_data.kqn_shuttingdown)
- break;
-
- /* "normal" descriptor is free */
- if (!list_empty (&kqswnal_data.kqn_idletxds)) {
- ktx = list_entry (kqswnal_data.kqn_idletxds.next,
- kqswnal_tx_t, ktx_list);
- break;
- }
-
- if (fwd != NULL) /* forwarded packet? */
- break;
-
- /* doing a local transmit */
- if (!may_block) {
- if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
- CERROR ("intr tx desc pool exhausted\n");
- break;
- }
+ kqswnal_tx_t *ktx;
- ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
- kqswnal_tx_t, ktx_list);
- break;
- }
-
- /* block for idle tx */
+ spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+ if (kqswnal_data.kqn_shuttingdown ||
+ list_empty (&kqswnal_data.kqn_idletxds)) {
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
- CDEBUG (D_NET, "blocking for tx desc\n");
- wait_event (kqswnal_data.kqn_idletxd_waitq,
- !list_empty (&kqswnal_data.kqn_idletxds) ||
- kqswnal_data.kqn_shuttingdown);
+ return NULL;
}
- if (ktx != NULL) {
- list_del (&ktx->ktx_list);
- list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
- ktx->ktx_launcher = current->pid;
- atomic_inc(&kqswnal_data.kqn_pending_txs);
- } else if (fwd != NULL) {
- /* queue forwarded packet until idle txd available */
- CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
- list_add_tail (&fwd->kprfd_list,
- &kqswnal_data.kqn_idletxd_fwdq);
- }
+ ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);
+ list_del (&ktx->ktx_list);
+
+ list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
+ ktx->ktx_launcher = current->pid;
+ atomic_inc(&kqswnal_data.kqn_pending_txs);
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
/* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
- LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
-
+ LASSERT (ktx->ktx_nmappedpages == 0);
return (ktx);
}
void
-kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
+kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
+ lnet_msg_t *lnetmsg0 = NULL;
+ lnet_msg_t *lnetmsg1 = NULL;
+ int status0 = 0;
+ int status1 = 0;
+ kqswnal_rx_t *krx;
+
+ LASSERT (!in_interrupt());
+
+ if (ktx->ktx_status == -EHOSTDOWN)
+ kqswnal_notify_peer_down(ktx);
+
switch (ktx->ktx_state) {
- case KTX_FORWARDING: /* router asked me to forward this packet */
- kpr_fwd_done (&kqswnal_data.kqn_router,
- (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
+ case KTX_RDMA_FETCH: /* optimized PUT/REPLY handled */
+ krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+ status0 = ktx->ktx_status;
+#if KQSW_CKSUM
+ if (status0 == 0) { /* RDMA succeeded */
+ kqswnal_msg_t *msg;
+ __u32 csum;
+
+ msg = (kqswnal_msg_t *)
+ page_address(krx->krx_kiov[0].kiov_page);
+
+ csum = (lnetmsg0->msg_kiov != NULL) ?
+ kqswnal_csum_kiov(krx->krx_cksum,
+ lnetmsg0->msg_offset,
+ lnetmsg0->msg_wanted,
+ lnetmsg0->msg_niov,
+ lnetmsg0->msg_kiov) :
+ kqswnal_csum_iov(krx->krx_cksum,
+ lnetmsg0->msg_offset,
+ lnetmsg0->msg_wanted,
+ lnetmsg0->msg_niov,
+ lnetmsg0->msg_iov);
+
+ /* Can only check csum if I got it all */
+ if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
+ csum != msg->kqm_cksum) {
+ ktx->ktx_status = -EIO;
+ krx->krx_rpc_reply.msg.status = -EIO;
+ CERROR("RDMA checksum failed %u(%u) from %s\n",
+ csum, msg->kqm_cksum,
+ libcfs_nid2str(kqswnal_rx_nid(krx)));
+ }
+ }
+#endif
+ LASSERT (krx->krx_state == KRX_COMPLETING);
+ kqswnal_rx_decref (krx);
break;
- case KTX_RDMAING: /* optimized GET/PUT handled */
+ case KTX_RDMA_STORE: /* optimized GET handled */
case KTX_PUTTING: /* optimized PUT sent */
case KTX_SENDING: /* normal send */
- lib_finalize (&kqswnal_lib, NULL,
- (lib_msg_t *)ktx->ktx_args[1],
- (error == 0) ? PTL_OK : PTL_FAIL);
+ lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+ status0 = ktx->ktx_status;
break;
- case KTX_GETTING: /* optimized GET sent & REPLY received */
+ case KTX_GETTING: /* optimized GET sent & payload received */
/* Complete the GET with success since we can't avoid
* delivering a REPLY event; we committed to it when we
* launched the GET */
- lib_finalize (&kqswnal_lib, NULL,
- (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
- lib_finalize (&kqswnal_lib, NULL,
- (lib_msg_t *)ktx->ktx_args[2],
- (error == 0) ? PTL_OK : PTL_FAIL);
+ lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+ status0 = 0;
+ lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
+ status1 = ktx->ktx_status;
+#if KQSW_CKSUM
+ if (status1 == 0) { /* RDMA succeeded */
+ lnet_msg_t *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+ lnet_libmd_t *md = lnetmsg0->msg_md;
+ __u32 csum;
+
+ csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
+ kqswnal_csum_kiov(~0, 0,
+ md->md_length,
+ md->md_niov,
+ md->md_iov.kiov) :
+ kqswnal_csum_iov(~0, 0,
+ md->md_length,
+ md->md_niov,
+ md->md_iov.iov);
+
+ if (csum != ktx->ktx_cksum) {
+ CERROR("RDMA checksum failed %u(%u) from %s\n",
+ csum, ktx->ktx_cksum,
+ libcfs_nid2str(ktx->ktx_nid));
+ status1 = -EIO;
+ }
+ }
+#endif
break;
default:
}
kqswnal_put_idle_tx (ktx);
+
+ lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
+ if (lnetmsg1 != NULL)
+ lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
+}
+
+/* Record 'status' on the tx and complete it.  Completion (see
+ * kqswnal_tx_done_in_thread_context()) ends in lnet_finalize(), so it
+ * is done inline only when not in interrupt context; otherwise the tx
+ * is queued on kqn_donetxds under kqn_sched_lock and the scheduler
+ * thread is woken to finish it. */
+void
+kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
+{
+ unsigned long flags;
+
+ ktx->ktx_status = status;
+
+ if (!in_interrupt()) {
+ kqswnal_tx_done_in_thread_context(ktx);
+ return;
+ }
+
+ /* Complete the send in thread context */
+ spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
+
+ list_add_tail(&ktx->ktx_schedlist,
+ &kqswnal_data.kqn_donetxds);
+ wake_up(&kqswnal_data.kqn_sched_waitq);
+
+ spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
+}
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
- kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
+ kqswnal_rpc_reply_t *reply;
LASSERT (txd != NULL);
LASSERT (ktx != NULL);
if (status != EP_SUCCESS) {
- CERROR ("Tx completion to "LPX64" failed: %d\n",
- ktx->ktx_nid, status);
+ CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
+ libcfs_nid2str(ktx->ktx_nid), status);
- kqswnal_notify_peer_down(ktx);
status = -EHOSTDOWN;
} else switch (ktx->ktx_state) {
case KTX_GETTING:
case KTX_PUTTING:
- /* RPC completed OK; but what did our peer put in the status
- * block? */
-#if MULTIRAIL_EKC
- status = ep_txd_statusblk(txd)->Data[0];
-#else
- status = ep_txd_statusblk(txd)->Status;
+ /* RPC complete! */
+ reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
+ if (reply->msg.magic == 0) { /* "old" peer */
+ status = reply->msg.status;
+ break;
+ }
+
+ if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
+ if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
+ CERROR("%s unexpected rpc reply magic %08x\n",
+ libcfs_nid2str(ktx->ktx_nid),
+ reply->msg.magic);
+ status = -EPROTO;
+ break;
+ }
+
+ __swab32s(&reply->msg.status);
+ __swab32s(&reply->msg.version);
+
+ if (ktx->ktx_state == KTX_GETTING) {
+ __swab32s(&reply->msg.u.get.len);
+ __swab32s(&reply->msg.u.get.cksum);
+ }
+ }
+
+ status = reply->msg.status;
+ if (status != 0) {
+ CERROR("%s RPC status %08x\n",
+ libcfs_nid2str(ktx->ktx_nid), status);
+ break;
+ }
+
+ if (ktx->ktx_state == KTX_GETTING) {
+ lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
+ (lnet_msg_t *)ktx->ktx_args[2],
+ reply->msg.u.get.len);
+#if KQSW_CKSUM
+ ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
+ }
break;
- case KTX_FORWARDING:
case KTX_SENDING:
status = 0;
break;
break;
}
- kqswnal_tx_done (ktx, status);
+ kqswnal_tx_done(ktx, status);
}
int
unsigned long flags;
int rc;
- ktx->ktx_launchtime = jiffies;
+ ktx->ktx_launchtime = cfs_time_current();
if (kqswnal_data.kqn_shuttingdown)
return (-ESHUTDOWN);
LASSERT (dest >= 0); /* must be a peer */
-#if MULTIRAIL_EKC
if (ktx->ktx_nmappedpages != 0)
attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
-#endif
switch (ktx->ktx_state) {
case KTX_GETTING:
case KTX_PUTTING:
+ if (the_lnet.ln_testprotocompat != 0) {
+ kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+
+ /* single-shot proto test:
+ * Future version queries will use an RPC, so I'll
+ * co-opt one of the existing ones */
+ LNET_LOCK();
+ if ((the_lnet.ln_testprotocompat & 1) != 0) {
+ msg->kqm_version++;
+ the_lnet.ln_testprotocompat &= ~1;
+ }
+ if ((the_lnet.ln_testprotocompat & 2) != 0) {
+ msg->kqm_magic = LNET_PROTO_MAGIC;
+ the_lnet.ln_testprotocompat &= ~2;
+ }
+ LNET_UNLOCK();
+ }
+
/* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
* The other frags are the payload, awaiting RDMA */
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
NULL, ktx->ktx_frags, 1);
break;
- case KTX_FORWARDING:
case KTX_SENDING:
-#if MULTIRAIL_EKC
rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
NULL, ktx->ktx_frags, ktx->ktx_nfrag);
-#else
- rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr,
- kqswnal_txhandler, ktx,
- ktx->ktx_frags, ktx->ktx_nfrag);
-#endif
break;
default:
case EP_ENOMEM: /* can't allocate ep txd => queue for later */
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
- list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
+ list_add_tail (&ktx->ktx_schedlist, &kqswnal_data.kqn_delayedtxds);
wake_up (&kqswnal_data.kqn_sched_waitq);
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
return (0);
default: /* fatal error */
- CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
+ CDEBUG (D_NETERROR, "Tx to %s failed: %d\n", libcfs_nid2str(ktx->ktx_nid), rc);
kqswnal_notify_peer_down(ktx);
return (-EHOSTUNREACH);
}
#if 0
static char *
-hdr_type_string (ptl_hdr_t *hdr)
+hdr_type_string (lnet_hdr_t *hdr)
{
switch (hdr->type) {
- case PTL_MSG_ACK:
+ case LNET_MSG_ACK:
return ("ACK");
- case PTL_MSG_PUT:
+ case LNET_MSG_PUT:
return ("PUT");
- case PTL_MSG_GET:
+ case LNET_MSG_GET:
return ("GET");
- case PTL_MSG_REPLY:
+ case LNET_MSG_REPLY:
return ("REPLY");
default:
return ("<UNKNOWN>");
}
static void
-kqswnal_cerror_hdr(ptl_hdr_t * hdr)
+kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
char *type_str = hdr_type_string (hdr);
le32_to_cpu(hdr->dest_pid));
switch (le32_to_cpu(hdr->type)) {
- case PTL_MSG_PUT:
+ case LNET_MSG_PUT:
CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
"match bits "LPX64"\n",
le32_to_cpu(hdr->msg.put.ptl_index),
hdr->msg.put.hdr_data);
break;
- case PTL_MSG_GET:
+ case LNET_MSG_GET:
CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
"match bits "LPX64"\n",
le32_to_cpu(hdr->msg.get.ptl_index),
le32_to_cpu(hdr->msg.get.src_offset));
break;
- case PTL_MSG_ACK:
+ case LNET_MSG_ACK:
CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
hdr->msg.ack.dst_wmd.wh_interface_cookie,
hdr->msg.ack.dst_wmd.wh_object_cookie,
le32_to_cpu(hdr->msg.ack.mlength));
break;
- case PTL_MSG_REPLY:
+ case LNET_MSG_REPLY:
CERROR(" dst md "LPX64"."LPX64"\n",
hdr->msg.reply.dst_wmd.wh_interface_cookie,
hdr->msg.reply.dst_wmd.wh_object_cookie);
} /* end of print_hdr() */
#endif
-#if !MULTIRAIL_EKC
-void
-kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
-{
- int i;
-
- CDEBUG (how, "%s: %d\n", str, n);
- for (i = 0; i < n; i++) {
- CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
- }
-}
-
-int
-kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
- int nsrc, EP_IOVEC *src,
- int ndst, EP_IOVEC *dst)
-{
- int count;
- int nob;
-
- LASSERT (ndv > 0);
- LASSERT (nsrc > 0);
- LASSERT (ndst > 0);
-
- for (count = 0; count < ndv; count++, dv++) {
-
- if (nsrc == 0 || ndst == 0) {
- if (nsrc != ndst) {
- /* For now I'll barf on any left over entries */
- CERROR ("mismatched src and dst iovs\n");
- return (-EINVAL);
- }
- return (count);
- }
-
- nob = (src->Len < dst->Len) ? src->Len : dst->Len;
- dv->Len = nob;
- dv->Source = src->Base;
- dv->Dest = dst->Base;
-
- if (nob >= src->Len) {
- src++;
- nsrc--;
- } else {
- src->Len -= nob;
- src->Base += nob;
- }
-
- if (nob >= dst->Len) {
- dst++;
- ndst--;
- } else {
- src->Len -= nob;
- src->Base += nob;
- }
- }
-
- CERROR ("DATAVEC too small\n");
- return (-E2BIG);
-}
-#else
int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
int nrfrag, EP_NMD *rfrag)
return (0);
}
-#endif
kqswnal_remotemd_t *
-kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
+kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
+ /* Check that the RMD sent after the "raw" LNET header in a
+ * portals-compatible QSWLND message is OK */
char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
- ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
- kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- ptl_nid_t nid = kqswnal_rx_nid(krx);
-
- /* Note (1) lib_parse has already flipped hdr.
- * (2) RDMA addresses are sent in native endian-ness. When
- * EKC copes with different endian nodes, I'll fix this (and
- * eat my hat :) */
-
- LASSERT (krx->krx_nob >= sizeof(*hdr));
-
- if (hdr->type != type) {
- CERROR ("Unexpected optimized get/put type %d (%d expected)"
- "from "LPX64"\n", hdr->type, type, nid);
- return (NULL);
- }
-
- if (hdr->src_nid != nid) {
- CERROR ("Unexpected optimized get/put source NID "
- LPX64" from "LPX64"\n", hdr->src_nid, nid);
- return (NULL);
- }
+ kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));
- LASSERT (nid == expected_nid);
+ /* Note RDMA addresses are sent in native endian-ness in the "old"
+ * portals protocol so no swabbing... */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
"rxd %p, ktx %p, status %d\n", rxd, ktx, status);
- LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
LASSERT (krx->krx_rxd == rxd);
LASSERT (krx->krx_rpc_reply_needed);
krx->krx_rpc_reply_needed = 0;
kqswnal_rx_decref (krx);
- /* free ktx & finalize() its lib_msg_t */
+ /* free ktx & finalize() its lnet_msg_t */
kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}
void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
- /* Completed fetching the PUT data */
+ /* Completed fetching the PUT/REPLY data */
int status = ep_rxd_status(rxd);
kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- unsigned long flags;
CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
"rxd %p, ktx %p, status %d\n", rxd, ktx, status);
- LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
LASSERT (krx->krx_rxd == rxd);
+ /* RPC completes with failure by default */
LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply.msg.status != 0);
- /* Set the RPC completion status */
- status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
- krx->krx_rpc_reply_status = status;
-
- /* free ktx & finalize() its lib_msg_t */
- kqswnal_tx_done(ktx, status);
-
- if (!in_interrupt()) {
- /* OK to complete the RPC now (iff I had the last ref) */
- kqswnal_rx_decref (krx);
- return;
+ if (status == EP_SUCCESS) {
+ krx->krx_rpc_reply.msg.status = 0;
+ status = 0;
+ } else {
+ /* Abandon RPC since get failed */
+ krx->krx_rpc_reply_needed = 0;
+ status = -ECONNABORTED;
}
+ /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
LASSERT (krx->krx_state == KRX_PARSE);
krx->krx_state = KRX_COMPLETING;
- /* Complete the RPC in thread context */
- spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
-
- list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
- wake_up (&kqswnal_data.kqn_sched_waitq);
-
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ /* free ktx & finalize() its lnet_msg_t */
+ kqswnal_tx_done(ktx, status);
}
int
-kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
- int niov, struct iovec *iov, ptl_kiov_t *kiov,
- size_t offset, size_t len)
+kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
+ int type, kqswnal_remotemd_t *rmd,
+ unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int len)
{
- kqswnal_remotemd_t *rmd;
kqswnal_tx_t *ktx;
int eprc;
int rc;
-#if !MULTIRAIL_EKC
- EP_DATAVEC datav[EP_MAXFRAG];
- int ndatav;
-#endif
- LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
/* Not both mapped and paged payload */
LASSERT (iov == NULL || kiov == NULL);
/* RPC completes with failure by default */
LASSERT (krx->krx_rpc_reply_needed);
- LASSERT (krx->krx_rpc_reply_status != 0);
-
- rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
- if (rmd == NULL)
- return (-EPROTO);
+ LASSERT (krx->krx_rpc_reply.msg.status != 0);
if (len == 0) {
/* data got truncated to nothing. */
- lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+ lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
/* Let kqswnal_rx_done() complete the RPC with success */
- krx->krx_rpc_reply_status = 0;
+ krx->krx_rpc_reply.msg.status = 0;
return (0);
}
/* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
actually sending a portals message with it */
- ktx = kqswnal_get_idle_tx(NULL, 0);
+ ktx = kqswnal_get_idle_tx();
if (ktx == NULL) {
- CERROR ("Can't get txd for RDMA with "LPX64"\n",
- libmsg->ev.initiator.nid);
+ CERROR ("Can't get txd for RDMA with %s\n",
+ libcfs_nid2str(kqswnal_rx_nid(krx)));
return (-ENOMEM);
}
- ktx->ktx_state = KTX_RDMAING;
- ktx->ktx_nid = libmsg->ev.initiator.nid;
+ ktx->ktx_state = type;
+ ktx->ktx_nid = kqswnal_rx_nid(krx);
ktx->ktx_args[0] = krx;
- ktx->ktx_args[1] = libmsg;
+ ktx->ktx_args[1] = lntmsg;
+
+ LASSERT (atomic_read(&krx->krx_refcount) > 0);
+ /* Take an extra ref for the completion callback */
+ atomic_inc(&krx->krx_refcount);
-#if MULTIRAIL_EKC
/* Map on the rail the RPC prefers */
ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
ep_rxd_railmask(krx->krx_rxd));
-#endif
/* Start mapping at offset 0 (we're not mapping any headers) */
ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
goto out;
}
-#if MULTIRAIL_EKC
rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
rmd->kqrmd_nfrag, rmd->kqrmd_frag);
if (rc != 0) {
CERROR ("Incompatible RDMA descriptors\n");
goto out;
}
-#else
+
switch (type) {
default:
LBUG();
-
- case PTL_MSG_GET:
- ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
- ktx->ktx_nfrag, ktx->ktx_frags,
- rmd->kqrmd_nfrag, rmd->kqrmd_frag);
- break;
-
- case PTL_MSG_PUT:
- ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
- rmd->kqrmd_nfrag, rmd->kqrmd_frag,
- ktx->ktx_nfrag, ktx->ktx_frags);
- break;
- }
- if (ndatav < 0) {
- CERROR ("Can't create datavec: %d\n", ndatav);
- rc = ndatav;
- goto out;
- }
+ case KTX_RDMA_STORE:
+ krx->krx_rpc_reply.msg.status = 0;
+ krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC;
+ krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
+ krx->krx_rpc_reply.msg.u.get.len = len;
+#if KQSW_CKSUM
+ krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
+ kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
+ kqswnal_csum_iov(~0, offset, len, niov, iov);
+ if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
+ krx->krx_rpc_reply.msg.u.get.cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
+ }
#endif
-
- LASSERT (atomic_read(&krx->krx_refcount) > 0);
- /* Take an extra ref for the completion callback */
- atomic_inc(&krx->krx_refcount);
-
- switch (type) {
- default:
- LBUG();
-
- case PTL_MSG_GET:
-#if MULTIRAIL_EKC
eprc = ep_complete_rpc(krx->krx_rxd,
kqswnal_rdma_store_complete, ktx,
- &kqswnal_data.kqn_rpc_success,
- ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
-#else
- eprc = ep_complete_rpc (krx->krx_rxd,
- kqswnal_rdma_store_complete, ktx,
- &kqswnal_data.kqn_rpc_success,
- datav, ndatav);
- if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
- krx->krx_rxd = NULL;
-#endif
+ &krx->krx_rpc_reply.ep_statusblk,
+ ktx->ktx_frags, rmd->kqrmd_frag,
+ rmd->kqrmd_nfrag);
if (eprc != EP_SUCCESS) {
CERROR("can't complete RPC: %d\n", eprc);
/* don't re-attempt RPC completion */
}
break;
- case PTL_MSG_PUT:
-#if MULTIRAIL_EKC
+ case KTX_RDMA_FETCH:
eprc = ep_rpc_get (krx->krx_rxd,
kqswnal_rdma_fetch_complete, ktx,
rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
-#else
- eprc = ep_rpc_get (krx->krx_rxd,
- kqswnal_rdma_fetch_complete, ktx,
- datav, ndatav);
-#endif
if (eprc != EP_SUCCESS) {
CERROR("ep_rpc_get failed: %d\n", eprc);
+ /* Don't attempt RPC completion:
+ * EKC nuked it when the get failed */
+ krx->krx_rpc_reply_needed = 0;
rc = -ECONNABORTED;
}
break;
return (rc);
}
-static ptl_err_t
-kqswnal_sendmsg (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- ptl_kiov_t *payload_kiov,
- size_t payload_offset,
- size_t payload_nob)
+int
+kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
- kqswnal_tx_t *ktx;
- int rc;
- ptl_nid_t targetnid;
-#if KQSW_CHECKSUM
- int i;
- kqsw_csum_t csum;
- int sumoff;
- int sumnob;
-#endif
+ lnet_hdr_t *hdr = &lntmsg->msg_hdr;
+ int type = lntmsg->msg_type;
+ lnet_process_id_t target = lntmsg->msg_target;
+ int target_is_router = lntmsg->msg_target_is_router;
+ int routing = lntmsg->msg_routing;
+ unsigned int payload_niov = lntmsg->msg_niov;
+ struct iovec *payload_iov = lntmsg->msg_iov;
+ lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
+ unsigned int payload_offset = lntmsg->msg_offset;
+ unsigned int payload_nob = lntmsg->msg_len;
+ int nob;
+ kqswnal_tx_t *ktx;
+ int rc;
+
/* NB 1. hdr is in network byte order */
/* 2. 'private' depends on the message type */
- CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
- " pid %u\n", payload_nob, payload_niov, nid, pid);
+ CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
+ payload_nob, payload_niov, libcfs_id2str(target));
LASSERT (payload_nob == 0 || payload_niov > 0);
- LASSERT (payload_niov <= PTL_MD_MAX_IOV);
+ LASSERT (payload_niov <= LNET_MAX_IOV);
/* It must be OK to kmap() if required */
LASSERT (payload_kiov == NULL || !in_interrupt ());
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
- if (payload_nob > KQSW_MAXPAYLOAD) {
- CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
- payload_nob, KQSW_MAXPAYLOAD);
- return (PTL_FAIL);
- }
-
- if (type == PTL_MSG_REPLY && /* can I look in 'private' */
- ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
- /* Must be a REPLY for an optimized GET */
- rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
- payload_niov, payload_iov, payload_kiov,
- payload_offset, payload_nob);
- return ((rc == 0) ? PTL_OK : PTL_FAIL);
- }
-
- targetnid = nid;
- if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
- rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
- sizeof (ptl_hdr_t) + payload_nob, &targetnid);
- if (rc != 0) {
- CERROR("Can't route to "LPX64": router error %d\n",
- nid, rc);
- return (PTL_FAIL);
- }
- if (kqswnal_nid2elanid (targetnid) < 0) {
- CERROR("Bad gateway "LPX64" for "LPX64"\n",
- targetnid, nid);
- return (PTL_FAIL);
- }
+ if (kqswnal_nid2elanid (target.nid) < 0) {
+ CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
+ return -EIO;
}
/* I may not block for a transmit descriptor if I might block the
- * receiver, or an interrupt handler. */
- ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
- type == PTL_MSG_REPLY ||
- in_interrupt()));
+ * router, receiver, or an interrupt handler. */
+ ktx = kqswnal_get_idle_tx();
if (ktx == NULL) {
- CERROR ("Can't get txd for msg type %d for "LPX64"\n",
- type, libmsg->ev.initiator.nid);
- return (PTL_NO_SPACE);
+ CERROR ("Can't get txd for msg type %d for %s\n",
+ type, libcfs_nid2str(target.nid));
+ return (-ENOMEM);
}
ktx->ktx_state = KTX_SENDING;
- ktx->ktx_nid = targetnid;
+ ktx->ktx_nid = target.nid;
ktx->ktx_args[0] = private;
- ktx->ktx_args[1] = libmsg;
+ ktx->ktx_args[1] = lntmsg;
ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */
- memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
-
-#if KQSW_CHECKSUM
- csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
- memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
- for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
- LASSERT(i < niov);
- if (payload_kiov != NULL) {
- ptl_kiov_t *kiov = &payload_kiov[i];
-
- if (sumoff >= kiov->kiov_len) {
- sumoff -= kiov->kiov_len;
- } else {
- char *addr = ((char *)kmap (kiov->kiov_page)) +
- kiov->kiov_offset + sumoff;
- int fragnob = kiov->kiov_len - sumoff;
-
- csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
- sumnob -= fragnob;
- sumoff = 0;
- kunmap(kiov->kiov_page);
- }
- } else {
- struct iovec *iov = &payload_iov[i];
-
- if (sumoff > iov->iov_len) {
- sumoff -= iov->iov_len;
- } else {
- char *addr = iov->iov_base + sumoff;
- int fragnob = iov->iov_len - sumoff;
-
- csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
- sumnob -= fragnob;
- sumoff = 0;
- }
- }
- }
- memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
-#endif
-
- /* The first frag will be the pre-mapped buffer for (at least) the
- * portals header. */
+ /* The first frag will be the pre-mapped buffer. */
ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- if (nid == targetnid && /* not forwarding */
- ((type == PTL_MSG_GET && /* optimize GET? */
- kqswnal_tunables.kqn_optimized_gets != 0 &&
- le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
- (type == PTL_MSG_PUT && /* optimize PUT? */
- kqswnal_tunables.kqn_optimized_puts != 0 &&
- payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
- lib_md_t *md = libmsg->md;
- kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
-
+ if ((!target_is_router && /* target.nid is final dest */
+ !routing && /* I'm the source */
+ type == LNET_MSG_GET && /* optimize GET? */
+ *kqswnal_tunables.kqn_optimized_gets != 0 &&
+ lntmsg->msg_md->md_length >=
+ *kqswnal_tunables.kqn_optimized_gets) ||
+ ((type == LNET_MSG_PUT || /* optimize PUT? */
+ type == LNET_MSG_REPLY) && /* optimize REPLY? */
+ *kqswnal_tunables.kqn_optimized_puts != 0 &&
+ payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
+ lnet_libmd_t *md = lntmsg->msg_md;
+ kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+ lnet_hdr_t *mhdr;
+ kqswnal_remotemd_t *rmd;
+
/* Optimised path: I send over the Elan vaddrs of the local
* buffers, and my peer DMAs directly to/from them.
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
* ktx_frags[1] and onward with the network addresses
- * of the GET sink frags. I copy these into ktx_buffer,
- * immediately after the header, and send that as my
- * message. */
+ * of the buffer frags. */
- ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
+ /* Send an RDMA message */
+ msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+ msg->kqm_version = QSWLND_PROTO_VERSION;
+ msg->kqm_type = QSWLND_MSG_RDMA;
+
+ mhdr = &msg->kqm_u.rdma.kqrm_hdr;
+ rmd = &msg->kqm_u.rdma.kqrm_rmd;
+
+ *mhdr = *hdr;
+ nob = (((char *)rmd) - ktx->ktx_buffer);
+
+ if (type == LNET_MSG_GET) {
+ if ((md->md_options & LNET_MD_KIOV) != 0)
+ rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
+ md->md_niov, md->md_iov.kiov);
+ else
+ rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
+ md->md_niov, md->md_iov.iov);
+ ktx->ktx_state = KTX_GETTING;
+ } else {
+ if (payload_kiov != NULL)
+ rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
+ payload_niov, payload_kiov);
+ else
+ rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
+ payload_niov, payload_iov);
+ ktx->ktx_state = KTX_PUTTING;
+ }
- if ((libmsg->md->options & PTL_MD_KIOV) != 0)
- rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
- md->md_niov, md->md_iov.kiov);
- else
- rc = kqswnal_map_tx_iov (ktx, 0, md->length,
- md->md_niov, md->md_iov.iov);
if (rc != 0)
goto out;
rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
+ nob += offsetof(kqswnal_remotemd_t,
+ kqrmd_frag[rmd->kqrmd_nfrag]);
+ LASSERT (nob <= KQSW_TX_BUFFER_SIZE);
- payload_nob = offsetof(kqswnal_remotemd_t,
- kqrmd_frag[rmd->kqrmd_nfrag]);
- LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
-
-#if MULTIRAIL_EKC
memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
rmd->kqrmd_nfrag * sizeof(EP_NMD));
- ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, KQSW_HDR_SIZE + payload_nob);
-#else
- memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
- rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
-
- ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
+#if KQSW_CKSUM
+ msg->kqm_nob = nob + payload_nob;
+ msg->kqm_cksum = 0;
+ msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
- if (type == PTL_MSG_GET) {
+ if (type == LNET_MSG_GET) {
/* Allocate reply message now while I'm in thread context */
- ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
- nid, libmsg);
+ ktx->ktx_args[2] = lnet_create_reply_msg (
+ kqswnal_data.kqn_ni, lntmsg);
if (ktx->ktx_args[2] == NULL)
goto out;
/* NB finalizing the REPLY message is my
* responsibility now, whatever happens. */
+#if KQSW_CKSUM
+ if (*kqswnal_tunables.kqn_inject_csum_error == 3) {
+ msg->kqm_cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
+ }
+
+ } else if (payload_kiov != NULL) {
+ /* must checksum payload after header so receiver can
+ * compute partial header cksum before swab. Sadly
+ * this causes 2 rounds of kmap */
+ msg->kqm_cksum =
+ kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
+ payload_niov, payload_kiov);
+ if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
+ msg->kqm_cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
+ }
+ } else {
+ msg->kqm_cksum =
+ kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
+ payload_niov, payload_iov);
+ if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
+ msg->kqm_cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
+ }
+#endif
}
- } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+ } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
+ lnet_hdr_t *mhdr;
+ char *payload;
+ kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
- /* small message: single frag copied into the pre-mapped buffer */
+ /* single frag copied into the pre-mapped buffer */
+ msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+ msg->kqm_version = QSWLND_PROTO_VERSION;
+ msg->kqm_type = QSWLND_MSG_IMMEDIATE;
-#if MULTIRAIL_EKC
- ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, KQSW_HDR_SIZE + payload_nob);
-#else
- ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
-#endif
- if (payload_nob > 0) {
- if (payload_kiov != NULL)
- lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- else
- lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
+ mhdr = &msg->kqm_u.immediate.kqim_hdr;
+ payload = msg->kqm_u.immediate.kqim_payload;
+
+ *mhdr = *hdr;
+ nob = (payload - ktx->ktx_buffer) + payload_nob;
+
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
+
+ if (payload_kiov != NULL)
+ lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
+ payload_niov, payload_kiov,
+ payload_offset, payload_nob);
+ else
+ lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+#if KQSW_CKSUM
+ msg->kqm_nob = nob;
+ msg->kqm_cksum = 0;
+ msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
+ if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
+ msg->kqm_cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
}
+#endif
} else {
+ lnet_hdr_t *mhdr;
+ kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
- /* large message: multiple frags: first is hdr in pre-mapped buffer */
+ /* multiple frags: first is hdr in pre-mapped buffer */
+ msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+ msg->kqm_version = QSWLND_PROTO_VERSION;
+ msg->kqm_type = QSWLND_MSG_IMMEDIATE;
+
+ mhdr = &msg->kqm_u.immediate.kqim_hdr;
+ nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+
+ *mhdr = *hdr;
+
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
-#if MULTIRAIL_EKC
- ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, KQSW_HDR_SIZE);
-#else
- ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
-#endif
if (payload_kiov != NULL)
rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
payload_niov, payload_kiov);
payload_niov, payload_iov);
if (rc != 0)
goto out;
+
+#if KQSW_CKSUM
+ msg->kqm_nob = nob + payload_nob;
+ msg->kqm_cksum = 0;
+ msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
+
+ msg->kqm_cksum = (payload_kiov != NULL) ?
+ kqswnal_csum_kiov(msg->kqm_cksum,
+ payload_offset, payload_nob,
+ payload_niov, payload_kiov) :
+ kqswnal_csum_iov(msg->kqm_cksum,
+ payload_offset, payload_nob,
+ payload_niov, payload_iov);
+
+ if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
+ msg->kqm_cksum++;
+ *kqswnal_tunables.kqn_inject_csum_error = 0;
+ }
+#endif
+ nob += payload_nob;
}
- ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
+ ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
out:
- CDEBUG(rc == 0 ? D_NET : D_ERROR,
- "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
- rc == 0 ? "Sent" : "Failed to send",
- payload_nob, nid, targetnid, rc);
+ CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
+ routing ? (rc == 0 ? "Routed" : "Failed to route") :
+ (rc == 0 ? "Sent" : "Failed to send"),
+ nob, libcfs_nid2str(target.nid),
+ target_is_router ? "(router)" : "", rc);
if (rc != 0) {
- if (ktx->ktx_state == KTX_GETTING &&
- ktx->ktx_args[2] != NULL) {
+ lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
+ int state = ktx->ktx_state;
+
+ kqswnal_put_idle_tx (ktx);
+
+ if (state == KTX_GETTING && repmsg != NULL) {
/* We committed to reply, but there was a problem
* launching the GET. We can't avoid delivering a
* REPLY event since we committed above, so we
* pretend the GET succeeded but the REPLY
* failed. */
rc = 0;
- lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
- lib_finalize (&kqswnal_lib, private,
- (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+ lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
+ lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
}
- kqswnal_put_idle_tx (ktx);
}
atomic_dec(&kqswnal_data.kqn_pending_txs);
- return (rc == 0 ? PTL_OK : PTL_FAIL);
-}
-
-static ptl_err_t
-kqswnal_send (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- size_t payload_offset,
- size_t payload_nob)
-{
- return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
- payload_niov, payload_iov, NULL,
- payload_offset, payload_nob));
-}
-
-static ptl_err_t
-kqswnal_send_pages (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int payload_niov,
- ptl_kiov_t *payload_kiov,
- size_t payload_offset,
- size_t payload_nob)
-{
- return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
- payload_niov, NULL, payload_kiov,
- payload_offset, payload_nob));
-}
-
-void
-kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
-{
- int rc;
- kqswnal_tx_t *ktx;
- ptl_kiov_t *kiov = fwd->kprfd_kiov;
- int niov = fwd->kprfd_niov;
- int nob = fwd->kprfd_nob;
- ptl_nid_t nid = fwd->kprfd_gateway_nid;
-
-#if KQSW_CHECKSUM
- CERROR ("checksums for forwarded packets not implemented\n");
- LBUG ();
-#endif
- /* The router wants this NAL to forward a packet */
- CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
- fwd, nid, niov, nob);
-
- ktx = kqswnal_get_idle_tx (fwd, 0);
- if (ktx == NULL) /* can't get txd right now */
- return; /* fwd will be scheduled when tx desc freed */
-
- if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
- nid = fwd->kprfd_target_nid; /* target is final dest */
-
- if (kqswnal_nid2elanid (nid) < 0) {
- CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
- rc = -EHOSTUNREACH;
- goto out;
- }
-
- /* copy hdr into pre-mapped buffer */
- memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
-
- ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ?
- EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
- ktx->ktx_nid = nid;
- ktx->ktx_state = KTX_FORWARDING;
- ktx->ktx_args[0] = fwd;
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-
- if (nob <= KQSW_TX_MAXCONTIG)
- {
- /* send payload from ktx's pre-mapped contiguous buffer */
-#if MULTIRAIL_EKC
- ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, KQSW_HDR_SIZE + nob);
-#else
- ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
-#endif
- if (nob > 0)
- lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
- niov, kiov, 0, nob);
- }
- else
- {
- /* zero copy payload */
-#if MULTIRAIL_EKC
- ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, KQSW_HDR_SIZE);
-#else
- ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
-#endif
- rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
- if (rc != 0)
- goto out;
- }
-
- rc = kqswnal_launch (ktx);
- out:
- if (rc != 0) {
- CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
-
- /* complete now (with failure) */
- kqswnal_tx_done (ktx, rc);
- }
-
- atomic_dec(&kqswnal_data.kqn_pending_txs);
-}
-
-void
-kqswnal_fwd_callback (void *arg, int error)
-{
- kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
-
- /* The router has finished forwarding this packet */
-
- if (error != 0)
- {
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
-
- CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
- le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid),error);
- }
-
- LASSERT (atomic_read(&krx->krx_refcount) == 1);
- kqswnal_rx_decref (krx);
+ return (rc == 0 ? 0 : -EIO);
}
void
krx->krx_state = KRX_POSTED;
-#if MULTIRAIL_EKC
if (kqswnal_data.kqn_shuttingdown) {
/* free EKC rxd on shutdown */
ep_complete_receive(krx->krx_rxd);
kqswnal_rxhandler, krx,
&krx->krx_elanbuffer, 0);
}
-#else
- if (kqswnal_data.kqn_shuttingdown)
- return;
-
- if (krx->krx_rxd == NULL) {
- /* We had a failed ep_complete_rpc() which nukes the
- * descriptor in "old" EKC */
- int eprc = ep_queue_receive(krx->krx_eprx,
- kqswnal_rxhandler, krx,
- krx->krx_elanbuffer,
- krx->krx_npages * PAGE_SIZE, 0);
- LASSERT (eprc == EP_SUCCESS);
- /* We don't handle failure here; it's incredibly rare
- * (never reported?) and only happens with "old" EKC */
- } else {
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer,
- krx->krx_npages * PAGE_SIZE);
- }
-#endif
}
void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
int rc;
- EP_STATUSBLK *sblk;
LASSERT (atomic_read(&krx->krx_refcount) == 0);
if (krx->krx_rpc_reply_needed) {
/* We've not completed the peer's RPC yet... */
- sblk = (krx->krx_rpc_reply_status == 0) ?
- &kqswnal_data.kqn_rpc_success :
- &kqswnal_data.kqn_rpc_failed;
+ krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC;
+ krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
LASSERT (!in_interrupt());
-#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- sblk, NULL, NULL, 0);
- if (rc == EP_SUCCESS)
- return;
-#else
+
rc = ep_complete_rpc(krx->krx_rxd,
kqswnal_rpc_complete, krx,
- sblk, NULL, 0);
+ &krx->krx_rpc_reply.ep_statusblk,
+ NULL, NULL, 0);
if (rc == EP_SUCCESS)
return;
- /* "old" EKC destroys rxd on failed completion */
- krx->krx_rxd = NULL;
-#endif
CERROR("can't complete RPC: %d\n", rc);
krx->krx_rpc_reply_needed = 0;
}
void
kqswnal_parse (kqswnal_rx_t *krx)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
- ptl_nid_t dest_nid = le64_to_cpu(hdr->dest_nid);
- int payload_nob;
+ lnet_ni_t *ni = kqswnal_data.kqn_ni;
+ kqswnal_msg_t *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
+ lnet_nid_t fromnid = kqswnal_rx_nid(krx);
+ int swab;
+ int n;
+ int i;
int nob;
- int niov;
+ int rc;
LASSERT (atomic_read(&krx->krx_refcount) == 1);
- if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
- /* I ignore parse errors since I'm not consuming a byte
- * stream */
- (void)lib_parse (&kqswnal_lib, hdr, krx);
-
- /* Drop my ref; any RDMA activity takes an additional ref */
- kqswnal_rx_decref(krx);
- return;
+ if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
+ CERROR("Short message %d received from %s\n",
+ krx->krx_nob, libcfs_nid2str(fromnid));
+ goto done;
}
-#if KQSW_CHECKSUM
- LASSERTF (0, "checksums for forwarded packets not implemented\n");
+ swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
+
+ if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
+#if KQSW_CKSUM
+ __u32 csum0;
+ __u32 csum1;
+
+ /* csum byte array before swab */
+ csum1 = msg->kqm_cksum;
+ msg->kqm_cksum = 0;
+ csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
+ krx->krx_npages, krx->krx_kiov);
+ msg->kqm_cksum = csum1;
#endif
- if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */
- {
- CERROR("dropping packet from "LPX64" for "LPX64
- ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
+ if (swab) {
+ __swab16s(&msg->kqm_version);
+ __swab16s(&msg->kqm_type);
+#if KQSW_CKSUM
+ __swab32s(&msg->kqm_cksum);
+ __swab32s(&msg->kqm_nob);
+#endif
+ }
- kqswnal_rx_decref (krx);
- return;
- }
+ if (msg->kqm_version != QSWLND_PROTO_VERSION) {
+ /* Future protocol version compatibility support!
+ * The next qswlnd-specific protocol rev will first
+ * send an RPC to check version.
+ * 1.4.6 and 1.4.7.early reply with a status
+ * block containing its current version.
+ * Later versions send a failure (-ve) status +
+ * magic/version */
+
+ if (!krx->krx_rpc_reply_needed) {
+ CERROR("Unexpected version %d from %s\n",
+ msg->kqm_version, libcfs_nid2str(fromnid));
+ goto done;
+ }
- nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
- niov = 0;
- if (nob > 0) {
- krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
- krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
- niov = 1;
- nob -= PAGE_SIZE - KQSW_HDR_SIZE;
-
- while (nob > 0) {
- LASSERT (niov < krx->krx_npages);
-
- krx->krx_kiov[niov].kiov_offset = 0;
- krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
- niov++;
- nob -= PAGE_SIZE;
+ LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
+ goto done;
+ }
+
+ switch (msg->kqm_type) {
+ default:
+ CERROR("Bad request type %x from %s\n",
+ msg->kqm_type, libcfs_nid2str(fromnid));
+ goto done;
+
+ case QSWLND_MSG_IMMEDIATE:
+ if (krx->krx_rpc_reply_needed) {
+ /* Should have been a simple message */
+ CERROR("IMMEDIATE sent as RPC from %s\n",
+ libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+ nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+ if (krx->krx_nob < nob) {
+ CERROR("Short IMMEDIATE %d(%d) from %s\n",
+ krx->krx_nob, nob, libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+#if KQSW_CKSUM
+ if (csum0 != msg->kqm_cksum) {
+ CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
+ csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
+ CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
+ goto done;
+ }
+#endif
+ rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
+ fromnid, krx, 0);
+ if (rc < 0)
+ goto done;
+ return;
+
+ case QSWLND_MSG_RDMA:
+ if (!krx->krx_rpc_reply_needed) {
+ /* Should have been a simple message */
+ CERROR("RDMA sent as simple message from %s\n",
+ libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+ nob = offsetof(kqswnal_msg_t,
+ kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
+ if (krx->krx_nob < nob) {
+ CERROR("Short RDMA message %d(%d) from %s\n",
+ krx->krx_nob, nob, libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+ if (swab)
+ __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
+
+ n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
+ nob = offsetof(kqswnal_msg_t,
+ kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
+
+ if (krx->krx_nob < nob) {
+ CERROR("short RDMA message %d(%d) from %s\n",
+ krx->krx_nob, nob, libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+ if (swab) {
+ for (i = 0; i < n; i++) {
+ EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
+
+ __swab32s(&nmd->nmd_addr);
+ __swab32s(&nmd->nmd_len);
+ __swab32s(&nmd->nmd_attr);
+ }
+ }
+
+#if KQSW_CKSUM
+ krx->krx_cksum = csum0; /* stash checksum so far */
+#endif
+ rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
+ fromnid, krx, 1);
+ if (rc < 0)
+ goto done;
+ return;
}
+ /* Not Reached */
}
- kpr_fwd_init (&krx->krx_fwd, dest_nid,
- hdr, payload_nob, niov, krx->krx_kiov,
- kqswnal_fwd_callback, krx);
+ if (msg->kqm_magic == LNET_PROTO_MAGIC ||
+ msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
+ /* Future protocol version compatibility support!
+ * When LNET unifies protocols over all LNDs, the first thing a
+ * peer will send will be a version query RPC.
+ * 1.4.6 and 1.4.7.early reply with a status block containing
+ * LNET_PROTO_QSW_MAGIC.
+ * Later versions send a failure (-ve) status +
+ * magic/version */
+
+ if (!krx->krx_rpc_reply_needed) {
+ CERROR("Unexpected magic %08x from %s\n",
+ msg->kqm_magic, libcfs_nid2str(fromnid));
+ goto done;
+ }
+
+ LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
+ goto done;
+ }
- kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
+ CERROR("Unrecognised magic %08x from %s\n",
+ msg->kqm_magic, libcfs_nid2str(fromnid));
+ done:
+ kqswnal_rx_decref(krx);
}
/* Receive Interrupt Handler: posts to schedulers */
int nob = ep_rxd_len (rxd);
int status = ep_rxd_status (rxd);
kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd);
-
CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
rxd, krx, nob, status);
LASSERT (krx != NULL);
- LASSERT (krx->krx_state = KRX_POSTED);
+ LASSERT (krx->krx_state == KRX_POSTED);
krx->krx_state = KRX_PARSE;
krx->krx_rxd = rxd;
krx->krx_nob = nob;
-#if MULTIRAIL_EKC
- krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
-#else
- krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
-#endif
+
+ /* RPC reply iff rpc request received without error */
+ krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
+ (status == EP_SUCCESS ||
+ status == EP_MSG_TOO_BIG);
+
/* Default to failure if an RPC reply is requested but not handled */
- krx->krx_rpc_reply_status = -EPROTO;
+ krx->krx_rpc_reply.msg.status = -EPROTO;
atomic_set (&krx->krx_refcount, 1);
- /* must receive a whole header to be able to parse */
- if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
- {
+ if (status != EP_SUCCESS) {
/* receives complete with failure when receiver is removed */
-#if MULTIRAIL_EKC
if (status == EP_SHUTDOWN)
LASSERT (kqswnal_data.kqn_shuttingdown);
else
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
-#else
- if (!kqswnal_data.kqn_shuttingdown)
- CERROR("receive status failed with status %d nob %d\n",
- ep_rxd_status(rxd), nob);
-#endif
kqswnal_rx_decref(krx);
return;
}
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
-#if KQSW_CHECKSUM
-void
-kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
-{
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
-
- CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
- ", dpid %d, spid %d, type %d\n",
- ishdr ? "Header" : "Payload", krx,
- le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid)
- le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
- le32_to_cpu(hdr->type));
-
- switch (le32_to_cpu(hdr->type))
- {
- case PTL_MSG_ACK:
- CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
- " len %u\n",
- le32_to_cpu(hdr->msg.ack.mlength),
- hdr->msg.ack.dst_wmd.handle_cookie,
- hdr->msg.ack.dst_wmd.handle_idx,
- le64_to_cpu(hdr->msg.ack.match_bits),
- le32_to_cpu(hdr->msg.ack.length));
- break;
- case PTL_MSG_PUT:
- CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
- " len %u off %u data "LPX64"\n",
- le32_to_cpu(hdr->msg.put.ptl_index),
- hdr->msg.put.ack_wmd.handle_cookie,
- hdr->msg.put.ack_wmd.handle_idx,
- le64_to_cpu(hdr->msg.put.match_bits),
- le32_to_cpu(hdr->msg.put.length),
- le32_to_cpu(hdr->msg.put.offset),
- hdr->msg.put.hdr_data);
- break;
- case PTL_MSG_GET:
- CERROR ("GET: <>\n");
- break;
- case PTL_MSG_REPLY:
- CERROR ("REPLY: <>\n");
- break;
- default:
- CERROR ("TYPE?: <>\n");
- }
-}
-#endif
-
-static ptl_err_t
-kqswnal_recvmsg (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- unsigned int niov,
- struct iovec *iov,
- ptl_kiov_t *kiov,
- size_t offset,
- size_t mlen,
- size_t rlen)
+int
+kqswnal_recv (lnet_ni_t *ni,
+ void *private,
+ lnet_msg_t *lntmsg,
+ int delayed,
+ unsigned int niov,
+ struct iovec *iov,
+ lnet_kiov_t *kiov,
+ unsigned int offset,
+ unsigned int mlen,
+ unsigned int rlen)
{
- kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
- char *buffer = page_address(krx->krx_kiov[0].kiov_page);
- ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
- int page;
- char *page_ptr;
- int page_nob;
- char *iov_ptr;
- int iov_nob;
- int frag;
- int rc;
-#if KQSW_CHECKSUM
- kqsw_csum_t senders_csum;
- kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
- size_t csum_len = mlen;
- int csum_frags = 0;
- int csum_nob = 0;
- static atomic_t csum_counter;
- int csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
-
- atomic_inc (&csum_counter);
-
- memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
- if (senders_csum != hdr_csum)
- kqswnal_csum_error (krx, 1);
-#endif
- /* NB lib_parse() has already flipped *hdr */
-
- CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
-
- if (krx->krx_rpc_reply_needed &&
- hdr->type == PTL_MSG_PUT) {
- /* This must be an optimized PUT */
- rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
- niov, iov, kiov, offset, mlen);
- return (rc == 0 ? PTL_OK : PTL_FAIL);
- }
-
- /* What was actually received must be >= payload. */
- LASSERT (mlen <= rlen);
- if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
- CERROR("Bad message size: have %d, need %d + %d\n",
- krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
- return (PTL_FAIL);
- }
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+ lnet_nid_t fromnid;
+ kqswnal_msg_t *msg;
+ lnet_hdr_t *hdr;
+ kqswnal_remotemd_t *rmd;
+ int msg_offset;
+ int rc;
- /* It must be OK to kmap() if required */
- LASSERT (kiov == NULL || !in_interrupt ());
+ LASSERT (!in_interrupt ()); /* OK to map */
/* Either all pages or all vaddrs */
LASSERT (!(kiov != NULL && iov != NULL));
- if (mlen != 0) {
- page = 0;
- page_ptr = buffer + KQSW_HDR_SIZE;
- page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
+ fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
+ msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
- LASSERT (niov > 0);
+ if (krx->krx_rpc_reply_needed) {
+ /* optimized (rdma) request sent as RPC */
- if (kiov != NULL) {
- /* skip complete frags */
- while (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- kiov++;
- niov--;
- LASSERT (niov > 0);
- }
- iov_ptr = ((char *)kmap (kiov->kiov_page)) +
- kiov->kiov_offset + offset;
- iov_nob = kiov->kiov_len - offset;
- } else {
- /* skip complete frags */
- while (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- iov++;
- niov--;
- LASSERT (niov > 0);
- }
- iov_ptr = iov->iov_base + offset;
- iov_nob = iov->iov_len - offset;
- }
-
- for (;;)
- {
- frag = mlen;
- if (frag > page_nob)
- frag = page_nob;
- if (frag > iov_nob)
- frag = iov_nob;
-
- memcpy (iov_ptr, page_ptr, frag);
-#if KQSW_CHECKSUM
- payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
- csum_nob += frag;
- csum_frags++;
-#endif
- mlen -= frag;
- if (mlen == 0)
+ LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
+ hdr = &msg->kqm_u.rdma.kqrm_hdr;
+ rmd = &msg->kqm_u.rdma.kqrm_rmd;
+
+ /* NB header is still in wire byte order */
+
+ switch (le32_to_cpu(hdr->type)) {
+ case LNET_MSG_PUT:
+ case LNET_MSG_REPLY:
+ /* This is an optimized PUT/REPLY */
+ rc = kqswnal_rdma(krx, lntmsg,
+ KTX_RDMA_FETCH, rmd,
+ niov, iov, kiov, offset, mlen);
break;
- page_nob -= frag;
- if (page_nob != 0)
- page_ptr += frag;
- else
- {
- page++;
- LASSERT (page < krx->krx_npages);
- page_ptr = page_address(krx->krx_kiov[page].kiov_page);
- page_nob = PAGE_SIZE;
- }
+ case LNET_MSG_GET:
+#if KQSW_CKSUM
+ if (krx->krx_cksum != msg->kqm_cksum) {
+ CERROR("Bad GET checksum %08x(%08x) from %s\n",
+ krx->krx_cksum, msg->kqm_cksum,
+ libcfs_nid2str(fromnid));
+ rc = -EIO;
+ break;
+ }
+#endif
+ if (lntmsg == NULL) {
+ /* No buffer match: my decref will
+ * complete the RPC with failure */
+ rc = 0;
+ } else {
+ /* Matched something! */
+ rc = kqswnal_rdma(krx, lntmsg,
+ KTX_RDMA_STORE, rmd,
+ lntmsg->msg_niov,
+ lntmsg->msg_iov,
+ lntmsg->msg_kiov,
+ lntmsg->msg_offset,
+ lntmsg->msg_len);
+ }
+ break;
- iov_nob -= frag;
- if (iov_nob != 0)
- iov_ptr += frag;
- else if (kiov != NULL) {
- kunmap (kiov->kiov_page);
- kiov++;
- niov--;
- LASSERT (niov > 0);
- iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
- iov_nob = kiov->kiov_len;
- } else {
- iov++;
- niov--;
- LASSERT (niov > 0);
- iov_ptr = iov->iov_base;
- iov_nob = iov->iov_len;
- }
+ default:
+ CERROR("Bad RPC type %d\n",
+ le32_to_cpu(hdr->type));
+ rc = -EPROTO;
+ break;
}
- if (kiov != NULL)
- kunmap (kiov->kiov_page);
+ kqswnal_rx_decref(krx);
+ return rc;
}
-#if KQSW_CHECKSUM
- memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
- sizeof(kqsw_csum_t));
-
- if (csum_len != rlen)
- CERROR("Unable to checksum data in user's buffer\n");
- else if (senders_csum != payload_csum)
- kqswnal_csum_error (krx, 0);
-
- if (csum_verbose)
- CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
- "csum_nob %d\n",
- hdr_csum, payload_csum, csum_frags, csum_nob);
-#endif
- lib_finalize(nal, private, libmsg, PTL_OK);
-
- return (PTL_OK);
-}
+ LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
+ msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+
+ if (krx->krx_nob < msg_offset + rlen) {
+ CERROR("Bad message size from %s: have %d, need %d + %d\n",
+ libcfs_nid2str(fromnid), krx->krx_nob,
+ msg_offset, rlen);
+ kqswnal_rx_decref(krx);
+ return -EPROTO;
+ }
-static ptl_err_t
-kqswnal_recv(lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- unsigned int niov,
- struct iovec *iov,
- size_t offset,
- size_t mlen,
- size_t rlen)
-{
- return (kqswnal_recvmsg(nal, private, libmsg,
- niov, iov, NULL,
- offset, mlen, rlen));
-}
+ if (kiov != NULL)
+ lnet_copy_kiov2kiov(niov, kiov, offset,
+ krx->krx_npages, krx->krx_kiov,
+ msg_offset, mlen);
+ else
+ lnet_copy_kiov2iov(niov, iov, offset,
+ krx->krx_npages, krx->krx_kiov,
+ msg_offset, mlen);
-static ptl_err_t
-kqswnal_recv_pages (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- unsigned int niov,
- ptl_kiov_t *kiov,
- size_t offset,
- size_t mlen,
- size_t rlen)
-{
- return (kqswnal_recvmsg(nal, private, libmsg,
- niov, NULL, kiov,
- offset, mlen, rlen));
+ lnet_finalize(ni, lntmsg, 0);
+ kqswnal_rx_decref(krx);
+ return 0;
}
int
{
kqswnal_rx_t *krx;
kqswnal_tx_t *ktx;
- kpr_fwd_desc_t *fwd;
unsigned long flags;
int rc;
int counter = 0;
int did_something;
- kportal_daemonize ("kqswnal_sched");
- kportal_blockallsigs ();
+ cfs_daemonize ("kqswnal_sched");
+ cfs_block_allsigs ();
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
- switch (krx->krx_state) {
- case KRX_PARSE:
- kqswnal_parse (krx);
- break;
- case KRX_COMPLETING:
- /* Drop last ref to reply to RPC and requeue */
- LASSERT (krx->krx_rpc_reply_needed);
- kqswnal_rx_decref (krx);
- break;
- default:
- LBUG();
- }
+ LASSERT (krx->krx_state == KRX_PARSE);
+ kqswnal_parse (krx);
did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!list_empty (&kqswnal_data.kqn_donetxds))
{
- ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
- kqswnal_tx_t, ktx_list);
- list_del_init (&ktx->ktx_delayed_list);
+ ktx = list_entry(kqswnal_data.kqn_donetxds.next,
+ kqswnal_tx_t, ktx_schedlist);
+ list_del_init (&ktx->ktx_schedlist);
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
- rc = kqswnal_launch (ktx);
- if (rc != 0) {
- CERROR("Failed delayed transmit to "LPX64
- ": %d\n", ktx->ktx_nid, rc);
- kqswnal_tx_done (ktx, rc);
- }
- atomic_dec (&kqswnal_data.kqn_pending_txs);
+ kqswnal_tx_done_in_thread_context(ktx);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+ if (!list_empty (&kqswnal_data.kqn_delayedtxds))
{
- fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
- list_del (&fwd->kprfd_list);
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
+ kqswnal_tx_t, ktx_schedlist);
+ list_del_init (&ktx->ktx_schedlist);
+ spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+ flags);
- /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
- kqswnal_fwd_packet (NULL, fwd);
+ rc = kqswnal_launch (ktx);
+ if (rc != 0) {
+ CERROR("Failed delayed transmit to %s: %d\n",
+ libcfs_nid2str(ktx->ktx_nid), rc);
+ kqswnal_tx_done (ktx, rc);
+ }
+ atomic_dec (&kqswnal_data.kqn_pending_txs);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
* there's nothing left to do */
break;
}
- rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown == 2 ||
- !list_empty(&kqswnal_data.kqn_readyrxds) ||
- !list_empty(&kqswnal_data.kqn_delayedtxds) ||
- !list_empty(&kqswnal_data.kqn_delayedfwds));
+ rc = wait_event_interruptible_exclusive (
+ kqswnal_data.kqn_sched_waitq,
+ kqswnal_data.kqn_shuttingdown == 2 ||
+ !list_empty(&kqswnal_data.kqn_readyrxds) ||
+ !list_empty(&kqswnal_data.kqn_donetxds) ||
+ !list_empty(&kqswnal_data.kqn_delayedtxds));
LASSERT (rc == 0);
} else if (need_resched())
schedule ();
kqswnal_thread_fini ();
return (0);
}
-
-lib_nal_t kqswnal_lib =
-{
- libnal_data: &kqswnal_data, /* NAL private data */
- libnal_send: kqswnal_send,
- libnal_send_pages: kqswnal_send_pages,
- libnal_recv: kqswnal_recv,
- libnal_recv_pages: kqswnal_recv_pages,
- libnal_dist: kqswnal_dist
-};