1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
6 * E Barton <eeb@bartonsoftware.com>
8 * This file is part of the Lustre file system, http://www.lustre.org
9 * Lustre is a trademark of Cluster File Systems, Inc.
11 * This file is confidential source code owned by Cluster File Systems.
12 * No viewing, modification, compilation, redistribution, or any other
13 * form of use is permitted except through a signed license agreement.
15 * If you have not signed such an agreement, then you have no rights to
16 * this file. Please destroy it immediately and contact CFS.
21 #include <libcfs/list.h>
/* Return the number of entries on list 'q'.  Caller must hold the lock
 * protecting the queue (see callers: peer_lock held).  NOTE(review):
 * body not visible in this fragment. */
24 kptllnd_count_queue(struct list_head *q)
/* Copy out identity, state and statistics of the index'th peer in the
 * global peer hash table (procfs/ioctl style iteration).  Walks every
 * hash bucket under a read lock on kptl_peer_rw_lock; the per-peer
 * matchbits/credits/queue counts are sampled under peer_lock.
 * NOTE(review): fragmentary view — the index-match test, locals and the
 * success/failure return path are not visible here. */
37 kptllnd_get_peer_info(int index,
38 lnet_process_id_t *id,
39 int *state, int *sent_hello,
40 int *refcount, __u64 *incarnation,
41 __u64 *next_matchbits, __u64 *last_matchbits_seen,
42 int *nsendq, int *nactiveq,
43 int *credits, int *outstanding_credits)
45 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
47 struct list_head *ptmp;
52 read_lock_irqsave(g_lock, flags);
54 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
56 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57 peer = list_entry(ptmp, kptl_peer_t, peer_list);
/* Fields stable under the table read lock. */
63 *state = peer->peer_state;
64 *sent_hello = peer->peer_sent_hello;
65 *refcount = atomic_read(&peer->peer_refcount);
66 *incarnation = peer->peer_incarnation;
/* peer_lock guards matchbits, credits and the tx queues. */
68 spin_lock(&peer->peer_lock);
70 *next_matchbits = peer->peer_next_matchbits;
71 *last_matchbits_seen = peer->peer_last_matchbits_seen;
72 *credits = peer->peer_credits;
73 *outstanding_credits = peer->peer_outstanding_credits;
75 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
76 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
78 spin_unlock(&peer->peer_lock);
86 read_unlock_irqrestore(g_lock, flags);
/* Insert 'peer' into the global peer hash table.  Caller must hold
 * kptl_peer_rw_lock for writing, and must already have reserved table
 * space (asserted below).  Takes a reference on behalf of the table. */
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
93 LASSERT (kptllnd_data.kptl_n_active_peers <
94 kptllnd_data.kptl_expected_peers);
/* Only not-yet-failed peers may enter the table. */
96 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
97 peer->peer_state == PEER_STATE_ACTIVE);
99 kptllnd_data.kptl_n_active_peers++;
100 atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
102 /* NB add to HEAD of peer list for MRU order!
103 * (see kptllnd_cull_peertable) */
104 list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Evict least-recently-used peers sharing pid.nid so a new peer with
 * this ID can be added without exceeding max_procs_per_node.  Caller
 * holds kptl_peer_rw_lock for writing.  Relies on MRU insertion order
 * (list_add at head in kptllnd_peer_add_peertable_locked), so entries
 * beyond the first (cull_count - 1) with this NID are the stalest. */
108 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 /* I'm about to add a new peer with this portals ID to the peer table,
111 * so (a) this peer should not exist already and (b) I want to leave at
112 * most (max_procs_per_nid - 1) peers with this NID in the table. */
113 struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
114 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116 struct list_head *tmp;
117 struct list_head *nxt;
/* _safe iteration: kptllnd_peer_close_locked unlinks entries. */
121 list_for_each_safe (tmp, nxt, peers) {
122 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124 peer = list_entry(tmp, kptl_peer_t, peer_list);
/* Bucket is shared by many NIDs; skip non-matching entries. */
126 if (peer->peer_id.nid != pid.nid)
/* (a) above: the exact ID must not already be present. */
129 LASSERT (peer->peer_id.pid != pid.pid);
133 if (count < cull_count) /* recent (don't cull) */
136 CDEBUG(D_NET, "Cull %s(%s)\n",
137 libcfs_id2str(peer->peer_id),
138 kptllnd_ptlid2str(peer->peer_ptlid));
140 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a peer in PEER_STATE_ALLOCATED with one ref
 * for the caller.  Not yet in the peer table.  Fails (presumably
 * returning NULL — return paths not visible in this fragment) on
 * allocation failure or if the module is shutting down. */
145 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
150 LIBCFS_ALLOC(peer, sizeof (*peer));
152 CERROR("Can't create peer %s (%s)\n",
154 kptllnd_ptlid2str(ppid));
158 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
160 INIT_LIST_HEAD (&peer->peer_sendq);
161 INIT_LIST_HEAD (&peer->peer_activeq);
162 spin_lock_init (&peer->peer_lock);
164 peer->peer_state = PEER_STATE_ALLOCATED;
165 peer->peer_error = 0;
166 peer->peer_last_alive = cfs_time_current();
167 peer->peer_id = lpid;
168 peer->peer_ptlid = ppid;
169 peer->peer_credits = 1; /* enough for HELLO */
170 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
/* One credit of peer's allowance is consumed by our outgoing HELLO. */
171 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
172 peer->peer_active_rxs = 0;
174 atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
176 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Stamp with current incarnation so stale HELLOs can be detected. */
178 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
180 /* Only increase # peers under lock, to guarantee we dont grow it
182 if (kptllnd_data.kptl_shutdown) {
183 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
185 LIBCFS_FREE(peer, sizeof(*peer));
189 kptllnd_data.kptl_npeers++;
190 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown when the last reference is dropped: unlink a zombie
 * from the zombie list, decrement the global peer count and free the
 * structure.  Asserts the peer is quiescent (no refs, no active rxs,
 * empty tx queues, never activated or already zombified). */
196 kptllnd_peer_destroy (kptl_peer_t *peer)
200 CDEBUG(D_NET, "Peer=%p\n", peer);
202 LASSERT (!in_interrupt());
203 LASSERT (atomic_read(&peer->peer_refcount) == 0);
204 LASSERT (peer->peer_active_rxs == 0);
205 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
206 peer->peer_state == PEER_STATE_ZOMBIE);
207 LASSERT (list_empty(&peer->peer_sendq));
208 LASSERT (list_empty(&peer->peer_activeq));
210 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
/* ALLOCATED peers were never linked anywhere; only zombies need
 * unlinking (from kptl_zombie_peers). */
212 if (peer->peer_state == PEER_STATE_ZOMBIE)
213 list_del(&peer->peer_list);
215 kptllnd_data.kptl_npeers--;
217 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
219 LIBCFS_FREE (peer, sizeof (*peer));
/* Abort all of a peer's queued and active txs.  Atomically steals both
 * queues under peer_lock (splicing each onto a local list head), then
 * fails every tx with -EIO and drops the queue's ref outside the lock
 * so kptllnd_tx_fini can abort outstanding comms safely. */
223 kptllnd_peer_cancel_txs(kptl_peer_t *peer)
225 struct list_head sendq;
226 struct list_head activeq;
227 struct list_head *tmp;
228 struct list_head *nxt;
232 /* atomically grab all the peer's tx-es... */
234 spin_lock_irqsave(&peer->peer_lock, flags);
/* Splice trick: insert local head into the peer's list, then
 * del_init the peer's head — the entries now hang off 'sendq'. */
236 list_add(&sendq, &peer->peer_sendq);
237 list_del_init(&peer->peer_sendq);
238 list_for_each (tmp, &sendq) {
239 tx = list_entry(tmp, kptl_tx_t, tx_list);
/* Same splice for the active queue. */
243 list_add(&activeq, &peer->peer_activeq);
244 list_del_init(&peer->peer_activeq);
245 list_for_each (tmp, &activeq) {
246 tx = list_entry(tmp, kptl_tx_t, tx_list);
250 spin_unlock_irqrestore(&peer->peer_lock, flags);
252 /* ...then drop the peer's ref on them at leasure. This will get
253 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
255 list_for_each_safe (tmp, nxt, &sendq) {
256 tx = list_entry(tmp, kptl_tx_t, tx_list);
257 list_del(&tx->tx_list);
258 tx->tx_status = -EIO;
259 kptllnd_tx_decref(tx);
262 list_for_each_safe (tmp, nxt, &activeq) {
263 tx = list_entry(tmp, kptl_tx_t, tx_list);
264 list_del(&tx->tx_list);
265 tx->tx_status = -EIO;
266 kptllnd_tx_decref(tx);
/* Record that we just heard from 'peer' (lockless timestamp update). */
271 kptllnd_peer_alive (kptl_peer_t *peer)
273 /* This is racy, but everyone's only writing cfs_time_current() */
274 peer->peer_last_alive = cfs_time_current();
/* Tell LNet a peer has died.  Consumes (and clears) peer_error under
 * peer_lock so the notification fires at most once, converting the
 * jiffies-based last-alive time to wall-clock seconds for lnet_notify.
 * Presumably the call at the bottom only happens when error != 0 —
 * the guard is not visible in this fragment. */
279 kptllnd_peer_notify (kptl_peer_t *peer)
282 time_t last_alive = 0;
285 spin_lock_irqsave(&peer->peer_lock, flags);
287 if (peer->peer_error != 0) {
288 error = peer->peer_error;
289 peer->peer_error = 0;
/* wall-clock "now" minus elapsed time since last heard from peer */
291 last_alive = cfs_time_current_sec() -
292 cfs_duration_sec(cfs_time_current() -
293 peer->peer_last_alive);
296 spin_unlock_irqrestore(&peer->peer_lock, flags);
299 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
/* Watchdog-side reaper: move each closing peer to the zombie list,
 * notify LNet, cancel its txs and drop the peer-table ref taken by
 * kptllnd_peer_close_locked.  Cheap read-locked emptiness check first;
 * only safe with a single watchdog thread (see comment below). */
304 kptllnd_handle_closing_peers ()
308 struct list_head *tmp;
309 struct list_head *nxt;
312 /* Check with a read lock first to avoid blocking anyone */
314 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
315 idle = list_empty(&kptllnd_data.kptl_closing_peers);
316 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
321 /* Scan the closing peers and cancel their txs.
322 * NB only safe while there is only a single watchdog */
324 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
326 list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
327 peer = list_entry (tmp, kptl_peer_t, peer_list);
329 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
/* CLOSING -> ZOMBIE: reparent onto the zombie list. */
331 list_del(&peer->peer_list);
332 list_add_tail(&peer->peer_list,
333 &kptllnd_data.kptl_zombie_peers);
334 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the write lock to do the heavyweight work, then retake it
 * to continue the scan (safe: single watchdog, _safe iterator). */
336 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
338 kptllnd_peer_notify(peer);
339 kptllnd_peer_cancel_txs(peer);
340 kptllnd_peer_decref(peer);
342 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
345 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Begin closing a peer (caller holds kptl_peer_rw_lock for writing).
 * Active/waiting peers are removed from the peer table (their table
 * ref is recycled onto the closing list) and the watchdog is woken;
 * zombies are re-queued for another pass with a fresh ref; peers
 * already CLOSING are left alone.  'why' is stashed as peer_error on
 * the first close only, for kptllnd_peer_notify. */
349 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
351 switch (peer->peer_state) {
355 case PEER_STATE_WAITING_HELLO:
356 case PEER_STATE_ACTIVE:
357 /* Ensure new peers see a new incarnation of me */
358 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
359 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
360 kptllnd_data.kptl_incarnation++;
362 /* Removing from peer table */
363 kptllnd_data.kptl_n_active_peers--;
364 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
366 list_del(&peer->peer_list);
367 kptllnd_peer_unreserve_buffers();
369 peer->peer_error = why; /* stash 'why' only on first close */
371 /* Schedule for immediate attention, taking peer table's ref */
372 list_add_tail(&peer->peer_list,
373 &kptllnd_data.kptl_closing_peers);
374 wake_up(&kptllnd_data.kptl_watchdog_waitq);
377 case PEER_STATE_ZOMBIE:
378 /* Schedule for attention at next timeout */
379 kptllnd_peer_addref(peer);
380 list_del(&peer->peer_list);
381 list_add_tail(&peer->peer_list,
382 &kptllnd_data.kptl_closing_peers);
385 case PEER_STATE_CLOSING:
/* All paths that queue the peer end up CLOSING. */
389 peer->peer_state = PEER_STATE_CLOSING;
/* Convenience wrapper: close 'peer' under the peer-table write lock. */
393 kptllnd_peer_close(kptl_peer_t *peer, int why)
397 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
398 kptllnd_peer_close_locked(peer, why);
399 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id'; LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards.  Scans the relevant hash bucket(s) under a read lock,
 * taking a temporary ref and dropping the lock around each close, then
 * restarting the scan (the comment at the bottom; restart code itself
 * is not visible in this fragment).  Returns 0 if anything matched. */
403 kptllnd_peer_del(lnet_process_id_t id)
405 struct list_head *ptmp;
406 struct list_head *pnxt;
415 * Find the single bucket we are supposed to look at or if nid is a
416 * wildcard (LNET_NID_ANY) then look at all of the buckets
418 if (id.nid != LNET_NID_ANY) {
419 struct list_head *l = kptllnd_nid2peerlist(id.nid);
/* Bucket index = offset of this head within the bucket array. */
421 lo = hi = l - kptllnd_data.kptl_peers;
423 if (id.pid != LNET_PID_ANY)
427 hi = kptllnd_data.kptl_peer_hash_size - 1;
431 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
433 for (i = lo; i <= hi; i++) {
434 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
435 peer = list_entry (ptmp, kptl_peer_t, peer_list);
/* Match: wildcard NID, or exact NID plus wildcard/exact PID. */
437 if (!(id.nid == LNET_NID_ANY ||
438 (peer->peer_id.nid == id.nid &&
439 (id.pid == LNET_PID_ANY ||
440 peer->peer_id.pid == id.pid))))
443 kptllnd_peer_addref(peer); /* 1 ref for me... */
445 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
448 kptllnd_peer_close(peer, 0);
449 kptllnd_peer_decref(peer); /* ...until here */
451 rc = 0; /* matched something */
453 /* start again now I've dropped the lock */
458 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' on peer's sendq for kptllnd_peer_check_sends to launch.
 * Takes over the caller's tx ref.  For PUT/GET requests, allocates the
 * next matchbits and attaches an ME/MD pair so the peer can RDMA to or
 * from us before the request message is even sent.  The failure path
 * at the bottom (visible from line 560 on) fails the tx with -EIO.
 * NOTE(review): fragmentary — error gotos/returns between the visible
 * statements are missing from this view. */
464 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
466 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
467 ptl_handle_md_t rdma_mdh = PTL_INVALID_HANDLE;
468 ptl_handle_md_t msg_mdh = PTL_INVALID_HANDLE;
474 LASSERT (!tx->tx_idle);
475 LASSERT (!tx->tx_active);
476 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
477 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
478 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
479 tx->tx_type == TX_TYPE_PUT_REQUEST ||
480 tx->tx_type == TX_TYPE_GET_REQUEST);
482 kptllnd_set_tx_peer(tx, peer);
484 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
485 tx->tx_type == TX_TYPE_GET_REQUEST) {
487 spin_lock_irqsave(&peer->peer_lock, flags);
489 /* Assume 64-bit matchbits can't wrap */
490 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
491 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
492 peer->peer_next_matchbits++;
494 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Pre-post the RDMA buffer under the allocated matchbits. */
496 prc = PtlMEAttach(kptllnd_data.kptl_nih,
497 *kptllnd_tunables.kptl_portal,
499 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
505 CERROR("PtlMEAttach(%s) failed: %d\n",
506 libcfs_id2str(peer->peer_id), prc);
510 prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
512 CERROR("PtlMDAttach(%s) failed: %d\n",
513 libcfs_id2str(tx->tx_peer->peer_id), prc);
/* Undo the ME attach on MD failure. */
514 prc = PtlMEUnlink(meh);
515 LASSERT(prc == PTL_OK);
516 rdma_mdh = PTL_INVALID_HANDLE;
520 /* I'm not racing with the event callback here. It's a bug if
521 * there's an event on the MD I just attached before I actually
522 * send the RDMA request message which the event callback
523 * catches by asserting 'rdma_mdh' is valid. */
/* Bind an MD for the request message itself (all tx types). */
526 memset(&md, 0, sizeof(md));
528 md.start = tx->tx_msg;
529 md.length = tx->tx_msg->ptlm_nob;
531 md.options = PTL_MD_OP_PUT |
532 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
533 PTL_MD_EVENT_START_DISABLE;
534 md.user_ptr = &tx->tx_msg_eventarg;
535 md.eq_handle = kptllnd_data.kptl_eqh;
537 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
539 msg_mdh = PTL_INVALID_HANDLE;
543 spin_lock_irqsave(&peer->peer_lock, flags);
545 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
547 tx->tx_rdma_mdh = rdma_mdh;
548 tx->tx_msg_mdh = msg_mdh;
550 /* Ensure HELLO is sent first */
551 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
552 list_add(&tx->tx_list, &peer->peer_sendq);
/* (presumably the else branch of the HELLO test above) */
554 list_add_tail(&tx->tx_list, &peer->peer_sendq);
556 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Failure path: record the MDs so tx_fini can unlink them, fail
 * the tx with -EIO and drop the ref we took over from the caller. */
560 spin_lock_irqsave(&peer->peer_lock, flags);
562 tx->tx_status = -EIO;
563 tx->tx_rdma_mdh = rdma_mdh;
564 tx->tx_msg_mdh = msg_mdh;
566 spin_unlock_irqrestore(&peer->peer_lock, flags);
568 kptllnd_tx_decref(tx);
/* Drain peer's sendq: pop txs while the peer has send credits, pack
 * headers (piggybacking outstanding credits) and PtlPut them, moving
 * each to the activeq.  Also posts a NOOP when we owe the peer enough
 * credits (>= high-water) and discards redundant NOOPs.  Never uses
 * the last credit unless returning credits, to avoid deadlock.
 * NOTE(review): fragmentary — loop breaks/continues and some PtlPut
 * arguments are not visible in this view. */
572 kptllnd_peer_check_sends (kptl_peer_t *peer)
579 LASSERT(!in_interrupt());
581 spin_lock_irqsave(&peer->peer_lock, flags);
/* Owe many credits, nothing queued to carry them, and peer can
 * accept a message: generate an explicit credit-return NOOP. */
583 if (list_empty(&peer->peer_sendq) &&
584 peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
585 peer->peer_credits != 0) {
587 /* post a NOOP to return credits */
588 spin_unlock_irqrestore(&peer->peer_lock, flags);
590 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
592 CERROR("Can't return credits to %s: can't allocate descriptor\n",
593 libcfs_id2str(peer->peer_id));
595 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
596 kptllnd_post_tx(peer, tx);
599 spin_lock_irqsave(&peer->peer_lock, flags);
602 while (!list_empty(&peer->peer_sendq)) {
603 tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
605 LASSERT (tx->tx_active);
606 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
607 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
608 !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
610 LASSERT (peer->peer_outstanding_credits >= 0);
611 LASSERT (peer->peer_outstanding_credits <=
612 *kptllnd_tunables.kptl_peercredits);
613 LASSERT (peer->peer_credits >= 0);
614 LASSERT (peer->peer_credits <=
615 *kptllnd_tunables.kptl_peercredits);
617 /* Ensure HELLO is sent first */
618 if (!peer->peer_sent_hello) {
619 if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
621 peer->peer_sent_hello = 1;
/* Can't send anything without a credit. */
624 if (peer->peer_credits == 0) {
625 CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
626 libcfs_id2str(peer->peer_id),
627 peer->peer_credits, peer->peer_outstanding_credits, tx);
631 /* Don't use the last credit unless I've got credits to
633 if (peer->peer_credits == 1 &&
634 peer->peer_outstanding_credits == 0) {
635 CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
636 libcfs_id2str(peer->peer_id),
637 peer->peer_credits, peer->peer_outstanding_credits, tx);
641 list_del(&tx->tx_list);
643 /* Discard any NOOP I queued if I'm not at the high-water mark
644 * any more or more messages have been queued */
645 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
646 (!list_empty(&peer->peer_sendq) ||
647 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
651 spin_unlock_irqrestore(&peer->peer_lock, flags);
653 CDEBUG(D_NET, "%s: redundant noop\n",
654 libcfs_id2str(peer->peer_id));
655 kptllnd_tx_decref(tx);
657 spin_lock_irqsave(&peer->peer_lock, flags);
661 /* fill last-minute msg header fields */
662 kptllnd_msg_pack(tx->tx_msg, peer);
/* This message carries all outstanding credits and costs one. */
664 peer->peer_outstanding_credits = 0;
665 peer->peer_credits--;
667 CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
668 libcfs_id2str(peer->peer_id),
669 peer->peer_credits, peer->peer_outstanding_credits,
670 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
671 tx, tx->tx_msg->ptlm_nob,
672 tx->tx_msg->ptlm_credits,
674 list_add_tail(&tx->tx_list, &peer->peer_activeq);
676 kptllnd_tx_addref(tx); /* 1 ref for me... */
/* Drop the lock for PtlPut; tx is safely on the activeq. */
678 spin_unlock_irqrestore(&peer->peer_lock, flags);
680 rc = PtlPut (tx->tx_msg_mdh,
683 *kptllnd_tunables.kptl_portal,
687 0); /* header data */
689 CERROR("PtlPut %s error %d\n",
690 libcfs_id2str(peer->peer_id), rc);
692 /* Nuke everything (including this tx) */
693 kptllnd_peer_close(peer, -EIO);
697 kptllnd_tx_decref(tx); /* drop my ref */
699 spin_lock_irqsave(&peer->peer_lock, flags);
702 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Scan peer's sendq then activeq for a tx past its deadline; on a hit
 * take a ref, drop peer_lock and (presumably) return it — the return
 * statements are not visible in this fragment.  Returns with no lock
 * held either way.
 * NOTE(review): both loops iterate 'tmp' but take list_entry from the
 * queue HEAD (peer_sendq.next / peer_activeq.next) instead of 'tmp',
 * so only the first entry of each queue is ever examined.  Looks like
 * a bug — should be list_entry(tmp, ...); confirm against upstream
 * before changing (the missing lines may compensate). */
706 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
709 struct list_head *tmp;
712 spin_lock_irqsave(&peer->peer_lock, flags);
714 list_for_each(tmp, &peer->peer_sendq) {
715 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
717 if (time_after_eq(jiffies, tx->tx_deadline)) {
718 kptllnd_tx_addref(tx);
719 spin_unlock_irqrestore(&peer->peer_lock, flags);
724 list_for_each(tmp, &peer->peer_activeq) {
725 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
727 if (time_after_eq(jiffies, tx->tx_deadline)) {
728 kptllnd_tx_addref(tx);
729 spin_unlock_irqrestore(&peer->peer_lock, flags);
734 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Watchdog pass over hash bucket 'idx': give each peer a chance to
 * send (e.g. credit-return NOOPs that couldn't be posted earlier) and
 * close any peer with a timed-out tx, logging diagnostics.  On a
 * timeout the scan drops the read lock and restarts (restart jump not
 * visible in this fragment). */
740 kptllnd_peer_check_bucket (int idx)
742 struct list_head *peers = &kptllnd_data.kptl_peers[idx];
743 struct list_head *ptmp;
750 CDEBUG(D_NET, "Bucket=%d\n", idx);
753 /* NB. Shared lock while I just look */
754 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
756 list_for_each (ptmp, peers) {
757 peer = list_entry (ptmp, kptl_peer_t, peer_list);
759 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
760 libcfs_id2str(peer->peer_id),
761 peer->peer_credits, peer->peer_outstanding_credits);
763 /* In case we have enough credits to return via a
764 * NOOP, but there were no non-blocking tx descs
765 * free to do it last time... */
766 kptllnd_peer_check_sends(peer);
768 tx = kptllnd_find_timed_out_tx(peer);
772 kptllnd_peer_addref(peer); /* 1 ref for me... */
774 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
/* Snapshot queue depths for the error report below. */
777 spin_lock_irqsave(&peer->peer_lock, flags);
778 nsend = kptllnd_count_queue(&peer->peer_sendq);
779 nactive = kptllnd_count_queue(&peer->peer_activeq);
780 spin_unlock_irqrestore(&peer->peer_lock, flags);
782 LCONSOLE_ERROR("Timing out %s: please check Portals\n",
783 libcfs_id2str(peer->peer_id));
785 CERROR("%s timed out: cred %d outstanding %d sendq %d "
786 "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
787 libcfs_id2str(peer->peer_id),
788 peer->peer_credits, peer->peer_outstanding_credits,
789 nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
790 tx->tx_active ? "A" : "",
791 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
793 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
795 tx->tx_status, *kptllnd_tunables.kptl_timeout);
797 kptllnd_dump_ptltrace();
799 kptllnd_tx_decref(tx);
801 kptllnd_peer_close(peer, -ETIMEDOUT);
802 kptllnd_peer_decref(peer); /* ...until here */
804 /* start again now I've dropped the lock */
808 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer with exactly 'id' in its hash bucket and return it
 * with a new reference, or (presumably) NULL if absent — the return
 * paths are not visible in this fragment.  Caller holds
 * kptl_peer_rw_lock (read or write).  Only table-resident peers
 * (WAITING_HELLO or ACTIVE) can be found here. */
812 kptllnd_id2peer_locked (lnet_process_id_t id)
814 struct list_head *peers = kptllnd_nid2peerlist(id.nid);
815 struct list_head *tmp;
818 list_for_each (tmp, peers) {
820 peer = list_entry (tmp, kptl_peer_t, peer_list);
822 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
823 peer->peer_state == PEER_STATE_ACTIVE);
825 if (peer->peer_id.nid != id.nid ||
826 peer->peer_id.pid != id.pid)
829 kptllnd_peer_addref(peer);
831 CDEBUG(D_NET, "%s -> %s (%d)\n",
833 kptllnd_ptlid2str(peer->peer_ptlid),
834 atomic_read (&peer->peer_refcount));
/* Console complaint when the peer table is full: 'str' is a prefix
 * like "Connection from "/"Connection to " and 'id' the rejected peer. */
842 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
844 LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
845 "messages may be dropped\n",
846 str, libcfs_id2str(id),
847 kptllnd_data.kptl_n_active_peers);
848 LCONSOLE_ERROR("Please correct by increasing "
849 "'max_nodes' or 'max_procs_per_node'\n");
/* Return the highest matchbits previously seen from a *new* peer with
 * identity 'lpid', by checking the closing and zombie lists for its
 * earlier incarnation.  Falls back to PTL_RESERVED_MATCHBITS when no
 * trace remains (all old txs completed, so any matchbits are safe).
 * Caller holds kptl_peer_rw_lock. */
853 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
856 struct list_head *tmp;
858 /* Find the last matchbits I saw this new peer using. Note..
859 A. This peer cannot be in the peer table - she's new!
860 B. If I can't find the peer in the closing/zombie peers, all
861 matchbits are safe because all refs to the (old) peer have gone
862 so all txs have completed so there's no risk of matchbit
866 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
868 /* peer's last matchbits can't change after it comes out of the peer
869 * table, so first match is fine */
871 list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
872 peer = list_entry (tmp, kptl_peer_t, peer_list);
874 if (peer->peer_id.nid == lpid.nid &&
875 peer->peer_id.pid == lpid.pid)
876 return peer->peer_last_matchbits_seen;
879 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
880 peer = list_entry (tmp, kptl_peer_t, peer_list);
882 if (peer->peer_id.nid == lpid.nid &&
883 peer->peer_id.pid == lpid.pid)
884 return peer->peer_last_matchbits_seen;
887 return PTL_RESERVED_MATCHBITS;
/* Handle an incoming HELLO: validate it (userflag, matchbits, max
 * message size, peercredits), complete a pending handshake or tear
 * down a stale incarnation, then — for a brand-new connection —
 * allocate a peer + response HELLO, reserve buffers, install the peer
 * in the table and post the HELLO.  Races with concurrent HELLOs and
 * outgoing connects are resolved by re-looking-up under the write
 * lock.  NOTE(review): fragmentary — error returns and some branch
 * structure between the visible statements are missing from this
 * view; comments describe only what is shown. */
891 kptllnd_peer_handle_hello (ptl_process_id_t initiator,
894 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
896 kptl_peer_t *new_peer;
897 lnet_process_id_t lpid;
901 __u64 safe_matchbits;
902 __u64 last_matchbits_seen;
904 lpid.nid = msg->ptlm_srcnid;
905 lpid.pid = msg->ptlm_srcpid;
907 CDEBUG(D_NET, "hello from %s(%s)\n",
908 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
910 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
911 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
912 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
913 * userspace. Refuse the connection if she hasn't set the
914 * correct flag in her PID... */
915 CERROR("Userflag not set in hello from %s (%s)\n",
916 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
920 /* kptlhm_matchbits are the highest matchbits my peer may have used to
921 * RDMA to me. I ensure I never register buffers for RDMA that could
922 * match any she used */
923 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
925 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
926 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
927 safe_matchbits, libcfs_id2str(lpid));
/* All peers must agree on max message size... */
931 if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
932 *kptllnd_tunables.kptl_max_msg_size) {
933 CERROR("max message size MUST be equal for all peers: "
934 "got %d expected %d from %s\n",
935 msg->ptlm_u.hello.kptlhm_max_msg_size,
936 *kptllnd_tunables.kptl_max_msg_size,
937 libcfs_id2str(lpid));
/* ...and on the credit count (+1: HELLO carries one implicitly). */
941 if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
942 CERROR("peercredits MUST be equal on all peers: "
943 "got %d expected %d from %s\n",
944 msg->ptlm_credits + 1,
945 *kptllnd_tunables.kptl_peercredits,
946 libcfs_id2str(lpid));
950 write_lock_irqsave(g_lock, flags);
952 peer = kptllnd_id2peer_locked(lpid);
954 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
955 /* Completing HELLO handshake */
956 LASSERT(peer->peer_incarnation == 0);
/* dststamp 0 == peer didn't know my incarnation yet. */
958 if (msg->ptlm_dststamp != 0 &&
959 msg->ptlm_dststamp != peer->peer_myincarnation) {
960 write_unlock_irqrestore(g_lock, flags);
962 CERROR("Ignoring HELLO from %s: unexpected "
963 "dststamp "LPX64" ("LPX64" wanted)\n",
966 peer->peer_myincarnation);
967 kptllnd_peer_decref(peer);
971 /* Concurrent initiation or response to my HELLO */
972 peer->peer_state = PEER_STATE_ACTIVE;
973 peer->peer_incarnation = msg->ptlm_srcstamp;
974 peer->peer_next_matchbits = safe_matchbits;
976 write_unlock_irqrestore(g_lock, flags);
/* (peer already ACTIVE) stale unless the peer rebooted. */
980 if (msg->ptlm_dststamp != 0 &&
981 msg->ptlm_dststamp <= peer->peer_myincarnation) {
982 write_unlock_irqrestore(g_lock, flags);
984 CERROR("Ignoring stale HELLO from %s: "
985 "dststamp "LPX64" (current "LPX64")\n",
988 peer->peer_myincarnation);
989 kptllnd_peer_decref(peer);
993 /* Brand new connection attempt: remove old incarnation */
994 kptllnd_peer_close_locked(peer, 0);
/* Make room for the new peer with this NID before adding it. */
997 kptllnd_cull_peertable_locked(lpid);
999 write_unlock_irqrestore(g_lock, flags);
1002 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1003 " stamp "LPX64"("LPX64")\n",
1004 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1005 msg->ptlm_srcstamp, peer->peer_incarnation);
1007 kptllnd_peer_decref(peer);
/* Brand new connection: build the HELLO response + peer outside
 * the lock, then revalidate under the write lock below. */
1010 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1011 if (hello_tx == NULL) {
1012 CERROR("Unable to allocate HELLO message for %s\n",
1013 libcfs_id2str(lpid));
1017 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1018 sizeof(kptl_hello_msg_t));
1020 new_peer = kptllnd_peer_allocate(lpid, initiator);
1021 if (new_peer == NULL) {
1022 kptllnd_tx_decref(hello_tx);
1026 rc = kptllnd_peer_reserve_buffers();
1028 kptllnd_peer_decref(new_peer);
1029 kptllnd_tx_decref(hello_tx);
1031 CERROR("Failed to reserve buffers for %s\n",
1032 libcfs_id2str(lpid));
1036 write_lock_irqsave(g_lock, flags);
/* Re-check: someone may have created the peer while unlocked. */
1038 peer = kptllnd_id2peer_locked(lpid);
1040 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1041 /* An outgoing message instantiated 'peer' for me and
1042 * presumably provoked this reply */
1043 CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
1044 LASSERT(peer->peer_incarnation == 0);
1046 peer->peer_state = PEER_STATE_ACTIVE;
1047 peer->peer_incarnation = msg->ptlm_srcstamp;
1048 peer->peer_next_matchbits = safe_matchbits;
1050 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1051 /* WOW! Somehow this peer completed the HELLO
1052 * handshake while I slept. I guess I could have slept
1053 * while it rebooted and sent a new HELLO, so I'll fail
1055 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1056 kptllnd_peer_decref(peer);
/* Either way my new_peer/hello_tx are now redundant. */
1060 write_unlock_irqrestore(g_lock, flags);
1062 kptllnd_peer_unreserve_buffers();
1063 kptllnd_peer_decref(new_peer);
1064 kptllnd_tx_decref(hello_tx);
1068 if (kptllnd_data.kptl_n_active_peers ==
1069 kptllnd_data.kptl_expected_peers) {
1070 /* peer table full */
1071 write_unlock_irqrestore(g_lock, flags);
1073 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1075 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1077 CERROR("Refusing connection from %s\n",
1078 libcfs_id2str(lpid));
1079 kptllnd_peer_unreserve_buffers();
1080 kptllnd_peer_decref(new_peer);
1081 kptllnd_tx_decref(hello_tx);
1085 write_lock_irqsave(g_lock, flags);
1086 kptllnd_data.kptl_expected_peers++;
1089 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1091 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1092 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1093 *kptllnd_tunables.kptl_max_msg_size;
/* Incoming HELLO completes the handshake immediately: ACTIVE. */
1095 new_peer->peer_state = PEER_STATE_ACTIVE;
1096 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1097 new_peer->peer_next_matchbits = safe_matchbits;
1098 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1100 kptllnd_peer_add_peertable_locked(new_peer);
1102 write_unlock_irqrestore(g_lock, flags);
1104 /* NB someone else could get in now and post a message before I post
1105 * the HELLO, but post_tx/check_sends take care of that! */
1107 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1108 libcfs_id2str(new_peer->peer_id), hello_tx);
1110 kptllnd_post_tx(new_peer, hello_tx);
1111 kptllnd_peer_check_sends(new_peer);
1117 kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
1119 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1120 ptl_process_id_t ptl_id;
1122 kptl_peer_t *new_peer = NULL;
1123 kptl_tx_t *hello_tx = NULL;
1124 unsigned long flags;
1126 __u64 last_matchbits_seen;
1128 LASSERT (tx->tx_lnet_msg != NULL);
1129 LASSERT (tx->tx_peer == NULL);
1131 /* I expect to find the peer, so I only take a read lock... */
1132 read_lock_irqsave(g_lock, flags);
1133 peer = kptllnd_id2peer_locked(target);
1134 read_unlock_irqrestore(g_lock, flags);
1140 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1141 CWARN("Refusing to create a new connection to %s "
1142 "(non-kernel peer)\n", libcfs_id2str(target));
1143 tx->tx_status = -EHOSTUNREACH;
1147 /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1148 * the same portals PID */
1149 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1150 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1152 write_lock_irqsave(g_lock, flags);
1154 peer = kptllnd_id2peer_locked(target);
1156 write_unlock_irqrestore(g_lock, flags);
1160 kptllnd_cull_peertable_locked(target);
1162 write_unlock_irqrestore(g_lock, flags);
1164 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1165 if (hello_tx == NULL) {
1166 CERROR("Unable to allocate connect message for %s\n",
1167 libcfs_id2str(target));
1168 tx->tx_status = -ENOMEM;
1172 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1173 sizeof(kptl_hello_msg_t));
1175 new_peer = kptllnd_peer_allocate(target, ptl_id);
1176 if (new_peer == NULL) {
1177 tx->tx_status = -ENOMEM;
1181 rc = kptllnd_peer_reserve_buffers();
1187 write_lock_irqsave(g_lock, flags);
1189 peer = kptllnd_id2peer_locked(target);
1190 if (peer != NULL) { /* someone else beat me to it */
1191 write_unlock_irqrestore(g_lock, flags);
1193 kptllnd_peer_unreserve_buffers();
1194 kptllnd_peer_decref(new_peer);
1195 kptllnd_tx_decref(hello_tx);
1199 if (kptllnd_data.kptl_n_active_peers ==
1200 kptllnd_data.kptl_expected_peers) {
1201 /* peer table full */
1202 write_unlock_irqrestore(g_lock, flags);
1204 kptllnd_peertable_overflow_msg("Connection to ", target);
1206 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1208 CERROR("Can't create connection to %s\n",
1209 libcfs_id2str(target));
1210 kptllnd_peer_unreserve_buffers();
1211 tx->tx_status = -ENOMEM;
1214 write_lock_irqsave(g_lock, flags);
1215 kptllnd_data.kptl_expected_peers++;
1218 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1220 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1221 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1222 *kptllnd_tunables.kptl_max_msg_size;
1224 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1225 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1227 kptllnd_peer_add_peertable_locked(new_peer);
1229 write_unlock_irqrestore(g_lock, flags);
1231 /* NB someone else could get in now and post a message before I post
1232 * the HELLO, but post_tx/check_sends take care of that! */
1234 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1235 libcfs_id2str(new_peer->peer_id), hello_tx);
1238 kptllnd_post_tx(peer, hello_tx);
1241 kptllnd_post_tx(peer, tx);
1242 kptllnd_peer_check_sends(peer);
1243 kptllnd_peer_decref(peer);
1247 if (hello_tx != NULL)
1248 kptllnd_tx_decref(hello_tx);
1250 if (new_peer != NULL)
1251 kptllnd_peer_decref(new_peer);
1253 LASSERT (tx->tx_status != 0);
1254 kptllnd_tx_decref(tx);