/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_peer.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 * Author: E Barton <eeb@bartonsoftware.com>
 */

#include "ptllnd.h"
#include <libcfs/list.h>
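
/* Peer lifecycle (a summary of the states used below): a peer is created in
 * PEER_STATE_ALLOCATED, enters the peer table as WAITING_HELLO (HELLO still
 * outstanding) or ACTIVE (handshake complete), and on close moves through
 * CLOSING to ZOMBIE before kptllnd_peer_destroy() frees it.  The peer table
 * is protected by kptl_peer_rw_lock; each peer's queues and credit counts
 * are protected by its own peer_lock. */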
static int
kptllnd_count_queue(struct list_head *q)
{
        struct list_head *e;
        int               n = 0;

        list_for_each(e, q) {
                n++;
        }

        return n;
}

int
kptllnd_get_peer_info(int index,
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {

                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        if (index-- > 0)
                                continue;

                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }

 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}

void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (!kptllnd_data.kptl_shutdown);
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);

        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable_locked) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}

void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_node - 1) peers with this NID in the table. */
        struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
        int               cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int               count;
        struct list_head *tmp;
        struct list_head *nxt;
        kptl_peer_t      *peer;

        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid != pid.nid)
                        continue;

                LASSERT (peer->peer_id.pid != pid.pid);

                count++;

                if (count < cull_count)         /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));

                kptllnd_peer_close_locked(peer, 0);
        }
}

kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long  flags;
        kptl_peer_t   *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid),
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = 0;
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
        peer->peer_sent_credits = 1;            /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we don't grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return peer;
}

void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;

        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}

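/* Tx cancellation: the helpers below move every tx on a peer's queues onto
 * a private list with tx_status = -EIO, so the caller can drop the txs'
 * refs (aborting any outstanding comms) after the peer locks are
 * released. */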
void
kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
{
        struct list_head *tmp;
        struct list_head *nxt;
        kptl_tx_t        *tx;

        list_for_each_safe (tmp, nxt, peerq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, txs);

                tx->tx_status = -EIO;
                tx->tx_active = 0;
        }
}

void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_noops, txs);
        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}

void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;

                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->peer_last_alive);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}

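/* Peer teardown is two-stage: kptllnd_peer_close_locked() parks a peer on
 * kptl_closing_peers and wakes the watchdog, which calls the function below
 * to notify LNET, cancel queued txs and move the peer to kptl_zombie_peers
 * until its last ref is dropped. */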
void
kptllnd_handle_closing_peers ()
{
        unsigned long     flags;
        struct list_head  txs;
        kptl_peer_t      *peer;
        struct list_head *tmp;
        struct list_head *nxt;
        kptl_tx_t        *tx;
        int               idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I mustn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */
        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */
        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}

void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why;         /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}

void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head *ptmp;
        struct list_head *pnxt;
        kptl_peer_t      *peer;
        int               lo;
        int               hi;
        int               i;
        unsigned long     flags;
        int               rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);

                lo = hi = l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;

                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

 again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY ||
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY ||
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}

void
kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        unsigned long flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

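/* NB the message MD below expects 1 event (SEND_END), or 2 when the tx
 * asks for a Portals ACK, and is bound PTL_UNLINK so it goes away once
 * those events have been delivered. */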
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;
        kptllnd_queue_tx(peer, tx);
}

/* NB "restarts" comes from peer_sendq of a single peer */
void
kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
{
        kptl_tx_t   *tx;
        kptl_tx_t   *tmp;
        kptl_peer_t *peer;

        LASSERT (!list_empty(restarts));

        if (kptllnd_find_target(&peer, target) != 0)
                peer = NULL;

        list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
                LASSERT (tx->tx_peer != NULL);
                LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
                         tx->tx_type == TX_TYPE_PUT_REQUEST ||
                         tx->tx_type == TX_TYPE_SMALL_MESSAGE);

                list_del_init(&tx->tx_list);

                if (peer == NULL ||
                    tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                        kptllnd_tx_decref(tx);
                        continue;
                }

                LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
                tx->tx_status = 0;
                tx->tx_active = 1;
                kptllnd_peer_decref(tx->tx_peer);

                kptllnd_set_tx_peer(tx, peer);
                kptllnd_queue_tx(peer, tx);     /* takes over my ref on tx */
        }

        if (peer == NULL)
                return;

        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);
}

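/* A NOOP is worth sending only when the HELLO has gone out, a send credit
 * is available, no NOOP is queued already and the credits I owe have hit
 * PTLLND_CREDIT_HIGHWATER; even then it's redundant if a queued tx can
 * piggyback them (unless only the reserved last credit remains). */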
static int
kptllnd_peer_send_noop (kptl_peer_t *peer)
{
        if (!peer->peer_sent_hello ||
            peer->peer_credits == 0 ||
            !list_empty(&peer->peer_noops) ||
            peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
                return 0;

        /* No tx to piggyback NOOP onto or no credit to send a tx */
        return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
}

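/* Credit accounting used below: peer_credits is how many sends the peer
 * will currently accept from me; peer_outstanding_credits is how many I
 * owe back (piggybacked on any outgoing message, or returned by an
 * explicit NOOP); peer_sent_credits is how many I've granted and not yet
 * seen consumed.  The last credit is always reserved so a NOOP or HELLO
 * can be sent. */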
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        int              msg_type;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (kptllnd_peer_send_noop(peer)) {
                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                peer->peer_retry_noop = (tx == NULL);
        }

        for (;;) {
                if (!list_empty(&peer->peer_noops)) {
                        LASSERT (peer->peer_sent_hello);
                        tx = list_entry(peer->peer_noops.next,
                                        kptl_tx_t, tx_list);
                } else if (!list_empty(&peer->peer_sendq)) {
                        tx = list_entry(peer->peer_sendq.next,
                                        kptl_tx_t, tx_list);
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peertxcredits);
                LASSERT (peer->peer_credits >= 0);

                msg_type = tx->tx_msg->ptlm_type;

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        LASSERT (list_empty(&peer->peer_noops));
                        if (msg_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits,
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                /* Last/Initial credit reserved for NOOP/HELLO */
                if (peer->peer_credits == 1 &&
                    msg_type != PTLLND_MSG_TYPE_HELLO &&
                    msg_type != PTLLND_MSG_TYPE_NOOP) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %s[%p]\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits,
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (msg_type == PTLLND_MSG_TYPE_NOOP &&
                    !kptllnd_peer_send_noop(peer)) {
                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }

                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(tx->tx_peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }

                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
        kptllnd_schedule_ptltrace_dump();
}

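/* Watchdog helper: return the first tx on either queue that has passed its
 * tx_deadline, with a ref taken so the caller can report it after dropping
 * the peer lock. */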
kptl_tx_t *
kptllnd_find_timed_out_tx(kptl_peer_t *peer)
{
        kptl_tx_t        *tx;
        struct list_head *ele;

        list_for_each(ele, &peer->peer_sendq) {
                tx = list_entry(ele, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        return tx;
                }
        }

        list_for_each(ele, &peer->peer_activeq) {
                tx = list_entry(ele, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        return tx;
                }
        }

        return NULL;
}

void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        kptl_tx_t        *tx;
        unsigned long     flags;
        int               nsend;
        int               nactive;
        int               check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;

                spin_unlock(&peer->peer_lock);

                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ?
                                   "no free peer buffers" :
                                   "please check Portals");

                if (tx->tx_tposted) {
                        CERROR("Could not send to %s after %ds (sent %lds ago); "
                               "check Portals for possible issues\n",
                               libcfs_id2str(peer->peer_id),
                               *kptllnd_tunables.kptl_timeout,
                               cfs_duration_sec(jiffies - tx->tx_tposted));
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                               "possible Lustre networking issues\n",
                               libcfs_id2str(peer->peer_id),
                               *kptllnd_tunables.kptl_timeout);
                }

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
                        kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

kptl_peer_t *
kptllnd_id2peer_locked (lnet_process_id_t id)
{
        struct list_head *peers = kptllnd_nid2peerlist(id.nid);
        struct list_head *tmp;
        kptl_peer_t      *peer;

        list_for_each (tmp, peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                        peer->peer_state == PEER_STATE_ACTIVE);

                if (peer->peer_id.nid != id.nid ||
                    peer->peer_id.pid != id.pid)
                        continue;

                kptllnd_peer_addref(peer);

                CDEBUG(D_NET, "%s -> %s (%d)\n",
                       libcfs_id2str(id),
                       kptllnd_ptlid2str(peer->peer_ptlid),
                       atomic_read (&peer->peer_refcount));
                return peer;
        }

        return NULL;
}

void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");
}

__u64
kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
{
        kptl_peer_t      *peer;
        struct list_head *tmp;

        /* Find the last matchbits I saw this new peer using.  Note..
           A. This peer cannot be in the peer table - she's new!
           B. If I can't find the peer in the closing/zombie peers, all
              matchbits are safe because all refs to the (old) peer have gone
              so all txs have completed so there's no risk of matchbit
              collision!
         */

        LASSERT(kptllnd_id2peer_locked(lpid) == NULL);

        /* peer's last matchbits can't change after it comes out of the peer
         * table, so first match is fine */

        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        return PTL_RESERVED_MATCHBITS;
}

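/* HELLO handshake: each HELLO carries the sender's incarnation stamps and
 * the highest matchbits it may have used for RDMA to me, so I can tell a
 * genuine reconnection from a stale or duplicate HELLO and choose new
 * matchbits safely above anything still in flight. */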
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t          *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t       *peer;
        kptl_peer_t       *new_peer;
        lnet_process_id_t  lpid;
        unsigned long      flags;
        kptl_tx_t         *hello_tx;
        int                rc;
        __u64              safe_matchbits;
        __u64              last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d\n",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }

                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

 again:
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);

                CERROR ("Shutdown started, refusing connection from %s\n",
                        libcfs_id2str(lpid));
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}

void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}

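/* Find the peer for 'target', creating it and initiating the HELLO
 * handshake if it isn't in the peer table yet.  On success *peerp holds a
 * reference the caller must drop. */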
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;

        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        hello_tx->tx_acked = 1;
        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);
                rc = -ESHUTDOWN;
                goto unwind_2;
        }

        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        *peerp = new_peer;
        return 0;

 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}