1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
6 * E Barton <eeb@bartonsoftware.com>
8 * This file is part of the Lustre file system, http://www.lustre.org
9 * Lustre is a trademark of Cluster File Systems, Inc.
11 * This file is confidential source code owned by Cluster File Systems.
12 * No viewing, modification, compilation, redistribution, or any other
13 * form of use is permitted except through a signed license agreement.
15 * If you have not signed such an agreement, then you have no rights to
16 * this file. Please destroy it immediately and contact CFS.
21 #include <libcfs/list.h>
/* Count the entries currently linked on list 'q'.
 * NOTE(review): body not visible in this view; callers below invoke it
 * while holding peer_lock, so presumably the caller must hold the lock
 * protecting 'q' -- confirm against the full source. */
24 kptllnd_count_queue(struct list_head *q)
/* Snapshot the state of one peer (selected by 'index' over a walk of the
 * whole peer hash table) into the caller's out parameters.  Used by the
 * procfs/ioctl inspection path.
 *
 * Locking: takes kptl_peer_rw_lock (read, IRQ-safe) for the table walk and
 * the per-peer peer_lock for the counters that peer_lock protects
 * (matchbits, credits, sendq/activeq lengths).
 * NOTE(review): the index-matching/return logic is not visible here --
 * presumably returns -ENOENT when 'index' exceeds the peer count; confirm
 * against the full source. */
37 kptllnd_get_peer_info(int index,
38 lnet_process_id_t *id,
39 int *state, int *sent_hello,
40 int *refcount, __u64 *incarnation,
41 __u64 *next_matchbits, __u64 *last_matchbits_seen,
42 int *nsendq, int *nactiveq,
43 int *credits, int *outstanding_credits)
45 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
47 struct list_head *ptmp;
52 read_lock_irqsave(g_lock, flags);
54 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
56 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57 peer = list_entry(ptmp, kptl_peer_t, peer_list);
/* Fields stable under the table read-lock */
63 *state = peer->peer_state;
64 *sent_hello = peer->peer_sent_hello;
65 *refcount = atomic_read(&peer->peer_refcount);
66 *incarnation = peer->peer_incarnation;
/* Fields protected by peer_lock */
68 spin_lock(&peer->peer_lock);
70 *next_matchbits = peer->peer_next_matchbits;
71 *last_matchbits_seen = peer->peer_last_matchbits_seen;
72 *credits = peer->peer_credits;
73 *outstanding_credits = peer->peer_outstanding_credits;
75 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
76 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
78 spin_unlock(&peer->peer_lock);
86 read_unlock_irqrestore(g_lock, flags);
/* Insert 'peer' into the global peer hash table, taking a +1 ref on its
 * behalf.  Caller must hold kptl_peer_rw_lock for writing (hence the
 * "_locked" suffix). */
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
/* Table admission was reserved beforehand, so we can't overflow here */
93 LASSERT (kptllnd_data.kptl_n_active_peers <
94 kptllnd_data.kptl_expected_peers);
96 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
97 peer->peer_state == PEER_STATE_ACTIVE);
99 kptllnd_data.kptl_n_active_peers++;
100 atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
102 /* NB add to HEAD of peer list for MRU order!
103 * (see kptllnd_cull_peertable) */
104 list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Evict least-recently-used peers sharing pid.nid so that adding the new
 * peer 'pid' keeps at most max_procs_per_node peers per NID.  Relies on
 * kptllnd_peer_add_peertable_locked inserting at the list head (MRU first).
 * Caller must hold kptl_peer_rw_lock for writing. */
108 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 /* I'm about to add a new peer with this portals ID to the peer table,
111 * so (a) this peer should not exist already and (b) I want to leave at
112 * most (max_procs_per_nid - 1) peers with this NID in the table. */
113 struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
114 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116 struct list_head *tmp;
117 struct list_head *nxt;
/* safe variant: kptllnd_peer_close_locked unlinks entries as we walk */
121 list_for_each_safe (tmp, nxt, peers) {
122 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124 peer = list_entry(tmp, kptl_peer_t, peer_list);
126 if (peer->peer_id.nid != pid.nid)
/* (a) above: the exact pid must not already be present */
129 LASSERT (peer->peer_id.pid != pid.pid);
133 if (count < cull_count) /* recent (don't cull) */
136 CDEBUG(D_NET, "Cull %s(%s)\n",
137 libcfs_id2str(peer->peer_id),
138 kptllnd_ptlid2str(peer->peer_ptlid));
140 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id
 * 'ppid'.  The returned peer holds 1 ref for the caller and is in state
 * PEER_STATE_ALLOCATED (not yet in the peer table).  kptl_npeers is only
 * bumped under the write lock so shutdown can't race a growing count.
 * NOTE(review): error-return paths (alloc failure, shutdown) are not fully
 * visible here -- presumably returns NULL on failure; confirm. */
145 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
150 LIBCFS_ALLOC(peer, sizeof (*peer));
152 CERROR("Can't create peer %s (%s)\n",
154 kptllnd_ptlid2str(ppid));
158 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
160 INIT_LIST_HEAD (&peer->peer_sendq);
161 INIT_LIST_HEAD (&peer->peer_activeq);
162 spin_lock_init (&peer->peer_lock);
164 peer->peer_state = PEER_STATE_ALLOCATED;
165 peer->peer_error = 0;
166 peer->peer_last_alive = cfs_time_current();
167 peer->peer_id = lpid;
168 peer->peer_ptlid = ppid;
169 peer->peer_credits = 1; /* enough for HELLO */
170 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
171 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
172 peer->peer_sent_credits = 1; /* HELLO credit is implicit */
173 peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
175 atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
177 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
179 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
181 /* Only increase # peers under lock, to guarantee we dont grow it
/* Bail out (freeing the new peer) if the LND is shutting down */
183 if (kptllnd_data.kptl_shutdown) {
184 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
186 LIBCFS_FREE(peer, sizeof(*peer));
190 kptllnd_data.kptl_npeers++;
191 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown when the last ref on 'peer' is dropped: unlink a zombie
 * from the zombie list, decrement the global peer count and free the
 * structure.  Must not run in interrupt context and both tx queues must
 * already be empty. */
197 kptllnd_peer_destroy (kptl_peer_t *peer)
201 CDEBUG(D_NET, "Peer=%p\n", peer);
203 LASSERT (!in_interrupt());
204 LASSERT (atomic_read(&peer->peer_refcount) == 0);
/* Only never-tabled (ALLOCATED) or fully-retired (ZOMBIE) peers die */
205 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
206 peer->peer_state == PEER_STATE_ZOMBIE);
207 LASSERT (list_empty(&peer->peer_sendq));
208 LASSERT (list_empty(&peer->peer_activeq));
210 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
/* ZOMBIEs are still linked on kptl_zombie_peers; ALLOCATED peers are not */
212 if (peer->peer_state == PEER_STATE_ZOMBIE)
213 list_del(&peer->peer_list);
215 kptllnd_data.kptl_npeers--;
217 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
219 LIBCFS_FREE (peer, sizeof (*peer));
/* Move every tx on 'peerq' onto 'txs', marking each as failed (-EIO).
 * Caller holds the lock protecting 'peerq' (see kptllnd_peer_cancel_txs)
 * and later drops the refs on the collected txs. */
223 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
225 struct list_head *tmp;
226 struct list_head *nxt;
229 list_for_each_safe (tmp, nxt, peerq) {
230 tx = list_entry(tmp, kptl_tx_t, tx_list);
232 list_del(&tx->tx_list);
233 list_add_tail(&tx->tx_list, txs);
235 tx->tx_status = -EIO;
/* Cancel everything queued or in flight on 'peer': drain both the sendq
 * and the activeq onto the caller's 'txs' list under peer_lock. */
241 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
245 spin_lock_irqsave(&peer->peer_lock, flags);
247 kptllnd_cancel_txlist(&peer->peer_sendq, txs);
248 kptllnd_cancel_txlist(&peer->peer_activeq, txs);
250 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Record that we just heard from 'peer' (lockless timestamp update). */
254 kptllnd_peer_alive (kptl_peer_t *peer)
256 /* This is racy, but everyone's only writing cfs_time_current() */
257 peer->peer_last_alive = cfs_time_current();
/* If 'peer' died with an error, report it (once) to LNET via lnet_notify,
 * translating the jiffies-based last-alive time into wall-clock seconds.
 * peer_error is consumed under peer_lock so the notification fires only
 * for the first caller to see it. */
262 kptllnd_peer_notify (kptl_peer_t *peer)
265 time_t last_alive = 0;
268 spin_lock_irqsave(&peer->peer_lock, flags);
270 if (peer->peer_error != 0) {
271 error = peer->peer_error;
272 peer->peer_error = 0;
/* seconds-since-epoch when we last heard from the peer */
274 last_alive = cfs_time_current_sec() -
275 cfs_duration_sec(cfs_time_current() -
276 peer->peer_last_alive);
279 spin_unlock_irqrestore(&peer->peer_lock, flags);
282 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
/* Watchdog-thread helper: retire peers queued for closing.
 *  1. Fast path: peek (read lock) and return if nothing to do.
 *  2. Cancel txs on all existing zombies.
 *  3. Move each CLOSING peer to the zombie list, notify LNET, cancel its
 *     txs and drop the peer-table ref -- dropping/retaking the write lock
 *     per peer because notify/cancel must not run under it.
 *  4. Finally drop the tx refs collected in 'txs', which aborts any
 *     outstanding comms via kptllnd_tx_fini(). */
287 kptllnd_handle_closing_peers ()
290 struct list_head txs;
292 struct list_head *tmp;
293 struct list_head *nxt;
297 /* Check with a read lock first to avoid blocking anyone */
299 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
300 idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
301 list_empty(&kptllnd_data.kptl_zombie_peers);
302 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
307 INIT_LIST_HEAD(&txs);
309 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
311 /* Cancel txs on all zombie peers. NB anyone dropping the last peer
312 * ref removes it from this list, so I mustn't drop the lock while
314 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
315 peer = list_entry (tmp, kptl_peer_t, peer_list);
317 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
319 kptllnd_peer_cancel_txs(peer, &txs);
322 /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
323 * I'm the only one removing from this list, but peers can be added on
324 * the end any time I drop the lock. */
326 list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
327 peer = list_entry (tmp, kptl_peer_t, peer_list);
329 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
331 list_del(&peer->peer_list);
332 list_add_tail(&peer->peer_list,
333 &kptllnd_data.kptl_zombie_peers);
334 peer->peer_state = PEER_STATE_ZOMBIE;
/* notify/cancel/decref may sleep or re-lock: do them unlocked */
336 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
338 kptllnd_peer_notify(peer);
339 kptllnd_peer_cancel_txs(peer, &txs);
340 kptllnd_peer_decref(peer);
342 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
345 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
347 /* Drop peer's ref on all cancelled txs. This will get
348 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
350 list_for_each_safe (tmp, nxt, &txs) {
351 tx = list_entry(tmp, kptl_tx_t, tx_list);
352 list_del(&tx->tx_list);
353 kptllnd_tx_decref(tx);
/* Begin closing 'peer' with error 'why'.  Caller holds kptl_peer_rw_lock
 * for writing.  An active/handshaking peer is removed from the peer table
 * (its table ref transferring to the closing list) and handed to the
 * watchdog; a peer already CLOSING/ZOMBIE is left alone. */
358 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
360 switch (peer->peer_state) {
364 case PEER_STATE_WAITING_HELLO:
365 case PEER_STATE_ACTIVE:
366 /* Ensure new peers see a new incarnation of me */
367 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
368 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
369 kptllnd_data.kptl_incarnation++;
371 /* Removing from peer table */
372 kptllnd_data.kptl_n_active_peers--;
373 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
375 list_del(&peer->peer_list);
376 kptllnd_peer_unreserve_buffers();
378 peer->peer_error = why; /* stash 'why' only on first close */
379 peer->peer_state = PEER_STATE_CLOSING;
381 /* Schedule for immediate attention, taking peer table's ref */
382 list_add_tail(&peer->peer_list,
383 &kptllnd_data.kptl_closing_peers);
384 wake_up(&kptllnd_data.kptl_watchdog_waitq);
387 case PEER_STATE_ZOMBIE:
388 case PEER_STATE_CLOSING:
/* Lock-taking wrapper around kptllnd_peer_close_locked(). */
394 kptllnd_peer_close(kptl_peer_t *peer, int why)
398 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
399 kptllnd_peer_close_locked(peer, why);
400 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id'.  LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards, so this may scan one hash bucket or all of them.  Each match
 * is closed outside the read lock (taking a temporary ref), after which
 * the scan restarts since the list may have changed. */
404 kptllnd_peer_del(lnet_process_id_t id)
406 struct list_head *ptmp;
407 struct list_head *pnxt;
416 * Find the single bucket we are supposed to look at or if nid is a
417 * wildcard (LNET_NID_ANY) then look at all of the buckets
419 if (id.nid != LNET_NID_ANY) {
420 struct list_head *l = kptllnd_nid2peerlist(id.nid);
/* pointer arithmetic: bucket index within kptl_peers[] */
422 lo = hi = l - kptllnd_data.kptl_peers;
424 if (id.pid != LNET_PID_ANY)
428 hi = kptllnd_data.kptl_peer_hash_size - 1;
432 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
434 for (i = lo; i <= hi; i++) {
435 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
436 peer = list_entry (ptmp, kptl_peer_t, peer_list);
438 if (!(id.nid == LNET_NID_ANY ||
439 (peer->peer_id.nid == id.nid &&
440 (id.pid == LNET_PID_ANY ||
441 peer->peer_id.pid == id.pid))))
444 kptllnd_peer_addref(peer); /* 1 ref for me... */
/* kptllnd_peer_close needs the write lock: drop my read lock first */
446 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
449 kptllnd_peer_close(peer, 0);
450 kptllnd_peer_decref(peer); /* ...until here */
452 rc = 0; /* matched something */
454 /* start again now I've dropped the lock */
459 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' for transmission to 'peer' (consumes the caller's tx ref).
 * For PUT/GET requests, first allocates fresh matchbits and attaches an
 * ME/MD pair so the peer can RDMA to/from us; then binds an MD for the
 * message itself and appends the tx to peer_sendq (HELLO jumps the queue).
 * On failure the tx is marked -EIO and its ref dropped.
 * NOTE(review): the PtlMEAttach argument list and the failure-path
 * control flow are only partially visible here. */
465 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
467 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
468 ptl_handle_md_t rdma_mdh = PTL_INVALID_HANDLE;
469 ptl_handle_md_t msg_mdh = PTL_INVALID_HANDLE;
475 LASSERT (!tx->tx_idle);
476 LASSERT (!tx->tx_active);
477 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
478 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
479 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
480 tx->tx_type == TX_TYPE_PUT_REQUEST ||
481 tx->tx_type == TX_TYPE_GET_REQUEST);
483 kptllnd_set_tx_peer(tx, peer);
485 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
486 tx->tx_type == TX_TYPE_GET_REQUEST) {
488 spin_lock_irqsave(&peer->peer_lock, flags);
490 /* Assume 64-bit matchbits can't wrap */
491 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
492 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
493 peer->peer_next_matchbits++;
495 spin_unlock_irqrestore(&peer->peer_lock, flags);
497 prc = PtlMEAttach(kptllnd_data.kptl_nih,
498 *kptllnd_tunables.kptl_portal,
500 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
506 CERROR("PtlMEAttach(%s) failed: %d\n",
507 libcfs_id2str(peer->peer_id), prc);
511 prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
513 CERROR("PtlMDAttach(%s) failed: %d\n",
514 libcfs_id2str(tx->tx_peer->peer_id), prc);
/* undo the ME attach; it can't fail for a just-created ME */
515 prc = PtlMEUnlink(meh);
516 LASSERT(prc == PTL_OK);
517 rdma_mdh = PTL_INVALID_HANDLE;
521 /* I'm not racing with the event callback here. It's a bug if
522 * there's an event on the MD I just attached before I actually
523 * send the RDMA request message which the event callback
524 * catches by asserting 'rdma_mdh' is valid. */
527 memset(&md, 0, sizeof(md));
529 md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
530 md.options = PTL_MD_OP_PUT |
531 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
532 PTL_MD_EVENT_START_DISABLE;
533 md.user_ptr = &tx->tx_msg_eventarg;
534 md.eq_handle = kptllnd_data.kptl_eqh;
/* single-fragment message: point the MD straight at the buffer */
537 md.start = tx->tx_msg;
538 md.length = tx->tx_msg->ptlm_nob;
/* multi-fragment: first iovec must still be the message header */
541 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
543 md.start = tx->tx_frags;
545 md.options |= PTL_MD_IOVEC;
548 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
550 msg_mdh = PTL_INVALID_HANDLE;
554 spin_lock_irqsave(&peer->peer_lock, flags);
556 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
558 tx->tx_rdma_mdh = rdma_mdh;
559 tx->tx_msg_mdh = msg_mdh;
561 /* Ensure HELLO is sent first */
562 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
563 list_add(&tx->tx_list, &peer->peer_sendq);
565 list_add_tail(&tx->tx_list, &peer->peer_sendq);
567 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* failure path: record -EIO and whatever handles were (un)made, then
 * drop the ref we took over from the caller */
571 spin_lock_irqsave(&peer->peer_lock, flags);
573 tx->tx_status = -EIO;
574 tx->tx_rdma_mdh = rdma_mdh;
575 tx->tx_msg_mdh = msg_mdh;
577 spin_unlock_irqrestore(&peer->peer_lock, flags);
579 kptllnd_tx_decref(tx);
/* Drain peer_sendq as far as flow control allows, issuing one PtlPut per
 * message.  Also posts a NOOP when we owe the peer enough credits
 * (>= PTLLND_CREDIT_HIGHWATER) but have nothing queued, and discards a
 * queued NOOP that has become redundant.  Runs in thread context only
 * (PtlPut outside the spinlock). */
583 kptllnd_peer_check_sends (kptl_peer_t *peer)
590 LASSERT(!in_interrupt());
592 spin_lock_irqsave(&peer->peer_lock, flags);
594 peer->peer_retry_noop = 0;
596 if (list_empty(&peer->peer_sendq) &&
597 peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
598 peer->peer_credits != 0) {
600 /* post a NOOP to return credits */
601 spin_unlock_irqrestore(&peer->peer_lock, flags);
603 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
605 CERROR("Can't return credits to %s: can't allocate descriptor\n",
606 libcfs_id2str(peer->peer_id));
608 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
609 kptllnd_post_tx(peer, tx, 0);
/* remember a failed NOOP alloc so the watchdog retries it */
612 spin_lock_irqsave(&peer->peer_lock, flags);
613 peer->peer_retry_noop = (tx == NULL);
616 while (!list_empty(&peer->peer_sendq)) {
617 tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
619 LASSERT (tx->tx_active);
620 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
621 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
622 !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
624 LASSERT (peer->peer_outstanding_credits >= 0);
625 LASSERT (peer->peer_sent_credits >= 0);
626 LASSERT (peer->peer_sent_credits +
627 peer->peer_outstanding_credits <=
628 *kptllnd_tunables.kptl_peercredits);
629 LASSERT (peer->peer_credits >= 0);
631 /* Ensure HELLO is sent first */
632 if (!peer->peer_sent_hello) {
633 if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
635 peer->peer_sent_hello = 1;
/* no send credits left: stall until the peer returns some */
638 if (peer->peer_credits == 0) {
639 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
640 libcfs_id2str(peer->peer_id),
642 peer->peer_outstanding_credits,
643 peer->peer_sent_credits, tx);
647 /* Don't use the last credit unless I've got credits to
649 if (peer->peer_credits == 1 &&
650 peer->peer_outstanding_credits == 0) {
651 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
652 "not using last credit for %p\n",
653 libcfs_id2str(peer->peer_id),
655 peer->peer_outstanding_credits,
656 peer->peer_sent_credits, tx);
660 list_del(&tx->tx_list);
662 /* Discard any NOOP I queued if I'm not at the high-water mark
663 * any more or more messages have been queued */
664 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
665 (!list_empty(&peer->peer_sendq) ||
666 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
670 spin_unlock_irqrestore(&peer->peer_lock, flags);
672 CDEBUG(D_NET, "%s: redundant noop\n",
673 libcfs_id2str(peer->peer_id));
674 kptllnd_tx_decref(tx);
676 spin_lock_irqsave(&peer->peer_lock, flags);
680 /* fill last-minute msg header fields */
681 kptllnd_msg_pack(tx->tx_msg, peer);
/* piggyback all owed credits on this message; spend one of ours */
683 peer->peer_sent_credits += peer->peer_outstanding_credits;
684 peer->peer_outstanding_credits = 0;
685 peer->peer_credits--;
687 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
688 libcfs_id2str(peer->peer_id), peer->peer_credits,
689 peer->peer_outstanding_credits, peer->peer_sent_credits,
690 kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
691 tx, tx->tx_msg->ptlm_nob,
692 tx->tx_msg->ptlm_credits);
694 list_add_tail(&tx->tx_list, &peer->peer_activeq);
696 kptllnd_tx_addref(tx); /* 1 ref for me... */
/* drop the spinlock around the (potentially slow) network call */
698 spin_unlock_irqrestore(&peer->peer_lock, flags);
700 tx->tx_tposted = jiffies; /* going on the wire */
702 rc = PtlPut (tx->tx_msg_mdh,
703 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
705 *kptllnd_tunables.kptl_portal,
709 0); /* header data */
711 CERROR("PtlPut %s error %d\n",
712 libcfs_id2str(peer->peer_id), rc);
714 /* Nuke everything (including this tx) */
715 kptllnd_peer_close(peer, -EIO);
719 kptllnd_tx_decref(tx); /* drop my ref */
721 spin_lock_irqsave(&peer->peer_lock, flags);
724 spin_unlock_irqrestore(&peer->peer_lock, flags);
728 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
731 struct list_head *tmp;
733 list_for_each(tmp, &peer->peer_sendq) {
734 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
736 if (time_after_eq(jiffies, tx->tx_deadline)) {
737 kptllnd_tx_addref(tx);
742 list_for_each(tmp, &peer->peer_activeq) {
743 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
745 if (time_after_eq(jiffies, tx->tx_deadline)) {
746 kptllnd_tx_addref(tx);
/* Watchdog scan of one peer-table bucket.  'stamp' marks this pass so a
 * peer is checked at most once even though the scan restarts after
 * dropping the lock.  A peer with a timed-out tx gets diagnostics dumped
 * and is closed with -ETIMEDOUT; a peer that merely needs a NOOP retry
 * gets kptllnd_peer_check_sends(). */
756 kptllnd_peer_check_bucket (int idx, int stamp)
758 struct list_head *peers = &kptllnd_data.kptl_peers[idx];
759 struct list_head *ptmp;
767 CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
770 /* NB. Shared lock while I just look */
771 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
773 list_for_each (ptmp, peers) {
774 peer = list_entry (ptmp, kptl_peer_t, peer_list);
776 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
777 libcfs_id2str(peer->peer_id), peer->peer_credits,
778 peer->peer_outstanding_credits, peer->peer_sent_credits);
780 spin_lock(&peer->peer_lock);
782 if (peer->peer_check_stamp == stamp) {
783 /* checked already this pass */
784 spin_unlock(&peer->peer_lock);
788 peer->peer_check_stamp = stamp;
789 tx = kptllnd_find_timed_out_tx(peer);
790 check_sends = peer->peer_retry_noop;
792 spin_unlock(&peer->peer_lock);
794 if (tx == NULL && !check_sends)
797 kptllnd_peer_addref(peer); /* 1 ref for me... */
/* check_sends/close need to sleep or take the write lock */
799 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
801 if (tx == NULL) { /* nothing timed out */
802 kptllnd_peer_check_sends(peer);
803 kptllnd_peer_decref(peer); /* ...until here or... */
805 /* rescan after dropping the lock */
/* timed out: gather queue lengths for the error report */
809 spin_lock_irqsave(&peer->peer_lock, flags);
810 nsend = kptllnd_count_queue(&peer->peer_sendq);
811 nactive = kptllnd_count_queue(&peer->peer_activeq);
812 spin_unlock_irqrestore(&peer->peer_lock, flags);
814 LCONSOLE_ERROR("Timing out %s: %s\n",
815 libcfs_id2str(peer->peer_id),
816 (tx->tx_tposted == 0) ?
817 "no free peer buffers" : "please check Portals",
819 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
820 "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
821 "%sposted %lu T/O %ds\n",
822 libcfs_id2str(peer->peer_id), peer->peer_credits,
823 peer->peer_outstanding_credits, peer->peer_sent_credits,
824 nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
825 tx->tx_active ? "A" : "",
826 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
828 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
831 (tx->tx_tposted == 0) ? "not " : "",
832 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
833 *kptllnd_tunables.kptl_timeout);
835 kptllnd_dump_ptltrace();
837 kptllnd_tx_decref(tx);
839 kptllnd_peer_close(peer, -ETIMEDOUT);
840 kptllnd_peer_decref(peer); /* ...until here */
842 /* start again now I've dropped the lock */
846 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer with LNET process id 'id' in its hash bucket.
 * Returns it with a +1 ref, or NULL if not found (return paths not fully
 * visible here).  Caller holds kptl_peer_rw_lock (read or write). */
850 kptllnd_id2peer_locked (lnet_process_id_t id)
852 struct list_head *peers = kptllnd_nid2peerlist(id.nid);
853 struct list_head *tmp;
856 list_for_each (tmp, peers) {
858 peer = list_entry (tmp, kptl_peer_t, peer_list);
/* only live peers are ever in the table */
860 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
861 peer->peer_state == PEER_STATE_ACTIVE);
863 if (peer->peer_id.nid != id.nid ||
864 peer->peer_id.pid != id.pid)
867 kptllnd_peer_addref(peer);
869 CDEBUG(D_NET, "%s -> %s (%d)\n",
871 kptllnd_ptlid2str(peer->peer_ptlid),
872 atomic_read (&peer->peer_refcount));
/* Console warning when a connection would exceed the configured peer
 * table capacity; 'str' is a direction prefix ("Connection from "/"to "). */
880 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
882 LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
883 "messages may be dropped\n",
884 str, libcfs_id2str(id),
885 kptllnd_data.kptl_n_active_peers);
886 LCONSOLE_ERROR("Please correct by increasing "
887 "'max_nodes' or 'max_procs_per_node'\n");
/* For a brand-new peer 'lpid', recover the highest matchbits we saw its
 * previous incarnation use, by searching the closing and zombie lists.
 * Falls back to PTL_RESERVED_MATCHBITS when no old incarnation remains.
 * Caller holds kptl_peer_rw_lock. */
891 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
894 struct list_head *tmp;
896 /* Find the last matchbits I saw this new peer using. Note..
897 A. This peer cannot be in the peer table - she's new!
898 B. If I can't find the peer in the closing/zombie peers, all
899 matchbits are safe because all refs to the (old) peer have gone
900 so all txs have completed so there's no risk of matchbit
904 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
906 /* peer's last matchbits can't change after it comes out of the peer
907 * table, so first match is fine */
909 list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
910 peer = list_entry (tmp, kptl_peer_t, peer_list);
912 if (peer->peer_id.nid == lpid.nid &&
913 peer->peer_id.pid == lpid.pid)
914 return peer->peer_last_matchbits_seen;
917 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
918 peer = list_entry (tmp, kptl_peer_t, peer_list);
920 if (peer->peer_id.nid == lpid.nid &&
921 peer->peer_id.pid == lpid.pid)
922 return peer->peer_last_matchbits_seen;
925 return PTL_RESERVED_MATCHBITS;
/* Process an incoming HELLO: validate it, then either complete an
 * in-progress handshake on an existing peer, detect a reconnect (close the
 * stale incarnation), or instantiate a brand-new peer and post a HELLO
 * response.  The double id2peer lookup (before and after allocating
 * new_peer) handles races with concurrent outgoing connects. */
929 kptllnd_peer_handle_hello (ptl_process_id_t initiator,
932 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
934 kptl_peer_t *new_peer;
935 lnet_process_id_t lpid;
939 __u64 safe_matchbits;
940 __u64 last_matchbits_seen;
942 lpid.nid = msg->ptlm_srcnid;
943 lpid.pid = msg->ptlm_srcpid;
945 CDEBUG(D_NET, "hello from %s(%s)\n",
946 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
948 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
949 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
950 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
951 * userspace. Refuse the connection if she hasn't set the
952 * correct flag in her PID... */
953 CERROR("Userflag not set in hello from %s (%s)\n",
954 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
958 /* kptlhm_matchbits are the highest matchbits my peer may have used to
959 * RDMA to me. I ensure I never register buffers for RDMA that could
960 * match any she used */
961 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
963 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
964 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
965 safe_matchbits, libcfs_id2str(lpid));
/* FIXME: this CERROR claims "< MIN %d" but prints
 * *kptllnd_tunables.kptl_max_msg_size rather than the
 * PTLLND_MIN_BUFFER_SIZE actually compared above; it also lacks a
 * trailing '\n'. */
969 if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
970 CERROR("%s: max message size %d < MIN %d",
972 msg->ptlm_u.hello.kptlhm_max_msg_size,
973 *kptllnd_tunables.kptl_max_msg_size);
977 if (msg->ptlm_credits <= 1) {
978 CERROR("Need more than 1+%d credits from %s\n",
979 msg->ptlm_credits, libcfs_id2str(lpid));
983 write_lock_irqsave(g_lock, flags);
985 peer = kptllnd_id2peer_locked(lpid);
987 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
988 /* Completing HELLO handshake */
989 LASSERT(peer->peer_incarnation == 0);
991 if (msg->ptlm_dststamp != 0 &&
992 msg->ptlm_dststamp != peer->peer_myincarnation) {
993 write_unlock_irqrestore(g_lock, flags);
995 CERROR("Ignoring HELLO from %s: unexpected "
996 "dststamp "LPX64" ("LPX64" wanted)\n",
999 peer->peer_myincarnation);
1000 kptllnd_peer_decref(peer);
1004 /* Concurrent initiation or response to my HELLO */
1005 peer->peer_state = PEER_STATE_ACTIVE;
1006 peer->peer_incarnation = msg->ptlm_srcstamp;
1007 peer->peer_next_matchbits = safe_matchbits;
1008 peer->peer_max_msg_size =
1009 msg->ptlm_u.hello.kptlhm_max_msg_size;
1011 write_unlock_irqrestore(g_lock, flags);
/* Existing ACTIVE peer: stale duplicate, or the peer rebooted */
1015 if (msg->ptlm_dststamp != 0 &&
1016 msg->ptlm_dststamp <= peer->peer_myincarnation) {
1017 write_unlock_irqrestore(g_lock, flags);
1019 CERROR("Ignoring stale HELLO from %s: "
1020 "dststamp "LPX64" (current "LPX64")\n",
1021 libcfs_id2str(lpid),
1023 peer->peer_myincarnation);
1024 kptllnd_peer_decref(peer);
1028 /* Brand new connection attempt: remove old incarnation */
1029 kptllnd_peer_close_locked(peer, 0);
1032 kptllnd_cull_peertable_locked(lpid);
1034 write_unlock_irqrestore(g_lock, flags);
1037 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1038 " stamp "LPX64"("LPX64")\n",
1039 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1040 msg->ptlm_srcstamp, peer->peer_incarnation);
1042 kptllnd_peer_decref(peer);
/* No usable existing peer: build the HELLO response + new peer */
1045 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1046 if (hello_tx == NULL) {
1047 CERROR("Unable to allocate HELLO message for %s\n",
1048 libcfs_id2str(lpid));
1052 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1053 sizeof(kptl_hello_msg_t));
1055 new_peer = kptllnd_peer_allocate(lpid, initiator);
1056 if (new_peer == NULL) {
1057 kptllnd_tx_decref(hello_tx);
1061 rc = kptllnd_peer_reserve_buffers();
1063 kptllnd_peer_decref(new_peer);
1064 kptllnd_tx_decref(hello_tx);
1066 CERROR("Failed to reserve buffers for %s\n",
1067 libcfs_id2str(lpid));
1071 write_lock_irqsave(g_lock, flags);
/* re-check: another thread may have created this peer while unlocked */
1073 peer = kptllnd_id2peer_locked(lpid);
1075 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1076 /* An outgoing message instantiated 'peer' for me */
1077 LASSERT(peer->peer_incarnation == 0);
1079 peer->peer_state = PEER_STATE_ACTIVE;
1080 peer->peer_incarnation = msg->ptlm_srcstamp;
1081 peer->peer_next_matchbits = safe_matchbits;
1082 peer->peer_max_msg_size =
1083 msg->ptlm_u.hello.kptlhm_max_msg_size;
1085 write_unlock_irqrestore(g_lock, flags);
1087 CWARN("Outgoing instantiated peer %s\n",
1088 libcfs_id2str(lpid));
1090 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1092 write_unlock_irqrestore(g_lock, flags);
1094 /* WOW! Somehow this peer completed the HELLO
1095 * handshake while I slept. I guess I could have slept
1096 * while it rebooted and sent a new HELLO, so I'll fail
1098 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1099 kptllnd_peer_decref(peer);
/* unwind the speculative allocations */
1103 kptllnd_peer_unreserve_buffers();
1104 kptllnd_peer_decref(new_peer);
1105 kptllnd_tx_decref(hello_tx);
1109 if (kptllnd_data.kptl_n_active_peers ==
1110 kptllnd_data.kptl_expected_peers) {
1111 /* peer table full */
1112 write_unlock_irqrestore(g_lock, flags);
1114 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1116 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1118 CERROR("Refusing connection from %s\n",
1119 libcfs_id2str(lpid));
1120 kptllnd_peer_unreserve_buffers();
1121 kptllnd_peer_decref(new_peer);
1122 kptllnd_tx_decref(hello_tx);
1126 write_lock_irqsave(g_lock, flags);
1127 kptllnd_data.kptl_expected_peers++;
1131 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1133 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1134 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1135 *kptllnd_tunables.kptl_max_msg_size;
1137 new_peer->peer_state = PEER_STATE_ACTIVE;
1138 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1139 new_peer->peer_next_matchbits = safe_matchbits;
1140 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1141 new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1143 kptllnd_peer_add_peertable_locked(new_peer);
1145 write_unlock_irqrestore(g_lock, flags);
1147 /* NB someone else could get in now and post a message before I post
1148 * the HELLO, but post_tx/check_sends take care of that! */
1150 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1151 libcfs_id2str(new_peer->peer_id), hello_tx);
1153 kptllnd_post_tx(new_peer, hello_tx, 0);
1154 kptllnd_peer_check_sends(new_peer);
/* Queue 'tx' on 'peer' and kick the send pipeline (consumes the caller's
 * tx ref via kptllnd_post_tx). */
1160 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1162 kptllnd_post_tx(peer, tx, nfrag);
1163 kptllnd_peer_check_sends(peer);
1167 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1169 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1170 ptl_process_id_t ptl_id;
1171 kptl_peer_t *new_peer;
1172 kptl_tx_t *hello_tx;
1173 unsigned long flags;
1175 __u64 last_matchbits_seen;
1177 /* I expect to find the peer, so I only take a read lock... */
1178 read_lock_irqsave(g_lock, flags);
1179 *peerp = kptllnd_id2peer_locked(target);
1180 read_unlock_irqrestore(g_lock, flags);
1185 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1186 CWARN("Refusing to create a new connection to %s "
1187 "(non-kernel peer)\n", libcfs_id2str(target));
1188 return -EHOSTUNREACH;
1191 /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1192 * the same portals PID */
1193 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1194 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1196 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1197 if (hello_tx == NULL) {
1198 CERROR("Unable to allocate connect message for %s\n",
1199 libcfs_id2str(target));
1203 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1204 sizeof(kptl_hello_msg_t));
1206 new_peer = kptllnd_peer_allocate(target, ptl_id);
1207 if (new_peer == NULL) {
1212 rc = kptllnd_peer_reserve_buffers();
1216 write_lock_irqsave(g_lock, flags);
1218 *peerp = kptllnd_id2peer_locked(target);
1219 if (*peerp != NULL) {
1220 write_unlock_irqrestore(g_lock, flags);
1224 kptllnd_cull_peertable_locked(target);
1226 if (kptllnd_data.kptl_n_active_peers ==
1227 kptllnd_data.kptl_expected_peers) {
1228 /* peer table full */
1229 write_unlock_irqrestore(g_lock, flags);
1231 kptllnd_peertable_overflow_msg("Connection to ", target);
1233 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1235 CERROR("Can't create connection to %s\n",
1236 libcfs_id2str(target));
1240 write_lock_irqsave(g_lock, flags);
1241 kptllnd_data.kptl_expected_peers++;
1245 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1247 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1248 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1249 *kptllnd_tunables.kptl_max_msg_size;
1251 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1252 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1254 kptllnd_peer_add_peertable_locked(new_peer);
1256 write_unlock_irqrestore(g_lock, flags);
1258 /* NB someone else could get in now and post a message before I post
1259 * the HELLO, but post_tx/check_sends take care of that! */
1261 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1262 libcfs_id2str(new_peer->peer_id), hello_tx);
1264 kptllnd_post_tx(new_peer, hello_tx, 0);
1265 kptllnd_peer_check_sends(new_peer);
1271 kptllnd_peer_unreserve_buffers();
1273 kptllnd_peer_decref(new_peer);
1275 kptllnd_tx_decref(hello_tx);