X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fptllnd%2Fptllnd_rx_buf.c;h=f12be7d03d1004919fbd3fcf9f3613582ead8c80;hp=b83ab518344cb16f8c9a3471e943e6b56405708a;hb=294c39d488fcd95a523466c7726ff1b5a8327890;hpb=5d88d521ad1abc2d94ac6a9c6a9b2e023335b757 diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c index b83ab51..f12be7d 100644 --- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c +++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c @@ -1,19 +1,39 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * GPL HEADER START * - * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. - * Author: PJ Kirner + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * This file is confidential source code owned by Cluster File Systems. - * No viewing, modification, compilation, redistribution, or any other - * form of use is permitted except through a signed license agreement. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you have not signed such an agreement, then you have no rights to - * this file. Please destroy it immediately and contact CFS. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. 
+ * + * lnet/klnds/ptllnd/ptllnd_rx_buf.c + * + * Author: PJ Kirner */ #include "ptllnd.h" @@ -22,8 +42,8 @@ void kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp) { memset(rxbp, 0, sizeof(*rxbp)); - spin_lock_init(&rxbp->rxbp_lock); - INIT_LIST_HEAD(&rxbp->rxbp_list); + cfs_spin_lock_init(&rxbp->rxbp_lock); + CFS_INIT_LIST_HEAD(&rxbp->rxbp_list); } void @@ -36,7 +56,7 @@ kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb) LASSERT(!rxb->rxb_posted); LASSERT(rxb->rxb_idle); - list_del(&rxb->rxb_list); + cfs_list_del(&rxb->rxb_list); rxbp->rxbp_count--; LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size()); @@ -58,7 +78,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); for (;;) { if (rxbp->rxbp_shutdown) { @@ -72,7 +92,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) break; } - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_ALLOC(rxb, sizeof(*rxb)); LIBCFS_ALLOC(buffer, bufsize); @@ -85,7 +105,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) if (buffer != NULL) LIBCFS_FREE(buffer, bufsize); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ENOMEM; break; } @@ -100,33 +120,33 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) rxb->rxb_buffer = buffer; rxb->rxb_mdh = PTL_INVALID_HANDLE; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_FREE(rxb, sizeof(*rxb)); LIBCFS_FREE(buffer, bufsize); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ESHUTDOWN; break; } - list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list); + cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list); rxbp->rxbp_count++; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); kptllnd_rx_buffer_post(rxb); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); } if (rc == 0) rxbp->rxbp_reserved += count; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return rc; } @@ -137,12 +157,12 @@ kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp, { unsigned long flags; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count); rxbp->rxbp_reserved -= count; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } void @@ -152,11 +172,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) int rc; int i; unsigned long flags; - struct list_head *tmp; - struct list_head *nxt; + cfs_list_t *tmp; + cfs_list_t *nxt; ptl_handle_md_t mdh; - /* CAVEAT EMPTOR: I'm racing with everything here!!! + /* CAVEAT EMPTOR: I'm racing with everything here!!! * * Buffers can still be posted after I set rxbp_shutdown because I * can't hold rxbp_lock while I'm posting them. 
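The bulk of the hunks in this patch are a mechanical rename from bare Linux kernel primitives to the libcfs portability wrappers (spin_lock_irqsave -> cfs_spin_lock_irqsave, INIT_LIST_HEAD -> CFS_INIT_LIST_HEAD, struct list_head -> cfs_list_t, in_interrupt -> cfs_in_interrupt, wake_up -> cfs_waitq_signal, and so on). As a rough guide to what those wrappers amount to on Linux, the sketch below shows the usual thin-alias pattern; it is illustrative only and is not taken from the actual libcfs headers.

/* Illustrative sketch only -- assumed thin aliases over the native
 * Linux primitives, mirroring the renames visible in this patch. */
#include <linux/spinlock.h>
#include <linux/list.h>
#include <linux/hardirq.h>

typedef struct list_head cfs_list_t;              /* cfs_list_t *tmp, *nxt;  */

#define cfs_spin_lock_init(l)            spin_lock_init(l)
#define cfs_spin_lock_irqsave(l, f)      spin_lock_irqsave(l, f)
#define cfs_spin_unlock_irqrestore(l, f) spin_unlock_irqrestore(l, f)
#define CFS_INIT_LIST_HEAD(h)            INIT_LIST_HEAD(h)
#define cfs_list_add_tail(n, h)          list_add_tail(n, h)
#define cfs_list_del(n)                  list_del(n)
#define cfs_list_empty(h)                list_empty(h)
#define cfs_in_interrupt()               in_interrupt()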
@@ -167,20 +187,20 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) * different MD) from when the MD is actually unlinked, to when the * event callback tells me it has been unlinked. */ - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxbp->rxbp_shutdown = 1; for (i = 9;; i++) { - list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) { - rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list); - + cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) { + rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list); + if (rxb->rxb_idle) { - spin_unlock_irqrestore(&rxbp->rxbp_lock, - flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, + flags); kptllnd_rx_buffer_destroy(rxb); - spin_lock_irqsave(&rxbp->rxbp_lock, - flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, + flags); continue; } @@ -188,11 +208,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE)) continue; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMDUnlink(mdh); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS /* callback clears rxb_mdh and drops net's ref @@ -208,10 +228,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) #endif } - if (list_empty(&rxbp->rxbp_list)) + if (cfs_list_empty(&rxbp->rxbp_list)) break; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); /* Wait a bit for references to be dropped */ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */ @@ -220,10 +240,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) cfs_pause(cfs_time_seconds(1)); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); } - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } void @@ -237,7 +257,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool; unsigned long flags; - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); LASSERT (rxb->rxb_refcount == 0); LASSERT (!rxb->rxb_idle); LASSERT (!rxb->rxb_posted); @@ -246,18 +266,18 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) any.nid = PTL_NID_ANY; any.pid = PTL_PID_ANY; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { rxb->rxb_idle = 1; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } rxb->rxb_refcount = 1; /* net's ref */ rxb->rxb_posted = 1; /* I'm posting */ - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMEAttach(kptllnd_data.kptl_nih, *kptllnd_tunables.kptl_portal, @@ -268,7 +288,8 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) PTL_INS_AFTER, &meh); if (rc != PTL_OK) { - CERROR("PtlMeAttach rxb failed %d\n", rc); + CERROR("PtlMeAttach rxb failed %s(%d)\n", + kptllnd_errtype2str(rc), rc); goto failed; } @@ -276,7 +297,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) * Setup MD */ md.start = rxb->rxb_buffer; - md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages; + md.length = kptllnd_rx_buffer_size(); md.threshold = PTL_MD_THRESH_INF; md.options = PTL_MD_OP_PUT | PTL_MD_LUSTRE_COMPLETION_SEMANTICS | @@ -289,23 +310,24 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) rc = PtlMDAttach(meh, md, 
PTL_UNLINK, &mdh); if (rc == PTL_OK) { - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */ rxb->rxb_mdh = mdh; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } - CERROR("PtlMDAttach rxb failed %d\n", rc); + CERROR("PtlMDAttach rxb failed %s(%d)\n", + kptllnd_errtype2str(rc), rc); rc = PtlMEUnlink(meh); LASSERT(rc == PTL_OK); failed: - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; /* XXX this will just try again immediately */ kptllnd_rx_buffer_decref_locked(rxb); - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } kptl_rx_t * @@ -329,12 +351,15 @@ kptllnd_rx_alloc(void) } void -kptllnd_rx_done(kptl_rx_t *rx) +kptllnd_rx_done(kptl_rx_t *rx, int post_credit) { kptl_rx_buffer_t *rxb = rx->rx_rxb; kptl_peer_t *peer = rx->rx_peer; unsigned long flags; + LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT || + post_credit == PTLLND_POSTRX_PEER_CREDIT); + CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer); if (rxb != NULL) @@ -342,17 +367,21 @@ kptllnd_rx_done(kptl_rx_t *rx) if (peer != NULL) { /* Update credits (after I've decref-ed the buffer) */ - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); - peer->peer_outstanding_credits++; - LASSERT (peer->peer_outstanding_credits <= - *kptllnd_tunables.kptl_peercredits); + if (post_credit == PTLLND_POSTRX_PEER_CREDIT) + peer->peer_outstanding_credits++; - CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits, rx); + LASSERT (peer->peer_outstanding_credits + + peer->peer_sent_credits <= + *kptllnd_tunables.kptl_peertxcredits); - spin_unlock_irqrestore(&peer->peer_lock, flags); + CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n", + libcfs_id2str(peer->peer_id), peer->peer_credits, + peer->peer_outstanding_credits, peer->peer_sent_credits, + rx); + + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); /* I might have to send back credits */ kptllnd_peer_check_sends(peer); @@ -378,28 +407,29 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) unlinked = ev->type == PTL_EVENT_UNLINK; #endif - CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n", - kptllnd_evtype2str(ev->type), ev->type, rxb, - kptllnd_ptlid2str(ev->initiator), - unlinked, ev->ni_fail_type); + CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n", + kptllnd_ptlid2str(ev->initiator), + kptllnd_evtype2str(ev->type), ev->type, rxb, + kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type, + unlinked); LASSERT (!rxb->rxb_idle); LASSERT (ev->md.start == rxb->rxb_buffer); - LASSERT (ev->offset + ev->mlength <= + LASSERT (ev->offset + ev->mlength <= PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages); - LASSERT (ev->type == PTL_EVENT_PUT_END || + LASSERT (ev->type == PTL_EVENT_PUT_END || ev->type == PTL_EVENT_UNLINK); LASSERT (ev->type == PTL_EVENT_UNLINK || ev->match_bits == LNET_MSG_MATCHBITS); - if (ev->ni_fail_type != PTL_NI_OK) - CERROR("event type %d, status %d from %s\n", - ev->type, ev->ni_fail_type, - kptllnd_ptlid2str(ev->initiator)); - - if (ev->type == PTL_EVENT_PUT_END && - ev->ni_fail_type == PTL_NI_OK && - !rxbp->rxbp_shutdown) { + if (ev->ni_fail_type != PTL_NI_OK) { + CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn", + 
kptllnd_ptlid2str(ev->initiator), + kptllnd_evtype2str(ev->type), ev->type, rxb, + kptllnd_errtype2str(ev->ni_fail_type), + ev->ni_fail_type, unlinked); + } else if (ev->type == PTL_EVENT_PUT_END && + !rxbp->rxbp_shutdown) { /* rxbp_shutdown sampled without locking! I only treat it as a * hint since shutdown can start while rx's are queued on @@ -409,10 +439,10 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) * odd-length message will misalign subsequent messages and * force the fixup below... */ if ((ev->mlength & 7) != 0) - CWARN("Message from %s has odd length %d: " + CWARN("Message from %s has odd length "LPU64": " "probable version incompatibility\n", kptllnd_ptlid2str(ev->initiator), - ev->mlength); + (__u64)ev->mlength); #endif rx = kptllnd_rx_alloc(); if (rx == NULL) { @@ -430,7 +460,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) /* Portals can't force alignment - copy into * rx_space (avoiding overflow) to fix */ int maxlen = *kptllnd_tunables.kptl_max_msg_size; - + rx->rx_rxb = NULL; rx->rx_nob = MIN(maxlen, ev->mlength); rx->rx_msg = (kptl_msg_t *)rx->rx_space; @@ -443,35 +473,33 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) } rx->rx_initiator = ev->initiator; -#ifdef CRAY_XT3 - rx->rx_uid = ev->uid; -#endif + rx->rx_treceived = jiffies; /* Queue for attention */ - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, - flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, + flags); - list_add_tail(&rx->rx_list, - &kptllnd_data.kptl_sched_rxq); - wake_up(&kptllnd_data.kptl_sched_waitq); + cfs_list_add_tail(&rx->rx_list, + &kptllnd_data.kptl_sched_rxq); + cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq); - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, - flags); + cfs_spin_unlock_irqrestore(&kptllnd_data. \ + kptl_sched_lock, flags); } } if (unlinked) { - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; rxb->rxb_mdh = PTL_INVALID_HANDLE; kptllnd_rx_buffer_decref_locked(rxb); - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } } void -kptllnd_nak (kptl_rx_t *rx) +kptllnd_nak (ptl_process_id_t dest) { /* Fire-and-forget a stub message that will let the peer know my * protocol magic/version and make her drop/refresh any peer state she @@ -488,32 +516,57 @@ kptllnd_nak (kptl_rx_t *rx) rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh); if (rc != PTL_OK) { - CWARN("Can't NAK %s: bind failed %d\n", - kptllnd_ptlid2str(rx->rx_initiator), rc); + CWARN("Can't NAK %s: bind failed %s(%d)\n", + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); return; } - rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator, + rc = PtlPut(mdh, PTL_NOACK_REQ, dest, *kptllnd_tunables.kptl_portal, 0, LNET_MSG_MATCHBITS, 0, 0); + if (rc != PTL_OK) { + CWARN("Can't NAK %s: put failed %s(%d)\n", + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); + } +} + +kptl_net_t * +kptllnd_find_net (lnet_nid_t nid) +{ + kptl_net_t *net; + + cfs_read_lock(&kptllnd_data.kptl_net_rw_lock); + cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) { + LASSERT (!net->net_shutdown); + + if (net->net_ni->ni_nid == nid) { + kptllnd_net_addref(net); + cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock); + return net; + } + } + cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock); - if (rc != PTL_OK) - CWARN("Can't NAK %s: put failed %d\n", - kptllnd_ptlid2str(rx->rx_initiator), rc); + return NULL; } void kptllnd_rx_parse(kptl_rx_t *rx) { kptl_msg_t *msg = rx->rx_msg; + int rc = 0; + 
int post_credit = PTLLND_POSTRX_PEER_CREDIT; + kptl_net_t *net = NULL; kptl_peer_t *peer; - int rc; - int credits; + cfs_list_t txs; unsigned long flags; lnet_process_id_t srcid; + LASSERT (!cfs_in_interrupt()); LASSERT (rx->rx_peer == NULL); + CFS_INIT_LIST_HEAD(&txs); + if ((rx->rx_nob >= 4 && (msg->ptlm_magic == LNET_PROTO_MAGIC || msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) || @@ -529,7 +582,8 @@ kptllnd_rx_parse(kptl_rx_t *rx) (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ? msg->ptlm_version : __swab16(msg->ptlm_version)), PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator)); - kptllnd_nak(rx); + /* NB backward compatibility */ + kptllnd_nak(rx->rx_initiator); goto rx_done; } @@ -543,11 +597,14 @@ kptllnd_rx_parse(kptl_rx_t *rx) srcid.nid = msg->ptlm_srcnid; srcid.pid = msg->ptlm_srcpid; - CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid), - kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx); + CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n", + libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type), + msg->ptlm_credits, rx, rx->rx_rxb, + jiffies - rx->rx_treceived, + cfs_duration_sec(jiffies - rx->rx_treceived)); - if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) { - CERROR("Bad source id %s from %s\n", + if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) { + CERROR("Bad source nid %s from %s\n", libcfs_id2str(srcid), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; @@ -558,95 +615,137 @@ kptllnd_rx_parse(kptl_rx_t *rx) if (peer == NULL) goto rx_done; - CWARN("NAK from %s (%s)\n", - libcfs_id2str(srcid), + CWARN("NAK from %s (%d:%s)\n", + libcfs_id2str(srcid), peer->peer_state, kptllnd_ptlid2str(rx->rx_initiator)); + /* NB can't nuke new peer - bug 17546 comment 31 */ + if (peer->peer_state == PEER_STATE_WAITING_HELLO) { + CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n", + libcfs_id2str(srcid), + kptllnd_ptlid2str(rx->rx_initiator)); + kptllnd_peer_decref(peer); + goto rx_done; + } + rc = -EPROTO; goto failed; } - if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid || - msg->ptlm_dstpid != the_lnet.ln_pid) { - CERROR("Bad dstid %s (expected %s) from %s\n", + net = kptllnd_find_net(msg->ptlm_dstnid); + if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) { + CERROR("Bad dstid %s from %s\n", libcfs_id2str((lnet_process_id_t) { .nid = msg->ptlm_dstnid, .pid = msg->ptlm_dstpid}), - libcfs_id2str((lnet_process_id_t) { - .nid = kptllnd_data.kptl_ni->ni_nid, - .pid = the_lnet.ln_pid}), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } + if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) { + lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid), + LNET_NIDADDR(srcid.nid)); + CERROR("Bad source nid %s from %s, %s expected.\n", + libcfs_id2str(srcid), + kptllnd_ptlid2str(rx->rx_initiator), + libcfs_nid2str(nid)); + goto rx_done; + } + if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) { - peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg); - if (peer == NULL) { - CWARN("No peer for %s\n", - kptllnd_ptlid2str(rx->rx_initiator)); + peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg); + if (peer == NULL) goto rx_done; - } } else { peer = kptllnd_id2peer(srcid); if (peer == NULL) { - CWARN("NAK %s: no connection; peer must reconnect\n", + CWARN("NAK %s: no connection, %s must reconnect\n", + kptllnd_msgtype2str(msg->ptlm_type), libcfs_id2str(srcid)); /* NAK to make the peer reconnect */ - kptllnd_nak(rx); + kptllnd_nak(rx->rx_initiator); goto rx_done; } - /* Ignore anything else 
while I'm waiting for HELLO */ - if (peer->peer_state == PEER_STATE_WAITING_HELLO) { + /* Ignore any messages for a previous incarnation of me */ + if (msg->ptlm_dststamp < peer->peer_myincarnation) { kptllnd_peer_decref(peer); goto rx_done; } - } - LASSERT (msg->ptlm_srcnid == peer->peer_id.nid && - msg->ptlm_srcpid == peer->peer_id.pid); + if (msg->ptlm_dststamp != peer->peer_myincarnation) { + CERROR("%s: Unexpected dststamp "LPX64" " + "("LPX64" expected)\n", + libcfs_id2str(peer->peer_id), msg->ptlm_dststamp, + peer->peer_myincarnation); + rc = -EPROTO; + goto failed; + } - if (msg->ptlm_srcstamp != peer->peer_incarnation) { - CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n", - libcfs_id2str(peer->peer_id), - msg->ptlm_srcstamp, - peer->peer_incarnation); - rc = -EPROTO; - goto failed; + if (peer->peer_state == PEER_STATE_WAITING_HELLO) { + /* recoverable error - restart txs */ + cfs_spin_lock_irqsave(&peer->peer_lock, flags); + kptllnd_cancel_txlist(&peer->peer_sendq, &txs); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); + + CWARN("NAK %s: Unexpected %s message\n", + libcfs_id2str(srcid), + kptllnd_msgtype2str(msg->ptlm_type)); + kptllnd_nak(rx->rx_initiator); + rc = -EPROTO; + goto failed; + } + + if (msg->ptlm_srcstamp != peer->peer_incarnation) { + CERROR("%s: Unexpected srcstamp "LPX64" " + "("LPX64" expected)\n", + libcfs_id2str(srcid), + msg->ptlm_srcstamp, + peer->peer_incarnation); + rc = -EPROTO; + goto failed; + } } - if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation && - (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */ - msg->ptlm_dststamp != 0)) { /* zero dststamp */ - CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n", - libcfs_id2str(peer->peer_id), msg->ptlm_dststamp, - kptllnd_data.kptl_incarnation); + LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) == + LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n", + libcfs_nid2str(msg->ptlm_srcnid), + libcfs_nid2str(peer->peer_id.nid)); + LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n", + msg->ptlm_srcpid, peer->peer_id.pid); + + cfs_spin_lock_irqsave(&peer->peer_lock, flags); + + /* Check peer only sends when I've sent her credits */ + if (peer->peer_sent_credits == 0) { + int c = peer->peer_credits; + int oc = peer->peer_outstanding_credits; + int sc = peer->peer_sent_credits; + + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); + + CERROR("%s: buffer overrun [%d/%d+%d]\n", + libcfs_id2str(peer->peer_id), c, sc, oc); rc = -EPROTO; goto failed; } + peer->peer_sent_credits--; - if (msg->ptlm_credits != 0) { - spin_lock_irqsave(&peer->peer_lock, flags); + /* No check for credit overflow - the peer may post new + * buffers after the startup handshake. 
*/ + peer->peer_credits += msg->ptlm_credits; - if (peer->peer_credits + msg->ptlm_credits > - *kptllnd_tunables.kptl_peercredits) { - credits = peer->peer_credits; - spin_unlock_irqrestore(&peer->peer_lock, flags); - - CERROR("Credit overflow from %s: %d + %d > %d\n", - libcfs_id2str(peer->peer_id), - credits, msg->ptlm_credits, - *kptllnd_tunables.kptl_peercredits); - rc = -EPROTO; - goto failed; - } - - peer->peer_credits += msg->ptlm_credits; + /* This ensures the credit taken by NOOP can be returned */ + if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) { + peer->peer_outstanding_credits++; + post_credit = PTLLND_POSTRX_NO_CREDIT; + } - spin_unlock_irqrestore(&peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); + /* See if something can go out now that credits have come in */ + if (msg->ptlm_credits != 0) kptllnd_peer_check_sends(peer); - } /* ptllnd-level protocol correct - rx takes my ref on peer and increments * peer_outstanding_credits when it completes */ @@ -668,12 +767,14 @@ kptllnd_rx_parse(kptl_rx_t *rx) case PTLLND_MSG_TYPE_IMMEDIATE: CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n"); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, rx, 0); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; case PTLLND_MSG_TYPE_PUT: @@ -687,28 +788,37 @@ kptllnd_rx_parse(kptl_rx_t *rx) PTL_RESERVED_MATCHBITS); /* Update last match bits seen */ - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); if (msg->ptlm_u.rdma.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen) rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.rdma.kptlrm_matchbits; - spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.rdma.kptlrm_hdr, msg->ptlm_srcnid, rx, 1); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; - } + } failed: + LASSERT (rc != 0); kptllnd_peer_close(peer, rc); if (rx->rx_peer == NULL) /* drop ref on peer */ kptllnd_peer_decref(peer); /* unless rx_done will */ + if (!cfs_list_empty(&txs)) { + LASSERT (net != NULL); + kptllnd_restart_txs(net, srcid, &txs); + } rx_done: - kptllnd_rx_done(rx); + if (net != NULL) + kptllnd_net_decref(net); + kptllnd_rx_done(rx, post_credit); }
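Beyond the cfs_* renames, the functional change in kptllnd_rx_done()/kptllnd_rx_parse() above is the reworked peer-credit accounting: the receiver now tracks peer_sent_credits alongside peer_outstanding_credits, treats a message that arrives while peer_sent_credits is zero as a buffer overrun, drops the old overflow check on peer_credits (the peer may post new buffers after the startup handshake), and returns the credit consumed by a NOOP immediately while passing PTLLND_POSTRX_NO_CREDIT to kptllnd_rx_done() so that credit is not counted twice. A minimal sketch of that receive-side bookkeeping follows, using a simplified stand-in for kptl_peer_t (the field names mirror the patch; the struct and functions themselves are hypothetical).

#include <linux/errno.h>

/* Simplified stand-in for kptl_peer_t -- illustrative only. */
struct demo_peer {
        int peer_credits;             /* sends I may post (credits granted to me) */
        int peer_outstanding_credits; /* credits I owe back to the peer           */
        int peer_sent_credits;        /* credits I granted that are still unspent */
};

/* Mirrors the parse-side accounting: 0 on success, -EPROTO when the peer
 * sends without holding one of my credits (buffer overrun). */
static int demo_rx_parse_credits(struct demo_peer *p, int msg_credits, int is_noop)
{
        if (p->peer_sent_credits == 0)
                return -EPROTO;          /* peer had no credit to spend */

        p->peer_sent_credits--;          /* the peer just spent one */
        p->peer_credits += msg_credits;  /* no overflow check: peer may post new
                                          * buffers after the startup handshake */
        if (is_noop)
                p->peer_outstanding_credits++;  /* return the NOOP's credit now;
                                                 * rx_done then gets NO_CREDIT */
        return 0;
}

/* Mirrors kptllnd_rx_done(): with PTLLND_POSTRX_PEER_CREDIT the completed rx
 * gives one credit back; the invariant asserted in the patch is
 * peer_outstanding_credits + peer_sent_credits <= peertxcredits. */
static void demo_rx_done_credits(struct demo_peer *p, int post_peer_credit)
{
        if (post_peer_credit)
                p->peer_outstanding_credits++;
}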