Whamcloud - gitweb
LU-9480 lnet: add discovery thread
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <lnet/lib-lnet.h>
38 #include <uapi/linux/lnet/lnet-dlc.h>
39
40 static void
41 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
42 {
43         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
44                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
45                 lnet_peer_ni_decref_locked(lpni);
46         }
47 }
48
49 void
50 lnet_peer_net_added(struct lnet_net *net)
51 {
52         struct lnet_peer_ni *lpni, *tmp;
53
54         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
55                                  lpni_on_remote_peer_ni_list) {
56
57                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
58                         lpni->lpni_net = net;
59
60                         spin_lock(&lpni->lpni_lock);
61                         lpni->lpni_txcredits =
62                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
63                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
64                         lpni->lpni_rtrcredits =
65                                 lnet_peer_buffer_credits(lpni->lpni_net);
66                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
67                         spin_unlock(&lpni->lpni_lock);
68
69                         lnet_peer_remove_from_remote_list(lpni);
70                 }
71         }
72 }
73
74 static void
75 lnet_peer_tables_destroy(void)
76 {
77         struct lnet_peer_table  *ptable;
78         struct list_head        *hash;
79         int                     i;
80         int                     j;
81
82         if (!the_lnet.ln_peer_tables)
83                 return;
84
85         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
86                 hash = ptable->pt_hash;
87                 if (!hash) /* not intialized */
88                         break;
89
90                 LASSERT(list_empty(&ptable->pt_zombie_list));
91
92                 ptable->pt_hash = NULL;
93                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
94                         LASSERT(list_empty(&hash[j]));
95
96                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
97         }
98
99         cfs_percpt_free(the_lnet.ln_peer_tables);
100         the_lnet.ln_peer_tables = NULL;
101 }
102
103 int
104 lnet_peer_tables_create(void)
105 {
106         struct lnet_peer_table  *ptable;
107         struct list_head        *hash;
108         int                     i;
109         int                     j;
110
111         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
112                                                    sizeof(*ptable));
113         if (the_lnet.ln_peer_tables == NULL) {
114                 CERROR("Failed to allocate cpu-partition peer tables\n");
115                 return -ENOMEM;
116         }
117
118         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
119                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
120                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
121                 if (hash == NULL) {
122                         CERROR("Failed to create peer hash table\n");
123                         lnet_peer_tables_destroy();
124                         return -ENOMEM;
125                 }
126
127                 spin_lock_init(&ptable->pt_zombie_lock);
128                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
129
130                 INIT_LIST_HEAD(&ptable->pt_peer_list);
131
132                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
133                         INIT_LIST_HEAD(&hash[j]);
134                 ptable->pt_hash = hash; /* sign of initialization */
135         }
136
137         return 0;
138 }
139
140 static struct lnet_peer_ni *
141 lnet_peer_ni_alloc(lnet_nid_t nid)
142 {
143         struct lnet_peer_ni *lpni;
144         struct lnet_net *net;
145         int cpt;
146
147         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
148
149         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
150         if (!lpni)
151                 return NULL;
152
153         INIT_LIST_HEAD(&lpni->lpni_txq);
154         INIT_LIST_HEAD(&lpni->lpni_rtrq);
155         INIT_LIST_HEAD(&lpni->lpni_routes);
156         INIT_LIST_HEAD(&lpni->lpni_hashlist);
157         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
158         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
159
160         spin_lock_init(&lpni->lpni_lock);
161
162         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
163         lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
164         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
165         lpni->lpni_nid = nid;
166         lpni->lpni_cpt = cpt;
167         lnet_set_peer_ni_health_locked(lpni, true);
168
169         net = lnet_get_net_locked(LNET_NIDNET(nid));
170         lpni->lpni_net = net;
171         if (net) {
172                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
173                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
174                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
175                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
176         } else {
177                 /*
178                  * This peer_ni is not on a local network, so we
179                  * cannot add the credits here. In case the net is
180                  * added later, add the peer_ni to the remote peer ni
181                  * list so it can be easily found and revisited.
182                  */
183                 /* FIXME: per-net implementation instead? */
184                 atomic_inc(&lpni->lpni_refcount);
185                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
186                               &the_lnet.ln_remote_peer_ni_list);
187         }
188
189         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
190
191         return lpni;
192 }
193
194 static struct lnet_peer_net *
195 lnet_peer_net_alloc(__u32 net_id)
196 {
197         struct lnet_peer_net *lpn;
198
199         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
200         if (!lpn)
201                 return NULL;
202
203         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
204         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
205         lpn->lpn_net_id = net_id;
206
207         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
208
209         return lpn;
210 }
211
212 void
213 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
214 {
215         struct lnet_peer *lp;
216
217         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
218
219         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
220         LASSERT(list_empty(&lpn->lpn_peer_nis));
221         LASSERT(list_empty(&lpn->lpn_peer_nets));
222         lp = lpn->lpn_peer;
223         lpn->lpn_peer = NULL;
224         LIBCFS_FREE(lpn, sizeof(*lpn));
225
226         lnet_peer_decref_locked(lp);
227 }
228
229 static struct lnet_peer *
230 lnet_peer_alloc(lnet_nid_t nid)
231 {
232         struct lnet_peer *lp;
233
234         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
235         if (!lp)
236                 return NULL;
237
238         INIT_LIST_HEAD(&lp->lp_peer_list);
239         INIT_LIST_HEAD(&lp->lp_peer_nets);
240         INIT_LIST_HEAD(&lp->lp_dc_list);
241         init_waitqueue_head(&lp->lp_dc_waitq);
242         spin_lock_init(&lp->lp_lock);
243         lp->lp_primary_nid = nid;
244         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
245
246         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
247
248         return lp;
249 }
250
251 void
252 lnet_destroy_peer_locked(struct lnet_peer *lp)
253 {
254         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
255
256         LASSERT(atomic_read(&lp->lp_refcount) == 0);
257         LASSERT(list_empty(&lp->lp_peer_nets));
258         LASSERT(list_empty(&lp->lp_peer_list));
259
260         LIBCFS_FREE(lp, sizeof(*lp));
261 }
262
263 /*
264  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
265  * that peer_net, detach the peer_net from the peer.
266  *
267  * Call with lnet_net_lock/EX held
268  */
269 static void
270 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
271 {
272         struct lnet_peer_table *ptable;
273         struct lnet_peer_net *lpn;
274         struct lnet_peer *lp;
275
276         /*
277          * Belts and suspenders: gracefully handle teardown of a
278          * partially connected peer_ni.
279          */
280         lpn = lpni->lpni_peer_net;
281
282         list_del_init(&lpni->lpni_peer_nis);
283         /*
284          * If there are no lpni's left, we detach lpn from
285          * lp_peer_nets, so it cannot be found anymore.
286          */
287         if (list_empty(&lpn->lpn_peer_nis))
288                 list_del_init(&lpn->lpn_peer_nets);
289
290         /* Update peer NID count. */
291         lp = lpn->lpn_peer;
292         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
293         lp->lp_nnis--;
294         ptable->pt_peer_nnids--;
295
296         /*
297          * If there are no more peer nets, make the peer unfindable
298          * via the peer_tables.
299          */
300         if (list_empty(&lp->lp_peer_nets)) {
301                 list_del_init(&lp->lp_peer_list);
302                 ptable->pt_peers--;
303         }
304         CDEBUG(D_NET, "peer %s NID %s\n",
305                 libcfs_nid2str(lp->lp_primary_nid),
306                 libcfs_nid2str(lpni->lpni_nid));
307 }
308
309 /* called with lnet_net_lock LNET_LOCK_EX held */
310 static int
311 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
312 {
313         struct lnet_peer_table *ptable = NULL;
314
315         /* don't remove a peer_ni if it's also a gateway */
316         if (lpni->lpni_rtr_refcount > 0) {
317                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
318                        libcfs_nid2str(lpni->lpni_nid));
319                 return -EBUSY;
320         }
321
322         lnet_peer_remove_from_remote_list(lpni);
323
324         /* remove peer ni from the hash list. */
325         list_del_init(&lpni->lpni_hashlist);
326
327         /* decrement the ref count on the peer table */
328         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
329         LASSERT(ptable->pt_number > 0);
330         ptable->pt_number--;
331
332         /*
333          * The peer_ni can no longer be found with a lookup. But there
334          * can be current users, so keep track of it on the zombie
335          * list until the reference count has gone to zero.
336          *
337          * The last reference may be lost in a place where the
338          * lnet_net_lock locks only a single cpt, and that cpt may not
339          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
340          * has its own lock.
341          */
342         spin_lock(&ptable->pt_zombie_lock);
343         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
344         ptable->pt_zombies++;
345         spin_unlock(&ptable->pt_zombie_lock);
346
347         /* no need to keep this peer_ni on the hierarchy anymore */
348         lnet_peer_detach_peer_ni_locked(lpni);
349
350         /* remove hashlist reference on peer_ni */
351         lnet_peer_ni_decref_locked(lpni);
352
353         return 0;
354 }
355
356 void lnet_peer_uninit(void)
357 {
358         struct lnet_peer_ni *lpni, *tmp;
359
360         lnet_net_lock(LNET_LOCK_EX);
361
362         /* remove all peer_nis from the remote peer and the hash list */
363         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
364                                  lpni_on_remote_peer_ni_list)
365                 lnet_peer_ni_del_locked(lpni);
366
367         lnet_peer_tables_destroy();
368
369         lnet_net_unlock(LNET_LOCK_EX);
370 }
371
372 static int
373 lnet_peer_del_locked(struct lnet_peer *peer)
374 {
375         struct lnet_peer_ni *lpni = NULL, *lpni2;
376         int rc = 0, rc2 = 0;
377
378         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
379
380         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
381         while (lpni != NULL) {
382                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
383                 rc = lnet_peer_ni_del_locked(lpni);
384                 if (rc != 0)
385                         rc2 = rc;
386                 lpni = lpni2;
387         }
388
389         return rc2;
390 }
391
392 static int
393 lnet_peer_del(struct lnet_peer *peer)
394 {
395         lnet_net_lock(LNET_LOCK_EX);
396         lnet_peer_del_locked(peer);
397         lnet_net_unlock(LNET_LOCK_EX);
398
399         return 0;
400 }
401
402 /*
403  * Delete a NID from a peer. Call with ln_api_mutex held.
404  *
405  * Error codes:
406  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
407  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
408  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
409  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
410  */
411 static int
412 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
413 {
414         struct lnet_peer_ni *lpni;
415         lnet_nid_t primary_nid = lp->lp_primary_nid;
416         int rc = 0;
417
418         if (!(flags & LNET_PEER_CONFIGURED)) {
419                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
420                         rc = -EPERM;
421                         goto out;
422                 }
423         }
424         lpni = lnet_find_peer_ni_locked(nid);
425         if (!lpni) {
426                 rc = -ENOENT;
427                 goto out;
428         }
429         lnet_peer_ni_decref_locked(lpni);
430         if (lp != lpni->lpni_peer_net->lpn_peer) {
431                 rc = -ECHILD;
432                 goto out;
433         }
434
435         /*
436          * This function only allows deletion of the primary NID if it
437          * is the only NID.
438          */
439         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
440                 rc = -EBUSY;
441                 goto out;
442         }
443
444         lnet_net_lock(LNET_LOCK_EX);
445         lnet_peer_ni_del_locked(lpni);
446         lnet_net_unlock(LNET_LOCK_EX);
447
448 out:
449         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
450                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
451
452         return rc;
453 }
454
455 static void
456 lnet_peer_table_cleanup_locked(struct lnet_net *net,
457                                struct lnet_peer_table *ptable)
458 {
459         int                      i;
460         struct lnet_peer_ni     *next;
461         struct lnet_peer_ni     *lpni;
462         struct lnet_peer        *peer;
463
464         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
465                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
466                                          lpni_hashlist) {
467                         if (net != NULL && net != lpni->lpni_net)
468                                 continue;
469
470                         peer = lpni->lpni_peer_net->lpn_peer;
471                         if (peer->lp_primary_nid != lpni->lpni_nid) {
472                                 lnet_peer_ni_del_locked(lpni);
473                                 continue;
474                         }
475                         /*
476                          * Removing the primary NID implies removing
477                          * the entire peer. Advance next beyond any
478                          * peer_ni that belongs to the same peer.
479                          */
480                         list_for_each_entry_from(next, &ptable->pt_hash[i],
481                                                  lpni_hashlist) {
482                                 if (next->lpni_peer_net->lpn_peer != peer)
483                                         break;
484                         }
485                         lnet_peer_del_locked(peer);
486                 }
487         }
488 }
489
490 static void
491 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
492 {
493         int     i = 3;
494
495         spin_lock(&ptable->pt_zombie_lock);
496         while (ptable->pt_zombies) {
497                 spin_unlock(&ptable->pt_zombie_lock);
498
499                 if (is_power_of_2(i)) {
500                         CDEBUG(D_WARNING,
501                                "Waiting for %d zombies on peer table\n",
502                                ptable->pt_zombies);
503                 }
504                 set_current_state(TASK_UNINTERRUPTIBLE);
505                 schedule_timeout(cfs_time_seconds(1) >> 1);
506                 spin_lock(&ptable->pt_zombie_lock);
507         }
508         spin_unlock(&ptable->pt_zombie_lock);
509 }
510
511 static void
512 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
513                                 struct lnet_peer_table *ptable)
514 {
515         struct lnet_peer_ni     *lp;
516         struct lnet_peer_ni     *tmp;
517         lnet_nid_t              lpni_nid;
518         int                     i;
519
520         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
521                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
522                                          lpni_hashlist) {
523                         if (net != lp->lpni_net)
524                                 continue;
525
526                         if (lp->lpni_rtr_refcount == 0)
527                                 continue;
528
529                         lpni_nid = lp->lpni_nid;
530
531                         lnet_net_unlock(LNET_LOCK_EX);
532                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
533                         lnet_net_lock(LNET_LOCK_EX);
534                 }
535         }
536 }
537
538 void
539 lnet_peer_tables_cleanup(struct lnet_net *net)
540 {
541         int                             i;
542         struct lnet_peer_table          *ptable;
543
544         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
545         /* If just deleting the peers for a NI, get rid of any routes these
546          * peers are gateways for. */
547         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
548                 lnet_net_lock(LNET_LOCK_EX);
549                 lnet_peer_table_del_rtrs_locked(net, ptable);
550                 lnet_net_unlock(LNET_LOCK_EX);
551         }
552
553         /* Start the cleanup process */
554         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
555                 lnet_net_lock(LNET_LOCK_EX);
556                 lnet_peer_table_cleanup_locked(net, ptable);
557                 lnet_net_unlock(LNET_LOCK_EX);
558         }
559
560         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
561                 lnet_peer_ni_finalize_wait(ptable);
562 }
563
564 static struct lnet_peer_ni *
565 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
566 {
567         struct list_head        *peers;
568         struct lnet_peer_ni     *lp;
569
570         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
571
572         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
573         list_for_each_entry(lp, peers, lpni_hashlist) {
574                 if (lp->lpni_nid == nid) {
575                         lnet_peer_ni_addref_locked(lp);
576                         return lp;
577                 }
578         }
579
580         return NULL;
581 }
582
583 struct lnet_peer_ni *
584 lnet_find_peer_ni_locked(lnet_nid_t nid)
585 {
586         struct lnet_peer_ni *lpni;
587         struct lnet_peer_table *ptable;
588         int cpt;
589
590         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
591
592         ptable = the_lnet.ln_peer_tables[cpt];
593         lpni = lnet_get_peer_ni_locked(ptable, nid);
594
595         return lpni;
596 }
597
598 struct lnet_peer_ni *
599 lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
600                             struct lnet_peer **lp)
601 {
602         struct lnet_peer_table  *ptable;
603         struct lnet_peer_ni     *lpni;
604         int                     lncpt;
605         int                     cpt;
606
607         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
608
609         for (cpt = 0; cpt < lncpt; cpt++) {
610                 ptable = the_lnet.ln_peer_tables[cpt];
611                 if (ptable->pt_peer_nnids > idx)
612                         break;
613                 idx -= ptable->pt_peer_nnids;
614         }
615         if (cpt >= lncpt)
616                 return NULL;
617
618         list_for_each_entry((*lp), &ptable->pt_peer_list, lp_peer_list) {
619                 if ((*lp)->lp_nnis <= idx) {
620                         idx -= (*lp)->lp_nnis;
621                         continue;
622                 }
623                 list_for_each_entry((*lpn), &((*lp)->lp_peer_nets),
624                                     lpn_peer_nets) {
625                         list_for_each_entry(lpni, &((*lpn)->lpn_peer_nis),
626                                             lpni_peer_nis) {
627                                 if (idx-- == 0)
628                                         return lpni;
629                         }
630                 }
631         }
632
633         return NULL;
634 }
635
636 struct lnet_peer_ni *
637 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
638                              struct lnet_peer_net *peer_net,
639                              struct lnet_peer_ni *prev)
640 {
641         struct lnet_peer_ni *lpni;
642         struct lnet_peer_net *net = peer_net;
643
644         if (!prev) {
645                 if (!net) {
646                         if (list_empty(&peer->lp_peer_nets))
647                                 return NULL;
648
649                         net = list_entry(peer->lp_peer_nets.next,
650                                          struct lnet_peer_net,
651                                          lpn_peer_nets);
652                 }
653                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
654                                   lpni_peer_nis);
655
656                 return lpni;
657         }
658
659         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
660                 /*
661                  * if you reached the end of the peer ni list and the peer
662                  * net is specified then there are no more peer nis in that
663                  * net.
664                  */
665                 if (net)
666                         return NULL;
667
668                 /*
669                  * we reached the end of this net ni list. move to the
670                  * next net
671                  */
672                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
673                     &peer->lp_peer_nets)
674                         /* no more nets and no more NIs. */
675                         return NULL;
676
677                 /* get the next net */
678                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
679                                  struct lnet_peer_net,
680                                  lpn_peer_nets);
681                 /* get the ni on it */
682                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
683                                   lpni_peer_nis);
684
685                 return lpni;
686         }
687
688         /* there are more nis left */
689         lpni = list_entry(prev->lpni_peer_nis.next,
690                           struct lnet_peer_ni, lpni_peer_nis);
691
692         return lpni;
693 }
694
695 /*
696  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
697  * this is a preferred point-to-point path. Call with lnet_net_lock in
698  * shared mmode.
699  */
700 bool
701 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
702 {
703         int i;
704
705         if (lpni->lpni_pref_nnids == 0)
706                 return false;
707         if (lpni->lpni_pref_nnids == 1)
708                 return lpni->lpni_pref.nid == nid;
709         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
710                 if (lpni->lpni_pref.nids[i] == nid)
711                         return true;
712         }
713         return false;
714 }
715
716 /*
717  * Set a single ni as preferred, provided no preferred ni is already
718  * defined. Only to be used for non-multi-rail peer_ni.
719  */
720 int
721 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
722 {
723         int rc = 0;
724
725         spin_lock(&lpni->lpni_lock);
726         if (nid == LNET_NID_ANY) {
727                 rc = -EINVAL;
728         } else if (lpni->lpni_pref_nnids > 0) {
729                 rc = -EPERM;
730         } else if (lpni->lpni_pref_nnids == 0) {
731                 lpni->lpni_pref.nid = nid;
732                 lpni->lpni_pref_nnids = 1;
733                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
734         }
735         spin_unlock(&lpni->lpni_lock);
736
737         CDEBUG(D_NET, "peer %s nid %s: %d\n",
738                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
739         return rc;
740 }
741
742 /*
743  * Clear the preferred NID from a non-multi-rail peer_ni, provided
744  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
745  */
746 int
747 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
748 {
749         int rc = 0;
750
751         spin_lock(&lpni->lpni_lock);
752         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
753                 lpni->lpni_pref_nnids = 0;
754                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
755         } else if (lpni->lpni_pref_nnids == 0) {
756                 rc = -ENOENT;
757         } else {
758                 rc = -EPERM;
759         }
760         spin_unlock(&lpni->lpni_lock);
761
762         CDEBUG(D_NET, "peer %s: %d\n",
763                libcfs_nid2str(lpni->lpni_nid), rc);
764         return rc;
765 }
766
767 /*
768  * Clear the preferred NIDs from a non-multi-rail peer.
769  */
770 void
771 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
772 {
773         struct lnet_peer_ni *lpni = NULL;
774
775         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
776                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
777 }
778
779 int
780 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
781 {
782         lnet_nid_t *nids = NULL;
783         lnet_nid_t *oldnids = NULL;
784         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
785         int size;
786         int i;
787         int rc = 0;
788
789         if (nid == LNET_NID_ANY) {
790                 rc = -EINVAL;
791                 goto out;
792         }
793
794         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
795                 rc = -EEXIST;
796                 goto out;
797         }
798
799         /* A non-MR node may have only one preferred NI per peer_ni */
800         if (lpni->lpni_pref_nnids > 0) {
801                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
802                         rc = -EPERM;
803                         goto out;
804                 }
805         }
806
807         if (lpni->lpni_pref_nnids != 0) {
808                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
809                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
810                 if (!nids) {
811                         rc = -ENOMEM;
812                         goto out;
813                 }
814                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
815                         if (lpni->lpni_pref.nids[i] == nid) {
816                                 LIBCFS_FREE(nids, size);
817                                 rc = -EEXIST;
818                                 goto out;
819                         }
820                         nids[i] = lpni->lpni_pref.nids[i];
821                 }
822                 nids[i] = nid;
823         }
824
825         lnet_net_lock(LNET_LOCK_EX);
826         spin_lock(&lpni->lpni_lock);
827         if (lpni->lpni_pref_nnids == 0) {
828                 lpni->lpni_pref.nid = nid;
829         } else {
830                 oldnids = lpni->lpni_pref.nids;
831                 lpni->lpni_pref.nids = nids;
832         }
833         lpni->lpni_pref_nnids++;
834         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
835         spin_unlock(&lpni->lpni_lock);
836         lnet_net_unlock(LNET_LOCK_EX);
837
838         if (oldnids) {
839                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
840                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
841         }
842 out:
843         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
844                 spin_lock(&lpni->lpni_lock);
845                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
846                 spin_unlock(&lpni->lpni_lock);
847         }
848         CDEBUG(D_NET, "peer %s nid %s: %d\n",
849                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
850         return rc;
851 }
852
853 int
854 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
855 {
856         lnet_nid_t *nids = NULL;
857         lnet_nid_t *oldnids = NULL;
858         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
859         int size;
860         int i, j;
861         int rc = 0;
862
863         if (lpni->lpni_pref_nnids == 0) {
864                 rc = -ENOENT;
865                 goto out;
866         }
867
868         if (lpni->lpni_pref_nnids == 1) {
869                 if (lpni->lpni_pref.nid != nid) {
870                         rc = -ENOENT;
871                         goto out;
872                 }
873         } else if (lpni->lpni_pref_nnids == 2) {
874                 if (lpni->lpni_pref.nids[0] != nid &&
875                     lpni->lpni_pref.nids[1] != nid) {
876                         rc = -ENOENT;
877                         goto out;
878                 }
879         } else {
880                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
881                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
882                 if (!nids) {
883                         rc = -ENOMEM;
884                         goto out;
885                 }
886                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
887                         if (lpni->lpni_pref.nids[i] != nid)
888                                 continue;
889                         nids[j++] = lpni->lpni_pref.nids[i];
890                 }
891                 /* Check if we actually removed a nid. */
892                 if (j == lpni->lpni_pref_nnids) {
893                         LIBCFS_FREE(nids, size);
894                         rc = -ENOENT;
895                         goto out;
896                 }
897         }
898
899         lnet_net_lock(LNET_LOCK_EX);
900         spin_lock(&lpni->lpni_lock);
901         if (lpni->lpni_pref_nnids == 1) {
902                 lpni->lpni_pref.nid = LNET_NID_ANY;
903         } else if (lpni->lpni_pref_nnids == 2) {
904                 oldnids = lpni->lpni_pref.nids;
905                 if (oldnids[0] == nid)
906                         lpni->lpni_pref.nid = oldnids[1];
907                 else
908                         lpni->lpni_pref.nid = oldnids[2];
909         } else {
910                 oldnids = lpni->lpni_pref.nids;
911                 lpni->lpni_pref.nids = nids;
912         }
913         lpni->lpni_pref_nnids--;
914         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
915         spin_unlock(&lpni->lpni_lock);
916         lnet_net_unlock(LNET_LOCK_EX);
917
918         if (oldnids) {
919                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
920                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
921         }
922 out:
923         CDEBUG(D_NET, "peer %s nid %s: %d\n",
924                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
925         return rc;
926 }
927
928 lnet_nid_t
929 lnet_peer_primary_nid_locked(lnet_nid_t nid)
930 {
931         struct lnet_peer_ni *lpni;
932         lnet_nid_t primary_nid = nid;
933
934         lpni = lnet_find_peer_ni_locked(nid);
935         if (lpni) {
936                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
937                 lnet_peer_ni_decref_locked(lpni);
938         }
939
940         return primary_nid;
941 }
942
943 lnet_nid_t
944 LNetPrimaryNID(lnet_nid_t nid)
945 {
946         struct lnet_peer_ni *lpni;
947         lnet_nid_t primary_nid = nid;
948         int rc = 0;
949         int cpt;
950
951         cpt = lnet_net_lock_current();
952         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
953         if (IS_ERR(lpni)) {
954                 rc = PTR_ERR(lpni);
955                 goto out_unlock;
956         }
957         primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
958         lnet_peer_ni_decref_locked(lpni);
959 out_unlock:
960         lnet_net_unlock(cpt);
961
962         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
963                libcfs_nid2str(primary_nid), rc);
964         return primary_nid;
965 }
966 EXPORT_SYMBOL(LNetPrimaryNID);
967
968 struct lnet_peer_net *
969 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
970 {
971         struct lnet_peer_net *peer_net;
972         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
973                 if (peer_net->lpn_net_id == net_id)
974                         return peer_net;
975         }
976         return NULL;
977 }
978
979 /*
980  * Attach a peer_ni to a peer_net and peer. This function assumes
981  * peer_ni is not already attached to the peer_net/peer. The peer_ni
982  * may be attached to a different peer, in which case it will be
983  * properly detached first. The whole operation is done atomically.
984  *
985  * Always returns 0.  This is the last function called from functions
986  * that do return an int, so returning 0 here allows the compiler to
987  * do a tail call.
988  */
989 static int
990 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
991                                 struct lnet_peer_net *lpn,
992                                 struct lnet_peer_ni *lpni,
993                                 unsigned flags)
994 {
995         struct lnet_peer_table *ptable;
996
997         /* Install the new peer_ni */
998         lnet_net_lock(LNET_LOCK_EX);
999         /* Add peer_ni to global peer table hash, if necessary. */
1000         if (list_empty(&lpni->lpni_hashlist)) {
1001                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1002
1003                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1004                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1005                 ptable->pt_version++;
1006                 ptable->pt_number++;
1007                 /* This is the 1st refcount on lpni. */
1008                 atomic_inc(&lpni->lpni_refcount);
1009         }
1010
1011         /* Detach the peer_ni from an existing peer, if necessary. */
1012         if (lpni->lpni_peer_net) {
1013                 LASSERT(lpni->lpni_peer_net != lpn);
1014                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1015                 lnet_peer_detach_peer_ni_locked(lpni);
1016                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1017                 lpni->lpni_peer_net = NULL;
1018         }
1019
1020         /* Add peer_ni to peer_net */
1021         lpni->lpni_peer_net = lpn;
1022         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1023         lnet_peer_net_addref_locked(lpn);
1024
1025         /* Add peer_net to peer */
1026         if (!lpn->lpn_peer) {
1027                 lpn->lpn_peer = lp;
1028                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1029                 lnet_peer_addref_locked(lp);
1030         }
1031
1032         /* Add peer to global peer list, if necessary */
1033         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1034         if (list_empty(&lp->lp_peer_list)) {
1035                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1036                 ptable->pt_peers++;
1037         }
1038
1039
1040         /* Update peer state */
1041         spin_lock(&lp->lp_lock);
1042         if (flags & LNET_PEER_CONFIGURED) {
1043                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1044                         lp->lp_state |= LNET_PEER_CONFIGURED;
1045         }
1046         if (flags & LNET_PEER_MULTI_RAIL) {
1047                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1048                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1049                         lnet_peer_clr_non_mr_pref_nids(lp);
1050                 }
1051         }
1052         spin_unlock(&lp->lp_lock);
1053
1054         lp->lp_nnis++;
1055         the_lnet.ln_peer_tables[lp->lp_cpt]->pt_peer_nnids++;
1056         lnet_net_unlock(LNET_LOCK_EX);
1057
1058         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1059                libcfs_nid2str(lp->lp_primary_nid),
1060                libcfs_nid2str(lpni->lpni_nid), flags);
1061
1062         return 0;
1063 }
1064
1065 /*
1066  * Create a new peer, with nid as its primary nid.
1067  *
1068  * Call with the lnet_api_mutex held.
1069  */
1070 static int
1071 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1072 {
1073         struct lnet_peer *lp;
1074         struct lnet_peer_net *lpn;
1075         struct lnet_peer_ni *lpni;
1076         int rc = 0;
1077
1078         LASSERT(nid != LNET_NID_ANY);
1079
1080         /*
1081          * No need for the lnet_net_lock here, because the
1082          * lnet_api_mutex is held.
1083          */
1084         lpni = lnet_find_peer_ni_locked(nid);
1085         if (lpni) {
1086                 /* A peer with this NID already exists. */
1087                 lp = lpni->lpni_peer_net->lpn_peer;
1088                 lnet_peer_ni_decref_locked(lpni);
1089                 /*
1090                  * This is an error if the peer was configured and the
1091                  * primary NID differs or an attempt is made to change
1092                  * the Multi-Rail flag. Otherwise the assumption is
1093                  * that an existing peer is being modified.
1094                  */
1095                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1096                         if (lp->lp_primary_nid != nid)
1097                                 rc = -EEXIST;
1098                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1099                                 rc = -EPERM;
1100                         goto out;
1101                 }
1102                 /* Delete and recreate as a configured peer. */
1103                 lnet_peer_del(lp);
1104         }
1105
1106         /* Create peer, peer_net, and peer_ni. */
1107         rc = -ENOMEM;
1108         lp = lnet_peer_alloc(nid);
1109         if (!lp)
1110                 goto out;
1111         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1112         if (!lpn)
1113                 goto out_free_lp;
1114         lpni = lnet_peer_ni_alloc(nid);
1115         if (!lpni)
1116                 goto out_free_lpn;
1117
1118         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1119
1120 out_free_lpn:
1121         LIBCFS_FREE(lpn, sizeof(*lpn));
1122 out_free_lp:
1123         LIBCFS_FREE(lp, sizeof(*lp));
1124 out:
1125         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1126                libcfs_nid2str(nid), flags, rc);
1127         return rc;
1128 }
1129
1130 /*
1131  * Add a NID to a peer. Call with ln_api_mutex held.
1132  *
1133  * Error codes:
1134  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1135  *  -EEXIST:   The NID was configured by DLC for a different peer.
1136  *  -ENOMEM:   Out of memory.
1137  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1138  *             non-multi-rail peer.
1139  */
1140 static int
1141 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1142 {
1143         struct lnet_peer_net *lpn;
1144         struct lnet_peer_ni *lpni;
1145         int rc = 0;
1146
1147         LASSERT(lp);
1148         LASSERT(nid != LNET_NID_ANY);
1149
1150         /* A configured peer can only be updated through configuration. */
1151         if (!(flags & LNET_PEER_CONFIGURED)) {
1152                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1153                         rc = -EPERM;
1154                         goto out;
1155                 }
1156         }
1157
1158         /*
1159          * The MULTI_RAIL flag can be set but not cleared, because
1160          * that would leave the peer struct in an invalid state.
1161          */
1162         if (flags & LNET_PEER_MULTI_RAIL) {
1163                 spin_lock(&lp->lp_lock);
1164                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1165                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1166                         lnet_peer_clr_non_mr_pref_nids(lp);
1167                 }
1168                 spin_unlock(&lp->lp_lock);
1169         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1170                 rc = -EPERM;
1171                 goto out;
1172         }
1173
1174         lpni = lnet_find_peer_ni_locked(nid);
1175         if (lpni) {
1176                 /*
1177                  * A peer_ni already exists. This is only a problem if
1178                  * it is not connected to this peer and was configured
1179                  * by DLC.
1180                  */
1181                 lnet_peer_ni_decref_locked(lpni);
1182                 if (lpni->lpni_peer_net->lpn_peer == lp)
1183                         goto out;
1184                 if (lnet_peer_ni_is_configured(lpni)) {
1185                         rc = -EEXIST;
1186                         goto out;
1187                 }
1188                 /* If this is the primary NID, destroy the peer. */
1189                 if (lnet_peer_ni_is_primary(lpni)) {
1190                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1191                         lpni = lnet_peer_ni_alloc(nid);
1192                         if (!lpni) {
1193                                 rc = -ENOMEM;
1194                                 goto out;
1195                         }
1196                 }
1197         } else {
1198                 lpni = lnet_peer_ni_alloc(nid);
1199                 if (!lpni) {
1200                         rc = -ENOMEM;
1201                         goto out;
1202                 }
1203         }
1204
1205         /*
1206          * Get the peer_net. Check that we're not adding a second
1207          * peer_ni on a peer_net of a non-multi-rail peer.
1208          */
1209         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1210         if (!lpn) {
1211                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1212                 if (!lpn) {
1213                         rc = -ENOMEM;
1214                         goto out_free_lpni;
1215                 }
1216         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1217                 rc = -ENOTUNIQ;
1218                 goto out_free_lpni;
1219         }
1220
1221         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1222
1223 out_free_lpni:
1224         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1225         if (!lpni->lpni_peer_net)
1226                 LIBCFS_FREE(lpni, sizeof(*lpni));
1227 out:
1228         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1229                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1230                flags, rc);
1231         return rc;
1232 }
1233
1234 /*
1235  * lpni creation initiated due to traffic either sending or receiving.
1236  */
1237 static int
1238 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1239 {
1240         struct lnet_peer *lp;
1241         struct lnet_peer_net *lpn;
1242         struct lnet_peer_ni *lpni;
1243         unsigned flags = 0;
1244         int rc = 0;
1245
1246         if (nid == LNET_NID_ANY) {
1247                 rc = -EINVAL;
1248                 goto out;
1249         }
1250
1251         /* lnet_net_lock is not needed here because ln_api_lock is held */
1252         lpni = lnet_find_peer_ni_locked(nid);
1253         if (lpni) {
1254                 /*
1255                  * We must have raced with another thread. Since we
1256                  * know next to nothing about a peer_ni created by
1257                  * traffic, we just assume everything is ok and
1258                  * return.
1259                  */
1260                 lnet_peer_ni_decref_locked(lpni);
1261                 goto out;
1262         }
1263
1264         /* Create peer, peer_net, and peer_ni. */
1265         rc = -ENOMEM;
1266         lp = lnet_peer_alloc(nid);
1267         if (!lp)
1268                 goto out;
1269         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1270         if (!lpn)
1271                 goto out_free_lp;
1272         lpni = lnet_peer_ni_alloc(nid);
1273         if (!lpni)
1274                 goto out_free_lpn;
1275         if (pref != LNET_NID_ANY)
1276                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1277
1278         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1279
1280 out_free_lpn:
1281         LIBCFS_FREE(lpn, sizeof(*lpn));
1282 out_free_lp:
1283         LIBCFS_FREE(lp, sizeof(*lp));
1284 out:
1285         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1286         return rc;
1287 }
1288
1289 /*
1290  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1291  *
1292  * This API handles the following combinations:
1293  *   Create a peer with its primary NI if only the prim_nid is provided
1294  *   Add a NID to a peer identified by the prim_nid. The peer identified
1295  *   by the prim_nid must already exist.
1296  *   The peer being created may be non-MR.
1297  *
1298  * The caller must hold ln_api_mutex. This prevents the peer from
1299  * being created/modified/deleted by a different thread.
1300  */
1301 int
1302 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1303 {
1304         struct lnet_peer *lp = NULL;
1305         struct lnet_peer_ni *lpni;
1306         unsigned flags;
1307
1308         /* The prim_nid must always be specified */
1309         if (prim_nid == LNET_NID_ANY)
1310                 return -EINVAL;
1311
1312         flags = LNET_PEER_CONFIGURED;
1313         if (mr)
1314                 flags |= LNET_PEER_MULTI_RAIL;
1315
1316         /*
1317          * If nid isn't specified, we must create a new peer with
1318          * prim_nid as its primary nid.
1319          */
1320         if (nid == LNET_NID_ANY)
1321                 return lnet_peer_add(prim_nid, flags);
1322
1323         /* Look up the prim_nid, which must exist. */
1324         lpni = lnet_find_peer_ni_locked(prim_nid);
1325         if (!lpni)
1326                 return -ENOENT;
1327         lnet_peer_ni_decref_locked(lpni);
1328         lp = lpni->lpni_peer_net->lpn_peer;
1329
1330         /* Peer must have been configured. */
1331         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1332                 CDEBUG(D_NET, "peer %s was not configured\n",
1333                        libcfs_nid2str(prim_nid));
1334                 return -ENOENT;
1335         }
1336
1337         /* Primary NID must match */
1338         if (lp->lp_primary_nid != prim_nid) {
1339                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1340                        libcfs_nid2str(prim_nid),
1341                        libcfs_nid2str(lp->lp_primary_nid));
1342                 return -ENODEV;
1343         }
1344
1345         /* Multi-Rail flag must match. */
1346         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1347                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1348                        libcfs_nid2str(prim_nid));
1349                 return -EPERM;
1350         }
1351
1352         return lnet_peer_add_nid(lp, nid, flags);
1353 }
1354
1355 /*
1356  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1357  *
1358  * This API handles the following combinations:
1359  *   Delete a NI from a peer if both prim_nid and nid are provided.
1360  *   Delete a peer if only prim_nid is provided.
1361  *   Delete a peer if its primary nid is provided.
1362  *
1363  * The caller must hold ln_api_mutex. This prevents the peer from
1364  * being modified/deleted by a different thread.
1365  */
1366 int
1367 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1368 {
1369         struct lnet_peer *lp;
1370         struct lnet_peer_ni *lpni;
1371         unsigned flags;
1372
1373         if (prim_nid == LNET_NID_ANY)
1374                 return -EINVAL;
1375
1376         lpni = lnet_find_peer_ni_locked(prim_nid);
1377         if (!lpni)
1378                 return -ENOENT;
1379         lnet_peer_ni_decref_locked(lpni);
1380         lp = lpni->lpni_peer_net->lpn_peer;
1381
1382         if (prim_nid != lp->lp_primary_nid) {
1383                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1384                        libcfs_nid2str(prim_nid),
1385                        libcfs_nid2str(lp->lp_primary_nid));
1386                 return -ENODEV;
1387         }
1388
1389         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1390                 return lnet_peer_del(lp);
1391
1392         flags = LNET_PEER_CONFIGURED;
1393         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1394                 flags |= LNET_PEER_MULTI_RAIL;
1395
1396         return lnet_peer_del_nid(lp, nid, flags);
1397 }
1398
1399 void
1400 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1401 {
1402         struct lnet_peer_table *ptable;
1403         struct lnet_peer_net *lpn;
1404
1405         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1406
1407         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1408         LASSERT(lpni->lpni_rtr_refcount == 0);
1409         LASSERT(list_empty(&lpni->lpni_txq));
1410         LASSERT(lpni->lpni_txqnob == 0);
1411
1412         lpn = lpni->lpni_peer_net;
1413         lpni->lpni_peer_net = NULL;
1414         lpni->lpni_net = NULL;
1415
1416         /* remove the peer ni from the zombie list */
1417         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1418         spin_lock(&ptable->pt_zombie_lock);
1419         list_del_init(&lpni->lpni_hashlist);
1420         ptable->pt_zombies--;
1421         spin_unlock(&ptable->pt_zombie_lock);
1422
1423         if (lpni->lpni_pref_nnids > 1) {
1424                 LIBCFS_FREE(lpni->lpni_pref.nids,
1425                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1426         }
1427         LIBCFS_FREE(lpni, sizeof(*lpni));
1428
1429         lnet_peer_net_decref_locked(lpn);
1430 }
1431
1432 struct lnet_peer_ni *
1433 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1434 {
1435         struct lnet_peer_ni *lpni = NULL;
1436         int rc;
1437
1438         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1439                 return ERR_PTR(-ESHUTDOWN);
1440
1441         /*
1442          * find if a peer_ni already exists.
1443          * If so then just return that.
1444          */
1445         lpni = lnet_find_peer_ni_locked(nid);
1446         if (lpni)
1447                 return lpni;
1448
1449         lnet_net_unlock(cpt);
1450
1451         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1452         if (rc) {
1453                 lpni = ERR_PTR(rc);
1454                 goto out_net_relock;
1455         }
1456
1457         lpni = lnet_find_peer_ni_locked(nid);
1458         LASSERT(lpni);
1459
1460 out_net_relock:
1461         lnet_net_lock(cpt);
1462
1463         return lpni;
1464 }
1465
1466 /*
1467  * Get a peer_ni for the given nid, create it if necessary. Takes a
1468  * hold on the peer_ni.
1469  */
1470 struct lnet_peer_ni *
1471 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1472 {
1473         struct lnet_peer_ni *lpni = NULL;
1474         int rc;
1475
1476         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1477                 return ERR_PTR(-ESHUTDOWN);
1478
1479         /*
1480          * find if a peer_ni already exists.
1481          * If so then just return that.
1482          */
1483         lpni = lnet_find_peer_ni_locked(nid);
1484         if (lpni)
1485                 return lpni;
1486
1487         /*
1488          * Slow path:
1489          * use the lnet_api_mutex to serialize the creation of the peer_ni
1490          * and the creation/deletion of the local ni/net. When a local ni is
1491          * created, if there exists a set of peer_nis on that network,
1492          * they need to be traversed and updated. When a local NI is
1493          * deleted, which could result in a network being deleted, then
1494          * all peer nis on that network need to be removed as well.
1495          *
1496          * Creation through traffic should also be serialized with
1497          * creation through DLC.
1498          */
1499         lnet_net_unlock(cpt);
1500         mutex_lock(&the_lnet.ln_api_mutex);
1501         /*
1502          * Shutdown is only set under the ln_api_lock, so a single
1503          * check here is sufficent.
1504          */
1505         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1506                 lpni = ERR_PTR(-ESHUTDOWN);
1507                 goto out_mutex_unlock;
1508         }
1509
1510         rc = lnet_peer_ni_traffic_add(nid, pref);
1511         if (rc) {
1512                 lpni = ERR_PTR(rc);
1513                 goto out_mutex_unlock;
1514         }
1515
1516         lpni = lnet_find_peer_ni_locked(nid);
1517         LASSERT(lpni);
1518
1519 out_mutex_unlock:
1520         mutex_unlock(&the_lnet.ln_api_mutex);
1521         lnet_net_lock(cpt);
1522
1523         /* Lock has been dropped, check again for shutdown. */
1524         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1525                 if (!IS_ERR(lpni))
1526                         lnet_peer_ni_decref_locked(lpni);
1527                 lpni = ERR_PTR(-ESHUTDOWN);
1528         }
1529
1530         return lpni;
1531 }
1532
1533 /*
1534  * Peer Discovery
1535  */
1536
1537 /*
1538  * Is a peer uptodate from the point of view of discovery?
1539  *
1540  * If it is currently being processed, obviously not.
1541  * A forced Ping or Push is also handled by the discovery thread.
1542  *
1543  * Otherwise look at whether the peer needs rediscovering.
1544  */
1545 bool
1546 lnet_peer_is_uptodate(struct lnet_peer *lp)
1547 {
1548         bool rc;
1549
1550         spin_lock(&lp->lp_lock);
1551         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1552                             LNET_PEER_FORCE_PING |
1553                             LNET_PEER_FORCE_PUSH)) {
1554                 rc = false;
1555         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1556                 if (lnet_peer_discovery_disabled)
1557                         rc = true;
1558                 else
1559                         rc = false;
1560         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1561                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1562                         rc = true;
1563                 else
1564                         rc = false;
1565         } else {
1566                 rc = false;
1567         }
1568         spin_unlock(&lp->lp_lock);
1569
1570         return rc;
1571 }
1572
1573 /*
1574  * Queue a peer for the attention of the discovery thread.  Call with
1575  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1576  * -EALREADY if the peer was already queued.
1577  */
1578 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1579 {
1580         int rc;
1581
1582         spin_lock(&lp->lp_lock);
1583         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1584                 lp->lp_state |= LNET_PEER_DISCOVERING;
1585         spin_unlock(&lp->lp_lock);
1586         if (list_empty(&lp->lp_dc_list)) {
1587                 lnet_peer_addref_locked(lp);
1588                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1589                 wake_up(&the_lnet.ln_dc_waitq);
1590                 rc = 0;
1591         } else {
1592                 rc = -EALREADY;
1593         }
1594
1595         return rc;
1596 }
1597
1598 /*
1599  * Discovery of a peer is complete. Wake all waiters on the peer.
1600  * Call with lnet_net_lock/EX held.
1601  */
1602 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1603 {
1604         list_del_init(&lp->lp_dc_list);
1605         wake_up_all(&lp->lp_dc_waitq);
1606         lnet_peer_decref_locked(lp);
1607 }
1608
1609 /*
1610  * Peer discovery slow path. The ln_api_mutex is held on entry, and
1611  * dropped/retaken within this function. An lnet_peer_ni is passed in
1612  * because discovery could tear down an lnet_peer.
1613  */
1614 int
1615 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
1616 {
1617         DEFINE_WAIT(wait);
1618         struct lnet_peer *lp;
1619         int rc = 0;
1620
1621 again:
1622         lnet_net_unlock(cpt);
1623         lnet_net_lock(LNET_LOCK_EX);
1624
1625         /* We're willing to be interrupted. */
1626         for (;;) {
1627                 lp = lpni->lpni_peer_net->lpn_peer;
1628                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
1629                 if (signal_pending(current))
1630                         break;
1631                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
1632                         break;
1633                 if (lnet_peer_is_uptodate(lp))
1634                         break;
1635                 lnet_peer_queue_for_discovery(lp);
1636                 lnet_peer_addref_locked(lp);
1637                 lnet_net_unlock(LNET_LOCK_EX);
1638                 schedule();
1639                 finish_wait(&lp->lp_dc_waitq, &wait);
1640                 lnet_net_lock(LNET_LOCK_EX);
1641                 lnet_peer_decref_locked(lp);
1642                 /* Do not use lp beyond this point. */
1643         }
1644         finish_wait(&lp->lp_dc_waitq, &wait);
1645
1646         lnet_net_unlock(LNET_LOCK_EX);
1647         lnet_net_lock(cpt);
1648
1649         if (signal_pending(current))
1650                 rc = -EINTR;
1651         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
1652                 rc = -ESHUTDOWN;
1653         else if (!lnet_peer_is_uptodate(lp))
1654                 goto again;
1655
1656         return rc;
1657 }
1658
1659 /*
1660  * Event handler for the discovery EQ.
1661  *
1662  * Called with lnet_res_lock(cpt) held. The cpt is the
1663  * lnet_cpt_of_cookie() of the md handle cookie.
1664  */
1665 static void lnet_discovery_event_handler(lnet_event_t *event)
1666 {
1667         wake_up(&the_lnet.ln_dc_waitq);
1668 }
1669
1670 /*
1671  * Wait for work to be queued or some other change that must be
1672  * attended to. Returns non-zero if the discovery thread should shut
1673  * down.
1674  */
1675 static int lnet_peer_discovery_wait_for_work(void)
1676 {
1677         int cpt;
1678         int rc = 0;
1679
1680         DEFINE_WAIT(wait);
1681
1682         cpt = lnet_net_lock_current();
1683         for (;;) {
1684                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
1685                                 TASK_INTERRUPTIBLE);
1686                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1687                         break;
1688                 if (!list_empty(&the_lnet.ln_dc_request))
1689                         break;
1690                 lnet_net_unlock(cpt);
1691                 schedule();
1692                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
1693                 cpt = lnet_net_lock_current();
1694         }
1695         finish_wait(&the_lnet.ln_dc_waitq, &wait);
1696
1697         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1698                 rc = -ESHUTDOWN;
1699
1700         lnet_net_unlock(cpt);
1701
1702         CDEBUG(D_NET, "woken: %d\n", rc);
1703
1704         return rc;
1705 }
1706
1707 /* The discovery thread. */
1708 static int lnet_peer_discovery(void *arg)
1709 {
1710         struct lnet_peer *lp;
1711
1712         CDEBUG(D_NET, "started\n");
1713         cfs_block_allsigs();
1714
1715         for (;;) {
1716                 if (lnet_peer_discovery_wait_for_work())
1717                         break;
1718
1719                 lnet_net_lock(LNET_LOCK_EX);
1720                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1721                         break;
1722                 while (!list_empty(&the_lnet.ln_dc_request)) {
1723                         lp = list_first_entry(&the_lnet.ln_dc_request,
1724                                               struct lnet_peer, lp_dc_list);
1725                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
1726                         lnet_net_unlock(LNET_LOCK_EX);
1727
1728                         /* Just tag and release for now. */
1729                         spin_lock(&lp->lp_lock);
1730                         if (lnet_peer_discovery_disabled) {
1731                                 lp->lp_state |= LNET_PEER_REDISCOVER;
1732                                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1733                                                   LNET_PEER_NIDS_UPTODATE |
1734                                                   LNET_PEER_DISCOVERING);
1735                         } else {
1736                                 lp->lp_state |= (LNET_PEER_DISCOVERED |
1737                                                  LNET_PEER_NIDS_UPTODATE);
1738                                 lp->lp_state &= ~(LNET_PEER_REDISCOVER |
1739                                                   LNET_PEER_DISCOVERING);
1740                         }
1741                         spin_unlock(&lp->lp_lock);
1742
1743                         lnet_net_lock(LNET_LOCK_EX);
1744                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1745                                 lnet_peer_discovery_complete(lp);
1746                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
1747                                 break;
1748                 }
1749                 lnet_net_unlock(LNET_LOCK_EX);
1750         }
1751
1752         CDEBUG(D_NET, "stopping\n");
1753         /*
1754          * Clean up before telling lnet_peer_discovery_stop() that
1755          * we're done. Use wake_up() below to somewhat reduce the
1756          * size of the thundering herd if there are multiple threads
1757          * waiting on discovery of a single peer.
1758          */
1759         LNetEQFree(the_lnet.ln_dc_eqh);
1760         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
1761
1762         lnet_net_lock(LNET_LOCK_EX);
1763         list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
1764                 spin_lock(&lp->lp_lock);
1765                 lp->lp_state |= LNET_PEER_REDISCOVER;
1766                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1767                                   LNET_PEER_DISCOVERING |
1768                                   LNET_PEER_NIDS_UPTODATE);
1769                 spin_unlock(&lp->lp_lock);
1770                 lnet_peer_discovery_complete(lp);
1771         }
1772         list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
1773                 spin_lock(&lp->lp_lock);
1774                 lp->lp_state |= LNET_PEER_REDISCOVER;
1775                 lp->lp_state &= ~(LNET_PEER_DISCOVERED |
1776                                   LNET_PEER_DISCOVERING |
1777                                   LNET_PEER_NIDS_UPTODATE);
1778                 spin_unlock(&lp->lp_lock);
1779                 lnet_peer_discovery_complete(lp);
1780         }
1781         lnet_net_unlock(LNET_LOCK_EX);
1782
1783         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
1784         wake_up(&the_lnet.ln_dc_waitq);
1785
1786         CDEBUG(D_NET, "stopped\n");
1787
1788         return 0;
1789 }
1790
1791 /* ln_api_mutex is held on entry. */
1792 int lnet_peer_discovery_start(void)
1793 {
1794         struct task_struct *task;
1795         int rc;
1796
1797         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
1798                 return -EALREADY;
1799
1800         INIT_LIST_HEAD(&the_lnet.ln_dc_request);
1801         INIT_LIST_HEAD(&the_lnet.ln_dc_working);
1802         init_waitqueue_head(&the_lnet.ln_dc_waitq);
1803
1804         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
1805         if (rc != 0) {
1806                 CERROR("Can't allocate discovery EQ: %d\n", rc);
1807                 return rc;
1808         }
1809
1810         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
1811         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
1812         if (IS_ERR(task)) {
1813                 rc = PTR_ERR(task);
1814                 CERROR("Can't start peer discovery thread: %d\n", rc);
1815
1816                 LNetEQFree(the_lnet.ln_dc_eqh);
1817                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
1818
1819                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
1820         }
1821
1822         return rc;
1823 }
1824
1825 /* ln_api_mutex is held on entry. */
1826 void lnet_peer_discovery_stop(void)
1827 {
1828         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
1829                 return;
1830
1831         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
1832         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
1833         wake_up(&the_lnet.ln_dc_waitq);
1834
1835         wait_event(the_lnet.ln_dc_waitq,
1836                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
1837
1838         LASSERT(list_empty(&the_lnet.ln_dc_request));
1839         LASSERT(list_empty(&the_lnet.ln_dc_working));
1840 }
1841
1842 /* Debugging */
1843
1844 void
1845 lnet_debug_peer(lnet_nid_t nid)
1846 {
1847         char                    *aliveness = "NA";
1848         struct lnet_peer_ni     *lp;
1849         int                     cpt;
1850
1851         cpt = lnet_cpt_of_nid(nid, NULL);
1852         lnet_net_lock(cpt);
1853
1854         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1855         if (IS_ERR(lp)) {
1856                 lnet_net_unlock(cpt);
1857                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
1858                 return;
1859         }
1860
1861         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
1862                 aliveness = lp->lpni_alive ? "up" : "down";
1863
1864         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
1865                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
1866                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
1867                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
1868                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
1869
1870         lnet_peer_ni_decref_locked(lp);
1871
1872         lnet_net_unlock(cpt);
1873 }
1874
1875 /* Gathering information for userspace. */
1876
1877 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
1878                           char aliveness[LNET_MAX_STR_LEN],
1879                           __u32 *cpt_iter, __u32 *refcount,
1880                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
1881                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
1882                           __u32 *peer_tx_qnob)
1883 {
1884         struct lnet_peer_table          *peer_table;
1885         struct lnet_peer_ni             *lp;
1886         int                             j;
1887         int                             lncpt;
1888         bool                            found = false;
1889
1890         /* get the number of CPTs */
1891         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
1892
1893         /* if the cpt number to be examined is >= the number of cpts in
1894          * the system then indicate that there are no more cpts to examin
1895          */
1896         if (*cpt_iter >= lncpt)
1897                 return -ENOENT;
1898
1899         /* get the current table */
1900         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
1901         /* if the ptable is NULL then there are no more cpts to examine */
1902         if (peer_table == NULL)
1903                 return -ENOENT;
1904
1905         lnet_net_lock(*cpt_iter);
1906
1907         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
1908                 struct list_head *peers = &peer_table->pt_hash[j];
1909
1910                 list_for_each_entry(lp, peers, lpni_hashlist) {
1911                         if (peer_index-- > 0)
1912                                 continue;
1913
1914                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
1915                         if (lnet_isrouter(lp) ||
1916                                 lnet_peer_aliveness_enabled(lp))
1917                                 snprintf(aliveness, LNET_MAX_STR_LEN,
1918                                          lp->lpni_alive ? "up" : "down");
1919
1920                         *nid = lp->lpni_nid;
1921                         *refcount = atomic_read(&lp->lpni_refcount);
1922                         *ni_peer_tx_credits =
1923                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
1924                         *peer_tx_credits = lp->lpni_txcredits;
1925                         *peer_rtr_credits = lp->lpni_rtrcredits;
1926                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
1927                         *peer_tx_qnob = lp->lpni_txqnob;
1928
1929                         found = true;
1930                 }
1931
1932         }
1933         lnet_net_unlock(*cpt_iter);
1934
1935         *cpt_iter = lncpt;
1936
1937         return found ? 0 : -ENOENT;
1938 }
1939
1940 /* ln_api_mutex is held, which keeps the peer list stable */
1941 int lnet_get_peer_info(__u32 idx, lnet_nid_t *primary_nid, lnet_nid_t *nid,
1942                        bool *mr,
1943                        struct lnet_peer_ni_credit_info __user *peer_ni_info,
1944                        struct lnet_ioctl_element_stats __user *peer_ni_stats)
1945 {
1946         struct lnet_peer_ni *lpni = NULL;
1947         struct lnet_peer_net *lpn = NULL;
1948         struct lnet_peer *lp = NULL;
1949         struct lnet_peer_ni_credit_info ni_info;
1950         struct lnet_ioctl_element_stats ni_stats;
1951         int rc;
1952
1953         lpni = lnet_get_peer_ni_idx_locked(idx, &lpn, &lp);
1954
1955         if (!lpni)
1956                 return -ENOENT;
1957
1958         *primary_nid = lp->lp_primary_nid;
1959         *mr = lnet_peer_is_multi_rail(lp);
1960         *nid = lpni->lpni_nid;
1961         snprintf(ni_info.cr_aliveness, LNET_MAX_STR_LEN, "NA");
1962         if (lnet_isrouter(lpni) ||
1963                 lnet_peer_aliveness_enabled(lpni))
1964                 snprintf(ni_info.cr_aliveness, LNET_MAX_STR_LEN,
1965                          lpni->lpni_alive ? "up" : "down");
1966
1967         ni_info.cr_refcount = atomic_read(&lpni->lpni_refcount);
1968         ni_info.cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
1969                 lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
1970         ni_info.cr_peer_tx_credits = lpni->lpni_txcredits;
1971         ni_info.cr_peer_rtr_credits = lpni->lpni_rtrcredits;
1972         ni_info.cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
1973         ni_info.cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
1974         ni_info.cr_peer_tx_qnob = lpni->lpni_txqnob;
1975         ni_info.cr_ncpt = lpni->lpni_cpt;
1976
1977         ni_stats.iel_send_count = atomic_read(&lpni->lpni_stats.send_count);
1978         ni_stats.iel_recv_count = atomic_read(&lpni->lpni_stats.recv_count);
1979         ni_stats.iel_drop_count = atomic_read(&lpni->lpni_stats.drop_count);
1980
1981         /* If copy_to_user fails */
1982         rc = -EFAULT;
1983         if (copy_to_user(peer_ni_info, &ni_info, sizeof(ni_info)))
1984                 goto copy_failed;
1985
1986         if (copy_to_user(peer_ni_stats, &ni_stats, sizeof(ni_stats)))
1987                 goto copy_failed;
1988
1989         rc = 0;
1990
1991 copy_failed:
1992         return rc;
1993 }