Whamcloud - gitweb
7fe9dd14c7576b156b3a61a125d5dc8eae4506a7
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <lnet/lib-lnet.h>
38 #include <lnet/lib-dlc.h>
39
/*
 * Peer discovery is enabled by default. Exposed as a writable (0644)
 * module parameter so it can be toggled via sysfs at runtime.
 */
unsigned lnet_peer_discovery_enabled = 1;
module_param(lnet_peer_discovery_enabled, uint, 0644);
MODULE_PARM_DESC(lnet_peer_discovery_enabled,
		"Explicitly enable/disable peer discovery");
44
45 static void
46 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
47 {
48         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
49                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
50                 lnet_peer_ni_decref_locked(lpni);
51         }
52 }
53
54 void
55 lnet_peer_net_added(struct lnet_net *net)
56 {
57         struct lnet_peer_ni *lpni, *tmp;
58
59         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
60                                  lpni_on_remote_peer_ni_list) {
61
62                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
63                         lpni->lpni_net = net;
64
65                         spin_lock(&lpni->lpni_lock);
66                         lpni->lpni_txcredits =
67                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
68                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
69                         lpni->lpni_rtrcredits =
70                                 lnet_peer_buffer_credits(lpni->lpni_net);
71                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
72                         spin_unlock(&lpni->lpni_lock);
73
74                         lnet_peer_remove_from_remote_list(lpni);
75                 }
76         }
77 }
78
79 static void
80 lnet_peer_tables_destroy(void)
81 {
82         struct lnet_peer_table  *ptable;
83         struct list_head        *hash;
84         int                     i;
85         int                     j;
86
87         if (!the_lnet.ln_peer_tables)
88                 return;
89
90         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
91                 hash = ptable->pt_hash;
92                 if (!hash) /* not intialized */
93                         break;
94
95                 LASSERT(list_empty(&ptable->pt_zombie_list));
96
97                 ptable->pt_hash = NULL;
98                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
99                         LASSERT(list_empty(&hash[j]));
100
101                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
102         }
103
104         cfs_percpt_free(the_lnet.ln_peer_tables);
105         the_lnet.ln_peer_tables = NULL;
106 }
107
108 int
109 lnet_peer_tables_create(void)
110 {
111         struct lnet_peer_table  *ptable;
112         struct list_head        *hash;
113         int                     i;
114         int                     j;
115
116         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
117                                                    sizeof(*ptable));
118         if (the_lnet.ln_peer_tables == NULL) {
119                 CERROR("Failed to allocate cpu-partition peer tables\n");
120                 return -ENOMEM;
121         }
122
123         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
124                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
125                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
126                 if (hash == NULL) {
127                         CERROR("Failed to create peer hash table\n");
128                         lnet_peer_tables_destroy();
129                         return -ENOMEM;
130                 }
131
132                 spin_lock_init(&ptable->pt_zombie_lock);
133                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
134
135                 INIT_LIST_HEAD(&ptable->pt_peer_list);
136
137                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
138                         INIT_LIST_HEAD(&hash[j]);
139                 ptable->pt_hash = hash; /* sign of initialization */
140         }
141
142         return 0;
143 }
144
/*
 * Allocate and initialize a peer_ni for @nid on the CPT derived from
 * the NID.
 *
 * If the NID is on a local net, credits are seeded from that net's
 * tunables. Otherwise the peer_ni is parked on the global remote
 * peer_ni list -- with an extra reference held by that list -- so it
 * can be revisited by lnet_peer_net_added() when a matching net
 * appears later.
 *
 * NOTE(review): calls lnet_get_net_locked() and
 * lnet_set_peer_ni_health_locked(), so this presumably requires
 * lnet_net_lock to be held by the caller -- confirm against callers.
 *
 * Returns the new peer_ni, or NULL on allocation failure.
 */
static struct lnet_peer_ni *
lnet_peer_ni_alloc(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_net *net;
	int cpt;

	cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
	if (!lpni)
		return NULL;

	INIT_LIST_HEAD(&lpni->lpni_txq);
	INIT_LIST_HEAD(&lpni->lpni_rtrq);
	INIT_LIST_HEAD(&lpni->lpni_routes);
	INIT_LIST_HEAD(&lpni->lpni_hashlist);
	INIT_LIST_HEAD(&lpni->lpni_peer_nis);
	INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);

	spin_lock_init(&lpni->lpni_lock);

	lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
	lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
	lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
	lpni->lpni_nid = nid;
	lpni->lpni_cpt = cpt;
	lnet_set_peer_ni_health_locked(lpni, true);

	net = lnet_get_net_locked(LNET_NIDNET(nid));
	lpni->lpni_net = net;
	if (net) {
		/* local net: credits come from the net's tunables */
		lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
		lpni->lpni_mintxcredits = lpni->lpni_txcredits;
		lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
		lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
	} else {
		/*
		 * This peer_ni is not on a local network, so we
		 * cannot add the credits here. In case the net is
		 * added later, add the peer_ni to the remote peer ni
		 * list so it can be easily found and revisited.
		 */
		/* FIXME: per-net implementation instead? */
		atomic_inc(&lpni->lpni_refcount);
		list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
			      &the_lnet.ln_remote_peer_ni_list);
	}

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	return lpni;
}
198
199 static struct lnet_peer_net *
200 lnet_peer_net_alloc(__u32 net_id)
201 {
202         struct lnet_peer_net *lpn;
203
204         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
205         if (!lpn)
206                 return NULL;
207
208         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
209         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
210         lpn->lpn_net_id = net_id;
211
212         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
213
214         return lpn;
215 }
216
217 void
218 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
219 {
220         struct lnet_peer *lp;
221
222         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
223
224         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
225         LASSERT(list_empty(&lpn->lpn_peer_nis));
226         LASSERT(list_empty(&lpn->lpn_peer_nets));
227         lp = lpn->lpn_peer;
228         lpn->lpn_peer = NULL;
229         LIBCFS_FREE(lpn, sizeof(*lpn));
230
231         lnet_peer_decref_locked(lp);
232 }
233
234 static struct lnet_peer *
235 lnet_peer_alloc(lnet_nid_t nid)
236 {
237         struct lnet_peer *lp;
238
239         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
240         if (!lp)
241                 return NULL;
242
243         INIT_LIST_HEAD(&lp->lp_peer_list);
244         INIT_LIST_HEAD(&lp->lp_peer_nets);
245         INIT_LIST_HEAD(&lp->lp_dc_list);
246         init_waitqueue_head(&lp->lp_dc_waitq);
247         spin_lock_init(&lp->lp_lock);
248         lp->lp_primary_nid = nid;
249         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
250
251         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
252
253         return lp;
254 }
255
256 void
257 lnet_destroy_peer_locked(struct lnet_peer *lp)
258 {
259         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
260
261         LASSERT(atomic_read(&lp->lp_refcount) == 0);
262         LASSERT(list_empty(&lp->lp_peer_nets));
263         LASSERT(list_empty(&lp->lp_peer_list));
264
265         LIBCFS_FREE(lp, sizeof(*lp));
266 }
267
/*
 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
 * that peer_net, detach the peer_net from the peer.
 *
 * This only unlinks structures and updates counters; no memory is
 * freed here -- objects are released when their refcounts drop.
 *
 * Call with lnet_net_lock/EX held
 */
static void
lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;
	struct lnet_peer *lp;

	/*
	 * Belts and suspenders: gracefully handle teardown of a
	 * partially connected peer_ni.
	 */
	lpn = lpni->lpni_peer_net;

	list_del_init(&lpni->lpni_peer_nis);
	/*
	 * If there are no lpni's left, we detach lpn from
	 * lp_peer_nets, so it cannot be found anymore.
	 */
	if (list_empty(&lpn->lpn_peer_nis))
		list_del_init(&lpn->lpn_peer_nets);

	/* Update peer NID count. */
	lp = lpn->lpn_peer;
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	lp->lp_nnis--;
	ptable->pt_peer_nnids--;

	/*
	 * If there are no more peer nets, make the peer unfindable
	 * via the peer_tables.
	 */
	if (list_empty(&lp->lp_peer_nets)) {
		list_del_init(&lp->lp_peer_list);
		ptable->pt_peers--;
	}
	CDEBUG(D_NET, "peer %s NID %s\n",
		libcfs_nid2str(lp->lp_primary_nid),
		libcfs_nid2str(lpni->lpni_nid));
}
313
/*
 * Delete a peer_ni: unlink it from all lookup structures and park it
 * on its table's zombie list until the last reference is dropped.
 *
 * Returns -EBUSY (without modifying anything) if the peer_ni is
 * still referenced as a router gateway.
 *
 * called with lnet_net_lock LNET_LOCK_EX held
 */
static int
lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable = NULL;

	/* don't remove a peer_ni if it's also a gateway */
	if (lpni->lpni_rtr_refcount > 0) {
		CERROR("Peer NI %s is a gateway. Can not delete it\n",
		       libcfs_nid2str(lpni->lpni_nid));
		return -EBUSY;
	}

	lnet_peer_remove_from_remote_list(lpni);

	/* remove peer ni from the hash list. */
	list_del_init(&lpni->lpni_hashlist);

	/* decrement the ref count on the peer table */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	LASSERT(ptable->pt_number > 0);
	ptable->pt_number--;

	/*
	 * The peer_ni can no longer be found with a lookup. But there
	 * can be current users, so keep track of it on the zombie
	 * list until the reference count has gone to zero.
	 *
	 * The last reference may be lost in a place where the
	 * lnet_net_lock locks only a single cpt, and that cpt may not
	 * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
	 * has its own lock.
	 */
	spin_lock(&ptable->pt_zombie_lock);
	/* lpni_hashlist is reused to link the zombie list */
	list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
	ptable->pt_zombies++;
	spin_unlock(&ptable->pt_zombie_lock);

	/* no need to keep this peer_ni on the hierarchy anymore */
	lnet_peer_detach_peer_ni_locked(lpni);

	/* remove hashlist reference on peer_ni */
	lnet_peer_ni_decref_locked(lpni);

	return 0;
}
360
/*
 * Final teardown of the peer module: delete every peer_ni still on
 * the remote peer_ni list, then destroy the per-CPT peer tables.
 *
 * NOTE(review): only ln_remote_peer_ni_list is walked here, although
 * the comment below also mentions the hash list; peer_nis reachable
 * only via the hash lists are presumably already gone by this point
 * -- confirm against the shutdown path.
 */
void lnet_peer_uninit(void)
{
	struct lnet_peer_ni *lpni, *tmp;

	lnet_net_lock(LNET_LOCK_EX);

	/* remove all peer_nis from the remote peer and the hash list */
	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list)
		lnet_peer_ni_del_locked(lpni);

	lnet_peer_tables_destroy();

	lnet_net_unlock(LNET_LOCK_EX);
}
376
377 static int
378 lnet_peer_del_locked(struct lnet_peer *peer)
379 {
380         struct lnet_peer_ni *lpni = NULL, *lpni2;
381         int rc = 0, rc2 = 0;
382
383         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
384
385         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
386         while (lpni != NULL) {
387                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
388                 rc = lnet_peer_ni_del_locked(lpni);
389                 if (rc != 0)
390                         rc2 = rc;
391                 lpni = lpni2;
392         }
393
394         return rc2;
395 }
396
397 static int
398 lnet_peer_del(struct lnet_peer *peer)
399 {
400         lnet_net_lock(LNET_LOCK_EX);
401         lnet_peer_del_locked(peer);
402         lnet_net_unlock(LNET_LOCK_EX);
403
404         return 0;
405 }
406
/*
 * Delete a NID from a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:  Non-DLC deletion from DLC-configured peer.
 *  -ENOENT: No lnet_peer_ni corresponding to the nid.
 *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
 *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
 */
static int
lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_ni *lpni;
	/* capture now for logging; deletion below may affect the peer */
	lnet_nid_t primary_nid = lp->lp_primary_nid;
	int rc = 0;

	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}
	lpni = lnet_find_peer_ni_locked(nid);
	if (!lpni) {
		rc = -ENOENT;
		goto out;
	}
	/*
	 * NOTE(review): the lookup reference is dropped here but lpni
	 * is still dereferenced below -- presumably safe because
	 * ln_api_mutex serializes deletion; confirm.
	 */
	lnet_peer_ni_decref_locked(lpni);
	if (lp != lpni->lpni_peer_net->lpn_peer) {
		rc = -ECHILD;
		goto out;
	}

	/*
	 * This function only allows deletion of the primary NID if it
	 * is the only NID.
	 */
	if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
		rc = -EBUSY;
		goto out;
	}

	lnet_net_lock(LNET_LOCK_EX);
	lnet_peer_ni_del_locked(lpni);
	lnet_net_unlock(LNET_LOCK_EX);

out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);

	return rc;
}
459
/*
 * Delete all peer_nis on @ptable that belong to @net (or to any net
 * when @net is NULL).
 *
 * Deleting a primary NID deletes the entire peer, which may unlink
 * several entries from the hash chain at once; the saved iteration
 * cursor ("next") is therefore advanced past every peer_ni of that
 * peer before the peer is deleted.
 *
 * Call with lnet_net_lock/EX held.
 */
static void
lnet_peer_table_cleanup_locked(struct lnet_net *net,
			       struct lnet_peer_table *ptable)
{
	int			 i;
	struct lnet_peer_ni	*next;
	struct lnet_peer_ni	*lpni;
	struct lnet_peer	*peer;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != NULL && net != lpni->lpni_net)
				continue;

			peer = lpni->lpni_peer_net->lpn_peer;
			if (peer->lp_primary_nid != lpni->lpni_nid) {
				/* non-primary NID: delete just this peer_ni */
				lnet_peer_ni_del_locked(lpni);
				continue;
			}
			/*
			 * Removing the primary NID implies removing
			 * the entire peer. Advance next beyond any
			 * peer_ni that belongs to the same peer.
			 */
			list_for_each_entry_from(next, &ptable->pt_hash[i],
						 lpni_hashlist) {
				if (next->lpni_peer_net->lpn_peer != peer)
					break;
			}
			lnet_peer_del_locked(peer);
		}
	}
}
494
495 static void
496 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
497 {
498         int     i = 3;
499
500         spin_lock(&ptable->pt_zombie_lock);
501         while (ptable->pt_zombies) {
502                 spin_unlock(&ptable->pt_zombie_lock);
503
504                 if (IS_PO2(i)) {
505                         CDEBUG(D_WARNING,
506                                "Waiting for %d zombies on peer table\n",
507                                ptable->pt_zombies);
508                 }
509                 set_current_state(TASK_UNINTERRUPTIBLE);
510                 schedule_timeout(cfs_time_seconds(1) >> 1);
511                 spin_lock(&ptable->pt_zombie_lock);
512         }
513         spin_unlock(&ptable->pt_zombie_lock);
514 }
515
/*
 * For each peer_ni on @ptable that belongs to @net and is acting as
 * a router gateway, delete the routes it serves.
 *
 * lnet_net_lock/EX is dropped around lnet_del_route() and then
 * reacquired. NOTE(review): the _safe iterator's saved next pointer
 * (tmp) is not revalidated after the lock is dropped -- presumably
 * callers guarantee no concurrent modification of this hash chain
 * here; confirm.
 */
static void
lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
				struct lnet_peer_table *ptable)
{
	struct lnet_peer_ni	*lp;
	struct lnet_peer_ni	*tmp;
	lnet_nid_t		lpni_nid;
	int			i;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != lp->lpni_net)
				continue;

			/* only peer_nis serving as gateways */
			if (lp->lpni_rtr_refcount == 0)
				continue;

			/* copy the nid: lp may be gone after unlock */
			lpni_nid = lp->lpni_nid;

			lnet_net_unlock(LNET_LOCK_EX);
			lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
			lnet_net_lock(LNET_LOCK_EX);
		}
	}
}
542
/*
 * Remove all peers associated with @net (or all peers when @net is
 * NULL, which is only legal outside shutdown). Proceeds in three
 * passes over the per-CPT tables: delete routes gatewayed by the
 * affected peers, delete the peers themselves, then wait for the
 * resulting zombie peer_nis to be freed.
 */
void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
	int				i;
	struct lnet_peer_table		*ptable;

	LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
	/* If just deleting the peers for a NI, get rid of any routes these
	 * peers are gateways for. */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_del_rtrs_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* Start the cleanup process */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_cleanup_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* block until all zombie peer_nis have been finalized */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
		lnet_peer_ni_finalize_wait(ptable);
}
568
569 static struct lnet_peer_ni *
570 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
571 {
572         struct list_head        *peers;
573         struct lnet_peer_ni     *lp;
574
575         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
576
577         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
578         list_for_each_entry(lp, peers, lpni_hashlist) {
579                 if (lp->lpni_nid == nid) {
580                         lnet_peer_ni_addref_locked(lp);
581                         return lp;
582                 }
583         }
584
585         return NULL;
586 }
587
588 struct lnet_peer_ni *
589 lnet_find_peer_ni_locked(lnet_nid_t nid)
590 {
591         struct lnet_peer_ni *lpni;
592         struct lnet_peer_table *ptable;
593         int cpt;
594
595         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
596
597         ptable = the_lnet.ln_peer_tables[cpt];
598         lpni = lnet_get_peer_ni_locked(ptable, nid);
599
600         return lpni;
601 }
602
/*
 * Return the idx'th peer_ni in the global hierarchy, counting in
 * list order across CPTs, then peers, then peer_nets. On success
 * *lp and *lpn are set to the owning peer and peer_net.
 *
 * No reference is taken on the returned peer_ni. Returns NULL when
 * idx is beyond the last peer_ni.
 */
struct lnet_peer_ni *
lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
			    struct lnet_peer **lp)
{
	struct lnet_peer_table	*ptable;
	struct lnet_peer_ni	*lpni;
	int			lncpt;
	int			cpt;

	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

	/* find the CPT whose table contains the idx'th NID */
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		if (ptable->pt_peer_nnids > idx)
			break;
		idx -= ptable->pt_peer_nnids;
	}
	if (cpt >= lncpt)
		return NULL;

	/* skip whole peers by their NID count, then walk the remainder */
	list_for_each_entry((*lp), &ptable->pt_peer_list, lp_peer_list) {
		if ((*lp)->lp_nnis <= idx) {
			idx -= (*lp)->lp_nnis;
			continue;
		}
		list_for_each_entry((*lpn), &((*lp)->lp_peer_nets),
				    lpn_peer_nets) {
			list_for_each_entry(lpni, &((*lpn)->lpn_peer_nis),
					    lpni_peer_nis) {
				if (idx-- == 0)
					return lpni;
			}
		}
	}

	return NULL;
}
640
/*
 * Iterator over the peer_nis of @peer. Pass @prev = NULL to get the
 * first peer_ni; pass the previous return value to advance. When
 * @peer_net is non-NULL, iteration is confined to that peer_net.
 * Returns NULL when the sequence is exhausted.
 *
 * NOTE(review): when @prev is NULL only peer->lp_peer_nets is checked
 * for emptiness; an explicitly passed empty @peer_net would make
 * list_entry() return a pointer derived from the list head itself --
 * callers presumably never pass an empty peer_net; confirm.
 */
struct lnet_peer_ni *
lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
			     struct lnet_peer_net *peer_net,
			     struct lnet_peer_ni *prev)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_net *net = peer_net;

	if (!prev) {
		if (!net) {
			if (list_empty(&peer->lp_peer_nets))
				return NULL;

			net = list_entry(peer->lp_peer_nets.next,
					 struct lnet_peer_net,
					 lpn_peer_nets);
		}
		/* first peer_ni of the (first or given) net */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
		/*
		 * if you reached the end of the peer ni list and the peer
		 * net is specified then there are no more peer nis in that
		 * net.
		 */
		if (net)
			return NULL;

		/*
		 * we reached the end of this net ni list. move to the
		 * next net
		 */
		if (prev->lpni_peer_net->lpn_peer_nets.next ==
		    &peer->lp_peer_nets)
			/* no more nets and no more NIs. */
			return NULL;

		/* get the next net */
		net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
				 struct lnet_peer_net,
				 lpn_peer_nets);
		/* get the ni on it */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	/* there are more nis left */
	lpni = list_entry(prev->lpni_peer_nis.next,
			  struct lnet_peer_ni, lpni_peer_nis);

	return lpni;
}
699
700 /*
701  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
702  * this is a preferred point-to-point path. Call with lnet_net_lock in
703  * shared mmode.
704  */
705 bool
706 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
707 {
708         int i;
709
710         if (lpni->lpni_pref_nnids == 0)
711                 return false;
712         if (lpni->lpni_pref_nnids == 1)
713                 return lpni->lpni_pref.nid == nid;
714         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
715                 if (lpni->lpni_pref.nids[i] == nid)
716                         return true;
717         }
718         return false;
719 }
720
721 /*
722  * Set a single ni as preferred, provided no preferred ni is already
723  * defined. Only to be used for non-multi-rail peer_ni.
724  */
725 int
726 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
727 {
728         int rc = 0;
729
730         spin_lock(&lpni->lpni_lock);
731         if (nid == LNET_NID_ANY) {
732                 rc = -EINVAL;
733         } else if (lpni->lpni_pref_nnids > 0) {
734                 rc = -EPERM;
735         } else if (lpni->lpni_pref_nnids == 0) {
736                 lpni->lpni_pref.nid = nid;
737                 lpni->lpni_pref_nnids = 1;
738                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
739         }
740         spin_unlock(&lpni->lpni_lock);
741
742         CDEBUG(D_NET, "peer %s nid %s: %d\n",
743                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
744         return rc;
745 }
746
747 /*
748  * Clear the preferred NID from a non-multi-rail peer_ni, provided
749  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
750  */
751 int
752 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
753 {
754         int rc = 0;
755
756         spin_lock(&lpni->lpni_lock);
757         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
758                 lpni->lpni_pref_nnids = 0;
759                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
760         } else if (lpni->lpni_pref_nnids == 0) {
761                 rc = -ENOENT;
762         } else {
763                 rc = -EPERM;
764         }
765         spin_unlock(&lpni->lpni_lock);
766
767         CDEBUG(D_NET, "peer %s: %d\n",
768                libcfs_nid2str(lpni->lpni_nid), rc);
769         return rc;
770 }
771
772 /*
773  * Clear the preferred NIDs from a non-multi-rail peer.
774  */
775 void
776 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
777 {
778         struct lnet_peer_ni *lpni = NULL;
779
780         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
781                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
782 }
783
784 int
785 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
786 {
787         lnet_nid_t *nids = NULL;
788         lnet_nid_t *oldnids = NULL;
789         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
790         int size;
791         int i;
792         int rc = 0;
793
794         if (nid == LNET_NID_ANY) {
795                 rc = -EINVAL;
796                 goto out;
797         }
798
799         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
800                 rc = -EEXIST;
801                 goto out;
802         }
803
804         /* A non-MR node may have only one preferred NI per peer_ni */
805         if (lpni->lpni_pref_nnids > 0) {
806                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
807                         rc = -EPERM;
808                         goto out;
809                 }
810         }
811
812         if (lpni->lpni_pref_nnids != 0) {
813                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
814                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
815                 if (!nids) {
816                         rc = -ENOMEM;
817                         goto out;
818                 }
819                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
820                         if (lpni->lpni_pref.nids[i] == nid) {
821                                 LIBCFS_FREE(nids, size);
822                                 rc = -EEXIST;
823                                 goto out;
824                         }
825                         nids[i] = lpni->lpni_pref.nids[i];
826                 }
827                 nids[i] = nid;
828         }
829
830         lnet_net_lock(LNET_LOCK_EX);
831         spin_lock(&lpni->lpni_lock);
832         if (lpni->lpni_pref_nnids == 0) {
833                 lpni->lpni_pref.nid = nid;
834         } else {
835                 oldnids = lpni->lpni_pref.nids;
836                 lpni->lpni_pref.nids = nids;
837         }
838         lpni->lpni_pref_nnids++;
839         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
840         spin_unlock(&lpni->lpni_lock);
841         lnet_net_unlock(LNET_LOCK_EX);
842
843         if (oldnids) {
844                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
845                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
846         }
847 out:
848         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
849                 spin_lock(&lpni->lpni_lock);
850                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
851                 spin_unlock(&lpni->lpni_lock);
852         }
853         CDEBUG(D_NET, "peer %s nid %s: %d\n",
854                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
855         return rc;
856 }
857
858 int
859 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
860 {
861         lnet_nid_t *nids = NULL;
862         lnet_nid_t *oldnids = NULL;
863         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
864         int size;
865         int i, j;
866         int rc = 0;
867
868         if (lpni->lpni_pref_nnids == 0) {
869                 rc = -ENOENT;
870                 goto out;
871         }
872
873         if (lpni->lpni_pref_nnids == 1) {
874                 if (lpni->lpni_pref.nid != nid) {
875                         rc = -ENOENT;
876                         goto out;
877                 }
878         } else if (lpni->lpni_pref_nnids == 2) {
879                 if (lpni->lpni_pref.nids[0] != nid &&
880                     lpni->lpni_pref.nids[1] != nid) {
881                         rc = -ENOENT;
882                         goto out;
883                 }
884         } else {
885                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
886                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
887                 if (!nids) {
888                         rc = -ENOMEM;
889                         goto out;
890                 }
891                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
892                         if (lpni->lpni_pref.nids[i] != nid)
893                                 continue;
894                         nids[j++] = lpni->lpni_pref.nids[i];
895                 }
896                 /* Check if we actually removed a nid. */
897                 if (j == lpni->lpni_pref_nnids) {
898                         LIBCFS_FREE(nids, size);
899                         rc = -ENOENT;
900                         goto out;
901                 }
902         }
903
904         lnet_net_lock(LNET_LOCK_EX);
905         spin_lock(&lpni->lpni_lock);
906         if (lpni->lpni_pref_nnids == 1) {
907                 lpni->lpni_pref.nid = LNET_NID_ANY;
908         } else if (lpni->lpni_pref_nnids == 2) {
909                 oldnids = lpni->lpni_pref.nids;
910                 if (oldnids[0] == nid)
911                         lpni->lpni_pref.nid = oldnids[1];
912                 else
913                         lpni->lpni_pref.nid = oldnids[2];
914         } else {
915                 oldnids = lpni->lpni_pref.nids;
916                 lpni->lpni_pref.nids = nids;
917         }
918         lpni->lpni_pref_nnids--;
919         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
920         spin_unlock(&lpni->lpni_lock);
921         lnet_net_unlock(LNET_LOCK_EX);
922
923         if (oldnids) {
924                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
925                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
926         }
927 out:
928         CDEBUG(D_NET, "peer %s nid %s: %d\n",
929                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
930         return rc;
931 }
932
933 lnet_nid_t
934 lnet_peer_primary_nid(lnet_nid_t nid)
935 {
936         struct lnet_peer_ni *lpni;
937         lnet_nid_t primary_nid = nid;
938         int cpt;
939
940         cpt = lnet_net_lock_current();
941         lpni = lnet_find_peer_ni_locked(nid);
942         if (lpni) {
943                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
944                 lnet_peer_ni_decref_locked(lpni);
945         }
946         lnet_net_unlock(cpt);
947
948         return primary_nid;
949 }
950
951 lnet_nid_t
952 LNetPrimaryNID(lnet_nid_t nid)
953 {
954         struct lnet_peer_ni *lpni;
955         lnet_nid_t primary_nid = nid;
956         int rc = 0;
957         int cpt;
958
959         cpt = lnet_net_lock_current();
960         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
961         if (IS_ERR(lpni)) {
962                 rc = PTR_ERR(lpni);
963                 goto out_unlock;
964         }
965         primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
966         lnet_peer_ni_decref_locked(lpni);
967 out_unlock:
968         lnet_net_unlock(cpt);
969
970         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
971                libcfs_nid2str(primary_nid), rc);
972         return primary_nid;
973 }
974 EXPORT_SYMBOL(LNetPrimaryNID);
975
976 struct lnet_peer_net *
977 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
978 {
979         struct lnet_peer_net *peer_net;
980         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
981                 if (peer_net->lpn_net_id == net_id)
982                         return peer_net;
983         }
984         return NULL;
985 }
986
/*
 * Attach a peer_ni to a peer_net and peer. This function assumes
 * peer_ni is not already attached to the peer_net/peer. The peer_ni
 * may be attached to a different peer, in which case it will be
 * properly detached first. The whole operation is done atomically
 * under lnet_net_lock/EX.
 *
 * Always returns 0.  This is the last function called from functions
 * that do return an int, so returning 0 here allows the compiler to
 * do a tail call.
 */
static int
lnet_peer_attach_peer_ni(struct lnet_peer *lp,
				struct lnet_peer_net *lpn,
				struct lnet_peer_ni *lpni,
				unsigned flags)
{
	struct lnet_peer_table *ptable;

	/* Install the new peer_ni */
	lnet_net_lock(LNET_LOCK_EX);
	/* Add peer_ni to global peer table hash, if necessary. */
	if (list_empty(&lpni->lpni_hashlist)) {
		int hash = lnet_nid2peerhash(lpni->lpni_nid);

		ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
		list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
		ptable->pt_version++;
		ptable->pt_number++;
		/* This is the 1st refcount on lpni. */
		atomic_inc(&lpni->lpni_refcount);
	}

	/* Detach the peer_ni from an existing peer, if necessary. */
	if (lpni->lpni_peer_net) {
		/* Caller guarantees it is a *different* peer_net/peer. */
		LASSERT(lpni->lpni_peer_net != lpn);
		LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
		lnet_peer_detach_peer_ni_locked(lpni);
		lnet_peer_net_decref_locked(lpni->lpni_peer_net);
		lpni->lpni_peer_net = NULL;
	}

	/* Add peer_ni to peer_net */
	lpni->lpni_peer_net = lpn;
	list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
	lnet_peer_net_addref_locked(lpn);

	/* Add peer_net to peer (only if not already linked). */
	if (!lpn->lpn_peer) {
		lpn->lpn_peer = lp;
		list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
		lnet_peer_addref_locked(lp);
	}

	/* Add peer to global peer list, if necessary */
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	if (list_empty(&lp->lp_peer_list)) {
		list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
		ptable->pt_peers++;
	}


	/* Update peer state */
	spin_lock(&lp->lp_lock);
	if (flags & LNET_PEER_CONFIGURED) {
		if (!(lp->lp_state & LNET_PEER_CONFIGURED))
			lp->lp_state |= LNET_PEER_CONFIGURED;
	}
	if (flags & LNET_PEER_MULTI_RAIL) {
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			/* Becoming multi-rail: non-MR preferred NIDs no
			 * longer make sense, so clear them. */
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	}
	spin_unlock(&lp->lp_lock);

	/* Account the new NI on the peer and its table. */
	lp->lp_nnis++;
	the_lnet.ln_peer_tables[lp->lp_cpt]->pt_peer_nnids++;
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
	       libcfs_nid2str(lp->lp_primary_nid),
	       libcfs_nid2str(lpni->lpni_nid), flags);

	return 0;
}
1072
/*
 * Create a new peer, with nid as its primary nid.
 *
 * Call with the lnet_api_mutex held.
 *
 * Error codes:
 *  -EEXIST: A configured peer with a different primary NID owns @nid.
 *  -EPERM:  The request would toggle the Multi-Rail flag of a
 *           configured peer.
 *  -ENOMEM: Out of memory.
 */
static int
lnet_peer_add(lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(nid != LNET_NID_ANY);

	/*
	 * No need for the lnet_net_lock here, because the
	 * lnet_api_mutex is held.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/* A peer with this NID already exists. */
		lp = lpni->lpni_peer_net->lpn_peer;
		lnet_peer_ni_decref_locked(lpni);
		/*
		 * This is an error if the peer was configured and the
		 * primary NID differs or an attempt is made to change
		 * the Multi-Rail flag. Otherwise the assumption is
		 * that an existing peer is being modified.
		 */
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			if (lp->lp_primary_nid != nid)
				rc = -EEXIST;
			else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
				rc = -EPERM;
			goto out;
		}
		/* Delete and recreate as a configured peer. */
		lnet_peer_del(lp);
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;

	/* lnet_peer_attach_peer_ni() consumes all three and returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
	       libcfs_nid2str(nid), flags, rc);
	return rc;
}
1137
/*
 * Add a NID to a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:    Non-DLC addition to a DLC-configured peer.
 *  -EEXIST:   The NID was configured by DLC for a different peer.
 *  -ENOMEM:   Out of memory.
 *  -ENOTUNIQ: Adding a second peer NID on a single network on a
 *             non-multi-rail peer.
 */
static int
lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(lp);
	LASSERT(nid != LNET_NID_ANY);

	/* A configured peer can only be updated through configuration. */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}

	/*
	 * The MULTI_RAIL flag can be set but not cleared, because
	 * that would leave the peer struct in an invalid state.
	 */
	if (flags & LNET_PEER_MULTI_RAIL) {
		spin_lock(&lp->lp_lock);
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
		spin_unlock(&lp->lp_lock);
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		rc = -EPERM;
		goto out;
	}

	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * A peer_ni already exists. This is only a problem if
		 * it is not connected to this peer and was configured
		 * by DLC.
		 *
		 * NOTE(review): lpni is dereferenced after the decref
		 * below; presumably safe because ln_api_mutex
		 * serializes peer deletion -- confirm.
		 */
		lnet_peer_ni_decref_locked(lpni);
		if (lpni->lpni_peer_net->lpn_peer == lp)
			goto out;
		if (lnet_peer_ni_is_configured(lpni)) {
			rc = -EEXIST;
			goto out;
		}
		/* If this is the primary NID, destroy the peer. */
		if (lnet_peer_ni_is_primary(lpni)) {
			lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
			lpni = lnet_peer_ni_alloc(nid);
			if (!lpni) {
				rc = -ENOMEM;
				goto out;
			}
		}
	} else {
		lpni = lnet_peer_ni_alloc(nid);
		if (!lpni) {
			rc = -ENOMEM;
			goto out;
		}
	}

	/*
	 * Get the peer_net. Check that we're not adding a second
	 * peer_ni on a peer_net of a non-multi-rail peer.
	 */
	lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
	if (!lpn) {
		lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
		if (!lpn) {
			rc = -ENOMEM;
			goto out_free_lpni;
		}
	} else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
		rc = -ENOTUNIQ;
		goto out_free_lpni;
	}

	/* Attach consumes lpn and lpni and always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpni:
	/* If the peer_ni was allocated above its peer_net pointer is NULL */
	if (!lpni->lpni_peer_net)
		LIBCFS_FREE(lpni, sizeof(*lpni));
out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
	       flags, rc);
	return rc;
}
1241
/*
 * lpni creation initiated due to traffic either sending or receiving.
 *
 * Creates the peer/peer_net/peer_ni triplet for @nid with no flags
 * set (i.e. not LNET_PEER_CONFIGURED). If @pref is not LNET_NID_ANY
 * it is recorded as the preferred source NID of the new (non-MR)
 * peer_ni. Call with ln_api_mutex held.
 */
static int
lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	unsigned flags = 0;	/* created by traffic, not by DLC */
	int rc = 0;

	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
		goto out;
	}

	/* lnet_net_lock is not needed here because ln_api_lock is held */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * We must have raced with another thread. Since we
		 * know next to nothing about a peer_ni created by
		 * traffic, we just assume everything is ok and
		 * return.
		 */
		lnet_peer_ni_decref_locked(lpni);
		goto out;
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;
	if (pref != LNET_NID_ANY)
		lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);

	/* Attach consumes all three and always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
	return rc;
}
1296
1297 /*
1298  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1299  *
1300  * This API handles the following combinations:
1301  *   Create a peer with its primary NI if only the prim_nid is provided
1302  *   Add a NID to a peer identified by the prim_nid. The peer identified
1303  *   by the prim_nid must already exist.
1304  *   The peer being created may be non-MR.
1305  *
1306  * The caller must hold ln_api_mutex. This prevents the peer from
1307  * being created/modified/deleted by a different thread.
1308  */
1309 int
1310 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1311 {
1312         struct lnet_peer *lp = NULL;
1313         struct lnet_peer_ni *lpni;
1314         unsigned flags;
1315
1316         /* The prim_nid must always be specified */
1317         if (prim_nid == LNET_NID_ANY)
1318                 return -EINVAL;
1319
1320         flags = LNET_PEER_CONFIGURED;
1321         if (mr)
1322                 flags |= LNET_PEER_MULTI_RAIL;
1323
1324         /*
1325          * If nid isn't specified, we must create a new peer with
1326          * prim_nid as its primary nid.
1327          */
1328         if (nid == LNET_NID_ANY)
1329                 return lnet_peer_add(prim_nid, flags);
1330
1331         /* Look up the prim_nid, which must exist. */
1332         lpni = lnet_find_peer_ni_locked(prim_nid);
1333         if (!lpni)
1334                 return -ENOENT;
1335         lnet_peer_ni_decref_locked(lpni);
1336         lp = lpni->lpni_peer_net->lpn_peer;
1337
1338         /* Peer must have been configured. */
1339         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1340                 CDEBUG(D_NET, "peer %s was not configured\n",
1341                        libcfs_nid2str(prim_nid));
1342                 return -ENOENT;
1343         }
1344
1345         /* Primary NID must match */
1346         if (lp->lp_primary_nid != prim_nid) {
1347                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1348                        libcfs_nid2str(prim_nid),
1349                        libcfs_nid2str(lp->lp_primary_nid));
1350                 return -ENODEV;
1351         }
1352
1353         /* Multi-Rail flag must match. */
1354         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1355                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1356                        libcfs_nid2str(prim_nid));
1357                 return -EPERM;
1358         }
1359
1360         return lnet_peer_add_nid(lp, nid, flags);
1361 }
1362
1363 /*
1364  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1365  *
1366  * This API handles the following combinations:
1367  *   Delete a NI from a peer if both prim_nid and nid are provided.
1368  *   Delete a peer if only prim_nid is provided.
1369  *   Delete a peer if its primary nid is provided.
1370  *
1371  * The caller must hold ln_api_mutex. This prevents the peer from
1372  * being modified/deleted by a different thread.
1373  */
1374 int
1375 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1376 {
1377         struct lnet_peer *lp;
1378         struct lnet_peer_ni *lpni;
1379         unsigned flags;
1380
1381         if (prim_nid == LNET_NID_ANY)
1382                 return -EINVAL;
1383
1384         lpni = lnet_find_peer_ni_locked(prim_nid);
1385         if (!lpni)
1386                 return -ENOENT;
1387         lnet_peer_ni_decref_locked(lpni);
1388         lp = lpni->lpni_peer_net->lpn_peer;
1389
1390         if (prim_nid != lp->lp_primary_nid) {
1391                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1392                        libcfs_nid2str(prim_nid),
1393                        libcfs_nid2str(lp->lp_primary_nid));
1394                 return -ENODEV;
1395         }
1396
1397         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1398                 return lnet_peer_del(lp);
1399
1400         flags = LNET_PEER_CONFIGURED;
1401         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1402                 flags |= LNET_PEER_MULTI_RAIL;
1403
1404         return lnet_peer_del_nid(lp, nid, flags);
1405 }
1406
/*
 * Final teardown of a zombie peer_ni, called when its last reference
 * is dropped: unlink it from the zombie list, free the preferred-NID
 * array (if any), free the structure, and release the reference it
 * held on its peer_net.
 */
void
lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	/* Must be unreferenced, not routing, with no queued traffic. */
	LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
	LASSERT(lpni->lpni_rtr_refcount == 0);
	LASSERT(list_empty(&lpni->lpni_txq));
	LASSERT(lpni->lpni_txqnob == 0);

	lpn = lpni->lpni_peer_net;
	lpni->lpni_peer_net = NULL;
	lpni->lpni_net = NULL;

	/* remove the peer ni from the zombie list */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	spin_lock(&ptable->pt_zombie_lock);
	list_del_init(&lpni->lpni_hashlist);
	ptable->pt_zombies--;
	spin_unlock(&ptable->pt_zombie_lock);

	/* lpni_pref is an allocated array only when nnids > 1. */
	if (lpni->lpni_pref_nnids > 1) {
		LIBCFS_FREE(lpni->lpni_pref.nids,
			sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
	}
	LIBCFS_FREE(lpni, sizeof(*lpni));

	lnet_peer_net_decref_locked(lpn);
}
1439
/*
 * Look up, and if necessary create, the peer_ni for @nid.
 *
 * Caller holds lnet_net_lock(cpt); the lock is dropped while the
 * peer_ni is created and re-taken before returning. Returns a
 * peer_ni with a reference held, or an ERR_PTR() on shutdown or
 * creation failure.
 *
 * NOTE(review): unlike lnet_nid2peerni_locked(), this variant does
 * not take ln_api_mutex itself -- presumably the caller already
 * holds it; confirm before adding new call sites.
 */
struct lnet_peer_ni *
lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
{
	struct lnet_peer_ni *lpni = NULL;
	int rc;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni)
		return lpni;

	/* Slow path: drop the net lock to create the peer_ni. */
	lnet_net_unlock(cpt);

	rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
	if (rc) {
		lpni = ERR_PTR(rc);
		goto out_net_relock;
	}

	/* The lookup must succeed now: creation just completed. */
	lpni = lnet_find_peer_ni_locked(nid);
	LASSERT(lpni);

out_net_relock:
	lnet_net_lock(cpt);

	return lpni;
}
1473
/*
 * Get a peer_ni for the given nid, create it if necessary. Takes a
 * hold on the peer_ni.
 *
 * Caller holds lnet_net_lock(cpt); the lock is dropped while
 * ln_api_mutex is taken for the creation path and re-taken before
 * returning. Returns the peer_ni, or an ERR_PTR() on shutdown or
 * creation failure.
 */
struct lnet_peer_ni *
lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
{
	struct lnet_peer_ni *lpni = NULL;
	int rc;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni)
		return lpni;

	/*
	 * Slow path:
	 * use the lnet_api_mutex to serialize the creation of the peer_ni
	 * and the creation/deletion of the local ni/net. When a local ni is
	 * created, if there exists a set of peer_nis on that network,
	 * they need to be traversed and updated. When a local NI is
	 * deleted, which could result in a network being deleted, then
	 * all peer nis on that network need to be removed as well.
	 *
	 * Creation through traffic should also be serialized with
	 * creation through DLC.
	 */
	lnet_net_unlock(cpt);
	mutex_lock(&the_lnet.ln_api_mutex);
	/*
	 * Shutdown is only set under the ln_api_lock, so a single
	 * check here is sufficent.
	 */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		lpni = ERR_PTR(-ESHUTDOWN);
		goto out_mutex_unlock;
	}

	rc = lnet_peer_ni_traffic_add(nid, pref);
	if (rc) {
		lpni = ERR_PTR(rc);
		goto out_mutex_unlock;
	}

	/* The lookup must succeed now: creation just completed. */
	lpni = lnet_find_peer_ni_locked(nid);
	LASSERT(lpni);

out_mutex_unlock:
	mutex_unlock(&the_lnet.ln_api_mutex);
	lnet_net_lock(cpt);

	/* Lock has been dropped, check again for shutdown. */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		/* Drop the ref we just took before reporting shutdown. */
		if (!IS_ERR(lpni))
			lnet_peer_ni_decref_locked(lpni);
		lpni = ERR_PTR(-ESHUTDOWN);
	}

	return lpni;
}
1540
1541 /*
1542  * Peer Discovery
1543  */
1544
1545 bool
1546 lnet_peer_is_uptodate(struct lnet_peer *lp)
1547 {
1548         bool rc;
1549
1550         spin_lock(&lp->lp_lock);
1551         if (lp->lp_state & LNET_PEER_DISCOVERING) {
1552                 rc = false;
1553         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1554                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1555                         rc = true;
1556                 else
1557                         rc = false;
1558         } else if (lp->lp_state & LNET_PEER_UNDISCOVERED) {
1559                 if (lnet_peer_discovery_enabled)
1560                         rc = false;
1561                 else
1562                         rc = true;
1563         } else {
1564                 rc = false;
1565         }
1566         spin_unlock(&lp->lp_lock);
1567
1568         return rc;
1569 }
1570
/*
 * Queue a peer for the attention of the discovery thread.  Call with
 * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
 * -EALREADY if the peer was already queued.
 */
static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
{
	int rc;

	spin_lock(&lp->lp_lock);
	if (!(lp->lp_state & LNET_PEER_DISCOVERING))
		lp->lp_state |= LNET_PEER_DISCOVERING;
	if (!(lp->lp_state & LNET_PEER_QUEUED)) {
		/* QUEUED pairs with the ref taken below; both are
		 * released by lnet_peer_discovery_complete(). */
		lp->lp_state |= LNET_PEER_QUEUED;
		spin_unlock(&lp->lp_lock);
		/* Ref/list manipulation is safe here because the
		 * caller holds lnet_net_lock/EX. */
		lnet_peer_addref_locked(lp);
		list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
		wake_up(&the_lnet.ln_dc_waitq);
		rc = 0;
	} else {
		spin_unlock(&lp->lp_lock);
		rc = -EALREADY;
	}

	return rc;
}
1597
/*
 * Discovery of a peer is complete. Wake all waiters on the peer.
 * Call with lnet_net_lock/EX held.
 */
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
{
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_QUEUED;
	spin_unlock(&lp->lp_lock);
	/* Undo lnet_peer_queue_for_discovery(): unlink and drop the
	 * ref it took. The decref may free lp, so do not touch lp
	 * after it. */
	list_del_init(&lp->lp_dc_list);
	wake_up_all(&lp->lp_dc_waitq);
	lnet_peer_decref_locked(lp);
}
1611
/*
 * Peer discovery slow path. The ln_api_mutex is held on entry, and
 * dropped/retaken within this function. An lnet_peer_ni is passed in
 * because discovery could tear down an lnet_peer.
 *
 * Returns 0 once the peer is up to date, -EINTR if interrupted by a
 * signal, or -ESHUTDOWN if the discovery thread is stopping.
 */
int
lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
{
	DEFINE_WAIT(wait);
	struct lnet_peer *lp;
	int rc = 0;

again:
	/* Trade the caller's CPT lock for the exclusive lock. */
	lnet_net_unlock(cpt);
	lnet_net_lock(LNET_LOCK_EX);

	/* We're willing to be interrupted. */
	for (;;) {
		/* Re-read each pass: discovery may have re-attached
		 * lpni to a different peer. */
		lp = lpni->lpni_peer_net->lpn_peer;
		prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
		if (signal_pending(current))
			break;
		if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
			break;
		if (lnet_peer_is_uptodate(lp))
			break;
		lnet_peer_queue_for_discovery(lp);
		/* Hold lp across the unlocked sleep below. */
		lnet_peer_addref_locked(lp);
		lnet_net_unlock(LNET_LOCK_EX);
		schedule();
		finish_wait(&lp->lp_dc_waitq, &wait);
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_decref_locked(lp);
		/* Do not use lp beyond this point. */
	}
	finish_wait(&lp->lp_dc_waitq, &wait);

	lnet_net_unlock(LNET_LOCK_EX);
	lnet_net_lock(cpt);

	/*
	 * NOTE(review): lp is dereferenced below after the EX lock was
	 * dropped, despite the warning above -- presumably safe only
	 * because ln_api_mutex is still held; confirm.
	 */
	if (signal_pending(current))
		rc = -EINTR;
	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
		rc = -ESHUTDOWN;
	else if (!lnet_peer_is_uptodate(lp))
		goto again;

	return rc;
}
1661
/*
 * Event handler for the discovery EQ.
 *
 * Called with lnet_res_lock(cpt) held. The cpt is the
 * lnet_cpt_of_cookie() of the md handle cookie.
 */
static void lnet_discovery_event_handler(lnet_event_t *event)
{
	/* For now every event simply kicks the discovery thread. */
	wake_up(&the_lnet.ln_dc_waitq);
}
1672
/*
 * Wait for work to be queued or some other change that must be
 * attended to. Returns non-zero if the discovery thread should shut
 * down.
 */
static int lnet_peer_discovery_wait_for_work(void)
{
	int cpt;
	int rc = 0;

	DEFINE_WAIT(wait);

	cpt = lnet_net_lock_current();
	for (;;) {
		prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
				TASK_INTERRUPTIBLE);
		/* Shutdown requested. */
		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
			break;
		/* The push target needs to grow. */
		if (lnet_push_target_resize_needed())
			break;
		/* A peer is waiting to be discovered. */
		if (!list_empty(&the_lnet.ln_dc_request))
			break;
		/* Nothing to do: sleep with the net lock dropped. */
		lnet_net_unlock(cpt);
		schedule();
		finish_wait(&the_lnet.ln_dc_waitq, &wait);
		cpt = lnet_net_lock_current();
	}
	finish_wait(&the_lnet.ln_dc_waitq, &wait);

	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
		rc = -ESHUTDOWN;

	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "woken: %d\n", rc);

	return rc;
}
1711
1712 /* The discovery thread. */
1713 static int lnet_peer_discovery(void *arg)
1714 {
1715         struct lnet_peer *lp;
1716
1717         CDEBUG(D_NET, "started\n");
1718         cfs_block_allsigs();
1719
1720         for (;;) {
1721                 if (lnet_peer_discovery_wait_for_work())
1722                         break;
1723
1724                 if (lnet_push_target_resize_needed())
1725                         lnet_push_target_resize();
1726
1727                 lnet_net_lock(LNET_LOCK_EX);
1728                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1729                         break;
1730                 while (!list_empty(&the_lnet.ln_dc_request)) {
1731                         lp = list_first_entry(&the_lnet.ln_dc_request,
1732                                               struct lnet_peer, lp_dc_list);
1733                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
1734                         lnet_net_unlock(LNET_LOCK_EX);
1735
1736                         /* Just tag and release for now. */
1737                         spin_lock(&lp->lp_lock);
1738                         if (lnet_peer_discovery_enabled) {
1739                                 lp->lp_state |= (LNET_PEER_DISCOVERED |
1740                                                  LNET_PEER_NIDS_UPTODATE);
1741                                 lp->lp_state &= ~(LNET_PEER_UNDISCOVERED |
1742                                                   LNET_PEER_DISCOVERING);
1743                         } else {
1744                                 lp->lp_state |= LNET_PEER_UNDISCOVERED;
1745                                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1746                                                   LNET_PEER_NIDS_UPTODATE |
1747                                                   LNET_PEER_DISCOVERING);
1748                         }
1749                         spin_unlock(&lp->lp_lock);
1750
1751                         lnet_net_lock(LNET_LOCK_EX);
1752                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1753                                 lnet_peer_discovery_complete(lp);
1754                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1755                                 break;
1756                 }
1757                 lnet_net_unlock(LNET_LOCK_EX);
1758         }
1759
1760         CDEBUG(D_NET, "stopping\n");
1761         /*
1762          * Clean up before telling lnet_peer_discovery_stop() that
1763          * we're done. Use wake_up() below to somewhat reduce the
1764          * size of the thundering herd if there are multiple threads
1765          * waiting on discovery of a single peer.
1766          */
1767         LNetEQFree(the_lnet.ln_dc_eqh);
1768         LNetInvalidateHandle(&the_lnet.ln_dc_eqh);
1769
1770         lnet_net_lock(LNET_LOCK_EX);
1771         list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
1772                 spin_lock(&lp->lp_lock);
1773                 lp->lp_state |= LNET_PEER_UNDISCOVERED;
1774                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1775                                   LNET_PEER_DISCOVERING |
1776                                   LNET_PEER_NIDS_UPTODATE);
1777                 spin_unlock(&lp->lp_lock);
1778                 lnet_peer_discovery_complete(lp);
1779         }
1780         list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
1781                 spin_lock(&lp->lp_lock);
1782                 lp->lp_state |= LNET_PEER_UNDISCOVERED;
1783                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1784                                   LNET_PEER_DISCOVERING |
1785                                   LNET_PEER_NIDS_UPTODATE);
1786                 spin_unlock(&lp->lp_lock);
1787                 lnet_peer_discovery_complete(lp);
1788         }
1789         lnet_net_unlock(LNET_LOCK_EX);
1790
1791         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
1792         wake_up(&the_lnet.ln_dc_waitq);
1793
1794         CDEBUG(D_NET, "stopped\n");
1795
1796         return 0;
1797 }
1798
1799 /* ln_api_mutex is held on entry. */
1800 int lnet_peer_discovery_start(void)
1801 {
1802         struct task_struct *task;
1803         int rc;
1804
1805         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
1806                 return -EALREADY;
1807
1808         INIT_LIST_HEAD(&the_lnet.ln_dc_request);
1809         INIT_LIST_HEAD(&the_lnet.ln_dc_working);
1810         init_waitqueue_head(&the_lnet.ln_dc_waitq);
1811
1812         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
1813         if (rc != 0) {
1814                 CERROR("Can't allocate discovery EQ: %d\n", rc);
1815                 return rc;
1816         }
1817
1818         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
1819         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
1820         if (IS_ERR(task)) {
1821                 rc = PTR_ERR(task);
1822                 CERROR("Can't start peer discovery thread: %d\n", rc);
1823
1824                 LNetEQFree(the_lnet.ln_dc_eqh);
1825                 LNetInvalidateHandle(&the_lnet.ln_dc_eqh);
1826
1827                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
1828         }
1829
1830         return rc;
1831 }
1832
/* ln_api_mutex is held on entry.
 *
 * Stop the discovery thread and wait for it to finish cleaning up.
 */
void lnet_peer_discovery_stop(void)
{
	/* Nothing to do if the discovery thread is not running. */
	if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
		return;

	LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
	/* Ask the thread to terminate and wake it up. */
	the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
	wake_up(&the_lnet.ln_dc_waitq);

	/* The thread sets LNET_DC_STATE_SHUTDOWN once its cleanup is done. */
	wait_event(the_lnet.ln_dc_waitq,
		   the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);

	/* The thread's cleanup is expected to drain both queues. */
	LASSERT(list_empty(&the_lnet.ln_dc_request));
	LASSERT(list_empty(&the_lnet.ln_dc_working));
}
1849
1850 /* Debugging */
1851
1852 void
1853 lnet_debug_peer(lnet_nid_t nid)
1854 {
1855         char                    *aliveness = "NA";
1856         struct lnet_peer_ni     *lp;
1857         int                     cpt;
1858
1859         cpt = lnet_cpt_of_nid(nid, NULL);
1860         lnet_net_lock(cpt);
1861
1862         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1863         if (IS_ERR(lp)) {
1864                 lnet_net_unlock(cpt);
1865                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
1866                 return;
1867         }
1868
1869         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
1870                 aliveness = lp->lpni_alive ? "up" : "down";
1871
1872         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
1873                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
1874                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
1875                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
1876                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
1877
1878         lnet_peer_ni_decref_locked(lp);
1879
1880         lnet_net_unlock(cpt);
1881 }
1882
1883 /* Gathering information for userspace. */
1884
1885 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
1886                           char aliveness[LNET_MAX_STR_LEN],
1887                           __u32 *cpt_iter, __u32 *refcount,
1888                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
1889                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
1890                           __u32 *peer_tx_qnob)
1891 {
1892         struct lnet_peer_table          *peer_table;
1893         struct lnet_peer_ni             *lp;
1894         int                             j;
1895         int                             lncpt;
1896         bool                            found = false;
1897
1898         /* get the number of CPTs */
1899         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
1900
1901         /* if the cpt number to be examined is >= the number of cpts in
1902          * the system then indicate that there are no more cpts to examin
1903          */
1904         if (*cpt_iter >= lncpt)
1905                 return -ENOENT;
1906
1907         /* get the current table */
1908         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
1909         /* if the ptable is NULL then there are no more cpts to examine */
1910         if (peer_table == NULL)
1911                 return -ENOENT;
1912
1913         lnet_net_lock(*cpt_iter);
1914
1915         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
1916                 struct list_head *peers = &peer_table->pt_hash[j];
1917
1918                 list_for_each_entry(lp, peers, lpni_hashlist) {
1919                         if (peer_index-- > 0)
1920                                 continue;
1921
1922                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
1923                         if (lnet_isrouter(lp) ||
1924                                 lnet_peer_aliveness_enabled(lp))
1925                                 snprintf(aliveness, LNET_MAX_STR_LEN,
1926                                          lp->lpni_alive ? "up" : "down");
1927
1928                         *nid = lp->lpni_nid;
1929                         *refcount = atomic_read(&lp->lpni_refcount);
1930                         *ni_peer_tx_credits =
1931                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
1932                         *peer_tx_credits = lp->lpni_txcredits;
1933                         *peer_rtr_credits = lp->lpni_rtrcredits;
1934                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
1935                         *peer_tx_qnob = lp->lpni_txqnob;
1936
1937                         found = true;
1938                 }
1939
1940         }
1941         lnet_net_unlock(*cpt_iter);
1942
1943         *cpt_iter = lncpt;
1944
1945         return found ? 0 : -ENOENT;
1946 }
1947
1948 /* ln_api_mutex is held, which keeps the peer list stable */
1949 int lnet_get_peer_info(__u32 idx, lnet_nid_t *primary_nid, lnet_nid_t *nid,
1950                        bool *mr, struct lnet_peer_ni_credit_info *peer_ni_info,
1951                        struct lnet_ioctl_element_stats *peer_ni_stats)
1952 {
1953         struct lnet_peer_ni *lpni = NULL;
1954         struct lnet_peer_net *lpn = NULL;
1955         struct lnet_peer *lp = NULL;
1956
1957         lpni = lnet_get_peer_ni_idx_locked(idx, &lpn, &lp);
1958
1959         if (!lpni)
1960                 return -ENOENT;
1961
1962         *primary_nid = lp->lp_primary_nid;
1963         *mr = lnet_peer_is_multi_rail(lp);
1964         *nid = lpni->lpni_nid;
1965         snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
1966         if (lnet_isrouter(lpni) ||
1967                 lnet_peer_aliveness_enabled(lpni))
1968                 snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN,
1969                          lpni->lpni_alive ? "up" : "down");
1970
1971         peer_ni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
1972         peer_ni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
1973                 lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
1974         peer_ni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
1975         peer_ni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
1976         peer_ni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
1977         peer_ni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
1978         peer_ni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
1979
1980         peer_ni_stats->send_count = atomic_read(&lpni->lpni_stats.send_count);
1981         peer_ni_stats->recv_count = atomic_read(&lpni->lpni_stats.recv_count);
1982         peer_ni_stats->drop_count = atomic_read(&lpni->lpni_stats.drop_count);
1983
1984         return 0;
1985 }