* Copyright (C) 2002 Cluster File Systems, Inc.
* Author: Eric Barton <eric@bartonsoftware.com>
*
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ * This file is part of Portals, http://www.lustre.org
*
* Portals is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
#include "qswnal.h"
-EP_STATUSBLK kqswnal_rpc_success;
-EP_STATUSBLK kqswnal_rpc_failed;
-
/*
* LIB functions follow
*
*/
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
- void *buf;
-
- PORTAL_ALLOC(buf, len);
- return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
- PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
- va_list ap;
- char msg[256];
-
- va_start (ap, fmt);
- vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
- va_end (ap);
-
- msg[sizeof (msg) - 1] = 0; /* ensure terminated */
-
- CDEBUG (D_NET, "%s", msg);
-}
-
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-
static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
- if (nid == nal->ni.nid)
+ if (nid == nal->libnal_ni.ni_pid.nid)
*dist = 0; /* it's me */
else if (kqswnal_nid2elanid (nid) >= 0)
*dist = 1; /* it's my peer */
{
#if MULTIRAIL_EKC
int i;
+
+ ktx->ktx_rail = -1; /* unset rail */
#endif
if (ktx->ktx_nmappedpages == 0)
char *ptr;
#if MULTIRAIL_EKC
EP_RAILMASK railmask;
- int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
- EP_RAILMASK_ALL,
- kqswnal_nid2elanid(ktx->ktx_nid));
-
+ int rail;
+
+ if (ktx->ktx_rail < 0)
+ ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+ rail = ktx->ktx_rail;
if (rail < 0) {
CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
return (-ENETDOWN);
do {
int fraglen = kiov->kiov_len - offset;
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
- /* each frag fits in a page */
+ /* each page frag is contained in one page */
LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
+ if (fraglen > nob)
+ fraglen = nob;
+
nmapped++;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
uint32_t basepage = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
EP_RAILMASK railmask;
- int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
- EP_RAILMASK_ALL,
- kqswnal_nid2elanid(ktx->ktx_nid));
+ int rail;
+ if (ktx->ktx_rail < 0)
+ ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+ rail = ktx->ktx_rail;
if (rail < 0) {
CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
return (-ENETDOWN);
do {
int fraglen = iov->iov_len - offset;
- long npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
+ long npages;
+ if (fraglen > nob)
+ fraglen = nob;
+ npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
nmapped += npages;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
/* anything blocking for a tx descriptor? */
- if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+ if (!kqswnal_data.kqn_shuttingdown &&
+ !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
{
CDEBUG(D_NET,"wakeup fwd\n");
for (;;) {
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+ if (kqswnal_data.kqn_shuttingdown)
+ break;
+
/* "normal" descriptor is free */
if (!list_empty (&kqswnal_data.kqn_idletxds)) {
ktx = list_entry (kqswnal_data.kqn_idletxds.next,
break;
}
- /* "normal" descriptor pool is empty */
-
- if (fwd != NULL) { /* forwarded packet => queue for idle txd */
- CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
- list_add_tail (&fwd->kprfd_list,
- &kqswnal_data.kqn_idletxd_fwdq);
+ if (fwd != NULL) /* forwarded packet? */
break;
- }
/* doing a local transmit */
if (!may_block) {
CDEBUG (D_NET, "blocking for tx desc\n");
wait_event (kqswnal_data.kqn_idletxd_waitq,
- !list_empty (&kqswnal_data.kqn_idletxds));
+ !list_empty (&kqswnal_data.kqn_idletxds) ||
+ kqswnal_data.kqn_shuttingdown);
}
if (ktx != NULL) {
list_del (&ktx->ktx_list);
list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
ktx->ktx_launcher = current->pid;
+ atomic_inc(&kqswnal_data.kqn_pending_txs);
+ } else if (fwd != NULL) {
+ /* queue forwarded packet until idle txd available */
+ CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+ list_add_tail (&fwd->kprfd_list,
+ &kqswnal_data.kqn_idletxd_fwdq);
}
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
- lib_msg_t *msg;
- lib_msg_t *repmsg = NULL;
-
switch (ktx->ktx_state) {
case KTX_FORWARDING: /* router asked me to forward this packet */
kpr_fwd_done (&kqswnal_data.kqn_router,
(kpr_fwd_desc_t *)ktx->ktx_args[0], error);
break;
- case KTX_SENDING: /* packet sourced locally */
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+ case KTX_RDMAING: /* optimized GET/PUT handled */
+ case KTX_PUTTING: /* optimized PUT sent */
+ case KTX_SENDING: /* normal send */
+ lib_finalize (&kqswnal_lib, NULL,
(lib_msg_t *)ktx->ktx_args[1],
- (error == 0) ? PTL_OK :
- (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
- case KTX_GETTING: /* Peer has DMA-ed direct? */
- msg = (lib_msg_t *)ktx->ktx_args[1];
-
- if (error == 0) {
- repmsg = lib_fake_reply_msg (&kqswnal_lib,
- ktx->ktx_nid, msg->md);
- if (repmsg == NULL)
- error = -ENOMEM;
- }
-
- if (error == 0) {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
- msg, PTL_OK);
- lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
- } else {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
- (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
- }
+ case KTX_GETTING: /* optimized GET sent & REPLY received */
+ /* Complete the GET with success since we can't avoid
+ * delivering a REPLY event; we committed to it when we
+ * launched the GET */
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[2],
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
default:
kqswnal_notify_peer_down(ktx);
status = -EHOSTDOWN;
- } else if (ktx->ktx_state == KTX_GETTING) {
- /* RPC completed OK; what did our peer put in the status
+ } else switch (ktx->ktx_state) {
+
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* RPC completed OK; but what did our peer put in the status
* block? */
#if MULTIRAIL_EKC
status = ep_txd_statusblk(txd)->Data[0];
#else
status = ep_txd_statusblk(txd)->Status;
#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
status = 0;
+ break;
+
+ default:
+ LBUG();
+ break;
}
kqswnal_tx_done (ktx, status);
/* Don't block for transmit descriptor if we're in interrupt context */
int attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
int dest = kqswnal_nid2elanid (ktx->ktx_nid);
- long flags;
+ unsigned long flags;
int rc;
ktx->ktx_launchtime = jiffies;
+ if (kqswnal_data.kqn_shuttingdown)
+ return (-ESHUTDOWN);
+
LASSERT (dest >= 0); /* must be a peer */
- if (ktx->ktx_state == KTX_GETTING) {
- /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
- * other frags are the GET sink which we obviously don't
- * send here :) */
+
#if MULTIRAIL_EKC
+ if (ktx->ktx_nmappedpages != 0)
+ attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
+#endif
+
+ switch (ktx->ktx_state) {
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+ * The other frags are the payload, awaiting RDMA */
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
NULL, ktx->ktx_frags, 1);
-#else
- rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, NULL, ktx->ktx_frags, 1);
-#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
#if MULTIRAIL_EKC
rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
ktx->ktx_frags, ktx->ktx_nfrag);
#endif
+ break;
+
+ default:
+ LBUG();
+ rc = -EINVAL; /* no compiler warning please */
+ break;
}
switch (rc) {
return (0);
case EP_ENOMEM: /* can't allocate ep txd => queue for later */
- LASSERT (in_interrupt());
-
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
}
}
+#if 0
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
char *type_str = hdr_type_string (hdr);
CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
- NTOH__u32(hdr->payload_length));
- CERROR(" From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
- NTOH__u32(hdr->src_pid));
- CERROR(" To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
- NTOH__u32(hdr->dest_pid));
+ le32_to_cpu(hdr->payload_length));
+ CERROR(" From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
+ le32_to_cpu(hdr->src_pid));
+ CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
+ le32_to_cpu(hdr->dest_pid));
- switch (NTOH__u32(hdr->type)) {
+ switch (le32_to_cpu(hdr->type)) {
case PTL_MSG_PUT:
CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
"match bits "LPX64"\n",
- NTOH__u32 (hdr->msg.put.ptl_index),
+ le32_to_cpu(hdr->msg.put.ptl_index),
hdr->msg.put.ack_wmd.wh_interface_cookie,
hdr->msg.put.ack_wmd.wh_object_cookie,
- NTOH__u64 (hdr->msg.put.match_bits));
+ le64_to_cpu(hdr->msg.put.match_bits));
CERROR(" offset %d, hdr data "LPX64"\n",
- NTOH__u32(hdr->msg.put.offset),
+ le32_to_cpu(hdr->msg.put.offset),
hdr->msg.put.hdr_data);
break;
case PTL_MSG_GET:
CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
"match bits "LPX64"\n",
- NTOH__u32 (hdr->msg.get.ptl_index),
+ le32_to_cpu(hdr->msg.get.ptl_index),
hdr->msg.get.return_wmd.wh_interface_cookie,
hdr->msg.get.return_wmd.wh_object_cookie,
hdr->msg.get.match_bits);
CERROR(" Length %d, src offset %d\n",
- NTOH__u32 (hdr->msg.get.sink_length),
- NTOH__u32 (hdr->msg.get.src_offset));
+ le32_to_cpu(hdr->msg.get.sink_length),
+ le32_to_cpu(hdr->msg.get.src_offset));
break;
case PTL_MSG_ACK:
CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
hdr->msg.ack.dst_wmd.wh_interface_cookie,
hdr->msg.ack.dst_wmd.wh_object_cookie,
- NTOH__u32 (hdr->msg.ack.mlength));
+ le32_to_cpu(hdr->msg.ack.mlength));
break;
case PTL_MSG_REPLY:
}
} /* end of print_hdr() */
+#endif
#if !MULTIRAIL_EKC
void
CERROR ("DATAVEC too small\n");
return (-E2BIG);
}
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+ int nrfrag, EP_NMD *rfrag)
+{
+ int i;
+
+ if (nlfrag != nrfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ nlfrag, nrfrag);
+ return (-EINVAL);
+ }
+
+ for (i = 0; i < nlfrag; i++)
+ if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+ return (-EINVAL);
+ }
+
+ return (0);
+}
#endif
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
- struct iovec *iov, ptl_kiov_t *kiov,
- int offset, int nob)
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
{
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- int rc;
-#if MULTIRAIL_EKC
- int i;
-#else
- EP_DATAVEC datav[EP_MAXFRAG];
- int ndatav;
-#endif
- LASSERT (krx->krx_rpc_reply_needed);
- LASSERT ((iov == NULL) != (kiov == NULL));
+ ptl_nid_t nid = kqswnal_rx_nid(krx);
+
+ /* Note (1) lib_parse has already flipped hdr.
+ * (2) RDMA addresses are sent in native endian-ness. When
+ * EKC copes with different endian nodes, I'll fix this (and
+ * eat my hat :) */
+
+ LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+ if (hdr->type != type) {
+		CERROR ("Unexpected optimized get/put type %d (%d expected)"
+			" from "LPX64"\n", hdr->type, type, nid);
+ return (NULL);
+ }
+
+ if (hdr->src_nid != nid) {
+ CERROR ("Unexpected optimized get/put source NID "
+ LPX64" from "LPX64"\n", hdr->src_nid, nid);
+ return (NULL);
+ }
+
+ LASSERT (nid == expected_nid);
- /* see kqswnal_sendmsg comment regarding endian-ness */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
- return (-EINVAL);
+ return (NULL);
}
-
+
if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
/* rmd doesn't fit in the incoming message */
CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
krx->krx_nob, rmd->kqrmd_nfrag,
(int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
- return (-EINVAL);
+ return (NULL);
+ }
+
+ return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_decref (krx);
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd)
+{
+ /* Completed fetching the PUT data */
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ unsigned long flags;
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ if (status == EP_SUCCESS) {
+ status = krx->krx_rpc_reply_status = 0;
+ } else {
+ /* Abandon RPC since get failed */
+ krx->krx_rpc_reply_needed = 0;
+ status = -ECONNABORTED;
}
- /* Map the source data... */
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, status);
+
+ if (!in_interrupt()) {
+ /* OK to complete the RPC now (iff I had the last ref) */
+ kqswnal_rx_decref (krx);
+ return;
+ }
+
+ LASSERT (krx->krx_state == KRX_PARSE);
+ krx->krx_state = KRX_COMPLETING;
+
+ /* Complete the RPC in thread context */
+ spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+ list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+ wake_up (&kqswnal_data.kqn_sched_waitq);
+
+ spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+ int niov, struct iovec *iov, ptl_kiov_t *kiov,
+ size_t offset, size_t len)
+{
+ kqswnal_remotemd_t *rmd;
+ kqswnal_tx_t *ktx;
+ int eprc;
+ int rc;
+#if !MULTIRAIL_EKC
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+
+ LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+ /* Not both mapped and paged payload */
+ LASSERT (iov == NULL || kiov == NULL);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+ if (rmd == NULL)
+ return (-EPROTO);
+
+ if (len == 0) {
+ /* data got truncated to nothing. */
+ lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+ /* Let kqswnal_rx_done() complete the RPC with success */
+ krx->krx_rpc_reply_status = 0;
+ return (0);
+ }
+
+ /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+ actually sending a portals message with it */
+ ktx = kqswnal_get_idle_tx(NULL, 0);
+ if (ktx == NULL) {
+ CERROR ("Can't get txd for RDMA with "LPX64"\n",
+ libmsg->ev.initiator.nid);
+ return (-ENOMEM);
+ }
+
+ ktx->ktx_state = KTX_RDMAING;
+ ktx->ktx_nid = libmsg->ev.initiator.nid;
+ ktx->ktx_args[0] = krx;
+ ktx->ktx_args[1] = libmsg;
+
+#if MULTIRAIL_EKC
+ /* Map on the rail the RPC prefers */
+ ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
+ ep_rxd_railmask(krx->krx_rxd));
+#endif
+
+ /* Start mapping at offset 0 (we're not mapping any headers) */
ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+
if (kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+ rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
else
- rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+ rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
if (rc != 0) {
- CERROR ("Can't map source data: %d\n", rc);
- return (rc);
+ CERROR ("Can't map local RDMA data: %d\n", rc);
+ goto out;
}
#if MULTIRAIL_EKC
- if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
- CERROR("Can't cope with unequal # frags: %d local %d remote\n",
- ktx->ktx_nfrag, rmd->kqrmd_nfrag);
- return (-EINVAL);
+ rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (rc != 0) {
+ CERROR ("Incompatible RDMA descriptors\n");
+ goto out;
}
-
- for (i = 0; i < rmd->kqrmd_nfrag; i++)
- if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
- CERROR("Can't cope with unequal frags %d(%d):"
- " %d local %d remote\n",
- i, rmd->kqrmd_nfrag,
- ktx->ktx_frags[i].nmd_len,
- rmd->kqrmd_frag[i].nmd_len);
- return (-EINVAL);
- }
#else
- ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
- ktx->ktx_nfrag, ktx->ktx_frags,
- rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ switch (type) {
+ default:
+ LBUG();
+
+ case PTL_MSG_GET:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ break;
+
+ case PTL_MSG_PUT:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+ ktx->ktx_nfrag, ktx->ktx_frags);
+ break;
+ }
+
if (ndatav < 0) {
CERROR ("Can't create datavec: %d\n", ndatav);
- return (ndatav);
+ rc = ndatav;
+ goto out;
}
#endif
- /* Our caller will start to race with kqswnal_dma_reply_complete... */
- LASSERT (atomic_read (&krx->krx_refcount) == 1);
- atomic_set (&krx->krx_refcount, 2);
+ LASSERT (atomic_read(&krx->krx_refcount) > 0);
+ /* Take an extra ref for the completion callback */
+ atomic_inc(&krx->krx_refcount);
-#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success,
- ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
- if (rc == EP_SUCCESS)
- return (0);
+ switch (type) {
+ default:
+ LBUG();
- /* Well we tried... */
- krx->krx_rpc_reply_needed = 0;
+ case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+ eprc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
#else
- rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success, datav, ndatav);
- if (rc == EP_SUCCESS)
- return (0);
-
- /* "old" EKC destroys rxd on failed completion */
- krx->krx_rxd = NULL;
+ eprc = ep_complete_rpc (krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ datav, ndatav);
+ if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("can't complete RPC: %d\n", eprc);
+ /* don't re-attempt RPC completion */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+
+ case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ datav, ndatav);
+#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("ep_rpc_get failed: %d\n", eprc);
+ /* Don't attempt RPC completion:
+ * EKC nuked it when the get failed */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+ }
- CERROR("can't complete RPC: %d\n", rc);
-
- /* reset refcount back to 1: we're not going to be racing with
- * kqswnal_dma_reply_complete. */
- atomic_set (&krx->krx_refcount, 1);
+ out:
+ if (rc != 0) {
+ kqswnal_rx_decref(krx); /* drop callback's ref */
+ kqswnal_put_idle_tx (ktx);
+ }
- return (-ECONNABORTED);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc);
}
static ptl_err_t
-kqswnal_sendmsg (nal_cb_t *nal,
+kqswnal_sendmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int sumoff;
int sumnob;
#endif
+ /* NB 1. hdr is in network byte order */
+ /* 2. 'private' depends on the message type */
CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
" pid %u\n", payload_nob, payload_niov, nid, pid);
LASSERT (payload_kiov == NULL || !in_interrupt ());
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-
+
if (payload_nob > KQSW_MAXPAYLOAD) {
CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
payload_nob, KQSW_MAXPAYLOAD);
return (PTL_FAIL);
}
+ if (type == PTL_MSG_REPLY && /* can I look in 'private' */
+ ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+ /* Must be a REPLY for an optimized GET */
+ rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+ payload_niov, payload_iov, payload_kiov,
+ payload_offset, payload_nob);
+ return ((rc == 0) ? PTL_OK : PTL_FAIL);
+ }
+
targetnid = nid;
if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
type == PTL_MSG_REPLY ||
in_interrupt()));
if (ktx == NULL) {
- kqswnal_cerror_hdr (hdr);
- return (PTL_NOSPACE);
+ CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+ type, libmsg->ev.initiator.nid);
+ return (PTL_NO_SPACE);
}
+ ktx->ktx_state = KTX_SENDING;
ktx->ktx_nid = targetnid;
ktx->ktx_args[0] = private;
ktx->ktx_args[1] = libmsg;
-
- if (type == PTL_MSG_REPLY &&
- ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
- if (nid != targetnid ||
- kqswnal_nid2elanid(nid) !=
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
- CERROR("Optimized reply nid conflict: "
- "nid "LPX64" via "LPX64" elanID %d\n",
- nid, targetnid,
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return (PTL_FAIL);
- }
-
- /* peer expects RPC completion with GET data */
- rc = kqswnal_dma_reply (ktx, payload_niov,
- payload_iov, payload_kiov,
- payload_offset, payload_nob);
- if (rc == 0)
- return (PTL_OK);
-
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
- ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
#if KQSW_CHECKSUM
csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
- if (kqswnal_data.kqn_optimized_gets &&
- type == PTL_MSG_GET && /* doing a GET */
- nid == targetnid) { /* not forwarding */
+ /* The first frag will be the pre-mapped buffer for (at least) the
+ * portals header. */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+ if (nid == targetnid && /* not forwarding */
+ ((type == PTL_MSG_GET && /* optimize GET? */
+ kqswnal_tunables.kqn_optimized_gets != 0 &&
+ le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+ (type == PTL_MSG_PUT && /* optimize PUT? */
+ kqswnal_tunables.kqn_optimized_puts != 0 &&
+ payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
lib_md_t *md = libmsg->md;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
- /* Optimised path: I send over the Elan vaddrs of the get
- * sink buffers, and my peer DMAs directly into them.
+ /* Optimised path: I send over the Elan vaddrs of the local
+ * buffers, and my peer DMAs directly to/from them.
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
* ktx_frags[1] and onward with the network addresses
* of the GET sink frags. I copy these into ktx_buffer,
- * immediately after the header, and send that as my GET
- * message.
- *
- * Note that the addresses are sent in native endian-ness.
- * When EKC copes with different endian nodes, I'll fix
- * this (and eat my hat :) */
+ * immediately after the header, and send that as my
+ * message. */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_GETTING;
+ ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
if ((libmsg->md->options & PTL_MD_KIOV) != 0)
rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
else
rc = kqswnal_map_tx_iov (ktx, 0, md->length,
md->md_niov, md->md_iov.iov);
-
- if (rc < 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
+ if (type == PTL_MSG_GET) {
+ /* Allocate reply message now while I'm in thread context */
+ ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+ nid, libmsg);
+ if (ktx->ktx_args[2] == NULL)
+ goto out;
+
+ /* NB finalizing the REPLY message is my
+ * responsibility now, whatever happens. */
+ }
+
} else if (payload_nob <= KQSW_TX_MAXCONTIG) {
/* small message: single frag copied into the pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE + payload_nob);
/* large message: multiple frags: first is hdr in pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE);
else
rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
}
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
- if (rc != 0) { /* failed? */
- CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+ CDEBUG(rc == 0 ? D_NET : D_ERROR,
+ "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
+ rc == 0 ? "Sent" : "Failed to send",
+ payload_nob, nid, targetnid, rc);
+
+ if (rc != 0) {
+ if (ktx->ktx_state == KTX_GETTING &&
+ ktx->ktx_args[2] != NULL) {
+ /* We committed to reply, but there was a problem
+ * launching the GET. We can't avoid delivering a
+ * REPLY event since we committed above, so we
+ * pretend the GET succeeded but the REPLY
+ * failed. */
+ rc = 0;
+ lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+ lib_finalize (&kqswnal_lib, private,
+ (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+ }
+
kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
}
-
- CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
- payload_nob, nid, targetnid);
- return (PTL_OK);
+
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
-kqswnal_send (nal_cb_t *nal,
+kqswnal_send (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
}
static ptl_err_t
-kqswnal_send_pages (nal_cb_t *nal,
+kqswnal_send_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
- if (nid == kqswnal_lib.ni.nid) /* gateway is me */
+ if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
nid = fwd->kprfd_target_nid; /* target is final dest */
if (kqswnal_nid2elanid (nid) < 0) {
CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
rc = -EHOSTUNREACH;
- goto failed;
+ goto out;
}
/* copy hdr into pre-mapped buffer */
memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
- ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
#endif
rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
- goto failed;
+ goto out;
}
rc = kqswnal_launch (ktx);
- if (rc == 0)
- return;
+ out:
+ if (rc != 0) {
+ CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- failed:
- LASSERT (rc != 0);
- CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+ /* complete now (with failure) */
+ kqswnal_tx_done (ktx, rc);
+ }
- kqswnal_put_idle_tx (ktx);
- /* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
}
void
ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
- NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
+ le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid),error);
}
- kqswnal_requeue_rx (krx);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
+ kqswnal_rx_decref (krx);
}
void
-kqswnal_dma_reply_complete (EP_RXD *rxd)
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
- int status = ep_rxd_status(rxd);
- kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1];
-
- CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
- "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (!krx->krx_rpc_reply_needed);
- LASSERT (krx->krx_rxd == rxd);
- LASSERT (krx->krx_rpc_reply_needed);
+ krx->krx_state = KRX_POSTED;
- krx->krx_rpc_reply_needed = 0;
- kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+ if (kqswnal_data.kqn_shuttingdown) {
+ /* free EKC rxd on shutdown */
+ ep_complete_receive(krx->krx_rxd);
+ } else {
+ /* repost receive */
+ ep_requeue_receive(krx->krx_rxd,
+ kqswnal_rxhandler, krx,
+ &krx->krx_elanbuffer, 0);
+ }
+#else
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
- lib_finalize (&kqswnal_lib, NULL, msg,
- (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
- kqswnal_put_idle_tx (ktx);
+ if (krx->krx_rxd == NULL) {
+ /* We had a failed ep_complete_rpc() which nukes the
+ * descriptor in "old" EKC */
+ int eprc = ep_queue_receive(krx->krx_eprx,
+ kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE, 0);
+ LASSERT (eprc == EP_SUCCESS);
+ /* We don't handle failure here; it's incredibly rare
+ * (never reported?) and only happens with "old" EKC */
+ } else {
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE);
+ }
+#endif
}
void
}
void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_rx_done (kqswnal_rx_t *krx)
{
- int rc;
+ int rc;
+ EP_STATUSBLK *sblk;
LASSERT (atomic_read(&krx->krx_refcount) == 0);
if (krx->krx_rpc_reply_needed) {
+ /* We've not completed the peer's RPC yet... */
+ sblk = (krx->krx_rpc_reply_status == 0) ?
+ &kqswnal_data.kqn_rpc_success :
+ &kqswnal_data.kqn_rpc_failed;
- /* We failed to complete the peer's optimized GET (e.g. we
- * couldn't map the source buffers). We complete the
- * peer's EKC rpc now with failure. */
+ LASSERT (!in_interrupt());
#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, NULL, 0);
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, NULL, 0);
if (rc == EP_SUCCESS)
return;
-
- CERROR("can't complete RPC: %d\n", rc);
#else
- if (krx->krx_rxd != NULL) {
- /* We didn't try (and fail) to complete earlier... */
- rc = ep_complete_rpc(krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, 0);
- if (rc == EP_SUCCESS)
- return;
-
- CERROR("can't complete RPC: %d\n", rc);
- }
-
- /* NB the old ep_complete_rpc() frees rxd on failure, so we
- * have to requeue from scratch here, unless we're shutting
- * down */
- if (kqswnal_data.kqn_shuttingdown)
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, 0);
+ if (rc == EP_SUCCESS)
return;
- rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer,
- krx->krx_npages * PAGE_SIZE, 0);
- LASSERT (rc == EP_SUCCESS);
- /* We don't handle failure here; it's incredibly rare
- * (never reported?) and only happens with "old" EKC */
- return;
+ /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ CERROR("can't complete RPC: %d\n", rc);
+ krx->krx_rpc_reply_needed = 0;
}
-#if MULTIRAIL_EKC
- if (kqswnal_data.kqn_shuttingdown) {
- /* free EKC rxd on shutdown */
- ep_complete_receive(krx->krx_rxd);
- } else {
- /* repost receive */
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- &krx->krx_elanbuffer, 0);
- }
-#else
- /* don't actually requeue on shutdown */
- if (!kqswnal_data.kqn_shuttingdown)
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+ kqswnal_requeue_rx(krx);
}
void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
{
ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
- ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
+ ptl_nid_t dest_nid = le64_to_cpu(hdr->dest_nid);
int payload_nob;
int nob;
int niov;
- LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
- if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
- atomic_set(&krx->krx_refcount, 1);
- lib_parse (&kqswnal_lib, hdr, krx);
- kqswnal_rx_done(krx);
+ if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+ /* I ignore parse errors since I'm not consuming a byte
+ * stream */
+ (void)lib_parse (&kqswnal_lib, hdr, krx);
+
+ /* Drop my ref; any RDMA activity takes an additional ref */
+ kqswnal_rx_decref(krx);
return;
}
#if KQSW_CHECKSUM
- CERROR ("checksums for forwarded packets not implemented\n");
- LBUG ();
+ LASSERTF (0, "checksums for forwarded packets not implemented\n");
#endif
+
if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */
{
CERROR("dropping packet from "LPX64" for "LPX64
- ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
+ ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref (krx);
return;
}
void
kqswnal_rxhandler(EP_RXD *rxd)
{
- long flags;
+ unsigned long flags;
int nob = ep_rxd_len (rxd);
int status = ep_rxd_status (rxd);
kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd);
rxd, krx, nob, status);
LASSERT (krx != NULL);
-
+        LASSERT (krx->krx_state == KRX_POSTED);
+
+ krx->krx_state = KRX_PARSE;
krx->krx_rxd = rxd;
krx->krx_nob = nob;
-#if MULTIRAIL_EKC
- krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
-#else
- krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
-#endif
-
+
+ /* RPC reply iff rpc request received without error */
+ krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
+ (status == EP_SUCCESS ||
+ status == EP_MSG_TOO_BIG);
+
+ /* Default to failure if an RPC reply is requested but not handled */
+ krx->krx_rpc_reply_status = -EPROTO;
+ atomic_set (&krx->krx_refcount, 1);
+
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
#endif
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref(krx);
return;
}
if (!in_interrupt()) {
- kqswnal_rx (krx);
+ kqswnal_parse(krx);
return;
}
CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
", dpid %d, spid %d, type %d\n",
ishdr ? "Header" : "Payload", krx,
-                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
-                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
-                NTOH__u32(hdr->type));
+                le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid),
+                le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
+                le32_to_cpu(hdr->type));
- switch (NTOH__u32 (hdr->type))
+ switch (le32_to_cpu(hdr->type))
{
case PTL_MSG_ACK:
CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
" len %u\n",
- NTOH__u32(hdr->msg.ack.mlength),
+ le32_to_cpu(hdr->msg.ack.mlength),
hdr->msg.ack.dst_wmd.handle_cookie,
hdr->msg.ack.dst_wmd.handle_idx,
- NTOH__u64(hdr->msg.ack.match_bits),
- NTOH__u32(hdr->msg.ack.length));
+ le64_to_cpu(hdr->msg.ack.match_bits),
+ le32_to_cpu(hdr->msg.ack.length));
break;
case PTL_MSG_PUT:
CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
" len %u off %u data "LPX64"\n",
- NTOH__u32(hdr->msg.put.ptl_index),
+ le32_to_cpu(hdr->msg.put.ptl_index),
hdr->msg.put.ack_wmd.handle_cookie,
hdr->msg.put.ack_wmd.handle_idx,
- NTOH__u64(hdr->msg.put.match_bits),
- NTOH__u32(hdr->msg.put.length),
- NTOH__u32(hdr->msg.put.offset),
+ le64_to_cpu(hdr->msg.put.match_bits),
+ le32_to_cpu(hdr->msg.put.length),
+ le32_to_cpu(hdr->msg.put.offset),
hdr->msg.put.hdr_data);
break;
case PTL_MSG_GET:
#endif
static ptl_err_t
-kqswnal_recvmsg (nal_cb_t *nal,
+kqswnal_recvmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
char *buffer = page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
int page;
char *page_ptr;
int page_nob;
char *iov_ptr;
int iov_nob;
int frag;
+ int rc;
#if KQSW_CHECKSUM
kqsw_csum_t senders_csum;
kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+ kqsw_csum_t hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
size_t csum_len = mlen;
int csum_frags = 0;
int csum_nob = 0;
if (senders_csum != hdr_csum)
kqswnal_csum_error (krx, 1);
#endif
+ /* NB lib_parse() has already flipped *hdr */
+
CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
+ if (krx->krx_rpc_reply_needed &&
+ hdr->type == PTL_MSG_PUT) {
+ /* This must be an optimized PUT */
+ rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+ niov, iov, kiov, offset, mlen);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
+ }
+
/* What was actually received must be >= payload. */
LASSERT (mlen <= rlen);
if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
}
static ptl_err_t
-kqswnal_recv(nal_cb_t *nal,
+kqswnal_recv(lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
}
static ptl_err_t
-kqswnal_recv_pages (nal_cb_t *nal,
+kqswnal_recv_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
- atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
kqswnal_rx_t *krx;
kqswnal_tx_t *ktx;
kpr_fwd_desc_t *fwd;
- long flags;
+ unsigned long flags;
int rc;
int counter = 0;
- int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
for (;;)
{
- if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
- if (kqswnal_data.kqn_shuttingdown == 2)
- break;
-
- /* During stage 1 of shutdown we are still responsive
- * to receives */
-
- atomic_dec (&kqswnal_data.kqn_nthreads_running);
- shuttingdown = kqswnal_data.kqn_shuttingdown;
- }
-
did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
- kqswnal_rx (krx);
+ switch (krx->krx_state) {
+ case KRX_PARSE:
+ kqswnal_parse (krx);
+ break;
+ case KRX_COMPLETING:
+ kqswnal_rx_decref (krx);
+ break;
+ default:
+ LBUG();
+ }
did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &&
- !list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
flags);
rc = kqswnal_launch (ktx);
- if (rc != 0) /* failed: ktx_nid down? */
- {
+ if (rc != 0) {
CERROR("Failed delayed transmit to "LPX64
": %d\n", ktx->ktx_nid, rc);
kqswnal_tx_done (ktx, rc);
}
+ atomic_dec (&kqswnal_data.kqn_pending_txs);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &
- !list_empty (&kqswnal_data.kqn_delayedfwds))
+ if (!list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
kqswnal_fwd_packet (NULL, fwd);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- /* nothing to do or hogging CPU */
+ /* nothing to do or hogging CPU */
if (!did_something || counter++ == KQSW_RESCHED) {
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
counter = 0;
if (!did_something) {
+ if (kqswnal_data.kqn_shuttingdown == 2) {
+ /* We only exit in stage 2 of shutdown when
+ * there's nothing left to do */
+ break;
+ }
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown != shuttingdown ||
+ kqswnal_data.kqn_shuttingdown == 2 ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));
LASSERT (rc == 0);
- } else if (current->need_resched)
+ } else if (need_resched())
schedule ();
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
}
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
kqswnal_thread_fini ();
return (0);
}
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
{
- nal_data: &kqswnal_data, /* NAL private data */
- cb_send: kqswnal_send,
- cb_send_pages: kqswnal_send_pages,
- cb_recv: kqswnal_recv,
- cb_recv_pages: kqswnal_recv_pages,
- cb_read: kqswnal_read,
- cb_write: kqswnal_write,
- cb_malloc: kqswnal_malloc,
- cb_free: kqswnal_free,
- cb_printf: kqswnal_printf,
- cb_cli: kqswnal_cli,
- cb_sti: kqswnal_sti,
- cb_dist: kqswnal_dist
+ libnal_data: &kqswnal_data, /* NAL private data */
+ libnal_send: kqswnal_send,
+ libnal_send_pages: kqswnal_send_pages,
+ libnal_recv: kqswnal_recv,
+ libnal_recv_pages: kqswnal_recv_pages,
+ libnal_dist: kqswnal_dist
};