Whamcloud - gitweb
f2b081996031aa1dd134b1bc397919b4d2263b90
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_rtrq);
166         INIT_LIST_HEAD(&lpni->lpni_routes);
167         INIT_LIST_HEAD(&lpni->lpni_hashlist);
168         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
169         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
170
171         spin_lock_init(&lpni->lpni_lock);
172
173         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
174         lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
175         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
176         lpni->lpni_nid = nid;
177         lpni->lpni_cpt = cpt;
178         lnet_set_peer_ni_health_locked(lpni, true);
179
180         net = lnet_get_net_locked(LNET_NIDNET(nid));
181         lpni->lpni_net = net;
182         if (net) {
183                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
184                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
185                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
186                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
187         } else {
188                 /*
189                  * This peer_ni is not on a local network, so we
190                  * cannot add the credits here. In case the net is
191                  * added later, add the peer_ni to the remote peer ni
192                  * list so it can be easily found and revisited.
193                  */
194                 /* FIXME: per-net implementation instead? */
195                 atomic_inc(&lpni->lpni_refcount);
196                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
197                               &the_lnet.ln_remote_peer_ni_list);
198         }
199
200         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
201
202         return lpni;
203 }
204
205 static struct lnet_peer_net *
206 lnet_peer_net_alloc(__u32 net_id)
207 {
208         struct lnet_peer_net *lpn;
209
210         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
211         if (!lpn)
212                 return NULL;
213
214         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
215         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
216         lpn->lpn_net_id = net_id;
217
218         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
219
220         return lpn;
221 }
222
223 void
224 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
225 {
226         struct lnet_peer *lp;
227
228         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
229
230         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
231         LASSERT(list_empty(&lpn->lpn_peer_nis));
232         LASSERT(list_empty(&lpn->lpn_peer_nets));
233         lp = lpn->lpn_peer;
234         lpn->lpn_peer = NULL;
235         LIBCFS_FREE(lpn, sizeof(*lpn));
236
237         lnet_peer_decref_locked(lp);
238 }
239
240 static struct lnet_peer *
241 lnet_peer_alloc(lnet_nid_t nid)
242 {
243         struct lnet_peer *lp;
244
245         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
246         if (!lp)
247                 return NULL;
248
249         INIT_LIST_HEAD(&lp->lp_peer_list);
250         INIT_LIST_HEAD(&lp->lp_peer_nets);
251         INIT_LIST_HEAD(&lp->lp_dc_list);
252         INIT_LIST_HEAD(&lp->lp_dc_pendq);
253         init_waitqueue_head(&lp->lp_dc_waitq);
254         spin_lock_init(&lp->lp_lock);
255         lp->lp_primary_nid = nid;
256         /*
257          * Turn off discovery for loopback peer. If you're creating a peer
258          * for the loopback interface then that was initiated when we
259          * attempted to send a message over the loopback. There is no need
260          * to ever use a different interface when sending messages to
261          * myself.
262          */
263         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
264                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
265         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
266
267         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
268
269         return lp;
270 }
271
272 void
273 lnet_destroy_peer_locked(struct lnet_peer *lp)
274 {
275         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
276
277         LASSERT(atomic_read(&lp->lp_refcount) == 0);
278         LASSERT(list_empty(&lp->lp_peer_nets));
279         LASSERT(list_empty(&lp->lp_peer_list));
280         LASSERT(list_empty(&lp->lp_dc_list));
281
282         if (lp->lp_data)
283                 lnet_ping_buffer_decref(lp->lp_data);
284
285         /*
286          * if there are messages still on the pending queue, then make
287          * sure to queue them on the ln_msg_resend list so they can be
288          * resent at a later point if the discovery thread is still
289          * running.
290          * If the discovery thread has stopped, then the wakeup will be a
291          * no-op, and it is expected the lnet_shutdown_lndnets() will
292          * eventually be called, which will traverse this list and
293          * finalize the messages on the list.
294          * We can not resend them now because we're holding the cpt lock.
295          * Releasing the lock can cause an inconsistent state
296          */
297         spin_lock(&the_lnet.ln_msg_resend_lock);
298         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
299         spin_unlock(&the_lnet.ln_msg_resend_lock);
300         wake_up(&the_lnet.ln_dc_waitq);
301
302         LIBCFS_FREE(lp, sizeof(*lp));
303 }
304
305 /*
306  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
307  * that peer_net, detach the peer_net from the peer.
308  *
309  * Call with lnet_net_lock/EX held
310  */
311 static void
312 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
313 {
314         struct lnet_peer_table *ptable;
315         struct lnet_peer_net *lpn;
316         struct lnet_peer *lp;
317
318         /*
319          * Belts and suspenders: gracefully handle teardown of a
320          * partially connected peer_ni.
321          */
322         lpn = lpni->lpni_peer_net;
323
324         list_del_init(&lpni->lpni_peer_nis);
325         /*
326          * If there are no lpni's left, we detach lpn from
327          * lp_peer_nets, so it cannot be found anymore.
328          */
329         if (list_empty(&lpn->lpn_peer_nis))
330                 list_del_init(&lpn->lpn_peer_nets);
331
332         /* Update peer NID count. */
333         lp = lpn->lpn_peer;
334         lp->lp_nnis--;
335
336         /*
337          * If there are no more peer nets, make the peer unfindable
338          * via the peer_tables.
339          *
340          * Otherwise, if the peer is DISCOVERED, tell discovery to
341          * take another look at it. This is a no-op if discovery for
342          * this peer did the detaching.
343          */
344         if (list_empty(&lp->lp_peer_nets)) {
345                 list_del_init(&lp->lp_peer_list);
346                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
347                 ptable->pt_peers--;
348         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
349                 /* Discovery isn't running, nothing to do here. */
350         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
351                 lnet_peer_queue_for_discovery(lp);
352                 wake_up(&the_lnet.ln_dc_waitq);
353         }
354         CDEBUG(D_NET, "peer %s NID %s\n",
355                 libcfs_nid2str(lp->lp_primary_nid),
356                 libcfs_nid2str(lpni->lpni_nid));
357 }
358
359 /* called with lnet_net_lock LNET_LOCK_EX held */
360 static int
361 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
362 {
363         struct lnet_peer_table *ptable = NULL;
364
365         /* don't remove a peer_ni if it's also a gateway */
366         if (lpni->lpni_rtr_refcount > 0) {
367                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
368                        libcfs_nid2str(lpni->lpni_nid));
369                 return -EBUSY;
370         }
371
372         lnet_peer_remove_from_remote_list(lpni);
373
374         /* remove peer ni from the hash list. */
375         list_del_init(&lpni->lpni_hashlist);
376
377         /* decrement the ref count on the peer table */
378         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
379         LASSERT(ptable->pt_number > 0);
380         ptable->pt_number--;
381
382         /*
383          * The peer_ni can no longer be found with a lookup. But there
384          * can be current users, so keep track of it on the zombie
385          * list until the reference count has gone to zero.
386          *
387          * The last reference may be lost in a place where the
388          * lnet_net_lock locks only a single cpt, and that cpt may not
389          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
390          * has its own lock.
391          */
392         spin_lock(&ptable->pt_zombie_lock);
393         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
394         ptable->pt_zombies++;
395         spin_unlock(&ptable->pt_zombie_lock);
396
397         /* no need to keep this peer_ni on the hierarchy anymore */
398         lnet_peer_detach_peer_ni_locked(lpni);
399
400         /* remove hashlist reference on peer_ni */
401         lnet_peer_ni_decref_locked(lpni);
402
403         return 0;
404 }
405
406 void lnet_peer_uninit(void)
407 {
408         struct lnet_peer_ni *lpni, *tmp;
409
410         lnet_net_lock(LNET_LOCK_EX);
411
412         /* remove all peer_nis from the remote peer and the hash list */
413         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
414                                  lpni_on_remote_peer_ni_list)
415                 lnet_peer_ni_del_locked(lpni);
416
417         lnet_peer_tables_destroy();
418
419         lnet_net_unlock(LNET_LOCK_EX);
420 }
421
422 static int
423 lnet_peer_del_locked(struct lnet_peer *peer)
424 {
425         struct lnet_peer_ni *lpni = NULL, *lpni2;
426         int rc = 0, rc2 = 0;
427
428         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
429
430         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
431         while (lpni != NULL) {
432                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
433                 rc = lnet_peer_ni_del_locked(lpni);
434                 if (rc != 0)
435                         rc2 = rc;
436                 lpni = lpni2;
437         }
438
439         return rc2;
440 }
441
442 static int
443 lnet_peer_del(struct lnet_peer *peer)
444 {
445         lnet_net_lock(LNET_LOCK_EX);
446         lnet_peer_del_locked(peer);
447         lnet_net_unlock(LNET_LOCK_EX);
448
449         return 0;
450 }
451
452 /*
453  * Delete a NID from a peer. Call with ln_api_mutex held.
454  *
455  * Error codes:
456  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
457  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
458  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
459  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
460  */
461 static int
462 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
463 {
464         struct lnet_peer_ni *lpni;
465         lnet_nid_t primary_nid = lp->lp_primary_nid;
466         int rc = 0;
467
468         if (!(flags & LNET_PEER_CONFIGURED)) {
469                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
470                         rc = -EPERM;
471                         goto out;
472                 }
473         }
474         lpni = lnet_find_peer_ni_locked(nid);
475         if (!lpni) {
476                 rc = -ENOENT;
477                 goto out;
478         }
479         lnet_peer_ni_decref_locked(lpni);
480         if (lp != lpni->lpni_peer_net->lpn_peer) {
481                 rc = -ECHILD;
482                 goto out;
483         }
484
485         /*
486          * This function only allows deletion of the primary NID if it
487          * is the only NID.
488          */
489         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
490                 rc = -EBUSY;
491                 goto out;
492         }
493
494         lnet_net_lock(LNET_LOCK_EX);
495         lnet_peer_ni_del_locked(lpni);
496         lnet_net_unlock(LNET_LOCK_EX);
497
498 out:
499         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
500                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
501
502         return rc;
503 }
504
505 static void
506 lnet_peer_table_cleanup_locked(struct lnet_net *net,
507                                struct lnet_peer_table *ptable)
508 {
509         int                      i;
510         struct lnet_peer_ni     *next;
511         struct lnet_peer_ni     *lpni;
512         struct lnet_peer        *peer;
513
514         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
515                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
516                                          lpni_hashlist) {
517                         if (net != NULL && net != lpni->lpni_net)
518                                 continue;
519
520                         peer = lpni->lpni_peer_net->lpn_peer;
521                         if (peer->lp_primary_nid != lpni->lpni_nid) {
522                                 lnet_peer_ni_del_locked(lpni);
523                                 continue;
524                         }
525                         /*
526                          * Removing the primary NID implies removing
527                          * the entire peer. Advance next beyond any
528                          * peer_ni that belongs to the same peer.
529                          */
530                         list_for_each_entry_from(next, &ptable->pt_hash[i],
531                                                  lpni_hashlist) {
532                                 if (next->lpni_peer_net->lpn_peer != peer)
533                                         break;
534                         }
535                         lnet_peer_del_locked(peer);
536                 }
537         }
538 }
539
540 static void
541 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
542 {
543         int     i = 3;
544
545         spin_lock(&ptable->pt_zombie_lock);
546         while (ptable->pt_zombies) {
547                 spin_unlock(&ptable->pt_zombie_lock);
548
549                 if (is_power_of_2(i)) {
550                         CDEBUG(D_WARNING,
551                                "Waiting for %d zombies on peer table\n",
552                                ptable->pt_zombies);
553                 }
554                 set_current_state(TASK_UNINTERRUPTIBLE);
555                 schedule_timeout(cfs_time_seconds(1) >> 1);
556                 spin_lock(&ptable->pt_zombie_lock);
557         }
558         spin_unlock(&ptable->pt_zombie_lock);
559 }
560
561 static void
562 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
563                                 struct lnet_peer_table *ptable)
564 {
565         struct lnet_peer_ni     *lp;
566         struct lnet_peer_ni     *tmp;
567         lnet_nid_t              lpni_nid;
568         int                     i;
569
570         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
571                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
572                                          lpni_hashlist) {
573                         if (net != lp->lpni_net)
574                                 continue;
575
576                         if (lp->lpni_rtr_refcount == 0)
577                                 continue;
578
579                         lpni_nid = lp->lpni_nid;
580
581                         lnet_net_unlock(LNET_LOCK_EX);
582                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
583                         lnet_net_lock(LNET_LOCK_EX);
584                 }
585         }
586 }
587
588 void
589 lnet_peer_tables_cleanup(struct lnet_net *net)
590 {
591         int i;
592         struct lnet_peer_table *ptable;
593
594         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
595         /* If just deleting the peers for a NI, get rid of any routes these
596          * peers are gateways for. */
597         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
598                 lnet_net_lock(LNET_LOCK_EX);
599                 lnet_peer_table_del_rtrs_locked(net, ptable);
600                 lnet_net_unlock(LNET_LOCK_EX);
601         }
602
603         /* Start the cleanup process */
604         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
605                 lnet_net_lock(LNET_LOCK_EX);
606                 lnet_peer_table_cleanup_locked(net, ptable);
607                 lnet_net_unlock(LNET_LOCK_EX);
608         }
609
610         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
611                 lnet_peer_ni_finalize_wait(ptable);
612 }
613
614 static struct lnet_peer_ni *
615 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
616 {
617         struct list_head        *peers;
618         struct lnet_peer_ni     *lp;
619
620         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
621
622         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
623         list_for_each_entry(lp, peers, lpni_hashlist) {
624                 if (lp->lpni_nid == nid) {
625                         lnet_peer_ni_addref_locked(lp);
626                         return lp;
627                 }
628         }
629
630         return NULL;
631 }
632
633 struct lnet_peer_ni *
634 lnet_find_peer_ni_locked(lnet_nid_t nid)
635 {
636         struct lnet_peer_ni *lpni;
637         struct lnet_peer_table *ptable;
638         int cpt;
639
640         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
641
642         ptable = the_lnet.ln_peer_tables[cpt];
643         lpni = lnet_get_peer_ni_locked(ptable, nid);
644
645         return lpni;
646 }
647
648 struct lnet_peer *
649 lnet_find_peer(lnet_nid_t nid)
650 {
651         struct lnet_peer_ni *lpni;
652         struct lnet_peer *lp = NULL;
653         int cpt;
654
655         cpt = lnet_net_lock_current();
656         lpni = lnet_find_peer_ni_locked(nid);
657         if (lpni) {
658                 lp = lpni->lpni_peer_net->lpn_peer;
659                 lnet_peer_addref_locked(lp);
660                 lnet_peer_ni_decref_locked(lpni);
661         }
662         lnet_net_unlock(cpt);
663
664         return lp;
665 }
666
667 struct lnet_peer_ni *
668 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
669                              struct lnet_peer_net *peer_net,
670                              struct lnet_peer_ni *prev)
671 {
672         struct lnet_peer_ni *lpni;
673         struct lnet_peer_net *net = peer_net;
674
675         if (!prev) {
676                 if (!net) {
677                         if (list_empty(&peer->lp_peer_nets))
678                                 return NULL;
679
680                         net = list_entry(peer->lp_peer_nets.next,
681                                          struct lnet_peer_net,
682                                          lpn_peer_nets);
683                 }
684                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
685                                   lpni_peer_nis);
686
687                 return lpni;
688         }
689
690         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
691                 /*
692                  * if you reached the end of the peer ni list and the peer
693                  * net is specified then there are no more peer nis in that
694                  * net.
695                  */
696                 if (net)
697                         return NULL;
698
699                 /*
700                  * we reached the end of this net ni list. move to the
701                  * next net
702                  */
703                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
704                     &peer->lp_peer_nets)
705                         /* no more nets and no more NIs. */
706                         return NULL;
707
708                 /* get the next net */
709                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
710                                  struct lnet_peer_net,
711                                  lpn_peer_nets);
712                 /* get the ni on it */
713                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
714                                   lpni_peer_nis);
715
716                 return lpni;
717         }
718
719         /* there are more nis left */
720         lpni = list_entry(prev->lpni_peer_nis.next,
721                           struct lnet_peer_ni, lpni_peer_nis);
722
723         return lpni;
724 }
725
726 /* Call with the ln_api_mutex held */
727 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
728 {
729         struct lnet_process_id id;
730         struct lnet_peer_table *ptable;
731         struct lnet_peer *lp;
732         __u32 count = 0;
733         __u32 size = 0;
734         int lncpt;
735         int cpt;
736         __u32 i;
737         int rc;
738
739         rc = -ESHUTDOWN;
740         if (the_lnet.ln_state != LNET_STATE_RUNNING)
741                 goto done;
742
743         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
744
745         /*
746          * Count the number of peers, and return E2BIG if the buffer
747          * is too small. We'll also return the desired size.
748          */
749         rc = -E2BIG;
750         for (cpt = 0; cpt < lncpt; cpt++) {
751                 ptable = the_lnet.ln_peer_tables[cpt];
752                 count += ptable->pt_peers;
753         }
754         size = count * sizeof(*ids);
755         if (size > *sizep)
756                 goto done;
757
758         /*
759          * Walk the peer lists and copy out the primary nids.
760          * This is safe because the peer lists are only modified
761          * while the ln_api_mutex is held. So we don't need to
762          * hold the lnet_net_lock as well, and can therefore
763          * directly call copy_to_user().
764          */
765         rc = -EFAULT;
766         memset(&id, 0, sizeof(id));
767         id.pid = LNET_PID_LUSTRE;
768         i = 0;
769         for (cpt = 0; cpt < lncpt; cpt++) {
770                 ptable = the_lnet.ln_peer_tables[cpt];
771                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
772                         if (i >= count)
773                                 goto done;
774                         id.nid = lp->lp_primary_nid;
775                         if (copy_to_user(&ids[i], &id, sizeof(id)))
776                                 goto done;
777                         i++;
778                 }
779         }
780         rc = 0;
781 done:
782         *countp = count;
783         *sizep = size;
784         return rc;
785 }
786
787 /*
788  * Start pushes to peers that need to be updated for a configuration
789  * change on this node.
790  */
791 void
792 lnet_push_update_to_peers(int force)
793 {
794         struct lnet_peer_table *ptable;
795         struct lnet_peer *lp;
796         int lncpt;
797         int cpt;
798
799         lnet_net_lock(LNET_LOCK_EX);
800         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
801         for (cpt = 0; cpt < lncpt; cpt++) {
802                 ptable = the_lnet.ln_peer_tables[cpt];
803                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
804                         if (force) {
805                                 spin_lock(&lp->lp_lock);
806                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
807                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
808                                 spin_unlock(&lp->lp_lock);
809                         }
810                         if (lnet_peer_needs_push(lp))
811                                 lnet_peer_queue_for_discovery(lp);
812                 }
813         }
814         lnet_net_unlock(LNET_LOCK_EX);
815         wake_up(&the_lnet.ln_dc_waitq);
816 }
817
818 /*
819  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
820  * this is a preferred point-to-point path. Call with lnet_net_lock in
821  * shared mmode.
822  */
823 bool
824 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
825 {
826         int i;
827
828         if (lpni->lpni_pref_nnids == 0)
829                 return false;
830         if (lpni->lpni_pref_nnids == 1)
831                 return lpni->lpni_pref.nid == nid;
832         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
833                 if (lpni->lpni_pref.nids[i] == nid)
834                         return true;
835         }
836         return false;
837 }
838
839 /*
840  * Set a single ni as preferred, provided no preferred ni is already
841  * defined. Only to be used for non-multi-rail peer_ni.
842  */
843 int
844 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
845 {
846         int rc = 0;
847
848         spin_lock(&lpni->lpni_lock);
849         if (nid == LNET_NID_ANY) {
850                 rc = -EINVAL;
851         } else if (lpni->lpni_pref_nnids > 0) {
852                 rc = -EPERM;
853         } else if (lpni->lpni_pref_nnids == 0) {
854                 lpni->lpni_pref.nid = nid;
855                 lpni->lpni_pref_nnids = 1;
856                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
857         }
858         spin_unlock(&lpni->lpni_lock);
859
860         CDEBUG(D_NET, "peer %s nid %s: %d\n",
861                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
862         return rc;
863 }
864
865 /*
866  * Clear the preferred NID from a non-multi-rail peer_ni, provided
867  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
868  */
869 int
870 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
871 {
872         int rc = 0;
873
874         spin_lock(&lpni->lpni_lock);
875         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
876                 lpni->lpni_pref_nnids = 0;
877                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
878         } else if (lpni->lpni_pref_nnids == 0) {
879                 rc = -ENOENT;
880         } else {
881                 rc = -EPERM;
882         }
883         spin_unlock(&lpni->lpni_lock);
884
885         CDEBUG(D_NET, "peer %s: %d\n",
886                libcfs_nid2str(lpni->lpni_nid), rc);
887         return rc;
888 }
889
890 /*
891  * Clear the preferred NIDs from a non-multi-rail peer.
892  */
893 void
894 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
895 {
896         struct lnet_peer_ni *lpni = NULL;
897
898         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
899                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
900 }
901
902 int
903 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
904 {
905         lnet_nid_t *nids = NULL;
906         lnet_nid_t *oldnids = NULL;
907         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
908         int size;
909         int i;
910         int rc = 0;
911
912         if (nid == LNET_NID_ANY) {
913                 rc = -EINVAL;
914                 goto out;
915         }
916
917         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
918                 rc = -EEXIST;
919                 goto out;
920         }
921
922         /* A non-MR node may have only one preferred NI per peer_ni */
923         if (lpni->lpni_pref_nnids > 0) {
924                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
925                         rc = -EPERM;
926                         goto out;
927                 }
928         }
929
930         if (lpni->lpni_pref_nnids != 0) {
931                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
932                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
933                 if (!nids) {
934                         rc = -ENOMEM;
935                         goto out;
936                 }
937                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
938                         if (lpni->lpni_pref.nids[i] == nid) {
939                                 LIBCFS_FREE(nids, size);
940                                 rc = -EEXIST;
941                                 goto out;
942                         }
943                         nids[i] = lpni->lpni_pref.nids[i];
944                 }
945                 nids[i] = nid;
946         }
947
948         lnet_net_lock(LNET_LOCK_EX);
949         spin_lock(&lpni->lpni_lock);
950         if (lpni->lpni_pref_nnids == 0) {
951                 lpni->lpni_pref.nid = nid;
952         } else {
953                 oldnids = lpni->lpni_pref.nids;
954                 lpni->lpni_pref.nids = nids;
955         }
956         lpni->lpni_pref_nnids++;
957         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
958         spin_unlock(&lpni->lpni_lock);
959         lnet_net_unlock(LNET_LOCK_EX);
960
961         if (oldnids) {
962                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
963                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
964         }
965 out:
966         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
967                 spin_lock(&lpni->lpni_lock);
968                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
969                 spin_unlock(&lpni->lpni_lock);
970         }
971         CDEBUG(D_NET, "peer %s nid %s: %d\n",
972                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
973         return rc;
974 }
975
976 int
977 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
978 {
979         lnet_nid_t *nids = NULL;
980         lnet_nid_t *oldnids = NULL;
981         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
982         int size;
983         int i, j;
984         int rc = 0;
985
986         if (lpni->lpni_pref_nnids == 0) {
987                 rc = -ENOENT;
988                 goto out;
989         }
990
991         if (lpni->lpni_pref_nnids == 1) {
992                 if (lpni->lpni_pref.nid != nid) {
993                         rc = -ENOENT;
994                         goto out;
995                 }
996         } else if (lpni->lpni_pref_nnids == 2) {
997                 if (lpni->lpni_pref.nids[0] != nid &&
998                     lpni->lpni_pref.nids[1] != nid) {
999                         rc = -ENOENT;
1000                         goto out;
1001                 }
1002         } else {
1003                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1004                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1005                 if (!nids) {
1006                         rc = -ENOMEM;
1007                         goto out;
1008                 }
1009                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1010                         if (lpni->lpni_pref.nids[i] != nid)
1011                                 continue;
1012                         nids[j++] = lpni->lpni_pref.nids[i];
1013                 }
1014                 /* Check if we actually removed a nid. */
1015                 if (j == lpni->lpni_pref_nnids) {
1016                         LIBCFS_FREE(nids, size);
1017                         rc = -ENOENT;
1018                         goto out;
1019                 }
1020         }
1021
1022         lnet_net_lock(LNET_LOCK_EX);
1023         spin_lock(&lpni->lpni_lock);
1024         if (lpni->lpni_pref_nnids == 1) {
1025                 lpni->lpni_pref.nid = LNET_NID_ANY;
1026         } else if (lpni->lpni_pref_nnids == 2) {
1027                 oldnids = lpni->lpni_pref.nids;
1028                 if (oldnids[0] == nid)
1029                         lpni->lpni_pref.nid = oldnids[1];
1030                 else
1031                         lpni->lpni_pref.nid = oldnids[2];
1032         } else {
1033                 oldnids = lpni->lpni_pref.nids;
1034                 lpni->lpni_pref.nids = nids;
1035         }
1036         lpni->lpni_pref_nnids--;
1037         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1038         spin_unlock(&lpni->lpni_lock);
1039         lnet_net_unlock(LNET_LOCK_EX);
1040
1041         if (oldnids) {
1042                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1043                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1044         }
1045 out:
1046         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1047                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1048         return rc;
1049 }
1050
1051 lnet_nid_t
1052 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1053 {
1054         struct lnet_peer_ni *lpni;
1055         lnet_nid_t primary_nid = nid;
1056
1057         lpni = lnet_find_peer_ni_locked(nid);
1058         if (lpni) {
1059                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1060                 lnet_peer_ni_decref_locked(lpni);
1061         }
1062
1063         return primary_nid;
1064 }
1065
1066 lnet_nid_t
1067 LNetPrimaryNID(lnet_nid_t nid)
1068 {
1069         struct lnet_peer *lp;
1070         struct lnet_peer_ni *lpni;
1071         lnet_nid_t primary_nid = nid;
1072         int rc = 0;
1073         int cpt;
1074
1075         cpt = lnet_net_lock_current();
1076         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1077         if (IS_ERR(lpni)) {
1078                 rc = PTR_ERR(lpni);
1079                 goto out_unlock;
1080         }
1081         lp = lpni->lpni_peer_net->lpn_peer;
1082         while (!lnet_peer_is_uptodate(lp)) {
1083                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1084                 if (rc)
1085                         goto out_decref;
1086                 lp = lpni->lpni_peer_net->lpn_peer;
1087         }
1088         primary_nid = lp->lp_primary_nid;
1089 out_decref:
1090         lnet_peer_ni_decref_locked(lpni);
1091 out_unlock:
1092         lnet_net_unlock(cpt);
1093
1094         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1095                libcfs_nid2str(primary_nid), rc);
1096         return primary_nid;
1097 }
1098 EXPORT_SYMBOL(LNetPrimaryNID);
1099
1100 struct lnet_peer_net *
1101 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1102 {
1103         struct lnet_peer_net *peer_net;
1104         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1105                 if (peer_net->lpn_net_id == net_id)
1106                         return peer_net;
1107         }
1108         return NULL;
1109 }
1110
1111 /*
1112  * Attach a peer_ni to a peer_net and peer. This function assumes
1113  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1114  * may be attached to a different peer, in which case it will be
1115  * properly detached first. The whole operation is done atomically.
1116  *
1117  * Always returns 0.  This is the last function called from functions
1118  * that do return an int, so returning 0 here allows the compiler to
1119  * do a tail call.
1120  */
1121 static int
1122 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1123                                 struct lnet_peer_net *lpn,
1124                                 struct lnet_peer_ni *lpni,
1125                                 unsigned flags)
1126 {
1127         struct lnet_peer_table *ptable;
1128
1129         /* Install the new peer_ni */
1130         lnet_net_lock(LNET_LOCK_EX);
1131         /* Add peer_ni to global peer table hash, if necessary. */
1132         if (list_empty(&lpni->lpni_hashlist)) {
1133                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1134
1135                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1136                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1137                 ptable->pt_version++;
1138                 ptable->pt_number++;
1139                 /* This is the 1st refcount on lpni. */
1140                 atomic_inc(&lpni->lpni_refcount);
1141         }
1142
1143         /* Detach the peer_ni from an existing peer, if necessary. */
1144         if (lpni->lpni_peer_net) {
1145                 LASSERT(lpni->lpni_peer_net != lpn);
1146                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1147                 lnet_peer_detach_peer_ni_locked(lpni);
1148                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1149                 lpni->lpni_peer_net = NULL;
1150         }
1151
1152         /* Add peer_ni to peer_net */
1153         lpni->lpni_peer_net = lpn;
1154         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1155         lnet_peer_net_addref_locked(lpn);
1156
1157         /* Add peer_net to peer */
1158         if (!lpn->lpn_peer) {
1159                 lpn->lpn_peer = lp;
1160                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1161                 lnet_peer_addref_locked(lp);
1162         }
1163
1164         /* Add peer to global peer list, if necessary */
1165         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1166         if (list_empty(&lp->lp_peer_list)) {
1167                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1168                 ptable->pt_peers++;
1169         }
1170
1171
1172         /* Update peer state */
1173         spin_lock(&lp->lp_lock);
1174         if (flags & LNET_PEER_CONFIGURED) {
1175                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1176                         lp->lp_state |= LNET_PEER_CONFIGURED;
1177         }
1178         if (flags & LNET_PEER_MULTI_RAIL) {
1179                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1180                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1181                         lnet_peer_clr_non_mr_pref_nids(lp);
1182                 }
1183         }
1184         spin_unlock(&lp->lp_lock);
1185
1186         lp->lp_nnis++;
1187         lnet_net_unlock(LNET_LOCK_EX);
1188
1189         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1190                libcfs_nid2str(lp->lp_primary_nid),
1191                libcfs_nid2str(lpni->lpni_nid), flags);
1192
1193         return 0;
1194 }
1195
1196 /*
1197  * Create a new peer, with nid as its primary nid.
1198  *
1199  * Call with the lnet_api_mutex held.
1200  */
1201 static int
1202 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1203 {
1204         struct lnet_peer *lp;
1205         struct lnet_peer_net *lpn;
1206         struct lnet_peer_ni *lpni;
1207         int rc = 0;
1208
1209         LASSERT(nid != LNET_NID_ANY);
1210
1211         /*
1212          * No need for the lnet_net_lock here, because the
1213          * lnet_api_mutex is held.
1214          */
1215         lpni = lnet_find_peer_ni_locked(nid);
1216         if (lpni) {
1217                 /* A peer with this NID already exists. */
1218                 lp = lpni->lpni_peer_net->lpn_peer;
1219                 lnet_peer_ni_decref_locked(lpni);
1220                 /*
1221                  * This is an error if the peer was configured and the
1222                  * primary NID differs or an attempt is made to change
1223                  * the Multi-Rail flag. Otherwise the assumption is
1224                  * that an existing peer is being modified.
1225                  */
1226                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1227                         if (lp->lp_primary_nid != nid)
1228                                 rc = -EEXIST;
1229                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1230                                 rc = -EPERM;
1231                         goto out;
1232                 }
1233                 /* Delete and recreate as a configured peer. */
1234                 lnet_peer_del(lp);
1235         }
1236
1237         /* Create peer, peer_net, and peer_ni. */
1238         rc = -ENOMEM;
1239         lp = lnet_peer_alloc(nid);
1240         if (!lp)
1241                 goto out;
1242         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1243         if (!lpn)
1244                 goto out_free_lp;
1245         lpni = lnet_peer_ni_alloc(nid);
1246         if (!lpni)
1247                 goto out_free_lpn;
1248
1249         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1250
1251 out_free_lpn:
1252         LIBCFS_FREE(lpn, sizeof(*lpn));
1253 out_free_lp:
1254         LIBCFS_FREE(lp, sizeof(*lp));
1255 out:
1256         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1257                libcfs_nid2str(nid), flags, rc);
1258         return rc;
1259 }
1260
1261 /*
1262  * Add a NID to a peer. Call with ln_api_mutex held.
1263  *
1264  * Error codes:
1265  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1266  *  -EEXIST:   The NID was configured by DLC for a different peer.
1267  *  -ENOMEM:   Out of memory.
1268  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1269  *             non-multi-rail peer.
1270  */
1271 static int
1272 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1273 {
1274         struct lnet_peer_net *lpn;
1275         struct lnet_peer_ni *lpni;
1276         int rc = 0;
1277
1278         LASSERT(lp);
1279         LASSERT(nid != LNET_NID_ANY);
1280
1281         /* A configured peer can only be updated through configuration. */
1282         if (!(flags & LNET_PEER_CONFIGURED)) {
1283                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1284                         rc = -EPERM;
1285                         goto out;
1286                 }
1287         }
1288
1289         /*
1290          * The MULTI_RAIL flag can be set but not cleared, because
1291          * that would leave the peer struct in an invalid state.
1292          */
1293         if (flags & LNET_PEER_MULTI_RAIL) {
1294                 spin_lock(&lp->lp_lock);
1295                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1296                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1297                         lnet_peer_clr_non_mr_pref_nids(lp);
1298                 }
1299                 spin_unlock(&lp->lp_lock);
1300         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1301                 rc = -EPERM;
1302                 goto out;
1303         }
1304
1305         lpni = lnet_find_peer_ni_locked(nid);
1306         if (lpni) {
1307                 /*
1308                  * A peer_ni already exists. This is only a problem if
1309                  * it is not connected to this peer and was configured
1310                  * by DLC.
1311                  */
1312                 lnet_peer_ni_decref_locked(lpni);
1313                 if (lpni->lpni_peer_net->lpn_peer == lp)
1314                         goto out;
1315                 if (lnet_peer_ni_is_configured(lpni)) {
1316                         rc = -EEXIST;
1317                         goto out;
1318                 }
1319                 /* If this is the primary NID, destroy the peer. */
1320                 if (lnet_peer_ni_is_primary(lpni)) {
1321                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1322                         lpni = lnet_peer_ni_alloc(nid);
1323                         if (!lpni) {
1324                                 rc = -ENOMEM;
1325                                 goto out;
1326                         }
1327                 }
1328         } else {
1329                 lpni = lnet_peer_ni_alloc(nid);
1330                 if (!lpni) {
1331                         rc = -ENOMEM;
1332                         goto out;
1333                 }
1334         }
1335
1336         /*
1337          * Get the peer_net. Check that we're not adding a second
1338          * peer_ni on a peer_net of a non-multi-rail peer.
1339          */
1340         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1341         if (!lpn) {
1342                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1343                 if (!lpn) {
1344                         rc = -ENOMEM;
1345                         goto out_free_lpni;
1346                 }
1347         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1348                 rc = -ENOTUNIQ;
1349                 goto out_free_lpni;
1350         }
1351
1352         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1353
1354 out_free_lpni:
1355         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1356         if (!lpni->lpni_peer_net)
1357                 LIBCFS_FREE(lpni, sizeof(*lpni));
1358 out:
1359         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1360                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1361                flags, rc);
1362         return rc;
1363 }
1364
1365 /*
1366  * Update the primary NID of a peer, if possible.
1367  *
1368  * Call with the lnet_api_mutex held.
1369  */
1370 static int
1371 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1372 {
1373         lnet_nid_t old = lp->lp_primary_nid;
1374         int rc = 0;
1375
1376         if (lp->lp_primary_nid == nid)
1377                 goto out;
1378         rc = lnet_peer_add_nid(lp, nid, flags);
1379         if (rc)
1380                 goto out;
1381         lp->lp_primary_nid = nid;
1382 out:
1383         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1384                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1385         return rc;
1386 }
1387
1388 /*
1389  * lpni creation initiated due to traffic either sending or receiving.
1390  */
1391 static int
1392 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1393 {
1394         struct lnet_peer *lp;
1395         struct lnet_peer_net *lpn;
1396         struct lnet_peer_ni *lpni;
1397         unsigned flags = 0;
1398         int rc = 0;
1399
1400         if (nid == LNET_NID_ANY) {
1401                 rc = -EINVAL;
1402                 goto out;
1403         }
1404
1405         /* lnet_net_lock is not needed here because ln_api_lock is held */
1406         lpni = lnet_find_peer_ni_locked(nid);
1407         if (lpni) {
1408                 /*
1409                  * We must have raced with another thread. Since we
1410                  * know next to nothing about a peer_ni created by
1411                  * traffic, we just assume everything is ok and
1412                  * return.
1413                  */
1414                 lnet_peer_ni_decref_locked(lpni);
1415                 goto out;
1416         }
1417
1418         /* Create peer, peer_net, and peer_ni. */
1419         rc = -ENOMEM;
1420         lp = lnet_peer_alloc(nid);
1421         if (!lp)
1422                 goto out;
1423         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1424         if (!lpn)
1425                 goto out_free_lp;
1426         lpni = lnet_peer_ni_alloc(nid);
1427         if (!lpni)
1428                 goto out_free_lpn;
1429         if (pref != LNET_NID_ANY)
1430                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1431
1432         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1433
1434 out_free_lpn:
1435         LIBCFS_FREE(lpn, sizeof(*lpn));
1436 out_free_lp:
1437         LIBCFS_FREE(lp, sizeof(*lp));
1438 out:
1439         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1440         return rc;
1441 }
1442
1443 /*
1444  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1445  *
1446  * This API handles the following combinations:
1447  *   Create a peer with its primary NI if only the prim_nid is provided
1448  *   Add a NID to a peer identified by the prim_nid. The peer identified
1449  *   by the prim_nid must already exist.
1450  *   The peer being created may be non-MR.
1451  *
1452  * The caller must hold ln_api_mutex. This prevents the peer from
1453  * being created/modified/deleted by a different thread.
1454  */
1455 int
1456 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1457 {
1458         struct lnet_peer *lp = NULL;
1459         struct lnet_peer_ni *lpni;
1460         unsigned flags;
1461
1462         /* The prim_nid must always be specified */
1463         if (prim_nid == LNET_NID_ANY)
1464                 return -EINVAL;
1465
1466         flags = LNET_PEER_CONFIGURED;
1467         if (mr)
1468                 flags |= LNET_PEER_MULTI_RAIL;
1469
1470         /*
1471          * If nid isn't specified, we must create a new peer with
1472          * prim_nid as its primary nid.
1473          */
1474         if (nid == LNET_NID_ANY)
1475                 return lnet_peer_add(prim_nid, flags);
1476
1477         /* Look up the prim_nid, which must exist. */
1478         lpni = lnet_find_peer_ni_locked(prim_nid);
1479         if (!lpni)
1480                 return -ENOENT;
1481         lnet_peer_ni_decref_locked(lpni);
1482         lp = lpni->lpni_peer_net->lpn_peer;
1483
1484         /* Peer must have been configured. */
1485         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1486                 CDEBUG(D_NET, "peer %s was not configured\n",
1487                        libcfs_nid2str(prim_nid));
1488                 return -ENOENT;
1489         }
1490
1491         /* Primary NID must match */
1492         if (lp->lp_primary_nid != prim_nid) {
1493                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1494                        libcfs_nid2str(prim_nid),
1495                        libcfs_nid2str(lp->lp_primary_nid));
1496                 return -ENODEV;
1497         }
1498
1499         /* Multi-Rail flag must match. */
1500         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1501                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1502                        libcfs_nid2str(prim_nid));
1503                 return -EPERM;
1504         }
1505
1506         return lnet_peer_add_nid(lp, nid, flags);
1507 }
1508
1509 /*
1510  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1511  *
1512  * This API handles the following combinations:
1513  *   Delete a NI from a peer if both prim_nid and nid are provided.
1514  *   Delete a peer if only prim_nid is provided.
1515  *   Delete a peer if its primary nid is provided.
1516  *
1517  * The caller must hold ln_api_mutex. This prevents the peer from
1518  * being modified/deleted by a different thread.
1519  */
1520 int
1521 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1522 {
1523         struct lnet_peer *lp;
1524         struct lnet_peer_ni *lpni;
1525         unsigned flags;
1526
1527         if (prim_nid == LNET_NID_ANY)
1528                 return -EINVAL;
1529
1530         lpni = lnet_find_peer_ni_locked(prim_nid);
1531         if (!lpni)
1532                 return -ENOENT;
1533         lnet_peer_ni_decref_locked(lpni);
1534         lp = lpni->lpni_peer_net->lpn_peer;
1535
1536         if (prim_nid != lp->lp_primary_nid) {
1537                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1538                        libcfs_nid2str(prim_nid),
1539                        libcfs_nid2str(lp->lp_primary_nid));
1540                 return -ENODEV;
1541         }
1542
1543         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1544                 return lnet_peer_del(lp);
1545
1546         flags = LNET_PEER_CONFIGURED;
1547         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1548                 flags |= LNET_PEER_MULTI_RAIL;
1549
1550         return lnet_peer_del_nid(lp, nid, flags);
1551 }
1552
1553 void
1554 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1555 {
1556         struct lnet_peer_table *ptable;
1557         struct lnet_peer_net *lpn;
1558
1559         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1560
1561         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1562         LASSERT(lpni->lpni_rtr_refcount == 0);
1563         LASSERT(list_empty(&lpni->lpni_txq));
1564         LASSERT(lpni->lpni_txqnob == 0);
1565         LASSERT(list_empty(&lpni->lpni_peer_nis));
1566         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1567
1568         lpn = lpni->lpni_peer_net;
1569         lpni->lpni_peer_net = NULL;
1570         lpni->lpni_net = NULL;
1571
1572         /* remove the peer ni from the zombie list */
1573         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1574         spin_lock(&ptable->pt_zombie_lock);
1575         list_del_init(&lpni->lpni_hashlist);
1576         ptable->pt_zombies--;
1577         spin_unlock(&ptable->pt_zombie_lock);
1578
1579         if (lpni->lpni_pref_nnids > 1) {
1580                 LIBCFS_FREE(lpni->lpni_pref.nids,
1581                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1582         }
1583         LIBCFS_FREE(lpni, sizeof(*lpni));
1584
1585         lnet_peer_net_decref_locked(lpn);
1586 }
1587
1588 struct lnet_peer_ni *
1589 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1590 {
1591         struct lnet_peer_ni *lpni = NULL;
1592         int rc;
1593
1594         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1595                 return ERR_PTR(-ESHUTDOWN);
1596
1597         /*
1598          * find if a peer_ni already exists.
1599          * If so then just return that.
1600          */
1601         lpni = lnet_find_peer_ni_locked(nid);
1602         if (lpni)
1603                 return lpni;
1604
1605         lnet_net_unlock(cpt);
1606
1607         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1608         if (rc) {
1609                 lpni = ERR_PTR(rc);
1610                 goto out_net_relock;
1611         }
1612
1613         lpni = lnet_find_peer_ni_locked(nid);
1614         LASSERT(lpni);
1615
1616 out_net_relock:
1617         lnet_net_lock(cpt);
1618
1619         return lpni;
1620 }
1621
1622 /*
1623  * Get a peer_ni for the given nid, create it if necessary. Takes a
1624  * hold on the peer_ni.
1625  */
1626 struct lnet_peer_ni *
1627 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1628 {
1629         struct lnet_peer_ni *lpni = NULL;
1630         int rc;
1631
1632         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1633                 return ERR_PTR(-ESHUTDOWN);
1634
1635         /*
1636          * find if a peer_ni already exists.
1637          * If so then just return that.
1638          */
1639         lpni = lnet_find_peer_ni_locked(nid);
1640         if (lpni)
1641                 return lpni;
1642
1643         /*
1644          * Slow path:
1645          * use the lnet_api_mutex to serialize the creation of the peer_ni
1646          * and the creation/deletion of the local ni/net. When a local ni is
1647          * created, if there exists a set of peer_nis on that network,
1648          * they need to be traversed and updated. When a local NI is
1649          * deleted, which could result in a network being deleted, then
1650          * all peer nis on that network need to be removed as well.
1651          *
1652          * Creation through traffic should also be serialized with
1653          * creation through DLC.
1654          */
1655         lnet_net_unlock(cpt);
1656         mutex_lock(&the_lnet.ln_api_mutex);
1657         /*
1658          * Shutdown is only set under the ln_api_lock, so a single
1659          * check here is sufficent.
1660          */
1661         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1662                 lpni = ERR_PTR(-ESHUTDOWN);
1663                 goto out_mutex_unlock;
1664         }
1665
1666         rc = lnet_peer_ni_traffic_add(nid, pref);
1667         if (rc) {
1668                 lpni = ERR_PTR(rc);
1669                 goto out_mutex_unlock;
1670         }
1671
1672         lpni = lnet_find_peer_ni_locked(nid);
1673         LASSERT(lpni);
1674
1675 out_mutex_unlock:
1676         mutex_unlock(&the_lnet.ln_api_mutex);
1677         lnet_net_lock(cpt);
1678
1679         /* Lock has been dropped, check again for shutdown. */
1680         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1681                 if (!IS_ERR(lpni))
1682                         lnet_peer_ni_decref_locked(lpni);
1683                 lpni = ERR_PTR(-ESHUTDOWN);
1684         }
1685
1686         return lpni;
1687 }
1688
1689 /*
1690  * Peer Discovery
1691  */
1692
1693 /*
1694  * Is a peer uptodate from the point of view of discovery?
1695  *
1696  * If it is currently being processed, obviously not.
1697  * A forced Ping or Push is also handled by the discovery thread.
1698  *
1699  * Otherwise look at whether the peer needs rediscovering.
1700  */
1701 bool
1702 lnet_peer_is_uptodate(struct lnet_peer *lp)
1703 {
1704         bool rc;
1705
1706         spin_lock(&lp->lp_lock);
1707         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1708                             LNET_PEER_FORCE_PING |
1709                             LNET_PEER_FORCE_PUSH)) {
1710                 rc = false;
1711         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1712                 rc = true;
1713         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1714                 if (lnet_peer_discovery_disabled)
1715                         rc = true;
1716                 else
1717                         rc = false;
1718         } else if (lnet_peer_needs_push(lp)) {
1719                 rc = false;
1720         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1721                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1722                         rc = true;
1723                 else
1724                         rc = false;
1725         } else {
1726                 rc = false;
1727         }
1728         spin_unlock(&lp->lp_lock);
1729
1730         return rc;
1731 }
1732
1733 /*
1734  * Queue a peer for the attention of the discovery thread.  Call with
1735  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1736  * -EALREADY if the peer was already queued.
1737  */
1738 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1739 {
1740         int rc;
1741
1742         spin_lock(&lp->lp_lock);
1743         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1744                 lp->lp_state |= LNET_PEER_DISCOVERING;
1745         spin_unlock(&lp->lp_lock);
1746         if (list_empty(&lp->lp_dc_list)) {
1747                 lnet_peer_addref_locked(lp);
1748                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1749                 wake_up(&the_lnet.ln_dc_waitq);
1750                 rc = 0;
1751         } else {
1752                 rc = -EALREADY;
1753         }
1754
1755         CDEBUG(D_NET, "Queue peer %s: %d\n",
1756                libcfs_nid2str(lp->lp_primary_nid), rc);
1757
1758         return rc;
1759 }
1760
1761 /*
1762  * Discovery of a peer is complete. Wake all waiters on the peer.
1763  * Call with lnet_net_lock/EX held.
1764  */
1765 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1766 {
1767         struct lnet_msg *msg, *tmp;
1768         int rc = 0;
1769         struct list_head pending_msgs;
1770
1771         INIT_LIST_HEAD(&pending_msgs);
1772
1773         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1774                libcfs_nid2str(lp->lp_primary_nid));
1775
1776         list_del_init(&lp->lp_dc_list);
1777         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1778         wake_up_all(&lp->lp_dc_waitq);
1779
1780         lnet_net_unlock(LNET_LOCK_EX);
1781
1782         /* iterate through all pending messages and send them again */
1783         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1784                 list_del_init(&msg->msg_list);
1785                 if (lp->lp_dc_error) {
1786                         lnet_finalize(msg, lp->lp_dc_error);
1787                         continue;
1788                 }
1789
1790                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1791                        lnet_msgtyp2str(msg->msg_type),
1792                        libcfs_id2str(msg->msg_target));
1793                 rc = lnet_send(msg->msg_src_nid_param, msg,
1794                                msg->msg_rtr_nid_param);
1795                 if (rc < 0) {
1796                         CNETERR("Error sending %s to %s: %d\n",
1797                                lnet_msgtyp2str(msg->msg_type),
1798                                libcfs_id2str(msg->msg_target), rc);
1799                         lnet_finalize(msg, rc);
1800                 }
1801         }
1802         lnet_net_lock(LNET_LOCK_EX);
1803         lnet_peer_decref_locked(lp);
1804 }
1805
1806 /*
1807  * Handle inbound push.
1808  * Like any event handler, called with lnet_res_lock/CPT held.
1809  */
1810 void lnet_peer_push_event(struct lnet_event *ev)
1811 {
1812         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1813         struct lnet_peer *lp;
1814
1815         /* lnet_find_peer() adds a refcount */
1816         lp = lnet_find_peer(ev->source.nid);
1817         if (!lp) {
1818                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1819                        libcfs_nid2str(ev->initiator.nid),
1820                        libcfs_nid2str(ev->source.nid));
1821                 return;
1822         }
1823
1824         /* Ensure peer state remains consistent while we modify it. */
1825         spin_lock(&lp->lp_lock);
1826
1827         /*
1828          * If some kind of error happened the contents of the message
1829          * cannot be used. Clear the NIDS_UPTODATE and set the
1830          * FORCE_PING flag to trigger a ping.
1831          */
1832         if (ev->status) {
1833                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1834                 lp->lp_state |= LNET_PEER_FORCE_PING;
1835                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1836                        ev->status,
1837                        libcfs_nid2str(lp->lp_primary_nid),
1838                        libcfs_nid2str(ev->source.nid));
1839                 goto out;
1840         }
1841
1842         /*
1843          * A push with invalid or corrupted info. Clear the UPTODATE
1844          * flag to trigger a ping.
1845          */
1846         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1847                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1848                 lp->lp_state |= LNET_PEER_FORCE_PING;
1849                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1850                        libcfs_nid2str(lp->lp_primary_nid));
1851                 goto out;
1852         }
1853
1854         /*
1855          * Make sure we'll allocate the correct size ping buffer when
1856          * pinging the peer.
1857          */
1858         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1859                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1860
1861         /*
1862          * A non-Multi-Rail peer is not supposed to be capable of
1863          * sending a push.
1864          */
1865         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1866                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1867                        libcfs_nid2str(lp->lp_primary_nid));
1868                 goto out;
1869         }
1870
1871         /*
1872          * Check the MULTIRAIL flag. Complain if the peer was DLC
1873          * configured without it.
1874          */
1875         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1876                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1877                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1878                                libcfs_nid2str(lp->lp_primary_nid));
1879                 } else {
1880                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1881                         lnet_peer_clr_non_mr_pref_nids(lp);
1882                 }
1883         }
1884
1885         /*
1886          * The peer may have discovery disabled at its end. Set
1887          * NO_DISCOVERY as appropriate.
1888          */
1889         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1890                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1891                        libcfs_nid2str(lp->lp_primary_nid));
1892                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1893         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1894                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1895                        libcfs_nid2str(lp->lp_primary_nid));
1896                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1897         }
1898
1899         /*
1900          * Check for truncation of the Put message. Clear the
1901          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1902          * and tell discovery to allocate a bigger buffer.
1903          */
1904         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1905                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1906                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1907                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1908                 lp->lp_state |= LNET_PEER_FORCE_PING;
1909                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1910                        libcfs_nid2str(lp->lp_primary_nid),
1911                        pbuf->pb_info.pi_nnis);
1912                 goto out;
1913         }
1914
1915         /*
1916          * Check whether the Put data is stale. Stale data can just be
1917          * dropped.
1918          */
1919         if (pbuf->pb_info.pi_nnis > 1 &&
1920             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1921             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1922                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1923                        libcfs_nid2str(lp->lp_primary_nid),
1924                        LNET_PING_BUFFER_SEQNO(pbuf),
1925                        lp->lp_peer_seqno);
1926                 goto out;
1927         }
1928
1929         /*
1930          * Check whether the Put data is new, in which case we clear
1931          * the UPTODATE flag and prepare to process it.
1932          *
1933          * If the Put data is current, and the peer is UPTODATE then
1934          * we assome everything is all right and drop the data as
1935          * stale.
1936          */
1937         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1938                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1939                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1940         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1941                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1942                        libcfs_nid2str(lp->lp_primary_nid),
1943                        LNET_PING_BUFFER_SEQNO(pbuf),
1944                        lp->lp_peer_seqno);
1945                 goto out;
1946         }
1947
1948         /*
1949          * If there is data present that hasn't been processed yet,
1950          * we'll replace it if the Put contained newer data and it
1951          * fits. We're racing with a Ping or earlier Push in this
1952          * case.
1953          */
1954         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1955                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1956                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1957                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1958                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1959                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1960                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1961                               libcfs_nid2str(lp->lp_primary_nid),
1962                               LNET_PING_BUFFER_SEQNO(pbuf),
1963                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1964                 }
1965                 goto out;
1966         }
1967
1968         /*
1969          * Allocate a buffer to copy the data. On a failure we drop
1970          * the Push and set FORCE_PING to force the discovery
1971          * thread to fix the problem by pinging the peer.
1972          */
1973         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1974         if (!lp->lp_data) {
1975                 lp->lp_state |= LNET_PEER_FORCE_PING;
1976                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1977                        libcfs_nid2str(lp->lp_primary_nid),
1978                        LNET_PING_BUFFER_SEQNO(pbuf));
1979                 goto out;
1980         }
1981
1982         /* Success */
1983         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1984                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1985         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1986         CDEBUG(D_NET, "Received Push %s %u\n",
1987                libcfs_nid2str(lp->lp_primary_nid),
1988                LNET_PING_BUFFER_SEQNO(pbuf));
1989
1990 out:
1991         /*
1992          * Queue the peer for discovery if not done, force it on the request
1993          * queue and wake the discovery thread if the peer was already queued,
1994          * because its status changed.
1995          */
1996         spin_unlock(&lp->lp_lock);
1997         lnet_net_lock(LNET_LOCK_EX);
1998         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
1999                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2000                 wake_up(&the_lnet.ln_dc_waitq);
2001         }
2002         /* Drop refcount from lookup */
2003         lnet_peer_decref_locked(lp);
2004         lnet_net_unlock(LNET_LOCK_EX);
2005 }
2006
2007 /*
2008  * Clear the discovery error state, unless we're already discovering
2009  * this peer, in which case the error is current.
2010  */
2011 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2012 {
2013         spin_lock(&lp->lp_lock);
2014         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2015                 lp->lp_dc_error = 0;
2016         spin_unlock(&lp->lp_lock);
2017 }
2018
2019 /*
2020  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2021  * dropped/retaken within this function. An lnet_peer_ni is passed in
2022  * because discovery could tear down an lnet_peer.
2023  */
2024 int
2025 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2026 {
2027         DEFINE_WAIT(wait);
2028         struct lnet_peer *lp;
2029         int rc = 0;
2030
2031 again:
2032         lnet_net_unlock(cpt);
2033         lnet_net_lock(LNET_LOCK_EX);
2034         lp = lpni->lpni_peer_net->lpn_peer;
2035         lnet_peer_clear_discovery_error(lp);
2036
2037         /*
2038          * We're willing to be interrupted. The lpni can become a
2039          * zombie if we race with DLC, so we must check for that.
2040          */
2041         for (;;) {
2042                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2043                 if (signal_pending(current))
2044                         break;
2045                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2046                         break;
2047                 if (lp->lp_dc_error)
2048                         break;
2049                 if (lnet_peer_is_uptodate(lp))
2050                         break;
2051                 lnet_peer_queue_for_discovery(lp);
2052                 /*
2053                  * if caller requested a non-blocking operation then
2054                  * return immediately. Once discovery is complete then the
2055                  * peer ref will be decremented and any pending messages
2056                  * that were stopped due to discovery will be transmitted.
2057                  */
2058                 if (!block)
2059                         break;
2060
2061                 lnet_peer_addref_locked(lp);
2062                 lnet_net_unlock(LNET_LOCK_EX);
2063                 schedule();
2064                 finish_wait(&lp->lp_dc_waitq, &wait);
2065                 lnet_net_lock(LNET_LOCK_EX);
2066                 lnet_peer_decref_locked(lp);
2067                 /* Peer may have changed */
2068                 lp = lpni->lpni_peer_net->lpn_peer;
2069         }
2070         finish_wait(&lp->lp_dc_waitq, &wait);
2071
2072         lnet_net_unlock(LNET_LOCK_EX);
2073         lnet_net_lock(cpt);
2074
2075         /*
2076          * If the peer has changed after we've discovered the older peer,
2077          * then we need to discovery the new peer to make sure the
2078          * interface information is up to date
2079          */
2080         if (lp != lpni->lpni_peer_net->lpn_peer)
2081                 goto again;
2082
2083         if (signal_pending(current))
2084                 rc = -EINTR;
2085         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2086                 rc = -ESHUTDOWN;
2087         else if (lp->lp_dc_error)
2088                 rc = lp->lp_dc_error;
2089         else if (!block)
2090                 CDEBUG(D_NET, "non-blocking discovery\n");
2091         else if (!lnet_peer_is_uptodate(lp))
2092                 goto again;
2093
2094         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2095                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2096                libcfs_nid2str(lpni->lpni_nid), rc,
2097                (!block) ? "pending discovery" : "discovery complete");
2098
2099         return rc;
2100 }
2101
2102 /* Handle an incoming ack for a push. */
2103 static void
2104 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2105 {
2106         struct lnet_ping_buffer *pbuf;
2107
2108         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2109         spin_lock(&lp->lp_lock);
2110         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2111         lp->lp_push_error = ev->status;
2112         if (ev->status)
2113                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2114         else
2115                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2116         spin_unlock(&lp->lp_lock);
2117
2118         CDEBUG(D_NET, "peer %s ev->status %d\n",
2119                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2120 }
2121
2122 /* Handle a Reply message. This is the reply to a Ping message. */
2123 static void
2124 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2125 {
2126         struct lnet_ping_buffer *pbuf;
2127         int rc;
2128
2129         spin_lock(&lp->lp_lock);
2130
2131         /*
2132          * If some kind of error happened the contents of message
2133          * cannot be used. Set PING_FAILED to trigger a retry.
2134          */
2135         if (ev->status) {
2136                 lp->lp_state |= LNET_PEER_PING_FAILED;
2137                 lp->lp_ping_error = ev->status;
2138                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2139                        ev->status,
2140                        libcfs_nid2str(lp->lp_primary_nid),
2141                        libcfs_nid2str(ev->source.nid));
2142                 goto out;
2143         }
2144
2145         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2146         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2147                 lnet_swap_pinginfo(pbuf);
2148
2149         /*
2150          * A reply with invalid or corrupted info. Set PING_FAILED to
2151          * trigger a retry.
2152          */
2153         rc = lnet_ping_info_validate(&pbuf->pb_info);
2154         if (rc) {
2155                 lp->lp_state |= LNET_PEER_PING_FAILED;
2156                 lp->lp_ping_error = 0;
2157                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2158                        libcfs_nid2str(lp->lp_primary_nid), rc);
2159                 goto out;
2160         }
2161
2162         /*
2163          * Update the MULTI_RAIL flag based on the reply. If the peer
2164          * was configured with DLC then the setting should match what
2165          * DLC put in.
2166          */
2167         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2168                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2169                         /* Everything's fine */
2170                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2171                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2172                               libcfs_nid2str(lp->lp_primary_nid));
2173                 } else {
2174                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2175                         lnet_peer_clr_non_mr_pref_nids(lp);
2176                 }
2177         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2178                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2179                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2180                               libcfs_nid2str(lp->lp_primary_nid));
2181                 } else {
2182                         CERROR("Multi-Rail state vanished from %s\n",
2183                                libcfs_nid2str(lp->lp_primary_nid));
2184                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2185                 }
2186         }
2187
2188         /*
2189          * Make sure we'll allocate the correct size ping buffer when
2190          * pinging the peer.
2191          */
2192         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2193                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2194
2195         /*
2196          * The peer may have discovery disabled at its end. Set
2197          * NO_DISCOVERY as appropriate.
2198          */
2199         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2200                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2201                        libcfs_nid2str(lp->lp_primary_nid));
2202                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2203         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2204                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2205                        libcfs_nid2str(lp->lp_primary_nid));
2206                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2207         }
2208
2209         /*
2210          * Check for truncation of the Reply. Clear PING_SENT and set
2211          * PING_FAILED to trigger a retry.
2212          */
2213         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2214                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2215                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2216                 lp->lp_state |= LNET_PEER_PING_FAILED;
2217                 lp->lp_ping_error = 0;
2218                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2219                        libcfs_nid2str(lp->lp_primary_nid),
2220                        pbuf->pb_info.pi_nnis);
2221                 goto out;
2222         }
2223
2224         /*
2225          * Check the sequence numbers in the reply. These are only
2226          * available if the reply came from a Multi-Rail peer.
2227          */
2228         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2229             pbuf->pb_info.pi_nnis > 1 &&
2230             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2231                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2232                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2233                                 libcfs_nid2str(lp->lp_primary_nid),
2234                                 LNET_PING_BUFFER_SEQNO(pbuf),
2235                                 lp->lp_peer_seqno);
2236                         goto out;
2237                 }
2238
2239                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2240                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2241         }
2242
2243         /* We're happy with the state of the data in the buffer. */
2244         CDEBUG(D_NET, "peer %s data present %u\n",
2245                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2246         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2247                 lnet_ping_buffer_decref(lp->lp_data);
2248         else
2249                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2250         lnet_ping_buffer_addref(pbuf);
2251         lp->lp_data = pbuf;
2252 out:
2253         lp->lp_state &= ~LNET_PEER_PING_SENT;
2254         spin_unlock(&lp->lp_lock);
2255 }
2256
2257 /*
2258  * Send event handling. Only matters for error cases, where we clean
2259  * up state on the peer and peer_ni that would otherwise be updated in
2260  * the REPLY event handler for a successful Ping, and the ACK event
2261  * handler for a successful Push.
2262  */
2263 static int
2264 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2265 {
2266         int rc = 0;
2267
2268         if (!ev->status)
2269                 goto out;
2270
2271         spin_lock(&lp->lp_lock);
2272         if (ev->msg_type == LNET_MSG_GET) {
2273                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2274                 lp->lp_state |= LNET_PEER_PING_FAILED;
2275                 lp->lp_ping_error = ev->status;
2276         } else { /* ev->msg_type == LNET_MSG_PUT */
2277                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2278                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2279                 lp->lp_push_error = ev->status;
2280         }
2281         spin_unlock(&lp->lp_lock);
2282         rc = LNET_REDISCOVER_PEER;
2283 out:
2284         CDEBUG(D_NET, "%s Send to %s: %d\n",
2285                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2286                 libcfs_nid2str(ev->target.nid), rc);
2287         return rc;
2288 }
2289
2290 /*
2291  * Unlink event handling. This event is only seen if a call to
2292  * LNetMDUnlink() caused the event to be unlinked. If this call was
2293  * made after the event was set up in LNetGet() or LNetPut() then we
2294  * assume the Ping or Push timed out.
2295  */
2296 static void
2297 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2298 {
2299         spin_lock(&lp->lp_lock);
2300         /* We've passed through LNetGet() */
2301         if (lp->lp_state & LNET_PEER_PING_SENT) {
2302                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2303                 lp->lp_state |= LNET_PEER_PING_FAILED;
2304                 lp->lp_ping_error = -ETIMEDOUT;
2305                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2306                         libcfs_nid2str(lp->lp_primary_nid));
2307         }
2308         /* We've passed through LNetPut() */
2309         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2310                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2311                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2312                 lp->lp_push_error = -ETIMEDOUT;
2313                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2314                         libcfs_nid2str(lp->lp_primary_nid));
2315         }
2316         spin_unlock(&lp->lp_lock);
2317 }
2318
2319 /*
2320  * Event handler for the discovery EQ.
2321  *
2322  * Called with lnet_res_lock(cpt) held. The cpt is the
2323  * lnet_cpt_of_cookie() of the md handle cookie.
2324  */
2325 static void lnet_discovery_event_handler(struct lnet_event *event)
2326 {
2327         struct lnet_peer *lp = event->md.user_ptr;
2328         struct lnet_ping_buffer *pbuf;
2329         int rc;
2330
2331         /* discovery needs to take another look */
2332         rc = LNET_REDISCOVER_PEER;
2333
2334         CDEBUG(D_NET, "Received event: %d\n", event->type);
2335
2336         switch (event->type) {
2337         case LNET_EVENT_ACK:
2338                 lnet_discovery_event_ack(lp, event);
2339                 break;
2340         case LNET_EVENT_REPLY:
2341                 lnet_discovery_event_reply(lp, event);
2342                 break;
2343         case LNET_EVENT_SEND:
2344                 /* Only send failure triggers a retry. */
2345                 rc = lnet_discovery_event_send(lp, event);
2346                 break;
2347         case LNET_EVENT_UNLINK:
2348                 /* LNetMDUnlink() was called */
2349                 lnet_discovery_event_unlink(lp, event);
2350                 break;
2351         default:
2352                 /* Invalid events. */
2353                 LBUG();
2354         }
2355         lnet_net_lock(LNET_LOCK_EX);
2356         if (event->unlinked) {
2357                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2358                 lnet_ping_buffer_decref(pbuf);
2359                 lnet_peer_decref_locked(lp);
2360         }
2361
2362         /* put peer back at end of request queue, if discovery not already
2363          * done */
2364         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2365                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2366                 wake_up(&the_lnet.ln_dc_waitq);
2367         }
2368         lnet_net_unlock(LNET_LOCK_EX);
2369 }
2370
2371 /*
2372  * Build a peer from incoming data.
2373  *
2374  * The NIDs in the incoming data are supposed to be structured as follows:
2375  *  - loopback
2376  *  - primary NID
2377  *  - other NIDs in same net
2378  *  - NIDs in second net
2379  *  - NIDs in third net
2380  *  - ...
2381  * This due to the way the list of NIDs in the data is created.
2382  *
2383  * Note that this function will mark the peer uptodate unless an
2384  * ENOMEM is encontered. All other errors are due to a conflict
2385  * between the DLC configuration and what discovery sees. We treat DLC
2386  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2387  * peer from becoming stuck in discovery.
2388  */
2389 static int lnet_peer_merge_data(struct lnet_peer *lp,
2390                                 struct lnet_ping_buffer *pbuf)
2391 {
2392         struct lnet_peer_ni *lpni;
2393         lnet_nid_t *curnis = NULL;
2394         lnet_nid_t *addnis = NULL;
2395         lnet_nid_t *delnis = NULL;
2396         unsigned flags;
2397         int ncurnis;
2398         int naddnis;
2399         int ndelnis;
2400         int nnis = 0;
2401         int i;
2402         int j;
2403         int rc;
2404
2405         flags = LNET_PEER_DISCOVERED;
2406         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2407                 flags |= LNET_PEER_MULTI_RAIL;
2408
2409         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2410         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2411         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2412         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2413         if (!curnis || !addnis || !delnis) {
2414                 rc = -ENOMEM;
2415                 goto out;
2416         }
2417         ncurnis = 0;
2418         naddnis = 0;
2419         ndelnis = 0;
2420
2421         /* Construct the list of NIDs present in peer. */
2422         lpni = NULL;
2423         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2424                 curnis[ncurnis++] = lpni->lpni_nid;
2425
2426         /*
2427          * Check for NIDs in pbuf not present in curnis[].
2428          * The loop starts at 1 to skip the loopback NID.
2429          */
2430         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2431                 for (j = 0; j < ncurnis; j++)
2432                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2433                                 break;
2434                 if (j == ncurnis)
2435                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2436         }
2437         /*
2438          * Check for NIDs in curnis[] not present in pbuf.
2439          * The nested loop starts at 1 to skip the loopback NID.
2440          *
2441          * But never add the loopback NID to delnis[]: if it is
2442          * present in curnis[] then this peer is for this node.
2443          */
2444         for (i = 0; i < ncurnis; i++) {
2445                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2446                         continue;
2447                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2448                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2449                                 break;
2450                 if (j == pbuf->pb_info.pi_nnis)
2451                         delnis[ndelnis++] = curnis[i];
2452         }
2453
2454         for (i = 0; i < naddnis; i++) {
2455                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2456                 if (rc) {
2457                         CERROR("Error adding NID %s to peer %s: %d\n",
2458                                libcfs_nid2str(addnis[i]),
2459                                libcfs_nid2str(lp->lp_primary_nid), rc);
2460                         if (rc == -ENOMEM)
2461                                 goto out;
2462                 }
2463         }
2464         for (i = 0; i < ndelnis; i++) {
2465                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2466                 if (rc) {
2467                         CERROR("Error deleting NID %s from peer %s: %d\n",
2468                                libcfs_nid2str(delnis[i]),
2469                                libcfs_nid2str(lp->lp_primary_nid), rc);
2470                         if (rc == -ENOMEM)
2471                                 goto out;
2472                 }
2473         }
2474         /*
2475          * Errors other than -ENOMEM are due to peers having been
2476          * configured with DLC. Ignore these because DLC overrides
2477          * Discovery.
2478          */
2479         rc = 0;
2480 out:
2481         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2482         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2483         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2484         lnet_ping_buffer_decref(pbuf);
2485         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2486
2487         if (rc) {
2488                 spin_lock(&lp->lp_lock);
2489                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2490                 lp->lp_state |= LNET_PEER_FORCE_PING;
2491                 spin_unlock(&lp->lp_lock);
2492         }
2493         return rc;
2494 }
2495
2496 /*
2497  * The data in pbuf says lp is its primary peer, but the data was
2498  * received by a different peer. Try to update lp with the data.
2499  */
2500 static int
2501 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2502 {
2503         struct lnet_handle_md mdh;
2504
2505         /* Queue lp for discovery, and force it on the request queue. */
2506         lnet_net_lock(LNET_LOCK_EX);
2507         if (lnet_peer_queue_for_discovery(lp))
2508                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2509         lnet_net_unlock(LNET_LOCK_EX);
2510
2511         LNetInvalidateMDHandle(&mdh);
2512
2513         /*
2514          * Decide whether we can move the peer to the DATA_PRESENT state.
2515          *
2516          * We replace stale data for a multi-rail peer, repair PING_FAILED
2517          * status, and preempt FORCE_PING.
2518          *
2519          * If after that we have DATA_PRESENT, we merge it into this peer.
2520          */
2521         spin_lock(&lp->lp_lock);
2522         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2523                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2524                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2525                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2526                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2527                         lnet_ping_buffer_decref(pbuf);
2528                         pbuf = lp->lp_data;
2529                         lp->lp_data = NULL;
2530                 }
2531         }
2532         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2533                 lnet_ping_buffer_decref(lp->lp_data);
2534                 lp->lp_data = NULL;
2535                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2536         }
2537         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2538                 mdh = lp->lp_ping_mdh;
2539                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2540                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2541                 lp->lp_ping_error = 0;
2542         }
2543         if (lp->lp_state & LNET_PEER_FORCE_PING)
2544                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2545         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2546         spin_unlock(&lp->lp_lock);
2547
2548         if (!LNetMDHandleIsInvalid(mdh))
2549                 LNetMDUnlink(mdh);
2550
2551         if (pbuf)
2552                 return lnet_peer_merge_data(lp, pbuf);
2553
2554         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2555         return 0;
2556 }
2557
2558 /*
2559  * Update a peer using the data received.
2560  */
2561 static int lnet_peer_data_present(struct lnet_peer *lp)
2562 __must_hold(&lp->lp_lock)
2563 {
2564         struct lnet_ping_buffer *pbuf;
2565         struct lnet_peer_ni *lpni;
2566         lnet_nid_t nid = LNET_NID_ANY;
2567         unsigned flags;
2568         int rc = 0;
2569
2570         pbuf = lp->lp_data;
2571         lp->lp_data = NULL;
2572         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2573         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2574         spin_unlock(&lp->lp_lock);
2575
2576         /*
2577          * Modifications of peer structures are done while holding the
2578          * ln_api_mutex. A global lock is required because we may be
2579          * modifying multiple peer structures, and a mutex greatly
2580          * simplifies memory management.
2581          *
2582          * The actual changes to the data structures must also protect
2583          * against concurrent lookups, for which the lnet_net_lock in
2584          * LNET_LOCK_EX mode is used.
2585          */
2586         mutex_lock(&the_lnet.ln_api_mutex);
2587         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2588                 rc = -ESHUTDOWN;
2589                 goto out;
2590         }
2591
2592         /*
2593          * If this peer is not on the peer list then it is being torn
2594          * down, and our reference count may be all that is keeping it
2595          * alive. Don't do any work on it.
2596          */
2597         if (list_empty(&lp->lp_peer_list))
2598                 goto out;
2599
2600         flags = LNET_PEER_DISCOVERED;
2601         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2602                 flags |= LNET_PEER_MULTI_RAIL;
2603
2604         /*
2605          * Check whether the primary NID in the message matches the
2606          * primary NID of the peer. If it does, update the peer, if
2607          * it it does not, check whether there is already a peer with
2608          * that primary NID. If no such peer exists, try to update
2609          * the primary NID of the current peer (allowed if it was
2610          * created due to message traffic) and complete the update.
2611          * If the peer did exist, hand off the data to it.
2612          *
2613          * The peer for the loopback interface is a special case: this
2614          * is the peer for the local node, and we want to set its
2615          * primary NID to the correct value here. Moreover, this peer
2616          * can show up with only the loopback NID in the ping buffer.
2617          */
2618         if (pbuf->pb_info.pi_nnis <= 1)
2619                 goto out;
2620         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2621         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2622                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2623                 if (!rc)
2624                         rc = lnet_peer_merge_data(lp, pbuf);
2625         } else if (lp->lp_primary_nid == nid) {
2626                 rc = lnet_peer_merge_data(lp, pbuf);
2627         } else {
2628                 lpni = lnet_find_peer_ni_locked(nid);
2629                 if (!lpni) {
2630                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2631                         if (rc) {
2632                                 CERROR("Primary NID error %s versus %s: %d\n",
2633                                        libcfs_nid2str(lp->lp_primary_nid),
2634                                        libcfs_nid2str(nid), rc);
2635                         } else {
2636                                 rc = lnet_peer_merge_data(lp, pbuf);
2637                         }
2638                 } else {
2639                         rc = lnet_peer_set_primary_data(
2640                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2641                         lnet_peer_ni_decref_locked(lpni);
2642                 }
2643         }
2644 out:
2645         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2646         mutex_unlock(&the_lnet.ln_api_mutex);
2647
2648         spin_lock(&lp->lp_lock);
2649         /* Tell discovery to re-check the peer immediately. */
2650         if (!rc)
2651                 rc = LNET_REDISCOVER_PEER;
2652         return rc;
2653 }
2654
2655 /*
2656  * A ping failed. Clear the PING_FAILED state and set the
2657  * FORCE_PING state, to ensure a retry even if discovery is
2658  * disabled. This avoids being left with incorrect state.
2659  */
2660 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2661 __must_hold(&lp->lp_lock)
2662 {
2663         struct lnet_handle_md mdh;
2664         int rc;
2665
2666         mdh = lp->lp_ping_mdh;
2667         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2668         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2669         lp->lp_state |= LNET_PEER_FORCE_PING;
2670         rc = lp->lp_ping_error;
2671         lp->lp_ping_error = 0;
2672         spin_unlock(&lp->lp_lock);
2673
2674         if (!LNetMDHandleIsInvalid(mdh))
2675                 LNetMDUnlink(mdh);
2676
2677         CDEBUG(D_NET, "peer %s:%d\n",
2678                libcfs_nid2str(lp->lp_primary_nid), rc);
2679
2680         spin_lock(&lp->lp_lock);
2681         return rc ? rc : LNET_REDISCOVER_PEER;
2682 }
2683
2684 /*
2685  * Select NID to send a Ping or Push to.
2686  */
2687 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2688 {
2689         struct lnet_peer_ni *lpni;
2690
2691         /* Look for a direct-connected NID for this peer. */
2692         lpni = NULL;
2693         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2694                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2695                         continue;
2696                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2697                         continue;
2698                 break;
2699         }
2700         if (lpni)
2701                 return lpni->lpni_nid;
2702
2703         /* Look for a routed-connected NID for this peer. */
2704         lpni = NULL;
2705         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2706                 if (!lnet_is_peer_ni_healthy_locked(lpni))
2707                         continue;
2708                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2709                         continue;
2710                 break;
2711         }
2712         if (lpni)
2713                 return lpni->lpni_nid;
2714
2715         return LNET_NID_ANY;
2716 }
2717
2718 /* Active side of ping. */
2719 static int lnet_peer_send_ping(struct lnet_peer *lp)
2720 __must_hold(&lp->lp_lock)
2721 {
2722         struct lnet_md md = { NULL };
2723         struct lnet_process_id id;
2724         struct lnet_ping_buffer *pbuf;
2725         int nnis;
2726         int rc;
2727         int cpt;
2728
2729         lp->lp_state |= LNET_PEER_PING_SENT;
2730         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2731         spin_unlock(&lp->lp_lock);
2732
2733         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2734         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
2735         if (!pbuf) {
2736                 rc = -ENOMEM;
2737                 goto fail_error;
2738         }
2739
2740         /* initialize md content */
2741         md.start     = &pbuf->pb_info;
2742         md.length    = LNET_PING_INFO_SIZE(nnis);
2743         md.threshold = 2; /* GET/REPLY */
2744         md.max_size  = 0;
2745         md.options   = LNET_MD_TRUNCATE;
2746         md.user_ptr  = lp;
2747         md.eq_handle = the_lnet.ln_dc_eqh;
2748
2749         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
2750         if (rc != 0) {
2751                 lnet_ping_buffer_decref(pbuf);
2752                 CERROR("Can't bind MD: %d\n", rc);
2753                 goto fail_error;
2754         }
2755         cpt = lnet_net_lock_current();
2756         /* Refcount for MD. */
2757         lnet_peer_addref_locked(lp);
2758         id.pid = LNET_PID_LUSTRE;
2759         id.nid = lnet_peer_select_nid(lp);
2760         lnet_net_unlock(cpt);
2761
2762         if (id.nid == LNET_NID_ANY) {
2763                 rc = -EHOSTUNREACH;
2764                 goto fail_unlink_md;
2765         }
2766
2767         rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
2768                      LNET_RESERVED_PORTAL,
2769                      LNET_PROTO_PING_MATCHBITS, 0);
2770
2771         if (rc)
2772                 goto fail_unlink_md;
2773
2774         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2775
2776         spin_lock(&lp->lp_lock);
2777         return 0;
2778
2779 fail_unlink_md:
2780         LNetMDUnlink(lp->lp_ping_mdh);
2781         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2782 fail_error:
2783         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2784         /*
2785          * The errors that get us here are considered hard errors and
2786          * cause Discovery to terminate. So we clear PING_SENT, but do
2787          * not set either PING_FAILED or FORCE_PING. In fact we need
2788          * to clear PING_FAILED, because the unlink event handler will
2789          * have set it if we called LNetMDUnlink() above.
2790          */
2791         spin_lock(&lp->lp_lock);
2792         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2793         return rc;
2794 }
2795
2796 /*
2797  * This function exists because you cannot call LNetMDUnlink() from an
2798  * event handler.
2799  */
2800 static int lnet_peer_push_failed(struct lnet_peer *lp)
2801 __must_hold(&lp->lp_lock)
2802 {
2803         struct lnet_handle_md mdh;
2804         int rc;
2805
2806         mdh = lp->lp_push_mdh;
2807         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2808         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2809         rc = lp->lp_push_error;
2810         lp->lp_push_error = 0;
2811         spin_unlock(&lp->lp_lock);
2812
2813         if (!LNetMDHandleIsInvalid(mdh))
2814                 LNetMDUnlink(mdh);
2815
2816         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2817         spin_lock(&lp->lp_lock);
2818         return rc ? rc : LNET_REDISCOVER_PEER;
2819 }
2820
2821 /* Active side of push. */
2822 static int lnet_peer_send_push(struct lnet_peer *lp)
2823 __must_hold(&lp->lp_lock)
2824 {
2825         struct lnet_ping_buffer *pbuf;
2826         struct lnet_process_id id;
2827         struct lnet_md md;
2828         int cpt;
2829         int rc;
2830
2831         /* Don't push to a non-multi-rail peer. */
2832         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2833                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2834                 return 0;
2835         }
2836
2837         lp->lp_state |= LNET_PEER_PUSH_SENT;
2838         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2839         spin_unlock(&lp->lp_lock);
2840
2841         cpt = lnet_net_lock_current();
2842         pbuf = the_lnet.ln_ping_target;
2843         lnet_ping_buffer_addref(pbuf);
2844         lnet_net_unlock(cpt);
2845
2846         /* Push source MD */
2847         md.start     = &pbuf->pb_info;
2848         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2849         md.threshold = 2; /* Put/Ack */
2850         md.max_size  = 0;
2851         md.options   = 0;
2852         md.eq_handle = the_lnet.ln_dc_eqh;
2853         md.user_ptr  = lp;
2854
2855         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2856         if (rc) {
2857                 lnet_ping_buffer_decref(pbuf);
2858                 CERROR("Can't bind push source MD: %d\n", rc);
2859                 goto fail_error;
2860         }
2861         cpt = lnet_net_lock_current();
2862         /* Refcount for MD. */
2863         lnet_peer_addref_locked(lp);
2864         id.pid = LNET_PID_LUSTRE;
2865         id.nid = lnet_peer_select_nid(lp);
2866         lnet_net_unlock(cpt);
2867
2868         if (id.nid == LNET_NID_ANY) {
2869                 rc = -EHOSTUNREACH;
2870                 goto fail_unlink;
2871         }
2872
2873         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2874                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2875                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2876
2877         if (rc)
2878                 goto fail_unlink;
2879
2880         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2881
2882         spin_lock(&lp->lp_lock);
2883         return 0;
2884
2885 fail_unlink:
2886         LNetMDUnlink(lp->lp_push_mdh);
2887         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2888 fail_error:
2889         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2890         /*
2891          * The errors that get us here are considered hard errors and
2892          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2893          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2894          * because the unlink event handler will have set it if we
2895          * called LNetMDUnlink() above.
2896          */
2897         spin_lock(&lp->lp_lock);
2898         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2899         return rc;
2900 }
2901
2902 /*
2903  * An unrecoverable error was encountered during discovery.
2904  * Set error status in peer and abort discovery.
2905  */
2906 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2907 {
2908         CDEBUG(D_NET, "Discovery error %s: %d\n",
2909                libcfs_nid2str(lp->lp_primary_nid), error);
2910
2911         spin_lock(&lp->lp_lock);
2912         lp->lp_dc_error = error;
2913         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2914         lp->lp_state |= LNET_PEER_REDISCOVER;
2915         spin_unlock(&lp->lp_lock);
2916 }
2917
2918 /*
2919  * Mark the peer as discovered.
2920  */
2921 static int lnet_peer_discovered(struct lnet_peer *lp)
2922 __must_hold(&lp->lp_lock)
2923 {
2924         lp->lp_state |= LNET_PEER_DISCOVERED;
2925         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2926                           LNET_PEER_REDISCOVER);
2927
2928         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2929
2930         return 0;
2931 }
2932
2933 /*
2934  * Mark the peer as to be rediscovered.
2935  */
2936 static int lnet_peer_rediscover(struct lnet_peer *lp)
2937 __must_hold(&lp->lp_lock)
2938 {
2939         lp->lp_state |= LNET_PEER_REDISCOVER;
2940         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2941
2942         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2943
2944         return 0;
2945 }
2946
2947 /*
2948  * Returns the first peer on the ln_dc_working queue if its timeout
2949  * has expired. Takes the current time as an argument so as to not
2950  * obsessively re-check the clock. The oldest discovery request will
2951  * be at the head of the queue.
2952  */
2953 static struct lnet_peer *lnet_peer_get_dc_timed_out(time64_t now)
2954 {
2955         struct lnet_peer *lp;
2956
2957         if (list_empty(&the_lnet.ln_dc_working))
2958                 return NULL;
2959         lp = list_first_entry(&the_lnet.ln_dc_working,
2960                               struct lnet_peer, lp_dc_list);
2961         if (now < lp->lp_last_queued + lnet_transaction_timeout)
2962                 return NULL;
2963         return lp;
2964 }
2965
2966 /*
2967  * Discovering this peer is taking too long. Cancel any Ping or Push
2968  * that discovery is waiting on by unlinking the relevant MDs. The
2969  * lnet_discovery_event_handler() will proceed from here and complete
2970  * the cleanup.
2971  */
2972 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2973 {
2974         struct lnet_handle_md ping_mdh;
2975         struct lnet_handle_md push_mdh;
2976
2977         LNetInvalidateMDHandle(&ping_mdh);
2978         LNetInvalidateMDHandle(&push_mdh);
2979
2980         spin_lock(&lp->lp_lock);
2981         if (lp->lp_state & LNET_PEER_PING_SENT) {
2982                 ping_mdh = lp->lp_ping_mdh;
2983                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2984         }
2985         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2986                 push_mdh = lp->lp_push_mdh;
2987                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2988         }
2989         spin_unlock(&lp->lp_lock);
2990
2991         if (!LNetMDHandleIsInvalid(ping_mdh))
2992                 LNetMDUnlink(ping_mdh);
2993         if (!LNetMDHandleIsInvalid(push_mdh))
2994                 LNetMDUnlink(push_mdh);
2995 }
2996
2997 /*
2998  * Wait for work to be queued or some other change that must be
2999  * attended to. Returns non-zero if the discovery thread should shut
3000  * down.
3001  */
3002 static int lnet_peer_discovery_wait_for_work(void)
3003 {
3004         int cpt;
3005         int rc = 0;
3006
3007         DEFINE_WAIT(wait);
3008
3009         cpt = lnet_net_lock_current();
3010         for (;;) {
3011                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
3012                                 TASK_INTERRUPTIBLE);
3013                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3014                         break;
3015                 if (lnet_push_target_resize_needed())
3016                         break;
3017                 if (!list_empty(&the_lnet.ln_dc_request))
3018                         break;
3019                 if (!list_empty(&the_lnet.ln_msg_resend))
3020                         break;
3021                 if (lnet_peer_get_dc_timed_out(ktime_get_real_seconds()))
3022                         break;
3023                 lnet_net_unlock(cpt);
3024
3025                 /*
3026                  * wakeup max every second to check if there are peers that
3027                  * have been stuck on the working queue for greater than
3028                  * the peer timeout.
3029                  */
3030                 schedule_timeout(cfs_time_seconds(1));
3031                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
3032                 cpt = lnet_net_lock_current();
3033         }
3034         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3035
3036         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3037                 rc = -ESHUTDOWN;
3038
3039         lnet_net_unlock(cpt);
3040
3041         CDEBUG(D_NET, "woken: %d\n", rc);
3042
3043         return rc;
3044 }
3045
3046 /*
3047  * Messages that were pending on a destroyed peer will be put on a global
3048  * resend list. The message resend list will be checked by
3049  * the discovery thread when it wakes up, and will resend messages. These
3050  * messages can still be sendable in the case the lpni which was the initial
3051  * cause of the message re-queue was transfered to another peer.
3052  *
3053  * It is possible that LNet could be shutdown while we're iterating
3054  * through the list. lnet_shudown_lndnets() will attempt to access the
3055  * resend list, but will have to wait until the spinlock is released, by
3056  * which time there shouldn't be any more messages on the resend list.
3057  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3058  * for the messages so they can be released. The other case is that
3059  * lnet_shudown_lndnets() can finalize all the messages before this
3060  * function can visit the resend list, in which case this function will be
3061  * a no-op.
3062  */
3063 static void lnet_resend_msgs(void)
3064 {
3065         struct lnet_msg *msg, *tmp;
3066         struct list_head resend;
3067         int rc;
3068
3069         INIT_LIST_HEAD(&resend);
3070
3071         spin_lock(&the_lnet.ln_msg_resend_lock);
3072         list_splice(&the_lnet.ln_msg_resend, &resend);
3073         spin_unlock(&the_lnet.ln_msg_resend_lock);
3074
3075         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3076                 list_del_init(&msg->msg_list);
3077                 rc = lnet_send(msg->msg_src_nid_param, msg,
3078                                msg->msg_rtr_nid_param);
3079                 if (rc < 0) {
3080                         CNETERR("Error sending %s to %s: %d\n",
3081                                lnet_msgtyp2str(msg->msg_type),
3082                                libcfs_id2str(msg->msg_target), rc);
3083                         lnet_finalize(msg, rc);
3084                 }
3085         }
3086 }
3087
3088 /* The discovery thread. */
3089 static int lnet_peer_discovery(void *arg)
3090 {
3091         struct lnet_peer *lp;
3092         time64_t now;
3093         int rc;
3094
3095         CDEBUG(D_NET, "started\n");
3096         cfs_block_allsigs();
3097
3098         for (;;) {
3099                 if (lnet_peer_discovery_wait_for_work())
3100                         break;
3101
3102                 lnet_resend_msgs();
3103
3104                 if (lnet_push_target_resize_needed())
3105                         lnet_push_target_resize();
3106
3107                 lnet_net_lock(LNET_LOCK_EX);
3108                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3109                         break;
3110
3111                 /*
3112                  * Process all incoming discovery work requests.  When
3113                  * discovery must wait on a peer to change state, it
3114                  * is added to the tail of the ln_dc_working queue. A
3115                  * timestamp keeps track of when the peer was added,
3116                  * so we can time out discovery requests that take too
3117                  * long.
3118                  */
3119                 while (!list_empty(&the_lnet.ln_dc_request)) {
3120                         lp = list_first_entry(&the_lnet.ln_dc_request,
3121                                               struct lnet_peer, lp_dc_list);
3122                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3123                         /*
3124                          * set the time the peer was put on the dc_working
3125                          * queue. It shouldn't remain on the queue
3126                          * forever, in case the GET message (for ping)
3127                          * doesn't get a REPLY or the PUT message (for
3128                          * push) doesn't get an ACK.
3129                          *
3130                          * TODO: LNet Health will deal with this scenario
3131                          * in a generic way.
3132                          */
3133                         lp->lp_last_queued = ktime_get_real_seconds();
3134                         lnet_net_unlock(LNET_LOCK_EX);
3135
3136                         /*
3137                          * Select an action depending on the state of
3138                          * the peer and whether discovery is disabled.
3139                          * The check whether discovery is disabled is
3140                          * done after the code that handles processing
3141                          * for arrived data, cleanup for failures, and
3142                          * forcing a Ping or Push.
3143                          */
3144                         spin_lock(&lp->lp_lock);
3145                         CDEBUG(D_NET, "peer %s state %#x\n",
3146                                 libcfs_nid2str(lp->lp_primary_nid),
3147                                 lp->lp_state);
3148                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3149                                 rc = lnet_peer_data_present(lp);
3150                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3151                                 rc = lnet_peer_ping_failed(lp);
3152                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3153                                 rc = lnet_peer_push_failed(lp);
3154                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3155                                 rc = lnet_peer_send_ping(lp);
3156                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3157                                 rc = lnet_peer_send_push(lp);
3158                         else if (lnet_peer_discovery_disabled)
3159                                 rc = lnet_peer_rediscover(lp);
3160                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3161                                 rc = lnet_peer_send_ping(lp);
3162                         else if (lnet_peer_needs_push(lp))
3163                                 rc = lnet_peer_send_push(lp);
3164                         else
3165                                 rc = lnet_peer_discovered(lp);
3166                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3167                                 libcfs_nid2str(lp->lp_primary_nid),
3168                                 lp->lp_state, rc);
3169                         spin_unlock(&lp->lp_lock);
3170
3171                         lnet_net_lock(LNET_LOCK_EX);
3172                         if (rc == LNET_REDISCOVER_PEER) {
3173                                 list_move(&lp->lp_dc_list,
3174                                           &the_lnet.ln_dc_request);
3175                         } else if (rc) {
3176                                 lnet_peer_discovery_error(lp, rc);
3177                         }
3178                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3179                                 lnet_peer_discovery_complete(lp);
3180                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3181                                 break;
3182                 }
3183
3184                 /*
3185                  * Now that the ln_dc_request queue has been emptied
3186                  * check the ln_dc_working queue for peers that are
3187                  * taking too long. Move all that are found to the
3188                  * ln_dc_expired queue and time out any pending
3189                  * Ping or Push. We have to drop the lnet_net_lock
3190                  * in the loop because lnet_peer_cancel_discovery()
3191                  * calls LNetMDUnlink().
3192                  */
3193                 now = ktime_get_real_seconds();
3194                 while ((lp = lnet_peer_get_dc_timed_out(now)) != NULL) {
3195                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3196                         lnet_net_unlock(LNET_LOCK_EX);
3197                         lnet_peer_cancel_discovery(lp);
3198                         lnet_net_lock(LNET_LOCK_EX);
3199                 }
3200
3201                 lnet_net_unlock(LNET_LOCK_EX);
3202         }
3203
3204         CDEBUG(D_NET, "stopping\n");
3205         /*
3206          * Clean up before telling lnet_peer_discovery_stop() that
3207          * we're done. Use wake_up() below to somewhat reduce the
3208          * size of the thundering herd if there are multiple threads
3209          * waiting on discovery of a single peer.
3210          */
3211         LNetEQFree(the_lnet.ln_dc_eqh);
3212         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3213
3214         /* Queue cleanup 1: stop all pending pings and pushes. */
3215         lnet_net_lock(LNET_LOCK_EX);
3216         while (!list_empty(&the_lnet.ln_dc_working)) {
3217                 lp = list_first_entry(&the_lnet.ln_dc_working,
3218                                       struct lnet_peer, lp_dc_list);
3219                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3220                 lnet_net_unlock(LNET_LOCK_EX);
3221                 lnet_peer_cancel_discovery(lp);
3222                 lnet_net_lock(LNET_LOCK_EX);
3223         }
3224         lnet_net_unlock(LNET_LOCK_EX);
3225
3226         /* Queue cleanup 2: wait for the expired queue to clear. */
3227         while (!list_empty(&the_lnet.ln_dc_expired))
3228                 schedule_timeout(cfs_time_seconds(1));
3229
3230         /* Queue cleanup 3: clear the request queue. */
3231         lnet_net_lock(LNET_LOCK_EX);
3232         while (!list_empty(&the_lnet.ln_dc_request)) {
3233                 lp = list_first_entry(&the_lnet.ln_dc_request,
3234                                       struct lnet_peer, lp_dc_list);
3235                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3236                 lnet_peer_discovery_complete(lp);
3237         }
3238         lnet_net_unlock(LNET_LOCK_EX);
3239
3240         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3241         wake_up(&the_lnet.ln_dc_waitq);
3242
3243         CDEBUG(D_NET, "stopped\n");
3244
3245         return 0;
3246 }
3247
3248 /* ln_api_mutex is held on entry. */
3249 int lnet_peer_discovery_start(void)
3250 {
3251         struct task_struct *task;
3252         int rc;
3253
3254         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3255                 return -EALREADY;
3256
3257         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3258         if (rc != 0) {
3259                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3260                 return rc;
3261         }
3262
3263         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3264         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3265         if (IS_ERR(task)) {
3266                 rc = PTR_ERR(task);
3267                 CERROR("Can't start peer discovery thread: %d\n", rc);
3268
3269                 LNetEQFree(the_lnet.ln_dc_eqh);
3270                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3271
3272                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3273         }
3274
3275         CDEBUG(D_NET, "discovery start: %d\n", rc);
3276
3277         return rc;
3278 }
3279
3280 /* ln_api_mutex is held on entry. */
3281 void lnet_peer_discovery_stop(void)
3282 {
3283         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3284                 return;
3285
3286         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3287         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3288         wake_up(&the_lnet.ln_dc_waitq);
3289
3290         wait_event(the_lnet.ln_dc_waitq,
3291                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3292
3293         LASSERT(list_empty(&the_lnet.ln_dc_request));
3294         LASSERT(list_empty(&the_lnet.ln_dc_working));
3295         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3296
3297         CDEBUG(D_NET, "discovery stopped\n");
3298 }
3299
3300 /* Debugging */
3301
3302 void
3303 lnet_debug_peer(lnet_nid_t nid)
3304 {
3305         char                    *aliveness = "NA";
3306         struct lnet_peer_ni     *lp;
3307         int                     cpt;
3308
3309         cpt = lnet_cpt_of_nid(nid, NULL);
3310         lnet_net_lock(cpt);
3311
3312         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3313         if (IS_ERR(lp)) {
3314                 lnet_net_unlock(cpt);
3315                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3316                 return;
3317         }
3318
3319         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3320                 aliveness = lp->lpni_alive ? "up" : "down";
3321
3322         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3323                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3324                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3325                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3326                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3327
3328         lnet_peer_ni_decref_locked(lp);
3329
3330         lnet_net_unlock(cpt);
3331 }
3332
3333 /* Gathering information for userspace. */
3334
3335 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3336                           char aliveness[LNET_MAX_STR_LEN],
3337                           __u32 *cpt_iter, __u32 *refcount,
3338                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3339                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3340                           __u32 *peer_tx_qnob)
3341 {
3342         struct lnet_peer_table          *peer_table;
3343         struct lnet_peer_ni             *lp;
3344         int                             j;
3345         int                             lncpt;
3346         bool                            found = false;
3347
3348         /* get the number of CPTs */
3349         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3350
3351         /* if the cpt number to be examined is >= the number of cpts in
3352          * the system then indicate that there are no more cpts to examin
3353          */
3354         if (*cpt_iter >= lncpt)
3355                 return -ENOENT;
3356
3357         /* get the current table */
3358         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3359         /* if the ptable is NULL then there are no more cpts to examine */
3360         if (peer_table == NULL)
3361                 return -ENOENT;
3362
3363         lnet_net_lock(*cpt_iter);
3364
3365         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3366                 struct list_head *peers = &peer_table->pt_hash[j];
3367
3368                 list_for_each_entry(lp, peers, lpni_hashlist) {
3369                         if (peer_index-- > 0)
3370                                 continue;
3371
3372                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3373                         if (lnet_isrouter(lp) ||
3374                                 lnet_peer_aliveness_enabled(lp))
3375                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3376                                          lp->lpni_alive ? "up" : "down");
3377
3378                         *nid = lp->lpni_nid;
3379                         *refcount = atomic_read(&lp->lpni_refcount);
3380                         *ni_peer_tx_credits =
3381                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3382                         *peer_tx_credits = lp->lpni_txcredits;
3383                         *peer_rtr_credits = lp->lpni_rtrcredits;
3384                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3385                         *peer_tx_qnob = lp->lpni_txqnob;
3386
3387                         found = true;
3388                 }
3389
3390         }
3391         lnet_net_unlock(*cpt_iter);
3392
3393         *cpt_iter = lncpt;
3394
3395         return found ? 0 : -ENOENT;
3396 }
3397
3398 /* ln_api_mutex is held, which keeps the peer list stable */
3399 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3400 {
3401         struct lnet_ioctl_element_stats *lpni_stats;
3402         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3403         struct lnet_peer_ni_credit_info *lpni_info;
3404         struct lnet_peer_ni *lpni;
3405         struct lnet_peer *lp;
3406         lnet_nid_t nid;
3407         __u32 size;
3408         int rc;
3409
3410         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3411
3412         if (!lp) {
3413                 rc = -ENOENT;
3414                 goto out;
3415         }
3416
3417         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3418                 + sizeof(*lpni_msg_stats);
3419         size *= lp->lp_nnis;
3420         if (size > cfg->prcfg_size) {
3421                 cfg->prcfg_size = size;
3422                 rc = -E2BIG;
3423                 goto out_lp_decref;
3424         }
3425
3426         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3427         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3428         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3429         cfg->prcfg_count = lp->lp_nnis;
3430         cfg->prcfg_size = size;
3431         cfg->prcfg_state = lp->lp_state;
3432
3433         /* Allocate helper buffers. */
3434         rc = -ENOMEM;
3435         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3436         if (!lpni_info)
3437                 goto out_lp_decref;
3438         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3439         if (!lpni_stats)
3440                 goto out_free_info;
3441         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3442         if (!lpni_msg_stats)
3443                 goto out_free_stats;
3444
3445
3446         lpni = NULL;
3447         rc = -EFAULT;
3448         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3449                 nid = lpni->lpni_nid;
3450                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3451                         goto out_free_msg_stats;
3452                 bulk += sizeof(nid);
3453
3454                 memset(lpni_info, 0, sizeof(*lpni_info));
3455                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3456                 if (lnet_isrouter(lpni) ||
3457                         lnet_peer_aliveness_enabled(lpni))
3458                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3459                                 lpni->lpni_alive ? "up" : "down");
3460
3461                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3462                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3463                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3464                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3465                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3466                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3467                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3468                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3469                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3470                         goto out_free_msg_stats;
3471                 bulk += sizeof(*lpni_info);
3472
3473                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3474                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3475                                                             LNET_STATS_TYPE_SEND);
3476                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3477                                                             LNET_STATS_TYPE_RECV);
3478                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3479                                                             LNET_STATS_TYPE_DROP);
3480                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3481                         goto out_free_msg_stats;
3482                 bulk += sizeof(*lpni_stats);
3483                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3484                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3485                         goto out_free_msg_stats;
3486                 bulk += sizeof(*lpni_msg_stats);
3487         }
3488         rc = 0;
3489
3490 out_free_msg_stats:
3491         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3492 out_free_stats:
3493         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3494 out_free_info:
3495         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3496 out_lp_decref:
3497         lnet_peer_decref_locked(lp);
3498 out:
3499         return rc;
3500 }