Whamcloud - gitweb
LU-7734 lnet: implement Peer Discovery
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <lnet/lib-lnet.h>
38 #include <lnet/lib-dlc.h>
39
/* Non-zero enables the Peer Discovery protocol; runtime-writable (0644). */
unsigned lnet_peer_discovery_enabled = 1;
module_param(lnet_peer_discovery_enabled, uint, 0644);
MODULE_PARM_DESC(lnet_peer_discovery_enabled,
		"Explicitly enable/disable peer discovery");

/* Value indicating that recovery needs to re-check a peer immediately. */
#define LNET_REDISCOVER_PEER	(1)

/* Forward declaration; defined later in this file. */
static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
49
50 static void
51 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
52 {
53         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
54                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
55                 lnet_peer_ni_decref_locked(lpni);
56         }
57 }
58
/*
 * A local net was added: scan the global remote peer_ni list for
 * peer_nis whose NID belongs to the new net, attach them to it,
 * initialize their credits from the net's tunables, and remove them
 * from the remote list (which releases the list's reference).
 */
void
lnet_peer_net_added(struct lnet_net *net)
{
	struct lnet_peer_ni *lpni, *tmp;

	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list) {

		if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
			lpni->lpni_net = net;

			/* lpni_lock guards the credit fields */
			spin_lock(&lpni->lpni_lock);
			lpni->lpni_txcredits =
				lpni->lpni_net->net_tunables.lct_peer_tx_credits;
			lpni->lpni_mintxcredits = lpni->lpni_txcredits;
			lpni->lpni_rtrcredits =
				lnet_peer_buffer_credits(lpni->lpni_net);
			lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
			spin_unlock(&lpni->lpni_lock);

			lnet_peer_remove_from_remote_list(lpni);
		}
	}
}
83
/*
 * Free the per-CPT peer hash tables and the percpt table array
 * itself. Every table must already be empty of peers and zombies;
 * the LASSERTs enforce this. Safe to call when the tables were never
 * allocated or only partially initialized.
 */
static void
lnet_peer_tables_destroy(void)
{
	struct lnet_peer_table	*ptable;
	struct list_head	*hash;
	int			i;
	int			j;

	if (!the_lnet.ln_peer_tables)
		return;

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		hash = ptable->pt_hash;
		if (!hash) /* not initialized */
			break;

		LASSERT(list_empty(&ptable->pt_zombie_list));

		/* Clear pt_hash first so the table reads as uninitialized. */
		ptable->pt_hash = NULL;
		for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
			LASSERT(list_empty(&hash[j]));

		LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
	}

	cfs_percpt_free(the_lnet.ln_peer_tables);
	the_lnet.ln_peer_tables = NULL;
}
112
113 int
114 lnet_peer_tables_create(void)
115 {
116         struct lnet_peer_table  *ptable;
117         struct list_head        *hash;
118         int                     i;
119         int                     j;
120
121         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
122                                                    sizeof(*ptable));
123         if (the_lnet.ln_peer_tables == NULL) {
124                 CERROR("Failed to allocate cpu-partition peer tables\n");
125                 return -ENOMEM;
126         }
127
128         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
129                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
130                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
131                 if (hash == NULL) {
132                         CERROR("Failed to create peer hash table\n");
133                         lnet_peer_tables_destroy();
134                         return -ENOMEM;
135                 }
136
137                 spin_lock_init(&ptable->pt_zombie_lock);
138                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
139
140                 INIT_LIST_HEAD(&ptable->pt_peer_list);
141
142                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
143                         INIT_LIST_HEAD(&hash[j]);
144                 ptable->pt_hash = hash; /* sign of initialization */
145         }
146
147         return 0;
148 }
149
/*
 * Allocate and initialize an lnet_peer_ni for @nid, on the CPT
 * derived from the NID hash.
 *
 * If the NID belongs to a local net, credits are initialized from
 * that net's tunables. Otherwise the peer_ni is placed (with an
 * extra reference) on the global remote peer_ni list so that
 * lnet_peer_net_added() can finish the credit setup if the net
 * appears later.
 *
 * Returns the new peer_ni, or NULL on allocation failure.
 *
 * NOTE(review): calls lnet_get_net_locked() and
 * lnet_set_peer_ni_health_locked(), so presumably runs under
 * lnet_net_lock — confirm at call sites.
 */
static struct lnet_peer_ni *
lnet_peer_ni_alloc(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_net *net;
	int cpt;

	cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
	if (!lpni)
		return NULL;

	INIT_LIST_HEAD(&lpni->lpni_txq);
	INIT_LIST_HEAD(&lpni->lpni_rtrq);
	INIT_LIST_HEAD(&lpni->lpni_routes);
	INIT_LIST_HEAD(&lpni->lpni_hashlist);
	INIT_LIST_HEAD(&lpni->lpni_peer_nis);
	INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);

	spin_lock_init(&lpni->lpni_lock);

	lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
	lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
	lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
	lpni->lpni_nid = nid;
	lpni->lpni_cpt = cpt;
	lnet_set_peer_ni_health_locked(lpni, true);

	net = lnet_get_net_locked(LNET_NIDNET(nid));
	lpni->lpni_net = net;
	if (net) {
		lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
		lpni->lpni_mintxcredits = lpni->lpni_txcredits;
		lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
		lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
	} else {
		/*
		 * This peer_ni is not on a local network, so we
		 * cannot add the credits here. In case the net is
		 * added later, add the peer_ni to the remote peer ni
		 * list so it can be easily found and revisited.
		 */
		/* FIXME: per-net implementation instead? */
		atomic_inc(&lpni->lpni_refcount);
		list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
			      &the_lnet.ln_remote_peer_ni_list);
	}

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	return lpni;
}
203
204 static struct lnet_peer_net *
205 lnet_peer_net_alloc(__u32 net_id)
206 {
207         struct lnet_peer_net *lpn;
208
209         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
210         if (!lpn)
211                 return NULL;
212
213         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
214         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
215         lpn->lpn_net_id = net_id;
216
217         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
218
219         return lpn;
220 }
221
/*
 * Free a peer_net whose reference count has dropped to zero, then
 * release the reference the peer_net held on its owning peer.
 *
 * The owning peer pointer is saved before the structure is freed;
 * the peer decref must come last because it may in turn destroy lp.
 */
void
lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
{
	struct lnet_peer *lp;

	CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

	LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
	LASSERT(list_empty(&lpn->lpn_peer_nis));
	LASSERT(list_empty(&lpn->lpn_peer_nets));
	lp = lpn->lpn_peer;
	lpn->lpn_peer = NULL;
	LIBCFS_FREE(lpn, sizeof(*lpn));

	lnet_peer_decref_locked(lp);
}
238
/*
 * Allocate and minimally initialize an lnet_peer with @nid as its
 * primary NID. The peer's CPT is derived from the NID hash; the
 * list heads, discovery waitqueue and spinlock are initialized so
 * the peer can immediately be linked into the hierarchy.
 * Returns NULL on allocation failure.
 */
static struct lnet_peer *
lnet_peer_alloc(lnet_nid_t nid)
{
	struct lnet_peer *lp;

	LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
	if (!lp)
		return NULL;

	INIT_LIST_HEAD(&lp->lp_peer_list);
	INIT_LIST_HEAD(&lp->lp_peer_nets);
	INIT_LIST_HEAD(&lp->lp_dc_list);
	init_waitqueue_head(&lp->lp_dc_waitq);
	spin_lock_init(&lp->lp_lock);
	lp->lp_primary_nid = nid;
	lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

	return lp;
}
260
/*
 * Free a peer whose reference count has dropped to zero. The peer
 * must already be detached from every list (asserted), and any
 * attached ping buffer reference is released before the free.
 */
void
lnet_destroy_peer_locked(struct lnet_peer *lp)
{
	CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

	LASSERT(atomic_read(&lp->lp_refcount) == 0);
	LASSERT(list_empty(&lp->lp_peer_nets));
	LASSERT(list_empty(&lp->lp_peer_list));
	LASSERT(list_empty(&lp->lp_dc_list));

	if (lp->lp_data)
		lnet_ping_buffer_decref(lp->lp_data);

	LIBCFS_FREE(lp, sizeof(*lp));
}
276
/*
 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
 * that peer_net, detach the peer_net from the peer.
 *
 * Call with lnet_net_lock/EX held
 */
static void
lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;
	struct lnet_peer *lp;

	/*
	 * Belts and suspenders: gracefully handle teardown of a
	 * partially connected peer_ni.
	 */
	lpn = lpni->lpni_peer_net;

	list_del_init(&lpni->lpni_peer_nis);
	/*
	 * If there are no lpni's left, we detach lpn from
	 * lp_peer_nets, so it cannot be found anymore.
	 */
	if (list_empty(&lpn->lpn_peer_nis))
		list_del_init(&lpn->lpn_peer_nets);

	/* Update peer NID count. */
	lp = lpn->lpn_peer;
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	lp->lp_nnis--;
	ptable->pt_peer_nnids--;

	/*
	 * If there are no more peer nets, make the peer unfindable
	 * via the peer_tables.
	 *
	 * Otherwise, if the peer is DISCOVERED, tell discovery to
	 * take another look at it. This is a no-op if discovery for
	 * this peer did the detaching.
	 */
	if (list_empty(&lp->lp_peer_nets)) {
		list_del_init(&lp->lp_peer_list);
		ptable->pt_peers--;
	} else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
		/* Discovery isn't running, nothing to do here. */
	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
		/* Re-queue the peer and wake the discovery thread. */
		lnet_peer_queue_for_discovery(lp);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	CDEBUG(D_NET, "peer %s NID %s\n",
		libcfs_nid2str(lp->lp_primary_nid),
		libcfs_nid2str(lpni->lpni_nid));
}
331
/*
 * Remove a peer_ni from the lookup structures and schedule it for
 * finalization.
 *
 * Returns -EBUSY if the peer_ni is still referenced as a router
 * gateway. Otherwise it is unhashed, moved to its table's zombie
 * list (where it stays until its refcount reaches zero), detached
 * from the peer hierarchy, and the hash-list reference is dropped.
 *
 * called with lnet_net_lock LNET_LOCK_EX held
 */
static int
lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable = NULL;

	/* don't remove a peer_ni if it's also a gateway */
	if (lpni->lpni_rtr_refcount > 0) {
		CERROR("Peer NI %s is a gateway. Can not delete it\n",
		       libcfs_nid2str(lpni->lpni_nid));
		return -EBUSY;
	}

	lnet_peer_remove_from_remote_list(lpni);

	/* remove peer ni from the hash list. */
	list_del_init(&lpni->lpni_hashlist);

	/* decrement the ref count on the peer table */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	LASSERT(ptable->pt_number > 0);
	ptable->pt_number--;

	/*
	 * The peer_ni can no longer be found with a lookup. But there
	 * can be current users, so keep track of it on the zombie
	 * list until the reference count has gone to zero.
	 *
	 * The last reference may be lost in a place where the
	 * lnet_net_lock locks only a single cpt, and that cpt may not
	 * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
	 * has its own lock.
	 */
	spin_lock(&ptable->pt_zombie_lock);
	list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
	ptable->pt_zombies++;
	spin_unlock(&ptable->pt_zombie_lock);

	/* no need to keep this peer_ni on the hierarchy anymore */
	lnet_peer_detach_peer_ni_locked(lpni);

	/* remove hashlist reference on peer_ni */
	lnet_peer_ni_decref_locked(lpni);

	return 0;
}
378
/*
 * Module-shutdown cleanup: delete every peer_ni still on the global
 * remote peer_ni list, then destroy the peer tables (which asserts
 * they are empty). Runs under LNET_LOCK_EX.
 */
void lnet_peer_uninit(void)
{
	struct lnet_peer_ni *lpni, *tmp;

	lnet_net_lock(LNET_LOCK_EX);

	/* remove all peer_nis from the remote peer and the hash list */
	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list)
		lnet_peer_ni_del_locked(lpni);

	lnet_peer_tables_destroy();

	lnet_net_unlock(LNET_LOCK_EX);
}
394
395 static int
396 lnet_peer_del_locked(struct lnet_peer *peer)
397 {
398         struct lnet_peer_ni *lpni = NULL, *lpni2;
399         int rc = 0, rc2 = 0;
400
401         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
402
403         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
404         while (lpni != NULL) {
405                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
406                 rc = lnet_peer_ni_del_locked(lpni);
407                 if (rc != 0)
408                         rc2 = rc;
409                 lpni = lpni2;
410         }
411
412         return rc2;
413 }
414
415 static int
416 lnet_peer_del(struct lnet_peer *peer)
417 {
418         lnet_net_lock(LNET_LOCK_EX);
419         lnet_peer_del_locked(peer);
420         lnet_net_unlock(LNET_LOCK_EX);
421
422         return 0;
423 }
424
/*
 * Delete a NID from a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:  Non-DLC deletion from DLC-configured peer.
 *  -ENOENT: No lnet_peer_ni corresponding to the nid.
 *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
 *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
 */
static int
lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = lp->lp_primary_nid;
	int rc = 0;

	/* Only DLC-flagged requests may modify a DLC-configured peer. */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}
	lpni = lnet_find_peer_ni_locked(nid);
	if (!lpni) {
		rc = -ENOENT;
		goto out;
	}
	/*
	 * NOTE(review): the lookup reference is dropped immediately but
	 * lpni is used afterwards — presumably ln_api_mutex keeps the
	 * peer_ni alive until the delete below; confirm.
	 */
	lnet_peer_ni_decref_locked(lpni);
	if (lp != lpni->lpni_peer_net->lpn_peer) {
		rc = -ECHILD;
		goto out;
	}

	/*
	 * This function only allows deletion of the primary NID if it
	 * is the only NID.
	 */
	if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
		rc = -EBUSY;
		goto out;
	}

	lnet_net_lock(LNET_LOCK_EX);
	lnet_peer_ni_del_locked(lpni);
	lnet_net_unlock(LNET_LOCK_EX);

out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);

	return rc;
}
477
/*
 * Delete from @ptable every peer_ni attached to @net, or every
 * peer_ni when @net is NULL.
 *
 * Deleting a peer's primary NID deletes the whole peer, which can
 * remove several entries from the hash chain at once; the safe
 * iterator's cursor is first advanced past all siblings of the same
 * peer so it never points at a freed entry.
 */
static void
lnet_peer_table_cleanup_locked(struct lnet_net *net,
			       struct lnet_peer_table *ptable)
{
	int			 i;
	struct lnet_peer_ni	*next;
	struct lnet_peer_ni	*lpni;
	struct lnet_peer	*peer;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != NULL && net != lpni->lpni_net)
				continue;

			peer = lpni->lpni_peer_net->lpn_peer;
			if (peer->lp_primary_nid != lpni->lpni_nid) {
				lnet_peer_ni_del_locked(lpni);
				continue;
			}
			/*
			 * Removing the primary NID implies removing
			 * the entire peer. Advance next beyond any
			 * peer_ni that belongs to the same peer.
			 */
			list_for_each_entry_from(next, &ptable->pt_hash[i],
						 lpni_hashlist) {
				if (next->lpni_peer_net->lpn_peer != peer)
					break;
			}
			lnet_peer_del_locked(peer);
		}
	}
}
512
513 static void
514 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
515 {
516         int     i = 3;
517
518         spin_lock(&ptable->pt_zombie_lock);
519         while (ptable->pt_zombies) {
520                 spin_unlock(&ptable->pt_zombie_lock);
521
522                 if (IS_PO2(i)) {
523                         CDEBUG(D_WARNING,
524                                "Waiting for %d zombies on peer table\n",
525                                ptable->pt_zombies);
526                 }
527                 set_current_state(TASK_UNINTERRUPTIBLE);
528                 schedule_timeout(cfs_time_seconds(1) >> 1);
529                 spin_lock(&ptable->pt_zombie_lock);
530         }
531         spin_unlock(&ptable->pt_zombie_lock);
532 }
533
/*
 * For every peer_ni on @net that is referenced as a router gateway,
 * delete the routes it serves. lnet_del_route() is called with the
 * net lock dropped and re-taken around it.
 *
 * NOTE(review): dropping LNET_LOCK_EX mid-iteration means the safe
 * iterator's saved next pointer (tmp) could go stale if the hash
 * chain changes while unlocked — presumably serialized at a higher
 * level; confirm.
 */
static void
lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
				struct lnet_peer_table *ptable)
{
	struct lnet_peer_ni	*lp;
	struct lnet_peer_ni	*tmp;
	lnet_nid_t		lpni_nid;
	int			i;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != lp->lpni_net)
				continue;

			if (lp->lpni_rtr_refcount == 0)
				continue;

			lpni_nid = lp->lpni_nid;

			lnet_net_unlock(LNET_LOCK_EX);
			lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
			lnet_net_lock(LNET_LOCK_EX);
		}
	}
}
560
/*
 * Clean up peer state for @net, or for all nets when @net is NULL
 * (only legal during shutdown, per the LASSERT). Three passes over
 * the per-CPT tables: delete routes these peers are gateways for,
 * delete the peer table entries, then wait for zombie peer_nis to
 * be finalized.
 */
void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
	int				i;
	struct lnet_peer_table		*ptable;

	LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
	/* If just deleting the peers for a NI, get rid of any routes these
	 * peers are gateways for. */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_del_rtrs_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* Start the cleanup process */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_cleanup_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
		lnet_peer_ni_finalize_wait(ptable);
}
586
587 static struct lnet_peer_ni *
588 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
589 {
590         struct list_head        *peers;
591         struct lnet_peer_ni     *lp;
592
593         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
594
595         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
596         list_for_each_entry(lp, peers, lpni_hashlist) {
597                 if (lp->lpni_nid == nid) {
598                         lnet_peer_ni_addref_locked(lp);
599                         return lp;
600                 }
601         }
602
603         return NULL;
604 }
605
606 struct lnet_peer_ni *
607 lnet_find_peer_ni_locked(lnet_nid_t nid)
608 {
609         struct lnet_peer_ni *lpni;
610         struct lnet_peer_table *ptable;
611         int cpt;
612
613         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
614
615         ptable = the_lnet.ln_peer_tables[cpt];
616         lpni = lnet_get_peer_ni_locked(ptable, nid);
617
618         return lpni;
619 }
620
621 struct lnet_peer *
622 lnet_find_peer(lnet_nid_t nid)
623 {
624         struct lnet_peer_ni *lpni;
625         struct lnet_peer *lp = NULL;
626         int cpt;
627
628         cpt = lnet_net_lock_current();
629         lpni = lnet_find_peer_ni_locked(nid);
630         if (lpni) {
631                 lp = lpni->lpni_peer_net->lpn_peer;
632                 lnet_peer_addref_locked(lp);
633                 lnet_peer_ni_decref_locked(lpni);
634         }
635         lnet_net_unlock(cpt);
636
637         return lp;
638 }
639
/*
 * Return the idx'th peer_ni across all CPT peer tables, also passing
 * back its peer_net and peer through @lpn and @lp.
 *
 * The index space walks tables in CPT order (using pt_peer_nnids per
 * table), then peers in pt_peer_list order (using lp_nnis per peer),
 * then that peer's nets and NIs. Returns NULL when idx is past the
 * last peer_ni.
 */
struct lnet_peer_ni *
lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
			    struct lnet_peer **lp)
{
	struct lnet_peer_table	*ptable;
	struct lnet_peer_ni	*lpni;
	int			lncpt;
	int			cpt;

	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

	/* Find the CPT table containing the idx'th NID. */
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		if (ptable->pt_peer_nnids > idx)
			break;
		idx -= ptable->pt_peer_nnids;
	}
	if (cpt >= lncpt)
		return NULL;

	/* Skip whole peers, then walk the chosen peer's nets and NIs. */
	list_for_each_entry((*lp), &ptable->pt_peer_list, lp_peer_list) {
		if ((*lp)->lp_nnis <= idx) {
			idx -= (*lp)->lp_nnis;
			continue;
		}
		list_for_each_entry((*lpn), &((*lp)->lp_peer_nets),
				    lpn_peer_nets) {
			list_for_each_entry(lpni, &((*lpn)->lpn_peer_nis),
					    lpni_peer_nis) {
				if (idx-- == 0)
					return lpni;
			}
		}
	}

	return NULL;
}
677
/*
 * Iterator over the peer_nis of @peer.
 *
 * A NULL @prev starts the iteration; otherwise the peer_ni after
 * @prev is returned. If @peer_net is non-NULL the walk is confined
 * to that net and ends (NULL) at the net's last NI; if NULL, the
 * walk continues into the peer's next net. Returns NULL when the
 * iteration is exhausted.
 */
struct lnet_peer_ni *
lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
			     struct lnet_peer_net *peer_net,
			     struct lnet_peer_ni *prev)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_net *net = peer_net;

	if (!prev) {
		/* Start: first NI of the requested (or first) net. */
		if (!net) {
			if (list_empty(&peer->lp_peer_nets))
				return NULL;

			net = list_entry(peer->lp_peer_nets.next,
					 struct lnet_peer_net,
					 lpn_peer_nets);
		}
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
		/*
		 * if you reached the end of the peer ni list and the peer
		 * net is specified then there are no more peer nis in that
		 * net.
		 */
		if (net)
			return NULL;

		/*
		 * we reached the end of this net ni list. move to the
		 * next net
		 */
		if (prev->lpni_peer_net->lpn_peer_nets.next ==
		    &peer->lp_peer_nets)
			/* no more nets and no more NIs. */
			return NULL;

		/* get the next net */
		net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
				 struct lnet_peer_net,
				 lpn_peer_nets);
		/* get the ni on it */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	/* there are more nis left */
	lpni = list_entry(prev->lpni_peer_nis.next,
			  struct lnet_peer_ni, lpni_peer_nis);

	return lpni;
}
736
737 /*
738  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
739  * this is a preferred point-to-point path. Call with lnet_net_lock in
740  * shared mmode.
741  */
742 bool
743 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
744 {
745         int i;
746
747         if (lpni->lpni_pref_nnids == 0)
748                 return false;
749         if (lpni->lpni_pref_nnids == 1)
750                 return lpni->lpni_pref.nid == nid;
751         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
752                 if (lpni->lpni_pref.nids[i] == nid)
753                         return true;
754         }
755         return false;
756 }
757
758 /*
759  * Set a single ni as preferred, provided no preferred ni is already
760  * defined. Only to be used for non-multi-rail peer_ni.
761  */
762 int
763 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
764 {
765         int rc = 0;
766
767         spin_lock(&lpni->lpni_lock);
768         if (nid == LNET_NID_ANY) {
769                 rc = -EINVAL;
770         } else if (lpni->lpni_pref_nnids > 0) {
771                 rc = -EPERM;
772         } else if (lpni->lpni_pref_nnids == 0) {
773                 lpni->lpni_pref.nid = nid;
774                 lpni->lpni_pref_nnids = 1;
775                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
776         }
777         spin_unlock(&lpni->lpni_lock);
778
779         CDEBUG(D_NET, "peer %s nid %s: %d\n",
780                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
781         return rc;
782 }
783
784 /*
785  * Clear the preferred NID from a non-multi-rail peer_ni, provided
786  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
787  */
788 int
789 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
790 {
791         int rc = 0;
792
793         spin_lock(&lpni->lpni_lock);
794         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
795                 lpni->lpni_pref_nnids = 0;
796                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
797         } else if (lpni->lpni_pref_nnids == 0) {
798                 rc = -ENOENT;
799         } else {
800                 rc = -EPERM;
801         }
802         spin_unlock(&lpni->lpni_lock);
803
804         CDEBUG(D_NET, "peer %s: %d\n",
805                libcfs_nid2str(lpni->lpni_nid), rc);
806         return rc;
807 }
808
809 /*
810  * Clear the preferred NIDs from a non-multi-rail peer.
811  */
812 void
813 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
814 {
815         struct lnet_peer_ni *lpni = NULL;
816
817         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
818                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
819 }
820
821 int
822 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
823 {
824         lnet_nid_t *nids = NULL;
825         lnet_nid_t *oldnids = NULL;
826         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
827         int size;
828         int i;
829         int rc = 0;
830
831         if (nid == LNET_NID_ANY) {
832                 rc = -EINVAL;
833                 goto out;
834         }
835
836         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
837                 rc = -EEXIST;
838                 goto out;
839         }
840
841         /* A non-MR node may have only one preferred NI per peer_ni */
842         if (lpni->lpni_pref_nnids > 0) {
843                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
844                         rc = -EPERM;
845                         goto out;
846                 }
847         }
848
849         if (lpni->lpni_pref_nnids != 0) {
850                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
851                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
852                 if (!nids) {
853                         rc = -ENOMEM;
854                         goto out;
855                 }
856                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
857                         if (lpni->lpni_pref.nids[i] == nid) {
858                                 LIBCFS_FREE(nids, size);
859                                 rc = -EEXIST;
860                                 goto out;
861                         }
862                         nids[i] = lpni->lpni_pref.nids[i];
863                 }
864                 nids[i] = nid;
865         }
866
867         lnet_net_lock(LNET_LOCK_EX);
868         spin_lock(&lpni->lpni_lock);
869         if (lpni->lpni_pref_nnids == 0) {
870                 lpni->lpni_pref.nid = nid;
871         } else {
872                 oldnids = lpni->lpni_pref.nids;
873                 lpni->lpni_pref.nids = nids;
874         }
875         lpni->lpni_pref_nnids++;
876         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
877         spin_unlock(&lpni->lpni_lock);
878         lnet_net_unlock(LNET_LOCK_EX);
879
880         if (oldnids) {
881                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
882                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
883         }
884 out:
885         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
886                 spin_lock(&lpni->lpni_lock);
887                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
888                 spin_unlock(&lpni->lpni_lock);
889         }
890         CDEBUG(D_NET, "peer %s nid %s: %d\n",
891                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
892         return rc;
893 }
894
895 int
896 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
897 {
898         lnet_nid_t *nids = NULL;
899         lnet_nid_t *oldnids = NULL;
900         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
901         int size;
902         int i, j;
903         int rc = 0;
904
905         if (lpni->lpni_pref_nnids == 0) {
906                 rc = -ENOENT;
907                 goto out;
908         }
909
910         if (lpni->lpni_pref_nnids == 1) {
911                 if (lpni->lpni_pref.nid != nid) {
912                         rc = -ENOENT;
913                         goto out;
914                 }
915         } else if (lpni->lpni_pref_nnids == 2) {
916                 if (lpni->lpni_pref.nids[0] != nid &&
917                     lpni->lpni_pref.nids[1] != nid) {
918                         rc = -ENOENT;
919                         goto out;
920                 }
921         } else {
922                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
923                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
924                 if (!nids) {
925                         rc = -ENOMEM;
926                         goto out;
927                 }
928                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
929                         if (lpni->lpni_pref.nids[i] != nid)
930                                 continue;
931                         nids[j++] = lpni->lpni_pref.nids[i];
932                 }
933                 /* Check if we actually removed a nid. */
934                 if (j == lpni->lpni_pref_nnids) {
935                         LIBCFS_FREE(nids, size);
936                         rc = -ENOENT;
937                         goto out;
938                 }
939         }
940
941         lnet_net_lock(LNET_LOCK_EX);
942         spin_lock(&lpni->lpni_lock);
943         if (lpni->lpni_pref_nnids == 1) {
944                 lpni->lpni_pref.nid = LNET_NID_ANY;
945         } else if (lpni->lpni_pref_nnids == 2) {
946                 oldnids = lpni->lpni_pref.nids;
947                 if (oldnids[0] == nid)
948                         lpni->lpni_pref.nid = oldnids[1];
949                 else
950                         lpni->lpni_pref.nid = oldnids[2];
951         } else {
952                 oldnids = lpni->lpni_pref.nids;
953                 lpni->lpni_pref.nids = nids;
954         }
955         lpni->lpni_pref_nnids--;
956         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
957         spin_unlock(&lpni->lpni_lock);
958         lnet_net_unlock(LNET_LOCK_EX);
959
960         if (oldnids) {
961                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
962                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
963         }
964 out:
965         CDEBUG(D_NET, "peer %s nid %s: %d\n",
966                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
967         return rc;
968 }
969
970 lnet_nid_t
971 lnet_peer_primary_nid(lnet_nid_t nid)
972 {
973         struct lnet_peer_ni *lpni;
974         lnet_nid_t primary_nid = nid;
975         int cpt;
976
977         cpt = lnet_net_lock_current();
978         lpni = lnet_find_peer_ni_locked(nid);
979         if (lpni) {
980                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
981                 lnet_peer_ni_decref_locked(lpni);
982         }
983         lnet_net_unlock(cpt);
984
985         return primary_nid;
986 }
987
/*
 * Return the primary NID of the peer owning @nid, creating the
 * peer_ni if it does not yet exist and running peer discovery until
 * the peer's state is up to date.
 *
 * On any error (peer_ni creation or discovery failure) @nid itself
 * is returned.
 */
lnet_nid_t
LNetPrimaryNID(lnet_nid_t nid)
{
	struct lnet_peer *lp;
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = nid;
	int rc = 0;
	int cpt;

	cpt = lnet_net_lock_current();
	lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
	if (IS_ERR(lpni)) {
		rc = PTR_ERR(lpni);
		goto out_unlock;
	}
	lp = lpni->lpni_peer_net->lpn_peer;
	/* Loop until discovery settles the peer or fails. */
	while (!lnet_peer_is_uptodate(lp)) {
		rc = lnet_discover_peer_locked(lpni, cpt);
		if (rc)
			goto out_decref;
		/*
		 * Re-read lp: discovery may have moved lpni to a
		 * different peer (e.g. after a peer merge).
		 */
		lp = lpni->lpni_peer_net->lpn_peer;
	}
	primary_nid = lp->lp_primary_nid;
out_decref:
	lnet_peer_ni_decref_locked(lpni);
out_unlock:
	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
	       libcfs_nid2str(primary_nid), rc);
	return primary_nid;
}
EXPORT_SYMBOL(LNetPrimaryNID);
1021
1022 struct lnet_peer_net *
1023 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1024 {
1025         struct lnet_peer_net *peer_net;
1026         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1027                 if (peer_net->lpn_net_id == net_id)
1028                         return peer_net;
1029         }
1030         return NULL;
1031 }
1032
1033 /*
1034  * Attach a peer_ni to a peer_net and peer. This function assumes
1035  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1036  * may be attached to a different peer, in which case it will be
1037  * properly detached first. The whole operation is done atomically.
1038  *
1039  * Always returns 0.  This is the last function called from functions
1040  * that do return an int, so returning 0 here allows the compiler to
1041  * do a tail call.
1042  */
static int
lnet_peer_attach_peer_ni(struct lnet_peer *lp,
				struct lnet_peer_net *lpn,
				struct lnet_peer_ni *lpni,
				unsigned flags)
{
	struct lnet_peer_table *ptable;

	/* Install the new peer_ni */
	lnet_net_lock(LNET_LOCK_EX);
	/* Add peer_ni to global peer table hash, if necessary. */
	if (list_empty(&lpni->lpni_hashlist)) {
		int hash = lnet_nid2peerhash(lpni->lpni_nid);

		ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
		list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
		ptable->pt_version++;
		ptable->pt_number++;
		/* This is the 1st refcount on lpni. */
		atomic_inc(&lpni->lpni_refcount);
	}

	/* Detach the peer_ni from an existing peer, if necessary. */
	if (lpni->lpni_peer_net) {
		/* Caller guarantees we are not re-attaching to the same
		 * peer_net/peer (see function header comment). */
		LASSERT(lpni->lpni_peer_net != lpn);
		LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
		lnet_peer_detach_peer_ni_locked(lpni);
		lnet_peer_net_decref_locked(lpni->lpni_peer_net);
		lpni->lpni_peer_net = NULL;
	}

	/* Add peer_ni to peer_net */
	lpni->lpni_peer_net = lpn;
	list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
	lnet_peer_net_addref_locked(lpn);

	/* Add peer_net to peer */
	if (!lpn->lpn_peer) {
		lpn->lpn_peer = lp;
		list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
		lnet_peer_addref_locked(lp);
	}

	/* Add peer to global peer list, if necessary */
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	if (list_empty(&lp->lp_peer_list)) {
		list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
		ptable->pt_peers++;
	}


	/* Update peer state */
	spin_lock(&lp->lp_lock);
	if (flags & LNET_PEER_CONFIGURED) {
		if (!(lp->lp_state & LNET_PEER_CONFIGURED))
			lp->lp_state |= LNET_PEER_CONFIGURED;
	}
	if (flags & LNET_PEER_MULTI_RAIL) {
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			/* Non-MR preferred NIDs are dropped when a peer
			 * becomes Multi-Rail. */
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	}
	spin_unlock(&lp->lp_lock);

	/* Account the new NI on the peer and its CPT's peer table. */
	lp->lp_nnis++;
	the_lnet.ln_peer_tables[lp->lp_cpt]->pt_peer_nnids++;
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
	       libcfs_nid2str(lp->lp_primary_nid),
	       libcfs_nid2str(lpni->lpni_nid), flags);

	return 0;
}
1118
1119 /*
1120  * Create a new peer, with nid as its primary nid.
1121  *
1122  * Call with the lnet_api_mutex held.
1123  */
static int
lnet_peer_add(lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(nid != LNET_NID_ANY);

	/*
	 * No need for the lnet_net_lock here, because the
	 * lnet_api_mutex is held.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/* A peer with this NID already exists. */
		lp = lpni->lpni_peer_net->lpn_peer;
		/*
		 * NOTE(review): lp is used after lpni's ref is dropped;
		 * this relies on ln_api_mutex preventing concurrent
		 * deletion of the peer — confirm.
		 */
		lnet_peer_ni_decref_locked(lpni);
		/*
		 * This is an error if the peer was configured and the
		 * primary NID differs or an attempt is made to change
		 * the Multi-Rail flag. Otherwise the assumption is
		 * that an existing peer is being modified.
		 */
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			if (lp->lp_primary_nid != nid)
				rc = -EEXIST;
			else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
				rc = -EPERM;
			goto out;
		}
		/* Delete and recreate as a configured peer. */
		lnet_peer_del(lp);
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;

	/* Attach takes ownership of lp, lpn and lpni; always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
	       libcfs_nid2str(nid), flags, rc);
	return rc;
}
1183
1184 /*
1185  * Add a NID to a peer. Call with ln_api_mutex held.
1186  *
1187  * Error codes:
1188  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1189  *  -EEXIST:   The NID was configured by DLC for a different peer.
1190  *  -ENOMEM:   Out of memory.
1191  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1192  *             non-multi-rail peer.
1193  */
static int
lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(lp);
	LASSERT(nid != LNET_NID_ANY);

	/* A configured peer can only be updated through configuration. */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}

	/*
	 * The MULTI_RAIL flag can be set but not cleared, because
	 * that would leave the peer struct in an invalid state.
	 */
	if (flags & LNET_PEER_MULTI_RAIL) {
		spin_lock(&lp->lp_lock);
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			/* Non-MR preferred NIDs lose meaning for MR peers */
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
		spin_unlock(&lp->lp_lock);
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		rc = -EPERM;
		goto out;
	}

	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * A peer_ni already exists. This is only a problem if
		 * it is not connected to this peer and was configured
		 * by DLC.
		 */
		/*
		 * NOTE(review): lpni is dereferenced below after its
		 * lookup ref is dropped here; this relies on
		 * ln_api_mutex keeping the peer_ni alive — confirm.
		 */
		lnet_peer_ni_decref_locked(lpni);
		if (lpni->lpni_peer_net->lpn_peer == lp)
			goto out;
		if (lnet_peer_ni_is_configured(lpni)) {
			rc = -EEXIST;
			goto out;
		}
		/* If this is the primary NID, destroy the peer. */
		if (lnet_peer_ni_is_primary(lpni)) {
			lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
			/* The old lpni went with its peer; make a new one. */
			lpni = lnet_peer_ni_alloc(nid);
			if (!lpni) {
				rc = -ENOMEM;
				goto out;
			}
		}
	} else {
		lpni = lnet_peer_ni_alloc(nid);
		if (!lpni) {
			rc = -ENOMEM;
			goto out;
		}
	}

	/*
	 * Get the peer_net. Check that we're not adding a second
	 * peer_ni on a peer_net of a non-multi-rail peer.
	 */
	lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
	if (!lpn) {
		lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
		if (!lpn) {
			rc = -ENOMEM;
			goto out_free_lpni;
		}
	} else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
		rc = -ENOTUNIQ;
		goto out_free_lpni;
	}

	/* Attach takes ownership of lpn and lpni; always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpni:
	/* If the peer_ni was allocated above its peer_net pointer is NULL */
	if (!lpni->lpni_peer_net)
		LIBCFS_FREE(lpni, sizeof(*lpni));
out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
	       flags, rc);
	return rc;
}
1287
1288 /*
1289  * Update the primary NID of a peer, if possible.
1290  *
1291  * Call with the lnet_api_mutex held.
1292  */
1293 static int
1294 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1295 {
1296         lnet_nid_t old = lp->lp_primary_nid;
1297         int rc = 0;
1298
1299         if (lp->lp_primary_nid == nid)
1300                 goto out;
1301         rc = lnet_peer_add_nid(lp, nid, flags);
1302         if (rc)
1303                 goto out;
1304         lp->lp_primary_nid = nid;
1305 out:
1306         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1307                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1308         return rc;
1309 }
1310
1311 /*
1312  * lpni creation initiated due to traffic either sending or receiving.
1313  */
static int
lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	/* A traffic-created peer is neither DLC-configured nor Multi-Rail. */
	unsigned flags = 0;
	int rc = 0;

	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
		goto out;
	}

	/* lnet_net_lock is not needed here because ln_api_lock is held */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * We must have raced with another thread. Since we
		 * know next to nothing about a peer_ni created by
		 * traffic, we just assume everything is ok and
		 * return.
		 */
		lnet_peer_ni_decref_locked(lpni);
		goto out;
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;
	/* Record the preferred source NID for this (non-MR) peer_ni. */
	if (pref != LNET_NID_ANY)
		lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);

	/* Attach takes ownership of lp, lpn and lpni; always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
	return rc;
}
1365
1366 /*
1367  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1368  *
1369  * This API handles the following combinations:
1370  *   Create a peer with its primary NI if only the prim_nid is provided
1371  *   Add a NID to a peer identified by the prim_nid. The peer identified
1372  *   by the prim_nid must already exist.
1373  *   The peer being created may be non-MR.
1374  *
1375  * The caller must hold ln_api_mutex. This prevents the peer from
1376  * being created/modified/deleted by a different thread.
1377  */
1378 int
1379 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1380 {
1381         struct lnet_peer *lp = NULL;
1382         struct lnet_peer_ni *lpni;
1383         unsigned flags;
1384
1385         /* The prim_nid must always be specified */
1386         if (prim_nid == LNET_NID_ANY)
1387                 return -EINVAL;
1388
1389         flags = LNET_PEER_CONFIGURED;
1390         if (mr)
1391                 flags |= LNET_PEER_MULTI_RAIL;
1392
1393         /*
1394          * If nid isn't specified, we must create a new peer with
1395          * prim_nid as its primary nid.
1396          */
1397         if (nid == LNET_NID_ANY)
1398                 return lnet_peer_add(prim_nid, flags);
1399
1400         /* Look up the prim_nid, which must exist. */
1401         lpni = lnet_find_peer_ni_locked(prim_nid);
1402         if (!lpni)
1403                 return -ENOENT;
1404         lnet_peer_ni_decref_locked(lpni);
1405         lp = lpni->lpni_peer_net->lpn_peer;
1406
1407         /* Peer must have been configured. */
1408         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1409                 CDEBUG(D_NET, "peer %s was not configured\n",
1410                        libcfs_nid2str(prim_nid));
1411                 return -ENOENT;
1412         }
1413
1414         /* Primary NID must match */
1415         if (lp->lp_primary_nid != prim_nid) {
1416                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1417                        libcfs_nid2str(prim_nid),
1418                        libcfs_nid2str(lp->lp_primary_nid));
1419                 return -ENODEV;
1420         }
1421
1422         /* Multi-Rail flag must match. */
1423         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1424                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1425                        libcfs_nid2str(prim_nid));
1426                 return -EPERM;
1427         }
1428
1429         return lnet_peer_add_nid(lp, nid, flags);
1430 }
1431
1432 /*
1433  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1434  *
1435  * This API handles the following combinations:
1436  *   Delete a NI from a peer if both prim_nid and nid are provided.
1437  *   Delete a peer if only prim_nid is provided.
1438  *   Delete a peer if its primary nid is provided.
1439  *
1440  * The caller must hold ln_api_mutex. This prevents the peer from
1441  * being modified/deleted by a different thread.
1442  */
1443 int
1444 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1445 {
1446         struct lnet_peer *lp;
1447         struct lnet_peer_ni *lpni;
1448         unsigned flags;
1449
1450         if (prim_nid == LNET_NID_ANY)
1451                 return -EINVAL;
1452
1453         lpni = lnet_find_peer_ni_locked(prim_nid);
1454         if (!lpni)
1455                 return -ENOENT;
1456         lnet_peer_ni_decref_locked(lpni);
1457         lp = lpni->lpni_peer_net->lpn_peer;
1458
1459         if (prim_nid != lp->lp_primary_nid) {
1460                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1461                        libcfs_nid2str(prim_nid),
1462                        libcfs_nid2str(lp->lp_primary_nid));
1463                 return -ENODEV;
1464         }
1465
1466         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1467                 return lnet_peer_del(lp);
1468
1469         flags = LNET_PEER_CONFIGURED;
1470         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1471                 flags |= LNET_PEER_MULTI_RAIL;
1472
1473         return lnet_peer_del_nid(lp, nid, flags);
1474 }
1475
/*
 * Final teardown of a peer_ni whose last reference has been dropped.
 * Frees the structure and removes it from its CPT's zombie list.
 */
void
lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	/* No refs, not acting as a router, no queued messages. */
	LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
	LASSERT(lpni->lpni_rtr_refcount == 0);
	LASSERT(list_empty(&lpni->lpni_txq));
	LASSERT(lpni->lpni_txqnob == 0);

	lpn = lpni->lpni_peer_net;
	lpni->lpni_peer_net = NULL;
	lpni->lpni_net = NULL;

	/* remove the peer ni from the zombie list */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	spin_lock(&ptable->pt_zombie_lock);
	list_del_init(&lpni->lpni_hashlist);
	ptable->pt_zombies--;
	spin_unlock(&ptable->pt_zombie_lock);

	/* A separate NID array is only allocated when there are >1. */
	if (lpni->lpni_pref_nnids > 1) {
		LIBCFS_FREE(lpni->lpni_pref.nids,
			sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
	}
	LIBCFS_FREE(lpni, sizeof(*lpni));

	/* Drop the peer_net ref taken when the peer_ni was attached. */
	lnet_peer_net_decref_locked(lpn);
}
1508
1509 struct lnet_peer_ni *
1510 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1511 {
1512         struct lnet_peer_ni *lpni = NULL;
1513         int rc;
1514
1515         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1516                 return ERR_PTR(-ESHUTDOWN);
1517
1518         /*
1519          * find if a peer_ni already exists.
1520          * If so then just return that.
1521          */
1522         lpni = lnet_find_peer_ni_locked(nid);
1523         if (lpni)
1524                 return lpni;
1525
1526         lnet_net_unlock(cpt);
1527
1528         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1529         if (rc) {
1530                 lpni = ERR_PTR(rc);
1531                 goto out_net_relock;
1532         }
1533
1534         lpni = lnet_find_peer_ni_locked(nid);
1535         LASSERT(lpni);
1536
1537 out_net_relock:
1538         lnet_net_lock(cpt);
1539
1540         return lpni;
1541 }
1542
1543 /*
1544  * Get a peer_ni for the given nid, create it if necessary. Takes a
1545  * hold on the peer_ni.
1546  */
struct lnet_peer_ni *
lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
{
	struct lnet_peer_ni *lpni = NULL;
	int rc;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni)
		return lpni;

	/*
	 * Slow path:
	 * use the lnet_api_mutex to serialize the creation of the peer_ni
	 * and the creation/deletion of the local ni/net. When a local ni is
	 * created, if there exists a set of peer_nis on that network,
	 * they need to be traversed and updated. When a local NI is
	 * deleted, which could result in a network being deleted, then
	 * all peer nis on that network need to be removed as well.
	 *
	 * Creation through traffic should also be serialized with
	 * creation through DLC.
	 */
	lnet_net_unlock(cpt);
	mutex_lock(&the_lnet.ln_api_mutex);
	/*
	 * Shutdown is only set under the ln_api_lock, so a single
	 * check here is sufficent.
	 */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		lpni = ERR_PTR(-ESHUTDOWN);
		goto out_mutex_unlock;
	}

	rc = lnet_peer_ni_traffic_add(nid, pref);
	if (rc) {
		lpni = ERR_PTR(rc);
		goto out_mutex_unlock;
	}

	/* Creation succeeded above, so the lookup must find it. */
	lpni = lnet_find_peer_ni_locked(nid);
	LASSERT(lpni);

out_mutex_unlock:
	mutex_unlock(&the_lnet.ln_api_mutex);
	lnet_net_lock(cpt);

	/* Lock has been dropped, check again for shutdown. */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		/* Drop the ref taken by the successful lookup above. */
		if (!IS_ERR(lpni))
			lnet_peer_ni_decref_locked(lpni);
		lpni = ERR_PTR(-ESHUTDOWN);
	}

	return lpni;
}
1609
1610 /*
1611  * Peer Discovery
1612  */
1613
1614 bool
1615 lnet_peer_is_uptodate(struct lnet_peer *lp)
1616 {
1617         bool rc;
1618
1619         spin_lock(&lp->lp_lock);
1620         if (lnet_peer_needs_push(lp)) {
1621                 rc = false;
1622         } else if (lp->lp_state & LNET_PEER_DISCOVERING) {
1623                 rc = false;
1624         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1625                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1626                         rc = true;
1627                 else
1628                         rc = false;
1629         } else if (lp->lp_state & LNET_PEER_UNDISCOVERED) {
1630                 if (lnet_peer_discovery_enabled)
1631                         rc = false;
1632                 else
1633                         rc = true;
1634         } else {
1635                 rc = false;
1636         }
1637         spin_unlock(&lp->lp_lock);
1638
1639         return rc;
1640 }
1641
1642 /*
1643  * Queue a peer for the attention of the discovery thread.  Call with
1644  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1645  * -EALREADY if the peer was already queued.
1646  */
1647 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1648 {
1649         int rc;
1650
1651         spin_lock(&lp->lp_lock);
1652         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1653                 lp->lp_state |= LNET_PEER_DISCOVERING;
1654         if (!(lp->lp_state & LNET_PEER_QUEUED)) {
1655                 lp->lp_state |= LNET_PEER_QUEUED;
1656                 spin_unlock(&lp->lp_lock);
1657                 lnet_peer_addref_locked(lp);
1658                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1659                 wake_up(&the_lnet.ln_dc_waitq);
1660                 rc = 0;
1661         } else {
1662                 spin_unlock(&lp->lp_lock);
1663                 rc = -EALREADY;
1664         }
1665
1666         CDEBUG(D_NET, "Queue peer %s: %d\n",
1667                libcfs_nid2str(lp->lp_primary_nid), rc);
1668
1669         return rc;
1670 }
1671
/*
 * Discovery of a peer is complete. Wake all waiters on the peer.
 * Call with lnet_net_lock/EX held.
 */
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
{
	CDEBUG(D_NET, "Dequeue peer %s\n",
	       libcfs_nid2str(lp->lp_primary_nid));

	spin_lock(&lp->lp_lock);
	/* Must have been queued by lnet_peer_queue_for_discovery(). */
	LASSERT(lp->lp_state & LNET_PEER_QUEUED);
	lp->lp_state &= ~LNET_PEER_QUEUED;
	spin_unlock(&lp->lp_lock);
	/* Off the discovery list; wake waiters; drop the queue's ref. */
	list_del_init(&lp->lp_dc_list);
	wake_up_all(&lp->lp_dc_waitq);
	lnet_peer_decref_locked(lp);
}
1689
1690 /*
1691  * Handle inbound push.
1692  * Like any event handler, called with lnet_res_lock/CPT held.
1693  */
1694 void lnet_peer_push_event(struct lnet_event *ev)
1695 {
1696         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1697         struct lnet_peer *lp;
1698
1699         /* lnet_find_peer() adds a refcount */
1700         lp = lnet_find_peer(ev->source.nid);
1701         if (!lp) {
1702                 CERROR("Push Put from unknown %s (source %s)\n",
1703                        libcfs_nid2str(ev->initiator.nid),
1704                        libcfs_nid2str(ev->source.nid));
1705                 return;
1706         }
1707
1708         /* Ensure peer state remains consistent while we modify it. */
1709         spin_lock(&lp->lp_lock);
1710
1711         /*
1712          * If some kind of error happened the contents of the message
1713          * cannot be used. Clear the NIDS_UPTODATE and set the
1714          * PING_REQUIRED flag to trigger a ping.
1715          */
1716         if (ev->status) {
1717                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1718                 lp->lp_state |= LNET_PEER_PING_REQUIRED;
1719                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1720                        ev->status,
1721                        libcfs_nid2str(lp->lp_primary_nid),
1722                        libcfs_nid2str(ev->source.nid));
1723                 goto out;
1724         }
1725
1726         /*
1727          * A push with invalid or corrupted info. Clear the UPTODATE
1728          * flag to trigger a ping.
1729          */
1730         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1731                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1732                 lp->lp_state |= LNET_PEER_PING_REQUIRED;
1733                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1734                        libcfs_nid2str(lp->lp_primary_nid));
1735                 goto out;
1736         }
1737
1738         /*
1739          * Make sure we'll allocate the correct size ping buffer when
1740          * pinging the peer.
1741          */
1742         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1743                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1744
1745         /*
1746          * A non-Multi-Rail peer is not supposed to be capable of
1747          * sending a push.
1748          */
1749         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1750                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1751                        libcfs_nid2str(lp->lp_primary_nid));
1752                 goto out;
1753         }
1754
1755         /*
1756          * Set the MULTIRAIL flag. Complain if the peer was DLC
1757          * configured without it. This is the one place where
1758          * discovery will override DLC.
1759          */
1760         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1761                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1762                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1763                                libcfs_nid2str(lp->lp_primary_nid));
1764                 }
1765                 lp->lp_state |= LNET_PEER_MULTI_RAIL;
1766                 lnet_peer_clr_non_mr_pref_nids(lp);
1767         }
1768
1769         /*
1770          * Check for truncation of the Put message. Clear the
1771          * NIDS_UPTODATE flag and set PING_REQUIRED to trigger a ping,
1772          * and tell discovery to allocate a bigger buffer.
1773          */
1774         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1775                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1776                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1777                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1778                 lp->lp_state |= LNET_PEER_PING_REQUIRED;
1779                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1780                        libcfs_nid2str(lp->lp_primary_nid),
1781                        pbuf->pb_info.pi_nnis);
1782                 goto out;
1783         }
1784
1785         /*
1786          * Check whether the Put data is stale. Stale data can just be
1787          * dropped.
1788          */
1789         if (pbuf->pb_info.pi_nnis > 1 &&
1790             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1791             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1792                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1793                        libcfs_nid2str(lp->lp_primary_nid),
1794                        LNET_PING_BUFFER_SEQNO(pbuf),
1795                        lp->lp_peer_seqno);
1796                 goto out;
1797         }
1798
1799         /*
1800          * Check whether the Put data is new, in which case we clear
1801          * the UPTODATE flag and prepare to process it.
1802          *
1803          * If the Put data is current, and the peer is UPTODATE then
	 * we assume everything is all right and drop the data as
1805          * stale.
1806          */
1807         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1808                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1809                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1810         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1811                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1812                        libcfs_nid2str(lp->lp_primary_nid),
1813                        LNET_PING_BUFFER_SEQNO(pbuf),
1814                        lp->lp_peer_seqno);
1815                 goto out;
1816         }
1817
1818         /*
1819          * If there is data present that hasn't been processed yet,
1820          * we'll replace it if the Put contained newer data and it
1821          * fits. We're racing with a Ping or earlier Push in this
1822          * case.
1823          */
1824         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1825                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1826                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1827                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1828                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1829                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1830                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1831                               libcfs_nid2str(lp->lp_primary_nid),
1832                               LNET_PING_BUFFER_SEQNO(pbuf),
1833                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1834                 }
1835                 goto out;
1836         }
1837
1838         /*
1839          * Allocate a buffer to copy the data. On a failure we drop
1840          * the Push and set PING_REQUIRED to force the discovery
1841          * thread to fix the problem by pinging the peer.
1842          */
1843         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1844         if (!lp->lp_data) {
1845                 lp->lp_state |= LNET_PEER_PING_REQUIRED;
1846                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1847                        libcfs_nid2str(lp->lp_primary_nid),
1848                        LNET_PING_BUFFER_SEQNO(pbuf));
1849                 goto out;
1850         }
1851
1852         /* Success */
1853         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1854                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1855         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1856         CDEBUG(D_NET, "Received Push %s %u\n",
1857                libcfs_nid2str(lp->lp_primary_nid),
1858                LNET_PING_BUFFER_SEQNO(pbuf));
1859
1860 out:
1861         /*
1862          * Queue the peer for discovery, and wake the discovery thread
1863          * if the peer was already queued, because its status changed.
1864          */
1865         spin_unlock(&lp->lp_lock);
1866         lnet_net_lock(LNET_LOCK_EX);
1867         if (lnet_peer_queue_for_discovery(lp))
1868                 wake_up(&the_lnet.ln_dc_waitq);
1869         /* Drop refcount from lookup */
1870         lnet_peer_decref_locked(lp);
1871         lnet_net_unlock(LNET_LOCK_EX);
1872 }
1873
1874 /*
1875  * Clear the discovery error state, unless we're already discovering
1876  * this peer, in which case the error is current.
1877  */
1878 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
1879 {
1880         spin_lock(&lp->lp_lock);
1881         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1882                 lp->lp_dc_error = 0;
1883         spin_unlock(&lp->lp_lock);
1884 }
1885
1886 /*
1887  * Peer discovery slow path. The ln_api_mutex is held on entry, and
1888  * dropped/retaken within this function. An lnet_peer_ni is passed in
1889  * because discovery could tear down an lnet_peer.
1890  */
int
lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
{
	DEFINE_WAIT(wait);
	struct lnet_peer *lp;
	int rc = 0;

again:
	/* Swap the caller's cpt lock for the exclusive lock; the
	 * owning peer is re-read every time we come back here. */
	lnet_net_unlock(cpt);
	lnet_net_lock(LNET_LOCK_EX);
	lp = lpni->lpni_peer_net->lpn_peer;
	lnet_peer_clear_discovery_error(lp);

	/*
	 * We're willing to be interrupted. The lpni can become a
	 * zombie if we race with DLC, so we must check for that.
	 */
	for (;;) {
		prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
		if (signal_pending(current))
			break;
		if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
			break;
		if (lp->lp_dc_error)
			break;
		if (lnet_peer_is_uptodate(lp))
			break;
		/* Hold a reference across schedule(): no lock is held
		 * while we sleep, so the peer could otherwise go away. */
		lnet_peer_queue_for_discovery(lp);
		lnet_peer_addref_locked(lp);
		lnet_net_unlock(LNET_LOCK_EX);
		schedule();
		finish_wait(&lp->lp_dc_waitq, &wait);
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_decref_locked(lp);
		/* Peer may have changed */
		lp = lpni->lpni_peer_net->lpn_peer;
	}
	finish_wait(&lp->lp_dc_waitq, &wait);

	lnet_net_unlock(LNET_LOCK_EX);
	lnet_net_lock(cpt);

	/*
	 * Map the wakeup reason onto a return code. If none of the
	 * terminal conditions hold and the peer is still not up to
	 * date, go around and wait again.
	 */
	if (signal_pending(current))
		rc = -EINTR;
	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
		rc = -ESHUTDOWN;
	else if (lp->lp_dc_error)
		rc = lp->lp_dc_error;
	else if (!lnet_peer_is_uptodate(lp))
		goto again;

	CDEBUG(D_NET, "peer %s NID %s: %d\n",
	       (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
	       libcfs_nid2str(lpni->lpni_nid), rc);

	return rc;
}
1948
1949 /* Handle an incoming ack for a push. */
1950 static void
1951 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
1952 {
1953         struct lnet_ping_buffer *pbuf;
1954
1955         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
1956         spin_lock(&lp->lp_lock);
1957         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
1958         lp->lp_push_error = ev->status;
1959         if (ev->status)
1960                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
1961         else
1962                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1963         spin_unlock(&lp->lp_lock);
1964
1965         CDEBUG(D_NET, "peer %s ev->status %d\n",
1966                libcfs_nid2str(lp->lp_primary_nid), ev->status);
1967 }
1968
1969 /* Handle a Reply message. This is the reply to a Ping message. */
static void
lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
{
	struct lnet_ping_buffer *pbuf;
	int rc;

	spin_lock(&lp->lp_lock);

	/*
	 * If some kind of error happened the contents of the message
	 * cannot be used. Set PING_FAILED to trigger a retry.
	 */
	if (ev->status) {
		lp->lp_state |= LNET_PEER_PING_FAILED;
		lp->lp_ping_error = ev->status;
		CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
		       ev->status,
		       libcfs_nid2str(lp->lp_primary_nid),
		       libcfs_nid2str(ev->source.nid));
		goto out;
	}

	/* The magic value reveals whether the peer wrote the ping info
	 * in the opposite byte order; byte-swap it if so. */
	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
	if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
		lnet_swap_pinginfo(pbuf);

	/*
	 * A reply with invalid or corrupted info. Set PING_FAILED to
	 * trigger a retry.
	 */
	rc = lnet_ping_info_validate(&pbuf->pb_info);
	if (rc) {
		lp->lp_state |= LNET_PEER_PING_FAILED;
		/* NOTE(review): lp_ping_error is cleared here, unlike the
		 * transport-error path above which stores ev->status —
		 * presumably deliberate (not a transport failure); confirm. */
		lp->lp_ping_error = 0;
		CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
		       libcfs_nid2str(lp->lp_primary_nid), rc);
		goto out;
	}

	/*
	 * Update the MULTI_RAIL flag based on the reply. If the peer
	 * was configured with DLC then the setting should match what
	 * DLC put in. Once MULTIRAIL has been set it is not expected
	 * to be unset.
	 */
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
		if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
			/* Everything's fine */
		} else if (lp->lp_state & LNET_PEER_CONFIGURED) {
			CWARN("Reply says %s is Multi-Rail, DLC says not\n",
			      libcfs_nid2str(lp->lp_primary_nid));
		}
		lp->lp_state |= LNET_PEER_MULTI_RAIL;
		lnet_peer_clr_non_mr_pref_nids(lp);
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			CWARN("DLC says %s is Multi-Rail, Reply says not\n",
			      libcfs_nid2str(lp->lp_primary_nid));
		} else {
			CERROR("Multi-Rail state vanished from %s\n",
			       libcfs_nid2str(lp->lp_primary_nid));
		}
	}

	/*
	 * Make sure we'll allocate the correct size ping buffer when
	 * pinging the peer.
	 */
	if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
		lp->lp_data_nnis = pbuf->pb_info.pi_nnis;

	/*
	 * Check for truncation of the Reply. Clear PING_SENT and set
	 * PING_FAILED to trigger a retry. Also grow the push target
	 * so a future attempt has room for all of the peer's NIDs.
	 */
	if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
		if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
			the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
		lp->lp_state |= LNET_PEER_PING_FAILED;
		lp->lp_ping_error = 0;
		CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
		       libcfs_nid2str(lp->lp_primary_nid),
		       pbuf->pb_info.pi_nnis);
		goto out;
	}

	/*
	 * Check the sequence numbers in the reply. These are only
	 * available if the reply came from a Multi-Rail peer, and are
	 * only trusted when pi_ni[1] carries the peer's primary NID.
	 */
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
	    pbuf->pb_info.pi_nnis > 1 &&
	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
		if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
			CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
				libcfs_nid2str(lp->lp_primary_nid),
				LNET_PING_BUFFER_SEQNO(pbuf),
				lp->lp_peer_seqno);
			goto out;
		}

		if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
	}

	/* We're happy with the state of the data in the buffer. */
	CDEBUG(D_NET, "peer %s data present %u\n",
	       libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
	/* Attach the buffer to the peer, replacing any older data.
	 * The DATA_PRESENT flag accounts for the reference we add. */
	if (lp->lp_state & LNET_PEER_DATA_PRESENT)
		lnet_ping_buffer_decref(lp->lp_data);
	else
		lp->lp_state |= LNET_PEER_DATA_PRESENT;
	lnet_ping_buffer_addref(pbuf);
	lp->lp_data = pbuf;
out:
	/* The Ping is complete, success or not. */
	lp->lp_state &= ~LNET_PEER_PING_SENT;
	spin_unlock(&lp->lp_lock);
}
2088
2089 /*
2090  * Send event handling. Only matters for error cases, where we clean
2091  * up state on the peer and peer_ni that would otherwise be updated in
2092  * the REPLY event handler for a successful Ping, and the ACK event
2093  * handler for a successful Push.
2094  */
2095 static int
2096 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2097 {
2098         int rc = 0;
2099
2100         if (!ev->status)
2101                 goto out;
2102
2103         LASSERT(lp->lp_state & LNET_PEER_QUEUED);
2104         spin_lock(&lp->lp_lock);
2105         if (ev->msg_type == LNET_MSG_GET) {
2106                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2107                 lp->lp_state |= LNET_PEER_PING_FAILED;
2108                 lp->lp_ping_error = ev->status;
2109         } else { /* ev->msg_type == LNET_MSG_PUT */
2110                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2111                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2112                 lp->lp_push_error = ev->status;
2113         }
2114         spin_unlock(&lp->lp_lock);
2115         rc = LNET_REDISCOVER_PEER;
2116 out:
2117         CDEBUG(D_NET, "%s Send to %s: %d\n",
2118                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2119                 libcfs_nid2str(ev->target.nid), rc);
2120         return rc;
2121 }
2122
2123 /*
2124  * Event handler for the discovery EQ.
2125  *
2126  * Called with lnet_res_lock(cpt) held. The cpt is the
2127  * lnet_cpt_of_cookie() of the md handle cookie.
2128  */
static void lnet_discovery_event_handler(lnet_event_t *event)
{
	struct lnet_peer *lp = event->md.user_ptr;
	struct lnet_ping_buffer *pbuf;
	int rc;

	/* discovery needs to take another look */
	rc = LNET_REDISCOVER_PEER;

	switch (event->type) {
	case LNET_EVENT_ACK:
		/* Ack of a Push we sent. */
		lnet_discovery_event_ack(lp, event);
		break;
	case LNET_EVENT_REPLY:
		/* Reply to a Ping we sent. */
		lnet_discovery_event_reply(lp, event);
		break;
	case LNET_EVENT_SEND:
		/* Only send failure triggers a retry. */
		rc = lnet_discovery_event_send(lp, event);
		break;
	case LNET_EVENT_UNLINK:
		/* Valid event, nothing to do here. */
		break;
	default:
		/* Invalid events. */
		LBUG();
	}
	lnet_net_lock(LNET_LOCK_EX);
	/* On unlink, drop the references held on behalf of the MD:
	 * one on the ping buffer and one on the peer. */
	if (event->unlinked) {
		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
		lnet_ping_buffer_decref(pbuf);
		lnet_peer_decref_locked(lp);
	}
	/* Requeue the peer on the discovery request queue and wake
	 * the discovery thread so it takes another look at it. */
	if (rc == LNET_REDISCOVER_PEER) {
		list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	lnet_net_unlock(LNET_LOCK_EX);
}
2168
2169 /*
2170  * Build a peer from incoming data.
2171  *
2172  * The NIDs in the incoming data are supposed to be structured as follows:
2173  *  - loopback
2174  *  - primary NID
2175  *  - other NIDs in same net
2176  *  - NIDs in second net
2177  *  - NIDs in third net
2178  *  - ...
 * This is due to the way the list of NIDs in the data is created.
2180  *
2181  * Note that this function will mark the peer uptodate unless an
 * ENOMEM is encountered. All other errors are due to a conflict
2183  * between the DLC configuration and what discovery sees. We treat DLC
2184  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2185  * peer from becoming stuck in discovery.
2186  */
static int lnet_peer_merge_data(struct lnet_peer *lp,
				struct lnet_ping_buffer *pbuf)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t *curnis = NULL;	/* NIDs currently on the peer */
	lnet_nid_t *addnis = NULL;	/* in pbuf but not on the peer */
	lnet_nid_t *delnis = NULL;	/* on the peer but not in pbuf */
	unsigned flags;
	int ncurnis;
	int naddnis;
	int ndelnis;
	int nnis = 0;
	int i;
	int j;
	int rc;

	flags = LNET_PEER_DISCOVERED;
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
		flags |= LNET_PEER_MULTI_RAIL;

	/*
	 * Size the scratch arrays for the larger of the two NID sets;
	 * each of the three sets can hold at most nnis entries.
	 */
	nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
	LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
	if (!curnis || !addnis || !delnis) {
		rc = -ENOMEM;
		goto out;
	}
	ncurnis = 0;
	naddnis = 0;
	ndelnis = 0;

	/* Construct the list of NIDs present in peer. */
	lpni = NULL;
	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
		curnis[ncurnis++] = lpni->lpni_nid;

	/*
	 * Check for NIDs in pbuf not present in curnis[].
	 * The loop starts at 1 to skip the loopback NID.
	 */
	for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
		for (j = 0; j < ncurnis; j++)
			if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
				break;
		if (j == ncurnis)
			addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
	}
	/*
	 * Check for NIDs in curnis[] not present in pbuf.
	 * The nested loop starts at 1 to skip the loopback NID.
	 *
	 * But never add the loopback NID to delnis[]: if it is
	 * present in curnis[] then this peer is for this node.
	 */
	for (i = 0; i < ncurnis; i++) {
		if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
			continue;
		for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
			if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
				break;
		if (j == pbuf->pb_info.pi_nnis)
			delnis[ndelnis++] = curnis[i];
	}

	/* Apply additions first, then deletions. Only -ENOMEM aborts
	 * the merge; other errors are DLC conflicts, see below. */
	for (i = 0; i < naddnis; i++) {
		rc = lnet_peer_add_nid(lp, addnis[i], flags);
		if (rc) {
			CERROR("Error adding NID %s to peer %s: %d\n",
			       libcfs_nid2str(addnis[i]),
			       libcfs_nid2str(lp->lp_primary_nid), rc);
			if (rc == -ENOMEM)
				goto out;
		}
	}
	for (i = 0; i < ndelnis; i++) {
		rc = lnet_peer_del_nid(lp, delnis[i], flags);
		if (rc) {
			CERROR("Error deleting NID %s from peer %s: %d\n",
			       libcfs_nid2str(delnis[i]),
			       libcfs_nid2str(lp->lp_primary_nid), rc);
			if (rc == -ENOMEM)
				goto out;
		}
	}
	/*
	 * Errors other than -ENOMEM are due to peers having been
	 * configured with DLC. Ignore these because DLC overrides
	 * Discovery.
	 */
	rc = 0;
out:
	/* NOTE(review): the arrays may be NULL here after a failed
	 * allocation — relies on LIBCFS_FREE tolerating NULL. */
	LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
	/* This function consumes the caller's reference on pbuf. */
	lnet_ping_buffer_decref(pbuf);
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);

	/* On failure force a ping so the peer cannot end up stuck
	 * with stale NID information. */
	if (rc) {
		spin_lock(&lp->lp_lock);
		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
		lp->lp_state |= LNET_PEER_PING_REQUIRED;
		spin_unlock(&lp->lp_lock);
	}
	return rc;
}
2293
2294 /*
2295  * The data in pbuf says lp is its primary peer, but the data was
2296  * received by a different peer. Try to update lp with the data.
2297  */
static int
lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
{
	lnet_handle_md_t mdh;

	/* Queue lp for discovery, and force it on the request queue. */
	lnet_net_lock(LNET_LOCK_EX);
	if (lnet_peer_queue_for_discovery(lp))
		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
	lnet_net_unlock(LNET_LOCK_EX);

	LNetInvalidateHandle(&mdh);

	/*
	 * Decide whether we can move the peer to the DATA_PRESENT state.
	 *
	 * We replace stale data for a multi-rail peer, repair PING_FAILED
	 * status, and preempt PING_REQUIRED.
	 *
	 * If after that we have DATA_PRESENT, we merge it into this peer.
	 */
	spin_lock(&lp->lp_lock);
	if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
		} else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
			/* Incoming data is stale; swap in the newer
			 * buffer already attached to the peer. Clearing
			 * DATA_PRESENT here prevents the decref below. */
			lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
			lnet_ping_buffer_decref(pbuf);
			pbuf = lp->lp_data;
			lp->lp_data = NULL;
		}
	}
	/* Discard any other buffer still attached to the peer. */
	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
		lnet_ping_buffer_decref(lp->lp_data);
		lp->lp_data = NULL;
		lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
	}
	/* Repair a failed ping; its MD is unlinked below, after the
	 * spinlock has been dropped. */
	if (lp->lp_state & LNET_PEER_PING_FAILED) {
		mdh = lp->lp_ping_mdh;
		LNetInvalidateHandle(&lp->lp_ping_mdh);
		lp->lp_state &= ~LNET_PEER_PING_FAILED;
		lp->lp_ping_error = 0;
	}
	/* The incoming data preempts any pending forced ping. */
	if (lp->lp_state & LNET_PEER_PING_REQUIRED) {
		lp->lp_state &= ~LNET_PEER_PING_REQUIRED;
	}
	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
	spin_unlock(&lp->lp_lock);

	if (!LNetHandleIsInvalid(mdh))
		LNetMDUnlink(mdh);

	/* lnet_peer_merge_data() consumes the reference on pbuf. */
	if (pbuf)
		return lnet_peer_merge_data(lp, pbuf);

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
	return 0;
}
2356
2357 /*
2358  * Update a peer using the data received.
2359  */
2360 static int lnet_peer_data_present(struct lnet_peer *lp)
2361 __must_hold(&lp->lp_lock)
2362 {
2363         struct lnet_ping_buffer *pbuf;
2364         struct lnet_peer_ni *lpni;
2365         lnet_nid_t nid = LNET_NID_ANY;
2366         unsigned flags;
2367         int rc = 0;
2368
2369         pbuf = lp->lp_data;
2370         lp->lp_data = NULL;
2371         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2372         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2373         spin_unlock(&lp->lp_lock);
2374
2375         /*
2376          * Modifications of peer structures are done while holding the
2377          * ln_api_mutex. A global lock is required because we may be
2378          * modifying multiple peer structures, and a mutex greatly
2379          * simplifies memory management.
2380          *
2381          * The actual changes to the data structures must also protect
2382          * against concurrent lookups, for which the lnet_net_lock in
2383          * LNET_LOCK_EX mode is used.
2384          */
2385         mutex_lock(&the_lnet.ln_api_mutex);
2386         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2387                 rc = -ESHUTDOWN;
2388                 goto out;
2389         }
2390
2391         /*
2392          * If this peer is not on the peer list then it is being torn
2393          * down, and our reference count may be all that is keeping it
2394          * alive. Don't do any work on it.
2395          */
2396         if (list_empty(&lp->lp_peer_list))
2397                 goto out;
2398
2399         flags = LNET_PEER_DISCOVERED;
2400         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2401                 flags |= LNET_PEER_MULTI_RAIL;
2402
2403         /*
2404          * Check whether the primary NID in the message matches the
2405          * primary NID of the peer. If it does, update the peer, if
2406          * it it does not, check whether there is already a peer with
2407          * that primary NID. If no such peer exists, try to update
2408          * the primary NID of the current peer (allowed if it was
2409          * created due to message traffic) and complete the update.
2410          * If the peer did exist, hand off the data to it.
2411          *
2412          * The peer for the loopback interface is a special case: this
2413          * is the peer for the local node, and we want to set its
2414          * primary NID to the correct value here.
2415          */
2416         if (pbuf->pb_info.pi_nnis > 1)
2417                 nid = pbuf->pb_info.pi_ni[1].ns_nid;
2418         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2419                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2420                 if (!rc)
2421                         rc = lnet_peer_merge_data(lp, pbuf);
2422         } else if (lp->lp_primary_nid == nid) {
2423                 rc = lnet_peer_merge_data(lp, pbuf);
2424         } else {
2425                 lpni = lnet_find_peer_ni_locked(nid);
2426                 if (!lpni) {
2427                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2428                         if (rc) {
2429                                 CERROR("Primary NID error %s versus %s: %d\n",
2430                                        libcfs_nid2str(lp->lp_primary_nid),
2431                                        libcfs_nid2str(nid), rc);
2432                         } else {
2433                                 rc = lnet_peer_merge_data(lp, pbuf);
2434                         }
2435                 } else {
2436                         rc = lnet_peer_set_primary_data(
2437                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2438                         lnet_peer_ni_decref_locked(lpni);
2439                 }
2440         }
2441 out:
2442         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2443         mutex_unlock(&the_lnet.ln_api_mutex);
2444
2445         spin_lock(&lp->lp_lock);
2446         /* Tell discovery to re-check the peer immediately. */
2447         if (!rc)
2448                 rc = LNET_REDISCOVER_PEER;
2449         return rc;
2450 }
2451
2452 /*
2453  * A ping failed. Clear the PING_FAILED state and set the
2454  * PING_REQUIRED state, to ensure a retry even if discovery is
2455  * disabled. This avoids being left with incorrect state.
2456  */
2457 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2458 __must_hold(&lp->lp_lock)
2459 {
2460         lnet_handle_md_t mdh;
2461         int rc;
2462
2463         mdh = lp->lp_ping_mdh;
2464         LNetInvalidateHandle(&lp->lp_ping_mdh);
2465         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2466         lp->lp_state |= LNET_PEER_PING_REQUIRED;
2467         rc = lp->lp_ping_error;
2468         lp->lp_ping_error = 0;
2469         spin_unlock(&lp->lp_lock);
2470
2471         LNetMDUnlink(mdh);
2472
2473         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2474
2475         spin_lock(&lp->lp_lock);
2476         return rc ? rc : LNET_REDISCOVER_PEER;
2477 }
2478
2479 /*
2480  * Select NID to send a Ping or Push to.
2481  */
2482 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2483 {
2484         struct lnet_peer_ni *lpni;
2485
2486         /* Look for a direct-connected NID for this peer. */
2487         lpni = NULL;
2488         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2489                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2490                         continue;
2491                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2492                         continue;
2493                 break;
2494         }
2495         if (lpni)
2496                 return lpni->lpni_nid;
2497
2498         /* Look for a routed-connected NID for this peer. */
2499         lpni = NULL;
2500         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2501                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2502                         continue;
2503                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2504                         continue;
2505                 break;
2506         }
2507         if (lpni)
2508                 return lpni->lpni_nid;
2509
2510         return LNET_NID_ANY;
2511 }
2512
/*
 * Active side of ping: allocate a reply buffer and LNetGet() the
 * peer's ping data from a NID chosen by lnet_peer_select_nid().
 *
 * Called with lp_lock held; the lock is dropped while the network
 * work is done and re-acquired before returning.
 *
 * Returns 0 if the GET was sent, negative errno on hard failure
 * (in which case PING_SENT is cleared again).
 */
static int lnet_peer_send_ping(struct lnet_peer *lp)
__must_hold(&lp->lp_lock)
{
	lnet_md_t md = { NULL };
	lnet_process_id_t id;
	struct lnet_ping_buffer *pbuf;
	int nnis;
	int rc;
	int cpt;

	/* Flag the ping as in flight before dropping lp_lock. */
	lp->lp_state |= LNET_PEER_PING_SENT;
	lp->lp_state &= ~LNET_PEER_PING_REQUIRED;
	spin_unlock(&lp->lp_lock);

	/* Size the reply buffer for at least the minimum interface
	 * count; LNET_MD_TRUNCATE below lets a larger reply be
	 * truncated rather than rejected. */
	nnis = MAX(lp->lp_data_nnis, LNET_MIN_INTERFACES);
	pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
	if (!pbuf) {
		rc = -ENOMEM;
		goto fail_error;
	}

	/* initialize md content */
	md.start     = &pbuf->pb_info;
	md.length    = LNET_PING_INFO_SIZE(nnis);
	md.threshold = 2; /* GET/REPLY */
	md.max_size  = 0;
	md.options   = LNET_MD_TRUNCATE;
	md.user_ptr  = lp;
	md.eq_handle = the_lnet.ln_dc_eqh;

	rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
	if (rc != 0) {
		lnet_ping_buffer_decref(pbuf);
		CERROR("Can't bind MD: %d\n", rc);
		goto fail_error;
	}
	cpt = lnet_net_lock_current();
	/* Refcount for MD. */
	lnet_peer_addref_locked(lp);
	id.pid = LNET_PID_LUSTRE;
	id.nid = lnet_peer_select_nid(lp);
	lnet_net_unlock(cpt);

	if (id.nid == LNET_NID_ANY) {
		rc = -EHOSTUNREACH;
		goto fail_unlink_md;
	}

	rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
		     LNET_RESERVED_PORTAL,
		     LNET_PROTO_PING_MATCHBITS, 0);

	if (rc)
		goto fail_unlink_md;

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));

	spin_lock(&lp->lp_lock);
	return 0;

fail_unlink_md:
	/* Unlinking generates an event which releases the MD's
	 * peer refcount taken above. */
	LNetMDUnlink(lp->lp_ping_mdh);
	LNetInvalidateHandle(&lp->lp_ping_mdh);
fail_error:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
	/*
	 * The errors that get us here are considered hard errors and
	 * cause Discovery to terminate. So we clear PING_SENT, but do
	 * not set either PING_FAILED or PING_REQUIRED.
	 */
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_PING_SENT;
	return rc;
}
2588
/*
 * This function exists because you cannot call LNetMDUnlink() from an
 * event handler. The discovery thread calls it when it finds
 * LNET_PEER_PUSH_FAILED set on a peer.
 *
 * Called with lp_lock held; the lock is dropped around LNetMDUnlink()
 * and re-acquired before returning.
 *
 * Returns the saved push error, or LNET_REDISCOVER_PEER when there
 * was none, so the caller re-queues the peer for discovery.
 */
static int lnet_peer_push_failed(struct lnet_peer *lp)
__must_hold(&lp->lp_lock)
{
	lnet_handle_md_t mdh;
	int rc;

	/* Take ownership of the MD handle and consume the saved
	 * error while still holding lp_lock. */
	mdh = lp->lp_push_mdh;
	LNetInvalidateHandle(&lp->lp_push_mdh);
	lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
	rc = lp->lp_push_error;
	lp->lp_push_error = 0;
	spin_unlock(&lp->lp_lock);

	LNetMDUnlink(mdh);

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
	spin_lock(&lp->lp_lock);
	return rc ? rc : LNET_REDISCOVER_PEER;
}
2612
/*
 * Active side of push: LNetPut() the local ping target buffer to a
 * NID of the peer chosen by lnet_peer_select_nid(). Only valid for
 * Multi-Rail peers (asserted below).
 *
 * Called with lp_lock held; the lock is dropped while the network
 * work is done and re-acquired before returning.
 *
 * Returns 0 if the PUT was sent, negative errno on hard failure
 * (in which case PUSH_SENT is cleared again).
 */
static int lnet_peer_send_push(struct lnet_peer *lp)
__must_hold(&lp->lp_lock)
{
	struct lnet_ping_buffer *pbuf;
	lnet_process_id_t id;
	lnet_md_t md;
	int cpt;
	int rc;

	LASSERT(lp->lp_state & LNET_PEER_MULTI_RAIL);

	/* Flag the push as in flight before dropping lp_lock. */
	lp->lp_state |= LNET_PEER_PUSH_SENT;
	spin_unlock(&lp->lp_lock);

	/* Take a reference on the current ping target under the
	 * net lock so it cannot go away while the PUT is pending. */
	cpt = lnet_net_lock_current();
	pbuf = the_lnet.ln_ping_target;
	lnet_ping_buffer_addref(pbuf);
	lnet_net_unlock(cpt);

	/* Push source MD */
	md.start     = &pbuf->pb_info;
	md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
	md.threshold = 2; /* Put/Ack */
	md.max_size  = 0;
	md.options   = 0;
	md.eq_handle = the_lnet.ln_dc_eqh;
	md.user_ptr  = lp;

	rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
	if (rc) {
		lnet_ping_buffer_decref(pbuf);
		CERROR("Can't bind push source MD: %d\n", rc);
		goto fail_error;
	}
	cpt = lnet_net_lock_current();
	/* Refcount for MD. */
	lnet_peer_addref_locked(lp);
	id.pid = LNET_PID_LUSTRE;
	id.nid = lnet_peer_select_nid(lp);
	lnet_net_unlock(cpt);

	if (id.nid == LNET_NID_ANY) {
		rc = -EHOSTUNREACH;
		goto fail_unlink;
	}

	rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
		     LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
		     LNET_PROTO_PING_MATCHBITS, 0, 0);

	if (rc)
		goto fail_unlink;

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));

	spin_lock(&lp->lp_lock);
	return 0;

fail_unlink:
	/* Unlinking generates an event which releases the MD's
	 * peer refcount taken above. */
	LNetMDUnlink(lp->lp_push_mdh);
	LNetInvalidateHandle(&lp->lp_push_mdh);
fail_error:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
	/*
	 * The errors that get us here are considered hard errors and
	 * cause Discovery to terminate. So we clear PUSH_SENT, but do
	 * not set PUSH_FAILED.
	 */
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_PUSH_SENT;
	return rc;
}
2686
2687 /*
2688  * An unrecoverable error was encountered during discovery.
2689  * Set error status in peer and abort discovery.
2690  */
2691 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2692 {
2693         CDEBUG(D_NET, "Discovery error %s: %d\n",
2694                libcfs_nid2str(lp->lp_primary_nid), error);
2695
2696         spin_lock(&lp->lp_lock);
2697         lp->lp_dc_error = error;
2698         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2699         lp->lp_state |= LNET_PEER_UNDISCOVERED;
2700         spin_unlock(&lp->lp_lock);
2701 }
2702
2703 /*
2704  * Mark the peer as undiscovered because discovery was disabled.
2705  */
2706 static int lnet_peer_undiscovered(struct lnet_peer *lp)
2707 __must_hold(&lp->lp_lock)
2708 {
2709
2710         lp->lp_state &= ~(LNET_PEER_DISCOVERED | LNET_PEER_DISCOVERING);
2711         lp->lp_state |= LNET_PEER_UNDISCOVERED;
2712
2713         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2714
2715         return 0;
2716 }
2717
2718 /*
2719  * Mark the peer as discovered.
2720  */
2721 static int lnet_peer_discovered(struct lnet_peer *lp)
2722 __must_hold(&lp->lp_lock)
2723 {
2724         lp->lp_state |= LNET_PEER_DISCOVERED;
2725         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2726
2727         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2728
2729         return 0;
2730 }
2731
2732 /*
2733  * Wait for work to be queued or some other change that must be
2734  * attended to. Returns non-zero if the discovery thread should shut
2735  * down.
2736  */
2737 static int lnet_peer_discovery_wait_for_work(void)
2738 {
2739         int cpt;
2740         int rc = 0;
2741
2742         DEFINE_WAIT(wait);
2743
2744         cpt = lnet_net_lock_current();
2745         for (;;) {
2746                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
2747                                 TASK_INTERRUPTIBLE);
2748                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2749                         break;
2750                 if (lnet_push_target_resize_needed())
2751                         break;
2752                 if (!list_empty(&the_lnet.ln_dc_request))
2753                         break;
2754                 lnet_net_unlock(cpt);
2755                 schedule();
2756                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
2757                 cpt = lnet_net_lock_current();
2758         }
2759         finish_wait(&the_lnet.ln_dc_waitq, &wait);
2760
2761         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2762                 rc = -ESHUTDOWN;
2763
2764         lnet_net_unlock(cpt);
2765
2766         CDEBUG(D_NET, "woken: %d\n", rc);
2767
2768         CDEBUG(D_NET, "%d\n", rc);
2769
2770         return rc;
2771 }
2772
2773 /* The discovery thread. */
2774 static int lnet_peer_discovery(void *arg)
2775 {
2776         struct lnet_peer *lp;
2777         int rc;
2778
2779         CDEBUG(D_NET, "started\n");
2780         cfs_block_allsigs();
2781
2782         for (;;) {
2783                 if (lnet_peer_discovery_wait_for_work())
2784                         break;
2785
2786                 if (lnet_push_target_resize_needed())
2787                         lnet_push_target_resize();
2788
2789                 lnet_net_lock(LNET_LOCK_EX);
2790                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2791                         break;
2792                 while (!list_empty(&the_lnet.ln_dc_request)) {
2793                         lp = list_first_entry(&the_lnet.ln_dc_request,
2794                                               struct lnet_peer, lp_dc_list);
2795                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
2796                         lnet_net_unlock(LNET_LOCK_EX);
2797
2798                         spin_lock(&lp->lp_lock);
2799                         CDEBUG(D_NET, "peer %s state %#x\n",
2800                                 libcfs_nid2str(lp->lp_primary_nid),
2801                                 lp->lp_state);
2802                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2803                                 rc = lnet_peer_data_present(lp);
2804                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
2805                                 rc = lnet_peer_ping_failed(lp);
2806                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
2807                                 rc = lnet_peer_push_failed(lp);
2808                         else if (lp->lp_state & LNET_PEER_PING_REQUIRED)
2809                                 rc = lnet_peer_send_ping(lp);
2810                         else if (!lnet_peer_discovery_enabled)
2811                                 rc = lnet_peer_undiscovered(lp);
2812                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
2813                                 rc = lnet_peer_send_ping(lp);
2814                         else if (lnet_peer_needs_push(lp))
2815                                 rc = lnet_peer_send_push(lp);
2816                         else
2817                                 rc = lnet_peer_discovered(lp);
2818                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
2819                                 libcfs_nid2str(lp->lp_primary_nid),
2820                                 lp->lp_state, rc);
2821                         spin_unlock(&lp->lp_lock);
2822
2823                         lnet_net_lock(LNET_LOCK_EX);
2824                         if (rc == LNET_REDISCOVER_PEER) {
2825                                 list_move(&lp->lp_dc_list,
2826                                           &the_lnet.ln_dc_request);
2827                         } else if (rc) {
2828                                 lnet_peer_discovery_error(lp, rc);
2829                         }
2830                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2831                                 lnet_peer_discovery_complete(lp);
2832                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2833                                 break;
2834                 }
2835                 lnet_net_unlock(LNET_LOCK_EX);
2836         }
2837
2838         CDEBUG(D_NET, "stopping\n");
2839         /*
2840          * Clean up before telling lnet_peer_discovery_stop() that
2841          * we're done. Use wake_up() below to somewhat reduce the
2842          * size of the thundering herd if there are multiple threads
2843          * waiting on discovery of a single peer.
2844          */
2845         LNetEQFree(the_lnet.ln_dc_eqh);
2846         LNetInvalidateHandle(&the_lnet.ln_dc_eqh);
2847
2848         lnet_net_lock(LNET_LOCK_EX);
2849         list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
2850                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
2851                 lnet_peer_discovery_complete(lp);
2852         }
2853         list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
2854                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
2855                 lnet_peer_discovery_complete(lp);
2856         }
2857         lnet_net_unlock(LNET_LOCK_EX);
2858
2859         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
2860         wake_up(&the_lnet.ln_dc_waitq);
2861
2862         CDEBUG(D_NET, "stopped\n");
2863
2864         return 0;
2865 }
2866
2867 /* ln_api_mutex is held on entry. */
2868 int lnet_peer_discovery_start(void)
2869 {
2870         struct task_struct *task;
2871         int rc;
2872
2873         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
2874                 return -EALREADY;
2875
2876         INIT_LIST_HEAD(&the_lnet.ln_dc_request);
2877         INIT_LIST_HEAD(&the_lnet.ln_dc_working);
2878         init_waitqueue_head(&the_lnet.ln_dc_waitq);
2879
2880         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
2881         if (rc != 0) {
2882                 CERROR("Can't allocate discovery EQ: %d\n", rc);
2883                 return rc;
2884         }
2885
2886         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
2887         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
2888         if (IS_ERR(task)) {
2889                 rc = PTR_ERR(task);
2890                 CERROR("Can't start peer discovery thread: %d\n", rc);
2891
2892                 LNetEQFree(the_lnet.ln_dc_eqh);
2893                 LNetInvalidateHandle(&the_lnet.ln_dc_eqh);
2894
2895                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
2896         }
2897
2898         CDEBUG(D_NET, "discovery start: %d\n", rc);
2899
2900         return rc;
2901 }
2902
2903 /* ln_api_mutex is held on entry. */
2904 void lnet_peer_discovery_stop(void)
2905 {
2906         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
2907                 return;
2908
2909         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
2910         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
2911         wake_up(&the_lnet.ln_dc_waitq);
2912
2913         wait_event(the_lnet.ln_dc_waitq,
2914                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
2915
2916         LASSERT(list_empty(&the_lnet.ln_dc_request));
2917         LASSERT(list_empty(&the_lnet.ln_dc_working));
2918
2919         CDEBUG(D_NET, "discovery stopped\n");
2920 }
2921
2922 /* Debugging */
2923
2924 void
2925 lnet_debug_peer(lnet_nid_t nid)
2926 {
2927         char                    *aliveness = "NA";
2928         struct lnet_peer_ni     *lp;
2929         int                     cpt;
2930
2931         cpt = lnet_cpt_of_nid(nid, NULL);
2932         lnet_net_lock(cpt);
2933
2934         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
2935         if (IS_ERR(lp)) {
2936                 lnet_net_unlock(cpt);
2937                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
2938                 return;
2939         }
2940
2941         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
2942                 aliveness = lp->lpni_alive ? "up" : "down";
2943
2944         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
2945                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
2946                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
2947                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
2948                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
2949
2950         lnet_peer_ni_decref_locked(lp);
2951
2952         lnet_net_unlock(cpt);
2953 }
2954
2955 /* Gathering information for userspace. */
2956
2957 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
2958                           char aliveness[LNET_MAX_STR_LEN],
2959                           __u32 *cpt_iter, __u32 *refcount,
2960                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
2961                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
2962                           __u32 *peer_tx_qnob)
2963 {
2964         struct lnet_peer_table          *peer_table;
2965         struct lnet_peer_ni             *lp;
2966         int                             j;
2967         int                             lncpt;
2968         bool                            found = false;
2969
2970         /* get the number of CPTs */
2971         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
2972
2973         /* if the cpt number to be examined is >= the number of cpts in
2974          * the system then indicate that there are no more cpts to examin
2975          */
2976         if (*cpt_iter >= lncpt)
2977                 return -ENOENT;
2978
2979         /* get the current table */
2980         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
2981         /* if the ptable is NULL then there are no more cpts to examine */
2982         if (peer_table == NULL)
2983                 return -ENOENT;
2984
2985         lnet_net_lock(*cpt_iter);
2986
2987         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
2988                 struct list_head *peers = &peer_table->pt_hash[j];
2989
2990                 list_for_each_entry(lp, peers, lpni_hashlist) {
2991                         if (peer_index-- > 0)
2992                                 continue;
2993
2994                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
2995                         if (lnet_isrouter(lp) ||
2996                                 lnet_peer_aliveness_enabled(lp))
2997                                 snprintf(aliveness, LNET_MAX_STR_LEN,
2998                                          lp->lpni_alive ? "up" : "down");
2999
3000                         *nid = lp->lpni_nid;
3001                         *refcount = atomic_read(&lp->lpni_refcount);
3002                         *ni_peer_tx_credits =
3003                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3004                         *peer_tx_credits = lp->lpni_txcredits;
3005                         *peer_rtr_credits = lp->lpni_rtrcredits;
3006                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3007                         *peer_tx_qnob = lp->lpni_txqnob;
3008
3009                         found = true;
3010                 }
3011
3012         }
3013         lnet_net_unlock(*cpt_iter);
3014
3015         *cpt_iter = lncpt;
3016
3017         return found ? 0 : -ENOENT;
3018 }
3019
3020 /* ln_api_mutex is held, which keeps the peer list stable */
3021 int lnet_get_peer_info(__u32 idx, lnet_nid_t *primary_nid, lnet_nid_t *nid,
3022                        bool *mr, struct lnet_peer_ni_credit_info *peer_ni_info,
3023                        struct lnet_ioctl_element_stats *peer_ni_stats)
3024 {
3025         struct lnet_peer_ni *lpni = NULL;
3026         struct lnet_peer_net *lpn = NULL;
3027         struct lnet_peer *lp = NULL;
3028
3029         lpni = lnet_get_peer_ni_idx_locked(idx, &lpn, &lp);
3030
3031         if (!lpni)
3032                 return -ENOENT;
3033
3034         *primary_nid = lp->lp_primary_nid;
3035         *mr = lnet_peer_is_multi_rail(lp);
3036         *nid = lpni->lpni_nid;
3037         snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3038         if (lnet_isrouter(lpni) ||
3039                 lnet_peer_aliveness_enabled(lpni))
3040                 snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN,
3041                          lpni->lpni_alive ? "up" : "down");
3042
3043         peer_ni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3044         peer_ni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3045                 lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3046         peer_ni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3047         peer_ni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3048         peer_ni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3049         peer_ni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3050         peer_ni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3051
3052         peer_ni_stats->send_count = atomic_read(&lpni->lpni_stats.send_count);
3053         peer_ni_stats->recv_count = atomic_read(&lpni->lpni_stats.recv_count);
3054         peer_ni_stats->drop_count = atomic_read(&lpni->lpni_stats.drop_count);
3055
3056         return 0;
3057 }