Whamcloud - gitweb
6d32f192ead95887d5380c7a927415fd8dfab1a6
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_rtrq);
166         INIT_LIST_HEAD(&lpni->lpni_routes);
167         INIT_LIST_HEAD(&lpni->lpni_hashlist);
168         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
169         INIT_LIST_HEAD(&lpni->lpni_recovery);
170         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
171
172         spin_lock_init(&lpni->lpni_lock);
173
174         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
175         lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
176         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
177         lpni->lpni_nid = nid;
178         lpni->lpni_cpt = cpt;
179         atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
180
181         net = lnet_get_net_locked(LNET_NIDNET(nid));
182         lpni->lpni_net = net;
183         if (net) {
184                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
185                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
186                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
187                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
188         } else {
189                 /*
190                  * This peer_ni is not on a local network, so we
191                  * cannot add the credits here. In case the net is
192                  * added later, add the peer_ni to the remote peer ni
193                  * list so it can be easily found and revisited.
194                  */
195                 /* FIXME: per-net implementation instead? */
196                 atomic_inc(&lpni->lpni_refcount);
197                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
198                               &the_lnet.ln_remote_peer_ni_list);
199         }
200
201         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
202
203         return lpni;
204 }
205
206 static struct lnet_peer_net *
207 lnet_peer_net_alloc(__u32 net_id)
208 {
209         struct lnet_peer_net *lpn;
210
211         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
212         if (!lpn)
213                 return NULL;
214
215         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
216         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
217         lpn->lpn_net_id = net_id;
218
219         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
220
221         return lpn;
222 }
223
224 void
225 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
226 {
227         struct lnet_peer *lp;
228
229         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
230
231         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
232         LASSERT(list_empty(&lpn->lpn_peer_nis));
233         LASSERT(list_empty(&lpn->lpn_peer_nets));
234         lp = lpn->lpn_peer;
235         lpn->lpn_peer = NULL;
236         LIBCFS_FREE(lpn, sizeof(*lpn));
237
238         lnet_peer_decref_locked(lp);
239 }
240
241 static struct lnet_peer *
242 lnet_peer_alloc(lnet_nid_t nid)
243 {
244         struct lnet_peer *lp;
245
246         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
247         if (!lp)
248                 return NULL;
249
250         INIT_LIST_HEAD(&lp->lp_peer_list);
251         INIT_LIST_HEAD(&lp->lp_peer_nets);
252         INIT_LIST_HEAD(&lp->lp_dc_list);
253         INIT_LIST_HEAD(&lp->lp_dc_pendq);
254         init_waitqueue_head(&lp->lp_dc_waitq);
255         spin_lock_init(&lp->lp_lock);
256         lp->lp_primary_nid = nid;
257         /*
258          * Turn off discovery for loopback peer. If you're creating a peer
259          * for the loopback interface then that was initiated when we
260          * attempted to send a message over the loopback. There is no need
261          * to ever use a different interface when sending messages to
262          * myself.
263          */
264         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
265                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
266         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
267
268         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
269
270         return lp;
271 }
272
273 void
274 lnet_destroy_peer_locked(struct lnet_peer *lp)
275 {
276         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
277
278         LASSERT(atomic_read(&lp->lp_refcount) == 0);
279         LASSERT(list_empty(&lp->lp_peer_nets));
280         LASSERT(list_empty(&lp->lp_peer_list));
281         LASSERT(list_empty(&lp->lp_dc_list));
282
283         if (lp->lp_data)
284                 lnet_ping_buffer_decref(lp->lp_data);
285
286         /*
287          * if there are messages still on the pending queue, then make
288          * sure to queue them on the ln_msg_resend list so they can be
289          * resent at a later point if the discovery thread is still
290          * running.
291          * If the discovery thread has stopped, then the wakeup will be a
292          * no-op, and it is expected the lnet_shutdown_lndnets() will
293          * eventually be called, which will traverse this list and
294          * finalize the messages on the list.
295          * We can not resend them now because we're holding the cpt lock.
296          * Releasing the lock can cause an inconsistent state
297          */
298         spin_lock(&the_lnet.ln_msg_resend_lock);
299         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
300         spin_unlock(&the_lnet.ln_msg_resend_lock);
301         wake_up(&the_lnet.ln_dc_waitq);
302
303         LIBCFS_FREE(lp, sizeof(*lp));
304 }
305
306 /*
307  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
308  * that peer_net, detach the peer_net from the peer.
309  *
310  * Call with lnet_net_lock/EX held
311  */
312 static void
313 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
314 {
315         struct lnet_peer_table *ptable;
316         struct lnet_peer_net *lpn;
317         struct lnet_peer *lp;
318
319         /*
320          * Belts and suspenders: gracefully handle teardown of a
321          * partially connected peer_ni.
322          */
323         lpn = lpni->lpni_peer_net;
324
325         list_del_init(&lpni->lpni_peer_nis);
326         /*
327          * If there are no lpni's left, we detach lpn from
328          * lp_peer_nets, so it cannot be found anymore.
329          */
330         if (list_empty(&lpn->lpn_peer_nis))
331                 list_del_init(&lpn->lpn_peer_nets);
332
333         /* Update peer NID count. */
334         lp = lpn->lpn_peer;
335         lp->lp_nnis--;
336
337         /*
338          * If there are no more peer nets, make the peer unfindable
339          * via the peer_tables.
340          *
341          * Otherwise, if the peer is DISCOVERED, tell discovery to
342          * take another look at it. This is a no-op if discovery for
343          * this peer did the detaching.
344          */
345         if (list_empty(&lp->lp_peer_nets)) {
346                 list_del_init(&lp->lp_peer_list);
347                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
348                 ptable->pt_peers--;
349         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
350                 /* Discovery isn't running, nothing to do here. */
351         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
352                 lnet_peer_queue_for_discovery(lp);
353                 wake_up(&the_lnet.ln_dc_waitq);
354         }
355         CDEBUG(D_NET, "peer %s NID %s\n",
356                 libcfs_nid2str(lp->lp_primary_nid),
357                 libcfs_nid2str(lpni->lpni_nid));
358 }
359
360 /* called with lnet_net_lock LNET_LOCK_EX held */
361 static int
362 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
363 {
364         struct lnet_peer_table *ptable = NULL;
365
366         /* don't remove a peer_ni if it's also a gateway */
367         if (lpni->lpni_rtr_refcount > 0) {
368                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
369                        libcfs_nid2str(lpni->lpni_nid));
370                 return -EBUSY;
371         }
372
373         lnet_peer_remove_from_remote_list(lpni);
374
375         /* remove peer ni from the hash list. */
376         list_del_init(&lpni->lpni_hashlist);
377
378         /*
379          * indicate the peer is being deleted so the monitor thread can
380          * remove it from the recovery queue.
381          */
382         spin_lock(&lpni->lpni_lock);
383         lpni->lpni_state |= LNET_PEER_NI_DELETING;
384         spin_unlock(&lpni->lpni_lock);
385
386         /* decrement the ref count on the peer table */
387         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
388         LASSERT(ptable->pt_number > 0);
389         ptable->pt_number--;
390
391         /*
392          * The peer_ni can no longer be found with a lookup. But there
393          * can be current users, so keep track of it on the zombie
394          * list until the reference count has gone to zero.
395          *
396          * The last reference may be lost in a place where the
397          * lnet_net_lock locks only a single cpt, and that cpt may not
398          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
399          * has its own lock.
400          */
401         spin_lock(&ptable->pt_zombie_lock);
402         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
403         ptable->pt_zombies++;
404         spin_unlock(&ptable->pt_zombie_lock);
405
406         /* no need to keep this peer_ni on the hierarchy anymore */
407         lnet_peer_detach_peer_ni_locked(lpni);
408
409         /* remove hashlist reference on peer_ni */
410         lnet_peer_ni_decref_locked(lpni);
411
412         return 0;
413 }
414
415 void lnet_peer_uninit(void)
416 {
417         struct lnet_peer_ni *lpni, *tmp;
418
419         lnet_net_lock(LNET_LOCK_EX);
420
421         /* remove all peer_nis from the remote peer and the hash list */
422         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
423                                  lpni_on_remote_peer_ni_list)
424                 lnet_peer_ni_del_locked(lpni);
425
426         lnet_peer_tables_destroy();
427
428         lnet_net_unlock(LNET_LOCK_EX);
429 }
430
431 static int
432 lnet_peer_del_locked(struct lnet_peer *peer)
433 {
434         struct lnet_peer_ni *lpni = NULL, *lpni2;
435         int rc = 0, rc2 = 0;
436
437         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
438
439         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
440         while (lpni != NULL) {
441                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
442                 rc = lnet_peer_ni_del_locked(lpni);
443                 if (rc != 0)
444                         rc2 = rc;
445                 lpni = lpni2;
446         }
447
448         return rc2;
449 }
450
451 static int
452 lnet_peer_del(struct lnet_peer *peer)
453 {
454         lnet_net_lock(LNET_LOCK_EX);
455         lnet_peer_del_locked(peer);
456         lnet_net_unlock(LNET_LOCK_EX);
457
458         return 0;
459 }
460
461 /*
462  * Delete a NID from a peer. Call with ln_api_mutex held.
463  *
464  * Error codes:
465  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
466  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
467  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
468  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
469  */
470 static int
471 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
472 {
473         struct lnet_peer_ni *lpni;
474         lnet_nid_t primary_nid = lp->lp_primary_nid;
475         int rc = 0;
476
477         if (!(flags & LNET_PEER_CONFIGURED)) {
478                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
479                         rc = -EPERM;
480                         goto out;
481                 }
482         }
483         lpni = lnet_find_peer_ni_locked(nid);
484         if (!lpni) {
485                 rc = -ENOENT;
486                 goto out;
487         }
488         lnet_peer_ni_decref_locked(lpni);
489         if (lp != lpni->lpni_peer_net->lpn_peer) {
490                 rc = -ECHILD;
491                 goto out;
492         }
493
494         /*
495          * This function only allows deletion of the primary NID if it
496          * is the only NID.
497          */
498         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
499                 rc = -EBUSY;
500                 goto out;
501         }
502
503         lnet_net_lock(LNET_LOCK_EX);
504         lnet_peer_ni_del_locked(lpni);
505         lnet_net_unlock(LNET_LOCK_EX);
506
507 out:
508         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
509                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
510
511         return rc;
512 }
513
514 static void
515 lnet_peer_table_cleanup_locked(struct lnet_net *net,
516                                struct lnet_peer_table *ptable)
517 {
518         int                      i;
519         struct lnet_peer_ni     *next;
520         struct lnet_peer_ni     *lpni;
521         struct lnet_peer        *peer;
522
523         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
524                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
525                                          lpni_hashlist) {
526                         if (net != NULL && net != lpni->lpni_net)
527                                 continue;
528
529                         peer = lpni->lpni_peer_net->lpn_peer;
530                         if (peer->lp_primary_nid != lpni->lpni_nid) {
531                                 lnet_peer_ni_del_locked(lpni);
532                                 continue;
533                         }
534                         /*
535                          * Removing the primary NID implies removing
536                          * the entire peer. Advance next beyond any
537                          * peer_ni that belongs to the same peer.
538                          */
539                         list_for_each_entry_from(next, &ptable->pt_hash[i],
540                                                  lpni_hashlist) {
541                                 if (next->lpni_peer_net->lpn_peer != peer)
542                                         break;
543                         }
544                         lnet_peer_del_locked(peer);
545                 }
546         }
547 }
548
549 static void
550 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
551 {
552         int     i = 3;
553
554         spin_lock(&ptable->pt_zombie_lock);
555         while (ptable->pt_zombies) {
556                 spin_unlock(&ptable->pt_zombie_lock);
557
558                 if (is_power_of_2(i)) {
559                         CDEBUG(D_WARNING,
560                                "Waiting for %d zombies on peer table\n",
561                                ptable->pt_zombies);
562                 }
563                 set_current_state(TASK_UNINTERRUPTIBLE);
564                 schedule_timeout(cfs_time_seconds(1) >> 1);
565                 spin_lock(&ptable->pt_zombie_lock);
566         }
567         spin_unlock(&ptable->pt_zombie_lock);
568 }
569
570 static void
571 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
572                                 struct lnet_peer_table *ptable)
573 {
574         struct lnet_peer_ni     *lp;
575         struct lnet_peer_ni     *tmp;
576         lnet_nid_t              lpni_nid;
577         int                     i;
578
579         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
580                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
581                                          lpni_hashlist) {
582                         if (net != lp->lpni_net)
583                                 continue;
584
585                         if (lp->lpni_rtr_refcount == 0)
586                                 continue;
587
588                         lpni_nid = lp->lpni_nid;
589
590                         lnet_net_unlock(LNET_LOCK_EX);
591                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
592                         lnet_net_lock(LNET_LOCK_EX);
593                 }
594         }
595 }
596
597 void
598 lnet_peer_tables_cleanup(struct lnet_net *net)
599 {
600         int i;
601         struct lnet_peer_table *ptable;
602
603         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
604         /* If just deleting the peers for a NI, get rid of any routes these
605          * peers are gateways for. */
606         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
607                 lnet_net_lock(LNET_LOCK_EX);
608                 lnet_peer_table_del_rtrs_locked(net, ptable);
609                 lnet_net_unlock(LNET_LOCK_EX);
610         }
611
612         /* Start the cleanup process */
613         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
614                 lnet_net_lock(LNET_LOCK_EX);
615                 lnet_peer_table_cleanup_locked(net, ptable);
616                 lnet_net_unlock(LNET_LOCK_EX);
617         }
618
619         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
620                 lnet_peer_ni_finalize_wait(ptable);
621 }
622
623 static struct lnet_peer_ni *
624 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
625 {
626         struct list_head        *peers;
627         struct lnet_peer_ni     *lp;
628
629         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
630
631         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
632         list_for_each_entry(lp, peers, lpni_hashlist) {
633                 if (lp->lpni_nid == nid) {
634                         lnet_peer_ni_addref_locked(lp);
635                         return lp;
636                 }
637         }
638
639         return NULL;
640 }
641
642 struct lnet_peer_ni *
643 lnet_find_peer_ni_locked(lnet_nid_t nid)
644 {
645         struct lnet_peer_ni *lpni;
646         struct lnet_peer_table *ptable;
647         int cpt;
648
649         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
650
651         ptable = the_lnet.ln_peer_tables[cpt];
652         lpni = lnet_get_peer_ni_locked(ptable, nid);
653
654         return lpni;
655 }
656
657 struct lnet_peer *
658 lnet_find_peer(lnet_nid_t nid)
659 {
660         struct lnet_peer_ni *lpni;
661         struct lnet_peer *lp = NULL;
662         int cpt;
663
664         cpt = lnet_net_lock_current();
665         lpni = lnet_find_peer_ni_locked(nid);
666         if (lpni) {
667                 lp = lpni->lpni_peer_net->lpn_peer;
668                 lnet_peer_addref_locked(lp);
669                 lnet_peer_ni_decref_locked(lpni);
670         }
671         lnet_net_unlock(cpt);
672
673         return lp;
674 }
675
676 struct lnet_peer_ni *
677 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
678                              struct lnet_peer_net *peer_net,
679                              struct lnet_peer_ni *prev)
680 {
681         struct lnet_peer_ni *lpni;
682         struct lnet_peer_net *net = peer_net;
683
684         if (!prev) {
685                 if (!net) {
686                         if (list_empty(&peer->lp_peer_nets))
687                                 return NULL;
688
689                         net = list_entry(peer->lp_peer_nets.next,
690                                          struct lnet_peer_net,
691                                          lpn_peer_nets);
692                 }
693                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
694                                   lpni_peer_nis);
695
696                 return lpni;
697         }
698
699         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
700                 /*
701                  * if you reached the end of the peer ni list and the peer
702                  * net is specified then there are no more peer nis in that
703                  * net.
704                  */
705                 if (net)
706                         return NULL;
707
708                 /*
709                  * we reached the end of this net ni list. move to the
710                  * next net
711                  */
712                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
713                     &peer->lp_peer_nets)
714                         /* no more nets and no more NIs. */
715                         return NULL;
716
717                 /* get the next net */
718                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
719                                  struct lnet_peer_net,
720                                  lpn_peer_nets);
721                 /* get the ni on it */
722                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
723                                   lpni_peer_nis);
724
725                 return lpni;
726         }
727
728         /* there are more nis left */
729         lpni = list_entry(prev->lpni_peer_nis.next,
730                           struct lnet_peer_ni, lpni_peer_nis);
731
732         return lpni;
733 }
734
735 /* Call with the ln_api_mutex held */
736 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
737 {
738         struct lnet_process_id id;
739         struct lnet_peer_table *ptable;
740         struct lnet_peer *lp;
741         __u32 count = 0;
742         __u32 size = 0;
743         int lncpt;
744         int cpt;
745         __u32 i;
746         int rc;
747
748         rc = -ESHUTDOWN;
749         if (the_lnet.ln_state != LNET_STATE_RUNNING)
750                 goto done;
751
752         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
753
754         /*
755          * Count the number of peers, and return E2BIG if the buffer
756          * is too small. We'll also return the desired size.
757          */
758         rc = -E2BIG;
759         for (cpt = 0; cpt < lncpt; cpt++) {
760                 ptable = the_lnet.ln_peer_tables[cpt];
761                 count += ptable->pt_peers;
762         }
763         size = count * sizeof(*ids);
764         if (size > *sizep)
765                 goto done;
766
767         /*
768          * Walk the peer lists and copy out the primary nids.
769          * This is safe because the peer lists are only modified
770          * while the ln_api_mutex is held. So we don't need to
771          * hold the lnet_net_lock as well, and can therefore
772          * directly call copy_to_user().
773          */
774         rc = -EFAULT;
775         memset(&id, 0, sizeof(id));
776         id.pid = LNET_PID_LUSTRE;
777         i = 0;
778         for (cpt = 0; cpt < lncpt; cpt++) {
779                 ptable = the_lnet.ln_peer_tables[cpt];
780                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
781                         if (i >= count)
782                                 goto done;
783                         id.nid = lp->lp_primary_nid;
784                         if (copy_to_user(&ids[i], &id, sizeof(id)))
785                                 goto done;
786                         i++;
787                 }
788         }
789         rc = 0;
790 done:
791         *countp = count;
792         *sizep = size;
793         return rc;
794 }
795
796 /*
797  * Start pushes to peers that need to be updated for a configuration
798  * change on this node.
799  */
800 void
801 lnet_push_update_to_peers(int force)
802 {
803         struct lnet_peer_table *ptable;
804         struct lnet_peer *lp;
805         int lncpt;
806         int cpt;
807
808         lnet_net_lock(LNET_LOCK_EX);
809         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
810         for (cpt = 0; cpt < lncpt; cpt++) {
811                 ptable = the_lnet.ln_peer_tables[cpt];
812                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
813                         if (force) {
814                                 spin_lock(&lp->lp_lock);
815                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
816                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
817                                 spin_unlock(&lp->lp_lock);
818                         }
819                         if (lnet_peer_needs_push(lp))
820                                 lnet_peer_queue_for_discovery(lp);
821                 }
822         }
823         lnet_net_unlock(LNET_LOCK_EX);
824         wake_up(&the_lnet.ln_dc_waitq);
825 }
826
827 /*
828  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
829  * this is a preferred point-to-point path. Call with lnet_net_lock in
830  * shared mmode.
831  */
832 bool
833 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
834 {
835         int i;
836
837         if (lpni->lpni_pref_nnids == 0)
838                 return false;
839         if (lpni->lpni_pref_nnids == 1)
840                 return lpni->lpni_pref.nid == nid;
841         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
842                 if (lpni->lpni_pref.nids[i] == nid)
843                         return true;
844         }
845         return false;
846 }
847
848 /*
849  * Set a single ni as preferred, provided no preferred ni is already
850  * defined. Only to be used for non-multi-rail peer_ni.
851  */
852 int
853 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
854 {
855         int rc = 0;
856
857         spin_lock(&lpni->lpni_lock);
858         if (nid == LNET_NID_ANY) {
859                 rc = -EINVAL;
860         } else if (lpni->lpni_pref_nnids > 0) {
861                 rc = -EPERM;
862         } else if (lpni->lpni_pref_nnids == 0) {
863                 lpni->lpni_pref.nid = nid;
864                 lpni->lpni_pref_nnids = 1;
865                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
866         }
867         spin_unlock(&lpni->lpni_lock);
868
869         CDEBUG(D_NET, "peer %s nid %s: %d\n",
870                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
871         return rc;
872 }
873
874 /*
875  * Clear the preferred NID from a non-multi-rail peer_ni, provided
876  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
877  */
878 int
879 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
880 {
881         int rc = 0;
882
883         spin_lock(&lpni->lpni_lock);
884         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
885                 lpni->lpni_pref_nnids = 0;
886                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
887         } else if (lpni->lpni_pref_nnids == 0) {
888                 rc = -ENOENT;
889         } else {
890                 rc = -EPERM;
891         }
892         spin_unlock(&lpni->lpni_lock);
893
894         CDEBUG(D_NET, "peer %s: %d\n",
895                libcfs_nid2str(lpni->lpni_nid), rc);
896         return rc;
897 }
898
899 /*
900  * Clear the preferred NIDs from a non-multi-rail peer.
901  */
902 void
903 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
904 {
905         struct lnet_peer_ni *lpni = NULL;
906
907         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
908                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
909 }
910
911 int
912 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
913 {
914         lnet_nid_t *nids = NULL;
915         lnet_nid_t *oldnids = NULL;
916         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
917         int size;
918         int i;
919         int rc = 0;
920
921         if (nid == LNET_NID_ANY) {
922                 rc = -EINVAL;
923                 goto out;
924         }
925
926         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
927                 rc = -EEXIST;
928                 goto out;
929         }
930
931         /* A non-MR node may have only one preferred NI per peer_ni */
932         if (lpni->lpni_pref_nnids > 0) {
933                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
934                         rc = -EPERM;
935                         goto out;
936                 }
937         }
938
939         if (lpni->lpni_pref_nnids != 0) {
940                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
941                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
942                 if (!nids) {
943                         rc = -ENOMEM;
944                         goto out;
945                 }
946                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
947                         if (lpni->lpni_pref.nids[i] == nid) {
948                                 LIBCFS_FREE(nids, size);
949                                 rc = -EEXIST;
950                                 goto out;
951                         }
952                         nids[i] = lpni->lpni_pref.nids[i];
953                 }
954                 nids[i] = nid;
955         }
956
957         lnet_net_lock(LNET_LOCK_EX);
958         spin_lock(&lpni->lpni_lock);
959         if (lpni->lpni_pref_nnids == 0) {
960                 lpni->lpni_pref.nid = nid;
961         } else {
962                 oldnids = lpni->lpni_pref.nids;
963                 lpni->lpni_pref.nids = nids;
964         }
965         lpni->lpni_pref_nnids++;
966         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
967         spin_unlock(&lpni->lpni_lock);
968         lnet_net_unlock(LNET_LOCK_EX);
969
970         if (oldnids) {
971                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
972                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
973         }
974 out:
975         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
976                 spin_lock(&lpni->lpni_lock);
977                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
978                 spin_unlock(&lpni->lpni_lock);
979         }
980         CDEBUG(D_NET, "peer %s nid %s: %d\n",
981                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
982         return rc;
983 }
984
985 int
986 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
987 {
988         lnet_nid_t *nids = NULL;
989         lnet_nid_t *oldnids = NULL;
990         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
991         int size;
992         int i, j;
993         int rc = 0;
994
995         if (lpni->lpni_pref_nnids == 0) {
996                 rc = -ENOENT;
997                 goto out;
998         }
999
1000         if (lpni->lpni_pref_nnids == 1) {
1001                 if (lpni->lpni_pref.nid != nid) {
1002                         rc = -ENOENT;
1003                         goto out;
1004                 }
1005         } else if (lpni->lpni_pref_nnids == 2) {
1006                 if (lpni->lpni_pref.nids[0] != nid &&
1007                     lpni->lpni_pref.nids[1] != nid) {
1008                         rc = -ENOENT;
1009                         goto out;
1010                 }
1011         } else {
1012                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1013                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1014                 if (!nids) {
1015                         rc = -ENOMEM;
1016                         goto out;
1017                 }
1018                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1019                         if (lpni->lpni_pref.nids[i] != nid)
1020                                 continue;
1021                         nids[j++] = lpni->lpni_pref.nids[i];
1022                 }
1023                 /* Check if we actually removed a nid. */
1024                 if (j == lpni->lpni_pref_nnids) {
1025                         LIBCFS_FREE(nids, size);
1026                         rc = -ENOENT;
1027                         goto out;
1028                 }
1029         }
1030
1031         lnet_net_lock(LNET_LOCK_EX);
1032         spin_lock(&lpni->lpni_lock);
1033         if (lpni->lpni_pref_nnids == 1) {
1034                 lpni->lpni_pref.nid = LNET_NID_ANY;
1035         } else if (lpni->lpni_pref_nnids == 2) {
1036                 oldnids = lpni->lpni_pref.nids;
1037                 if (oldnids[0] == nid)
1038                         lpni->lpni_pref.nid = oldnids[1];
1039                 else
1040                         lpni->lpni_pref.nid = oldnids[2];
1041         } else {
1042                 oldnids = lpni->lpni_pref.nids;
1043                 lpni->lpni_pref.nids = nids;
1044         }
1045         lpni->lpni_pref_nnids--;
1046         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1047         spin_unlock(&lpni->lpni_lock);
1048         lnet_net_unlock(LNET_LOCK_EX);
1049
1050         if (oldnids) {
1051                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1052                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1053         }
1054 out:
1055         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1056                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1057         return rc;
1058 }
1059
1060 lnet_nid_t
1061 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1062 {
1063         struct lnet_peer_ni *lpni;
1064         lnet_nid_t primary_nid = nid;
1065
1066         lpni = lnet_find_peer_ni_locked(nid);
1067         if (lpni) {
1068                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1069                 lnet_peer_ni_decref_locked(lpni);
1070         }
1071
1072         return primary_nid;
1073 }
1074
1075 lnet_nid_t
1076 LNetPrimaryNID(lnet_nid_t nid)
1077 {
1078         struct lnet_peer *lp;
1079         struct lnet_peer_ni *lpni;
1080         lnet_nid_t primary_nid = nid;
1081         int rc = 0;
1082         int cpt;
1083
1084         cpt = lnet_net_lock_current();
1085         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1086         if (IS_ERR(lpni)) {
1087                 rc = PTR_ERR(lpni);
1088                 goto out_unlock;
1089         }
1090         lp = lpni->lpni_peer_net->lpn_peer;
1091         while (!lnet_peer_is_uptodate(lp)) {
1092                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1093                 if (rc)
1094                         goto out_decref;
1095                 lp = lpni->lpni_peer_net->lpn_peer;
1096         }
1097         primary_nid = lp->lp_primary_nid;
1098 out_decref:
1099         lnet_peer_ni_decref_locked(lpni);
1100 out_unlock:
1101         lnet_net_unlock(cpt);
1102
1103         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1104                libcfs_nid2str(primary_nid), rc);
1105         return primary_nid;
1106 }
1107 EXPORT_SYMBOL(LNetPrimaryNID);
1108
1109 struct lnet_peer_net *
1110 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1111 {
1112         struct lnet_peer_net *peer_net;
1113         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1114                 if (peer_net->lpn_net_id == net_id)
1115                         return peer_net;
1116         }
1117         return NULL;
1118 }
1119
1120 /*
1121  * Attach a peer_ni to a peer_net and peer. This function assumes
1122  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1123  * may be attached to a different peer, in which case it will be
1124  * properly detached first. The whole operation is done atomically.
1125  *
1126  * Always returns 0.  This is the last function called from functions
1127  * that do return an int, so returning 0 here allows the compiler to
1128  * do a tail call.
1129  */
1130 static int
1131 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1132                                 struct lnet_peer_net *lpn,
1133                                 struct lnet_peer_ni *lpni,
1134                                 unsigned flags)
1135 {
1136         struct lnet_peer_table *ptable;
1137
1138         /* Install the new peer_ni */
1139         lnet_net_lock(LNET_LOCK_EX);
1140         /* Add peer_ni to global peer table hash, if necessary. */
1141         if (list_empty(&lpni->lpni_hashlist)) {
1142                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1143
1144                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1145                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1146                 ptable->pt_version++;
1147                 ptable->pt_number++;
1148                 /* This is the 1st refcount on lpni. */
1149                 atomic_inc(&lpni->lpni_refcount);
1150         }
1151
1152         /* Detach the peer_ni from an existing peer, if necessary. */
1153         if (lpni->lpni_peer_net) {
1154                 LASSERT(lpni->lpni_peer_net != lpn);
1155                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1156                 lnet_peer_detach_peer_ni_locked(lpni);
1157                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1158                 lpni->lpni_peer_net = NULL;
1159         }
1160
1161         /* Add peer_ni to peer_net */
1162         lpni->lpni_peer_net = lpn;
1163         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1164         lnet_peer_net_addref_locked(lpn);
1165
1166         /* Add peer_net to peer */
1167         if (!lpn->lpn_peer) {
1168                 lpn->lpn_peer = lp;
1169                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1170                 lnet_peer_addref_locked(lp);
1171         }
1172
1173         /* Add peer to global peer list, if necessary */
1174         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1175         if (list_empty(&lp->lp_peer_list)) {
1176                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1177                 ptable->pt_peers++;
1178         }
1179
1180
1181         /* Update peer state */
1182         spin_lock(&lp->lp_lock);
1183         if (flags & LNET_PEER_CONFIGURED) {
1184                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1185                         lp->lp_state |= LNET_PEER_CONFIGURED;
1186         }
1187         if (flags & LNET_PEER_MULTI_RAIL) {
1188                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1189                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1190                         lnet_peer_clr_non_mr_pref_nids(lp);
1191                 }
1192         }
1193         spin_unlock(&lp->lp_lock);
1194
1195         lp->lp_nnis++;
1196         lnet_net_unlock(LNET_LOCK_EX);
1197
1198         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1199                libcfs_nid2str(lp->lp_primary_nid),
1200                libcfs_nid2str(lpni->lpni_nid), flags);
1201
1202         return 0;
1203 }
1204
1205 /*
1206  * Create a new peer, with nid as its primary nid.
1207  *
1208  * Call with the lnet_api_mutex held.
1209  */
1210 static int
1211 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1212 {
1213         struct lnet_peer *lp;
1214         struct lnet_peer_net *lpn;
1215         struct lnet_peer_ni *lpni;
1216         int rc = 0;
1217
1218         LASSERT(nid != LNET_NID_ANY);
1219
1220         /*
1221          * No need for the lnet_net_lock here, because the
1222          * lnet_api_mutex is held.
1223          */
1224         lpni = lnet_find_peer_ni_locked(nid);
1225         if (lpni) {
1226                 /* A peer with this NID already exists. */
1227                 lp = lpni->lpni_peer_net->lpn_peer;
1228                 lnet_peer_ni_decref_locked(lpni);
1229                 /*
1230                  * This is an error if the peer was configured and the
1231                  * primary NID differs or an attempt is made to change
1232                  * the Multi-Rail flag. Otherwise the assumption is
1233                  * that an existing peer is being modified.
1234                  */
1235                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1236                         if (lp->lp_primary_nid != nid)
1237                                 rc = -EEXIST;
1238                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1239                                 rc = -EPERM;
1240                         goto out;
1241                 }
1242                 /* Delete and recreate as a configured peer. */
1243                 lnet_peer_del(lp);
1244         }
1245
1246         /* Create peer, peer_net, and peer_ni. */
1247         rc = -ENOMEM;
1248         lp = lnet_peer_alloc(nid);
1249         if (!lp)
1250                 goto out;
1251         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1252         if (!lpn)
1253                 goto out_free_lp;
1254         lpni = lnet_peer_ni_alloc(nid);
1255         if (!lpni)
1256                 goto out_free_lpn;
1257
1258         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1259
1260 out_free_lpn:
1261         LIBCFS_FREE(lpn, sizeof(*lpn));
1262 out_free_lp:
1263         LIBCFS_FREE(lp, sizeof(*lp));
1264 out:
1265         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1266                libcfs_nid2str(nid), flags, rc);
1267         return rc;
1268 }
1269
1270 /*
1271  * Add a NID to a peer. Call with ln_api_mutex held.
1272  *
1273  * Error codes:
1274  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1275  *  -EEXIST:   The NID was configured by DLC for a different peer.
1276  *  -ENOMEM:   Out of memory.
1277  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1278  *             non-multi-rail peer.
1279  */
1280 static int
1281 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1282 {
1283         struct lnet_peer_net *lpn;
1284         struct lnet_peer_ni *lpni;
1285         int rc = 0;
1286
1287         LASSERT(lp);
1288         LASSERT(nid != LNET_NID_ANY);
1289
1290         /* A configured peer can only be updated through configuration. */
1291         if (!(flags & LNET_PEER_CONFIGURED)) {
1292                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1293                         rc = -EPERM;
1294                         goto out;
1295                 }
1296         }
1297
1298         /*
1299          * The MULTI_RAIL flag can be set but not cleared, because
1300          * that would leave the peer struct in an invalid state.
1301          */
1302         if (flags & LNET_PEER_MULTI_RAIL) {
1303                 spin_lock(&lp->lp_lock);
1304                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1305                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1306                         lnet_peer_clr_non_mr_pref_nids(lp);
1307                 }
1308                 spin_unlock(&lp->lp_lock);
1309         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1310                 rc = -EPERM;
1311                 goto out;
1312         }
1313
1314         lpni = lnet_find_peer_ni_locked(nid);
1315         if (lpni) {
1316                 /*
1317                  * A peer_ni already exists. This is only a problem if
1318                  * it is not connected to this peer and was configured
1319                  * by DLC.
1320                  */
1321                 lnet_peer_ni_decref_locked(lpni);
1322                 if (lpni->lpni_peer_net->lpn_peer == lp)
1323                         goto out;
1324                 if (lnet_peer_ni_is_configured(lpni)) {
1325                         rc = -EEXIST;
1326                         goto out;
1327                 }
1328                 /* If this is the primary NID, destroy the peer. */
1329                 if (lnet_peer_ni_is_primary(lpni)) {
1330                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1331                         lpni = lnet_peer_ni_alloc(nid);
1332                         if (!lpni) {
1333                                 rc = -ENOMEM;
1334                                 goto out;
1335                         }
1336                 }
1337         } else {
1338                 lpni = lnet_peer_ni_alloc(nid);
1339                 if (!lpni) {
1340                         rc = -ENOMEM;
1341                         goto out;
1342                 }
1343         }
1344
1345         /*
1346          * Get the peer_net. Check that we're not adding a second
1347          * peer_ni on a peer_net of a non-multi-rail peer.
1348          */
1349         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1350         if (!lpn) {
1351                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1352                 if (!lpn) {
1353                         rc = -ENOMEM;
1354                         goto out_free_lpni;
1355                 }
1356         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1357                 rc = -ENOTUNIQ;
1358                 goto out_free_lpni;
1359         }
1360
1361         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1362
1363 out_free_lpni:
1364         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1365         if (!lpni->lpni_peer_net)
1366                 LIBCFS_FREE(lpni, sizeof(*lpni));
1367 out:
1368         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1369                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1370                flags, rc);
1371         return rc;
1372 }
1373
1374 /*
1375  * Update the primary NID of a peer, if possible.
1376  *
1377  * Call with the lnet_api_mutex held.
1378  */
1379 static int
1380 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1381 {
1382         lnet_nid_t old = lp->lp_primary_nid;
1383         int rc = 0;
1384
1385         if (lp->lp_primary_nid == nid)
1386                 goto out;
1387         rc = lnet_peer_add_nid(lp, nid, flags);
1388         if (rc)
1389                 goto out;
1390         lp->lp_primary_nid = nid;
1391 out:
1392         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1393                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1394         return rc;
1395 }
1396
1397 /*
1398  * lpni creation initiated due to traffic either sending or receiving.
1399  */
1400 static int
1401 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1402 {
1403         struct lnet_peer *lp;
1404         struct lnet_peer_net *lpn;
1405         struct lnet_peer_ni *lpni;
1406         unsigned flags = 0;
1407         int rc = 0;
1408
1409         if (nid == LNET_NID_ANY) {
1410                 rc = -EINVAL;
1411                 goto out;
1412         }
1413
1414         /* lnet_net_lock is not needed here because ln_api_lock is held */
1415         lpni = lnet_find_peer_ni_locked(nid);
1416         if (lpni) {
1417                 /*
1418                  * We must have raced with another thread. Since we
1419                  * know next to nothing about a peer_ni created by
1420                  * traffic, we just assume everything is ok and
1421                  * return.
1422                  */
1423                 lnet_peer_ni_decref_locked(lpni);
1424                 goto out;
1425         }
1426
1427         /* Create peer, peer_net, and peer_ni. */
1428         rc = -ENOMEM;
1429         lp = lnet_peer_alloc(nid);
1430         if (!lp)
1431                 goto out;
1432         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1433         if (!lpn)
1434                 goto out_free_lp;
1435         lpni = lnet_peer_ni_alloc(nid);
1436         if (!lpni)
1437                 goto out_free_lpn;
1438         if (pref != LNET_NID_ANY)
1439                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1440
1441         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1442
1443 out_free_lpn:
1444         LIBCFS_FREE(lpn, sizeof(*lpn));
1445 out_free_lp:
1446         LIBCFS_FREE(lp, sizeof(*lp));
1447 out:
1448         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1449         return rc;
1450 }
1451
1452 /*
1453  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1454  *
1455  * This API handles the following combinations:
1456  *   Create a peer with its primary NI if only the prim_nid is provided
1457  *   Add a NID to a peer identified by the prim_nid. The peer identified
1458  *   by the prim_nid must already exist.
1459  *   The peer being created may be non-MR.
1460  *
1461  * The caller must hold ln_api_mutex. This prevents the peer from
1462  * being created/modified/deleted by a different thread.
1463  */
1464 int
1465 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1466 {
1467         struct lnet_peer *lp = NULL;
1468         struct lnet_peer_ni *lpni;
1469         unsigned flags;
1470
1471         /* The prim_nid must always be specified */
1472         if (prim_nid == LNET_NID_ANY)
1473                 return -EINVAL;
1474
1475         flags = LNET_PEER_CONFIGURED;
1476         if (mr)
1477                 flags |= LNET_PEER_MULTI_RAIL;
1478
1479         /*
1480          * If nid isn't specified, we must create a new peer with
1481          * prim_nid as its primary nid.
1482          */
1483         if (nid == LNET_NID_ANY)
1484                 return lnet_peer_add(prim_nid, flags);
1485
1486         /* Look up the prim_nid, which must exist. */
1487         lpni = lnet_find_peer_ni_locked(prim_nid);
1488         if (!lpni)
1489                 return -ENOENT;
1490         lnet_peer_ni_decref_locked(lpni);
1491         lp = lpni->lpni_peer_net->lpn_peer;
1492
1493         /* Peer must have been configured. */
1494         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1495                 CDEBUG(D_NET, "peer %s was not configured\n",
1496                        libcfs_nid2str(prim_nid));
1497                 return -ENOENT;
1498         }
1499
1500         /* Primary NID must match */
1501         if (lp->lp_primary_nid != prim_nid) {
1502                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1503                        libcfs_nid2str(prim_nid),
1504                        libcfs_nid2str(lp->lp_primary_nid));
1505                 return -ENODEV;
1506         }
1507
1508         /* Multi-Rail flag must match. */
1509         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1510                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1511                        libcfs_nid2str(prim_nid));
1512                 return -EPERM;
1513         }
1514
1515         return lnet_peer_add_nid(lp, nid, flags);
1516 }
1517
1518 /*
1519  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1520  *
1521  * This API handles the following combinations:
1522  *   Delete a NI from a peer if both prim_nid and nid are provided.
1523  *   Delete a peer if only prim_nid is provided.
1524  *   Delete a peer if its primary nid is provided.
1525  *
1526  * The caller must hold ln_api_mutex. This prevents the peer from
1527  * being modified/deleted by a different thread.
1528  */
1529 int
1530 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1531 {
1532         struct lnet_peer *lp;
1533         struct lnet_peer_ni *lpni;
1534         unsigned flags;
1535
1536         if (prim_nid == LNET_NID_ANY)
1537                 return -EINVAL;
1538
1539         lpni = lnet_find_peer_ni_locked(prim_nid);
1540         if (!lpni)
1541                 return -ENOENT;
1542         lnet_peer_ni_decref_locked(lpni);
1543         lp = lpni->lpni_peer_net->lpn_peer;
1544
1545         if (prim_nid != lp->lp_primary_nid) {
1546                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1547                        libcfs_nid2str(prim_nid),
1548                        libcfs_nid2str(lp->lp_primary_nid));
1549                 return -ENODEV;
1550         }
1551
1552         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1553                 return lnet_peer_del(lp);
1554
1555         flags = LNET_PEER_CONFIGURED;
1556         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1557                 flags |= LNET_PEER_MULTI_RAIL;
1558
1559         return lnet_peer_del_nid(lp, nid, flags);
1560 }
1561
1562 void
1563 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1564 {
1565         struct lnet_peer_table *ptable;
1566         struct lnet_peer_net *lpn;
1567
1568         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1569
1570         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1571         LASSERT(lpni->lpni_rtr_refcount == 0);
1572         LASSERT(list_empty(&lpni->lpni_txq));
1573         LASSERT(lpni->lpni_txqnob == 0);
1574         LASSERT(list_empty(&lpni->lpni_peer_nis));
1575         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1576
1577         lpn = lpni->lpni_peer_net;
1578         lpni->lpni_peer_net = NULL;
1579         lpni->lpni_net = NULL;
1580
1581         /* remove the peer ni from the zombie list */
1582         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1583         spin_lock(&ptable->pt_zombie_lock);
1584         list_del_init(&lpni->lpni_hashlist);
1585         ptable->pt_zombies--;
1586         spin_unlock(&ptable->pt_zombie_lock);
1587
1588         if (lpni->lpni_pref_nnids > 1) {
1589                 LIBCFS_FREE(lpni->lpni_pref.nids,
1590                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1591         }
1592         LIBCFS_FREE(lpni, sizeof(*lpni));
1593
1594         lnet_peer_net_decref_locked(lpn);
1595 }
1596
1597 struct lnet_peer_ni *
1598 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1599 {
1600         struct lnet_peer_ni *lpni = NULL;
1601         int rc;
1602
1603         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1604                 return ERR_PTR(-ESHUTDOWN);
1605
1606         /*
1607          * find if a peer_ni already exists.
1608          * If so then just return that.
1609          */
1610         lpni = lnet_find_peer_ni_locked(nid);
1611         if (lpni)
1612                 return lpni;
1613
1614         lnet_net_unlock(cpt);
1615
1616         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1617         if (rc) {
1618                 lpni = ERR_PTR(rc);
1619                 goto out_net_relock;
1620         }
1621
1622         lpni = lnet_find_peer_ni_locked(nid);
1623         LASSERT(lpni);
1624
1625 out_net_relock:
1626         lnet_net_lock(cpt);
1627
1628         return lpni;
1629 }
1630
1631 /*
1632  * Get a peer_ni for the given nid, create it if necessary. Takes a
1633  * hold on the peer_ni.
1634  */
1635 struct lnet_peer_ni *
1636 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1637 {
1638         struct lnet_peer_ni *lpni = NULL;
1639         int rc;
1640
1641         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1642                 return ERR_PTR(-ESHUTDOWN);
1643
1644         /*
1645          * find if a peer_ni already exists.
1646          * If so then just return that.
1647          */
1648         lpni = lnet_find_peer_ni_locked(nid);
1649         if (lpni)
1650                 return lpni;
1651
1652         /*
1653          * Slow path:
1654          * use the lnet_api_mutex to serialize the creation of the peer_ni
1655          * and the creation/deletion of the local ni/net. When a local ni is
1656          * created, if there exists a set of peer_nis on that network,
1657          * they need to be traversed and updated. When a local NI is
1658          * deleted, which could result in a network being deleted, then
1659          * all peer nis on that network need to be removed as well.
1660          *
1661          * Creation through traffic should also be serialized with
1662          * creation through DLC.
1663          */
1664         lnet_net_unlock(cpt);
1665         mutex_lock(&the_lnet.ln_api_mutex);
1666         /*
1667          * Shutdown is only set under the ln_api_lock, so a single
1668          * check here is sufficent.
1669          */
1670         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1671                 lpni = ERR_PTR(-ESHUTDOWN);
1672                 goto out_mutex_unlock;
1673         }
1674
1675         rc = lnet_peer_ni_traffic_add(nid, pref);
1676         if (rc) {
1677                 lpni = ERR_PTR(rc);
1678                 goto out_mutex_unlock;
1679         }
1680
1681         lpni = lnet_find_peer_ni_locked(nid);
1682         LASSERT(lpni);
1683
1684 out_mutex_unlock:
1685         mutex_unlock(&the_lnet.ln_api_mutex);
1686         lnet_net_lock(cpt);
1687
1688         /* Lock has been dropped, check again for shutdown. */
1689         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1690                 if (!IS_ERR(lpni))
1691                         lnet_peer_ni_decref_locked(lpni);
1692                 lpni = ERR_PTR(-ESHUTDOWN);
1693         }
1694
1695         return lpni;
1696 }
1697
1698 /*
1699  * Peer Discovery
1700  */
1701
1702 /*
1703  * Is a peer uptodate from the point of view of discovery?
1704  *
1705  * If it is currently being processed, obviously not.
1706  * A forced Ping or Push is also handled by the discovery thread.
1707  *
1708  * Otherwise look at whether the peer needs rediscovering.
1709  */
1710 bool
1711 lnet_peer_is_uptodate(struct lnet_peer *lp)
1712 {
1713         bool rc;
1714
1715         spin_lock(&lp->lp_lock);
1716         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1717                             LNET_PEER_FORCE_PING |
1718                             LNET_PEER_FORCE_PUSH)) {
1719                 rc = false;
1720         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1721                 rc = true;
1722         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1723                 if (lnet_peer_discovery_disabled)
1724                         rc = true;
1725                 else
1726                         rc = false;
1727         } else if (lnet_peer_needs_push(lp)) {
1728                 rc = false;
1729         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1730                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1731                         rc = true;
1732                 else
1733                         rc = false;
1734         } else {
1735                 rc = false;
1736         }
1737         spin_unlock(&lp->lp_lock);
1738
1739         return rc;
1740 }
1741
1742 /*
1743  * Queue a peer for the attention of the discovery thread.  Call with
1744  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1745  * -EALREADY if the peer was already queued.
1746  */
1747 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1748 {
1749         int rc;
1750
1751         spin_lock(&lp->lp_lock);
1752         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1753                 lp->lp_state |= LNET_PEER_DISCOVERING;
1754         spin_unlock(&lp->lp_lock);
1755         if (list_empty(&lp->lp_dc_list)) {
1756                 lnet_peer_addref_locked(lp);
1757                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1758                 wake_up(&the_lnet.ln_dc_waitq);
1759                 rc = 0;
1760         } else {
1761                 rc = -EALREADY;
1762         }
1763
1764         CDEBUG(D_NET, "Queue peer %s: %d\n",
1765                libcfs_nid2str(lp->lp_primary_nid), rc);
1766
1767         return rc;
1768 }
1769
1770 /*
1771  * Discovery of a peer is complete. Wake all waiters on the peer.
1772  * Call with lnet_net_lock/EX held.
1773  */
1774 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1775 {
1776         struct lnet_msg *msg, *tmp;
1777         int rc = 0;
1778         struct list_head pending_msgs;
1779
1780         INIT_LIST_HEAD(&pending_msgs);
1781
1782         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1783                libcfs_nid2str(lp->lp_primary_nid));
1784
1785         list_del_init(&lp->lp_dc_list);
1786         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1787         wake_up_all(&lp->lp_dc_waitq);
1788
1789         lnet_net_unlock(LNET_LOCK_EX);
1790
1791         /* iterate through all pending messages and send them again */
1792         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1793                 list_del_init(&msg->msg_list);
1794                 if (lp->lp_dc_error) {
1795                         lnet_finalize(msg, lp->lp_dc_error);
1796                         continue;
1797                 }
1798
1799                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1800                        lnet_msgtyp2str(msg->msg_type),
1801                        libcfs_id2str(msg->msg_target));
1802                 rc = lnet_send(msg->msg_src_nid_param, msg,
1803                                msg->msg_rtr_nid_param);
1804                 if (rc < 0) {
1805                         CNETERR("Error sending %s to %s: %d\n",
1806                                lnet_msgtyp2str(msg->msg_type),
1807                                libcfs_id2str(msg->msg_target), rc);
1808                         lnet_finalize(msg, rc);
1809                 }
1810         }
1811         lnet_net_lock(LNET_LOCK_EX);
1812         lnet_peer_decref_locked(lp);
1813 }
1814
1815 /*
1816  * Handle inbound push.
1817  * Like any event handler, called with lnet_res_lock/CPT held.
1818  */
1819 void lnet_peer_push_event(struct lnet_event *ev)
1820 {
1821         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1822         struct lnet_peer *lp;
1823
1824         /* lnet_find_peer() adds a refcount */
1825         lp = lnet_find_peer(ev->source.nid);
1826         if (!lp) {
1827                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1828                        libcfs_nid2str(ev->initiator.nid),
1829                        libcfs_nid2str(ev->source.nid));
1830                 return;
1831         }
1832
1833         /* Ensure peer state remains consistent while we modify it. */
1834         spin_lock(&lp->lp_lock);
1835
1836         /*
1837          * If some kind of error happened the contents of the message
1838          * cannot be used. Clear the NIDS_UPTODATE and set the
1839          * FORCE_PING flag to trigger a ping.
1840          */
1841         if (ev->status) {
1842                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1843                 lp->lp_state |= LNET_PEER_FORCE_PING;
1844                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1845                        ev->status,
1846                        libcfs_nid2str(lp->lp_primary_nid),
1847                        libcfs_nid2str(ev->source.nid));
1848                 goto out;
1849         }
1850
1851         /*
1852          * A push with invalid or corrupted info. Clear the UPTODATE
1853          * flag to trigger a ping.
1854          */
1855         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1856                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1857                 lp->lp_state |= LNET_PEER_FORCE_PING;
1858                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1859                        libcfs_nid2str(lp->lp_primary_nid));
1860                 goto out;
1861         }
1862
1863         /*
1864          * Make sure we'll allocate the correct size ping buffer when
1865          * pinging the peer.
1866          */
1867         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1868                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1869
1870         /*
1871          * A non-Multi-Rail peer is not supposed to be capable of
1872          * sending a push.
1873          */
1874         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1875                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1876                        libcfs_nid2str(lp->lp_primary_nid));
1877                 goto out;
1878         }
1879
1880         /*
1881          * Check the MULTIRAIL flag. Complain if the peer was DLC
1882          * configured without it.
1883          */
1884         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1885                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1886                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1887                                libcfs_nid2str(lp->lp_primary_nid));
1888                 } else {
1889                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1890                         lnet_peer_clr_non_mr_pref_nids(lp);
1891                 }
1892         }
1893
1894         /*
1895          * The peer may have discovery disabled at its end. Set
1896          * NO_DISCOVERY as appropriate.
1897          */
1898         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1899                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1900                        libcfs_nid2str(lp->lp_primary_nid));
1901                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1902         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1903                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1904                        libcfs_nid2str(lp->lp_primary_nid));
1905                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1906         }
1907
1908         /*
1909          * Check for truncation of the Put message. Clear the
1910          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1911          * and tell discovery to allocate a bigger buffer.
1912          */
1913         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1914                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1915                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1916                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1917                 lp->lp_state |= LNET_PEER_FORCE_PING;
1918                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1919                        libcfs_nid2str(lp->lp_primary_nid),
1920                        pbuf->pb_info.pi_nnis);
1921                 goto out;
1922         }
1923
1924         /*
1925          * Check whether the Put data is stale. Stale data can just be
1926          * dropped.
1927          */
1928         if (pbuf->pb_info.pi_nnis > 1 &&
1929             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1930             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1931                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1932                        libcfs_nid2str(lp->lp_primary_nid),
1933                        LNET_PING_BUFFER_SEQNO(pbuf),
1934                        lp->lp_peer_seqno);
1935                 goto out;
1936         }
1937
1938         /*
1939          * Check whether the Put data is new, in which case we clear
1940          * the UPTODATE flag and prepare to process it.
1941          *
1942          * If the Put data is current, and the peer is UPTODATE then
1943          * we assome everything is all right and drop the data as
1944          * stale.
1945          */
1946         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1947                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1948                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1949         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1950                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1951                        libcfs_nid2str(lp->lp_primary_nid),
1952                        LNET_PING_BUFFER_SEQNO(pbuf),
1953                        lp->lp_peer_seqno);
1954                 goto out;
1955         }
1956
1957         /*
1958          * If there is data present that hasn't been processed yet,
1959          * we'll replace it if the Put contained newer data and it
1960          * fits. We're racing with a Ping or earlier Push in this
1961          * case.
1962          */
1963         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1964                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1965                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1966                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1967                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1968                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1969                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1970                               libcfs_nid2str(lp->lp_primary_nid),
1971                               LNET_PING_BUFFER_SEQNO(pbuf),
1972                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1973                 }
1974                 goto out;
1975         }
1976
1977         /*
1978          * Allocate a buffer to copy the data. On a failure we drop
1979          * the Push and set FORCE_PING to force the discovery
1980          * thread to fix the problem by pinging the peer.
1981          */
1982         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1983         if (!lp->lp_data) {
1984                 lp->lp_state |= LNET_PEER_FORCE_PING;
1985                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1986                        libcfs_nid2str(lp->lp_primary_nid),
1987                        LNET_PING_BUFFER_SEQNO(pbuf));
1988                 goto out;
1989         }
1990
1991         /* Success */
1992         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1993                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1994         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1995         CDEBUG(D_NET, "Received Push %s %u\n",
1996                libcfs_nid2str(lp->lp_primary_nid),
1997                LNET_PING_BUFFER_SEQNO(pbuf));
1998
1999 out:
2000         /*
2001          * Queue the peer for discovery if not done, force it on the request
2002          * queue and wake the discovery thread if the peer was already queued,
2003          * because its status changed.
2004          */
2005         spin_unlock(&lp->lp_lock);
2006         lnet_net_lock(LNET_LOCK_EX);
2007         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2008                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2009                 wake_up(&the_lnet.ln_dc_waitq);
2010         }
2011         /* Drop refcount from lookup */
2012         lnet_peer_decref_locked(lp);
2013         lnet_net_unlock(LNET_LOCK_EX);
2014 }
2015
2016 /*
2017  * Clear the discovery error state, unless we're already discovering
2018  * this peer, in which case the error is current.
2019  */
2020 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2021 {
2022         spin_lock(&lp->lp_lock);
2023         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2024                 lp->lp_dc_error = 0;
2025         spin_unlock(&lp->lp_lock);
2026 }
2027
2028 /*
2029  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2030  * dropped/retaken within this function. An lnet_peer_ni is passed in
2031  * because discovery could tear down an lnet_peer.
2032  */
2033 int
2034 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2035 {
2036         DEFINE_WAIT(wait);
2037         struct lnet_peer *lp;
2038         int rc = 0;
2039
2040 again:
2041         lnet_net_unlock(cpt);
2042         lnet_net_lock(LNET_LOCK_EX);
2043         lp = lpni->lpni_peer_net->lpn_peer;
2044         lnet_peer_clear_discovery_error(lp);
2045
2046         /*
2047          * We're willing to be interrupted. The lpni can become a
2048          * zombie if we race with DLC, so we must check for that.
2049          */
2050         for (;;) {
2051                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2052                 if (signal_pending(current))
2053                         break;
2054                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2055                         break;
2056                 if (lp->lp_dc_error)
2057                         break;
2058                 if (lnet_peer_is_uptodate(lp))
2059                         break;
2060                 lnet_peer_queue_for_discovery(lp);
2061                 /*
2062                  * if caller requested a non-blocking operation then
2063                  * return immediately. Once discovery is complete then the
2064                  * peer ref will be decremented and any pending messages
2065                  * that were stopped due to discovery will be transmitted.
2066                  */
2067                 if (!block)
2068                         break;
2069
2070                 lnet_peer_addref_locked(lp);
2071                 lnet_net_unlock(LNET_LOCK_EX);
2072                 schedule();
2073                 finish_wait(&lp->lp_dc_waitq, &wait);
2074                 lnet_net_lock(LNET_LOCK_EX);
2075                 lnet_peer_decref_locked(lp);
2076                 /* Peer may have changed */
2077                 lp = lpni->lpni_peer_net->lpn_peer;
2078         }
2079         finish_wait(&lp->lp_dc_waitq, &wait);
2080
2081         lnet_net_unlock(LNET_LOCK_EX);
2082         lnet_net_lock(cpt);
2083
2084         /*
2085          * If the peer has changed after we've discovered the older peer,
2086          * then we need to discovery the new peer to make sure the
2087          * interface information is up to date
2088          */
2089         if (lp != lpni->lpni_peer_net->lpn_peer)
2090                 goto again;
2091
2092         if (signal_pending(current))
2093                 rc = -EINTR;
2094         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2095                 rc = -ESHUTDOWN;
2096         else if (lp->lp_dc_error)
2097                 rc = lp->lp_dc_error;
2098         else if (!block)
2099                 CDEBUG(D_NET, "non-blocking discovery\n");
2100         else if (!lnet_peer_is_uptodate(lp))
2101                 goto again;
2102
2103         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2104                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2105                libcfs_nid2str(lpni->lpni_nid), rc,
2106                (!block) ? "pending discovery" : "discovery complete");
2107
2108         return rc;
2109 }
2110
2111 /* Handle an incoming ack for a push. */
2112 static void
2113 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2114 {
2115         struct lnet_ping_buffer *pbuf;
2116
2117         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2118         spin_lock(&lp->lp_lock);
2119         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2120         lp->lp_push_error = ev->status;
2121         if (ev->status)
2122                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2123         else
2124                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2125         spin_unlock(&lp->lp_lock);
2126
2127         CDEBUG(D_NET, "peer %s ev->status %d\n",
2128                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2129 }
2130
2131 /* Handle a Reply message. This is the reply to a Ping message. */
2132 static void
2133 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2134 {
2135         struct lnet_ping_buffer *pbuf;
2136         int rc;
2137
2138         spin_lock(&lp->lp_lock);
2139
2140         /*
2141          * If some kind of error happened the contents of message
2142          * cannot be used. Set PING_FAILED to trigger a retry.
2143          */
2144         if (ev->status) {
2145                 lp->lp_state |= LNET_PEER_PING_FAILED;
2146                 lp->lp_ping_error = ev->status;
2147                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2148                        ev->status,
2149                        libcfs_nid2str(lp->lp_primary_nid),
2150                        libcfs_nid2str(ev->source.nid));
2151                 goto out;
2152         }
2153
2154         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2155         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2156                 lnet_swap_pinginfo(pbuf);
2157
2158         /*
2159          * A reply with invalid or corrupted info. Set PING_FAILED to
2160          * trigger a retry.
2161          */
2162         rc = lnet_ping_info_validate(&pbuf->pb_info);
2163         if (rc) {
2164                 lp->lp_state |= LNET_PEER_PING_FAILED;
2165                 lp->lp_ping_error = 0;
2166                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2167                        libcfs_nid2str(lp->lp_primary_nid), rc);
2168                 goto out;
2169         }
2170
2171         /*
2172          * Update the MULTI_RAIL flag based on the reply. If the peer
2173          * was configured with DLC then the setting should match what
2174          * DLC put in.
2175          */
2176         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2177                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2178                         /* Everything's fine */
2179                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2180                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2181                               libcfs_nid2str(lp->lp_primary_nid));
2182                 } else {
2183                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2184                         lnet_peer_clr_non_mr_pref_nids(lp);
2185                 }
2186         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2187                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2188                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2189                               libcfs_nid2str(lp->lp_primary_nid));
2190                 } else {
2191                         CERROR("Multi-Rail state vanished from %s\n",
2192                                libcfs_nid2str(lp->lp_primary_nid));
2193                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2194                 }
2195         }
2196
2197         /*
2198          * Make sure we'll allocate the correct size ping buffer when
2199          * pinging the peer.
2200          */
2201         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2202                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2203
2204         /*
2205          * The peer may have discovery disabled at its end. Set
2206          * NO_DISCOVERY as appropriate.
2207          */
2208         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2209                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2210                        libcfs_nid2str(lp->lp_primary_nid));
2211                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2212         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2213                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2214                        libcfs_nid2str(lp->lp_primary_nid));
2215                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2216         }
2217
2218         /*
2219          * Check for truncation of the Reply. Clear PING_SENT and set
2220          * PING_FAILED to trigger a retry.
2221          */
2222         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2223                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2224                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2225                 lp->lp_state |= LNET_PEER_PING_FAILED;
2226                 lp->lp_ping_error = 0;
2227                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2228                        libcfs_nid2str(lp->lp_primary_nid),
2229                        pbuf->pb_info.pi_nnis);
2230                 goto out;
2231         }
2232
2233         /*
2234          * Check the sequence numbers in the reply. These are only
2235          * available if the reply came from a Multi-Rail peer.
2236          */
2237         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2238             pbuf->pb_info.pi_nnis > 1 &&
2239             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2240                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2241                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2242                                 libcfs_nid2str(lp->lp_primary_nid),
2243                                 LNET_PING_BUFFER_SEQNO(pbuf),
2244                                 lp->lp_peer_seqno);
2245                         goto out;
2246                 }
2247
2248                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2249                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2250         }
2251
2252         /* We're happy with the state of the data in the buffer. */
2253         CDEBUG(D_NET, "peer %s data present %u\n",
2254                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2255         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2256                 lnet_ping_buffer_decref(lp->lp_data);
2257         else
2258                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2259         lnet_ping_buffer_addref(pbuf);
2260         lp->lp_data = pbuf;
2261 out:
2262         lp->lp_state &= ~LNET_PEER_PING_SENT;
2263         spin_unlock(&lp->lp_lock);
2264 }
2265
2266 /*
2267  * Send event handling. Only matters for error cases, where we clean
2268  * up state on the peer and peer_ni that would otherwise be updated in
2269  * the REPLY event handler for a successful Ping, and the ACK event
2270  * handler for a successful Push.
2271  */
2272 static int
2273 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2274 {
2275         int rc = 0;
2276
2277         if (!ev->status)
2278                 goto out;
2279
2280         spin_lock(&lp->lp_lock);
2281         if (ev->msg_type == LNET_MSG_GET) {
2282                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2283                 lp->lp_state |= LNET_PEER_PING_FAILED;
2284                 lp->lp_ping_error = ev->status;
2285         } else { /* ev->msg_type == LNET_MSG_PUT */
2286                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2287                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2288                 lp->lp_push_error = ev->status;
2289         }
2290         spin_unlock(&lp->lp_lock);
2291         rc = LNET_REDISCOVER_PEER;
2292 out:
2293         CDEBUG(D_NET, "%s Send to %s: %d\n",
2294                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2295                 libcfs_nid2str(ev->target.nid), rc);
2296         return rc;
2297 }
2298
2299 /*
2300  * Unlink event handling. This event is only seen if a call to
2301  * LNetMDUnlink() caused the event to be unlinked. If this call was
2302  * made after the event was set up in LNetGet() or LNetPut() then we
2303  * assume the Ping or Push timed out.
2304  */
2305 static void
2306 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2307 {
2308         spin_lock(&lp->lp_lock);
2309         /* We've passed through LNetGet() */
2310         if (lp->lp_state & LNET_PEER_PING_SENT) {
2311                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2312                 lp->lp_state |= LNET_PEER_PING_FAILED;
2313                 lp->lp_ping_error = -ETIMEDOUT;
2314                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2315                         libcfs_nid2str(lp->lp_primary_nid));
2316         }
2317         /* We've passed through LNetPut() */
2318         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2319                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2320                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2321                 lp->lp_push_error = -ETIMEDOUT;
2322                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2323                         libcfs_nid2str(lp->lp_primary_nid));
2324         }
2325         spin_unlock(&lp->lp_lock);
2326 }
2327
2328 /*
2329  * Event handler for the discovery EQ.
2330  *
2331  * Called with lnet_res_lock(cpt) held. The cpt is the
2332  * lnet_cpt_of_cookie() of the md handle cookie.
2333  */
2334 static void lnet_discovery_event_handler(struct lnet_event *event)
2335 {
2336         struct lnet_peer *lp = event->md.user_ptr;
2337         struct lnet_ping_buffer *pbuf;
2338         int rc;
2339
2340         /* discovery needs to take another look */
2341         rc = LNET_REDISCOVER_PEER;
2342
2343         CDEBUG(D_NET, "Received event: %d\n", event->type);
2344
2345         switch (event->type) {
2346         case LNET_EVENT_ACK:
2347                 lnet_discovery_event_ack(lp, event);
2348                 break;
2349         case LNET_EVENT_REPLY:
2350                 lnet_discovery_event_reply(lp, event);
2351                 break;
2352         case LNET_EVENT_SEND:
2353                 /* Only send failure triggers a retry. */
2354                 rc = lnet_discovery_event_send(lp, event);
2355                 break;
2356         case LNET_EVENT_UNLINK:
2357                 /* LNetMDUnlink() was called */
2358                 lnet_discovery_event_unlink(lp, event);
2359                 break;
2360         default:
2361                 /* Invalid events. */
2362                 LBUG();
2363         }
2364         lnet_net_lock(LNET_LOCK_EX);
2365         if (event->unlinked) {
2366                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2367                 lnet_ping_buffer_decref(pbuf);
2368                 lnet_peer_decref_locked(lp);
2369         }
2370
2371         /* put peer back at end of request queue, if discovery not already
2372          * done */
2373         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2374                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2375                 wake_up(&the_lnet.ln_dc_waitq);
2376         }
2377         lnet_net_unlock(LNET_LOCK_EX);
2378 }
2379
2380 /*
2381  * Build a peer from incoming data.
2382  *
2383  * The NIDs in the incoming data are supposed to be structured as follows:
2384  *  - loopback
2385  *  - primary NID
2386  *  - other NIDs in same net
2387  *  - NIDs in second net
2388  *  - NIDs in third net
2389  *  - ...
2390  * This due to the way the list of NIDs in the data is created.
2391  *
2392  * Note that this function will mark the peer uptodate unless an
2393  * ENOMEM is encontered. All other errors are due to a conflict
2394  * between the DLC configuration and what discovery sees. We treat DLC
2395  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2396  * peer from becoming stuck in discovery.
2397  */
2398 static int lnet_peer_merge_data(struct lnet_peer *lp,
2399                                 struct lnet_ping_buffer *pbuf)
2400 {
2401         struct lnet_peer_ni *lpni;
2402         lnet_nid_t *curnis = NULL;
2403         lnet_nid_t *addnis = NULL;
2404         lnet_nid_t *delnis = NULL;
2405         unsigned flags;
2406         int ncurnis;
2407         int naddnis;
2408         int ndelnis;
2409         int nnis = 0;
2410         int i;
2411         int j;
2412         int rc;
2413
2414         flags = LNET_PEER_DISCOVERED;
2415         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2416                 flags |= LNET_PEER_MULTI_RAIL;
2417
2418         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2419         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2420         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2421         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2422         if (!curnis || !addnis || !delnis) {
2423                 rc = -ENOMEM;
2424                 goto out;
2425         }
2426         ncurnis = 0;
2427         naddnis = 0;
2428         ndelnis = 0;
2429
2430         /* Construct the list of NIDs present in peer. */
2431         lpni = NULL;
2432         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2433                 curnis[ncurnis++] = lpni->lpni_nid;
2434
2435         /*
2436          * Check for NIDs in pbuf not present in curnis[].
2437          * The loop starts at 1 to skip the loopback NID.
2438          */
2439         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2440                 for (j = 0; j < ncurnis; j++)
2441                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2442                                 break;
2443                 if (j == ncurnis)
2444                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2445         }
2446         /*
2447          * Check for NIDs in curnis[] not present in pbuf.
2448          * The nested loop starts at 1 to skip the loopback NID.
2449          *
2450          * But never add the loopback NID to delnis[]: if it is
2451          * present in curnis[] then this peer is for this node.
2452          */
2453         for (i = 0; i < ncurnis; i++) {
2454                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2455                         continue;
2456                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2457                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2458                                 break;
2459                 if (j == pbuf->pb_info.pi_nnis)
2460                         delnis[ndelnis++] = curnis[i];
2461         }
2462
2463         for (i = 0; i < naddnis; i++) {
2464                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2465                 if (rc) {
2466                         CERROR("Error adding NID %s to peer %s: %d\n",
2467                                libcfs_nid2str(addnis[i]),
2468                                libcfs_nid2str(lp->lp_primary_nid), rc);
2469                         if (rc == -ENOMEM)
2470                                 goto out;
2471                 }
2472         }
2473         for (i = 0; i < ndelnis; i++) {
2474                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2475                 if (rc) {
2476                         CERROR("Error deleting NID %s from peer %s: %d\n",
2477                                libcfs_nid2str(delnis[i]),
2478                                libcfs_nid2str(lp->lp_primary_nid), rc);
2479                         if (rc == -ENOMEM)
2480                                 goto out;
2481                 }
2482         }
2483         /*
2484          * Errors other than -ENOMEM are due to peers having been
2485          * configured with DLC. Ignore these because DLC overrides
2486          * Discovery.
2487          */
2488         rc = 0;
2489 out:
2490         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2491         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2492         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2493         lnet_ping_buffer_decref(pbuf);
2494         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2495
2496         if (rc) {
2497                 spin_lock(&lp->lp_lock);
2498                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2499                 lp->lp_state |= LNET_PEER_FORCE_PING;
2500                 spin_unlock(&lp->lp_lock);
2501         }
2502         return rc;
2503 }
2504
2505 /*
2506  * The data in pbuf says lp is its primary peer, but the data was
2507  * received by a different peer. Try to update lp with the data.
2508  */
2509 static int
2510 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2511 {
2512         struct lnet_handle_md mdh;
2513
2514         /* Queue lp for discovery, and force it on the request queue. */
2515         lnet_net_lock(LNET_LOCK_EX);
2516         if (lnet_peer_queue_for_discovery(lp))
2517                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2518         lnet_net_unlock(LNET_LOCK_EX);
2519
2520         LNetInvalidateMDHandle(&mdh);
2521
2522         /*
2523          * Decide whether we can move the peer to the DATA_PRESENT state.
2524          *
2525          * We replace stale data for a multi-rail peer, repair PING_FAILED
2526          * status, and preempt FORCE_PING.
2527          *
2528          * If after that we have DATA_PRESENT, we merge it into this peer.
2529          */
2530         spin_lock(&lp->lp_lock);
2531         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2532                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2533                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2534                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2535                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2536                         lnet_ping_buffer_decref(pbuf);
2537                         pbuf = lp->lp_data;
2538                         lp->lp_data = NULL;
2539                 }
2540         }
2541         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2542                 lnet_ping_buffer_decref(lp->lp_data);
2543                 lp->lp_data = NULL;
2544                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2545         }
2546         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2547                 mdh = lp->lp_ping_mdh;
2548                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2549                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2550                 lp->lp_ping_error = 0;
2551         }
2552         if (lp->lp_state & LNET_PEER_FORCE_PING)
2553                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2554         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2555         spin_unlock(&lp->lp_lock);
2556
2557         if (!LNetMDHandleIsInvalid(mdh))
2558                 LNetMDUnlink(mdh);
2559
2560         if (pbuf)
2561                 return lnet_peer_merge_data(lp, pbuf);
2562
2563         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2564         return 0;
2565 }
2566
2567 /*
2568  * Update a peer using the data received.
2569  */
2570 static int lnet_peer_data_present(struct lnet_peer *lp)
2571 __must_hold(&lp->lp_lock)
2572 {
2573         struct lnet_ping_buffer *pbuf;
2574         struct lnet_peer_ni *lpni;
2575         lnet_nid_t nid = LNET_NID_ANY;
2576         unsigned flags;
2577         int rc = 0;
2578
2579         pbuf = lp->lp_data;
2580         lp->lp_data = NULL;
2581         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2582         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2583         spin_unlock(&lp->lp_lock);
2584
2585         /*
2586          * Modifications of peer structures are done while holding the
2587          * ln_api_mutex. A global lock is required because we may be
2588          * modifying multiple peer structures, and a mutex greatly
2589          * simplifies memory management.
2590          *
2591          * The actual changes to the data structures must also protect
2592          * against concurrent lookups, for which the lnet_net_lock in
2593          * LNET_LOCK_EX mode is used.
2594          */
2595         mutex_lock(&the_lnet.ln_api_mutex);
2596         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2597                 rc = -ESHUTDOWN;
2598                 goto out;
2599         }
2600
2601         /*
2602          * If this peer is not on the peer list then it is being torn
2603          * down, and our reference count may be all that is keeping it
2604          * alive. Don't do any work on it.
2605          */
2606         if (list_empty(&lp->lp_peer_list))
2607                 goto out;
2608
2609         flags = LNET_PEER_DISCOVERED;
2610         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2611                 flags |= LNET_PEER_MULTI_RAIL;
2612
2613         /*
2614          * Check whether the primary NID in the message matches the
2615          * primary NID of the peer. If it does, update the peer, if
2616          * it it does not, check whether there is already a peer with
2617          * that primary NID. If no such peer exists, try to update
2618          * the primary NID of the current peer (allowed if it was
2619          * created due to message traffic) and complete the update.
2620          * If the peer did exist, hand off the data to it.
2621          *
2622          * The peer for the loopback interface is a special case: this
2623          * is the peer for the local node, and we want to set its
2624          * primary NID to the correct value here. Moreover, this peer
2625          * can show up with only the loopback NID in the ping buffer.
2626          */
2627         if (pbuf->pb_info.pi_nnis <= 1)
2628                 goto out;
2629         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2630         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2631                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2632                 if (!rc)
2633                         rc = lnet_peer_merge_data(lp, pbuf);
2634         } else if (lp->lp_primary_nid == nid) {
2635                 rc = lnet_peer_merge_data(lp, pbuf);
2636         } else {
2637                 lpni = lnet_find_peer_ni_locked(nid);
2638                 if (!lpni) {
2639                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2640                         if (rc) {
2641                                 CERROR("Primary NID error %s versus %s: %d\n",
2642                                        libcfs_nid2str(lp->lp_primary_nid),
2643                                        libcfs_nid2str(nid), rc);
2644                         } else {
2645                                 rc = lnet_peer_merge_data(lp, pbuf);
2646                         }
2647                 } else {
2648                         rc = lnet_peer_set_primary_data(
2649                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2650                         lnet_peer_ni_decref_locked(lpni);
2651                 }
2652         }
2653 out:
2654         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2655         mutex_unlock(&the_lnet.ln_api_mutex);
2656
2657         spin_lock(&lp->lp_lock);
2658         /* Tell discovery to re-check the peer immediately. */
2659         if (!rc)
2660                 rc = LNET_REDISCOVER_PEER;
2661         return rc;
2662 }
2663
2664 /*
2665  * A ping failed. Clear the PING_FAILED state and set the
2666  * FORCE_PING state, to ensure a retry even if discovery is
2667  * disabled. This avoids being left with incorrect state.
2668  */
2669 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2670 __must_hold(&lp->lp_lock)
2671 {
2672         struct lnet_handle_md mdh;
2673         int rc;
2674
2675         mdh = lp->lp_ping_mdh;
2676         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2677         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2678         lp->lp_state |= LNET_PEER_FORCE_PING;
2679         rc = lp->lp_ping_error;
2680         lp->lp_ping_error = 0;
2681         spin_unlock(&lp->lp_lock);
2682
2683         if (!LNetMDHandleIsInvalid(mdh))
2684                 LNetMDUnlink(mdh);
2685
2686         CDEBUG(D_NET, "peer %s:%d\n",
2687                libcfs_nid2str(lp->lp_primary_nid), rc);
2688
2689         spin_lock(&lp->lp_lock);
2690         return rc ? rc : LNET_REDISCOVER_PEER;
2691 }
2692
2693 /*
2694  * Select NID to send a Ping or Push to.
2695  */
2696 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2697 {
2698         struct lnet_peer_ni *lpni;
2699
2700         /* Look for a direct-connected NID for this peer. */
2701         lpni = NULL;
2702         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2703                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2704                         continue;
2705                 break;
2706         }
2707         if (lpni)
2708                 return lpni->lpni_nid;
2709
2710         /* Look for a routed-connected NID for this peer. */
2711         lpni = NULL;
2712         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2713                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2714                         continue;
2715                 break;
2716         }
2717         if (lpni)
2718                 return lpni->lpni_nid;
2719
2720         return LNET_NID_ANY;
2721 }
2722
2723 /* Active side of ping. */
2724 static int lnet_peer_send_ping(struct lnet_peer *lp)
2725 __must_hold(&lp->lp_lock)
2726 {
2727         lnet_nid_t pnid;
2728         int nnis;
2729         int rc;
2730         int cpt;
2731
2732         lp->lp_state |= LNET_PEER_PING_SENT;
2733         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2734         spin_unlock(&lp->lp_lock);
2735
2736         cpt = lnet_net_lock_current();
2737         /* Refcount for MD. */
2738         lnet_peer_addref_locked(lp);
2739         pnid = lnet_peer_select_nid(lp);
2740         lnet_net_unlock(cpt);
2741
2742         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2743
2744         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
2745                             the_lnet.ln_dc_eqh, false);
2746
2747         /*
2748          * if LNetMDBind in lnet_send_ping fails we need to decrement the
2749          * refcount on the peer, otherwise LNetMDUnlink will be called
2750          * which will eventually do that.
2751          */
2752         if (rc > 0) {
2753                 lnet_net_lock(cpt);
2754                 lnet_peer_decref_locked(lp);
2755                 lnet_net_unlock(cpt);
2756                 rc = -rc; /* change the rc to negative value */
2757                 goto fail_error;
2758         } else if (rc < 0) {
2759                 goto fail_error;
2760         }
2761
2762         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2763
2764         spin_lock(&lp->lp_lock);
2765         return 0;
2766
2767 fail_error:
2768         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2769         /*
2770          * The errors that get us here are considered hard errors and
2771          * cause Discovery to terminate. So we clear PING_SENT, but do
2772          * not set either PING_FAILED or FORCE_PING. In fact we need
2773          * to clear PING_FAILED, because the unlink event handler will
2774          * have set it if we called LNetMDUnlink() above.
2775          */
2776         spin_lock(&lp->lp_lock);
2777         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2778         return rc;
2779 }
2780
2781 /*
2782  * This function exists because you cannot call LNetMDUnlink() from an
2783  * event handler.
2784  */
2785 static int lnet_peer_push_failed(struct lnet_peer *lp)
2786 __must_hold(&lp->lp_lock)
2787 {
2788         struct lnet_handle_md mdh;
2789         int rc;
2790
2791         mdh = lp->lp_push_mdh;
2792         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2793         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2794         rc = lp->lp_push_error;
2795         lp->lp_push_error = 0;
2796         spin_unlock(&lp->lp_lock);
2797
2798         if (!LNetMDHandleIsInvalid(mdh))
2799                 LNetMDUnlink(mdh);
2800
2801         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2802         spin_lock(&lp->lp_lock);
2803         return rc ? rc : LNET_REDISCOVER_PEER;
2804 }
2805
2806 /* Active side of push. */
2807 static int lnet_peer_send_push(struct lnet_peer *lp)
2808 __must_hold(&lp->lp_lock)
2809 {
2810         struct lnet_ping_buffer *pbuf;
2811         struct lnet_process_id id;
2812         struct lnet_md md;
2813         int cpt;
2814         int rc;
2815
2816         /* Don't push to a non-multi-rail peer. */
2817         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2818                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2819                 return 0;
2820         }
2821
2822         lp->lp_state |= LNET_PEER_PUSH_SENT;
2823         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2824         spin_unlock(&lp->lp_lock);
2825
2826         cpt = lnet_net_lock_current();
2827         pbuf = the_lnet.ln_ping_target;
2828         lnet_ping_buffer_addref(pbuf);
2829         lnet_net_unlock(cpt);
2830
2831         /* Push source MD */
2832         md.start     = &pbuf->pb_info;
2833         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2834         md.threshold = 2; /* Put/Ack */
2835         md.max_size  = 0;
2836         md.options   = 0;
2837         md.eq_handle = the_lnet.ln_dc_eqh;
2838         md.user_ptr  = lp;
2839
2840         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2841         if (rc) {
2842                 lnet_ping_buffer_decref(pbuf);
2843                 CERROR("Can't bind push source MD: %d\n", rc);
2844                 goto fail_error;
2845         }
2846         cpt = lnet_net_lock_current();
2847         /* Refcount for MD. */
2848         lnet_peer_addref_locked(lp);
2849         id.pid = LNET_PID_LUSTRE;
2850         id.nid = lnet_peer_select_nid(lp);
2851         lnet_net_unlock(cpt);
2852
2853         if (id.nid == LNET_NID_ANY) {
2854                 rc = -EHOSTUNREACH;
2855                 goto fail_unlink;
2856         }
2857
2858         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2859                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2860                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2861
2862         if (rc)
2863                 goto fail_unlink;
2864
2865         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2866
2867         spin_lock(&lp->lp_lock);
2868         return 0;
2869
2870 fail_unlink:
2871         LNetMDUnlink(lp->lp_push_mdh);
2872         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2873 fail_error:
2874         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2875         /*
2876          * The errors that get us here are considered hard errors and
2877          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2878          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2879          * because the unlink event handler will have set it if we
2880          * called LNetMDUnlink() above.
2881          */
2882         spin_lock(&lp->lp_lock);
2883         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2884         return rc;
2885 }
2886
2887 /*
2888  * An unrecoverable error was encountered during discovery.
2889  * Set error status in peer and abort discovery.
2890  */
2891 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2892 {
2893         CDEBUG(D_NET, "Discovery error %s: %d\n",
2894                libcfs_nid2str(lp->lp_primary_nid), error);
2895
2896         spin_lock(&lp->lp_lock);
2897         lp->lp_dc_error = error;
2898         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2899         lp->lp_state |= LNET_PEER_REDISCOVER;
2900         spin_unlock(&lp->lp_lock);
2901 }
2902
2903 /*
2904  * Mark the peer as discovered.
2905  */
2906 static int lnet_peer_discovered(struct lnet_peer *lp)
2907 __must_hold(&lp->lp_lock)
2908 {
2909         lp->lp_state |= LNET_PEER_DISCOVERED;
2910         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2911                           LNET_PEER_REDISCOVER);
2912
2913         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2914
2915         return 0;
2916 }
2917
2918 /*
2919  * Mark the peer as to be rediscovered.
2920  */
2921 static int lnet_peer_rediscover(struct lnet_peer *lp)
2922 __must_hold(&lp->lp_lock)
2923 {
2924         lp->lp_state |= LNET_PEER_REDISCOVER;
2925         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2926
2927         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2928
2929         return 0;
2930 }
2931
2932 /*
2933  * Discovering this peer is taking too long. Cancel any Ping or Push
2934  * that discovery is waiting on by unlinking the relevant MDs. The
2935  * lnet_discovery_event_handler() will proceed from here and complete
2936  * the cleanup.
2937  */
2938 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2939 {
2940         struct lnet_handle_md ping_mdh;
2941         struct lnet_handle_md push_mdh;
2942
2943         LNetInvalidateMDHandle(&ping_mdh);
2944         LNetInvalidateMDHandle(&push_mdh);
2945
2946         spin_lock(&lp->lp_lock);
2947         if (lp->lp_state & LNET_PEER_PING_SENT) {
2948                 ping_mdh = lp->lp_ping_mdh;
2949                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2950         }
2951         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2952                 push_mdh = lp->lp_push_mdh;
2953                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2954         }
2955         spin_unlock(&lp->lp_lock);
2956
2957         if (!LNetMDHandleIsInvalid(ping_mdh))
2958                 LNetMDUnlink(ping_mdh);
2959         if (!LNetMDHandleIsInvalid(push_mdh))
2960                 LNetMDUnlink(push_mdh);
2961 }
2962
2963 /*
2964  * Wait for work to be queued or some other change that must be
2965  * attended to. Returns non-zero if the discovery thread should shut
2966  * down.
2967  */
2968 static int lnet_peer_discovery_wait_for_work(void)
2969 {
2970         int cpt;
2971         int rc = 0;
2972
2973         DEFINE_WAIT(wait);
2974
2975         cpt = lnet_net_lock_current();
2976         for (;;) {
2977                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
2978                                 TASK_INTERRUPTIBLE);
2979                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2980                         break;
2981                 if (lnet_push_target_resize_needed())
2982                         break;
2983                 if (!list_empty(&the_lnet.ln_dc_request))
2984                         break;
2985                 if (!list_empty(&the_lnet.ln_msg_resend))
2986                         break;
2987                 lnet_net_unlock(cpt);
2988
2989                 /*
2990                  * wakeup max every second to check if there are peers that
2991                  * have been stuck on the working queue for greater than
2992                  * the peer timeout.
2993                  */
2994                 schedule_timeout(cfs_time_seconds(1));
2995                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
2996                 cpt = lnet_net_lock_current();
2997         }
2998         finish_wait(&the_lnet.ln_dc_waitq, &wait);
2999
3000         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3001                 rc = -ESHUTDOWN;
3002
3003         lnet_net_unlock(cpt);
3004
3005         CDEBUG(D_NET, "woken: %d\n", rc);
3006
3007         return rc;
3008 }
3009
3010 /*
3011  * Messages that were pending on a destroyed peer will be put on a global
3012  * resend list. The message resend list will be checked by
3013  * the discovery thread when it wakes up, and will resend messages. These
3014  * messages can still be sendable in the case the lpni which was the initial
3015  * cause of the message re-queue was transfered to another peer.
3016  *
3017  * It is possible that LNet could be shutdown while we're iterating
3018  * through the list. lnet_shudown_lndnets() will attempt to access the
3019  * resend list, but will have to wait until the spinlock is released, by
3020  * which time there shouldn't be any more messages on the resend list.
3021  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3022  * for the messages so they can be released. The other case is that
3023  * lnet_shudown_lndnets() can finalize all the messages before this
3024  * function can visit the resend list, in which case this function will be
3025  * a no-op.
3026  */
3027 static void lnet_resend_msgs(void)
3028 {
3029         struct lnet_msg *msg, *tmp;
3030         struct list_head resend;
3031         int rc;
3032
3033         INIT_LIST_HEAD(&resend);
3034
3035         spin_lock(&the_lnet.ln_msg_resend_lock);
3036         list_splice(&the_lnet.ln_msg_resend, &resend);
3037         spin_unlock(&the_lnet.ln_msg_resend_lock);
3038
3039         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3040                 list_del_init(&msg->msg_list);
3041                 rc = lnet_send(msg->msg_src_nid_param, msg,
3042                                msg->msg_rtr_nid_param);
3043                 if (rc < 0) {
3044                         CNETERR("Error sending %s to %s: %d\n",
3045                                lnet_msgtyp2str(msg->msg_type),
3046                                libcfs_id2str(msg->msg_target), rc);
3047                         lnet_finalize(msg, rc);
3048                 }
3049         }
3050 }
3051
3052 /* The discovery thread. */
3053 static int lnet_peer_discovery(void *arg)
3054 {
3055         struct lnet_peer *lp;
3056         int rc;
3057
3058         CDEBUG(D_NET, "started\n");
3059         cfs_block_allsigs();
3060
3061         for (;;) {
3062                 if (lnet_peer_discovery_wait_for_work())
3063                         break;
3064
3065                 lnet_resend_msgs();
3066
3067                 if (lnet_push_target_resize_needed())
3068                         lnet_push_target_resize();
3069
3070                 lnet_net_lock(LNET_LOCK_EX);
3071                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3072                         break;
3073
3074                 /*
3075                  * Process all incoming discovery work requests.  When
3076                  * discovery must wait on a peer to change state, it
3077                  * is added to the tail of the ln_dc_working queue. A
3078                  * timestamp keeps track of when the peer was added,
3079                  * so we can time out discovery requests that take too
3080                  * long.
3081                  */
3082                 while (!list_empty(&the_lnet.ln_dc_request)) {
3083                         lp = list_first_entry(&the_lnet.ln_dc_request,
3084                                               struct lnet_peer, lp_dc_list);
3085                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3086                         /*
3087                          * set the time the peer was put on the dc_working
3088                          * queue. It shouldn't remain on the queue
3089                          * forever, in case the GET message (for ping)
3090                          * doesn't get a REPLY or the PUT message (for
3091                          * push) doesn't get an ACK.
3092                          */
3093                         lp->lp_last_queued = ktime_get_real_seconds();
3094                         lnet_net_unlock(LNET_LOCK_EX);
3095
3096                         /*
3097                          * Select an action depending on the state of
3098                          * the peer and whether discovery is disabled.
3099                          * The check whether discovery is disabled is
3100                          * done after the code that handles processing
3101                          * for arrived data, cleanup for failures, and
3102                          * forcing a Ping or Push.
3103                          */
3104                         spin_lock(&lp->lp_lock);
3105                         CDEBUG(D_NET, "peer %s state %#x\n",
3106                                 libcfs_nid2str(lp->lp_primary_nid),
3107                                 lp->lp_state);
3108                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3109                                 rc = lnet_peer_data_present(lp);
3110                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3111                                 rc = lnet_peer_ping_failed(lp);
3112                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3113                                 rc = lnet_peer_push_failed(lp);
3114                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3115                                 rc = lnet_peer_send_ping(lp);
3116                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3117                                 rc = lnet_peer_send_push(lp);
3118                         else if (lnet_peer_discovery_disabled)
3119                                 rc = lnet_peer_rediscover(lp);
3120                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3121                                 rc = lnet_peer_send_ping(lp);
3122                         else if (lnet_peer_needs_push(lp))
3123                                 rc = lnet_peer_send_push(lp);
3124                         else
3125                                 rc = lnet_peer_discovered(lp);
3126                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3127                                 libcfs_nid2str(lp->lp_primary_nid),
3128                                 lp->lp_state, rc);
3129                         spin_unlock(&lp->lp_lock);
3130
3131                         lnet_net_lock(LNET_LOCK_EX);
3132                         if (rc == LNET_REDISCOVER_PEER) {
3133                                 list_move(&lp->lp_dc_list,
3134                                           &the_lnet.ln_dc_request);
3135                         } else if (rc) {
3136                                 lnet_peer_discovery_error(lp, rc);
3137                         }
3138                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3139                                 lnet_peer_discovery_complete(lp);
3140                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3141                                 break;
3142                 }
3143
3144                 lnet_net_unlock(LNET_LOCK_EX);
3145         }
3146
3147         CDEBUG(D_NET, "stopping\n");
3148         /*
3149          * Clean up before telling lnet_peer_discovery_stop() that
3150          * we're done. Use wake_up() below to somewhat reduce the
3151          * size of the thundering herd if there are multiple threads
3152          * waiting on discovery of a single peer.
3153          */
3154         LNetEQFree(the_lnet.ln_dc_eqh);
3155         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3156
3157         /* Queue cleanup 1: stop all pending pings and pushes. */
3158         lnet_net_lock(LNET_LOCK_EX);
3159         while (!list_empty(&the_lnet.ln_dc_working)) {
3160                 lp = list_first_entry(&the_lnet.ln_dc_working,
3161                                       struct lnet_peer, lp_dc_list);
3162                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3163                 lnet_net_unlock(LNET_LOCK_EX);
3164                 lnet_peer_cancel_discovery(lp);
3165                 lnet_net_lock(LNET_LOCK_EX);
3166         }
3167         lnet_net_unlock(LNET_LOCK_EX);
3168
3169         /* Queue cleanup 2: wait for the expired queue to clear. */
3170         while (!list_empty(&the_lnet.ln_dc_expired))
3171                 schedule_timeout(cfs_time_seconds(1));
3172
3173         /* Queue cleanup 3: clear the request queue. */
3174         lnet_net_lock(LNET_LOCK_EX);
3175         while (!list_empty(&the_lnet.ln_dc_request)) {
3176                 lp = list_first_entry(&the_lnet.ln_dc_request,
3177                                       struct lnet_peer, lp_dc_list);
3178                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3179                 lnet_peer_discovery_complete(lp);
3180         }
3181         lnet_net_unlock(LNET_LOCK_EX);
3182
3183         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3184         wake_up(&the_lnet.ln_dc_waitq);
3185
3186         CDEBUG(D_NET, "stopped\n");
3187
3188         return 0;
3189 }
3190
3191 /* ln_api_mutex is held on entry. */
3192 int lnet_peer_discovery_start(void)
3193 {
3194         struct task_struct *task;
3195         int rc;
3196
3197         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3198                 return -EALREADY;
3199
3200         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3201         if (rc != 0) {
3202                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3203                 return rc;
3204         }
3205
3206         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3207         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3208         if (IS_ERR(task)) {
3209                 rc = PTR_ERR(task);
3210                 CERROR("Can't start peer discovery thread: %d\n", rc);
3211
3212                 LNetEQFree(the_lnet.ln_dc_eqh);
3213                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3214
3215                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3216         }
3217
3218         CDEBUG(D_NET, "discovery start: %d\n", rc);
3219
3220         return rc;
3221 }
3222
3223 /* ln_api_mutex is held on entry. */
3224 void lnet_peer_discovery_stop(void)
3225 {
3226         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3227                 return;
3228
3229         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3230         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3231         wake_up(&the_lnet.ln_dc_waitq);
3232
3233         wait_event(the_lnet.ln_dc_waitq,
3234                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3235
3236         LASSERT(list_empty(&the_lnet.ln_dc_request));
3237         LASSERT(list_empty(&the_lnet.ln_dc_working));
3238         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3239
3240         CDEBUG(D_NET, "discovery stopped\n");
3241 }
3242
3243 /* Debugging */
3244
3245 void
3246 lnet_debug_peer(lnet_nid_t nid)
3247 {
3248         char                    *aliveness = "NA";
3249         struct lnet_peer_ni     *lp;
3250         int                     cpt;
3251
3252         cpt = lnet_cpt_of_nid(nid, NULL);
3253         lnet_net_lock(cpt);
3254
3255         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3256         if (IS_ERR(lp)) {
3257                 lnet_net_unlock(cpt);
3258                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3259                 return;
3260         }
3261
3262         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3263                 aliveness = lp->lpni_alive ? "up" : "down";
3264
3265         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3266                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3267                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3268                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3269                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3270
3271         lnet_peer_ni_decref_locked(lp);
3272
3273         lnet_net_unlock(cpt);
3274 }
3275
3276 /* Gathering information for userspace. */
3277
3278 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3279                           char aliveness[LNET_MAX_STR_LEN],
3280                           __u32 *cpt_iter, __u32 *refcount,
3281                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3282                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3283                           __u32 *peer_tx_qnob)
3284 {
3285         struct lnet_peer_table          *peer_table;
3286         struct lnet_peer_ni             *lp;
3287         int                             j;
3288         int                             lncpt;
3289         bool                            found = false;
3290
3291         /* get the number of CPTs */
3292         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3293
3294         /* if the cpt number to be examined is >= the number of cpts in
3295          * the system then indicate that there are no more cpts to examin
3296          */
3297         if (*cpt_iter >= lncpt)
3298                 return -ENOENT;
3299
3300         /* get the current table */
3301         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3302         /* if the ptable is NULL then there are no more cpts to examine */
3303         if (peer_table == NULL)
3304                 return -ENOENT;
3305
3306         lnet_net_lock(*cpt_iter);
3307
3308         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3309                 struct list_head *peers = &peer_table->pt_hash[j];
3310
3311                 list_for_each_entry(lp, peers, lpni_hashlist) {
3312                         if (peer_index-- > 0)
3313                                 continue;
3314
3315                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3316                         if (lnet_isrouter(lp) ||
3317                                 lnet_peer_aliveness_enabled(lp))
3318                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3319                                          lp->lpni_alive ? "up" : "down");
3320
3321                         *nid = lp->lpni_nid;
3322                         *refcount = atomic_read(&lp->lpni_refcount);
3323                         *ni_peer_tx_credits =
3324                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3325                         *peer_tx_credits = lp->lpni_txcredits;
3326                         *peer_rtr_credits = lp->lpni_rtrcredits;
3327                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3328                         *peer_tx_qnob = lp->lpni_txqnob;
3329
3330                         found = true;
3331                 }
3332
3333         }
3334         lnet_net_unlock(*cpt_iter);
3335
3336         *cpt_iter = lncpt;
3337
3338         return found ? 0 : -ENOENT;
3339 }
3340
3341 /* ln_api_mutex is held, which keeps the peer list stable */
3342 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3343 {
3344         struct lnet_ioctl_element_stats *lpni_stats;
3345         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3346         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3347         struct lnet_peer_ni_credit_info *lpni_info;
3348         struct lnet_peer_ni *lpni;
3349         struct lnet_peer *lp;
3350         lnet_nid_t nid;
3351         __u32 size;
3352         int rc;
3353
3354         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3355
3356         if (!lp) {
3357                 rc = -ENOENT;
3358                 goto out;
3359         }
3360
3361         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3362                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3363         size *= lp->lp_nnis;
3364         if (size > cfg->prcfg_size) {
3365                 cfg->prcfg_size = size;
3366                 rc = -E2BIG;
3367                 goto out_lp_decref;
3368         }
3369
3370         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3371         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3372         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3373         cfg->prcfg_count = lp->lp_nnis;
3374         cfg->prcfg_size = size;
3375         cfg->prcfg_state = lp->lp_state;
3376
3377         /* Allocate helper buffers. */
3378         rc = -ENOMEM;
3379         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3380         if (!lpni_info)
3381                 goto out_lp_decref;
3382         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3383         if (!lpni_stats)
3384                 goto out_free_info;
3385         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3386         if (!lpni_msg_stats)
3387                 goto out_free_stats;
3388         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3389         if (!lpni_hstats)
3390                 goto out_free_msg_stats;
3391
3392
3393         lpni = NULL;
3394         rc = -EFAULT;
3395         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3396                 nid = lpni->lpni_nid;
3397                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3398                         goto out_free_hstats;
3399                 bulk += sizeof(nid);
3400
3401                 memset(lpni_info, 0, sizeof(*lpni_info));
3402                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3403                 if (lnet_isrouter(lpni) ||
3404                         lnet_peer_aliveness_enabled(lpni))
3405                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3406                                 lpni->lpni_alive ? "up" : "down");
3407
3408                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3409                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3410                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3411                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3412                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3413                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3414                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3415                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3416                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3417                         goto out_free_hstats;
3418                 bulk += sizeof(*lpni_info);
3419
3420                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3421                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3422                                                             LNET_STATS_TYPE_SEND);
3423                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3424                                                             LNET_STATS_TYPE_RECV);
3425                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3426                                                             LNET_STATS_TYPE_DROP);
3427                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3428                         goto out_free_hstats;
3429                 bulk += sizeof(*lpni_stats);
3430                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3431                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3432                         goto out_free_hstats;
3433                 bulk += sizeof(*lpni_msg_stats);
3434                 lpni_hstats->hlpni_network_timeout =
3435                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3436                 lpni_hstats->hlpni_remote_dropped =
3437                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3438                 lpni_hstats->hlpni_remote_timeout =
3439                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3440                 lpni_hstats->hlpni_remote_error =
3441                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3442                 lpni_hstats->hlpni_health_value =
3443                   atomic_read(&lpni->lpni_healthv);
3444                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3445                         goto out_free_hstats;
3446                 bulk += sizeof(*lpni_hstats);
3447         }
3448         rc = 0;
3449
3450 out_free_hstats:
3451         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3452 out_free_msg_stats:
3453         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3454 out_free_stats:
3455         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3456 out_free_info:
3457         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3458 out_lp_decref:
3459         lnet_peer_decref_locked(lp);
3460 out:
3461         return rc;
3462 }
3463
3464 void
3465 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3466 {
3467         /* the mt could've shutdown and cleaned up the queues */
3468         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3469                 return;
3470
3471         if (list_empty(&lpni->lpni_recovery) &&
3472             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3473                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3474                         libcfs_nid2str(lpni->lpni_nid),
3475                         atomic_read(&lpni->lpni_healthv));
3476                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3477                 lnet_peer_ni_addref_locked(lpni);
3478         }
3479 }
3480
3481 /* Call with the ln_api_mutex held */
3482 void
3483 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3484 {
3485         struct lnet_peer_table *ptable;
3486         struct lnet_peer *lp;
3487         struct lnet_peer_net *lpn;
3488         struct lnet_peer_ni *lpni;
3489         int lncpt;
3490         int cpt;
3491
3492         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3493                 return;
3494
3495         if (!all) {
3496                 lnet_net_lock(LNET_LOCK_EX);
3497                 lpni = lnet_find_peer_ni_locked(nid);
3498                 if (!lpni) {
3499                         lnet_net_unlock(LNET_LOCK_EX);
3500                         return;
3501                 }
3502                 atomic_set(&lpni->lpni_healthv, value);
3503                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3504                 lnet_peer_ni_decref_locked(lpni);
3505                 lnet_net_unlock(LNET_LOCK_EX);
3506                 return;
3507         }
3508
3509         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3510
3511         /*
3512          * Walk all the peers and reset the healhv for each one to the
3513          * maximum value.
3514          */
3515         lnet_net_lock(LNET_LOCK_EX);
3516         for (cpt = 0; cpt < lncpt; cpt++) {
3517                 ptable = the_lnet.ln_peer_tables[cpt];
3518                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3519                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3520                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3521                                                     lpni_peer_nis) {
3522                                         atomic_set(&lpni->lpni_healthv, value);
3523                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3524                                 }
3525                         }
3526                 }
3527         }
3528         lnet_net_unlock(LNET_LOCK_EX);
3529 }
3530