1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
6 * E Barton <eeb@bartonsoftware.com>
8 * This file is part of the Lustre file system, http://www.lustre.org
9 * Lustre is a trademark of Cluster File Systems, Inc.
11 * This file is confidential source code owned by Cluster File Systems.
12 * No viewing, modification, compilation, redistribution, or any other
13 * form of use is permitted except through a signed license agreement.
15 * If you have not signed such an agreement, then you have no rights to
16 * this file. Please destroy it immediately and contact CFS.
21 #include <libcfs/list.h>
/* Count the entries currently linked on list 'q'.
 * NOTE(review): only the signature survives in this extract; the body
 * (presumably a simple list walk returning an int) is missing.
 * Caller must hold whatever lock guards 'q' — see uses under
 * peer->peer_lock below. */
24 kptllnd_count_queue(struct list_head *q)
/* Snapshot diagnostic info for the index'th peer in the hash table.
 * Output parameters are filled from the matched peer; the hash-chain walk
 * is protected by the table's rwlock and the per-peer fields by
 * peer->peer_lock.
 * NOTE(review): lines are missing from this extract (the index-matching
 * test, return statements, closing braces); comments below describe only
 * what is visible. */
37 kptllnd_get_peer_info(int index,
38 lnet_process_id_t *id,
39 int *state, int *sent_hello,
40 int *refcount, __u64 *incarnation,
41 __u64 *next_matchbits, __u64 *last_matchbits_seen,
42 int *nsendq, int *nactiveq,
43 int *credits, int *outstanding_credits)
45 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
47 struct list_head *ptmp;
52 read_lock_irqsave(g_lock, flags);
54 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
56 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57 peer = list_entry(ptmp, kptl_peer_t, peer_list);
/* Fields stable under the table read-lock */
63 *state = peer->peer_state;
64 *sent_hello = peer->peer_sent_hello;
65 *refcount = atomic_read(&peer->peer_refcount);
66 *incarnation = peer->peer_incarnation;
/* Fields that change under peer_lock: take it for a consistent view */
68 spin_lock(&peer->peer_lock);
70 *next_matchbits = peer->peer_next_matchbits;
71 *last_matchbits_seen = peer->peer_last_matchbits_seen;
72 *credits = peer->peer_credits;
73 *outstanding_credits = peer->peer_outstanding_credits;
75 *nsendq = kptllnd_count_queue(&peer->peer_sendq);
76 *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
78 spin_unlock(&peer->peer_lock);
86 read_unlock_irqrestore(g_lock, flags);
/* Insert 'peer' into the global peer hash table.
 * Caller must hold kptl_peer_rw_lock for writing (hence "_locked").
 * Takes a reference on the peer on behalf of the table. */
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
93 LASSERT (kptllnd_data.kptl_n_active_peers <
94 kptllnd_data.kptl_expected_peers);
96 LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
97 peer->peer_state == PEER_STATE_ACTIVE);
99 kptllnd_data.kptl_n_active_peers++;
100 atomic_inc(&peer->peer_refcount); /* +1 ref for the list */
102 /* NB add to HEAD of peer list for MRU order!
103 * (see kptllnd_cull_peertable) */
104 list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
/* Evict least-recently-used peers sharing pid.nid so that at most
 * (max_procs_per_node - 1) remain before a new peer with this NID is
 * added.  Caller holds kptl_peer_rw_lock for writing.
 * NOTE(review): the 'count' increment and loop 'continue' lines are
 * missing from this extract. */
108 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 /* I'm about to add a new peer with this portals ID to the peer table,
111 * so (a) this peer should not exist already and (b) I want to leave at
112 * most (max_procs_per_nid - 1) peers with this NID in the table. */
113 struct list_head *peers = kptllnd_nid2peerlist(pid.nid);
114 int cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116 struct list_head *tmp;
117 struct list_head *nxt;
121 list_for_each_safe (tmp, nxt, peers) {
122 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124 peer = list_entry(tmp, kptl_peer_t, peer_list);
126 if (peer->peer_id.nid != pid.nid)
/* Same NID but same PID would mean a duplicate peer — forbidden */
129 LASSERT (peer->peer_id.pid != pid.pid);
133 if (count < cull_count) /* recent (don't cull) */
136 CDEBUG(D_NET, "Cull %s(%s)\n",
137 libcfs_id2str(peer->peer_id),
138 kptllnd_ptlid2str(peer->peer_ptlid));
140 kptllnd_peer_close_locked(peer, 0);
/* Allocate and initialise a new peer object for LNet id 'lpid' /
 * portals id 'ppid'.  Returns the peer with one ref for the caller, or
 * NULL on allocation failure or if the LND is shutting down.
 * kptl_npeers is only bumped under the write lock so shutdown can rely
 * on the count not growing.
 * NOTE(review): the NULL-check after LIBCFS_ALLOC and the return
 * statements are missing from this extract. */
145 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
150 LIBCFS_ALLOC(peer, sizeof (*peer));
152 CERROR("Can't create peer %s (%s)\n",
154 kptllnd_ptlid2str(ppid));
158 memset(peer, 0, sizeof(*peer)); /* zero flags etc */
160 INIT_LIST_HEAD (&peer->peer_sendq);
161 INIT_LIST_HEAD (&peer->peer_activeq);
162 spin_lock_init (&peer->peer_lock);
164 peer->peer_state = PEER_STATE_ALLOCATED;
165 peer->peer_error = 0;
166 peer->peer_last_alive = cfs_time_current();
167 peer->peer_id = lpid;
168 peer->peer_ptlid = ppid;
169 peer->peer_credits = 1; /* enough for HELLO */
170 peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
171 peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
173 atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */
175 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
177 /* Only increase # peers under lock, to guarantee we dont grow it
179 if (kptllnd_data.kptl_shutdown) {
/* Shutting down: undo the allocation and fail */
180 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
182 LIBCFS_FREE(peer, sizeof(*peer));
186 kptllnd_data.kptl_npeers++;
187 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Final teardown of a peer whose refcount has dropped to zero.
 * Unlinks a zombie from the zombie list, decrements the global peer
 * count and frees the memory.  Must not be called in interrupt
 * context (takes blocking-capable paths, asserted below). */
193 kptllnd_peer_destroy (kptl_peer_t *peer)
197 CDEBUG(D_NET, "Peer=%p\n", peer);
199 LASSERT (!in_interrupt());
200 LASSERT (atomic_read(&peer->peer_refcount) == 0);
201 LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
202 peer->peer_state == PEER_STATE_ZOMBIE);
203 LASSERT (list_empty(&peer->peer_sendq));
204 LASSERT (list_empty(&peer->peer_activeq));
206 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
/* ALLOCATED peers were never put on any list; only zombies are linked */
208 if (peer->peer_state == PEER_STATE_ZOMBIE)
209 list_del(&peer->peer_list);
211 kptllnd_data.kptl_npeers--;
213 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
215 LIBCFS_FREE (peer, sizeof (*peer));
/* Abort every tx queued on 'peer': atomically splice both queues onto
 * local list heads under peer_lock, then complete each tx with -EIO
 * outside the lock (kptllnd_tx_decref -> kptllnd_tx_fini aborts any
 * outstanding Portals comms).
 * NOTE(review): lines are missing between the two list walks under the
 * lock (presumably per-tx flag updates) — hedge accordingly. */
219 kptllnd_peer_cancel_txs(kptl_peer_t *peer)
221 struct list_head sendq;
222 struct list_head activeq;
223 struct list_head *tmp;
224 struct list_head *nxt;
228 /* atomically grab all the peer's tx-es... */
230 spin_lock_irqsave(&peer->peer_lock, flags);
/* Splice peer_sendq onto local 'sendq': insert our head, unlink theirs */
232 list_add(&sendq, &peer->peer_sendq);
233 list_del_init(&peer->peer_sendq);
234 list_for_each (tmp, &sendq) {
235 tx = list_entry(tmp, kptl_tx_t, tx_list);
/* Same splice trick for the active queue */
239 list_add(&activeq, &peer->peer_activeq);
240 list_del_init(&peer->peer_activeq);
241 list_for_each (tmp, &activeq) {
242 tx = list_entry(tmp, kptl_tx_t, tx_list);
246 spin_unlock_irqrestore(&peer->peer_lock, flags);
248 /* ...then drop the peer's ref on them at leasure. This will get
249 * kptllnd_tx_fini() to abort outstanding comms if necessary. */
251 list_for_each_safe (tmp, nxt, &sendq) {
252 tx = list_entry(tmp, kptl_tx_t, tx_list);
253 list_del(&tx->tx_list);
254 tx->tx_status = -EIO;
255 kptllnd_tx_decref(tx);
258 list_for_each_safe (tmp, nxt, &activeq) {
259 tx = list_entry(tmp, kptl_tx_t, tx_list);
260 list_del(&tx->tx_list);
261 tx->tx_status = -EIO;
262 kptllnd_tx_decref(tx);
/* Record that we just heard from 'peer'.  Deliberately lockless: the
 * race is benign because every writer stores cfs_time_current(). */
267 kptllnd_peer_alive (kptl_peer_t *peer)
269 /* This is racy, but everyone's only writing cfs_time_current() */
270 peer->peer_last_alive = cfs_time_current();
/* If the peer has a stashed error, consume it (one-shot: peer_error is
 * cleared under peer_lock) and tell LNet the peer is down, converting
 * the jiffies-based last-alive stamp to wall-clock seconds.
 * NOTE(review): the early-return for error == 0 and the tail of the
 * lnet_notify() call are missing from this extract. */
275 kptllnd_peer_notify (kptl_peer_t *peer)
278 time_t last_alive = 0;
281 spin_lock_irqsave(&peer->peer_lock, flags);
283 if (peer->peer_error != 0) {
284 error = peer->peer_error;
285 peer->peer_error = 0;
/* jiffies delta -> seconds-ago -> absolute wall-clock time */
287 last_alive = cfs_time_current_sec() -
288 cfs_duration_sec(cfs_time_current() -
289 peer->peer_last_alive);
292 spin_unlock_irqrestore(&peer->peer_lock, flags);
295 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
/* Watchdog-side processing of peers queued on kptl_closing_peers:
 * move each to the zombie list, then (outside the write lock) notify
 * LNet, cancel its txs and drop the closing-list ref.  Only safe with
 * a single watchdog thread (see comment below). */
300 kptllnd_handle_closing_peers ()
304 struct list_head *tmp;
305 struct list_head *nxt;
308 /* Check with a read lock first to avoid blocking anyone */
310 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
311 idle = list_empty(&kptllnd_data.kptl_closing_peers);
312 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* NOTE(review): the early return when 'idle' is missing from this extract */
317 /* Scan the closing peers and cancel their txs.
318 * NB only safe while there is only a single watchdog */
320 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
322 list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
323 peer = list_entry (tmp, kptl_peer_t, peer_list);
325 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
327 list_del(&peer->peer_list);
328 list_add_tail(&peer->peer_list,
329 &kptllnd_data.kptl_zombie_peers);
330 peer->peer_state = PEER_STATE_ZOMBIE;
/* Drop the lock to do the potentially-blocking work per peer */
332 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
334 kptllnd_peer_notify(peer);
335 kptllnd_peer_cancel_txs(peer);
336 kptllnd_peer_decref(peer);
338 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
341 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Transition 'peer' to CLOSING state.  Caller holds kptl_peer_rw_lock
 * for writing.  Active/hello peers leave the hash table (keeping the
 * table's ref for the closing list); zombies take an extra ref and are
 * re-queued for immediate attention; already-CLOSING peers are left
 * alone.  'why' is stashed in peer_error only on the first close.
 * NOTE(review): 'break'/'return' statements between cases are missing
 * from this extract. */
345 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
347 switch (peer->peer_state) {
351 case PEER_STATE_WAITING_HELLO:
352 case PEER_STATE_ACTIVE:
353 /* Removing from peer table */
354 kptllnd_data.kptl_n_active_peers--;
355 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
357 list_del(&peer->peer_list);
358 kptllnd_peer_unreserve_buffers();
360 peer->peer_error = why; /* stash 'why' only on first close */
362 /* Schedule for immediate attention, taking peer table's ref */
363 list_add_tail(&peer->peer_list,
364 &kptllnd_data.kptl_closing_peers);
365 wake_up(&kptllnd_data.kptl_watchdog_waitq);
368 case PEER_STATE_ZOMBIE:
369 /* Schedule for attention at next timeout */
370 kptllnd_peer_addref(peer);
371 list_del(&peer->peer_list);
372 list_add_tail(&peer->peer_list,
373 &kptllnd_data.kptl_closing_peers);
376 case PEER_STATE_CLOSING:
380 peer->peer_state = PEER_STATE_CLOSING;
/* Lock-taking wrapper around kptllnd_peer_close_locked(). */
384 kptllnd_peer_close(kptl_peer_t *peer, int why)
388 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
389 kptllnd_peer_close_locked(peer, why);
390 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Close every peer matching 'id' (nid and/or pid may be the wildcard
 * LNET_NID_ANY / LNET_PID_ANY).  Holds only a read lock while scanning;
 * drops it around kptllnd_peer_close() and restarts the bucket, since
 * the list may have changed meanwhile.
 * NOTE(review): 'continue'/'goto again' style control lines are missing
 * from this extract. */
394 kptllnd_peer_del(lnet_process_id_t id)
396 struct list_head *ptmp;
397 struct list_head *pnxt;
406 * Find the single bucket we are supposed to look at or if nid is a
407 * wildcard (LNET_NID_ANY) then look at all of the buckets
409 if (id.nid != LNET_NID_ANY) {
410 struct list_head *l = kptllnd_nid2peerlist(id.nid);
/* pointer arithmetic: bucket index of this NID's hash chain */
412 lo = hi = l - kptllnd_data.kptl_peers;
414 if (id.pid != LNET_PID_ANY)
418 hi = kptllnd_data.kptl_peer_hash_size - 1;
422 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
424 for (i = lo; i <= hi; i++) {
425 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
426 peer = list_entry (ptmp, kptl_peer_t, peer_list);
428 if (!(id.nid == LNET_NID_ANY ||
429 (peer->peer_id.nid == id.nid &&
430 (id.pid == LNET_PID_ANY ||
431 peer->peer_id.pid == id.pid))))
434 kptllnd_peer_addref(peer); /* 1 ref for me... */
436 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
439 kptllnd_peer_close(peer, 0);
440 kptllnd_peer_decref(peer); /* ...until here */
442 rc = 0; /* matched something */
444 /* start again now I've dropped the lock */
449 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Queue 'tx' for sending to 'peer', consuming the caller's tx ref.
 * For PUT/GET requests, first allocates matchbits and attaches an
 * ME+MD for the peer's RDMA; then binds an MD for the message itself,
 * stamps the deadline and queues on peer_sendq (HELLO jumps the queue).
 * On failure the tail code (visible from original line 551) marks the
 * tx -EIO and drops the ref.
 * NOTE(review): several lines are missing from this extract —
 * PtlMEAttach's remaining args, 'if (prc != PTL_OK)' guards, 'goto
 * failed' jumps, and the 'else' before the list_add_tail at original
 * line 545 — comments below describe only what is visible. */
455 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
457 /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
458 ptl_handle_md_t rdma_mdh = PTL_INVALID_HANDLE;
459 ptl_handle_md_t msg_mdh = PTL_INVALID_HANDLE;
465 LASSERT (!tx->tx_idle);
466 LASSERT (!tx->tx_active);
467 LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
468 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
469 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
470 tx->tx_type == TX_TYPE_PUT_REQUEST ||
471 tx->tx_type == TX_TYPE_GET_REQUEST);
473 kptllnd_set_tx_peer(tx, peer);
475 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
476 tx->tx_type == TX_TYPE_GET_REQUEST) {
478 spin_lock_irqsave(&peer->peer_lock, flags);
480 /* Assume 64-bit matchbits can't wrap */
481 LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
482 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
483 peer->peer_next_matchbits++;
485 spin_unlock_irqrestore(&peer->peer_lock, flags);
487 prc = PtlMEAttach(kptllnd_data.kptl_nih,
488 *kptllnd_tunables.kptl_portal,
490 tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
496 CERROR("PtlMEAttach(%s) failed: %d\n",
497 libcfs_id2str(peer->peer_id), prc);
501 prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
503 CERROR("PtlMDAttach(%s) failed: %d\n",
504 libcfs_id2str(tx->tx_peer->peer_id), prc);
/* Undo the ME attach before bailing out */
505 prc = PtlMEUnlink(meh);
506 LASSERT(prc == PTL_OK);
507 rdma_mdh = PTL_INVALID_HANDLE;
511 /* I'm not racing with the event callback here. It's a bug if
512 * there's an event on the MD I just attached before I actually
513 * send the RDMA request message which the event callback
514 * catches by asserting 'rdma_mdh' is valid. */
517 memset(&md, 0, sizeof(md));
519 md.start = tx->tx_msg;
520 md.length = tx->tx_msg->ptlm_nob;
522 md.options = PTL_MD_OP_PUT |
523 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
524 PTL_MD_EVENT_START_DISABLE;
525 md.user_ptr = &tx->tx_msg_eventarg;
526 md.eq_handle = kptllnd_data.kptl_eqh;
528 prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
530 msg_mdh = PTL_INVALID_HANDLE;
534 spin_lock_irqsave(&peer->peer_lock, flags);
536 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
538 tx->tx_rdma_mdh = rdma_mdh;
539 tx->tx_msg_mdh = msg_mdh;
541 /* Ensure HELLO is sent first */
542 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
543 list_add(&tx->tx_list, &peer->peer_sendq);
545 list_add_tail(&tx->tx_list, &peer->peer_sendq);
547 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Failure path: record -EIO and the (possibly valid) MD handles so
 * kptllnd_tx_decref/fini can clean them up */
551 spin_lock_irqsave(&peer->peer_lock, flags);
553 tx->tx_status = -EIO;
554 tx->tx_rdma_mdh = rdma_mdh;
555 tx->tx_msg_mdh = msg_mdh;
557 spin_unlock_irqrestore(&peer->peer_lock, flags);
559 kptllnd_tx_decref(tx);
/* Drain peer_sendq as far as the credit protocol allows.  Posts a NOOP
 * to return credits when idle at/above the high-water mark; enforces
 * HELLO-first ordering; never spends the last send credit unless there
 * are outstanding credits to return; discards redundant NOOPs; moves
 * each sent tx to peer_activeq and launches it with PtlPut outside
 * peer_lock.  Must not be called from interrupt context.
 * NOTE(review): 'break'/'continue' lines and some PtlPut arguments are
 * missing from this extract. */
563 kptllnd_peer_check_sends (kptl_peer_t *peer)
570 LASSERT(!in_interrupt());
572 spin_lock_irqsave(&peer->peer_lock, flags);
574 if (list_empty(&peer->peer_sendq) &&
575 peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER) {
577 /* post a NOOP to return credits */
578 spin_unlock_irqrestore(&peer->peer_lock, flags);
580 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
582 CERROR("Can't return credits to %s: can't allocate descriptor\n",
583 libcfs_id2str(peer->peer_id));
585 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
586 kptllnd_post_tx(peer, tx);
589 spin_lock_irqsave(&peer->peer_lock, flags);
592 while (!list_empty(&peer->peer_sendq)) {
593 tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
595 LASSERT (tx->tx_active);
596 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
597 LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
598 !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
600 LASSERT (peer->peer_outstanding_credits >= 0);
601 LASSERT (peer->peer_outstanding_credits <=
602 *kptllnd_tunables.kptl_peercredits);
603 LASSERT (peer->peer_credits >= 0);
604 LASSERT (peer->peer_credits <=
605 *kptllnd_tunables.kptl_peercredits);
607 /* Ensure HELLO is sent first */
608 if (!peer->peer_sent_hello) {
609 if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
611 peer->peer_sent_hello = 1;
614 if (peer->peer_credits == 0) {
615 CDEBUG(D_NET, "%s: no credits\n",
616 libcfs_id2str(peer->peer_id));
620 /* Don't use the last credit unless I've got credits to
622 if (peer->peer_credits == 1 &&
623 peer->peer_outstanding_credits == 0) {
624 CDEBUG(D_NET, "%s: not using last credit\n",
625 libcfs_id2str(peer->peer_id));
629 list_del(&tx->tx_list);
631 /* Discard any NOOP I queued if I'm not at the high-water mark
632 * any more or more messages have been queued */
633 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
634 (!list_empty(&peer->peer_sendq) ||
635 peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
639 spin_unlock_irqrestore(&peer->peer_lock, flags);
641 CDEBUG(D_NET, "%s: redundant noop\n",
642 libcfs_id2str(peer->peer_id));
643 kptllnd_tx_decref(tx);
645 spin_lock_irqsave(&peer->peer_lock, flags);
649 CDEBUG(D_NET, "tx=%p nob=%d to %s(%s)\n",
650 tx, tx->tx_msg->ptlm_nob,
651 libcfs_id2str(peer->peer_id),
652 kptllnd_ptlid2str(peer->peer_ptlid));
654 /* fill last-minute msg header fields */
655 kptllnd_msg_pack(tx->tx_msg, peer);
/* This send returns all outstanding credits and spends one of ours */
657 peer->peer_outstanding_credits = 0;
658 peer->peer_credits--;
660 list_add_tail(&tx->tx_list, &peer->peer_activeq);
662 kptllnd_tx_addref(tx); /* 1 ref for me... */
664 spin_unlock_irqrestore(&peer->peer_lock, flags);
666 rc = PtlPut (tx->tx_msg_mdh,
669 *kptllnd_tunables.kptl_portal,
673 0); /* header data */
675 CERROR("PtlPut %s error %d\n",
676 libcfs_id2str(peer->peer_id), rc);
678 /* Nuke everything (including this tx) */
679 kptllnd_peer_close(peer, -EIO);
683 kptllnd_tx_decref(tx); /* drop my ref */
685 spin_lock_irqsave(&peer->peer_lock, flags);
688 spin_unlock_irqrestore(&peer->peer_lock, flags);
692 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
695 struct list_head *tmp;
698 spin_lock_irqsave(&peer->peer_lock, flags);
700 list_for_each(tmp, &peer->peer_sendq) {
701 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
703 if (time_after_eq(jiffies, tx->tx_deadline)) {
704 kptllnd_tx_addref(tx);
705 spin_unlock_irqrestore(&peer->peer_lock, flags);
710 list_for_each(tmp, &peer->peer_activeq) {
711 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
713 if (time_after_eq(jiffies, tx->tx_deadline)) {
714 kptllnd_tx_addref(tx);
715 spin_unlock_irqrestore(&peer->peer_lock, flags);
720 spin_unlock_irqrestore(&peer->peer_lock, flags);
/* Watchdog scan of one peer hash bucket: kick sends for each peer (to
 * post any NOOP that couldn't be allocated earlier), then look for a
 * timed-out tx.  On timeout, take refs, drop the read lock, log queue
 * depths and tx state, dump the Portals trace and close the peer with
 * -ETIMEDOUT, then restart the bucket scan.
 * NOTE(review): the 'if (tx == NULL) continue;' guard, goto-again label
 * and parts of the CERROR format arguments are missing from this
 * extract. */
726 kptllnd_peer_check_bucket (int idx)
728 struct list_head *peers = &kptllnd_data.kptl_peers[idx];
729 struct list_head *ptmp;
736 CDEBUG(D_NET, "Bucket=%d\n", idx);
739 /* NB. Shared lock while I just look */
740 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
742 list_for_each (ptmp, peers) {
743 peer = list_entry (ptmp, kptl_peer_t, peer_list);
745 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
746 libcfs_id2str(peer->peer_id),
747 peer->peer_credits, peer->peer_outstanding_credits);
749 /* In case we have enough credits to return via a
750 * NOOP, but there were no non-blocking tx descs
751 * free to do it last time... */
752 kptllnd_peer_check_sends(peer);
754 tx = kptllnd_find_timed_out_tx(peer);
758 kptllnd_peer_addref(peer); /* 1 ref for me... */
760 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
/* Snapshot queue depths for the diagnostic below */
763 spin_lock_irqsave(&peer->peer_lock, flags);
764 nsend = kptllnd_count_queue(&peer->peer_sendq);
765 nactive = kptllnd_count_queue(&peer->peer_activeq);
766 spin_unlock_irqrestore(&peer->peer_lock, flags);
768 LCONSOLE_ERROR("Timing out %s: please check Portals\n",
769 libcfs_id2str(peer->peer_id));
771 CERROR("%s timed out: cred %d outstanding %d sendq %d "
772 "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
773 libcfs_id2str(peer->peer_id),
774 peer->peer_credits, peer->peer_outstanding_credits,
775 nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
776 tx->tx_active ? "A" : "",
777 PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
779 PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
781 tx->tx_status, *kptllnd_tunables.kptl_timeout);
783 kptllnd_dump_ptltrace();
785 kptllnd_tx_decref(tx);
787 kptllnd_peer_close(peer, -ETIMEDOUT);
788 kptllnd_peer_decref(peer); /* ...until here */
790 /* start again now I've dropped the lock */
794 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
/* Look up the peer with exactly this nid+pid in its hash chain.
 * Caller holds kptl_peer_rw_lock (read or write).  On a hit, takes a
 * ref for the caller and returns the peer; peers in the table are
 * always WAITING_HELLO or ACTIVE.
 * NOTE(review): the 'continue' after the mismatch test and the
 * return statements are missing from this extract. */
798 kptllnd_id2peer_locked (lnet_process_id_t id)
800 struct list_head *peers = kptllnd_nid2peerlist(id.nid);
801 struct list_head *tmp;
804 list_for_each (tmp, peers) {
806 peer = list_entry (tmp, kptl_peer_t, peer_list);
808 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
809 peer->peer_state == PEER_STATE_ACTIVE);
811 if (peer->peer_id.nid != id.nid ||
812 peer->peer_id.pid != id.pid)
815 kptllnd_peer_addref(peer);
817 CDEBUG(D_NET, "%s -> %s (%d)\n",
819 kptllnd_ptlid2str(peer->peer_ptlid),
820 atomic_read (&peer->peer_refcount));
/* Log a console error that connection 'str' to/from 'id' would overflow
 * the peer table, with the tunables that would fix it. */
828 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
830 LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
831 "messages may be dropped\n",
832 str, libcfs_id2str(id),
833 kptllnd_data.kptl_n_active_peers);
834 LCONSOLE_ERROR("Please correct by increasing "
835 "'max_nodes' or 'max_procs_per_node'\n");
/* Find the last matchbits previously seen from a (new) peer with this
 * process id by checking old incarnations on the closing and zombie
 * lists; falls back to PTL_RESERVED_MATCHBITS when no old peer is
 * found (safe — see comment below).  Caller holds kptl_peer_rw_lock. */
839 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
842 struct list_head *tmp;
844 /* Find the last matchbits I saw this new peer using. Note..
845 A. This peer cannot be in the peer table - she's new!
846 B. If I can't find the peer in the closing/zombie peers, all
847 matchbits are safe because all refs to the (old) peer have gone
848 so all txs have completed so there's no risk of matchbit
852 LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
854 /* peer's last matchbits can't change after it comes out of the peer
855 * table, so first match is fine */
857 list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
858 peer = list_entry (tmp, kptl_peer_t, peer_list);
860 if (peer->peer_id.nid == lpid.nid &&
861 peer->peer_id.pid == lpid.pid)
862 return peer->peer_last_matchbits_seen;
865 list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
866 peer = list_entry (tmp, kptl_peer_t, peer_list);
868 if (peer->peer_id.nid == lpid.nid &&
869 peer->peer_id.pid == lpid.pid)
870 return peer->peer_last_matchbits_seen;
873 return PTL_RESERVED_MATCHBITS;
/* Handle an incoming HELLO: validate the sender (userspace flag,
 * matchbits, max_msg_size and peercredits must agree with ours), then
 * either complete the handshake on an existing WAITING_HELLO peer, or
 * close a stale incarnation and instantiate a new ACTIVE peer (with
 * our own HELLO reply queued).  Races with concurrent instantiation
 * are re-checked under the write lock after the blocking allocations.
 * NOTE(review): many control-flow lines (returns, 'goto', 'else',
 * closing braces) are missing from this extract; comments mark what is
 * visible. */
877 kptllnd_peer_handle_hello (ptl_process_id_t initiator,
880 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
882 kptl_peer_t *new_peer;
883 lnet_process_id_t lpid;
887 __u64 safe_matchbits;
888 __u64 last_matchbits_seen;
890 lpid.nid = msg->ptlm_srcnid;
891 lpid.pid = msg->ptlm_srcpid;
893 CDEBUG(D_NET, "hello from %s(%s)\n",
894 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
896 if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
897 (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
898 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
899 * userspace. Refuse the connection if she hasn't set the
900 * correct flag in her PID... */
901 CERROR("Userflag not set in hello from %s (%s)\n",
902 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
906 /* kptlhm_matchbits are the highest matchbits my peer may have used to
907 * RDMA to me. I ensure I never register buffers for RDMA that could
908 * match any she used */
909 safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
911 if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
912 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
913 safe_matchbits, libcfs_id2str(lpid));
/* Protocol parameters must match exactly on both sides */
917 if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
918 *kptllnd_tunables.kptl_max_msg_size) {
919 CERROR("max message size MUST be equal for all peers: "
920 "got %d expected %d from %s\n",
921 msg->ptlm_u.hello.kptlhm_max_msg_size,
922 *kptllnd_tunables.kptl_max_msg_size,
923 libcfs_id2str(lpid));
927 if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
928 CERROR("peercredits MUST be equal on all peers: "
929 "got %d expected %d from %s\n",
930 msg->ptlm_credits + 1,
931 *kptllnd_tunables.kptl_peercredits,
932 libcfs_id2str(lpid));
/* First pass: look for an existing peer under the write lock */
936 write_lock_irqsave(g_lock, flags);
938 peer = kptllnd_id2peer_locked(lpid);
940 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
941 /* Completing HELLO handshake */
942 LASSERT(peer->peer_incarnation == 0);
944 peer->peer_state = PEER_STATE_ACTIVE;
945 peer->peer_incarnation = msg->ptlm_srcstamp;
946 peer->peer_next_matchbits = safe_matchbits;
948 write_unlock_irqrestore(g_lock, flags);
952 /* remove old incarnation of this peer */
953 kptllnd_peer_close_locked(peer, 0);
956 kptllnd_cull_peertable_locked(lpid);
958 write_unlock_irqrestore(g_lock, flags);
961 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
962 " stamp "LPX64"("LPX64")\n",
963 libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
964 msg->ptlm_srcstamp, peer->peer_incarnation);
966 kptllnd_peer_decref(peer);
/* Allocate the HELLO reply and new peer outside any lock */
969 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
970 if (hello_tx == NULL) {
971 CERROR("Unable to allocate HELLO message for %s\n",
972 libcfs_id2str(lpid));
976 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
977 sizeof(kptl_hello_msg_t));
979 new_peer = kptllnd_peer_allocate(lpid, initiator);
980 if (new_peer == NULL) {
981 kptllnd_tx_decref(hello_tx);
985 rc = kptllnd_peer_reserve_buffers();
987 kptllnd_peer_decref(new_peer);
988 kptllnd_tx_decref(hello_tx);
990 CERROR("Failed to reserve buffers for %s\n",
991 libcfs_id2str(lpid));
/* Re-check under the lock: someone may have instantiated the peer
 * while we blocked in the allocations above */
995 write_lock_irqsave(g_lock, flags);
997 peer = kptllnd_id2peer_locked(lpid);
999 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1000 /* An outgoing message instantiated 'peer' for me and
1001 * presumably provoked this reply */
1002 CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
1003 LASSERT(peer->peer_incarnation == 0);
1005 peer->peer_state = PEER_STATE_ACTIVE;
1006 peer->peer_incarnation = msg->ptlm_srcstamp;
1007 peer->peer_next_matchbits = safe_matchbits;
1009 LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1010 /* WOW! Somehow this peer completed the HELLO
1011 * handshake while I slept. I guess I could have slept
1012 * while it rebooted and sent a new HELLO, so I'll fail
1014 CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1015 kptllnd_peer_decref(peer);
/* Raced: discard the peer/tx we speculatively allocated */
1019 write_unlock_irqrestore(g_lock, flags);
1021 kptllnd_peer_unreserve_buffers();
1022 kptllnd_peer_decref(new_peer);
1023 kptllnd_tx_decref(hello_tx);
1027 if (kptllnd_data.kptl_n_active_peers ==
1028 kptllnd_data.kptl_expected_peers) {
1029 /* peer table full */
1030 write_unlock_irqrestore(g_lock, flags);
1032 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1034 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1036 CERROR("Refusing connection from %s\n",
1037 libcfs_id2str(lpid));
1038 kptllnd_peer_unreserve_buffers();
1039 kptllnd_peer_decref(new_peer);
1040 kptllnd_tx_decref(hello_tx);
1044 write_lock_irqsave(g_lock, flags);
1045 kptllnd_data.kptl_expected_peers++;
1048 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1050 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1051 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1052 *kptllnd_tunables.kptl_max_msg_size;
1054 new_peer->peer_state = PEER_STATE_ACTIVE;
1055 new_peer->peer_incarnation = msg->ptlm_srcstamp;
1056 new_peer->peer_next_matchbits = safe_matchbits;
1057 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1059 kptllnd_peer_add_peertable_locked(new_peer);
1061 write_unlock_irqrestore(g_lock, flags);
1063 /* NB someone else could get in now and post a message before I post
1064 * the HELLO, but post_tx/check_sends take care of that! */
1066 kptllnd_post_tx(new_peer, hello_tx);
1067 kptllnd_peer_check_sends(new_peer);
/* Launch 'tx' (carrying an LNet message) to 'target', consuming the
 * caller's tx ref.  Fast path: find the peer under a read lock and
 * post.  Slow path: refuse userspace targets, then allocate a HELLO tx
 * and a new WAITING_HELLO peer, re-checking under the write lock for a
 * racing instantiation and for peer-table overflow, and finally post
 * HELLO followed by 'tx'.  On any failure tx_status is set and the tx
 * ref dropped (tail from original line 1200).
 * NOTE(review): many control-flow lines (gotos, returns, 'if (peer !=
 * NULL)' guards, failure labels) are missing from this extract. */
1073 kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
1075 rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1076 ptl_process_id_t ptl_id;
1078 kptl_peer_t *new_peer = NULL;
1079 kptl_tx_t *hello_tx = NULL;
1080 unsigned long flags;
1082 __u64 last_matchbits_seen;
1084 LASSERT (tx->tx_lnet_msg != NULL);
1085 LASSERT (tx->tx_peer == NULL);
1087 /* I expect to find the peer, so I only take a read lock... */
1088 read_lock_irqsave(g_lock, flags);
1089 peer = kptllnd_id2peer_locked(target);
1090 read_unlock_irqrestore(g_lock, flags);
1096 if ((target.pid & LNET_PID_USERFLAG) != 0) {
1097 CWARN("Refusing to create a new connection to %s "
1098 "(non-kernel peer)\n", libcfs_id2str(target));
1099 tx->tx_status = -EHOSTUNREACH;
1103 /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1104 * the same portals PID */
1105 ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1106 ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1108 write_lock_irqsave(g_lock, flags);
1110 peer = kptllnd_id2peer_locked(target);
1112 write_unlock_irqrestore(g_lock, flags);
1116 kptllnd_cull_peertable_locked(target);
1118 write_unlock_irqrestore(g_lock, flags);
/* Allocate HELLO tx and the new peer outside the lock */
1120 hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1121 if (hello_tx == NULL) {
1122 CERROR("Unable to allocate connect message for %s\n",
1123 libcfs_id2str(target));
1124 tx->tx_status = -ENOMEM;
1128 kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1129 sizeof(kptl_hello_msg_t));
1131 new_peer = kptllnd_peer_allocate(target, ptl_id);
1132 if (new_peer == NULL) {
1133 tx->tx_status = -ENOMEM;
1137 rc = kptllnd_peer_reserve_buffers();
/* Re-check under the write lock for a racing instantiation */
1143 write_lock_irqsave(g_lock, flags);
1145 peer = kptllnd_id2peer_locked(target);
1146 if (peer != NULL) { /* someone else beat me to it */
1147 write_unlock_irqrestore(g_lock, flags);
1149 kptllnd_peer_unreserve_buffers();
1150 kptllnd_peer_decref(new_peer);
1151 kptllnd_tx_decref(hello_tx);
1155 if (kptllnd_data.kptl_n_active_peers ==
1156 kptllnd_data.kptl_expected_peers) {
1157 /* peer table full */
1158 write_unlock_irqrestore(g_lock, flags);
1160 kptllnd_peertable_overflow_msg("Connection to ", target);
1162 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1164 CERROR("Can't create connection to %s\n",
1165 libcfs_id2str(target));
1166 kptllnd_peer_unreserve_buffers();
1167 tx->tx_status = -ENOMEM;
1170 write_lock_irqsave(g_lock, flags);
1171 kptllnd_data.kptl_expected_peers++;
1174 last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1176 hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1177 hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1178 *kptllnd_tunables.kptl_max_msg_size;
1180 new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1181 new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1183 kptllnd_peer_add_peertable_locked(new_peer);
1185 write_unlock_irqrestore(g_lock, flags);
1187 /* NB someone else could get in now and post a message before I post
1188 * the HELLO, but post_tx/check_sends take care of that! */
1191 kptllnd_post_tx(peer, hello_tx);
1194 kptllnd_post_tx(peer, tx);
1195 kptllnd_peer_check_sends(peer);
1196 kptllnd_peer_decref(peer);
/* Common failure tail: release anything we allocated, fail the tx */
1200 if (hello_tx != NULL)
1201 kptllnd_tx_decref(hello_tx);
1203 if (new_peer != NULL)
1204 kptllnd_peer_decref(new_peer);
1206 LASSERT (tx->tx_status != 0);
1207 kptllnd_tx_decref(tx);