1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ptllnd/ptllnd_peer.c
38 * Author: PJ Kirner <pjkirner@clusterfs.com>
39 * Author: E Barton <eeb@bartonsoftware.com>
43 #include <libcfs/list.h>
/* Walk list 'q' and count its entries.  Caller must hold the lock that
 * guards 'q' (peer_lock for the per-peer tx queues) — this is a bare
 * list traversal with no locking of its own.
 * NOTE(review): the counter declaration and return statement fall
 * outside this excerpt of the file. */
46 kptllnd_count_queue(cfs_list_t *q)
51         cfs_list_for_each(e, q) {
/* Copy out a diagnostic snapshot of the index'th peer in the global
 * peer hash table (procfs-style accessor: every field is returned via
 * an out-parameter).  Takes the peer-table read lock to walk the
 * buckets, then peer_lock while copying the per-peer counters so the
 * snapshot is internally consistent.
 * NOTE(review): the 'index' matching / early-exit logic and the return
 * path are outside this excerpt — presumably it returns 0 on a match
 * and an error when index is out of range; confirm against full file. */
59 kptllnd_get_peer_info(int index,
60 lnet_process_id_t *id,
61 int *state, int *sent_hello,
62 int *refcount, __u64 *incarnation,
63 __u64 *next_matchbits, __u64 *last_matchbits_seen,
64 int *nsendq, int *nactiveq,
65 int *credits, int *outstanding_credits)
67 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
74 cfs_read_lock_irqsave(g_lock, flags);
76 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
77 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
78 peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
/* Fields guarded by the table lock alone */
84 *state = peer->peer_state;
85 *sent_hello = peer->peer_sent_hello;
86 *refcount = cfs_atomic_read(&peer->peer_refcount);
87 *incarnation = peer->peer_incarnation;
/* Fields guarded by peer_lock: take it so matchbits/credit
 * counters and the queue counts are sampled atomically */
89 cfs_spin_lock(&peer->peer_lock);
91 *next_matchbits = peer->peer_next_matchbits;
92 *last_matchbits_seen = peer->peer_last_matchbits_seen;
93 *credits = peer->peer_credits;
94 *outstanding_credits = peer->peer_outstanding_credits;
96 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
97 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
99 cfs_spin_unlock(&peer->peer_lock);
107 cfs_read_unlock_irqrestore(g_lock, flags);
/* Insert 'peer' into the global peer hash table.
 * Caller must hold kptl_peer_rw_lock for writing (the _locked suffix).
 * Takes one reference on the peer to back the table's list linkage;
 * the matching decref happens when the peer is removed from the table. */
112 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
/* Table must have headroom: active peers strictly below the
 * expected-peers ceiling, checked by the admission paths */
114 LASSERT (kptllnd_data.kptl_n_active_peers <
115 kptllnd_data.kptl_expected_peers);
117 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
118 peer->peer_state == PEER_STATE_ACTIVE);
120 kptllnd_data.kptl_n_active_peers++;
121 cfs_atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
123 /* NB add to HEAD of peer list for MRU order!
124 * (see kptllnd_cull_peertable) */
125 cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Evict least-recently-used peers sharing pid's NID so that a new peer
 * can be added without exceeding max_procs_per_node peers per NID.
 * Caller must hold kptl_peer_rw_lock for writing.  Relies on the MRU
 * ordering established by kptllnd_peer_add_peertable_locked (new peers
 * at the head), so peers seen later in the walk are older. */
129 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
131 /* I'm about to add a new peer with this portals ID to the peer table,
132 * so (a) this peer should not exist already and (b) I want to leave at
133 * most (max_procs_per_nid - 1) peers with this NID in the table. */
134 cfs_list_t *peers = kptllnd_nid2peerlist(pid.nid);
135 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
142 cfs_list_for_each_safe (tmp, nxt, peers) {
143 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
145 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
/* Bucket is shared by several NIDs: skip entries for other NIDs */
147 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
/* (a) above: the exact pid must not already be present */
150 LASSERT (peer->peer_id.pid != pid.pid);
/* NOTE(review): the increment of 'count' is outside this excerpt */
154 if (count < cull_count) /* recent (don't cull) */
157 CDEBUG(D_NET, "Cull %s(%s)\n",
158 libcfs_id2str(peer->peer_id),
159 kptllnd_ptlid2str(peer->peer_ptlid));
161 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a new peer for LNet id 'lpid' / portals id
 * 'ppid' on network 'net'.  The peer starts in PEER_STATE_ALLOCATED
 * with one reference owned by the caller.  kptl_npeers is only bumped
 * under the write lock, and the allocation is abandoned if a shutdown
 * began in the meantime (see the net_shutdown check below).
 * NOTE(review): the NULL-check after LIBCFS_ALLOC and the final return
 * statements are outside this excerpt. */
166 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
171 LIBCFS_ALLOC(peer, sizeof (*peer));
173 CERROR("Can't create peer %s (%s)\n",
175 kptllnd_ptlid2str(ppid));
179 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
181 CFS_INIT_LIST_HEAD (&peer->peer_noops);
182 CFS_INIT_LIST_HEAD (&peer->peer_sendq);
183 CFS_INIT_LIST_HEAD (&peer->peer_activeq);
184 cfs_spin_lock_init (&peer->peer_lock);
186 peer->peer_state = PEER_STATE_ALLOCATED;
187 peer->peer_error = 0;
188 peer->peer_last_alive = 0;
189 peer->peer_id = lpid;
190 peer->peer_ptlid = ppid;
191 peer->peer_credits = 1; /* enough for HELLO */
192 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
/* All tx credits bar the implicit HELLO credit start as owed to the peer */
193 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
194 peer->peer_sent_credits = 1; /* HELLO credit is implicit */
195 peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
197 cfs_atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
199 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
201 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
203 /* Only increase # peers under lock, to guarantee we dont grow it
/* Shutdown raced with us: back out and free the half-built peer */
205 if (net->net_shutdown) {
206 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
208 LIBCFS_FREE(peer, sizeof(*peer));
212 kptllnd_data.kptl_npeers++;
213 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown of a peer whose refcount has dropped to zero.
 * Unlinks a zombie from the zombie list, decrements the global peer
 * count, and frees the structure.  Must not be called from interrupt
 * context (takes a blocking-capable path; see the LASSERT). */
218 kptllnd_peer_destroy (kptl_peer_t *peer)
222 CDEBUG(D_NET, "Peer=%p\n", peer);
224 LASSERT (!cfs_in_interrupt());
225 LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
/* Only never-added (ALLOCATED) or fully retired (ZOMBIE) peers may be
 * destroyed; anything else still has table/list linkage */
226 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
227 peer->peer_state == PEER_STATE_ZOMBIE);
/* All tx queues must have been drained by peer_cancel_txs */
228 LASSERT (cfs_list_empty(&peer->peer_noops));
229 LASSERT (cfs_list_empty(&peer->peer_sendq));
230 LASSERT (cfs_list_empty(&peer->peer_activeq));
232 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
234 if (peer->peer_state == PEER_STATE_ZOMBIE)
235 cfs_list_del(&peer->peer_list);
237 kptllnd_data.kptl_npeers--;
239 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
241 LIBCFS_FREE (peer, sizeof (*peer));
/* Move every tx on 'peerq' onto 'txs', marking each one failed with
 * -EIO.  Caller must hold the lock guarding 'peerq' (peer_lock); the
 * caller later completes the txs without the lock held. */
245 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
251 cfs_list_for_each_safe (tmp, nxt, peerq) {
252 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
254 cfs_list_del(&tx->tx_list);
255 cfs_list_add_tail(&tx->tx_list, txs);
257 tx->tx_status = -EIO;
/* Cancel everything queued on 'peer' (noop, send and active queues),
 * collecting the failed txs on 'txs' for the caller to complete after
 * dropping all locks.  Takes peer_lock internally. */
263 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
267 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
269 kptllnd_cancel_txlist(&peer->peer_noops, txs);
270 kptllnd_cancel_txlist(&peer->peer_sendq, txs);
271 kptllnd_cancel_txlist(&peer->peer_activeq, txs);
273 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Record that we just heard from 'peer'.  Deliberately lockless: the
 * comment below explains why the race is benign (all writers store the
 * current time, so any winner is an acceptable value). */
277 kptllnd_peer_alive (kptl_peer_t *peer)
279 /* This is racy, but everyone's only writing cfs_time_current() */
280 peer->peer_last_alive = cfs_time_current();
/* If 'peer' has a stashed error, report the peer as dead to LNet on
 * every network instance.  peer_error is consumed (reset to 0) under
 * peer_lock so the notification fires at most once per close.  The net
 * list is snapshotted into a temporary array under the net read lock
 * (with a ref on each net) so lnet_notify can be called unlocked. */
285 kptllnd_peer_notify (kptl_peer_t *peer)
293 cfs_time_t last_alive = 0;
295 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
297 if (peer->peer_error != 0) {
298 error = peer->peer_error;
299 peer->peer_error = 0;
300 last_alive = peer->peer_last_alive;
303 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* First pass: count the nets so we know how big an array to allocate.
 * NOTE(review): the nnets++ inside this loop is outside this excerpt */
308 cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
309 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
311 cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
313 if (nnets == 0) /* shutdown in progress */
316 LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
318 CERROR("Failed to allocate nets[%d]\n", nnets);
321 memset(nets, 0, nnets * sizeof(*nets));
/* Second pass: record and reference each net.  The list may have
 * shrunk since the count, hence the ref-per-entry discipline */
323 cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
325 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
328 kptllnd_net_addref(net);
331 cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
333 for (i = 0; i < nnets; i++) {
340 if (!net->net_shutdown) {
341 peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
342 peer->peer_ptlid.nid);
/* 'alive = 0': tell LNet the peer went down, with its last-seen time */
343 lnet_notify(net->net_ni, peer_nid, 0, last_alive);
346 kptllnd_net_decref(net);
349 LIBCFS_FREE(nets, nnets * sizeof(*nets));
/* Watchdog-side reaper: migrate newly-closed peers from the closing
 * list to the zombie list (notifying LNet and cancelling their txs on
 * the way), cancel txs on existing zombies, then complete all the
 * cancelled txs with no locks held.  Cheap read-locked emptiness check
 * first so the common idle case never takes the write lock. */
353 kptllnd_handle_closing_peers ()
363 /* Check with a read lock first to avoid blocking anyone */
365 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
366 idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
367 cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
368 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
373 CFS_INIT_LIST_HEAD(&txs);
375 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
377 /* Cancel txs on all zombie peers. NB anyone dropping the last peer
378 * ref removes it from this list, so I musn't drop the lock while
380 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
381 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
383 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
385 kptllnd_peer_cancel_txs(peer, &txs);
388 /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
389 * I'm the only one removing from this list, but peers can be added on
390 * the end any time I drop the lock. */
392 cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
393 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
395 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
397 cfs_list_del(&peer->peer_list);
398 cfs_list_add_tail(&peer->peer_list,
399 &kptllnd_data.kptl_zombie_peers);
400 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the lock to notify/cancel (both may block or take other
 * locks), then re-take it and continue the scan */
402 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
405 kptllnd_peer_notify(peer);
406 kptllnd_peer_cancel_txs(peer, &txs);
/* Drop the closing-list's ref (the zombie list holds its own) */
407 kptllnd_peer_decref(peer);
409 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
412 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
414 /* Drop peer's ref on all cancelled txs. This will get
415 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
417 cfs_list_for_each_safe (tmp, nxt, &txs) {
418 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
419 cfs_list_del(&tx->tx_list);
420 kptllnd_tx_decref(tx);
/* Begin closing 'peer': remove it from the active peer table, stash
 * 'why' as the error to report, and queue it on the closing list for
 * the watchdog (kptllnd_handle_closing_peers) to reap.  Idempotent for
 * peers already CLOSING/ZOMBIE.  Caller holds kptl_peer_rw_lock for
 * writing. */
425 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
427 switch (peer->peer_state) {
431 case PEER_STATE_WAITING_HELLO:
432 case PEER_STATE_ACTIVE:
433 /* Ensure new peers see a new incarnation of me */
434 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
435 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
436 kptllnd_data.kptl_incarnation++;
438 /* Removing from peer table */
439 kptllnd_data.kptl_n_active_peers--;
440 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
442 cfs_list_del(&peer->peer_list);
443 kptllnd_peer_unreserve_buffers();
445 peer->peer_error = why; /* stash 'why' only on first close */
446 peer->peer_state = PEER_STATE_CLOSING;
448 /* Schedule for immediate attention, taking peer table's ref */
449 cfs_list_add_tail(&peer->peer_list,
450 &kptllnd_data.kptl_closing_peers);
451 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
/* Already closing or fully closed: nothing to do */
454 case PEER_STATE_ZOMBIE:
455 case PEER_STATE_CLOSING:
/* Locked wrapper around kptllnd_peer_close_locked: takes the peer
 * table write lock around the state transition. */
461 kptllnd_peer_close(kptl_peer_t *peer, int why)
465 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
466 kptllnd_peer_close_locked(peer, why);
467 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id'.  Both nid and pid may be wildcards
 * (LNET_NID_ANY / LNET_PID_ANY); a concrete nid restricts the scan to
 * its single hash bucket.  Because peer_close needs the write lock,
 * the read lock is dropped around each close and the bucket is
 * re-scanned afterwards (see the comments below). */
471 kptllnd_peer_del(lnet_process_id_t id)
483 * Find the single bucket we are supposed to look at or if nid is a
484 * wildcard (LNET_NID_ANY) then look at all of the buckets
486 if (id.nid != LNET_NID_ANY) {
487 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
489 lo = hi = l - kptllnd_data.kptl_peers;
491 if (id.pid != LNET_PID_ANY)
495 hi = kptllnd_data.kptl_peer_hash_size - 1;
499 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
501 for (i = lo; i <= hi; i++) {
502 cfs_list_for_each_safe (ptmp, pnxt,
503 &kptllnd_data.kptl_peers[i]) {
504 peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
/* Match: wildcard nid, or same NID address and (wildcard or
 * matching) pid */
506 if (!(id.nid == LNET_NID_ANY ||
507 (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
508 (id.pid == LNET_PID_ANY ||
509 peer->peer_id.pid == id.pid))))
512 kptllnd_peer_addref(peer); /* 1 ref for me... */
514 cfs_read_unlock_irqrestore(&kptllnd_data. \
518 kptllnd_peer_close(peer, 0);
519 kptllnd_peer_decref(peer); /* ...until here */
521 rc = 0; /* matched something */
523 /* start again now I've dropped the lock */
528 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' for transmission to 'peer', consuming the caller's tx
 * ref.  Queue choice encodes priority: NOOPs go to the head of the
 * dedicated noop queue, HELLOs to the head of the send queue (they
 * must precede all other traffic), everything else to the tail. */
534 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
536 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
539 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
541 /* Ensure HELLO is sent first */
542 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
543 cfs_list_add(&tx->tx_list, &peer->peer_noops);
544 else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
545 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
547 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
549 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Bind a Portals MD to tx's message buffer and queue the tx on 'peer'
 * (via kptllnd_queue_tx, which takes over the tx ref).  On PtlMDBind
 * failure the tx is failed with -EIO and the ref dropped.
 * NOTE(review): the single-frag vs iovec branch condition (presumably
 * on 'nfrag') falls outside this excerpt — confirm in the full file. */
554 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
556 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
557 ptl_handle_md_t msg_mdh;
561 LASSERT (!tx->tx_idle);
562 LASSERT (!tx->tx_active);
563 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
564 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
565 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
566 tx->tx_type == TX_TYPE_PUT_REQUEST ||
567 tx->tx_type == TX_TYPE_GET_REQUEST);
569 kptllnd_set_tx_peer(tx, peer);
571 memset(&md, 0, sizeof(md));
573 md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
574 md.options = PTL_MD_OP_PUT |
575 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
576 PTL_MD_EVENT_START_DISABLE;
577 md.user_ptr = &tx->tx_msg_eventarg;
578 md.eq_handle = kptllnd_data.kptl_eqh;
/* Single contiguous message buffer */
581 md.start = tx->tx_msg;
582 md.length = tx->tx_msg->ptlm_nob;
/* Scatter/gather: frag 0 must still be the message header */
585 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
587 md.start = tx->tx_frags;
589 md.options |= PTL_MD_IOVEC;
592 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
594 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
595 libcfs_id2str(peer->peer_id),
596 kptllnd_errtype2str(prc), prc);
597 tx->tx_status = -EIO;
598 kptllnd_tx_decref(tx);
/* Arm the timeout before queuing so the watchdog can catch a tx that
 * never makes it onto the wire */
603 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
605 tx->tx_msg_mdh = msg_mdh;
606 kptllnd_queue_tx(peer, tx);
609 /* NB "restarts" comes from peer_sendq of a single peer */
/* Re-target every tx on 'restarts' (all previously bound to one peer)
 * at whatever peer now serves 'target', then kick its send path.
 * HELLOs are dropped rather than re-queued (stale handshake for the
 * old peer instance); each surviving tx swaps its peer ref and is
 * handed to kptllnd_queue_tx, which takes over the ref. */
611 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
612 cfs_list_t *restarts)
618 LASSERT (!cfs_list_empty(restarts));
/* NOTE(review): failure handling when the target can't be found is
 * outside this excerpt */
620 if (kptllnd_find_target(net, target, &peer) != 0)
623 cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
624 LASSERT (tx->tx_peer != NULL);
625 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
626 tx->tx_type == TX_TYPE_PUT_REQUEST ||
627 tx->tx_type == TX_TYPE_SMALL_MESSAGE);
629 cfs_list_del_init(&tx->tx_list);
/* Stale HELLO: drop it instead of restarting */
632 tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
633 kptllnd_tx_decref(tx);
637 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
/* Swap the tx's peer ref from the old peer to the new one */
640 kptllnd_peer_decref(tx->tx_peer);
642 kptllnd_set_tx_peer(tx, peer);
643 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
649 kptllnd_peer_check_sends(peer);
650 kptllnd_peer_decref(peer);
/* Decide whether a NOOP should be sent to return credits to 'peer'.
 * Preconditions for even considering it: HELLO already sent, we hold a
 * send credit, no NOOP already queued, and outstanding credits have
 * reached the high-water mark.  Caller holds peer_lock.
 * NOTE(review): the early 'return 0' for the precondition block is
 * outside this excerpt. */
654 kptllnd_peer_send_noop (kptl_peer_t *peer)
656 if (!peer->peer_sent_hello ||
657 peer->peer_credits == 0 ||
658 !cfs_list_empty(&peer->peer_noops) ||
659 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
662 /* No tx to piggyback NOOP onto or no credit to send a tx */
663 return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
/* The peer's transmit engine: drain peer_noops/peer_sendq subject to
 * the credit protocol, moving each sent tx to peer_activeq and pushing
 * it onto the wire with PtlPut (after attaching an ME/MD for the bulk
 * side of PUT/GET requests).  Also generates credit-return NOOPs when
 * kptllnd_peer_send_noop says so.  Runs in thread context only (see
 * the !cfs_in_interrupt() assert); peer_lock is dropped around every
 * Portals call and re-taken to continue the loop.
 * NOTE(review): the main loop construct and several early-exit
 * branches fall outside this excerpt; comments below describe only
 * what is visible. */
667 kptllnd_peer_check_sends (kptl_peer_t *peer)
675 LASSERT(!cfs_in_interrupt());
677 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
679 peer->peer_retry_noop = 0;
681 if (kptllnd_peer_send_noop(peer)) {
682 /* post a NOOP to return credits */
683 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
685 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
687 CERROR("Can't return credits to %s: can't allocate descriptor\n",
688 libcfs_id2str(peer->peer_id));
690 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
692 kptllnd_post_tx(peer, tx, 0);
/* Remember a failed NOOP allocation so the watchdog retries it */
695 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
696 peer->peer_retry_noop = (tx == NULL);
/* Pick the next tx: NOOPs take priority over the ordinary send queue */
700 if (!cfs_list_empty(&peer->peer_noops)) {
701 LASSERT (peer->peer_sent_hello);
702 tx = cfs_list_entry(peer->peer_noops.next,
704 } else if (!cfs_list_empty(&peer->peer_sendq)) {
705 tx = cfs_list_entry(peer->peer_sendq.next,
708 /* nothing to send right now */
712 LASSERT (tx->tx_active);
713 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
714 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
/* Credit bookkeeping invariants */
716 LASSERT (peer->peer_outstanding_credits >= 0);
717 LASSERT (peer->peer_sent_credits >= 0);
718 LASSERT (peer->peer_sent_credits +
719 peer->peer_outstanding_credits <=
720 *kptllnd_tunables.kptl_peertxcredits);
721 LASSERT (peer->peer_credits >= 0);
723 msg_type = tx->tx_msg->ptlm_type;
725 /* Ensure HELLO is sent first */
726 if (!peer->peer_sent_hello) {
727 LASSERT (cfs_list_empty(&peer->peer_noops));
728 if (msg_type != PTLLND_MSG_TYPE_HELLO)
730 peer->peer_sent_hello = 1;
/* No send credits at all: stall until the peer returns some */
733 if (peer->peer_credits == 0) {
734 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
735 libcfs_id2str(peer->peer_id),
737 peer->peer_outstanding_credits,
738 peer->peer_sent_credits,
739 kptllnd_msgtype2str(msg_type), tx);
743 /* Last/Initial credit reserved for NOOP/HELLO */
744 if (peer->peer_credits == 1 &&
745 msg_type != PTLLND_MSG_TYPE_HELLO &&
746 msg_type != PTLLND_MSG_TYPE_NOOP) {
747 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
748 "not using last credit for %s[%p]\n",
749 libcfs_id2str(peer->peer_id),
751 peer->peer_outstanding_credits,
752 peer->peer_sent_credits,
753 kptllnd_msgtype2str(msg_type), tx);
757 cfs_list_del(&tx->tx_list);
759 /* Discard any NOOP I queued if I'm not at the high-water mark
760 * any more or more messages have been queued */
761 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
762 !kptllnd_peer_send_noop(peer)) {
765 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
767 CDEBUG(D_NET, "%s: redundant noop\n",
768 libcfs_id2str(peer->peer_id));
769 kptllnd_tx_decref(tx);
771 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
775 /* fill last-minute msg fields */
776 kptllnd_msg_pack(tx->tx_msg, peer);
778 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
779 tx->tx_type == TX_TYPE_GET_REQUEST) {
780 /* peer_next_matchbits must be known good */
781 LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
782 /* Assume 64-bit matchbits can't wrap */
783 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
784 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
785 peer->peer_next_matchbits++;
/* Piggyback all owed credits on this message; spend one of ours */
788 peer->peer_sent_credits += peer->peer_outstanding_credits;
789 peer->peer_outstanding_credits = 0;
790 peer->peer_credits--;
792 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
793 libcfs_id2str(peer->peer_id), peer->peer_credits,
794 peer->peer_outstanding_credits, peer->peer_sent_credits,
795 kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
796 tx->tx_msg->ptlm_credits);
798 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
800 kptllnd_tx_addref(tx); /* 1 ref for me... */
802 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
804 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
805 tx->tx_type == TX_TYPE_GET_REQUEST) {
806 /* Post bulk now we have safe matchbits */
807 rc = PtlMEAttach(kptllnd_data.kptl_nih,
808 *kptllnd_tunables.kptl_portal,
810 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
816 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
817 libcfs_id2str(peer->peer_id),
818 kptllnd_errtype2str(rc), rc);
822 rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
825 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
826 libcfs_id2str(tx->tx_peer->peer_id),
827 kptllnd_errtype2str(rc), rc);
/* Undo the ME attach so nothing can match stale bits */
828 rc = PtlMEUnlink(meh);
829 LASSERT(rc == PTL_OK);
830 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
833 /* I'm not racing with the event callback here. It's a
834 * bug if there's an event on the MD I just attached
835 * before I actually send the RDMA request message -
836 * probably matchbits re-used in error. */
839 tx->tx_tposted = jiffies; /* going on the wire */
841 rc = PtlPut (tx->tx_msg_mdh,
842 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
844 *kptllnd_tunables.kptl_portal,
848 0); /* header data */
850 CERROR("PtlPut %s error %s(%d)\n",
851 libcfs_id2str(peer->peer_id),
852 kptllnd_errtype2str(rc), rc);
856 kptllnd_tx_decref(tx); /* drop my ref */
858 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
861 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Fatal Portals failure path: tear the whole peer down */
865 /* Nuke everything (including tx we were trying) */
866 kptllnd_peer_close(peer, -EIO);
867 kptllnd_tx_decref(tx);
868 kptllnd_schedule_ptltrace_dump();
/* Scan 'peer's send and active queues for the first tx whose deadline
 * has passed; return it with a ref taken (caller must decref), or NULL
 * if nothing has timed out.  Caller holds peer_lock.
 * NOTE(review): the 'return tx' / 'return NULL' statements are outside
 * this excerpt. */
872 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
877 cfs_list_for_each(ele, &peer->peer_sendq) {
878 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
880 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
881 kptllnd_tx_addref(tx);
886 cfs_list_for_each(ele, &peer->peer_activeq) {
887 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
889 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
890 kptllnd_tx_addref(tx);
/* Watchdog pass over one peer hash bucket: for each peer not yet
 * checked this pass ('stamp' de-dupes across rescans), look for timed
 * out txs and pending NOOP retries.  On a timeout, log copious state
 * and close the peer with -ETIMEDOUT.  Holds the table read lock while
 * scanning; drops it (after taking a peer ref) to act, then rescans. */
900 kptllnd_peer_check_bucket (int idx, int stamp)
902 cfs_list_t *peers = &kptllnd_data.kptl_peers[idx];
906 CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
909 /* NB. Shared lock while I just look */
910 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
912 cfs_list_for_each_entry (peer, peers, peer_list) {
/* -1 == "not sampled"; filled in below for the timeout report */
915 int c = -1, oc = -1, sc = -1;
916 int nsend = -1, nactive = -1;
917 int sent_hello = -1, state = -1;
919 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
920 libcfs_id2str(peer->peer_id), peer->peer_credits,
921 peer->peer_outstanding_credits, peer->peer_sent_credits);
923 cfs_spin_lock(&peer->peer_lock);
925 if (peer->peer_check_stamp == stamp) {
926 /* checked already this pass */
927 cfs_spin_unlock(&peer->peer_lock);
931 peer->peer_check_stamp = stamp;
932 tx = kptllnd_find_timed_out_tx(peer);
933 check_sends = peer->peer_retry_noop;
/* Snapshot state under peer_lock for the diagnostics below */
936 c = peer->peer_credits;
937 sc = peer->peer_sent_credits;
938 oc = peer->peer_outstanding_credits;
939 state = peer->peer_state;
940 sent_hello = peer->peer_sent_hello;
941 nsend = kptllnd_count_queue(&peer->peer_sendq);
942 nactive = kptllnd_count_queue(&peer->peer_activeq);
945 cfs_spin_unlock(&peer->peer_lock);
947 if (tx == NULL && !check_sends)
950 kptllnd_peer_addref(peer); /* 1 ref for me... */
952 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
955 if (tx == NULL) { /* nothing timed out */
956 kptllnd_peer_check_sends(peer);
957 kptllnd_peer_decref(peer); /* ...until here or... */
959 /* rescan after dropping the lock */
963 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
964 libcfs_id2str(peer->peer_id),
965 (tx->tx_tposted == 0) ?
966 "no free peer buffers" :
967 "please check Portals");
969 if (tx->tx_tposted) {
970 CERROR("Could not send to %s after %ds (sent %lds ago); "
971 "check Portals for possible issues\n",
972 libcfs_id2str(peer->peer_id),
973 *kptllnd_tunables.kptl_timeout,
974 cfs_duration_sec(jiffies - tx->tx_tposted));
975 } else if (state < PEER_STATE_ACTIVE) {
976 CERROR("Could not connect %s (%d) after %ds; "
977 "peer might be down\n",
978 libcfs_id2str(peer->peer_id), state,
979 *kptllnd_tunables.kptl_timeout);
981 CERROR("Could not get credits for %s after %ds; "
982 "possible Lustre networking issues\n",
983 libcfs_id2str(peer->peer_id),
984 *kptllnd_tunables.kptl_timeout);
987 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
988 "state %d, sent_hello %d, sendq %d, activeq %d "
989 "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
990 libcfs_id2str(peer->peer_id), c, oc, sc,
991 state, sent_hello, nsend, nactive,
992 tx, kptllnd_tx_typestr(tx->tx_type),
993 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
994 tx->tx_active ? "A" : "",
995 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
997 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
1000 (tx->tx_tposted == 0) ? "not " : "",
1001 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
1002 *kptllnd_tunables.kptl_timeout);
1005 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1006 kptllnd_dump_ptltrace();
/* Drop the ref find_timed_out_tx took on the culprit tx */
1009 kptllnd_tx_decref(tx);
1011 kptllnd_peer_close(peer, -ETIMEDOUT);
1012 kptllnd_peer_decref(peer); /* ...until here */
1014 /* start again now I've dropped the lock */
1018 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer for LNet id 'id' in its hash bucket; on a match
 * take a ref and return it.  Caller holds kptl_peer_rw_lock (either
 * mode).  Only peers in WAITING_HELLO or ACTIVE state live in the
 * table (asserted below).
 * NOTE(review): the 'return peer' / 'return NULL' statements fall
 * outside this excerpt. */
1022 kptllnd_id2peer_locked (lnet_process_id_t id)
1024 cfs_list_t *peers = kptllnd_nid2peerlist(id.nid);
1028 cfs_list_for_each (tmp, peers) {
1029 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1031 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1032 peer->peer_state == PEER_STATE_ACTIVE);
1034 /* NB logical LNet peers share one kptl_peer_t */
1035 if (peer->peer_id.pid != id.pid ||
1036 LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1039 kptllnd_peer_addref(peer);
1041 CDEBUG(D_NET, "%s -> %s (%d)\n",
1043 kptllnd_ptlid2str(peer->peer_ptlid),
1044 cfs_atomic_read (&peer->peer_refcount));
/* Console helper: report that 'id' could not be admitted because the
 * peer table is full, and tell the admin which tunables to raise. */
1052 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1054 LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1055 "messages may be dropped\n",
1056 str, libcfs_id2str(id),
1057 kptllnd_data.kptl_n_active_peers);
1058 LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1059 "'max_nodes' or 'max_procs_per_node'\n");
/* Find the last matchbits seen from a previous incarnation of peer
 * 'lpid' by searching the closing and zombie lists; falls back to
 * PTL_RESERVED_MATCHBITS when no old peer survives (in which case all
 * old txs have completed, so any matchbits are safe).  Caller holds
 * kptl_peer_rw_lock; lpid must NOT be in the live peer table. */
1063 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1068 /* Find the last matchbits I saw this new peer using. Note..
1069 A. This peer cannot be in the peer table - she's new!
1070 B. If I can't find the peer in the closing/zombie peers, all
1071 matchbits are safe because all refs to the (old) peer have gone
1072 so all txs have completed so there's no risk of matchbit
1076 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1078 /* peer's last matchbits can't change after it comes out of the peer
1079 * table, so first match is fine */
1081 cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1082 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1084 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1085 peer->peer_id.pid == lpid.pid)
1086 return peer->peer_last_matchbits_seen;
1089 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1090 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1092 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1093 peer->peer_id.pid == lpid.pid)
1094 return peer->peer_last_matchbits_seen;
1097 return PTL_RESERVED_MATCHBITS;
/* Handle an incoming HELLO from 'initiator'/msg: validate the message
 * (userflag, matchbits, max message size, credits), then either
 * complete an in-flight handshake on an existing peer, reject a stale
 * HELLO, or build a brand-new peer and post a HELLO response.  The
 * new-peer path speculatively allocates a reply tx, peer and buffer
 * reservation before re-checking the table under the write lock, and
 * unwinds all three on any race (shutdown, concurrent handshake, table
 * overflow).  Returns the active peer (referenced) on success.
 * NOTE(review): several return statements and the 'rc != 0' guards are
 * outside this excerpt; the control-flow comments below describe only
 * the visible lines. */
1101 kptllnd_peer_handle_hello (kptl_net_t *net,
1102 ptl_process_id_t initiator, kptl_msg_t *msg)
1104 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1106 kptl_peer_t *new_peer;
1107 lnet_process_id_t lpid;
1108 unsigned long flags;
1109 kptl_tx_t *hello_tx;
1111 __u64 safe_matchbits;
1112 __u64 last_matchbits_seen;
1114 lpid.nid = msg->ptlm_srcnid;
1115 lpid.pid = msg->ptlm_srcpid;
1117 CDEBUG(D_NET, "hello from %s(%s)\n",
1118 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1120 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1121 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1122 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1123 * userspace. Refuse the connection if she hasn't set the
1124 * correct flag in her PID... */
1125 CERROR("Userflag not set in hello from %s (%s)\n",
1126 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1130 /* kptlhm_matchbits are the highest matchbits my peer may have used to
1131 * RDMA to me. I ensure I never register buffers for RDMA that could
1132 * match any she used */
1133 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1135 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1136 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1137 safe_matchbits, libcfs_id2str(lpid));
1141 if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1142 CERROR("%s: max message size %d < MIN %d",
1143 libcfs_id2str(lpid),
1144 msg->ptlm_u.hello.kptlhm_max_msg_size,
1145 PTLLND_MIN_BUFFER_SIZE);
1149 if (msg->ptlm_credits <= 1) {
1150 CERROR("Need more than 1+%d credits from %s\n",
1151 msg->ptlm_credits, libcfs_id2str(lpid));
/* Fast path: do we already know this peer? */
1155 cfs_write_lock_irqsave(g_lock, flags);
1157 peer = kptllnd_id2peer_locked(lpid);
1159 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1160 /* Completing HELLO handshake */
1161 LASSERT(peer->peer_incarnation == 0);
1163 if (msg->ptlm_dststamp != 0 &&
1164 msg->ptlm_dststamp != peer->peer_myincarnation) {
1165 cfs_write_unlock_irqrestore(g_lock, flags);
1167 CERROR("Ignoring HELLO from %s: unexpected "
1168 "dststamp "LPX64" ("LPX64" wanted)\n",
1169 libcfs_id2str(lpid),
1171 peer->peer_myincarnation);
1172 kptllnd_peer_decref(peer);
1176 /* Concurrent initiation or response to my HELLO */
1177 peer->peer_state = PEER_STATE_ACTIVE;
1178 peer->peer_incarnation = msg->ptlm_srcstamp;
1179 peer->peer_next_matchbits = safe_matchbits;
1180 peer->peer_max_msg_size =
1181 msg->ptlm_u.hello.kptlhm_max_msg_size;
1183 cfs_write_unlock_irqrestore(g_lock, flags);
/* Existing ACTIVE peer: decide stale-vs-reboot by dststamp */
1187 if (msg->ptlm_dststamp != 0 &&
1188 msg->ptlm_dststamp <= peer->peer_myincarnation) {
1189 cfs_write_unlock_irqrestore(g_lock, flags);
1191 CERROR("Ignoring stale HELLO from %s: "
1192 "dststamp "LPX64" (current "LPX64")\n",
1193 libcfs_id2str(lpid),
1195 peer->peer_myincarnation);
1196 kptllnd_peer_decref(peer);
1200 /* Brand new connection attempt: remove old incarnation */
1201 kptllnd_peer_close_locked(peer, 0);
/* Make room for the replacement peer with this NID */
1204 kptllnd_cull_peertable_locked(lpid);
1206 cfs_write_unlock_irqrestore(g_lock, flags);
1209 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1210 " stamp "LPX64"("LPX64")\n",
1211 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1212 msg->ptlm_srcstamp, peer->peer_incarnation);
1214 kptllnd_peer_decref(peer);
/* New-peer path: allocate reply tx + peer + buffers before re-taking
 * the lock, so nothing blocking happens while it's held */
1218 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1219 if (hello_tx == NULL) {
1220 CERROR("Unable to allocate HELLO message for %s\n",
1221 libcfs_id2str(lpid));
1225 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1226 lpid, sizeof(kptl_hello_msg_t));
1228 new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1229 if (new_peer == NULL) {
1230 kptllnd_tx_decref(hello_tx);
1234 rc = kptllnd_peer_reserve_buffers();
1236 kptllnd_peer_decref(new_peer);
1237 kptllnd_tx_decref(hello_tx);
1239 CERROR("Failed to reserve buffers for %s\n",
1240 libcfs_id2str(lpid));
1244 cfs_write_lock_irqsave(g_lock, flags);
/* Re-check races now we hold the write lock again */
1247 if (net->net_shutdown) {
1248 cfs_write_unlock_irqrestore(g_lock, flags);
1250 CERROR ("Shutdown started, refusing connection from %s\n",
1251 libcfs_id2str(lpid));
1252 kptllnd_peer_unreserve_buffers();
1253 kptllnd_peer_decref(new_peer);
1254 kptllnd_tx_decref(hello_tx);
/* Someone may have instantiated this peer while we were unlocked */
1258 peer = kptllnd_id2peer_locked(lpid);
1260 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1261 /* An outgoing message instantiated 'peer' for me */
1262 LASSERT(peer->peer_incarnation == 0);
1264 peer->peer_state = PEER_STATE_ACTIVE;
1265 peer->peer_incarnation = msg->ptlm_srcstamp;
1266 peer->peer_next_matchbits = safe_matchbits;
1267 peer->peer_max_msg_size =
1268 msg->ptlm_u.hello.kptlhm_max_msg_size;
1270 cfs_write_unlock_irqrestore(g_lock, flags);
1272 CWARN("Outgoing instantiated peer %s\n",
1273 libcfs_id2str(lpid));
1275 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1277 cfs_write_unlock_irqrestore(g_lock, flags);
1279 /* WOW! Somehow this peer completed the HELLO
1280 * handshake while I slept. I guess I could have slept
1281 * while it rebooted and sent a new HELLO, so I'll fail
1283 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1284 kptllnd_peer_decref(peer);
/* Either way the speculative allocations are now unneeded */
1288 kptllnd_peer_unreserve_buffers();
1289 kptllnd_peer_decref(new_peer);
1290 kptllnd_tx_decref(hello_tx);
1294 if (kptllnd_data.kptl_n_active_peers ==
1295 kptllnd_data.kptl_expected_peers) {
1296 /* peer table full */
1297 cfs_write_unlock_irqrestore(g_lock, flags);
1299 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1301 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1303 CERROR("Refusing connection from %s\n",
1304 libcfs_id2str(lpid));
1305 kptllnd_peer_unreserve_buffers();
1306 kptllnd_peer_decref(new_peer);
1307 kptllnd_tx_decref(hello_tx);
1311 cfs_write_lock_irqsave(g_lock, flags);
1312 kptllnd_data.kptl_expected_peers++;
1316 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
/* Tell the peer the highest matchbits we may have seen from its old
 * incarnation, and our max message size */
1318 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1319 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1320 *kptllnd_tunables.kptl_max_msg_size;
1322 new_peer->peer_state = PEER_STATE_ACTIVE;
1323 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1324 new_peer->peer_next_matchbits = safe_matchbits;
1325 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1326 new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1328 LASSERT (!net->net_shutdown);
1329 kptllnd_peer_add_peertable_locked(new_peer);
1331 cfs_write_unlock_irqrestore(g_lock, flags);
1333 /* NB someone else could get in now and post a message before I post
1334 * the HELLO, but post_tx/check_sends take care of that! */
1336 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1337 libcfs_id2str(new_peer->peer_id), hello_tx);
1339 kptllnd_post_tx(new_peer, hello_tx, 0);
1340 kptllnd_peer_check_sends(new_peer);
/*
 * Hand a fully-built tx to 'peer' and kick the send path.
 *
 * \param peer   destination peer (caller holds a ref)
 * \param tx     message descriptor to queue
 * \param nfrag  number of payload fragments, forwarded to kptllnd_post_tx()
 *
 * Simply queues the tx via kptllnd_post_tx() and then calls
 * kptllnd_peer_check_sends() to push any now-sendable messages out,
 * mirroring the post/check-sends pairing used after the HELLO posts
 * elsewhere in this file.
 *
 * NOTE(review): this chunk is a lossy extraction — the return-type line
 * and the function's braces (original lines 1345/1347/1350) are not
 * visible here; presumably the return type is 'void'. Confirm against
 * the full source before editing.
 */
1346 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1348         kptllnd_post_tx(peer, tx, nfrag);
1349         kptllnd_peer_check_sends(peer);
1353 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1354 kptl_peer_t **peerp)
1356 cfs_rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1357 ptl_process_id_t ptl_id;
1358 kptl_peer_t *new_peer;
1359 kptl_tx_t *hello_tx;
1360 unsigned long flags;
1362 __u64 last_matchbits_seen;
1364 /* I expect to find the peer, so I only take a read lock... */
1365 cfs_read_lock_irqsave(g_lock, flags);
1366 *peerp = kptllnd_id2peer_locked(target);
1367 cfs_read_unlock_irqrestore(g_lock, flags);
1372 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1373 CWARN("Refusing to create a new connection to %s "
1374 "(non-kernel peer)\n", libcfs_id2str(target));
1375 return -EHOSTUNREACH;
1378 /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1379 * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1380 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1381 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1383 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1384 if (hello_tx == NULL) {
1385 CERROR("Unable to allocate connect message for %s\n",
1386 libcfs_id2str(target));
1390 hello_tx->tx_acked = 1;
1391 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1392 target, sizeof(kptl_hello_msg_t));
1394 new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1395 if (new_peer == NULL) {
1400 rc = kptllnd_peer_reserve_buffers();
1404 cfs_write_lock_irqsave(g_lock, flags);
1406 /* Called only in lnd_send which can't happen after lnd_shutdown */
1407 LASSERT (!net->net_shutdown);
1409 *peerp = kptllnd_id2peer_locked(target);
1410 if (*peerp != NULL) {
1411 cfs_write_unlock_irqrestore(g_lock, flags);
1415 kptllnd_cull_peertable_locked(target);
1417 if (kptllnd_data.kptl_n_active_peers ==
1418 kptllnd_data.kptl_expected_peers) {
1419 /* peer table full */
1420 cfs_write_unlock_irqrestore(g_lock, flags);
1422 kptllnd_peertable_overflow_msg("Connection to ", target);
1424 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1426 CERROR("Can't create connection to %s\n",
1427 libcfs_id2str(target));
1431 cfs_write_lock_irqsave(g_lock, flags);
1432 kptllnd_data.kptl_expected_peers++;
1436 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1438 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1439 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1440 *kptllnd_tunables.kptl_max_msg_size;
1442 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1443 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1445 kptllnd_peer_add_peertable_locked(new_peer);
1447 cfs_write_unlock_irqrestore(g_lock, flags);
1449 /* NB someone else could get in now and post a message before I post
1450 * the HELLO, but post_tx/check_sends take care of that! */
1452 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1453 libcfs_id2str(new_peer->peer_id), hello_tx);
1455 kptllnd_post_tx(new_peer, hello_tx, 0);
1456 kptllnd_peer_check_sends(new_peer);
1462 kptllnd_peer_unreserve_buffers();
1464 kptllnd_peer_decref(new_peer);
1466 kptllnd_tx_decref(hello_tx);