1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ptllnd/ptllnd_peer.c
38 * Author: PJ Kirner <pjkirner@clusterfs.com>
39 * Author: E Barton <eeb@bartonsoftware.com>
43 #include <libcfs/list.h>
/* Return the number of entries on list 'q'.
 * NOTE(review): the body is elided in this excerpt -- presumably a simple
 * list walk; caller must hold the lock protecting 'q' (confirm in full
 * source). */
46 kptllnd_count_queue(struct list_head *q)
/* Snapshot the state and counters of the index'th peer in the global peer
 * hash table into the caller-supplied out parameters (used by procfs/ioctl
 * style introspection).  Walks every hash bucket under the global peer
 * rwlock; per-peer volatile fields are sampled under peer_lock.
 * NOTE(review): the lines selecting the index'th peer and the return value
 * are elided in this excerpt -- confirm against the full source. */
59 kptllnd_get_peer_info(int index,
60                       lnet_process_id_t *id,
61                       int *state, int *sent_hello,
62                       int *refcount, __u64 *incarnation,
63                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
64                       int *nsendq, int *nactiveq,
65                       int *credits, int *outstanding_credits)
67         rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
69         struct list_head *ptmp;
        /* read lock suffices: we only sample, never modify, the table */
74         read_lock_irqsave(g_lock, flags);
76         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
77                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
78                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
                        /* stable-ish fields readable under the table lock */
84                         *state = peer->peer_state;
85                         *sent_hello = peer->peer_sent_hello;
86                         *refcount = atomic_read(&peer->peer_refcount);
87                         *incarnation = peer->peer_incarnation;
                        /* matchbits/credits/queues change under peer_lock */
89                         spin_lock(&peer->peer_lock);
91                         *next_matchbits = peer->peer_next_matchbits;
92                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
93                         *credits = peer->peer_credits;
94                         *outstanding_credits = peer->peer_outstanding_credits;
96                         *nsendq = kptllnd_count_queue(&peer->peer_sendq);
97                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
99                         spin_unlock(&peer->peer_lock);
107         read_unlock_irqrestore(g_lock, flags);
/* Add 'peer' to the global peer hash table; caller holds the peer table
 * write lock.  Takes a ref on behalf of the table and bumps the active-peer
 * count (which must stay below the pre-reserved kptl_expected_peers). */
112 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
114         LASSERT (kptllnd_data.kptl_n_active_peers <
115                  kptllnd_data.kptl_expected_peers);
        /* only peers mid-handshake or fully connected live in the table */
117         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
118                  peer->peer_state == PEER_STATE_ACTIVE);
120         kptllnd_data.kptl_n_active_peers++;
121         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
123         /* NB add to HEAD of peer list for MRU order!
124          * (see kptllnd_cull_peertable) */
125         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Make room before adding a new peer with portals ID 'pid': close the
 * least-recently-used peers sharing pid's NID so at most
 * (max_procs_per_node - 1) remain.  Caller holds the peer table write lock.
 * NOTE(review): the loop's count/continue logic is partially elided in this
 * excerpt -- confirm the culling threshold against the full source. */
129 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
131         /* I'm about to add a new peer with this portals ID to the peer table,
132          * so (a) this peer should not exist already and (b) I want to leave at
133          * most (max_procs_per_nid - 1) peers with this NID in the table. */
134         struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
135         int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
137         struct list_head *tmp;
138         struct list_head *nxt;
        /* _safe variant: kptllnd_peer_close_locked() unlinks entries */
142         list_for_each_safe (tmp, nxt, peers) {
143                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
145                 peer = list_entry(tmp, kptl_peer_t, peer_list);
                /* skip peers hashed here with a different NID */
147                 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
                /* (a) above: the exact (nid,pid) must not already exist */
150                 LASSERT (peer->peer_id.pid != pid.pid);
154                 if (count < cull_count) /* recent (don't cull) */
157                 CDEBUG(D_NET, "Cull %s(%s)\n",
158                        libcfs_id2str(peer->peer_id),
159                        kptllnd_ptlid2str(peer->peer_ptlid));
161                 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a new peer for LNet id 'lpid' / portals id 'ppid'
 * on network 'net'.  Returns the peer with one ref for the caller, or NULL
 * on allocation failure or if 'net' is already shutting down.
 * NOTE(review): the NULL-check after LIBCFS_ALLOC and the return statements
 * are elided in this excerpt. */
166 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
171         LIBCFS_ALLOC(peer, sizeof (*peer));
173                 CERROR("Can't create peer %s (%s)\n",
175                        kptllnd_ptlid2str(ppid));
179         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
181         INIT_LIST_HEAD (&peer->peer_noops);
182         INIT_LIST_HEAD (&peer->peer_sendq);
183         INIT_LIST_HEAD (&peer->peer_activeq);
184         spin_lock_init (&peer->peer_lock);
186         peer->peer_state = PEER_STATE_ALLOCATED;
187         peer->peer_error = 0;
188         peer->peer_last_alive = 0;
189         peer->peer_id = lpid;
190         peer->peer_ptlid = ppid;
191         peer->peer_credits = 1;                 /* enough for HELLO */
192         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
193         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
194         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
195         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
197         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
199         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
201         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
203         /* Only increase # peers under lock, to guarantee we dont grow it
        /* racing shutdown: back out and free rather than leak a peer */
205         if (net->net_shutdown) {
206                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
207                 LIBCFS_FREE(peer, sizeof(*peer));
211         kptllnd_data.kptl_npeers++;
212         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown when the last ref on 'peer' is dropped: unlink a zombie
 * from the zombie list, decrement the global peer count and free the
 * structure.  Must not be called from interrupt context and all tx queues
 * must already be empty. */
217 kptllnd_peer_destroy (kptl_peer_t *peer)
221         CDEBUG(D_NET, "Peer=%p\n", peer);
223         LASSERT (!in_interrupt());
224         LASSERT (atomic_read(&peer->peer_refcount) == 0);
        /* only never-tabled (ALLOCATED) or fully-retired (ZOMBIE) peers die */
225         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
226                  peer->peer_state == PEER_STATE_ZOMBIE);
227         LASSERT (list_empty(&peer->peer_noops));
228         LASSERT (list_empty(&peer->peer_sendq));
229         LASSERT (list_empty(&peer->peer_activeq));
231         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        /* zombies still sit on kptl_zombie_peers; ALLOCATED peers were never
         * listed anywhere */
233         if (peer->peer_state == PEER_STATE_ZOMBIE)
234                 list_del(&peer->peer_list);
236         kptllnd_data.kptl_npeers--;
238         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
240         LIBCFS_FREE (peer, sizeof (*peer));
/* Move every tx on 'peerq' onto 'txs', marking each with -EIO so the
 * eventual kptllnd_tx_fini() reports failure.  Caller holds the lock
 * protecting 'peerq' (peer_lock). */
244 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
246         struct list_head *tmp;
247         struct list_head *nxt;
250         list_for_each_safe (tmp, nxt, peerq) {
251                 tx = list_entry(tmp, kptl_tx_t, tx_list);
253                 list_del(&tx->tx_list);
254                 list_add_tail(&tx->tx_list, txs);
256                 tx->tx_status = -EIO;
/* Cancel everything queued on 'peer' (noop, send and active queues),
 * collecting the cancelled txs on 'txs' for the caller to finalise
 * outside the lock. */
262 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
266         spin_lock_irqsave(&peer->peer_lock, flags);
268         kptllnd_cancel_txlist(&peer->peer_noops, txs);
269         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
270         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
272         spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Record that we just heard from 'peer' (liveness timestamp for
 * kptllnd_peer_notify()). */
276 kptllnd_peer_alive (kptl_peer_t *peer)
278         /* This is racy, but everyone's only writing cfs_time_current() */
279         peer->peer_last_alive = cfs_time_current();
/* If 'peer' died with an error, tell LNet (lnet_notify) on every network
 * that is not shutting down, passing the wall-clock time the peer was last
 * seen alive.  Net refs are taken under the net read lock and dropped after
 * the (potentially blocking) notifications.
 * NOTE(review): nnets counting, the allocation-failure return and the net
 * snapshot into nets[] are partially elided in this excerpt. */
284 kptllnd_peer_notify (kptl_peer_t *peer)
292         time_t last_alive = 0;
294         spin_lock_irqsave(&peer->peer_lock, flags);
        /* consume peer_error exactly once; 0 means nothing to report */
296         if (peer->peer_error != 0) {
297                 error = peer->peer_error;
298                 peer->peer_error = 0;
                /* convert jiffies-ago into absolute seconds for lnet_notify */
300                 last_alive = cfs_time_current_sec() -
301                              cfs_duration_sec(cfs_time_current() -
302                                               peer->peer_last_alive);
305         spin_unlock_irqrestore(&peer->peer_lock, flags);
310         read_lock(&kptllnd_data.kptl_net_rw_lock);
311         list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
313         read_unlock(&kptllnd_data.kptl_net_rw_lock);
315         if (nnets == 0) /* shutdown in progress */
318         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
320                 CERROR("Failed to allocate nets[%d]\n", nnets);
323         memset(nets, 0, nnets * sizeof(*nets));
325         read_lock(&kptllnd_data.kptl_net_rw_lock);
327         list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
                /* hold each net across the unlocked notify below */
330                 kptllnd_net_addref(net);
333         read_unlock(&kptllnd_data.kptl_net_rw_lock);
335         for (i = 0; i < nnets; i++) {
342                 if (!net->net_shutdown) {
343                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
344                                                        peer->peer_ptlid.nid);
345                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
348                 kptllnd_net_decref(net);
351         LIBCFS_FREE(nets, nnets * sizeof(*nets));
/* Watchdog-side reaper: cancel txs on zombie peers, then move each closing
 * peer to the zombie list, notify LNet of its demise, cancel its txs and
 * drop the peer-table ref.  Finally drop the peer refs held by all the
 * cancelled txs (outside any lock, since tx_fini can block). */
355 kptllnd_handle_closing_peers ()
358         struct list_head txs;
360         struct list_head *tmp;
361         struct list_head *nxt;
365         /* Check with a read lock first to avoid blocking anyone */
367         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
368         idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
369                list_empty(&kptllnd_data.kptl_zombie_peers);
370         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
375         INIT_LIST_HEAD(&txs);
377         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
379         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
380          * ref removes it from this list, so I musn't drop the lock while
382         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
383                 peer = list_entry (tmp, kptl_peer_t, peer_list);
385                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
387                 kptllnd_peer_cancel_txs(peer, &txs);
390         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
391          * I'm the only one removing from this list, but peers can be added on
392          * the end any time I drop the lock. */
394         list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
395                 peer = list_entry (tmp, kptl_peer_t, peer_list);
397                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
                /* promote closing -> zombie while still under the lock */
399                 list_del(&peer->peer_list);
400                 list_add_tail(&peer->peer_list,
401                               &kptllnd_data.kptl_zombie_peers);
402                 peer->peer_state = PEER_STATE_ZOMBIE;
            /* drop the lock for the blocking notify/cancel, then retake it;
             * the _safe iterator keeps 'nxt' valid across the gap */
404                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
406                 kptllnd_peer_notify(peer);
407                 kptllnd_peer_cancel_txs(peer, &txs);
408                 kptllnd_peer_decref(peer);
410                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
413         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
415         /* Drop peer's ref on all cancelled txs.  This will get
416          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
418         list_for_each_safe (tmp, nxt, &txs) {
419                 tx = list_entry(tmp, kptl_tx_t, tx_list);
420                 list_del(&tx->tx_list);
421                 kptllnd_tx_decref(tx);
/* Start closing 'peer' with error 'why'; caller holds the peer table write
 * lock.  Active/handshaking peers are pulled from the table, marked CLOSING
 * and queued for the watchdog (kptllnd_handle_closing_peers); peers already
 * closing are left alone.
 * NOTE(review): the ALLOCATED case and the switch's default/LBUG arm are
 * elided in this excerpt. */
426 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
428         switch (peer->peer_state) {
432         case PEER_STATE_WAITING_HELLO:
433         case PEER_STATE_ACTIVE:
434                 /* Ensure new peers see a new incarnation of me */
435                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
436                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
437                         kptllnd_data.kptl_incarnation++;
439                 /* Removing from peer table */
440                 kptllnd_data.kptl_n_active_peers--;
441                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
443                 list_del(&peer->peer_list);
444                 kptllnd_peer_unreserve_buffers();
446                 peer->peer_error = why;        /* stash 'why' only on first close */
447                 peer->peer_state = PEER_STATE_CLOSING;
449                 /* Schedule for immediate attention, taking peer table's ref */
450                 list_add_tail(&peer->peer_list,
451                               &kptllnd_data.kptl_closing_peers);
452                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
        /* already on the closing/zombie path: nothing more to do */
455         case PEER_STATE_ZOMBIE:
456         case PEER_STATE_CLOSING:
/* Locking wrapper around kptllnd_peer_close_locked(): close 'peer' with
 * error 'why' under the peer table write lock. */
462 kptllnd_peer_close(kptl_peer_t *peer, int why)
466         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
467         kptllnd_peer_close_locked(peer, why);
468         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id' (NID and/or PID may be LNET_NID_ANY /
 * LNET_PID_ANY wildcards).  Scans either the single hash bucket for a
 * concrete NID or all buckets for the wildcard; each match is closed
 * outside the lock (holding a temp ref) and the scan restarts.
 * NOTE(review): the rc initialisation, lo/hi defaults and the restart
 * branch target are elided in this excerpt. */
472 kptllnd_peer_del(lnet_process_id_t id)
474         struct list_head *ptmp;
475         struct list_head *pnxt;
484          * Find the single bucket we are supposed to look at or if nid is a
485          * wildcard (LNET_NID_ANY) then look at all of the buckets
487         if (id.nid != LNET_NID_ANY) {
488                 struct list_head *l = kptllnd_nid2peerlist(id.nid);
490                 lo = hi = l - kptllnd_data.kptl_peers;
492                 if (id.pid != LNET_PID_ANY)
496                 hi = kptllnd_data.kptl_peer_hash_size - 1;
500         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
502         for (i = lo; i <= hi; i++) {
503                 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
504                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
                        /* match on NID address (and PID unless wildcarded) */
506                         if (!(id.nid == LNET_NID_ANY ||
507                               (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
508                                (id.pid == LNET_PID_ANY ||
509                                 peer->peer_id.pid == id.pid))))
512                         kptllnd_peer_addref(peer); /* 1 ref for me... */
                        /* must drop the read lock: close takes the write lock */
514                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
517                         kptllnd_peer_close(peer, 0);
518                         kptllnd_peer_decref(peer); /* ...until here */
520                         rc = 0;              /* matched something */
522                         /* start again now I've dropped the lock */
527         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' for sending to 'peer', consuming the caller's ref on 'tx'.
 * NOOPs go on their own queue; HELLO jumps the send queue so the handshake
 * always goes out first; everything else is FIFO on the send queue. */
533 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
535         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
538         spin_lock_irqsave(&peer->peer_lock, flags);
540         /* Ensure HELLO is sent first */
541         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
542                 list_add(&tx->tx_list, &peer->peer_noops);
543         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
544                 list_add(&tx->tx_list, &peer->peer_sendq);
546                 list_add_tail(&tx->tx_list, &peer->peer_sendq);
548         spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Bind an MD over tx's message (single buffer, or 'nfrag' iovec frags),
 * stamp the tx deadline and queue it on 'peer'.  Consumes the caller's ref
 * on 'tx'; on PtlMDBind failure the tx is failed with -EIO and dropped.
 * NOTE(review): the nfrag==0 vs iovec branch condition and md.length for
 * the frag case are partially elided in this excerpt. */
553 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
555         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
556         ptl_handle_md_t  msg_mdh;
560         LASSERT (!tx->tx_idle);
561         LASSERT (!tx->tx_active);
562         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
563         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
564         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
565                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
566                  tx->tx_type == TX_TYPE_GET_REQUEST);
568         kptllnd_set_tx_peer(tx, peer);
570         memset(&md, 0, sizeof(md));
        /* expect SEND_END, plus an ACK event if the tx wants one */
572         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
573         md.options = PTL_MD_OP_PUT |
574                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
575                      PTL_MD_EVENT_START_DISABLE;
576         md.user_ptr = &tx->tx_msg_eventarg;
577         md.eq_handle = kptllnd_data.kptl_eqh;
        /* contiguous message... */
580                 md.start = tx->tx_msg;
581                 md.length = tx->tx_msg->ptlm_nob;
        /* ...or an iovec whose first frag must be the message header */
584                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
586                 md.start = tx->tx_frags;
588                 md.options |= PTL_MD_IOVEC;
591         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
593                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
594                        libcfs_id2str(peer->peer_id),
595                        kptllnd_errtype2str(prc), prc);
596                 tx->tx_status = -EIO;
597                 kptllnd_tx_decref(tx);
        /* deadline from now; check_bucket() times the tx out against this */
602         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
604         tx->tx_msg_mdh = msg_mdh;
605         kptllnd_queue_tx(peer, tx);
608 /* NB "restarts" comes from peer_sendq of a single peer */
/* Re-target the txs on 'restarts' (all previously queued to one peer) at a
 * freshly looked-up/created peer for 'target', then kick its send path.
 * HELLOs are dropped rather than restarted; NOOPs must not appear here.
 * NOTE(review): the error path when kptllnd_find_target() fails is elided
 * in this excerpt. */
610 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts)
616         LASSERT (!list_empty(restarts));
618         if (kptllnd_find_target(net, target, &peer) != 0)
621         list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
622                 LASSERT (tx->tx_peer != NULL);
623                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
624                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
625                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
627                 list_del_init(&tx->tx_list);
                /* a queued HELLO belongs to the old connection attempt:
                 * drop it instead of replaying it at the new peer */
630                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
631                         kptllnd_tx_decref(tx);
635                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
                /* swap the tx's peer ref from the old peer to the new one */
638                 kptllnd_peer_decref(tx->tx_peer);
640                 kptllnd_set_tx_peer(tx, peer);
641                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
647         kptllnd_peer_check_sends(peer);
648         kptllnd_peer_decref(peer);
/* Return nonzero if a NOOP should be sent to 'peer' now: the handshake is
 * done, no NOOP is already queued, the peer owes us enough credits to be at
 * the high-water mark, and there is nothing else to piggyback the credit
 * return onto (or only the reserved last credit is available).  Caller
 * holds peer_lock. */
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
654         if (!peer->peer_sent_hello ||
655             peer->peer_credits == 0 ||
656             !list_empty(&peer->peer_noops) ||
657             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
660         /* No tx to piggyback NOOP onto or no credit to send a tx */
661         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
/* Drain peer's send queues as far as credits allow.  Posts a NOOP if credit
 * return is overdue, enforces HELLO-first ordering, reserves the last
 * credit for NOOP/HELLO, packs and PtlPut()s each eligible tx (attaching
 * the bulk ME/MD first for PUT/GET requests), and closes the peer on any
 * Portals failure.  Must not be called from interrupt context.
 * NOTE(review): this excerpt elides the main loop construct, several
 * 'goto/continue/break' lines and parts of the Portals argument lists --
 * the control flow described above is inferred from the surviving lines
 * and should be confirmed against the full source. */
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
673         LASSERT(!in_interrupt());
675         spin_lock_irqsave(&peer->peer_lock, flags);
677         peer->peer_retry_noop = 0;
679         if (kptllnd_peer_send_noop(peer)) {
680                 /* post a NOOP to return credits */
681                 spin_unlock_irqrestore(&peer->peer_lock, flags);
683                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
685                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
686                                libcfs_id2str(peer->peer_id));
688                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
690                         kptllnd_post_tx(peer, tx, 0);
693                 spin_lock_irqsave(&peer->peer_lock, flags);
                /* remember to retry if no tx descriptor was available */
694                 peer->peer_retry_noop = (tx == NULL);
        /* pick the next tx: NOOPs first, then the send queue */
698         if (!list_empty(&peer->peer_noops)) {
699                 LASSERT (peer->peer_sent_hello);
700                 tx = list_entry(peer->peer_noops.next,
702         } else if (!list_empty(&peer->peer_sendq)) {
703                 tx = list_entry(peer->peer_sendq.next,
706                 /* nothing to send right now */
710         LASSERT (tx->tx_active);
711         LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        /* credit accounting must stay within the negotiated window */
714         LASSERT (peer->peer_outstanding_credits >= 0);
715         LASSERT (peer->peer_sent_credits >= 0);
716         LASSERT (peer->peer_sent_credits +
717                  peer->peer_outstanding_credits <=
718                  *kptllnd_tunables.kptl_peertxcredits);
719         LASSERT (peer->peer_credits >= 0);
721         msg_type = tx->tx_msg->ptlm_type;
723         /* Ensure HELLO is sent first */
724         if (!peer->peer_sent_hello) {
725                 LASSERT (list_empty(&peer->peer_noops));
726                 if (msg_type != PTLLND_MSG_TYPE_HELLO)
728                 peer->peer_sent_hello = 1;
        /* stall when the peer has returned no credits yet */
731         if (peer->peer_credits == 0) {
732                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733                        libcfs_id2str(peer->peer_id),
735                        peer->peer_outstanding_credits,
736                        peer->peer_sent_credits,
737                        kptllnd_msgtype2str(msg_type), tx);
741         /* Last/Initial credit reserved for NOOP/HELLO */
742         if (peer->peer_credits == 1 &&
743             msg_type != PTLLND_MSG_TYPE_HELLO &&
744             msg_type != PTLLND_MSG_TYPE_NOOP) {
745                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746                        "not using last credit for %s[%p]\n",
747                        libcfs_id2str(peer->peer_id),
749                        peer->peer_outstanding_credits,
750                        peer->peer_sent_credits,
751                        kptllnd_msgtype2str(msg_type), tx);
755         list_del(&tx->tx_list);
757         /* Discard any NOOP I queued if I'm not at the high-water mark
758          * any more or more messages have been queued */
759         if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760             !kptllnd_peer_send_noop(peer)) {
763                 spin_unlock_irqrestore(&peer->peer_lock, flags);
765                 CDEBUG(D_NET, "%s: redundant noop\n",
766                        libcfs_id2str(peer->peer_id));
767                 kptllnd_tx_decref(tx);
769                 spin_lock_irqsave(&peer->peer_lock, flags);
773         /* fill last-minute msg fields */
774         kptllnd_msg_pack(tx->tx_msg, peer);
776         if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777             tx->tx_type == TX_TYPE_GET_REQUEST) {
778                 /* peer_next_matchbits must be known good */
779                 LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780                 /* Assume 64-bit matchbits can't wrap */
781                 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782                 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783                         peer->peer_next_matchbits++;
        /* consume one credit; piggyback all credits owed back to the peer */
786         peer->peer_sent_credits += peer->peer_outstanding_credits;
787         peer->peer_outstanding_credits = 0;
788         peer->peer_credits--;
790         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791                libcfs_id2str(peer->peer_id), peer->peer_credits,
792                peer->peer_outstanding_credits, peer->peer_sent_credits,
793                kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794                tx->tx_msg->ptlm_credits);
796         list_add_tail(&tx->tx_list, &peer->peer_activeq);
798         kptllnd_tx_addref(tx);          /* 1 ref for me... */
800         spin_unlock_irqrestore(&peer->peer_lock, flags);
802         if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803             tx->tx_type == TX_TYPE_GET_REQUEST) {
804                 /* Post bulk now we have safe matchbits */
805                 rc = PtlMEAttach(kptllnd_data.kptl_nih,
806                                  *kptllnd_tunables.kptl_portal,
808                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
814                         CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815                                libcfs_id2str(peer->peer_id),
816                                kptllnd_errtype2str(rc), rc);
820                 rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
823                         CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824                                libcfs_id2str(tx->tx_peer->peer_id),
825                                kptllnd_errtype2str(rc), rc);
826                         rc = PtlMEUnlink(meh);
827                         LASSERT(rc == PTL_OK);
828                         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
831                 /* I'm not racing with the event callback here.  It's a
832                  * bug if there's an event on the MD I just attached
833                  * before I actually send the RDMA request message -
834                  * probably matchbits re-used in error. */
837         tx->tx_tposted = jiffies;       /* going on the wire */
839         rc = PtlPut (tx->tx_msg_mdh,
840                      tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
842                      *kptllnd_tunables.kptl_portal,
846                      0);                /* header data */
848                 CERROR("PtlPut %s error %s(%d)\n",
849                        libcfs_id2str(peer->peer_id),
850                        kptllnd_errtype2str(rc), rc);
854         kptllnd_tx_decref(tx);          /* drop my ref */
856         spin_lock_irqsave(&peer->peer_lock, flags);
859         spin_unlock_irqrestore(&peer->peer_lock, flags);
863         /* Nuke everything (including tx we were trying) */
864         kptllnd_peer_close(peer, -EIO);
865         kptllnd_tx_decref(tx);
866         kptllnd_schedule_ptltrace_dump();
/* Scan peer's send and active queues for the first tx whose deadline has
 * passed; return it with an extra ref, or (in elided code) NULL if nothing
 * has timed out.  Caller holds peer_lock. */
870 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
873         struct list_head *ele;
875         list_for_each(ele, &peer->peer_sendq) {
876                 tx = list_entry(ele, kptl_tx_t, tx_list);
878                 if (time_after_eq(jiffies, tx->tx_deadline)) {
879                         kptllnd_tx_addref(tx);
884         list_for_each(ele, &peer->peer_activeq) {
885                 tx = list_entry(ele, kptl_tx_t, tx_list);
887                 if (time_after_eq(jiffies, tx->tx_deadline)) {
888                         kptllnd_tx_addref(tx);
/* Watchdog pass over hash bucket 'idx': for each peer not yet checked this
 * 'stamp', look for timed-out txs and pending NOOP retries.  A pending
 * retry triggers kptllnd_peer_check_sends(); a timed-out tx is logged in
 * detail and the peer is closed with -ETIMEDOUT.  The bucket scan restarts
 * after any peer is processed outside the lock.
 * NOTE(review): 'continue'/'goto again' lines and some conditions are
 * elided in this excerpt; the restart behaviour is inferred from the
 * surviving comments. */
898 kptllnd_peer_check_bucket (int idx, int stamp)
900         struct list_head *peers = &kptllnd_data.kptl_peers[idx];
904         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
907         /* NB. Shared lock while I just look */
908         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
910         list_for_each_entry (peer, peers, peer_list) {
                /* snapshot defaults in case we bail before sampling */
913                 int c = -1, oc = -1, sc = -1;
914                 int nsend = -1, nactive = -1;
915                 int sent_hello = -1, state = -1;
917                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
918                        libcfs_id2str(peer->peer_id), peer->peer_credits,
919                        peer->peer_outstanding_credits, peer->peer_sent_credits);
921                 spin_lock(&peer->peer_lock);
923                 if (peer->peer_check_stamp == stamp) {
924                         /* checked already this pass */
925                         spin_unlock(&peer->peer_lock);
929                 peer->peer_check_stamp = stamp;
930                 tx = kptllnd_find_timed_out_tx(peer);
931                 check_sends = peer->peer_retry_noop;
                /* sample diagnostics under peer_lock for the timeout report */
934                 c = peer->peer_credits;
935                 sc = peer->peer_sent_credits;
936                 oc = peer->peer_outstanding_credits;
937                 state = peer->peer_state;
938                 sent_hello = peer->peer_sent_hello;
939                 nsend = kptllnd_count_queue(&peer->peer_sendq);
940                 nactive = kptllnd_count_queue(&peer->peer_activeq);
943                 spin_unlock(&peer->peer_lock);
945                 if (tx == NULL && !check_sends)
948                 kptllnd_peer_addref(peer); /* 1 ref for me... */
950                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
952                 if (tx == NULL) { /* nothing timed out */
953                         kptllnd_peer_check_sends(peer);
954                         kptllnd_peer_decref(peer); /* ...until here or... */
956                         /* rescan after dropping the lock */
960                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
961                                    libcfs_id2str(peer->peer_id),
962                                    (tx->tx_tposted == 0) ?
963                                    "no free peer buffers" :
964                                    "please check Portals");
                /* classify the timeout for the console: posted-but-stuck,
                 * never connected, or starved of credits */
966                 if (tx->tx_tposted) {
967                         CERROR("Could not send to %s after %ds (sent %lds ago); "
968                                 "check Portals for possible issues\n",
969                                 libcfs_id2str(peer->peer_id),
970                                 *kptllnd_tunables.kptl_timeout,
971                                 cfs_duration_sec(jiffies - tx->tx_tposted));
972                 } else if (state < PEER_STATE_ACTIVE) {
973                         CERROR("Could not connect %s (%d) after %ds; "
974                                "peer might be down\n",
975                                libcfs_id2str(peer->peer_id), state,
976                                *kptllnd_tunables.kptl_timeout);
978                         CERROR("Could not get credits for %s after %ds; "
979                                "possible Lustre networking issues\n",
980                                libcfs_id2str(peer->peer_id),
981                                *kptllnd_tunables.kptl_timeout);
984                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
985                        "state %d, sent_hello %d, sendq %d, activeq %d "
986                        "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
987                        libcfs_id2str(peer->peer_id), c, oc, sc,
988                        state, sent_hello, nsend, nactive,
989                        tx, kptllnd_tx_typestr(tx->tx_type),
990                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
991                        tx->tx_active ? "A" : "",
992                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
994                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
997                        (tx->tx_tposted == 0) ? "not " : "",
998                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
999                        *kptllnd_tunables.kptl_timeout);
1002                 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1003                         kptllnd_dump_ptltrace();
1006                 kptllnd_tx_decref(tx);
1008                 kptllnd_peer_close(peer, -ETIMEDOUT);
1009                 kptllnd_peer_decref(peer); /* ...until here */
1011                 /* start again now I've dropped the lock */
1015         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer for LNet process id 'id' in its hash bucket; caller
 * holds the peer table lock.  On a match, return the peer with a new ref;
 * the NULL-on-miss return is elided in this excerpt. */
1019 kptllnd_id2peer_locked (lnet_process_id_t id)
1021         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
1022         struct list_head *tmp;
1025         list_for_each (tmp, peers) {
1026                 peer = list_entry (tmp, kptl_peer_t, peer_list);
                /* table never holds allocated/closing/zombie peers */
1028                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1029                         peer->peer_state == PEER_STATE_ACTIVE);
1031                 /* NB logical LNet peers share one kptl_peer_t */
1032                 if (peer->peer_id.pid != id.pid ||
1033                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1036                 kptllnd_peer_addref(peer);
1038                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1040                        kptllnd_ptlid2str(peer->peer_ptlid),
1041                        atomic_read (&peer->peer_refcount));
/* Log a console error that adding the peer described by 'str'/'id' would
 * overflow the peer table, with guidance on the tunables to raise. */
1049 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1051         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1052                            "messages may be dropped\n",
1053                            str, libcfs_id2str(id),
1054                            kptllnd_data.kptl_n_active_peers);
1055         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1056                            "'max_nodes' or 'max_procs_per_node'\n");
/* For a brand-new peer 'lpid' (asserted absent from the peer table), return
 * the last matchbits its previous incarnation was seen using, by searching
 * the closing and zombie lists; PTL_RESERVED_MATCHBITS if no trace remains.
 * Caller holds the peer table lock. */
1060 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1063         struct list_head *tmp;
1065         /* Find the last matchbits I saw this new peer using.  Note..
1066            A. This peer cannot be in the peer table - she's new!
1067            B. If I can't find the peer in the closing/zombie peers, all
1068            matchbits are safe because all refs to the (old) peer have gone
1069            so all txs have completed so there's no risk of matchbit
1073         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1075         /* peer's last matchbits can't change after it comes out of the peer
1076          * table, so first match is fine */
1078         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1079                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1081                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1082                     peer->peer_id.pid == lpid.pid)
1083                         return peer->peer_last_matchbits_seen;
1086         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1087                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1089                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1090                     peer->peer_id.pid == lpid.pid)
1091                         return peer->peer_last_matchbits_seen;
1094         return PTL_RESERVED_MATCHBITS;
/* Process an incoming HELLO from 'initiator' on network 'net'.  Validates
 * the sender (userspace flag, matchbits, buffer size, credits); completes
 * an in-progress handshake or tears down a stale incarnation; otherwise
 * allocates a new peer, reserves buffers, re-checks for races (shutdown,
 * concurrent HELLO, table overflow) under the write lock, installs the new
 * peer and posts a HELLO response.
 * NOTE(review): most error-path 'return NULL' / 'goto' lines and the final
 * success return are elided in this excerpt -- the described flow is
 * inferred from the surviving branches and should be confirmed against the
 * full source. */
1098 kptllnd_peer_handle_hello (kptl_net_t *net,
1099                            ptl_process_id_t initiator, kptl_msg_t *msg)
1101         rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1103         kptl_peer_t *new_peer;
1104         lnet_process_id_t lpid;
1105         unsigned long flags;
1106         kptl_tx_t *hello_tx;
1108         __u64 safe_matchbits;
1109         __u64 last_matchbits_seen;
1111         lpid.nid = msg->ptlm_srcnid;
1112         lpid.pid = msg->ptlm_srcpid;
1114         CDEBUG(D_NET, "hello from %s(%s)\n",
1115                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1117         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1118             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1119                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1120                  * userspace.  Refuse the connection if she hasn't set the
1121                  * correct flag in her PID... */
1122                 CERROR("Userflag not set in hello from %s (%s)\n",
1123                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1127         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1128          * RDMA to me.  I ensure I never register buffers for RDMA that could
1129          * match any she used */
1130         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1132         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1133                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1134                        safe_matchbits, libcfs_id2str(lpid));
1138         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1139                 CERROR("%s: max message size %d < MIN %d",
1140                        libcfs_id2str(lpid),
1141                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1142                        PTLLND_MIN_BUFFER_SIZE);
        /* need at least one credit beyond the reserved HELLO/NOOP credit */
1146         if (msg->ptlm_credits <= 1) {
1147                 CERROR("Need more than 1+%d credits from %s\n",
1148                        msg->ptlm_credits, libcfs_id2str(lpid));
1152         write_lock_irqsave(g_lock, flags);
1154         peer = kptllnd_id2peer_locked(lpid);
        /* existing peer: either complete the handshake or detect a reboot */
1156                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1157                         /* Completing HELLO handshake */
1158                         LASSERT(peer->peer_incarnation == 0);
1160                         if (msg->ptlm_dststamp != 0 &&
1161                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1162                                 write_unlock_irqrestore(g_lock, flags);
1164                                 CERROR("Ignoring HELLO from %s: unexpected "
1165                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1166                                        libcfs_id2str(lpid),
1168                                        peer->peer_myincarnation);
1169                                 kptllnd_peer_decref(peer);
1173                         /* Concurrent initiation or response to my HELLO */
1174                         peer->peer_state = PEER_STATE_ACTIVE;
1175                         peer->peer_incarnation = msg->ptlm_srcstamp;
1176                         peer->peer_next_matchbits = safe_matchbits;
1177                         peer->peer_max_msg_size =
1178                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1180                         write_unlock_irqrestore(g_lock, flags);
1184                 if (msg->ptlm_dststamp != 0 &&
1185                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1186                         write_unlock_irqrestore(g_lock, flags);
1188                         CERROR("Ignoring stale HELLO from %s: "
1189                                "dststamp "LPX64" (current "LPX64")\n",
1190                                libcfs_id2str(lpid),
1192                                peer->peer_myincarnation);
1193                         kptllnd_peer_decref(peer);
1197                 /* Brand new connection attempt: remove old incarnation */
1198                 kptllnd_peer_close_locked(peer, 0);
        /* no (live) existing peer: prepare a fresh one and a HELLO reply */
1201         kptllnd_cull_peertable_locked(lpid);
1203         write_unlock_irqrestore(g_lock, flags);
1206                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1207                        " stamp "LPX64"("LPX64")\n",
1208                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1209                        msg->ptlm_srcstamp, peer->peer_incarnation);
1211                 kptllnd_peer_decref(peer);
1215         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1216         if (hello_tx == NULL) {
1217                 CERROR("Unable to allocate HELLO message for %s\n",
1218                        libcfs_id2str(lpid));
1222         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1223                          lpid, sizeof(kptl_hello_msg_t));
1225         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1226         if (new_peer == NULL) {
1227                 kptllnd_tx_decref(hello_tx);
1231         rc = kptllnd_peer_reserve_buffers();
1233                 kptllnd_peer_decref(new_peer);
1234                 kptllnd_tx_decref(hello_tx);
1236                 CERROR("Failed to reserve buffers for %s\n",
1237                        libcfs_id2str(lpid));
        /* retake the lock and re-check everything that may have raced the
         * unlocked allocation above */
1241         write_lock_irqsave(g_lock, flags);
1244         if (net->net_shutdown) {
1245                 write_unlock_irqrestore(g_lock, flags);
1247                 CERROR ("Shutdown started, refusing connection from %s\n",
1248                         libcfs_id2str(lpid));
1249                 kptllnd_peer_unreserve_buffers();
1250                 kptllnd_peer_decref(new_peer);
1251                 kptllnd_tx_decref(hello_tx);
1255         peer = kptllnd_id2peer_locked(lpid);
        /* somebody beat me to it while the lock was dropped */
1257                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1258                         /* An outgoing message instantiated 'peer' for me */
1259                         LASSERT(peer->peer_incarnation == 0);
1261                         peer->peer_state = PEER_STATE_ACTIVE;
1262                         peer->peer_incarnation = msg->ptlm_srcstamp;
1263                         peer->peer_next_matchbits = safe_matchbits;
1264                         peer->peer_max_msg_size =
1265                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1267                         write_unlock_irqrestore(g_lock, flags);
1269                         CWARN("Outgoing instantiated peer %s\n",
1270                               libcfs_id2str(lpid));
1272                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1274                         write_unlock_irqrestore(g_lock, flags);
1276                         /* WOW! Somehow this peer completed the HELLO
1277                          * handshake while I slept.  I guess I could have slept
1278                          * while it rebooted and sent a new HELLO, so I'll fail
1280                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1281                         kptllnd_peer_decref(peer);
                /* either way my new_peer and reservation are redundant */
1285                 kptllnd_peer_unreserve_buffers();
1286                 kptllnd_peer_decref(new_peer);
1287                 kptllnd_tx_decref(hello_tx);
1291         if (kptllnd_data.kptl_n_active_peers ==
1292             kptllnd_data.kptl_expected_peers) {
1293                 /* peer table full */
1294                 write_unlock_irqrestore(g_lock, flags);
1296                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1298                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1300                         CERROR("Refusing connection from %s\n",
1301                                libcfs_id2str(lpid));
1302                         kptllnd_peer_unreserve_buffers();
1303                         kptllnd_peer_decref(new_peer);
1304                         kptllnd_tx_decref(hello_tx);
1308                 write_lock_irqsave(g_lock, flags);
1309                 kptllnd_data.kptl_expected_peers++;
1313         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1315         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1316         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1317                 *kptllnd_tunables.kptl_max_msg_size;
1319         new_peer->peer_state = PEER_STATE_ACTIVE;
1320         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1321         new_peer->peer_next_matchbits = safe_matchbits;
1322         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1323         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1325         LASSERT (!net->net_shutdown);
1326         kptllnd_peer_add_peertable_locked(new_peer);
1328         write_unlock_irqrestore(g_lock, flags);
1330         /* NB someone else could get in now and post a message before I post
1331          * the HELLO, but post_tx/check_sends take care of that! */
1333         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1334                libcfs_id2str(new_peer->peer_id), hello_tx);
1336         kptllnd_post_tx(new_peer, hello_tx, 0);
1337         kptllnd_peer_check_sends(new_peer);
/* Launch 'tx' (carrying 'nfrag' payload fragments) to an already-known
 * 'peer': queue it, then immediately kick the peer's send path.  This is
 * the same post_tx/check_sends pairing used after the HELLO handshake
 * elsewhere in this file, which tolerates other threads posting messages
 * concurrently.  NOTE(review): caller presumably holds a peer reference
 * for the duration of the call — confirm against call sites. */
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
        /* enqueue the tx on the peer's send queue */
        kptllnd_post_tx(peer, tx, nfrag);
        /* attempt transmission now rather than waiting for the scheduler */
        kptllnd_peer_check_sends(peer);
1350 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1351 kptl_peer_t **peerp)
1353 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1354 ptl_process_id_t ptl_id;
1355 kptl_peer_t *new_peer;
1356 kptl_tx_t *hello_tx;
1357 unsigned long flags;
1359 __u64 last_matchbits_seen;
1361 /* I expect to find the peer, so I only take a read lock... */
1362 read_lock_irqsave(g_lock, flags);
1363 *peerp = kptllnd_id2peer_locked(target);
1364 read_unlock_irqrestore(g_lock, flags);
1369 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1370 CWARN("Refusing to create a new connection to %s "
1371 "(non-kernel peer)\n", libcfs_id2str(target));
1372 return -EHOSTUNREACH;
1375 /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1376 * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1377 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1378 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1380 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1381 if (hello_tx == NULL) {
1382 CERROR("Unable to allocate connect message for %s\n",
1383 libcfs_id2str(target));
1387 hello_tx->tx_acked = 1;
1388 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1389 target, sizeof(kptl_hello_msg_t));
1391 new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1392 if (new_peer == NULL) {
1397 rc = kptllnd_peer_reserve_buffers();
1401 write_lock_irqsave(g_lock, flags);
1403 /* Called only in lnd_send which can't happen after lnd_shutdown */
1404 LASSERT (!net->net_shutdown);
1406 *peerp = kptllnd_id2peer_locked(target);
1407 if (*peerp != NULL) {
1408 write_unlock_irqrestore(g_lock, flags);
1412 kptllnd_cull_peertable_locked(target);
1414 if (kptllnd_data.kptl_n_active_peers ==
1415 kptllnd_data.kptl_expected_peers) {
1416 /* peer table full */
1417 write_unlock_irqrestore(g_lock, flags);
1419 kptllnd_peertable_overflow_msg("Connection to ", target);
1421 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1423 CERROR("Can't create connection to %s\n",
1424 libcfs_id2str(target));
1428 write_lock_irqsave(g_lock, flags);
1429 kptllnd_data.kptl_expected_peers++;
1433 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1435 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1436 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1437 *kptllnd_tunables.kptl_max_msg_size;
1439 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1440 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1442 kptllnd_peer_add_peertable_locked(new_peer);
1444 write_unlock_irqrestore(g_lock, flags);
1446 /* NB someone else could get in now and post a message before I post
1447 * the HELLO, but post_tx/check_sends take care of that! */
1449 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1450 libcfs_id2str(new_peer->peer_id), hello_tx);
1452 kptllnd_post_tx(new_peer, hello_tx, 0);
1453 kptllnd_peer_check_sends(new_peer);
1459 kptllnd_peer_unreserve_buffers();
1461 kptllnd_peer_decref(new_peer);
1463 kptllnd_tx_decref(hello_tx);