/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/peer.c
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/sched.h>
#ifdef HAVE_SCHED_HEADERS
#include <linux/sched/signal.h>
#endif
#include <linux/uaccess.h>

#include <lnet/lib-lnet.h>
#include <uapi/linux/lnet/lnet-dlc.h>

/* Value indicating that recovery needs to re-check a peer immediately. */
#define LNET_REDISCOVER_PEER    (1)

static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);

static void
lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
{
        if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
                list_del_init(&lpni->lpni_on_remote_peer_ni_list);
                lnet_peer_ni_decref_locked(lpni);
        }
}

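/*
 * A local net was added. Walk the peer_nis parked on the remote list
 * and adopt any whose NID falls within the new net: point them at it,
 * initialize their credits from the net tunables, and take them off
 * the remote peer_ni list.
 */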
void
lnet_peer_net_added(struct lnet_net *net)
{
        struct lnet_peer_ni *lpni, *tmp;

        list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
                                 lpni_on_remote_peer_ni_list) {

                if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
                        lpni->lpni_net = net;

                        spin_lock(&lpni->lpni_lock);
                        lpni->lpni_txcredits =
                                lpni->lpni_net->net_tunables.lct_peer_tx_credits;
                        lpni->lpni_mintxcredits = lpni->lpni_txcredits;
                        lpni->lpni_rtrcredits =
                                lnet_peer_buffer_credits(lpni->lpni_net);
                        lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
                        spin_unlock(&lpni->lpni_lock);

                        lnet_peer_remove_from_remote_list(lpni);
                }
        }
}

static void
lnet_peer_tables_destroy(void)
{
        struct lnet_peer_table  *ptable;
        struct list_head        *hash;
        int                     i;
        int                     j;

        if (!the_lnet.ln_peer_tables)
                return;

        cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                hash = ptable->pt_hash;
                if (!hash) /* not initialized */
                        break;

                LASSERT(list_empty(&ptable->pt_zombie_list));

                ptable->pt_hash = NULL;
                for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
                        LASSERT(list_empty(&hash[j]));

                LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
        }

        cfs_percpt_free(the_lnet.ln_peer_tables);
        the_lnet.ln_peer_tables = NULL;
}

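/*
 * Allocate the per-CPT peer tables and their hash lists. On any
 * allocation failure, everything allocated so far is torn down via
 * lnet_peer_tables_destroy().
 */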
int
lnet_peer_tables_create(void)
{
        struct lnet_peer_table  *ptable;
        struct list_head        *hash;
        int                     i;
        int                     j;

        the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
                                                   sizeof(*ptable));
        if (the_lnet.ln_peer_tables == NULL) {
                CERROR("Failed to allocate cpu-partition peer tables\n");
                return -ENOMEM;
        }

        cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
                                 LNET_PEER_HASH_SIZE * sizeof(*hash));
                if (hash == NULL) {
                        CERROR("Failed to create peer hash table\n");
                        lnet_peer_tables_destroy();
                        return -ENOMEM;
                }

                spin_lock_init(&ptable->pt_zombie_lock);
                INIT_LIST_HEAD(&ptable->pt_zombie_list);

                INIT_LIST_HEAD(&ptable->pt_peer_list);

                for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
                        INIT_LIST_HEAD(&hash[j]);
                ptable->pt_hash = hash; /* sign of initialization */
        }

        return 0;
}

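/*
 * Allocate and initialize a peer_ni for nid, on the CPT derived from
 * the NID. If the NID is not on a local net, credits cannot be set up
 * yet; the peer_ni is parked (with a reference) on the remote peer_ni
 * list so lnet_peer_net_added() can finish the job if the net appears
 * later.
 */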
static struct lnet_peer_ni *
lnet_peer_ni_alloc(lnet_nid_t nid)
{
        struct lnet_peer_ni *lpni;
        struct lnet_net *net;
        int cpt;

        cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

        LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
        if (!lpni)
                return NULL;

        INIT_LIST_HEAD(&lpni->lpni_txq);
        INIT_LIST_HEAD(&lpni->lpni_hashlist);
        INIT_LIST_HEAD(&lpni->lpni_peer_nis);
        INIT_LIST_HEAD(&lpni->lpni_recovery);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
        LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);

        spin_lock_init(&lpni->lpni_lock);

        if (lnet_peers_start_down())
                lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;
        else
                lpni->lpni_ns_status = LNET_NI_STATUS_UP;
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
        lpni->lpni_nid = nid;
        lpni->lpni_cpt = cpt;
        atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);

        net = lnet_get_net_locked(LNET_NIDNET(nid));
        lpni->lpni_net = net;
        if (net) {
                lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
                lpni->lpni_mintxcredits = lpni->lpni_txcredits;
                lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
                lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
        } else {
                /*
                 * This peer_ni is not on a local network, so we
                 * cannot add the credits here. In case the net is
                 * added later, add the peer_ni to the remote peer ni
                 * list so it can be easily found and revisited.
                 */
                /* FIXME: per-net implementation instead? */
                atomic_inc(&lpni->lpni_refcount);
                list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
                              &the_lnet.ln_remote_peer_ni_list);
        }

        CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

        return lpni;
}

static struct lnet_peer_net *
lnet_peer_net_alloc(__u32 net_id)
{
        struct lnet_peer_net *lpn;

        LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
        if (!lpn)
                return NULL;

        INIT_LIST_HEAD(&lpn->lpn_peer_nets);
        INIT_LIST_HEAD(&lpn->lpn_peer_nis);
        lpn->lpn_net_id = net_id;

        CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

        return lpn;
}

void
lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
{
        struct lnet_peer *lp;

        CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

        LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
        LASSERT(list_empty(&lpn->lpn_peer_nis));
        LASSERT(list_empty(&lpn->lpn_peer_nets));
        lp = lpn->lpn_peer;
        lpn->lpn_peer = NULL;
        LIBCFS_FREE(lpn, sizeof(*lpn));

        lnet_peer_decref_locked(lp);
}

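/*
 * Allocate and initialize a peer, keyed on nid as its primary NID.
 */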
static struct lnet_peer *
lnet_peer_alloc(lnet_nid_t nid)
{
        struct lnet_peer *lp;

        LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
        if (!lp)
                return NULL;

        INIT_LIST_HEAD(&lp->lp_rtrq);
        INIT_LIST_HEAD(&lp->lp_routes);
        INIT_LIST_HEAD(&lp->lp_peer_list);
        INIT_LIST_HEAD(&lp->lp_peer_nets);
        INIT_LIST_HEAD(&lp->lp_dc_list);
        INIT_LIST_HEAD(&lp->lp_dc_pendq);
        INIT_LIST_HEAD(&lp->lp_rtr_list);
        init_waitqueue_head(&lp->lp_dc_waitq);
        spin_lock_init(&lp->lp_lock);
        lp->lp_primary_nid = nid;
        /*
         * Turn off discovery for loopback peer. If you're creating a peer
         * for the loopback interface then that was initiated when we
         * attempted to send a message over the loopback. There is no need
         * to ever use a different interface when sending messages to
         * myself.
         */
        if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
                lp->lp_state = LNET_PEER_NO_DISCOVERY;
        lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

        CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

        return lp;
}

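/*
 * Final teardown of a peer: the refcount must have dropped to zero
 * and all of its peer_nets and list memberships must be gone. Any
 * messages still pending on the discovery queue are handed off to
 * the global resend list before the structure is freed.
 */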
void
lnet_destroy_peer_locked(struct lnet_peer *lp)
{
        CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

        LASSERT(atomic_read(&lp->lp_refcount) == 0);
        LASSERT(lp->lp_rtr_refcount == 0);
        LASSERT(list_empty(&lp->lp_peer_nets));
        LASSERT(list_empty(&lp->lp_peer_list));
        LASSERT(list_empty(&lp->lp_dc_list));

        if (lp->lp_data)
                lnet_ping_buffer_decref(lp->lp_data);

        /*
         * If there are messages still on the pending queue, then make
         * sure to queue them on the ln_msg_resend list so they can be
         * resent at a later point if the discovery thread is still
         * running.
         * If the discovery thread has stopped, then the wakeup will be a
         * no-op, and it is expected that lnet_shutdown_lndnets() will
         * eventually be called, which will traverse this list and
         * finalize the messages on the list.
         * We cannot resend them now because we're holding the cpt lock;
         * releasing the lock could leave us in an inconsistent state.
         */
        spin_lock(&the_lnet.ln_msg_resend_lock);
        spin_lock(&lp->lp_lock);
        list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
        spin_unlock(&lp->lp_lock);
        spin_unlock(&the_lnet.ln_msg_resend_lock);
        wake_up(&the_lnet.ln_dc_waitq);

        LIBCFS_FREE(lp, sizeof(*lp));
}

/*
 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
 * that peer_net, detach the peer_net from the peer.
 *
 * Call with lnet_net_lock/EX held
 */
static void
lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
{
        struct lnet_peer_table *ptable;
        struct lnet_peer_net *lpn;
        struct lnet_peer *lp;

        /*
         * Belts and suspenders: gracefully handle teardown of a
         * partially connected peer_ni.
         */
        lpn = lpni->lpni_peer_net;

        list_del_init(&lpni->lpni_peer_nis);
        /*
         * If there are no lpni's left, we detach lpn from
         * lp_peer_nets, so it cannot be found anymore.
         */
        if (list_empty(&lpn->lpn_peer_nis))
                list_del_init(&lpn->lpn_peer_nets);

        /* Update peer NID count. */
        lp = lpn->lpn_peer;
        lp->lp_nnis--;

        /*
         * If there are no more peer nets, make the peer unfindable
         * via the peer_tables.
         *
         * Otherwise, if the peer is DISCOVERED, tell discovery to
         * take another look at it. This is a no-op if discovery for
         * this peer did the detaching.
         */
        if (list_empty(&lp->lp_peer_nets)) {
                list_del_init(&lp->lp_peer_list);
                ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
                ptable->pt_peers--;
        } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
                /* Discovery isn't running, nothing to do here. */
        } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
                lnet_peer_queue_for_discovery(lp);
                wake_up(&the_lnet.ln_dc_waitq);
        }
        CDEBUG(D_NET, "peer %s NID %s\n",
                libcfs_nid2str(lp->lp_primary_nid),
                libcfs_nid2str(lpni->lpni_nid));
}

/* called with lnet_net_lock LNET_LOCK_EX held */
static int
lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
{
        struct lnet_peer_table *ptable = NULL;

        /* don't remove a peer_ni if it's also a gateway */
        if (lnet_isrouter(lpni)) {
                CERROR("Peer NI %s is a gateway. Can not delete it\n",
                       libcfs_nid2str(lpni->lpni_nid));
                return -EBUSY;
        }

        lnet_peer_remove_from_remote_list(lpni);

        /* remove peer ni from the hash list. */
        list_del_init(&lpni->lpni_hashlist);

        /*
         * indicate the peer is being deleted so the monitor thread can
         * remove it from the recovery queue.
         */
        spin_lock(&lpni->lpni_lock);
        lpni->lpni_state |= LNET_PEER_NI_DELETING;
        spin_unlock(&lpni->lpni_lock);

        /* decrement the ref count on the peer table */
        ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
        LASSERT(ptable->pt_number > 0);
        ptable->pt_number--;

        /*
         * The peer_ni can no longer be found with a lookup. But there
         * can be current users, so keep track of it on the zombie
         * list until the reference count has gone to zero.
         *
         * The last reference may be lost in a place where the
         * lnet_net_lock locks only a single cpt, and that cpt may not
         * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
         * has its own lock.
         */
        spin_lock(&ptable->pt_zombie_lock);
        list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
        ptable->pt_zombies++;
        spin_unlock(&ptable->pt_zombie_lock);

        /* no need to keep this peer_ni on the hierarchy anymore */
        lnet_peer_detach_peer_ni_locked(lpni);

        /* remove hashlist reference on peer_ni */
        lnet_peer_ni_decref_locked(lpni);

        return 0;
}

void lnet_peer_uninit(void)
{
        struct lnet_peer_ni *lpni, *tmp;

        lnet_net_lock(LNET_LOCK_EX);

        /* remove all peer_nis from the remote peer and the hash list */
        list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
                                 lpni_on_remote_peer_ni_list)
                lnet_peer_ni_del_locked(lpni);

        lnet_peer_tables_destroy();

        lnet_net_unlock(LNET_LOCK_EX);
}

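/*
 * Delete a peer by deleting each of its constituent peer_nis in turn.
 * Returns the last non-zero error from lnet_peer_ni_del_locked(), or
 * 0 if every deletion succeeded.
 */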
static int
lnet_peer_del_locked(struct lnet_peer *peer)
{
        struct lnet_peer_ni *lpni = NULL, *lpni2;
        int rc = 0, rc2 = 0;

        CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));

        lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
        while (lpni != NULL) {
                lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
                rc = lnet_peer_ni_del_locked(lpni);
                if (rc != 0)
                        rc2 = rc;
                lpni = lpni2;
        }

        return rc2;
}

static int
lnet_peer_del(struct lnet_peer *peer)
{
        lnet_net_lock(LNET_LOCK_EX);
        lnet_peer_del_locked(peer);
        lnet_net_unlock(LNET_LOCK_EX);

        return 0;
}

/*
 * Delete a NID from a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:  Non-DLC deletion from DLC-configured peer.
 *  -ENOENT: No lnet_peer_ni corresponding to the nid.
 *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
 *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
 */
static int
lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
        struct lnet_peer_ni *lpni;
        lnet_nid_t primary_nid = lp->lp_primary_nid;
        int rc = 0;

        if (!(flags & LNET_PEER_CONFIGURED)) {
                if (lp->lp_state & LNET_PEER_CONFIGURED) {
                        rc = -EPERM;
                        goto out;
                }
        }
        lpni = lnet_find_peer_ni_locked(nid);
        if (!lpni) {
                rc = -ENOENT;
                goto out;
        }
        lnet_peer_ni_decref_locked(lpni);
        if (lp != lpni->lpni_peer_net->lpn_peer) {
                rc = -ECHILD;
                goto out;
        }

        /*
         * This function only allows deletion of the primary NID if it
         * is the only NID.
         */
        if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
                rc = -EBUSY;
                goto out;
        }

        lnet_net_lock(LNET_LOCK_EX);

        rc = lnet_peer_ni_del_locked(lpni);

        lnet_net_unlock(LNET_LOCK_EX);

out:
        CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
               libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);

        return rc;
}

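/*
 * Remove the peer_nis of one hash table that belong to net, or all of
 * them if net is NULL. Deleting a primary NID takes the whole peer
 * with it, so the iteration cursor is first advanced past any sibling
 * peer_nis of the same peer.
 */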
static void
lnet_peer_table_cleanup_locked(struct lnet_net *net,
                               struct lnet_peer_table *ptable)
{
        int                      i;
        struct lnet_peer_ni     *next;
        struct lnet_peer_ni     *lpni;
        struct lnet_peer        *peer;

        for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
                list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
                                         lpni_hashlist) {
                        if (net != NULL && net != lpni->lpni_net)
                                continue;

                        peer = lpni->lpni_peer_net->lpn_peer;
                        if (peer->lp_primary_nid != lpni->lpni_nid) {
                                lnet_peer_ni_del_locked(lpni);
                                continue;
                        }
                        /*
                         * Removing the primary NID implies removing
                         * the entire peer. Advance next beyond any
                         * peer_ni that belongs to the same peer.
                         */
                        list_for_each_entry_from(next, &ptable->pt_hash[i],
                                                 lpni_hashlist) {
                                if (next->lpni_peer_net->lpn_peer != peer)
                                        break;
                        }
                        lnet_peer_del_locked(peer);
                }
        }
}

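/*
 * Block until the zombie list of this peer table has drained, polling
 * twice per second and logging periodically while zombies remain.
 */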
static void
lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
{
        int     i = 3;

        spin_lock(&ptable->pt_zombie_lock);
        while (ptable->pt_zombies) {
                spin_unlock(&ptable->pt_zombie_lock);

                /* log progress with exponential backoff */
                if (is_power_of_2(i)) {
                        CDEBUG(D_WARNING,
                               "Waiting for %d zombies on peer table\n",
                               ptable->pt_zombies);
                }
                i++;
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(1) >> 1);
                spin_lock(&ptable->pt_zombie_lock);
        }
        spin_unlock(&ptable->pt_zombie_lock);
}

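/*
 * Remove, via lnet_del_route(), every route whose gateway is a router
 * peer_ni on the given net. Called with lnet_net_lock/EX held; the
 * lock is dropped and re-taken around each lnet_del_route() call.
 */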
static void
lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
                                struct lnet_peer_table *ptable)
{
        struct lnet_peer_ni     *lp;
        struct lnet_peer_ni     *tmp;
        lnet_nid_t              gw_nid;
        int                     i;

        for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
                list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
                                         lpni_hashlist) {
                        if (net != lp->lpni_net)
                                continue;

                        if (!lnet_isrouter(lp))
                                continue;

                        gw_nid = lp->lpni_peer_net->lpn_peer->lp_primary_nid;

                        lnet_net_unlock(LNET_LOCK_EX);
                        lnet_del_route(LNET_NIDNET(LNET_NID_ANY), gw_nid);
                        lnet_net_lock(LNET_LOCK_EX);
                }
        }
}

void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
        int i;
        struct lnet_peer_table *ptable;

        LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
        /* If just deleting the peers for a NI, get rid of any routes these
         * peers are gateways for. */
        cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                lnet_net_lock(LNET_LOCK_EX);
                lnet_peer_table_del_rtrs_locked(net, ptable);
                lnet_net_unlock(LNET_LOCK_EX);
        }

        /* Start the cleanup process */
        cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                lnet_net_lock(LNET_LOCK_EX);
                lnet_peer_table_cleanup_locked(net, ptable);
                lnet_net_unlock(LNET_LOCK_EX);
        }

        cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
                lnet_peer_ni_finalize_wait(ptable);
}

static struct lnet_peer_ni *
lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
{
        struct list_head        *peers;
        struct lnet_peer_ni     *lp;

        LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);

        peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
        list_for_each_entry(lp, peers, lpni_hashlist) {
                if (lp->lpni_nid == nid) {
                        lnet_peer_ni_addref_locked(lp);
                        return lp;
                }
        }

        return NULL;
}

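/*
 * Look up a peer_ni by NID in the peer table of the NID's CPT and
 * return it with a reference held, or NULL if it does not exist.
 * Call with lnet_net_lock held.
 */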
struct lnet_peer_ni *
lnet_find_peer_ni_locked(lnet_nid_t nid)
{
        struct lnet_peer_ni *lpni;
        struct lnet_peer_table *ptable;
        int cpt;

        cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

        ptable = the_lnet.ln_peer_tables[cpt];
        lpni = lnet_get_peer_ni_locked(ptable, nid);

        return lpni;
}

struct lnet_peer_ni *
lnet_peer_get_ni_locked(struct lnet_peer *lp, lnet_nid_t nid)
{
        struct lnet_peer_net *lpn;
        struct lnet_peer_ni *lpni;

        lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
        if (!lpn)
                return NULL;

        list_for_each_entry(lpni, &lpn->lpn_peer_nis, lpni_peer_nis) {
                if (lpni->lpni_nid == nid)
                        return lpni;
        }

        return NULL;
}

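/*
 * Look up the peer that owns a given NID and take a reference on it,
 * or return NULL. Unlocked wrapper: takes and releases the current
 * CPT's net lock around the lookup.
 */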
struct lnet_peer *
lnet_find_peer(lnet_nid_t nid)
{
        struct lnet_peer_ni *lpni;
        struct lnet_peer *lp = NULL;
        int cpt;

        cpt = lnet_net_lock_current();
        lpni = lnet_find_peer_ni_locked(nid);
        if (lpni) {
                lp = lpni->lpni_peer_net->lpn_peer;
                lnet_peer_addref_locked(lp);
                lnet_peer_ni_decref_locked(lpni);
        }
        lnet_net_unlock(cpt);

        return lp;
}

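/*
 * Iterator over the peer_nis of a peer. If peer_net is non-NULL the
 * walk is confined to that net; otherwise it spans all of the peer's
 * nets. Pass prev = NULL to start, and the previous return value to
 * continue; NULL marks the end of the iteration. A typical loop
 * (this usage sketch mirrors lnet_peer_clr_non_mr_pref_nids() below):
 *
 *      struct lnet_peer_ni *lpni = NULL;
 *
 *      while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
 *              ... per-NI work ...
 */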
struct lnet_peer_ni *
lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
                             struct lnet_peer_net *peer_net,
                             struct lnet_peer_ni *prev)
{
        struct lnet_peer_ni *lpni;
        struct lnet_peer_net *net = peer_net;

        if (!prev) {
                if (!net) {
                        if (list_empty(&peer->lp_peer_nets))
                                return NULL;

                        net = list_entry(peer->lp_peer_nets.next,
                                         struct lnet_peer_net,
                                         lpn_peer_nets);
                }
                lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
                                  lpni_peer_nis);

                return lpni;
        }

        if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
                /*
                 * If we reached the end of the peer_ni list and a peer
                 * net was specified, then there are no more peer_nis
                 * in that net.
                 */
                if (net)
                        return NULL;

                /*
                 * We reached the end of this net's peer_ni list. Move
                 * on to the next net.
                 */
                if (prev->lpni_peer_net->lpn_peer_nets.next ==
                    &peer->lp_peer_nets)
                        /* no more nets and no more NIs. */
                        return NULL;

                /* get the next net */
                net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
                                 struct lnet_peer_net,
                                 lpn_peer_nets);
                /* get the ni on it */
                lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
                                  lpni_peer_nis);

                return lpni;
        }

        /* there are more peer_nis left */
        lpni = list_entry(prev->lpni_peer_nis.next,
                          struct lnet_peer_ni, lpni_peer_nis);

        return lpni;
}

/* Call with the ln_api_mutex held */
int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
{
        struct lnet_process_id id;
        struct lnet_peer_table *ptable;
        struct lnet_peer *lp;
        __u32 count = 0;
        __u32 size = 0;
        int lncpt;
        int cpt;
        __u32 i;
        int rc;

        rc = -ESHUTDOWN;
        if (the_lnet.ln_state != LNET_STATE_RUNNING)
                goto done;

        lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

        /*
         * Count the number of peers, and return E2BIG if the buffer
         * is too small. We'll also return the desired size.
         */
        rc = -E2BIG;
        for (cpt = 0; cpt < lncpt; cpt++) {
                ptable = the_lnet.ln_peer_tables[cpt];
                count += ptable->pt_peers;
        }
        size = count * sizeof(*ids);
        if (size > *sizep)
                goto done;

        /*
         * Walk the peer lists and copy out the primary nids.
         * This is safe because the peer lists are only modified
         * while the ln_api_mutex is held. So we don't need to
         * hold the lnet_net_lock as well, and can therefore
         * directly call copy_to_user().
         */
        rc = -EFAULT;
        memset(&id, 0, sizeof(id));
        id.pid = LNET_PID_LUSTRE;
        i = 0;
        for (cpt = 0; cpt < lncpt; cpt++) {
                ptable = the_lnet.ln_peer_tables[cpt];
                list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
                        if (i >= count)
                                goto done;
                        id.nid = lp->lp_primary_nid;
                        if (copy_to_user(&ids[i], &id, sizeof(id)))
                                goto done;
                        i++;
                }
        }
        rc = 0;
done:
        *countp = count;
        *sizep = size;
        return rc;
}

/*
 * Start pushes to peers that need to be updated for a configuration
 * change on this node.
 */
void
lnet_push_update_to_peers(int force)
{
        struct lnet_peer_table *ptable;
        struct lnet_peer *lp;
        int lncpt;
        int cpt;

        lnet_net_lock(LNET_LOCK_EX);
        lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
        for (cpt = 0; cpt < lncpt; cpt++) {
                ptable = the_lnet.ln_peer_tables[cpt];
                list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
                        if (force) {
                                spin_lock(&lp->lp_lock);
                                if (lp->lp_state & LNET_PEER_MULTI_RAIL)
                                        lp->lp_state |= LNET_PEER_FORCE_PUSH;
                                spin_unlock(&lp->lp_lock);
                        }
                        if (lnet_peer_needs_push(lp))
                                lnet_peer_queue_for_discovery(lp);
                }
        }
        lnet_net_unlock(LNET_LOCK_EX);
        wake_up(&the_lnet.ln_dc_waitq);
}

/*
 * Test whether an NI is a preferred NI for this peer_ni, i.e., whether
 * this is a preferred point-to-point path. Call with lnet_net_lock in
 * shared mode.
 */
bool
lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
        int i;

        if (lpni->lpni_pref_nnids == 0)
                return false;
        if (lpni->lpni_pref_nnids == 1)
                return lpni->lpni_pref.nid == nid;
        for (i = 0; i < lpni->lpni_pref_nnids; i++) {
                if (lpni->lpni_pref.nids[i] == nid)
                        return true;
        }
        return false;
}

/*
 * Set a single ni as preferred, provided no preferred ni is already
 * defined. Only to be used for non-multi-rail peer_ni.
 */
int
lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
        int rc = 0;

        spin_lock(&lpni->lpni_lock);
        if (nid == LNET_NID_ANY) {
                rc = -EINVAL;
        } else if (lpni->lpni_pref_nnids > 0) {
                rc = -EPERM;
        } else if (lpni->lpni_pref_nnids == 0) {
                lpni->lpni_pref.nid = nid;
                lpni->lpni_pref_nnids = 1;
                lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
        }
        spin_unlock(&lpni->lpni_lock);

        CDEBUG(D_NET, "peer %s nid %s: %d\n",
               libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
        return rc;
}

/*
 * Clear the preferred NID from a non-multi-rail peer_ni, provided
 * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
 */
int
lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
{
        int rc = 0;

        spin_lock(&lpni->lpni_lock);
        if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
                lpni->lpni_pref_nnids = 0;
                lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
        } else if (lpni->lpni_pref_nnids == 0) {
                rc = -ENOENT;
        } else {
                rc = -EPERM;
        }
        spin_unlock(&lpni->lpni_lock);

        CDEBUG(D_NET, "peer %s: %d\n",
               libcfs_nid2str(lpni->lpni_nid), rc);
        return rc;
}

/*
 * Clear the preferred NIDs from a non-multi-rail peer.
 */
void
lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
{
        struct lnet_peer_ni *lpni = NULL;

        while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
                lnet_peer_ni_clr_non_mr_pref_nid(lpni);
}

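/*
 * Add a preferred NID to a peer_ni. For a peer_ni of a multi-rail
 * peer the preference list is reallocated one slot larger; a peer_ni
 * of a non-MR peer may carry at most one preferred NID (-EPERM
 * otherwise).
 */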
int
lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
        lnet_nid_t *nids = NULL;
        lnet_nid_t *oldnids = NULL;
        struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
        int size;
        int i;
        int rc = 0;

        if (nid == LNET_NID_ANY) {
                rc = -EINVAL;
                goto out;
        }

        if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
                rc = -EEXIST;
                goto out;
        }

        /* A non-MR node may have only one preferred NI per peer_ni */
        if (lpni->lpni_pref_nnids > 0) {
                if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
                        rc = -EPERM;
                        goto out;
                }
        }

        if (lpni->lpni_pref_nnids != 0) {
                size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
                LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
                if (!nids) {
                        rc = -ENOMEM;
                        goto out;
                }
                for (i = 0; i < lpni->lpni_pref_nnids; i++) {
                        if (lpni->lpni_pref.nids[i] == nid) {
                                LIBCFS_FREE(nids, size);
                                rc = -EEXIST;
                                goto out;
                        }
                        nids[i] = lpni->lpni_pref.nids[i];
                }
                nids[i] = nid;
        }

        lnet_net_lock(LNET_LOCK_EX);
        spin_lock(&lpni->lpni_lock);
        if (lpni->lpni_pref_nnids == 0) {
                lpni->lpni_pref.nid = nid;
        } else {
                oldnids = lpni->lpni_pref.nids;
                lpni->lpni_pref.nids = nids;
        }
        lpni->lpni_pref_nnids++;
        lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
        spin_unlock(&lpni->lpni_lock);
        lnet_net_unlock(LNET_LOCK_EX);

        if (oldnids) {
                /* the old array held one NID fewer than the new one */
                size = sizeof(*oldnids) * (lpni->lpni_pref_nnids - 1);
                LIBCFS_FREE(oldnids, size);
        }
out:
        if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
                spin_lock(&lpni->lpni_lock);
                lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
                spin_unlock(&lpni->lpni_lock);
        }
        CDEBUG(D_NET, "peer %s nid %s: %d\n",
               libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
        return rc;
}

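/*
 * Remove a preferred NID from a peer_ni. The one- and two-NID cases
 * are handled in place; larger lists are reallocated one slot
 * smaller. Returns -ENOENT if the NID was not a preference.
 */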
int
lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
{
        lnet_nid_t *nids = NULL;
        lnet_nid_t *oldnids = NULL;
        struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
        int size;
        int i, j;
        int rc = 0;

        if (lpni->lpni_pref_nnids == 0) {
                rc = -ENOENT;
                goto out;
        }

        if (lpni->lpni_pref_nnids == 1) {
                if (lpni->lpni_pref.nid != nid) {
                        rc = -ENOENT;
                        goto out;
                }
        } else if (lpni->lpni_pref_nnids == 2) {
                if (lpni->lpni_pref.nids[0] != nid &&
                    lpni->lpni_pref.nids[1] != nid) {
                        rc = -ENOENT;
                        goto out;
                }
        } else {
                /* Check that nid is actually in the list first. */
                for (i = 0; i < lpni->lpni_pref_nnids; i++) {
                        if (lpni->lpni_pref.nids[i] == nid)
                                break;
                }
                if (i == lpni->lpni_pref_nnids) {
                        rc = -ENOENT;
                        goto out;
                }
                size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
                LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
                if (!nids) {
                        rc = -ENOMEM;
                        goto out;
                }
                /* Copy everything except the NID being removed. */
                for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
                        if (lpni->lpni_pref.nids[i] == nid)
                                continue;
                        nids[j++] = lpni->lpni_pref.nids[i];
                }
        }

        lnet_net_lock(LNET_LOCK_EX);
        spin_lock(&lpni->lpni_lock);
        if (lpni->lpni_pref_nnids == 1) {
                lpni->lpni_pref.nid = LNET_NID_ANY;
        } else if (lpni->lpni_pref_nnids == 2) {
                oldnids = lpni->lpni_pref.nids;
                if (oldnids[0] == nid)
                        lpni->lpni_pref.nid = oldnids[1];
                else
                        lpni->lpni_pref.nid = oldnids[0];
        } else {
                oldnids = lpni->lpni_pref.nids;
                lpni->lpni_pref.nids = nids;
        }
        lpni->lpni_pref_nnids--;
        lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
        spin_unlock(&lpni->lpni_lock);
        lnet_net_unlock(LNET_LOCK_EX);

        if (oldnids) {
                /* the old array held one NID more than the new count */
                size = sizeof(*oldnids) * (lpni->lpni_pref_nnids + 1);
                LIBCFS_FREE(oldnids, size);
        }
out:
        CDEBUG(D_NET, "peer %s nid %s: %d\n",
               libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
        return rc;
}

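/*
 * Map a NID to the primary NID of the peer that owns it, or return
 * the NID itself if no such peer exists. Call with lnet_net_lock
 * held.
 */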
lnet_nid_t
lnet_peer_primary_nid_locked(lnet_nid_t nid)
{
        struct lnet_peer_ni *lpni;
        lnet_nid_t primary_nid = nid;

        lpni = lnet_find_peer_ni_locked(nid);
        if (lpni) {
                primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
                lnet_peer_ni_decref_locked(lpni);
        }

        return primary_nid;
}

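/*
 * Resolve a NID to the primary NID of its peer, triggering and
 * waiting on discovery if the peer is not yet up to date. On any
 * failure the NID passed in is returned unchanged.
 */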
lnet_nid_t
LNetPrimaryNID(lnet_nid_t nid)
{
        struct lnet_peer *lp;
        struct lnet_peer_ni *lpni;
        lnet_nid_t primary_nid = nid;
        int rc = 0;
        int cpt;

        cpt = lnet_net_lock_current();
        lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
        if (IS_ERR(lpni)) {
                rc = PTR_ERR(lpni);
                goto out_unlock;
        }
        lp = lpni->lpni_peer_net->lpn_peer;
        while (!lnet_peer_is_uptodate(lp)) {
                rc = lnet_discover_peer_locked(lpni, cpt, true);
                if (rc)
                        goto out_decref;
                lp = lpni->lpni_peer_net->lpn_peer;
        }
        primary_nid = lp->lp_primary_nid;
out_decref:
        lnet_peer_ni_decref_locked(lpni);
out_unlock:
        lnet_net_unlock(cpt);

        CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
               libcfs_nid2str(primary_nid), rc);
        return primary_nid;
}
EXPORT_SYMBOL(LNetPrimaryNID);

struct lnet_peer_net *
lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
{
        struct lnet_peer_net *peer_net;
        list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
                if (peer_net->lpn_net_id == net_id)
                        return peer_net;
        }
        return NULL;
}

/*
 * Attach a peer_ni to a peer_net and peer. This function assumes
 * peer_ni is not already attached to the peer_net/peer. The peer_ni
 * may be attached to a different peer, in which case it will be
 * properly detached first. The whole operation is done atomically.
 *
 * Always returns 0.  This is the last function called from functions
 * that do return an int, so returning 0 here allows the compiler to
 * do a tail call.
 */
static int
lnet_peer_attach_peer_ni(struct lnet_peer *lp,
                                struct lnet_peer_net *lpn,
                                struct lnet_peer_ni *lpni,
                                unsigned flags)
{
        struct lnet_peer_table *ptable;

        /* Install the new peer_ni */
        lnet_net_lock(LNET_LOCK_EX);
        /* Add peer_ni to global peer table hash, if necessary. */
        if (list_empty(&lpni->lpni_hashlist)) {
                int hash = lnet_nid2peerhash(lpni->lpni_nid);

                ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
                list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
                ptable->pt_version++;
                ptable->pt_number++;
                /* This is the 1st refcount on lpni. */
                atomic_inc(&lpni->lpni_refcount);
        }

        /* Detach the peer_ni from an existing peer, if necessary. */
        if (lpni->lpni_peer_net) {
                LASSERT(lpni->lpni_peer_net != lpn);
                LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
                lnet_peer_detach_peer_ni_locked(lpni);
                lnet_peer_net_decref_locked(lpni->lpni_peer_net);
                lpni->lpni_peer_net = NULL;
        }

        /* Add peer_ni to peer_net */
        lpni->lpni_peer_net = lpn;
        list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
        lnet_peer_net_addref_locked(lpn);

        /* Add peer_net to peer */
        if (!lpn->lpn_peer) {
                lpn->lpn_peer = lp;
                list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
                lnet_peer_addref_locked(lp);
        }

        /* Add peer to global peer list, if necessary */
        ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
        if (list_empty(&lp->lp_peer_list)) {
                list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
                ptable->pt_peers++;
        }

        /* Update peer state */
        spin_lock(&lp->lp_lock);
        if (flags & LNET_PEER_CONFIGURED) {
                if (!(lp->lp_state & LNET_PEER_CONFIGURED))
                        lp->lp_state |= LNET_PEER_CONFIGURED;
        }
        if (flags & LNET_PEER_MULTI_RAIL) {
                if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
                        lp->lp_state |= LNET_PEER_MULTI_RAIL;
                        lnet_peer_clr_non_mr_pref_nids(lp);
                }
        }
        spin_unlock(&lp->lp_lock);

        lp->lp_nnis++;
        lnet_net_unlock(LNET_LOCK_EX);

        CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
               libcfs_nid2str(lp->lp_primary_nid),
               libcfs_nid2str(lpni->lpni_nid), flags);

        return 0;
}

/*
 * Create a new peer, with nid as its primary nid.
 *
 * Call with the lnet_api_mutex held.
 */
static int
lnet_peer_add(lnet_nid_t nid, unsigned flags)
{
        struct lnet_peer *lp;
        struct lnet_peer_net *lpn;
        struct lnet_peer_ni *lpni;
        int rc = 0;

        LASSERT(nid != LNET_NID_ANY);

        /*
         * No need for the lnet_net_lock here, because the
         * lnet_api_mutex is held.
         */
        lpni = lnet_find_peer_ni_locked(nid);
        if (lpni) {
                /* A peer with this NID already exists. */
                lp = lpni->lpni_peer_net->lpn_peer;
                lnet_peer_ni_decref_locked(lpni);
                /*
                 * This is an error if the peer was configured and the
                 * primary NID differs or an attempt is made to change
                 * the Multi-Rail flag. Otherwise the assumption is
                 * that an existing peer is being modified.
                 */
                if (lp->lp_state & LNET_PEER_CONFIGURED) {
                        if (lp->lp_primary_nid != nid)
                                rc = -EEXIST;
                        else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
                                rc = -EPERM;
                        goto out;
                }
                /* Delete and recreate as a configured peer. */
                lnet_peer_del(lp);
        }

        /* Create peer, peer_net, and peer_ni. */
        rc = -ENOMEM;
        lp = lnet_peer_alloc(nid);
        if (!lp)
                goto out;
        lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
        if (!lpn)
                goto out_free_lp;
        lpni = lnet_peer_ni_alloc(nid);
        if (!lpni)
                goto out_free_lpn;

        return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
        LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
        LIBCFS_FREE(lp, sizeof(*lp));
out:
        CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
               libcfs_nid2str(nid), flags, rc);
        return rc;
}

/*
 * Add a NID to a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:    Non-DLC addition to a DLC-configured peer.
 *  -EEXIST:   The NID was configured by DLC for a different peer.
 *  -ENOMEM:   Out of memory.
 *  -ENOTUNIQ: Adding a second peer NID on a single network on a
 *             non-multi-rail peer.
 */
static int
lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
        struct lnet_peer_net *lpn;
        struct lnet_peer_ni *lpni;
        int rc = 0;

        LASSERT(lp);
        LASSERT(nid != LNET_NID_ANY);

        /* A configured peer can only be updated through configuration. */
        if (!(flags & LNET_PEER_CONFIGURED)) {
                if (lp->lp_state & LNET_PEER_CONFIGURED) {
                        rc = -EPERM;
                        goto out;
                }
        }

        /*
         * The MULTI_RAIL flag can be set but not cleared, because
         * that would leave the peer struct in an invalid state.
         */
        if (flags & LNET_PEER_MULTI_RAIL) {
                spin_lock(&lp->lp_lock);
                if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
                        lp->lp_state |= LNET_PEER_MULTI_RAIL;
                        lnet_peer_clr_non_mr_pref_nids(lp);
                }
                spin_unlock(&lp->lp_lock);
        } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
                rc = -EPERM;
                goto out;
        }

        lpni = lnet_find_peer_ni_locked(nid);
        if (lpni) {
                /*
                 * A peer_ni already exists. This is only a problem if
                 * it is not connected to this peer and was configured
                 * by DLC.
                 */
                lnet_peer_ni_decref_locked(lpni);
                if (lpni->lpni_peer_net->lpn_peer == lp)
                        goto out;
                if (lnet_peer_ni_is_configured(lpni)) {
                        rc = -EEXIST;
                        goto out;
                }
                /* If this is the primary NID, destroy the peer. */
                if (lnet_peer_ni_is_primary(lpni)) {
                        lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
                        lpni = lnet_peer_ni_alloc(nid);
                        if (!lpni) {
                                rc = -ENOMEM;
                                goto out;
                        }
                }
        } else {
                lpni = lnet_peer_ni_alloc(nid);
                if (!lpni) {
                        rc = -ENOMEM;
                        goto out;
                }
        }

        /*
         * Get the peer_net. Check that we're not adding a second
         * peer_ni on a peer_net of a non-multi-rail peer.
         */
        lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
        if (!lpn) {
                lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
                if (!lpn) {
                        rc = -ENOMEM;
                        goto out_free_lpni;
                }
        } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
                rc = -ENOTUNIQ;
                goto out_free_lpni;
        }

        return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpni:
        /* If the peer_ni was allocated above, its peer_net pointer is NULL */
        if (!lpni->lpni_peer_net)
                LIBCFS_FREE(lpni, sizeof(*lpni));
out:
        CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
               libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
               flags, rc);
        return rc;
}

/*
 * Update the primary NID of a peer, if possible.
 *
 * Call with the lnet_api_mutex held.
 */
static int
lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
        lnet_nid_t old = lp->lp_primary_nid;
        int rc = 0;

        if (lp->lp_primary_nid == nid)
                goto out;
        rc = lnet_peer_add_nid(lp, nid, flags);
        if (rc)
                goto out;
        lp->lp_primary_nid = nid;
out:
        CDEBUG(D_NET, "peer %s NID %s: %d\n",
               libcfs_nid2str(old), libcfs_nid2str(nid), rc);
        return rc;
}

/*
 * lpni creation initiated due to traffic, either sent or received.
 */
1427 static int
1428 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1429 {
1430         struct lnet_peer *lp;
1431         struct lnet_peer_net *lpn;
1432         struct lnet_peer_ni *lpni;
1433         unsigned flags = 0;
1434         int rc = 0;
1435
1436         if (nid == LNET_NID_ANY) {
1437                 rc = -EINVAL;
1438                 goto out;
1439         }
1440
1441         /* lnet_net_lock is not needed here because ln_api_lock is held */
1442         lpni = lnet_find_peer_ni_locked(nid);
1443         if (lpni) {
1444                 /*
1445                  * We must have raced with another thread. Since we
1446                  * know next to nothing about a peer_ni created by
1447                  * traffic, we just assume everything is ok and
1448                  * return.
1449                  */
1450                 lnet_peer_ni_decref_locked(lpni);
1451                 goto out;
1452         }
1453
1454         /* Create peer, peer_net, and peer_ni. */
1455         rc = -ENOMEM;
1456         lp = lnet_peer_alloc(nid);
1457         if (!lp)
1458                 goto out;
1459         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1460         if (!lpn)
1461                 goto out_free_lp;
1462         lpni = lnet_peer_ni_alloc(nid);
1463         if (!lpni)
1464                 goto out_free_lpn;
1465         if (pref != LNET_NID_ANY)
1466                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1467
1468         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1469
1470 out_free_lpn:
1471         LIBCFS_FREE(lpn, sizeof(*lpn));
1472 out_free_lp:
1473         LIBCFS_FREE(lp, sizeof(*lp));
1474 out:
1475         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1476         return rc;
1477 }
1478
1479 /*
1480  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1481  *
1482  * This API handles the following combinations:
1483  *   Create a peer with its primary NI if only the prim_nid is provided
1484  *   Add a NID to a peer identified by the prim_nid. The peer identified
1485  *   by the prim_nid must already exist.
1486  *   The peer being created may be non-MR.
1487  *
1488  * The caller must hold ln_api_mutex. This prevents the peer from
1489  * being created/modified/deleted by a different thread.
1490  */
1491 int
1492 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1493 {
1494         struct lnet_peer *lp = NULL;
1495         struct lnet_peer_ni *lpni;
1496         unsigned flags;
1497
1498         /* The prim_nid must always be specified */
1499         if (prim_nid == LNET_NID_ANY)
1500                 return -EINVAL;
1501
1502         flags = LNET_PEER_CONFIGURED;
1503         if (mr)
1504                 flags |= LNET_PEER_MULTI_RAIL;
1505
1506         /*
1507          * If nid isn't specified, we must create a new peer with
1508          * prim_nid as its primary nid.
1509          */
1510         if (nid == LNET_NID_ANY)
1511                 return lnet_peer_add(prim_nid, flags);
1512
1513         /* Look up the prim_nid, which must exist. */
1514         lpni = lnet_find_peer_ni_locked(prim_nid);
1515         if (!lpni)
1516                 return -ENOENT;
1517         lnet_peer_ni_decref_locked(lpni);
1518         lp = lpni->lpni_peer_net->lpn_peer;
1519
1520         /* Peer must have been configured. */
1521         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1522                 CDEBUG(D_NET, "peer %s was not configured\n",
1523                        libcfs_nid2str(prim_nid));
1524                 return -ENOENT;
1525         }
1526
1527         /* Primary NID must match */
1528         if (lp->lp_primary_nid != prim_nid) {
1529                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1530                        libcfs_nid2str(prim_nid),
1531                        libcfs_nid2str(lp->lp_primary_nid));
1532                 return -ENODEV;
1533         }
1534
1535         /* Multi-Rail flag must match. */
1536         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1537                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1538                        libcfs_nid2str(prim_nid));
1539                 return -EPERM;
1540         }
1541
1542         return lnet_peer_add_nid(lp, nid, flags);
1543 }
1544
1545 /*
1546  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1547  *
1548  * This API handles the following combinations:
1549  *  - Delete a NID from a peer if both prim_nid and nid are provided.
1550  *  - Delete the entire peer if only prim_nid is provided.
1551  *  - Delete the entire peer if the nid given is its primary NID.
1552  *
1553  * The caller must hold ln_api_mutex. This prevents the peer from
1554  * being modified/deleted by a different thread.
1555  */
1556 int
1557 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1558 {
1559         struct lnet_peer *lp;
1560         struct lnet_peer_ni *lpni;
1561         unsigned flags;
1562
1563         if (prim_nid == LNET_NID_ANY)
1564                 return -EINVAL;
1565
1566         lpni = lnet_find_peer_ni_locked(prim_nid);
1567         if (!lpni)
1568                 return -ENOENT;
1569         lnet_peer_ni_decref_locked(lpni);
1570         lp = lpni->lpni_peer_net->lpn_peer;
1571
1572         if (prim_nid != lp->lp_primary_nid) {
1573                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1574                        libcfs_nid2str(prim_nid),
1575                        libcfs_nid2str(lp->lp_primary_nid));
1576                 return -ENODEV;
1577         }
1578
1579         lnet_net_lock(LNET_LOCK_EX);
1580         if (lp->lp_rtr_refcount > 0) {
1581                 lnet_net_unlock(LNET_LOCK_EX);
1582                 CERROR("%s is a router. It cannot be deleted\n",
1583                        libcfs_nid2str(prim_nid));
1584                 return -EBUSY;
1585         }
1586         lnet_net_unlock(LNET_LOCK_EX);
1587
1588         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1589                 return lnet_peer_del(lp);
1590
1591         flags = LNET_PEER_CONFIGURED;
1592         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1593                 flags |= LNET_PEER_MULTI_RAIL;
1594
1595         return lnet_peer_del_nid(lp, nid, flags);
1596 }
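
/*
 * Usage sketch (illustrative only): remove the second NID added in
 * the example above, then delete the peer entirely. The NIDs are
 * hypothetical and error handling is omitted.
 *
 *	rc = lnet_del_peer_ni(libcfs_str2nid("10.0.0.1@tcp"),
 *			      libcfs_str2nid("10.0.1.1@tcp"));
 *	if (!rc)
 *		rc = lnet_del_peer_ni(libcfs_str2nid("10.0.0.1@tcp"),
 *				      LNET_NID_ANY);
 */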
1597
1598 void
1599 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1600 {
1601         struct lnet_peer_table *ptable;
1602         struct lnet_peer_net *lpn;
1603
1604         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1605
1606         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1607         LASSERT(list_empty(&lpni->lpni_txq));
1608         LASSERT(lpni->lpni_txqnob == 0);
1609         LASSERT(list_empty(&lpni->lpni_peer_nis));
1610         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1611
1612         lpn = lpni->lpni_peer_net;
1613         lpni->lpni_peer_net = NULL;
1614         lpni->lpni_net = NULL;
1615
1616         /* remove the peer ni from the zombie list */
1617         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1618         spin_lock(&ptable->pt_zombie_lock);
1619         list_del_init(&lpni->lpni_hashlist);
1620         ptable->pt_zombies--;
1621         spin_unlock(&ptable->pt_zombie_lock);
1622
1623         if (lpni->lpni_pref_nnids > 1) {
1624                 LIBCFS_FREE(lpni->lpni_pref.nids,
1625                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1626         }
1627         LIBCFS_FREE(lpni, sizeof(*lpni));
1628
1629         lnet_peer_net_decref_locked(lpn);
1630 }
1631
1632 struct lnet_peer_ni *
1633 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1634 {
1635         struct lnet_peer_ni *lpni = NULL;
1636         int rc;
1637
1638         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1639                 return ERR_PTR(-ESHUTDOWN);
1640
1641         /*
1642          * find if a peer_ni already exists.
1643          * If so then just return that.
1644          */
1645         lpni = lnet_find_peer_ni_locked(nid);
1646         if (lpni)
1647                 return lpni;
1648
1649         lnet_net_unlock(cpt);
1650
1651         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1652         if (rc) {
1653                 lpni = ERR_PTR(rc);
1654                 goto out_net_relock;
1655         }
1656
1657         lpni = lnet_find_peer_ni_locked(nid);
1658         LASSERT(lpni);
1659
1660 out_net_relock:
1661         lnet_net_lock(cpt);
1662
1663         return lpni;
1664 }
1665
1666 /*
1667  * Get a peer_ni for the given nid, create it if necessary. Takes a
1668  * hold on the peer_ni.
1669  */
1670 struct lnet_peer_ni *
1671 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1672 {
1673         struct lnet_peer_ni *lpni = NULL;
1674         int rc;
1675
1676         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1677                 return ERR_PTR(-ESHUTDOWN);
1678
1679         /*
1680          * find if a peer_ni already exists.
1681          * If so then just return that.
1682          */
1683         lpni = lnet_find_peer_ni_locked(nid);
1684         if (lpni)
1685                 return lpni;
1686
1687         /*
1688          * Slow path:
1689          * use the lnet_api_mutex to serialize the creation of the peer_ni
1690          * and the creation/deletion of the local ni/net. When a local ni is
1691          * created, if there exists a set of peer_nis on that network,
1692          * they need to be traversed and updated. When a local NI is
1693          * deleted, which could result in a network being deleted, then
1694          * all peer nis on that network need to be removed as well.
1695          *
1696          * Creation through traffic should also be serialized with
1697          * creation through DLC.
1698          */
1699         lnet_net_unlock(cpt);
1700         mutex_lock(&the_lnet.ln_api_mutex);
1701         /*
1702          * Shutdown is only set while holding the ln_api_mutex, so a
1703          * single check here is sufficient.
1704          */
1705         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1706                 lpni = ERR_PTR(-ESHUTDOWN);
1707                 goto out_mutex_unlock;
1708         }
1709
1710         rc = lnet_peer_ni_traffic_add(nid, pref);
1711         if (rc) {
1712                 lpni = ERR_PTR(rc);
1713                 goto out_mutex_unlock;
1714         }
1715
1716         lpni = lnet_find_peer_ni_locked(nid);
1717         LASSERT(lpni);
1718
1719 out_mutex_unlock:
1720         mutex_unlock(&the_lnet.ln_api_mutex);
1721         lnet_net_lock(cpt);
1722
1723         /* Lock has been dropped, check again for shutdown. */
1724         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1725                 if (!IS_ERR(lpni))
1726                         lnet_peer_ni_decref_locked(lpni);
1727                 lpni = ERR_PTR(-ESHUTDOWN);
1728         }
1729
1730         return lpni;
1731 }
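
/*
 * Caller sketch (hypothetical): look up or create the peer_ni for an
 * incoming message while holding lnet_net_lock(cpt). An IS_ERR()
 * result indicates shutdown or an allocation failure; otherwise the
 * caller owns a reference that must be dropped when done.
 *
 *	lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
 *	if (IS_ERR(lpni))
 *		return PTR_ERR(lpni);
 *	... use lpni ...
 *	lnet_peer_ni_decref_locked(lpni);
 */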
1732
1733 /*
1734  * Peer Discovery
1735  */
1736
1737 bool
1738 lnet_peer_gw_discovery(struct lnet_peer *lp)
1739 {
1740         bool rc = false;
1741
1742         spin_lock(&lp->lp_lock);
1743         if (lp->lp_state & LNET_PEER_RTR_DISCOVERY)
1744                 rc = true;
1745         spin_unlock(&lp->lp_lock);
1746
1747         return rc;
1748 }
1749
1750 /*
1751  * Is a peer uptodate from the point of view of discovery?
1752  *
1753  * If it is currently being processed, obviously not.
1754  * A forced Ping or Push is also handled by the discovery thread.
1755  *
1756  * Otherwise look at whether the peer needs rediscovering.
1757  */
1758 bool
1759 lnet_peer_is_uptodate(struct lnet_peer *lp)
1760 {
1761         bool rc;
1762
1763         spin_lock(&lp->lp_lock);
1764         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1765                             LNET_PEER_FORCE_PING |
1766                             LNET_PEER_FORCE_PUSH)) {
1767                 rc = false;
1768         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1769                 rc = true;
1770         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1771                 if (lnet_peer_discovery_disabled)
1772                         rc = true;
1773                 else
1774                         rc = false;
1775         } else if (lnet_peer_needs_push(lp)) {
1776                 rc = false;
1777         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1778                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1779                         rc = true;
1780                 else
1781                         rc = false;
1782         } else {
1783                 rc = false;
1784         }
1785         spin_unlock(&lp->lp_lock);
1786
1787         return rc;
1788 }
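
/*
 * Condensed restatement of the precedence implemented above,
 * highest first:
 *
 *	DISCOVERING | FORCE_PING | FORCE_PUSH	-> not uptodate
 *	NO_DISCOVERY				-> uptodate
 *	REDISCOVER				-> uptodate iff discovery
 *						   is disabled
 *	lnet_peer_needs_push()			-> not uptodate
 *	DISCOVERED				-> uptodate iff NIDS_UPTODATE
 *	none of the above			-> not uptodate
 */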
1789
1790 /*
1791  * Queue a peer for the attention of the discovery thread.  Call with
1792  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1793  * -EALREADY if the peer was already queued.
1794  */
1795 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1796 {
1797         int rc;
1798
1799         spin_lock(&lp->lp_lock);
1800         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1801                 lp->lp_state |= LNET_PEER_DISCOVERING;
1802         spin_unlock(&lp->lp_lock);
1803         if (list_empty(&lp->lp_dc_list)) {
1804                 lnet_peer_addref_locked(lp);
1805                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1806                 wake_up(&the_lnet.ln_dc_waitq);
1807                 rc = 0;
1808         } else {
1809                 rc = -EALREADY;
1810         }
1811
1812         CDEBUG(D_NET, "Queue peer %s: %d\n",
1813                libcfs_nid2str(lp->lp_primary_nid), rc);
1814
1815         return rc;
1816 }
1817
1818 /*
1819  * Discovery of a peer is complete. Wake all waiters on the peer.
1820  * Call with lnet_net_lock/EX held.
1821  */
1822 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1823 {
1824         struct lnet_msg *msg, *tmp;
1825         int rc = 0;
1826         struct list_head pending_msgs;
1827
1828         INIT_LIST_HEAD(&pending_msgs);
1829
1830         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1831                libcfs_nid2str(lp->lp_primary_nid));
1832
1833         list_del_init(&lp->lp_dc_list);
1834         spin_lock(&lp->lp_lock);
1835         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1836         spin_unlock(&lp->lp_lock);
1837         wake_up_all(&lp->lp_dc_waitq);
1838
1839         if (lp->lp_rtr_refcount > 0)
1840                 lnet_router_discovery_complete(lp);
1841
1842         lnet_net_unlock(LNET_LOCK_EX);
1843
1844         /* iterate through all pending messages and send them again */
1845         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1846                 list_del_init(&msg->msg_list);
1847                 if (lp->lp_dc_error) {
1848                         lnet_finalize(msg, lp->lp_dc_error);
1849                         continue;
1850                 }
1851
1852                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1853                        lnet_msgtyp2str(msg->msg_type),
1854                        libcfs_id2str(msg->msg_target));
1855                 rc = lnet_send(msg->msg_src_nid_param, msg,
1856                                msg->msg_rtr_nid_param);
1857                 if (rc < 0) {
1858                         CNETERR("Error sending %s to %s: %d\n",
1859                                lnet_msgtyp2str(msg->msg_type),
1860                                libcfs_id2str(msg->msg_target), rc);
1861                         lnet_finalize(msg, rc);
1862                 }
1863         }
1864         lnet_net_lock(LNET_LOCK_EX);
1865         lnet_peer_decref_locked(lp);
1866 }
1867
1868 /*
1869  * Handle inbound push.
1870  * Like any event handler, called with lnet_res_lock/CPT held.
1871  */
1872 void lnet_peer_push_event(struct lnet_event *ev)
1873 {
1874         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1875         struct lnet_peer *lp;
1876
1877         /* lnet_find_peer() adds a refcount */
1878         lp = lnet_find_peer(ev->source.nid);
1879         if (!lp) {
1880                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1881                        libcfs_nid2str(ev->initiator.nid),
1882                        libcfs_nid2str(ev->source.nid));
1883                 return;
1884         }
1885
1886         /* Ensure peer state remains consistent while we modify it. */
1887         spin_lock(&lp->lp_lock);
1888
1889         /*
1890          * If some kind of error happened the contents of the message
1891          * cannot be used. Clear the NIDS_UPTODATE and set the
1892          * FORCE_PING flag to trigger a ping.
1893          */
1894         if (ev->status) {
1895                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1896                 lp->lp_state |= LNET_PEER_FORCE_PING;
1897                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1898                        ev->status,
1899                        libcfs_nid2str(lp->lp_primary_nid),
1900                        libcfs_nid2str(ev->source.nid));
1901                 goto out;
1902         }
1903
1904         /*
1905          * A push with invalid or corrupted info. Clear the UPTODATE
1906          * flag to trigger a ping.
1907          */
1908         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1909                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1910                 lp->lp_state |= LNET_PEER_FORCE_PING;
1911                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1912                        libcfs_nid2str(lp->lp_primary_nid));
1913                 goto out;
1914         }
1915
1916         /*
1917          * Make sure we'll allocate the correct size ping buffer when
1918          * pinging the peer.
1919          */
1920         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1921                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1922
1923         /*
1924          * A non-Multi-Rail peer is not supposed to be capable of
1925          * sending a push.
1926          */
1927         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1928                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1929                        libcfs_nid2str(lp->lp_primary_nid));
1930                 goto out;
1931         }
1932
1933         /*
1934          * Check the MULTIRAIL flag. Complain if the peer was DLC
1935          * configured without it.
1936          */
1937         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1938                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1939                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1940                                libcfs_nid2str(lp->lp_primary_nid));
1941                 } else {
1942                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1943                         lnet_peer_clr_non_mr_pref_nids(lp);
1944                 }
1945         }
1946
1947         /*
1948          * The peer may have discovery disabled at its end. Set
1949          * NO_DISCOVERY as appropriate.
1950          */
1951         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1952                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1953                        libcfs_nid2str(lp->lp_primary_nid));
1954                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1955         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1956                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1957                        libcfs_nid2str(lp->lp_primary_nid));
1958                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1959         }
1960
1961         /*
1962          * Check for truncation of the Put message. Clear the
1963          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1964          * and tell discovery to allocate a bigger buffer.
1965          */
1966         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1967                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1968                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1969                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1970                 lp->lp_state |= LNET_PEER_FORCE_PING;
1971                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1972                        libcfs_nid2str(lp->lp_primary_nid),
1973                        pbuf->pb_info.pi_nnis);
1974                 goto out;
1975         }
1976
1977         /*
1978          * Check whether the Put data is stale. Stale data can just be
1979          * dropped.
1980          */
1981         if (pbuf->pb_info.pi_nnis > 1 &&
1982             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1983             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1984                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1985                        libcfs_nid2str(lp->lp_primary_nid),
1986                        LNET_PING_BUFFER_SEQNO(pbuf),
1987                        lp->lp_peer_seqno);
1988                 goto out;
1989         }
1990
1991         /*
1992          * Check whether the Put data is new, in which case we clear
1993          * the UPTODATE flag and prepare to process it.
1994          *
1995          * If the Put data is current, and the peer is UPTODATE, then
1996          * we assume everything is all right and drop the data as
1997          * stale.
1998          */
1999         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
2000                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2001                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2002         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
2003                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
2004                        libcfs_nid2str(lp->lp_primary_nid),
2005                        LNET_PING_BUFFER_SEQNO(pbuf),
2006                        lp->lp_peer_seqno);
2007                 goto out;
2008         }
2009
2010         /*
2011          * If there is data present that hasn't been processed yet,
2012          * we'll replace it if the Put contained newer data and it
2013          * fits. We're racing with a Ping or earlier Push in this
2014          * case.
2015          */
2016         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2017                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
2018                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
2019                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
2020                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
2021                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
2022                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
2023                               libcfs_nid2str(lp->lp_primary_nid),
2024                               LNET_PING_BUFFER_SEQNO(pbuf),
2025                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
2026                 }
2027                 goto out;
2028         }
2029
2030         /*
2031          * Allocate a buffer to copy the data. On a failure we drop
2032          * the Push and set FORCE_PING to force the discovery
2033          * thread to fix the problem by pinging the peer.
2034          */
2035         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
2036         if (!lp->lp_data) {
2037                 lp->lp_state |= LNET_PEER_FORCE_PING;
2038                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
2039                        libcfs_nid2str(lp->lp_primary_nid),
2040                        LNET_PING_BUFFER_SEQNO(pbuf));
2041                 goto out;
2042         }
2043
2044         /* Success */
2045         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
2046                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
2047         lp->lp_state |= LNET_PEER_DATA_PRESENT;
2048         CDEBUG(D_NET, "Received Push %s %u\n",
2049                libcfs_nid2str(lp->lp_primary_nid),
2050                LNET_PING_BUFFER_SEQNO(pbuf));
2051
2052 out:
2053         /*
2054          * Queue the peer for discovery if it isn't queued already. If it
2055          * was already queued, force it onto the request queue and wake
2056          * the discovery thread, because its status has changed.
2057          */
2058         spin_unlock(&lp->lp_lock);
2059         lnet_net_lock(LNET_LOCK_EX);
2060         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2061                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2062                 wake_up(&the_lnet.ln_dc_waitq);
2063         }
2064         /* Drop refcount from lookup */
2065         lnet_peer_decref_locked(lp);
2066         lnet_net_unlock(LNET_LOCK_EX);
2067 }
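
/*
 * Example of the sequence number checks above, with hypothetical
 * values: if lp_peer_seqno is 7, a Push carrying sequence number 9
 * clears NIDS_UPTODATE and is processed, while a Push carrying
 * sequence number 5 is logged as stale and dropped.
 */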
2068
2069 /*
2070  * Clear the discovery error state, unless we're already discovering
2071  * this peer, in which case the error is current.
2072  */
2073 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2074 {
2075         spin_lock(&lp->lp_lock);
2076         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2077                 lp->lp_dc_error = 0;
2078         spin_unlock(&lp->lp_lock);
2079 }
2080
2081 /*
2082  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2083  * dropped/retaken within this function. An lnet_peer_ni is passed in
2084  * because discovery could tear down an lnet_peer.
2085  */
2086 int
2087 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2088 {
2089         DEFINE_WAIT(wait);
2090         struct lnet_peer *lp;
2091         int rc = 0;
2092
2093 again:
2094         lnet_net_unlock(cpt);
2095         lnet_net_lock(LNET_LOCK_EX);
2096         lp = lpni->lpni_peer_net->lpn_peer;
2097         lnet_peer_clear_discovery_error(lp);
2098
2099         /*
2100          * We're willing to be interrupted. The lpni can become a
2101          * zombie if we race with DLC, so we must check for that.
2102          */
2103         for (;;) {
2104                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2105                 if (signal_pending(current))
2106                         break;
2107                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2108                         break;
2109                 if (lp->lp_dc_error)
2110                         break;
2111                 if (lnet_peer_is_uptodate(lp))
2112                         break;
2113                 lnet_peer_queue_for_discovery(lp);
2114                 /*
2115                  * If the caller requested a non-blocking operation then
2116                  * return immediately. Once discovery is complete, the
2117                  * peer ref will be decremented and any pending messages
2118                  * that were held pending discovery will be transmitted.
2119                  */
2120                 if (!block)
2121                         break;
2122
2123                 lnet_peer_addref_locked(lp);
2124                 lnet_net_unlock(LNET_LOCK_EX);
2125                 schedule();
2126                 finish_wait(&lp->lp_dc_waitq, &wait);
2127                 lnet_net_lock(LNET_LOCK_EX);
2128                 lnet_peer_decref_locked(lp);
2129                 /* Peer may have changed */
2130                 lp = lpni->lpni_peer_net->lpn_peer;
2131         }
2132         finish_wait(&lp->lp_dc_waitq, &wait);
2133
2134         lnet_net_unlock(LNET_LOCK_EX);
2135         lnet_net_lock(cpt);
2136
2137         /*
2138          * If the peer has changed since we discovered the older peer,
2139          * then we need to discover the new peer to make sure the
2140          * interface information is up to date.
2141          */
2142         if (lp != lpni->lpni_peer_net->lpn_peer)
2143                 goto again;
2144
2145         if (signal_pending(current))
2146                 rc = -EINTR;
2147         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2148                 rc = -ESHUTDOWN;
2149         else if (lp->lp_dc_error)
2150                 rc = lp->lp_dc_error;
2151         else if (!block)
2152                 CDEBUG(D_NET, "non-blocking discovery\n");
2153         else if (!lnet_peer_is_uptodate(lp))
2154                 goto again;
2155
2156         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2157                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2158                libcfs_nid2str(lpni->lpni_nid), rc,
2159                (!block) ? "pending discovery" : "discovery complete");
2160
2161         return rc;
2162 }
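
/*
 * Caller sketch (hypothetical): run blocking discovery on the
 * peer_ni of a known NID. Assumes the ln_api_mutex and
 * lnet_net_lock(cpt) are held, as lnet_discover_peer_locked()
 * requires.
 *
 *	lpni = lnet_find_peer_ni_locked(nid);
 *	if (lpni) {
 *		rc = lnet_discover_peer_locked(lpni, cpt, true);
 *		lnet_peer_ni_decref_locked(lpni);
 *	}
 */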
2163
2164 /* Handle an incoming ack for a push. */
2165 static void
2166 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2167 {
2168         struct lnet_ping_buffer *pbuf;
2169
2170         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2171         spin_lock(&lp->lp_lock);
2172         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2173         lp->lp_push_error = ev->status;
2174         if (ev->status)
2175                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2176         else
2177                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2178         spin_unlock(&lp->lp_lock);
2179
2180         CDEBUG(D_NET, "peer %s ev->status %d\n",
2181                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2182 }
2183
2184 /* Handle a Reply message. This is the reply to a Ping message. */
2185 static void
2186 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2187 {
2188         struct lnet_ping_buffer *pbuf;
2189         int rc;
2190
2191         spin_lock(&lp->lp_lock);
2192
2193         /*
2194          * If some kind of error happened the contents of the message
2195          * cannot be used. Set PING_FAILED to trigger a retry.
2196          */
2197         if (ev->status) {
2198                 lp->lp_state |= LNET_PEER_PING_FAILED;
2199                 lp->lp_ping_error = ev->status;
2200                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2201                        ev->status,
2202                        libcfs_nid2str(lp->lp_primary_nid),
2203                        libcfs_nid2str(ev->source.nid));
2204                 goto out;
2205         }
2206
2207         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2208         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2209                 lnet_swap_pinginfo(pbuf);
2210
2211         /*
2212          * A reply with invalid or corrupted info. Set PING_FAILED to
2213          * trigger a retry.
2214          */
2215         rc = lnet_ping_info_validate(&pbuf->pb_info);
2216         if (rc) {
2217                 lp->lp_state |= LNET_PEER_PING_FAILED;
2218                 lp->lp_ping_error = 0;
2219                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2220                        libcfs_nid2str(lp->lp_primary_nid), rc);
2221                 goto out;
2222         }
2223
2224         /*
2225          * Update the MULTI_RAIL flag based on the reply. If the peer
2226          * was configured with DLC then the setting should match what
2227          * DLC put in.
2228          */
2229         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2230                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2231                         /* Everything's fine */
2232                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2233                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2234                               libcfs_nid2str(lp->lp_primary_nid));
2235                 } else {
2236                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2237                         lnet_peer_clr_non_mr_pref_nids(lp);
2238                 }
2239         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2240                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2241                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2242                               libcfs_nid2str(lp->lp_primary_nid));
2243                 } else {
2244                         CERROR("Multi-Rail state vanished from %s\n",
2245                                libcfs_nid2str(lp->lp_primary_nid));
2246                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2247                 }
2248         }
2249
2250         /*
2251          * Make sure we'll allocate the correct size ping buffer when
2252          * pinging the peer.
2253          */
2254         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2255                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2256
2257         /*
2258          * The peer may have discovery disabled at its end. Set
2259          * NO_DISCOVERY as appropriate.
2260          */
2261         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2262                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2263                        libcfs_nid2str(lp->lp_primary_nid));
2264                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2265         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2266                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2267                        libcfs_nid2str(lp->lp_primary_nid));
2268                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2269         }
2270
2271         /*
2272          * Check for truncation of the Reply. Clear PING_SENT and set
2273          * PING_FAILED to trigger a retry.
2274          */
2275         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2276                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2277                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2278                 lp->lp_state |= LNET_PEER_PING_FAILED;
2279                 lp->lp_ping_error = 0;
2280                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2281                        libcfs_nid2str(lp->lp_primary_nid),
2282                        pbuf->pb_info.pi_nnis);
2283                 goto out;
2284         }
2285
2286         /*
2287          * Check the sequence numbers in the reply. These are only
2288          * available if the reply came from a Multi-Rail peer.
2289          */
2290         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2291             pbuf->pb_info.pi_nnis > 1 &&
2292             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2293                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2294                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2295                                 libcfs_nid2str(lp->lp_primary_nid),
2296                                 LNET_PING_BUFFER_SEQNO(pbuf),
2297                                 lp->lp_peer_seqno);
2298                         goto out;
2299                 }
2300
2301                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2302                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2303         }
2304
2305         /* We're happy with the state of the data in the buffer. */
2306         CDEBUG(D_NET, "peer %s data present %u\n",
2307                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2308         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2309                 lnet_ping_buffer_decref(lp->lp_data);
2310         else
2311                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2312         lnet_ping_buffer_addref(pbuf);
2313         lp->lp_data = pbuf;
2314 out:
2315         lp->lp_state &= ~LNET_PEER_PING_SENT;
2316         spin_unlock(&lp->lp_lock);
2317 }
2318
2319 /*
2320  * Send event handling. Only matters for error cases, where we clean
2321  * up state on the peer and peer_ni that would otherwise be updated in
2322  * the REPLY event handler for a successful Ping, and the ACK event
2323  * handler for a successful Push.
2324  */
2325 static int
2326 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2327 {
2328         int rc = 0;
2329
2330         if (!ev->status)
2331                 goto out;
2332
2333         spin_lock(&lp->lp_lock);
2334         if (ev->msg_type == LNET_MSG_GET) {
2335                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2336                 lp->lp_state |= LNET_PEER_PING_FAILED;
2337                 lp->lp_ping_error = ev->status;
2338         } else { /* ev->msg_type == LNET_MSG_PUT */
2339                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2340                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2341                 lp->lp_push_error = ev->status;
2342         }
2343         spin_unlock(&lp->lp_lock);
2344         rc = LNET_REDISCOVER_PEER;
2345 out:
2346         CDEBUG(D_NET, "%s Send to %s: %d\n",
2347                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2348                 libcfs_nid2str(ev->target.nid), rc);
2349         return rc;
2350 }
2351
2352 /*
2353  * Unlink event handling. This event is only seen if a call to
2354  * LNetMDUnlink() caused the event to be unlinked. If this call was
2355  * made after the event was set up in LNetGet() or LNetPut() then we
2356  * assume the Ping or Push timed out.
2357  */
2358 static void
2359 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2360 {
2361         spin_lock(&lp->lp_lock);
2362         /* We've passed through LNetGet() */
2363         if (lp->lp_state & LNET_PEER_PING_SENT) {
2364                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2365                 lp->lp_state |= LNET_PEER_PING_FAILED;
2366                 lp->lp_ping_error = -ETIMEDOUT;
2367                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2368                         libcfs_nid2str(lp->lp_primary_nid));
2369         }
2370         /* We've passed through LNetPut() */
2371         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2372                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2373                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2374                 lp->lp_push_error = -ETIMEDOUT;
2375                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2376                         libcfs_nid2str(lp->lp_primary_nid));
2377         }
2378         spin_unlock(&lp->lp_lock);
2379 }
2380
2381 /*
2382  * Event handler for the discovery EQ.
2383  *
2384  * Called with lnet_res_lock(cpt) held. The cpt is the
2385  * lnet_cpt_of_cookie() of the md handle cookie.
2386  */
2387 static void lnet_discovery_event_handler(struct lnet_event *event)
2388 {
2389         struct lnet_peer *lp = event->md.user_ptr;
2390         struct lnet_ping_buffer *pbuf;
2391         int rc;
2392
2393         /* discovery needs to take another look */
2394         rc = LNET_REDISCOVER_PEER;
2395
2396         CDEBUG(D_NET, "Received event: %d\n", event->type);
2397
2398         switch (event->type) {
2399         case LNET_EVENT_ACK:
2400                 lnet_discovery_event_ack(lp, event);
2401                 break;
2402         case LNET_EVENT_REPLY:
2403                 lnet_discovery_event_reply(lp, event);
2404                 break;
2405         case LNET_EVENT_SEND:
2406                 /* Only send failure triggers a retry. */
2407                 rc = lnet_discovery_event_send(lp, event);
2408                 break;
2409         case LNET_EVENT_UNLINK:
2410                 /* LNetMDUnlink() was called */
2411                 lnet_discovery_event_unlink(lp, event);
2412                 break;
2413         default:
2414                 /* Invalid events. */
2415                 LBUG();
2416         }
2417         lnet_net_lock(LNET_LOCK_EX);
2418         if (event->unlinked) {
2419                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2420                 lnet_ping_buffer_decref(pbuf);
2421                 lnet_peer_decref_locked(lp);
2422         }
2423
2424         /* put peer back at end of request queue, if discovery not already
2425          * done */
2426         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2427                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2428                 wake_up(&the_lnet.ln_dc_waitq);
2429         }
2430         lnet_net_unlock(LNET_LOCK_EX);
2431 }
2432
2433 /*
2434  * Build a peer from incoming data.
2435  *
2436  * The NIDs in the incoming data are supposed to be structured as follows:
2437  *  - loopback
2438  *  - primary NID
2439  *  - other NIDs in same net
2440  *  - NIDs in second net
2441  *  - NIDs in third net
2442  *  - ...
2443  * This is due to the way the list of NIDs in the data is created.
2444  *
2445  * Note that this function will mark the peer uptodate unless an
2446  * ENOMEM is encountered. All other errors are due to a conflict
2447  * between the DLC configuration and what discovery sees. We treat DLC
2448  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2449  * peer from becoming stuck in discovery.
2450  */
2451 static int lnet_peer_merge_data(struct lnet_peer *lp,
2452                                 struct lnet_ping_buffer *pbuf)
2453 {
2454         struct lnet_peer_ni *lpni;
2455         lnet_nid_t *curnis = NULL;
2456         struct lnet_ni_status *addnis = NULL;
2457         lnet_nid_t *delnis = NULL;
2458         unsigned flags;
2459         int ncurnis;
2460         int naddnis;
2461         int ndelnis;
2462         int nnis = 0;
2463         int i;
2464         int j;
2465         int rc;
2466
2467         flags = LNET_PEER_DISCOVERED;
2468         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2469                 flags |= LNET_PEER_MULTI_RAIL;
2470
2471         /*
2472          * Cache the routing feature for the peer; whether it is enabled
2473          * or disabled as reported by the remote peer.
2474          */
2475         spin_lock(&lp->lp_lock);
2476         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_RTE_DISABLED))
2477                 lp->lp_state |= LNET_PEER_ROUTER_ENABLED;
2478         else
2479                 lp->lp_state &= ~LNET_PEER_ROUTER_ENABLED;
2480         spin_unlock(&lp->lp_lock);
2481
2482         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2483         LIBCFS_ALLOC(curnis, nnis * sizeof(*curnis));
2484         LIBCFS_ALLOC(addnis, nnis * sizeof(*addnis));
2485         LIBCFS_ALLOC(delnis, nnis * sizeof(*delnis));
2486         if (!curnis || !addnis || !delnis) {
2487                 rc = -ENOMEM;
2488                 goto out;
2489         }
2490         ncurnis = 0;
2491         naddnis = 0;
2492         ndelnis = 0;
2493
2494         /* Construct the list of NIDs present in peer. */
2495         lpni = NULL;
2496         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2497                 curnis[ncurnis++] = lpni->lpni_nid;
2498
2499         /*
2500          * Check for NIDs in pbuf not present in curnis[].
2501          * The loop starts at 1 to skip the loopback NID.
2502          */
2503         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2504                 for (j = 0; j < ncurnis; j++)
2505                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2506                                 break;
2507                 if (j == ncurnis)
2508                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i];
2509         }
2510         /*
2511          * Check for NIDs in curnis[] not present in pbuf.
2512          * The nested loop starts at 1 to skip the loopback NID.
2513          *
2514          * But never add the loopback NID to delnis[]: if it is
2515          * present in curnis[] then this peer is for this node.
2516          */
2517         for (i = 0; i < ncurnis; i++) {
2518                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2519                         continue;
2520                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++) {
2521                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid) {
2522                                 /*
2523                                  * update the information we cache for the
2524                                  * peer with the latest information we
2525                                  * received
2526                                  */
2527                                 lpni = lnet_find_peer_ni_locked(curnis[i]);
2528                                 if (lpni) {
2529                                         lpni->lpni_ns_status = pbuf->pb_info.pi_ni[j].ns_status;
2530                                         lnet_peer_ni_decref_locked(lpni);
2531                                 }
2532                                 break;
2533                         }
2534                 }
2535                 if (j == pbuf->pb_info.pi_nnis)
2536                         delnis[ndelnis++] = curnis[i];
2537         }
2538
2539         for (i = 0; i < naddnis; i++) {
2540                 rc = lnet_peer_add_nid(lp, addnis[i].ns_nid, flags);
2541                 if (rc) {
2542                         CERROR("Error adding NID %s to peer %s: %d\n",
2543                                libcfs_nid2str(addnis[i].ns_nid),
2544                                libcfs_nid2str(lp->lp_primary_nid), rc);
2545                         if (rc == -ENOMEM)
2546                                 goto out;
2547                 }
2548                 lpni = lnet_find_peer_ni_locked(addnis[i].ns_nid);
2549                 if (lpni) {
2550                         lpni->lpni_ns_status = addnis[i].ns_status;
2551                         lnet_peer_ni_decref_locked(lpni);
2552                 }
2553         }
2554
2555         for (i = 0; i < ndelnis; i++) {
2556                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2557                 if (rc) {
2558                         CERROR("Error deleting NID %s from peer %s: %d\n",
2559                                libcfs_nid2str(delnis[i]),
2560                                libcfs_nid2str(lp->lp_primary_nid), rc);
2561                         if (rc == -ENOMEM)
2562                                 goto out;
2563                 }
2564         }
2565         /*
2566          * Errors other than -ENOMEM are due to peers having been
2567          * configured with DLC. Ignore these because DLC overrides
2568          * Discovery.
2569          */
2570         rc = 0;
2571 out:
2572         LIBCFS_FREE(curnis, nnis * sizeof(*curnis));
2573         LIBCFS_FREE(addnis, nnis * sizeof(*addnis));
2574         LIBCFS_FREE(delnis, nnis * sizeof(*delnis));
2575         lnet_ping_buffer_decref(pbuf);
2576         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2577
2578         if (rc) {
2579                 spin_lock(&lp->lp_lock);
2580                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2581                 lp->lp_state |= LNET_PEER_FORCE_PING;
2582                 spin_unlock(&lp->lp_lock);
2583         }
2584         return rc;
2585 }
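
/*
 * Worked example of the merge above, with hypothetical NIDs: if the
 * peer currently holds curnis[] = { A, B } and the ping buffer lists
 * { lo, A, C }, the loops compute addnis[] = { C } and delnis[] =
 * { B }. C is then added to the peer, B is deleted, and A has its
 * cached ns_status refreshed from the buffer.
 */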
2586
2587 /*
2588  * The data in pbuf says lp is its primary peer, but the data was
2589  * received by a different peer. Try to update lp with the data.
2590  */
2591 static int
2592 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2593 {
2594         struct lnet_handle_md mdh;
2595
2596         /* Queue lp for discovery, and force it on the request queue. */
2597         lnet_net_lock(LNET_LOCK_EX);
2598         if (lnet_peer_queue_for_discovery(lp))
2599                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2600         lnet_net_unlock(LNET_LOCK_EX);
2601
2602         LNetInvalidateMDHandle(&mdh);
2603
2604         /*
2605          * Decide whether we can move the peer to the DATA_PRESENT state.
2606          *
2607          * We replace stale data for a multi-rail peer, repair PING_FAILED
2608          * status, and preempt FORCE_PING.
2609          *
2610          * If after that we have DATA_PRESENT, we merge it into this peer.
2611          */
2612         spin_lock(&lp->lp_lock);
2613         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2614                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2615                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2616                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2617                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2618                         lnet_ping_buffer_decref(pbuf);
2619                         pbuf = lp->lp_data;
2620                         lp->lp_data = NULL;
2621                 }
2622         }
2623         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2624                 lnet_ping_buffer_decref(lp->lp_data);
2625                 lp->lp_data = NULL;
2626                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2627         }
2628         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2629                 mdh = lp->lp_ping_mdh;
2630                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2631                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2632                 lp->lp_ping_error = 0;
2633         }
2634         if (lp->lp_state & LNET_PEER_FORCE_PING)
2635                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2636         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2637         spin_unlock(&lp->lp_lock);
2638
2639         if (!LNetMDHandleIsInvalid(mdh))
2640                 LNetMDUnlink(mdh);
2641
2642         if (pbuf)
2643                 return lnet_peer_merge_data(lp, pbuf);
2644
2645         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2646         return 0;
2647 }
2648
2649 /*
2650  * Update a peer using the data received.
2651  */
2652 static int lnet_peer_data_present(struct lnet_peer *lp)
2653 __must_hold(&lp->lp_lock)
2654 {
2655         struct lnet_ping_buffer *pbuf;
2656         struct lnet_peer_ni *lpni;
2657         lnet_nid_t nid = LNET_NID_ANY;
2658         unsigned flags;
2659         int rc = 0;
2660
2661         pbuf = lp->lp_data;
2662         lp->lp_data = NULL;
2663         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2664         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2665         spin_unlock(&lp->lp_lock);
2666
2667         /*
2668          * Modifications of peer structures are done while holding the
2669          * ln_api_mutex. A global lock is required because we may be
2670          * modifying multiple peer structures, and a mutex greatly
2671          * simplifies memory management.
2672          *
2673          * The actual changes to the data structures must also protect
2674          * against concurrent lookups, for which the lnet_net_lock in
2675          * LNET_LOCK_EX mode is used.
2676          */
2677         mutex_lock(&the_lnet.ln_api_mutex);
2678         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2679                 rc = -ESHUTDOWN;
2680                 goto out;
2681         }
2682
2683         /*
2684          * If this peer is not on the peer list then it is being torn
2685          * down, and our reference count may be all that is keeping it
2686          * alive. Don't do any work on it.
2687          */
2688         if (list_empty(&lp->lp_peer_list))
2689                 goto out;
2690
2691         flags = LNET_PEER_DISCOVERED;
2692         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2693                 flags |= LNET_PEER_MULTI_RAIL;
2694
2695         /*
2696          * Check whether the primary NID in the message matches the
2697          * primary NID of the peer. If it does, update the peer. If
2698          * it does not, check whether there is already a peer with
2699          * that primary NID. If no such peer exists, try to update
2700          * the primary NID of the current peer (allowed if it was
2701          * created due to message traffic) and complete the update.
2702          * If the peer did exist, hand off the data to it.
2703          *
2704          * The peer for the loopback interface is a special case: this
2705          * is the peer for the local node, and we want to set its
2706          * primary NID to the correct value here. Moreover, this peer
2707          * can show up with only the loopback NID in the ping buffer.
2708          */
2709         if (pbuf->pb_info.pi_nnis <= 1)
2710                 goto out;
2711         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2712         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2713                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2714                 if (!rc)
2715                         rc = lnet_peer_merge_data(lp, pbuf);
2716         } else if (lp->lp_primary_nid == nid) {
2717                 rc = lnet_peer_merge_data(lp, pbuf);
2718         } else {
2719                 lpni = lnet_find_peer_ni_locked(nid);
2720                 if (!lpni) {
2721                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2722                         if (rc) {
2723                                 CERROR("Primary NID error %s versus %s: %d\n",
2724                                        libcfs_nid2str(lp->lp_primary_nid),
2725                                        libcfs_nid2str(nid), rc);
2726                         } else {
2727                                 rc = lnet_peer_merge_data(lp, pbuf);
2728                         }
2729                 } else {
2730                         struct lnet_peer *new_lp;
2731                         new_lp = lpni->lpni_peer_net->lpn_peer;
2732                         rc = lnet_peer_set_primary_data(new_lp, pbuf);
2733                         lnet_consolidate_routes_locked(lp, new_lp);
2734                         lnet_peer_ni_decref_locked(lpni);
2735                 }
2736         }
2737 out:
2738         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2739         mutex_unlock(&the_lnet.ln_api_mutex);
2740
2741         spin_lock(&lp->lp_lock);
2742         /* Tell discovery to re-check the peer immediately. */
2743         if (!rc)
2744                 rc = LNET_REDISCOVER_PEER;
2745         return rc;
2746 }
2747
2748 /*
2749  * A ping failed. Clear the PING_FAILED state and set the
2750  * FORCE_PING state, to ensure a retry even if discovery is
2751  * disabled. This avoids being left with incorrect state.
2752  */
2753 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2754 __must_hold(&lp->lp_lock)
2755 {
2756         struct lnet_handle_md mdh;
2757         int rc;
2758
2759         mdh = lp->lp_ping_mdh;
2760         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2761         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2762         lp->lp_state |= LNET_PEER_FORCE_PING;
2763         rc = lp->lp_ping_error;
2764         lp->lp_ping_error = 0;
2765         spin_unlock(&lp->lp_lock);
2766
2767         if (!LNetMDHandleIsInvalid(mdh))
2768                 LNetMDUnlink(mdh);
2769
2770         CDEBUG(D_NET, "peer %s:%d\n",
2771                libcfs_nid2str(lp->lp_primary_nid), rc);
2772
2773         spin_lock(&lp->lp_lock);
2774         return rc ? rc : LNET_REDISCOVER_PEER;
2775 }
2776
2777 /*
2778  * Select NID to send a Ping or Push to.
2779  */
2780 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2781 {
2782         struct lnet_peer_ni *lpni;
2783
2784         /* Look for a direct-connected NID for this peer. */
2785         lpni = NULL;
2786         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2787                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2788                         continue;
2789                 break;
2790         }
2791         if (lpni)
2792                 return lpni->lpni_nid;
2793
2794         /* Look for a routed-connected NID for this peer. */
2795         lpni = NULL;
2796         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2797                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2798                         continue;
2799                 break;
2800         }
2801         if (lpni)
2802                 return lpni->lpni_nid;
2803
2804         return LNET_NID_ANY;
2805 }
2806
2807 /* Active side of ping. */
2808 static int lnet_peer_send_ping(struct lnet_peer *lp)
2809 __must_hold(&lp->lp_lock)
2810 {
2811         lnet_nid_t pnid;
2812         int nnis;
2813         int rc;
2814         int cpt;
2815
2816         lp->lp_state |= LNET_PEER_PING_SENT;
2817         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2818         spin_unlock(&lp->lp_lock);
2819
2820         cpt = lnet_net_lock_current();
2821         /* Refcount for MD. */
2822         lnet_peer_addref_locked(lp);
2823         pnid = lnet_peer_select_nid(lp);
2824         lnet_net_unlock(cpt);
2825
2826         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2827
2828         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
2829                             the_lnet.ln_dc_eqh, false);
2830
2831         /*
2832          * If LNetMDBind in lnet_send_ping fails we need to decrement the
2833          * refcount on the peer ourselves; otherwise LNetMDUnlink will be
2834          * called, which will eventually do it.
2835          */
2836         if (rc > 0) {
2837                 lnet_net_lock(cpt);
2838                 lnet_peer_decref_locked(lp);
2839                 lnet_net_unlock(cpt);
2840                 rc = -rc; /* change rc to a negative error code */
2841                 goto fail_error;
2842         } else if (rc < 0) {
2843                 goto fail_error;
2844         }
2845
2846         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2847
2848         spin_lock(&lp->lp_lock);
2849         return 0;
2850
2851 fail_error:
2852         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2853         /*
2854          * The errors that get us here are considered hard errors and
2855          * cause Discovery to terminate. So we clear PING_SENT, but do
2856          * not set either PING_FAILED or FORCE_PING. In fact we need
2857          * to clear PING_FAILED, because the unlink event handler will
2858          * have set it if we called LNetMDUnlink() above.
2859          */
2860         spin_lock(&lp->lp_lock);
2861         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2862         return rc;
2863 }
2864
2865 /*
2866  * This function exists because you cannot call LNetMDUnlink() from an
2867  * event handler.
2868  */
2869 static int lnet_peer_push_failed(struct lnet_peer *lp)
2870 __must_hold(&lp->lp_lock)
2871 {
2872         struct lnet_handle_md mdh;
2873         int rc;
2874
2875         mdh = lp->lp_push_mdh;
2876         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2877         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2878         rc = lp->lp_push_error;
2879         lp->lp_push_error = 0;
2880         spin_unlock(&lp->lp_lock);
2881
2882         if (!LNetMDHandleIsInvalid(mdh))
2883                 LNetMDUnlink(mdh);
2884
2885         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2886         spin_lock(&lp->lp_lock);
2887         return rc ? rc : LNET_REDISCOVER_PEER;
2888 }
2889
2890 /* Active side of push. */
2891 static int lnet_peer_send_push(struct lnet_peer *lp)
2892 __must_hold(&lp->lp_lock)
2893 {
2894         struct lnet_ping_buffer *pbuf;
2895         struct lnet_process_id id;
2896         struct lnet_md md;
2897         int cpt;
2898         int rc;
2899
2900         /* Don't push to a non-multi-rail peer. */
2901         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2902                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2903                 return 0;
2904         }
2905
2906         lp->lp_state |= LNET_PEER_PUSH_SENT;
2907         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2908         spin_unlock(&lp->lp_lock);
2909
2910         cpt = lnet_net_lock_current();
2911         pbuf = the_lnet.ln_ping_target;
2912         lnet_ping_buffer_addref(pbuf);
2913         lnet_net_unlock(cpt);
2914
2915         /* Push source MD */
2916         md.start     = &pbuf->pb_info;
2917         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2918         md.threshold = 2; /* Put/Ack */
2919         md.max_size  = 0;
2920         md.options   = 0;
2921         md.eq_handle = the_lnet.ln_dc_eqh;
2922         md.user_ptr  = lp;
2923
2924         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2925         if (rc) {
2926                 lnet_ping_buffer_decref(pbuf);
2927                 CERROR("Can't bind push source MD: %d\n", rc);
2928                 goto fail_error;
2929         }
2930         cpt = lnet_net_lock_current();
2931         /* Refcount for MD. */
2932         lnet_peer_addref_locked(lp);
2933         id.pid = LNET_PID_LUSTRE;
2934         id.nid = lnet_peer_select_nid(lp);
2935         lnet_net_unlock(cpt);
2936
2937         if (id.nid == LNET_NID_ANY) {
2938                 rc = -EHOSTUNREACH;
2939                 goto fail_unlink;
2940         }
2941
2942         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2943                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2944                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2945
2946         if (rc)
2947                 goto fail_unlink;
2948
2949         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2950
2951         spin_lock(&lp->lp_lock);
2952         return 0;
2953
2954 fail_unlink:
2955         LNetMDUnlink(lp->lp_push_mdh);
2956         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2957 fail_error:
2958         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2959         /*
2960          * The errors that get us here are considered hard errors and
2961          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2962          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2963          * because the unlink event handler will have set it if we
2964          * called LNetMDUnlink() above.
2965          */
2966         spin_lock(&lp->lp_lock);
2967         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2968         return rc;
2969 }
2970
2971 /*
2972  * An unrecoverable error was encountered during discovery.
2973  * Set error status in peer and abort discovery.
2974  */
2975 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2976 {
2977         CDEBUG(D_NET, "Discovery error %s: %d\n",
2978                libcfs_nid2str(lp->lp_primary_nid), error);
2979
2980         spin_lock(&lp->lp_lock);
2981         lp->lp_dc_error = error;
2982         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2983         lp->lp_state |= LNET_PEER_REDISCOVER;
2984         spin_unlock(&lp->lp_lock);
2985 }
2986
2987 /*
2988  * Mark the peer as discovered.
2989  */
2990 static int lnet_peer_discovered(struct lnet_peer *lp)
2991 __must_hold(&lp->lp_lock)
2992 {
2993         lp->lp_state |= LNET_PEER_DISCOVERED;
2994         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2995                           LNET_PEER_REDISCOVER);
2996
2997         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2998
2999         return 0;
3000 }
3001
3002 /*
3003  * Mark the peer as to be rediscovered.
3004  */
3005 static int lnet_peer_rediscover(struct lnet_peer *lp)
3006 __must_hold(&lp->lp_lock)
3007 {
3008         lp->lp_state |= LNET_PEER_REDISCOVER;
3009         lp->lp_state &= ~LNET_PEER_DISCOVERING;
3010
3011         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3012
3013         return 0;
3014 }
3015
3016 /*
3017  * Discovering this peer is taking too long. Cancel any Ping or Push
3018  * that discovery is waiting on by unlinking the relevant MDs. The
3019  * lnet_discovery_event_handler() will proceed from here and complete
3020  * the cleanup.
3021  */
3022 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
3023 {
3024         struct lnet_handle_md ping_mdh;
3025         struct lnet_handle_md push_mdh;
3026
3027         LNetInvalidateMDHandle(&ping_mdh);
3028         LNetInvalidateMDHandle(&push_mdh);
3029
3030         spin_lock(&lp->lp_lock);
3031         if (lp->lp_state & LNET_PEER_PING_SENT) {
3032                 ping_mdh = lp->lp_ping_mdh;
3033                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
3034         }
3035         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
3036                 push_mdh = lp->lp_push_mdh;
3037                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
3038         }
3039         spin_unlock(&lp->lp_lock);
3040
3041         if (!LNetMDHandleIsInvalid(ping_mdh))
3042                 LNetMDUnlink(ping_mdh);
3043         if (!LNetMDHandleIsInvalid(push_mdh))
3044                 LNetMDUnlink(push_mdh);
3045 }
3046
3047 /*
3048  * Wait for work to be queued or some other change that must be
3049  * attended to. Returns non-zero if the discovery thread should shut
3050  * down.
3051  */
3052 static int lnet_peer_discovery_wait_for_work(void)
3053 {
3054         int cpt;
3055         int rc = 0;
3056
3057         DEFINE_WAIT(wait);
3058
3059         cpt = lnet_net_lock_current();
3060         for (;;) {
3061                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
3062                                 TASK_INTERRUPTIBLE);
3063                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3064                         break;
3065                 if (lnet_push_target_resize_needed())
3066                         break;
3067                 if (!list_empty(&the_lnet.ln_dc_request))
3068                         break;
3069                 if (!list_empty(&the_lnet.ln_msg_resend))
3070                         break;
3071                 lnet_net_unlock(cpt);
3072
3073                 /*
3074                  * Wake up at most once per second to check for peers
3075                  * that have been stuck on the working queue for longer
3076                  * than the peer timeout.
3077                  */
3078                 schedule_timeout(cfs_time_seconds(1));
3079                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
3080                 cpt = lnet_net_lock_current();
3081         }
3082         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3083
3084         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3085                 rc = -ESHUTDOWN;
3086
3087         lnet_net_unlock(cpt);
3088
3089         CDEBUG(D_NET, "woken: %d\n", rc);
3090
3091         return rc;
3092 }
3093
3094 /*
3095  * Messages that were pending on a destroyed peer are put on a global
3096  * resend list. The discovery thread checks this list when it wakes up
3097  * and resends the messages on it. Such messages may still be sendable
3098  * if the lpni that originally caused the re-queue has been transferred
3099  * to another peer.
3100  *
3101  * It is possible that LNet is shut down while we're iterating
3102  * through the list. lnet_shutdown_lndnets() will attempt to access the
3103  * resend list, but will have to wait until the spinlock is released, by
3104  * which time there shouldn't be any more messages on the resend list.
3105  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3106  * for the messages so they can be released. The other case is that
3107  * lnet_shutdown_lndnets() finalizes all the messages before this
3108  * function can visit the resend list, in which case this function is
3109  * a no-op.
3110  */
3111 static void lnet_resend_msgs(void)
3112 {
3113         struct lnet_msg *msg, *tmp;
3114         struct list_head resend;
3115         int rc;
3116
3117         INIT_LIST_HEAD(&resend);
3118
3119         spin_lock(&the_lnet.ln_msg_resend_lock);
3120         list_splice(&the_lnet.ln_msg_resend, &resend);
3121         spin_unlock(&the_lnet.ln_msg_resend_lock);
3122
3123         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3124                 list_del_init(&msg->msg_list);
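                /* On success lnet_send() has taken over the message;
                 * on failure finalize it here so it gets released. */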
3125                 rc = lnet_send(msg->msg_src_nid_param, msg,
3126                                msg->msg_rtr_nid_param);
3127                 if (rc < 0) {
3128                         CNETERR("Error sending %s to %s: %d\n",
3129                                lnet_msgtyp2str(msg->msg_type),
3130                                libcfs_id2str(msg->msg_target), rc);
3131                         lnet_finalize(msg, rc);
3132                 }
3133         }
3134 }
3135
3136 /* The discovery thread. */
3137 static int lnet_peer_discovery(void *arg)
3138 {
3139         struct lnet_peer *lp;
3140         int rc;
3141
3142         CDEBUG(D_NET, "started\n");
3143         cfs_block_allsigs();
3144
3145         for (;;) {
3146                 if (lnet_peer_discovery_wait_for_work())
3147                         break;
3148
3149                 lnet_resend_msgs();
3150
3151                 if (lnet_push_target_resize_needed())
3152                         lnet_push_target_resize();
3153
3154                 lnet_net_lock(LNET_LOCK_EX);
3155                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3156                         break;
3157
3158                 /*
3159                  * Process all incoming discovery work requests.  When
3160                  * discovery must wait on a peer to change state, it
3161                  * is added to the tail of the ln_dc_working queue. A
3162                  * timestamp keeps track of when the peer was added,
3163                  * so we can time out discovery requests that take too
3164                  * long.
3165                  */
3166                 while (!list_empty(&the_lnet.ln_dc_request)) {
3167                         lp = list_first_entry(&the_lnet.ln_dc_request,
3168                                               struct lnet_peer, lp_dc_list);
3169                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3170                         /*
3171                          * set the time the peer was put on the dc_working
3172                          * queue. It shouldn't remain on the queue
3173                          * forever, in case the GET message (for ping)
3174                          * doesn't get a REPLY or the PUT message (for
3175                          * push) doesn't get an ACK.
3176                          */
3177                         lp->lp_last_queued = ktime_get_real_seconds();
3178                         lnet_net_unlock(LNET_LOCK_EX);
3179
3180                         /*
3181                          * Select an action depending on the state of
3182                          * the peer and whether discovery is disabled.
3183                          * The disabled-discovery check is deliberately
3184                          * placed after the handling of arrived data,
3185                          * cleanup after failures, and forced Pings or
3186                          * Pushes, so those always run first.
3187                          */
3188                         spin_lock(&lp->lp_lock);
3189                         CDEBUG(D_NET, "peer %s state %#x\n",
3190                                 libcfs_nid2str(lp->lp_primary_nid),
3191                                 lp->lp_state);
3192                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3193                                 rc = lnet_peer_data_present(lp);
3194                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3195                                 rc = lnet_peer_ping_failed(lp);
3196                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3197                                 rc = lnet_peer_push_failed(lp);
3198                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3199                                 rc = lnet_peer_send_ping(lp);
3200                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3201                                 rc = lnet_peer_send_push(lp);
3202                         else if (lnet_peer_discovery_disabled)
3203                                 rc = lnet_peer_rediscover(lp);
3204                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3205                                 rc = lnet_peer_send_ping(lp);
3206                         else if (lnet_peer_needs_push(lp))
3207                                 rc = lnet_peer_send_push(lp);
3208                         else
3209                                 rc = lnet_peer_discovered(lp);
3210                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3211                                 libcfs_nid2str(lp->lp_primary_nid),
3212                                 lp->lp_state, rc);
3213                         spin_unlock(&lp->lp_lock);
3214
3215                         lnet_net_lock(LNET_LOCK_EX);
3216                         if (rc == LNET_REDISCOVER_PEER) {
3217                                 list_move(&lp->lp_dc_list,
3218                                           &the_lnet.ln_dc_request);
3219                         } else if (rc) {
3220                                 lnet_peer_discovery_error(lp, rc);
3221                         }
3222                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3223                                 lnet_peer_discovery_complete(lp);
3224                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3225                                 break;
3226                 }
3227
3228                 lnet_net_unlock(LNET_LOCK_EX);
3229         }
3230
3231         CDEBUG(D_NET, "stopping\n");
3232         /*
3233          * Clean up before telling lnet_peer_discovery_stop() that
3234          * we're done. Use wake_up() below to somewhat reduce the
3235          * size of the thundering herd if there are multiple threads
3236          * waiting on discovery of a single peer.
3237          */
3238
3239         /* Queue cleanup 1: stop all pending pings and pushes. */
3240         lnet_net_lock(LNET_LOCK_EX);
3241         while (!list_empty(&the_lnet.ln_dc_working)) {
3242                 lp = list_first_entry(&the_lnet.ln_dc_working,
3243                                       struct lnet_peer, lp_dc_list);
3244                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3245                 lnet_net_unlock(LNET_LOCK_EX);
3246                 lnet_peer_cancel_discovery(lp);
3247                 lnet_net_lock(LNET_LOCK_EX);
3248         }
3249         lnet_net_unlock(LNET_LOCK_EX);
3250
3251         /* Queue cleanup 2: wait for the expired queue to clear. */
3252         while (!list_empty(&the_lnet.ln_dc_expired))
3253                 schedule_timeout(cfs_time_seconds(1));
3254
3255         /* Queue cleanup 3: clear the request queue. */
3256         lnet_net_lock(LNET_LOCK_EX);
3257         while (!list_empty(&the_lnet.ln_dc_request)) {
3258                 lp = list_first_entry(&the_lnet.ln_dc_request,
3259                                       struct lnet_peer, lp_dc_list);
3260                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3261                 lnet_peer_discovery_complete(lp);
3262         }
3263         lnet_net_unlock(LNET_LOCK_EX);
3264
3265         LNetEQFree(the_lnet.ln_dc_eqh);
3266         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3267
3268         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3269         wake_up(&the_lnet.ln_dc_waitq);
3270
3271         CDEBUG(D_NET, "stopped\n");
3272
3273         return 0;
3274 }
3275
3276 /* ln_api_mutex is held on entry. */
3277 int lnet_peer_discovery_start(void)
3278 {
3279         struct task_struct *task;
3280         int rc;
3281
3282         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3283                 return -EALREADY;
3284
3285         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3286         if (rc != 0) {
3287                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3288                 return rc;
3289         }
3290
3291         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3292         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3293         if (IS_ERR(task)) {
3294                 rc = PTR_ERR(task);
3295                 CERROR("Can't start peer discovery thread: %d\n", rc);
3296
3297                 LNetEQFree(the_lnet.ln_dc_eqh);
3298                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3299
3300                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3301         }
3302
3303         CDEBUG(D_NET, "discovery start: %d\n", rc);
3304
3305         return rc;
3306 }
3307
3308 /* ln_api_mutex is held on entry. */
3309 void lnet_peer_discovery_stop(void)
3310 {
3311         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3312                 return;
3313
3314         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3315         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3316         wake_up(&the_lnet.ln_dc_waitq);
3317
3318         wait_event(the_lnet.ln_dc_waitq,
3319                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3320
3321         LASSERT(list_empty(&the_lnet.ln_dc_request));
3322         LASSERT(list_empty(&the_lnet.ln_dc_working));
3323         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3324
3325         CDEBUG(D_NET, "discovery stopped\n");
3326 }
3327
3328 /* Debugging */
3329
3330 void
3331 lnet_debug_peer(lnet_nid_t nid)
3332 {
3333         char                    *aliveness = "NA";
3334         struct lnet_peer_ni     *lp;
3335         int                     cpt;
3336
3337         cpt = lnet_cpt_of_nid(nid, NULL);
3338         lnet_net_lock(cpt);
3339
3340         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3341         if (IS_ERR(lp)) {
3342                 lnet_net_unlock(cpt);
3343                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3344                 return;
3345         }
3346
3347         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3348                 aliveness = (lnet_is_peer_ni_alive(lp)) ? "up" : "down";
3349
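        /* Columns: nid, refcount, aliveness, max tx credits,
         * rtr credits, min rtr credits, tx credits, min tx credits,
         * tx queue bytes. */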
3350         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3351                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3352                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3353                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3354                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3355
3356         lnet_peer_ni_decref_locked(lp);
3357
3358         lnet_net_unlock(cpt);
3359 }
3360
3361 /* Gathering information for userspace. */
3362
3363 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3364                           char aliveness[LNET_MAX_STR_LEN],
3365                           __u32 *cpt_iter, __u32 *refcount,
3366                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3367                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3368                           __u32 *peer_tx_qnob)
3369 {
3370         struct lnet_peer_table          *peer_table;
3371         struct lnet_peer_ni             *lp;
3372         int                             j;
3373         int                             lncpt;
3374         bool                            found = false;
3375
3376         /* get the number of CPTs */
3377         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3378
3379         /* if the cpt number to be examined is >= the number of cpts in
3380          * the system then indicate that there are no more cpts to examine
3381          */
3382         if (*cpt_iter >= lncpt)
3383                 return -ENOENT;
3384
3385         /* get the current table */
3386         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3387         /* if the ptable is NULL then there are no more cpts to examine */
3388         if (peer_table == NULL)
3389                 return -ENOENT;
3390
3391         lnet_net_lock(*cpt_iter);
3392
3393         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3394                 struct list_head *peers = &peer_table->pt_hash[j];
3395
3396                 list_for_each_entry(lp, peers, lpni_hashlist) {
3397                         if (peer_index-- > 0)
3398                                 continue;
3399
3400                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3401                         if (lnet_isrouter(lp) ||
3402                             lnet_peer_aliveness_enabled(lp))
3403                                 snprintf(aliveness, LNET_MAX_STR_LEN, "%s",
3404                                          lnet_is_peer_ni_alive(lp) ? "up" : "down");
3405
3406                         *nid = lp->lpni_nid;
3407                         *refcount = atomic_read(&lp->lpni_refcount);
3408                         *ni_peer_tx_credits =
3409                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3410                         *peer_tx_credits = lp->lpni_txcredits;
3411                         *peer_rtr_credits = lp->lpni_rtrcredits;
3412                         *peer_min_rtr_credits = lp->lpni_minrtrcredits;
3413                         *peer_tx_qnob = lp->lpni_txqnob;
3414
3415                         found = true;
3416                 }
3417
3418         }
3419         lnet_net_unlock(*cpt_iter);
3420
3421         *cpt_iter = lncpt;
3422
3423         return found ? 0 : -ENOENT;
3424 }
3425
3426 /* ln_api_mutex is held, which keeps the peer list stable */
3427 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3428 {
3429         struct lnet_ioctl_element_stats *lpni_stats;
3430         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3431         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3432         struct lnet_peer_ni_credit_info *lpni_info;
3433         struct lnet_peer_ni *lpni;
3434         struct lnet_peer *lp;
3435         lnet_nid_t nid;
3436         __u32 size;
3437         int rc;
3438
3439         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3440
3441         if (!lp) {
3442                 rc = -ENOENT;
3443                 goto out;
3444         }
3445
3446         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3447                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3448         size *= lp->lp_nnis;
3449         if (size > cfg->prcfg_size) {
3450                 cfg->prcfg_size = size;
3451                 rc = -E2BIG;
3452                 goto out_lp_decref;
3453         }
3454
3455         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3456         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3457         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3458         cfg->prcfg_count = lp->lp_nnis;
3459         cfg->prcfg_size = size;
3460         cfg->prcfg_state = lp->lp_state;
3461
3462         /* Allocate helper buffers. */
3463         rc = -ENOMEM;
3464         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3465         if (!lpni_info)
3466                 goto out_lp_decref;
3467         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3468         if (!lpni_stats)
3469                 goto out_free_info;
3470         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3471         if (!lpni_msg_stats)
3472                 goto out_free_stats;
3473         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3474         if (!lpni_hstats)
3475                 goto out_free_msg_stats;
3476
3477
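        /*
         * For each peer NI the bulk buffer receives, in this order:
         * the NID, the credit info, the element stats, the message
         * stats and the health stats. Userspace must walk the buffer
         * using the same sizes in the same order; the total size was
         * checked against prcfg_size above.
         */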
3478         lpni = NULL;
3479         rc = -EFAULT;
3480         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3481                 nid = lpni->lpni_nid;
3482                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3483                         goto out_free_hstats;
3484                 bulk += sizeof(nid);
3485
3486                 memset(lpni_info, 0, sizeof(*lpni_info));
3487                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3488                 if (lnet_isrouter(lpni) ||
3489                     lnet_peer_aliveness_enabled(lpni))
3490                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "%s",
3491                                  lnet_is_peer_ni_alive(lpni) ? "up" : "down");
3492
3493                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3494                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3495                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3496                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3497                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3498                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3499                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3500                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3501                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3502                         goto out_free_hstats;
3503                 bulk += sizeof(*lpni_info);
3504
3505                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3506                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3507                                                             LNET_STATS_TYPE_SEND);
3508                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3509                                                             LNET_STATS_TYPE_RECV);
3510                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3511                                                             LNET_STATS_TYPE_DROP);
3512                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3513                         goto out_free_hstats;
3514                 bulk += sizeof(*lpni_stats);
3515                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3516                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3517                         goto out_free_hstats;
3518                 bulk += sizeof(*lpni_msg_stats);
3519                 lpni_hstats->hlpni_network_timeout =
3520                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3521                 lpni_hstats->hlpni_remote_dropped =
3522                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3523                 lpni_hstats->hlpni_remote_timeout =
3524                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3525                 lpni_hstats->hlpni_remote_error =
3526                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3527                 lpni_hstats->hlpni_health_value =
3528                   atomic_read(&lpni->lpni_healthv);
3529                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3530                         goto out_free_hstats;
3531                 bulk += sizeof(*lpni_hstats);
3532         }
3533         rc = 0;
3534
3535 out_free_hstats:
3536         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3537 out_free_msg_stats:
3538         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3539 out_free_stats:
3540         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3541 out_free_info:
3542         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3543 out_lp_decref:
3544         lnet_peer_decref_locked(lp);
3545 out:
3546         return rc;
3547 }
3548
3549 void
3550 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3551 {
3552         /* the monitor thread could have shut down and cleaned up the queues */
3553         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3554                 return;
3555
3556         if (list_empty(&lpni->lpni_recovery) &&
3557             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3558                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3559                         libcfs_nid2str(lpni->lpni_nid),
3560                         atomic_read(&lpni->lpni_healthv));
3561                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3562                 lnet_peer_ni_addref_locked(lpni);
3563         }
3564 }
3565
3566 /* Call with the ln_api_mutex held */
3567 void
3568 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3569 {
3570         struct lnet_peer_table *ptable;
3571         struct lnet_peer *lp;
3572         struct lnet_peer_net *lpn;
3573         struct lnet_peer_ni *lpni;
3574         int lncpt;
3575         int cpt;
3576
3577         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3578                 return;
3579
3580         if (!all) {
3581                 lnet_net_lock(LNET_LOCK_EX);
3582                 lpni = lnet_find_peer_ni_locked(nid);
3583                 if (!lpni) {
3584                         lnet_net_unlock(LNET_LOCK_EX);
3585                         return;
3586                 }
3587                 atomic_set(&lpni->lpni_healthv, value);
3588                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3589                 lnet_peer_ni_decref_locked(lpni);
3590                 lnet_net_unlock(LNET_LOCK_EX);
3591                 return;
3592         }
3593
3594         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3595
3596         /*
3597          * Walk all the peers and set the health value of every
3598          * peer NI to the given value.
3599          */
3600         lnet_net_lock(LNET_LOCK_EX);
3601         for (cpt = 0; cpt < lncpt; cpt++) {
3602                 ptable = the_lnet.ln_peer_tables[cpt];
3603                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3604                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3605                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3606                                                     lpni_peer_nis) {
3607                                         atomic_set(&lpni->lpni_healthv, value);
3608                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3609                                 }
3610                         }
3611                 }
3612         }
3613         lnet_net_unlock(LNET_LOCK_EX);
3614 }
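
/*
 * A hypothetical caller, for illustration only: an ioctl handler that
 * resets every peer NI to full health might do
 *
 *      lnet_peer_ni_set_healthv(LNET_NID_ANY, LNET_MAX_HEALTH_VALUE,
 *                               true);
 *
 * with ln_api_mutex held; the "all" branch above ignores the nid.
 */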
3615