1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
6 * E Barton <eeb@bartonsoftware.com>
8 * This file is part of the Lustre file system, http://www.lustre.org
9 * Lustre is a trademark of Cluster File Systems, Inc.
11 * This file is confidential source code owned by Cluster File Systems.
12 * No viewing, modification, compilation, redistribution, or any other
13 * form of use is permitted except through a signed license agreement.
15 * If you have not signed such an agreement, then you have no rights to
16 * this file. Please destroy it immediately and contact CFS.
21 #include <libcfs/list.h>
/* Count the entries currently linked on list 'q'.
 * NOTE(review): body missing from this sampled listing; callers visible in
 * this excerpt invoke it while holding peer->peer_lock — confirm that the
 * caller is always responsible for locking 'q'. */
24 kptllnd_count_queue(struct list_head *q)
/*
 * kptllnd_get_peer_info - copy the index'th peer's state and counters out
 * through the caller's pointers, walking the peer hash table under the
 * peer rwlock (read side).
 * NOTE(review): sampled listing — declarations of 'flags', 'peer', 'i',
 * the index-match test and the return path are missing from this excerpt.
 */
37 kptllnd_get_peer_info(int index,
38 lnet_process_id_t *id,
39 int *state, int *sent_hello,
40 int *refcount, __u64 *incarnation,
41 __u64 *next_matchbits, __u64 *last_matchbits_seen,
42 int *nsendq, int *nactiveq,
43 int *credits, int *outstanding_credits)
45 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
47 struct list_head *ptmp;
52 read_lock_irqsave(g_lock, flags);
54 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
56 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57 peer = list_entry(ptmp, kptl_peer_t, peer_list);
/* State/refcount/incarnation are read under the table read lock only */
63 *state = peer->peer_state;
64 *sent_hello = peer->peer_sent_hello;
65 *refcount = atomic_read(&peer->peer_refcount);
66 *incarnation = peer->peer_incarnation;
/* peer_lock nests inside the peer table read lock while sampling the
 * matchbits, credit counters and queue depths consistently */
68 spin_lock(&peer->peer_lock);
70 *next_matchbits = peer->peer_next_matchbits;
71 *last_matchbits_seen = peer->peer_last_matchbits_seen;
72 *credits = peer->peer_credits;
73 *outstanding_credits = peer->peer_outstanding_credits;
75 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
76 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
78 spin_unlock(&peer->peer_lock);
86 read_unlock_irqrestore(g_lock, flags);
/*
 * kptllnd_peer_add_peertable_locked - install 'peer' on its NID's hash
 * chain and account it as active.
 * Caller must hold kptl_peer_rw_lock for writing (implied by the _locked
 * suffix — TODO(review): confirm against the full source).
 */
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
/* Only legal while not shutting down and while there is headroom below
 * the expected-peer limit */
93 LASSERT (!kptllnd_data.kptl_shutdown);
94 LASSERT (kptllnd_data.kptl_n_active_peers <
95 kptllnd_data.kptl_expected_peers);
97 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
98 peer->peer_state == PEER_STATE_ACTIVE);
100 kptllnd_data.kptl_n_active_peers++;
101 atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
103 /* NB add to HEAD of peer list for MRU order!
104 * (see kptllnd_cull_peertable) */
105 list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/*
 * kptllnd_cull_peertable_locked - make room before adding a new peer with
 * portals ID 'pid': close the least-recently-used peers sharing pid.nid so
 * that at most (max_procs_per_node - 1) remain on the chain.
 * NOTE(review): sampled listing — the declaration/maintenance of 'count',
 * 'peer' and the loop's continue paths are missing from this excerpt.
 */
109 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
111 /* I'm about to add a new peer with this portals ID to the peer table,
112 * so (a) this peer should not exist already and (b) I want to leave at
113 * most (max_procs_per_nid - 1) peers with this NID in the table. */
114 struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
115 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
117 struct list_head *tmp;
118 struct list_head *nxt;
/* _safe iteration: kptllnd_peer_close_locked() unlinks entries */
122 list_for_each_safe (tmp, nxt, peers) {
123 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
125 peer = list_entry(tmp, kptl_peer_t, peer_list);
127 if (peer->peer_id.nid != pid.nid)
/* (a) above: an identical process ID must not already be present */
130 LASSERT (peer->peer_id.pid != pid.pid);
/* List is MRU-ordered, so the first 'cull_count' matches are recent */
134 if (count < cull_count) /* recent (don't cull) */
137 CDEBUG(D_NET, "Cull %s(%s)\n",
138 libcfs_id2str(peer->peer_id),
139 kptllnd_ptlid2str(peer->peer_ptlid));
141 kptllnd_peer_close_locked(peer, 0);
/*
 * kptllnd_peer_allocate - allocate and initialise a peer object for LNET
 * id 'lpid' / portals id 'ppid'.  The returned peer carries one reference
 * for the caller and is in PEER_STATE_ALLOCATED (not yet in the table).
 * NOTE(review): sampled listing — the NULL-return on allocation failure /
 * shutdown and the final 'return peer' are missing from this excerpt.
 */
146 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
151 LIBCFS_ALLOC(peer, sizeof (*peer));
153 CERROR("Can't create peer %s (%s)\n",
155 kptllnd_ptlid2str(ppid));
159 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
161 INIT_LIST_HEAD (&peer->peer_noops);
162 INIT_LIST_HEAD (&peer->peer_sendq);
163 INIT_LIST_HEAD (&peer->peer_activeq);
164 spin_lock_init (&peer->peer_lock);
166 peer->peer_state = PEER_STATE_ALLOCATED;
167 peer->peer_error = 0;
168 peer->peer_last_alive = cfs_time_current();
169 peer->peer_id = lpid;
170 peer->peer_ptlid = ppid;
171 peer->peer_credits = 1; /* enough for HELLO */
172 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
173 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
174 peer->peer_sent_credits = 1; /* HELLO credit is implicit */
175 peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
177 atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
/* Take the table write lock to stamp the incarnation and bump kptl_npeers
 * atomically with the shutdown check */
179 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
181 peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
183 /* Only increase # peers under lock, to guarantee we dont grow it
185 if (kptllnd_data.kptl_shutdown) {
186 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
/* Shutdown raced with us: undo the allocation */
188 LIBCFS_FREE(peer, sizeof(*peer));
192 kptllnd_data.kptl_npeers++;
193 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * kptllnd_peer_destroy - final teardown once the last reference is gone:
 * unlink a zombie from its list, decrement the global peer count and free
 * the memory.  Must not run in interrupt context.
 */
199 kptllnd_peer_destroy (kptl_peer_t *peer)
203 CDEBUG(D_NET, "Peer=%p\n", peer);
205 LASSERT (!in_interrupt());
206 LASSERT (atomic_read(&peer->peer_refcount) == 0);
/* Only never-installed (ALLOCATED) or fully retired (ZOMBIE) peers may be
 * destroyed, and all tx queues must already have drained */
207 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
208 peer->peer_state == PEER_STATE_ZOMBIE);
209 LASSERT (list_empty(&peer->peer_noops));
210 LASSERT (list_empty(&peer->peer_sendq));
211 LASSERT (list_empty(&peer->peer_activeq));
213 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
/* ZOMBIEs are still linked on kptl_zombie_peers; ALLOCATED peers were
 * never listed, so there is nothing to unlink for them */
215 if (peer->peer_state == PEER_STATE_ZOMBIE)
216 list_del(&peer->peer_list);
218 kptllnd_data.kptl_npeers--;
220 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
222 LIBCFS_FREE (peer, sizeof (*peer));
/*
 * kptllnd_cancel_txlist - move every tx from 'peerq' onto 'txs', marking
 * each as failed with -EIO.  Caller must hold the lock protecting 'peerq'
 * (peer->peer_lock in the callers visible in this excerpt).
 */
226 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
228 struct list_head *tmp;
229 struct list_head *nxt;
/* _safe iteration because each tx is unlinked inside the loop */
232 list_for_each_safe (tmp, nxt, peerq) {
233 tx = list_entry(tmp, kptl_tx_t, tx_list);
235 list_del(&tx->tx_list);
236 list_add_tail(&tx->tx_list, txs);
238 tx->tx_status = -EIO;
/*
 * kptllnd_peer_cancel_txs - under peer_lock, fail (-EIO) and collect every
 * tx queued on the peer's noop, send and active queues onto 'txs' for the
 * caller to complete outside the lock.
 */
244 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
248 spin_lock_irqsave(&peer->peer_lock, flags);
250 kptllnd_cancel_txlist(&peer->peer_noops, txs);
251 kptllnd_cancel_txlist(&peer->peer_sendq, txs);
252 kptllnd_cancel_txlist(&peer->peer_activeq, txs);
254 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Record that 'peer' has shown signs of life (lockless timestamp update). */
258 kptllnd_peer_alive (kptl_peer_t *peer)
260 /* This is racy, but everyone's only writing cfs_time_current() */
261 peer->peer_last_alive = cfs_time_current();
/*
 * kptllnd_peer_notify - if the peer has a stashed error, consume it under
 * peer_lock and tell LNET the peer is down (lnet_notify with alive == 0),
 * reporting when it was last seen alive.
 * NOTE(review): sampled listing — the early-return when error == 0 and the
 * trailing lnet_notify() arguments are missing from this excerpt.
 */
266 kptllnd_peer_notify (kptl_peer_t *peer)
269 time_t last_alive = 0;
272 spin_lock_irqsave(&peer->peer_lock, flags);
/* Latch-and-clear peer_error so the notification fires only once */
274 if (peer->peer_error != 0) {
275 error = peer->peer_error;
276 peer->peer_error = 0;
/* Convert the jiffies-based last-alive stamp to wall-clock seconds */
278 last_alive = cfs_time_current_sec() -
279 cfs_duration_sec(cfs_time_current() -
280 peer->peer_last_alive);
283 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Notify outside peer_lock */
286 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
/*
 * kptllnd_handle_closing_peers - reap the closing/zombie peer lists: cancel
 * txs on zombies, promote closing peers to zombies (notifying LNET and
 * dropping the table's ref), then complete all cancelled txs outside the
 * lock.  NOTE(review): sampled listing — declarations ('peer', 'tx',
 * 'idle', 'flags') and the early return when both lists are empty are
 * missing from this excerpt.
 */
291 kptllnd_handle_closing_peers ()
294 struct list_head txs;
296 struct list_head *tmp;
297 struct list_head *nxt;
301 /* Check with a read lock first to avoid blocking anyone */
303 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
304 idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
305 list_empty(&kptllnd_data.kptl_zombie_peers);
306 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
311 INIT_LIST_HEAD(&txs);
313 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
315 /* Cancel txs on all zombie peers. NB anyone dropping the last peer
316 * ref removes it from this list, so I musn't drop the lock while
318 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
319 peer = list_entry (tmp, kptl_peer_t, peer_list);
321 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
323 kptllnd_peer_cancel_txs(peer, &txs);
326 /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
327 * I'm the only one removing from this list, but peers can be added on
328 * the end any time I drop the lock. */
330 list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
331 peer = list_entry (tmp, kptl_peer_t, peer_list);
333 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
/* Move CLOSING -> ZOMBIE while still holding the write lock */
335 list_del(&peer->peer_list);
336 list_add_tail(&peer->peer_list,
337 &kptllnd_data.kptl_zombie_peers);
338 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the lock to notify/cancel, then drop the table's ref taken in
 * kptllnd_peer_close_locked(); re-acquire before the next iteration */
340 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
342 kptllnd_peer_notify(peer);
343 kptllnd_peer_cancel_txs(peer, &txs);
344 kptllnd_peer_decref(peer);
346 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
349 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
351 /* Drop peer's ref on all cancelled txs. This will get
352 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
354 list_for_each_safe (tmp, nxt, &txs) {
355 tx = list_entry(tmp, kptl_tx_t, tx_list);
356 list_del(&tx->tx_list);
357 kptllnd_tx_decref(tx);
/*
 * kptllnd_peer_close_locked - begin closing 'peer': remove it from the
 * active peer table, stash 'why' in peer_error and queue it on the closing
 * list for the watchdog to reap.  Idempotent for ZOMBIE/CLOSING peers.
 * Caller must hold kptl_peer_rw_lock for writing (per the _locked suffix —
 * TODO(review): confirm against the full source; default/LBUG cases of the
 * switch are missing from this excerpt).
 */
362 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
364 switch (peer->peer_state) {
368 case PEER_STATE_WAITING_HELLO:
369 case PEER_STATE_ACTIVE:
370 /* Ensure new peers see a new incarnation of me */
371 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
372 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
373 kptllnd_data.kptl_incarnation++;
375 /* Removing from peer table */
376 kptllnd_data.kptl_n_active_peers--;
377 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
379 list_del(&peer->peer_list);
380 kptllnd_peer_unreserve_buffers();
382 peer->peer_error = why; /* stash 'why' only on first close */
383 peer->peer_state = PEER_STATE_CLOSING;
385 /* Schedule for immediate attention, taking peer table's ref */
386 list_add_tail(&peer->peer_list,
387 &kptllnd_data.kptl_closing_peers);
388 wake_up(&kptllnd_data.kptl_watchdog_waitq);
/* Already closing or closed: nothing to do */
391 case PEER_STATE_ZOMBIE:
392 case PEER_STATE_CLOSING:
/* Locked wrapper: close 'peer' with reason 'why' under the table write lock. */
398 kptllnd_peer_close(kptl_peer_t *peer, int why)
402 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
403 kptllnd_peer_close_locked(peer, why);
404 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * kptllnd_peer_del - close every peer matching 'id'; LNET_NID_ANY /
 * LNET_PID_ANY act as wildcards.  Returns 0 if anything matched.
 * NOTE(review): sampled listing — declarations of 'lo', 'hi', 'i', 'rc',
 * 'peer', 'flags', the ENOENT default and the rescan/goto after re-taking
 * the lock are missing from this excerpt.
 */
408 kptllnd_peer_del(lnet_process_id_t id)
410 struct list_head *ptmp;
411 struct list_head *pnxt;
420 * Find the single bucket we are supposed to look at or if nid is a
421 * wildcard (LNET_NID_ANY) then look at all of the buckets
423 if (id.nid != LNET_NID_ANY) {
424 struct list_head *l = kptllnd_nid2peerlist(id.nid);
/* Pointer arithmetic recovers the bucket index from the chain head */
426 lo = hi = l - kptllnd_data.kptl_peers;
428 if (id.pid != LNET_PID_ANY)
432 hi = kptllnd_data.kptl_peer_hash_size - 1;
436 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
438 for (i = lo; i <= hi; i++) {
439 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
440 peer = list_entry (ptmp, kptl_peer_t, peer_list);
442 if (!(id.nid == LNET_NID_ANY ||
443 (peer->peer_id.nid == id.nid &&
444 (id.pid == LNET_PID_ANY ||
445 peer->peer_id.pid == id.pid))))
448 kptllnd_peer_addref(peer); /* 1 ref for me... */
/* Must drop the read lock before closing (close takes the write lock) */
450 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
453 kptllnd_peer_close(peer, 0);
454 kptllnd_peer_decref(peer); /* ...until here */
456 rc = 0; /* matched something */
458 /* start again now I've dropped the lock */
463 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * kptllnd_post_tx - bind an MD for tx's message and queue it on the peer
 * for kptllnd_peer_check_sends() to transmit.  Consumes the caller's ref
 * on 'tx'; on PtlMDBind failure the tx is failed with -EIO and released.
 * NOTE(review): sampled listing — declarations ('md', 'prc', 'flags'), the
 * nfrag == 0 vs. iovec branch structure, and the failure-path peer close
 * are missing from this excerpt.
 */
469 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
471 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
472 ptl_handle_md_t msg_mdh;
477 LASSERT (!tx->tx_idle);
478 LASSERT (!tx->tx_active);
479 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
480 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
481 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
482 tx->tx_type == TX_TYPE_PUT_REQUEST ||
483 tx->tx_type == TX_TYPE_GET_REQUEST);
485 kptllnd_set_tx_peer(tx, peer);
487 memset(&md, 0, sizeof(md));
489 md.threshold = tx->tx_acked ? 2 : 1; /* SEND END + ACK? */
490 md.options = PTL_MD_OP_PUT |
491 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
492 PTL_MD_EVENT_START_DISABLE;
493 md.user_ptr = &tx->tx_msg_eventarg;
494 md.eq_handle = kptllnd_data.kptl_eqh;
/* Single-fragment case: the MD covers the message buffer directly */
497 md.start = tx->tx_msg;
498 md.length = tx->tx_msg->ptlm_nob;
/* Multi-fragment case: iovec MD; frag 0 must be the message header */
501 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
503 md.start = tx->tx_frags;
505 md.options |= PTL_MD_IOVEC;
508 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
510 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
511 libcfs_id2str(peer->peer_id),
512 kptllnd_errtype2str(prc), prc);
513 tx->tx_status = -EIO;
514 kptllnd_tx_decref(tx);
518 spin_lock_irqsave(&peer->peer_lock, flags);
520 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
522 tx->tx_msg_mdh = msg_mdh;
524 /* Ensure HELLO is sent first */
525 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
526 list_add(&tx->tx_list, &peer->peer_noops);
527 else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
528 list_add(&tx->tx_list, &peer->peer_sendq)ibility;
530 list_add_tail(&tx->tx_list, &peer->peer_sendq);
532 spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
 * kptllnd_peer_send_noop - decide whether a NOOP should be posted to
 * return credits: only after HELLO is sent, with a send credit available,
 * no NOOP already queued, and outstanding credits at/above the high-water
 * mark.  NOTE(review): the early 'return 0' for the first condition group
 * is missing from this sampled listing.
 */
536 kptllnd_peer_send_noop (kptl_peer_t *peer)
538 if (!peer->peer_sent_hello ||
539 peer->peer_credits == 0 ||
540 !list_empty(&peer->peer_noops) ||
541 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
544 /* No tx to piggyback NOOP onto or no credit to send a tx */
545 return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
/*
 * kptllnd_peer_check_sends - drain the peer's noop/send queues while
 * credits allow: post a credit-returning NOOP if needed, enforce
 * HELLO-first ordering, reserve the last credit for NOOP/HELLO, pack and
 * PtlPut() each message (attaching ME/MD first for PUT/GET RDMA), and on
 * any Portals failure close the peer with -EIO.
 * NOTE(review): sampled listing — declarations ('tx', 'msg_type', 'rc',
 * 'prc', 'meh', 'flags'), the loop construct, 'break'/'goto' targets and
 * PtlMEAttach/PtlPut trailing arguments are missing from this excerpt.
 */
549 kptllnd_peer_check_sends (kptl_peer_t *peer)
557 LASSERT(!in_interrupt());
559 spin_lock_irqsave(&peer->peer_lock, flags);
561 peer->peer_retry_noop = 0;
563 if (kptllnd_peer_send_noop(peer)) {
564 /* post a NOOP to return credits */
565 spin_unlock_irqrestore(&peer->peer_lock, flags);
567 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
569 CERROR("Can't return credits to %s: can't allocate descriptor\n",
570 libcfs_id2str(peer->peer_id));
572 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
573 kptllnd_post_tx(peer, tx, 0);
576 spin_lock_irqsave(&peer->peer_lock, flags);
/* Remember to retry the NOOP later if no tx descriptor was free */
577 peer->peer_retry_noop = (tx == NULL);
/* NOOPs jump the queue ahead of ordinary sends */
581 if (!list_empty(&peer->peer_noops)) {
582 LASSERT (peer->peer_sent_hello);
583 tx = list_entry(peer->peer_noops.next,
585 } else if (!list_empty(&peer->peer_sendq)) {
586 tx = list_entry(peer->peer_sendq.next,
589 /* nothing to send right now */
593 LASSERT (tx->tx_active);
594 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
595 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
/* Credit accounting invariants */
597 LASSERT (peer->peer_outstanding_credits >= 0);
598 LASSERT (peer->peer_sent_credits >= 0);
599 LASSERT (peer->peer_sent_credits +
600 peer->peer_outstanding_credits <=
601 *kptllnd_tunables.kptl_peercredits);
602 LASSERT (peer->peer_credits >= 0);
604 msg_type = tx->tx_msg->ptlm_type;
606 /* Ensure HELLO is sent first */
607 if (!peer->peer_sent_hello) {
608 LASSERT (list_empty(&peer->peer_noops));
609 if (msg_type != PTLLND_MSG_TYPE_HELLO)
611 peer->peer_sent_hello = 1;
614 if (peer->peer_credits == 0) {
615 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
616 libcfs_id2str(peer->peer_id),
618 peer->peer_outstanding_credits,
619 peer->peer_sent_credits,
620 kptllnd_msgtype2str(msg_type), tx);
624 /* Last/Initial credit reserved for NOOP/HELLO */
625 if (peer->peer_credits == 1 &&
626 msg_type != PTLLND_MSG_TYPE_HELLO &&
627 msg_type != PTLLND_MSG_TYPE_NOOP) {
628 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
629 "not using last credit for %s[%p]\n",
630 libcfs_id2str(peer->peer_id),
632 peer->peer_outstanding_credits,
633 peer->peer_sent_credits,
634 kptllnd_msgtype2str(msg_type), tx);
638 list_del(&tx->tx_list);
640 /* Discard any NOOP I queued if I'm not at the high-water mark
641 * any more or more messages have been queued */
642 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
643 !kptllnd_peer_send_noop(peer)) {
646 spin_unlock_irqrestore(&peer->peer_lock, flags);
648 CDEBUG(D_NET, "%s: redundant noop\n",
649 libcfs_id2str(peer->peer_id));
650 kptllnd_tx_decref(tx);
652 spin_lock_irqsave(&peer->peer_lock, flags);
656 /* fill last-minute msg fields */
657 kptllnd_msg_pack(tx->tx_msg, peer);
659 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
660 tx->tx_type == TX_TYPE_GET_REQUEST) {
661 /* peer_next_matchbits must be known good */
662 LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
663 /* Assume 64-bit matchbits can't wrap */
664 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
665 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
666 peer->peer_next_matchbits++;
/* Piggyback all outstanding credits on this message and spend one */
669 peer->peer_sent_credits += peer->peer_outstanding_credits;
670 peer->peer_outstanding_credits = 0;
671 peer->peer_credits--;
673 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
674 libcfs_id2str(peer->peer_id), peer->peer_credits,
675 peer->peer_outstanding_credits, peer->peer_sent_credits,
676 kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
677 tx->tx_msg->ptlm_credits);
679 list_add_tail(&tx->tx_list, &peer->peer_activeq);
681 kptllnd_tx_addref(tx); /* 1 ref for me... */
683 spin_unlock_irqrestore(&peer->peer_lock, flags);
685 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
686 tx->tx_type == TX_TYPE_GET_REQUEST) {
687 /* Post bulk now we have safe matchbits */
688 rc = PtlMEAttach(kptllnd_data.kptl_nih,
689 *kptllnd_tunables.kptl_portal,
691 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
697 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
698 libcfs_id2str(peer->peer_id),
699 kptllnd_errtype2str(rc), rc);
703 rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
706 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
707 libcfs_id2str(tx->tx_peer->peer_id),
708 kptllnd_errtype2str(rc), rc);
709 rc = PtlMEUnlink(meh);
710 LASSERT(rc == PTL_OK);
711 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
714 /* I'm not racing with the event callback here. It's a
715 * bug if there's an event on the MD I just attached
716 * before I actually send the RDMA request message -
717 * probably matchbits re-used in error. */
720 tx->tx_tposted = jiffies; /* going on the wire */
722 rc = PtlPut (tx->tx_msg_mdh,
723 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
725 *kptllnd_tunables.kptl_portal,
729 0); /* header data */
731 CERROR("PtlPut %s error %s(%d)\n",
732 libcfs_id2str(peer->peer_id),
733 kptllnd_errtype2str(rc), rc);
737 kptllnd_tx_decref(tx); /* drop my ref */
739 spin_lock_irqsave(&peer->peer_lock, flags);
742 spin_unlock_irqrestore(&peer->peer_lock, flags);
746 /* Nuke everything (including tx we were trying) */
747 kptllnd_peer_close(peer, -EIO);
748 kptllnd_tx_decref(tx);
/*
 * kptllnd_find_timed_out_tx - scan the peer's send and active queues for
 * the first tx whose deadline has passed, taking a ref on it for the
 * caller.  Caller holds peer->peer_lock (per the visible call site in
 * kptllnd_peer_check_bucket — TODO confirm).  NOTE(review): the 'return
 * tx' / 'return NULL' lines are missing from this sampled listing.
 */
752 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
755 struct list_head *ele;
757 list_for_each(ele, &peer->peer_sendq) {
758 tx = list_entry(ele, kptl_tx_t, tx_list);
760 if (time_after_eq(jiffies, tx->tx_deadline)) {
761 kptllnd_tx_addref(tx);
766 list_for_each(ele, &peer->peer_activeq) {
767 tx = list_entry(ele, kptl_tx_t, tx_list);
769 if (time_after_eq(jiffies, tx->tx_deadline)) {
770 kptllnd_tx_addref(tx);
/*
 * kptllnd_peer_check_bucket - watchdog pass over one peer hash bucket:
 * skip peers already checked this 'stamp', retry pending NOOPs via
 * check_sends, and close (-ETIMEDOUT) any peer with a timed-out tx after
 * logging diagnostics.  NOTE(review): sampled listing — declarations
 * ('peer', 'tx', 'flags', 'nsend', 'nactive', 'check_sends'), 'continue'
 * statements and the rescan goto are missing from this excerpt.
 */
780 kptllnd_peer_check_bucket (int idx, int stamp)
782 struct list_head *peers = &kptllnd_data.kptl_peers[idx];
783 struct list_head *ptmp;
791 CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
794 /* NB. Shared lock while I just look */
795 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
797 list_for_each (ptmp, peers) {
798 peer = list_entry (ptmp, kptl_peer_t, peer_list);
800 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
801 libcfs_id2str(peer->peer_id), peer->peer_credits,
802 peer->peer_outstanding_credits, peer->peer_sent_credits);
804 spin_lock(&peer->peer_lock);
806 if (peer->peer_check_stamp == stamp) {
807 /* checked already this pass */
808 spin_unlock(&peer->peer_lock);
812 peer->peer_check_stamp = stamp;
813 tx = kptllnd_find_timed_out_tx(peer);
814 check_sends = peer->peer_retry_noop;
816 spin_unlock(&peer->peer_lock);
818 if (tx == NULL && !check_sends)
/* Hold a peer ref across dropping the table lock */
821 kptllnd_peer_addref(peer); /* 1 ref for me... */
823 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
825 if (tx == NULL) { /* nothing timed out */
826 kptllnd_peer_check_sends(peer);
827 kptllnd_peer_decref(peer); /* ...until here or... */
829 /* rescan after dropping the lock */
/* Timed-out path: snapshot queue depths for the diagnostics below */
833 spin_lock_irqsave(&peer->peer_lock, flags);
834 nsend = kptllnd_count_queue(&peer->peer_sendq);
835 nactive = kptllnd_count_queue(&peer->peer_activeq);
836 spin_unlock_irqrestore(&peer->peer_lock, flags);
838 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
839 libcfs_id2str(peer->peer_id),
840 (tx->tx_tposted == 0) ?
841 "no free peer buffers" :
842 "please check Portals");
844 if (tx->tx_tposted) {
845 CERROR("Could not send to %s after %ds (sent %lds ago); "
846 "check Portals for possible issues\n",
847 libcfs_id2str(peer->peer_id),
848 *kptllnd_tunables.kptl_timeout,
849 cfs_duration_sec(jiffies - tx->tx_tposted));
851 CERROR("Could not get credits for %s after %ds; "
852 "possible Lustre networking issues\n",
853 libcfs_id2str(peer->peer_id),
854 *kptllnd_tunables.kptl_timeout);
857 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
858 "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
859 "%sposted %lu T/O %ds\n",
860 libcfs_id2str(peer->peer_id), peer->peer_credits,
861 peer->peer_outstanding_credits, peer->peer_sent_credits,
862 nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
863 tx->tx_active ? "A" : "",
864 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
866 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
869 (tx->tx_tposted == 0) ? "not " : "",
870 (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
871 *kptllnd_tunables.kptl_timeout);
873 kptllnd_dump_ptltrace();
/* Drop the ref taken by kptllnd_find_timed_out_tx() */
875 kptllnd_tx_decref(tx);
877 kptllnd_peer_close(peer, -ETIMEDOUT);
878 kptllnd_peer_decref(peer); /* ...until here */
880 /* start again now I've dropped the lock */
884 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/*
 * kptllnd_id2peer_locked - look up the peer with exactly this nid/pid on
 * its hash chain, taking a reference for the caller.  Caller must hold
 * kptl_peer_rw_lock (read or write — per the _locked suffix; TODO confirm).
 * NOTE(review): the 'return peer' / 'return NULL' lines are missing from
 * this sampled listing.
 */
888 kptllnd_id2peer_locked (lnet_process_id_t id)
890 struct list_head *peers = kptllnd_nid2peerlist(id.nid);
891 struct list_head *tmp;
894 list_for_each (tmp, peers) {
896 peer = list_entry (tmp, kptl_peer_t, peer_list);
/* Only live peers may sit in the hash table */
898 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
899 peer->peer_state == PEER_STATE_ACTIVE);
901 if (peer->peer_id.nid != id.nid ||
902 peer->peer_id.pid != id.pid)
905 kptllnd_peer_addref(peer);
907 CDEBUG(D_NET, "%s -> %s (%d)\n",
909 kptllnd_ptlid2str(peer->peer_ptlid),
910 atomic_read (&peer->peer_refcount));
/* Log a console warning that adding the peer 'id' ('str' gives direction,
 * e.g. "Connection from ") would overflow the peer table, with advice on
 * the tunables to raise. */
918 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
920 LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
921 "messages may be dropped\n",
922 str, libcfs_id2str(id),
923 kptllnd_data.kptl_n_active_peers);
924 LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
925 "'max_nodes' or 'max_procs_per_node'\n");
/*
 * kptllnd_get_last_seen_matchbits_locked - for a brand-new peer 'lpid'
 * (asserted absent from the hash table), recover the last matchbits seen
 * from any closing/zombie incarnation; PTL_RESERVED_MATCHBITS if none.
 * Caller must hold kptl_peer_rw_lock (per the _locked suffix; the visible
 * callers hold it for writing — TODO confirm).
 */
929 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
932 struct list_head *tmp;
934 /* Find the last matchbits I saw this new peer using. Note..
935 A. This peer cannot be in the peer table - she's new!
936 B. If I can't find the peer in the closing/zombie peers, all
937 matchbits are safe because all refs to the (old) peer have gone
938 so all txs have completed so there's no risk of matchbit
942 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
944 /* peer's last matchbits can't change after it comes out of the peer
945 * table, so first match is fine */
947 list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
948 peer = list_entry (tmp, kptl_peer_t, peer_list);
950 if (peer->peer_id.nid == lpid.nid &&
951 peer->peer_id.pid == lpid.pid)
952 return peer->peer_last_matchbits_seen;
955 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
956 peer = list_entry (tmp, kptl_peer_t, peer_list);
958 if (peer->peer_id.nid == lpid.nid &&
959 peer->peer_id.pid == lpid.pid)
960 return peer->peer_last_matchbits_seen;
963 return PTL_RESERVED_MATCHBITS;
/*
 * kptllnd_peer_handle_hello - process an incoming HELLO: validate the
 * sender (userflag, matchbits, max message size, credits), complete or
 * restart the handshake with any existing peer, otherwise allocate a new
 * peer, reserve buffers, install it in the table and post the HELLO
 * response.  NOTE(review): sampled listing — declarations ('peer', 'rc',
 * 'flags', 'hello_tx', 'msg' parameter line), early 'return's on the error
 * paths and some CERROR argument lines are missing from this excerpt.
 */
967 kptllnd_peer_handle_hello (ptl_process_id_t initiator,
970 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
972 kptl_peer_t *new_peer;
973 lnet_process_id_t lpid;
977 __u64 safe_matchbits;
978 __u64 last_matchbits_seen;
980 lpid.nid = msg->ptlm_srcnid;
981 lpid.pid = msg->ptlm_srcpid;
983 CDEBUG(D_NET, "hello from %s(%s)\n",
984 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
986 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
987 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
988 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
989 * userspace. Refuse the connection if she hasn't set the
990 * correct flag in her PID... */
991 CERROR("Userflag not set in hello from %s (%s)\n",
992 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
996 /* kptlhm_matchbits are the highest matchbits my peer may have used to
997 * RDMA to me. I ensure I never register buffers for RDMA that could
998 * match any she used */
999 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1001 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1002 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1003 safe_matchbits, libcfs_id2str(lpid));
1007 if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1008 CERROR("%s: max message size %d < MIN %d",
1009 libcfs_id2str(lpid),
1010 msg->ptlm_u.hello.kptlhm_max_msg_size,
1011 PTLLND_MIN_BUFFER_SIZE);
1015 if (msg->ptlm_credits <= 1) {
1016 CERROR("Need more than 1+%d credits from %s\n",
1017 msg->ptlm_credits, libcfs_id2str(lpid));
1021 write_lock_irqsave(g_lock, flags);
/* First look: is there already a peer for this id? */
1023 peer = kptllnd_id2peer_locked(lpid);
1025 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1026 /* Completing HELLO handshake */
1027 LASSERT(peer->peer_incarnation == 0);
1029 if (msg->ptlm_dststamp != 0 &&
1030 msg->ptlm_dststamp != peer->peer_myincarnation) {
1031 write_unlock_irqrestore(g_lock, flags);
1033 CERROR("Ignoring HELLO from %s: unexpected "
1034 "dststamp "LPX64" ("LPX64" wanted)\n",
1035 libcfs_id2str(lpid),
1037 peer->peer_myincarnation);
1038 kptllnd_peer_decref(peer);
1042 /* Concurrent initiation or response to my HELLO */
1043 peer->peer_state = PEER_STATE_ACTIVE;
1044 peer->peer_incarnation = msg->ptlm_srcstamp;
1045 peer->peer_next_matchbits = safe_matchbits;
1046 peer->peer_max_msg_size =
1047 msg->ptlm_u.hello.kptlhm_max_msg_size;
1049 write_unlock_irqrestore(g_lock, flags);
1053 if (msg->ptlm_dststamp != 0 &&
1054 msg->ptlm_dststamp <= peer->peer_myincarnation) {
1055 write_unlock_irqrestore(g_lock, flags);
1057 CERROR("Ignoring stale HELLO from %s: "
1058 "dststamp "LPX64" (current "LPX64")\n",
1059 libcfs_id2str(lpid),
1061 peer->peer_myincarnation);
1062 kptllnd_peer_decref(peer);
1066 /* Brand new connection attempt: remove old incarnation */
1067 kptllnd_peer_close_locked(peer, 0);
/* Make room on the hash chain for the replacement peer */
1070 kptllnd_cull_peertable_locked(lpid);
1072 write_unlock_irqrestore(g_lock, flags);
1075 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1076 " stamp "LPX64"("LPX64")\n",
1077 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1078 msg->ptlm_srcstamp, peer->peer_incarnation);
1080 kptllnd_peer_decref(peer);
/* No usable existing peer: build a new one plus its HELLO response */
1083 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1084 if (hello_tx == NULL) {
1085 CERROR("Unable to allocate HELLO message for %s\n",
1086 libcfs_id2str(lpid));
1090 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1091 sizeof(kptl_hello_msg_t));
1093 new_peer = kptllnd_peer_allocate(lpid, initiator);
1094 if (new_peer == NULL) {
1095 kptllnd_tx_decref(hello_tx);
1099 rc = kptllnd_peer_reserve_buffers();
1101 kptllnd_peer_decref(new_peer);
1102 kptllnd_tx_decref(hello_tx);
1104 CERROR("Failed to reserve buffers for %s\n",
1105 libcfs_id2str(lpid));
1109 write_lock_irqsave(g_lock, flags);
1112 if (kptllnd_data.kptl_shutdown) {
1113 write_unlock_irqrestore(g_lock, flags);
1115 CERROR ("Shutdown started, refusing connection from %s\n",
1116 libcfs_id2str(lpid));
1117 kptllnd_peer_unreserve_buffers();
1118 kptllnd_peer_decref(new_peer);
1119 kptllnd_tx_decref(hello_tx);
/* Re-check: someone may have created the peer while the lock was dropped */
1123 peer = kptllnd_id2peer_locked(lpid);
1125 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1126 /* An outgoing message instantiated 'peer' for me */
1127 LASSERT(peer->peer_incarnation == 0);
1129 peer->peer_state = PEER_STATE_ACTIVE;
1130 peer->peer_incarnation = msg->ptlm_srcstamp;
1131 peer->peer_next_matchbits = safe_matchbits;
1132 peer->peer_max_msg_size =
1133 msg->ptlm_u.hello.kptlhm_max_msg_size;
1135 write_unlock_irqrestore(g_lock, flags);
1137 CWARN("Outgoing instantiated peer %s\n",
1138 libcfs_id2str(lpid));
1140 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1142 write_unlock_irqrestore(g_lock, flags);
1144 /* WOW! Somehow this peer completed the HELLO
1145 * handshake while I slept. I guess I could have slept
1146 * while it rebooted and sent a new HELLO, so I'll fail
1148 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1149 kptllnd_peer_decref(peer);
/* Raced: discard the speculatively created peer and its HELLO */
1153 kptllnd_peer_unreserve_buffers();
1154 kptllnd_peer_decref(new_peer);
1155 kptllnd_tx_decref(hello_tx);
1159 if (kptllnd_data.kptl_n_active_peers ==
1160 kptllnd_data.kptl_expected_peers) {
1161 /* peer table full */
1162 write_unlock_irqrestore(g_lock, flags);
1164 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1166 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1168 CERROR("Refusing connection from %s\n",
1169 libcfs_id2str(lpid));
1170 kptllnd_peer_unreserve_buffers();
1171 kptllnd_peer_decref(new_peer);
1172 kptllnd_tx_decref(hello_tx);
1176 write_lock_irqsave(g_lock, flags);
1177 kptllnd_data.kptl_expected_peers++;
1181 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1183 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1184 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1185 *kptllnd_tunables.kptl_max_msg_size;
1187 new_peer->peer_state = PEER_STATE_ACTIVE;
1188 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1189 new_peer->peer_next_matchbits = safe_matchbits;
1190 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1191 new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1193 kptllnd_peer_add_peertable_locked(new_peer);
1195 write_unlock_irqrestore(g_lock, flags);
1197 /* NB someone else could get in now and post a message before I post
1198 * the HELLO, but post_tx/check_sends take care of that! */
1200 CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1201 libcfs_id2str(new_peer->peer_id), hello_tx);
1203 kptllnd_post_tx(new_peer, hello_tx, 0);
1204 kptllnd_peer_check_sends(new_peer);
/* Queue 'tx' on 'peer' (consuming the caller's tx ref — see
 * kptllnd_post_tx) and kick the send pipeline. */
1210 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1212 kptllnd_post_tx(peer, tx, nfrag);
1213 kptllnd_peer_check_sends(peer);
/*
 * kptllnd_find_target - look up (fast path, read lock) or create (slow
 * path: allocate peer + HELLO, reserve buffers, install under write lock)
 * the peer for 'target', returning it referenced in *peerp.  Userspace
 * targets (LNET_PID_USERFLAG set) are refused with -EHOSTUNREACH.
 * NOTE(review): sampled listing — declarations ('rc'), several early
 * returns, goto labels for the failure paths at the end, and the function
 * tail after this excerpt are missing.
 */
1217 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1219 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1220 ptl_process_id_t ptl_id;
1221 kptl_peer_t *new_peer;
1222 kptl_tx_t *hello_tx;
1223 unsigned long flags;
1225 __u64 last_matchbits_seen;
1227 /* I expect to find the peer, so I only take a read lock... */
1228 read_lock_irqsave(g_lock, flags);
1229 *peerp = kptllnd_id2peer_locked(target);
1230 read_unlock_irqrestore(g_lock, flags);
1235 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1236 CWARN("Refusing to create a new connection to %s "
1237 "(non-kernel peer)\n", libcfs_id2str(target));
1238 return -EHOSTUNREACH;
1241 /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1242 * the same portals PID */
1243 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1244 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1246 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1247 if (hello_tx == NULL) {
1248 CERROR("Unable to allocate connect message for %s\n",
1249 libcfs_id2str(target));
1253 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1254 sizeof(kptl_hello_msg_t));
1256 new_peer = kptllnd_peer_allocate(target, ptl_id);
1257 if (new_peer == NULL) {
1262 rc = kptllnd_peer_reserve_buffers();
1266 write_lock_irqsave(g_lock, flags);
1268 if (kptllnd_data.kptl_shutdown) {
1269 write_unlock_irqrestore(g_lock, flags);
/* Re-check under the write lock: someone may have beaten us to it */
1274 *peerp = kptllnd_id2peer_locked(target);
1275 if (*peerp != NULL) {
1276 write_unlock_irqrestore(g_lock, flags);
1280 kptllnd_cull_peertable_locked(target);
1282 if (kptllnd_data.kptl_n_active_peers ==
1283 kptllnd_data.kptl_expected_peers) {
1284 /* peer table full */
1285 write_unlock_irqrestore(g_lock, flags);
1287 kptllnd_peertable_overflow_msg("Connection to ", target);
1289 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1291 CERROR("Can't create connection to %s\n",
1292 libcfs_id2str(target));
1296 write_lock_irqsave(g_lock, flags);
1297 kptllnd_data.kptl_expected_peers++;
1301 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1303 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1304 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1305 *kptllnd_tunables.kptl_max_msg_size;
/* WAITING_HELLO (unlike the HELLO-response path): handshake completes when
 * the peer's HELLO arrives in kptllnd_peer_handle_hello() */
1307 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1308 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1310 kptllnd_peer_add_peertable_locked(new_peer);
1312 write_unlock_irqrestore(g_lock, flags);
1314 /* NB someone else could get in now and post a message before I post
1315 * the HELLO, but post_tx/check_sends take care of that! */
1317 CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1318 libcfs_id2str(new_peer->peer_id), hello_tx);
1320 kptllnd_post_tx(new_peer, hello_tx, 0);
1321 kptllnd_peer_check_sends(new_peer);
/* Failure-path cleanup (labels missing from this excerpt) */
1327 kptllnd_peer_unreserve_buffers();
1329 kptllnd_peer_decref(new_peer);
1331 kptllnd_tx_decref(hello_tx);