Whamcloud - gitweb
c15ae264db01bb2d27e64198c8f2dd5853990d7b
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/lnet/peer.c
32  */
33
34 #define DEBUG_SUBSYSTEM S_LNET
35
36 #include <linux/sched.h>
37 #ifdef HAVE_SCHED_HEADERS
38 #include <linux/sched/signal.h>
39 #endif
40 #include <linux/uaccess.h>
41
42 #include <lnet/udsp.h>
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 CFS_FREE_PTR_ARRAY(hash, LNET_PEER_HASH_SIZE);
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_hashlist);
166         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
167         INIT_LIST_HEAD(&lpni->lpni_recovery);
168         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
169         INIT_LIST_HEAD(&lpni->lpni_rtr_pref_nids);
170         LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
171         kref_init(&lpni->lpni_kref);
172         lpni->lpni_sel_priority = LNET_MAX_SELECTION_PRIORITY;
173
174         spin_lock_init(&lpni->lpni_lock);
175
176         if (lnet_peers_start_down())
177                 lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;
178         else
179                 lpni->lpni_ns_status = LNET_NI_STATUS_UP;
180         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
181         lpni->lpni_nid = nid;
182         lpni->lpni_cpt = cpt;
183         atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
184
185         net = lnet_get_net_locked(LNET_NIDNET(nid));
186         lpni->lpni_net = net;
187         if (net) {
188                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
189                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
190                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
191                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
192         } else {
193                 /*
194                  * This peer_ni is not on a local network, so we
195                  * cannot add the credits here. In case the net is
196                  * added later, add the peer_ni to the remote peer ni
197                  * list so it can be easily found and revisited.
198                  */
199                 /* FIXME: per-net implementation instead? */
200                 lnet_peer_ni_addref_locked(lpni);
201                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
202                               &the_lnet.ln_remote_peer_ni_list);
203         }
204
205         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
206
207         return lpni;
208 }
209
210 static struct lnet_peer_net *
211 lnet_peer_net_alloc(__u32 net_id)
212 {
213         struct lnet_peer_net *lpn;
214
215         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
216         if (!lpn)
217                 return NULL;
218
219         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
220         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
221         lpn->lpn_net_id = net_id;
222         lpn->lpn_sel_priority = LNET_MAX_SELECTION_PRIORITY;
223
224         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
225
226         return lpn;
227 }
228
229 void
230 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
231 {
232         struct lnet_peer *lp;
233
234         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
235
236         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
237         LASSERT(list_empty(&lpn->lpn_peer_nis));
238         LASSERT(list_empty(&lpn->lpn_peer_nets));
239         lp = lpn->lpn_peer;
240         lpn->lpn_peer = NULL;
241         LIBCFS_FREE(lpn, sizeof(*lpn));
242
243         lnet_peer_decref_locked(lp);
244 }
245
246 static struct lnet_peer *
247 lnet_peer_alloc(lnet_nid_t nid)
248 {
249         struct lnet_peer *lp;
250
251         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
252         if (!lp)
253                 return NULL;
254
255         INIT_LIST_HEAD(&lp->lp_rtrq);
256         INIT_LIST_HEAD(&lp->lp_routes);
257         INIT_LIST_HEAD(&lp->lp_peer_list);
258         INIT_LIST_HEAD(&lp->lp_peer_nets);
259         INIT_LIST_HEAD(&lp->lp_dc_list);
260         INIT_LIST_HEAD(&lp->lp_dc_pendq);
261         INIT_LIST_HEAD(&lp->lp_rtr_list);
262         init_waitqueue_head(&lp->lp_dc_waitq);
263         spin_lock_init(&lp->lp_lock);
264         lp->lp_primary_nid = nid;
265         lp->lp_disc_src_nid = LNET_NID_ANY;
266         if (lnet_peers_start_down())
267                 lp->lp_alive = false;
268         else
269                 lp->lp_alive = true;
270
271         /*
272          * all peers created on a router should have health on
273          * if it's not already on.
274          */
275         if (the_lnet.ln_routing && !lnet_health_sensitivity)
276                 lp->lp_health_sensitivity = 1;
277
278         /*
279          * Turn off discovery for loopback peer. If you're creating a peer
280          * for the loopback interface then that was initiated when we
281          * attempted to send a message over the loopback. There is no need
282          * to ever use a different interface when sending messages to
283          * myself.
284          */
285         if (nid == LNET_NID_LO_0)
286                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
287         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
288
289         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
290
291         return lp;
292 }
293
294 void
295 lnet_destroy_peer_locked(struct lnet_peer *lp)
296 {
297         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
298
299         LASSERT(atomic_read(&lp->lp_refcount) == 0);
300         LASSERT(lp->lp_rtr_refcount == 0);
301         LASSERT(list_empty(&lp->lp_peer_nets));
302         LASSERT(list_empty(&lp->lp_peer_list));
303         LASSERT(list_empty(&lp->lp_dc_list));
304
305         if (lp->lp_data)
306                 lnet_ping_buffer_decref(lp->lp_data);
307
308         /*
309          * if there are messages still on the pending queue, then make
310          * sure to queue them on the ln_msg_resend list so they can be
311          * resent at a later point if the discovery thread is still
312          * running.
313          * If the discovery thread has stopped, then the wakeup will be a
314          * no-op, and it is expected the lnet_shutdown_lndnets() will
315          * eventually be called, which will traverse this list and
316          * finalize the messages on the list.
317          * We can not resend them now because we're holding the cpt lock.
318          * Releasing the lock can cause an inconsistent state
319          */
320         spin_lock(&the_lnet.ln_msg_resend_lock);
321         spin_lock(&lp->lp_lock);
322         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
323         spin_unlock(&lp->lp_lock);
324         spin_unlock(&the_lnet.ln_msg_resend_lock);
325         wake_up(&the_lnet.ln_dc_waitq);
326
327         LIBCFS_FREE(lp, sizeof(*lp));
328 }
329
330 /*
331  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
332  * that peer_net, detach the peer_net from the peer.
333  *
334  * Call with lnet_net_lock/EX held
335  */
336 static void
337 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
338 {
339         struct lnet_peer_table *ptable;
340         struct lnet_peer_net *lpn;
341         struct lnet_peer *lp;
342
343         /*
344          * Belts and suspenders: gracefully handle teardown of a
345          * partially connected peer_ni.
346          */
347         lpn = lpni->lpni_peer_net;
348
349         list_del_init(&lpni->lpni_peer_nis);
350         /*
351          * If there are no lpni's left, we detach lpn from
352          * lp_peer_nets, so it cannot be found anymore.
353          */
354         if (list_empty(&lpn->lpn_peer_nis))
355                 list_del_init(&lpn->lpn_peer_nets);
356
357         /* Update peer NID count. */
358         lp = lpn->lpn_peer;
359         lp->lp_nnis--;
360
361         /*
362          * If there are no more peer nets, make the peer unfindable
363          * via the peer_tables.
364          *
365          * Otherwise, if the peer is DISCOVERED, tell discovery to
366          * take another look at it. This is a no-op if discovery for
367          * this peer did the detaching.
368          */
369         if (list_empty(&lp->lp_peer_nets)) {
370                 list_del_init(&lp->lp_peer_list);
371                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
372                 ptable->pt_peers--;
373         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
374                 /* Discovery isn't running, nothing to do here. */
375         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
376                 lnet_peer_queue_for_discovery(lp);
377                 wake_up(&the_lnet.ln_dc_waitq);
378         }
379         CDEBUG(D_NET, "peer %s NID %s\n",
380                 libcfs_nid2str(lp->lp_primary_nid),
381                 libcfs_nid2str(lpni->lpni_nid));
382 }
383
384 /* called with lnet_net_lock LNET_LOCK_EX held */
385 static int
386 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni, bool force)
387 {
388         struct lnet_peer_table *ptable = NULL;
389
390         /* don't remove a peer_ni if it's also a gateway */
391         if (lnet_isrouter(lpni) && !force) {
392                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
393                        libcfs_nid2str(lpni->lpni_nid));
394                 return -EBUSY;
395         }
396
397         lnet_peer_remove_from_remote_list(lpni);
398
399         /* remove peer ni from the hash list. */
400         list_del_init(&lpni->lpni_hashlist);
401
402         /*
403          * indicate the peer is being deleted so the monitor thread can
404          * remove it from the recovery queue.
405          */
406         spin_lock(&lpni->lpni_lock);
407         lpni->lpni_state |= LNET_PEER_NI_DELETING;
408         spin_unlock(&lpni->lpni_lock);
409
410         /* decrement the ref count on the peer table */
411         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
412
413         /*
414          * The peer_ni can no longer be found with a lookup. But there
415          * can be current users, so keep track of it on the zombie
416          * list until the reference count has gone to zero.
417          *
418          * The last reference may be lost in a place where the
419          * lnet_net_lock locks only a single cpt, and that cpt may not
420          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
421          * has its own lock.
422          */
423         spin_lock(&ptable->pt_zombie_lock);
424         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
425         ptable->pt_zombies++;
426         spin_unlock(&ptable->pt_zombie_lock);
427
428         /* no need to keep this peer_ni on the hierarchy anymore */
429         lnet_peer_detach_peer_ni_locked(lpni);
430
431         /* remove hashlist reference on peer_ni */
432         lnet_peer_ni_decref_locked(lpni);
433
434         return 0;
435 }
436
437 void lnet_peer_uninit(void)
438 {
439         struct lnet_peer_ni *lpni, *tmp;
440
441         lnet_net_lock(LNET_LOCK_EX);
442
443         /* remove all peer_nis from the remote peer and the hash list */
444         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
445                                  lpni_on_remote_peer_ni_list)
446                 lnet_peer_ni_del_locked(lpni, false);
447
448         lnet_peer_tables_destroy();
449
450         lnet_net_unlock(LNET_LOCK_EX);
451 }
452
453 static int
454 lnet_peer_del_locked(struct lnet_peer *peer)
455 {
456         struct lnet_peer_ni *lpni = NULL, *lpni2;
457         int rc = 0, rc2 = 0;
458
459         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
460
461         spin_lock(&peer->lp_lock);
462         peer->lp_state |= LNET_PEER_MARK_DELETED;
463         spin_unlock(&peer->lp_lock);
464
465         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
466         while (lpni != NULL) {
467                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
468                 rc = lnet_peer_ni_del_locked(lpni, false);
469                 if (rc != 0)
470                         rc2 = rc;
471                 lpni = lpni2;
472         }
473
474         return rc2;
475 }
476
477 /*
478  * Discovering this peer is taking too long. Cancel any Ping or Push
479  * that discovery is waiting on by unlinking the relevant MDs. The
480  * lnet_discovery_event_handler() will proceed from here and complete
481  * the cleanup.
482  */
483 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
484 {
485         struct lnet_handle_md ping_mdh;
486         struct lnet_handle_md push_mdh;
487
488         LNetInvalidateMDHandle(&ping_mdh);
489         LNetInvalidateMDHandle(&push_mdh);
490
491         spin_lock(&lp->lp_lock);
492         if (lp->lp_state & LNET_PEER_PING_SENT) {
493                 ping_mdh = lp->lp_ping_mdh;
494                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
495         }
496         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
497                 push_mdh = lp->lp_push_mdh;
498                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
499         }
500         spin_unlock(&lp->lp_lock);
501
502         if (!LNetMDHandleIsInvalid(ping_mdh))
503                 LNetMDUnlink(ping_mdh);
504         if (!LNetMDHandleIsInvalid(push_mdh))
505                 LNetMDUnlink(push_mdh);
506 }
507
508 static int
509 lnet_peer_del(struct lnet_peer *peer)
510 {
511         lnet_peer_cancel_discovery(peer);
512         lnet_net_lock(LNET_LOCK_EX);
513         lnet_peer_del_locked(peer);
514         lnet_net_unlock(LNET_LOCK_EX);
515
516         return 0;
517 }
518
519 /*
520  * Delete a NID from a peer. Call with ln_api_mutex held.
521  *
522  * Error codes:
523  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
524  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
525  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
526  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
527  */
528 static int
529 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
530 {
531         struct lnet_peer_ni *lpni;
532         lnet_nid_t primary_nid = lp->lp_primary_nid;
533         int rc = 0;
534         bool force = (flags & LNET_PEER_RTR_NI_FORCE_DEL) ? true : false;
535
536         if (!(flags & LNET_PEER_CONFIGURED)) {
537                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
538                         rc = -EPERM;
539                         goto out;
540                 }
541         }
542         lpni = lnet_find_peer_ni_locked(nid);
543         if (!lpni) {
544                 rc = -ENOENT;
545                 goto out;
546         }
547         lnet_peer_ni_decref_locked(lpni);
548         if (lp != lpni->lpni_peer_net->lpn_peer) {
549                 rc = -ECHILD;
550                 goto out;
551         }
552
553         /*
554          * This function only allows deletion of the primary NID if it
555          * is the only NID.
556          */
557         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1 && !force) {
558                 rc = -EBUSY;
559                 goto out;
560         }
561
562         lnet_net_lock(LNET_LOCK_EX);
563
564         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1 && force) {
565                 struct lnet_peer_ni *lpni2;
566                 /* assign the next peer_ni to be the primary */
567                 lpni2 = lnet_get_next_peer_ni_locked(lp, NULL, lpni);
568                 LASSERT(lpni2);
569                 lp->lp_primary_nid = lpni2->lpni_nid;
570         }
571         rc = lnet_peer_ni_del_locked(lpni, force);
572
573         lnet_net_unlock(LNET_LOCK_EX);
574
575 out:
576         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
577                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
578
579         return rc;
580 }
581
582 static void
583 lnet_peer_table_cleanup_locked(struct lnet_net *net,
584                                struct lnet_peer_table *ptable)
585 {
586         int                      i;
587         struct lnet_peer_ni     *next;
588         struct lnet_peer_ni     *lpni;
589         struct lnet_peer        *peer;
590
591         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
592                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
593                                          lpni_hashlist) {
594                         if (net != NULL && net != lpni->lpni_net)
595                                 continue;
596
597                         peer = lpni->lpni_peer_net->lpn_peer;
598                         if (peer->lp_primary_nid != lpni->lpni_nid) {
599                                 lnet_peer_ni_del_locked(lpni, false);
600                                 continue;
601                         }
602                         /*
603                          * Removing the primary NID implies removing
604                          * the entire peer. Advance next beyond any
605                          * peer_ni that belongs to the same peer.
606                          */
607                         list_for_each_entry_from(next, &ptable->pt_hash[i],
608                                                  lpni_hashlist) {
609                                 if (next->lpni_peer_net->lpn_peer != peer)
610                                         break;
611                         }
612                         lnet_peer_del_locked(peer);
613                 }
614         }
615 }
616
617 static void
618 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
619 {
620         wait_var_event_warning(&ptable->pt_zombies,
621                                ptable->pt_zombies == 0,
622                                "Waiting for %d zombies on peer table\n",
623                                ptable->pt_zombies);
624 }
625
626 static void
627 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
628                                 struct lnet_peer_table *ptable)
629 {
630         struct lnet_peer_ni     *lp;
631         struct lnet_peer_ni     *tmp;
632         lnet_nid_t              gw_nid;
633         int                     i;
634
635         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
636                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
637                                          lpni_hashlist) {
638                         if (net != lp->lpni_net)
639                                 continue;
640
641                         if (!lnet_isrouter(lp))
642                                 continue;
643
644                         gw_nid = lp->lpni_peer_net->lpn_peer->lp_primary_nid;
645
646                         lnet_net_unlock(LNET_LOCK_EX);
647                         lnet_del_route(LNET_NET_ANY, gw_nid);
648                         lnet_net_lock(LNET_LOCK_EX);
649                 }
650         }
651 }
652
653 void
654 lnet_peer_tables_cleanup(struct lnet_net *net)
655 {
656         int i;
657         struct lnet_peer_table *ptable;
658
659         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
660         /* If just deleting the peers for a NI, get rid of any routes these
661          * peers are gateways for. */
662         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
663                 lnet_net_lock(LNET_LOCK_EX);
664                 lnet_peer_table_del_rtrs_locked(net, ptable);
665                 lnet_net_unlock(LNET_LOCK_EX);
666         }
667
668         /* Start the cleanup process */
669         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
670                 lnet_net_lock(LNET_LOCK_EX);
671                 lnet_peer_table_cleanup_locked(net, ptable);
672                 lnet_net_unlock(LNET_LOCK_EX);
673         }
674
675         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
676                 lnet_peer_ni_finalize_wait(ptable);
677 }
678
679 static struct lnet_peer_ni *
680 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
681 {
682         struct list_head        *peers;
683         struct lnet_peer_ni     *lp;
684
685         if (the_lnet.ln_state != LNET_STATE_RUNNING)
686                 return NULL;
687
688         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
689         list_for_each_entry(lp, peers, lpni_hashlist) {
690                 if (lp->lpni_nid == nid) {
691                         lnet_peer_ni_addref_locked(lp);
692                         return lp;
693                 }
694         }
695
696         return NULL;
697 }
698
699 struct lnet_peer_ni *
700 lnet_find_peer_ni_locked(lnet_nid_t nid)
701 {
702         struct lnet_peer_ni *lpni;
703         struct lnet_peer_table *ptable;
704         int cpt;
705
706         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
707
708         ptable = the_lnet.ln_peer_tables[cpt];
709         lpni = lnet_get_peer_ni_locked(ptable, nid);
710
711         return lpni;
712 }
713
714 struct lnet_peer_ni *
715 lnet_peer_get_ni_locked(struct lnet_peer *lp, lnet_nid_t nid)
716 {
717         struct lnet_peer_net *lpn;
718         struct lnet_peer_ni *lpni;
719
720         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
721         if (!lpn)
722                 return NULL;
723
724         list_for_each_entry(lpni, &lpn->lpn_peer_nis, lpni_peer_nis) {
725                 if (lpni->lpni_nid == nid)
726                         return lpni;
727         }
728
729         return NULL;
730 }
731
732 struct lnet_peer *
733 lnet_find_peer(lnet_nid_t nid)
734 {
735         struct lnet_peer_ni *lpni;
736         struct lnet_peer *lp = NULL;
737         int cpt;
738
739         cpt = lnet_net_lock_current();
740         lpni = lnet_find_peer_ni_locked(nid);
741         if (lpni) {
742                 lp = lpni->lpni_peer_net->lpn_peer;
743                 lnet_peer_addref_locked(lp);
744                 lnet_peer_ni_decref_locked(lpni);
745         }
746         lnet_net_unlock(cpt);
747
748         return lp;
749 }
750
751 struct lnet_peer_net *
752 lnet_get_next_peer_net_locked(struct lnet_peer *lp, __u32 prev_lpn_id)
753 {
754         struct lnet_peer_net *net;
755
756         if (!prev_lpn_id) {
757                 /* no net id provided return the first net */
758                 net = list_first_entry_or_null(&lp->lp_peer_nets,
759                                                struct lnet_peer_net,
760                                                lpn_peer_nets);
761
762                 return net;
763         }
764
765         /* find the net after the one provided */
766         list_for_each_entry(net, &lp->lp_peer_nets, lpn_peer_nets) {
767                 if (net->lpn_net_id == prev_lpn_id) {
768                         /*
769                          * if we reached the end of the list loop to the
770                          * beginning.
771                          */
772                         if (net->lpn_peer_nets.next == &lp->lp_peer_nets)
773                                 return list_first_entry_or_null(&lp->lp_peer_nets,
774                                                                 struct lnet_peer_net,
775                                                                 lpn_peer_nets);
776                         else
777                                 return list_next_entry(net, lpn_peer_nets);
778                 }
779         }
780
781         return NULL;
782 }
783
784 struct lnet_peer_ni *
785 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
786                              struct lnet_peer_net *peer_net,
787                              struct lnet_peer_ni *prev)
788 {
789         struct lnet_peer_ni *lpni;
790         struct lnet_peer_net *net = peer_net;
791
792         if (!prev) {
793                 if (!net) {
794                         if (list_empty(&peer->lp_peer_nets))
795                                 return NULL;
796
797                         net = list_entry(peer->lp_peer_nets.next,
798                                          struct lnet_peer_net,
799                                          lpn_peer_nets);
800                 }
801                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
802                                   lpni_peer_nis);
803
804                 return lpni;
805         }
806
807         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
808                 /*
809                  * if you reached the end of the peer ni list and the peer
810                  * net is specified then there are no more peer nis in that
811                  * net.
812                  */
813                 if (net)
814                         return NULL;
815
816                 /*
817                  * we reached the end of this net ni list. move to the
818                  * next net
819                  */
820                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
821                     &peer->lp_peer_nets)
822                         /* no more nets and no more NIs. */
823                         return NULL;
824
825                 /* get the next net */
826                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
827                                  struct lnet_peer_net,
828                                  lpn_peer_nets);
829                 /* get the ni on it */
830                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
831                                   lpni_peer_nis);
832
833                 return lpni;
834         }
835
836         /* there are more nis left */
837         lpni = list_entry(prev->lpni_peer_nis.next,
838                           struct lnet_peer_ni, lpni_peer_nis);
839
840         return lpni;
841 }
842
843 /* Call with the ln_api_mutex held */
844 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
845 {
846         struct lnet_process_id id;
847         struct lnet_peer_table *ptable;
848         struct lnet_peer *lp;
849         __u32 count = 0;
850         __u32 size = 0;
851         int lncpt;
852         int cpt;
853         __u32 i;
854         int rc;
855
856         rc = -ESHUTDOWN;
857         if (the_lnet.ln_state != LNET_STATE_RUNNING)
858                 goto done;
859
860         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
861
862         /*
863          * Count the number of peers, and return E2BIG if the buffer
864          * is too small. We'll also return the desired size.
865          */
866         rc = -E2BIG;
867         for (cpt = 0; cpt < lncpt; cpt++) {
868                 ptable = the_lnet.ln_peer_tables[cpt];
869                 count += ptable->pt_peers;
870         }
871         size = count * sizeof(*ids);
872         if (size > *sizep)
873                 goto done;
874
875         /*
876          * Walk the peer lists and copy out the primary nids.
877          * This is safe because the peer lists are only modified
878          * while the ln_api_mutex is held. So we don't need to
879          * hold the lnet_net_lock as well, and can therefore
880          * directly call copy_to_user().
881          */
882         rc = -EFAULT;
883         memset(&id, 0, sizeof(id));
884         id.pid = LNET_PID_LUSTRE;
885         i = 0;
886         for (cpt = 0; cpt < lncpt; cpt++) {
887                 ptable = the_lnet.ln_peer_tables[cpt];
888                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
889                         if (i >= count)
890                                 goto done;
891                         id.nid = lp->lp_primary_nid;
892                         if (copy_to_user(&ids[i], &id, sizeof(id)))
893                                 goto done;
894                         i++;
895                 }
896         }
897         rc = 0;
898 done:
899         *countp = count;
900         *sizep = size;
901         return rc;
902 }
903
904 /*
905  * Start pushes to peers that need to be updated for a configuration
906  * change on this node.
907  */
908 void
909 lnet_push_update_to_peers(int force)
910 {
911         struct lnet_peer_table *ptable;
912         struct lnet_peer *lp;
913         int lncpt;
914         int cpt;
915
916         lnet_net_lock(LNET_LOCK_EX);
917         if (lnet_peer_discovery_disabled)
918                 force = 0;
919         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
920         for (cpt = 0; cpt < lncpt; cpt++) {
921                 ptable = the_lnet.ln_peer_tables[cpt];
922                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
923                         if (force) {
924                                 spin_lock(&lp->lp_lock);
925                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
926                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
927                                 spin_unlock(&lp->lp_lock);
928                         }
929                         if (lnet_peer_needs_push(lp))
930                                 lnet_peer_queue_for_discovery(lp);
931                 }
932         }
933         lnet_net_unlock(LNET_LOCK_EX);
934         wake_up(&the_lnet.ln_dc_waitq);
935 }
936
937 /* find the NID in the preferred gateways for the remote peer
938  * return:
939  *      false: list is not empty and NID is not preferred
940  *      false: list is empty
941  *      true: nid is found in the list
942  */
943 bool
944 lnet_peer_is_pref_rtr_locked(struct lnet_peer_ni *lpni,
945                              lnet_nid_t gw_nid)
946 {
947         struct lnet_nid_list *ne;
948
949         CDEBUG(D_NET, "%s: rtr pref emtpy: %d\n",
950                libcfs_nid2str(lpni->lpni_nid),
951                list_empty(&lpni->lpni_rtr_pref_nids));
952
953         if (list_empty(&lpni->lpni_rtr_pref_nids))
954                 return false;
955
956         /* iterate through all the preferred NIDs and see if any of them
957          * matches the provided gw_nid
958          */
959         list_for_each_entry(ne, &lpni->lpni_rtr_pref_nids, nl_list) {
960                 CDEBUG(D_NET, "Comparing pref %s with gw %s\n",
961                        libcfs_nid2str(ne->nl_nid),
962                        libcfs_nid2str(gw_nid));
963                 if (ne->nl_nid == gw_nid)
964                         return true;
965         }
966
967         return false;
968 }
969
970 void
971 lnet_peer_clr_pref_rtrs(struct lnet_peer_ni *lpni)
972 {
973         struct list_head zombies;
974         struct lnet_nid_list *ne;
975         struct lnet_nid_list *tmp;
976         int cpt = lpni->lpni_cpt;
977
978         INIT_LIST_HEAD(&zombies);
979
980         lnet_net_lock(cpt);
981         list_splice_init(&lpni->lpni_rtr_pref_nids, &zombies);
982         lnet_net_unlock(cpt);
983
984         list_for_each_entry_safe(ne, tmp, &zombies, nl_list) {
985                 list_del(&ne->nl_list);
986                 LIBCFS_FREE(ne, sizeof(*ne));
987         }
988 }
989
990 int
991 lnet_peer_add_pref_rtr(struct lnet_peer_ni *lpni,
992                        lnet_nid_t gw_nid)
993 {
994         int cpt = lpni->lpni_cpt;
995         struct lnet_nid_list *ne = NULL;
996
997         /* This function is called with api_mutex held. When the api_mutex
998          * is held the list can not be modified, as it is only modified as
999          * a result of applying a UDSP and that happens under api_mutex
1000          * lock.
1001          */
1002         __must_hold(&the_lnet.ln_api_mutex);
1003
1004         list_for_each_entry(ne, &lpni->lpni_rtr_pref_nids, nl_list) {
1005                 if (ne->nl_nid == gw_nid)
1006                         return -EEXIST;
1007         }
1008
1009         LIBCFS_CPT_ALLOC(ne, lnet_cpt_table(), cpt, sizeof(*ne));
1010         if (!ne)
1011                 return -ENOMEM;
1012
1013         ne->nl_nid = gw_nid;
1014
1015         /* Lock the cpt to protect against addition and checks in the
1016          * selection algorithm
1017          */
1018         lnet_net_lock(cpt);
1019         list_add(&ne->nl_list, &lpni->lpni_rtr_pref_nids);
1020         lnet_net_unlock(cpt);
1021
1022         return 0;
1023 }
1024
1025 /*
1026  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
1027  * this is a preferred point-to-point path. Call with lnet_net_lock in
1028  * shared mmode.
1029  */
1030 bool
1031 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
1032 {
1033         struct lnet_nid_list *ne;
1034
1035         if (lpni->lpni_pref_nnids == 0)
1036                 return false;
1037         if (lpni->lpni_pref_nnids == 1)
1038                 return lpni->lpni_pref.nid == nid;
1039         list_for_each_entry(ne, &lpni->lpni_pref.nids, nl_list) {
1040                 if (ne->nl_nid == nid)
1041                         return true;
1042         }
1043         return false;
1044 }
1045
1046 /*
1047  * Set a single ni as preferred, provided no preferred ni is already
1048  * defined. Only to be used for non-multi-rail peer_ni.
1049  */
1050 int
1051 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
1052 {
1053         int rc = 0;
1054
1055         spin_lock(&lpni->lpni_lock);
1056         if (nid == LNET_NID_ANY) {
1057                 rc = -EINVAL;
1058         } else if (lpni->lpni_pref_nnids > 0) {
1059                 rc = -EPERM;
1060         } else if (lpni->lpni_pref_nnids == 0) {
1061                 lpni->lpni_pref.nid = nid;
1062                 lpni->lpni_pref_nnids = 1;
1063                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
1064         }
1065         spin_unlock(&lpni->lpni_lock);
1066
1067         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1068                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
1069         return rc;
1070 }
1071
1072 /*
1073  * Clear the preferred NID from a non-multi-rail peer_ni, provided
1074  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
1075  */
1076 int
1077 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
1078 {
1079         int rc = 0;
1080
1081         spin_lock(&lpni->lpni_lock);
1082         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
1083                 lpni->lpni_pref_nnids = 0;
1084                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1085         } else if (lpni->lpni_pref_nnids == 0) {
1086                 rc = -ENOENT;
1087         } else {
1088                 rc = -EPERM;
1089         }
1090         spin_unlock(&lpni->lpni_lock);
1091
1092         CDEBUG(D_NET, "peer %s: %d\n",
1093                libcfs_nid2str(lpni->lpni_nid), rc);
1094         return rc;
1095 }
1096
1097 void
1098 lnet_peer_ni_set_selection_priority(struct lnet_peer_ni *lpni, __u32 priority)
1099 {
1100         lpni->lpni_sel_priority = priority;
1101 }
1102
1103 /*
1104  * Clear the preferred NIDs from a non-multi-rail peer.
1105  */
1106 void
1107 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
1108 {
1109         struct lnet_peer_ni *lpni = NULL;
1110
1111         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
1112                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
1113 }
1114
1115 int
1116 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
1117 {
1118         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
1119         struct lnet_nid_list *ne1 = NULL;
1120         struct lnet_nid_list *ne2 = NULL;
1121         lnet_nid_t tmp_nid = LNET_NID_ANY;
1122         int rc = 0;
1123
1124         if (nid == LNET_NID_ANY) {
1125                 rc = -EINVAL;
1126                 goto out;
1127         }
1128
1129         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
1130                 rc = -EEXIST;
1131                 goto out;
1132         }
1133
1134         /* A non-MR node may have only one preferred NI per peer_ni */
1135         if (lpni->lpni_pref_nnids > 0 &&
1136             !(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1137                 rc = -EPERM;
1138                 goto out;
1139         }
1140
1141         /* add the new preferred nid to the list of preferred nids */
1142         if (lpni->lpni_pref_nnids != 0) {
1143                 size_t alloc_size = sizeof(*ne1);
1144
1145                 if (lpni->lpni_pref_nnids == 1) {
1146                         tmp_nid = lpni->lpni_pref.nid;
1147                         INIT_LIST_HEAD(&lpni->lpni_pref.nids);
1148                 }
1149
1150                 list_for_each_entry(ne1, &lpni->lpni_pref.nids, nl_list) {
1151                         if (ne1->nl_nid == nid) {
1152                                 rc = -EEXIST;
1153                                 goto out;
1154                         }
1155                 }
1156
1157                 LIBCFS_CPT_ALLOC(ne1, lnet_cpt_table(), lpni->lpni_cpt,
1158                                  alloc_size);
1159                 if (!ne1) {
1160                         rc = -ENOMEM;
1161                         goto out;
1162                 }
1163
1164                 /* move the originally stored nid to the list */
1165                 if (lpni->lpni_pref_nnids == 1) {
1166                         LIBCFS_CPT_ALLOC(ne2, lnet_cpt_table(),
1167                                 lpni->lpni_cpt, alloc_size);
1168                         if (!ne2) {
1169                                 rc = -ENOMEM;
1170                                 goto out;
1171                         }
1172                         INIT_LIST_HEAD(&ne2->nl_list);
1173                         ne2->nl_nid = tmp_nid;
1174                 }
1175                 ne1->nl_nid = nid;
1176         }
1177
1178         lnet_net_lock(LNET_LOCK_EX);
1179         spin_lock(&lpni->lpni_lock);
1180         if (lpni->lpni_pref_nnids == 0) {
1181                 lpni->lpni_pref.nid = nid;
1182         } else {
1183                 if (ne2)
1184                         list_add_tail(&ne2->nl_list, &lpni->lpni_pref.nids);
1185                 list_add_tail(&ne1->nl_list, &lpni->lpni_pref.nids);
1186         }
1187         lpni->lpni_pref_nnids++;
1188         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1189         spin_unlock(&lpni->lpni_lock);
1190         lnet_net_unlock(LNET_LOCK_EX);
1191
1192 out:
1193         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
1194                 spin_lock(&lpni->lpni_lock);
1195                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1196                 spin_unlock(&lpni->lpni_lock);
1197         }
1198         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1199                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1200         return rc;
1201 }
1202
1203 int
1204 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
1205 {
1206         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
1207         struct lnet_nid_list *ne = NULL;
1208         int rc = 0;
1209
1210         if (lpni->lpni_pref_nnids == 0) {
1211                 rc = -ENOENT;
1212                 goto out;
1213         }
1214
1215         if (lpni->lpni_pref_nnids == 1) {
1216                 if (lpni->lpni_pref.nid != nid) {
1217                         rc = -ENOENT;
1218                         goto out;
1219                 }
1220         } else {
1221                 list_for_each_entry(ne, &lpni->lpni_pref.nids, nl_list) {
1222                         if (ne->nl_nid == nid)
1223                                 goto remove_nid_entry;
1224                 }
1225                 rc = -ENOENT;
1226                 ne = NULL;
1227                 goto out;
1228         }
1229
1230 remove_nid_entry:
1231         lnet_net_lock(LNET_LOCK_EX);
1232         spin_lock(&lpni->lpni_lock);
1233         if (lpni->lpni_pref_nnids == 1)
1234                 lpni->lpni_pref.nid = LNET_NID_ANY;
1235         else {
1236                 list_del_init(&ne->nl_list);
1237                 if (lpni->lpni_pref_nnids == 2) {
1238                         struct lnet_nid_list *ne, *tmp;
1239
1240                         list_for_each_entry_safe(ne, tmp,
1241                                                  &lpni->lpni_pref.nids,
1242                                                  nl_list) {
1243                                 lpni->lpni_pref.nid = ne->nl_nid;
1244                                 list_del_init(&ne->nl_list);
1245                                 LIBCFS_FREE(ne, sizeof(*ne));
1246                         }
1247                 }
1248         }
1249         lpni->lpni_pref_nnids--;
1250         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1251         spin_unlock(&lpni->lpni_lock);
1252         lnet_net_unlock(LNET_LOCK_EX);
1253
1254         if (ne)
1255                 LIBCFS_FREE(ne, sizeof(*ne));
1256 out:
1257         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1258                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1259         return rc;
1260 }
1261
1262 void
1263 lnet_peer_clr_pref_nids(struct lnet_peer_ni *lpni)
1264 {
1265         struct list_head zombies;
1266         struct lnet_nid_list *ne;
1267         struct lnet_nid_list *tmp;
1268
1269         INIT_LIST_HEAD(&zombies);
1270
1271         lnet_net_lock(LNET_LOCK_EX);
1272         if (lpni->lpni_pref_nnids == 1)
1273                 lpni->lpni_pref.nid = LNET_NID_ANY;
1274         else if (lpni->lpni_pref_nnids > 1)
1275                 list_splice_init(&lpni->lpni_pref.nids, &zombies);
1276         lpni->lpni_pref_nnids = 0;
1277         lnet_net_unlock(LNET_LOCK_EX);
1278
1279         list_for_each_entry_safe(ne, tmp, &zombies, nl_list) {
1280                 list_del_init(&ne->nl_list);
1281                 LIBCFS_FREE(ne, sizeof(*ne));
1282         }
1283 }
1284
1285 lnet_nid_t
1286 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1287 {
1288         struct lnet_peer_ni *lpni;
1289         lnet_nid_t primary_nid = nid;
1290
1291         lpni = lnet_find_peer_ni_locked(nid);
1292         if (lpni) {
1293                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1294                 lnet_peer_ni_decref_locked(lpni);
1295         }
1296
1297         return primary_nid;
1298 }
1299
1300 bool
1301 lnet_is_discovery_disabled_locked(struct lnet_peer *lp)
1302 __must_hold(&lp->lp_lock)
1303 {
1304         if (lnet_peer_discovery_disabled)
1305                 return true;
1306
1307         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL) ||
1308             (lp->lp_state & LNET_PEER_NO_DISCOVERY)) {
1309                 return true;
1310         }
1311
1312         return false;
1313 }
1314
1315 /*
1316  * Peer Discovery
1317  */
1318 bool
1319 lnet_is_discovery_disabled(struct lnet_peer *lp)
1320 {
1321         bool rc = false;
1322
1323         spin_lock(&lp->lp_lock);
1324         rc = lnet_is_discovery_disabled_locked(lp);
1325         spin_unlock(&lp->lp_lock);
1326
1327         return rc;
1328 }
1329
1330 lnet_nid_t
1331 LNetPrimaryNID(lnet_nid_t nid)
1332 {
1333         struct lnet_peer *lp;
1334         struct lnet_peer_ni *lpni;
1335         lnet_nid_t primary_nid = nid;
1336         int rc = 0;
1337         int cpt;
1338
1339         if (nid == LNET_NID_LO_0)
1340                 return LNET_NID_LO_0;
1341
1342         cpt = lnet_net_lock_current();
1343         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1344         if (IS_ERR(lpni)) {
1345                 rc = PTR_ERR(lpni);
1346                 goto out_unlock;
1347         }
1348         lp = lpni->lpni_peer_net->lpn_peer;
1349
1350         while (!lnet_peer_is_uptodate(lp)) {
1351                 spin_lock(&lp->lp_lock);
1352                 /* force a full discovery cycle */
1353                 lp->lp_state |= LNET_PEER_FORCE_PING | LNET_PEER_FORCE_PUSH;
1354                 spin_unlock(&lp->lp_lock);
1355
1356                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1357                 if (rc)
1358                         goto out_decref;
1359                 /* The lpni (or lp) for this NID may have changed and our ref is
1360                  * the only thing keeping the old one around. Release the ref
1361                  * and lookup the lpni again
1362                  */
1363                 lnet_peer_ni_decref_locked(lpni);
1364                 lpni = lnet_find_peer_ni_locked(nid);
1365                 if (!lpni) {
1366                         rc = -ENOENT;
1367                         goto out_unlock;
1368                 }
1369                 lp = lpni->lpni_peer_net->lpn_peer;
1370
1371                 /* Only try once if discovery is disabled */
1372                 if (lnet_is_discovery_disabled(lp))
1373                         break;
1374         }
1375         primary_nid = lp->lp_primary_nid;
1376 out_decref:
1377         lnet_peer_ni_decref_locked(lpni);
1378 out_unlock:
1379         lnet_net_unlock(cpt);
1380
1381         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1382                libcfs_nid2str(primary_nid), rc);
1383         return primary_nid;
1384 }
1385 EXPORT_SYMBOL(LNetPrimaryNID);
1386
1387 struct lnet_peer_net *
1388 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1389 {
1390         struct lnet_peer_net *peer_net;
1391         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1392                 if (peer_net->lpn_net_id == net_id)
1393                         return peer_net;
1394         }
1395         return NULL;
1396 }
1397
1398 /*
1399  * Attach a peer_ni to a peer_net and peer. This function assumes
1400  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1401  * may be attached to a different peer, in which case it will be
1402  * properly detached first. The whole operation is done atomically.
1403  *
1404  * This function consumes the reference on lpni and Always returns 0.
1405  * This is the last function called from functions that do return an
1406  * int, so returning 0 here allows the compiler to do a tail call.
1407  */
1408 static int
1409 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1410                                 struct lnet_peer_net *lpn,
1411                                 struct lnet_peer_ni *lpni,
1412                                 unsigned flags)
1413 {
1414         struct lnet_peer_table *ptable;
1415         bool new_lpn = false;
1416         int rc;
1417
1418         /* Install the new peer_ni */
1419         lnet_net_lock(LNET_LOCK_EX);
1420         /* Add peer_ni to global peer table hash, if necessary. */
1421         if (list_empty(&lpni->lpni_hashlist)) {
1422                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1423
1424                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1425                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1426                 ptable->pt_version++;
1427                 lnet_peer_ni_addref_locked(lpni);
1428         }
1429
1430         /* Detach the peer_ni from an existing peer, if necessary. */
1431         if (lpni->lpni_peer_net) {
1432                 LASSERT(lpni->lpni_peer_net != lpn);
1433                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1434                 lnet_peer_detach_peer_ni_locked(lpni);
1435                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1436                 lpni->lpni_peer_net = NULL;
1437         }
1438
1439         /* Add peer_ni to peer_net */
1440         lpni->lpni_peer_net = lpn;
1441         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1442         lnet_update_peer_net_healthv(lpni);
1443         lnet_peer_net_addref_locked(lpn);
1444
1445         /* Add peer_net to peer */
1446         if (!lpn->lpn_peer) {
1447                 new_lpn = true;
1448                 lpn->lpn_peer = lp;
1449                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1450                 lnet_peer_addref_locked(lp);
1451         }
1452
1453         /* Add peer to global peer list, if necessary */
1454         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1455         if (list_empty(&lp->lp_peer_list)) {
1456                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1457                 ptable->pt_peers++;
1458         }
1459
1460
1461         /* Update peer state */
1462         spin_lock(&lp->lp_lock);
1463         if (flags & LNET_PEER_CONFIGURED) {
1464                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1465                         lp->lp_state |= LNET_PEER_CONFIGURED;
1466         }
1467         if (flags & LNET_PEER_MULTI_RAIL) {
1468                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1469                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1470                         lnet_peer_clr_non_mr_pref_nids(lp);
1471                 }
1472         }
1473         spin_unlock(&lp->lp_lock);
1474
1475         lp->lp_nnis++;
1476
1477         /* apply UDSPs */
1478         if (new_lpn) {
1479                 rc = lnet_udsp_apply_policies_on_lpn(lpn);
1480                 if (rc)
1481                         CERROR("Failed to apply UDSPs on lpn %s\n",
1482                                libcfs_net2str(lpn->lpn_net_id));
1483         }
1484         rc = lnet_udsp_apply_policies_on_lpni(lpni);
1485         if (rc)
1486                 CERROR("Failed to apply UDSPs on lpni %s\n",
1487                        libcfs_nid2str(lpni->lpni_nid));
1488
1489         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1490                libcfs_nid2str(lp->lp_primary_nid),
1491                libcfs_nid2str(lpni->lpni_nid), flags);
1492         lnet_peer_ni_decref_locked(lpni);
1493         lnet_net_unlock(LNET_LOCK_EX);
1494
1495         return 0;
1496 }
1497
1498 /*
1499  * Create a new peer, with nid as its primary nid.
1500  *
1501  * Call with the lnet_api_mutex held.
1502  */
1503 static int
1504 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1505 {
1506         struct lnet_peer *lp;
1507         struct lnet_peer_net *lpn;
1508         struct lnet_peer_ni *lpni;
1509         int rc = 0;
1510
1511         LASSERT(nid != LNET_NID_ANY);
1512
1513         /*
1514          * No need for the lnet_net_lock here, because the
1515          * lnet_api_mutex is held.
1516          */
1517         lpni = lnet_find_peer_ni_locked(nid);
1518         if (lpni) {
1519                 /* A peer with this NID already exists. */
1520                 lp = lpni->lpni_peer_net->lpn_peer;
1521                 lnet_peer_ni_decref_locked(lpni);
1522                 /*
1523                  * This is an error if the peer was configured and the
1524                  * primary NID differs or an attempt is made to change
1525                  * the Multi-Rail flag. Otherwise the assumption is
1526                  * that an existing peer is being modified.
1527                  */
1528                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1529                         if (lp->lp_primary_nid != nid)
1530                                 rc = -EEXIST;
1531                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1532                                 rc = -EPERM;
1533                         goto out;
1534                 }
1535                 /* Delete and recreate as a configured peer. */
1536                 lnet_peer_del(lp);
1537         }
1538
1539         /* Create peer, peer_net, and peer_ni. */
1540         rc = -ENOMEM;
1541         lp = lnet_peer_alloc(nid);
1542         if (!lp)
1543                 goto out;
1544         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1545         if (!lpn)
1546                 goto out_free_lp;
1547         lpni = lnet_peer_ni_alloc(nid);
1548         if (!lpni)
1549                 goto out_free_lpn;
1550
1551         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1552
1553 out_free_lpn:
1554         LIBCFS_FREE(lpn, sizeof(*lpn));
1555 out_free_lp:
1556         LIBCFS_FREE(lp, sizeof(*lp));
1557 out:
1558         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1559                libcfs_nid2str(nid), flags, rc);
1560         return rc;
1561 }
1562
1563 /*
1564  * Add a NID to a peer. Call with ln_api_mutex held.
1565  *
1566  * Error codes:
1567  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1568  *  -EEXIST:   The NID was configured by DLC for a different peer.
1569  *  -ENOMEM:   Out of memory.
1570  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1571  *             non-multi-rail peer.
1572  */
1573 static int
1574 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1575 {
1576         struct lnet_peer_net *lpn;
1577         struct lnet_peer_ni *lpni;
1578         int rc = 0;
1579
1580         LASSERT(lp);
1581         LASSERT(nid != LNET_NID_ANY);
1582
1583         /* A configured peer can only be updated through configuration. */
1584         if (!(flags & LNET_PEER_CONFIGURED)) {
1585                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1586                         rc = -EPERM;
1587                         goto out;
1588                 }
1589         }
1590
1591         /*
1592          * The MULTI_RAIL flag can be set but not cleared, because
1593          * that would leave the peer struct in an invalid state.
1594          */
1595         if (flags & LNET_PEER_MULTI_RAIL) {
1596                 spin_lock(&lp->lp_lock);
1597                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1598                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1599                         lnet_peer_clr_non_mr_pref_nids(lp);
1600                 }
1601                 spin_unlock(&lp->lp_lock);
1602         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1603                 rc = -EPERM;
1604                 goto out;
1605         }
1606
1607         lpni = lnet_find_peer_ni_locked(nid);
1608         if (lpni) {
1609                 /*
1610                  * A peer_ni already exists. This is only a problem if
1611                  * it is not connected to this peer and was configured
1612                  * by DLC.
1613                  */
1614                 if (lpni->lpni_peer_net->lpn_peer == lp)
1615                         goto out_free_lpni;
1616                 if (lnet_peer_ni_is_configured(lpni)) {
1617                         rc = -EEXIST;
1618                         goto out_free_lpni;
1619                 }
1620                 /* If this is the primary NID, destroy the peer. */
1621                 if (lnet_peer_ni_is_primary(lpni)) {
1622                         struct lnet_peer *rtr_lp =
1623                                 lpni->lpni_peer_net->lpn_peer;
1624                         int rtr_refcount = rtr_lp->lp_rtr_refcount;
1625                         /*
1626                          * if we're trying to delete a router it means
1627                          * we're moving this peer NI to a new peer so must
1628                          * transfer router properties to the new peer
1629                          */
1630                         if (rtr_refcount > 0) {
1631                                 flags |= LNET_PEER_RTR_NI_FORCE_DEL;
1632                                 lnet_rtr_transfer_to_peer(rtr_lp, lp);
1633                         }
1634                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1635                         lnet_peer_ni_decref_locked(lpni);
1636                         lpni = lnet_peer_ni_alloc(nid);
1637                         if (!lpni) {
1638                                 rc = -ENOMEM;
1639                                 goto out_free_lpni;
1640                         }
1641                 }
1642         } else {
1643                 lpni = lnet_peer_ni_alloc(nid);
1644                 if (!lpni) {
1645                         rc = -ENOMEM;
1646                         goto out_free_lpni;
1647                 }
1648         }
1649
1650         /*
1651          * Get the peer_net. Check that we're not adding a second
1652          * peer_ni on a peer_net of a non-multi-rail peer.
1653          */
1654         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1655         if (!lpn) {
1656                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1657                 if (!lpn) {
1658                         rc = -ENOMEM;
1659                         goto out_free_lpni;
1660                 }
1661         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1662                 rc = -ENOTUNIQ;
1663                 goto out_free_lpni;
1664         }
1665
1666         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1667
1668 out_free_lpni:
1669         lnet_peer_ni_decref_locked(lpni);
1670 out:
1671         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1672                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1673                flags, rc);
1674         return rc;
1675 }
1676
1677 /*
1678  * Update the primary NID of a peer, if possible.
1679  *
1680  * Call with the lnet_api_mutex held.
1681  */
1682 static int
1683 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1684 {
1685         lnet_nid_t old = lp->lp_primary_nid;
1686         int rc = 0;
1687
1688         if (lp->lp_primary_nid == nid)
1689                 goto out;
1690         rc = lnet_peer_add_nid(lp, nid, flags);
1691         if (rc)
1692                 goto out;
1693         lp->lp_primary_nid = nid;
1694 out:
1695         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1696                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1697         return rc;
1698 }
1699
1700 /*
1701  * lpni creation initiated due to traffic either sending or receiving.
1702  */
1703 static int
1704 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1705 {
1706         struct lnet_peer *lp;
1707         struct lnet_peer_net *lpn;
1708         struct lnet_peer_ni *lpni;
1709         unsigned flags = 0;
1710         int rc = 0;
1711
1712         if (nid == LNET_NID_ANY) {
1713                 rc = -EINVAL;
1714                 goto out;
1715         }
1716
1717         /* lnet_net_lock is not needed here because ln_api_lock is held */
1718         lpni = lnet_find_peer_ni_locked(nid);
1719         if (lpni) {
1720                 /*
1721                  * We must have raced with another thread. Since we
1722                  * know next to nothing about a peer_ni created by
1723                  * traffic, we just assume everything is ok and
1724                  * return.
1725                  */
1726                 lnet_peer_ni_decref_locked(lpni);
1727                 goto out;
1728         }
1729
1730         /* Create peer, peer_net, and peer_ni. */
1731         rc = -ENOMEM;
1732         lp = lnet_peer_alloc(nid);
1733         if (!lp)
1734                 goto out;
1735         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1736         if (!lpn)
1737                 goto out_free_lp;
1738         lpni = lnet_peer_ni_alloc(nid);
1739         if (!lpni)
1740                 goto out_free_lpn;
1741         if (pref != LNET_NID_ANY)
1742                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1743
1744         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1745
1746 out_free_lpn:
1747         LIBCFS_FREE(lpn, sizeof(*lpn));
1748 out_free_lp:
1749         LIBCFS_FREE(lp, sizeof(*lp));
1750 out:
1751         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1752         return rc;
1753 }
1754
1755 /*
1756  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1757  *
1758  * This API handles the following combinations:
1759  *   Create a peer with its primary NI if only the prim_nid is provided
1760  *   Add a NID to a peer identified by the prim_nid. The peer identified
1761  *   by the prim_nid must already exist.
1762  *   The peer being created may be non-MR.
1763  *
1764  * The caller must hold ln_api_mutex. This prevents the peer from
1765  * being created/modified/deleted by a different thread.
1766  */
1767 int
1768 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1769 {
1770         struct lnet_peer *lp = NULL;
1771         struct lnet_peer_ni *lpni;
1772         unsigned flags;
1773
1774         /* The prim_nid must always be specified */
1775         if (prim_nid == LNET_NID_ANY)
1776                 return -EINVAL;
1777
1778         flags = LNET_PEER_CONFIGURED;
1779         if (mr)
1780                 flags |= LNET_PEER_MULTI_RAIL;
1781
1782         /*
1783          * If nid isn't specified, we must create a new peer with
1784          * prim_nid as its primary nid.
1785          */
1786         if (nid == LNET_NID_ANY)
1787                 return lnet_peer_add(prim_nid, flags);
1788
1789         /* Look up the prim_nid, which must exist. */
1790         lpni = lnet_find_peer_ni_locked(prim_nid);
1791         if (!lpni)
1792                 return -ENOENT;
1793         lnet_peer_ni_decref_locked(lpni);
1794         lp = lpni->lpni_peer_net->lpn_peer;
1795
1796         /* Peer must have been configured. */
1797         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1798                 CDEBUG(D_NET, "peer %s was not configured\n",
1799                        libcfs_nid2str(prim_nid));
1800                 return -ENOENT;
1801         }
1802
1803         /* Primary NID must match */
1804         if (lp->lp_primary_nid != prim_nid) {
1805                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1806                        libcfs_nid2str(prim_nid),
1807                        libcfs_nid2str(lp->lp_primary_nid));
1808                 return -ENODEV;
1809         }
1810
1811         /* Multi-Rail flag must match. */
1812         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1813                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1814                        libcfs_nid2str(prim_nid));
1815                 return -EPERM;
1816         }
1817
1818         return lnet_peer_add_nid(lp, nid, flags);
1819 }
1820
1821 /*
1822  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1823  *
1824  * This API handles the following combinations:
1825  *   Delete a NI from a peer if both prim_nid and nid are provided.
1826  *   Delete a peer if only prim_nid is provided.
1827  *   Delete a peer if its primary nid is provided.
1828  *
1829  * The caller must hold ln_api_mutex. This prevents the peer from
1830  * being modified/deleted by a different thread.
1831  */
1832 int
1833 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1834 {
1835         struct lnet_peer *lp;
1836         struct lnet_peer_ni *lpni;
1837         unsigned flags;
1838
1839         if (prim_nid == LNET_NID_ANY)
1840                 return -EINVAL;
1841
1842         lpni = lnet_find_peer_ni_locked(prim_nid);
1843         if (!lpni)
1844                 return -ENOENT;
1845         lnet_peer_ni_decref_locked(lpni);
1846         lp = lpni->lpni_peer_net->lpn_peer;
1847
1848         if (prim_nid != lp->lp_primary_nid) {
1849                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1850                        libcfs_nid2str(prim_nid),
1851                        libcfs_nid2str(lp->lp_primary_nid));
1852                 return -ENODEV;
1853         }
1854
1855         lnet_net_lock(LNET_LOCK_EX);
1856         if (lp->lp_rtr_refcount > 0) {
1857                 lnet_net_unlock(LNET_LOCK_EX);
1858                 CERROR("%s is a router. Can not be deleted\n",
1859                        libcfs_nid2str(prim_nid));
1860                 return -EBUSY;
1861         }
1862         lnet_net_unlock(LNET_LOCK_EX);
1863
1864         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1865                 return lnet_peer_del(lp);
1866
1867         flags = LNET_PEER_CONFIGURED;
1868         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1869                 flags |= LNET_PEER_MULTI_RAIL;
1870
1871         return lnet_peer_del_nid(lp, nid, flags);
1872 }
1873
1874 void
1875 lnet_destroy_peer_ni_locked(struct kref *ref)
1876 {
1877         struct lnet_peer_ni *lpni = container_of(ref, struct lnet_peer_ni,
1878                                                  lpni_kref);
1879         struct lnet_peer_table *ptable;
1880         struct lnet_peer_net *lpn;
1881
1882         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1883
1884         LASSERT(kref_read(&lpni->lpni_kref) == 0);
1885         LASSERT(list_empty(&lpni->lpni_txq));
1886         LASSERT(lpni->lpni_txqnob == 0);
1887         LASSERT(list_empty(&lpni->lpni_peer_nis));
1888         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1889
1890         lpn = lpni->lpni_peer_net;
1891         lpni->lpni_peer_net = NULL;
1892         lpni->lpni_net = NULL;
1893
1894         if (!list_empty(&lpni->lpni_hashlist)) {
1895                 /* remove the peer ni from the zombie list */
1896                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1897                 spin_lock(&ptable->pt_zombie_lock);
1898                 list_del_init(&lpni->lpni_hashlist);
1899                 ptable->pt_zombies--;
1900                 spin_unlock(&ptable->pt_zombie_lock);
1901         }
1902
1903         if (lpni->lpni_pref_nnids > 1) {
1904                 struct lnet_nid_list *ne, *tmp;
1905
1906                 list_for_each_entry_safe(ne, tmp, &lpni->lpni_pref.nids,
1907                                          nl_list) {
1908                         list_del_init(&ne->nl_list);
1909                         LIBCFS_FREE(ne, sizeof(*ne));
1910                 }
1911         }
1912         LIBCFS_FREE(lpni, sizeof(*lpni));
1913
1914         if (lpn)
1915                 lnet_peer_net_decref_locked(lpn);
1916 }
1917
1918 struct lnet_peer_ni *
1919 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1920 {
1921         struct lnet_peer_ni *lpni = NULL;
1922         int rc;
1923
1924         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1925                 return ERR_PTR(-ESHUTDOWN);
1926
1927         /*
1928          * find if a peer_ni already exists.
1929          * If so then just return that.
1930          */
1931         lpni = lnet_find_peer_ni_locked(nid);
1932         if (lpni)
1933                 return lpni;
1934
1935         lnet_net_unlock(cpt);
1936
1937         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1938         if (rc) {
1939                 lpni = ERR_PTR(rc);
1940                 goto out_net_relock;
1941         }
1942
1943         lpni = lnet_find_peer_ni_locked(nid);
1944         LASSERT(lpni);
1945
1946 out_net_relock:
1947         lnet_net_lock(cpt);
1948
1949         return lpni;
1950 }
1951
1952 /*
1953  * Get a peer_ni for the given nid, create it if necessary. Takes a
1954  * hold on the peer_ni.
1955  */
1956 struct lnet_peer_ni *
1957 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1958 {
1959         struct lnet_peer_ni *lpni = NULL;
1960         int rc;
1961
1962         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1963                 return ERR_PTR(-ESHUTDOWN);
1964
1965         /*
1966          * find if a peer_ni already exists.
1967          * If so then just return that.
1968          */
1969         lpni = lnet_find_peer_ni_locked(nid);
1970         if (lpni)
1971                 return lpni;
1972
1973         /*
1974          * Slow path:
1975          * use the lnet_api_mutex to serialize the creation of the peer_ni
1976          * and the creation/deletion of the local ni/net. When a local ni is
1977          * created, if there exists a set of peer_nis on that network,
1978          * they need to be traversed and updated. When a local NI is
1979          * deleted, which could result in a network being deleted, then
1980          * all peer nis on that network need to be removed as well.
1981          *
1982          * Creation through traffic should also be serialized with
1983          * creation through DLC.
1984          */
1985         lnet_net_unlock(cpt);
1986         mutex_lock(&the_lnet.ln_api_mutex);
1987         /*
1988          * Shutdown is only set under the ln_api_lock, so a single
1989          * check here is sufficent.
1990          */
1991         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1992                 lpni = ERR_PTR(-ESHUTDOWN);
1993                 goto out_mutex_unlock;
1994         }
1995
1996         rc = lnet_peer_ni_traffic_add(nid, pref);
1997         if (rc) {
1998                 lpni = ERR_PTR(rc);
1999                 goto out_mutex_unlock;
2000         }
2001
2002         lpni = lnet_find_peer_ni_locked(nid);
2003         LASSERT(lpni);
2004
2005 out_mutex_unlock:
2006         mutex_unlock(&the_lnet.ln_api_mutex);
2007         lnet_net_lock(cpt);
2008
2009         /* Lock has been dropped, check again for shutdown. */
2010         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2011                 if (!IS_ERR(lpni))
2012                         lnet_peer_ni_decref_locked(lpni);
2013                 lpni = ERR_PTR(-ESHUTDOWN);
2014         }
2015
2016         return lpni;
2017 }
2018
2019 bool
2020 lnet_peer_gw_discovery(struct lnet_peer *lp)
2021 {
2022         bool rc = false;
2023
2024         spin_lock(&lp->lp_lock);
2025         if (lp->lp_state & LNET_PEER_RTR_DISCOVERY)
2026                 rc = true;
2027         spin_unlock(&lp->lp_lock);
2028
2029         return rc;
2030 }
2031
2032 bool
2033 lnet_peer_is_uptodate(struct lnet_peer *lp)
2034 {
2035         bool rc;
2036
2037         spin_lock(&lp->lp_lock);
2038         rc = lnet_peer_is_uptodate_locked(lp);
2039         spin_unlock(&lp->lp_lock);
2040         return rc;
2041 }
2042
2043 /*
2044  * Is a peer uptodate from the point of view of discovery?
2045  *
2046  * If it is currently being processed, obviously not.
2047  * A forced Ping or Push is also handled by the discovery thread.
2048  *
2049  * Otherwise look at whether the peer needs rediscovering.
2050  */
2051 bool
2052 lnet_peer_is_uptodate_locked(struct lnet_peer *lp)
2053 __must_hold(&lp->lp_lock)
2054 {
2055         bool rc;
2056
2057         if (lp->lp_state & (LNET_PEER_DISCOVERING |
2058                             LNET_PEER_FORCE_PING |
2059                             LNET_PEER_FORCE_PUSH)) {
2060                 rc = false;
2061         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
2062                 rc = false;
2063         } else if (lnet_peer_needs_push(lp)) {
2064                 rc = false;
2065         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
2066                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
2067                         rc = true;
2068                 else
2069                         rc = false;
2070         } else {
2071                 rc = false;
2072         }
2073
2074         return rc;
2075 }
2076
2077 /* Add the message to the peer's lp_dc_pendq and queue the peer for discovery */
2078 void
2079 lnet_peer_queue_message(struct lnet_peer *lp, struct lnet_msg *msg)
2080 {
2081         /* The discovery thread holds net_lock/EX and lp_lock when it splices
2082          * the lp_dc_pendq onto a local list for resending. Thus, we do the same
2083          * when adding to the list and queuing the peer to ensure that we do not
2084          * strand any messages on the lp_dc_pendq. This scheme ensures the
2085          * message will be resent even if the peer is already being discovered.
2086          * Therefore we needn't check the return value of
2087          * lnet_peer_queue_for_discovery(lp).
2088          */
2089         lnet_net_lock(LNET_LOCK_EX);
2090         spin_lock(&lp->lp_lock);
2091         list_add_tail(&msg->msg_list, &lp->lp_dc_pendq);
2092         spin_unlock(&lp->lp_lock);
2093         lnet_peer_queue_for_discovery(lp);
2094         lnet_net_unlock(LNET_LOCK_EX);
2095 }
2096
2097 /*
2098  * Queue a peer for the attention of the discovery thread.  Call with
2099  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
2100  * -EALREADY if the peer was already queued.
2101  */
2102 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
2103 {
2104         int rc;
2105
2106         spin_lock(&lp->lp_lock);
2107         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2108                 lp->lp_state |= LNET_PEER_DISCOVERING;
2109         spin_unlock(&lp->lp_lock);
2110         if (list_empty(&lp->lp_dc_list)) {
2111                 lnet_peer_addref_locked(lp);
2112                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2113                 wake_up(&the_lnet.ln_dc_waitq);
2114                 rc = 0;
2115         } else {
2116                 rc = -EALREADY;
2117         }
2118
2119         CDEBUG(D_NET, "Queue peer %s: %d\n",
2120                libcfs_nid2str(lp->lp_primary_nid), rc);
2121
2122         return rc;
2123 }
2124
2125 /*
2126  * Discovery of a peer is complete. Wake all waiters on the peer.
2127  * Call with lnet_net_lock/EX held.
2128  */
2129 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
2130 {
2131         struct lnet_msg *msg, *tmp;
2132         int rc = 0;
2133         LIST_HEAD(pending_msgs);
2134
2135         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
2136                libcfs_nid2str(lp->lp_primary_nid));
2137
2138         list_del_init(&lp->lp_dc_list);
2139         spin_lock(&lp->lp_lock);
2140         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
2141         spin_unlock(&lp->lp_lock);
2142         wake_up(&lp->lp_dc_waitq);
2143
2144         if (lp->lp_rtr_refcount > 0)
2145                 lnet_router_discovery_complete(lp);
2146
2147         lnet_net_unlock(LNET_LOCK_EX);
2148
2149         /* iterate through all pending messages and send them again */
2150         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
2151                 list_del_init(&msg->msg_list);
2152                 if (lp->lp_dc_error) {
2153                         lnet_finalize(msg, lp->lp_dc_error);
2154                         continue;
2155                 }
2156
2157                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
2158                        lnet_msgtyp2str(msg->msg_type),
2159                        libcfs_id2str(msg->msg_target));
2160                 rc = lnet_send(msg->msg_src_nid_param, msg,
2161                                msg->msg_rtr_nid_param);
2162                 if (rc < 0) {
2163                         CNETERR("Error sending %s to %s: %d\n",
2164                                lnet_msgtyp2str(msg->msg_type),
2165                                libcfs_id2str(msg->msg_target), rc);
2166                         lnet_finalize(msg, rc);
2167                 }
2168         }
2169         lnet_net_lock(LNET_LOCK_EX);
2170         lnet_peer_decref_locked(lp);
2171 }
2172
2173 /*
2174  * Handle inbound push.
2175  * Like any event handler, called with lnet_res_lock/CPT held.
2176  */
2177 void lnet_peer_push_event(struct lnet_event *ev)
2178 {
2179         struct lnet_ping_buffer *pbuf;
2180         struct lnet_peer *lp;
2181
2182         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md_start + ev->offset);
2183
2184         /* lnet_find_peer() adds a refcount */
2185         lp = lnet_find_peer(ev->source.nid);
2186         if (!lp) {
2187                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
2188                        libcfs_nid2str(ev->initiator.nid),
2189                        libcfs_nid2str(ev->source.nid));
2190                 pbuf->pb_needs_post = true;
2191                 return;
2192         }
2193
2194         /* Ensure peer state remains consistent while we modify it. */
2195         spin_lock(&lp->lp_lock);
2196
2197         /*
2198          * If some kind of error happened the contents of the message
2199          * cannot be used. Clear the NIDS_UPTODATE and set the
2200          * FORCE_PING flag to trigger a ping.
2201          */
2202         if (ev->status) {
2203                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2204                 lp->lp_state |= LNET_PEER_FORCE_PING;
2205                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
2206                        ev->status,
2207                        libcfs_nid2str(lp->lp_primary_nid),
2208                        libcfs_nid2str(ev->source.nid));
2209                 goto out;
2210         }
2211
2212         /*
2213          * A push with invalid or corrupted info. Clear the UPTODATE
2214          * flag to trigger a ping.
2215          */
2216         if (lnet_ping_info_validate(&pbuf->pb_info)) {
2217                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2218                 lp->lp_state |= LNET_PEER_FORCE_PING;
2219                 CDEBUG(D_NET, "Corrupted Push from %s\n",
2220                        libcfs_nid2str(lp->lp_primary_nid));
2221                 goto out;
2222         }
2223
2224         /*
2225          * Make sure we'll allocate the correct size ping buffer when
2226          * pinging the peer.
2227          */
2228         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2229                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2230
2231         /*
2232          * A non-Multi-Rail peer is not supposed to be capable of
2233          * sending a push.
2234          */
2235         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
2236                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
2237                        libcfs_nid2str(lp->lp_primary_nid));
2238                 goto out;
2239         }
2240
2241         /*
2242          * The peer may have discovery disabled at its end. Set
2243          * NO_DISCOVERY as appropriate.
2244          */
2245         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2246                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2247                        libcfs_nid2str(lp->lp_primary_nid));
2248                 /*
2249                  * Mark the peer for deletion if we already know about it
2250                  * and it's going from discovery set to no discovery set
2251                  */
2252                 if (!(lp->lp_state & (LNET_PEER_NO_DISCOVERY |
2253                                       LNET_PEER_DISCOVERING)) &&
2254                      lp->lp_state & LNET_PEER_DISCOVERED) {
2255                         CDEBUG(D_NET, "Marking %s:0x%x for deletion\n",
2256                                libcfs_nid2str(lp->lp_primary_nid),
2257                                lp->lp_state);
2258                         lp->lp_state |= LNET_PEER_MARK_DELETION;
2259                 }
2260                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2261         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2262                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2263                        libcfs_nid2str(lp->lp_primary_nid));
2264                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2265         }
2266
2267         /*
2268          * Update the MULTI_RAIL flag based on the push. If the peer
2269          * was configured with DLC then the setting should match what
2270          * DLC put in.
2271          * NB: We verified above that the MR feature bit is set in pi_features
2272          */
2273         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2274                 CDEBUG(D_NET, "peer %s(%p) is MR\n",
2275                        libcfs_nid2str(lp->lp_primary_nid), lp);
2276         } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2277                 CWARN("Push says %s is Multi-Rail, DLC says not\n",
2278                       libcfs_nid2str(lp->lp_primary_nid));
2279         } else if (lnet_peer_discovery_disabled) {
2280                 CDEBUG(D_NET, "peer %s(%p) not MR: DD disabled locally\n",
2281                        libcfs_nid2str(lp->lp_primary_nid), lp);
2282         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2283                 CDEBUG(D_NET, "peer %s(%p) not MR: DD disabled remotely\n",
2284                        libcfs_nid2str(lp->lp_primary_nid), lp);
2285         } else {
2286                 CDEBUG(D_NET, "peer %s(%p) is MR capable\n",
2287                        libcfs_nid2str(lp->lp_primary_nid), lp);
2288                 lp->lp_state |= LNET_PEER_MULTI_RAIL;
2289                 lnet_peer_clr_non_mr_pref_nids(lp);
2290         }
2291
2292         /*
2293          * Check for truncation of the Put message. Clear the
2294          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
2295          * and tell discovery to allocate a bigger buffer.
2296          */
2297         if (ev->mlength < ev->rlength) {
2298                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2299                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2300                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2301                 lp->lp_state |= LNET_PEER_FORCE_PING;
2302                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
2303                        libcfs_nid2str(lp->lp_primary_nid),
2304                        pbuf->pb_info.pi_nnis);
2305                 goto out;
2306         }
2307
2308         /* always assume new data */
2309         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2310         lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2311
2312         /*
2313          * If there is data present that hasn't been processed yet,
2314          * we'll replace it if the Put contained newer data and it
2315          * fits. We're racing with a Ping or earlier Push in this
2316          * case.
2317          */
2318         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2319                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
2320                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
2321                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
2322                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
2323                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
2324                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
2325                               libcfs_nid2str(lp->lp_primary_nid),
2326                               LNET_PING_BUFFER_SEQNO(pbuf),
2327                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
2328                 }
2329                 goto out;
2330         }
2331
2332         /*
2333          * Allocate a buffer to copy the data. On a failure we drop
2334          * the Push and set FORCE_PING to force the discovery
2335          * thread to fix the problem by pinging the peer.
2336          */
2337         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
2338         if (!lp->lp_data) {
2339                 lp->lp_state |= LNET_PEER_FORCE_PING;
2340                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
2341                        libcfs_nid2str(lp->lp_primary_nid),
2342                        LNET_PING_BUFFER_SEQNO(pbuf));
2343                 goto out;
2344         }
2345
2346         /* Success */
2347         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
2348                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
2349         lp->lp_state |= LNET_PEER_DATA_PRESENT;
2350         CDEBUG(D_NET, "Received Push %s %u\n",
2351                libcfs_nid2str(lp->lp_primary_nid),
2352                LNET_PING_BUFFER_SEQNO(pbuf));
2353
2354 out:
2355         /* We've processed this buffer. It can be reposted */
2356         pbuf->pb_needs_post = true;
2357
2358         /*
2359          * Queue the peer for discovery if not done, force it on the request
2360          * queue and wake the discovery thread if the peer was already queued,
2361          * because its status changed.
2362          */
2363         spin_unlock(&lp->lp_lock);
2364         lnet_net_lock(LNET_LOCK_EX);
2365         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2366                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2367                 wake_up(&the_lnet.ln_dc_waitq);
2368         }
2369         /* Drop refcount from lookup */
2370         lnet_peer_decref_locked(lp);
2371         lnet_net_unlock(LNET_LOCK_EX);
2372 }
2373
2374 /*
2375  * Clear the discovery error state, unless we're already discovering
2376  * this peer, in which case the error is current.
2377  */
2378 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2379 {
2380         spin_lock(&lp->lp_lock);
2381         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2382                 lp->lp_dc_error = 0;
2383         spin_unlock(&lp->lp_lock);
2384 }
2385
2386 /*
2387  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2388  * dropped/retaken within this function. An lnet_peer_ni is passed in
2389  * because discovery could tear down an lnet_peer.
2390  */
2391 int
2392 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2393 {
2394         DEFINE_WAIT(wait);
2395         struct lnet_peer *lp;
2396         int rc = 0;
2397         int count = 0;
2398
2399 again:
2400         lnet_net_unlock(cpt);
2401         lnet_net_lock(LNET_LOCK_EX);
2402         lp = lpni->lpni_peer_net->lpn_peer;
2403         lnet_peer_clear_discovery_error(lp);
2404
2405         /*
2406          * We're willing to be interrupted. The lpni can become a
2407          * zombie if we race with DLC, so we must check for that.
2408          */
2409         for (;;) {
2410                 /* Keep lp alive when the lnet_net_lock is unlocked */
2411                 lnet_peer_addref_locked(lp);
2412                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2413                 if (signal_pending(current))
2414                         break;
2415                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2416                         break;
2417                 /*
2418                  * Don't repeat discovery if discovery is disabled. This is
2419                  * done to ensure we can use discovery as a standard ping as
2420                  * well for backwards compatibility with routers which do not
2421                  * have discovery or have discovery disabled
2422                  */
2423                 if (lnet_is_discovery_disabled(lp) && count > 0)
2424                         break;
2425                 if (lp->lp_dc_error)
2426                         break;
2427                 if (lnet_peer_is_uptodate(lp))
2428                         break;
2429                 lnet_peer_queue_for_discovery(lp);
2430                 count++;
2431                 CDEBUG(D_NET, "Discovery attempt # %d\n", count);
2432
2433                 /*
2434                  * If caller requested a non-blocking operation then
2435                  * return immediately. Once discovery is complete any
2436                  * pending messages that were stopped due to discovery
2437                  * will be transmitted.
2438                  */
2439                 if (!block)
2440                         break;
2441
2442                 lnet_net_unlock(LNET_LOCK_EX);
2443                 schedule();
2444                 finish_wait(&lp->lp_dc_waitq, &wait);
2445                 lnet_net_lock(LNET_LOCK_EX);
2446                 lnet_peer_decref_locked(lp);
2447                 /* Peer may have changed */
2448                 lp = lpni->lpni_peer_net->lpn_peer;
2449         }
2450         finish_wait(&lp->lp_dc_waitq, &wait);
2451
2452         lnet_net_unlock(LNET_LOCK_EX);
2453         lnet_net_lock(cpt);
2454         lnet_peer_decref_locked(lp);
2455         /*
2456          * The peer may have changed, so re-check and rediscover if that turns
2457          * out to have been the case. The reference count on lp ensured that
2458          * even if it was unlinked from lpni the memory could not be recycled.
2459          * Thus the check below is sufficient to determine whether the peer
2460          * changed. If the peer changed, then lp must not be dereferenced.
2461          */
2462         if (lp != lpni->lpni_peer_net->lpn_peer)
2463                 goto again;
2464
2465         if (signal_pending(current))
2466                 rc = -EINTR;
2467         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2468                 rc = -ESHUTDOWN;
2469         else if (lp->lp_dc_error)
2470                 rc = lp->lp_dc_error;
2471         else if (!block)
2472                 CDEBUG(D_NET, "non-blocking discovery\n");
2473         else if (!lnet_peer_is_uptodate(lp) && !lnet_is_discovery_disabled(lp))
2474                 goto again;
2475
2476         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2477                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2478                libcfs_nid2str(lpni->lpni_nid), rc,
2479                (!block) ? "pending discovery" : "discovery complete");
2480
2481         return rc;
2482 }
2483
2484 /* Handle an incoming ack for a push. */
2485 static void
2486 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2487 {
2488         struct lnet_ping_buffer *pbuf;
2489
2490         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md_start);
2491         spin_lock(&lp->lp_lock);
2492         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2493         lp->lp_push_error = ev->status;
2494         if (ev->status)
2495                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2496         else
2497                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2498         spin_unlock(&lp->lp_lock);
2499
2500         CDEBUG(D_NET, "peer %s ev->status %d\n",
2501                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2502 }
2503
2504 /* Handle a Reply message. This is the reply to a Ping message. */
2505 static void
2506 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2507 {
2508         struct lnet_ping_buffer *pbuf;
2509         int rc;
2510
2511         spin_lock(&lp->lp_lock);
2512
2513         lp->lp_disc_src_nid = ev->target.nid;
2514
2515         /*
2516          * If some kind of error happened the contents of message
2517          * cannot be used. Set PING_FAILED to trigger a retry.
2518          */
2519         if (ev->status) {
2520                 lp->lp_state |= LNET_PEER_PING_FAILED;
2521                 lp->lp_ping_error = ev->status;
2522                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2523                        ev->status,
2524                        libcfs_nid2str(lp->lp_primary_nid),
2525                        libcfs_nid2str(ev->source.nid));
2526                 goto out;
2527         }
2528
2529         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md_start);
2530         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2531                 lnet_swap_pinginfo(pbuf);
2532
2533         /*
2534          * A reply with invalid or corrupted info. Set PING_FAILED to
2535          * trigger a retry.
2536          */
2537         rc = lnet_ping_info_validate(&pbuf->pb_info);
2538         if (rc) {
2539                 lp->lp_state |= LNET_PEER_PING_FAILED;
2540                 lp->lp_ping_error = 0;
2541                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2542                        libcfs_nid2str(lp->lp_primary_nid), rc);
2543                 goto out;
2544         }
2545
2546
2547         /*
2548          * The peer may have discovery disabled at its end. Set
2549          * NO_DISCOVERY as appropriate.
2550          */
2551         if ((pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY) &&
2552             !lnet_peer_discovery_disabled) {
2553                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2554                        libcfs_nid2str(lp->lp_primary_nid));
2555                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2556         } else {
2557                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2558                        libcfs_nid2str(lp->lp_primary_nid));
2559                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2560         }
2561
2562         /*
2563          * Update the MULTI_RAIL flag based on the reply. If the peer
2564          * was configured with DLC then the setting should match what
2565          * DLC put in.
2566          */
2567         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2568                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2569                         CDEBUG(D_NET, "peer %s(%p) is MR\n",
2570                                libcfs_nid2str(lp->lp_primary_nid), lp);
2571                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2572                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2573                               libcfs_nid2str(lp->lp_primary_nid));
2574                 } else if (lnet_peer_discovery_disabled) {
2575                         CDEBUG(D_NET,
2576                                "peer %s(%p) not MR: DD disabled locally\n",
2577                                libcfs_nid2str(lp->lp_primary_nid), lp);
2578                 } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2579                         CDEBUG(D_NET,
2580                                "peer %s(%p) not MR: DD disabled remotely\n",
2581                                libcfs_nid2str(lp->lp_primary_nid), lp);
2582                 } else {
2583                         CDEBUG(D_NET, "peer %s(%p) is MR capable\n",
2584                                libcfs_nid2str(lp->lp_primary_nid), lp);
2585                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2586                         lnet_peer_clr_non_mr_pref_nids(lp);
2587                 }
2588         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2589                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2590                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2591                               libcfs_nid2str(lp->lp_primary_nid));
2592                 } else {
2593                         CERROR("Multi-Rail state vanished from %s\n",
2594                                libcfs_nid2str(lp->lp_primary_nid));
2595                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2596                 }
2597         }
2598
2599         /*
2600          * Make sure we'll allocate the correct size ping buffer when
2601          * pinging the peer.
2602          */
2603         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2604                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2605
2606         /*
2607          * Check for truncation of the Reply. Clear PING_SENT and set
2608          * PING_FAILED to trigger a retry.
2609          */
2610         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2611                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2612                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2613                 lp->lp_state |= LNET_PEER_PING_FAILED;
2614                 lp->lp_ping_error = 0;
2615                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2616                        libcfs_nid2str(lp->lp_primary_nid),
2617                        pbuf->pb_info.pi_nnis);
2618                 goto out;
2619         }
2620
2621         /*
2622          * Check the sequence numbers in the reply. These are only
2623          * available if the reply came from a Multi-Rail peer.
2624          */
2625         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2626             pbuf->pb_info.pi_nnis > 1 &&
2627             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2628                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno)
2629                         CDEBUG(D_NET, "peer %s: seq# got %u have %u. peer rebooted?\n",
2630                                 libcfs_nid2str(lp->lp_primary_nid),
2631                                 LNET_PING_BUFFER_SEQNO(pbuf),
2632                                 lp->lp_peer_seqno);
2633
2634                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2635         }
2636
2637         /* We're happy with the state of the data in the buffer. */
2638         CDEBUG(D_NET, "peer %s data present %u. state = 0x%x\n",
2639                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno, lp->lp_state);
2640         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2641                 lnet_ping_buffer_decref(lp->lp_data);
2642         else
2643                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2644         lnet_ping_buffer_addref(pbuf);
2645         lp->lp_data = pbuf;
2646 out:
2647         lp->lp_state &= ~LNET_PEER_PING_SENT;
2648         spin_unlock(&lp->lp_lock);
2649
2650         lnet_net_lock(LNET_LOCK_EX);
2651         /*
2652          * If this peer is a gateway, call the routing callback to
2653          * handle the ping reply
2654          */
2655         if (lp->lp_rtr_refcount > 0)
2656                 lnet_router_discovery_ping_reply(lp);
2657         lnet_net_unlock(LNET_LOCK_EX);
2658 }
2659
2660 /*
2661  * Send event handling. Only matters for error cases, where we clean
2662  * up state on the peer and peer_ni that would otherwise be updated in
2663  * the REPLY event handler for a successful Ping, and the ACK event
2664  * handler for a successful Push.
2665  */
2666 static int
2667 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2668 {
2669         int rc = 0;
2670
2671         if (!ev->status)
2672                 goto out;
2673
2674         spin_lock(&lp->lp_lock);
2675         if (ev->msg_type == LNET_MSG_GET) {
2676                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2677                 lp->lp_state |= LNET_PEER_PING_FAILED;
2678                 lp->lp_ping_error = ev->status;
2679         } else { /* ev->msg_type == LNET_MSG_PUT */
2680                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2681                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2682                 lp->lp_push_error = ev->status;
2683         }
2684         spin_unlock(&lp->lp_lock);
2685         rc = LNET_REDISCOVER_PEER;
2686 out:
2687         CDEBUG(D_NET, "%s Send to %s: %d\n",
2688                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2689                 libcfs_nid2str(ev->target.nid), rc);
2690         return rc;
2691 }
2692
2693 /*
2694  * Unlink event handling. This event is only seen if a call to
2695  * LNetMDUnlink() caused the event to be unlinked. If this call was
2696  * made after the event was set up in LNetGet() or LNetPut() then we
2697  * assume the Ping or Push timed out.
2698  */
2699 static void
2700 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2701 {
2702         spin_lock(&lp->lp_lock);
2703         /* We've passed through LNetGet() */
2704         if (lp->lp_state & LNET_PEER_PING_SENT) {
2705                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2706                 lp->lp_state |= LNET_PEER_PING_FAILED;
2707                 lp->lp_ping_error = -ETIMEDOUT;
2708                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2709                         libcfs_nid2str(lp->lp_primary_nid));
2710         }
2711         /* We've passed through LNetPut() */
2712         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2713                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2714                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2715                 lp->lp_push_error = -ETIMEDOUT;
2716                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2717                         libcfs_nid2str(lp->lp_primary_nid));
2718         }
2719         spin_unlock(&lp->lp_lock);
2720 }
2721
2722 /*
2723  * Event handler for the discovery EQ.
2724  *
2725  * Called with lnet_res_lock(cpt) held. The cpt is the
2726  * lnet_cpt_of_cookie() of the md handle cookie.
2727  */
2728 static void lnet_discovery_event_handler(struct lnet_event *event)
2729 {
2730         struct lnet_peer *lp = event->md_user_ptr;
2731         struct lnet_ping_buffer *pbuf;
2732         int rc;
2733
2734         /* discovery needs to take another look */
2735         rc = LNET_REDISCOVER_PEER;
2736
2737         CDEBUG(D_NET, "Received event: %d\n", event->type);
2738
2739         switch (event->type) {
2740         case LNET_EVENT_ACK:
2741                 lnet_discovery_event_ack(lp, event);
2742                 break;
2743         case LNET_EVENT_REPLY:
2744                 lnet_discovery_event_reply(lp, event);
2745                 break;
2746         case LNET_EVENT_SEND:
2747                 /* Only send failure triggers a retry. */
2748                 rc = lnet_discovery_event_send(lp, event);
2749                 break;
2750         case LNET_EVENT_UNLINK:
2751                 /* LNetMDUnlink() was called */
2752                 lnet_discovery_event_unlink(lp, event);
2753                 break;
2754         default:
2755                 /* Invalid events. */
2756                 LBUG();
2757         }
2758         lnet_net_lock(LNET_LOCK_EX);
2759         if (event->unlinked) {
2760                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md_start);
2761                 lnet_ping_buffer_decref(pbuf);
2762                 lnet_peer_decref_locked(lp);
2763         }
2764
2765         /* put peer back at end of request queue, if discovery not already
2766          * done */
2767         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2768                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2769                 wake_up(&the_lnet.ln_dc_waitq);
2770         }
2771         lnet_net_unlock(LNET_LOCK_EX);
2772 }
2773
2774 /*
2775  * Build a peer from incoming data.
2776  *
2777  * The NIDs in the incoming data are supposed to be structured as follows:
2778  *  - loopback
2779  *  - primary NID
2780  *  - other NIDs in same net
2781  *  - NIDs in second net
2782  *  - NIDs in third net
2783  *  - ...
2784  * This due to the way the list of NIDs in the data is created.
2785  *
2786  * Note that this function will mark the peer uptodate unless an
2787  * ENOMEM is encontered. All other errors are due to a conflict
2788  * between the DLC configuration and what discovery sees. We treat DLC
2789  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2790  * peer from becoming stuck in discovery.
2791  */
2792 static int lnet_peer_merge_data(struct lnet_peer *lp,
2793                                 struct lnet_ping_buffer *pbuf)
2794 {
2795         struct lnet_peer_ni *lpni;
2796         lnet_nid_t *curnis = NULL;
2797         struct lnet_ni_status *addnis = NULL;
2798         lnet_nid_t *delnis = NULL;
2799         unsigned flags;
2800         int ncurnis;
2801         int naddnis;
2802         int ndelnis;
2803         int nnis = 0;
2804         int i;
2805         int j;
2806         int rc;
2807
2808         flags = LNET_PEER_DISCOVERED;
2809         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2810                 flags |= LNET_PEER_MULTI_RAIL;
2811
2812         /*
2813          * Cache the routing feature for the peer; whether it is enabled
2814          * for disabled as reported by the remote peer.
2815          */
2816         spin_lock(&lp->lp_lock);
2817         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_RTE_DISABLED))
2818                 lp->lp_state |= LNET_PEER_ROUTER_ENABLED;
2819         else
2820                 lp->lp_state &= ~LNET_PEER_ROUTER_ENABLED;
2821         spin_unlock(&lp->lp_lock);
2822
2823         nnis = max_t(int, lp->lp_nnis, pbuf->pb_info.pi_nnis);
2824         CFS_ALLOC_PTR_ARRAY(curnis, nnis);
2825         CFS_ALLOC_PTR_ARRAY(addnis, nnis);
2826         CFS_ALLOC_PTR_ARRAY(delnis, nnis);
2827         if (!curnis || !addnis || !delnis) {
2828                 rc = -ENOMEM;
2829                 goto out;
2830         }
2831         ncurnis = 0;
2832         naddnis = 0;
2833         ndelnis = 0;
2834
2835         /* Construct the list of NIDs present in peer. */
2836         lpni = NULL;
2837         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2838                 curnis[ncurnis++] = lpni->lpni_nid;
2839
2840         /*
2841          * Check for NIDs in pbuf not present in curnis[].
2842          * The loop starts at 1 to skip the loopback NID.
2843          */
2844         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2845                 for (j = 0; j < ncurnis; j++)
2846                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2847                                 break;
2848                 if (j == ncurnis)
2849                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i];
2850         }
2851         /*
2852          * Check for NIDs in curnis[] not present in pbuf.
2853          * The nested loop starts at 1 to skip the loopback NID.
2854          *
2855          * But never add the loopback NID to delnis[]: if it is
2856          * present in curnis[] then this peer is for this node.
2857          */
2858         for (i = 0; i < ncurnis; i++) {
2859                 if (curnis[i] == LNET_NID_LO_0)
2860                         continue;
2861                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++) {
2862                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid) {
2863                                 /*
2864                                  * update the information we cache for the
2865                                  * peer with the latest information we
2866                                  * received
2867                                  */
2868                                 lpni = lnet_find_peer_ni_locked(curnis[i]);
2869                                 if (lpni) {
2870                                         lpni->lpni_ns_status = pbuf->pb_info.pi_ni[j].ns_status;
2871                                         lnet_peer_ni_decref_locked(lpni);
2872                                 }
2873                                 break;
2874                         }
2875                 }
2876                 if (j == pbuf->pb_info.pi_nnis)
2877                         delnis[ndelnis++] = curnis[i];
2878         }
2879
2880         /*
2881          * If we get here and the discovery is disabled then we don't want
2882          * to add or delete any NIs. We just updated the ones we have some
2883          * information on, and call it a day
2884          */
2885         rc = 0;
2886         if (lnet_is_discovery_disabled(lp))
2887                 goto out;
2888
2889         for (i = 0; i < naddnis; i++) {
2890                 rc = lnet_peer_add_nid(lp, addnis[i].ns_nid, flags);
2891                 if (rc) {
2892                         CERROR("Error adding NID %s to peer %s: %d\n",
2893                                libcfs_nid2str(addnis[i].ns_nid),
2894                                libcfs_nid2str(lp->lp_primary_nid), rc);
2895                         if (rc == -ENOMEM)
2896                                 goto out;
2897                 }
2898                 lpni = lnet_find_peer_ni_locked(addnis[i].ns_nid);
2899                 if (lpni) {
2900                         lpni->lpni_ns_status = addnis[i].ns_status;
2901                         lnet_peer_ni_decref_locked(lpni);
2902                 }
2903         }
2904
2905         for (i = 0; i < ndelnis; i++) {
2906                 /*
2907                  * for routers it's okay to delete the primary_nid because
2908                  * the upper layers don't really rely on it. So if we're
2909                  * being told that the router changed its primary_nid
2910                  * then it's okay to delete it.
2911                  */
2912                 if (lp->lp_rtr_refcount > 0)
2913                         flags |= LNET_PEER_RTR_NI_FORCE_DEL;
2914                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2915                 if (rc) {
2916                         CERROR("Error deleting NID %s from peer %s: %d\n",
2917                                libcfs_nid2str(delnis[i]),
2918                                libcfs_nid2str(lp->lp_primary_nid), rc);
2919                         if (rc == -ENOMEM)
2920                                 goto out;
2921                 }
2922         }
2923         /*
2924          * Errors other than -ENOMEM are due to peers having been
2925          * configured with DLC. Ignore these because DLC overrides
2926          * Discovery.
2927          */
2928         rc = 0;
2929 out:
2930         CFS_FREE_PTR_ARRAY(curnis, nnis);
2931         CFS_FREE_PTR_ARRAY(addnis, nnis);
2932         CFS_FREE_PTR_ARRAY(delnis, nnis);
2933         lnet_ping_buffer_decref(pbuf);
2934         CDEBUG(D_NET, "peer %s (%p): %d\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc);
2935
2936         if (rc) {
2937                 spin_lock(&lp->lp_lock);
2938                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2939                 lp->lp_state |= LNET_PEER_FORCE_PING;
2940                 spin_unlock(&lp->lp_lock);
2941         }
2942         return rc;
2943 }
2944
2945 /*
2946  * The data in pbuf says lp is its primary peer, but the data was
2947  * received by a different peer. Try to update lp with the data.
2948  */
2949 static int
2950 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2951 {
2952         struct lnet_handle_md mdh;
2953
2954         /* Queue lp for discovery, and force it on the request queue. */
2955         lnet_net_lock(LNET_LOCK_EX);
2956         if (lnet_peer_queue_for_discovery(lp))
2957                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2958         lnet_net_unlock(LNET_LOCK_EX);
2959
2960         LNetInvalidateMDHandle(&mdh);
2961
2962         /*
2963          * Decide whether we can move the peer to the DATA_PRESENT state.
2964          *
2965          * We replace stale data for a multi-rail peer, repair PING_FAILED
2966          * status, and preempt FORCE_PING.
2967          *
2968          * If after that we have DATA_PRESENT, we merge it into this peer.
2969          */
2970         spin_lock(&lp->lp_lock);
2971         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2972                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2973                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2974                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2975                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2976                         lnet_ping_buffer_decref(pbuf);
2977                         pbuf = lp->lp_data;
2978                         lp->lp_data = NULL;
2979                 }
2980         }
2981         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2982                 lnet_ping_buffer_decref(lp->lp_data);
2983                 lp->lp_data = NULL;
2984                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2985         }
2986         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2987                 mdh = lp->lp_ping_mdh;
2988                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2989                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2990                 lp->lp_ping_error = 0;
2991         }
2992         if (lp->lp_state & LNET_PEER_FORCE_PING)
2993                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2994         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2995         spin_unlock(&lp->lp_lock);
2996
2997         if (!LNetMDHandleIsInvalid(mdh))
2998                 LNetMDUnlink(mdh);
2999
3000         if (pbuf)
3001                 return lnet_peer_merge_data(lp, pbuf);
3002
3003         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3004         return 0;
3005 }
3006
3007 static bool lnet_is_nid_in_ping_info(lnet_nid_t nid, struct lnet_ping_info *pinfo)
3008 {
3009         int i;
3010
3011         for (i = 0; i < pinfo->pi_nnis; i++) {
3012                 if (pinfo->pi_ni[i].ns_nid == nid)
3013                         return true;
3014         }
3015
3016         return false;
3017 }
3018
3019 /* Delete a peer that has been marked for deletion. NB: when this peer was added
3020  * to the discovery queue a reference was taken that will prevent the peer from
3021  * actually being freed by this function. After this function exits the
3022  * discovery thread should call lnet_peer_discovery_complete() which will
3023  * drop that reference as well as wake any waiters that may also be holding a
3024  * ref on the peer
3025  */
3026 static int lnet_peer_deletion(struct lnet_peer *lp)
3027 __must_hold(&lp->lp_lock)
3028 {
3029         struct list_head rlist;
3030         struct lnet_route *route, *tmp;
3031         int sensitivity = lp->lp_health_sensitivity;
3032
3033         INIT_LIST_HEAD(&rlist);
3034
3035         lp->lp_state &= ~(LNET_PEER_DISCOVERING | LNET_PEER_FORCE_PING |
3036                           LNET_PEER_FORCE_PUSH);
3037         CDEBUG(D_NET, "peer %s(%p) state %#x\n",
3038                libcfs_nid2str(lp->lp_primary_nid), lp, lp->lp_state);
3039
3040         /* no-op if lnet_peer_del() has already been called on this peer */
3041         if (lp->lp_state & LNET_PEER_MARK_DELETED)
3042                 return 0;
3043
3044         if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
3045                 return -ESHUTDOWN;
3046
3047         spin_unlock(&lp->lp_lock);
3048
3049         mutex_lock(&the_lnet.ln_api_mutex);
3050
3051         lnet_net_lock(LNET_LOCK_EX);
3052         /* remove the peer from the discovery work
3053          * queue if it's on there in preparation
3054          * of deleting it.
3055          */
3056         if (!list_empty(&lp->lp_dc_list))
3057                 list_del(&lp->lp_dc_list);
3058         list_for_each_entry_safe(route, tmp,
3059                                  &lp->lp_routes,
3060                                  lr_gwlist)
3061                 lnet_move_route(route, NULL, &rlist);
3062         lnet_net_unlock(LNET_LOCK_EX);
3063
3064         /* lnet_peer_del() deletes all the peer NIs owned by this peer */
3065         lnet_peer_del(lp);
3066
3067         list_for_each_entry_safe(route, tmp,
3068                                  &rlist, lr_list) {
3069                 /* re-add these routes */
3070                 lnet_add_route(route->lr_net,
3071                                route->lr_hops,
3072                                route->lr_nid,
3073                                route->lr_priority,
3074                                sensitivity);
3075                 LIBCFS_FREE(route, sizeof(*route));
3076         }
3077
3078         mutex_unlock(&the_lnet.ln_api_mutex);
3079
3080         spin_lock(&lp->lp_lock);
3081
3082         return 0;
3083 }
3084
3085 /*
3086  * Update a peer using the data received.
3087  */
3088 static int lnet_peer_data_present(struct lnet_peer *lp)
3089 __must_hold(&lp->lp_lock)
3090 {
3091         struct lnet_ping_buffer *pbuf;
3092         struct lnet_peer_ni *lpni;
3093         lnet_nid_t nid = LNET_NID_ANY;
3094         unsigned flags;
3095         int rc = 0;
3096
3097         pbuf = lp->lp_data;
3098         lp->lp_data = NULL;
3099         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
3100         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
3101         spin_unlock(&lp->lp_lock);
3102
3103         /*
3104          * Modifications of peer structures are done while holding the
3105          * ln_api_mutex. A global lock is required because we may be
3106          * modifying multiple peer structures, and a mutex greatly
3107          * simplifies memory management.
3108          *
3109          * The actual changes to the data structures must also protect
3110          * against concurrent lookups, for which the lnet_net_lock in
3111          * LNET_LOCK_EX mode is used.
3112          */
3113         mutex_lock(&the_lnet.ln_api_mutex);
3114         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
3115                 rc = -ESHUTDOWN;
3116                 goto out;
3117         }
3118
3119         /*
3120          * If this peer is not on the peer list then it is being torn
3121          * down, and our reference count may be all that is keeping it
3122          * alive. Don't do any work on it.
3123          */
3124         if (list_empty(&lp->lp_peer_list))
3125                 goto out;
3126
3127         flags = LNET_PEER_DISCOVERED;
3128         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
3129                 flags |= LNET_PEER_MULTI_RAIL;
3130
3131         /*
3132          * Check whether the primary NID in the message matches the
3133          * primary NID of the peer. If it does, update the peer, if
3134          * it it does not, check whether there is already a peer with
3135          * that primary NID. If no such peer exists, try to update
3136          * the primary NID of the current peer (allowed if it was
3137          * created due to message traffic) and complete the update.
3138          * If the peer did exist, hand off the data to it.
3139          *
3140          * The peer for the loopback interface is a special case: this
3141          * is the peer for the local node, and we want to set its
3142          * primary NID to the correct value here. Moreover, this peer
3143          * can show up with only the loopback NID in the ping buffer.
3144          */
3145         if (pbuf->pb_info.pi_nnis <= 1)
3146                 goto out;
3147         nid = pbuf->pb_info.pi_ni[1].ns_nid;
3148         if (lp->lp_primary_nid == LNET_NID_LO_0) {
3149                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
3150                 if (!rc)
3151                         rc = lnet_peer_merge_data(lp, pbuf);
3152         /*
3153          * if the primary nid of the peer is present in the ping info returned
3154          * from the peer, but it's not the local primary peer we have
3155          * cached and discovery is disabled, then we don't want to update
3156          * our local peer info, by adding or removing NIDs, we just want
3157          * to update the status of the nids that we currently have
3158          * recorded in that peer.
3159          */
3160         } else if (lp->lp_primary_nid == nid ||
3161                    (lnet_is_nid_in_ping_info(lp->lp_primary_nid, &pbuf->pb_info) &&
3162                     lnet_is_discovery_disabled(lp))) {
3163                 rc = lnet_peer_merge_data(lp, pbuf);
3164         } else {
3165                 lpni = lnet_find_peer_ni_locked(nid);
3166                 if (!lpni || lp == lpni->lpni_peer_net->lpn_peer) {
3167                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
3168                         if (rc) {
3169                                 CERROR("Primary NID error %s versus %s: %d\n",
3170                                        libcfs_nid2str(lp->lp_primary_nid),
3171                                        libcfs_nid2str(nid), rc);
3172                         } else {
3173                                 rc = lnet_peer_merge_data(lp, pbuf);
3174                         }
3175                         if (lpni)
3176                                 lnet_peer_ni_decref_locked(lpni);
3177                 } else {
3178                         struct lnet_peer *new_lp;
3179                         new_lp = lpni->lpni_peer_net->lpn_peer;
3180                         /*
3181                          * if lp has discovery/MR enabled that means new_lp
3182                          * should have discovery/MR enabled as well, since
3183                          * it's the same peer, which we're about to merge
3184                          */
3185                         spin_lock(&lp->lp_lock);
3186                         spin_lock(&new_lp->lp_lock);
3187                         if (!(lp->lp_state & LNET_PEER_NO_DISCOVERY))
3188                                 new_lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
3189                         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
3190                                 new_lp->lp_state |= LNET_PEER_MULTI_RAIL;
3191                         /* If we're processing a ping reply then we may be
3192                          * about to send a push to the peer that we ping'd.
3193                          * Since the ping reply that we're processing was
3194                          * received by lp, we need to set the discovery source
3195                          * NID for new_lp to the NID stored in lp.
3196                          */
3197                         if (lp->lp_disc_src_nid != LNET_NID_ANY)
3198                                 new_lp->lp_disc_src_nid = lp->lp_disc_src_nid;
3199                         spin_unlock(&new_lp->lp_lock);
3200                         spin_unlock(&lp->lp_lock);
3201
3202                         rc = lnet_peer_set_primary_data(new_lp, pbuf);
3203                         lnet_consolidate_routes_locked(lp, new_lp);
3204                         lnet_peer_ni_decref_locked(lpni);
3205                 }
3206         }
3207 out:
3208         CDEBUG(D_NET, "peer %s(%p): %d. state = 0x%x\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc,
3209                lp->lp_state);
3210         mutex_unlock(&the_lnet.ln_api_mutex);
3211
3212         spin_lock(&lp->lp_lock);
3213         /* Tell discovery to re-check the peer immediately. */
3214         if (!rc)
3215                 rc = LNET_REDISCOVER_PEER;
3216         return rc;
3217 }
3218
3219 /*
3220  * A ping failed. Clear the PING_FAILED state and set the
3221  * FORCE_PING state, to ensure a retry even if discovery is
3222  * disabled. This avoids being left with incorrect state.
3223  */
3224 static int lnet_peer_ping_failed(struct lnet_peer *lp)
3225 __must_hold(&lp->lp_lock)
3226 {
3227         struct lnet_handle_md mdh;
3228         int rc;
3229
3230         mdh = lp->lp_ping_mdh;
3231         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
3232         lp->lp_state &= ~LNET_PEER_PING_FAILED;
3233         lp->lp_state |= LNET_PEER_FORCE_PING;
3234         rc = lp->lp_ping_error;
3235         lp->lp_ping_error = 0;
3236         spin_unlock(&lp->lp_lock);
3237
3238         if (!LNetMDHandleIsInvalid(mdh))
3239                 LNetMDUnlink(mdh);
3240
3241         CDEBUG(D_NET, "peer %s:%d\n",
3242                libcfs_nid2str(lp->lp_primary_nid), rc);
3243
3244         spin_lock(&lp->lp_lock);
3245         return rc ? rc : LNET_REDISCOVER_PEER;
3246 }
3247
3248 /*
3249  * Select NID to send a Ping or Push to.
3250  */
3251 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
3252 {
3253         struct lnet_peer_ni *lpni;
3254
3255         /* Look for a direct-connected NID for this peer. */
3256         lpni = NULL;
3257         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3258                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
3259                         continue;
3260                 break;
3261         }
3262         if (lpni)
3263                 return lpni->lpni_nid;
3264
3265         /* Look for a routed-connected NID for this peer. */
3266         lpni = NULL;
3267         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3268                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
3269                         continue;
3270                 break;
3271         }
3272         if (lpni)
3273                 return lpni->lpni_nid;
3274
3275         return LNET_NID_ANY;
3276 }
3277
3278 /* Active side of ping. */
3279 static int lnet_peer_send_ping(struct lnet_peer *lp)
3280 __must_hold(&lp->lp_lock)
3281 {
3282         lnet_nid_t pnid;
3283         int nnis;
3284         int rc;
3285         int cpt;
3286
3287         lp->lp_state |= LNET_PEER_PING_SENT;
3288         lp->lp_state &= ~LNET_PEER_FORCE_PING;
3289         spin_unlock(&lp->lp_lock);
3290
3291         cpt = lnet_net_lock_current();
3292         /* Refcount for MD. */
3293         lnet_peer_addref_locked(lp);
3294         pnid = lnet_peer_select_nid(lp);
3295         lnet_net_unlock(cpt);
3296
3297         nnis = max(lp->lp_data_nnis, LNET_INTERFACES_MIN);
3298
3299         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
3300                             the_lnet.ln_dc_handler, false);
3301
3302         /*
3303          * if LNetMDBind in lnet_send_ping fails we need to decrement the
3304          * refcount on the peer, otherwise LNetMDUnlink will be called
3305          * which will eventually do that.
3306          */
3307         if (rc > 0) {
3308                 lnet_net_lock(cpt);
3309                 lnet_peer_decref_locked(lp);
3310                 lnet_net_unlock(cpt);
3311                 rc = -rc; /* change the rc to negative value */
3312                 goto fail_error;
3313         } else if (rc < 0) {
3314                 goto fail_error;
3315         }
3316
3317         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3318
3319         spin_lock(&lp->lp_lock);
3320         return 0;
3321
3322 fail_error:
3323         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
3324         /*
3325          * The errors that get us here are considered hard errors and
3326          * cause Discovery to terminate. So we clear PING_SENT, but do
3327          * not set either PING_FAILED or FORCE_PING. In fact we need
3328          * to clear PING_FAILED, because the unlink event handler will
3329          * have set it if we called LNetMDUnlink() above.
3330          */
3331         spin_lock(&lp->lp_lock);
3332         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
3333         return rc;
3334 }
3335
3336 /*
3337  * This function exists because you cannot call LNetMDUnlink() from an
3338  * event handler.
3339  */
3340 static int lnet_peer_push_failed(struct lnet_peer *lp)
3341 __must_hold(&lp->lp_lock)
3342 {
3343         struct lnet_handle_md mdh;
3344         int rc;
3345
3346         mdh = lp->lp_push_mdh;
3347         LNetInvalidateMDHandle(&lp->lp_push_mdh);
3348         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
3349         rc = lp->lp_push_error;
3350         lp->lp_push_error = 0;
3351         spin_unlock(&lp->lp_lock);
3352
3353         if (!LNetMDHandleIsInvalid(mdh))
3354                 LNetMDUnlink(mdh);
3355
3356         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3357         spin_lock(&lp->lp_lock);
3358         return rc ? rc : LNET_REDISCOVER_PEER;
3359 }
3360
3361 /*
3362  * Mark the peer as discovered.
3363  */
3364 static int lnet_peer_discovered(struct lnet_peer *lp)
3365 __must_hold(&lp->lp_lock)
3366 {
3367         lp->lp_state |= LNET_PEER_DISCOVERED;
3368         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
3369                           LNET_PEER_REDISCOVER);
3370
3371         lp->lp_dc_error = 0;
3372
3373         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3374
3375         return 0;
3376 }
3377
3378 /* Active side of push. */
3379 static int lnet_peer_send_push(struct lnet_peer *lp)
3380 __must_hold(&lp->lp_lock)
3381 {
3382         struct lnet_ping_buffer *pbuf;
3383         struct lnet_process_id id;
3384         struct lnet_md md;
3385         int cpt;
3386         int rc;
3387
3388         /* Don't push to a non-multi-rail peer. */
3389         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
3390                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
3391                 /* if peer's NIDs are uptodate then peer is discovered */
3392                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
3393                         rc = lnet_peer_discovered(lp);
3394                         return rc;
3395                 }
3396
3397                 return 0;
3398         }
3399
3400         lp->lp_state |= LNET_PEER_PUSH_SENT;
3401         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
3402         spin_unlock(&lp->lp_lock);
3403
3404         cpt = lnet_net_lock_current();
3405         pbuf = the_lnet.ln_ping_target;
3406         lnet_ping_buffer_addref(pbuf);
3407         lnet_net_unlock(cpt);
3408
3409         /* Push source MD */
3410         md.start     = &pbuf->pb_info;
3411         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
3412         md.threshold = 2; /* Put/Ack */
3413         md.max_size  = 0;
3414         md.options   = LNET_MD_TRACK_RESPONSE;
3415         md.handler   = the_lnet.ln_dc_handler;
3416         md.user_ptr  = lp;
3417
3418         rc = LNetMDBind(&md, LNET_UNLINK, &lp->lp_push_mdh);
3419         if (rc) {
3420                 lnet_ping_buffer_decref(pbuf);
3421                 CERROR("Can't bind push source MD: %d\n", rc);
3422                 goto fail_error;
3423         }
3424         cpt = lnet_net_lock_current();
3425         /* Refcount for MD. */
3426         lnet_peer_addref_locked(lp);
3427         id.pid = LNET_PID_LUSTRE;
3428         id.nid = lnet_peer_select_nid(lp);
3429         lnet_net_unlock(cpt);
3430
3431         if (id.nid == LNET_NID_ANY) {
3432                 rc = -EHOSTUNREACH;
3433                 goto fail_unlink;
3434         }
3435
3436         rc = LNetPut(lp->lp_disc_src_nid, lp->lp_push_mdh,
3437                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
3438                      LNET_PROTO_PING_MATCHBITS, 0, 0);
3439
3440         /*
3441          * reset the discovery nid. There is no need to restrict sending
3442          * from that source, if we call lnet_push_update_to_peers(). It'll
3443          * get set to a specific NID, if we initiate discovery from the
3444          * scratch
3445          */
3446         lp->lp_disc_src_nid = LNET_NID_ANY;
3447
3448         if (rc)
3449                 goto fail_unlink;
3450
3451         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
3452
3453         spin_lock(&lp->lp_lock);
3454         return 0;
3455
3456 fail_unlink:
3457         LNetMDUnlink(lp->lp_push_mdh);
3458         LNetInvalidateMDHandle(&lp->lp_push_mdh);
3459 fail_error:
3460         CDEBUG(D_NET, "peer %s(%p): %d\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc);
3461         /*
3462          * The errors that get us here are considered hard errors and
3463          * cause Discovery to terminate. So we clear PUSH_SENT, but do
3464          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
3465          * because the unlink event handler will have set it if we
3466          * called LNetMDUnlink() above.
3467          */
3468         spin_lock(&lp->lp_lock);
3469         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
3470         return rc;
3471 }
3472
3473 /*
3474  * An unrecoverable error was encountered during discovery.
3475  * Set error status in peer and abort discovery.
3476  */
3477 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
3478 {
3479         CDEBUG(D_NET, "Discovery error %s: %d\n",
3480                libcfs_nid2str(lp->lp_primary_nid), error);
3481
3482         spin_lock(&lp->lp_lock);
3483         lp->lp_dc_error = error;
3484         lp->lp_state &= ~LNET_PEER_DISCOVERING;
3485         lp->lp_state |= LNET_PEER_REDISCOVER;
3486         spin_unlock(&lp->lp_lock);
3487 }
3488
3489 /*
3490  * Wait for work to be queued or some other change that must be
3491  * attended to. Returns non-zero if the discovery thread should shut
3492  * down.
3493  */
3494 static int lnet_peer_discovery_wait_for_work(void)
3495 {
3496         int cpt;
3497         int rc = 0;
3498
3499         DEFINE_WAIT(wait);
3500
3501         cpt = lnet_net_lock_current();
3502         for (;;) {
3503                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
3504                                 TASK_INTERRUPTIBLE);
3505                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3506                         break;
3507                 if (lnet_push_target_resize_needed() ||
3508                     the_lnet.ln_push_target->pb_needs_post)
3509                         break;
3510                 if (!list_empty(&the_lnet.ln_dc_request))
3511                         break;
3512                 if (!list_empty(&the_lnet.ln_msg_resend))
3513                         break;
3514                 lnet_net_unlock(cpt);
3515
3516                 /*
3517                  * wakeup max every second to check if there are peers that
3518                  * have been stuck on the working queue for greater than
3519                  * the peer timeout.
3520                  */
3521                 schedule_timeout(cfs_time_seconds(1));
3522                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
3523                 cpt = lnet_net_lock_current();
3524         }
3525         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3526
3527         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3528                 rc = -ESHUTDOWN;
3529
3530         lnet_net_unlock(cpt);
3531
3532         CDEBUG(D_NET, "woken: %d\n", rc);
3533
3534         return rc;
3535 }
3536
3537 /*
3538  * Messages that were pending on a destroyed peer will be put on a global
3539  * resend list. The message resend list will be checked by
3540  * the discovery thread when it wakes up, and will resend messages. These
3541  * messages can still be sendable in the case the lpni which was the initial
3542  * cause of the message re-queue was transfered to another peer.
3543  *
3544  * It is possible that LNet could be shutdown while we're iterating
3545  * through the list. lnet_shudown_lndnets() will attempt to access the
3546  * resend list, but will have to wait until the spinlock is released, by
3547  * which time there shouldn't be any more messages on the resend list.
3548  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3549  * for the messages so they can be released. The other case is that
3550  * lnet_shudown_lndnets() can finalize all the messages before this
3551  * function can visit the resend list, in which case this function will be
3552  * a no-op.
3553  */
3554 static void lnet_resend_msgs(void)
3555 {
3556         struct lnet_msg *msg, *tmp;
3557         LIST_HEAD(resend);
3558         int rc;
3559
3560         spin_lock(&the_lnet.ln_msg_resend_lock);
3561         list_splice(&the_lnet.ln_msg_resend, &resend);
3562         spin_unlock(&the_lnet.ln_msg_resend_lock);
3563
3564         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3565                 list_del_init(&msg->msg_list);
3566                 rc = lnet_send(msg->msg_src_nid_param, msg,
3567                                msg->msg_rtr_nid_param);
3568                 if (rc < 0) {
3569                         CNETERR("Error sending %s to %s: %d\n",
3570                                lnet_msgtyp2str(msg->msg_type),
3571                                libcfs_id2str(msg->msg_target), rc);
3572                         lnet_finalize(msg, rc);
3573                 }
3574         }
3575 }
3576
3577 /* The discovery thread. */
3578 static int lnet_peer_discovery(void *arg)
3579 {
3580         struct lnet_peer *lp;
3581         int rc;
3582
3583         wait_for_completion(&the_lnet.ln_started);
3584
3585         CDEBUG(D_NET, "started\n");
3586
3587         for (;;) {
3588                 if (lnet_peer_discovery_wait_for_work())
3589                         break;
3590
3591                 if (lnet_push_target_resize_needed())
3592                         lnet_push_target_resize();
3593                 else if (the_lnet.ln_push_target->pb_needs_post)
3594                         lnet_push_target_post(the_lnet.ln_push_target,
3595                                               &the_lnet.ln_push_target_md);
3596
3597                 lnet_resend_msgs();
3598
3599                 lnet_net_lock(LNET_LOCK_EX);
3600                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) {
3601                         lnet_net_unlock(LNET_LOCK_EX);
3602                         break;
3603                 }
3604
3605                 /*
3606                  * Process all incoming discovery work requests.  When
3607                  * discovery must wait on a peer to change state, it
3608                  * is added to the tail of the ln_dc_working queue. A
3609                  * timestamp keeps track of when the peer was added,
3610                  * so we can time out discovery requests that take too
3611                  * long.
3612                  */
3613                 while (!list_empty(&the_lnet.ln_dc_request)) {
3614                         lp = list_first_entry(&the_lnet.ln_dc_request,
3615                                               struct lnet_peer, lp_dc_list);
3616                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3617                         /*
3618                          * set the time the peer was put on the dc_working
3619                          * queue. It shouldn't remain on the queue
3620                          * forever, in case the GET message (for ping)
3621                          * doesn't get a REPLY or the PUT message (for
3622                          * push) doesn't get an ACK.
3623                          */
3624                         lp->lp_last_queued = ktime_get_real_seconds();
3625                         lnet_net_unlock(LNET_LOCK_EX);
3626
3627                         if (lnet_push_target_resize_needed())
3628                                 lnet_push_target_resize();
3629                         else if (the_lnet.ln_push_target->pb_needs_post)
3630                                 lnet_push_target_post(the_lnet.ln_push_target,
3631                                                       &the_lnet.ln_push_target_md);
3632
3633                         /*
3634                          * Select an action depending on the state of
3635                          * the peer and whether discovery is disabled.
3636                          * The check whether discovery is disabled is
3637                          * done after the code that handles processing
3638                          * for arrived data, cleanup for failures, and
3639                          * forcing a Ping or Push.
3640                          */
3641                         spin_lock(&lp->lp_lock);
3642                         CDEBUG(D_NET, "peer %s(%p) state %#x\n",
3643                                 libcfs_nid2str(lp->lp_primary_nid), lp,
3644                                 lp->lp_state);
3645                         if (lp->lp_state & (LNET_PEER_MARK_DELETION |
3646                                             LNET_PEER_MARK_DELETED))
3647                                 rc = lnet_peer_deletion(lp);
3648                         else if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3649                                 rc = lnet_peer_data_present(lp);
3650                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3651                                 rc = lnet_peer_ping_failed(lp);
3652                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3653                                 rc = lnet_peer_push_failed(lp);
3654                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3655                                 rc = lnet_peer_send_ping(lp);
3656                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3657                                 rc = lnet_peer_send_push(lp);
3658                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3659                                 rc = lnet_peer_send_ping(lp);
3660                         else if (lnet_peer_needs_push(lp))
3661                                 rc = lnet_peer_send_push(lp);
3662                         else
3663                                 rc = lnet_peer_discovered(lp);
3664                         CDEBUG(D_NET, "peer %s(%p) state %#x rc %d\n",
3665                                 libcfs_nid2str(lp->lp_primary_nid), lp,
3666                                 lp->lp_state, rc);
3667                         spin_unlock(&lp->lp_lock);
3668
3669                         lnet_net_lock(LNET_LOCK_EX);
3670                         if (rc == LNET_REDISCOVER_PEER) {
3671                                 list_move(&lp->lp_dc_list,
3672                                           &the_lnet.ln_dc_request);
3673                         } else if (rc) {
3674                                 lnet_peer_discovery_error(lp, rc);
3675                         }
3676                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3677                                 lnet_peer_discovery_complete(lp);
3678                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3679                                 break;
3680
3681                 }
3682
3683                 lnet_net_unlock(LNET_LOCK_EX);
3684         }
3685
3686         CDEBUG(D_NET, "stopping\n");
3687         /*
3688          * Clean up before telling lnet_peer_discovery_stop() that
3689          * we're done. Use wake_up() below to somewhat reduce the
3690          * size of the thundering herd if there are multiple threads
3691          * waiting on discovery of a single peer.
3692          */
3693
3694         /* Queue cleanup 1: stop all pending pings and pushes. */
3695         lnet_net_lock(LNET_LOCK_EX);
3696         while (!list_empty(&the_lnet.ln_dc_working)) {
3697                 lp = list_first_entry(&the_lnet.ln_dc_working,
3698                                       struct lnet_peer, lp_dc_list);
3699                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3700                 lnet_net_unlock(LNET_LOCK_EX);
3701                 lnet_peer_cancel_discovery(lp);
3702                 lnet_net_lock(LNET_LOCK_EX);
3703         }
3704         lnet_net_unlock(LNET_LOCK_EX);
3705
3706         /* Queue cleanup 2: wait for the expired queue to clear. */
3707         while (!list_empty(&the_lnet.ln_dc_expired))
3708                 schedule_timeout_uninterruptible(cfs_time_seconds(1));
3709
3710         /* Queue cleanup 3: clear the request queue. */
3711         lnet_net_lock(LNET_LOCK_EX);
3712         while (!list_empty(&the_lnet.ln_dc_request)) {
3713                 lp = list_first_entry(&the_lnet.ln_dc_request,
3714                                       struct lnet_peer, lp_dc_list);
3715                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3716                 lnet_peer_discovery_complete(lp);
3717         }
3718         lnet_net_unlock(LNET_LOCK_EX);
3719
3720         lnet_assert_handler_unused(the_lnet.ln_dc_handler);
3721         the_lnet.ln_dc_handler = NULL;
3722
3723         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3724         wake_up(&the_lnet.ln_dc_waitq);
3725
3726         CDEBUG(D_NET, "stopped\n");
3727
3728         return 0;
3729 }
3730
3731 /* ln_api_mutex is held on entry. */
3732 int lnet_peer_discovery_start(void)
3733 {
3734         struct task_struct *task;
3735         int rc = 0;
3736
3737         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3738                 return -EALREADY;
3739
3740         the_lnet.ln_dc_handler = lnet_discovery_event_handler;
3741         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3742         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3743         if (IS_ERR(task)) {
3744                 rc = PTR_ERR(task);
3745                 CERROR("Can't start peer discovery thread: %d\n", rc);
3746
3747                 the_lnet.ln_dc_handler = NULL;
3748
3749                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3750         }
3751
3752         CDEBUG(D_NET, "discovery start: %d\n", rc);
3753
3754         return rc;
3755 }
3756
3757 /* ln_api_mutex is held on entry. */
3758 void lnet_peer_discovery_stop(void)
3759 {
3760         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3761                 return;
3762
3763         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3764         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3765
3766         /* In the LNetNIInit() path we may be stopping discovery before it
3767          * entered its work loop
3768          */
3769         if (!completion_done(&the_lnet.ln_started))
3770                 complete(&the_lnet.ln_started);
3771         else
3772                 wake_up(&the_lnet.ln_dc_waitq);
3773
3774         wait_event(the_lnet.ln_dc_waitq,
3775                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3776
3777         LASSERT(list_empty(&the_lnet.ln_dc_request));
3778         LASSERT(list_empty(&the_lnet.ln_dc_working));
3779         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3780
3781         CDEBUG(D_NET, "discovery stopped\n");
3782 }
3783
3784 /* Debugging */
3785
3786 void
3787 lnet_debug_peer(lnet_nid_t nid)
3788 {
3789         char                    *aliveness = "NA";
3790         struct lnet_peer_ni     *lp;
3791         int                     cpt;
3792
3793         cpt = lnet_cpt_of_nid(nid, NULL);
3794         lnet_net_lock(cpt);
3795
3796         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3797         if (IS_ERR(lp)) {
3798                 lnet_net_unlock(cpt);
3799                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3800                 return;
3801         }
3802
3803         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3804                 aliveness = (lnet_is_peer_ni_alive(lp)) ? "up" : "down";
3805
3806         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3807                libcfs_nid2str(lp->lpni_nid), kref_read(&lp->lpni_kref),
3808                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3809                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3810                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3811
3812         lnet_peer_ni_decref_locked(lp);
3813
3814         lnet_net_unlock(cpt);
3815 }
3816
3817 /* Gathering information for userspace. */
3818
3819 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3820                           char aliveness[LNET_MAX_STR_LEN],
3821                           __u32 *cpt_iter, __u32 *refcount,
3822                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3823                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3824                           __u32 *peer_tx_qnob)
3825 {
3826         struct lnet_peer_table          *peer_table;
3827         struct lnet_peer_ni             *lp;
3828         int                             j;
3829         int                             lncpt;
3830         bool                            found = false;
3831
3832         /* get the number of CPTs */
3833         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3834
3835         /* if the cpt number to be examined is >= the number of cpts in
3836          * the system then indicate that there are no more cpts to examin
3837          */
3838         if (*cpt_iter >= lncpt)
3839                 return -ENOENT;
3840
3841         /* get the current table */
3842         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3843         /* if the ptable is NULL then there are no more cpts to examine */
3844         if (peer_table == NULL)
3845                 return -ENOENT;
3846
3847         lnet_net_lock(*cpt_iter);
3848
3849         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3850                 struct list_head *peers = &peer_table->pt_hash[j];
3851
3852                 list_for_each_entry(lp, peers, lpni_hashlist) {
3853                         if (peer_index-- > 0)
3854                                 continue;
3855
3856                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3857                         if (lnet_isrouter(lp) ||
3858                                 lnet_peer_aliveness_enabled(lp))
3859                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3860                                          lnet_is_peer_ni_alive(lp) ? "up" : "down");
3861
3862                         *nid = lp->lpni_nid;
3863                         *refcount = kref_read(&lp->lpni_kref);
3864                         *ni_peer_tx_credits =
3865                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3866                         *peer_tx_credits = lp->lpni_txcredits;
3867                         *peer_rtr_credits = lp->lpni_rtrcredits;
3868                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3869                         *peer_tx_qnob = lp->lpni_txqnob;
3870
3871                         found = true;
3872                 }
3873
3874         }
3875         lnet_net_unlock(*cpt_iter);
3876
3877         *cpt_iter = lncpt;
3878
3879         return found ? 0 : -ENOENT;
3880 }
3881
3882 /* ln_api_mutex is held, which keeps the peer list stable */
3883 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3884 {
3885         struct lnet_ioctl_element_stats *lpni_stats;
3886         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3887         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3888         struct lnet_peer_ni_credit_info *lpni_info;
3889         struct lnet_peer_ni *lpni;
3890         struct lnet_peer *lp;
3891         lnet_nid_t nid;
3892         __u32 size;
3893         int rc;
3894
3895         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3896
3897         if (!lp) {
3898                 rc = -ENOENT;
3899                 goto out;
3900         }
3901
3902         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3903                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3904         size *= lp->lp_nnis;
3905         if (size > cfg->prcfg_size) {
3906                 cfg->prcfg_size = size;
3907                 rc = -E2BIG;
3908                 goto out_lp_decref;
3909         }
3910
3911         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3912         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3913         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3914         cfg->prcfg_count = lp->lp_nnis;
3915         cfg->prcfg_size = size;
3916         cfg->prcfg_state = lp->lp_state;
3917
3918         /* Allocate helper buffers. */
3919         rc = -ENOMEM;
3920         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3921         if (!lpni_info)
3922                 goto out_lp_decref;
3923         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3924         if (!lpni_stats)
3925                 goto out_free_info;
3926         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3927         if (!lpni_msg_stats)
3928                 goto out_free_stats;
3929         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3930         if (!lpni_hstats)
3931                 goto out_free_msg_stats;
3932
3933
3934         lpni = NULL;
3935         rc = -EFAULT;
3936         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3937                 nid = lpni->lpni_nid;
3938                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3939                         goto out_free_hstats;
3940                 bulk += sizeof(nid);
3941
3942                 memset(lpni_info, 0, sizeof(*lpni_info));
3943                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3944                 if (lnet_isrouter(lpni) ||
3945                         lnet_peer_aliveness_enabled(lpni))
3946                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3947                                 lnet_is_peer_ni_alive(lpni) ? "up" : "down");
3948
3949                 lpni_info->cr_refcount = kref_read(&lpni->lpni_kref);
3950                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3951                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3952                 lpni_info->cr_peer_tx_credits = lp