4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2014, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
35 #define DEBUG_SUBSYSTEM S_LNET
37 #include <lnet/lib-lnet.h>
38 #include <lnet/lib-dlc.h>
/*
 * Module parameter: when non-zero (the default), LNet actively discovers
 * peer interfaces; writable at runtime via sysfs (mode 0644).
 */
40 unsigned lnet_peer_discovery_enabled = 1;
41 module_param(lnet_peer_discovery_enabled, uint, 0644);
42 MODULE_PARM_DESC(lnet_peer_discovery_enabled,
43 "Explicitly enable/disable peer discovery");
/*
 * Unlink @lpni from the global remote peer_ni list and drop the
 * reference that list membership held. No-op if not on the list.
 */
46 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
48 if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
49 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
50 lnet_peer_ni_decref_locked(lpni);
/*
 * A local net was just configured: revisit remote peer_nis that belong
 * to @net, give them the net's tunable credits, and take them off the
 * remote list since they are now local.
 * NOTE(review): lines assigning lpni->lpni_net before the credit reads
 * appear to be missing from this extraction — confirm against upstream.
 */
55 lnet_peer_net_added(struct lnet_net *net)
57 struct lnet_peer_ni *lpni, *tmp;
59 list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
60 lpni_on_remote_peer_ni_list) {
62 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
/* credits are protected by the per-peer_ni spinlock */
65 spin_lock(&lpni->lpni_lock);
66 lpni->lpni_txcredits =
67 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
68 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
69 lpni->lpni_rtrcredits =
70 lnet_peer_buffer_credits(lpni->lpni_net);
71 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
72 spin_unlock(&lpni->lpni_lock);
74 lnet_peer_remove_from_remote_list(lpni);
/*
 * Free the per-CPT peer tables. Asserts every hash chain and zombie
 * list is already empty — callers must have completed cleanup first.
 */
80 lnet_peer_tables_destroy(void)
82 struct lnet_peer_table *ptable;
83 struct list_head *hash;
87 if (!the_lnet.ln_peer_tables)
90 cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
91 hash = ptable->pt_hash;
92 if (!hash) /* not intialized */
95 LASSERT(list_empty(&ptable->pt_zombie_list));
/* clear pt_hash first so the table reads as uninitialized */
97 ptable->pt_hash = NULL;
98 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
99 LASSERT(list_empty(&hash[j]));
101 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
104 cfs_percpt_free(the_lnet.ln_peer_tables);
105 the_lnet.ln_peer_tables = NULL;
/*
 * Allocate and initialize the per-CPT peer tables and their hash
 * chains. On partial failure the already-created tables are torn
 * down via lnet_peer_tables_destroy().
 */
109 lnet_peer_tables_create(void)
111 struct lnet_peer_table *ptable;
112 struct list_head *hash;
116 the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
118 if (the_lnet.ln_peer_tables == NULL) {
119 CERROR("Failed to allocate cpu-partition peer tables\n");
123 cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
124 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
125 LNET_PEER_HASH_SIZE * sizeof(*hash));
127 CERROR("Failed to create peer hash table\n");
128 lnet_peer_tables_destroy();
132 spin_lock_init(&ptable->pt_zombie_lock);
133 INIT_LIST_HEAD(&ptable->pt_zombie_list);
135 INIT_LIST_HEAD(&ptable->pt_peer_list);
137 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
138 INIT_LIST_HEAD(&hash[j]);
/* a non-NULL pt_hash is what destroy uses to detect initialization */
139 ptable->pt_hash = hash; /* sign of initialization */
/*
 * Allocate a peer_ni for @nid on the CPT derived from the NID.
 * Credits come from the local net's tunables if the NID is on a
 * local net; otherwise the peer_ni is parked on the remote list
 * (with an extra ref) so lnet_peer_net_added() can fix it up later.
 */
145 static struct lnet_peer_ni *
146 lnet_peer_ni_alloc(lnet_nid_t nid)
148 struct lnet_peer_ni *lpni;
149 struct lnet_net *net;
152 cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
154 LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
158 INIT_LIST_HEAD(&lpni->lpni_txq);
159 INIT_LIST_HEAD(&lpni->lpni_rtrq);
160 INIT_LIST_HEAD(&lpni->lpni_routes);
161 INIT_LIST_HEAD(&lpni->lpni_hashlist);
162 INIT_LIST_HEAD(&lpni->lpni_peer_nis);
163 INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
165 spin_lock_init(&lpni->lpni_lock);
167 lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
168 lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
169 lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
170 lpni->lpni_nid = nid;
171 lpni->lpni_cpt = cpt;
172 lnet_set_peer_ni_health_locked(lpni, true);
/* NULL when the NID's net is not configured locally */
174 net = lnet_get_net_locked(LNET_NIDNET(nid));
175 lpni->lpni_net = net;
177 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
178 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
179 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
180 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
183 * This peer_ni is not on a local network, so we
184 * cannot add the credits here. In case the net is
185 * added later, add the peer_ni to the remote peer ni
186 * list so it can be easily found and revisited.
188 /* FIXME: per-net implementation instead? */
/* ref held by membership on ln_remote_peer_ni_list */
189 atomic_inc(&lpni->lpni_refcount);
190 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
191 &the_lnet.ln_remote_peer_ni_list);
194 CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
/*
 * Allocate an empty peer_net for @net_id. Returns NULL on allocation
 * failure (LIBCFS_CPT_ALLOC NULLs the pointer).
 */
199 static struct lnet_peer_net *
200 lnet_peer_net_alloc(__u32 net_id)
202 struct lnet_peer_net *lpn;
204 LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
208 INIT_LIST_HEAD(&lpn->lpn_peer_nets);
209 INIT_LIST_HEAD(&lpn->lpn_peer_nis);
210 lpn->lpn_net_id = net_id;
212 CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
/*
 * Free a peer_net whose refcount has reached zero and drop the
 * reference it held on its owning peer.
 * NOTE(review): the line assigning 'lp' (presumably lp = lpn->lpn_peer
 * before it is cleared) is missing from this extraction — confirm.
 */
218 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
220 struct lnet_peer *lp;
222 CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
224 LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
225 LASSERT(list_empty(&lpn->lpn_peer_nis));
226 LASSERT(list_empty(&lpn->lpn_peer_nets));
228 lpn->lpn_peer = NULL;
229 LIBCFS_FREE(lpn, sizeof(*lpn));
231 lnet_peer_decref_locked(lp);
/*
 * Allocate a peer with @nid as its primary NID. The peer's CPT is
 * derived from the NID so lookups hash to the same partition.
 */
234 static struct lnet_peer *
235 lnet_peer_alloc(lnet_nid_t nid)
237 struct lnet_peer *lp;
239 LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
243 INIT_LIST_HEAD(&lp->lp_peer_list);
244 INIT_LIST_HEAD(&lp->lp_peer_nets);
245 INIT_LIST_HEAD(&lp->lp_dc_list);
246 init_waitqueue_head(&lp->lp_dc_waitq);
247 spin_lock_init(&lp->lp_lock);
248 lp->lp_primary_nid = nid;
249 lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
251 CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
/*
 * Free a peer whose refcount has dropped to zero; it must already be
 * detached from all peer_nets and off the peer table list.
 */
257 lnet_destroy_peer_locked(struct lnet_peer *lp)
259 CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
261 LASSERT(atomic_read(&lp->lp_refcount) == 0);
262 LASSERT(list_empty(&lp->lp_peer_nets));
263 LASSERT(list_empty(&lp->lp_peer_list));
265 LIBCFS_FREE(lp, sizeof(*lp));
269 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
270 * that peer_net, detach the peer_net from the peer.
272 * Call with lnet_net_lock/EX held
275 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
277 struct lnet_peer_table *ptable;
278 struct lnet_peer_net *lpn;
279 struct lnet_peer *lp;
282 * Belts and suspenders: gracefully handle teardown of a
283 * partially connected peer_ni.
/* NOTE(review): NULL-checks around lpn/lp appear lost in extraction */
285 lpn = lpni->lpni_peer_net;
287 list_del_init(&lpni->lpni_peer_nis);
289 * If there are no lpni's left, we detach lpn from
290 * lp_peer_nets, so it cannot be found anymore.
292 if (list_empty(&lpn->lpn_peer_nis))
293 list_del_init(&lpn->lpn_peer_nets);
295 /* Update peer NID count. */
297 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
299 ptable->pt_peer_nnids--;
302 * If there are no more peer nets, make the peer unfindable
303 * via the peer_tables.
305 if (list_empty(&lp->lp_peer_nets)) {
306 list_del_init(&lp->lp_peer_list);
309 CDEBUG(D_NET, "peer %s NID %s\n",
310 libcfs_nid2str(lp->lp_primary_nid),
311 libcfs_nid2str(lpni->lpni_nid));
314 /* called with lnet_net_lock LNET_LOCK_EX held */
/*
 * Remove a peer_ni from its hash chain and from the peer hierarchy,
 * then park it on the per-CPT zombie list until its refcount drains.
 * Refuses to delete a peer_ni that is still acting as a router gateway.
 */
316 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
318 struct lnet_peer_table *ptable = NULL;
320 /* don't remove a peer_ni if it's also a gateway */
321 if (lpni->lpni_rtr_refcount > 0) {
322 CERROR("Peer NI %s is a gateway. Can not delete it\n",
323 libcfs_nid2str(lpni->lpni_nid));
327 lnet_peer_remove_from_remote_list(lpni);
329 /* remove peer ni from the hash list. */
330 list_del_init(&lpni->lpni_hashlist);
332 /* decrement the ref count on the peer table */
333 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
334 LASSERT(ptable->pt_number > 0);
338 * The peer_ni can no longer be found with a lookup. But there
339 * can be current users, so keep track of it on the zombie
340 * list until the reference count has gone to zero.
342 * The last reference may be lost in a place where the
343 * lnet_net_lock locks only a single cpt, and that cpt may not
344 * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
/* lpni_hashlist is reused to link the zombie list */
347 spin_lock(&ptable->pt_zombie_lock);
348 list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
349 ptable->pt_zombies++;
350 spin_unlock(&ptable->pt_zombie_lock);
352 /* no need to keep this peer_ni on the hierarchy anymore */
353 lnet_peer_detach_peer_ni_locked(lpni);
355 /* remove hashlist reference on peer_ni */
356 lnet_peer_ni_decref_locked(lpni);
/*
 * Module teardown: delete every remaining peer_ni (remote list first)
 * and destroy the peer tables, all under the exclusive net lock.
 */
361 void lnet_peer_uninit(void)
363 struct lnet_peer_ni *lpni, *tmp;
365 lnet_net_lock(LNET_LOCK_EX);
367 /* remove all peer_nis from the remote peer and the hash list */
368 list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
369 lpni_on_remote_peer_ni_list)
370 lnet_peer_ni_del_locked(lpni);
372 lnet_peer_tables_destroy();
374 lnet_net_unlock(LNET_LOCK_EX);
/*
 * Delete a whole peer by iterating its peer_nis; the next pointer is
 * fetched before each deletion since deletion unlinks the current one.
 * Call with lnet_net_lock/EX held.
 */
378 lnet_peer_del_locked(struct lnet_peer *peer)
380 struct lnet_peer_ni *lpni = NULL, *lpni2;
383 CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
385 lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
386 while (lpni != NULL) {
387 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
388 rc = lnet_peer_ni_del_locked(lpni);
/* Locking wrapper around lnet_peer_del_locked(). */
398 lnet_peer_del(struct lnet_peer *peer)
400 lnet_net_lock(LNET_LOCK_EX);
401 lnet_peer_del_locked(peer);
402 lnet_net_unlock(LNET_LOCK_EX);
408 * Delete a NID from a peer. Call with ln_api_mutex held.
411 * -EPERM: Non-DLC deletion from DLC-configured peer.
412 * -ENOENT: No lnet_peer_ni corresponding to the nid.
413 * -ECHILD: The lnet_peer_ni isn't connected to the peer.
414 * -EBUSY: The lnet_peer_ni is the primary, and not the only peer_ni.
417 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
419 struct lnet_peer_ni *lpni;
420 lnet_nid_t primary_nid = lp->lp_primary_nid;
423 if (!(flags & LNET_PEER_CONFIGURED)) {
424 if (lp->lp_state & LNET_PEER_CONFIGURED) {
429 lpni = lnet_find_peer_ni_locked(nid);
/* drop the lookup ref; lpni stays valid under ln_api_mutex */
434 lnet_peer_ni_decref_locked(lpni);
435 if (lp != lpni->lpni_peer_net->lpn_peer) {
441 * This function only allows deletion of the primary NID if it
444 if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
449 lnet_net_lock(LNET_LOCK_EX);
450 lnet_peer_ni_del_locked(lpni);
451 lnet_net_unlock(LNET_LOCK_EX);
454 CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
455 libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
/*
 * Delete from @ptable every peer_ni belonging to @net (all nets if
 * @net is NULL). Hitting a peer's primary NID deletes the entire
 * peer, so the safe-iteration cursor is advanced past any sibling
 * peer_nis of that peer before the peer is destroyed.
 */
461 lnet_peer_table_cleanup_locked(struct lnet_net *net,
462 struct lnet_peer_table *ptable)
465 struct lnet_peer_ni *next;
466 struct lnet_peer_ni *lpni;
467 struct lnet_peer *peer;
469 for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
470 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
472 if (net != NULL && net != lpni->lpni_net)
475 peer = lpni->lpni_peer_net->lpn_peer;
476 if (peer->lp_primary_nid != lpni->lpni_nid) {
477 lnet_peer_ni_del_locked(lpni);
481 * Removing the primary NID implies removing
482 * the entire peer. Advance next beyond any
483 * peer_ni that belongs to the same peer.
485 list_for_each_entry_from(next, &ptable->pt_hash[i],
487 if (next->lpni_peer_net->lpn_peer != peer)
490 lnet_peer_del_locked(peer);
/*
 * Block until all zombie peer_nis on @ptable have been finalized,
 * polling twice a second and logging while waiting. Drops and
 * re-takes the zombie lock around each sleep.
 */
496 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
500 spin_lock(&ptable->pt_zombie_lock);
501 while (ptable->pt_zombies) {
502 spin_unlock(&ptable->pt_zombie_lock);
506 "Waiting for %d zombies on peer table\n",
509 set_current_state(TASK_UNINTERRUPTIBLE);
510 schedule_timeout(cfs_time_seconds(1) >> 1);
511 spin_lock(&ptable->pt_zombie_lock);
513 spin_unlock(&ptable->pt_zombie_lock);
/*
 * For each peer_ni on @net that is a router gateway, delete the routes
 * through it. lnet_del_route() needs the net lock dropped, so the lock
 * is released around each call and re-taken; safe iteration copes with
 * any resulting list changes.
 */
517 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
518 struct lnet_peer_table *ptable)
520 struct lnet_peer_ni *lp;
521 struct lnet_peer_ni *tmp;
525 for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
526 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
528 if (net != lp->lpni_net)
531 if (lp->lpni_rtr_refcount == 0)
534 lpni_nid = lp->lpni_nid;
536 lnet_net_unlock(LNET_LOCK_EX);
537 lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
538 lnet_net_lock(LNET_LOCK_EX);
/*
 * Three-phase cleanup of all peer tables for @net (or everything at
 * shutdown): drop routes through affected gateways, delete the
 * peer_nis, then wait for zombies to drain.
 */
544 lnet_peer_tables_cleanup(struct lnet_net *net)
547 struct lnet_peer_table *ptable;
549 LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
550 /* If just deleting the peers for a NI, get rid of any routes these
551 * peers are gateways for. */
552 cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
553 lnet_net_lock(LNET_LOCK_EX);
554 lnet_peer_table_del_rtrs_locked(net, ptable);
555 lnet_net_unlock(LNET_LOCK_EX);
558 /* Start the cleanup process */
559 cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
560 lnet_net_lock(LNET_LOCK_EX);
561 lnet_peer_table_cleanup_locked(net, ptable);
562 lnet_net_unlock(LNET_LOCK_EX);
565 cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
566 lnet_peer_ni_finalize_wait(ptable);
/*
 * Hash-chain lookup of a peer_ni by NID within one peer table.
 * Takes a reference on the found peer_ni; returns it (NULL lines
 * for the miss path are outside this extraction).
 */
569 static struct lnet_peer_ni *
570 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
572 struct list_head *peers;
573 struct lnet_peer_ni *lp;
575 LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
577 peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
578 list_for_each_entry(lp, peers, lpni_hashlist) {
579 if (lp->lpni_nid == nid) {
580 lnet_peer_ni_addref_locked(lp);
/*
 * Look up a peer_ni by NID: hash the NID to its CPT and search that
 * CPT's peer table. Returns a referenced peer_ni or NULL.
 */
588 struct lnet_peer_ni *
589 lnet_find_peer_ni_locked(lnet_nid_t nid)
591 struct lnet_peer_ni *lpni;
592 struct lnet_peer_table *ptable;
595 cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
597 ptable = the_lnet.ln_peer_tables[cpt];
598 lpni = lnet_get_peer_ni_locked(ptable, nid);
/*
 * Return the idx-th peer_ni across all CPT tables (for ioctl-style
 * enumeration), also reporting its peer_net and peer via out-params.
 * Skips whole tables/peers by their cached NID counts before walking
 * individual lists.
 */
603 struct lnet_peer_ni *
604 lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
605 struct lnet_peer **lp)
607 struct lnet_peer_table *ptable;
608 struct lnet_peer_ni *lpni;
612 lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
614 for (cpt = 0; cpt < lncpt; cpt++) {
615 ptable = the_lnet.ln_peer_tables[cpt];
616 if (ptable->pt_peer_nnids > idx)
618 idx -= ptable->pt_peer_nnids;
623 list_for_each_entry((*lp), &ptable->pt_peer_list, lp_peer_list) {
624 if ((*lp)->lp_nnis <= idx) {
625 idx -= (*lp)->lp_nnis;
628 list_for_each_entry((*lpn), &((*lp)->lp_peer_nets),
630 list_for_each_entry(lpni, &((*lpn)->lpn_peer_nis),
/*
 * Iterator over a peer's peer_nis. With @prev == NULL it returns the
 * first peer_ni (of @peer_net if given, else of the peer's first net);
 * otherwise it returns the peer_ni after @prev, crossing into the next
 * peer_net when @peer_net is NULL, or NULL at the end.
 */
641 struct lnet_peer_ni *
642 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
643 struct lnet_peer_net *peer_net,
644 struct lnet_peer_ni *prev)
646 struct lnet_peer_ni *lpni;
647 struct lnet_peer_net *net = peer_net;
651 if (list_empty(&peer->lp_peer_nets))
654 net = list_entry(peer->lp_peer_nets.next,
655 struct lnet_peer_net,
658 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
/* prev was the last peer_ni on its peer_net */
664 if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
666 * if you reached the end of the peer ni list and the peer
667 * net is specified then there are no more peer nis in that
674 * we reached the end of this net ni list. move to the
677 if (prev->lpni_peer_net->lpn_peer_nets.next ==
679 /* no more nets and no more NIs. */
682 /* get the next net */
683 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
684 struct lnet_peer_net,
686 /* get the ni on it */
687 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
693 /* there are more nis left */
694 lpni = list_entry(prev->lpni_peer_nis.next,
695 struct lnet_peer_ni, lpni_peer_nis);
701 * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
702 * this is a preferred point-to-point path. Call with lnet_net_lock in
/* lpni_pref is a union: a single nid inline, or an allocated array */
706 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
710 if (lpni->lpni_pref_nnids == 0)
712 if (lpni->lpni_pref_nnids == 1)
713 return lpni->lpni_pref.nid == nid;
714 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
715 if (lpni->lpni_pref.nids[i] == nid)
722 * Set a single ni as preferred, provided no preferred ni is already
723 * defined. Only to be used for non-multi-rail peer_ni.
726 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
730 spin_lock(&lpni->lpni_lock);
731 if (nid == LNET_NID_ANY) {
733 } else if (lpni->lpni_pref_nnids > 0) {
735 } else if (lpni->lpni_pref_nnids == 0) {
736 lpni->lpni_pref.nid = nid;
737 lpni->lpni_pref_nnids = 1;
738 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
740 spin_unlock(&lpni->lpni_lock);
742 CDEBUG(D_NET, "peer %s nid %s: %d\n",
743 libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
748 * Clear the preferred NID from a non-multi-rail peer_ni, provided
749 * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
752 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
756 spin_lock(&lpni->lpni_lock);
/* only undo implicit preferences; explicit ones are left alone */
757 if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
758 lpni->lpni_pref_nnids = 0;
759 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
760 } else if (lpni->lpni_pref_nnids == 0) {
765 spin_unlock(&lpni->lpni_lock);
767 CDEBUG(D_NET, "peer %s: %d\n",
768 libcfs_nid2str(lpni->lpni_nid), rc);
773 * Clear the preferred NIDs from a non-multi-rail peer.
/* Walks every peer_ni of @lp and clears its implicit preference. */
776 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
778 struct lnet_peer_ni *lpni = NULL;
780 while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
781 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
785 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
787 lnet_nid_t *nids = NULL;
788 lnet_nid_t *oldnids = NULL;
789 struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
794 if (nid == LNET_NID_ANY) {
799 if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
804 /* A non-MR node may have only one preferred NI per peer_ni */
805 if (lpni->lpni_pref_nnids > 0) {
806 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
812 if (lpni->lpni_pref_nnids != 0) {
813 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
814 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
819 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
820 if (lpni->lpni_pref.nids[i] == nid) {
821 LIBCFS_FREE(nids, size);
825 nids[i] = lpni->lpni_pref.nids[i];
830 lnet_net_lock(LNET_LOCK_EX);
831 spin_lock(&lpni->lpni_lock);
832 if (lpni->lpni_pref_nnids == 0) {
833 lpni->lpni_pref.nid = nid;
835 oldnids = lpni->lpni_pref.nids;
836 lpni->lpni_pref.nids = nids;
838 lpni->lpni_pref_nnids++;
839 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
840 spin_unlock(&lpni->lpni_lock);
841 lnet_net_unlock(LNET_LOCK_EX);
844 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
845 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
848 if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
849 spin_lock(&lpni->lpni_lock);
850 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
851 spin_unlock(&lpni->lpni_lock);
853 CDEBUG(D_NET, "peer %s nid %s: %d\n",
854 libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
859 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
861 lnet_nid_t *nids = NULL;
862 lnet_nid_t *oldnids = NULL;
863 struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
868 if (lpni->lpni_pref_nnids == 0) {
873 if (lpni->lpni_pref_nnids == 1) {
874 if (lpni->lpni_pref.nid != nid) {
878 } else if (lpni->lpni_pref_nnids == 2) {
879 if (lpni->lpni_pref.nids[0] != nid &&
880 lpni->lpni_pref.nids[1] != nid) {
885 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
886 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
891 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
892 if (lpni->lpni_pref.nids[i] != nid)
894 nids[j++] = lpni->lpni_pref.nids[i];
896 /* Check if we actually removed a nid. */
897 if (j == lpni->lpni_pref_nnids) {
898 LIBCFS_FREE(nids, size);
904 lnet_net_lock(LNET_LOCK_EX);
905 spin_lock(&lpni->lpni_lock);
906 if (lpni->lpni_pref_nnids == 1) {
907 lpni->lpni_pref.nid = LNET_NID_ANY;
908 } else if (lpni->lpni_pref_nnids == 2) {
909 oldnids = lpni->lpni_pref.nids;
910 if (oldnids[0] == nid)
911 lpni->lpni_pref.nid = oldnids[1];
913 lpni->lpni_pref.nid = oldnids[2];
915 oldnids = lpni->lpni_pref.nids;
916 lpni->lpni_pref.nids = nids;
918 lpni->lpni_pref_nnids--;
919 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
920 spin_unlock(&lpni->lpni_lock);
921 lnet_net_unlock(LNET_LOCK_EX);
924 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
925 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
928 CDEBUG(D_NET, "peer %s nid %s: %d\n",
929 libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
/*
 * Map @nid to its peer's primary NID, or return @nid unchanged if no
 * peer_ni is known for it. Lock-safe lookup under the current CPT lock.
 */
934 lnet_peer_primary_nid(lnet_nid_t nid)
936 struct lnet_peer_ni *lpni;
937 lnet_nid_t primary_nid = nid;
940 cpt = lnet_net_lock_current();
941 lpni = lnet_find_peer_ni_locked(nid);
943 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
944 lnet_peer_ni_decref_locked(lpni);
946 lnet_net_unlock(cpt);
/*
 * Public API: return the primary NID of the peer owning @nid,
 * creating the peer_ni (via lnet_nid2peerni_locked) if necessary.
 * Falls back to @nid itself on failure.
 */
952 LNetPrimaryNID(lnet_nid_t nid)
954 struct lnet_peer_ni *lpni;
955 lnet_nid_t primary_nid = nid;
959 cpt = lnet_net_lock_current();
960 lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
965 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
966 lnet_peer_ni_decref_locked(lpni);
968 lnet_net_unlock(cpt);
970 CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
971 libcfs_nid2str(primary_nid), rc);
/*
 * Find the peer_net with @net_id on @peer; linear scan of the peer's
 * net list. NULL-return path lies outside this extraction.
 */
976 struct lnet_peer_net *
977 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
979 struct lnet_peer_net *peer_net;
980 list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
981 if (peer_net->lpn_net_id == net_id)
988 * Attach a peer_ni to a peer_net and peer. This function assumes
989 * peer_ni is not already attached to the peer_net/peer. The peer_ni
990 * may be attached to a different peer, in which case it will be
991 * properly detached first. The whole operation is done atomically.
993 * Always returns 0. This is the last function called from functions
994 * that do return an int, so returning 0 here allows the compiler to
998 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
999 struct lnet_peer_net *lpn,
1000 struct lnet_peer_ni *lpni,
1003 struct lnet_peer_table *ptable;
1005 /* Install the new peer_ni */
1006 lnet_net_lock(LNET_LOCK_EX);
1007 /* Add peer_ni to global peer table hash, if necessary. */
1008 if (list_empty(&lpni->lpni_hashlist)) {
1009 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1011 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1012 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1013 ptable->pt_version++;
1014 ptable->pt_number++;
1015 /* This is the 1st refcount on lpni. */
1016 atomic_inc(&lpni->lpni_refcount);
1019 /* Detach the peer_ni from an existing peer, if necessary. */
1020 if (lpni->lpni_peer_net) {
1021 LASSERT(lpni->lpni_peer_net != lpn);
1022 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1023 lnet_peer_detach_peer_ni_locked(lpni);
1024 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1025 lpni->lpni_peer_net = NULL;
1028 /* Add peer_ni to peer_net */
1029 lpni->lpni_peer_net = lpn;
1030 list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1031 lnet_peer_net_addref_locked(lpn);
1033 /* Add peer_net to peer */
1034 if (!lpn->lpn_peer) {
1036 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1037 lnet_peer_addref_locked(lp);
1040 /* Add peer to global peer list, if necessary */
1041 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1042 if (list_empty(&lp->lp_peer_list)) {
1043 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1048 /* Update peer state */
1049 spin_lock(&lp->lp_lock);
/* CONFIGURED is sticky once set via DLC */
1050 if (flags & LNET_PEER_CONFIGURED) {
1051 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1052 lp->lp_state |= LNET_PEER_CONFIGURED;
1054 if (flags & LNET_PEER_MULTI_RAIL) {
1055 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1056 lp->lp_state |= LNET_PEER_MULTI_RAIL;
/* implicit non-MR preferences are meaningless on an MR peer */
1057 lnet_peer_clr_non_mr_pref_nids(lp);
1060 spin_unlock(&lp->lp_lock);
1063 the_lnet.ln_peer_tables[lp->lp_cpt]->pt_peer_nnids++;
1064 lnet_net_unlock(LNET_LOCK_EX);
1066 CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1067 libcfs_nid2str(lp->lp_primary_nid),
1068 libcfs_nid2str(lpni->lpni_nid), flags);
1074 * Create a new peer, with nid as its primary nid.
1076 * Call with the lnet_api_mutex held.
1079 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1081 struct lnet_peer *lp;
1082 struct lnet_peer_net *lpn;
1083 struct lnet_peer_ni *lpni;
1086 LASSERT(nid != LNET_NID_ANY);
1089 * No need for the lnet_net_lock here, because the
1090 * lnet_api_mutex is held.
1092 lpni = lnet_find_peer_ni_locked(nid);
1094 /* A peer with this NID already exists. */
1095 lp = lpni->lpni_peer_net->lpn_peer;
1096 lnet_peer_ni_decref_locked(lpni);
1098 * This is an error if the peer was configured and the
1099 * primary NID differs or an attempt is made to change
1100 * the Multi-Rail flag. Otherwise the assumption is
1101 * that an existing peer is being modified.
1103 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1104 if (lp->lp_primary_nid != nid)
1106 else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1110 /* Delete and recreate as a configured peer. */
1114 /* Create peer, peer_net, and peer_ni. */
/* NOTE(review): -ENOMEM goto paths between these allocs are lost
 * in extraction — each alloc failure unwinds via the labels below. */
1116 lp = lnet_peer_alloc(nid);
1119 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1122 lpni = lnet_peer_ni_alloc(nid);
1126 return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1129 LIBCFS_FREE(lpn, sizeof(*lpn));
1131 LIBCFS_FREE(lp, sizeof(*lp));
1133 CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1134 libcfs_nid2str(nid), flags, rc);
1139 * Add a NID to a peer. Call with ln_api_mutex held.
1142 * -EPERM: Non-DLC addition to a DLC-configured peer.
1143 * -EEXIST: The NID was configured by DLC for a different peer.
1144 * -ENOMEM: Out of memory.
1145 * -ENOTUNIQ: Adding a second peer NID on a single network on a
1146 * non-multi-rail peer.
1149 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1151 struct lnet_peer_net *lpn;
1152 struct lnet_peer_ni *lpni;
1156 LASSERT(nid != LNET_NID_ANY);
1158 /* A configured peer can only be updated through configuration. */
1159 if (!(flags & LNET_PEER_CONFIGURED)) {
1160 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1167 * The MULTI_RAIL flag can be set but not cleared, because
1168 * that would leave the peer struct in an invalid state.
1170 if (flags & LNET_PEER_MULTI_RAIL) {
1171 spin_lock(&lp->lp_lock);
1172 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1173 lp->lp_state |= LNET_PEER_MULTI_RAIL;
1174 lnet_peer_clr_non_mr_pref_nids(lp);
1176 spin_unlock(&lp->lp_lock);
1177 } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1182 lpni = lnet_find_peer_ni_locked(nid);
1185 * A peer_ni already exists. This is only a problem if
1186 * it is not connected to this peer and was configured
/* lookup ref dropped; lpni remains valid under ln_api_mutex */
1189 lnet_peer_ni_decref_locked(lpni);
1190 if (lpni->lpni_peer_net->lpn_peer == lp)
1192 if (lnet_peer_ni_is_configured(lpni)) {
1196 /* If this is the primary NID, destroy the peer. */
1197 if (lnet_peer_ni_is_primary(lpni)) {
1198 lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1199 lpni = lnet_peer_ni_alloc(nid);
1206 lpni = lnet_peer_ni_alloc(nid);
1214 * Get the peer_net. Check that we're not adding a second
1215 * peer_ni on a peer_net of a non-multi-rail peer.
1217 lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1219 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1224 } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1229 return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1232 /* If the peer_ni was allocated above its peer_net pointer is NULL */
1233 if (!lpni->lpni_peer_net)
1234 LIBCFS_FREE(lpni, sizeof(*lpni));
1236 CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1237 libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1246 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1248 struct lnet_peer *lp;
1249 struct lnet_peer_net *lpn;
1250 struct lnet_peer_ni *lpni;
1254 if (nid == LNET_NID_ANY) {
1259 /* lnet_net_lock is not needed here because ln_api_lock is held */
1260 lpni = lnet_find_peer_ni_locked(nid);
1263 * We must have raced with another thread. Since we
1264 * know next to nothing about a peer_ni created by
1265 * traffic, we just assume everything is ok and
1268 lnet_peer_ni_decref_locked(lpni);
1272 /* Create peer, peer_net, and peer_ni. */
1274 lp = lnet_peer_alloc(nid);
1277 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1280 lpni = lnet_peer_ni_alloc(nid);
1283 if (pref != LNET_NID_ANY)
1284 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1286 return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1289 LIBCFS_FREE(lpn, sizeof(*lpn));
1291 LIBCFS_FREE(lp, sizeof(*lp));
1293 CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1298 * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1300 * This API handles the following combinations:
1301 * Create a peer with its primary NI if only the prim_nid is provided
1302 * Add a NID to a peer identified by the prim_nid. The peer identified
1303 * by the prim_nid must already exist.
1304 * The peer being created may be non-MR.
1306 * The caller must hold ln_api_mutex. This prevents the peer from
1307 * being created/modified/deleted by a different thread.
1310 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1312 struct lnet_peer *lp = NULL;
1313 struct lnet_peer_ni *lpni;
1316 /* The prim_nid must always be specified */
1317 if (prim_nid == LNET_NID_ANY)
1320 flags = LNET_PEER_CONFIGURED;
1322 flags |= LNET_PEER_MULTI_RAIL;
1325 * If nid isn't specified, we must create a new peer with
1326 * prim_nid as its primary nid.
1328 if (nid == LNET_NID_ANY)
1329 return lnet_peer_add(prim_nid, flags);
1331 /* Look up the prim_nid, which must exist. */
1332 lpni = lnet_find_peer_ni_locked(prim_nid);
1335 lnet_peer_ni_decref_locked(lpni);
1336 lp = lpni->lpni_peer_net->lpn_peer;
1338 /* Peer must have been configured. */
1339 if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1340 CDEBUG(D_NET, "peer %s was not configured\n",
1341 libcfs_nid2str(prim_nid));
1345 /* Primary NID must match */
1346 if (lp->lp_primary_nid != prim_nid) {
1347 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1348 libcfs_nid2str(prim_nid),
1349 libcfs_nid2str(lp->lp_primary_nid));
1353 /* Multi-Rail flag must match. */
1354 if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1355 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1356 libcfs_nid2str(prim_nid));
1360 return lnet_peer_add_nid(lp, nid, flags);
1364 * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1366 * This API handles the following combinations:
1367 * Delete a NI from a peer if both prim_nid and nid are provided.
1368 * Delete a peer if only prim_nid is provided.
1369 * Delete a peer if its primary nid is provided.
1371 * The caller must hold ln_api_mutex. This prevents the peer from
1372 * being modified/deleted by a different thread.
1375 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1377 struct lnet_peer *lp;
1378 struct lnet_peer_ni *lpni;
1381 if (prim_nid == LNET_NID_ANY)
1384 lpni = lnet_find_peer_ni_locked(prim_nid);
1387 lnet_peer_ni_decref_locked(lpni);
1388 lp = lpni->lpni_peer_net->lpn_peer;
1390 if (prim_nid != lp->lp_primary_nid) {
1391 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1392 libcfs_nid2str(prim_nid),
1393 libcfs_nid2str(lp->lp_primary_nid));
/* deleting the primary NID means deleting the whole peer */
1397 if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1398 return lnet_peer_del(lp);
1400 flags = LNET_PEER_CONFIGURED;
1401 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1402 flags |= LNET_PEER_MULTI_RAIL;
1404 return lnet_peer_del_nid(lp, nid, flags);
/*
 * Final teardown of a peer_ni whose refcount reached zero: unlink it
 * from the zombie list, free any allocated preferred-NID array, free
 * the structure, and drop the reference on its peer_net.
 */
1408 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1410 struct lnet_peer_table *ptable;
1411 struct lnet_peer_net *lpn;
1413 CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1415 LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1416 LASSERT(lpni->lpni_rtr_refcount == 0);
1417 LASSERT(list_empty(&lpni->lpni_txq));
1418 LASSERT(lpni->lpni_txqnob == 0);
1420 lpn = lpni->lpni_peer_net;
1421 lpni->lpni_peer_net = NULL;
1422 lpni->lpni_net = NULL;
1424 /* remove the peer ni from the zombie list */
1425 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1426 spin_lock(&ptable->pt_zombie_lock);
1427 list_del_init(&lpni->lpni_hashlist);
1428 ptable->pt_zombies--;
1429 spin_unlock(&ptable->pt_zombie_lock);
/* nids array is only allocated when more than one NID is preferred */
1431 if (lpni->lpni_pref_nnids > 1) {
1432 LIBCFS_FREE(lpni->lpni_pref.nids,
1433 sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1435 LIBCFS_FREE(lpni, sizeof(*lpni));
1437 lnet_peer_net_decref_locked(lpn);
/*
 * Look up (or create via the traffic-add path) the peer_ni for @nid,
 * taking a hold on it.  Unlike lnet_nid2peerni_locked() this variant
 * does not serialize on ln_api_mutex; it drops and retakes the net
 * lock for CPT @cpt around the creation.  Returns the peer_ni or an
 * ERR_PTR (-ESHUTDOWN when LNet is not running).
 *
 * NOTE(review): the tail of this function (error check of rc, relock
 * label, final return) is elided in this extract.
 */
struct lnet_peer_ni *
lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
	struct lnet_peer_ni *lpni = NULL;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);

	/* Creation must happen outside the per-CPT net lock. */
	lnet_net_unlock(cpt);

	rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
		goto out_net_relock;

	/* The traffic-add either created it or it raced into existence. */
	lpni = lnet_find_peer_ni_locked(nid);
/*
 * Get a peer_ni for the given nid, create it if necessary. Takes a
 * hold on the peer_ni.
 *
 * Creation is serialized under ln_api_mutex against both DLC
 * configuration and local NI/net add/delete; @pref is a preferred
 * source NID hint passed to lnet_peer_ni_traffic_add().  Returns the
 * peer_ni or an ERR_PTR (-ESHUTDOWN when LNet is shutting down).
 */
struct lnet_peer_ni *
lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
	struct lnet_peer_ni *lpni = NULL;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);

	/*
	 * use the lnet_api_mutex to serialize the creation of the peer_ni
	 * and the creation/deletion of the local ni/net. When a local ni is
	 * created, if there exists a set of peer_nis on that network,
	 * they need to be traversed and updated. When a local NI is
	 * deleted, which could result in a network being deleted, then
	 * all peer nis on that network need to be removed as well.
	 *
	 * Creation through traffic should also be serialized with
	 * creation through DLC.
	 */
	lnet_net_unlock(cpt);
	mutex_lock(&the_lnet.ln_api_mutex);
	/*
	 * Shutdown is only set under the ln_api_lock, so a single
	 * check here is sufficient.
	 */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		lpni = ERR_PTR(-ESHUTDOWN);
		goto out_mutex_unlock;

	rc = lnet_peer_ni_traffic_add(nid, pref);
		goto out_mutex_unlock;

	lpni = lnet_find_peer_ni_locked(nid);

	mutex_unlock(&the_lnet.ln_api_mutex);

	/* Lock has been dropped, check again for shutdown. */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		/* Undo the hold taken above; report shutdown instead. */
		lnet_peer_ni_decref_locked(lpni);
		lpni = ERR_PTR(-ESHUTDOWN);
/*
 * Is the peer's discovery state current?  Under lp_lock:
 *  - DISCOVERING: not up to date (discovery in flight);
 *  - DISCOVERED: up to date only if NIDS_UPTODATE is also set;
 *  - UNDISCOVERED: depends on whether discovery is globally enabled.
 * (The boolean returns on each branch are elided in this extract.)
 */
lnet_peer_is_uptodate(struct lnet_peer *lp)

	spin_lock(&lp->lp_lock);
	if (lp->lp_state & LNET_PEER_DISCOVERING) {
	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
		if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
	} else if (lp->lp_state & LNET_PEER_UNDISCOVERED) {
		if (lnet_peer_discovery_enabled)
	spin_unlock(&lp->lp_lock);
/*
 * Queue a peer for the attention of the discovery thread. Call with
 * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
 * -EALREADY if the peer was already queued.
 */
static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)

	spin_lock(&lp->lp_lock);
	if (!(lp->lp_state & LNET_PEER_DISCOVERING))
		lp->lp_state |= LNET_PEER_DISCOVERING;
	/* QUEUED guards against double-insertion on ln_dc_request. */
	if (!(lp->lp_state & LNET_PEER_QUEUED)) {
		lp->lp_state |= LNET_PEER_QUEUED;
		spin_unlock(&lp->lp_lock);
		/* The queue holds its own reference on the peer. */
		lnet_peer_addref_locked(lp);
		list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
		wake_up(&the_lnet.ln_dc_waitq);
	spin_unlock(&lp->lp_lock);
/*
 * Discovery of a peer is complete. Wake all waiters on the peer.
 * Call with lnet_net_lock/EX held.  Clears QUEUED, removes the peer
 * from whichever discovery list it is on, and drops the reference the
 * queue held (taken in lnet_peer_queue_for_discovery()).
 */
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_QUEUED;
	spin_unlock(&lp->lp_lock);
	list_del_init(&lp->lp_dc_list);
	wake_up_all(&lp->lp_dc_waitq);
	lnet_peer_decref_locked(lp);
/*
 * Peer discovery slow path. The ln_api_mutex is held on entry, and
 * dropped/retaken within this function. An lnet_peer_ni is passed in
 * because discovery could tear down an lnet_peer.
 *
 * Queues the peer for the discovery thread and sleeps (interruptibly)
 * on lp_dc_waitq until the peer is up to date, a signal arrives, or
 * the discovery thread is stopping.  Re-reads lpni_peer_net->lpn_peer
 * each pass because discovery may replace the owning peer.
 */
lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)

	struct lnet_peer *lp;

	/* Trade the per-CPT lock for the exclusive lock. */
	lnet_net_unlock(cpt);
	lnet_net_lock(LNET_LOCK_EX);

	/* We're willing to be interrupted. */
	lp = lpni->lpni_peer_net->lpn_peer;
	prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
	if (signal_pending(current))
	if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
	if (lnet_peer_is_uptodate(lp))
	lnet_peer_queue_for_discovery(lp);
	/* Extra hold so lp survives while we sleep unlocked. */
	lnet_peer_addref_locked(lp);
	lnet_net_unlock(LNET_LOCK_EX);

	finish_wait(&lp->lp_dc_waitq, &wait);
	lnet_net_lock(LNET_LOCK_EX);
	lnet_peer_decref_locked(lp);
	/* Do not use lp beyond this point. */

	finish_wait(&lp->lp_dc_waitq, &wait);

	lnet_net_unlock(LNET_LOCK_EX);

	/* Map the exit reason to a return code (values elided here). */
	if (signal_pending(current))
	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
	else if (!lnet_peer_is_uptodate(lp))
/*
 * Event handler for the discovery EQ.
 *
 * Called with lnet_res_lock(cpt) held. The cpt is the
 * lnet_cpt_of_cookie() of the md handle cookie.
 * Currently just pokes the discovery thread; the event itself is
 * not examined.
 */
static void lnet_discovery_event_handler(lnet_event_t *event)
	wake_up(&the_lnet.ln_dc_waitq);
/*
 * Wait for work to be queued or some other change that must be
 * attended to. Returns non-zero if the discovery thread should shut
 * down.  Uses the prepare_to_wait()/finish_wait() pattern so that a
 * wake-up between the state checks and the sleep is not lost.
 */
static int lnet_peer_discovery_wait_for_work(void)

	cpt = lnet_net_lock_current();
	prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
			TASK_INTERRUPTIBLE);
	/* Exit the wait loop on stop request or pending work. */
	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
	if (!list_empty(&the_lnet.ln_dc_request))
	/* Nothing to do: drop the lock and actually sleep. */
	lnet_net_unlock(cpt);
	finish_wait(&the_lnet.ln_dc_waitq, &wait);
	cpt = lnet_net_lock_current();

	finish_wait(&the_lnet.ln_dc_waitq, &wait);

	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)

	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "woken: %d\n", rc);
/*
 * The discovery thread.  Main loop: wait for queued peers, move each
 * from ln_dc_request to ln_dc_working, stamp its discovery state
 * (placeholder logic for now — no actual wire protocol), and complete
 * it.  On stop, frees the discovery EQ and flushes both lists so that
 * lnet_peer_discovery_stop() can observe LNET_DC_STATE_SHUTDOWN.
 */
static int lnet_peer_discovery(void *arg)
	struct lnet_peer *lp;

	CDEBUG(D_NET, "started\n");
	/* Kernel thread: ignore all signals. */
	cfs_block_allsigs();

	if (lnet_peer_discovery_wait_for_work())

	lnet_net_lock(LNET_LOCK_EX);
	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
	while (!list_empty(&the_lnet.ln_dc_request)) {
		lp = list_first_entry(&the_lnet.ln_dc_request,
				      struct lnet_peer, lp_dc_list);
		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
		lnet_net_unlock(LNET_LOCK_EX);

		/* Just tag and release for now. */
		spin_lock(&lp->lp_lock);
		if (lnet_peer_discovery_enabled) {
			/* Discovery on: mark peer discovered/up to date. */
			lp->lp_state |= (LNET_PEER_DISCOVERED |
					 LNET_PEER_NIDS_UPTODATE);
			lp->lp_state &= ~(LNET_PEER_UNDISCOVERED |
					  LNET_PEER_DISCOVERING);
			/* Discovery off: mark peer undiscovered. */
			lp->lp_state |= LNET_PEER_UNDISCOVERED;
			lp->lp_state &= ~(LNET_PEER_DISCOVERED |
					  LNET_PEER_NIDS_UPTODATE |
					  LNET_PEER_DISCOVERING);
		spin_unlock(&lp->lp_lock);

		lnet_net_lock(LNET_LOCK_EX);
		if (!(lp->lp_state & LNET_PEER_DISCOVERING))
			lnet_peer_discovery_complete(lp);
		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "stopping\n");
	/*
	 * Clean up before telling lnet_peer_discovery_stop() that
	 * we're done. Use wake_up() below to somewhat reduce the
	 * size of the thundering herd if there are multiple threads
	 * waiting on discovery of a single peer.
	 */
	LNetEQFree(the_lnet.ln_dc_eqh);
	LNetInvalidateHandle(&the_lnet.ln_dc_eqh);

	/*
	 * NOTE(review): lnet_peer_discovery_complete() does
	 * list_del_init() on lp_dc_list and may free lp via decref, so
	 * iterating with plain list_for_each_entry() below is unsafe
	 * mid-traversal; this should use the _safe variant (or a
	 * while-not-empty loop).  Verify against upstream.
	 */
	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
		spin_lock(&lp->lp_lock);
		lp->lp_state |= LNET_PEER_UNDISCOVERED;
		lp->lp_state &= ~(LNET_PEER_DISCOVERED |
				  LNET_PEER_DISCOVERING |
				  LNET_PEER_NIDS_UPTODATE);
		spin_unlock(&lp->lp_lock);
		lnet_peer_discovery_complete(lp);
	list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
		spin_lock(&lp->lp_lock);
		lp->lp_state |= LNET_PEER_UNDISCOVERED;
		lp->lp_state &= ~(LNET_PEER_DISCOVERED |
				  LNET_PEER_DISCOVERING |
				  LNET_PEER_NIDS_UPTODATE);
		spin_unlock(&lp->lp_lock);
		lnet_peer_discovery_complete(lp);
	lnet_net_unlock(LNET_LOCK_EX);

	/* Signal lnet_peer_discovery_stop() that we are done. */
	the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
	wake_up(&the_lnet.ln_dc_waitq);

	CDEBUG(D_NET, "stopped\n");
/*
 * ln_api_mutex is held on entry.  Initialize the discovery queues and
 * waitqueue, allocate the discovery EQ, and start the discovery
 * kthread.  On kthread failure the EQ is freed and the state is reset
 * to SHUTDOWN so a later start can retry.  Returns 0 or -errno.
 */
int lnet_peer_discovery_start(void)
	struct task_struct *task;

	/* Already running (or stopping): nothing to do here. */
	if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)

	INIT_LIST_HEAD(&the_lnet.ln_dc_request);
	INIT_LIST_HEAD(&the_lnet.ln_dc_working);
	init_waitqueue_head(&the_lnet.ln_dc_waitq);

	rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
		CERROR("Can't allocate discovery EQ: %d\n", rc);

	/* Set RUNNING before the thread starts so it sees the state. */
	the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
	task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
		CERROR("Can't start peer discovery thread: %d\n", rc);

		/* Undo the EQ allocation on thread-start failure. */
		LNetEQFree(the_lnet.ln_dc_eqh);
		LNetInvalidateHandle(&the_lnet.ln_dc_eqh);

		the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
/*
 * ln_api_mutex is held on entry.  Ask the discovery thread to stop
 * and wait until it reports SHUTDOWN; the thread itself performs all
 * cleanup (EQ free, flushing both discovery lists) before setting
 * that state, hence the emptiness asserts afterwards.
 */
void lnet_peer_discovery_stop(void)
	/* Never started, or already fully stopped. */
	if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)

	LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
	the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
	wake_up(&the_lnet.ln_dc_waitq);

	wait_event(the_lnet.ln_dc_waitq,
		   the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);

	LASSERT(list_empty(&the_lnet.ln_dc_request));
	LASSERT(list_empty(&the_lnet.ln_dc_working));
/*
 * Dump a single peer_ni's state (aliveness, credits, queued bytes) to
 * the debug log at D_WARNING.  Creates the peer_ni if it does not
 * exist yet (via lnet_nid2peerni_locked).  Note the local is named
 * "lp" for historical reasons but is a struct lnet_peer_ni.
 */
lnet_debug_peer(lnet_nid_t nid)
	char *aliveness = "NA";
	struct lnet_peer_ni *lp;

	cpt = lnet_cpt_of_nid(nid, NULL);
	lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
		lnet_net_unlock(cpt);
		CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));

	/* Aliveness is only tracked for routers / when enabled. */
	if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
		aliveness = lp->lpni_alive ? "up" : "down";

	CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
	       libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
	       aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
	       lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
	       lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);

	/* Drop the hold taken by lnet_nid2peerni_locked(). */
	lnet_peer_ni_decref_locked(lp);

	lnet_net_unlock(cpt);
/* Gathering information for userspace. */
/*
 * Iterate to the peer_index'th peer_ni in the CPT table selected by
 * *cpt_iter and copy its counters into the caller's out-parameters.
 * Returns 0 when a peer_ni was found, -ENOENT otherwise (including
 * when *cpt_iter is past the last CPT, signalling end of iteration).
 */
int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
			  char aliveness[LNET_MAX_STR_LEN],
			  __u32 *cpt_iter, __u32 *refcount,
			  __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
			  __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
			  __u32 *peer_tx_qnob)
	struct lnet_peer_table *peer_table;
	struct lnet_peer_ni *lp;

	/* get the number of CPTs */
	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

	/* if the cpt number to be examined is >= the number of cpts in
	 * the system then indicate that there are no more cpts to examine
	 */
	if (*cpt_iter >= lncpt)

	/* get the current table */
	peer_table = the_lnet.ln_peer_tables[*cpt_iter];
	/* if the ptable is NULL then there are no more cpts to examine */
	if (peer_table == NULL)

	lnet_net_lock(*cpt_iter);

	/* Walk the hash buckets, counting down peer_index to the target. */
	for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
		struct list_head *peers = &peer_table->pt_hash[j];

		list_for_each_entry(lp, peers, lpni_hashlist) {
			if (peer_index-- > 0)

			/* Found it: fill in the userspace-visible fields. */
			snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
			if (lnet_isrouter(lp) ||
			    lnet_peer_aliveness_enabled(lp))
				snprintf(aliveness, LNET_MAX_STR_LEN,
					 lp->lpni_alive ? "up" : "down");

			*nid = lp->lpni_nid;
			*refcount = atomic_read(&lp->lpni_refcount);
			*ni_peer_tx_credits =
				lp->lpni_net->net_tunables.lct_peer_tx_credits;
			*peer_tx_credits = lp->lpni_txcredits;
			*peer_rtr_credits = lp->lpni_rtrcredits;
			/* NOTE(review): min *rtr* credits reported from
			 * lpni_mintxcredits — looks like a copy/paste slip;
			 * confirm against lpni_minrtrcredits upstream. */
			*peer_min_rtr_credits = lp->lpni_mintxcredits;
			*peer_tx_qnob = lp->lpni_txqnob;

	lnet_net_unlock(*cpt_iter);

	return found ? 0 : -ENOENT;
1943 /* ln_api_mutex is held, which keeps the peer list stable */
1944 int lnet_get_peer_info(__u32 idx, lnet_nid_t *primary_nid, lnet_nid_t *nid,
1945 bool *mr, struct lnet_peer_ni_credit_info *peer_ni_info,
1946 struct lnet_ioctl_element_stats *peer_ni_stats)
1948 struct lnet_peer_ni *lpni = NULL;
1949 struct lnet_peer_net *lpn = NULL;
1950 struct lnet_peer *lp = NULL;
1952 lpni = lnet_get_peer_ni_idx_locked(idx, &lpn, &lp);
1957 *primary_nid = lp->lp_primary_nid;
1958 *mr = lnet_peer_is_multi_rail(lp);
1959 *nid = lpni->lpni_nid;
1960 snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
1961 if (lnet_isrouter(lpni) ||
1962 lnet_peer_aliveness_enabled(lpni))
1963 snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN,
1964 lpni->lpni_alive ? "up" : "down");
1966 peer_ni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
1967 peer_ni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
1968 lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
1969 peer_ni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
1970 peer_ni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
1971 peer_ni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
1972 peer_ni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
1973 peer_ni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
1975 peer_ni_stats->send_count = atomic_read(&lpni->lpni_stats.send_count);
1976 peer_ni_stats->recv_count = atomic_read(&lpni->lpni_stats.recv_count);
1977 peer_ni_stats->drop_count = atomic_read(&lpni->lpni_stats.drop_count);