4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/klnds/ptllnd/ptllnd_peer.c
36 * Author: PJ Kirner <pjkirner@clusterfs.com>
37 * Author: E Barton <eeb@bartonsoftware.com>
41 #include <libcfs/list.h>
/* Return the number of entries on list 'q' (used for peer queue stats).
 * NOTE(review): callers appear to hold the lock guarding the queue
 * (peer_lock for sendq/activeq) -- confirm against full source. */
44 kptllnd_count_queue(cfs_list_t *q)
49 cfs_list_for_each(e, q) {
/* Copy out a consistent snapshot of the index'th peer in the global peer
 * hash table: identity, state, refcount, incarnation, matchbits, queue
 * depths and credit counters.  Takes the peer-table read lock to walk the
 * table, then peer_lock for the per-peer volatile fields. */
57 kptllnd_get_peer_info(int index,
58 lnet_process_id_t *id,
59 int *state, int *sent_hello,
60 int *refcount, __u64 *incarnation,
61 __u64 *next_matchbits, __u64 *last_matchbits_seen,
62 int *nsendq, int *nactiveq,
63 int *credits, int *outstanding_credits)
65 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
72 cfs_read_lock_irqsave(g_lock, flags);
/* Scan every hash bucket until the index'th peer is reached. */
74 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
75 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
76 peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
82 *state = peer->peer_state;
83 *sent_hello = peer->peer_sent_hello;
84 *refcount = cfs_atomic_read(&peer->peer_refcount);
85 *incarnation = peer->peer_incarnation;
/* Fields below change under peer_lock; hold it while sampling them. */
87 cfs_spin_lock(&peer->peer_lock);
89 *next_matchbits = peer->peer_next_matchbits;
90 *last_matchbits_seen = peer->peer_last_matchbits_seen;
91 *credits = peer->peer_credits;
92 *outstanding_credits = peer->peer_outstanding_credits;
94 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
95 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
97 cfs_spin_unlock(&peer->peer_lock);
105 cfs_read_unlock_irqrestore(g_lock, flags);
/* Insert 'peer' into the global peer hash table.  Caller must hold the
 * peer-table write lock (hence the _locked suffix).  Takes a list ref on
 * the peer and bumps the active-peer count, which must stay below the
 * reserved capacity (kptl_expected_peers). */
110 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
112 LASSERT (kptllnd_data.kptl_n_active_peers <
113 kptllnd_data.kptl_expected_peers);
115 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
116 peer->peer_state == PEER_STATE_ACTIVE);
118 kptllnd_data.kptl_n_active_peers++;
119 cfs_atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
121 /* NB add to HEAD of peer list for MRU order!
122 * (see kptllnd_cull_peertable) */
123 cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Evict the least-recently-used peers sharing pid's NID so that after the
 * caller adds a new peer there are at most max_procs_per_node peers with
 * that NID.  Relies on MRU ordering (new peers added at list head).
 * Caller must hold the peer-table write lock. */
127 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
129 /* I'm about to add a new peer with this portals ID to the peer table,
130 * so (a) this peer should not exist already and (b) I want to leave at
131 * most (max_procs_per_nid - 1) peers with this NID in the table. */
132 cfs_list_t *peers = kptllnd_nid2peerlist(pid.nid);
133 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
/* _safe iteration: kptllnd_peer_close_locked() unlinks culled peers. */
140 cfs_list_for_each_safe (tmp, nxt, peers) {
141 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
143 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
/* Bucket is shared by several NIDs; skip entries for other NIDs. */
145 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
148 LASSERT (peer->peer_id.pid != pid.pid);
152 if (count < cull_count) /* recent (don't cull) */
155 CDEBUG(D_NET, "Cull %s(%s)\n",
156 libcfs_id2str(peer->peer_id),
157 kptllnd_ptlid2str(peer->peer_ptlid));
159 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a new peer descriptor for LNet id 'lpid' /
 * portals id 'ppid' on network 'net'.  The peer starts in state ALLOCATED
 * with exactly one credit (enough to send HELLO) and one reference owned
 * by the caller.  Fails if 'net' is already shutting down.
 * NOTE(review): return-value convention (peer pointer vs NULL) is elided
 * here -- confirm against full source. */
164 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
169 LIBCFS_ALLOC(peer, sizeof (*peer));
171 CERROR("Can't create peer %s (%s)\n",
173 kptllnd_ptlid2str(ppid));
177 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
179 CFS_INIT_LIST_HEAD (&peer->peer_noops);
180 CFS_INIT_LIST_HEAD (&peer->peer_sendq);
181 CFS_INIT_LIST_HEAD (&peer->peer_activeq);
182 cfs_spin_lock_init (&peer->peer_lock);
184 peer->peer_state = PEER_STATE_ALLOCATED;
185 peer->peer_error = 0;
186 peer->peer_last_alive = 0;
187 peer->peer_id = lpid;
188 peer->peer_ptlid = ppid;
189 peer->peer_credits = 1; /* enough for HELLO */
190 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
191 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
192 peer->peer_sent_credits = 1; /* HELLO credit is implicit */
193 peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
195 cfs_atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
/* Incarnation and peer count must be sampled/updated under the write
 * lock so they stay consistent with shutdown. */
197 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
199 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
201 /* Only increase # peers under lock, to guarantee we dont grow it
203 if (net->net_shutdown) {
204 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
206 LIBCFS_FREE(peer, sizeof(*peer));
210 kptllnd_data.kptl_npeers++;
211 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown of a peer whose refcount has dropped to zero.  The peer
 * must be ALLOCATED (never tabled) or a ZOMBIE (already closed); zombies
 * are unlinked from the zombie list here.  Must not run in interrupt
 * context because it takes the peer-table write lock and frees memory. */
216 kptllnd_peer_destroy (kptl_peer_t *peer)
220 CDEBUG(D_NET, "Peer=%p\n", peer);
222 LASSERT (!cfs_in_interrupt());
223 LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
224 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
225 peer->peer_state == PEER_STATE_ZOMBIE);
/* All tx queues must have been drained/cancelled before destruction. */
226 LASSERT (cfs_list_empty(&peer->peer_noops));
227 LASSERT (cfs_list_empty(&peer->peer_sendq));
228 LASSERT (cfs_list_empty(&peer->peer_activeq));
230 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
232 if (peer->peer_state == PEER_STATE_ZOMBIE)
233 cfs_list_del(&peer->peer_list);
235 kptllnd_data.kptl_npeers--;
237 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
239 LIBCFS_FREE (peer, sizeof (*peer));
/* Move every tx on peer queue 'peerq' onto 'txs', marking each one failed
 * with -EIO.  Caller owns the locking (see kptllnd_peer_cancel_txs). */
243 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
249 cfs_list_for_each_safe (tmp, nxt, peerq) {
250 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
252 cfs_list_del(&tx->tx_list);
253 cfs_list_add_tail(&tx->tx_list, txs);
255 tx->tx_status = -EIO;
/* Cancel every queued tx on 'peer' (noop, send and active queues),
 * collecting them on 'txs' for the caller to release outside peer_lock. */
261 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
265 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
267 kptllnd_cancel_txlist(&peer->peer_noops, txs);
268 kptllnd_cancel_txlist(&peer->peer_sendq, txs);
269 kptllnd_cancel_txlist(&peer->peer_activeq, txs);
271 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Record that we just heard from 'peer'.  Written lockless on purpose:
 * concurrent writers all store the current time, so a race is harmless. */
275 kptllnd_peer_alive (kptl_peer_t *peer)
277 /* This is racy, but everyone's only writing cfs_time_current() */
278 peer->peer_last_alive = cfs_time_current();
/* If 'peer' closed with an error, tell LNet the peer is down on every
 * network instance.  peer_error is consumed (reset to 0) under peer_lock
 * so the notification fires only once per failure.  Net refs are taken
 * under the net read lock, then lnet_notify() is called outside it. */
283 kptllnd_peer_notify (kptl_peer_t *peer)
291 cfs_time_t last_alive = 0;
293 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
295 if (peer->peer_error != 0) {
296 error = peer->peer_error;
297 peer->peer_error = 0;
298 last_alive = peer->peer_last_alive;
301 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* First pass: count the nets so we can size the snapshot array. */
306 cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
307 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
309 cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
311 if (nnets == 0) /* shutdown in progress */
314 LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
316 CERROR("Failed to allocate nets[%d]\n", nnets);
319 memset(nets, 0, nnets * sizeof(*nets));
/* Second pass: snapshot the net list with a ref on each entry, so we
 * can notify without holding the net lock. */
321 cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
323 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
326 kptllnd_net_addref(net);
329 cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
331 for (i = 0; i < nnets; i++) {
338 if (!net->net_shutdown) {
339 peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
340 peer->peer_ptlid.nid);
341 lnet_notify(net->net_ni, peer_nid, 0, last_alive);
344 kptllnd_net_decref(net);
347 LIBCFS_FREE(nets, nnets * sizeof(*nets));
/* Reap closing/zombie peers: cancel their queued txs, notify LNet, and
 * move CLOSING peers onto the zombie list.  Cancelled txs are collected
 * on a local list and released only after all locks are dropped, since
 * kptllnd_tx_decref() may do real work (abort outstanding comms). */
351 kptllnd_handle_closing_peers ()
361 /* Check with a read lock first to avoid blocking anyone */
363 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
364 idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
365 cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
366 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
371 CFS_INIT_LIST_HEAD(&txs);
373 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
375 /* Cancel txs on all zombie peers. NB anyone dropping the last peer
376 * ref removes it from this list, so I musn't drop the lock while
378 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
379 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
381 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
383 kptllnd_peer_cancel_txs(peer, &txs);
386 /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
387 * I'm the only one removing from this list, but peers can be added on
388 * the end any time I drop the lock. */
390 cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
391 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
393 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
/* Promote CLOSING -> ZOMBIE while still holding the write lock. */
395 cfs_list_del(&peer->peer_list);
396 cfs_list_add_tail(&peer->peer_list,
397 &kptllnd_data.kptl_zombie_peers);
398 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the lock for the (potentially blocking) notify/cancel, then
 * re-take it to continue the scan. */
400 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
403 kptllnd_peer_notify(peer);
404 kptllnd_peer_cancel_txs(peer, &txs);
405 kptllnd_peer_decref(peer);
407 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
410 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
412 /* Drop peer's ref on all cancelled txs. This will get
413 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
415 cfs_list_for_each_safe (tmp, nxt, &txs) {
416 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
417 cfs_list_del(&tx->tx_list);
418 kptllnd_tx_decref(tx);
/* Begin closing 'peer' with error 'why'.  Caller holds the peer-table
 * write lock.  Active/handshaking peers are removed from the table,
 * marked CLOSING, and queued for the watchdog to reap (the table's list
 * ref transfers to the closing list).  Already-closing peers are a no-op
 * for state, so 'why' is only stashed on the first close. */
423 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
425 switch (peer->peer_state) {
429 case PEER_STATE_WAITING_HELLO:
430 case PEER_STATE_ACTIVE:
431 /* Ensure new peers see a new incarnation of me */
432 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
433 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
434 kptllnd_data.kptl_incarnation++;
436 /* Removing from peer table */
437 kptllnd_data.kptl_n_active_peers--;
438 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
440 cfs_list_del(&peer->peer_list);
441 kptllnd_peer_unreserve_buffers();
443 peer->peer_error = why; /* stash 'why' only on first close */
444 peer->peer_state = PEER_STATE_CLOSING;
446 /* Schedule for immediate attention, taking peer table's ref */
447 cfs_list_add_tail(&peer->peer_list,
448 &kptllnd_data.kptl_closing_peers);
449 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
452 case PEER_STATE_ZOMBIE:
453 case PEER_STATE_CLOSING:
/* Locking wrapper: close 'peer' with error 'why' under the peer-table
 * write lock. */
459 kptllnd_peer_close(kptl_peer_t *peer, int why)
463 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
464 kptllnd_peer_close_locked(peer, why);
465 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id'.  LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards; a concrete NID restricts the scan to its single hash bucket.
 * The read lock must be dropped around kptllnd_peer_close() (it takes the
 * write lock), so the bucket is rescanned after each match. */
469 kptllnd_peer_del(lnet_process_id_t id)
481 * Find the single bucket we are supposed to look at or if nid is a
482 * wildcard (LNET_NID_ANY) then look at all of the buckets
484 if (id.nid != LNET_NID_ANY) {
485 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
487 lo = hi = l - kptllnd_data.kptl_peers;
489 if (id.pid != LNET_PID_ANY)
493 hi = kptllnd_data.kptl_peer_hash_size - 1;
497 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
499 for (i = lo; i <= hi; i++) {
500 cfs_list_for_each_safe (ptmp, pnxt,
501 &kptllnd_data.kptl_peers[i]) {
502 peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
504 if (!(id.nid == LNET_NID_ANY ||
505 (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
506 (id.pid == LNET_PID_ANY ||
507 peer->peer_id.pid == id.pid))))
/* Hold the peer across the unlock/close/decref window. */
510 kptllnd_peer_addref(peer); /* 1 ref for me... */
512 cfs_read_unlock_irqrestore(&kptllnd_data. \
516 kptllnd_peer_close(peer, 0);
517 kptllnd_peer_decref(peer); /* ...until here */
519 rc = 0; /* matched something */
521 /* start again now I've dropped the lock */
526 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' for transmission to 'peer', consuming the caller's tx ref.
 * NOOPs go on their own queue; HELLO jumps to the head of the send queue
 * so the handshake always goes out first; everything else is FIFO. */
532 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
534 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
537 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
539 /* Ensure HELLO is sent first */
540 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
541 cfs_list_add(&tx->tx_list, &peer->peer_noops);
542 else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
543 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
545 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
547 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Bind an MD for tx's message buffer and queue the tx on 'peer'
 * (consuming the caller's ref).  On PtlMDBind failure the tx is failed
 * with -EIO and released.  The tx deadline is armed here, before the
 * message can reach the wire. */
552 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
554 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
555 ptl_handle_md_t msg_mdh;
559 LASSERT (!tx->tx_idle);
560 LASSERT (!tx->tx_active);
561 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
562 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
563 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
564 tx->tx_type == TX_TYPE_PUT_REQUEST ||
565 tx->tx_type == TX_TYPE_GET_REQUEST);
567 kptllnd_set_tx_peer(tx, peer);
/* Describe the outgoing message buffer to Portals. */
569 memset(&md, 0, sizeof(md));
571 md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
572 md.options = PTL_MD_OP_PUT |
573 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
574 PTL_MD_EVENT_START_DISABLE;
575 md.user_ptr = &tx->tx_msg_eventarg;
576 md.eq_handle = kptllnd_data.kptl_eqh;
/* Single-fragment messages point straight at the msg; multi-fragment
 * ones use the tx's iovec (PTL_MD_IOVEC). */
579 md.start = tx->tx_msg;
580 md.length = tx->tx_msg->ptlm_nob;
583 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
585 md.start = tx->tx_frags;
587 md.options |= PTL_MD_IOVEC;
590 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
592 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
593 libcfs_id2str(peer->peer_id),
594 kptllnd_errtype2str(prc), prc);
595 tx->tx_status = -EIO;
596 kptllnd_tx_decref(tx);
601 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
603 tx->tx_msg_mdh = msg_mdh;
604 kptllnd_queue_tx(peer, tx);
607 /* NB "restarts" comes from peer_sendq of a single peer */
/* Re-target the txs on 'restarts' at a (possibly new) peer for 'target'
 * and re-queue them, then kick the send path.  HELLOs are dropped rather
 * than re-queued.  If no target peer can be found the txs' fate is
 * handled in the elided error path -- confirm against full source. */
609 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
610 cfs_list_t *restarts)
616 LASSERT (!cfs_list_empty(restarts));
618 if (kptllnd_find_target(net, target, &peer) != 0)
621 cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
622 LASSERT (tx->tx_peer != NULL);
623 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
624 tx->tx_type == TX_TYPE_PUT_REQUEST ||
625 tx->tx_type == TX_TYPE_SMALL_MESSAGE);
627 cfs_list_del_init(&tx->tx_list);
/* HELLOs belong to the old connection attempt: discard. */
630 tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
631 kptllnd_tx_decref(tx);
635 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
/* Swap the tx's peer ref from the old peer to the new one. */
638 kptllnd_peer_decref(tx->tx_peer);
640 kptllnd_set_tx_peer(tx, peer);
641 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
647 kptllnd_peer_check_sends(peer);
648 kptllnd_peer_decref(peer);
/* Decide whether a NOOP should be sent to return credits to 'peer':
 * only after HELLO, when we have send credit, no NOOP already queued,
 * and the peer is owed at least PTLLND_CREDIT_HIGHWATER credits.  Even
 * then, only if there is no tx to piggyback the credits on, or we are
 * down to the single reserved credit. */
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
654 if (!peer->peer_sent_hello ||
655 peer->peer_credits == 0 ||
656 !cfs_list_empty(&peer->peer_noops) ||
657 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
660 /* No tx to piggyback NOOP onto or no credit to send a tx */
661 return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
/* The peer send engine: drain peer_noops/peer_sendq subject to the
 * credit protocol, posting each message via PtlPut and moving it to
 * peer_activeq to await completion events.  Also injects a NOOP when
 * credits must be returned (see kptllnd_peer_send_noop).  Rules enforced
 * here: HELLO is always sent first; the last credit is reserved for
 * NOOP/HELLO; outgoing messages piggyback all outstanding credits.
 * Runs in thread context (asserted); peer_lock is dropped around every
 * allocation and Portals call. */
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
673 LASSERT(!cfs_in_interrupt());
675 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
677 peer->peer_retry_noop = 0;
679 if (kptllnd_peer_send_noop(peer)) {
680 /* post a NOOP to return credits */
681 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
683 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
685 CERROR("Can't return credits to %s: can't allocate descriptor\n",
686 libcfs_id2str(peer->peer_id));
688 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
690 kptllnd_post_tx(peer, tx, 0);
/* Remember an allocation failure so the watchdog retries the NOOP. */
693 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
694 peer->peer_retry_noop = (tx == NULL);
/* Pick the next tx: NOOPs take priority over the regular send queue. */
698 if (!cfs_list_empty(&peer->peer_noops)) {
699 LASSERT (peer->peer_sent_hello);
700 tx = cfs_list_entry(peer->peer_noops.next,
702 } else if (!cfs_list_empty(&peer->peer_sendq)) {
703 tx = cfs_list_entry(peer->peer_sendq.next,
706 /* nothing to send right now */
710 LASSERT (tx->tx_active);
711 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
/* Credit accounting invariants. */
714 LASSERT (peer->peer_outstanding_credits >= 0);
715 LASSERT (peer->peer_sent_credits >= 0);
716 LASSERT (peer->peer_sent_credits +
717 peer->peer_outstanding_credits <=
718 *kptllnd_tunables.kptl_peertxcredits);
719 LASSERT (peer->peer_credits >= 0);
721 msg_type = tx->tx_msg->ptlm_type;
723 /* Ensure HELLO is sent first */
724 if (!peer->peer_sent_hello) {
725 LASSERT (cfs_list_empty(&peer->peer_noops));
726 if (msg_type != PTLLND_MSG_TYPE_HELLO)
728 peer->peer_sent_hello = 1;
731 if (peer->peer_credits == 0) {
732 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733 libcfs_id2str(peer->peer_id),
735 peer->peer_outstanding_credits,
736 peer->peer_sent_credits,
737 kptllnd_msgtype2str(msg_type), tx);
741 /* Last/Initial credit reserved for NOOP/HELLO */
742 if (peer->peer_credits == 1 &&
743 msg_type != PTLLND_MSG_TYPE_HELLO &&
744 msg_type != PTLLND_MSG_TYPE_NOOP) {
745 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746 "not using last credit for %s[%p]\n",
747 libcfs_id2str(peer->peer_id),
749 peer->peer_outstanding_credits,
750 peer->peer_sent_credits,
751 kptllnd_msgtype2str(msg_type), tx);
755 cfs_list_del(&tx->tx_list);
757 /* Discard any NOOP I queued if I'm not at the high-water mark
758 * any more or more messages have been queued */
759 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760 !kptllnd_peer_send_noop(peer)) {
763 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
765 CDEBUG(D_NET, "%s: redundant noop\n",
766 libcfs_id2str(peer->peer_id));
767 kptllnd_tx_decref(tx);
769 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
773 /* fill last-minute msg fields */
774 kptllnd_msg_pack(tx->tx_msg, peer);
776 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777 tx->tx_type == TX_TYPE_GET_REQUEST) {
778 /* peer_next_matchbits must be known good */
779 LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780 /* Assume 64-bit matchbits can't wrap */
781 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783 peer->peer_next_matchbits++;
/* Spend one credit; return all outstanding credits on this message. */
786 peer->peer_sent_credits += peer->peer_outstanding_credits;
787 peer->peer_outstanding_credits = 0;
788 peer->peer_credits--;
790 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791 libcfs_id2str(peer->peer_id), peer->peer_credits,
792 peer->peer_outstanding_credits, peer->peer_sent_credits,
793 kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794 tx->tx_msg->ptlm_credits);
796 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
798 kptllnd_tx_addref(tx); /* 1 ref for me... */
800 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
802 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803 tx->tx_type == TX_TYPE_GET_REQUEST) {
804 /* Post bulk now we have safe matchbits */
805 rc = PtlMEAttach(kptllnd_data.kptl_nih,
806 *kptllnd_tunables.kptl_portal,
808 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
814 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815 libcfs_id2str(peer->peer_id),
816 kptllnd_errtype2str(rc), rc);
820 rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
823 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824 libcfs_id2str(tx->tx_peer->peer_id),
825 kptllnd_errtype2str(rc), rc);
826 rc = PtlMEUnlink(meh);
827 LASSERT(rc == PTL_OK);
828 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
831 /* I'm not racing with the event callback here. It's a
832 * bug if there's an event on the MD I just attached
833 * before I actually send the RDMA request message -
834 * probably matchbits re-used in error. */
837 tx->tx_tposted = jiffies; /* going on the wire */
839 rc = PtlPut (tx->tx_msg_mdh,
840 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
842 *kptllnd_tunables.kptl_portal,
846 0); /* header data */
848 CERROR("PtlPut %s error %s(%d)\n",
849 libcfs_id2str(peer->peer_id),
850 kptllnd_errtype2str(rc), rc);
854 kptllnd_tx_decref(tx); /* drop my ref */
856 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
859 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
863 /* Nuke everything (including tx we were trying) */
864 kptllnd_peer_close(peer, -EIO);
865 kptllnd_tx_decref(tx);
866 kptllnd_schedule_ptltrace_dump();
/* Scan the peer's send and active queues for the first tx whose deadline
 * has passed; return it with a ref held (caller must decref), or fall
 * through if none timed out.  Caller holds peer_lock (see
 * kptllnd_peer_check_bucket). */
870 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
875 cfs_list_for_each(ele, &peer->peer_sendq) {
876 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
878 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
879 kptllnd_tx_addref(tx);
884 cfs_list_for_each(ele, &peer->peer_activeq) {
885 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
887 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
888 kptllnd_tx_addref(tx);
/* Watchdog pass over peer hash bucket 'idx': detect timed-out txs and
 * close the offending peer with -ETIMEDOUT, and retry any pending NOOP
 * (peer_retry_noop).  'stamp' marks peers already checked this pass, so
 * rescanning after dropping the lock doesn't re-check them.  Diagnostic
 * state (credits, queue depths) is snapshotted under peer_lock for the
 * error report. */
898 kptllnd_peer_check_bucket (int idx, int stamp)
900 cfs_list_t *peers = &kptllnd_data.kptl_peers[idx];
904 CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
907 /* NB. Shared lock while I just look */
908 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
910 cfs_list_for_each_entry (peer, peers, peer_list) {
913 int c = -1, oc = -1, sc = -1;
914 int nsend = -1, nactive = -1;
915 int sent_hello = -1, state = -1;
917 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
918 libcfs_id2str(peer->peer_id), peer->peer_credits,
919 peer->peer_outstanding_credits, peer->peer_sent_credits);
921 cfs_spin_lock(&peer->peer_lock);
923 if (peer->peer_check_stamp == stamp) {
924 /* checked already this pass */
925 cfs_spin_unlock(&peer->peer_lock);
929 peer->peer_check_stamp = stamp;
930 tx = kptllnd_find_timed_out_tx(peer);
931 check_sends = peer->peer_retry_noop;
/* Snapshot diagnostics while still under peer_lock. */
934 c = peer->peer_credits;
935 sc = peer->peer_sent_credits;
936 oc = peer->peer_outstanding_credits;
937 state = peer->peer_state;
938 sent_hello = peer->peer_sent_hello;
939 nsend = kptllnd_count_queue(&peer->peer_sendq);
940 nactive = kptllnd_count_queue(&peer->peer_activeq);
943 cfs_spin_unlock(&peer->peer_lock);
945 if (tx == NULL && !check_sends)
/* Something to do: hold the peer across the unlocked section. */
948 kptllnd_peer_addref(peer); /* 1 ref for me... */
950 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
953 if (tx == NULL) { /* nothing timed out */
954 kptllnd_peer_check_sends(peer);
955 kptllnd_peer_decref(peer); /* ...until here or... */
957 /* rescan after dropping the lock */
961 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
962 libcfs_id2str(peer->peer_id),
963 (tx->tx_tposted == 0) ?
964 "no free peer buffers" :
965 "please check Portals");
967 if (tx->tx_tposted) {
968 CERROR("Could not send to %s after %ds (sent %lds ago); "
969 "check Portals for possible issues\n",
970 libcfs_id2str(peer->peer_id),
971 *kptllnd_tunables.kptl_timeout,
972 cfs_duration_sec(jiffies - tx->tx_tposted));
973 } else if (state < PEER_STATE_ACTIVE) {
974 CERROR("Could not connect %s (%d) after %ds; "
975 "peer might be down\n",
976 libcfs_id2str(peer->peer_id), state,
977 *kptllnd_tunables.kptl_timeout);
979 CERROR("Could not get credits for %s after %ds; "
980 "possible Lustre networking issues\n",
981 libcfs_id2str(peer->peer_id),
982 *kptllnd_tunables.kptl_timeout);
985 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
986 "state %d, sent_hello %d, sendq %d, activeq %d "
987 "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
988 libcfs_id2str(peer->peer_id), c, oc, sc,
989 state, sent_hello, nsend, nactive,
990 tx, kptllnd_tx_typestr(tx->tx_type),
991 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
992 tx->tx_active ? "A" : "",
993 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
995 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
998 (tx->tx_tposted == 0) ? "not " : "",
999 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
1000 *kptllnd_tunables.kptl_timeout);
1003 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1004 kptllnd_dump_ptltrace();
1007 kptllnd_tx_decref(tx);
1009 kptllnd_peer_close(peer, -ETIMEDOUT);
1010 kptllnd_peer_decref(peer); /* ...until here */
1012 /* start again now I've dropped the lock */
1016 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer for LNet process id 'id' in its hash bucket and return
 * it with a reference held, or fall through if absent.  Caller holds the
 * peer-table lock (read or write).  Only tabled peers (WAITING_HELLO or
 * ACTIVE) can be found here. */
1020 kptllnd_id2peer_locked (lnet_process_id_t id)
1022 cfs_list_t *peers = kptllnd_nid2peerlist(id.nid);
1026 cfs_list_for_each (tmp, peers) {
1027 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1029 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1030 peer->peer_state == PEER_STATE_ACTIVE);
1032 /* NB logical LNet peers share one kptl_peer_t */
1033 if (peer->peer_id.pid != id.pid ||
1034 LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1037 kptllnd_peer_addref(peer);
1039 CDEBUG(D_NET, "%s -> %s (%d)\n",
1041 kptllnd_ptlid2str(peer->peer_ptlid),
1042 cfs_atomic_read (&peer->peer_refcount));
/* Log a console error explaining that peer 'id' ('str' says whether it's
 * an incoming or outgoing connection) overflowed the peer table, plus the
 * tunables the admin should raise. */
1050 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1052 LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1053 "messages may be dropped\n",
1054 str, libcfs_id2str(id),
1055 kptllnd_data.kptl_n_active_peers);
1056 LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1057 "'max_nodes' or 'max_procs_per_node'\n");
/* Return the highest matchbits previously seen from the process 'lpid',
 * by searching the closing and zombie peer lists for its old incarnation.
 * If no old peer survives, PTL_RESERVED_MATCHBITS is safe because every
 * ref (and hence every tx) of the old peer has completed.  Caller holds
 * the peer-table lock; 'lpid' must not be in the live table (asserted). */
1061 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1066 /* Find the last matchbits I saw this new peer using. Note..
1067 A. This peer cannot be in the peer table - she's new!
1068 B. If I can't find the peer in the closing/zombie peers, all
1069 matchbits are safe because all refs to the (old) peer have gone
1070 so all txs have completed so there's no risk of matchbit
1074 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1076 /* peer's last matchbits can't change after it comes out of the peer
1077 * table, so first match is fine */
1079 cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1080 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1082 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1083 peer->peer_id.pid == lpid.pid)
1084 return peer->peer_last_matchbits_seen;
1087 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1088 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1090 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1091 peer->peer_id.pid == lpid.pid)
1092 return peer->peer_last_matchbits_seen;
1095 return PTL_RESERVED_MATCHBITS;
/* Process an incoming HELLO from portals id 'initiator' carrying LNet id
 * lpid.  Validates the message (userspace flag, matchbits, msg size,
 * credits), then either completes an in-progress handshake on an existing
 * peer, tears down a stale incarnation and creates a fresh peer, or
 * creates a brand-new peer — replying with our own HELLO in the new-peer
 * cases.  The peer table write lock is dropped and re-taken around every
 * allocation, so the lookup is repeated after each re-lock. */
1099 kptllnd_peer_handle_hello (kptl_net_t *net,
1100 ptl_process_id_t initiator, kptl_msg_t *msg)
1102 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1104 kptl_peer_t *new_peer;
1105 lnet_process_id_t lpid;
1106 unsigned long flags;
1107 kptl_tx_t *hello_tx;
1109 __u64 safe_matchbits;
1110 __u64 last_matchbits_seen;
1112 lpid.nid = msg->ptlm_srcnid;
1113 lpid.pid = msg->ptlm_srcpid;
1115 CDEBUG(D_NET, "hello from %s(%s)\n",
1116 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1118 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1119 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1120 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1121 * userspace. Refuse the connection if she hasn't set the
1122 * correct flag in her PID... */
1123 CERROR("Userflag not set in hello from %s (%s)\n",
1124 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1128 /* kptlhm_matchbits are the highest matchbits my peer may have used to
1129 * RDMA to me. I ensure I never register buffers for RDMA that could
1130 * match any she used */
1131 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1133 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1134 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1135 safe_matchbits, libcfs_id2str(lpid));
1139 if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1140 CERROR("%s: max message size %d < MIN %d",
1141 libcfs_id2str(lpid),
1142 msg->ptlm_u.hello.kptlhm_max_msg_size,
1143 PTLLND_MIN_BUFFER_SIZE);
1147 if (msg->ptlm_credits <= 1) {
1148 CERROR("Need more than 1+%d credits from %s\n",
1149 msg->ptlm_credits, libcfs_id2str(lpid));
1153 cfs_write_lock_irqsave(g_lock, flags);
1155 peer = kptllnd_id2peer_locked(lpid);
/* Case 1: we already have a peer for this id. */
1157 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1158 /* Completing HELLO handshake */
1159 LASSERT(peer->peer_incarnation == 0);
1161 if (msg->ptlm_dststamp != 0 &&
1162 msg->ptlm_dststamp != peer->peer_myincarnation) {
1163 cfs_write_unlock_irqrestore(g_lock, flags);
1165 CERROR("Ignoring HELLO from %s: unexpected "
1166 "dststamp "LPX64" ("LPX64" wanted)\n",
1167 libcfs_id2str(lpid),
1169 peer->peer_myincarnation);
1170 kptllnd_peer_decref(peer);
1174 /* Concurrent initiation or response to my HELLO */
1175 peer->peer_state = PEER_STATE_ACTIVE;
1176 peer->peer_incarnation = msg->ptlm_srcstamp;
1177 peer->peer_next_matchbits = safe_matchbits;
1178 peer->peer_max_msg_size =
1179 msg->ptlm_u.hello.kptlhm_max_msg_size;
1181 cfs_write_unlock_irqrestore(g_lock, flags);
/* Existing ACTIVE peer: either a stale duplicate HELLO... */
1185 if (msg->ptlm_dststamp != 0 &&
1186 msg->ptlm_dststamp <= peer->peer_myincarnation) {
1187 cfs_write_unlock_irqrestore(g_lock, flags);
1189 CERROR("Ignoring stale HELLO from %s: "
1190 "dststamp "LPX64" (current "LPX64")\n",
1191 libcfs_id2str(lpid),
1193 peer->peer_myincarnation);
1194 kptllnd_peer_decref(peer);
1198 /* Brand new connection attempt: remove old incarnation */
1199 kptllnd_peer_close_locked(peer, 0);
/* Make room for the peer we're about to (re)create. */
1202 kptllnd_cull_peertable_locked(lpid);
1204 cfs_write_unlock_irqrestore(g_lock, flags);
1207 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1208 " stamp "LPX64"("LPX64")\n",
1209 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1210 msg->ptlm_srcstamp, peer->peer_incarnation);
1212 kptllnd_peer_decref(peer);
/* Case 2: no usable peer — build a new one and a HELLO response. */
1216 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1217 if (hello_tx == NULL) {
1218 CERROR("Unable to allocate HELLO message for %s\n",
1219 libcfs_id2str(lpid));
1223 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1224 lpid, sizeof(kptl_hello_msg_t));
1226 new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1227 if (new_peer == NULL) {
1228 kptllnd_tx_decref(hello_tx);
1232 rc = kptllnd_peer_reserve_buffers();
1234 kptllnd_peer_decref(new_peer);
1235 kptllnd_tx_decref(hello_tx);
1237 CERROR("Failed to reserve buffers for %s\n",
1238 libcfs_id2str(lpid));
/* Re-take the lock and re-check everything that may have changed
 * while we were allocating. */
1242 cfs_write_lock_irqsave(g_lock, flags);
1245 if (net->net_shutdown) {
1246 cfs_write_unlock_irqrestore(g_lock, flags);
1248 CERROR ("Shutdown started, refusing connection from %s\n",
1249 libcfs_id2str(lpid));
1250 kptllnd_peer_unreserve_buffers();
1251 kptllnd_peer_decref(new_peer);
1252 kptllnd_tx_decref(hello_tx);
1256 peer = kptllnd_id2peer_locked(lpid);
/* Someone else instantiated/completed this peer while we slept. */
1258 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1259 /* An outgoing message instantiated 'peer' for me */
1260 LASSERT(peer->peer_incarnation == 0);
1262 peer->peer_state = PEER_STATE_ACTIVE;
1263 peer->peer_incarnation = msg->ptlm_srcstamp;
1264 peer->peer_next_matchbits = safe_matchbits;
1265 peer->peer_max_msg_size =
1266 msg->ptlm_u.hello.kptlhm_max_msg_size;
1268 cfs_write_unlock_irqrestore(g_lock, flags);
1270 CWARN("Outgoing instantiated peer %s\n",
1271 libcfs_id2str(lpid));
1273 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1275 cfs_write_unlock_irqrestore(g_lock, flags);
1277 /* WOW! Somehow this peer completed the HELLO
1278 * handshake while I slept. I guess I could have slept
1279 * while it rebooted and sent a new HELLO, so I'll fail
1281 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1282 kptllnd_peer_decref(peer);
/* Clean up the now-unneeded new_peer/hello_tx. */
1286 kptllnd_peer_unreserve_buffers();
1287 kptllnd_peer_decref(new_peer);
1288 kptllnd_tx_decref(hello_tx);
1292 if (kptllnd_data.kptl_n_active_peers ==
1293 kptllnd_data.kptl_expected_peers) {
1294 /* peer table full */
1295 cfs_write_unlock_irqrestore(g_lock, flags);
1297 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1299 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1301 CERROR("Refusing connection from %s\n",
1302 libcfs_id2str(lpid));
1303 kptllnd_peer_unreserve_buffers();
1304 kptllnd_peer_decref(new_peer);
1305 kptllnd_tx_decref(hello_tx);
1309 cfs_write_lock_irqsave(g_lock, flags);
1310 kptllnd_data.kptl_expected_peers++;
/* Commit: fill in the HELLO reply and activate the new peer. */
1314 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1316 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1317 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1318 *kptllnd_tunables.kptl_max_msg_size;
1320 new_peer->peer_state = PEER_STATE_ACTIVE;
1321 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1322 new_peer->peer_next_matchbits = safe_matchbits;
1323 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1324 new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1326 LASSERT (!net->net_shutdown);
1327 kptllnd_peer_add_peertable_locked(new_peer);
1329 cfs_write_unlock_irqrestore(g_lock, flags);
1331 /* NB someone else could get in now and post a message before I post
1332 * the HELLO, but post_tx/check_sends take care of that! */
1334 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1335 libcfs_id2str(new_peer->peer_id), hello_tx);
1337 kptllnd_post_tx(new_peer, hello_tx, 0);
1338 kptllnd_peer_check_sends(new_peer);
/*
 * Queue 'tx' for transmission to 'peer' and drive the send pipeline.
 *
 * \param peer   destination peer the tx is bound to
 * \param tx     transmit descriptor to post
 * \param nfrag  fragment count passed through to kptllnd_post_tx()
 *               (0 is used elsewhere in this file for the HELLO tx)
 *
 * Posting and kicking are deliberately two steps: kptllnd_post_tx()
 * enqueues the tx, then kptllnd_peer_check_sends() actually pushes
 * whatever is sendable — the same post_tx/check_sends pairing used for
 * the HELLO txs above, which the nearby comments note is safe against
 * racing posters.
 *
 * NOTE(review): return type and braces fall outside the visible lines
 * of this extraction; presumed void with no other side effects —
 * confirm against the full source.
 */
1344 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1346         kptllnd_post_tx(peer, tx, nfrag);
1347         kptllnd_peer_check_sends(peer);
1351 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1352 kptl_peer_t **peerp)
1354 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1355 ptl_process_id_t ptl_id;
1356 kptl_peer_t *new_peer;
1357 kptl_tx_t *hello_tx;
1358 unsigned long flags;
1360 __u64 last_matchbits_seen;
1362 /* I expect to find the peer, so I only take a read lock... */
1363 cfs_read_lock_irqsave(g_lock, flags);
1364 *peerp = kptllnd_id2peer_locked(target);
1365 cfs_read_unlock_irqrestore(g_lock, flags);
1370 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1371 CWARN("Refusing to create a new connection to %s "
1372 "(non-kernel peer)\n", libcfs_id2str(target));
1373 return -EHOSTUNREACH;
1376 /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1377 * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1378 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1379 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1381 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1382 if (hello_tx == NULL) {
1383 CERROR("Unable to allocate connect message for %s\n",
1384 libcfs_id2str(target));
1388 hello_tx->tx_acked = 1;
1389 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1390 target, sizeof(kptl_hello_msg_t));
1392 new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1393 if (new_peer == NULL) {
1398 rc = kptllnd_peer_reserve_buffers();
1402 cfs_write_lock_irqsave(g_lock, flags);
1404 /* Called only in lnd_send which can't happen after lnd_shutdown */
1405 LASSERT (!net->net_shutdown);
1407 *peerp = kptllnd_id2peer_locked(target);
1408 if (*peerp != NULL) {
1409 cfs_write_unlock_irqrestore(g_lock, flags);
1413 kptllnd_cull_peertable_locked(target);
1415 if (kptllnd_data.kptl_n_active_peers ==
1416 kptllnd_data.kptl_expected_peers) {
1417 /* peer table full */
1418 cfs_write_unlock_irqrestore(g_lock, flags);
1420 kptllnd_peertable_overflow_msg("Connection to ", target);
1422 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1424 CERROR("Can't create connection to %s\n",
1425 libcfs_id2str(target));
1429 cfs_write_lock_irqsave(g_lock, flags);
1430 kptllnd_data.kptl_expected_peers++;
1434 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1436 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1437 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1438 *kptllnd_tunables.kptl_max_msg_size;
1440 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1441 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1443 kptllnd_peer_add_peertable_locked(new_peer);
1445 cfs_write_unlock_irqrestore(g_lock, flags);
1447 /* NB someone else could get in now and post a message before I post
1448 * the HELLO, but post_tx/check_sends take care of that! */
1450 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1451 libcfs_id2str(new_peer->peer_id), hello_tx);
1453 kptllnd_post_tx(new_peer, hello_tx, 0);
1454 kptllnd_peer_check_sends(new_peer);
1460 kptllnd_peer_unreserve_buffers();
1462 kptllnd_peer_decref(new_peer);
1464 kptllnd_tx_decref(hello_tx);