X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fptllnd%2Fptllnd_rx_buf.c;h=f12be7d03d1004919fbd3fcf9f3613582ead8c80;hp=f7bf2d3c347b6e56b545c90a259fb3526b3b13bc;hb=294c39d488fcd95a523466c7726ff1b5a8327890;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c index f7bf2d3..f12be7d 100644 --- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c +++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,7 +24,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -44,8 +42,8 @@ void kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp) { memset(rxbp, 0, sizeof(*rxbp)); - spin_lock_init(&rxbp->rxbp_lock); - INIT_LIST_HEAD(&rxbp->rxbp_list); + cfs_spin_lock_init(&rxbp->rxbp_lock); + CFS_INIT_LIST_HEAD(&rxbp->rxbp_list); } void @@ -58,7 +56,7 @@ kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb) LASSERT(!rxb->rxb_posted); LASSERT(rxb->rxb_idle); - list_del(&rxb->rxb_list); + cfs_list_del(&rxb->rxb_list); rxbp->rxbp_count--; LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size()); @@ -80,7 +78,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); for (;;) { if (rxbp->rxbp_shutdown) { @@ -94,7 +92,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) break; } - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_ALLOC(rxb, sizeof(*rxb)); LIBCFS_ALLOC(buffer, bufsize); @@ -107,7 +105,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) if (buffer != NULL) LIBCFS_FREE(buffer, bufsize); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ENOMEM; break; } @@ -122,33 +120,33 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count) rxb->rxb_buffer = buffer; rxb->rxb_mdh = PTL_INVALID_HANDLE; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); LIBCFS_FREE(rxb, sizeof(*rxb)); LIBCFS_FREE(buffer, bufsize); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rc = -ESHUTDOWN; break; } - list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list); + cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list); rxbp->rxbp_count++; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); kptllnd_rx_buffer_post(rxb); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); } if (rc == 0) rxbp->rxbp_reserved += count; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return rc; } @@ -159,12 +157,12 @@ kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp, { unsigned long flags; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count); rxbp->rxbp_reserved -= count; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } void @@ -174,11 +172,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) int rc; int i; unsigned long flags; - struct list_head *tmp; - struct list_head *nxt; + cfs_list_t *tmp; + cfs_list_t *nxt; ptl_handle_md_t mdh; - /* CAVEAT EMPTOR: I'm racing with everything here!!! + /* CAVEAT EMPTOR: I'm racing with everything here!!! * * Buffers can still be posted after I set rxbp_shutdown because I * can't hold rxbp_lock while I'm posting them. @@ -189,20 +187,20 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) * different MD) from when the MD is actually unlinked, to when the * event callback tells me it has been unlinked. */ - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxbp->rxbp_shutdown = 1; for (i = 9;; i++) { - list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) { - rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list); - + cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) { + rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list); + if (rxb->rxb_idle) { - spin_unlock_irqrestore(&rxbp->rxbp_lock, - flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, + flags); kptllnd_rx_buffer_destroy(rxb); - spin_lock_irqsave(&rxbp->rxbp_lock, - flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, + flags); continue; } @@ -210,11 +208,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE)) continue; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMDUnlink(mdh); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS /* callback clears rxb_mdh and drops net's ref @@ -230,10 +228,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) #endif } - if (list_empty(&rxbp->rxbp_list)) + if (cfs_list_empty(&rxbp->rxbp_list)) break; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); /* Wait a bit for references to be dropped */ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */ @@ -242,10 +240,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp) cfs_pause(cfs_time_seconds(1)); - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); } - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } void @@ -259,7 +257,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool; unsigned long flags; - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); LASSERT (rxb->rxb_refcount == 0); LASSERT (!rxb->rxb_idle); LASSERT (!rxb->rxb_posted); @@ -268,18 +266,18 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) any.nid = PTL_NID_ANY; any.pid = PTL_PID_ANY; - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxbp->rxbp_shutdown) { rxb->rxb_idle = 1; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } rxb->rxb_refcount = 1; /* net's ref */ rxb->rxb_posted = 1; /* I'm posting */ - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); rc = PtlMEAttach(kptllnd_data.kptl_nih, *kptllnd_tunables.kptl_portal, @@ -299,7 +297,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) * Setup MD */ md.start = rxb->rxb_buffer; - md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages; + md.length = kptllnd_rx_buffer_size(); md.threshold = PTL_MD_THRESH_INF; md.options = PTL_MD_OP_PUT | PTL_MD_LUSTRE_COMPLETION_SEMANTICS | @@ -312,10 +310,10 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh); if (rc == PTL_OK) { - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */ rxb->rxb_mdh = mdh; - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); return; } @@ -325,11 +323,11 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) LASSERT(rc == PTL_OK); failed: - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; /* XXX this will just try again immediately */ kptllnd_rx_buffer_decref_locked(rxb); - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } kptl_rx_t * @@ -369,21 +367,21 @@ kptllnd_rx_done(kptl_rx_t *rx, int post_credit) if (peer != NULL) { /* Update credits (after I've decref-ed the buffer) */ - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); if (post_credit == PTLLND_POSTRX_PEER_CREDIT) peer->peer_outstanding_credits++; LASSERT (peer->peer_outstanding_credits + peer->peer_sent_credits <= - *kptllnd_tunables.kptl_peercredits); + *kptllnd_tunables.kptl_peertxcredits); CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n", libcfs_id2str(peer->peer_id), peer->peer_credits, peer->peer_outstanding_credits, peer->peer_sent_credits, rx); - spin_unlock_irqrestore(&peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); /* I might have to send back credits */ kptllnd_peer_check_sends(peer); @@ -410,16 +408,16 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) #endif CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n", - kptllnd_ptlid2str(ev->initiator), - kptllnd_evtype2str(ev->type), ev->type, rxb, + kptllnd_ptlid2str(ev->initiator), + kptllnd_evtype2str(ev->type), ev->type, rxb, kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type, unlinked); LASSERT (!rxb->rxb_idle); LASSERT (ev->md.start == rxb->rxb_buffer); - LASSERT (ev->offset + ev->mlength <= + LASSERT (ev->offset + ev->mlength <= PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages); - LASSERT (ev->type == PTL_EVENT_PUT_END || + LASSERT (ev->type == PTL_EVENT_PUT_END || ev->type == PTL_EVENT_UNLINK); LASSERT (ev->type == PTL_EVENT_UNLINK || ev->match_bits == LNET_MSG_MATCHBITS); @@ -430,7 +428,6 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) kptllnd_evtype2str(ev->type), ev->type, rxb, kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type, unlinked); - } else if (ev->type == PTL_EVENT_PUT_END && !rxbp->rxbp_shutdown) { @@ -463,7 +460,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) /* Portals can't force alignment - copy into * rx_space (avoiding overflow) to fix */ int maxlen = *kptllnd_tunables.kptl_max_msg_size; - + rx->rx_rxb = NULL; rx->rx_nob = MIN(maxlen, ev->mlength); rx->rx_msg = (kptl_msg_t *)rx->rx_space; @@ -477,35 +474,32 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) rx->rx_initiator = ev->initiator; rx->rx_treceived = jiffies; -#ifdef CRAY_XT3 - rx->rx_uid = ev->uid; -#endif /* Queue for attention */ - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, - flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, + flags); - list_add_tail(&rx->rx_list, - &kptllnd_data.kptl_sched_rxq); - wake_up(&kptllnd_data.kptl_sched_waitq); + cfs_list_add_tail(&rx->rx_list, + &kptllnd_data.kptl_sched_rxq); + cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq); - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, - flags); + cfs_spin_unlock_irqrestore(&kptllnd_data. \ + kptl_sched_lock, flags); } } if (unlinked) { - spin_lock_irqsave(&rxbp->rxbp_lock, flags); + cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; rxb->rxb_mdh = PTL_INVALID_HANDLE; kptllnd_rx_buffer_decref_locked(rxb); - spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); + cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); } } void -kptllnd_nak (kptl_rx_t *rx) +kptllnd_nak (ptl_process_id_t dest) { /* Fire-and-forget a stub message that will let the peer know my * protocol magic/version and make her drop/refresh any peer state she @@ -523,33 +517,56 @@ kptllnd_nak (kptl_rx_t *rx) rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh); if (rc != PTL_OK) { CWARN("Can't NAK %s: bind failed %s(%d)\n", - kptllnd_ptlid2str(rx->rx_initiator), - kptllnd_errtype2str(rc), rc); + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); return; } - rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator, + rc = PtlPut(mdh, PTL_NOACK_REQ, dest, *kptllnd_tunables.kptl_portal, 0, LNET_MSG_MATCHBITS, 0, 0); - - if (rc != PTL_OK) + if (rc != PTL_OK) { CWARN("Can't NAK %s: put failed %s(%d)\n", - kptllnd_ptlid2str(rx->rx_initiator), - kptllnd_errtype2str(rc), rc); + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); + } +} + +kptl_net_t * +kptllnd_find_net (lnet_nid_t nid) +{ + kptl_net_t *net; + + cfs_read_lock(&kptllnd_data.kptl_net_rw_lock); + cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) { + LASSERT (!net->net_shutdown); + + if (net->net_ni->ni_nid == nid) { + kptllnd_net_addref(net); + cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock); + return net; + } + } + cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock); + + return NULL; } void kptllnd_rx_parse(kptl_rx_t *rx) { kptl_msg_t *msg = rx->rx_msg; + int rc = 0; int post_credit = PTLLND_POSTRX_PEER_CREDIT; + kptl_net_t *net = NULL; kptl_peer_t *peer; - int rc; + cfs_list_t txs; unsigned long flags; lnet_process_id_t srcid; + LASSERT (!cfs_in_interrupt()); LASSERT (rx->rx_peer == NULL); + CFS_INIT_LIST_HEAD(&txs); + if ((rx->rx_nob >= 4 && (msg->ptlm_magic == LNET_PROTO_MAGIC || msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) || @@ -565,7 +582,8 @@ kptllnd_rx_parse(kptl_rx_t *rx) (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ? msg->ptlm_version : __swab16(msg->ptlm_version)), PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator)); - kptllnd_nak(rx); + /* NB backward compatibility */ + kptllnd_nak(rx->rx_initiator); goto rx_done; } @@ -585,8 +603,8 @@ kptllnd_rx_parse(kptl_rx_t *rx) jiffies - rx->rx_treceived, cfs_duration_sec(jiffies - rx->rx_treceived)); - if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) { - CERROR("Bad source id %s from %s\n", + if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) { + CERROR("Bad source nid %s from %s\n", libcfs_id2str(srcid), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; @@ -597,73 +615,106 @@ kptllnd_rx_parse(kptl_rx_t *rx) if (peer == NULL) goto rx_done; - CWARN("NAK from %s (%s)\n", - libcfs_id2str(srcid), + CWARN("NAK from %s (%d:%s)\n", + libcfs_id2str(srcid), peer->peer_state, kptllnd_ptlid2str(rx->rx_initiator)); + /* NB can't nuke new peer - bug 17546 comment 31 */ + if (peer->peer_state == PEER_STATE_WAITING_HELLO) { + CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n", + libcfs_id2str(srcid), + kptllnd_ptlid2str(rx->rx_initiator)); + kptllnd_peer_decref(peer); + goto rx_done; + } + rc = -EPROTO; goto failed; } - if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid || - msg->ptlm_dstpid != the_lnet.ln_pid) { - CERROR("Bad dstid %s (expected %s) from %s\n", + net = kptllnd_find_net(msg->ptlm_dstnid); + if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) { + CERROR("Bad dstid %s from %s\n", libcfs_id2str((lnet_process_id_t) { .nid = msg->ptlm_dstnid, .pid = msg->ptlm_dstpid}), - libcfs_id2str((lnet_process_id_t) { - .nid = kptllnd_data.kptl_ni->ni_nid, - .pid = the_lnet.ln_pid}), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } + if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) { + lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid), + LNET_NIDADDR(srcid.nid)); + CERROR("Bad source nid %s from %s, %s expected.\n", + libcfs_id2str(srcid), + kptllnd_ptlid2str(rx->rx_initiator), + libcfs_nid2str(nid)); + goto rx_done; + } + if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) { - peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg); + peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg); if (peer == NULL) goto rx_done; } else { peer = kptllnd_id2peer(srcid); if (peer == NULL) { - CWARN("NAK %s: no connection; peer must reconnect\n", + CWARN("NAK %s: no connection, %s must reconnect\n", + kptllnd_msgtype2str(msg->ptlm_type), libcfs_id2str(srcid)); /* NAK to make the peer reconnect */ - kptllnd_nak(rx); + kptllnd_nak(rx->rx_initiator); goto rx_done; } - /* Ignore anything apart from HELLO while I'm waiting for it and - * any messages for a previous incarnation of the connection */ - if (peer->peer_state == PEER_STATE_WAITING_HELLO || - msg->ptlm_dststamp < peer->peer_myincarnation) { + /* Ignore any messages for a previous incarnation of me */ + if (msg->ptlm_dststamp < peer->peer_myincarnation) { kptllnd_peer_decref(peer); goto rx_done; } - if (msg->ptlm_srcstamp != peer->peer_incarnation) { - CERROR("%s: Unexpected srcstamp "LPX64" " + if (msg->ptlm_dststamp != peer->peer_myincarnation) { + CERROR("%s: Unexpected dststamp "LPX64" " "("LPX64" expected)\n", - libcfs_id2str(peer->peer_id), - msg->ptlm_srcstamp, - peer->peer_incarnation); + libcfs_id2str(peer->peer_id), msg->ptlm_dststamp, + peer->peer_myincarnation); rc = -EPROTO; goto failed; } - if (msg->ptlm_dststamp != peer->peer_myincarnation) { - CERROR("%s: Unexpected dststamp "LPX64" " + if (peer->peer_state == PEER_STATE_WAITING_HELLO) { + /* recoverable error - restart txs */ + cfs_spin_lock_irqsave(&peer->peer_lock, flags); + kptllnd_cancel_txlist(&peer->peer_sendq, &txs); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); + + CWARN("NAK %s: Unexpected %s message\n", + libcfs_id2str(srcid), + kptllnd_msgtype2str(msg->ptlm_type)); + kptllnd_nak(rx->rx_initiator); + rc = -EPROTO; + goto failed; + } + + if (msg->ptlm_srcstamp != peer->peer_incarnation) { + CERROR("%s: Unexpected srcstamp "LPX64" " "("LPX64" expected)\n", - libcfs_id2str(peer->peer_id), msg->ptlm_dststamp, - peer->peer_myincarnation); + libcfs_id2str(srcid), + msg->ptlm_srcstamp, + peer->peer_incarnation); rc = -EPROTO; goto failed; } } - LASSERT (msg->ptlm_srcnid == peer->peer_id.nid && - msg->ptlm_srcpid == peer->peer_id.pid); + LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) == + LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n", + libcfs_nid2str(msg->ptlm_srcnid), + libcfs_nid2str(peer->peer_id.nid)); + LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n", + msg->ptlm_srcpid, peer->peer_id.pid); - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); /* Check peer only sends when I've sent her credits */ if (peer->peer_sent_credits == 0) { @@ -671,10 +722,11 @@ kptllnd_rx_parse(kptl_rx_t *rx) int oc = peer->peer_outstanding_credits; int sc = peer->peer_sent_credits; - spin_unlock_irqrestore(&peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); CERROR("%s: buffer overrun [%d/%d+%d]\n", libcfs_id2str(peer->peer_id), c, sc, oc); + rc = -EPROTO; goto failed; } peer->peer_sent_credits--; @@ -689,7 +741,7 @@ kptllnd_rx_parse(kptl_rx_t *rx) post_credit = PTLLND_POSTRX_NO_CREDIT; } - spin_unlock_irqrestore(&peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); /* See if something can go out now that credits have come in */ if (msg->ptlm_credits != 0) @@ -715,12 +767,14 @@ kptllnd_rx_parse(kptl_rx_t *rx) case PTLLND_MSG_TYPE_IMMEDIATE: CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n"); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, rx, 0); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; case PTLLND_MSG_TYPE_PUT: @@ -734,28 +788,37 @@ kptllnd_rx_parse(kptl_rx_t *rx) PTL_RESERVED_MATCHBITS); /* Update last match bits seen */ - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); if (msg->ptlm_u.rdma.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen) rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.rdma.kptlrm_matchbits; - spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.rdma.kptlrm_hdr, msg->ptlm_srcnid, rx, 1); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; - } + } failed: + LASSERT (rc != 0); kptllnd_peer_close(peer, rc); if (rx->rx_peer == NULL) /* drop ref on peer */ kptllnd_peer_decref(peer); /* unless rx_done will */ + if (!cfs_list_empty(&txs)) { + LASSERT (net != NULL); + kptllnd_restart_txs(net, srcid, &txs); + } rx_done: + if (net != NULL) + kptllnd_net_decref(net); kptllnd_rx_done(rx, post_credit); }