/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have questions.
 *
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_peer.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 * Author: E Barton <eeb@bartonsoftware.com>
 */

#include "ptllnd.h"             /* assumed: LND-private header for the kptl_* types used below */
#include <libcfs/list.h>

kptllnd_count_queue(struct list_head *q)

kptllnd_get_peer_info(int index,
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits)
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        struct list_head *ptmp;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        *state = peer->peer_state;
                        *sent_hello = peer->peer_sent_hello;
                        *refcount = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        *next_matchbits = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

        read_unlock_irqrestore(g_lock, flags);

kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);
        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);

        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
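        /* NB head-insertion keeps each hash chain in most-recently-added
         * order, so kptllnd_cull_peertable_locked() can treat the first
         * (max_procs_per_node - 1) entries it finds for a NID as recent and
         * cull only the older ones beyond that. */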

kptllnd_cull_peertable_locked (lnet_process_id_t pid)
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
        int               cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        struct list_head *tmp;
        struct list_head *nxt;

        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);

                if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))

                LASSERT (peer->peer_id.pid != pid.pid);

                if (count < cull_count)         /* recent (don't cull) */

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));

                kptllnd_peer_close_locked(peer, 0);

kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
        LIBCFS_ALLOC(peer, sizeof (*peer));
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid),
                       kptllnd_ptlid2str(ppid));

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = 0;
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
        peer->peer_sent_credits = 1;            /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE;  /* until we know better */
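        /* Credit bookkeeping, roughly: peer_credits is how many sends I may
         * post to this peer right now, peer_sent_credits is how many of my
         * receive buffers the peer currently holds credits for, and
         * peer_outstanding_credits is how many freed buffers I still owe it
         * (returned piggybacked on my next message).  Their sum is bounded by
         * peertxcredits, as asserted in kptllnd_peer_check_sends(). */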
        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we don't grow it
         * during shutdown */
        if (net->net_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
                LIBCFS_FREE(peer, sizeof(*peer));

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

kptllnd_peer_destroy (kptl_peer_t *peer)
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));

kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
        struct list_head *tmp;
        struct list_head *nxt;

        list_for_each_safe (tmp, nxt, peerq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, txs);

                tx->tx_status = -EIO;

kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_noops, txs);
        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);

        spin_unlock_irqrestore(&peer->peer_lock, flags);

kptllnd_peer_alive (kptl_peer_t *peer)
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();

kptllnd_peer_notify (kptl_peer_t *peer)
        cfs_time_t  last_alive = 0;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                last_alive = peer->peer_last_alive;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        read_lock(&kptllnd_data.kptl_net_rw_lock);
        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
        read_unlock(&kptllnd_data.kptl_net_rw_lock);

        if (nnets == 0)                         /* shutdown in progress */

        LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
                CERROR("Failed to allocate nets[%d]\n", nnets);
        memset(nets, 0, nnets * sizeof(*nets));

        read_lock(&kptllnd_data.kptl_net_rw_lock);
        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
                kptllnd_net_addref(net);
        read_unlock(&kptllnd_data.kptl_net_rw_lock);

        for (i = 0; i < nnets; i++) {
                if (!net->net_shutdown) {
                        peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
                                                       peer->peer_ptlid.nid);
                        lnet_notify(net->net_ni, peer_nid, 0, last_alive);

                kptllnd_net_decref(net);

        LIBCFS_FREE(nets, nnets * sizeof(*nets));

kptllnd_handle_closing_peers ()
        struct list_head   txs;
        struct list_head  *tmp;
        struct list_head  *nxt;

        /* Check with a read lock first to avoid blocking anyone */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I mustn't drop the lock while
         * I'm scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */
        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */
        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
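
/* Peer teardown in outline: kptllnd_peer_close_locked() below moves a peer
 * onto kptl_closing_peers and wakes the watchdog; kptllnd_handle_closing_peers()
 * above then notifies LNet, cancels the peer's txs, moves it to
 * kptl_zombie_peers and drops the peer table's ref.  The final
 * kptllnd_peer_decref() ends up in kptllnd_peer_destroy(), which unlinks the
 * zombie and frees it. */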
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
        switch (peer->peer_state) {
        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why;         /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
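                /* Waking the watchdog presumably gets
                 * kptllnd_handle_closing_peers() run promptly, so this peer's
                 * queued txs are failed without waiting for the next timeout
                 * scan. */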

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:

kptllnd_peer_close(kptl_peer_t *peer, int why)
        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

kptllnd_peer_del(lnet_process_id_t id)
        struct list_head  *ptmp;
        struct list_head  *pnxt;

        /*
         * Find the single bucket we are supposed to look at, or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);

                lo = hi = l - kptllnd_data.kptl_peers;

                if (id.pid != LNET_PID_ANY)

                hi = kptllnd_data.kptl_peer_hash_size - 1;

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY ||
                              (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
                               (id.pid == LNET_PID_ANY ||
                                peer->peer_id.pid == id.pid))))

                        kptllnd_peer_addref(peer);      /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer);      /* ...until here */

                        rc = 0;                         /* matched something */

                        /* start again now I've dropped the lock */

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
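        /* NB queue ordering: NOOPs go on their own peer_noops list, a HELLO
         * is pushed onto the head of peer_sendq so it overtakes anything
         * already queued, and everything else is appended in FIFO order.
         * kptllnd_peer_check_sends() only drains peer_noops once the HELLO
         * has gone out. */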

kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;

                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;

                md.options |= PTL_MD_IOVEC;
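        /* NB md.threshold above is the number of operations the MD accepts
         * before it auto-unlinks (PTL_UNLINK below): one for the send
         * completion, plus one more if an ACK of the PUT was requested. */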
        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);

        tx->tx_msg_mdh = msg_mdh;
        kptllnd_queue_tx(peer, tx);

/* NB "restarts" comes from peer_sendq of a single peer */
kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts)
        LASSERT (!list_empty(restarts));

        if (kptllnd_find_target(net, target, &peer) != 0)

        list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
                LASSERT (tx->tx_peer != NULL);
                LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
                         tx->tx_type == TX_TYPE_PUT_REQUEST ||
                         tx->tx_type == TX_TYPE_SMALL_MESSAGE);

                list_del_init(&tx->tx_list);

                    tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                        kptllnd_tx_decref(tx);

                LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);

                kptllnd_peer_decref(tx->tx_peer);

                kptllnd_set_tx_peer(tx, peer);
                kptllnd_queue_tx(peer, tx);     /* takes over my ref on tx */

        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);

kptllnd_peer_send_noop (kptl_peer_t *peer)
        if (!peer->peer_sent_hello ||
            peer->peer_credits == 0 ||
            !list_empty(&peer->peer_noops) ||
            peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
                return 0;

        /* No tx to piggyback NOOP onto or no credit to send a tx */
        return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
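        /* i.e. post an explicit NOOP only when the HELLO handshake is done, a
         * credit is available, no NOOP is queued already, we owe the peer at
         * least PTLLND_CREDIT_HIGHWATER credits, and there is nothing on
         * peer_sendq (or only the reserved last credit remains) to piggyback
         * the credits on. */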

kptllnd_peer_check_sends (kptl_peer_t *peer)
        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (kptllnd_peer_send_noop(peer)) {
                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));

                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
                        kptllnd_post_tx(peer, tx, 0);

                spin_lock_irqsave(&peer->peer_lock, flags);
                peer->peer_retry_noop = (tx == NULL);
        if (!list_empty(&peer->peer_noops)) {
                LASSERT (peer->peer_sent_hello);
                tx = list_entry(peer->peer_noops.next,
                                kptl_tx_t, tx_list);
        } else if (!list_empty(&peer->peer_sendq)) {
                tx = list_entry(peer->peer_sendq.next,
                                kptl_tx_t, tx_list);
                /* nothing to send right now */

        LASSERT (tx->tx_active);
        LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

        LASSERT (peer->peer_outstanding_credits >= 0);
        LASSERT (peer->peer_sent_credits >= 0);
        LASSERT (peer->peer_sent_credits +
                 peer->peer_outstanding_credits <=
                 *kptllnd_tunables.kptl_peertxcredits);
        LASSERT (peer->peer_credits >= 0);

        msg_type = tx->tx_msg->ptlm_type;

        /* Ensure HELLO is sent first */
        if (!peer->peer_sent_hello) {
                LASSERT (list_empty(&peer->peer_noops));
                if (msg_type != PTLLND_MSG_TYPE_HELLO)
                peer->peer_sent_hello = 1;

        if (peer->peer_credits == 0) {
                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits,
                       peer->peer_outstanding_credits,
                       peer->peer_sent_credits,
                       kptllnd_msgtype2str(msg_type), tx);

        /* Last/Initial credit reserved for NOOP/HELLO */
        if (peer->peer_credits == 1 &&
            msg_type != PTLLND_MSG_TYPE_HELLO &&
            msg_type != PTLLND_MSG_TYPE_NOOP) {
                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                       "not using last credit for %s[%p]\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits,
                       peer->peer_outstanding_credits,
                       peer->peer_sent_credits,
                       kptllnd_msgtype2str(msg_type), tx);
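        /* Keeping the last credit for a NOOP or HELLO means a
         * credit-returning message can still be sent when everything else is
         * blocked, so the two sides shouldn't deadlock with every credit tied
         * up in data messages. */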
        list_del(&tx->tx_list);

        /* Discard any NOOP I queued if I'm no longer at the high-water mark
         * or more messages have been queued since */
        if (msg_type == PTLLND_MSG_TYPE_NOOP &&
            !kptllnd_peer_send_noop(peer)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                CDEBUG(D_NET, "%s: redundant noop\n",
                       libcfs_id2str(peer->peer_id));
                kptllnd_tx_decref(tx);

                spin_lock_irqsave(&peer->peer_lock, flags);
        /* fill last-minute msg fields */
        kptllnd_msg_pack(tx->tx_msg, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {
                /* peer_next_matchbits must be known good */
                LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;
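                /* Each PUT/GET request consumes a fresh value from
                 * peer_next_matchbits; the bulk ME attached further down uses
                 * the same bits, so they must start above
                 * PTL_RESERVED_MATCHBITS and never repeat while the peer
                 * could still RDMA on an old value (hence the matchbits
                 * exchange in the HELLO handshake). */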
        peer->peer_sent_credits += peer->peer_outstanding_credits;
        peer->peer_outstanding_credits = 0;
        peer->peer_credits--;

        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
               libcfs_id2str(peer->peer_id), peer->peer_credits,
               peer->peer_outstanding_credits, peer->peer_sent_credits,
               kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
               tx->tx_msg->ptlm_credits);

        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_tx_addref(tx);                  /* 1 ref for me... */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {
                /* Post bulk now we have safe matchbits */
                rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                 *kptllnd_tunables.kptl_portal,
                                 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,

                        CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);

                rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                        CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                               libcfs_id2str(tx->tx_peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        rc = PtlMEUnlink(meh);
                        LASSERT(rc == PTL_OK);
                        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

                /* I'm not racing with the event callback here.  It's a
                 * bug if there's an event on the MD I just attached
                 * before I actually send the RDMA request message -
                 * probably matchbits re-used in error. */
836 rc = PtlPut (tx->tx_msg_mdh,
837 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
839 *kptllnd_tunables.kptl_portal,
843 0); /* header data */
845 CERROR("PtlPut %s error %s(%d)\n",
846 libcfs_id2str(peer->peer_id),
847 kptllnd_errtype2str(rc), rc);
851 kptllnd_tx_decref(tx); /* drop my ref */
853 spin_lock_irqsave(&peer->peer_lock, flags);
856 spin_unlock_irqrestore(&peer->peer_lock, flags);
860 /* Nuke everything (including tx we were trying) */
861 kptllnd_peer_close(peer, -EIO);
862 kptllnd_tx_decref(tx);
863 kptllnd_schedule_ptltrace_dump();
867 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
870 struct list_head *ele;
872 list_for_each(ele, &peer->peer_sendq) {
873 tx = list_entry(ele, kptl_tx_t, tx_list);
875 if (time_after_eq(jiffies, tx->tx_deadline)) {
876 kptllnd_tx_addref(tx);
881 list_for_each(ele, &peer->peer_activeq) {
882 tx = list_entry(ele, kptl_tx_t, tx_list);
884 if (time_after_eq(jiffies, tx->tx_deadline)) {
885 kptllnd_tx_addref(tx);

kptllnd_peer_check_bucket (int idx, int stamp)
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_entry (peer, peers, peer_list) {
                int c = -1, oc = -1, sc = -1;
                int nsend = -1, nactive = -1;
                int sent_hello = -1, state = -1;

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;

                c = peer->peer_credits;
                sc = peer->peer_sent_credits;
                oc = peer->peer_outstanding_credits;
                state = peer->peer_state;
                sent_hello = peer->peer_sent_hello;
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);

                spin_unlock(&peer->peer_lock);

                if (tx == NULL && !check_sends)

                kptllnd_peer_addref(peer);      /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) {               /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer);  /* ...until here or... */

                        /* rescan after dropping the lock */
953 /* rescan after dropping the lock */
957 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
958 libcfs_id2str(peer->peer_id),
959 (tx->tx_tposted == 0) ?
960 "no free peer buffers" :
961 "please check Portals");
963 if (tx->tx_tposted) {
964 CERROR("Could not send to %s after %ds (sent %lds ago); "
965 "check Portals for possible issues\n",
966 libcfs_id2str(peer->peer_id),
967 *kptllnd_tunables.kptl_timeout,
968 cfs_duration_sec(jiffies - tx->tx_tposted));
969 } else if (state < PEER_STATE_ACTIVE) {
970 CERROR("Could not connect %s (%d) after %ds; "
971 "peer might be down\n",
972 libcfs_id2str(peer->peer_id), state,
973 *kptllnd_tunables.kptl_timeout);
975 CERROR("Could not get credits for %s after %ds; "
976 "possible Lustre networking issues\n",
977 libcfs_id2str(peer->peer_id),
978 *kptllnd_tunables.kptl_timeout);
981 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
982 "state %d, sent_hello %d, sendq %d, activeq %d "
983 "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
984 libcfs_id2str(peer->peer_id), c, oc, sc,
985 state, sent_hello, nsend, nactive,
986 tx, kptllnd_tx_typestr(tx->tx_type),
987 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
988 tx->tx_active ? "A" : "",
989 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
991 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
994 (tx->tx_tposted == 0) ? "not " : "",
995 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
996 *kptllnd_tunables.kptl_timeout);
999 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1000 kptllnd_dump_ptltrace();
1003 kptllnd_tx_decref(tx);
1005 kptllnd_peer_close(peer, -ETIMEDOUT);
1006 kptllnd_peer_decref(peer); /* ...until here */
1008 /* start again now I've dropped the lock */
1012 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

kptllnd_id2peer_locked (lnet_process_id_t id)
        struct list_head *peers = kptllnd_nid2peerlist(id.nid);
        struct list_head *tmp;

        list_for_each (tmp, peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                        peer->peer_state == PEER_STATE_ACTIVE);

                /* NB logical LNet peers share one kptl_peer_t */
                if (peer->peer_id.pid != id.pid ||
                    LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))

                kptllnd_peer_addref(peer);

                CDEBUG(D_NET, "%s -> %s (%d)\n",
                       kptllnd_ptlid2str(peer->peer_ptlid),
                       atomic_read (&peer->peer_refcount));

kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");

kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
        struct list_head  *tmp;

        /* Find the last matchbits I saw this new peer using.  Note:
         * A. This peer cannot be in the peer table - she's new!
         * B. If I can't find the peer in the closing/zombie peers, all
         *    matchbits are safe because all refs to the (old) peer have gone
         *    so all txs have completed so there's no risk of matchbit
         *    re-use. */
        LASSERT(kptllnd_id2peer_locked(lpid) == NULL);

        /* peer's last matchbits can't change after it comes out of the peer
         * table, so first match is fine */
        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;

        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
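
        /* Neither list has any trace of this peer: per the note above, every
         * tx to any previous incarnation must have completed, so restarting
         * from the reserved matchbits is safe. */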
        return PTL_RESERVED_MATCHBITS;

kptllnd_peer_handle_hello (kptl_net_t *net,
                           ptl_process_id_t initiator, kptl_msg_t *msg)
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));

        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d\n",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);

        if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                /* Completing HELLO handshake */
                LASSERT(peer->peer_incarnation == 0);

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp != peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring HELLO from %s: unexpected "
                               "dststamp "LPX64" ("LPX64" wanted)\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);

                /* Concurrent initiation or response to my HELLO */
                peer->peer_state = PEER_STATE_ACTIVE;
                peer->peer_incarnation = msg->ptlm_srcstamp;
                peer->peer_next_matchbits = safe_matchbits;
                peer->peer_max_msg_size =
                        msg->ptlm_u.hello.kptlhm_max_msg_size;

                write_unlock_irqrestore(g_lock, flags);

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);

                kptllnd_cull_peertable_locked(lpid);

                write_unlock_irqrestore(g_lock, flags);

                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         lpid, sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(net, lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);

        rc = kptllnd_peer_reserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));

        write_lock_irqsave(g_lock, flags);

        if (net->net_shutdown) {
                write_unlock_irqrestore(g_lock, flags);

                CERROR ("Shutdown started, refusing connection from %s\n",
                        libcfs_id2str(lpid));
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

        peer = kptllnd_id2peer_locked(lpid);
1255 /* An outgoing message instantiated 'peer' for me */
1256 LASSERT(peer->peer_incarnation == 0);
1258 peer->peer_state = PEER_STATE_ACTIVE;
1259 peer->peer_incarnation = msg->ptlm_srcstamp;
1260 peer->peer_next_matchbits = safe_matchbits;
1261 peer->peer_max_msg_size =
1262 msg->ptlm_u.hello.kptlhm_max_msg_size;
1264 write_unlock_irqrestore(g_lock, flags);
1266 CWARN("Outgoing instantiated peer %s\n",
1267 libcfs_id2str(lpid));
1269 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1271 write_unlock_irqrestore(g_lock, flags);
1273 /* WOW! Somehow this peer completed the HELLO
1274 * handshake while I slept. I guess I could have slept
1275 * while it rebooted and sent a new HELLO, so I'll fail
1277 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1278 kptllnd_peer_decref(peer);
1282 kptllnd_peer_unreserve_buffers();
1283 kptllnd_peer_decref(new_peer);
1284 kptllnd_tx_decref(hello_tx);
        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1);        /* HELLO headroom */
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        LASSERT (!net->net_shutdown);
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */
        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
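        /* The response HELLO mirrors the checks made on the incoming one: it
         * carries the highest matchbits I have seen from this peer and my max
         * message size, so the peer can choose safe matchbits and size its
         * buffers in turn. */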

kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);

kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
                    kptl_peer_t **peerp)
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t    ptl_id;
        kptl_peer_t        *new_peer;
        kptl_tx_t          *hello_tx;
        unsigned long       flags;
        __u64               last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
         * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));

        hello_tx->tx_acked = 1;
        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         target, sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(net, target, ptl_id);
        if (new_peer == NULL) {

        rc = kptllnd_peer_reserve_buffers();

        write_lock_irqsave(g_lock, flags);
        /* Called only in lnd_send which can't happen after lnd_shutdown */
        LASSERT (!net->net_shutdown);

        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1);        /* HELLO headroom */
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */
        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        kptllnd_peer_unreserve_buffers();

        kptllnd_peer_decref(new_peer);

        kptllnd_tx_decref(hello_tx);