Whamcloud - gitweb
LU-9019 lnet: move ping and delay injection to time64_t
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_rtrq);
166         INIT_LIST_HEAD(&lpni->lpni_routes);
167         INIT_LIST_HEAD(&lpni->lpni_hashlist);
168         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
169         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
170
171         spin_lock_init(&lpni->lpni_lock);
172
173         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
174         lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
175         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
176         lpni->lpni_nid = nid;
177         lpni->lpni_cpt = cpt;
178         lnet_set_peer_ni_health_locked(lpni, true);
179
180         net = lnet_get_net_locked(LNET_NIDNET(nid));
181         lpni->lpni_net = net;
182         if (net) {
183                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
184                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
185                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
186                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
187         } else {
188                 /*
189                  * This peer_ni is not on a local network, so we
190                  * cannot add the credits here. In case the net is
191                  * added later, add the peer_ni to the remote peer ni
192                  * list so it can be easily found and revisited.
193                  */
194                 /* FIXME: per-net implementation instead? */
195                 atomic_inc(&lpni->lpni_refcount);
196                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
197                               &the_lnet.ln_remote_peer_ni_list);
198         }
199
200         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
201
202         return lpni;
203 }
204
205 static struct lnet_peer_net *
206 lnet_peer_net_alloc(__u32 net_id)
207 {
208         struct lnet_peer_net *lpn;
209
210         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
211         if (!lpn)
212                 return NULL;
213
214         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
215         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
216         lpn->lpn_net_id = net_id;
217
218         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
219
220         return lpn;
221 }
222
223 void
224 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
225 {
226         struct lnet_peer *lp;
227
228         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
229
230         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
231         LASSERT(list_empty(&lpn->lpn_peer_nis));
232         LASSERT(list_empty(&lpn->lpn_peer_nets));
233         lp = lpn->lpn_peer;
234         lpn->lpn_peer = NULL;
235         LIBCFS_FREE(lpn, sizeof(*lpn));
236
237         lnet_peer_decref_locked(lp);
238 }
239
240 static struct lnet_peer *
241 lnet_peer_alloc(lnet_nid_t nid)
242 {
243         struct lnet_peer *lp;
244
245         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
246         if (!lp)
247                 return NULL;
248
249         INIT_LIST_HEAD(&lp->lp_peer_list);
250         INIT_LIST_HEAD(&lp->lp_peer_nets);
251         INIT_LIST_HEAD(&lp->lp_dc_list);
252         INIT_LIST_HEAD(&lp->lp_dc_pendq);
253         init_waitqueue_head(&lp->lp_dc_waitq);
254         spin_lock_init(&lp->lp_lock);
255         lp->lp_primary_nid = nid;
256         /*
257          * Turn off discovery for loopback peer. If you're creating a peer
258          * for the loopback interface then that was initiated when we
259          * attempted to send a message over the loopback. There is no need
260          * to ever use a different interface when sending messages to
261          * myself.
262          */
263         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
264                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
265         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
266
267         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
268
269         return lp;
270 }
271
272 void
273 lnet_destroy_peer_locked(struct lnet_peer *lp)
274 {
275         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
276
277         LASSERT(atomic_read(&lp->lp_refcount) == 0);
278         LASSERT(list_empty(&lp->lp_peer_nets));
279         LASSERT(list_empty(&lp->lp_peer_list));
280         LASSERT(list_empty(&lp->lp_dc_list));
281
282         if (lp->lp_data)
283                 lnet_ping_buffer_decref(lp->lp_data);
284
285         /*
286          * if there are messages still on the pending queue, then make
287          * sure to queue them on the ln_msg_resend list so they can be
288          * resent at a later point if the discovery thread is still
289          * running.
290          * If the discovery thread has stopped, then the wakeup will be a
291          * no-op, and it is expected the lnet_shutdown_lndnets() will
292          * eventually be called, which will traverse this list and
293          * finalize the messages on the list.
294          * We can not resend them now because we're holding the cpt lock.
295          * Releasing the lock can cause an inconsistent state
296          */
297         spin_lock(&the_lnet.ln_msg_resend_lock);
298         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
299         spin_unlock(&the_lnet.ln_msg_resend_lock);
300         wake_up(&the_lnet.ln_dc_waitq);
301
302         LIBCFS_FREE(lp, sizeof(*lp));
303 }
304
305 /*
306  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
307  * that peer_net, detach the peer_net from the peer.
308  *
309  * Call with lnet_net_lock/EX held
310  */
311 static void
312 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
313 {
314         struct lnet_peer_table *ptable;
315         struct lnet_peer_net *lpn;
316         struct lnet_peer *lp;
317
318         /*
319          * Belts and suspenders: gracefully handle teardown of a
320          * partially connected peer_ni.
321          */
322         lpn = lpni->lpni_peer_net;
323
324         list_del_init(&lpni->lpni_peer_nis);
325         /*
326          * If there are no lpni's left, we detach lpn from
327          * lp_peer_nets, so it cannot be found anymore.
328          */
329         if (list_empty(&lpn->lpn_peer_nis))
330                 list_del_init(&lpn->lpn_peer_nets);
331
332         /* Update peer NID count. */
333         lp = lpn->lpn_peer;
334         lp->lp_nnis--;
335
336         /*
337          * If there are no more peer nets, make the peer unfindable
338          * via the peer_tables.
339          *
340          * Otherwise, if the peer is DISCOVERED, tell discovery to
341          * take another look at it. This is a no-op if discovery for
342          * this peer did the detaching.
343          */
344         if (list_empty(&lp->lp_peer_nets)) {
345                 list_del_init(&lp->lp_peer_list);
346                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
347                 ptable->pt_peers--;
348         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
349                 /* Discovery isn't running, nothing to do here. */
350         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
351                 lnet_peer_queue_for_discovery(lp);
352                 wake_up(&the_lnet.ln_dc_waitq);
353         }
354         CDEBUG(D_NET, "peer %s NID %s\n",
355                 libcfs_nid2str(lp->lp_primary_nid),
356                 libcfs_nid2str(lpni->lpni_nid));
357 }
358
359 /* called with lnet_net_lock LNET_LOCK_EX held */
360 static int
361 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
362 {
363         struct lnet_peer_table *ptable = NULL;
364
365         /* don't remove a peer_ni if it's also a gateway */
366         if (lpni->lpni_rtr_refcount > 0) {
367                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
368                        libcfs_nid2str(lpni->lpni_nid));
369                 return -EBUSY;
370         }
371
372         lnet_peer_remove_from_remote_list(lpni);
373
374         /* remove peer ni from the hash list. */
375         list_del_init(&lpni->lpni_hashlist);
376
377         /* decrement the ref count on the peer table */
378         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
379         LASSERT(ptable->pt_number > 0);
380         ptable->pt_number--;
381
382         /*
383          * The peer_ni can no longer be found with a lookup. But there
384          * can be current users, so keep track of it on the zombie
385          * list until the reference count has gone to zero.
386          *
387          * The last reference may be lost in a place where the
388          * lnet_net_lock locks only a single cpt, and that cpt may not
389          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
390          * has its own lock.
391          */
392         spin_lock(&ptable->pt_zombie_lock);
393         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
394         ptable->pt_zombies++;
395         spin_unlock(&ptable->pt_zombie_lock);
396
397         /* no need to keep this peer_ni on the hierarchy anymore */
398         lnet_peer_detach_peer_ni_locked(lpni);
399
400         /* remove hashlist reference on peer_ni */
401         lnet_peer_ni_decref_locked(lpni);
402
403         return 0;
404 }
405
406 void lnet_peer_uninit(void)
407 {
408         struct lnet_peer_ni *lpni, *tmp;
409
410         lnet_net_lock(LNET_LOCK_EX);
411
412         /* remove all peer_nis from the remote peer and the hash list */
413         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
414                                  lpni_on_remote_peer_ni_list)
415                 lnet_peer_ni_del_locked(lpni);
416
417         lnet_peer_tables_destroy();
418
419         lnet_net_unlock(LNET_LOCK_EX);
420 }
421
422 static int
423 lnet_peer_del_locked(struct lnet_peer *peer)
424 {
425         struct lnet_peer_ni *lpni = NULL, *lpni2;
426         int rc = 0, rc2 = 0;
427
428         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
429
430         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
431         while (lpni != NULL) {
432                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
433                 rc = lnet_peer_ni_del_locked(lpni);
434                 if (rc != 0)
435                         rc2 = rc;
436                 lpni = lpni2;
437         }
438
439         return rc2;
440 }
441
442 static int
443 lnet_peer_del(struct lnet_peer *peer)
444 {
445         lnet_net_lock(LNET_LOCK_EX);
446         lnet_peer_del_locked(peer);
447         lnet_net_unlock(LNET_LOCK_EX);
448
449         return 0;
450 }
451
452 /*
453  * Delete a NID from a peer. Call with ln_api_mutex held.
454  *
455  * Error codes:
456  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
457  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
458  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
459  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
460  */
461 static int
462 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
463 {
464         struct lnet_peer_ni *lpni;
465         lnet_nid_t primary_nid = lp->lp_primary_nid;
466         int rc = 0;
467
468         if (!(flags & LNET_PEER_CONFIGURED)) {
469                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
470                         rc = -EPERM;
471                         goto out;
472                 }
473         }
474         lpni = lnet_find_peer_ni_locked(nid);
475         if (!lpni) {
476                 rc = -ENOENT;
477                 goto out;
478         }
479         lnet_peer_ni_decref_locked(lpni);
480         if (lp != lpni->lpni_peer_net->lpn_peer) {
481                 rc = -ECHILD;
482                 goto out;
483         }
484
485         /*
486          * This function only allows deletion of the primary NID if it
487          * is the only NID.
488          */
489         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
490                 rc = -EBUSY;
491                 goto out;
492         }
493
494         lnet_net_lock(LNET_LOCK_EX);
495         lnet_peer_ni_del_locked(lpni);
496         lnet_net_unlock(LNET_LOCK_EX);
497
498 out:
499         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
500                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
501
502         return rc;
503 }
504
505 static void
506 lnet_peer_table_cleanup_locked(struct lnet_net *net,
507                                struct lnet_peer_table *ptable)
508 {
509         int                      i;
510         struct lnet_peer_ni     *next;
511         struct lnet_peer_ni     *lpni;
512         struct lnet_peer        *peer;
513
514         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
515                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
516                                          lpni_hashlist) {
517                         if (net != NULL && net != lpni->lpni_net)
518                                 continue;
519
520                         peer = lpni->lpni_peer_net->lpn_peer;
521                         if (peer->lp_primary_nid != lpni->lpni_nid) {
522                                 lnet_peer_ni_del_locked(lpni);
523                                 continue;
524                         }
525                         /*
526                          * Removing the primary NID implies removing
527                          * the entire peer. Advance next beyond any
528                          * peer_ni that belongs to the same peer.
529                          */
530                         list_for_each_entry_from(next, &ptable->pt_hash[i],
531                                                  lpni_hashlist) {
532                                 if (next->lpni_peer_net->lpn_peer != peer)
533                                         break;
534                         }
535                         lnet_peer_del_locked(peer);
536                 }
537         }
538 }
539
540 static void
541 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
542 {
543         int     i = 3;
544
545         spin_lock(&ptable->pt_zombie_lock);
546         while (ptable->pt_zombies) {
547                 spin_unlock(&ptable->pt_zombie_lock);
548
549                 if (is_power_of_2(i)) {
550                         CDEBUG(D_WARNING,
551                                "Waiting for %d zombies on peer table\n",
552                                ptable->pt_zombies);
553                 }
554                 set_current_state(TASK_UNINTERRUPTIBLE);
555                 schedule_timeout(cfs_time_seconds(1) >> 1);
556                 spin_lock(&ptable->pt_zombie_lock);
557         }
558         spin_unlock(&ptable->pt_zombie_lock);
559 }
560
561 static void
562 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
563                                 struct lnet_peer_table *ptable)
564 {
565         struct lnet_peer_ni     *lp;
566         struct lnet_peer_ni     *tmp;
567         lnet_nid_t              lpni_nid;
568         int                     i;
569
570         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
571                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
572                                          lpni_hashlist) {
573                         if (net != lp->lpni_net)
574                                 continue;
575
576                         if (lp->lpni_rtr_refcount == 0)
577                                 continue;
578
579                         lpni_nid = lp->lpni_nid;
580
581                         lnet_net_unlock(LNET_LOCK_EX);
582                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
583                         lnet_net_lock(LNET_LOCK_EX);
584                 }
585         }
586 }
587
588 void
589 lnet_peer_tables_cleanup(struct lnet_net *net)
590 {
591         int i;
592         struct lnet_peer_table *ptable;
593
594         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
595         /* If just deleting the peers for a NI, get rid of any routes these
596          * peers are gateways for. */
597         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
598                 lnet_net_lock(LNET_LOCK_EX);
599                 lnet_peer_table_del_rtrs_locked(net, ptable);
600                 lnet_net_unlock(LNET_LOCK_EX);
601         }
602
603         /* Start the cleanup process */
604         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
605                 lnet_net_lock(LNET_LOCK_EX);
606                 lnet_peer_table_cleanup_locked(net, ptable);
607                 lnet_net_unlock(LNET_LOCK_EX);
608         }
609
610         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
611                 lnet_peer_ni_finalize_wait(ptable);
612 }
613
614 static struct lnet_peer_ni *
615 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
616 {
617         struct list_head        *peers;
618         struct lnet_peer_ni     *lp;
619
620         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
621
622         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
623         list_for_each_entry(lp, peers, lpni_hashlist) {
624                 if (lp->lpni_nid == nid) {
625                         lnet_peer_ni_addref_locked(lp);
626                         return lp;
627                 }
628         }
629
630         return NULL;
631 }
632
633 struct lnet_peer_ni *
634 lnet_find_peer_ni_locked(lnet_nid_t nid)
635 {
636         struct lnet_peer_ni *lpni;
637         struct lnet_peer_table *ptable;
638         int cpt;
639
640         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
641
642         ptable = the_lnet.ln_peer_tables[cpt];
643         lpni = lnet_get_peer_ni_locked(ptable, nid);
644
645         return lpni;
646 }
647
648 struct lnet_peer *
649 lnet_find_peer(lnet_nid_t nid)
650 {
651         struct lnet_peer_ni *lpni;
652         struct lnet_peer *lp = NULL;
653         int cpt;
654
655         cpt = lnet_net_lock_current();
656         lpni = lnet_find_peer_ni_locked(nid);
657         if (lpni) {
658                 lp = lpni->lpni_peer_net->lpn_peer;
659                 lnet_peer_addref_locked(lp);
660                 lnet_peer_ni_decref_locked(lpni);
661         }
662         lnet_net_unlock(cpt);
663
664         return lp;
665 }
666
667 struct lnet_peer_ni *
668 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
669                              struct lnet_peer_net *peer_net,
670                              struct lnet_peer_ni *prev)
671 {
672         struct lnet_peer_ni *lpni;
673         struct lnet_peer_net *net = peer_net;
674
675         if (!prev) {
676                 if (!net) {
677                         if (list_empty(&peer->lp_peer_nets))
678                                 return NULL;
679
680                         net = list_entry(peer->lp_peer_nets.next,
681                                          struct lnet_peer_net,
682                                          lpn_peer_nets);
683                 }
684                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
685                                   lpni_peer_nis);
686
687                 return lpni;
688         }
689
690         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
691                 /*
692                  * if you reached the end of the peer ni list and the peer
693                  * net is specified then there are no more peer nis in that
694                  * net.
695                  */
696                 if (net)
697                         return NULL;
698
699                 /*
700                  * we reached the end of this net ni list. move to the
701                  * next net
702                  */
703                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
704                     &peer->lp_peer_nets)
705                         /* no more nets and no more NIs. */
706                         return NULL;
707
708                 /* get the next net */
709                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
710                                  struct lnet_peer_net,
711                                  lpn_peer_nets);
712                 /* get the ni on it */
713                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
714                                   lpni_peer_nis);
715
716                 return lpni;
717         }
718
719         /* there are more nis left */
720         lpni = list_entry(prev->lpni_peer_nis.next,
721                           struct lnet_peer_ni, lpni_peer_nis);
722
723         return lpni;
724 }
725
726 /* Call with the ln_api_mutex held */
727 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
728 {
729         struct lnet_process_id id;
730         struct lnet_peer_table *ptable;
731         struct lnet_peer *lp;
732         __u32 count = 0;
733         __u32 size = 0;
734         int lncpt;
735         int cpt;
736         __u32 i;
737         int rc;
738
739         rc = -ESHUTDOWN;
740         if (the_lnet.ln_state != LNET_STATE_RUNNING)
741                 goto done;
742
743         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
744
745         /*
746          * Count the number of peers, and return E2BIG if the buffer
747          * is too small. We'll also return the desired size.
748          */
749         rc = -E2BIG;
750         for (cpt = 0; cpt < lncpt; cpt++) {
751                 ptable = the_lnet.ln_peer_tables[cpt];
752                 count += ptable->pt_peers;
753         }
754         size = count * sizeof(*ids);
755         if (size > *sizep)
756                 goto done;
757
758         /*
759          * Walk the peer lists and copy out the primary nids.
760          * This is safe because the peer lists are only modified
761          * while the ln_api_mutex is held. So we don't need to
762          * hold the lnet_net_lock as well, and can therefore
763          * directly call copy_to_user().
764          */
765         rc = -EFAULT;
766         memset(&id, 0, sizeof(id));
767         id.pid = LNET_PID_LUSTRE;
768         i = 0;
769         for (cpt = 0; cpt < lncpt; cpt++) {
770                 ptable = the_lnet.ln_peer_tables[cpt];
771                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
772                         if (i >= count)
773                                 goto done;
774                         id.nid = lp->lp_primary_nid;
775                         if (copy_to_user(&ids[i], &id, sizeof(id)))
776                                 goto done;
777                         i++;
778                 }
779         }
780         rc = 0;
781 done:
782         *countp = count;
783         *sizep = size;
784         return rc;
785 }
786
787 /*
788  * Start pushes to peers that need to be updated for a configuration
789  * change on this node.
790  */
791 void
792 lnet_push_update_to_peers(int force)
793 {
794         struct lnet_peer_table *ptable;
795         struct lnet_peer *lp;
796         int lncpt;
797         int cpt;
798
799         lnet_net_lock(LNET_LOCK_EX);
800         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
801         for (cpt = 0; cpt < lncpt; cpt++) {
802                 ptable = the_lnet.ln_peer_tables[cpt];
803                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
804                         if (force) {
805                                 spin_lock(&lp->lp_lock);
806                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
807                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
808                                 spin_unlock(&lp->lp_lock);
809                         }
810                         if (lnet_peer_needs_push(lp))
811                                 lnet_peer_queue_for_discovery(lp);
812                 }
813         }
814         lnet_net_unlock(LNET_LOCK_EX);
815         wake_up(&the_lnet.ln_dc_waitq);
816 }
817
818 /*
819  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
820  * this is a preferred point-to-point path. Call with lnet_net_lock in
821  * shared mmode.
822  */
823 bool
824 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
825 {
826         int i;
827
828         if (lpni->lpni_pref_nnids == 0)
829                 return false;
830         if (lpni->lpni_pref_nnids == 1)
831                 return lpni->lpni_pref.nid == nid;
832         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
833                 if (lpni->lpni_pref.nids[i] == nid)
834                         return true;
835         }
836         return false;
837 }
838
839 /*
840  * Set a single ni as preferred, provided no preferred ni is already
841  * defined. Only to be used for non-multi-rail peer_ni.
842  */
843 int
844 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
845 {
846         int rc = 0;
847
848         spin_lock(&lpni->lpni_lock);
849         if (nid == LNET_NID_ANY) {
850                 rc = -EINVAL;
851         } else if (lpni->lpni_pref_nnids > 0) {
852                 rc = -EPERM;
853         } else if (lpni->lpni_pref_nnids == 0) {
854                 lpni->lpni_pref.nid = nid;
855                 lpni->lpni_pref_nnids = 1;
856                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
857         }
858         spin_unlock(&lpni->lpni_lock);
859
860         CDEBUG(D_NET, "peer %s nid %s: %d\n",
861                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
862         return rc;
863 }
864
865 /*
866  * Clear the preferred NID from a non-multi-rail peer_ni, provided
867  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
868  */
869 int
870 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
871 {
872         int rc = 0;
873
874         spin_lock(&lpni->lpni_lock);
875         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
876                 lpni->lpni_pref_nnids = 0;
877                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
878         } else if (lpni->lpni_pref_nnids == 0) {
879                 rc = -ENOENT;
880         } else {
881                 rc = -EPERM;
882         }
883         spin_unlock(&lpni->lpni_lock);
884
885         CDEBUG(D_NET, "peer %s: %d\n",
886                libcfs_nid2str(lpni->lpni_nid), rc);
887         return rc;
888 }
889
890 /*
891  * Clear the preferred NIDs from a non-multi-rail peer.
892  */
893 void
894 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
895 {
896         struct lnet_peer_ni *lpni = NULL;
897
898         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
899                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
900 }
901
902 int
903 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
904 {
905         lnet_nid_t *nids = NULL;
906         lnet_nid_t *oldnids = NULL;
907         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
908         int size;
909         int i;
910         int rc = 0;
911
912         if (nid == LNET_NID_ANY) {
913                 rc = -EINVAL;
914                 goto out;
915         }
916
917         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
918                 rc = -EEXIST;
919                 goto out;
920         }
921
922         /* A non-MR node may have only one preferred NI per peer_ni */
923         if (lpni->lpni_pref_nnids > 0) {
924                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
925                         rc = -EPERM;
926                         goto out;
927                 }
928         }
929
930         if (lpni->lpni_pref_nnids != 0) {
931                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
932                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
933                 if (!nids) {
934                         rc = -ENOMEM;
935                         goto out;
936                 }
937                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
938                         if (lpni->lpni_pref.nids[i] == nid) {
939                                 LIBCFS_FREE(nids, size);
940                                 rc = -EEXIST;
941                                 goto out;
942                         }
943                         nids[i] = lpni->lpni_pref.nids[i];
944                 }
945                 nids[i] = nid;
946         }
947
948         lnet_net_lock(LNET_LOCK_EX);
949         spin_lock(&lpni->lpni_lock);
950         if (lpni->lpni_pref_nnids == 0) {
951                 lpni->lpni_pref.nid = nid;
952         } else {
953                 oldnids = lpni->lpni_pref.nids;
954                 lpni->lpni_pref.nids = nids;
955         }
956         lpni->lpni_pref_nnids++;
957         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
958         spin_unlock(&lpni->lpni_lock);
959         lnet_net_unlock(LNET_LOCK_EX);
960
961         if (oldnids) {
962                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
963                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
964         }
965 out:
966         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
967                 spin_lock(&lpni->lpni_lock);
968                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
969                 spin_unlock(&lpni->lpni_lock);
970         }
971         CDEBUG(D_NET, "peer %s nid %s: %d\n",
972                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
973         return rc;
974 }
975
976 int
977 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
978 {
979         lnet_nid_t *nids = NULL;
980         lnet_nid_t *oldnids = NULL;
981         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
982         int size;
983         int i, j;
984         int rc = 0;
985
986         if (lpni->lpni_pref_nnids == 0) {
987                 rc = -ENOENT;
988                 goto out;
989         }
990
991         if (lpni->lpni_pref_nnids == 1) {
992                 if (lpni->lpni_pref.nid != nid) {
993                         rc = -ENOENT;
994                         goto out;
995                 }
996         } else if (lpni->lpni_pref_nnids == 2) {
997                 if (lpni->lpni_pref.nids[0] != nid &&
998                     lpni->lpni_pref.nids[1] != nid) {
999                         rc = -ENOENT;
1000                         goto out;
1001                 }
1002         } else {
1003                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1004                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1005                 if (!nids) {
1006                         rc = -ENOMEM;
1007                         goto out;
1008                 }
1009                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1010                         if (lpni->lpni_pref.nids[i] != nid)
1011                                 continue;
1012                         nids[j++] = lpni->lpni_pref.nids[i];
1013                 }
1014                 /* Check if we actually removed a nid. */
1015                 if (j == lpni->lpni_pref_nnids) {
1016                         LIBCFS_FREE(nids, size);
1017                         rc = -ENOENT;
1018                         goto out;
1019                 }
1020         }
1021
1022         lnet_net_lock(LNET_LOCK_EX);
1023         spin_lock(&lpni->lpni_lock);
1024         if (lpni->lpni_pref_nnids == 1) {
1025                 lpni->lpni_pref.nid = LNET_NID_ANY;
1026         } else if (lpni->lpni_pref_nnids == 2) {
1027                 oldnids = lpni->lpni_pref.nids;
1028                 if (oldnids[0] == nid)
1029                         lpni->lpni_pref.nid = oldnids[1];
1030                 else
1031                         lpni->lpni_pref.nid = oldnids[2];
1032         } else {
1033                 oldnids = lpni->lpni_pref.nids;
1034                 lpni->lpni_pref.nids = nids;
1035         }
1036         lpni->lpni_pref_nnids--;
1037         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1038         spin_unlock(&lpni->lpni_lock);
1039         lnet_net_unlock(LNET_LOCK_EX);
1040
1041         if (oldnids) {
1042                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1043                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1044         }
1045 out:
1046         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1047                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1048         return rc;
1049 }
1050
1051 lnet_nid_t
1052 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1053 {
1054         struct lnet_peer_ni *lpni;
1055         lnet_nid_t primary_nid = nid;
1056
1057         lpni = lnet_find_peer_ni_locked(nid);
1058         if (lpni) {
1059                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1060                 lnet_peer_ni_decref_locked(lpni);
1061         }
1062
1063         return primary_nid;
1064 }
1065
1066 lnet_nid_t
1067 LNetPrimaryNID(lnet_nid_t nid)
1068 {
1069         struct lnet_peer *lp;
1070         struct lnet_peer_ni *lpni;
1071         lnet_nid_t primary_nid = nid;
1072         int rc = 0;
1073         int cpt;
1074
1075         cpt = lnet_net_lock_current();
1076         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1077         if (IS_ERR(lpni)) {
1078                 rc = PTR_ERR(lpni);
1079                 goto out_unlock;
1080         }
1081         lp = lpni->lpni_peer_net->lpn_peer;
1082         while (!lnet_peer_is_uptodate(lp)) {
1083                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1084                 if (rc)
1085                         goto out_decref;
1086                 lp = lpni->lpni_peer_net->lpn_peer;
1087         }
1088         primary_nid = lp->lp_primary_nid;
1089 out_decref:
1090         lnet_peer_ni_decref_locked(lpni);
1091 out_unlock:
1092         lnet_net_unlock(cpt);
1093
1094         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1095                libcfs_nid2str(primary_nid), rc);
1096         return primary_nid;
1097 }
1098 EXPORT_SYMBOL(LNetPrimaryNID);
1099
1100 struct lnet_peer_net *
1101 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1102 {
1103         struct lnet_peer_net *peer_net;
1104         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1105                 if (peer_net->lpn_net_id == net_id)
1106                         return peer_net;
1107         }
1108         return NULL;
1109 }
1110
1111 /*
1112  * Attach a peer_ni to a peer_net and peer. This function assumes
1113  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1114  * may be attached to a different peer, in which case it will be
1115  * properly detached first. The whole operation is done atomically.
1116  *
1117  * Always returns 0.  This is the last function called from functions
1118  * that do return an int, so returning 0 here allows the compiler to
1119  * do a tail call.
1120  */
1121 static int
1122 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1123                                 struct lnet_peer_net *lpn,
1124                                 struct lnet_peer_ni *lpni,
1125                                 unsigned flags)
1126 {
1127         struct lnet_peer_table *ptable;
1128
1129         /* Install the new peer_ni */
1130         lnet_net_lock(LNET_LOCK_EX);
1131         /* Add peer_ni to global peer table hash, if necessary. */
1132         if (list_empty(&lpni->lpni_hashlist)) {
1133                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1134
1135                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1136                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1137                 ptable->pt_version++;
1138                 ptable->pt_number++;
1139                 /* This is the 1st refcount on lpni. */
1140                 atomic_inc(&lpni->lpni_refcount);
1141         }
1142
1143         /* Detach the peer_ni from an existing peer, if necessary. */
1144         if (lpni->lpni_peer_net) {
1145                 LASSERT(lpni->lpni_peer_net != lpn);
1146                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1147                 lnet_peer_detach_peer_ni_locked(lpni);
1148                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1149                 lpni->lpni_peer_net = NULL;
1150         }
1151
1152         /* Add peer_ni to peer_net */
1153         lpni->lpni_peer_net = lpn;
1154         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1155         lnet_peer_net_addref_locked(lpn);
1156
1157         /* Add peer_net to peer */
1158         if (!lpn->lpn_peer) {
1159                 lpn->lpn_peer = lp;
1160                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1161                 lnet_peer_addref_locked(lp);
1162         }
1163
1164         /* Add peer to global peer list, if necessary */
1165         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1166         if (list_empty(&lp->lp_peer_list)) {
1167                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1168                 ptable->pt_peers++;
1169         }
1170
1171
1172         /* Update peer state */
1173         spin_lock(&lp->lp_lock);
1174         if (flags & LNET_PEER_CONFIGURED) {
1175                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1176                         lp->lp_state |= LNET_PEER_CONFIGURED;
1177         }
1178         if (flags & LNET_PEER_MULTI_RAIL) {
1179                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1180                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1181                         lnet_peer_clr_non_mr_pref_nids(lp);
1182                 }
1183         }
1184         spin_unlock(&lp->lp_lock);
1185
1186         lp->lp_nnis++;
1187         lnet_net_unlock(LNET_LOCK_EX);
1188
1189         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1190                libcfs_nid2str(lp->lp_primary_nid),
1191                libcfs_nid2str(lpni->lpni_nid), flags);
1192
1193         return 0;
1194 }
1195
1196 /*
1197  * Create a new peer, with nid as its primary nid.
1198  *
1199  * Call with the lnet_api_mutex held.
1200  */
1201 static int
1202 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1203 {
1204         struct lnet_peer *lp;
1205         struct lnet_peer_net *lpn;
1206         struct lnet_peer_ni *lpni;
1207         int rc = 0;
1208
1209         LASSERT(nid != LNET_NID_ANY);
1210
1211         /*
1212          * No need for the lnet_net_lock here, because the
1213          * lnet_api_mutex is held.
1214          */
1215         lpni = lnet_find_peer_ni_locked(nid);
1216         if (lpni) {
1217                 /* A peer with this NID already exists. */
1218                 lp = lpni->lpni_peer_net->lpn_peer;
1219                 lnet_peer_ni_decref_locked(lpni);
1220                 /*
1221                  * This is an error if the peer was configured and the
1222                  * primary NID differs or an attempt is made to change
1223                  * the Multi-Rail flag. Otherwise the assumption is
1224                  * that an existing peer is being modified.
1225                  */
1226                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1227                         if (lp->lp_primary_nid != nid)
1228                                 rc = -EEXIST;
1229                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1230                                 rc = -EPERM;
1231                         goto out;
1232                 }
1233                 /* Delete and recreate as a configured peer. */
1234                 lnet_peer_del(lp);
1235         }
1236
1237         /* Create peer, peer_net, and peer_ni. */
1238         rc = -ENOMEM;
1239         lp = lnet_peer_alloc(nid);
1240         if (!lp)
1241                 goto out;
1242         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1243         if (!lpn)
1244                 goto out_free_lp;
1245         lpni = lnet_peer_ni_alloc(nid);
1246         if (!lpni)
1247                 goto out_free_lpn;
1248
1249         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1250
1251 out_free_lpn:
1252         LIBCFS_FREE(lpn, sizeof(*lpn));
1253 out_free_lp:
1254         LIBCFS_FREE(lp, sizeof(*lp));
1255 out:
1256         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1257                libcfs_nid2str(nid), flags, rc);
1258         return rc;
1259 }
1260
1261 /*
1262  * Add a NID to a peer. Call with ln_api_mutex held.
1263  *
1264  * Error codes:
1265  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1266  *  -EEXIST:   The NID was configured by DLC for a different peer.
1267  *  -ENOMEM:   Out of memory.
1268  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1269  *             non-multi-rail peer.
1270  */
1271 static int
1272 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1273 {
1274         struct lnet_peer_net *lpn;
1275         struct lnet_peer_ni *lpni;
1276         int rc = 0;
1277
1278         LASSERT(lp);
1279         LASSERT(nid != LNET_NID_ANY);
1280
1281         /* A configured peer can only be updated through configuration. */
1282         if (!(flags & LNET_PEER_CONFIGURED)) {
1283                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1284                         rc = -EPERM;
1285                         goto out;
1286                 }
1287         }
1288
1289         /*
1290          * The MULTI_RAIL flag can be set but not cleared, because
1291          * that would leave the peer struct in an invalid state.
1292          */
1293         if (flags & LNET_PEER_MULTI_RAIL) {
1294                 spin_lock(&lp->lp_lock);
1295                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1296                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1297                         lnet_peer_clr_non_mr_pref_nids(lp);
1298                 }
1299                 spin_unlock(&lp->lp_lock);
1300         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1301                 rc = -EPERM;
1302                 goto out;
1303         }
1304
1305         lpni = lnet_find_peer_ni_locked(nid);
1306         if (lpni) {
1307                 /*
1308                  * A peer_ni already exists. This is only a problem if
1309                  * it is not connected to this peer and was configured
1310                  * by DLC.
1311                  */
1312                 lnet_peer_ni_decref_locked(lpni);
1313                 if (lpni->lpni_peer_net->lpn_peer == lp)
1314                         goto out;
1315                 if (lnet_peer_ni_is_configured(lpni)) {
1316                         rc = -EEXIST;
1317                         goto out;
1318                 }
1319                 /* If this is the primary NID, destroy the peer. */
1320                 if (lnet_peer_ni_is_primary(lpni)) {
1321                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1322                         lpni = lnet_peer_ni_alloc(nid);
1323                         if (!lpni) {
1324                                 rc = -ENOMEM;
1325                                 goto out;
1326                         }
1327                 }
1328         } else {
1329                 lpni = lnet_peer_ni_alloc(nid);
1330                 if (!lpni) {
1331                         rc = -ENOMEM;
1332                         goto out;
1333                 }
1334         }
1335
1336         /*
1337          * Get the peer_net. Check that we're not adding a second
1338          * peer_ni on a peer_net of a non-multi-rail peer.
1339          */
1340         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1341         if (!lpn) {
1342                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1343                 if (!lpn) {
1344                         rc = -ENOMEM;
1345                         goto out_free_lpni;
1346                 }
1347         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1348                 rc = -ENOTUNIQ;
1349                 goto out_free_lpni;
1350         }
1351
1352         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1353
1354 out_free_lpni:
1355         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1356         if (!lpni->lpni_peer_net)
1357                 LIBCFS_FREE(lpni, sizeof(*lpni));
1358 out:
1359         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1360                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1361                flags, rc);
1362         return rc;
1363 }
1364
1365 /*
1366  * Update the primary NID of a peer, if possible.
1367  *
1368  * Call with the lnet_api_mutex held.
1369  */
1370 static int
1371 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1372 {
1373         lnet_nid_t old = lp->lp_primary_nid;
1374         int rc = 0;
1375
1376         if (lp->lp_primary_nid == nid)
1377                 goto out;
1378         rc = lnet_peer_add_nid(lp, nid, flags);
1379         if (rc)
1380                 goto out;
1381         lp->lp_primary_nid = nid;
1382 out:
1383         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1384                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1385         return rc;
1386 }
1387
1388 /*
1389  * lpni creation initiated due to traffic either sending or receiving.
1390  */
1391 static int
1392 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1393 {
1394         struct lnet_peer *lp;
1395         struct lnet_peer_net *lpn;
1396         struct lnet_peer_ni *lpni;
1397         unsigned flags = 0;
1398         int rc = 0;
1399
1400         if (nid == LNET_NID_ANY) {
1401                 rc = -EINVAL;
1402                 goto out;
1403         }
1404
1405         /* lnet_net_lock is not needed here because ln_api_lock is held */
1406         lpni = lnet_find_peer_ni_locked(nid);
1407         if (lpni) {
1408                 /*
1409                  * We must have raced with another thread. Since we
1410                  * know next to nothing about a peer_ni created by
1411                  * traffic, we just assume everything is ok and
1412                  * return.
1413                  */
1414                 lnet_peer_ni_decref_locked(lpni);
1415                 goto out;
1416         }
1417
1418         /* Create peer, peer_net, and peer_ni. */
1419         rc = -ENOMEM;
1420         lp = lnet_peer_alloc(nid);
1421         if (!lp)
1422                 goto out;
1423         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1424         if (!lpn)
1425                 goto out_free_lp;
1426         lpni = lnet_peer_ni_alloc(nid);
1427         if (!lpni)
1428                 goto out_free_lpn;
1429         if (pref != LNET_NID_ANY)
1430                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1431
1432         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1433
1434 out_free_lpn:
1435         LIBCFS_FREE(lpn, sizeof(*lpn));
1436 out_free_lp:
1437         LIBCFS_FREE(lp, sizeof(*lp));
1438 out:
1439         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1440         return rc;
1441 }
1442
1443 /*
1444  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1445  *
1446  * This API handles the following combinations:
1447  *   Create a peer with its primary NI if only the prim_nid is provided
1448  *   Add a NID to a peer identified by the prim_nid. The peer identified
1449  *   by the prim_nid must already exist.
1450  *   The peer being created may be non-MR.
1451  *
1452  * The caller must hold ln_api_mutex. This prevents the peer from
1453  * being created/modified/deleted by a different thread.
1454  */
1455 int
1456 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1457 {
1458         struct lnet_peer *lp = NULL;
1459         struct lnet_peer_ni *lpni;
1460         unsigned flags;
1461
1462         /* The prim_nid must always be specified */
1463         if (prim_nid == LNET_NID_ANY)
1464                 return -EINVAL;
1465
1466         flags = LNET_PEER_CONFIGURED;
1467         if (mr)
1468                 flags |= LNET_PEER_MULTI_RAIL;
1469
1470         /*
1471          * If nid isn't specified, we must create a new peer with
1472          * prim_nid as its primary nid.
1473          */
1474         if (nid == LNET_NID_ANY)
1475                 return lnet_peer_add(prim_nid, flags);
1476
1477         /* Look up the prim_nid, which must exist. */
1478         lpni = lnet_find_peer_ni_locked(prim_nid);
1479         if (!lpni)
1480                 return -ENOENT;
1481         lnet_peer_ni_decref_locked(lpni);
1482         lp = lpni->lpni_peer_net->lpn_peer;
1483
1484         /* Peer must have been configured. */
1485         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1486                 CDEBUG(D_NET, "peer %s was not configured\n",
1487                        libcfs_nid2str(prim_nid));
1488                 return -ENOENT;
1489         }
1490
1491         /* Primary NID must match */
1492         if (lp->lp_primary_nid != prim_nid) {
1493                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1494                        libcfs_nid2str(prim_nid),
1495                        libcfs_nid2str(lp->lp_primary_nid));
1496                 return -ENODEV;
1497         }
1498
1499         /* Multi-Rail flag must match. */
1500         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1501                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1502                        libcfs_nid2str(prim_nid));
1503                 return -EPERM;
1504         }
1505
1506         return lnet_peer_add_nid(lp, nid, flags);
1507 }
1508
1509 /*
1510  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1511  *
1512  * This API handles the following combinations:
1513  *   Delete a NI from a peer if both prim_nid and nid are provided.
1514  *   Delete a peer if only prim_nid is provided.
1515  *   Delete a peer if its primary nid is provided.
1516  *
1517  * The caller must hold ln_api_mutex. This prevents the peer from
1518  * being modified/deleted by a different thread.
1519  */
1520 int
1521 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1522 {
1523         struct lnet_peer *lp;
1524         struct lnet_peer_ni *lpni;
1525         unsigned flags;
1526
1527         if (prim_nid == LNET_NID_ANY)
1528                 return -EINVAL;
1529
1530         lpni = lnet_find_peer_ni_locked(prim_nid);
1531         if (!lpni)
1532                 return -ENOENT;
1533         lnet_peer_ni_decref_locked(lpni);
1534         lp = lpni->lpni_peer_net->lpn_peer;
1535
1536         if (prim_nid != lp->lp_primary_nid) {
1537                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1538                        libcfs_nid2str(prim_nid),
1539                        libcfs_nid2str(lp->lp_primary_nid));
1540                 return -ENODEV;
1541         }
1542
1543         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1544                 return lnet_peer_del(lp);
1545
1546         flags = LNET_PEER_CONFIGURED;
1547         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1548                 flags |= LNET_PEER_MULTI_RAIL;
1549
1550         return lnet_peer_del_nid(lp, nid, flags);
1551 }
1552
1553 void
1554 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1555 {
1556         struct lnet_peer_table *ptable;
1557         struct lnet_peer_net *lpn;
1558
1559         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1560
1561         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1562         LASSERT(lpni->lpni_rtr_refcount == 0);
1563         LASSERT(list_empty(&lpni->lpni_txq));
1564         LASSERT(lpni->lpni_txqnob == 0);
1565         LASSERT(list_empty(&lpni->lpni_peer_nis));
1566         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1567
1568         lpn = lpni->lpni_peer_net;
1569         lpni->lpni_peer_net = NULL;
1570         lpni->lpni_net = NULL;
1571
1572         /* remove the peer ni from the zombie list */
1573         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1574         spin_lock(&ptable->pt_zombie_lock);
1575         list_del_init(&lpni->lpni_hashlist);
1576         ptable->pt_zombies--;
1577         spin_unlock(&ptable->pt_zombie_lock);
1578
1579         if (lpni->lpni_pref_nnids > 1) {
1580                 LIBCFS_FREE(lpni->lpni_pref.nids,
1581                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1582         }
1583         LIBCFS_FREE(lpni, sizeof(*lpni));
1584
1585         lnet_peer_net_decref_locked(lpn);
1586 }
1587
1588 struct lnet_peer_ni *
1589 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1590 {
1591         struct lnet_peer_ni *lpni = NULL;
1592         int rc;
1593
1594         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1595                 return ERR_PTR(-ESHUTDOWN);
1596
1597         /*
1598          * find if a peer_ni already exists.
1599          * If so then just return that.
1600          */
1601         lpni = lnet_find_peer_ni_locked(nid);
1602         if (lpni)
1603                 return lpni;
1604
1605         lnet_net_unlock(cpt);
1606
1607         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1608         if (rc) {
1609                 lpni = ERR_PTR(rc);
1610                 goto out_net_relock;
1611         }
1612
1613         lpni = lnet_find_peer_ni_locked(nid);
1614         LASSERT(lpni);
1615
1616 out_net_relock:
1617         lnet_net_lock(cpt);
1618
1619         return lpni;
1620 }
1621
1622 /*
1623  * Get a peer_ni for the given nid, create it if necessary. Takes a
1624  * hold on the peer_ni.
1625  */
1626 struct lnet_peer_ni *
1627 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1628 {
1629         struct lnet_peer_ni *lpni = NULL;
1630         int rc;
1631
1632         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1633                 return ERR_PTR(-ESHUTDOWN);
1634
1635         /*
1636          * find if a peer_ni already exists.
1637          * If so then just return that.
1638          */
1639         lpni = lnet_find_peer_ni_locked(nid);
1640         if (lpni)
1641                 return lpni;
1642
1643         /*
1644          * Slow path:
1645          * use the lnet_api_mutex to serialize the creation of the peer_ni
1646          * and the creation/deletion of the local ni/net. When a local ni is
1647          * created, if there exists a set of peer_nis on that network,
1648          * they need to be traversed and updated. When a local NI is
1649          * deleted, which could result in a network being deleted, then
1650          * all peer nis on that network need to be removed as well.
1651          *
1652          * Creation through traffic should also be serialized with
1653          * creation through DLC.
1654          */
1655         lnet_net_unlock(cpt);
1656         mutex_lock(&the_lnet.ln_api_mutex);
1657         /*
1658          * Shutdown is only set under the ln_api_lock, so a single
1659          * check here is sufficent.
1660          */
1661         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1662                 lpni = ERR_PTR(-ESHUTDOWN);
1663                 goto out_mutex_unlock;
1664         }
1665
1666         rc = lnet_peer_ni_traffic_add(nid, pref);
1667         if (rc) {
1668                 lpni = ERR_PTR(rc);
1669                 goto out_mutex_unlock;
1670         }
1671
1672         lpni = lnet_find_peer_ni_locked(nid);
1673         LASSERT(lpni);
1674
1675 out_mutex_unlock:
1676         mutex_unlock(&the_lnet.ln_api_mutex);
1677         lnet_net_lock(cpt);
1678
1679         /* Lock has been dropped, check again for shutdown. */
1680         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1681                 if (!IS_ERR(lpni))
1682                         lnet_peer_ni_decref_locked(lpni);
1683                 lpni = ERR_PTR(-ESHUTDOWN);
1684         }
1685
1686         return lpni;
1687 }
1688
1689 /*
1690  * Peer Discovery
1691  */
1692
1693 /*
1694  * Is a peer uptodate from the point of view of discovery?
1695  *
1696  * If it is currently being processed, obviously not.
1697  * A forced Ping or Push is also handled by the discovery thread.
1698  *
1699  * Otherwise look at whether the peer needs rediscovering.
1700  */
1701 bool
1702 lnet_peer_is_uptodate(struct lnet_peer *lp)
1703 {
1704         bool rc;
1705
1706         spin_lock(&lp->lp_lock);
1707         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1708                             LNET_PEER_FORCE_PING |
1709                             LNET_PEER_FORCE_PUSH)) {
1710                 rc = false;
1711         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1712                 rc = true;
1713         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1714                 if (lnet_peer_discovery_disabled)
1715                         rc = true;
1716                 else
1717                         rc = false;
1718         } else if (lnet_peer_needs_push(lp)) {
1719                 rc = false;
1720         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1721                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1722                         rc = true;
1723                 else
1724                         rc = false;
1725         } else {
1726                 rc = false;
1727         }
1728         spin_unlock(&lp->lp_lock);
1729
1730         return rc;
1731 }
1732
1733 /*
1734  * Queue a peer for the attention of the discovery thread.  Call with
1735  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1736  * -EALREADY if the peer was already queued.
1737  */
1738 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1739 {
1740         int rc;
1741
1742         spin_lock(&lp->lp_lock);
1743         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1744                 lp->lp_state |= LNET_PEER_DISCOVERING;
1745         spin_unlock(&lp->lp_lock);
1746         if (list_empty(&lp->lp_dc_list)) {
1747                 lnet_peer_addref_locked(lp);
1748                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1749                 wake_up(&the_lnet.ln_dc_waitq);
1750                 rc = 0;
1751         } else {
1752                 rc = -EALREADY;
1753         }
1754
1755         CDEBUG(D_NET, "Queue peer %s: %d\n",
1756                libcfs_nid2str(lp->lp_primary_nid), rc);
1757
1758         return rc;
1759 }
1760
1761 /*
1762  * Discovery of a peer is complete. Wake all waiters on the peer.
1763  * Call with lnet_net_lock/EX held.
1764  */
1765 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1766 {
1767         struct lnet_msg *msg, *tmp;
1768         int rc = 0;
1769         struct list_head pending_msgs;
1770
1771         INIT_LIST_HEAD(&pending_msgs);
1772
1773         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1774                libcfs_nid2str(lp->lp_primary_nid));
1775
1776         list_del_init(&lp->lp_dc_list);
1777         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1778         wake_up_all(&lp->lp_dc_waitq);
1779
1780         lnet_net_unlock(LNET_LOCK_EX);
1781
1782         /* iterate through all pending messages and send them again */
1783         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1784                 list_del_init(&msg->msg_list);
1785                 if (lp->lp_dc_error) {
1786                         lnet_finalize(msg, lp->lp_dc_error);
1787                         continue;
1788                 }
1789
1790                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1791                        lnet_msgtyp2str(msg->msg_type),
1792                        libcfs_id2str(msg->msg_target));
1793                 rc = lnet_send(msg->msg_src_nid_param, msg,
1794                                msg->msg_rtr_nid_param);
1795                 if (rc < 0) {
1796                         CNETERR("Error sending %s to %s: %d\n",
1797                                lnet_msgtyp2str(msg->msg_type),
1798                                libcfs_id2str(msg->msg_target), rc);
1799                         lnet_finalize(msg, rc);
1800                 }
1801         }
1802         lnet_net_lock(LNET_LOCK_EX);
1803         lnet_peer_decref_locked(lp);
1804 }
1805
1806 /*
1807  * Handle inbound push.
1808  * Like any event handler, called with lnet_res_lock/CPT held.
1809  */
1810 void lnet_peer_push_event(struct lnet_event *ev)
1811 {
1812         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1813         struct lnet_peer *lp;
1814
1815         /* lnet_find_peer() adds a refcount */
1816         lp = lnet_find_peer(ev->source.nid);
1817         if (!lp) {
1818                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1819                        libcfs_nid2str(ev->initiator.nid),
1820                        libcfs_nid2str(ev->source.nid));
1821                 return;
1822         }
1823
1824         /* Ensure peer state remains consistent while we modify it. */
1825         spin_lock(&lp->lp_lock);
1826
1827         /*
1828          * If some kind of error happened the contents of the message
1829          * cannot be used. Clear the NIDS_UPTODATE and set the
1830          * FORCE_PING flag to trigger a ping.
1831          */
1832         if (ev->status) {
1833                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1834                 lp->lp_state |= LNET_PEER_FORCE_PING;
1835                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1836                        ev->status,
1837                        libcfs_nid2str(lp->lp_primary_nid),
1838                        libcfs_nid2str(ev->source.nid));
1839                 goto out;
1840         }
1841
1842         /*
1843          * A push with invalid or corrupted info. Clear the UPTODATE
1844          * flag to trigger a ping.
1845          */
1846         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1847                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1848                 lp->lp_state |= LNET_PEER_FORCE_PING;
1849                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1850                        libcfs_nid2str(lp->lp_primary_nid));
1851                 goto out;
1852         }
1853
1854         /*
1855          * Make sure we'll allocate the correct size ping buffer when
1856          * pinging the peer.
1857          */
1858         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1859                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1860
1861         /*
1862          * A non-Multi-Rail peer is not supposed to be capable of
1863          * sending a push.
1864          */
1865         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1866                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1867                        libcfs_nid2str(lp->lp_primary_nid));
1868                 goto out;
1869         }
1870
1871         /*
1872          * Check the MULTIRAIL flag. Complain if the peer was DLC
1873          * configured without it.
1874          */
1875         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1876                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1877                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1878                                libcfs_nid2str(lp->lp_primary_nid));
1879                 } else {
1880                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1881                         lnet_peer_clr_non_mr_pref_nids(lp);
1882                 }
1883         }
1884
1885         /*
1886          * The peer may have discovery disabled at its end. Set
1887          * NO_DISCOVERY as appropriate.
1888          */
1889         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1890                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1891                        libcfs_nid2str(lp->lp_primary_nid));
1892                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1893         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1894                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1895                        libcfs_nid2str(lp->lp_primary_nid));
1896                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1897         }
1898
1899         /*
1900          * Check for truncation of the Put message. Clear the
1901          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1902          * and tell discovery to allocate a bigger buffer.
1903          */
1904         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1905                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1906                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1907                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1908                 lp->lp_state |= LNET_PEER_FORCE_PING;
1909                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1910                        libcfs_nid2str(lp->lp_primary_nid),
1911                        pbuf->pb_info.pi_nnis);
1912                 goto out;
1913         }
1914
1915         /*
1916          * Check whether the Put data is stale. Stale data can just be
1917          * dropped.
1918          */
1919         if (pbuf->pb_info.pi_nnis > 1 &&
1920             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1921             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1922                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1923                        libcfs_nid2str(lp->lp_primary_nid),
1924                        LNET_PING_BUFFER_SEQNO(pbuf),
1925                        lp->lp_peer_seqno);
1926                 goto out;
1927         }
1928
1929         /*
1930          * Check whether the Put data is new, in which case we clear
1931          * the UPTODATE flag and prepare to process it.
1932          *
1933          * If the Put data is current, and the peer is UPTODATE then
1934          * we assome everything is all right and drop the data as
1935          * stale.
1936          */
1937         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1938                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1939                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1940         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1941                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1942                        libcfs_nid2str(lp->lp_primary_nid),
1943                        LNET_PING_BUFFER_SEQNO(pbuf),
1944                        lp->lp_peer_seqno);
1945                 goto out;
1946         }
1947
1948         /*
1949          * If there is data present that hasn't been processed yet,
1950          * we'll replace it if the Put contained newer data and it
1951          * fits. We're racing with a Ping or earlier Push in this
1952          * case.
1953          */
1954         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1955                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1956                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1957                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1958                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1959                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1960                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1961                               libcfs_nid2str(lp->lp_primary_nid),
1962                               LNET_PING_BUFFER_SEQNO(pbuf),
1963                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1964                 }
1965                 goto out;
1966         }
1967
1968         /*
1969          * Allocate a buffer to copy the data. On a failure we drop
1970          * the Push and set FORCE_PING to force the discovery
1971          * thread to fix the problem by pinging the peer.
1972          */
1973         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1974         if (!lp->lp_data) {
1975                 lp->lp_state |= LNET_PEER_FORCE_PING;
1976                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1977                        libcfs_nid2str(lp->lp_primary_nid),
1978                        LNET_PING_BUFFER_SEQNO(pbuf));
1979                 goto out;
1980         }
1981
1982         /* Success */
1983         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1984                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1985         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1986         CDEBUG(D_NET, "Received Push %s %u\n",
1987                libcfs_nid2str(lp->lp_primary_nid),
1988                LNET_PING_BUFFER_SEQNO(pbuf));
1989
1990 out:
1991         /*
1992          * Queue the peer for discovery, and wake the discovery thread
1993          * if the peer was already queued, because its status changed.
1994          */
1995         spin_unlock(&lp->lp_lock);
1996         lnet_net_lock(LNET_LOCK_EX);
1997         if (lnet_peer_queue_for_discovery(lp))
1998                 wake_up(&the_lnet.ln_dc_waitq);
1999         /* Drop refcount from lookup */
2000         lnet_peer_decref_locked(lp);
2001         lnet_net_unlock(LNET_LOCK_EX);
2002 }
2003
2004 /*
2005  * Clear the discovery error state, unless we're already discovering
2006  * this peer, in which case the error is current.
2007  */
2008 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2009 {
2010         spin_lock(&lp->lp_lock);
2011         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2012                 lp->lp_dc_error = 0;
2013         spin_unlock(&lp->lp_lock);
2014 }
2015
2016 /*
2017  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2018  * dropped/retaken within this function. An lnet_peer_ni is passed in
2019  * because discovery could tear down an lnet_peer.
2020  */
2021 int
2022 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2023 {
2024         DEFINE_WAIT(wait);
2025         struct lnet_peer *lp;
2026         int rc = 0;
2027
2028 again:
2029         lnet_net_unlock(cpt);
2030         lnet_net_lock(LNET_LOCK_EX);
2031         lp = lpni->lpni_peer_net->lpn_peer;
2032         lnet_peer_clear_discovery_error(lp);
2033
2034         /*
2035          * We're willing to be interrupted. The lpni can become a
2036          * zombie if we race with DLC, so we must check for that.
2037          */
2038         for (;;) {
2039                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2040                 if (signal_pending(current))
2041                         break;
2042                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2043                         break;
2044                 if (lp->lp_dc_error)
2045                         break;
2046                 if (lnet_peer_is_uptodate(lp))
2047                         break;
2048                 lnet_peer_queue_for_discovery(lp);
2049                 /*
2050                  * if caller requested a non-blocking operation then
2051                  * return immediately. Once discovery is complete then the
2052                  * peer ref will be decremented and any pending messages
2053                  * that were stopped due to discovery will be transmitted.
2054                  */
2055                 if (!block)
2056                         break;
2057
2058                 lnet_peer_addref_locked(lp);
2059                 lnet_net_unlock(LNET_LOCK_EX);
2060                 schedule();
2061                 finish_wait(&lp->lp_dc_waitq, &wait);
2062                 lnet_net_lock(LNET_LOCK_EX);
2063                 lnet_peer_decref_locked(lp);
2064                 /* Peer may have changed */
2065                 lp = lpni->lpni_peer_net->lpn_peer;
2066         }
2067         finish_wait(&lp->lp_dc_waitq, &wait);
2068
2069         lnet_net_unlock(LNET_LOCK_EX);
2070         lnet_net_lock(cpt);
2071
2072         /*
2073          * If the peer has changed after we've discovered the older peer,
2074          * then we need to discovery the new peer to make sure the
2075          * interface information is up to date
2076          */
2077         if (lp != lpni->lpni_peer_net->lpn_peer)
2078                 goto again;
2079
2080         if (signal_pending(current))
2081                 rc = -EINTR;
2082         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2083                 rc = -ESHUTDOWN;
2084         else if (lp->lp_dc_error)
2085                 rc = lp->lp_dc_error;
2086         else if (!block)
2087                 CDEBUG(D_NET, "non-blocking discovery\n");
2088         else if (!lnet_peer_is_uptodate(lp))
2089                 goto again;
2090
2091         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2092                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2093                libcfs_nid2str(lpni->lpni_nid), rc,
2094                (!block) ? "pending discovery" : "discovery complete");
2095
2096         return rc;
2097 }
2098
2099 /* Handle an incoming ack for a push. */
2100 static void
2101 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2102 {
2103         struct lnet_ping_buffer *pbuf;
2104
2105         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2106         spin_lock(&lp->lp_lock);
2107         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2108         lp->lp_push_error = ev->status;
2109         if (ev->status)
2110                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2111         else
2112                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2113         spin_unlock(&lp->lp_lock);
2114
2115         CDEBUG(D_NET, "peer %s ev->status %d\n",
2116                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2117 }
2118
2119 /* Handle a Reply message. This is the reply to a Ping message. */
2120 static void
2121 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2122 {
2123         struct lnet_ping_buffer *pbuf;
2124         int rc;
2125
2126         spin_lock(&lp->lp_lock);
2127
2128         /*
2129          * If some kind of error happened the contents of message
2130          * cannot be used. Set PING_FAILED to trigger a retry.
2131          */
2132         if (ev->status) {
2133                 lp->lp_state |= LNET_PEER_PING_FAILED;
2134                 lp->lp_ping_error = ev->status;
2135                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2136                        ev->status,
2137                        libcfs_nid2str(lp->lp_primary_nid),
2138                        libcfs_nid2str(ev->source.nid));
2139                 goto out;
2140         }
2141
2142         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2143         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2144                 lnet_swap_pinginfo(pbuf);
2145
2146         /*
2147          * A reply with invalid or corrupted info. Set PING_FAILED to
2148          * trigger a retry.
2149          */
2150         rc = lnet_ping_info_validate(&pbuf->pb_info);
2151         if (rc) {
2152                 lp->lp_state |= LNET_PEER_PING_FAILED;
2153                 lp->lp_ping_error = 0;
2154                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2155                        libcfs_nid2str(lp->lp_primary_nid), rc);
2156                 goto out;
2157         }
2158
2159         /*
2160          * Update the MULTI_RAIL flag based on the reply. If the peer
2161          * was configured with DLC then the setting should match what
2162          * DLC put in.
2163          */
2164         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2165                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2166                         /* Everything's fine */
2167                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2168                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2169                               libcfs_nid2str(lp->lp_primary_nid));
2170                 } else {
2171                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2172                         lnet_peer_clr_non_mr_pref_nids(lp);
2173                 }
2174         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2175                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2176                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2177                               libcfs_nid2str(lp->lp_primary_nid));
2178                 } else {
2179                         CERROR("Multi-Rail state vanished from %s\n",
2180                                libcfs_nid2str(lp->lp_primary_nid));
2181                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2182                 }
2183         }
2184
2185         /*
2186          * Make sure we'll allocate the correct size ping buffer when
2187          * pinging the peer.
2188          */
2189         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2190                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2191
2192         /*
2193          * The peer may have discovery disabled at its end. Set
2194          * NO_DISCOVERY as appropriate.
2195          */
2196         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2197                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2198                        libcfs_nid2str(lp->lp_primary_nid));
2199                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2200         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2201                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2202                        libcfs_nid2str(lp->lp_primary_nid));
2203                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2204         }
2205
2206         /*
2207          * Check for truncation of the Reply. Clear PING_SENT and set
2208          * PING_FAILED to trigger a retry.
2209          */
2210         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2211                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2212                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2213                 lp->lp_state |= LNET_PEER_PING_FAILED;
2214                 lp->lp_ping_error = 0;
2215                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2216                        libcfs_nid2str(lp->lp_primary_nid),
2217                        pbuf->pb_info.pi_nnis);
2218                 goto out;
2219         }
2220
2221         /*
2222          * Check the sequence numbers in the reply. These are only
2223          * available if the reply came from a Multi-Rail peer.
2224          */
2225         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2226             pbuf->pb_info.pi_nnis > 1 &&
2227             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2228                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2229                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2230                                 libcfs_nid2str(lp->lp_primary_nid),
2231                                 LNET_PING_BUFFER_SEQNO(pbuf),
2232                                 lp->lp_peer_seqno);
2233                         goto out;
2234                 }
2235
2236                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2237                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2238         }
2239
2240         /* We're happy with the state of the data in the buffer. */
2241         CDEBUG(D_NET, "peer %s data present %u\n",
2242                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2243         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2244                 lnet_ping_buffer_decref(lp->lp_data);
2245         else
2246                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2247         lnet_ping_buffer_addref(pbuf);
2248         lp->lp_data = pbuf;
2249 out:
2250         lp->lp_state &= ~LNET_PEER_PING_SENT;
2251         spin_unlock(&lp->lp_lock);
2252 }
2253
2254 /*
2255  * Send event handling. Only matters for error cases, where we clean
2256  * up state on the peer and peer_ni that would otherwise be updated in
2257  * the REPLY event handler for a successful Ping, and the ACK event
2258  * handler for a successful Push.
2259  */
2260 static int
2261 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2262 {
2263         int rc = 0;
2264
2265         if (!ev->status)
2266                 goto out;
2267
2268         spin_lock(&lp->lp_lock);
2269         if (ev->msg_type == LNET_MSG_GET) {
2270                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2271                 lp->lp_state |= LNET_PEER_PING_FAILED;
2272                 lp->lp_ping_error = ev->status;
2273         } else { /* ev->msg_type == LNET_MSG_PUT */
2274                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2275                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2276                 lp->lp_push_error = ev->status;
2277         }
2278         spin_unlock(&lp->lp_lock);
2279         rc = LNET_REDISCOVER_PEER;
2280 out:
2281         CDEBUG(D_NET, "%s Send to %s: %d\n",
2282                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2283                 libcfs_nid2str(ev->target.nid), rc);
2284         return rc;
2285 }
2286
2287 /*
2288  * Unlink event handling. This event is only seen if a call to
2289  * LNetMDUnlink() caused the event to be unlinked. If this call was
2290  * made after the event was set up in LNetGet() or LNetPut() then we
2291  * assume the Ping or Push timed out.
2292  */
2293 static void
2294 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2295 {
2296         spin_lock(&lp->lp_lock);
2297         /* We've passed through LNetGet() */
2298         if (lp->lp_state & LNET_PEER_PING_SENT) {
2299                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2300                 lp->lp_state |= LNET_PEER_PING_FAILED;
2301                 lp->lp_ping_error = -ETIMEDOUT;
2302                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2303                         libcfs_nid2str(lp->lp_primary_nid));
2304         }
2305         /* We've passed through LNetPut() */
2306         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2307                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2308                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2309                 lp->lp_push_error = -ETIMEDOUT;
2310                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2311                         libcfs_nid2str(lp->lp_primary_nid));
2312         }
2313         spin_unlock(&lp->lp_lock);
2314 }
2315
2316 /*
2317  * Event handler for the discovery EQ.
2318  *
2319  * Called with lnet_res_lock(cpt) held. The cpt is the
2320  * lnet_cpt_of_cookie() of the md handle cookie.
2321  */
2322 static void lnet_discovery_event_handler(struct lnet_event *event)
2323 {
2324         struct lnet_peer *lp = event->md.user_ptr;
2325         struct lnet_ping_buffer *pbuf;
2326         int rc;
2327
2328         /* discovery needs to take another look */
2329         rc = LNET_REDISCOVER_PEER;
2330
2331         CDEBUG(D_NET, "Received event: %d\n", event->type);
2332
2333         switch (event->type) {
2334         case LNET_EVENT_ACK:
2335                 lnet_discovery_event_ack(lp, event);
2336                 break;
2337         case LNET_EVENT_REPLY:
2338                 lnet_discovery_event_reply(lp, event);
2339                 break;
2340         case LNET_EVENT_SEND:
2341                 /* Only send failure triggers a retry. */
2342                 rc = lnet_discovery_event_send(lp, event);
2343                 break;
2344         case LNET_EVENT_UNLINK:
2345                 /* LNetMDUnlink() was called */
2346                 lnet_discovery_event_unlink(lp, event);
2347                 break;
2348         default:
2349                 /* Invalid events. */
2350                 LBUG();
2351         }
2352         lnet_net_lock(LNET_LOCK_EX);
2353         if (event->unlinked) {
2354                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2355                 lnet_ping_buffer_decref(pbuf);
2356                 lnet_peer_decref_locked(lp);
2357         }
2358         if (rc == LNET_REDISCOVER_PEER) {
2359                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2360                 wake_up(&the_lnet.ln_dc_waitq);
2361         }
2362         lnet_net_unlock(LNET_LOCK_EX);
2363 }
2364
2365 /*
2366  * Build a peer from incoming data.
2367  *
2368  * The NIDs in the incoming data are supposed to be structured as follows:
2369  *  - loopback
2370  *  - primary NID
2371  *  - other NIDs in same net
2372  *  - NIDs in second net
2373  *  - NIDs in third net
2374  *  - ...
2375  * This due to the way the list of NIDs in the data is created.
2376  *
2377  * Note that this function will mark the peer uptodate unless an
2378  * ENOMEM is encontered. All other errors are due to a conflict
2379  * between the DLC configuration and what discovery sees. We treat DLC
2380  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2381  * peer from becoming stuck in discovery.
2382  */
2383 static int lnet_peer_merge_data(struct lnet_peer *lp,
2384                                 struct lnet_ping_buffer *pbuf)
2385 {
2386         struct lnet_peer_ni *lpni;
2387         lnet_nid_t *curnis = NULL;
2388         lnet_nid_t *addnis = NULL;
2389         lnet_nid_t *delnis = NULL;
2390         unsigned flags;
2391         int ncurnis;
2392         int naddnis;
2393         int ndelnis;
2394         int nnis = 0;
2395         int i;
2396         int j;
2397         int rc;
2398
2399         flags = LNET_PEER_DISCOVERED;
2400         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2401                 flags |= LNET_PEER_MULTI_RAIL;
2402
2403         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2404         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2405         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2406         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2407         if (!curnis || !addnis || !delnis) {
2408                 rc = -ENOMEM;
2409                 goto out;
2410         }
2411         ncurnis = 0;
2412         naddnis = 0;
2413         ndelnis = 0;
2414
2415         /* Construct the list of NIDs present in peer. */
2416         lpni = NULL;
2417         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2418                 curnis[ncurnis++] = lpni->lpni_nid;
2419
2420         /*
2421          * Check for NIDs in pbuf not present in curnis[].
2422          * The loop starts at 1 to skip the loopback NID.
2423          */
2424         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2425                 for (j = 0; j < ncurnis; j++)
2426                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2427                                 break;
2428                 if (j == ncurnis)
2429                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2430         }
2431         /*
2432          * Check for NIDs in curnis[] not present in pbuf.
2433          * The nested loop starts at 1 to skip the loopback NID.
2434          *
2435          * But never add the loopback NID to delnis[]: if it is
2436          * present in curnis[] then this peer is for this node.
2437          */
2438         for (i = 0; i < ncurnis; i++) {
2439                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2440                         continue;
2441                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2442                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2443                                 break;
2444                 if (j == pbuf->pb_info.pi_nnis)
2445                         delnis[ndelnis++] = curnis[i];
2446         }
2447
2448         for (i = 0; i < naddnis; i++) {
2449                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2450                 if (rc) {
2451                         CERROR("Error adding NID %s to peer %s: %d\n",
2452                                libcfs_nid2str(addnis[i]),
2453                                libcfs_nid2str(lp->lp_primary_nid), rc);
2454                         if (rc == -ENOMEM)
2455                                 goto out;
2456                 }
2457         }
2458         for (i = 0; i < ndelnis; i++) {
2459                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2460                 if (rc) {
2461                         CERROR("Error deleting NID %s from peer %s: %d\n",
2462                                libcfs_nid2str(delnis[i]),
2463                                libcfs_nid2str(lp->lp_primary_nid), rc);
2464                         if (rc == -ENOMEM)
2465                                 goto out;
2466                 }
2467         }
2468         /*
2469          * Errors other than -ENOMEM are due to peers having been
2470          * configured with DLC. Ignore these because DLC overrides
2471          * Discovery.
2472          */
2473         rc = 0;
2474 out:
2475         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2476         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2477         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2478         lnet_ping_buffer_decref(pbuf);
2479         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2480
2481         if (rc) {
2482                 spin_lock(&lp->lp_lock);
2483                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2484                 lp->lp_state |= LNET_PEER_FORCE_PING;
2485                 spin_unlock(&lp->lp_lock);
2486         }
2487         return rc;
2488 }
2489
2490 /*
2491  * The data in pbuf says lp is its primary peer, but the data was
2492  * received by a different peer. Try to update lp with the data.
2493  */
2494 static int
2495 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2496 {
2497         struct lnet_handle_md mdh;
2498
2499         /* Queue lp for discovery, and force it on the request queue. */
2500         lnet_net_lock(LNET_LOCK_EX);
2501         if (lnet_peer_queue_for_discovery(lp))
2502                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2503         lnet_net_unlock(LNET_LOCK_EX);
2504
2505         LNetInvalidateMDHandle(&mdh);
2506
2507         /*
2508          * Decide whether we can move the peer to the DATA_PRESENT state.
2509          *
2510          * We replace stale data for a multi-rail peer, repair PING_FAILED
2511          * status, and preempt FORCE_PING.
2512          *
2513          * If after that we have DATA_PRESENT, we merge it into this peer.
2514          */
2515         spin_lock(&lp->lp_lock);
2516         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2517                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2518                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2519                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2520                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2521                         lnet_ping_buffer_decref(pbuf);
2522                         pbuf = lp->lp_data;
2523                         lp->lp_data = NULL;
2524                 }
2525         }
2526         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2527                 lnet_ping_buffer_decref(lp->lp_data);
2528                 lp->lp_data = NULL;
2529                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2530         }
2531         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2532                 mdh = lp->lp_ping_mdh;
2533                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2534                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2535                 lp->lp_ping_error = 0;
2536         }
2537         if (lp->lp_state & LNET_PEER_FORCE_PING)
2538                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2539         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2540         spin_unlock(&lp->lp_lock);
2541
2542         if (!LNetMDHandleIsInvalid(mdh))
2543                 LNetMDUnlink(mdh);
2544
2545         if (pbuf)
2546                 return lnet_peer_merge_data(lp, pbuf);
2547
2548         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2549         return 0;
2550 }
2551
2552 /*
2553  * Update a peer using the data received.
2554  */
2555 static int lnet_peer_data_present(struct lnet_peer *lp)
2556 __must_hold(&lp->lp_lock)
2557 {
2558         struct lnet_ping_buffer *pbuf;
2559         struct lnet_peer_ni *lpni;
2560         lnet_nid_t nid = LNET_NID_ANY;
2561         unsigned flags;
2562         int rc = 0;
2563
2564         pbuf = lp->lp_data;
2565         lp->lp_data = NULL;
2566         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2567         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2568         spin_unlock(&lp->lp_lock);
2569
2570         /*
2571          * Modifications of peer structures are done while holding the
2572          * ln_api_mutex. A global lock is required because we may be
2573          * modifying multiple peer structures, and a mutex greatly
2574          * simplifies memory management.
2575          *
2576          * The actual changes to the data structures must also protect
2577          * against concurrent lookups, for which the lnet_net_lock in
2578          * LNET_LOCK_EX mode is used.
2579          */
2580         mutex_lock(&the_lnet.ln_api_mutex);
2581         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2582                 rc = -ESHUTDOWN;
2583                 goto out;
2584         }
2585
2586         /*
2587          * If this peer is not on the peer list then it is being torn
2588          * down, and our reference count may be all that is keeping it
2589          * alive. Don't do any work on it.
2590          */
2591         if (list_empty(&lp->lp_peer_list))
2592                 goto out;
2593
2594         flags = LNET_PEER_DISCOVERED;
2595         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2596                 flags |= LNET_PEER_MULTI_RAIL;
2597
2598         /*
2599          * Check whether the primary NID in the message matches the
2600          * primary NID of the peer. If it does, update the peer, if
2601          * it it does not, check whether there is already a peer with
2602          * that primary NID. If no such peer exists, try to update
2603          * the primary NID of the current peer (allowed if it was
2604          * created due to message traffic) and complete the update.
2605          * If the peer did exist, hand off the data to it.
2606          *
2607          * The peer for the loopback interface is a special case: this
2608          * is the peer for the local node, and we want to set its
2609          * primary NID to the correct value here. Moreover, this peer
2610          * can show up with only the loopback NID in the ping buffer.
2611          */
2612         if (pbuf->pb_info.pi_nnis <= 1)
2613                 goto out;
2614         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2615         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2616                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2617                 if (!rc)
2618                         rc = lnet_peer_merge_data(lp, pbuf);
2619         } else if (lp->lp_primary_nid == nid) {
2620                 rc = lnet_peer_merge_data(lp, pbuf);
2621         } else {
2622                 lpni = lnet_find_peer_ni_locked(nid);
2623                 if (!lpni) {
2624                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2625                         if (rc) {
2626                                 CERROR("Primary NID error %s versus %s: %d\n",
2627                                        libcfs_nid2str(lp->lp_primary_nid),
2628                                        libcfs_nid2str(nid), rc);
2629                         } else {
2630                                 rc = lnet_peer_merge_data(lp, pbuf);
2631                         }
2632                 } else {
2633                         rc = lnet_peer_set_primary_data(
2634                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2635                         lnet_peer_ni_decref_locked(lpni);
2636                 }
2637         }
2638 out:
2639         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2640         mutex_unlock(&the_lnet.ln_api_mutex);
2641
2642         spin_lock(&lp->lp_lock);
2643         /* Tell discovery to re-check the peer immediately. */
2644         if (!rc)
2645                 rc = LNET_REDISCOVER_PEER;
2646         return rc;
2647 }
2648
2649 /*
2650  * A ping failed. Clear the PING_FAILED state and set the
2651  * FORCE_PING state, to ensure a retry even if discovery is
2652  * disabled. This avoids being left with incorrect state.
2653  */
2654 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2655 __must_hold(&lp->lp_lock)
2656 {
2657         struct lnet_handle_md mdh;
2658         int rc;
2659
2660         mdh = lp->lp_ping_mdh;
2661         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2662         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2663         lp->lp_state |= LNET_PEER_FORCE_PING;
2664         rc = lp->lp_ping_error;
2665         lp->lp_ping_error = 0;
2666         spin_unlock(&lp->lp_lock);
2667
2668         if (!LNetMDHandleIsInvalid(mdh))
2669                 LNetMDUnlink(mdh);
2670
2671         CDEBUG(D_NET, "peer %s:%d\n",
2672                libcfs_nid2str(lp->lp_primary_nid), rc);
2673
2674         spin_lock(&lp->lp_lock);
2675         return rc ? rc : LNET_REDISCOVER_PEER;
2676 }
2677
2678 /*
2679  * Select NID to send a Ping or Push to.
2680  */
2681 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2682 {
2683         struct lnet_peer_ni *lpni;
2684
2685         /* Look for a direct-connected NID for this peer. */
2686         lpni = NULL;
2687         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2688                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2689                         continue;
2690                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2691                         continue;
2692                 break;
2693         }
2694         if (lpni)
2695                 return lpni->lpni_nid;
2696
2697         /* Look for a routed-connected NID for this peer. */
2698         lpni = NULL;
2699         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2700                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2701                         continue;
2702                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2703                         continue;
2704                 break;
2705         }
2706         if (lpni)
2707                 return lpni->lpni_nid;
2708
2709         return LNET_NID_ANY;
2710 }
2711
2712 /* Active side of ping. */
2713 static int lnet_peer_send_ping(struct lnet_peer *lp)
2714 __must_hold(&lp->lp_lock)
2715 {
2716         struct lnet_md md = { NULL };
2717         struct lnet_process_id id;
2718         struct lnet_ping_buffer *pbuf;
2719         int nnis;
2720         int rc;
2721         int cpt;
2722
2723         lp->lp_state |= LNET_PEER_PING_SENT;
2724         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2725         spin_unlock(&lp->lp_lock);
2726
2727         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2728         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
2729         if (!pbuf) {
2730                 rc = -ENOMEM;
2731                 goto fail_error;
2732         }
2733
2734         /* initialize md content */
2735         md.start     = &pbuf->pb_info;
2736         md.length    = LNET_PING_INFO_SIZE(nnis);
2737         md.threshold = 2; /* GET/REPLY */
2738         md.max_size  = 0;
2739         md.options   = LNET_MD_TRUNCATE;
2740         md.user_ptr  = lp;
2741         md.eq_handle = the_lnet.ln_dc_eqh;
2742
2743         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
2744         if (rc != 0) {
2745                 lnet_ping_buffer_decref(pbuf);
2746                 CERROR("Can't bind MD: %d\n", rc);
2747                 goto fail_error;
2748         }
2749         cpt = lnet_net_lock_current();
2750         /* Refcount for MD. */
2751         lnet_peer_addref_locked(lp);
2752         id.pid = LNET_PID_LUSTRE;
2753         id.nid = lnet_peer_select_nid(lp);
2754         lnet_net_unlock(cpt);
2755
2756         if (id.nid == LNET_NID_ANY) {
2757                 rc = -EHOSTUNREACH;
2758                 goto fail_unlink_md;
2759         }
2760
2761         rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
2762                      LNET_RESERVED_PORTAL,
2763                      LNET_PROTO_PING_MATCHBITS, 0);
2764
2765         if (rc)
2766                 goto fail_unlink_md;
2767
2768         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2769
2770         spin_lock(&lp->lp_lock);
2771         return 0;
2772
2773 fail_unlink_md:
2774         LNetMDUnlink(lp->lp_ping_mdh);
2775         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2776 fail_error:
2777         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2778         /*
2779          * The errors that get us here are considered hard errors and
2780          * cause Discovery to terminate. So we clear PING_SENT, but do
2781          * not set either PING_FAILED or FORCE_PING. In fact we need
2782          * to clear PING_FAILED, because the unlink event handler will
2783          * have set it if we called LNetMDUnlink() above.
2784          */
2785         spin_lock(&lp->lp_lock);
2786         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2787         return rc;
2788 }
2789
2790 /*
2791  * This function exists because you cannot call LNetMDUnlink() from an
2792  * event handler.
2793  */
2794 static int lnet_peer_push_failed(struct lnet_peer *lp)
2795 __must_hold(&lp->lp_lock)
2796 {
2797         struct lnet_handle_md mdh;
2798         int rc;
2799
2800         mdh = lp->lp_push_mdh;
2801         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2802         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2803         rc = lp->lp_push_error;
2804         lp->lp_push_error = 0;
2805         spin_unlock(&lp->lp_lock);
2806
2807         if (!LNetMDHandleIsInvalid(mdh))
2808                 LNetMDUnlink(mdh);
2809
2810         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2811         spin_lock(&lp->lp_lock);
2812         return rc ? rc : LNET_REDISCOVER_PEER;
2813 }
2814
2815 /* Active side of push. */
2816 static int lnet_peer_send_push(struct lnet_peer *lp)
2817 __must_hold(&lp->lp_lock)
2818 {
2819         struct lnet_ping_buffer *pbuf;
2820         struct lnet_process_id id;
2821         struct lnet_md md;
2822         int cpt;
2823         int rc;
2824
2825         /* Don't push to a non-multi-rail peer. */
2826         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2827                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2828                 return 0;
2829         }
2830
2831         lp->lp_state |= LNET_PEER_PUSH_SENT;
2832         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2833         spin_unlock(&lp->lp_lock);
2834
2835         cpt = lnet_net_lock_current();
2836         pbuf = the_lnet.ln_ping_target;
2837         lnet_ping_buffer_addref(pbuf);
2838         lnet_net_unlock(cpt);
2839
2840         /* Push source MD */
2841         md.start     = &pbuf->pb_info;
2842         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2843         md.threshold = 2; /* Put/Ack */
2844         md.max_size  = 0;
2845         md.options   = 0;
2846         md.eq_handle = the_lnet.ln_dc_eqh;
2847         md.user_ptr  = lp;
2848
2849         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2850         if (rc) {
2851                 lnet_ping_buffer_decref(pbuf);
2852                 CERROR("Can't bind push source MD: %d\n", rc);
2853                 goto fail_error;
2854         }
2855         cpt = lnet_net_lock_current();
2856         /* Refcount for MD. */
2857         lnet_peer_addref_locked(lp);
2858         id.pid = LNET_PID_LUSTRE;
2859         id.nid = lnet_peer_select_nid(lp);
2860         lnet_net_unlock(cpt);
2861
2862         if (id.nid == LNET_NID_ANY) {
2863                 rc = -EHOSTUNREACH;
2864                 goto fail_unlink;
2865         }
2866
2867         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2868                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2869                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2870
2871         if (rc)
2872                 goto fail_unlink;
2873
2874         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2875
2876         spin_lock(&lp->lp_lock);
2877         return 0;
2878
2879 fail_unlink:
2880         LNetMDUnlink(lp->lp_push_mdh);
2881         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2882 fail_error:
2883         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2884         /*
2885          * The errors that get us here are considered hard errors and
2886          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2887          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2888          * because the unlink event handler will have set it if we
2889          * called LNetMDUnlink() above.
2890          */
2891         spin_lock(&lp->lp_lock);
2892         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2893         return rc;
2894 }
2895
2896 /*
2897  * An unrecoverable error was encountered during discovery.
2898  * Set error status in peer and abort discovery.
2899  */
2900 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2901 {
2902         CDEBUG(D_NET, "Discovery error %s: %d\n",
2903                libcfs_nid2str(lp->lp_primary_nid), error);
2904
2905         spin_lock(&lp->lp_lock);
2906         lp->lp_dc_error = error;
2907         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2908         lp->lp_state |= LNET_PEER_REDISCOVER;
2909         spin_unlock(&lp->lp_lock);
2910 }
2911
2912 /*
2913  * Mark the peer as discovered.
2914  */
2915 static int lnet_peer_discovered(struct lnet_peer *lp)
2916 __must_hold(&lp->lp_lock)
2917 {
2918         lp->lp_state |= LNET_PEER_DISCOVERED;
2919         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2920                           LNET_PEER_REDISCOVER);
2921
2922         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2923
2924         return 0;
2925 }
2926
2927 /*
2928  * Mark the peer as to be rediscovered.
2929  */
2930 static int lnet_peer_rediscover(struct lnet_peer *lp)
2931 __must_hold(&lp->lp_lock)
2932 {
2933         lp->lp_state |= LNET_PEER_REDISCOVER;
2934         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2935
2936         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2937
2938         return 0;
2939 }
2940
2941 /*
2942  * Returns the first peer on the ln_dc_working queue if its timeout
2943  * has expired. Takes the current time as an argument so as to not
2944  * obsessively re-check the clock. The oldest discovery request will
2945  * be at the head of the queue.
2946  */
2947 static struct lnet_peer *lnet_peer_dc_timed_out(time64_t now)
2948 {
2949         struct lnet_peer *lp;
2950
2951         if (list_empty(&the_lnet.ln_dc_working))
2952                 return NULL;
2953         lp = list_first_entry(&the_lnet.ln_dc_working,
2954                               struct lnet_peer, lp_dc_list);
2955         if (now < lp->lp_last_queued + DEFAULT_PEER_TIMEOUT)
2956                 return NULL;
2957         return lp;
2958 }
2959
2960 /*
2961  * Discovering this peer is taking too long. Cancel any Ping or Push
2962  * that discovery is waiting on by unlinking the relevant MDs. The
2963  * lnet_discovery_event_handler() will proceed from here and complete
2964  * the cleanup.
2965  */
2966 static void lnet_peer_discovery_timeout(struct lnet_peer *lp)
2967 {
2968         struct lnet_handle_md ping_mdh;
2969         struct lnet_handle_md push_mdh;
2970
2971         LNetInvalidateMDHandle(&ping_mdh);
2972         LNetInvalidateMDHandle(&push_mdh);
2973
2974         spin_lock(&lp->lp_lock);
2975         if (lp->lp_state & LNET_PEER_PING_SENT) {
2976                 ping_mdh = lp->lp_ping_mdh;
2977                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2978         }
2979         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2980                 push_mdh = lp->lp_push_mdh;
2981                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2982         }
2983         spin_unlock(&lp->lp_lock);
2984
2985         if (!LNetMDHandleIsInvalid(ping_mdh))
2986                 LNetMDUnlink(ping_mdh);
2987         if (!LNetMDHandleIsInvalid(push_mdh))
2988                 LNetMDUnlink(push_mdh);
2989 }
2990
2991 /*
2992  * Wait for work to be queued or some other change that must be
2993  * attended to. Returns non-zero if the discovery thread should shut
2994  * down.
2995  */
2996 static int lnet_peer_discovery_wait_for_work(void)
2997 {
2998         int cpt;
2999         int rc = 0;
3000
3001         DEFINE_WAIT(wait);
3002
3003         cpt = lnet_net_lock_current();
3004         for (;;) {
3005                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
3006                                 TASK_INTERRUPTIBLE);
3007                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3008                         break;
3009                 if (lnet_push_target_resize_needed())
3010                         break;
3011                 if (!list_empty(&the_lnet.ln_dc_request))
3012                         break;
3013                 if (!list_empty(&the_lnet.ln_msg_resend))
3014                         break;
3015                 if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
3016                         break;
3017                 lnet_net_unlock(cpt);
3018
3019                 /*
3020                  * wakeup max every second to check if there are peers that
3021                  * have been stuck on the working queue for greater than
3022                  * the peer timeout.
3023                  */
3024                 schedule_timeout(cfs_time_seconds(1));
3025                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
3026                 cpt = lnet_net_lock_current();
3027         }
3028         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3029
3030         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3031                 rc = -ESHUTDOWN;
3032
3033         lnet_net_unlock(cpt);
3034
3035         CDEBUG(D_NET, "woken: %d\n", rc);
3036
3037         return rc;
3038 }
3039
3040 /*
3041  * Messages that were pending on a destroyed peer will be put on a global
3042  * resend list. The message resend list will be checked by
3043  * the discovery thread when it wakes up, and will resend messages. These
3044  * messages can still be sendable in the case the lpni which was the initial
3045  * cause of the message re-queue was transfered to another peer.
3046  *
3047  * It is possible that LNet could be shutdown while we're iterating
3048  * through the list. lnet_shudown_lndnets() will attempt to access the
3049  * resend list, but will have to wait until the spinlock is released, by
3050  * which time there shouldn't be any more messages on the resend list.
3051  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3052  * for the messages so they can be released. The other case is that
3053  * lnet_shudown_lndnets() can finalize all the messages before this
3054  * function can visit the resend list, in which case this function will be
3055  * a no-op.
3056  */
3057 static void lnet_resend_msgs(void)
3058 {
3059         struct lnet_msg *msg, *tmp;
3060         struct list_head resend;
3061         int rc;
3062
3063         INIT_LIST_HEAD(&resend);
3064
3065         spin_lock(&the_lnet.ln_msg_resend_lock);
3066         list_splice(&the_lnet.ln_msg_resend, &resend);
3067         spin_unlock(&the_lnet.ln_msg_resend_lock);
3068
3069         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3070                 list_del_init(&msg->msg_list);
3071                 rc = lnet_send(msg->msg_src_nid_param, msg,
3072                                msg->msg_rtr_nid_param);
3073                 if (rc < 0) {
3074                         CNETERR("Error sending %s to %s: %d\n",
3075                                lnet_msgtyp2str(msg->msg_type),
3076                                libcfs_id2str(msg->msg_target), rc);
3077                         lnet_finalize(msg, rc);
3078                 }
3079         }
3080 }
3081
3082 /* The discovery thread. */
3083 static int lnet_peer_discovery(void *arg)
3084 {
3085         struct lnet_peer *lp;
3086         time64_t now;
3087         int rc;
3088
3089         CDEBUG(D_NET, "started\n");
3090         cfs_block_allsigs();
3091
3092         for (;;) {
3093                 if (lnet_peer_discovery_wait_for_work())
3094                         break;
3095
3096                 lnet_resend_msgs();
3097
3098                 if (lnet_push_target_resize_needed())
3099                         lnet_push_target_resize();
3100
3101                 lnet_net_lock(LNET_LOCK_EX);
3102                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3103                         break;
3104
3105                 /*
3106                  * Process all incoming discovery work requests.  When
3107                  * discovery must wait on a peer to change state, it
3108                  * is added to the tail of the ln_dc_working queue. A
3109                  * timestamp keeps track of when the peer was added,
3110                  * so we can time out discovery requests that take too
3111                  * long.
3112                  */
3113                 while (!list_empty(&the_lnet.ln_dc_request)) {
3114                         lp = list_first_entry(&the_lnet.ln_dc_request,
3115                                               struct lnet_peer, lp_dc_list);
3116                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3117                         /*
3118                          * set the time the peer was put on the dc_working
3119                          * queue. It shouldn't remain on the queue
3120                          * forever, in case the GET message (for ping)
3121                          * doesn't get a REPLY or the PUT message (for
3122                          * push) doesn't get an ACK.
3123                          *
3124                          * TODO: LNet Health will deal with this scenario
3125                          * in a generic way.
3126                          */
3127                         lp->lp_last_queued = ktime_get_real_seconds();
3128                         lnet_net_unlock(LNET_LOCK_EX);
3129
3130                         /*
3131                          * Select an action depending on the state of
3132                          * the peer and whether discovery is disabled.
3133                          * The check whether discovery is disabled is
3134                          * done after the code that handles processing
3135                          * for arrived data, cleanup for failures, and
3136                          * forcing a Ping or Push.
3137                          */
3138                         spin_lock(&lp->lp_lock);
3139                         CDEBUG(D_NET, "peer %s state %#x\n",
3140                                 libcfs_nid2str(lp->lp_primary_nid),
3141                                 lp->lp_state);
3142                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3143                                 rc = lnet_peer_data_present(lp);
3144                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3145                                 rc = lnet_peer_ping_failed(lp);
3146                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3147                                 rc = lnet_peer_push_failed(lp);
3148                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3149                                 rc = lnet_peer_send_ping(lp);
3150                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3151                                 rc = lnet_peer_send_push(lp);
3152                         else if (lnet_peer_discovery_disabled)
3153                                 rc = lnet_peer_rediscover(lp);
3154                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3155                                 rc = lnet_peer_send_ping(lp);
3156                         else if (lnet_peer_needs_push(lp))
3157                                 rc = lnet_peer_send_push(lp);
3158                         else
3159                                 rc = lnet_peer_discovered(lp);
3160                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3161                                 libcfs_nid2str(lp->lp_primary_nid),
3162                                 lp->lp_state, rc);
3163                         spin_unlock(&lp->lp_lock);
3164
3165                         lnet_net_lock(LNET_LOCK_EX);
3166                         if (rc == LNET_REDISCOVER_PEER) {
3167                                 list_move(&lp->lp_dc_list,
3168                                           &the_lnet.ln_dc_request);
3169                         } else if (rc) {
3170                                 lnet_peer_discovery_error(lp, rc);
3171                         }
3172                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3173                                 lnet_peer_discovery_complete(lp);
3174                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3175                                 break;
3176                 }
3177
3178                 /*
3179                  * Now that the ln_dc_request queue has been emptied
3180                  * check the ln_dc_working queue for peers that are
3181                  * taking too long. Move all that are found to the
3182                  * ln_dc_expired queue and time out any pending
3183                  * Ping or Push. We have to drop the lnet_net_lock
3184                  * in the loop because lnet_peer_discovery_timeout()
3185                  * calls LNetMDUnlink().
3186                  */
3187                 now = ktime_get_real_seconds();
3188                 while ((lp = lnet_peer_dc_timed_out(now)) != NULL) {
3189                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3190                         lnet_net_unlock(LNET_LOCK_EX);
3191                         lnet_peer_discovery_timeout(lp);
3192                         lnet_net_lock(LNET_LOCK_EX);
3193                 }
3194
3195                 lnet_net_unlock(LNET_LOCK_EX);
3196         }
3197
3198         CDEBUG(D_NET, "stopping\n");
3199         /*
3200          * Clean up before telling lnet_peer_discovery_stop() that
3201          * we're done. Use wake_up() below to somewhat reduce the
3202          * size of the thundering herd if there are multiple threads
3203          * waiting on discovery of a single peer.
3204          */
3205         LNetEQFree(the_lnet.ln_dc_eqh);
3206         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3207
3208         /* Queue cleanup 1: stop all pending pings and pushes. */
3209         lnet_net_lock(LNET_LOCK_EX);
3210         while (!list_empty(&the_lnet.ln_dc_working)) {
3211                 lp = list_first_entry(&the_lnet.ln_dc_working,
3212                                       struct lnet_peer, lp_dc_list);
3213                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3214                 lnet_net_unlock(LNET_LOCK_EX);
3215                 lnet_peer_discovery_timeout(lp);
3216                 lnet_net_lock(LNET_LOCK_EX);
3217         }
3218         lnet_net_unlock(LNET_LOCK_EX);
3219
3220         /* Queue cleanup 2: wait for the expired queue to clear. */
3221         while (!list_empty(&the_lnet.ln_dc_expired))
3222                 schedule_timeout(cfs_time_seconds(1));
3223
3224         /* Queue cleanup 3: clear the request queue. */
3225         lnet_net_lock(LNET_LOCK_EX);
3226         while (!list_empty(&the_lnet.ln_dc_request)) {
3227                 lp = list_first_entry(&the_lnet.ln_dc_request,
3228                                       struct lnet_peer, lp_dc_list);
3229                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3230                 lnet_peer_discovery_complete(lp);
3231         }
3232         lnet_net_unlock(LNET_LOCK_EX);
3233
3234         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3235         wake_up(&the_lnet.ln_dc_waitq);
3236
3237         CDEBUG(D_NET, "stopped\n");
3238
3239         return 0;
3240 }
3241
3242 /* ln_api_mutex is held on entry. */
3243 int lnet_peer_discovery_start(void)
3244 {
3245         struct task_struct *task;
3246         int rc;
3247
3248         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3249                 return -EALREADY;
3250
3251         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3252         if (rc != 0) {
3253                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3254                 return rc;
3255         }
3256
3257         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3258         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3259         if (IS_ERR(task)) {
3260                 rc = PTR_ERR(task);
3261                 CERROR("Can't start peer discovery thread: %d\n", rc);
3262
3263                 LNetEQFree(the_lnet.ln_dc_eqh);
3264                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3265
3266                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3267         }
3268
3269         CDEBUG(D_NET, "discovery start: %d\n", rc);
3270
3271         return rc;
3272 }
3273
3274 /* ln_api_mutex is held on entry. */
3275 void lnet_peer_discovery_stop(void)
3276 {
3277         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3278                 return;
3279
3280         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3281         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3282         wake_up(&the_lnet.ln_dc_waitq);
3283
3284         wait_event(the_lnet.ln_dc_waitq,
3285                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3286
3287         LASSERT(list_empty(&the_lnet.ln_dc_request));
3288         LASSERT(list_empty(&the_lnet.ln_dc_working));
3289         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3290
3291         CDEBUG(D_NET, "discovery stopped\n");
3292 }
3293
3294 /* Debugging */
3295
3296 void
3297 lnet_debug_peer(lnet_nid_t nid)
3298 {
3299         char                    *aliveness = "NA";
3300         struct lnet_peer_ni     *lp;
3301         int                     cpt;
3302
3303         cpt = lnet_cpt_of_nid(nid, NULL);
3304         lnet_net_lock(cpt);
3305
3306         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3307         if (IS_ERR(lp)) {
3308                 lnet_net_unlock(cpt);
3309                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3310                 return;
3311         }
3312
3313         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3314                 aliveness = lp->lpni_alive ? "up" : "down";
3315
3316         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3317                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3318                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3319                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3320                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3321
3322         lnet_peer_ni_decref_locked(lp);
3323
3324         lnet_net_unlock(cpt);
3325 }
3326
3327 /* Gathering information for userspace. */
3328
3329 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3330                           char aliveness[LNET_MAX_STR_LEN],
3331                           __u32 *cpt_iter, __u32 *refcount,
3332                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3333                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3334                           __u32 *peer_tx_qnob)
3335 {
3336         struct lnet_peer_table          *peer_table;
3337         struct lnet_peer_ni             *lp;
3338         int                             j;
3339         int                             lncpt;
3340         bool                            found = false;
3341
3342         /* get the number of CPTs */
3343         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3344
3345         /* if the cpt number to be examined is >= the number of cpts in
3346          * the system then indicate that there are no more cpts to examin
3347          */
3348         if (*cpt_iter >= lncpt)
3349                 return -ENOENT;
3350
3351         /* get the current table */
3352         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3353         /* if the ptable is NULL then there are no more cpts to examine */
3354         if (peer_table == NULL)
3355                 return -ENOENT;
3356
3357         lnet_net_lock(*cpt_iter);
3358
3359         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3360                 struct list_head *peers = &peer_table->pt_hash[j];
3361
3362                 list_for_each_entry(lp, peers, lpni_hashlist) {
3363                         if (peer_index-- > 0)
3364                                 continue;
3365
3366                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3367                         if (lnet_isrouter(lp) ||
3368                                 lnet_peer_aliveness_enabled(lp))
3369                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3370                                          lp->lpni_alive ? "up" : "down");
3371
3372                         *nid = lp->lpni_nid;
3373                         *refcount = atomic_read(&lp->lpni_refcount);
3374                         *ni_peer_tx_credits =
3375                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3376                         *peer_tx_credits = lp->lpni_txcredits;
3377                         *peer_rtr_credits = lp->lpni_rtrcredits;
3378                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3379                         *peer_tx_qnob = lp->lpni_txqnob;
3380
3381                         found = true;
3382                 }
3383
3384         }
3385         lnet_net_unlock(*cpt_iter);
3386
3387         *cpt_iter = lncpt;
3388
3389         return found ? 0 : -ENOENT;
3390 }
3391
3392 /* ln_api_mutex is held, which keeps the peer list stable */
3393 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3394 {
3395         struct lnet_ioctl_element_stats *lpni_stats;
3396         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3397         struct lnet_peer_ni_credit_info *lpni_info;
3398         struct lnet_peer_ni *lpni;
3399         struct lnet_peer *lp;
3400         lnet_nid_t nid;
3401         __u32 size;
3402         int rc;
3403
3404         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3405
3406         if (!lp) {
3407                 rc = -ENOENT;
3408                 goto out;
3409         }
3410
3411         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3412                 + sizeof(*lpni_msg_stats);
3413         size *= lp->lp_nnis;
3414         if (size > cfg->prcfg_size) {
3415                 cfg->prcfg_size = size;
3416                 rc = -E2BIG;
3417                 goto out_lp_decref;
3418         }
3419
3420         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3421         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3422         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3423         cfg->prcfg_count = lp->lp_nnis;
3424         cfg->prcfg_size = size;
3425         cfg->prcfg_state = lp->lp_state;
3426
3427         /* Allocate helper buffers. */
3428         rc = -ENOMEM;
3429         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3430         if (!lpni_info)
3431                 goto out_lp_decref;
3432         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3433         if (!lpni_stats)
3434                 goto out_free_info;
3435         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3436         if (!lpni_msg_stats)
3437                 goto out_free_stats;
3438
3439
3440         lpni = NULL;
3441         rc = -EFAULT;
3442         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3443                 nid = lpni->lpni_nid;
3444                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3445                         goto out_free_msg_stats;
3446                 bulk += sizeof(nid);
3447
3448                 memset(lpni_info, 0, sizeof(*lpni_info));
3449                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3450                 if (lnet_isrouter(lpni) ||
3451                         lnet_peer_aliveness_enabled(lpni))
3452                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3453                                 lpni->lpni_alive ? "up" : "down");
3454
3455                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3456                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3457                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3458                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3459                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3460                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3461                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3462                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3463                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3464                         goto out_free_msg_stats;
3465                 bulk += sizeof(*lpni_info);
3466
3467                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3468                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3469                                                             LNET_STATS_TYPE_SEND);
3470                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3471                                                             LNET_STATS_TYPE_RECV);
3472                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3473                                                             LNET_STATS_TYPE_DROP);
3474                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3475                         goto out_free_msg_stats;
3476                 bulk += sizeof(*lpni_stats);
3477                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3478                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3479                         goto out_free_msg_stats;
3480                 bulk += sizeof(*lpni_msg_stats);
3481         }
3482         rc = 0;
3483
3484 out_free_msg_stats:
3485         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3486 out_free_stats:
3487         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3488 out_free_info:
3489         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3490 out_lp_decref:
3491         lnet_peer_decref_locked(lp);
3492 out:
3493         return rc;
3494 }