X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fklnds%2Fptllnd%2Fptllnd_peer.c;h=fc52df75e6db22a0f0ae893f54a279e2be153394;hb=c435b48fae2db343df70e948418abff277ffb998;hp=f8999e0b14f2e816de929f71b02223ec3b0b160f;hpb=5d88d521ad1abc2d94ac6a9c6a9b2e023335b757;p=fs%2Flustre-release.git diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index f8999e0..fc52df7 100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -169,11 +169,15 @@ kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid) peer->peer_credits = 1; /* enough for HELLO */ peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS; peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1; + peer->peer_sent_credits = 1; /* HELLO credit is implicit */ + peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */ atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */ write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + peer->peer_myincarnation = kptllnd_data.kptl_incarnation; + /* Only increase # peers under lock, to guarantee we dont grow it * during shutdown */ if (kptllnd_data.kptl_shutdown) { @@ -216,51 +220,34 @@ kptllnd_peer_destroy (kptl_peer_t *peer) } void -kptllnd_peer_cancel_txs(kptl_peer_t *peer) +kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs) { - struct list_head sendq; - struct list_head activeq; struct list_head *tmp; struct list_head *nxt; kptl_tx_t *tx; - unsigned long flags; - - /* atomically grab all the peer's tx-es... */ - spin_lock_irqsave(&peer->peer_lock, flags); - - list_add(&sendq, &peer->peer_sendq); - list_del_init(&peer->peer_sendq); - list_for_each (tmp, &sendq) { + list_for_each_safe (tmp, nxt, peerq) { tx = list_entry(tmp, kptl_tx_t, tx_list); - tx->tx_active = 0; - } - list_add(&activeq, &peer->peer_activeq); - list_del_init(&peer->peer_activeq); - list_for_each (tmp, &activeq) { - tx = list_entry(tmp, kptl_tx_t, tx_list); + list_del(&tx->tx_list); + list_add_tail(&tx->tx_list, txs); + + tx->tx_status = -EIO; tx->tx_active = 0; } +} - spin_unlock_irqrestore(&peer->peer_lock, flags); - - /* ...then drop the peer's ref on them at leasure. This will get - * kptllnd_tx_fini() to abort outstanding comms if necessary. */ +void +kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs) +{ + unsigned long flags; - list_for_each_safe (tmp, nxt, &sendq) { - tx = list_entry(tmp, kptl_tx_t, tx_list); - list_del(&tx->tx_list); - tx->tx_status = -EIO; - kptllnd_tx_decref(tx); - } + spin_lock_irqsave(&peer->peer_lock, flags); - list_for_each_safe (tmp, nxt, &activeq) { - tx = list_entry(tmp, kptl_tx_t, tx_list); - list_del(&tx->tx_list); - tx->tx_status = -EIO; - kptllnd_tx_decref(tx); - } + kptllnd_cancel_txlist(&peer->peer_sendq, txs); + kptllnd_cancel_txlist(&peer->peer_activeq, txs); + + spin_unlock_irqrestore(&peer->peer_lock, flags); } void @@ -300,25 +287,42 @@ void kptllnd_handle_closing_peers () { unsigned long flags; + struct list_head txs; kptl_peer_t *peer; struct list_head *tmp; struct list_head *nxt; + kptl_tx_t *tx; int idle; /* Check with a read lock first to avoid blocking anyone */ read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); - idle = list_empty(&kptllnd_data.kptl_closing_peers); + idle = list_empty(&kptllnd_data.kptl_closing_peers) && + list_empty(&kptllnd_data.kptl_zombie_peers); read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); if (idle) return; - /* Scan the closing peers and cancel their txs. - * NB only safe while there is only a single watchdog */ + INIT_LIST_HEAD(&txs); write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + /* Cancel txs on all zombie peers. NB anyone dropping the last peer + * ref removes it from this list, so I musn't drop the lock while + * scanning it. */ + list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) { + peer = list_entry (tmp, kptl_peer_t, peer_list); + + LASSERT (peer->peer_state == PEER_STATE_ZOMBIE); + + kptllnd_peer_cancel_txs(peer, &txs); + } + + /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB + * I'm the only one removing from this list, but peers can be added on + * the end any time I drop the lock. */ + list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) { peer = list_entry (tmp, kptl_peer_t, peer_list); @@ -332,13 +336,22 @@ kptllnd_handle_closing_peers () write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); kptllnd_peer_notify(peer); - kptllnd_peer_cancel_txs(peer); + kptllnd_peer_cancel_txs(peer, &txs); kptllnd_peer_decref(peer); write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); } write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); + + /* Drop peer's ref on all cancelled txs. This will get + * kptllnd_tx_fini() to abort outstanding comms if necessary. */ + + list_for_each_safe (tmp, nxt, &txs) { + tx = list_entry(tmp, kptl_tx_t, tx_list); + list_del(&tx->tx_list); + kptllnd_tx_decref(tx); + } } void @@ -350,6 +363,11 @@ kptllnd_peer_close_locked(kptl_peer_t *peer, int why) case PEER_STATE_WAITING_HELLO: case PEER_STATE_ACTIVE: + /* Ensure new peers see a new incarnation of me */ + LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation); + if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation) + kptllnd_data.kptl_incarnation++; + /* Removing from peer table */ kptllnd_data.kptl_n_active_peers--; LASSERT (kptllnd_data.kptl_n_active_peers >= 0); @@ -358,6 +376,7 @@ kptllnd_peer_close_locked(kptl_peer_t *peer, int why) kptllnd_peer_unreserve_buffers(); peer->peer_error = why; /* stash 'why' only on first close */ + peer->peer_state = PEER_STATE_CLOSING; /* Schedule for immediate attention, taking peer table's ref */ list_add_tail(&peer->peer_list, @@ -366,18 +385,9 @@ kptllnd_peer_close_locked(kptl_peer_t *peer, int why) break; case PEER_STATE_ZOMBIE: - /* Schedule for attention at next timeout */ - kptllnd_peer_addref(peer); - list_del(&peer->peer_list); - list_add_tail(&peer->peer_list, - &kptllnd_data.kptl_closing_peers); - break; - case PEER_STATE_CLOSING: break; } - - peer->peer_state = PEER_STATE_CLOSING; } void @@ -452,12 +462,10 @@ again: } void -kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx) +kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag) { /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */ - ptl_handle_md_t rdma_mdh = PTL_INVALID_HANDLE; - ptl_handle_md_t msg_mdh = PTL_INVALID_HANDLE; - ptl_handle_me_t meh; + ptl_handle_md_t msg_mdh; ptl_md_t md; ptl_err_t prc; unsigned long flags; @@ -472,70 +480,41 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx) kptllnd_set_tx_peer(tx, peer); - if (tx->tx_type == TX_TYPE_PUT_REQUEST || - tx->tx_type == TX_TYPE_GET_REQUEST) { - - spin_lock_irqsave(&peer->peer_lock, flags); - - /* Assume 64-bit matchbits can't wrap */ - LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS); - tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits = - peer->peer_next_matchbits++; - - spin_unlock_irqrestore(&peer->peer_lock, flags); - - prc = PtlMEAttach(kptllnd_data.kptl_nih, - *kptllnd_tunables.kptl_portal, - peer->peer_ptlid, - tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits, - 0, /* ignore bits */ - PTL_UNLINK, - PTL_INS_BEFORE, - &meh); - if (prc != PTL_OK) { - CERROR("PtlMEAttach(%s) failed: %d\n", - libcfs_id2str(peer->peer_id), prc); - goto failed; - } - - prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh); - if (prc != PTL_OK) { - CERROR("PtlMDAttach(%s) failed: %d\n", - libcfs_id2str(tx->tx_peer->peer_id), prc); - prc = PtlMEUnlink(meh); - LASSERT(prc == PTL_OK); - rdma_mdh = PTL_INVALID_HANDLE; - goto failed; - } - - /* I'm not racing with the event callback here. It's a bug if - * there's an event on the MD I just attached before I actually - * send the RDMA request message which the event callback - * catches by asserting 'rdma_mdh' is valid. */ - } - memset(&md, 0, sizeof(md)); - - md.start = tx->tx_msg; - md.length = tx->tx_msg->ptlm_nob; - md.threshold = 1; + + md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */ md.options = PTL_MD_OP_PUT | PTL_MD_LUSTRE_COMPLETION_SEMANTICS | PTL_MD_EVENT_START_DISABLE; md.user_ptr = &tx->tx_msg_eventarg; md.eq_handle = kptllnd_data.kptl_eqh; + if (nfrag == 0) { + md.start = tx->tx_msg; + md.length = tx->tx_msg->ptlm_nob; + } else { + LASSERT (nfrag > 1); + LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg); + + md.start = tx->tx_frags; + md.length = nfrag; + md.options |= PTL_MD_IOVEC; + } + prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh); if (prc != PTL_OK) { - msg_mdh = PTL_INVALID_HANDLE; - goto failed; + CERROR("PtlMDBind(%s) failed: %s(%d)\n", + libcfs_id2str(peer->peer_id), + kptllnd_errtype2str(prc), prc); + tx->tx_status = -EIO; + kptllnd_tx_decref(tx); + return; } spin_lock_irqsave(&peer->peer_lock, flags); tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ); tx->tx_active = 1; - tx->tx_rdma_mdh = rdma_mdh; tx->tx_msg_mdh = msg_mdh; /* Ensure HELLO is sent first */ @@ -545,24 +524,12 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx) list_add_tail(&tx->tx_list, &peer->peer_sendq); spin_unlock_irqrestore(&peer->peer_lock, flags); - return; - - failed: - spin_lock_irqsave(&peer->peer_lock, flags); - - tx->tx_status = -EIO; - tx->tx_rdma_mdh = rdma_mdh; - tx->tx_msg_mdh = msg_mdh; - - spin_unlock_irqrestore(&peer->peer_lock, flags); - - kptllnd_tx_decref(tx); } void kptllnd_peer_check_sends (kptl_peer_t *peer) { - + ptl_handle_me_t meh; kptl_tx_t *tx; int rc; unsigned long flags; @@ -571,6 +538,8 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) spin_lock_irqsave(&peer->peer_lock, flags); + peer->peer_retry_noop = 0; + if (list_empty(&peer->peer_sendq) && peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER && peer->peer_credits != 0) { @@ -584,10 +553,11 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) libcfs_id2str(peer->peer_id)); } else { kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0); - kptllnd_post_tx(peer, tx); + kptllnd_post_tx(peer, tx, 0); } spin_lock_irqsave(&peer->peer_lock, flags); + peer->peer_retry_noop = (tx == NULL); } while (!list_empty(&peer->peer_sendq)) { @@ -595,15 +565,14 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) LASSERT (tx->tx_active); LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE)); - LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE || - !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE)); + LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE)); LASSERT (peer->peer_outstanding_credits >= 0); - LASSERT (peer->peer_outstanding_credits <= + LASSERT (peer->peer_sent_credits >= 0); + LASSERT (peer->peer_sent_credits + + peer->peer_outstanding_credits <= *kptllnd_tunables.kptl_peercredits); LASSERT (peer->peer_credits >= 0); - LASSERT (peer->peer_credits <= - *kptllnd_tunables.kptl_peercredits); /* Ensure HELLO is sent first */ if (!peer->peer_sent_hello) { @@ -613,9 +582,11 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) } if (peer->peer_credits == 0) { - CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits, tx); + CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n", + libcfs_id2str(peer->peer_id), + peer->peer_credits, + peer->peer_outstanding_credits, + peer->peer_sent_credits, tx); break; } @@ -623,9 +594,12 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) * return */ if (peer->peer_credits == 1 && peer->peer_outstanding_credits == 0) { - CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits, tx); + CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: " + "not using last credit for %p\n", + libcfs_id2str(peer->peer_id), + peer->peer_credits, + peer->peer_outstanding_credits, + peer->peer_sent_credits, tx); break; } @@ -649,15 +623,26 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) continue; } - /* fill last-minute msg header fields */ + /* fill last-minute msg fields */ kptllnd_msg_pack(tx->tx_msg, peer); + if (tx->tx_type == TX_TYPE_PUT_REQUEST || + tx->tx_type == TX_TYPE_GET_REQUEST) { + /* peer_next_matchbits must be known good */ + LASSERT (peer->peer_state >= PEER_STATE_ACTIVE); + /* Assume 64-bit matchbits can't wrap */ + LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS); + tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits = + peer->peer_next_matchbits++; + } + + peer->peer_sent_credits += peer->peer_outstanding_credits; peer->peer_outstanding_credits = 0; peer->peer_credits--; - CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits, + CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n", + libcfs_id2str(peer->peer_id), peer->peer_credits, + peer->peer_outstanding_credits, peer->peer_sent_credits, kptllnd_msgtype2str(tx->tx_msg->ptlm_type), tx, tx->tx_msg->ptlm_nob, tx->tx_msg->ptlm_credits); @@ -668,8 +653,45 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) spin_unlock_irqrestore(&peer->peer_lock, flags); + if (tx->tx_type == TX_TYPE_PUT_REQUEST || + tx->tx_type == TX_TYPE_GET_REQUEST) { + /* Post bulk now we have safe matchbits */ + rc = PtlMEAttach(kptllnd_data.kptl_nih, + *kptllnd_tunables.kptl_portal, + peer->peer_ptlid, + tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits, + 0, /* ignore bits */ + PTL_UNLINK, + PTL_INS_BEFORE, + &meh); + if (rc != PTL_OK) { + CERROR("PtlMEAttach(%s) failed: %s(%d)\n", + libcfs_id2str(peer->peer_id), + kptllnd_errtype2str(rc), rc); + goto failed; + } + + rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, + &tx->tx_rdma_mdh); + if (rc != PTL_OK) { + CERROR("PtlMDAttach(%s) failed: %s(%d)\n", + libcfs_id2str(tx->tx_peer->peer_id), + kptllnd_errtype2str(rc), rc); + rc = PtlMEUnlink(meh); + LASSERT(rc == PTL_OK); + tx->tx_rdma_mdh = PTL_INVALID_HANDLE; + goto failed; + } + /* I'm not racing with the event callback here. It's a + * bug if there's an event on the MD I just attached + * before I actually send the RDMA request message - + * probably matchbits re-used in error. */ + } + + tx->tx_tposted = jiffies; /* going on the wire */ + rc = PtlPut (tx->tx_msg_mdh, - PTL_NOACK_REQ, + tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ, peer->peer_ptlid, *kptllnd_tunables.kptl_portal, 0, /* acl cookie */ @@ -677,12 +699,10 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) 0, /* offset */ 0); /* header data */ if (rc != PTL_OK) { - CERROR("PtlPut %s error %d\n", - libcfs_id2str(peer->peer_id), rc); - - /* Nuke everything (including this tx) */ - kptllnd_peer_close(peer, -EIO); - return; + CERROR("PtlPut %s error %s(%d)\n", + libcfs_id2str(peer->peer_id), + kptllnd_errtype2str(rc), rc); + goto failed; } kptllnd_tx_decref(tx); /* drop my ref */ @@ -691,6 +711,12 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) } spin_unlock_irqrestore(&peer->peer_lock, flags); + return; + + failed: + /* Nuke everything (including tx we were trying) */ + kptllnd_peer_close(peer, -EIO); + kptllnd_tx_decref(tx); } kptl_tx_t * @@ -698,16 +724,12 @@ kptllnd_find_timed_out_tx(kptl_peer_t *peer) { kptl_tx_t *tx; struct list_head *tmp; - unsigned long flags; - - spin_lock_irqsave(&peer->peer_lock, flags); list_for_each(tmp, &peer->peer_sendq) { tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list); if (time_after_eq(jiffies, tx->tx_deadline)) { kptllnd_tx_addref(tx); - spin_unlock_irqrestore(&peer->peer_lock, flags); return tx; } } @@ -717,18 +739,16 @@ kptllnd_find_timed_out_tx(kptl_peer_t *peer) if (time_after_eq(jiffies, tx->tx_deadline)) { kptllnd_tx_addref(tx); - spin_unlock_irqrestore(&peer->peer_lock, flags); return tx; } } - spin_unlock_irqrestore(&peer->peer_lock, flags); return NULL; } void -kptllnd_peer_check_bucket (int idx) +kptllnd_peer_check_bucket (int idx, int stamp) { struct list_head *peers = &kptllnd_data.kptl_peers[idx]; struct list_head *ptmp; @@ -737,8 +757,9 @@ kptllnd_peer_check_bucket (int idx) unsigned long flags; int nsend; int nactive; + int check_sends; - CDEBUG(D_NET, "Bucket=%d\n", idx); + CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp); again: /* NB. Shared lock while I just look */ @@ -747,43 +768,64 @@ kptllnd_peer_check_bucket (int idx) list_for_each (ptmp, peers) { peer = list_entry (ptmp, kptl_peer_t, peer_list); - CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits); + CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n", + libcfs_id2str(peer->peer_id), peer->peer_credits, + peer->peer_outstanding_credits, peer->peer_sent_credits); + + spin_lock(&peer->peer_lock); - /* In case we have enough credits to return via a - * NOOP, but there were no non-blocking tx descs - * free to do it last time... */ - kptllnd_peer_check_sends(peer); + if (peer->peer_check_stamp == stamp) { + /* checked already this pass */ + spin_unlock(&peer->peer_lock); + continue; + } + peer->peer_check_stamp = stamp; tx = kptllnd_find_timed_out_tx(peer); - if (tx == NULL) + check_sends = peer->peer_retry_noop; + + spin_unlock(&peer->peer_lock); + + if (tx == NULL && !check_sends) continue; kptllnd_peer_addref(peer); /* 1 ref for me... */ - read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); + read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); + + if (tx == NULL) { /* nothing timed out */ + kptllnd_peer_check_sends(peer); + kptllnd_peer_decref(peer); /* ...until here or... */ + + /* rescan after dropping the lock */ + goto again; + } spin_lock_irqsave(&peer->peer_lock, flags); nsend = kptllnd_count_queue(&peer->peer_sendq); nactive = kptllnd_count_queue(&peer->peer_activeq); spin_unlock_irqrestore(&peer->peer_lock, flags); - LCONSOLE_ERROR("Timing out %s: please check Portals\n", - libcfs_id2str(peer->peer_id)); - - CERROR("%s timed out: cred %d outstanding %d sendq %d " - "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n", - libcfs_id2str(peer->peer_id), - peer->peer_credits, peer->peer_outstanding_credits, - nsend, nactive, kptllnd_tx_typestr(tx->tx_type), + LCONSOLE_ERROR("Timing out %s: %s\n", + libcfs_id2str(peer->peer_id), + (tx->tx_tposted == 0) ? + "no free peer buffers" : "please check Portals"); + + CERROR("%s timed out: cred %d outstanding %d, sent %d, " + "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d " + "%sposted %lu T/O %ds\n", + libcfs_id2str(peer->peer_id), peer->peer_credits, + peer->peer_outstanding_credits, peer->peer_sent_credits, + nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type), tx->tx_active ? "A" : "", PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ? "" : "M", PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ? "" : "D", - tx->tx_status, *kptllnd_tunables.kptl_timeout); + tx->tx_status, + (tx->tx_tposted == 0) ? "not " : "", + (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted), + *kptllnd_tunables.kptl_timeout); kptllnd_dump_ptltrace(); @@ -919,22 +961,17 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, return NULL; } - if (msg->ptlm_u.hello.kptlhm_max_msg_size != - *kptllnd_tunables.kptl_max_msg_size) { - CERROR("max message size MUST be equal for all peers: " - "got %d expected %d from %s\n", + if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) { + CERROR("%s: max message size %d < MIN %d", + libcfs_id2str(lpid), msg->ptlm_u.hello.kptlhm_max_msg_size, - *kptllnd_tunables.kptl_max_msg_size, - libcfs_id2str(lpid)); + PTLLND_MIN_BUFFER_SIZE); return NULL; } - if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) { - CERROR("peercredits MUST be equal on all peers: " - "got %d expected %d from %s\n", - msg->ptlm_credits + 1, - *kptllnd_tunables.kptl_peercredits, - libcfs_id2str(lpid)); + if (msg->ptlm_credits <= 1) { + CERROR("Need more than 1+%d credits from %s\n", + msg->ptlm_credits, libcfs_id2str(lpid)); return NULL; } @@ -946,15 +983,44 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, /* Completing HELLO handshake */ LASSERT(peer->peer_incarnation == 0); + if (msg->ptlm_dststamp != 0 && + msg->ptlm_dststamp != peer->peer_myincarnation) { + write_unlock_irqrestore(g_lock, flags); + + CERROR("Ignoring HELLO from %s: unexpected " + "dststamp "LPX64" ("LPX64" wanted)\n", + libcfs_id2str(lpid), + msg->ptlm_dststamp, + peer->peer_myincarnation); + kptllnd_peer_decref(peer); + return NULL; + } + + /* Concurrent initiation or response to my HELLO */ peer->peer_state = PEER_STATE_ACTIVE; peer->peer_incarnation = msg->ptlm_srcstamp; peer->peer_next_matchbits = safe_matchbits; - + peer->peer_max_msg_size = + msg->ptlm_u.hello.kptlhm_max_msg_size; + write_unlock_irqrestore(g_lock, flags); return peer; } - /* remove old incarnation of this peer */ + if (msg->ptlm_dststamp != 0 && + msg->ptlm_dststamp <= peer->peer_myincarnation) { + write_unlock_irqrestore(g_lock, flags); + + CERROR("Ignoring stale HELLO from %s: " + "dststamp "LPX64" (current "LPX64")\n", + libcfs_id2str(lpid), + msg->ptlm_dststamp, + peer->peer_myincarnation); + kptllnd_peer_decref(peer); + return NULL; + } + + /* Brand new connection attempt: remove old incarnation */ kptllnd_peer_close_locked(peer, 0); } @@ -998,20 +1064,28 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, } write_lock_irqsave(g_lock, flags); - + again: peer = kptllnd_id2peer_locked(lpid); if (peer != NULL) { if (peer->peer_state == PEER_STATE_WAITING_HELLO) { - /* An outgoing message instantiated 'peer' for me and - * presumably provoked this reply */ - CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid)); + /* An outgoing message instantiated 'peer' for me */ LASSERT(peer->peer_incarnation == 0); peer->peer_state = PEER_STATE_ACTIVE; peer->peer_incarnation = msg->ptlm_srcstamp; peer->peer_next_matchbits = safe_matchbits; + peer->peer_max_msg_size = + msg->ptlm_u.hello.kptlhm_max_msg_size; + + write_unlock_irqrestore(g_lock, flags); + + CWARN("Outgoing instantiated peer %s\n", + libcfs_id2str(lpid)); } else { LASSERT (peer->peer_state == PEER_STATE_ACTIVE); + + write_unlock_irqrestore(g_lock, flags); + /* WOW! Somehow this peer completed the HELLO * handshake while I slept. I guess I could have slept * while it rebooted and sent a new HELLO, so I'll fail @@ -1020,8 +1094,6 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, kptllnd_peer_decref(peer); peer = NULL; } - - write_unlock_irqrestore(g_lock, flags); kptllnd_peer_unreserve_buffers(); kptllnd_peer_decref(new_peer); @@ -1048,6 +1120,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, write_lock_irqsave(g_lock, flags); kptllnd_data.kptl_expected_peers++; + goto again; } last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid); @@ -1060,6 +1133,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, new_peer->peer_incarnation = msg->ptlm_srcstamp; new_peer->peer_next_matchbits = safe_matchbits; new_peer->peer_last_matchbits_seen = last_matchbits_seen; + new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size; kptllnd_peer_add_peertable_locked(new_peer); @@ -1071,41 +1145,42 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, CDEBUG(D_NETTRACE, "%s: post response hello %p\n", libcfs_id2str(new_peer->peer_id), hello_tx); - kptllnd_post_tx(new_peer, hello_tx); + kptllnd_post_tx(new_peer, hello_tx, 0); kptllnd_peer_check_sends(new_peer); return new_peer; } void -kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target) +kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag) +{ + kptllnd_post_tx(peer, tx, nfrag); + kptllnd_peer_check_sends(peer); +} + +int +kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target) { rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock; ptl_process_id_t ptl_id; - kptl_peer_t *peer; - kptl_peer_t *new_peer = NULL; - kptl_tx_t *hello_tx = NULL; + kptl_peer_t *new_peer; + kptl_tx_t *hello_tx; unsigned long flags; int rc; __u64 last_matchbits_seen; - LASSERT (tx->tx_lnet_msg != NULL); - LASSERT (tx->tx_peer == NULL); - /* I expect to find the peer, so I only take a read lock... */ read_lock_irqsave(g_lock, flags); - peer = kptllnd_id2peer_locked(target); + *peerp = kptllnd_id2peer_locked(target); read_unlock_irqrestore(g_lock, flags); - if (peer != NULL) { - goto post; - } + if (*peerp != NULL) + return 0; if ((target.pid & LNET_PID_USERFLAG) != 0) { CWARN("Refusing to create a new connection to %s " "(non-kernel peer)\n", libcfs_id2str(target)); - tx->tx_status = -EHOSTUNREACH; - goto failed; + return -EHOSTUNREACH; } /* The new peer is a kernel ptllnd, and kernel ptllnds all have @@ -1113,24 +1188,11 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target) ptl_id.nid = kptllnd_lnet2ptlnid(target.nid); ptl_id.pid = kptllnd_data.kptl_portals_id.pid; - write_lock_irqsave(g_lock, flags); - - peer = kptllnd_id2peer_locked(target); - if (peer != NULL) { - write_unlock_irqrestore(g_lock, flags); - goto post; - } - - kptllnd_cull_peertable_locked(target); - - write_unlock_irqrestore(g_lock, flags); - hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE); if (hello_tx == NULL) { CERROR("Unable to allocate connect message for %s\n", libcfs_id2str(target)); - tx->tx_status = -ENOMEM; - goto failed; + return -ENOMEM; } kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO, @@ -1138,28 +1200,24 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target) new_peer = kptllnd_peer_allocate(target, ptl_id); if (new_peer == NULL) { - tx->tx_status = -ENOMEM; - goto failed; + rc = -ENOMEM; + goto unwind_0; } rc = kptllnd_peer_reserve_buffers(); - if (rc != 0) { - tx->tx_status = rc; - goto failed; - } + if (rc != 0) + goto unwind_1; write_lock_irqsave(g_lock, flags); - - peer = kptllnd_id2peer_locked(target); - if (peer != NULL) { /* someone else beat me to it */ + again: + *peerp = kptllnd_id2peer_locked(target); + if (*peerp != NULL) { write_unlock_irqrestore(g_lock, flags); - - kptllnd_peer_unreserve_buffers(); - kptllnd_peer_decref(new_peer); - kptllnd_tx_decref(hello_tx); - goto post; + goto unwind_2; } - + + kptllnd_cull_peertable_locked(target); + if (kptllnd_data.kptl_n_active_peers == kptllnd_data.kptl_expected_peers) { /* peer table full */ @@ -1171,12 +1229,12 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target) if (rc != 0) { CERROR("Can't create connection to %s\n", libcfs_id2str(target)); - kptllnd_peer_unreserve_buffers(); - tx->tx_status = -ENOMEM; - goto failed; + rc = -ENOMEM; + goto unwind_2; } write_lock_irqsave(g_lock, flags); kptllnd_data.kptl_expected_peers++; + goto again; } last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target); @@ -1198,23 +1256,18 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target) CDEBUG(D_NETTRACE, "%s: post initial hello %p\n", libcfs_id2str(new_peer->peer_id), hello_tx); - peer = new_peer; - kptllnd_post_tx(peer, hello_tx); - - post: - kptllnd_post_tx(peer, tx); - kptllnd_peer_check_sends(peer); - kptllnd_peer_decref(peer); - return; + kptllnd_post_tx(new_peer, hello_tx, 0); + kptllnd_peer_check_sends(new_peer); + + *peerp = new_peer; + return 0; - failed: - if (hello_tx != NULL) - kptllnd_tx_decref(hello_tx); - - if (new_peer != NULL) - kptllnd_peer_decref(new_peer); + unwind_2: + kptllnd_peer_unreserve_buffers(); + unwind_1: + kptllnd_peer_decref(new_peer); + unwind_0: + kptllnd_tx_decref(hello_tx); - LASSERT (tx->tx_status != 0); - kptllnd_tx_decref(tx); - + return rc; }