4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/klnds/ptllnd/ptllnd_peer.c
36 * Author: PJ Kirner <pjkirner@clusterfs.com>
37 * Author: E Barton <eeb@bartonsoftware.com>
41 #include <libcfs/list.h>
/*
 * Count the entries currently linked on list 'q'.
 * Callers (kptllnd_get_peer_info, kptllnd_peer_check_bucket) invoke this
 * under peer->peer_lock, which protects the queues passed in.
 * NOTE(review): the counter declaration, loop body and return statement
 * are elided in this excerpt.
 */
44 kptllnd_count_queue(cfs_list_t *q)
49 cfs_list_for_each(e, q) {
/*
 * Copy out a snapshot of the index'th peer's state for diagnostics/procfs.
 * Walks every hash bucket under the global peer rwlock (read side) and,
 * once the requested peer is found, samples its volatile counters under
 * peer->peer_lock so the snapshot is internally consistent.
 * NOTE(review): the index-matching test and return-value plumbing are
 * elided in this excerpt.
 */
57 kptllnd_get_peer_info(int index,
58 lnet_process_id_t *id,
59 int *state, int *sent_hello,
60 int *refcount, __u64 *incarnation,
61 __u64 *next_matchbits, __u64 *last_matchbits_seen,
62 int *nsendq, int *nactiveq,
63 int *credits, int *outstanding_credits)
65 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
72 read_lock_irqsave(g_lock, flags);
74 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
75 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
76 peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
/* Fields stable under the global read lock */
82 *state = peer->peer_state;
83 *sent_hello = peer->peer_sent_hello;
84 *refcount = cfs_atomic_read(&peer->peer_refcount);
85 *incarnation = peer->peer_incarnation;
/* Volatile per-peer counters need peer_lock */
87 spin_lock(&peer->peer_lock);
89 *next_matchbits = peer->peer_next_matchbits;
90 *last_matchbits_seen = peer->peer_last_matchbits_seen;
91 *credits = peer->peer_credits;
92 *outstanding_credits = peer->peer_outstanding_credits;
94 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
95 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
97 spin_unlock(&peer->peer_lock);
105 read_unlock_irqrestore(g_lock, flags);
/*
 * Add 'peer' to its NID's hash chain.  Caller must hold the peer table
 * write lock (hence "_locked").  Takes a refcount on behalf of the list,
 * and bumps the active-peer count, which the LASSERT below requires to
 * stay under kptl_expected_peers.
 */
110 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
112 LASSERT (kptllnd_data.kptl_n_active_peers <
113 kptllnd_data.kptl_expected_peers);
115 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
116 peer->peer_state == PEER_STATE_ACTIVE);
118 kptllnd_data.kptl_n_active_peers++;
119 cfs_atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
121 /* NB add to HEAD of peer list for MRU order!
122 * (see kptllnd_cull_peertable) */
123 cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/*
 * Make room in the peer table before adding a new peer with this portals
 * ID: close the least-recently-used peers sharing pid's NID so at most
 * (max_procs_per_node - 1) remain.  Relies on MRU ordering from
 * kptllnd_peer_add_peertable_locked.  Caller holds the table write lock.
 * NOTE(review): the 'count' increment is elided in this excerpt.
 */
127 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
129 /* I'm about to add a new peer with this portals ID to the peer table,
130 * so (a) this peer should not exist already and (b) I want to leave at
131 * most (max_procs_per_nid - 1) peers with this NID in the table. */
132 cfs_list_t *peers = kptllnd_nid2peerlist(pid.nid);
133 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
140 cfs_list_for_each_safe (tmp, nxt, peers) {
141 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
143 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
/* Skip peers hashed here with a different NID */
145 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
148 LASSERT (peer->peer_id.pid != pid.pid);
152 if (count < cull_count) /* recent (don't cull) */
155 CDEBUG(D_NET, "Cull %s(%s)\n",
156 libcfs_id2str(peer->peer_id),
157 kptllnd_ptlid2str(peer->peer_ptlid));
159 kptllnd_peer_close_locked(peer, 0);
/*
 * Allocate and initialise a new peer in PEER_STATE_ALLOCATED with 1 ref
 * for the caller.  Credits start at 1 (enough for the HELLO exchange);
 * peer_next_matchbits starts at the reserved floor.  kptl_npeers is only
 * incremented under the table write lock so shutdown can't race the
 * count; if shutdown has started, the peer is freed and (per the elided
 * path) the call fails.
 * NOTE(review): the allocation-failure return and final return of 'peer'
 * are elided in this excerpt.
 */
164 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
169 LIBCFS_ALLOC(peer, sizeof (*peer));
171 CERROR("Can't create peer %s (%s)\n",
173 kptllnd_ptlid2str(ppid));
177 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
179 CFS_INIT_LIST_HEAD (&peer->peer_noops);
180 CFS_INIT_LIST_HEAD (&peer->peer_sendq);
181 CFS_INIT_LIST_HEAD (&peer->peer_activeq);
182 spin_lock_init(&peer->peer_lock);
184 peer->peer_state = PEER_STATE_ALLOCATED;
185 peer->peer_error = 0;
186 peer->peer_last_alive = 0;
187 peer->peer_id = lpid;
188 peer->peer_ptlid = ppid;
189 peer->peer_credits = 1; /* enough for HELLO */
190 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
191 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
192 peer->peer_sent_credits = 1; /* HELLO credit is implicit */
193 peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
195 cfs_atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
197 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
199 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
201 /* Only increase # peers under lock, to guarantee we dont grow it
/* Shutdown already started: bail out and free the half-built peer */
203 if (net->net_shutdown) {
204 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
206 LIBCFS_FREE(peer, sizeof(*peer));
210 kptllnd_data.kptl_npeers++;
211 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * Final teardown once the last reference is dropped (refcount must be 0).
 * Only ALLOCATED or ZOMBIE peers may be destroyed, with all tx queues
 * empty; a ZOMBIE is additionally unlinked from the zombie list under
 * the table write lock before kptl_npeers is decremented and the memory
 * freed.  Must not be called in interrupt context.
 */
216 kptllnd_peer_destroy (kptl_peer_t *peer)
220 CDEBUG(D_NET, "Peer=%p\n", peer);
222 LASSERT (!cfs_in_interrupt());
223 LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
224 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
225 peer->peer_state == PEER_STATE_ZOMBIE);
226 LASSERT (cfs_list_empty(&peer->peer_noops));
227 LASSERT (cfs_list_empty(&peer->peer_sendq));
228 LASSERT (cfs_list_empty(&peer->peer_activeq));
230 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
232 if (peer->peer_state == PEER_STATE_ZOMBIE)
233 cfs_list_del(&peer->peer_list);
235 kptllnd_data.kptl_npeers--;
237 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
239 LIBCFS_FREE (peer, sizeof (*peer));
/*
 * Move every tx from 'peerq' onto 'txs', marking each with -EIO so the
 * eventual kptllnd_tx_fini sees it as failed.  Caller must hold the lock
 * protecting 'peerq' (peer->peer_lock in kptllnd_peer_cancel_txs).
 */
243 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
249 cfs_list_for_each_safe (tmp, nxt, peerq) {
250 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
252 cfs_list_del(&tx->tx_list);
253 cfs_list_add_tail(&tx->tx_list, txs);
255 tx->tx_status = -EIO;
/*
 * Cancel everything queued on 'peer' (noop, send and active queues),
 * collecting the failed txs on 'txs' for the caller to complete outside
 * peer_lock.  See kptllnd_handle_closing_peers for the completion side.
 */
261 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
265 spin_lock_irqsave(&peer->peer_lock, flags);
267 kptllnd_cancel_txlist(&peer->peer_noops, txs);
268 kptllnd_cancel_txlist(&peer->peer_sendq, txs);
269 kptllnd_cancel_txlist(&peer->peer_activeq, txs);
271 spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
 * Record that we just heard from 'peer'.  Deliberately lockless: a racy
 * store is fine since every writer stores cfs_time_current().
 */
275 kptllnd_peer_alive (kptl_peer_t *peer)
277 /* This is racy, but everyone's only writing cfs_time_current() */
278 peer->peer_last_alive = cfs_time_current();
/*
 * If the peer has a stashed error, report the peer dead to LNet on every
 * network instance.  The error is consumed (reset to 0) under peer_lock
 * so only one caller notifies.  Net refs are copied out of kptl_nets
 * under the net read lock into a temporary array so lnet_notify can be
 * called without holding any lock.
 * NOTE(review): the early-return when peer_error==0, the allocation-
 * failure return, and parts of the copy loop are elided in this excerpt.
 */
283 kptllnd_peer_notify (kptl_peer_t *peer)
291 cfs_time_t last_alive = 0;
293 spin_lock_irqsave(&peer->peer_lock, flags);
295 if (peer->peer_error != 0) {
296 error = peer->peer_error;
297 peer->peer_error = 0;
298 last_alive = peer->peer_last_alive;
301 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* First pass: count the nets so we know how big an array to allocate */
306 read_lock(&kptllnd_data.kptl_net_rw_lock);
307 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
309 read_unlock(&kptllnd_data.kptl_net_rw_lock);
311 if (nnets == 0) /* shutdown in progress */
314 LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
316 CERROR("Failed to allocate nets[%d]\n", nnets);
319 memset(nets, 0, nnets * sizeof(*nets));
/* Second pass: take a ref on each net while the lock is held */
321 read_lock(&kptllnd_data.kptl_net_rw_lock);
323 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
326 kptllnd_net_addref(net);
329 read_unlock(&kptllnd_data.kptl_net_rw_lock);
331 for (i = 0; i < nnets; i++) {
338 if (!net->net_shutdown) {
339 peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
340 peer->peer_ptlid.nid);
341 lnet_notify(net->net_ni, peer_nid, 0, last_alive);
344 kptllnd_net_decref(net);
347 LIBCFS_FREE(nets, nnets * sizeof(*nets));
/*
 * Reap closing/zombie peers: cancel txs on zombies, then move each
 * CLOSING peer to the zombie list (dropping the table write lock around
 * the LNet notification, which may block).  Finally the collected txs
 * are completed outside all locks.  Cheap read-locked emptiness check
 * first so the common idle case doesn't take the write lock.
 */
351 kptllnd_handle_closing_peers ()
361 /* Check with a read lock first to avoid blocking anyone */
363 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
364 idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
365 cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
366 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
371 CFS_INIT_LIST_HEAD(&txs);
373 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
375 /* Cancel txs on all zombie peers. NB anyone dropping the last peer
376 * ref removes it from this list, so I musn't drop the lock while
378 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
379 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
381 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
383 kptllnd_peer_cancel_txs(peer, &txs);
386 /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
387 * I'm the only one removing from this list, but peers can be added on
388 * the end any time I drop the lock. */
390 cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
391 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
393 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
395 cfs_list_del(&peer->peer_list);
396 cfs_list_add_tail(&peer->peer_list,
397 &kptllnd_data.kptl_zombie_peers);
398 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the lock: kptllnd_peer_notify may call lnet_notify */
400 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
403 kptllnd_peer_notify(peer);
404 kptllnd_peer_cancel_txs(peer, &txs);
405 kptllnd_peer_decref(peer);
407 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
410 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
412 /* Drop peer's ref on all cancelled txs. This will get
413 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
415 cfs_list_for_each_safe (tmp, nxt, &txs) {
416 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
417 cfs_list_del(&tx->tx_list);
418 kptllnd_tx_decref(tx);
/*
 * Begin closing 'peer' (caller holds the peer table write lock).
 * For an active/handshaking peer: bump my incarnation if this peer saw
 * the current one (so a reconnecting peer sees a fresh instance), remove
 * it from the peer table, stash 'why' in peer_error, and queue it on
 * kptl_closing_peers for the watchdog (kptllnd_handle_closing_peers) to
 * reap.  Already CLOSING/ZOMBIE peers fall through untouched.
 */
423 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
425 switch (peer->peer_state) {
429 case PEER_STATE_WAITING_HELLO:
430 case PEER_STATE_ACTIVE:
431 /* Ensure new peers see a new incarnation of me */
432 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
433 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
434 kptllnd_data.kptl_incarnation++;
436 /* Removing from peer table */
437 kptllnd_data.kptl_n_active_peers--;
438 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
440 cfs_list_del(&peer->peer_list);
441 kptllnd_peer_unreserve_buffers();
443 peer->peer_error = why; /* stash 'why' only on first close */
444 peer->peer_state = PEER_STATE_CLOSING;
446 /* Schedule for immediate attention, taking peer table's ref */
447 cfs_list_add_tail(&peer->peer_list,
448 &kptllnd_data.kptl_closing_peers);
449 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
452 case PEER_STATE_ZOMBIE:
453 case PEER_STATE_CLOSING:
/*
 * Locking wrapper around kptllnd_peer_close_locked: takes the peer
 * table write lock for callers that don't already hold it.
 */
459 kptllnd_peer_close(kptl_peer_t *peer, int why)
463 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
464 kptllnd_peer_close_locked(peer, why);
465 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * Close every peer matching 'id'; LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards (scanning all buckets or all PIDs respectively).  Each match
 * is closed with the read lock dropped (holding a temporary ref), then
 * the scan restarts since the list may have changed.
 * NOTE(review): the 'goto again' restart and the final return of rc are
 * elided in this excerpt.
 */
469 kptllnd_peer_del(lnet_process_id_t id)
481 * Find the single bucket we are supposed to look at or if nid is a
482 * wildcard (LNET_NID_ANY) then look at all of the buckets
484 if (id.nid != LNET_NID_ANY) {
485 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
487 lo = hi = l - kptllnd_data.kptl_peers;
489 if (id.pid != LNET_PID_ANY)
493 hi = kptllnd_data.kptl_peer_hash_size - 1;
497 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
499 for (i = lo; i <= hi; i++) {
500 cfs_list_for_each_safe (ptmp, pnxt,
501 &kptllnd_data.kptl_peers[i]) {
502 peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
/* Skip peers that don't match the (possibly wildcarded) id */
504 if (!(id.nid == LNET_NID_ANY ||
505 (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
506 (id.pid == LNET_PID_ANY ||
507 peer->peer_id.pid == id.pid))))
510 kptllnd_peer_addref(peer); /* 1 ref for me... */
512 read_unlock_irqrestore(&kptllnd_data. \
516 kptllnd_peer_close(peer, 0);
517 kptllnd_peer_decref(peer); /* ...until here */
519 rc = 0; /* matched something */
521 /* start again now I've dropped the lock */
526 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * Queue 'tx' for sending to 'peer', consuming the caller's tx ref.
 * NOOPs go on the dedicated noop queue; HELLOs jump to the head of the
 * send queue (handshake must go out first); everything else is FIFO.
 */
532 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
534 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
537 spin_lock_irqsave(&peer->peer_lock, flags);
539 /* Ensure HELLO is sent first */
540 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
541 cfs_list_add(&tx->tx_list, &peer->peer_noops);
542 else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
543 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
545 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
547 spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
 * Bind an MD for tx's message buffer and queue the tx on 'peer'
 * (consuming the caller's ref via kptllnd_queue_tx).  The MD threshold
 * is 2 when an ACK is expected (SEND END + ACK events), else 1.  With
 * nfrag the message is described as an IOVEC of tx_frags; otherwise as
 * the flat tx_msg buffer.  On PtlMDBind failure the tx is completed
 * with -EIO and, per the elided path, the peer is presumably closed.
 * NOTE(review): the nfrag branch structure and error-path tail are
 * partially elided in this excerpt.
 */
552 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
554 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
555 ptl_handle_md_t msg_mdh;
559 LASSERT (!tx->tx_idle);
560 LASSERT (!tx->tx_active);
561 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
562 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
563 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
564 tx->tx_type == TX_TYPE_PUT_REQUEST ||
565 tx->tx_type == TX_TYPE_GET_REQUEST);
567 kptllnd_set_tx_peer(tx, peer);
569 memset(&md, 0, sizeof(md));
571 md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
572 md.options = PTL_MD_OP_PUT |
573 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
574 PTL_MD_EVENT_START_DISABLE;
575 md.user_ptr = &tx->tx_msg_eventarg;
576 md.eq_handle = kptllnd_data.kptl_eqh;
/* Flat message buffer... */
579 md.start = tx->tx_msg;
580 md.length = tx->tx_msg->ptlm_nob;
/* ...or an iovec whose first fragment must be the message header */
583 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
585 md.start = tx->tx_frags;
587 md.options |= PTL_MD_IOVEC;
590 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
592 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
593 libcfs_id2str(peer->peer_id),
594 kptllnd_errtype2str(prc), prc);
595 tx->tx_status = -EIO;
596 kptllnd_tx_decref(tx);
601 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
603 tx->tx_msg_mdh = msg_mdh;
604 kptllnd_queue_tx(peer, tx);
/* NB "restarts" comes from peer_sendq of a single peer */
/*
 * Re-queue the txs on 'restarts' to a (possibly new) peer for 'target'.
 * HELLOs are dropped (the new connection will send its own); NOOPs
 * should never appear here.  Each tx's peer ref is swapped for the new
 * peer and the tx re-queued, then sends are kicked once at the end.
 * NOTE(review): the kptllnd_find_target failure path and the HELLO-check
 * condition's first clause are elided in this excerpt.
 */
609 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
610 cfs_list_t *restarts)
616 LASSERT (!cfs_list_empty(restarts));
618 if (kptllnd_find_target(net, target, &peer) != 0)
621 cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
622 LASSERT (tx->tx_peer != NULL);
623 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
624 tx->tx_type == TX_TYPE_PUT_REQUEST ||
625 tx->tx_type == TX_TYPE_SMALL_MESSAGE);
627 cfs_list_del_init(&tx->tx_list);
/* Stale HELLO: drop it rather than re-sending on the new peer */
630 tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
631 kptllnd_tx_decref(tx);
635 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
638 kptllnd_peer_decref(tx->tx_peer);
640 kptllnd_set_tx_peer(tx, peer);
641 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
647 kptllnd_peer_check_sends(peer);
648 kptllnd_peer_decref(peer);
/*
 * Decide whether a NOOP should be sent to return credits to 'peer'.
 * Requires: HELLO already sent, at least one send credit, no NOOP
 * already queued, and outstanding credits at/over the high-water mark.
 * Even then, only send if there is no queued tx to piggyback credits on,
 * or only the reserved last credit remains (which a NOOP may use).
 * Caller holds peer->peer_lock (see kptllnd_peer_check_sends).
 */
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
654 if (!peer->peer_sent_hello ||
655 peer->peer_credits == 0 ||
656 !cfs_list_empty(&peer->peer_noops) ||
657 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
660 /* No tx to piggyback NOOP onto or no credit to send a tx */
661 return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
/*
 * Drain peer's send queues: post a credit-returning NOOP if needed, then
 * repeatedly pick the next tx (noops before sendq), enforce the credit
 * protocol (HELLO first; last credit reserved for NOOP/HELLO), pack and
 * transmit it via PtlPut, attaching an ME/MD first for PUT/GET requests
 * so the peer can RDMA against our matchbits.  peer_lock is dropped
 * around all Portals calls and retaken to continue the loop.  Any
 * Portals failure nukes the peer (close with -EIO).
 * NOTE(review): the loop structure, several returns/gotos and some
 * PtlMEAttach/PtlPut arguments are elided in this excerpt, so the
 * control flow described above is partially inferred — verify against
 * the full source.
 */
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
673 LASSERT(!cfs_in_interrupt());
675 spin_lock_irqsave(&peer->peer_lock, flags);
677 peer->peer_retry_noop = 0;
679 if (kptllnd_peer_send_noop(peer)) {
680 /* post a NOOP to return credits */
681 spin_unlock_irqrestore(&peer->peer_lock, flags);
683 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
685 CERROR("Can't return credits to %s: can't allocate descriptor\n",
686 libcfs_id2str(peer->peer_id));
688 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
690 kptllnd_post_tx(peer, tx, 0);
693 spin_lock_irqsave(&peer->peer_lock, flags);
/* Retry the NOOP later (from the watchdog) if allocation failed */
694 peer->peer_retry_noop = (tx == NULL);
698 if (!cfs_list_empty(&peer->peer_noops)) {
699 LASSERT (peer->peer_sent_hello);
700 tx = cfs_list_entry(peer->peer_noops.next,
702 } else if (!cfs_list_empty(&peer->peer_sendq)) {
703 tx = cfs_list_entry(peer->peer_sendq.next,
706 /* nothing to send right now */
710 LASSERT (tx->tx_active);
711 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
714 LASSERT (peer->peer_outstanding_credits >= 0);
715 LASSERT (peer->peer_sent_credits >= 0);
716 LASSERT (peer->peer_sent_credits +
717 peer->peer_outstanding_credits <=
718 *kptllnd_tunables.kptl_peertxcredits);
719 LASSERT (peer->peer_credits >= 0);
721 msg_type = tx->tx_msg->ptlm_type;
723 /* Ensure HELLO is sent first */
724 if (!peer->peer_sent_hello) {
725 LASSERT (cfs_list_empty(&peer->peer_noops));
726 if (msg_type != PTLLND_MSG_TYPE_HELLO)
728 peer->peer_sent_hello = 1;
731 if (peer->peer_credits == 0) {
732 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733 libcfs_id2str(peer->peer_id),
735 peer->peer_outstanding_credits,
736 peer->peer_sent_credits,
737 kptllnd_msgtype2str(msg_type), tx);
741 /* Last/Initial credit reserved for NOOP/HELLO */
742 if (peer->peer_credits == 1 &&
743 msg_type != PTLLND_MSG_TYPE_HELLO &&
744 msg_type != PTLLND_MSG_TYPE_NOOP) {
745 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746 "not using last credit for %s[%p]\n",
747 libcfs_id2str(peer->peer_id),
749 peer->peer_outstanding_credits,
750 peer->peer_sent_credits,
751 kptllnd_msgtype2str(msg_type), tx);
755 cfs_list_del(&tx->tx_list);
757 /* Discard any NOOP I queued if I'm not at the high-water mark
758 * any more or more messages have been queued */
759 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760 !kptllnd_peer_send_noop(peer)) {
763 spin_unlock_irqrestore(&peer->peer_lock, flags);
765 CDEBUG(D_NET, "%s: redundant noop\n",
766 libcfs_id2str(peer->peer_id));
767 kptllnd_tx_decref(tx);
769 spin_lock_irqsave(&peer->peer_lock, flags);
773 /* fill last-minute msg fields */
774 kptllnd_msg_pack(tx->tx_msg, peer);
776 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777 tx->tx_type == TX_TYPE_GET_REQUEST) {
778 /* peer_next_matchbits must be known good */
779 LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780 /* Assume 64-bit matchbits can't wrap */
781 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783 peer->peer_next_matchbits++;
/* Credit accounting: return all outstanding credits, spend one */
786 peer->peer_sent_credits += peer->peer_outstanding_credits;
787 peer->peer_outstanding_credits = 0;
788 peer->peer_credits--;
790 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791 libcfs_id2str(peer->peer_id), peer->peer_credits,
792 peer->peer_outstanding_credits, peer->peer_sent_credits,
793 kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794 tx->tx_msg->ptlm_credits);
796 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
798 kptllnd_tx_addref(tx); /* 1 ref for me... */
800 spin_unlock_irqrestore(&peer->peer_lock, flags);
802 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803 tx->tx_type == TX_TYPE_GET_REQUEST) {
804 /* Post bulk now we have safe matchbits */
805 rc = PtlMEAttach(kptllnd_data.kptl_nih,
806 *kptllnd_tunables.kptl_portal,
808 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
814 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815 libcfs_id2str(peer->peer_id),
816 kptllnd_errtype2str(rc), rc);
820 rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
823 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824 libcfs_id2str(tx->tx_peer->peer_id),
825 kptllnd_errtype2str(rc), rc);
826 rc = PtlMEUnlink(meh);
827 LASSERT(rc == PTL_OK);
828 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
831 /* I'm not racing with the event callback here. It's a
832 * bug if there's an event on the MD I just attached
833 * before I actually send the RDMA request message -
834 * probably matchbits re-used in error. */
837 tx->tx_tposted = jiffies; /* going on the wire */
839 rc = PtlPut (tx->tx_msg_mdh,
840 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
842 *kptllnd_tunables.kptl_portal,
846 0); /* header data */
848 CERROR("PtlPut %s error %s(%d)\n",
849 libcfs_id2str(peer->peer_id),
850 kptllnd_errtype2str(rc), rc);
854 kptllnd_tx_decref(tx); /* drop my ref */
856 spin_lock_irqsave(&peer->peer_lock, flags);
859 spin_unlock_irqrestore(&peer->peer_lock, flags);
863 /* Nuke everything (including tx we were trying) */
864 kptllnd_peer_close(peer, -EIO);
865 kptllnd_tx_decref(tx);
/*
 * Scan peer's send and active queues for the first tx whose deadline has
 * passed; return it with an extra ref for the caller, or (per the elided
 * tail) NULL if none.  Caller holds peer->peer_lock (see
 * kptllnd_peer_check_bucket).
 */
869 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
874 cfs_list_for_each(ele, &peer->peer_sendq) {
875 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
877 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
878 kptllnd_tx_addref(tx);
883 cfs_list_for_each(ele, &peer->peer_activeq) {
884 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
886 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
887 kptllnd_tx_addref(tx);
/*
 * Watchdog pass over hash bucket 'idx': for each peer not yet checked
 * this pass (peer_check_stamp != stamp), look for a timed-out tx and a
 * pending NOOP retry.  If either is found, take a peer ref, drop the
 * bucket read lock, either kick sends (no timeout) or log extensively
 * and close the peer with -ETIMEDOUT, then restart the bucket scan.
 * NOTE(review): the restart-goto and several loop-control lines are
 * elided in this excerpt.
 */
897 kptllnd_peer_check_bucket (int idx, int stamp)
899 cfs_list_t *peers = &kptllnd_data.kptl_peers[idx];
903 CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
906 /* NB. Shared lock while I just look */
907 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
909 cfs_list_for_each_entry (peer, peers, peer_list) {
/* Snapshot holders for the timeout diagnostics below */
912 int c = -1, oc = -1, sc = -1;
913 int nsend = -1, nactive = -1;
914 int sent_hello = -1, state = -1;
916 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
917 libcfs_id2str(peer->peer_id), peer->peer_credits,
918 peer->peer_outstanding_credits, peer->peer_sent_credits);
920 spin_lock(&peer->peer_lock);
922 if (peer->peer_check_stamp == stamp) {
923 /* checked already this pass */
924 spin_unlock(&peer->peer_lock);
928 peer->peer_check_stamp = stamp;
929 tx = kptllnd_find_timed_out_tx(peer);
930 check_sends = peer->peer_retry_noop;
/* Capture state under peer_lock for the error report */
933 c = peer->peer_credits;
934 sc = peer->peer_sent_credits;
935 oc = peer->peer_outstanding_credits;
936 state = peer->peer_state;
937 sent_hello = peer->peer_sent_hello;
938 nsend = kptllnd_count_queue(&peer->peer_sendq);
939 nactive = kptllnd_count_queue(&peer->peer_activeq);
942 spin_unlock(&peer->peer_lock);
944 if (tx == NULL && !check_sends)
947 kptllnd_peer_addref(peer); /* 1 ref for me... */
949 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
952 if (tx == NULL) { /* nothing timed out */
953 kptllnd_peer_check_sends(peer);
954 kptllnd_peer_decref(peer); /* ...until here or... */
956 /* rescan after dropping the lock */
960 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
961 libcfs_id2str(peer->peer_id),
962 (tx->tx_tposted == 0) ?
963 "no free peer buffers" :
964 "please check Portals");
966 if (tx->tx_tposted) {
967 CERROR("Could not send to %s after %ds (sent %lds ago); "
968 "check Portals for possible issues\n",
969 libcfs_id2str(peer->peer_id),
970 *kptllnd_tunables.kptl_timeout,
971 cfs_duration_sec(jiffies - tx->tx_tposted));
972 } else if (state < PEER_STATE_ACTIVE) {
973 CERROR("Could not connect %s (%d) after %ds; "
974 "peer might be down\n",
975 libcfs_id2str(peer->peer_id), state,
976 *kptllnd_tunables.kptl_timeout);
978 CERROR("Could not get credits for %s after %ds; "
979 "possible Lustre networking issues\n",
980 libcfs_id2str(peer->peer_id),
981 *kptllnd_tunables.kptl_timeout);
984 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
985 "state %d, sent_hello %d, sendq %d, activeq %d "
986 "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
987 libcfs_id2str(peer->peer_id), c, oc, sc,
988 state, sent_hello, nsend, nactive,
989 tx, kptllnd_tx_typestr(tx->tx_type),
990 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
991 tx->tx_active ? "A" : "",
992 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
994 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
997 (tx->tx_tposted == 0) ? "not " : "",
998 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
999 *kptllnd_tunables.kptl_timeout);
1001 kptllnd_tx_decref(tx);
1003 kptllnd_peer_close(peer, -ETIMEDOUT);
1004 kptllnd_peer_decref(peer); /* ...until here */
1006 /* start again now I've dropped the lock */
1010 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * Look up the peer for LNet process 'id' in its hash chain, taking a ref
 * on a match.  Only WAITING_HELLO/ACTIVE peers live in the table.
 * Caller must hold the peer table lock (read or write).
 * NOTE(review): the return statements are elided in this excerpt;
 * presumably returns the referenced peer or NULL.
 */
1014 kptllnd_id2peer_locked (lnet_process_id_t id)
1016 cfs_list_t *peers = kptllnd_nid2peerlist(id.nid);
1020 cfs_list_for_each (tmp, peers) {
1021 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1023 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1024 peer->peer_state == PEER_STATE_ACTIVE);
1026 /* NB logical LNet peers share one kptl_peer_t */
1027 if (peer->peer_id.pid != id.pid ||
1028 LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1031 kptllnd_peer_addref(peer);
1033 CDEBUG(D_NET, "%s -> %s (%d)\n",
1035 kptllnd_ptlid2str(peer->peer_ptlid),
1036 cfs_atomic_read (&peer->peer_refcount));
/*
 * Log a console error that connection 'str' for peer 'id' could not be
 * accommodated in the peer table, with a hint to raise the relevant
 * module tunables.
 */
1044 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1046 LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1047 "messages may be dropped\n",
1048 str, libcfs_id2str(id),
1049 kptllnd_data.kptl_n_active_peers);
1050 LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1051 "'max_nodes' or 'max_procs_per_node'\n");
/*
 * Return the last matchbits a previous incarnation of peer 'lpid' was
 * seen using, so a new connection never reuses them: check the closing
 * then zombie lists, falling back to the reserved floor when no trace of
 * the old peer remains (all its txs must have completed).  Caller holds
 * the peer table lock; 'lpid' must not be in the live table.
 */
1055 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1060 /* Find the last matchbits I saw this new peer using. Note..
1061 A. This peer cannot be in the peer table - she's new!
1062 B. If I can't find the peer in the closing/zombie peers, all
1063 matchbits are safe because all refs to the (old) peer have gone
1064 so all txs have completed so there's no risk of matchbit
1068 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1070 /* peer's last matchbits can't change after it comes out of the peer
1071 * table, so first match is fine */
1073 cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1074 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1076 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1077 peer->peer_id.pid == lpid.pid)
1078 return peer->peer_last_matchbits_seen;
1081 cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1082 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1084 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1085 peer->peer_id.pid == lpid.pid)
1086 return peer->peer_last_matchbits_seen;
1089 return PTL_RESERVED_MATCHBITS;
/*
 * Handle an incoming HELLO: validate it (userspace flag, matchbits
 * floor, minimum buffer size, credit count), then either complete the
 * handshake on an existing WAITING_HELLO peer, reject a stale HELLO, or
 * tear down a stale incarnation and install a brand-new ACTIVE peer —
 * reserving buffers and table headroom, seeding its matchbits from the
 * old incarnation's last-seen value, and posting a HELLO response.
 * Returns (per the elided tails) the peer on success, NULL on failure.
 * NOTE(review): many early-return tails and the retry label around the
 * table-full path are elided in this excerpt, so the exact control flow
 * of the failure paths is partially inferred.
 */
1093 kptllnd_peer_handle_hello (kptl_net_t *net,
1094 ptl_process_id_t initiator, kptl_msg_t *msg)
1096 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1098 kptl_peer_t *new_peer;
1099 lnet_process_id_t lpid;
1100 unsigned long flags;
1101 kptl_tx_t *hello_tx;
1103 __u64 safe_matchbits;
1104 __u64 last_matchbits_seen;
1106 lpid.nid = msg->ptlm_srcnid;
1107 lpid.pid = msg->ptlm_srcpid;
1109 CDEBUG(D_NET, "hello from %s(%s)\n",
1110 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1112 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1113 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1114 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1115 * userspace. Refuse the connection if she hasn't set the
1116 * correct flag in her PID... */
1117 CERROR("Userflag not set in hello from %s (%s)\n",
1118 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1122 /* kptlhm_matchbits are the highest matchbits my peer may have used to
1123 * RDMA to me. I ensure I never register buffers for RDMA that could
1124 * match any she used */
1125 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1127 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1128 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1129 safe_matchbits, libcfs_id2str(lpid));
1133 if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1134 CERROR("%s: max message size %d < MIN %d",
1135 libcfs_id2str(lpid),
1136 msg->ptlm_u.hello.kptlhm_max_msg_size,
1137 PTLLND_MIN_BUFFER_SIZE);
1141 if (msg->ptlm_credits <= 1) {
1142 CERROR("Need more than 1+%d credits from %s\n",
1143 msg->ptlm_credits, libcfs_id2str(lpid));
1147 write_lock_irqsave(g_lock, flags);
1149 peer = kptllnd_id2peer_locked(lpid);
1151 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1152 /* Completing HELLO handshake */
1153 LASSERT(peer->peer_incarnation == 0);
/* dststamp must match the incarnation I sent in my HELLO */
1155 if (msg->ptlm_dststamp != 0 &&
1156 msg->ptlm_dststamp != peer->peer_myincarnation) {
1157 write_unlock_irqrestore(g_lock, flags);
1159 CERROR("Ignoring HELLO from %s: unexpected "
1160 "dststamp "LPX64" ("LPX64" wanted)\n",
1161 libcfs_id2str(lpid),
1163 peer->peer_myincarnation);
1164 kptllnd_peer_decref(peer);
1168 /* Concurrent initiation or response to my HELLO */
1169 peer->peer_state = PEER_STATE_ACTIVE;
1170 peer->peer_incarnation = msg->ptlm_srcstamp;
1171 peer->peer_next_matchbits = safe_matchbits;
1172 peer->peer_max_msg_size =
1173 msg->ptlm_u.hello.kptlhm_max_msg_size;
1175 write_unlock_irqrestore(g_lock, flags);
/* Existing ACTIVE peer: a fresh HELLO means it rebooted */
1179 if (msg->ptlm_dststamp != 0 &&
1180 msg->ptlm_dststamp <= peer->peer_myincarnation) {
1181 write_unlock_irqrestore(g_lock, flags);
1183 CERROR("Ignoring stale HELLO from %s: "
1184 "dststamp "LPX64" (current "LPX64")\n",
1185 libcfs_id2str(lpid),
1187 peer->peer_myincarnation);
1188 kptllnd_peer_decref(peer);
1192 /* Brand new connection attempt: remove old incarnation */
1193 kptllnd_peer_close_locked(peer, 0);
1196 kptllnd_cull_peertable_locked(lpid);
1198 write_unlock_irqrestore(g_lock, flags);
1201 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1202 " stamp "LPX64"("LPX64")\n",
1203 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1204 msg->ptlm_srcstamp, peer->peer_incarnation);
1206 kptllnd_peer_decref(peer);
/* No usable existing peer: build a new one plus its HELLO reply */
1210 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1211 if (hello_tx == NULL) {
1212 CERROR("Unable to allocate HELLO message for %s\n",
1213 libcfs_id2str(lpid));
1217 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1218 lpid, sizeof(kptl_hello_msg_t));
1220 new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1221 if (new_peer == NULL) {
1222 kptllnd_tx_decref(hello_tx);
1226 rc = kptllnd_peer_reserve_buffers();
1228 kptllnd_peer_decref(new_peer);
1229 kptllnd_tx_decref(hello_tx);
1231 CERROR("Failed to reserve buffers for %s\n",
1232 libcfs_id2str(lpid));
1236 write_lock_irqsave(g_lock, flags);
1239 if (net->net_shutdown) {
1240 write_unlock_irqrestore(g_lock, flags);
1242 CERROR ("Shutdown started, refusing connection from %s\n",
1243 libcfs_id2str(lpid));
1244 kptllnd_peer_unreserve_buffers();
1245 kptllnd_peer_decref(new_peer);
1246 kptllnd_tx_decref(hello_tx);
/* Re-check: the peer may have appeared while the lock was dropped */
1250 peer = kptllnd_id2peer_locked(lpid);
1252 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1253 /* An outgoing message instantiated 'peer' for me */
1254 LASSERT(peer->peer_incarnation == 0);
1256 peer->peer_state = PEER_STATE_ACTIVE;
1257 peer->peer_incarnation = msg->ptlm_srcstamp;
1258 peer->peer_next_matchbits = safe_matchbits;
1259 peer->peer_max_msg_size =
1260 msg->ptlm_u.hello.kptlhm_max_msg_size;
1262 write_unlock_irqrestore(g_lock, flags);
1264 CWARN("Outgoing instantiated peer %s\n",
1265 libcfs_id2str(lpid));
1267 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1269 write_unlock_irqrestore(g_lock, flags);
1271 /* WOW! Somehow this peer completed the HELLO
1272 * handshake while I slept. I guess I could have slept
1273 * while it rebooted and sent a new HELLO, so I'll fail
1275 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1276 kptllnd_peer_decref(peer);
/* Someone beat me to it: discard my spare peer and HELLO */
1280 kptllnd_peer_unreserve_buffers();
1281 kptllnd_peer_decref(new_peer);
1282 kptllnd_tx_decref(hello_tx);
1286 if (kptllnd_data.kptl_n_active_peers ==
1287 kptllnd_data.kptl_expected_peers) {
1288 /* peer table full */
1289 write_unlock_irqrestore(g_lock, flags);
1291 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1293 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1295 CERROR("Refusing connection from %s\n",
1296 libcfs_id2str(lpid));
1297 kptllnd_peer_unreserve_buffers();
1298 kptllnd_peer_decref(new_peer);
1299 kptllnd_tx_decref(hello_tx);
1303 write_lock_irqsave(g_lock, flags);
1304 kptllnd_data.kptl_expected_peers++;
1308 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1310 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1311 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1312 *kptllnd_tunables.kptl_max_msg_size;
1314 new_peer->peer_state = PEER_STATE_ACTIVE;
1315 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1316 new_peer->peer_next_matchbits = safe_matchbits;
1317 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1318 new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1320 LASSERT (!net->net_shutdown);
1321 kptllnd_peer_add_peertable_locked(new_peer);
1323 write_unlock_irqrestore(g_lock, flags);
1325 /* NB someone else could get in now and post a message before I post
1326 * the HELLO, but post_tx/check_sends take care of that! */
1328 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1329 libcfs_id2str(new_peer->peer_id), hello_tx);
1331 kptllnd_post_tx(new_peer, hello_tx, 0);
1332 kptllnd_peer_check_sends(new_peer);
/*
 * Post 'tx' to 'peer' (consuming the caller's tx ref via post_tx) and
 * immediately try to drain the send queue.
 */
1338 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1340 kptllnd_post_tx(peer, tx, nfrag);
1341 kptllnd_peer_check_sends(peer);
1345 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1346 kptl_peer_t **peerp)
1348 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1349 ptl_process_id_t ptl_id;
1350 kptl_peer_t *new_peer;
1351 kptl_tx_t *hello_tx;
1352 unsigned long flags;
1354 __u64 last_matchbits_seen;
1356 /* I expect to find the peer, so I only take a read lock... */
1357 read_lock_irqsave(g_lock, flags);
1358 *peerp = kptllnd_id2peer_locked(target);
1359 read_unlock_irqrestore(g_lock, flags);
1364 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1365 CWARN("Refusing to create a new connection to %s "
1366 "(non-kernel peer)\n", libcfs_id2str(target));
1367 return -EHOSTUNREACH;
1370 /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1371 * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1372 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1373 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1375 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1376 if (hello_tx == NULL) {
1377 CERROR("Unable to allocate connect message for %s\n",
1378 libcfs_id2str(target));
1382 hello_tx->tx_acked = 1;
1383 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1384 target, sizeof(kptl_hello_msg_t));
1386 new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1387 if (new_peer == NULL) {
1392 rc = kptllnd_peer_reserve_buffers();
1396 write_lock_irqsave(g_lock, flags);
1398 /* Called only in lnd_send which can't happen after lnd_shutdown */
1399 LASSERT (!net->net_shutdown);
1401 *peerp = kptllnd_id2peer_locked(target);
1402 if (*peerp != NULL) {
1403 write_unlock_irqrestore(g_lock, flags);
1407 kptllnd_cull_peertable_locked(target);
1409 if (kptllnd_data.kptl_n_active_peers ==
1410 kptllnd_data.kptl_expected_peers) {
1411 /* peer table full */
1412 write_unlock_irqrestore(g_lock, flags);
1414 kptllnd_peertable_overflow_msg("Connection to ", target);
1416 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1418 CERROR("Can't create connection to %s\n",
1419 libcfs_id2str(target));
1423 write_lock_irqsave(g_lock, flags);
1424 kptllnd_data.kptl_expected_peers++;
1428 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1430 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1431 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1432 *kptllnd_tunables.kptl_max_msg_size;
1434 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1435 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1437 kptllnd_peer_add_peertable_locked(new_peer);
1439 write_unlock_irqrestore(g_lock, flags);
1441 /* NB someone else could get in now and post a message before I post
1442 * the HELLO, but post_tx/check_sends take care of that! */
1444 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1445 libcfs_id2str(new_peer->peer_id), hello_tx);
1447 kptllnd_post_tx(new_peer, hello_tx, 0);
1448 kptllnd_peer_check_sends(new_peer);
1454 kptllnd_peer_unreserve_buffers();
1456 kptllnd_peer_decref(new_peer);
1458 kptllnd_tx_decref(hello_tx);