-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
*
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- * Author: PJ Kirner <pjkirner@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * This file is confidential source code owned by Cluster File Systems.
- * No viewing, modification, compilation, redistribution, or any other
- * form of use is permitted except through a signed license agreement.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * If you have not signed such an agreement, then you have no rights to
- * this file. Please destroy it immediately and contact CFS.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_rx_buf.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
*/
#include "ptllnd.h"
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
memset(rxbp, 0, sizeof(*rxbp));
- spin_lock_init(&rxbp->rxbp_lock);
- INIT_LIST_HEAD(&rxbp->rxbp_list);
+ spin_lock_init(&rxbp->rxbp_lock);
+ CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
}
void
LASSERT(!rxb->rxb_posted);
LASSERT(rxb->rxb_idle);
- list_del(&rxb->rxb_list);
+ cfs_list_del(&rxb->rxb_list);
rxbp->rxbp_count--;
LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
for (;;) {
if (rxbp->rxbp_shutdown) {
break;
}
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
LIBCFS_ALLOC(rxb, sizeof(*rxb));
LIBCFS_ALLOC(buffer, bufsize);
if (buffer != NULL)
LIBCFS_FREE(buffer, bufsize);
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rc = -ENOMEM;
break;
}
rxb->rxb_buffer = buffer;
rxb->rxb_mdh = PTL_INVALID_HANDLE;
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
if (rxbp->rxbp_shutdown) {
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
LIBCFS_FREE(rxb, sizeof(*rxb));
LIBCFS_FREE(buffer, bufsize);
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rc = -ESHUTDOWN;
break;
}
- list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
+ cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
rxbp->rxbp_count++;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
kptllnd_rx_buffer_post(rxb);
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
}
if (rc == 0)
rxbp->rxbp_reserved += count;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return rc;
}
{
unsigned long flags;
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
rxbp->rxbp_reserved -= count;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
void
int rc;
int i;
unsigned long flags;
- struct list_head *tmp;
- struct list_head *nxt;
+ cfs_list_t *tmp;
+ cfs_list_t *nxt;
ptl_handle_md_t mdh;
- /* CAVEAT EMPTOR: I'm racing with everything here!!!
+ /* CAVEAT EMPTOR: I'm racing with everything here!!!
*
* Buffers can still be posted after I set rxbp_shutdown because I
* can't hold rxbp_lock while I'm posting them.
* different MD) from when the MD is actually unlinked, to when the
* event callback tells me it has been unlinked. */
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rxbp->rxbp_shutdown = 1;
for (i = 9;; i++) {
- list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
- rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
-
+ cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
+ rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);
+
if (rxb->rxb_idle) {
- spin_unlock_irqrestore(&rxbp->rxbp_lock,
- flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock,
+ flags);
kptllnd_rx_buffer_destroy(rxb);
- spin_lock_irqsave(&rxbp->rxbp_lock,
- flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock,
+ flags);
continue;
}
if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
continue;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
rc = PtlMDUnlink(mdh);
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
/* callback clears rxb_mdh and drops net's ref
#endif
}
- if (list_empty(&rxbp->rxbp_list))
+ if (cfs_list_empty(&rxbp->rxbp_list))
break;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Wait a bit for references to be dropped */
CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
cfs_pause(cfs_time_seconds(1));
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
}
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
void
kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
unsigned long flags;
- LASSERT (!in_interrupt());
+ LASSERT (!cfs_in_interrupt());
LASSERT (rxb->rxb_refcount == 0);
LASSERT (!rxb->rxb_idle);
LASSERT (!rxb->rxb_posted);
any.nid = PTL_NID_ANY;
any.pid = PTL_PID_ANY;
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
if (rxbp->rxbp_shutdown) {
rxb->rxb_idle = 1;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return;
}
rxb->rxb_refcount = 1; /* net's ref */
rxb->rxb_posted = 1; /* I'm posting */
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
rc = PtlMEAttach(kptllnd_data.kptl_nih,
*kptllnd_tunables.kptl_portal,
PTL_INS_AFTER,
&meh);
if (rc != PTL_OK) {
- CERROR("PtlMeAttach rxb failed %d\n", rc);
+ CERROR("PtlMeAttach rxb failed %s(%d)\n",
+ kptllnd_errtype2str(rc), rc);
goto failed;
}
* Setup MD
*/
md.start = rxb->rxb_buffer;
- md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
+ md.length = kptllnd_rx_buffer_size();
md.threshold = PTL_MD_THRESH_INF;
md.options = PTL_MD_OP_PUT |
PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
if (rc == PTL_OK) {
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */
rxb->rxb_mdh = mdh;
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return;
}
- CERROR("PtlMDAttach rxb failed %d\n", rc);
+ CERROR("PtlMDAttach rxb failed %s(%d)\n",
+ kptllnd_errtype2str(rc), rc);
rc = PtlMEUnlink(meh);
LASSERT(rc == PTL_OK);
failed:
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rxb->rxb_posted = 0;
/* XXX this will just try again immediately */
kptllnd_rx_buffer_decref_locked(rxb);
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
kptl_rx_t *
}
void
-kptllnd_rx_done(kptl_rx_t *rx)
+kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
{
kptl_rx_buffer_t *rxb = rx->rx_rxb;
kptl_peer_t *peer = rx->rx_peer;
unsigned long flags;
+ LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
+ post_credit == PTLLND_POSTRX_PEER_CREDIT);
+
CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
if (rxb != NULL)
if (peer != NULL) {
/* Update credits (after I've decref-ed the buffer) */
- spin_lock_irqsave(&peer->peer_lock, flags);
+ spin_lock_irqsave(&peer->peer_lock, flags);
- peer->peer_outstanding_credits++;
- LASSERT (peer->peer_outstanding_credits <=
- *kptllnd_tunables.kptl_peercredits);
+ if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
+ peer->peer_outstanding_credits++;
+
+ LASSERT (peer->peer_outstanding_credits +
+ peer->peer_sent_credits <=
+ *kptllnd_tunables.kptl_peertxcredits);
- CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
- libcfs_id2str(peer->peer_id),
- peer->peer_credits, peer->peer_outstanding_credits, rx);
+ CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
+ libcfs_id2str(peer->peer_id), peer->peer_credits,
+ peer->peer_outstanding_credits, peer->peer_sent_credits,
+ rx);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/* I might have to send back credits */
kptllnd_peer_check_sends(peer);
unlinked = ev->type == PTL_EVENT_UNLINK;
#endif
- CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
- kptllnd_evtype2str(ev->type), ev->type, rxb,
- kptllnd_ptlid2str(ev->initiator),
- unlinked, ev->ni_fail_type);
+ CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
+ kptllnd_ptlid2str(ev->initiator),
+ kptllnd_evtype2str(ev->type), ev->type, rxb,
+ kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
+ unlinked);
LASSERT (!rxb->rxb_idle);
LASSERT (ev->md.start == rxb->rxb_buffer);
- LASSERT (ev->offset + ev->mlength <=
+ LASSERT (ev->offset + ev->mlength <=
PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
- LASSERT (ev->type == PTL_EVENT_PUT_END ||
+ LASSERT (ev->type == PTL_EVENT_PUT_END ||
ev->type == PTL_EVENT_UNLINK);
LASSERT (ev->type == PTL_EVENT_UNLINK ||
ev->match_bits == LNET_MSG_MATCHBITS);
- if (ev->ni_fail_type != PTL_NI_OK)
- CERROR("event type %d, status %d from %s\n",
- ev->type, ev->ni_fail_type,
- kptllnd_ptlid2str(ev->initiator));
-
- if (ev->type == PTL_EVENT_PUT_END &&
- ev->ni_fail_type == PTL_NI_OK &&
- !rxbp->rxbp_shutdown) {
+ if (ev->ni_fail_type != PTL_NI_OK) {
+ CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
+ kptllnd_ptlid2str(ev->initiator),
+ kptllnd_evtype2str(ev->type), ev->type, rxb,
+ kptllnd_errtype2str(ev->ni_fail_type),
+ ev->ni_fail_type, unlinked);
+ } else if (ev->type == PTL_EVENT_PUT_END &&
+ !rxbp->rxbp_shutdown) {
/* rxbp_shutdown sampled without locking! I only treat it as a
* hint since shutdown can start while rx's are queued on
* odd-length message will misalign subsequent messages and
* force the fixup below... */
if ((ev->mlength & 7) != 0)
- CWARN("Message from %s has odd length %d: "
+ CWARN("Message from %s has odd length "LPU64": "
"probable version incompatibility\n",
kptllnd_ptlid2str(ev->initiator),
- ev->mlength);
+ (__u64)ev->mlength);
#endif
rx = kptllnd_rx_alloc();
if (rx == NULL) {
/* Portals can't force alignment - copy into
* rx_space (avoiding overflow) to fix */
int maxlen = *kptllnd_tunables.kptl_max_msg_size;
-
+
rx->rx_rxb = NULL;
rx->rx_nob = MIN(maxlen, ev->mlength);
rx->rx_msg = (kptl_msg_t *)rx->rx_space;
}
rx->rx_initiator = ev->initiator;
-#ifdef CRAY_XT3
- rx->rx_uid = ev->uid;
-#endif
+ rx->rx_treceived = jiffies;
/* Queue for attention */
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
- flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
+ flags);
- list_add_tail(&rx->rx_list,
- &kptllnd_data.kptl_sched_rxq);
- wake_up(&kptllnd_data.kptl_sched_waitq);
+ cfs_list_add_tail(&rx->rx_list,
+ &kptllnd_data.kptl_sched_rxq);
+ cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
- flags);
+ spin_unlock_irqrestore(&kptllnd_data. \
+ kptl_sched_lock, flags);
}
}
if (unlinked) {
- spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rxb->rxb_posted = 0;
rxb->rxb_mdh = PTL_INVALID_HANDLE;
kptllnd_rx_buffer_decref_locked(rxb);
- spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
}
void
-kptllnd_nak (kptl_rx_t *rx)
+kptllnd_nak (ptl_process_id_t dest)
{
/* Fire-and-forget a stub message that will let the peer know my
* protocol magic/version and make her drop/refresh any peer state she
rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
if (rc != PTL_OK) {
- CWARN("Can't NAK %s: bind failed %d\n",
- kptllnd_ptlid2str(rx->rx_initiator), rc);
+ CWARN("Can't NAK %s: bind failed %s(%d)\n",
+ kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
return;
}
- rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
+ rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
*kptllnd_tunables.kptl_portal, 0,
LNET_MSG_MATCHBITS, 0, 0);
+ if (rc != PTL_OK) {
+ CWARN("Can't NAK %s: put failed %s(%d)\n",
+ kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
+ }
+}
- if (rc != PTL_OK)
- CWARN("Can't NAK %s: put failed %d\n",
- kptllnd_ptlid2str(rx->rx_initiator), rc);
+kptl_net_t *
+kptllnd_find_net (lnet_nid_t nid)
+{ /* Look up the net on kptllnd_data.kptl_nets whose NI has NID 'nid'.
+ * Returns the matching net after kptllnd_net_addref() has been called
+ * on it (kptllnd_rx_parse() pairs this with kptllnd_net_decref()), or
+ * NULL if no listed net has that NID. */
+ kptl_net_t *net;
+ /* the whole list walk runs under the read side of kptl_net_rw_lock */
+ read_lock(&kptllnd_data.kptl_net_rw_lock);
+ cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+ LASSERT (!net->net_shutdown); /* no listed net may be flagged shutdown */
+ /* on a match, take the ref before the lock is dropped */
+ if (net->net_ni->ni_nid == nid) {
+ kptllnd_net_addref(net);
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+ return net;
+ }
+ }
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+ /* walked the whole list without finding 'nid' */
+ return NULL;
}
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
kptl_msg_t *msg = rx->rx_msg;
+ int rc = 0;
+ int post_credit = PTLLND_POSTRX_PEER_CREDIT;
+ kptl_net_t *net = NULL;
kptl_peer_t *peer;
- int rc;
- int credits;
+ cfs_list_t txs;
unsigned long flags;
lnet_process_id_t srcid;
+ LASSERT (!cfs_in_interrupt());
LASSERT (rx->rx_peer == NULL);
+ CFS_INIT_LIST_HEAD(&txs);
+
if ((rx->rx_nob >= 4 &&
(msg->ptlm_magic == LNET_PROTO_MAGIC ||
msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
(__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
msg->ptlm_version : __swab16(msg->ptlm_version)),
PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
- kptllnd_nak(rx);
+ /* NB backward compatibility */
+ kptllnd_nak(rx->rx_initiator);
goto rx_done;
}
srcid.nid = msg->ptlm_srcnid;
srcid.pid = msg->ptlm_srcpid;
- CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
- kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);
+ CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
+ libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
+ msg->ptlm_credits, rx, rx->rx_rxb,
+ jiffies - rx->rx_treceived,
+ cfs_duration_sec(jiffies - rx->rx_treceived));
- if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
- CERROR("Bad source id %s from %s\n",
+ if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
+ CERROR("Bad source nid %s from %s\n",
libcfs_id2str(srcid),
kptllnd_ptlid2str(rx->rx_initiator));
goto rx_done;
if (peer == NULL)
goto rx_done;
- CWARN("NAK from %s (%s)\n",
- libcfs_id2str(srcid),
+ CWARN("NAK from %s (%d:%s)\n",
+ libcfs_id2str(srcid), peer->peer_state,
kptllnd_ptlid2str(rx->rx_initiator));
+ /* NB can't nuke new peer - bug 17546 comment 31 */
+ if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
+ libcfs_id2str(srcid),
+ kptllnd_ptlid2str(rx->rx_initiator));
+ kptllnd_peer_decref(peer);
+ goto rx_done;
+ }
+
rc = -EPROTO;
goto failed;
}
- if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
- msg->ptlm_dstpid != the_lnet.ln_pid) {
- CERROR("Bad dstid %s (expected %s) from %s\n",
+ net = kptllnd_find_net(msg->ptlm_dstnid);
+ if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
+ CERROR("Bad dstid %s from %s\n",
libcfs_id2str((lnet_process_id_t) {
.nid = msg->ptlm_dstnid,
.pid = msg->ptlm_dstpid}),
- libcfs_id2str((lnet_process_id_t) {
- .nid = kptllnd_data.kptl_ni->ni_nid,
- .pid = the_lnet.ln_pid}),
kptllnd_ptlid2str(rx->rx_initiator));
goto rx_done;
}
+ if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
+ lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
+ LNET_NIDADDR(srcid.nid));
+ CERROR("Bad source nid %s from %s, %s expected.\n",
+ libcfs_id2str(srcid),
+ kptllnd_ptlid2str(rx->rx_initiator),
+ libcfs_nid2str(nid));
+ goto rx_done;
+ }
+
if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
- peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
- if (peer == NULL) {
- CWARN("No peer for %s\n",
- kptllnd_ptlid2str(rx->rx_initiator));
+ peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
+ if (peer == NULL)
goto rx_done;
- }
} else {
peer = kptllnd_id2peer(srcid);
if (peer == NULL) {
- CWARN("NAK %s: no connection; peer must reconnect\n",
+ CWARN("NAK %s: no connection, %s must reconnect\n",
+ kptllnd_msgtype2str(msg->ptlm_type),
libcfs_id2str(srcid));
/* NAK to make the peer reconnect */
- kptllnd_nak(rx);
+ kptllnd_nak(rx->rx_initiator);
goto rx_done;
}
- /* Ignore anything else while I'm waiting for HELLO */
- if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ /* Ignore any messages for a previous incarnation of me */
+ if (msg->ptlm_dststamp < peer->peer_myincarnation) {
kptllnd_peer_decref(peer);
goto rx_done;
}
- }
- LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
- msg->ptlm_srcpid == peer->peer_id.pid);
+ if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+ CERROR("%s: Unexpected dststamp "LPX64" "
+ "("LPX64" expected)\n",
+ libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+ peer->peer_myincarnation);
+ rc = -EPROTO;
+ goto failed;
+ }
- if (msg->ptlm_srcstamp != peer->peer_incarnation) {
- CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n",
- libcfs_id2str(peer->peer_id),
- msg->ptlm_srcstamp,
- peer->peer_incarnation);
- rc = -EPROTO;
- goto failed;
+ if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ /* recoverable error - restart txs */
+ spin_lock_irqsave(&peer->peer_lock, flags);
+ kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+ CWARN("NAK %s: Unexpected %s message\n",
+ libcfs_id2str(srcid),
+ kptllnd_msgtype2str(msg->ptlm_type));
+ kptllnd_nak(rx->rx_initiator);
+ rc = -EPROTO;
+ goto failed;
+ }
+
+ if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+ CERROR("%s: Unexpected srcstamp "LPX64" "
+ "("LPX64" expected)\n",
+ libcfs_id2str(srcid),
+ msg->ptlm_srcstamp,
+ peer->peer_incarnation);
+ rc = -EPROTO;
+ goto failed;
+ }
}
- if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation &&
- (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */
- msg->ptlm_dststamp != 0)) { /* zero dststamp */
- CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n",
- libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
- kptllnd_data.kptl_incarnation);
+ LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
+ LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
+ libcfs_nid2str(msg->ptlm_srcnid),
+ libcfs_nid2str(peer->peer_id.nid));
+ LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
+ msg->ptlm_srcpid, peer->peer_id.pid);
+
+ spin_lock_irqsave(&peer->peer_lock, flags);
+
+ /* Check peer only sends when I've sent her credits */
+ if (peer->peer_sent_credits == 0) {
+ int c = peer->peer_credits;
+ int oc = peer->peer_outstanding_credits;
+ int sc = peer->peer_sent_credits;
+
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+ CERROR("%s: buffer overrun [%d/%d+%d]\n",
+ libcfs_id2str(peer->peer_id), c, sc, oc);
rc = -EPROTO;
goto failed;
}
+ peer->peer_sent_credits--;
- if (msg->ptlm_credits != 0) {
- spin_lock_irqsave(&peer->peer_lock, flags);
+ /* No check for credit overflow - the peer may post new
+ * buffers after the startup handshake. */
+ peer->peer_credits += msg->ptlm_credits;
- if (peer->peer_credits + msg->ptlm_credits >
- *kptllnd_tunables.kptl_peercredits) {
- credits = peer->peer_credits;
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- CERROR("Credit overflow from %s: %d + %d > %d\n",
- libcfs_id2str(peer->peer_id),
- credits, msg->ptlm_credits,
- *kptllnd_tunables.kptl_peercredits);
- rc = -EPROTO;
- goto failed;
- }
-
- peer->peer_credits += msg->ptlm_credits;
+ /* This ensures the credit taken by NOOP can be returned */
+ if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
+ peer->peer_outstanding_credits++;
+ post_credit = PTLLND_POSTRX_NO_CREDIT;
+ }
- spin_unlock_irqrestore(&peer->peer_lock, flags);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+ /* See if something can go out now that credits have come in */
+ if (msg->ptlm_credits != 0)
kptllnd_peer_check_sends(peer);
- }
/* ptllnd-level protocol correct - rx takes my ref on peer and increments
* peer_outstanding_credits when it completes */
case PTLLND_MSG_TYPE_IMMEDIATE:
CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
- rc = lnet_parse(kptllnd_data.kptl_ni,
+ rc = lnet_parse(net->net_ni,
&msg->ptlm_u.immediate.kptlim_hdr,
msg->ptlm_srcnid,
rx, 0);
- if (rc >= 0) /* kptllnd_recv owns 'rx' now */
+ if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
+ kptllnd_net_decref(net);
return;
+ }
goto failed;
case PTLLND_MSG_TYPE_PUT:
PTL_RESERVED_MATCHBITS);
/* Update last match bits seen */
- spin_lock_irqsave(&peer->peer_lock, flags);
+ spin_lock_irqsave(&peer->peer_lock, flags);
if (msg->ptlm_u.rdma.kptlrm_matchbits >
rx->rx_peer->peer_last_matchbits_seen)
rx->rx_peer->peer_last_matchbits_seen =
msg->ptlm_u.rdma.kptlrm_matchbits;
- spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
+ spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
- rc = lnet_parse(kptllnd_data.kptl_ni,
+ rc = lnet_parse(net->net_ni,
&msg->ptlm_u.rdma.kptlrm_hdr,
msg->ptlm_srcnid,
rx, 1);
- if (rc >= 0) /* kptllnd_recv owns 'rx' now */
+ if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
+ kptllnd_net_decref(net);
return;
+ }
goto failed;
- }
+ }
failed:
+ LASSERT (rc != 0);
kptllnd_peer_close(peer, rc);
if (rx->rx_peer == NULL) /* drop ref on peer */
kptllnd_peer_decref(peer); /* unless rx_done will */
+ if (!cfs_list_empty(&txs)) {
+ LASSERT (net != NULL);
+ kptllnd_restart_txs(net, srcid, &txs);
+ }
rx_done:
- kptllnd_rx_done(rx);
+ if (net != NULL)
+ kptllnd_net_decref(net);
+ kptllnd_rx_done(rx, post_credit);
}