/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/peer.c
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/sched.h>
#ifdef HAVE_SCHED_HEADERS
#include <linux/sched/signal.h>
#endif
#include <linux/uaccess.h>

#include <lnet/lib-lnet.h>
#include <uapi/linux/lnet/lnet-dlc.h>

/* Value indicating that recovery needs to re-check a peer immediately. */
#define LNET_REDISCOVER_PEER	(1)

static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);

static void
lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
{
	if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
		list_del_init(&lpni->lpni_on_remote_peer_ni_list);
		lnet_peer_ni_decref_locked(lpni);
	}
}

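/*
 * A new local net was added. Adopt any peer_nis waiting on the
 * remote peer_ni list that now fall on this net: point them at the
 * net, give them its credit limits, and drop them from the remote
 * list.
 */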
void
lnet_peer_net_added(struct lnet_net *net)
{
	struct lnet_peer_ni *lpni, *tmp;

	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list) {

		if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
			lpni->lpni_net = net;

			spin_lock(&lpni->lpni_lock);
			lpni->lpni_txcredits =
				lpni->lpni_net->net_tunables.lct_peer_tx_credits;
			lpni->lpni_mintxcredits = lpni->lpni_txcredits;
			lpni->lpni_rtrcredits =
				lnet_peer_buffer_credits(lpni->lpni_net);
			lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
			spin_unlock(&lpni->lpni_lock);

			lnet_peer_remove_from_remote_list(lpni);
		}
	}
}

static void
lnet_peer_tables_destroy(void)
{
	struct lnet_peer_table	*ptable;
	struct list_head	*hash;
	int			i;
	int			j;

	if (!the_lnet.ln_peer_tables)
		return;

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		hash = ptable->pt_hash;
		if (!hash) /* not initialized */
			break;

		LASSERT(list_empty(&ptable->pt_zombie_list));

		ptable->pt_hash = NULL;
		for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
			LASSERT(list_empty(&hash[j]));

		LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
	}

	cfs_percpt_free(the_lnet.ln_peer_tables);
	the_lnet.ln_peer_tables = NULL;
}

int
lnet_peer_tables_create(void)
{
	struct lnet_peer_table	*ptable;
	struct list_head	*hash;
	int			i;
	int			j;

	the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
						   sizeof(*ptable));
	if (the_lnet.ln_peer_tables == NULL) {
		CERROR("Failed to allocate cpu-partition peer tables\n");
		return -ENOMEM;
	}

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
				 LNET_PEER_HASH_SIZE * sizeof(*hash));
		if (hash == NULL) {
			CERROR("Failed to create peer hash table\n");
			lnet_peer_tables_destroy();
			return -ENOMEM;
		}

		spin_lock_init(&ptable->pt_zombie_lock);
		INIT_LIST_HEAD(&ptable->pt_zombie_list);

		INIT_LIST_HEAD(&ptable->pt_peer_list);

		for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
			INIT_LIST_HEAD(&hash[j]);
		ptable->pt_hash = hash; /* sign of initialization */
	}

	return 0;
}
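
/*
 * Peer hierarchy, as built by the functions above and below: each
 * CPT owns a struct lnet_peer_table with a NID hash of peer_nis
 * (pt_hash), the list of peers hashed to that CPT (pt_peer_list),
 * and a list of deleted-but-still-referenced peer_nis
 * (pt_zombie_list). A peer aggregates one peer_net per network, and
 * each peer_net aggregates the peer's NIs on that network.
 */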

static struct lnet_peer_ni *
lnet_peer_ni_alloc(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_net *net;
	int cpt;

	cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
	if (!lpni)
		return NULL;

	INIT_LIST_HEAD(&lpni->lpni_txq);
	INIT_LIST_HEAD(&lpni->lpni_hashlist);
	INIT_LIST_HEAD(&lpni->lpni_peer_nis);
	INIT_LIST_HEAD(&lpni->lpni_recovery);
	INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
	LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);

	spin_lock_init(&lpni->lpni_lock);

	lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
	lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
	lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
	lpni->lpni_nid = nid;
	lpni->lpni_cpt = cpt;
	atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);

	net = lnet_get_net_locked(LNET_NIDNET(nid));
	lpni->lpni_net = net;
	if (net) {
		lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
		lpni->lpni_mintxcredits = lpni->lpni_txcredits;
		lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
		lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
	} else {
		/*
		 * This peer_ni is not on a local network, so we
		 * cannot add the credits here. In case the net is
		 * added later, add the peer_ni to the remote peer ni
		 * list so it can be easily found and revisited.
		 */
		/* FIXME: per-net implementation instead? */
		atomic_inc(&lpni->lpni_refcount);
		list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
			      &the_lnet.ln_remote_peer_ni_list);
	}

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	return lpni;
}

static struct lnet_peer_net *
lnet_peer_net_alloc(__u32 net_id)
{
	struct lnet_peer_net *lpn;

	LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
	if (!lpn)
		return NULL;

	INIT_LIST_HEAD(&lpn->lpn_peer_nets);
	INIT_LIST_HEAD(&lpn->lpn_peer_nis);
	lpn->lpn_net_id = net_id;

	CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

	return lpn;
}

void
lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
{
	struct lnet_peer *lp;

	CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

	LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
	LASSERT(list_empty(&lpn->lpn_peer_nis));
	LASSERT(list_empty(&lpn->lpn_peer_nets));
	lp = lpn->lpn_peer;
	lpn->lpn_peer = NULL;
	LIBCFS_FREE(lpn, sizeof(*lpn));

	lnet_peer_decref_locked(lp);
}

static struct lnet_peer *
lnet_peer_alloc(lnet_nid_t nid)
{
	struct lnet_peer *lp;

	LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
	if (!lp)
		return NULL;

	INIT_LIST_HEAD(&lp->lp_rtrq);
	INIT_LIST_HEAD(&lp->lp_routes);
	INIT_LIST_HEAD(&lp->lp_peer_list);
	INIT_LIST_HEAD(&lp->lp_peer_nets);
	INIT_LIST_HEAD(&lp->lp_dc_list);
	INIT_LIST_HEAD(&lp->lp_dc_pendq);
	INIT_LIST_HEAD(&lp->lp_rtr_list);
	init_waitqueue_head(&lp->lp_dc_waitq);
	spin_lock_init(&lp->lp_lock);
	lp->lp_primary_nid = nid;
	/*
	 * Turn off discovery for loopback peer. If you're creating a peer
	 * for the loopback interface then that was initiated when we
	 * attempted to send a message over the loopback. There is no need
	 * to ever use a different interface when sending messages to
	 * myself.
	 */
	if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
		lp->lp_state = LNET_PEER_NO_DISCOVERY;
	lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

	return lp;
}

void
lnet_destroy_peer_locked(struct lnet_peer *lp)
{
	CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

	LASSERT(atomic_read(&lp->lp_refcount) == 0);
	LASSERT(lp->lp_rtr_refcount == 0);
	LASSERT(list_empty(&lp->lp_peer_nets));
	LASSERT(list_empty(&lp->lp_peer_list));
	LASSERT(list_empty(&lp->lp_dc_list));

	if (lp->lp_data)
		lnet_ping_buffer_decref(lp->lp_data);

	/*
	 * if there are messages still on the pending queue, then make
	 * sure to queue them on the ln_msg_resend list so they can be
	 * resent at a later point if the discovery thread is still
	 * running.
	 * If the discovery thread has stopped, then the wakeup will be a
	 * no-op, and it is expected that lnet_shutdown_lndnets() will
	 * eventually be called, which will traverse this list and
	 * finalize the messages on the list.
	 * We cannot resend them now because we're holding the cpt lock.
	 * Releasing the lock can cause an inconsistent state.
	 */
	spin_lock(&the_lnet.ln_msg_resend_lock);
	spin_lock(&lp->lp_lock);
	list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
	spin_unlock(&lp->lp_lock);
	spin_unlock(&the_lnet.ln_msg_resend_lock);
	wake_up(&the_lnet.ln_dc_waitq);

	LIBCFS_FREE(lp, sizeof(*lp));
}

/*
 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
 * that peer_net, detach the peer_net from the peer.
 *
 * Call with lnet_net_lock/EX held
 */
static void
lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;
	struct lnet_peer *lp;

	/*
	 * Belts and suspenders: gracefully handle teardown of a
	 * partially connected peer_ni.
	 */
	lpn = lpni->lpni_peer_net;

	list_del_init(&lpni->lpni_peer_nis);
	/*
	 * If there are no lpni's left, we detach lpn from
	 * lp_peer_nets, so it cannot be found anymore.
	 */
	if (list_empty(&lpn->lpn_peer_nis))
		list_del_init(&lpn->lpn_peer_nets);

	/* Update peer NID count. */
	lp = lpn->lpn_peer;
	lp->lp_nnis--;

	/*
	 * If there are no more peer nets, make the peer unfindable
	 * via the peer_tables.
	 *
	 * Otherwise, if the peer is DISCOVERED, tell discovery to
	 * take another look at it. This is a no-op if discovery for
	 * this peer did the detaching.
	 */
	if (list_empty(&lp->lp_peer_nets)) {
		list_del_init(&lp->lp_peer_list);
		ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
		ptable->pt_peers--;
	} else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
		/* Discovery isn't running, nothing to do here. */
	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
		lnet_peer_queue_for_discovery(lp);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	CDEBUG(D_NET, "peer %s NID %s\n",
		libcfs_nid2str(lp->lp_primary_nid),
		libcfs_nid2str(lpni->lpni_nid));
}

/* called with lnet_net_lock LNET_LOCK_EX held */
static int
lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable = NULL;

	/* don't remove a peer_ni if it's also a gateway */
	if (lnet_isrouter(lpni)) {
		CERROR("Peer NI %s is a gateway. Can not delete it\n",
		       libcfs_nid2str(lpni->lpni_nid));
		return -EBUSY;
	}

	lnet_peer_remove_from_remote_list(lpni);

	/* remove peer ni from the hash list. */
	list_del_init(&lpni->lpni_hashlist);

	/*
	 * indicate the peer is being deleted so the monitor thread can
	 * remove it from the recovery queue.
	 */
	spin_lock(&lpni->lpni_lock);
	lpni->lpni_state |= LNET_PEER_NI_DELETING;
	spin_unlock(&lpni->lpni_lock);

	/* decrement the ref count on the peer table */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	LASSERT(ptable->pt_number > 0);
	ptable->pt_number--;

	/*
	 * The peer_ni can no longer be found with a lookup. But there
	 * can be current users, so keep track of it on the zombie
	 * list until the reference count has gone to zero.
	 *
	 * The last reference may be lost in a place where the
	 * lnet_net_lock locks only a single cpt, and that cpt may not
	 * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
	 * has its own lock.
	 */
	spin_lock(&ptable->pt_zombie_lock);
	list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
	ptable->pt_zombies++;
	spin_unlock(&ptable->pt_zombie_lock);

	/* no need to keep this peer_ni on the hierarchy anymore */
	lnet_peer_detach_peer_ni_locked(lpni);

	/* remove hashlist reference on peer_ni */
	lnet_peer_ni_decref_locked(lpni);

	return 0;
}

void lnet_peer_uninit(void)
{
	struct lnet_peer_ni *lpni, *tmp;

	lnet_net_lock(LNET_LOCK_EX);

	/* remove all peer_nis from the remote peer and the hash list */
	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list)
		lnet_peer_ni_del_locked(lpni);

	lnet_peer_tables_destroy();

	lnet_net_unlock(LNET_LOCK_EX);
}

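/*
 * Delete all NIs of a peer. A peer_ni that is also a gateway is
 * left in place (lnet_peer_ni_del_locked() refuses it with -EBUSY);
 * the last nonzero return code is propagated to the caller.
 */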
static int
lnet_peer_del_locked(struct lnet_peer *peer)
{
	struct lnet_peer_ni *lpni = NULL, *lpni2;
	int rc = 0, rc2 = 0;

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));

	lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
	while (lpni != NULL) {
		lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
		rc = lnet_peer_ni_del_locked(lpni);
		if (rc != 0)
			rc2 = rc;
		lpni = lpni2;
	}

	return rc2;
}

static int
lnet_peer_del(struct lnet_peer *peer)
{
	lnet_net_lock(LNET_LOCK_EX);
	lnet_peer_del_locked(peer);
	lnet_net_unlock(LNET_LOCK_EX);

	return 0;
}

/*
 * Delete a NID from a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:  Non-DLC deletion from DLC-configured peer.
 *  -ENOENT: No lnet_peer_ni corresponding to the nid.
 *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
 *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
 */
static int
lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = lp->lp_primary_nid;
	int rc = 0;

	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}
	lpni = lnet_find_peer_ni_locked(nid);
	if (!lpni) {
		rc = -ENOENT;
		goto out;
	}
	lnet_peer_ni_decref_locked(lpni);
	if (lp != lpni->lpni_peer_net->lpn_peer) {
		rc = -ECHILD;
		goto out;
	}

	/*
	 * This function only allows deletion of the primary NID if it
	 * is the only NID.
	 */
	if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
		rc = -EBUSY;
		goto out;
	}

	lnet_net_lock(LNET_LOCK_EX);

	rc = lnet_peer_ni_del_locked(lpni);

	lnet_net_unlock(LNET_LOCK_EX);

out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);

	return rc;
}

static void
lnet_peer_table_cleanup_locked(struct lnet_net *net,
			       struct lnet_peer_table *ptable)
{
	int			 i;
	struct lnet_peer_ni	*next;
	struct lnet_peer_ni	*lpni;
	struct lnet_peer	*peer;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != NULL && net != lpni->lpni_net)
				continue;

			peer = lpni->lpni_peer_net->lpn_peer;
			if (peer->lp_primary_nid != lpni->lpni_nid) {
				lnet_peer_ni_del_locked(lpni);
				continue;
			}
			/*
			 * Removing the primary NID implies removing
			 * the entire peer. Advance next beyond any
			 * peer_ni that belongs to the same peer.
			 */
			list_for_each_entry_from(next, &ptable->pt_hash[i],
						 lpni_hashlist) {
				if (next->lpni_peer_net->lpn_peer != peer)
					break;
			}
			lnet_peer_del_locked(peer);
		}
	}
}

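/*
 * Wait for the zombie peer_nis on this table to be finalized.
 * Sleeps half a second per pass, and logs the remaining zombie
 * count at power-of-2 intervals so the console is not flooded.
 */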
static void
lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
{
	int	i = 3;

	spin_lock(&ptable->pt_zombie_lock);
	while (ptable->pt_zombies) {
		spin_unlock(&ptable->pt_zombie_lock);

		if (is_power_of_2(i)) {
			CDEBUG(D_WARNING,
			       "Waiting for %d zombies on peer table\n",
			       ptable->pt_zombies);
		}
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1) >> 1);
		i++; /* advance the counter or the throttled CDEBUG above never fires */
		spin_lock(&ptable->pt_zombie_lock);
	}
	spin_unlock(&ptable->pt_zombie_lock);
}

static void
lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
				struct lnet_peer_table *ptable)
{
	struct lnet_peer_ni	*lp;
	struct lnet_peer_ni	*tmp;
	lnet_nid_t		gw_nid;
	int			i;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != lp->lpni_net)
				continue;

			if (!lnet_isrouter(lp))
				continue;

			gw_nid = lp->lpni_peer_net->lpn_peer->lp_primary_nid;

			lnet_net_unlock(LNET_LOCK_EX);
			lnet_del_route(LNET_NIDNET(LNET_NID_ANY), gw_nid);
			lnet_net_lock(LNET_LOCK_EX);
		}
	}
}

void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
	int i;
	struct lnet_peer_table *ptable;

	LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
	/* If just deleting the peers for a NI, get rid of any routes these
	 * peers are gateways for. */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_del_rtrs_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* Start the cleanup process */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_cleanup_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
		lnet_peer_ni_finalize_wait(ptable);
}

static struct lnet_peer_ni *
lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
{
	struct list_head	*peers;
	struct lnet_peer_ni	*lp;

	LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);

	peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
	list_for_each_entry(lp, peers, lpni_hashlist) {
		if (lp->lpni_nid == nid) {
			lnet_peer_ni_addref_locked(lp);
			return lp;
		}
	}

	return NULL;
}

struct lnet_peer_ni *
lnet_find_peer_ni_locked(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_table *ptable;
	int cpt;

	cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	ptable = the_lnet.ln_peer_tables[cpt];
	lpni = lnet_get_peer_ni_locked(ptable, nid);

	return lpni;
}

struct lnet_peer *
lnet_find_peer(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer *lp = NULL;
	int cpt;

	cpt = lnet_net_lock_current();
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		lp = lpni->lpni_peer_net->lpn_peer;
		lnet_peer_addref_locked(lp);
		lnet_peer_ni_decref_locked(lpni);
	}
	lnet_net_unlock(cpt);

	return lp;
}

struct lnet_peer_ni *
lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
			     struct lnet_peer_net *peer_net,
			     struct lnet_peer_ni *prev)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_net *net = peer_net;

	if (!prev) {
		if (!net) {
			if (list_empty(&peer->lp_peer_nets))
				return NULL;

			net = list_entry(peer->lp_peer_nets.next,
					 struct lnet_peer_net,
					 lpn_peer_nets);
		}
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
		/*
		 * if you reached the end of the peer ni list and the peer
		 * net is specified then there are no more peer nis in that
		 * net.
		 */
		if (net)
			return NULL;

		/*
		 * we reached the end of this net ni list. move to the
		 * next net
		 */
		if (prev->lpni_peer_net->lpn_peer_nets.next ==
		    &peer->lp_peer_nets)
			/* no more nets and no more NIs. */
			return NULL;

		/* get the next net */
		net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
				 struct lnet_peer_net,
				 lpn_peer_nets);
		/* get the ni on it */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	/* there are more nis left */
	lpni = list_entry(prev->lpni_peer_nis.next,
			  struct lnet_peer_ni, lpni_peer_nis);

	return lpni;
}
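
/*
 * Typical use of lnet_get_next_peer_ni_locked(): pass NULL as prev
 * to start a walk, the previous return value to continue it, and a
 * non-NULL peer_net to restrict the walk to one network. A minimal
 * sketch of the caller pattern (lnet_peer_clr_non_mr_pref_nids()
 * below does exactly this):
 *
 *	struct lnet_peer_ni *lpni = NULL;
 *
 *	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)))
 *		... operate on lpni ...
 */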

/* Call with the ln_api_mutex held */
int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
{
	struct lnet_process_id id;
	struct lnet_peer_table *ptable;
	struct lnet_peer *lp;
	__u32 count = 0;
	__u32 size = 0;
	int lncpt;
	int cpt;
	__u32 i;
	int rc;

	rc = -ESHUTDOWN;
	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		goto done;

	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

	/*
	 * Count the number of peers, and return E2BIG if the buffer
	 * is too small. We'll also return the desired size.
	 */
	rc = -E2BIG;
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		count += ptable->pt_peers;
	}
	size = count * sizeof(*ids);
	if (size > *sizep)
		goto done;

	/*
	 * Walk the peer lists and copy out the primary nids.
	 * This is safe because the peer lists are only modified
	 * while the ln_api_mutex is held. So we don't need to
	 * hold the lnet_net_lock as well, and can therefore
	 * directly call copy_to_user().
	 */
	rc = -EFAULT;
	memset(&id, 0, sizeof(id));
	id.pid = LNET_PID_LUSTRE;
	i = 0;
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
			if (i >= count)
				goto done;
			id.nid = lp->lp_primary_nid;
			if (copy_to_user(&ids[i], &id, sizeof(id)))
				goto done;
			i++;
		}
	}
	rc = 0;
done:
	*countp = count;
	*sizep = size;
	return rc;
}
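
/*
 * The -E2BIG convention above allows a two-pass caller: probe for
 * the required size, then retry with a large enough buffer. A
 * hypothetical sketch (error handling omitted):
 *
 *	u32 count = 0, size = 0;
 *	int rc;
 *
 *	rc = lnet_get_peer_list(&count, &size, ids);
 *	if (rc == -E2BIG) {
 *		... reallocate ids to hold size bytes ...
 *		rc = lnet_get_peer_list(&count, &size, ids);
 *	}
 */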

/*
 * Start pushes to peers that need to be updated for a configuration
 * change on this node.
 */
void
lnet_push_update_to_peers(int force)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer *lp;
	int lncpt;
	int cpt;

	lnet_net_lock(LNET_LOCK_EX);
	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
			if (force) {
				spin_lock(&lp->lp_lock);
				if (lp->lp_state & LNET_PEER_MULTI_RAIL)
					lp->lp_state |= LNET_PEER_FORCE_PUSH;
				spin_unlock(&lp->lp_lock);
			}
			if (lnet_peer_needs_push(lp))
				lnet_peer_queue_for_discovery(lp);
		}
	}
	lnet_net_unlock(LNET_LOCK_EX);
	wake_up(&the_lnet.ln_dc_waitq);
}

/*
 * Test whether a ni is a preferred ni for this peer_ni, e.g., whether
 * this is a preferred point-to-point path. Call with lnet_net_lock in
 * shared mode.
 */
bool
lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
	int i;

	if (lpni->lpni_pref_nnids == 0)
		return false;
	if (lpni->lpni_pref_nnids == 1)
		return lpni->lpni_pref.nid == nid;
	for (i = 0; i < lpni->lpni_pref_nnids; i++) {
		if (lpni->lpni_pref.nids[i] == nid)
			return true;
	}
	return false;
}
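
/*
 * Storage note: lpni_pref is a union. With lpni_pref_nnids == 1 the
 * single NID is stored inline in lpni_pref.nid; with more than one,
 * the NIDs live in the allocated array lpni_pref.nids. Readers must
 * check lpni_pref_nnids before touching either member, as
 * lnet_peer_is_pref_nid_locked() above does.
 */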

/*
 * Set a single ni as preferred, provided no preferred ni is already
 * defined. Only to be used for non-multi-rail peer_ni.
 */
int
lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
	int rc = 0;

	spin_lock(&lpni->lpni_lock);
	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
	} else if (lpni->lpni_pref_nnids > 0) {
		rc = -EPERM;
	} else if (lpni->lpni_pref_nnids == 0) {
		lpni->lpni_pref.nid = nid;
		lpni->lpni_pref_nnids = 1;
		lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
	}
	spin_unlock(&lpni->lpni_lock);

	CDEBUG(D_NET, "peer %s nid %s: %d\n",
	       libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
	return rc;
}

/*
 * Clear the preferred NID from a non-multi-rail peer_ni, provided
 * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
 */
int
lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
{
	int rc = 0;

	spin_lock(&lpni->lpni_lock);
	if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
		lpni->lpni_pref_nnids = 0;
		lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
	} else if (lpni->lpni_pref_nnids == 0) {
		rc = -ENOENT;
	} else {
		rc = -EPERM;
	}
	spin_unlock(&lpni->lpni_lock);

	CDEBUG(D_NET, "peer %s: %d\n",
	       libcfs_nid2str(lpni->lpni_nid), rc);
	return rc;
}

/*
 * Clear the preferred NIDs from a non-multi-rail peer.
 */
void
lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
{
	struct lnet_peer_ni *lpni = NULL;

	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
		lnet_peer_ni_clr_non_mr_pref_nid(lpni);
}

int
lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
	lnet_nid_t *nids = NULL;
	lnet_nid_t *oldnids = NULL;
	struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
	int size;
	int i;
	int rc = 0;

	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
		goto out;
	}

	if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
		rc = -EEXIST;
		goto out;
	}

	/* A non-MR node may have only one preferred NI per peer_ni */
	if (lpni->lpni_pref_nnids > 0) {
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			rc = -EPERM;
			goto out;
		}
	}

	if (lpni->lpni_pref_nnids != 0) {
		size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
		LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
		if (!nids) {
			rc = -ENOMEM;
			goto out;
		}
		for (i = 0; i < lpni->lpni_pref_nnids; i++) {
			if (lpni->lpni_pref.nids[i] == nid) {
				LIBCFS_FREE(nids, size);
				rc = -EEXIST;
				goto out;
			}
			nids[i] = lpni->lpni_pref.nids[i];
		}
		nids[i] = nid;
	}

	lnet_net_lock(LNET_LOCK_EX);
	spin_lock(&lpni->lpni_lock);
	if (lpni->lpni_pref_nnids == 0) {
		lpni->lpni_pref.nid = nid;
	} else {
		oldnids = lpni->lpni_pref.nids;
		lpni->lpni_pref.nids = nids;
	}
	lpni->lpni_pref_nnids++;
	lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
	spin_unlock(&lpni->lpni_lock);
	lnet_net_unlock(LNET_LOCK_EX);

	if (oldnids) {
		/* size is already a byte count; don't multiply it again */
		size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
		LIBCFS_FREE(oldnids, size);
	}
out:
	if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
		spin_lock(&lpni->lpni_lock);
		lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
		spin_unlock(&lpni->lpni_lock);
	}
	CDEBUG(D_NET, "peer %s nid %s: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
	return rc;
}
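
/*
 * lnet_peer_add_pref_nid() grows the preference list with a
 * copy-and-swap: the larger array is built before taking the locks,
 * the pointer is swapped under lnet_net_lock(LNET_LOCK_EX) plus
 * lpni_lock, and the old array is freed only after the locks are
 * dropped. lnet_peer_del_pref_nid() below shrinks it the same way.
 */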

int
lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
	lnet_nid_t *nids = NULL;
	lnet_nid_t *oldnids = NULL;
	struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
	int size;
	int i, j;
	int rc = 0;

	if (lpni->lpni_pref_nnids == 0) {
		rc = -ENOENT;
		goto out;
	}

	if (lpni->lpni_pref_nnids == 1) {
		if (lpni->lpni_pref.nid != nid) {
			rc = -ENOENT;
			goto out;
		}
	} else if (lpni->lpni_pref_nnids == 2) {
		if (lpni->lpni_pref.nids[0] != nid &&
		    lpni->lpni_pref.nids[1] != nid) {
			rc = -ENOENT;
			goto out;
		}
	} else {
		size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
		LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
		if (!nids) {
			rc = -ENOMEM;
			goto out;
		}
		/* copy everything except the nid being deleted */
		for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
			if (lpni->lpni_pref.nids[i] == nid)
				continue;
			nids[j++] = lpni->lpni_pref.nids[i];
		}
		/* Check if we actually removed a nid. */
		if (j == lpni->lpni_pref_nnids) {
			LIBCFS_FREE(nids, size);
			rc = -ENOENT;
			goto out;
		}
	}

	lnet_net_lock(LNET_LOCK_EX);
	spin_lock(&lpni->lpni_lock);
	if (lpni->lpni_pref_nnids == 1) {
		lpni->lpni_pref.nid = LNET_NID_ANY;
	} else if (lpni->lpni_pref_nnids == 2) {
		oldnids = lpni->lpni_pref.nids;
		if (oldnids[0] == nid)
			lpni->lpni_pref.nid = oldnids[1];
		else
			lpni->lpni_pref.nid = oldnids[0];
	} else {
		oldnids = lpni->lpni_pref.nids;
		lpni->lpni_pref.nids = nids;
	}
	lpni->lpni_pref_nnids--;
	lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
	spin_unlock(&lpni->lpni_lock);
	lnet_net_unlock(LNET_LOCK_EX);

	if (oldnids) {
		/* size is already a byte count; don't multiply it again */
		size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
		LIBCFS_FREE(oldnids, size);
	}
out:
	CDEBUG(D_NET, "peer %s nid %s: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
	return rc;
}

lnet_nid_t
lnet_peer_primary_nid_locked(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = nid;

	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
		lnet_peer_ni_decref_locked(lpni);
	}

	return primary_nid;
}

lnet_nid_t
LNetPrimaryNID(lnet_nid_t nid)
{
	struct lnet_peer *lp;
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = nid;
	int rc = 0;
	int cpt;

	cpt = lnet_net_lock_current();
	lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
	if (IS_ERR(lpni)) {
		rc = PTR_ERR(lpni);
		goto out_unlock;
	}
	lp = lpni->lpni_peer_net->lpn_peer;
	while (!lnet_peer_is_uptodate(lp)) {
		rc = lnet_discover_peer_locked(lpni, cpt, true);
		if (rc)
			goto out_decref;
		lp = lpni->lpni_peer_net->lpn_peer;
	}
	primary_nid = lp->lp_primary_nid;
out_decref:
	lnet_peer_ni_decref_locked(lpni);
out_unlock:
	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
	       libcfs_nid2str(primary_nid), rc);
	return primary_nid;
}
EXPORT_SYMBOL(LNetPrimaryNID);
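
/*
 * Note that LNetPrimaryNID() may block: while the peer is not up to
 * date it keeps calling lnet_discover_peer_locked() with block set
 * to true. On any failure it falls back to returning the NID it was
 * given, so callers always get a usable NID back:
 *
 *	lnet_nid_t prim = LNetPrimaryNID(nid);
 *
 *	... prim is the peer's primary NID, or nid itself if
 *	    discovery failed ...
 */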

struct lnet_peer_net *
lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
{
	struct lnet_peer_net *peer_net;
	list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
		if (peer_net->lpn_net_id == net_id)
			return peer_net;
	}
	return NULL;
}

/*
 * Attach a peer_ni to a peer_net and peer. This function assumes
 * peer_ni is not already attached to the peer_net/peer. The peer_ni
 * may be attached to a different peer, in which case it will be
 * properly detached first. The whole operation is done atomically.
 *
 * Always returns 0.  This is the last function called from functions
 * that do return an int, so returning 0 here allows the compiler to
 * do a tail call.
 */
static int
lnet_peer_attach_peer_ni(struct lnet_peer *lp,
			 struct lnet_peer_net *lpn,
			 struct lnet_peer_ni *lpni,
			 unsigned flags)
{
	struct lnet_peer_table *ptable;

	/* Install the new peer_ni */
	lnet_net_lock(LNET_LOCK_EX);
	/* Add peer_ni to global peer table hash, if necessary. */
	if (list_empty(&lpni->lpni_hashlist)) {
		int hash = lnet_nid2peerhash(lpni->lpni_nid);

		ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
		list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
		ptable->pt_version++;
		ptable->pt_number++;
		/* This is the 1st refcount on lpni. */
		atomic_inc(&lpni->lpni_refcount);
	}

	/* Detach the peer_ni from an existing peer, if necessary. */
	if (lpni->lpni_peer_net) {
		LASSERT(lpni->lpni_peer_net != lpn);
		LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
		lnet_peer_detach_peer_ni_locked(lpni);
		lnet_peer_net_decref_locked(lpni->lpni_peer_net);
		lpni->lpni_peer_net = NULL;
	}

	/* Add peer_ni to peer_net */
	lpni->lpni_peer_net = lpn;
	list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
	lnet_peer_net_addref_locked(lpn);

	/* Add peer_net to peer */
	if (!lpn->lpn_peer) {
		lpn->lpn_peer = lp;
		list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
		lnet_peer_addref_locked(lp);
	}

	/* Add peer to global peer list, if necessary */
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	if (list_empty(&lp->lp_peer_list)) {
		list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
		ptable->pt_peers++;
	}

	/* Update peer state */
	spin_lock(&lp->lp_lock);
	if (flags & LNET_PEER_CONFIGURED) {
		if (!(lp->lp_state & LNET_PEER_CONFIGURED))
			lp->lp_state |= LNET_PEER_CONFIGURED;
	}
	if (flags & LNET_PEER_MULTI_RAIL) {
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	}
	spin_unlock(&lp->lp_lock);

	lp->lp_nnis++;
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
	       libcfs_nid2str(lp->lp_primary_nid),
	       libcfs_nid2str(lpni->lpni_nid), flags);

	return 0;
}

/*
 * Create a new peer, with nid as its primary nid.
 *
 * Call with the lnet_api_mutex held.
 */
static int
lnet_peer_add(lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(nid != LNET_NID_ANY);

	/*
	 * No need for the lnet_net_lock here, because the
	 * lnet_api_mutex is held.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/* A peer with this NID already exists. */
		lp = lpni->lpni_peer_net->lpn_peer;
		lnet_peer_ni_decref_locked(lpni);
		/*
		 * This is an error if the peer was configured and the
		 * primary NID differs or an attempt is made to change
		 * the Multi-Rail flag. Otherwise the assumption is
		 * that an existing peer is being modified.
		 */
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			if (lp->lp_primary_nid != nid)
				rc = -EEXIST;
			else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
				rc = -EPERM;
			goto out;
		}
		/* Delete and recreate as a configured peer. */
		lnet_peer_del(lp);
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;

	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
	       libcfs_nid2str(nid), flags, rc);
	return rc;
}

/*
 * Add a NID to a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:    Non-DLC addition to a DLC-configured peer.
 *  -EEXIST:   The NID was configured by DLC for a different peer.
 *  -ENOMEM:   Out of memory.
 *  -ENOTUNIQ: Adding a second peer NID on a single network on a
 *             non-multi-rail peer.
 */
static int
lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(lp);
	LASSERT(nid != LNET_NID_ANY);

	/* A configured peer can only be updated through configuration. */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}

	/*
	 * The MULTI_RAIL flag can be set but not cleared, because
	 * that would leave the peer struct in an invalid state.
	 */
	if (flags & LNET_PEER_MULTI_RAIL) {
		spin_lock(&lp->lp_lock);
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
		spin_unlock(&lp->lp_lock);
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		rc = -EPERM;
		goto out;
	}

	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * A peer_ni already exists. This is only a problem if
		 * it is not connected to this peer and was configured
		 * by DLC.
		 */
		lnet_peer_ni_decref_locked(lpni);
		if (lpni->lpni_peer_net->lpn_peer == lp)
			goto out;
		if (lnet_peer_ni_is_configured(lpni)) {
			rc = -EEXIST;
			goto out;
		}
		/* If this is the primary NID, destroy the peer. */
		if (lnet_peer_ni_is_primary(lpni)) {
			lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
			lpni = lnet_peer_ni_alloc(nid);
			if (!lpni) {
				rc = -ENOMEM;
				goto out;
			}
		}
	} else {
		lpni = lnet_peer_ni_alloc(nid);
		if (!lpni) {
			rc = -ENOMEM;
			goto out;
		}
	}

	/*
	 * Get the peer_net. Check that we're not adding a second
	 * peer_ni on a peer_net of a non-multi-rail peer.
	 */
	lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
	if (!lpn) {
		lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
		if (!lpn) {
			rc = -ENOMEM;
			goto out_free_lpni;
		}
	} else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
		rc = -ENOTUNIQ;
		goto out_free_lpni;
	}

	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpni:
	/* If the peer_ni was allocated above its peer_net pointer is NULL */
	if (!lpni->lpni_peer_net)
		LIBCFS_FREE(lpni, sizeof(*lpni));
out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
	       flags, rc);
	return rc;
}

/*
 * Update the primary NID of a peer, if possible.
 *
 * Call with the lnet_api_mutex held.
 */
static int
lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	lnet_nid_t old = lp->lp_primary_nid;
	int rc = 0;

	if (lp->lp_primary_nid == nid)
		goto out;
	rc = lnet_peer_add_nid(lp, nid, flags);
	if (rc)
		goto out;
	lp->lp_primary_nid = nid;
out:
	CDEBUG(D_NET, "peer %s NID %s: %d\n",
	       libcfs_nid2str(old), libcfs_nid2str(nid), rc);
	return rc;
}

/*
 * lpni creation initiated due to traffic either sending or receiving.
 */
static int
lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	unsigned flags = 0;
	int rc = 0;

	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
		goto out;
	}

	/* lnet_net_lock is not needed here because ln_api_lock is held */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * We must have raced with another thread. Since we
		 * know next to nothing about a peer_ni created by
		 * traffic, we just assume everything is ok and
		 * return.
		 */
		lnet_peer_ni_decref_locked(lpni);
		goto out;
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;
	if (pref != LNET_NID_ANY)
		lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);

	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
	return rc;
}

/*
 * Implementation of IOC_LIBCFS_ADD_PEER_NI.
 *
 * This API handles the following combinations:
 *   Create a peer with its primary NI if only the prim_nid is provided
 *   Add a NID to a peer identified by the prim_nid. The peer identified
 *   by the prim_nid must already exist.
 *   The peer being created may be non-MR.
 *
 * The caller must hold ln_api_mutex. This prevents the peer from
 * being created/modified/deleted by a different thread.
 */
int
lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
{
	struct lnet_peer *lp = NULL;
	struct lnet_peer_ni *lpni;
	unsigned flags;

	/* The prim_nid must always be specified */
	if (prim_nid == LNET_NID_ANY)
		return -EINVAL;

	flags = LNET_PEER_CONFIGURED;
	if (mr)
		flags |= LNET_PEER_MULTI_RAIL;

	/*
	 * If nid isn't specified, we must create a new peer with
	 * prim_nid as its primary nid.
	 */
	if (nid == LNET_NID_ANY)
		return lnet_peer_add(prim_nid, flags);

	/* Look up the prim_nid, which must exist. */
	lpni = lnet_find_peer_ni_locked(prim_nid);
	if (!lpni)
		return -ENOENT;
	lnet_peer_ni_decref_locked(lpni);
	lp = lpni->lpni_peer_net->lpn_peer;

	/* Peer must have been configured. */
	if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
		CDEBUG(D_NET, "peer %s was not configured\n",
		       libcfs_nid2str(prim_nid));
		return -ENOENT;
	}

	/* Primary NID must match */
	if (lp->lp_primary_nid != prim_nid) {
		CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
		       libcfs_nid2str(prim_nid),
		       libcfs_nid2str(lp->lp_primary_nid));
		return -ENODEV;
	}

	/* Multi-Rail flag must match. */
	if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
		CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
		       libcfs_nid2str(prim_nid));
		return -EPERM;
	}

	return lnet_peer_add_nid(lp, nid, flags);
}
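
/*
 * A hypothetical sketch of the two IOC_LIBCFS_ADD_PEER_NI
 * combinations described above (NIDs invented for illustration):
 *
 *	lnet_nid_t prim = libcfs_str2nid("10.0.0.1@tcp");
 *	lnet_nid_t nid = libcfs_str2nid("10.0.0.2@tcp");
 *
 *	rc = lnet_add_peer_ni(prim, LNET_NID_ANY, true);
 *		creates an MR peer with prim as its primary NID
 *	rc = lnet_add_peer_ni(prim, nid, true);
 *		adds nid to the now-existing peer
 */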
1524
1525 /*
1526  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1527  *
1528  * This API handles the following combinations:
1529  *   Delete a NI from a peer if both prim_nid and nid are provided.
1530  *   Delete a peer if only prim_nid is provided.
1531  *   Delete a peer if its primary nid is provided.
1532  *
1533  * The caller must hold ln_api_mutex. This prevents the peer from
1534  * being modified/deleted by a different thread.
1535  */
1536 int
1537 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1538 {
1539         struct lnet_peer *lp;
1540         struct lnet_peer_ni *lpni;
1541         unsigned flags;
1542
1543         if (prim_nid == LNET_NID_ANY)
1544                 return -EINVAL;
1545
1546         lpni = lnet_find_peer_ni_locked(prim_nid);
1547         if (!lpni)
1548                 return -ENOENT;
1549         lnet_peer_ni_decref_locked(lpni);
1550         lp = lpni->lpni_peer_net->lpn_peer;
1551
1552         if (prim_nid != lp->lp_primary_nid) {
1553                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1554                        libcfs_nid2str(prim_nid),
1555                        libcfs_nid2str(lp->lp_primary_nid));
1556                 return -ENODEV;
1557         }
1558
1559         lnet_net_lock(LNET_LOCK_EX);
1560         if (lp->lp_rtr_refcount > 0) {
1561                 lnet_net_unlock(LNET_LOCK_EX);
1562                 CERROR("%s is a router. Can not be deleted\n",
1563                        libcfs_nid2str(prim_nid));
1564                 return -EBUSY;
1565         }
1566         lnet_net_unlock(LNET_LOCK_EX);
1567
1568         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1569                 return lnet_peer_del(lp);
1570
1571         flags = LNET_PEER_CONFIGURED;
1572         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1573                 flags |= LNET_PEER_MULTI_RAIL;
1574
1575         return lnet_peer_del_nid(lp, nid, flags);
1576 }
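/*
 * Usage sketch for the deletion combinations above; hedged, as the
 * helper and its arguments are hypothetical and the block is not built.
 * Note the -EBUSY return when the peer is in use as a router.
 */
#if 0	/* illustrative sketch only */
static int example_del_peer(lnet_nid_t prim_nid, lnet_nid_t nid)
{
	int rc;

	/* Remove a single NID from the peer. */
	rc = lnet_del_peer_ni(prim_nid, nid);
	if (rc)
		return rc;

	/* Remove the entire peer; fails with -EBUSY for a router. */
	return lnet_del_peer_ni(prim_nid, LNET_NID_ANY);
}
#endif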
1577
1578 void
1579 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1580 {
1581         struct lnet_peer_table *ptable;
1582         struct lnet_peer_net *lpn;
1583
1584         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1585
1586         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1587         LASSERT(list_empty(&lpni->lpni_txq));
1588         LASSERT(lpni->lpni_txqnob == 0);
1589         LASSERT(list_empty(&lpni->lpni_peer_nis));
1590         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1591
1592         lpn = lpni->lpni_peer_net;
1593         lpni->lpni_peer_net = NULL;
1594         lpni->lpni_net = NULL;
1595
1596         /* remove the peer ni from the zombie list */
1597         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1598         spin_lock(&ptable->pt_zombie_lock);
1599         list_del_init(&lpni->lpni_hashlist);
1600         ptable->pt_zombies--;
1601         spin_unlock(&ptable->pt_zombie_lock);
1602
1603         if (lpni->lpni_pref_nnids > 1) {
1604                 LIBCFS_FREE(lpni->lpni_pref.nids,
1605                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1606         }
1607         LIBCFS_FREE(lpni, sizeof(*lpni));
1608
1609         lnet_peer_net_decref_locked(lpn);
1610 }
1611
1612 struct lnet_peer_ni *
1613 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1614 {
1615         struct lnet_peer_ni *lpni = NULL;
1616         int rc;
1617
1618         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1619                 return ERR_PTR(-ESHUTDOWN);
1620
1621         /*
1622          * find if a peer_ni already exists.
1623          * If so then just return that.
1624          */
1625         lpni = lnet_find_peer_ni_locked(nid);
1626         if (lpni)
1627                 return lpni;
1628
1629         lnet_net_unlock(cpt);
1630
1631         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1632         if (rc) {
1633                 lpni = ERR_PTR(rc);
1634                 goto out_net_relock;
1635         }
1636
1637         lpni = lnet_find_peer_ni_locked(nid);
1638         LASSERT(lpni);
1639
1640 out_net_relock:
1641         lnet_net_lock(cpt);
1642
1643         return lpni;
1644 }
1645
1646 /*
1647  * Get a peer_ni for the given nid, create it if necessary. Takes a
1648  * hold on the peer_ni.
1649  */
1650 struct lnet_peer_ni *
1651 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1652 {
1653         struct lnet_peer_ni *lpni = NULL;
1654         int rc;
1655
1656         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1657                 return ERR_PTR(-ESHUTDOWN);
1658
1659         /*
1660          * find if a peer_ni already exists.
1661          * If so then just return that.
1662          */
1663         lpni = lnet_find_peer_ni_locked(nid);
1664         if (lpni)
1665                 return lpni;
1666
1667         /*
1668          * Slow path:
1669          * use the lnet_api_mutex to serialize the creation of the peer_ni
1670          * and the creation/deletion of the local ni/net. When a local ni is
1671          * created, if there exists a set of peer_nis on that network,
1672          * they need to be traversed and updated. When a local NI is
1673          * deleted, which could result in a network being deleted, then
1674          * all peer nis on that network need to be removed as well.
1675          *
1676          * Creation through traffic should also be serialized with
1677          * creation through DLC.
1678          */
1679         lnet_net_unlock(cpt);
1680         mutex_lock(&the_lnet.ln_api_mutex);
1681         /*
1682          * Shutdown is only set while holding the ln_api_mutex, so a
1683          * single check here is sufficient.
1684          */
1685         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1686                 lpni = ERR_PTR(-ESHUTDOWN);
1687                 goto out_mutex_unlock;
1688         }
1689
1690         rc = lnet_peer_ni_traffic_add(nid, pref);
1691         if (rc) {
1692                 lpni = ERR_PTR(rc);
1693                 goto out_mutex_unlock;
1694         }
1695
1696         lpni = lnet_find_peer_ni_locked(nid);
1697         LASSERT(lpni);
1698
1699 out_mutex_unlock:
1700         mutex_unlock(&the_lnet.ln_api_mutex);
1701         lnet_net_lock(cpt);
1702
1703         /* Lock has been dropped, check again for shutdown. */
1704         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1705                 if (!IS_ERR(lpni))
1706                         lnet_peer_ni_decref_locked(lpni);
1707                 lpni = ERR_PTR(-ESHUTDOWN);
1708         }
1709
1710         return lpni;
1711 }
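/*
 * Caller pattern sketch for lnet_nid2peerni_locked(); hedged, as the
 * helper, nid and cpt below are hypothetical and the block is not
 * built. The net lock is held on entry and an ERR_PTR() return must
 * be checked before use.
 */
#if 0	/* illustrative sketch only */
static int example_lookup_peer_ni(lnet_nid_t nid, int cpt)
{
	struct lnet_peer_ni *lpni;

	lnet_net_lock(cpt);
	lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
	if (IS_ERR(lpni)) {
		lnet_net_unlock(cpt);
		return PTR_ERR(lpni);
	}
	/* ... use lpni, then drop the hold taken for the caller ... */
	lnet_peer_ni_decref_locked(lpni);
	lnet_net_unlock(cpt);
	return 0;
}
#endif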
1712
1713 /*
1714  * Peer Discovery
1715  */
1716
1717 /*
1718  * Is a peer uptodate from the point of view of discovery?
1719  *
1720  * If it is currently being processed, obviously not.
1721  * A forced Ping or Push is also handled by the discovery thread.
1722  *
1723  * Otherwise look at whether the peer needs rediscovering.
1724  */
1725 bool
1726 lnet_peer_is_uptodate(struct lnet_peer *lp)
1727 {
1728         bool rc;
1729
1730         spin_lock(&lp->lp_lock);
1731         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1732                             LNET_PEER_FORCE_PING |
1733                             LNET_PEER_FORCE_PUSH)) {
1734                 rc = false;
1735         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1736                 rc = true;
1737         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1738                 if (lnet_peer_discovery_disabled)
1739                         rc = true;
1740                 else
1741                         rc = false;
1742         } else if (lnet_peer_needs_push(lp)) {
1743                 rc = false;
1744         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1745                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1746                         rc = true;
1747                 else
1748                         rc = false;
1749         } else {
1750                 rc = false;
1751         }
1752         spin_unlock(&lp->lp_lock);
1753
1754         return rc;
1755 }
1756
1757 /*
1758  * Queue a peer for the attention of the discovery thread.  Call with
1759  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1760  * -EALREADY if the peer was already queued.
1761  */
1762 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1763 {
1764         int rc;
1765
1766         spin_lock(&lp->lp_lock);
1767         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1768                 lp->lp_state |= LNET_PEER_DISCOVERING;
1769         spin_unlock(&lp->lp_lock);
1770         if (list_empty(&lp->lp_dc_list)) {
1771                 lnet_peer_addref_locked(lp);
1772                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1773                 wake_up(&the_lnet.ln_dc_waitq);
1774                 rc = 0;
1775         } else {
1776                 rc = -EALREADY;
1777         }
1778
1779         CDEBUG(D_NET, "Queue peer %s: %d\n",
1780                libcfs_nid2str(lp->lp_primary_nid), rc);
1781
1782         return rc;
1783 }
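/*
 * Caller idiom sketch; hedged and not built. Callers check
 * lnet_peer_is_uptodate() first so that only stale peers are queued,
 * and treat -EALREADY as success.
 */
#if 0	/* illustrative sketch only */
	lnet_net_lock(LNET_LOCK_EX);
	if (!lnet_peer_is_uptodate(lp))
		lnet_peer_queue_for_discovery(lp); /* 0 or -EALREADY */
	lnet_net_unlock(LNET_LOCK_EX);
#endif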
1784
1785 /*
1786  * Discovery of a peer is complete. Wake all waiters on the peer.
1787  * Call with lnet_net_lock/EX held.
1788  */
1789 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1790 {
1791         struct lnet_msg *msg, *tmp;
1792         int rc = 0;
1793         struct list_head pending_msgs;
1794
1795         INIT_LIST_HEAD(&pending_msgs);
1796
1797         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1798                libcfs_nid2str(lp->lp_primary_nid));
1799
1800         list_del_init(&lp->lp_dc_list);
1801         spin_lock(&lp->lp_lock);
1802         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1803         spin_unlock(&lp->lp_lock);
1804         wake_up_all(&lp->lp_dc_waitq);
1805
1806         lnet_net_unlock(LNET_LOCK_EX);
1807
1808         /* iterate through all pending messages and send them again */
1809         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1810                 list_del_init(&msg->msg_list);
1811                 if (lp->lp_dc_error) {
1812                         lnet_finalize(msg, lp->lp_dc_error);
1813                         continue;
1814                 }
1815
1816                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1817                        lnet_msgtyp2str(msg->msg_type),
1818                        libcfs_id2str(msg->msg_target));
1819                 rc = lnet_send(msg->msg_src_nid_param, msg,
1820                                msg->msg_rtr_nid_param);
1821                 if (rc < 0) {
1822                         CNETERR("Error sending %s to %s: %d\n",
1823                                lnet_msgtyp2str(msg->msg_type),
1824                                libcfs_id2str(msg->msg_target), rc);
1825                         lnet_finalize(msg, rc);
1826                 }
1827         }
1828         lnet_net_lock(LNET_LOCK_EX);
1829         lnet_peer_decref_locked(lp);
1830 }
1831
1832 /*
1833  * Handle inbound push.
1834  * Like any event handler, called with lnet_res_lock/CPT held.
1835  */
1836 void lnet_peer_push_event(struct lnet_event *ev)
1837 {
1838         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1839         struct lnet_peer *lp;
1840
1841         /* lnet_find_peer() adds a refcount */
1842         lp = lnet_find_peer(ev->source.nid);
1843         if (!lp) {
1844                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1845                        libcfs_nid2str(ev->initiator.nid),
1846                        libcfs_nid2str(ev->source.nid));
1847                 return;
1848         }
1849
1850         /* Ensure peer state remains consistent while we modify it. */
1851         spin_lock(&lp->lp_lock);
1852
1853         /*
1854          * If some kind of error happened, the contents of the message
1855          * cannot be used. Clear the NIDS_UPTODATE and set the
1856          * FORCE_PING flag to trigger a ping.
1857          */
1858         if (ev->status) {
1859                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1860                 lp->lp_state |= LNET_PEER_FORCE_PING;
1861                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1862                        ev->status,
1863                        libcfs_nid2str(lp->lp_primary_nid),
1864                        libcfs_nid2str(ev->source.nid));
1865                 goto out;
1866         }
1867
1868         /*
1869          * A push with invalid or corrupted info. Clear the UPTODATE
1870          * flag to trigger a ping.
1871          */
1872         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1873                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1874                 lp->lp_state |= LNET_PEER_FORCE_PING;
1875                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1876                        libcfs_nid2str(lp->lp_primary_nid));
1877                 goto out;
1878         }
1879
1880         /*
1881          * Make sure we'll allocate the correct size ping buffer when
1882          * pinging the peer.
1883          */
1884         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1885                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1886
1887         /*
1888          * A non-Multi-Rail peer is not supposed to be capable of
1889          * sending a push.
1890          */
1891         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1892                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1893                        libcfs_nid2str(lp->lp_primary_nid));
1894                 goto out;
1895         }
1896
1897         /*
1898          * Check the MULTIRAIL flag. Complain if the peer was DLC
1899          * configured without it.
1900          */
1901         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1902                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1903                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1904                                libcfs_nid2str(lp->lp_primary_nid));
1905                 } else {
1906                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1907                         lnet_peer_clr_non_mr_pref_nids(lp);
1908                 }
1909         }
1910
1911         /*
1912          * The peer may have discovery disabled at its end. Set
1913          * NO_DISCOVERY as appropriate.
1914          */
1915         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1916                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1917                        libcfs_nid2str(lp->lp_primary_nid));
1918                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1919         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1920                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1921                        libcfs_nid2str(lp->lp_primary_nid));
1922                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1923         }
1924
1925         /*
1926          * Check for truncation of the Put message. Clear the
1927          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1928          * and tell discovery to allocate a bigger buffer.
1929          */
1930         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1931                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1932                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1933                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1934                 lp->lp_state |= LNET_PEER_FORCE_PING;
1935                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1936                        libcfs_nid2str(lp->lp_primary_nid),
1937                        pbuf->pb_info.pi_nnis);
1938                 goto out;
1939         }
1940
1941         /*
1942          * Check whether the Put data is stale. Stale data can just be
1943          * dropped.
1944          */
1945         if (pbuf->pb_info.pi_nnis > 1 &&
1946             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1947             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1948                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1949                        libcfs_nid2str(lp->lp_primary_nid),
1950                        LNET_PING_BUFFER_SEQNO(pbuf),
1951                        lp->lp_peer_seqno);
1952                 goto out;
1953         }
1954
1955         /*
1956          * Check whether the Put data is new, in which case we clear
1957          * the UPTODATE flag and prepare to process it.
1958          *
1959          * If the Put data is current, and the peer is UPTODATE then
1960          * we assume everything is all right and drop the data as
1961          * stale.
1962          */
1963         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1964                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1965                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1966         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1967                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1968                        libcfs_nid2str(lp->lp_primary_nid),
1969                        LNET_PING_BUFFER_SEQNO(pbuf),
1970                        lp->lp_peer_seqno);
1971                 goto out;
1972         }
1973
1974         /*
1975          * If there is data present that hasn't been processed yet,
1976          * we'll replace it if the Put contained newer data and it
1977          * fits. We're racing with a Ping or earlier Push in this
1978          * case.
1979          */
1980         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1981                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1982                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1983                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1984                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1985                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1986                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1987                               libcfs_nid2str(lp->lp_primary_nid),
1988                               LNET_PING_BUFFER_SEQNO(pbuf),
1989                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1990                 }
1991                 goto out;
1992         }
1993
1994         /*
1995          * Allocate a buffer to copy the data. On a failure we drop
1996          * the Push and set FORCE_PING to force the discovery
1997          * thread to fix the problem by pinging the peer.
1998          */
1999         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
2000         if (!lp->lp_data) {
2001                 lp->lp_state |= LNET_PEER_FORCE_PING;
2002                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
2003                        libcfs_nid2str(lp->lp_primary_nid),
2004                        LNET_PING_BUFFER_SEQNO(pbuf));
2005                 goto out;
2006         }
2007
2008         /* Success */
2009         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
2010                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
2011         lp->lp_state |= LNET_PEER_DATA_PRESENT;
2012         CDEBUG(D_NET, "Received Push %s %u\n",
2013                libcfs_nid2str(lp->lp_primary_nid),
2014                LNET_PING_BUFFER_SEQNO(pbuf));
2015
2016 out:
2017         /*
2018          * Queue the peer for discovery if it is not queued yet. If it
2019          * was already queued, move it to the head of the request queue
2020          * and wake the discovery thread, because its status changed.
2021          */
2022         spin_unlock(&lp->lp_lock);
2023         lnet_net_lock(LNET_LOCK_EX);
2024         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2025                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2026                 wake_up(&the_lnet.ln_dc_waitq);
2027         }
2028         /* Drop refcount from lookup */
2029         lnet_peer_decref_locked(lp);
2030         lnet_net_unlock(LNET_LOCK_EX);
2031 }
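/*
 * Worked example of the sequence checks above, with hypothetical
 * values: if lp_peer_seqno is 9, a Push carrying seqno 7 is logged as
 * stale and dropped; a Push carrying seqno 12 raises lp_peer_seqno to
 * 12 and clears NIDS_UPTODATE so the new NID list gets processed.
 */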
2032
2033 /*
2034  * Clear the discovery error state, unless we're already discovering
2035  * this peer, in which case the error is current.
2036  */
2037 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2038 {
2039         spin_lock(&lp->lp_lock);
2040         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2041                 lp->lp_dc_error = 0;
2042         spin_unlock(&lp->lp_lock);
2043 }
2044
2045 /*
2046  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2047  * dropped/retaken within this function. An lnet_peer_ni is passed in
2048  * because discovery could tear down an lnet_peer.
2049  */
2050 int
2051 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2052 {
2053         DEFINE_WAIT(wait);
2054         struct lnet_peer *lp;
2055         int rc = 0;
2056
2057 again:
2058         lnet_net_unlock(cpt);
2059         lnet_net_lock(LNET_LOCK_EX);
2060         lp = lpni->lpni_peer_net->lpn_peer;
2061         lnet_peer_clear_discovery_error(lp);
2062
2063         /*
2064          * We're willing to be interrupted. The lpni can become a
2065          * zombie if we race with DLC, so we must check for that.
2066          */
2067         for (;;) {
2068                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2069                 if (signal_pending(current))
2070                         break;
2071                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2072                         break;
2073                 if (lp->lp_dc_error)
2074                         break;
2075                 if (lnet_peer_is_uptodate(lp))
2076                         break;
2077                 lnet_peer_queue_for_discovery(lp);
2078                 /*
2079                  * if caller requested a non-blocking operation then
2080                  * return immediately. Once discovery is complete then the
2081                  * peer ref will be decremented and any pending messages
2082                  * that were stopped due to discovery will be transmitted.
2083                  */
2084                 if (!block)
2085                         break;
2086
2087                 lnet_peer_addref_locked(lp);
2088                 lnet_net_unlock(LNET_LOCK_EX);
2089                 schedule();
2090                 finish_wait(&lp->lp_dc_waitq, &wait);
2091                 lnet_net_lock(LNET_LOCK_EX);
2092                 lnet_peer_decref_locked(lp);
2093                 /* Peer may have changed */
2094                 lp = lpni->lpni_peer_net->lpn_peer;
2095         }
2096         finish_wait(&lp->lp_dc_waitq, &wait);
2097
2098         lnet_net_unlock(LNET_LOCK_EX);
2099         lnet_net_lock(cpt);
2100
2101         /*
2102          * If the peer has changed after we've discovered the older peer,
2103          * then we need to discover the new peer to make sure its
2104          * interface information is up to date.
2105          */
2106         if (lp != lpni->lpni_peer_net->lpn_peer)
2107                 goto again;
2108
2109         if (signal_pending(current))
2110                 rc = -EINTR;
2111         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2112                 rc = -ESHUTDOWN;
2113         else if (lp->lp_dc_error)
2114                 rc = lp->lp_dc_error;
2115         else if (!block)
2116                 CDEBUG(D_NET, "non-blocking discovery\n");
2117         else if (!lnet_peer_is_uptodate(lp))
2118                 goto again;
2119
2120         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2121                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2122                libcfs_nid2str(lpni->lpni_nid), rc,
2123                (!block) ? "pending discovery" : "discovery complete");
2124
2125         return rc;
2126 }
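/*
 * Usage sketch; hedged and not built. With block=false the caller
 * returns immediately and any messages parked on lp_dc_pendq by the
 * send path are resent by lnet_peer_discovery_complete(); with
 * block=true the caller sleeps until discovery finishes or fails.
 */
#if 0	/* illustrative sketch only */
	rc = lnet_discover_peer_locked(lpni, cpt, false);
	if (rc < 0)
		return rc; /* -EINTR, -ESHUTDOWN or lp_dc_error */
#endif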
2127
2128 /* Handle an incoming ack for a push. */
2129 static void
2130 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2131 {
2132         struct lnet_ping_buffer *pbuf;
2133
2134         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2135         spin_lock(&lp->lp_lock);
2136         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2137         lp->lp_push_error = ev->status;
2138         if (ev->status)
2139                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2140         else
2141                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2142         spin_unlock(&lp->lp_lock);
2143
2144         CDEBUG(D_NET, "peer %s ev->status %d\n",
2145                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2146 }
2147
2148 /* Handle a Reply message. This is the reply to a Ping message. */
2149 static void
2150 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2151 {
2152         struct lnet_ping_buffer *pbuf;
2153         int rc;
2154
2155         spin_lock(&lp->lp_lock);
2156
2157         /*
2158          * If some kind of error happened, the contents of the message
2159          * cannot be used. Set PING_FAILED to trigger a retry.
2160          */
2161         if (ev->status) {
2162                 lp->lp_state |= LNET_PEER_PING_FAILED;
2163                 lp->lp_ping_error = ev->status;
2164                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2165                        ev->status,
2166                        libcfs_nid2str(lp->lp_primary_nid),
2167                        libcfs_nid2str(ev->source.nid));
2168                 goto out;
2169         }
2170
2171         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2172         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2173                 lnet_swap_pinginfo(pbuf);
2174
2175         /*
2176          * A reply with invalid or corrupted info. Set PING_FAILED to
2177          * trigger a retry.
2178          */
2179         rc = lnet_ping_info_validate(&pbuf->pb_info);
2180         if (rc) {
2181                 lp->lp_state |= LNET_PEER_PING_FAILED;
2182                 lp->lp_ping_error = 0;
2183                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2184                        libcfs_nid2str(lp->lp_primary_nid), rc);
2185                 goto out;
2186         }
2187
2188         /*
2189          * Update the MULTI_RAIL flag based on the reply. If the peer
2190          * was configured with DLC then the setting should match what
2191          * DLC put in.
2192          */
2193         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2194                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2195                         /* Everything's fine */
2196                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2197                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2198                               libcfs_nid2str(lp->lp_primary_nid));
2199                 } else {
2200                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2201                         lnet_peer_clr_non_mr_pref_nids(lp);
2202                 }
2203         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2204                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2205                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2206                               libcfs_nid2str(lp->lp_primary_nid));
2207                 } else {
2208                         CERROR("Multi-Rail state vanished from %s\n",
2209                                libcfs_nid2str(lp->lp_primary_nid));
2210                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2211                 }
2212         }
2213
2214         /*
2215          * Make sure we'll allocate the correct size ping buffer when
2216          * pinging the peer.
2217          */
2218         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2219                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2220
2221         /*
2222          * The peer may have discovery disabled at its end. Set
2223          * NO_DISCOVERY as appropriate.
2224          */
2225         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2226                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2227                        libcfs_nid2str(lp->lp_primary_nid));
2228                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2229         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2230                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2231                        libcfs_nid2str(lp->lp_primary_nid));
2232                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2233         }
2234
2235         /*
2236          * Check for truncation of the Reply. Clear PING_SENT and set
2237          * PING_FAILED to trigger a retry.
2238          */
2239         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2240                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2241                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2242                 lp->lp_state |= LNET_PEER_PING_FAILED;
2243                 lp->lp_ping_error = 0;
2244                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2245                        libcfs_nid2str(lp->lp_primary_nid),
2246                        pbuf->pb_info.pi_nnis);
2247                 goto out;
2248         }
2249
2250         /*
2251          * Check the sequence numbers in the reply. These are only
2252          * available if the reply came from a Multi-Rail peer.
2253          */
2254         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2255             pbuf->pb_info.pi_nnis > 1 &&
2256             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2257                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2258                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2259                                 libcfs_nid2str(lp->lp_primary_nid),
2260                                 LNET_PING_BUFFER_SEQNO(pbuf),
2261                                 lp->lp_peer_seqno);
2262                         goto out;
2263                 }
2264
2265                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2266                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2267         }
2268
2269         /* We're happy with the state of the data in the buffer. */
2270         CDEBUG(D_NET, "peer %s data present %u\n",
2271                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2272         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2273                 lnet_ping_buffer_decref(lp->lp_data);
2274         else
2275                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2276         lnet_ping_buffer_addref(pbuf);
2277         lp->lp_data = pbuf;
2278 out:
2279         lp->lp_state &= ~LNET_PEER_PING_SENT;
2280         spin_unlock(&lp->lp_lock);
2281 }
2282
2283 /*
2284  * Send event handling. Only matters for error cases, where we clean
2285  * up state on the peer and peer_ni that would otherwise be updated in
2286  * the REPLY event handler for a successful Ping, and the ACK event
2287  * handler for a successful Push.
2288  */
2289 static int
2290 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2291 {
2292         int rc = 0;
2293
2294         if (!ev->status)
2295                 goto out;
2296
2297         spin_lock(&lp->lp_lock);
2298         if (ev->msg_type == LNET_MSG_GET) {
2299                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2300                 lp->lp_state |= LNET_PEER_PING_FAILED;
2301                 lp->lp_ping_error = ev->status;
2302         } else { /* ev->msg_type == LNET_MSG_PUT */
2303                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2304                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2305                 lp->lp_push_error = ev->status;
2306         }
2307         spin_unlock(&lp->lp_lock);
2308         rc = LNET_REDISCOVER_PEER;
2309 out:
2310         CDEBUG(D_NET, "%s Send to %s: %d\n",
2311                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2312                 libcfs_nid2str(ev->target.nid), rc);
2313         return rc;
2314 }
2315
2316 /*
2317  * Unlink event handling. This event is only seen if a call to
2318  * LNetMDUnlink() caused the MD to be unlinked. If this call was
2319  * made after the event was set up in LNetGet() or LNetPut() then we
2320  * assume the Ping or Push timed out.
2321  */
2322 static void
2323 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2324 {
2325         spin_lock(&lp->lp_lock);
2326         /* We've passed through LNetGet() */
2327         if (lp->lp_state & LNET_PEER_PING_SENT) {
2328                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2329                 lp->lp_state |= LNET_PEER_PING_FAILED;
2330                 lp->lp_ping_error = -ETIMEDOUT;
2331                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2332                         libcfs_nid2str(lp->lp_primary_nid));
2333         }
2334         /* We've passed through LNetPut() */
2335         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2336                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2337                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2338                 lp->lp_push_error = -ETIMEDOUT;
2339                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2340                         libcfs_nid2str(lp->lp_primary_nid));
2341         }
2342         spin_unlock(&lp->lp_lock);
2343 }
2344
2345 /*
2346  * Event handler for the discovery EQ.
2347  *
2348  * Called with lnet_res_lock(cpt) held. The cpt is the
2349  * lnet_cpt_of_cookie() of the md handle cookie.
2350  */
2351 static void lnet_discovery_event_handler(struct lnet_event *event)
2352 {
2353         struct lnet_peer *lp = event->md.user_ptr;
2354         struct lnet_ping_buffer *pbuf;
2355         int rc;
2356
2357         /* discovery needs to take another look */
2358         rc = LNET_REDISCOVER_PEER;
2359
2360         CDEBUG(D_NET, "Received event: %d\n", event->type);
2361
2362         switch (event->type) {
2363         case LNET_EVENT_ACK:
2364                 lnet_discovery_event_ack(lp, event);
2365                 break;
2366         case LNET_EVENT_REPLY:
2367                 lnet_discovery_event_reply(lp, event);
2368                 break;
2369         case LNET_EVENT_SEND:
2370                 /* Only send failure triggers a retry. */
2371                 rc = lnet_discovery_event_send(lp, event);
2372                 break;
2373         case LNET_EVENT_UNLINK:
2374                 /* LNetMDUnlink() was called */
2375                 lnet_discovery_event_unlink(lp, event);
2376                 break;
2377         default:
2378                 /* Invalid events. */
2379                 LBUG();
2380         }
2381         lnet_net_lock(LNET_LOCK_EX);
2382         if (event->unlinked) {
2383                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2384                 lnet_ping_buffer_decref(pbuf);
2385                 lnet_peer_decref_locked(lp);
2386         }
2387
2388         /* Put the peer back at the end of the request queue, if
2389          * discovery is not already done. */
2390         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2391                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2392                 wake_up(&the_lnet.ln_dc_waitq);
2393         }
2394         lnet_net_unlock(LNET_LOCK_EX);
2395 }
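/*
 * Sketch of how this handler is assumed to be attached by the
 * discovery startup code (done elsewhere, not here; hedged and not
 * built). An EQ size of 0 requests callback-only event delivery.
 */
#if 0	/* illustrative sketch only */
	rc = LNetEQAlloc(0, lnet_discovery_event_handler,
			 &the_lnet.ln_dc_eqh);
	if (rc != 0)
		CERROR("Can't allocate discovery EQ: %d\n", rc);
#endif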
2396
2397 /*
2398  * Build a peer from incoming data.
2399  *
2400  * The NIDs in the incoming data are supposed to be structured as follows:
2401  *  - loopback
2402  *  - primary NID
2403  *  - other NIDs in same net
2404  *  - NIDs in second net
2405  *  - NIDs in third net
2406  *  - ...
2407  * This is due to the way the list of NIDs in the data is created.
2408  *
2409  * Note that this function will mark the peer uptodate unless an
2410  * ENOMEM is encountered. All other errors are due to a conflict
2411  * between the DLC configuration and what discovery sees. We treat DLC
2412  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2413  * peer from becoming stuck in discovery.
2414  */
2415 static int lnet_peer_merge_data(struct lnet_peer *lp,
2416                                 struct lnet_ping_buffer *pbuf)
2417 {
2418         struct lnet_peer_ni *lpni;
2419         lnet_nid_t *curnis = NULL;
2420         lnet_nid_t *addnis = NULL;
2421         lnet_nid_t *delnis = NULL;
2422         unsigned flags;
2423         int ncurnis;
2424         int naddnis;
2425         int ndelnis;
2426         int nnis = 0;
2427         int i;
2428         int j;
2429         int rc;
2430
2431         flags = LNET_PEER_DISCOVERED;
2432         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2433                 flags |= LNET_PEER_MULTI_RAIL;
2434
2435         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2436         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2437         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2438         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2439         if (!curnis || !addnis || !delnis) {
2440                 rc = -ENOMEM;
2441                 goto out;
2442         }
2443         ncurnis = 0;
2444         naddnis = 0;
2445         ndelnis = 0;
2446
2447         /* Construct the list of NIDs present in peer. */
2448         lpni = NULL;
2449         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2450                 curnis[ncurnis++] = lpni->lpni_nid;
2451
2452         /*
2453          * Check for NIDs in pbuf not present in curnis[].
2454          * The loop starts at 1 to skip the loopback NID.
2455          */
2456         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2457                 for (j = 0; j < ncurnis; j++)
2458                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2459                                 break;
2460                 if (j == ncurnis)
2461                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2462         }
2463         /*
2464          * Check for NIDs in curnis[] not present in pbuf.
2465          * The nested loop starts at 1 to skip the loopback NID.
2466          *
2467          * But never add the loopback NID to delnis[]: if it is
2468          * present in curnis[] then this peer is for this node.
2469          */
2470         for (i = 0; i < ncurnis; i++) {
2471                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2472                         continue;
2473                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2474                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2475                                 break;
2476                 if (j == pbuf->pb_info.pi_nnis)
2477                         delnis[ndelnis++] = curnis[i];
2478         }
2479
2480         for (i = 0; i < naddnis; i++) {
2481                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2482                 if (rc) {
2483                         CERROR("Error adding NID %s to peer %s: %d\n",
2484                                libcfs_nid2str(addnis[i]),
2485                                libcfs_nid2str(lp->lp_primary_nid), rc);
2486                         if (rc == -ENOMEM)
2487                                 goto out;
2488                 }
2489         }
2490         for (i = 0; i < ndelnis; i++) {
2491                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2492                 if (rc) {
2493                         CERROR("Error deleting NID %s from peer %s: %d\n",
2494                                libcfs_nid2str(delnis[i]),
2495                                libcfs_nid2str(lp->lp_primary_nid), rc);
2496                         if (rc == -ENOMEM)
2497                                 goto out;
2498                 }
2499         }
2500         /*
2501          * Errors other than -ENOMEM are due to peers having been
2502          * configured with DLC. Ignore these because DLC overrides
2503          * Discovery.
2504          */
2505         rc = 0;
2506 out:
2507         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2508         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2509         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2510         lnet_ping_buffer_decref(pbuf);
2511         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2512
2513         if (rc) {
2514                 spin_lock(&lp->lp_lock);
2515                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2516                 lp->lp_state |= LNET_PEER_FORCE_PING;
2517                 spin_unlock(&lp->lp_lock);
2518         }
2519         return rc;
2520 }
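/*
 * Worked example of the merge above, with hypothetical NIDs: if the
 * peer currently holds {A, B, C} and the ping buffer lists {lo, A, B,
 * D}, then addnis[] = {D} and delnis[] = {C}. The loopback NID and
 * the NIDs present on both sides are left untouched.
 */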
2521
2522 /*
2523  * The data in pbuf says lp is its primary peer, but the data was
2524  * received by a different peer. Try to update lp with the data.
2525  */
2526 static int
2527 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2528 {
2529         struct lnet_handle_md mdh;
2530
2531         /* Queue lp for discovery, and force it on the request queue. */
2532         lnet_net_lock(LNET_LOCK_EX);
2533         if (lnet_peer_queue_for_discovery(lp))
2534                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2535         lnet_net_unlock(LNET_LOCK_EX);
2536
2537         LNetInvalidateMDHandle(&mdh);
2538
2539         /*
2540          * Decide whether we can move the peer to the DATA_PRESENT state.
2541          *
2542          * We replace stale data for a multi-rail peer, repair PING_FAILED
2543          * status, and preempt FORCE_PING.
2544          *
2545          * If after that we have DATA_PRESENT, we merge it into this peer.
2546          */
2547         spin_lock(&lp->lp_lock);
2548         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2549                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2550                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2551                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2552                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2553                         lnet_ping_buffer_decref(pbuf);
2554                         pbuf = lp->lp_data;
2555                         lp->lp_data = NULL;
2556                 }
2557         }
2558         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2559                 lnet_ping_buffer_decref(lp->lp_data);
2560                 lp->lp_data = NULL;
2561                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2562         }
2563         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2564                 mdh = lp->lp_ping_mdh;
2565                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2566                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2567                 lp->lp_ping_error = 0;
2568         }
2569         if (lp->lp_state & LNET_PEER_FORCE_PING)
2570                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2571         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2572         spin_unlock(&lp->lp_lock);
2573
2574         if (!LNetMDHandleIsInvalid(mdh))
2575                 LNetMDUnlink(mdh);
2576
2577         if (pbuf)
2578                 return lnet_peer_merge_data(lp, pbuf);
2579
2580         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2581         return 0;
2582 }
2583
2584 /*
2585  * Update a peer using the data received.
2586  */
2587 static int lnet_peer_data_present(struct lnet_peer *lp)
2588 __must_hold(&lp->lp_lock)
2589 {
2590         struct lnet_ping_buffer *pbuf;
2591         struct lnet_peer_ni *lpni;
2592         lnet_nid_t nid = LNET_NID_ANY;
2593         unsigned flags;
2594         int rc = 0;
2595
2596         pbuf = lp->lp_data;
2597         lp->lp_data = NULL;
2598         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2599         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2600         spin_unlock(&lp->lp_lock);
2601
2602         /*
2603          * Modifications of peer structures are done while holding the
2604          * ln_api_mutex. A global lock is required because we may be
2605          * modifying multiple peer structures, and a mutex greatly
2606          * simplifies memory management.
2607          *
2608          * The actual changes to the data structures must also protect
2609          * against concurrent lookups, for which the lnet_net_lock in
2610          * LNET_LOCK_EX mode is used.
2611          */
2612         mutex_lock(&the_lnet.ln_api_mutex);
2613         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2614                 rc = -ESHUTDOWN;
2615                 goto out;
2616         }
2617
2618         /*
2619          * If this peer is not on the peer list then it is being torn
2620          * down, and our reference count may be all that is keeping it
2621          * alive. Don't do any work on it.
2622          */
2623         if (list_empty(&lp->lp_peer_list))
2624                 goto out;
2625
2626         flags = LNET_PEER_DISCOVERED;
2627         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2628                 flags |= LNET_PEER_MULTI_RAIL;
2629
2630         /*
2631          * Check whether the primary NID in the message matches the
2632          * primary NID of the peer. If it does, update the peer; if
2633          * it does not, check whether there is already a peer with
2634          * that primary NID. If no such peer exists, try to update
2635          * the primary NID of the current peer (allowed if it was
2636          * created due to message traffic) and complete the update.
2637          * If the peer did exist, hand off the data to it.
2638          *
2639          * The peer for the loopback interface is a special case: this
2640          * is the peer for the local node, and we want to set its
2641          * primary NID to the correct value here. Moreover, this peer
2642          * can show up with only the loopback NID in the ping buffer.
2643          */
2644         if (pbuf->pb_info.pi_nnis <= 1)
2645                 goto out;
2646         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2647         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2648                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2649                 if (!rc)
2650                         rc = lnet_peer_merge_data(lp, pbuf);
2651         } else if (lp->lp_primary_nid == nid) {
2652                 rc = lnet_peer_merge_data(lp, pbuf);
2653         } else {
2654                 lpni = lnet_find_peer_ni_locked(nid);
2655                 if (!lpni) {
2656                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2657                         if (rc) {
2658                                 CERROR("Primary NID error %s versus %s: %d\n",
2659                                        libcfs_nid2str(lp->lp_primary_nid),
2660                                        libcfs_nid2str(nid), rc);
2661                         } else {
2662                                 rc = lnet_peer_merge_data(lp, pbuf);
2663                         }
2664                 } else {
2665                         rc = lnet_peer_set_primary_data(
2666                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2667                         lnet_peer_ni_decref_locked(lpni);
2668                 }
2669         }
2670 out:
2671         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2672         mutex_unlock(&the_lnet.ln_api_mutex);
2673
2674         spin_lock(&lp->lp_lock);
2675         /* Tell discovery to re-check the peer immediately. */
2676         if (!rc)
2677                 rc = LNET_REDISCOVER_PEER;
2678         return rc;
2679 }
2680
2681 /*
2682  * A ping failed. Clear the PING_FAILED state and set the
2683  * FORCE_PING state, to ensure a retry even if discovery is
2684  * disabled. This avoids being left with incorrect state.
2685  */
2686 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2687 __must_hold(&lp->lp_lock)
2688 {
2689         struct lnet_handle_md mdh;
2690         int rc;
2691
2692         mdh = lp->lp_ping_mdh;
2693         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2694         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2695         lp->lp_state |= LNET_PEER_FORCE_PING;
2696         rc = lp->lp_ping_error;
2697         lp->lp_ping_error = 0;
2698         spin_unlock(&lp->lp_lock);
2699
2700         if (!LNetMDHandleIsInvalid(mdh))
2701                 LNetMDUnlink(mdh);
2702
2703         CDEBUG(D_NET, "peer %s:%d\n",
2704                libcfs_nid2str(lp->lp_primary_nid), rc);
2705
2706         spin_lock(&lp->lp_lock);
2707         return rc ? rc : LNET_REDISCOVER_PEER;
2708 }
2709
2710 /*
2711  * Select NID to send a Ping or Push to.
2712  */
2713 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2714 {
2715         struct lnet_peer_ni *lpni;
2716
2717         /* Look for a direct-connected NID for this peer. */
2718         lpni = NULL;
2719         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2720                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2721                         continue;
2722                 break;
2723         }
2724         if (lpni)
2725                 return lpni->lpni_nid;
2726
2727         /* Look for a routed-connected NID for this peer. */
2728         lpni = NULL;
2729         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2730                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2731                         continue;
2732                 break;
2733         }
2734         if (lpni)
2735                 return lpni->lpni_nid;
2736
2737         return LNET_NID_ANY;
2738 }
2739
2740 /* Active side of ping. */
2741 static int lnet_peer_send_ping(struct lnet_peer *lp)
2742 __must_hold(&lp->lp_lock)
2743 {
2744         lnet_nid_t pnid;
2745         int nnis;
2746         int rc;
2747         int cpt;
2748
2749         lp->lp_state |= LNET_PEER_PING_SENT;
2750         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2751         spin_unlock(&lp->lp_lock);
2752
2753         cpt = lnet_net_lock_current();
2754         /* Refcount for MD. */
2755         lnet_peer_addref_locked(lp);
2756         pnid = lnet_peer_select_nid(lp);
2757         lnet_net_unlock(cpt);
2758
2759         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2760
2761         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
2762                             the_lnet.ln_dc_eqh, false);
2763
2764         /*
2765          * If LNetMDBind() in lnet_send_ping() fails, we need to decrement
2766          * the refcount on the peer ourselves; otherwise LNetMDUnlink()
2767          * will be called, which will eventually do that.
2768          */
2769         if (rc > 0) {
2770                 lnet_net_lock(cpt);
2771                 lnet_peer_decref_locked(lp);
2772                 lnet_net_unlock(cpt);
2773                 rc = -rc; /* change rc to a negative value */
2774                 goto fail_error;
2775         } else if (rc < 0) {
2776                 goto fail_error;
2777         }
2778
2779         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2780
2781         spin_lock(&lp->lp_lock);
2782         return 0;
2783
2784 fail_error:
2785         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2786         /*
2787          * The errors that get us here are considered hard errors and
2788          * cause Discovery to terminate. So we clear PING_SENT, but do
2789          * not set either PING_FAILED or FORCE_PING. In fact we need
2790          * to clear PING_FAILED, because the unlink event handler will
2791          * have set it if we called LNetMDUnlink() above.
2792          */
2793         spin_lock(&lp->lp_lock);
2794         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2795         return rc;
2796 }
2797
2798 /*
2799  * This function exists because you cannot call LNetMDUnlink() from an
2800  * event handler.
2801  */
2802 static int lnet_peer_push_failed(struct lnet_peer *lp)
2803 __must_hold(&lp->lp_lock)
2804 {
2805         struct lnet_handle_md mdh;
2806         int rc;
2807
2808         mdh = lp->lp_push_mdh;
2809         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2810         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2811         rc = lp->lp_push_error;
2812         lp->lp_push_error = 0;
2813         spin_unlock(&lp->lp_lock);
2814
2815         if (!LNetMDHandleIsInvalid(mdh))
2816                 LNetMDUnlink(mdh);
2817
2818         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2819         spin_lock(&lp->lp_lock);
2820         return rc ? rc : LNET_REDISCOVER_PEER;
2821 }
2822
2823 /* Active side of push. */
2824 static int lnet_peer_send_push(struct lnet_peer *lp)
2825 __must_hold(&lp->lp_lock)
2826 {
2827         struct lnet_ping_buffer *pbuf;
2828         struct lnet_process_id id;
2829         struct lnet_md md;
2830         int cpt;
2831         int rc;
2832
2833         /* Don't push to a non-multi-rail peer. */
2834         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2835                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2836                 return 0;
2837         }
2838
2839         lp->lp_state |= LNET_PEER_PUSH_SENT;
2840         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2841         spin_unlock(&lp->lp_lock);
2842
2843         cpt = lnet_net_lock_current();
2844         pbuf = the_lnet.ln_ping_target;
2845         lnet_ping_buffer_addref(pbuf);
2846         lnet_net_unlock(cpt);
2847
2848         /* Push source MD */
2849         md.start     = &pbuf->pb_info;
2850         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2851         md.threshold = 2; /* Put/Ack */
2852         md.max_size  = 0;
2853         md.options   = 0;
2854         md.eq_handle = the_lnet.ln_dc_eqh;
2855         md.user_ptr  = lp;
2856
2857         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2858         if (rc) {
2859                 lnet_ping_buffer_decref(pbuf);
2860                 CERROR("Can't bind push source MD: %d\n", rc);
2861                 goto fail_error;
2862         }
2863         cpt = lnet_net_lock_current();
2864         /* Refcount for MD. */
2865         lnet_peer_addref_locked(lp);
2866         id.pid = LNET_PID_LUSTRE;
2867         id.nid = lnet_peer_select_nid(lp);
2868         lnet_net_unlock(cpt);
2869
2870         if (id.nid == LNET_NID_ANY) {
2871                 rc = -EHOSTUNREACH;
2872                 goto fail_unlink;
2873         }
2874
2875         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2876                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2877                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2878
2879         if (rc)
2880                 goto fail_unlink;
2881
2882         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2883
2884         spin_lock(&lp->lp_lock);
2885         return 0;
2886
2887 fail_unlink:
2888         LNetMDUnlink(lp->lp_push_mdh);
2889         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2890 fail_error:
2891         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2892         /*
2893          * The errors that get us here are considered hard errors and
2894          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2895          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2896          * because the unlink event handler will have set it if we
2897          * called LNetMDUnlink() above.
2898          */
2899         spin_lock(&lp->lp_lock);
2900         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2901         return rc;
2902 }
2903
2904 /*
2905  * An unrecoverable error was encountered during discovery.
2906  * Set error status in peer and abort discovery.
2907  */
2908 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2909 {
2910         CDEBUG(D_NET, "Discovery error %s: %d\n",
2911                libcfs_nid2str(lp->lp_primary_nid), error);
2912
2913         spin_lock(&lp->lp_lock);
2914         lp->lp_dc_error = error;
2915         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2916         lp->lp_state |= LNET_PEER_REDISCOVER;
2917         spin_unlock(&lp->lp_lock);
2918 }
2919
2920 /*
2921  * Mark the peer as discovered.
2922  */
2923 static int lnet_peer_discovered(struct lnet_peer *lp)
2924 __must_hold(&lp->lp_lock)
2925 {
2926         lp->lp_state |= LNET_PEER_DISCOVERED;
2927         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2928                           LNET_PEER_REDISCOVER);
2929
2930         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2931
2932         return 0;
2933 }
2934
2935 /*
2936  * Mark the peer as to be rediscovered.
2937  */
2938 static int lnet_peer_rediscover(struct lnet_peer *lp)
2939 __must_hold(&lp->lp_lock)
2940 {
2941         lp->lp_state |= LNET_PEER_REDISCOVER;
2942         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2943
2944         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2945
2946         return 0;
2947 }
2948
2949 /*
2950  * Discovering this peer is taking too long. Cancel any Ping or Push
2951  * that discovery is waiting on by unlinking the relevant MDs. The
2952  * lnet_discovery_event_handler() will proceed from here and complete
2953  * the cleanup.
2954  */
2955 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2956 {
2957         struct lnet_handle_md ping_mdh;
2958         struct lnet_handle_md push_mdh;
2959
2960         LNetInvalidateMDHandle(&ping_mdh);
2961         LNetInvalidateMDHandle(&push_mdh);
2962
2963         spin_lock(&lp->lp_lock);
2964         if (lp->lp_state & LNET_PEER_PING_SENT) {
2965                 ping_mdh = lp->lp_ping_mdh;
2966                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2967         }
2968         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2969                 push_mdh = lp->lp_push_mdh;
2970                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2971         }
2972         spin_unlock(&lp->lp_lock);
2973
2974         if (!LNetMDHandleIsInvalid(ping_mdh))
2975                 LNetMDUnlink(ping_mdh);
2976         if (!LNetMDHandleIsInvalid(push_mdh))
2977                 LNetMDUnlink(push_mdh);
2978 }
2979
2980 /*
2981  * Wait for work to be queued or some other change that must be
2982  * attended to. Returns non-zero if the discovery thread should shut
2983  * down.
2984  */
2985 static int lnet_peer_discovery_wait_for_work(void)
2986 {
2987         int cpt;
2988         int rc = 0;
2989
2990         DEFINE_WAIT(wait);
2991
2992         cpt = lnet_net_lock_current();
2993         for (;;) {
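		/*
		 * Stop waiting if shutdown was requested, the push
		 * target needs resizing, or there is discovery or
		 * resend work queued.
		 */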
2994                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
2995                                 TASK_INTERRUPTIBLE);
2996                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2997                         break;
2998                 if (lnet_push_target_resize_needed())
2999                         break;
3000                 if (!list_empty(&the_lnet.ln_dc_request))
3001                         break;
3002                 if (!list_empty(&the_lnet.ln_msg_resend))
3003                         break;
3004                 lnet_net_unlock(cpt);
3005
3006                 /*
3007                  * Wake up at most once per second to check for peers
3008                  * that have been stuck on the working queue for longer
3009                  * than the peer timeout.
3010                  */
3011                 schedule_timeout(cfs_time_seconds(1));
3012                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
3013                 cpt = lnet_net_lock_current();
3014         }
3015         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3016
3017         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3018                 rc = -ESHUTDOWN;
3019
3020         lnet_net_unlock(cpt);
3021
3022         CDEBUG(D_NET, "woken: %d\n", rc);
3023
3024         return rc;
3025 }
3026
3027 /*
3028  * Messages that were pending on a destroyed peer will be put on a global
3029  * resend list. The discovery thread checks this list when it wakes up
3030  * and resends the messages. A message may still be sendable if the lpni
3031  * that originally caused it to be re-queued has been transferred to
3032  * another peer.
3033  *
3034  * It is possible that LNet could be shut down while we're iterating
3035  * through the list. lnet_shutdown_lndnets() will attempt to access the
3036  * resend list, but will have to wait until the spinlock is released, by
3037  * which time there shouldn't be any more messages on the resend list.
3038  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3039  * for the messages so they can be released. The other case is that
3040  * lnet_shutdown_lndnets() can finalize all the messages before this
3041  * function can visit the resend list, in which case this function will be
3042  * a no-op.
3043  */
3044 static void lnet_resend_msgs(void)
3045 {
3046         struct lnet_msg *msg, *tmp;
3047         struct list_head resend;
3048         int rc;
3049
3050         INIT_LIST_HEAD(&resend);
3051
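	/*
	 * Splice the global resend list onto a local list under the
	 * lock, so the messages can be sent without holding it.
	 */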
3052         spin_lock(&the_lnet.ln_msg_resend_lock);
3053         list_splice(&the_lnet.ln_msg_resend, &resend);
3054         spin_unlock(&the_lnet.ln_msg_resend_lock);
3055
3056         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3057                 list_del_init(&msg->msg_list);
3058                 rc = lnet_send(msg->msg_src_nid_param, msg,
3059                                msg->msg_rtr_nid_param);
3060                 if (rc < 0) {
3061                         CNETERR("Error sending %s to %s: %d\n",
3062                                lnet_msgtyp2str(msg->msg_type),
3063                                libcfs_id2str(msg->msg_target), rc);
3064                         lnet_finalize(msg, rc);
3065                 }
3066         }
3067 }
3068
3069 /* The discovery thread. */
3070 static int lnet_peer_discovery(void *arg)
3071 {
3072         struct lnet_peer *lp;
3073         int rc;
3074
3075         CDEBUG(D_NET, "started\n");
3076         cfs_block_allsigs();
3077
3078         for (;;) {
3079                 if (lnet_peer_discovery_wait_for_work())
3080                         break;
3081
3082                 lnet_resend_msgs();
3083
3084                 if (lnet_push_target_resize_needed())
3085                         lnet_push_target_resize();
3086
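		/*
		 * The discovery queues are manipulated under the
		 * exclusive LNet lock.
		 */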
3087                 lnet_net_lock(LNET_LOCK_EX);
3088                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3089                         break;
3090
3091                 /*
3092                  * Process all incoming discovery work requests.  When
3093                  * discovery must wait on a peer to change state, it
3094                  * is added to the tail of the ln_dc_working queue. A
3095                  * timestamp keeps track of when the peer was added,
3096                  * so we can time out discovery requests that take too
3097                  * long.
3098                  */
3099                 while (!list_empty(&the_lnet.ln_dc_request)) {
3100                         lp = list_first_entry(&the_lnet.ln_dc_request,
3101                                               struct lnet_peer, lp_dc_list);
3102                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3103                         /*
3104                          * set the time the peer was put on the dc_working
3105                          * queue. It shouldn't remain on the queue
3106                          * forever, in case the GET message (for ping)
3107                          * doesn't get a REPLY or the PUT message (for
3108                          * push) doesn't get an ACK.
3109                          */
3110                         lp->lp_last_queued = ktime_get_real_seconds();
3111                         lnet_net_unlock(LNET_LOCK_EX);
3112
3113                         /*
3114                          * Select an action depending on the state of
3115                          * the peer and whether discovery is disabled.
3116                          * The check whether discovery is disabled is
3117                          * done after the code that handles processing
3118                          * for arrived data, cleanup for failures, and
3119                          * forcing a Ping or Push.
3120                          */
3121                         spin_lock(&lp->lp_lock);
3122                         CDEBUG(D_NET, "peer %s state %#x\n",
3123                                 libcfs_nid2str(lp->lp_primary_nid),
3124                                 lp->lp_state);
3125                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3126                                 rc = lnet_peer_data_present(lp);
3127                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3128                                 rc = lnet_peer_ping_failed(lp);
3129                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3130                                 rc = lnet_peer_push_failed(lp);
3131                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3132                                 rc = lnet_peer_send_ping(lp);
3133                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3134                                 rc = lnet_peer_send_push(lp);
3135                         else if (lnet_peer_discovery_disabled)
3136                                 rc = lnet_peer_rediscover(lp);
3137                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3138                                 rc = lnet_peer_send_ping(lp);
3139                         else if (lnet_peer_needs_push(lp))
3140                                 rc = lnet_peer_send_push(lp);
3141                         else
3142                                 rc = lnet_peer_discovered(lp);
3143                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3144                                 libcfs_nid2str(lp->lp_primary_nid),
3145                                 lp->lp_state, rc);
3146                         spin_unlock(&lp->lp_lock);
3147
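			/*
			 * Requeue the peer for another pass, record a
			 * hard error, and/or complete discovery.
			 */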
3148                         lnet_net_lock(LNET_LOCK_EX);
3149                         if (rc == LNET_REDISCOVER_PEER) {
3150                                 list_move(&lp->lp_dc_list,
3151                                           &the_lnet.ln_dc_request);
3152                         } else if (rc) {
3153                                 lnet_peer_discovery_error(lp, rc);
3154                         }
3155                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3156                                 lnet_peer_discovery_complete(lp);
3157                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3158                                 break;
3159                 }
3160
3161                 lnet_net_unlock(LNET_LOCK_EX);
3162         }
3163
3164         CDEBUG(D_NET, "stopping\n");
3165         /*
3166          * Clean up before telling lnet_peer_discovery_stop() that
3167          * we're done. Use wake_up() below to somewhat reduce the
3168          * size of the thundering herd if there are multiple threads
3169          * waiting on discovery of a single peer.
3170          */
3171
3172         /* Queue cleanup 1: stop all pending pings and pushes. */
3173         lnet_net_lock(LNET_LOCK_EX);
3174         while (!list_empty(&the_lnet.ln_dc_working)) {
3175                 lp = list_first_entry(&the_lnet.ln_dc_working,
3176                                       struct lnet_peer, lp_dc_list);
3177                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3178                 lnet_net_unlock(LNET_LOCK_EX);
3179                 lnet_peer_cancel_discovery(lp);
3180                 lnet_net_lock(LNET_LOCK_EX);
3181         }
3182         lnet_net_unlock(LNET_LOCK_EX);
3183
3184         /* Queue cleanup 2: wait for the expired queue to clear. */
3185         while (!list_empty(&the_lnet.ln_dc_expired))
3186                 schedule_timeout_uninterruptible(cfs_time_seconds(1));
3187
3188         /* Queue cleanup 3: clear the request queue. */
3189         lnet_net_lock(LNET_LOCK_EX);
3190         while (!list_empty(&the_lnet.ln_dc_request)) {
3191                 lp = list_first_entry(&the_lnet.ln_dc_request,
3192                                       struct lnet_peer, lp_dc_list);
3193                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3194                 lnet_peer_discovery_complete(lp);
3195         }
3196         lnet_net_unlock(LNET_LOCK_EX);
3197
3198         LNetEQFree(the_lnet.ln_dc_eqh);
3199         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3200
3201         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3202         wake_up(&the_lnet.ln_dc_waitq);
3203
3204         CDEBUG(D_NET, "stopped\n");
3205
3206         return 0;
3207 }
3208
3209 /* ln_api_mutex is held on entry. */
3210 int lnet_peer_discovery_start(void)
3211 {
3212         struct task_struct *task;
3213         int rc;
3214
3215         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3216                 return -EALREADY;
3217
3218         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3219         if (rc != 0) {
3220                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3221                 return rc;
3222         }
3223
3224         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3225         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3226         if (IS_ERR(task)) {
3227                 rc = PTR_ERR(task);
3228                 CERROR("Can't start peer discovery thread: %d\n", rc);
3229
3230                 LNetEQFree(the_lnet.ln_dc_eqh);
3231                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3232
3233                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3234         }
3235
3236         CDEBUG(D_NET, "discovery start: %d\n", rc);
3237
3238         return rc;
3239 }
3240
3241 /* ln_api_mutex is held on entry. */
3242 void lnet_peer_discovery_stop(void)
3243 {
3244         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3245                 return;
3246
3247         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3248         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3249         wake_up(&the_lnet.ln_dc_waitq);
3250
3251         wait_event(the_lnet.ln_dc_waitq,
3252                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3253
3254         LASSERT(list_empty(&the_lnet.ln_dc_request));
3255         LASSERT(list_empty(&the_lnet.ln_dc_working));
3256         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3257
3258         CDEBUG(D_NET, "discovery stopped\n");
3259 }
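/*
 * A minimal usage sketch (assuming the caller holds ln_api_mutex, as
 * both functions require):
 *
 *	rc = lnet_peer_discovery_start();
 *	if (rc == 0) {
 *		... discovery thread is running ...
 *		lnet_peer_discovery_stop();
 *	}
 */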
3260
3261 /* Debugging */
3262
3263 void
3264 lnet_debug_peer(lnet_nid_t nid)
3265 {
3266         char                    *aliveness = "NA";
3267         struct lnet_peer_ni     *lp;
3268         int                     cpt;
3269
3270         cpt = lnet_cpt_of_nid(nid, NULL);
3271         lnet_net_lock(cpt);
3272
3273         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3274         if (IS_ERR(lp)) {
3275                 lnet_net_unlock(cpt);
3276                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3277                 return;
3278         }
3279
3280         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3281                 aliveness = lp->lpni_alive ? "up" : "down";
3282
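	/*
	 * Columns: nid, refcount, aliveness, tx credit limit,
	 * rtr credits, min rtr credits, tx credits, min tx credits,
	 * tx queue bytes.
	 */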
3283         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3284                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3285                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3286                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3287                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3288
3289         lnet_peer_ni_decref_locked(lp);
3290
3291         lnet_net_unlock(cpt);
3292 }
3293
3294 /* Gathering information for userspace. */
3295
3296 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3297                           char aliveness[LNET_MAX_STR_LEN],
3298                           __u32 *cpt_iter, __u32 *refcount,
3299                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3300                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3301                           __u32 *peer_tx_qnob)
3302 {
3303         struct lnet_peer_table          *peer_table;
3304         struct lnet_peer_ni             *lp;
3305         int                             j;
3306         int                             lncpt;
3307         bool                            found = false;
3308
3309         /* get the number of CPTs */
3310         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3311
3312         /* if the cpt number to be examined is >= the number of cpts in
3313          * the system then indicate that there are no more cpts to examine
3314          */
3315         if (*cpt_iter >= lncpt)
3316                 return -ENOENT;
3317
3318         /* get the current table */
3319         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3320         /* if the ptable is NULL then there are no more cpts to examine */
3321         if (peer_table == NULL)
3322                 return -ENOENT;
3323
3324         lnet_net_lock(*cpt_iter);
3325
3326         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3327                 struct list_head *peers = &peer_table->pt_hash[j];
3328
3329                 list_for_each_entry(lp, peers, lpni_hashlist) {
3330                         if (peer_index-- > 0)
3331                                 continue;
3332
3333                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3334                         if (lnet_isrouter(lp) ||
3335                                 lnet_peer_aliveness_enabled(lp))
3336                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3337                                          lp->lpni_alive ? "up" : "down");
3338
3339                         *nid = lp->lpni_nid;
3340                         *refcount = atomic_read(&lp->lpni_refcount);
3341                         *ni_peer_tx_credits =
3342                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3343                         *peer_tx_credits = lp->lpni_txcredits;
3344                         *peer_rtr_credits = lp->lpni_rtrcredits;
3345                         *peer_min_rtr_credits = lp->lpni_minrtrcredits;
3346                         *peer_tx_qnob = lp->lpni_txqnob;
3347
3348                         found = true;
3349                 }
3350
3351         }
3352         lnet_net_unlock(*cpt_iter);
3353
3354         *cpt_iter = lncpt;
3355
3356         return found ? 0 : -ENOENT;
3357 }
3358
3359 /* ln_api_mutex is held, which keeps the peer list stable */
3360 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3361 {
3362         struct lnet_ioctl_element_stats *lpni_stats;
3363         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3364         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3365         struct lnet_peer_ni_credit_info *lpni_info;
3366         struct lnet_peer_ni *lpni;
3367         struct lnet_peer *lp;
3368         lnet_nid_t nid;
3369         __u32 size;
3370         int rc;
3371
3372         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3373
3374         if (!lp) {
3375                 rc = -ENOENT;
3376                 goto out;
3377         }
3378
3379         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3380                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3381         size *= lp->lp_nnis;
3382         if (size > cfg->prcfg_size) {
3383                 cfg->prcfg_size = size;
3384                 rc = -E2BIG;
3385                 goto out_lp_decref;
3386         }
3387
3388         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3389         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3390         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3391         cfg->prcfg_count = lp->lp_nnis;
3392         cfg->prcfg_size = size;
3393         cfg->prcfg_state = lp->lp_state;
3394
3395         /* Allocate helper buffers. */
3396         rc = -ENOMEM;
3397         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3398         if (!lpni_info)
3399                 goto out_lp_decref;
3400         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3401         if (!lpni_stats)
3402                 goto out_free_info;
3403         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3404         if (!lpni_msg_stats)
3405                 goto out_free_stats;
3406         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3407         if (!lpni_hstats)
3408                 goto out_free_msg_stats;
3409
3410
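	/*
	 * Emit one record per peer NI into the bulk buffer: NID,
	 * credit info, stats, message stats, then health stats,
	 * matching the per-NI size computed above.
	 */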
3411         lpni = NULL;
3412         rc = -EFAULT;
3413         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3414                 nid = lpni->lpni_nid;
3415                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3416                         goto out_free_hstats;
3417                 bulk += sizeof(nid);
3418
3419                 memset(lpni_info, 0, sizeof(*lpni_info));
3420                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3421                 if (lnet_isrouter(lpni) ||
3422                         lnet_peer_aliveness_enabled(lpni))
3423                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3424                                 lpni->lpni_alive ? "up" : "down");
3425
3426                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3427                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3428                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3429                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3430                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3431                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3432                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3433                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3434                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3435                         goto out_free_hstats;
3436                 bulk += sizeof(*lpni_info);
3437
3438                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3439                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3440                                                             LNET_STATS_TYPE_SEND);
3441                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3442                                                             LNET_STATS_TYPE_RECV);
3443                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3444                                                             LNET_STATS_TYPE_DROP);
3445                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3446                         goto out_free_hstats;
3447                 bulk += sizeof(*lpni_stats);
3448                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3449                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3450                         goto out_free_hstats;
3451                 bulk += sizeof(*lpni_msg_stats);
3452                 lpni_hstats->hlpni_network_timeout =
3453                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3454                 lpni_hstats->hlpni_remote_dropped =
3455                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3456                 lpni_hstats->hlpni_remote_timeout =
3457                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3458                 lpni_hstats->hlpni_remote_error =
3459                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3460                 lpni_hstats->hlpni_health_value =
3461                   atomic_read(&lpni->lpni_healthv);
3462                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3463                         goto out_free_hstats;
3464                 bulk += sizeof(*lpni_hstats);
3465         }
3466         rc = 0;
3467
3468 out_free_hstats:
3469         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3470 out_free_msg_stats:
3471         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3472 out_free_stats:
3473         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3474 out_free_info:
3475         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3476 out_lp_decref:
3477         lnet_peer_decref_locked(lp);
3478 out:
3479         return rc;
3480 }
3481
3482 void
3483 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3484 {
3485         /* the mt could've shutdown and cleaned up the queues */
3486         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3487                 return;
3488
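	/* Queue the NI only once, and only while its health is degraded. */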
3489         if (list_empty(&lpni->lpni_recovery) &&
3490             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3491                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3492                         libcfs_nid2str(lpni->lpni_nid),
3493                         atomic_read(&lpni->lpni_healthv));
3494                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3495                 lnet_peer_ni_addref_locked(lpni);
3496         }
3497 }
3498
3499 /* Call with the ln_api_mutex held */
3500 void
3501 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3502 {
3503         struct lnet_peer_table *ptable;
3504         struct lnet_peer *lp;
3505         struct lnet_peer_net *lpn;
3506         struct lnet_peer_ni *lpni;
3507         int lncpt;
3508         int cpt;
3509
3510         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3511                 return;
3512
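	/*
	 * Single-NID case: update just that peer NI and queue it for
	 * recovery if its health is degraded.
	 */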
3513         if (!all) {
3514                 lnet_net_lock(LNET_LOCK_EX);
3515                 lpni = lnet_find_peer_ni_locked(nid);
3516                 if (!lpni) {
3517                         lnet_net_unlock(LNET_LOCK_EX);
3518                         return;
3519                 }
3520                 atomic_set(&lpni->lpni_healthv, value);
3521                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3522                 lnet_peer_ni_decref_locked(lpni);
3523                 lnet_net_unlock(LNET_LOCK_EX);
3524                 return;
3525         }
3526
3527         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3528
3529         /*
3530          * Walk all the peers and set the health value of each of
3531          * their peer NIs to the requested value.
3532          */
3533         lnet_net_lock(LNET_LOCK_EX);
3534         for (cpt = 0; cpt < lncpt; cpt++) {
3535                 ptable = the_lnet.ln_peer_tables[cpt];
3536                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3537                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3538                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3539                                                     lpni_peer_nis) {
3540                                         atomic_set(&lpni->lpni_healthv, value);
3541                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3542                                 }
3543                         }
3544                 }
3545         }
3546         lnet_net_unlock(LNET_LOCK_EX);
3547 }
3548