Whamcloud - gitweb
LU-12014 llite: check correct size in ll_dom_finish_open()
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_rtrq);
166         INIT_LIST_HEAD(&lpni->lpni_routes);
167         INIT_LIST_HEAD(&lpni->lpni_hashlist);
168         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
169         INIT_LIST_HEAD(&lpni->lpni_recovery);
170         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
171
172         spin_lock_init(&lpni->lpni_lock);
173
174         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
175         lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
176         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
177         lpni->lpni_nid = nid;
178         lpni->lpni_cpt = cpt;
179         atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
180
181         net = lnet_get_net_locked(LNET_NIDNET(nid));
182         lpni->lpni_net = net;
183         if (net) {
184                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
185                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
186                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
187                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
188         } else {
189                 /*
190                  * This peer_ni is not on a local network, so we
191                  * cannot add the credits here. In case the net is
192                  * added later, add the peer_ni to the remote peer ni
193                  * list so it can be easily found and revisited.
194                  */
195                 /* FIXME: per-net implementation instead? */
196                 atomic_inc(&lpni->lpni_refcount);
197                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
198                               &the_lnet.ln_remote_peer_ni_list);
199         }
200
201         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
202
203         return lpni;
204 }
205
206 static struct lnet_peer_net *
207 lnet_peer_net_alloc(__u32 net_id)
208 {
209         struct lnet_peer_net *lpn;
210
211         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
212         if (!lpn)
213                 return NULL;
214
215         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
216         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
217         lpn->lpn_net_id = net_id;
218
219         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
220
221         return lpn;
222 }
223
224 void
225 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
226 {
227         struct lnet_peer *lp;
228
229         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
230
231         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
232         LASSERT(list_empty(&lpn->lpn_peer_nis));
233         LASSERT(list_empty(&lpn->lpn_peer_nets));
234         lp = lpn->lpn_peer;
235         lpn->lpn_peer = NULL;
236         LIBCFS_FREE(lpn, sizeof(*lpn));
237
238         lnet_peer_decref_locked(lp);
239 }
240
241 static struct lnet_peer *
242 lnet_peer_alloc(lnet_nid_t nid)
243 {
244         struct lnet_peer *lp;
245
246         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
247         if (!lp)
248                 return NULL;
249
250         INIT_LIST_HEAD(&lp->lp_peer_list);
251         INIT_LIST_HEAD(&lp->lp_peer_nets);
252         INIT_LIST_HEAD(&lp->lp_dc_list);
253         INIT_LIST_HEAD(&lp->lp_dc_pendq);
254         init_waitqueue_head(&lp->lp_dc_waitq);
255         spin_lock_init(&lp->lp_lock);
256         lp->lp_primary_nid = nid;
257         /*
258          * Turn off discovery for loopback peer. If you're creating a peer
259          * for the loopback interface then that was initiated when we
260          * attempted to send a message over the loopback. There is no need
261          * to ever use a different interface when sending messages to
262          * myself.
263          */
264         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
265                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
266         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
267
268         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
269
270         return lp;
271 }
272
273 void
274 lnet_destroy_peer_locked(struct lnet_peer *lp)
275 {
276         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
277
278         LASSERT(atomic_read(&lp->lp_refcount) == 0);
279         LASSERT(list_empty(&lp->lp_peer_nets));
280         LASSERT(list_empty(&lp->lp_peer_list));
281         LASSERT(list_empty(&lp->lp_dc_list));
282
283         if (lp->lp_data)
284                 lnet_ping_buffer_decref(lp->lp_data);
285
286         /*
287          * if there are messages still on the pending queue, then make
288          * sure to queue them on the ln_msg_resend list so they can be
289          * resent at a later point if the discovery thread is still
290          * running.
291          * If the discovery thread has stopped, then the wakeup will be a
292          * no-op, and it is expected the lnet_shutdown_lndnets() will
293          * eventually be called, which will traverse this list and
294          * finalize the messages on the list.
295          * We can not resend them now because we're holding the cpt lock.
296          * Releasing the lock can cause an inconsistent state
297          */
298         spin_lock(&the_lnet.ln_msg_resend_lock);
299         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
300         spin_unlock(&the_lnet.ln_msg_resend_lock);
301         wake_up(&the_lnet.ln_dc_waitq);
302
303         LIBCFS_FREE(lp, sizeof(*lp));
304 }
305
306 /*
307  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
308  * that peer_net, detach the peer_net from the peer.
309  *
310  * Call with lnet_net_lock/EX held
311  */
312 static void
313 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
314 {
315         struct lnet_peer_table *ptable;
316         struct lnet_peer_net *lpn;
317         struct lnet_peer *lp;
318
319         /*
320          * Belts and suspenders: gracefully handle teardown of a
321          * partially connected peer_ni.
322          */
323         lpn = lpni->lpni_peer_net;
324
325         list_del_init(&lpni->lpni_peer_nis);
326         /*
327          * If there are no lpni's left, we detach lpn from
328          * lp_peer_nets, so it cannot be found anymore.
329          */
330         if (list_empty(&lpn->lpn_peer_nis))
331                 list_del_init(&lpn->lpn_peer_nets);
332
333         /* Update peer NID count. */
334         lp = lpn->lpn_peer;
335         lp->lp_nnis--;
336
337         /*
338          * If there are no more peer nets, make the peer unfindable
339          * via the peer_tables.
340          *
341          * Otherwise, if the peer is DISCOVERED, tell discovery to
342          * take another look at it. This is a no-op if discovery for
343          * this peer did the detaching.
344          */
345         if (list_empty(&lp->lp_peer_nets)) {
346                 list_del_init(&lp->lp_peer_list);
347                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
348                 ptable->pt_peers--;
349         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
350                 /* Discovery isn't running, nothing to do here. */
351         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
352                 lnet_peer_queue_for_discovery(lp);
353                 wake_up(&the_lnet.ln_dc_waitq);
354         }
355         CDEBUG(D_NET, "peer %s NID %s\n",
356                 libcfs_nid2str(lp->lp_primary_nid),
357                 libcfs_nid2str(lpni->lpni_nid));
358 }
359
360 /* called with lnet_net_lock LNET_LOCK_EX held */
361 static int
362 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
363 {
364         struct lnet_peer_table *ptable = NULL;
365
366         /* don't remove a peer_ni if it's also a gateway */
367         if (lpni->lpni_rtr_refcount > 0) {
368                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
369                        libcfs_nid2str(lpni->lpni_nid));
370                 return -EBUSY;
371         }
372
373         lnet_peer_remove_from_remote_list(lpni);
374
375         /* remove peer ni from the hash list. */
376         list_del_init(&lpni->lpni_hashlist);
377
378         /*
379          * indicate the peer is being deleted so the monitor thread can
380          * remove it from the recovery queue.
381          */
382         spin_lock(&lpni->lpni_lock);
383         lpni->lpni_state |= LNET_PEER_NI_DELETING;
384         spin_unlock(&lpni->lpni_lock);
385
386         /* decrement the ref count on the peer table */
387         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
388         LASSERT(ptable->pt_number > 0);
389         ptable->pt_number--;
390
391         /*
392          * The peer_ni can no longer be found with a lookup. But there
393          * can be current users, so keep track of it on the zombie
394          * list until the reference count has gone to zero.
395          *
396          * The last reference may be lost in a place where the
397          * lnet_net_lock locks only a single cpt, and that cpt may not
398          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
399          * has its own lock.
400          */
401         spin_lock(&ptable->pt_zombie_lock);
402         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
403         ptable->pt_zombies++;
404         spin_unlock(&ptable->pt_zombie_lock);
405
406         /* no need to keep this peer_ni on the hierarchy anymore */
407         lnet_peer_detach_peer_ni_locked(lpni);
408
409         /* remove hashlist reference on peer_ni */
410         lnet_peer_ni_decref_locked(lpni);
411
412         return 0;
413 }
414
415 void lnet_peer_uninit(void)
416 {
417         struct lnet_peer_ni *lpni, *tmp;
418
419         lnet_net_lock(LNET_LOCK_EX);
420
421         /* remove all peer_nis from the remote peer and the hash list */
422         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
423                                  lpni_on_remote_peer_ni_list)
424                 lnet_peer_ni_del_locked(lpni);
425
426         lnet_peer_tables_destroy();
427
428         lnet_net_unlock(LNET_LOCK_EX);
429 }
430
431 static int
432 lnet_peer_del_locked(struct lnet_peer *peer)
433 {
434         struct lnet_peer_ni *lpni = NULL, *lpni2;
435         int rc = 0, rc2 = 0;
436
437         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
438
439         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
440         while (lpni != NULL) {
441                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
442                 rc = lnet_peer_ni_del_locked(lpni);
443                 if (rc != 0)
444                         rc2 = rc;
445                 lpni = lpni2;
446         }
447
448         return rc2;
449 }
450
451 static int
452 lnet_peer_del(struct lnet_peer *peer)
453 {
454         lnet_net_lock(LNET_LOCK_EX);
455         lnet_peer_del_locked(peer);
456         lnet_net_unlock(LNET_LOCK_EX);
457
458         return 0;
459 }
460
461 /*
462  * Delete a NID from a peer. Call with ln_api_mutex held.
463  *
464  * Error codes:
465  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
466  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
467  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
468  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
469  */
470 static int
471 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
472 {
473         struct lnet_peer_ni *lpni;
474         lnet_nid_t primary_nid = lp->lp_primary_nid;
475         int rc = 0;
476
477         if (!(flags & LNET_PEER_CONFIGURED)) {
478                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
479                         rc = -EPERM;
480                         goto out;
481                 }
482         }
483         lpni = lnet_find_peer_ni_locked(nid);
484         if (!lpni) {
485                 rc = -ENOENT;
486                 goto out;
487         }
488         lnet_peer_ni_decref_locked(lpni);
489         if (lp != lpni->lpni_peer_net->lpn_peer) {
490                 rc = -ECHILD;
491                 goto out;
492         }
493
494         /*
495          * This function only allows deletion of the primary NID if it
496          * is the only NID.
497          */
498         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
499                 rc = -EBUSY;
500                 goto out;
501         }
502
503         lnet_net_lock(LNET_LOCK_EX);
504
505         rc = lnet_peer_ni_del_locked(lpni);
506
507         lnet_net_unlock(LNET_LOCK_EX);
508
509 out:
510         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
511                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
512
513         return rc;
514 }
515
516 static void
517 lnet_peer_table_cleanup_locked(struct lnet_net *net,
518                                struct lnet_peer_table *ptable)
519 {
520         int                      i;
521         struct lnet_peer_ni     *next;
522         struct lnet_peer_ni     *lpni;
523         struct lnet_peer        *peer;
524
525         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
526                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
527                                          lpni_hashlist) {
528                         if (net != NULL && net != lpni->lpni_net)
529                                 continue;
530
531                         peer = lpni->lpni_peer_net->lpn_peer;
532                         if (peer->lp_primary_nid != lpni->lpni_nid) {
533                                 lnet_peer_ni_del_locked(lpni);
534                                 continue;
535                         }
536                         /*
537                          * Removing the primary NID implies removing
538                          * the entire peer. Advance next beyond any
539                          * peer_ni that belongs to the same peer.
540                          */
541                         list_for_each_entry_from(next, &ptable->pt_hash[i],
542                                                  lpni_hashlist) {
543                                 if (next->lpni_peer_net->lpn_peer != peer)
544                                         break;
545                         }
546                         lnet_peer_del_locked(peer);
547                 }
548         }
549 }
550
551 static void
552 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
553 {
554         int     i = 3;
555
556         spin_lock(&ptable->pt_zombie_lock);
557         while (ptable->pt_zombies) {
558                 spin_unlock(&ptable->pt_zombie_lock);
559
560                 if (is_power_of_2(i)) {
561                         CDEBUG(D_WARNING,
562                                "Waiting for %d zombies on peer table\n",
563                                ptable->pt_zombies);
564                 }
565                 set_current_state(TASK_UNINTERRUPTIBLE);
566                 schedule_timeout(cfs_time_seconds(1) >> 1);
567                 spin_lock(&ptable->pt_zombie_lock);
568         }
569         spin_unlock(&ptable->pt_zombie_lock);
570 }
571
572 static void
573 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
574                                 struct lnet_peer_table *ptable)
575 {
576         struct lnet_peer_ni     *lp;
577         struct lnet_peer_ni     *tmp;
578         lnet_nid_t              lpni_nid;
579         int                     i;
580
581         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
582                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
583                                          lpni_hashlist) {
584                         if (net != lp->lpni_net)
585                                 continue;
586
587                         if (lp->lpni_rtr_refcount == 0)
588                                 continue;
589
590                         lpni_nid = lp->lpni_nid;
591
592                         lnet_net_unlock(LNET_LOCK_EX);
593                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
594                         lnet_net_lock(LNET_LOCK_EX);
595                 }
596         }
597 }
598
599 void
600 lnet_peer_tables_cleanup(struct lnet_net *net)
601 {
602         int i;
603         struct lnet_peer_table *ptable;
604
605         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
606         /* If just deleting the peers for a NI, get rid of any routes these
607          * peers are gateways for. */
608         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
609                 lnet_net_lock(LNET_LOCK_EX);
610                 lnet_peer_table_del_rtrs_locked(net, ptable);
611                 lnet_net_unlock(LNET_LOCK_EX);
612         }
613
614         /* Start the cleanup process */
615         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
616                 lnet_net_lock(LNET_LOCK_EX);
617                 lnet_peer_table_cleanup_locked(net, ptable);
618                 lnet_net_unlock(LNET_LOCK_EX);
619         }
620
621         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
622                 lnet_peer_ni_finalize_wait(ptable);
623 }
624
625 static struct lnet_peer_ni *
626 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
627 {
628         struct list_head        *peers;
629         struct lnet_peer_ni     *lp;
630
631         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
632
633         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
634         list_for_each_entry(lp, peers, lpni_hashlist) {
635                 if (lp->lpni_nid == nid) {
636                         lnet_peer_ni_addref_locked(lp);
637                         return lp;
638                 }
639         }
640
641         return NULL;
642 }
643
644 struct lnet_peer_ni *
645 lnet_find_peer_ni_locked(lnet_nid_t nid)
646 {
647         struct lnet_peer_ni *lpni;
648         struct lnet_peer_table *ptable;
649         int cpt;
650
651         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
652
653         ptable = the_lnet.ln_peer_tables[cpt];
654         lpni = lnet_get_peer_ni_locked(ptable, nid);
655
656         return lpni;
657 }
658
659 struct lnet_peer *
660 lnet_find_peer(lnet_nid_t nid)
661 {
662         struct lnet_peer_ni *lpni;
663         struct lnet_peer *lp = NULL;
664         int cpt;
665
666         cpt = lnet_net_lock_current();
667         lpni = lnet_find_peer_ni_locked(nid);
668         if (lpni) {
669                 lp = lpni->lpni_peer_net->lpn_peer;
670                 lnet_peer_addref_locked(lp);
671                 lnet_peer_ni_decref_locked(lpni);
672         }
673         lnet_net_unlock(cpt);
674
675         return lp;
676 }
677
678 struct lnet_peer_ni *
679 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
680                              struct lnet_peer_net *peer_net,
681                              struct lnet_peer_ni *prev)
682 {
683         struct lnet_peer_ni *lpni;
684         struct lnet_peer_net *net = peer_net;
685
686         if (!prev) {
687                 if (!net) {
688                         if (list_empty(&peer->lp_peer_nets))
689                                 return NULL;
690
691                         net = list_entry(peer->lp_peer_nets.next,
692                                          struct lnet_peer_net,
693                                          lpn_peer_nets);
694                 }
695                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
696                                   lpni_peer_nis);
697
698                 return lpni;
699         }
700
701         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
702                 /*
703                  * if you reached the end of the peer ni list and the peer
704                  * net is specified then there are no more peer nis in that
705                  * net.
706                  */
707                 if (net)
708                         return NULL;
709
710                 /*
711                  * we reached the end of this net ni list. move to the
712                  * next net
713                  */
714                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
715                     &peer->lp_peer_nets)
716                         /* no more nets and no more NIs. */
717                         return NULL;
718
719                 /* get the next net */
720                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
721                                  struct lnet_peer_net,
722                                  lpn_peer_nets);
723                 /* get the ni on it */
724                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
725                                   lpni_peer_nis);
726
727                 return lpni;
728         }
729
730         /* there are more nis left */
731         lpni = list_entry(prev->lpni_peer_nis.next,
732                           struct lnet_peer_ni, lpni_peer_nis);
733
734         return lpni;
735 }
736
737 /* Call with the ln_api_mutex held */
738 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
739 {
740         struct lnet_process_id id;
741         struct lnet_peer_table *ptable;
742         struct lnet_peer *lp;
743         __u32 count = 0;
744         __u32 size = 0;
745         int lncpt;
746         int cpt;
747         __u32 i;
748         int rc;
749
750         rc = -ESHUTDOWN;
751         if (the_lnet.ln_state != LNET_STATE_RUNNING)
752                 goto done;
753
754         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
755
756         /*
757          * Count the number of peers, and return E2BIG if the buffer
758          * is too small. We'll also return the desired size.
759          */
760         rc = -E2BIG;
761         for (cpt = 0; cpt < lncpt; cpt++) {
762                 ptable = the_lnet.ln_peer_tables[cpt];
763                 count += ptable->pt_peers;
764         }
765         size = count * sizeof(*ids);
766         if (size > *sizep)
767                 goto done;
768
769         /*
770          * Walk the peer lists and copy out the primary nids.
771          * This is safe because the peer lists are only modified
772          * while the ln_api_mutex is held. So we don't need to
773          * hold the lnet_net_lock as well, and can therefore
774          * directly call copy_to_user().
775          */
776         rc = -EFAULT;
777         memset(&id, 0, sizeof(id));
778         id.pid = LNET_PID_LUSTRE;
779         i = 0;
780         for (cpt = 0; cpt < lncpt; cpt++) {
781                 ptable = the_lnet.ln_peer_tables[cpt];
782                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
783                         if (i >= count)
784                                 goto done;
785                         id.nid = lp->lp_primary_nid;
786                         if (copy_to_user(&ids[i], &id, sizeof(id)))
787                                 goto done;
788                         i++;
789                 }
790         }
791         rc = 0;
792 done:
793         *countp = count;
794         *sizep = size;
795         return rc;
796 }
797
798 /*
799  * Start pushes to peers that need to be updated for a configuration
800  * change on this node.
801  */
802 void
803 lnet_push_update_to_peers(int force)
804 {
805         struct lnet_peer_table *ptable;
806         struct lnet_peer *lp;
807         int lncpt;
808         int cpt;
809
810         lnet_net_lock(LNET_LOCK_EX);
811         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
812         for (cpt = 0; cpt < lncpt; cpt++) {
813                 ptable = the_lnet.ln_peer_tables[cpt];
814                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
815                         if (force) {
816                                 spin_lock(&lp->lp_lock);
817                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
818                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
819                                 spin_unlock(&lp->lp_lock);
820                         }
821                         if (lnet_peer_needs_push(lp))
822                                 lnet_peer_queue_for_discovery(lp);
823                 }
824         }
825         lnet_net_unlock(LNET_LOCK_EX);
826         wake_up(&the_lnet.ln_dc_waitq);
827 }
828
829 /*
830  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
831  * this is a preferred point-to-point path. Call with lnet_net_lock in
832  * shared mmode.
833  */
834 bool
835 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
836 {
837         int i;
838
839         if (lpni->lpni_pref_nnids == 0)
840                 return false;
841         if (lpni->lpni_pref_nnids == 1)
842                 return lpni->lpni_pref.nid == nid;
843         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
844                 if (lpni->lpni_pref.nids[i] == nid)
845                         return true;
846         }
847         return false;
848 }
849
850 /*
851  * Set a single ni as preferred, provided no preferred ni is already
852  * defined. Only to be used for non-multi-rail peer_ni.
853  */
854 int
855 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
856 {
857         int rc = 0;
858
859         spin_lock(&lpni->lpni_lock);
860         if (nid == LNET_NID_ANY) {
861                 rc = -EINVAL;
862         } else if (lpni->lpni_pref_nnids > 0) {
863                 rc = -EPERM;
864         } else if (lpni->lpni_pref_nnids == 0) {
865                 lpni->lpni_pref.nid = nid;
866                 lpni->lpni_pref_nnids = 1;
867                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
868         }
869         spin_unlock(&lpni->lpni_lock);
870
871         CDEBUG(D_NET, "peer %s nid %s: %d\n",
872                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
873         return rc;
874 }
875
876 /*
877  * Clear the preferred NID from a non-multi-rail peer_ni, provided
878  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
879  */
880 int
881 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
882 {
883         int rc = 0;
884
885         spin_lock(&lpni->lpni_lock);
886         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
887                 lpni->lpni_pref_nnids = 0;
888                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
889         } else if (lpni->lpni_pref_nnids == 0) {
890                 rc = -ENOENT;
891         } else {
892                 rc = -EPERM;
893         }
894         spin_unlock(&lpni->lpni_lock);
895
896         CDEBUG(D_NET, "peer %s: %d\n",
897                libcfs_nid2str(lpni->lpni_nid), rc);
898         return rc;
899 }
900
901 /*
902  * Clear the preferred NIDs from a non-multi-rail peer.
903  */
904 void
905 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
906 {
907         struct lnet_peer_ni *lpni = NULL;
908
909         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
910                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
911 }
912
913 int
914 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
915 {
916         lnet_nid_t *nids = NULL;
917         lnet_nid_t *oldnids = NULL;
918         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
919         int size;
920         int i;
921         int rc = 0;
922
923         if (nid == LNET_NID_ANY) {
924                 rc = -EINVAL;
925                 goto out;
926         }
927
928         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
929                 rc = -EEXIST;
930                 goto out;
931         }
932
933         /* A non-MR node may have only one preferred NI per peer_ni */
934         if (lpni->lpni_pref_nnids > 0) {
935                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
936                         rc = -EPERM;
937                         goto out;
938                 }
939         }
940
941         if (lpni->lpni_pref_nnids != 0) {
942                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
943                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
944                 if (!nids) {
945                         rc = -ENOMEM;
946                         goto out;
947                 }
948                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
949                         if (lpni->lpni_pref.nids[i] == nid) {
950                                 LIBCFS_FREE(nids, size);
951                                 rc = -EEXIST;
952                                 goto out;
953                         }
954                         nids[i] = lpni->lpni_pref.nids[i];
955                 }
956                 nids[i] = nid;
957         }
958
959         lnet_net_lock(LNET_LOCK_EX);
960         spin_lock(&lpni->lpni_lock);
961         if (lpni->lpni_pref_nnids == 0) {
962                 lpni->lpni_pref.nid = nid;
963         } else {
964                 oldnids = lpni->lpni_pref.nids;
965                 lpni->lpni_pref.nids = nids;
966         }
967         lpni->lpni_pref_nnids++;
968         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
969         spin_unlock(&lpni->lpni_lock);
970         lnet_net_unlock(LNET_LOCK_EX);
971
972         if (oldnids) {
973                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
974                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
975         }
976 out:
977         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
978                 spin_lock(&lpni->lpni_lock);
979                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
980                 spin_unlock(&lpni->lpni_lock);
981         }
982         CDEBUG(D_NET, "peer %s nid %s: %d\n",
983                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
984         return rc;
985 }
986
987 int
988 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
989 {
990         lnet_nid_t *nids = NULL;
991         lnet_nid_t *oldnids = NULL;
992         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
993         int size;
994         int i, j;
995         int rc = 0;
996
997         if (lpni->lpni_pref_nnids == 0) {
998                 rc = -ENOENT;
999                 goto out;
1000         }
1001
1002         if (lpni->lpni_pref_nnids == 1) {
1003                 if (lpni->lpni_pref.nid != nid) {
1004                         rc = -ENOENT;
1005                         goto out;
1006                 }
1007         } else if (lpni->lpni_pref_nnids == 2) {
1008                 if (lpni->lpni_pref.nids[0] != nid &&
1009                     lpni->lpni_pref.nids[1] != nid) {
1010                         rc = -ENOENT;
1011                         goto out;
1012                 }
1013         } else {
1014                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1015                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1016                 if (!nids) {
1017                         rc = -ENOMEM;
1018                         goto out;
1019                 }
1020                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1021                         if (lpni->lpni_pref.nids[i] != nid)
1022                                 continue;
1023                         nids[j++] = lpni->lpni_pref.nids[i];
1024                 }
1025                 /* Check if we actually removed a nid. */
1026                 if (j == lpni->lpni_pref_nnids) {
1027                         LIBCFS_FREE(nids, size);
1028                         rc = -ENOENT;
1029                         goto out;
1030                 }
1031         }
1032
1033         lnet_net_lock(LNET_LOCK_EX);
1034         spin_lock(&lpni->lpni_lock);
1035         if (lpni->lpni_pref_nnids == 1) {
1036                 lpni->lpni_pref.nid = LNET_NID_ANY;
1037         } else if (lpni->lpni_pref_nnids == 2) {
1038                 oldnids = lpni->lpni_pref.nids;
1039                 if (oldnids[0] == nid)
1040                         lpni->lpni_pref.nid = oldnids[1];
1041                 else
1042                         lpni->lpni_pref.nid = oldnids[2];
1043         } else {
1044                 oldnids = lpni->lpni_pref.nids;
1045                 lpni->lpni_pref.nids = nids;
1046         }
1047         lpni->lpni_pref_nnids--;
1048         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1049         spin_unlock(&lpni->lpni_lock);
1050         lnet_net_unlock(LNET_LOCK_EX);
1051
1052         if (oldnids) {
1053                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1054                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1055         }
1056 out:
1057         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1058                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1059         return rc;
1060 }
1061
1062 lnet_nid_t
1063 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1064 {
1065         struct lnet_peer_ni *lpni;
1066         lnet_nid_t primary_nid = nid;
1067
1068         lpni = lnet_find_peer_ni_locked(nid);
1069         if (lpni) {
1070                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1071                 lnet_peer_ni_decref_locked(lpni);
1072         }
1073
1074         return primary_nid;
1075 }
1076
1077 lnet_nid_t
1078 LNetPrimaryNID(lnet_nid_t nid)
1079 {
1080         struct lnet_peer *lp;
1081         struct lnet_peer_ni *lpni;
1082         lnet_nid_t primary_nid = nid;
1083         int rc = 0;
1084         int cpt;
1085
1086         cpt = lnet_net_lock_current();
1087         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1088         if (IS_ERR(lpni)) {
1089                 rc = PTR_ERR(lpni);
1090                 goto out_unlock;
1091         }
1092         lp = lpni->lpni_peer_net->lpn_peer;
1093         while (!lnet_peer_is_uptodate(lp)) {
1094                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1095                 if (rc)
1096                         goto out_decref;
1097                 lp = lpni->lpni_peer_net->lpn_peer;
1098         }
1099         primary_nid = lp->lp_primary_nid;
1100 out_decref:
1101         lnet_peer_ni_decref_locked(lpni);
1102 out_unlock:
1103         lnet_net_unlock(cpt);
1104
1105         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1106                libcfs_nid2str(primary_nid), rc);
1107         return primary_nid;
1108 }
1109 EXPORT_SYMBOL(LNetPrimaryNID);
1110
1111 struct lnet_peer_net *
1112 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1113 {
1114         struct lnet_peer_net *peer_net;
1115         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1116                 if (peer_net->lpn_net_id == net_id)
1117                         return peer_net;
1118         }
1119         return NULL;
1120 }
1121
1122 /*
1123  * Attach a peer_ni to a peer_net and peer. This function assumes
1124  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1125  * may be attached to a different peer, in which case it will be
1126  * properly detached first. The whole operation is done atomically.
1127  *
1128  * Always returns 0.  This is the last function called from functions
1129  * that do return an int, so returning 0 here allows the compiler to
1130  * do a tail call.
1131  */
1132 static int
1133 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1134                                 struct lnet_peer_net *lpn,
1135                                 struct lnet_peer_ni *lpni,
1136                                 unsigned flags)
1137 {
1138         struct lnet_peer_table *ptable;
1139
1140         /* Install the new peer_ni */
1141         lnet_net_lock(LNET_LOCK_EX);
1142         /* Add peer_ni to global peer table hash, if necessary. */
1143         if (list_empty(&lpni->lpni_hashlist)) {
1144                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1145
1146                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1147                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1148                 ptable->pt_version++;
1149                 ptable->pt_number++;
1150                 /* This is the 1st refcount on lpni. */
1151                 atomic_inc(&lpni->lpni_refcount);
1152         }
1153
1154         /* Detach the peer_ni from an existing peer, if necessary. */
1155         if (lpni->lpni_peer_net) {
1156                 LASSERT(lpni->lpni_peer_net != lpn);
1157                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1158                 lnet_peer_detach_peer_ni_locked(lpni);
1159                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1160                 lpni->lpni_peer_net = NULL;
1161         }
1162
1163         /* Add peer_ni to peer_net */
1164         lpni->lpni_peer_net = lpn;
1165         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1166         lnet_peer_net_addref_locked(lpn);
1167
1168         /* Add peer_net to peer */
1169         if (!lpn->lpn_peer) {
1170                 lpn->lpn_peer = lp;
1171                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1172                 lnet_peer_addref_locked(lp);
1173         }
1174
1175         /* Add peer to global peer list, if necessary */
1176         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1177         if (list_empty(&lp->lp_peer_list)) {
1178                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1179                 ptable->pt_peers++;
1180         }
1181
1182
1183         /* Update peer state */
1184         spin_lock(&lp->lp_lock);
1185         if (flags & LNET_PEER_CONFIGURED) {
1186                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1187                         lp->lp_state |= LNET_PEER_CONFIGURED;
1188         }
1189         if (flags & LNET_PEER_MULTI_RAIL) {
1190                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1191                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1192                         lnet_peer_clr_non_mr_pref_nids(lp);
1193                 }
1194         }
1195         spin_unlock(&lp->lp_lock);
1196
1197         lp->lp_nnis++;
1198         lnet_net_unlock(LNET_LOCK_EX);
1199
1200         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1201                libcfs_nid2str(lp->lp_primary_nid),
1202                libcfs_nid2str(lpni->lpni_nid), flags);
1203
1204         return 0;
1205 }
1206
1207 /*
1208  * Create a new peer, with nid as its primary nid.
1209  *
1210  * Call with the lnet_api_mutex held.
1211  */
1212 static int
1213 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1214 {
1215         struct lnet_peer *lp;
1216         struct lnet_peer_net *lpn;
1217         struct lnet_peer_ni *lpni;
1218         int rc = 0;
1219
1220         LASSERT(nid != LNET_NID_ANY);
1221
1222         /*
1223          * No need for the lnet_net_lock here, because the
1224          * lnet_api_mutex is held.
1225          */
1226         lpni = lnet_find_peer_ni_locked(nid);
1227         if (lpni) {
1228                 /* A peer with this NID already exists. */
1229                 lp = lpni->lpni_peer_net->lpn_peer;
1230                 lnet_peer_ni_decref_locked(lpni);
1231                 /*
1232                  * This is an error if the peer was configured and the
1233                  * primary NID differs or an attempt is made to change
1234                  * the Multi-Rail flag. Otherwise the assumption is
1235                  * that an existing peer is being modified.
1236                  */
1237                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1238                         if (lp->lp_primary_nid != nid)
1239                                 rc = -EEXIST;
1240                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1241                                 rc = -EPERM;
1242                         goto out;
1243                 }
1244                 /* Delete and recreate as a configured peer. */
1245                 lnet_peer_del(lp);
1246         }
1247
1248         /* Create peer, peer_net, and peer_ni. */
1249         rc = -ENOMEM;
1250         lp = lnet_peer_alloc(nid);
1251         if (!lp)
1252                 goto out;
1253         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1254         if (!lpn)
1255                 goto out_free_lp;
1256         lpni = lnet_peer_ni_alloc(nid);
1257         if (!lpni)
1258                 goto out_free_lpn;
1259
1260         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1261
1262 out_free_lpn:
1263         LIBCFS_FREE(lpn, sizeof(*lpn));
1264 out_free_lp:
1265         LIBCFS_FREE(lp, sizeof(*lp));
1266 out:
1267         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1268                libcfs_nid2str(nid), flags, rc);
1269         return rc;
1270 }
1271
1272 /*
1273  * Add a NID to a peer. Call with ln_api_mutex held.
1274  *
1275  * Error codes:
1276  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1277  *  -EEXIST:   The NID was configured by DLC for a different peer.
1278  *  -ENOMEM:   Out of memory.
1279  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1280  *             non-multi-rail peer.
1281  */
1282 static int
1283 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1284 {
1285         struct lnet_peer_net *lpn;
1286         struct lnet_peer_ni *lpni;
1287         int rc = 0;
1288
1289         LASSERT(lp);
1290         LASSERT(nid != LNET_NID_ANY);
1291
1292         /* A configured peer can only be updated through configuration. */
1293         if (!(flags & LNET_PEER_CONFIGURED)) {
1294                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1295                         rc = -EPERM;
1296                         goto out;
1297                 }
1298         }
1299
1300         /*
1301          * The MULTI_RAIL flag can be set but not cleared, because
1302          * that would leave the peer struct in an invalid state.
1303          */
1304         if (flags & LNET_PEER_MULTI_RAIL) {
1305                 spin_lock(&lp->lp_lock);
1306                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1307                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1308                         lnet_peer_clr_non_mr_pref_nids(lp);
1309                 }
1310                 spin_unlock(&lp->lp_lock);
1311         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1312                 rc = -EPERM;
1313                 goto out;
1314         }
1315
1316         lpni = lnet_find_peer_ni_locked(nid);
1317         if (lpni) {
1318                 /*
1319                  * A peer_ni already exists. This is only a problem if
1320                  * it is not connected to this peer and was configured
1321                  * by DLC.
1322                  */
1323                 lnet_peer_ni_decref_locked(lpni);
1324                 if (lpni->lpni_peer_net->lpn_peer == lp)
1325                         goto out;
1326                 if (lnet_peer_ni_is_configured(lpni)) {
1327                         rc = -EEXIST;
1328                         goto out;
1329                 }
1330                 /* If this is the primary NID, destroy the peer. */
1331                 if (lnet_peer_ni_is_primary(lpni)) {
1332                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1333                         lpni = lnet_peer_ni_alloc(nid);
1334                         if (!lpni) {
1335                                 rc = -ENOMEM;
1336                                 goto out;
1337                         }
1338                 }
1339         } else {
1340                 lpni = lnet_peer_ni_alloc(nid);
1341                 if (!lpni) {
1342                         rc = -ENOMEM;
1343                         goto out;
1344                 }
1345         }
1346
1347         /*
1348          * Get the peer_net. Check that we're not adding a second
1349          * peer_ni on a peer_net of a non-multi-rail peer.
1350          */
1351         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1352         if (!lpn) {
1353                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1354                 if (!lpn) {
1355                         rc = -ENOMEM;
1356                         goto out_free_lpni;
1357                 }
1358         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1359                 rc = -ENOTUNIQ;
1360                 goto out_free_lpni;
1361         }
1362
1363         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1364
1365 out_free_lpni:
1366         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1367         if (!lpni->lpni_peer_net)
1368                 LIBCFS_FREE(lpni, sizeof(*lpni));
1369 out:
1370         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1371                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1372                flags, rc);
1373         return rc;
1374 }
1375
1376 /*
1377  * Update the primary NID of a peer, if possible.
1378  *
1379  * Call with the lnet_api_mutex held.
1380  */
1381 static int
1382 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1383 {
1384         lnet_nid_t old = lp->lp_primary_nid;
1385         int rc = 0;
1386
1387         if (lp->lp_primary_nid == nid)
1388                 goto out;
1389         rc = lnet_peer_add_nid(lp, nid, flags);
1390         if (rc)
1391                 goto out;
1392         lp->lp_primary_nid = nid;
1393 out:
1394         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1395                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1396         return rc;
1397 }
1398
1399 /*
1400  * lpni creation initiated due to traffic either sending or receiving.
1401  */
1402 static int
1403 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1404 {
1405         struct lnet_peer *lp;
1406         struct lnet_peer_net *lpn;
1407         struct lnet_peer_ni *lpni;
1408         unsigned flags = 0;
1409         int rc = 0;
1410
1411         if (nid == LNET_NID_ANY) {
1412                 rc = -EINVAL;
1413                 goto out;
1414         }
1415
1416         /* lnet_net_lock is not needed here because ln_api_lock is held */
1417         lpni = lnet_find_peer_ni_locked(nid);
1418         if (lpni) {
1419                 /*
1420                  * We must have raced with another thread. Since we
1421                  * know next to nothing about a peer_ni created by
1422                  * traffic, we just assume everything is ok and
1423                  * return.
1424                  */
1425                 lnet_peer_ni_decref_locked(lpni);
1426                 goto out;
1427         }
1428
1429         /* Create peer, peer_net, and peer_ni. */
1430         rc = -ENOMEM;
1431         lp = lnet_peer_alloc(nid);
1432         if (!lp)
1433                 goto out;
1434         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1435         if (!lpn)
1436                 goto out_free_lp;
1437         lpni = lnet_peer_ni_alloc(nid);
1438         if (!lpni)
1439                 goto out_free_lpn;
1440         if (pref != LNET_NID_ANY)
1441                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1442
1443         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1444
1445 out_free_lpn:
1446         LIBCFS_FREE(lpn, sizeof(*lpn));
1447 out_free_lp:
1448         LIBCFS_FREE(lp, sizeof(*lp));
1449 out:
1450         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1451         return rc;
1452 }
1453
1454 /*
1455  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1456  *
1457  * This API handles the following combinations:
1458  *   Create a peer with its primary NI if only the prim_nid is provided
1459  *   Add a NID to a peer identified by the prim_nid. The peer identified
1460  *   by the prim_nid must already exist.
1461  *   The peer being created may be non-MR.
1462  *
1463  * The caller must hold ln_api_mutex. This prevents the peer from
1464  * being created/modified/deleted by a different thread.
1465  */
1466 int
1467 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1468 {
1469         struct lnet_peer *lp = NULL;
1470         struct lnet_peer_ni *lpni;
1471         unsigned flags;
1472
1473         /* The prim_nid must always be specified */
1474         if (prim_nid == LNET_NID_ANY)
1475                 return -EINVAL;
1476
1477         flags = LNET_PEER_CONFIGURED;
1478         if (mr)
1479                 flags |= LNET_PEER_MULTI_RAIL;
1480
1481         /*
1482          * If nid isn't specified, we must create a new peer with
1483          * prim_nid as its primary nid.
1484          */
1485         if (nid == LNET_NID_ANY)
1486                 return lnet_peer_add(prim_nid, flags);
1487
1488         /* Look up the prim_nid, which must exist. */
1489         lpni = lnet_find_peer_ni_locked(prim_nid);
1490         if (!lpni)
1491                 return -ENOENT;
1492         lnet_peer_ni_decref_locked(lpni);
1493         lp = lpni->lpni_peer_net->lpn_peer;
1494
1495         /* Peer must have been configured. */
1496         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1497                 CDEBUG(D_NET, "peer %s was not configured\n",
1498                        libcfs_nid2str(prim_nid));
1499                 return -ENOENT;
1500         }
1501
1502         /* Primary NID must match */
1503         if (lp->lp_primary_nid != prim_nid) {
1504                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1505                        libcfs_nid2str(prim_nid),
1506                        libcfs_nid2str(lp->lp_primary_nid));
1507                 return -ENODEV;
1508         }
1509
1510         /* Multi-Rail flag must match. */
1511         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1512                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1513                        libcfs_nid2str(prim_nid));
1514                 return -EPERM;
1515         }
1516
1517         return lnet_peer_add_nid(lp, nid, flags);
1518 }
1519
1520 /*
1521  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1522  *
1523  * This API handles the following combinations:
1524  *   Delete a NI from a peer if both prim_nid and nid are provided.
1525  *   Delete a peer if only prim_nid is provided.
1526  *   Delete a peer if its primary nid is provided.
1527  *
1528  * The caller must hold ln_api_mutex. This prevents the peer from
1529  * being modified/deleted by a different thread.
1530  */
1531 int
1532 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1533 {
1534         struct lnet_peer *lp;
1535         struct lnet_peer_ni *lpni;
1536         unsigned flags;
1537
1538         if (prim_nid == LNET_NID_ANY)
1539                 return -EINVAL;
1540
1541         lpni = lnet_find_peer_ni_locked(prim_nid);
1542         if (!lpni)
1543                 return -ENOENT;
1544         lnet_peer_ni_decref_locked(lpni);
1545         lp = lpni->lpni_peer_net->lpn_peer;
1546
1547         if (prim_nid != lp->lp_primary_nid) {
1548                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1549                        libcfs_nid2str(prim_nid),
1550                        libcfs_nid2str(lp->lp_primary_nid));
1551                 return -ENODEV;
1552         }
1553
1554         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1555                 return lnet_peer_del(lp);
1556
1557         flags = LNET_PEER_CONFIGURED;
1558         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1559                 flags |= LNET_PEER_MULTI_RAIL;
1560
1561         return lnet_peer_del_nid(lp, nid, flags);
1562 }
1563
1564 void
1565 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1566 {
1567         struct lnet_peer_table *ptable;
1568         struct lnet_peer_net *lpn;
1569
1570         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1571
1572         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1573         LASSERT(lpni->lpni_rtr_refcount == 0);
1574         LASSERT(list_empty(&lpni->lpni_txq));
1575         LASSERT(lpni->lpni_txqnob == 0);
1576         LASSERT(list_empty(&lpni->lpni_peer_nis));
1577         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1578
1579         lpn = lpni->lpni_peer_net;
1580         lpni->lpni_peer_net = NULL;
1581         lpni->lpni_net = NULL;
1582
1583         /* remove the peer ni from the zombie list */
1584         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1585         spin_lock(&ptable->pt_zombie_lock);
1586         list_del_init(&lpni->lpni_hashlist);
1587         ptable->pt_zombies--;
1588         spin_unlock(&ptable->pt_zombie_lock);
1589
1590         if (lpni->lpni_pref_nnids > 1) {
1591                 LIBCFS_FREE(lpni->lpni_pref.nids,
1592                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1593         }
1594         LIBCFS_FREE(lpni, sizeof(*lpni));
1595
1596         lnet_peer_net_decref_locked(lpn);
1597 }
1598
1599 struct lnet_peer_ni *
1600 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1601 {
1602         struct lnet_peer_ni *lpni = NULL;
1603         int rc;
1604
1605         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1606                 return ERR_PTR(-ESHUTDOWN);
1607
1608         /*
1609          * find if a peer_ni already exists.
1610          * If so then just return that.
1611          */
1612         lpni = lnet_find_peer_ni_locked(nid);
1613         if (lpni)
1614                 return lpni;
1615
1616         lnet_net_unlock(cpt);
1617
1618         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1619         if (rc) {
1620                 lpni = ERR_PTR(rc);
1621                 goto out_net_relock;
1622         }
1623
1624         lpni = lnet_find_peer_ni_locked(nid);
1625         LASSERT(lpni);
1626
1627 out_net_relock:
1628         lnet_net_lock(cpt);
1629
1630         return lpni;
1631 }
1632
1633 /*
1634  * Get a peer_ni for the given nid, create it if necessary. Takes a
1635  * hold on the peer_ni.
1636  */
1637 struct lnet_peer_ni *
1638 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1639 {
1640         struct lnet_peer_ni *lpni = NULL;
1641         int rc;
1642
1643         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1644                 return ERR_PTR(-ESHUTDOWN);
1645
1646         /*
1647          * find if a peer_ni already exists.
1648          * If so then just return that.
1649          */
1650         lpni = lnet_find_peer_ni_locked(nid);
1651         if (lpni)
1652                 return lpni;
1653
1654         /*
1655          * Slow path:
1656          * use the lnet_api_mutex to serialize the creation of the peer_ni
1657          * and the creation/deletion of the local ni/net. When a local ni is
1658          * created, if there exists a set of peer_nis on that network,
1659          * they need to be traversed and updated. When a local NI is
1660          * deleted, which could result in a network being deleted, then
1661          * all peer nis on that network need to be removed as well.
1662          *
1663          * Creation through traffic should also be serialized with
1664          * creation through DLC.
1665          */
1666         lnet_net_unlock(cpt);
1667         mutex_lock(&the_lnet.ln_api_mutex);
1668         /*
1669          * Shutdown is only set under the ln_api_lock, so a single
1670          * check here is sufficent.
1671          */
1672         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1673                 lpni = ERR_PTR(-ESHUTDOWN);
1674                 goto out_mutex_unlock;
1675         }
1676
1677         rc = lnet_peer_ni_traffic_add(nid, pref);
1678         if (rc) {
1679                 lpni = ERR_PTR(rc);
1680                 goto out_mutex_unlock;
1681         }
1682
1683         lpni = lnet_find_peer_ni_locked(nid);
1684         LASSERT(lpni);
1685
1686 out_mutex_unlock:
1687         mutex_unlock(&the_lnet.ln_api_mutex);
1688         lnet_net_lock(cpt);
1689
1690         /* Lock has been dropped, check again for shutdown. */
1691         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1692                 if (!IS_ERR(lpni))
1693                         lnet_peer_ni_decref_locked(lpni);
1694                 lpni = ERR_PTR(-ESHUTDOWN);
1695         }
1696
1697         return lpni;
1698 }
1699
1700 /*
1701  * Peer Discovery
1702  */
1703
1704 /*
1705  * Is a peer uptodate from the point of view of discovery?
1706  *
1707  * If it is currently being processed, obviously not.
1708  * A forced Ping or Push is also handled by the discovery thread.
1709  *
1710  * Otherwise look at whether the peer needs rediscovering.
1711  */
1712 bool
1713 lnet_peer_is_uptodate(struct lnet_peer *lp)
1714 {
1715         bool rc;
1716
1717         spin_lock(&lp->lp_lock);
1718         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1719                             LNET_PEER_FORCE_PING |
1720                             LNET_PEER_FORCE_PUSH)) {
1721                 rc = false;
1722         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1723                 rc = true;
1724         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1725                 if (lnet_peer_discovery_disabled)
1726                         rc = true;
1727                 else
1728                         rc = false;
1729         } else if (lnet_peer_needs_push(lp)) {
1730                 rc = false;
1731         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1732                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1733                         rc = true;
1734                 else
1735                         rc = false;
1736         } else {
1737                 rc = false;
1738         }
1739         spin_unlock(&lp->lp_lock);
1740
1741         return rc;
1742 }
1743
1744 /*
1745  * Queue a peer for the attention of the discovery thread.  Call with
1746  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1747  * -EALREADY if the peer was already queued.
1748  */
1749 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1750 {
1751         int rc;
1752
1753         spin_lock(&lp->lp_lock);
1754         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1755                 lp->lp_state |= LNET_PEER_DISCOVERING;
1756         spin_unlock(&lp->lp_lock);
1757         if (list_empty(&lp->lp_dc_list)) {
1758                 lnet_peer_addref_locked(lp);
1759                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1760                 wake_up(&the_lnet.ln_dc_waitq);
1761                 rc = 0;
1762         } else {
1763                 rc = -EALREADY;
1764         }
1765
1766         CDEBUG(D_NET, "Queue peer %s: %d\n",
1767                libcfs_nid2str(lp->lp_primary_nid), rc);
1768
1769         return rc;
1770 }
1771
1772 /*
1773  * Discovery of a peer is complete. Wake all waiters on the peer.
1774  * Call with lnet_net_lock/EX held.
1775  */
1776 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1777 {
1778         struct lnet_msg *msg, *tmp;
1779         int rc = 0;
1780         struct list_head pending_msgs;
1781
1782         INIT_LIST_HEAD(&pending_msgs);
1783
1784         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1785                libcfs_nid2str(lp->lp_primary_nid));
1786
1787         list_del_init(&lp->lp_dc_list);
1788         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1789         wake_up_all(&lp->lp_dc_waitq);
1790
1791         lnet_net_unlock(LNET_LOCK_EX);
1792
1793         /* iterate through all pending messages and send them again */
1794         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1795                 list_del_init(&msg->msg_list);
1796                 if (lp->lp_dc_error) {
1797                         lnet_finalize(msg, lp->lp_dc_error);
1798                         continue;
1799                 }
1800
1801                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1802                        lnet_msgtyp2str(msg->msg_type),
1803                        libcfs_id2str(msg->msg_target));
1804                 rc = lnet_send(msg->msg_src_nid_param, msg,
1805                                msg->msg_rtr_nid_param);
1806                 if (rc < 0) {
1807                         CNETERR("Error sending %s to %s: %d\n",
1808                                lnet_msgtyp2str(msg->msg_type),
1809                                libcfs_id2str(msg->msg_target), rc);
1810                         lnet_finalize(msg, rc);
1811                 }
1812         }
1813         lnet_net_lock(LNET_LOCK_EX);
1814         lnet_peer_decref_locked(lp);
1815 }
1816
1817 /*
1818  * Handle inbound push.
1819  * Like any event handler, called with lnet_res_lock/CPT held.
1820  */
1821 void lnet_peer_push_event(struct lnet_event *ev)
1822 {
1823         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1824         struct lnet_peer *lp;
1825
1826         /* lnet_find_peer() adds a refcount */
1827         lp = lnet_find_peer(ev->source.nid);
1828         if (!lp) {
1829                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1830                        libcfs_nid2str(ev->initiator.nid),
1831                        libcfs_nid2str(ev->source.nid));
1832                 return;
1833         }
1834
1835         /* Ensure peer state remains consistent while we modify it. */
1836         spin_lock(&lp->lp_lock);
1837
1838         /*
1839          * If some kind of error happened the contents of the message
1840          * cannot be used. Clear the NIDS_UPTODATE and set the
1841          * FORCE_PING flag to trigger a ping.
1842          */
1843         if (ev->status) {
1844                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1845                 lp->lp_state |= LNET_PEER_FORCE_PING;
1846                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1847                        ev->status,
1848                        libcfs_nid2str(lp->lp_primary_nid),
1849                        libcfs_nid2str(ev->source.nid));
1850                 goto out;
1851         }
1852
1853         /*
1854          * A push with invalid or corrupted info. Clear the UPTODATE
1855          * flag to trigger a ping.
1856          */
1857         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1858                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1859                 lp->lp_state |= LNET_PEER_FORCE_PING;
1860                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1861                        libcfs_nid2str(lp->lp_primary_nid));
1862                 goto out;
1863         }
1864
1865         /*
1866          * Make sure we'll allocate the correct size ping buffer when
1867          * pinging the peer.
1868          */
1869         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1870                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1871
1872         /*
1873          * A non-Multi-Rail peer is not supposed to be capable of
1874          * sending a push.
1875          */
1876         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1877                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1878                        libcfs_nid2str(lp->lp_primary_nid));
1879                 goto out;
1880         }
1881
1882         /*
1883          * Check the MULTIRAIL flag. Complain if the peer was DLC
1884          * configured without it.
1885          */
1886         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1887                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1888                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1889                                libcfs_nid2str(lp->lp_primary_nid));
1890                 } else {
1891                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1892                         lnet_peer_clr_non_mr_pref_nids(lp);
1893                 }
1894         }
1895
1896         /*
1897          * The peer may have discovery disabled at its end. Set
1898          * NO_DISCOVERY as appropriate.
1899          */
1900         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1901                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1902                        libcfs_nid2str(lp->lp_primary_nid));
1903                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1904         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1905                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1906                        libcfs_nid2str(lp->lp_primary_nid));
1907                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1908         }
1909
1910         /*
1911          * Check for truncation of the Put message. Clear the
1912          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1913          * and tell discovery to allocate a bigger buffer.
1914          */
1915         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1916                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1917                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1918                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1919                 lp->lp_state |= LNET_PEER_FORCE_PING;
1920                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1921                        libcfs_nid2str(lp->lp_primary_nid),
1922                        pbuf->pb_info.pi_nnis);
1923                 goto out;
1924         }
1925
1926         /*
1927          * Check whether the Put data is stale. Stale data can just be
1928          * dropped.
1929          */
1930         if (pbuf->pb_info.pi_nnis > 1 &&
1931             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1932             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1933                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1934                        libcfs_nid2str(lp->lp_primary_nid),
1935                        LNET_PING_BUFFER_SEQNO(pbuf),
1936                        lp->lp_peer_seqno);
1937                 goto out;
1938         }
1939
1940         /*
1941          * Check whether the Put data is new, in which case we clear
1942          * the UPTODATE flag and prepare to process it.
1943          *
1944          * If the Put data is current, and the peer is UPTODATE then
1945          * we assome everything is all right and drop the data as
1946          * stale.
1947          */
1948         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1949                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1950                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1951         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1952                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1953                        libcfs_nid2str(lp->lp_primary_nid),
1954                        LNET_PING_BUFFER_SEQNO(pbuf),
1955                        lp->lp_peer_seqno);
1956                 goto out;
1957         }
1958
1959         /*
1960          * If there is data present that hasn't been processed yet,
1961          * we'll replace it if the Put contained newer data and it
1962          * fits. We're racing with a Ping or earlier Push in this
1963          * case.
1964          */
1965         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1966                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1967                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1968                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1969                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1970                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1971                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1972                               libcfs_nid2str(lp->lp_primary_nid),
1973                               LNET_PING_BUFFER_SEQNO(pbuf),
1974                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1975                 }
1976                 goto out;
1977         }
1978
1979         /*
1980          * Allocate a buffer to copy the data. On a failure we drop
1981          * the Push and set FORCE_PING to force the discovery
1982          * thread to fix the problem by pinging the peer.
1983          */
1984         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1985         if (!lp->lp_data) {
1986                 lp->lp_state |= LNET_PEER_FORCE_PING;
1987                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1988                        libcfs_nid2str(lp->lp_primary_nid),
1989                        LNET_PING_BUFFER_SEQNO(pbuf));
1990                 goto out;
1991         }
1992
1993         /* Success */
1994         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1995                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1996         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1997         CDEBUG(D_NET, "Received Push %s %u\n",
1998                libcfs_nid2str(lp->lp_primary_nid),
1999                LNET_PING_BUFFER_SEQNO(pbuf));
2000
2001 out:
2002         /*
2003          * Queue the peer for discovery if not done, force it on the request
2004          * queue and wake the discovery thread if the peer was already queued,
2005          * because its status changed.
2006          */
2007         spin_unlock(&lp->lp_lock);
2008         lnet_net_lock(LNET_LOCK_EX);
2009         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2010                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2011                 wake_up(&the_lnet.ln_dc_waitq);
2012         }
2013         /* Drop refcount from lookup */
2014         lnet_peer_decref_locked(lp);
2015         lnet_net_unlock(LNET_LOCK_EX);
2016 }
2017
2018 /*
2019  * Clear the discovery error state, unless we're already discovering
2020  * this peer, in which case the error is current.
2021  */
2022 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2023 {
2024         spin_lock(&lp->lp_lock);
2025         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2026                 lp->lp_dc_error = 0;
2027         spin_unlock(&lp->lp_lock);
2028 }
2029
2030 /*
2031  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2032  * dropped/retaken within this function. An lnet_peer_ni is passed in
2033  * because discovery could tear down an lnet_peer.
2034  */
2035 int
2036 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2037 {
2038         DEFINE_WAIT(wait);
2039         struct lnet_peer *lp;
2040         int rc = 0;
2041
2042 again:
2043         lnet_net_unlock(cpt);
2044         lnet_net_lock(LNET_LOCK_EX);
2045         lp = lpni->lpni_peer_net->lpn_peer;
2046         lnet_peer_clear_discovery_error(lp);
2047
2048         /*
2049          * We're willing to be interrupted. The lpni can become a
2050          * zombie if we race with DLC, so we must check for that.
2051          */
2052         for (;;) {
2053                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2054                 if (signal_pending(current))
2055                         break;
2056                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2057                         break;
2058                 if (lp->lp_dc_error)
2059                         break;
2060                 if (lnet_peer_is_uptodate(lp))
2061                         break;
2062                 lnet_peer_queue_for_discovery(lp);
2063                 /*
2064                  * if caller requested a non-blocking operation then
2065                  * return immediately. Once discovery is complete then the
2066                  * peer ref will be decremented and any pending messages
2067                  * that were stopped due to discovery will be transmitted.
2068                  */
2069                 if (!block)
2070                         break;
2071
2072                 lnet_peer_addref_locked(lp);
2073                 lnet_net_unlock(LNET_LOCK_EX);
2074                 schedule();
2075                 finish_wait(&lp->lp_dc_waitq, &wait);
2076                 lnet_net_lock(LNET_LOCK_EX);
2077                 lnet_peer_decref_locked(lp);
2078                 /* Peer may have changed */
2079                 lp = lpni->lpni_peer_net->lpn_peer;
2080         }
2081         finish_wait(&lp->lp_dc_waitq, &wait);
2082
2083         lnet_net_unlock(LNET_LOCK_EX);
2084         lnet_net_lock(cpt);
2085
2086         /*
2087          * If the peer has changed after we've discovered the older peer,
2088          * then we need to discovery the new peer to make sure the
2089          * interface information is up to date
2090          */
2091         if (lp != lpni->lpni_peer_net->lpn_peer)
2092                 goto again;
2093
2094         if (signal_pending(current))
2095                 rc = -EINTR;
2096         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2097                 rc = -ESHUTDOWN;
2098         else if (lp->lp_dc_error)
2099                 rc = lp->lp_dc_error;
2100         else if (!block)
2101                 CDEBUG(D_NET, "non-blocking discovery\n");
2102         else if (!lnet_peer_is_uptodate(lp))
2103                 goto again;
2104
2105         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2106                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2107                libcfs_nid2str(lpni->lpni_nid), rc,
2108                (!block) ? "pending discovery" : "discovery complete");
2109
2110         return rc;
2111 }
2112
2113 /* Handle an incoming ack for a push. */
2114 static void
2115 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2116 {
2117         struct lnet_ping_buffer *pbuf;
2118
2119         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2120         spin_lock(&lp->lp_lock);
2121         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2122         lp->lp_push_error = ev->status;
2123         if (ev->status)
2124                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2125         else
2126                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2127         spin_unlock(&lp->lp_lock);
2128
2129         CDEBUG(D_NET, "peer %s ev->status %d\n",
2130                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2131 }
2132
2133 /* Handle a Reply message. This is the reply to a Ping message. */
2134 static void
2135 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2136 {
2137         struct lnet_ping_buffer *pbuf;
2138         int rc;
2139
2140         spin_lock(&lp->lp_lock);
2141
2142         /*
2143          * If some kind of error happened the contents of message
2144          * cannot be used. Set PING_FAILED to trigger a retry.
2145          */
2146         if (ev->status) {
2147                 lp->lp_state |= LNET_PEER_PING_FAILED;
2148                 lp->lp_ping_error = ev->status;
2149                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2150                        ev->status,
2151                        libcfs_nid2str(lp->lp_primary_nid),
2152                        libcfs_nid2str(ev->source.nid));
2153                 goto out;
2154         }
2155
2156         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2157         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2158                 lnet_swap_pinginfo(pbuf);
2159
2160         /*
2161          * A reply with invalid or corrupted info. Set PING_FAILED to
2162          * trigger a retry.
2163          */
2164         rc = lnet_ping_info_validate(&pbuf->pb_info);
2165         if (rc) {
2166                 lp->lp_state |= LNET_PEER_PING_FAILED;
2167                 lp->lp_ping_error = 0;
2168                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2169                        libcfs_nid2str(lp->lp_primary_nid), rc);
2170                 goto out;
2171         }
2172
2173         /*
2174          * Update the MULTI_RAIL flag based on the reply. If the peer
2175          * was configured with DLC then the setting should match what
2176          * DLC put in.
2177          */
2178         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2179                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2180                         /* Everything's fine */
2181                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2182                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2183                               libcfs_nid2str(lp->lp_primary_nid));
2184                 } else {
2185                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2186                         lnet_peer_clr_non_mr_pref_nids(lp);
2187                 }
2188         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2189                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2190                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2191                               libcfs_nid2str(lp->lp_primary_nid));
2192                 } else {
2193                         CERROR("Multi-Rail state vanished from %s\n",
2194                                libcfs_nid2str(lp->lp_primary_nid));
2195                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2196                 }
2197         }
2198
2199         /*
2200          * Make sure we'll allocate the correct size ping buffer when
2201          * pinging the peer.
2202          */
2203         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2204                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2205
2206         /*
2207          * The peer may have discovery disabled at its end. Set
2208          * NO_DISCOVERY as appropriate.
2209          */
2210         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2211                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2212                        libcfs_nid2str(lp->lp_primary_nid));
2213                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2214         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2215                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2216                        libcfs_nid2str(lp->lp_primary_nid));
2217                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2218         }
2219
2220         /*
2221          * Check for truncation of the Reply. Clear PING_SENT and set
2222          * PING_FAILED to trigger a retry.
2223          */
2224         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2225                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2226                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2227                 lp->lp_state |= LNET_PEER_PING_FAILED;
2228                 lp->lp_ping_error = 0;
2229                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2230                        libcfs_nid2str(lp->lp_primary_nid),
2231                        pbuf->pb_info.pi_nnis);
2232                 goto out;
2233         }
2234
2235         /*
2236          * Check the sequence numbers in the reply. These are only
2237          * available if the reply came from a Multi-Rail peer.
2238          */
2239         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2240             pbuf->pb_info.pi_nnis > 1 &&
2241             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2242                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2243                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2244                                 libcfs_nid2str(lp->lp_primary_nid),
2245                                 LNET_PING_BUFFER_SEQNO(pbuf),
2246                                 lp->lp_peer_seqno);
2247                         goto out;
2248                 }
2249
2250                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2251                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2252         }
2253
2254         /* We're happy with the state of the data in the buffer. */
2255         CDEBUG(D_NET, "peer %s data present %u\n",
2256                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2257         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2258                 lnet_ping_buffer_decref(lp->lp_data);
2259         else
2260                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2261         lnet_ping_buffer_addref(pbuf);
2262         lp->lp_data = pbuf;
2263 out:
2264         lp->lp_state &= ~LNET_PEER_PING_SENT;
2265         spin_unlock(&lp->lp_lock);
2266 }
2267
2268 /*
2269  * Send event handling. Only matters for error cases, where we clean
2270  * up state on the peer and peer_ni that would otherwise be updated in
2271  * the REPLY event handler for a successful Ping, and the ACK event
2272  * handler for a successful Push.
2273  */
2274 static int
2275 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2276 {
2277         int rc = 0;
2278
2279         if (!ev->status)
2280                 goto out;
2281
2282         spin_lock(&lp->lp_lock);
2283         if (ev->msg_type == LNET_MSG_GET) {
2284                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2285                 lp->lp_state |= LNET_PEER_PING_FAILED;
2286                 lp->lp_ping_error = ev->status;
2287         } else { /* ev->msg_type == LNET_MSG_PUT */
2288                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2289                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2290                 lp->lp_push_error = ev->status;
2291         }
2292         spin_unlock(&lp->lp_lock);
2293         rc = LNET_REDISCOVER_PEER;
2294 out:
2295         CDEBUG(D_NET, "%s Send to %s: %d\n",
2296                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2297                 libcfs_nid2str(ev->target.nid), rc);
2298         return rc;
2299 }
2300
2301 /*
2302  * Unlink event handling. This event is only seen if a call to
2303  * LNetMDUnlink() caused the event to be unlinked. If this call was
2304  * made after the event was set up in LNetGet() or LNetPut() then we
2305  * assume the Ping or Push timed out.
2306  */
2307 static void
2308 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2309 {
2310         spin_lock(&lp->lp_lock);
2311         /* We've passed through LNetGet() */
2312         if (lp->lp_state & LNET_PEER_PING_SENT) {
2313                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2314                 lp->lp_state |= LNET_PEER_PING_FAILED;
2315                 lp->lp_ping_error = -ETIMEDOUT;
2316                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2317                         libcfs_nid2str(lp->lp_primary_nid));
2318         }
2319         /* We've passed through LNetPut() */
2320         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2321                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2322                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2323                 lp->lp_push_error = -ETIMEDOUT;
2324                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2325                         libcfs_nid2str(lp->lp_primary_nid));
2326         }
2327         spin_unlock(&lp->lp_lock);
2328 }
2329
2330 /*
2331  * Event handler for the discovery EQ.
2332  *
2333  * Called with lnet_res_lock(cpt) held. The cpt is the
2334  * lnet_cpt_of_cookie() of the md handle cookie.
2335  */
2336 static void lnet_discovery_event_handler(struct lnet_event *event)
2337 {
2338         struct lnet_peer *lp = event->md.user_ptr;
2339         struct lnet_ping_buffer *pbuf;
2340         int rc;
2341
2342         /* discovery needs to take another look */
2343         rc = LNET_REDISCOVER_PEER;
2344
2345         CDEBUG(D_NET, "Received event: %d\n", event->type);
2346
2347         switch (event->type) {
2348         case LNET_EVENT_ACK:
2349                 lnet_discovery_event_ack(lp, event);
2350                 break;
2351         case LNET_EVENT_REPLY:
2352                 lnet_discovery_event_reply(lp, event);
2353                 break;
2354         case LNET_EVENT_SEND:
2355                 /* Only send failure triggers a retry. */
2356                 rc = lnet_discovery_event_send(lp, event);
2357                 break;
2358         case LNET_EVENT_UNLINK:
2359                 /* LNetMDUnlink() was called */
2360                 lnet_discovery_event_unlink(lp, event);
2361                 break;
2362         default:
2363                 /* Invalid events. */
2364                 LBUG();
2365         }
2366         lnet_net_lock(LNET_LOCK_EX);
2367         if (event->unlinked) {
2368                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2369                 lnet_ping_buffer_decref(pbuf);
2370                 lnet_peer_decref_locked(lp);
2371         }
2372
2373         /* put peer back at end of request queue, if discovery not already
2374          * done */
2375         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2376                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2377                 wake_up(&the_lnet.ln_dc_waitq);
2378         }
2379         lnet_net_unlock(LNET_LOCK_EX);
2380 }
2381
2382 /*
2383  * Build a peer from incoming data.
2384  *
2385  * The NIDs in the incoming data are supposed to be structured as follows:
2386  *  - loopback
2387  *  - primary NID
2388  *  - other NIDs in same net
2389  *  - NIDs in second net
2390  *  - NIDs in third net
2391  *  - ...
2392  * This due to the way the list of NIDs in the data is created.
2393  *
2394  * Note that this function will mark the peer uptodate unless an
2395  * ENOMEM is encontered. All other errors are due to a conflict
2396  * between the DLC configuration and what discovery sees. We treat DLC
2397  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2398  * peer from becoming stuck in discovery.
2399  */
2400 static int lnet_peer_merge_data(struct lnet_peer *lp,
2401                                 struct lnet_ping_buffer *pbuf)
2402 {
2403         struct lnet_peer_ni *lpni;
2404         lnet_nid_t *curnis = NULL;
2405         lnet_nid_t *addnis = NULL;
2406         lnet_nid_t *delnis = NULL;
2407         unsigned flags;
2408         int ncurnis;
2409         int naddnis;
2410         int ndelnis;
2411         int nnis = 0;
2412         int i;
2413         int j;
2414         int rc;
2415
2416         flags = LNET_PEER_DISCOVERED;
2417         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2418                 flags |= LNET_PEER_MULTI_RAIL;
2419
2420         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2421         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2422         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2423         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2424         if (!curnis || !addnis || !delnis) {
2425                 rc = -ENOMEM;
2426                 goto out;
2427         }
2428         ncurnis = 0;
2429         naddnis = 0;
2430         ndelnis = 0;
2431
2432         /* Construct the list of NIDs present in peer. */
2433         lpni = NULL;
2434         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2435                 curnis[ncurnis++] = lpni->lpni_nid;
2436
2437         /*
2438          * Check for NIDs in pbuf not present in curnis[].
2439          * The loop starts at 1 to skip the loopback NID.
2440          */
2441         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2442                 for (j = 0; j < ncurnis; j++)
2443                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2444                                 break;
2445                 if (j == ncurnis)
2446                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2447         }
2448         /*
2449          * Check for NIDs in curnis[] not present in pbuf.
2450          * The nested loop starts at 1 to skip the loopback NID.
2451          *
2452          * But never add the loopback NID to delnis[]: if it is
2453          * present in curnis[] then this peer is for this node.
2454          */
2455         for (i = 0; i < ncurnis; i++) {
2456                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2457                         continue;
2458                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2459                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2460                                 break;
2461                 if (j == pbuf->pb_info.pi_nnis)
2462                         delnis[ndelnis++] = curnis[i];
2463         }
2464
2465         for (i = 0; i < naddnis; i++) {
2466                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2467                 if (rc) {
2468                         CERROR("Error adding NID %s to peer %s: %d\n",
2469                                libcfs_nid2str(addnis[i]),
2470                                libcfs_nid2str(lp->lp_primary_nid), rc);
2471                         if (rc == -ENOMEM)
2472                                 goto out;
2473                 }
2474         }
2475         for (i = 0; i < ndelnis; i++) {
2476                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2477                 if (rc) {
2478                         CERROR("Error deleting NID %s from peer %s: %d\n",
2479                                libcfs_nid2str(delnis[i]),
2480                                libcfs_nid2str(lp->lp_primary_nid), rc);
2481                         if (rc == -ENOMEM)
2482                                 goto out;
2483                 }
2484         }
2485         /*
2486          * Errors other than -ENOMEM are due to peers having been
2487          * configured with DLC. Ignore these because DLC overrides
2488          * Discovery.
2489          */
2490         rc = 0;
2491 out:
2492         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2493         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2494         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2495         lnet_ping_buffer_decref(pbuf);
2496         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2497
2498         if (rc) {
2499                 spin_lock(&lp->lp_lock);
2500                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2501                 lp->lp_state |= LNET_PEER_FORCE_PING;
2502                 spin_unlock(&lp->lp_lock);
2503         }
2504         return rc;
2505 }
2506
2507 /*
2508  * The data in pbuf says lp is its primary peer, but the data was
2509  * received by a different peer. Try to update lp with the data.
2510  */
2511 static int
2512 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2513 {
2514         struct lnet_handle_md mdh;
2515
2516         /* Queue lp for discovery, and force it on the request queue. */
2517         lnet_net_lock(LNET_LOCK_EX);
2518         if (lnet_peer_queue_for_discovery(lp))
2519                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2520         lnet_net_unlock(LNET_LOCK_EX);
2521
2522         LNetInvalidateMDHandle(&mdh);
2523
2524         /*
2525          * Decide whether we can move the peer to the DATA_PRESENT state.
2526          *
2527          * We replace stale data for a multi-rail peer, repair PING_FAILED
2528          * status, and preempt FORCE_PING.
2529          *
2530          * If after that we have DATA_PRESENT, we merge it into this peer.
2531          */
2532         spin_lock(&lp->lp_lock);
2533         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2534                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2535                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2536                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2537                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2538                         lnet_ping_buffer_decref(pbuf);
2539                         pbuf = lp->lp_data;
2540                         lp->lp_data = NULL;
2541                 }
2542         }
2543         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2544                 lnet_ping_buffer_decref(lp->lp_data);
2545                 lp->lp_data = NULL;
2546                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2547         }
2548         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2549                 mdh = lp->lp_ping_mdh;
2550                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2551                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2552                 lp->lp_ping_error = 0;
2553         }
2554         if (lp->lp_state & LNET_PEER_FORCE_PING)
2555                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2556         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2557         spin_unlock(&lp->lp_lock);
2558
2559         if (!LNetMDHandleIsInvalid(mdh))
2560                 LNetMDUnlink(mdh);
2561
2562         if (pbuf)
2563                 return lnet_peer_merge_data(lp, pbuf);
2564
2565         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2566         return 0;
2567 }
2568
2569 /*
2570  * Update a peer using the data received.
2571  */
2572 static int lnet_peer_data_present(struct lnet_peer *lp)
2573 __must_hold(&lp->lp_lock)
2574 {
2575         struct lnet_ping_buffer *pbuf;
2576         struct lnet_peer_ni *lpni;
2577         lnet_nid_t nid = LNET_NID_ANY;
2578         unsigned flags;
2579         int rc = 0;
2580
2581         pbuf = lp->lp_data;
2582         lp->lp_data = NULL;
2583         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2584         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2585         spin_unlock(&lp->lp_lock);
2586
2587         /*
2588          * Modifications of peer structures are done while holding the
2589          * ln_api_mutex. A global lock is required because we may be
2590          * modifying multiple peer structures, and a mutex greatly
2591          * simplifies memory management.
2592          *
2593          * The actual changes to the data structures must also protect
2594          * against concurrent lookups, for which the lnet_net_lock in
2595          * LNET_LOCK_EX mode is used.
2596          */
2597         mutex_lock(&the_lnet.ln_api_mutex);
2598         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2599                 rc = -ESHUTDOWN;
2600                 goto out;
2601         }
2602
2603         /*
2604          * If this peer is not on the peer list then it is being torn
2605          * down, and our reference count may be all that is keeping it
2606          * alive. Don't do any work on it.
2607          */
2608         if (list_empty(&lp->lp_peer_list))
2609                 goto out;
2610
2611         flags = LNET_PEER_DISCOVERED;
2612         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2613                 flags |= LNET_PEER_MULTI_RAIL;
2614
2615         /*
2616          * Check whether the primary NID in the message matches the
2617          * primary NID of the peer. If it does, update the peer, if
2618          * it it does not, check whether there is already a peer with
2619          * that primary NID. If no such peer exists, try to update
2620          * the primary NID of the current peer (allowed if it was
2621          * created due to message traffic) and complete the update.
2622          * If the peer did exist, hand off the data to it.
2623          *
2624          * The peer for the loopback interface is a special case: this
2625          * is the peer for the local node, and we want to set its
2626          * primary NID to the correct value here. Moreover, this peer
2627          * can show up with only the loopback NID in the ping buffer.
2628          */
2629         if (pbuf->pb_info.pi_nnis <= 1)
2630                 goto out;
2631         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2632         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2633                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2634                 if (!rc)
2635                         rc = lnet_peer_merge_data(lp, pbuf);
2636         } else if (lp->lp_primary_nid == nid) {
2637                 rc = lnet_peer_merge_data(lp, pbuf);
2638         } else {
2639                 lpni = lnet_find_peer_ni_locked(nid);
2640                 if (!lpni) {
2641                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2642                         if (rc) {
2643                                 CERROR("Primary NID error %s versus %s: %d\n",
2644                                        libcfs_nid2str(lp->lp_primary_nid),
2645                                        libcfs_nid2str(nid), rc);
2646                         } else {
2647                                 rc = lnet_peer_merge_data(lp, pbuf);
2648                         }
2649                 } else {
2650                         rc = lnet_peer_set_primary_data(
2651                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2652                         lnet_peer_ni_decref_locked(lpni);
2653                 }
2654         }
2655 out:
2656         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2657         mutex_unlock(&the_lnet.ln_api_mutex);
2658
2659         spin_lock(&lp->lp_lock);
2660         /* Tell discovery to re-check the peer immediately. */
2661         if (!rc)
2662                 rc = LNET_REDISCOVER_PEER;
2663         return rc;
2664 }
2665
2666 /*
2667  * A ping failed. Clear the PING_FAILED state and set the
2668  * FORCE_PING state, to ensure a retry even if discovery is
2669  * disabled. This avoids being left with incorrect state.
2670  */
2671 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2672 __must_hold(&lp->lp_lock)
2673 {
2674         struct lnet_handle_md mdh;
2675         int rc;
2676
2677         mdh = lp->lp_ping_mdh;
2678         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2679         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2680         lp->lp_state |= LNET_PEER_FORCE_PING;
2681         rc = lp->lp_ping_error;
2682         lp->lp_ping_error = 0;
2683         spin_unlock(&lp->lp_lock);
2684
2685         if (!LNetMDHandleIsInvalid(mdh))
2686                 LNetMDUnlink(mdh);
2687
2688         CDEBUG(D_NET, "peer %s:%d\n",
2689                libcfs_nid2str(lp->lp_primary_nid), rc);
2690
2691         spin_lock(&lp->lp_lock);
2692         return rc ? rc : LNET_REDISCOVER_PEER;
2693 }
2694
2695 /*
2696  * Select NID to send a Ping or Push to.
2697  */
2698 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2699 {
2700         struct lnet_peer_ni *lpni;
2701
2702         /* Look for a direct-connected NID for this peer. */
2703         lpni = NULL;
2704         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2705                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2706                         continue;
2707                 break;
2708         }
2709         if (lpni)
2710                 return lpni->lpni_nid;
2711
2712         /* Look for a routed-connected NID for this peer. */
2713         lpni = NULL;
2714         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2715                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2716                         continue;
2717                 break;
2718         }
2719         if (lpni)
2720                 return lpni->lpni_nid;
2721
2722         return LNET_NID_ANY;
2723 }
2724
2725 /* Active side of ping. */
2726 static int lnet_peer_send_ping(struct lnet_peer *lp)
2727 __must_hold(&lp->lp_lock)
2728 {
2729         lnet_nid_t pnid;
2730         int nnis;
2731         int rc;
2732         int cpt;
2733
2734         lp->lp_state |= LNET_PEER_PING_SENT;
2735         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2736         spin_unlock(&lp->lp_lock);
2737
2738         cpt = lnet_net_lock_current();
2739         /* Refcount for MD. */
2740         lnet_peer_addref_locked(lp);
2741         pnid = lnet_peer_select_nid(lp);
2742         lnet_net_unlock(cpt);
2743
2744         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2745
2746         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
2747                             the_lnet.ln_dc_eqh, false);
2748
2749         /*
2750          * if LNetMDBind in lnet_send_ping fails we need to decrement the
2751          * refcount on the peer, otherwise LNetMDUnlink will be called
2752          * which will eventually do that.
2753          */
2754         if (rc > 0) {
2755                 lnet_net_lock(cpt);
2756                 lnet_peer_decref_locked(lp);
2757                 lnet_net_unlock(cpt);
2758                 rc = -rc; /* change the rc to negative value */
2759                 goto fail_error;
2760         } else if (rc < 0) {
2761                 goto fail_error;
2762         }
2763
2764         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2765
2766         spin_lock(&lp->lp_lock);
2767         return 0;
2768
2769 fail_error:
2770         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2771         /*
2772          * The errors that get us here are considered hard errors and
2773          * cause Discovery to terminate. So we clear PING_SENT, but do
2774          * not set either PING_FAILED or FORCE_PING. In fact we need
2775          * to clear PING_FAILED, because the unlink event handler will
2776          * have set it if we called LNetMDUnlink() above.
2777          */
2778         spin_lock(&lp->lp_lock);
2779         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2780         return rc;
2781 }
2782
2783 /*
2784  * This function exists because you cannot call LNetMDUnlink() from an
2785  * event handler.
2786  */
2787 static int lnet_peer_push_failed(struct lnet_peer *lp)
2788 __must_hold(&lp->lp_lock)
2789 {
2790         struct lnet_handle_md mdh;
2791         int rc;
2792
2793         mdh = lp->lp_push_mdh;
2794         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2795         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2796         rc = lp->lp_push_error;
2797         lp->lp_push_error = 0;
2798         spin_unlock(&lp->lp_lock);
2799
2800         if (!LNetMDHandleIsInvalid(mdh))
2801                 LNetMDUnlink(mdh);
2802
2803         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2804         spin_lock(&lp->lp_lock);
2805         return rc ? rc : LNET_REDISCOVER_PEER;
2806 }
2807
2808 /* Active side of push. */
2809 static int lnet_peer_send_push(struct lnet_peer *lp)
2810 __must_hold(&lp->lp_lock)
2811 {
2812         struct lnet_ping_buffer *pbuf;
2813         struct lnet_process_id id;
2814         struct lnet_md md;
2815         int cpt;
2816         int rc;
2817
2818         /* Don't push to a non-multi-rail peer. */
2819         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2820                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2821                 return 0;
2822         }
2823
2824         lp->lp_state |= LNET_PEER_PUSH_SENT;
2825         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2826         spin_unlock(&lp->lp_lock);
2827
2828         cpt = lnet_net_lock_current();
2829         pbuf = the_lnet.ln_ping_target;
2830         lnet_ping_buffer_addref(pbuf);
2831         lnet_net_unlock(cpt);
2832
2833         /* Push source MD */
2834         md.start     = &pbuf->pb_info;
2835         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2836         md.threshold = 2; /* Put/Ack */
2837         md.max_size  = 0;
2838         md.options   = 0;
2839         md.eq_handle = the_lnet.ln_dc_eqh;
2840         md.user_ptr  = lp;
2841
2842         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2843         if (rc) {
2844                 lnet_ping_buffer_decref(pbuf);
2845                 CERROR("Can't bind push source MD: %d\n", rc);
2846                 goto fail_error;
2847         }
2848         cpt = lnet_net_lock_current();
2849         /* Refcount for MD. */
2850         lnet_peer_addref_locked(lp);
2851         id.pid = LNET_PID_LUSTRE;
2852         id.nid = lnet_peer_select_nid(lp);
2853         lnet_net_unlock(cpt);
2854
2855         if (id.nid == LNET_NID_ANY) {
2856                 rc = -EHOSTUNREACH;
2857                 goto fail_unlink;
2858         }
2859
2860         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2861                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2862                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2863
2864         if (rc)
2865                 goto fail_unlink;
2866
2867         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2868
2869         spin_lock(&lp->lp_lock);
2870         return 0;
2871
2872 fail_unlink:
2873         LNetMDUnlink(lp->lp_push_mdh);
2874         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2875 fail_error:
2876         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2877         /*
2878          * The errors that get us here are considered hard errors and
2879          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2880          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2881          * because the unlink event handler will have set it if we
2882          * called LNetMDUnlink() above.
2883          */
2884         spin_lock(&lp->lp_lock);
2885         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2886         return rc;
2887 }
2888
2889 /*
2890  * An unrecoverable error was encountered during discovery.
2891  * Set error status in peer and abort discovery.
2892  */
2893 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2894 {
2895         CDEBUG(D_NET, "Discovery error %s: %d\n",
2896                libcfs_nid2str(lp->lp_primary_nid), error);
2897
2898         spin_lock(&lp->lp_lock);
2899         lp->lp_dc_error = error;
2900         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2901         lp->lp_state |= LNET_PEER_REDISCOVER;
2902         spin_unlock(&lp->lp_lock);
2903 }
2904
2905 /*
2906  * Mark the peer as discovered.
2907  */
2908 static int lnet_peer_discovered(struct lnet_peer *lp)
2909 __must_hold(&lp->lp_lock)
2910 {
2911         lp->lp_state |= LNET_PEER_DISCOVERED;
2912         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2913                           LNET_PEER_REDISCOVER);
2914
2915         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2916
2917         return 0;
2918 }
2919
2920 /*
2921  * Mark the peer as to be rediscovered.
2922  */
2923 static int lnet_peer_rediscover(struct lnet_peer *lp)
2924 __must_hold(&lp->lp_lock)
2925 {
2926         lp->lp_state |= LNET_PEER_REDISCOVER;
2927         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2928
2929         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2930
2931         return 0;
2932 }
2933
2934 /*
2935  * Discovering this peer is taking too long. Cancel any Ping or Push
2936  * that discovery is waiting on by unlinking the relevant MDs. The
2937  * lnet_discovery_event_handler() will proceed from here and complete
2938  * the cleanup.
2939  */
2940 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2941 {
2942         struct lnet_handle_md ping_mdh;
2943         struct lnet_handle_md push_mdh;
2944
2945         LNetInvalidateMDHandle(&ping_mdh);
2946         LNetInvalidateMDHandle(&push_mdh);
2947
2948         spin_lock(&lp->lp_lock);
2949         if (lp->lp_state & LNET_PEER_PING_SENT) {
2950                 ping_mdh = lp->lp_ping_mdh;
2951                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2952         }
2953         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2954                 push_mdh = lp->lp_push_mdh;
2955                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2956         }
2957         spin_unlock(&lp->lp_lock);
2958
2959         if (!LNetMDHandleIsInvalid(ping_mdh))
2960                 LNetMDUnlink(ping_mdh);
2961         if (!LNetMDHandleIsInvalid(push_mdh))
2962                 LNetMDUnlink(push_mdh);
2963 }
2964
2965 /*
2966  * Wait for work to be queued or some other change that must be
2967  * attended to. Returns non-zero if the discovery thread should shut
2968  * down.
2969  */
2970 static int lnet_peer_discovery_wait_for_work(void)
2971 {
2972         int cpt;
2973         int rc = 0;
2974
2975         DEFINE_WAIT(wait);
2976
2977         cpt = lnet_net_lock_current();
2978         for (;;) {
2979                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
2980                                 TASK_INTERRUPTIBLE);
2981                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2982                         break;
2983                 if (lnet_push_target_resize_needed())
2984                         break;
2985                 if (!list_empty(&the_lnet.ln_dc_request))
2986                         break;
2987                 if (!list_empty(&the_lnet.ln_msg_resend))
2988                         break;
2989                 lnet_net_unlock(cpt);
2990
2991                 /*
2992                  * wakeup max every second to check if there are peers that
2993                  * have been stuck on the working queue for greater than
2994                  * the peer timeout.
2995                  */
2996                 schedule_timeout(cfs_time_seconds(1));
2997                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
2998                 cpt = lnet_net_lock_current();
2999         }
3000         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3001
3002         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3003                 rc = -ESHUTDOWN;
3004
3005         lnet_net_unlock(cpt);
3006
3007         CDEBUG(D_NET, "woken: %d\n", rc);
3008
3009         return rc;
3010 }
3011
3012 /*
3013  * Messages that were pending on a destroyed peer will be put on a global
3014  * resend list. The message resend list will be checked by
3015  * the discovery thread when it wakes up, and will resend messages. These
3016  * messages can still be sendable in the case the lpni which was the initial
3017  * cause of the message re-queue was transfered to another peer.
3018  *
3019  * It is possible that LNet could be shutdown while we're iterating
3020  * through the list. lnet_shudown_lndnets() will attempt to access the
3021  * resend list, but will have to wait until the spinlock is released, by
3022  * which time there shouldn't be any more messages on the resend list.
3023  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3024  * for the messages so they can be released. The other case is that
3025  * lnet_shudown_lndnets() can finalize all the messages before this
3026  * function can visit the resend list, in which case this function will be
3027  * a no-op.
3028  */
3029 static void lnet_resend_msgs(void)
3030 {
3031         struct lnet_msg *msg, *tmp;
3032         struct list_head resend;
3033         int rc;
3034
3035         INIT_LIST_HEAD(&resend);
3036
3037         spin_lock(&the_lnet.ln_msg_resend_lock);
3038         list_splice(&the_lnet.ln_msg_resend, &resend);
3039         spin_unlock(&the_lnet.ln_msg_resend_lock);
3040
3041         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3042                 list_del_init(&msg->msg_list);
3043                 rc = lnet_send(msg->msg_src_nid_param, msg,
3044                                msg->msg_rtr_nid_param);
3045                 if (rc < 0) {
3046                         CNETERR("Error sending %s to %s: %d\n",
3047                                lnet_msgtyp2str(msg->msg_type),
3048                                libcfs_id2str(msg->msg_target), rc);
3049                         lnet_finalize(msg, rc);
3050                 }
3051         }
3052 }
3053
3054 /* The discovery thread. */
3055 static int lnet_peer_discovery(void *arg)
3056 {
3057         struct lnet_peer *lp;
3058         int rc;
3059
3060         CDEBUG(D_NET, "started\n");
3061         cfs_block_allsigs();
3062
3063         for (;;) {
3064                 if (lnet_peer_discovery_wait_for_work())
3065                         break;
3066
3067                 lnet_resend_msgs();
3068
3069                 if (lnet_push_target_resize_needed())
3070                         lnet_push_target_resize();
3071
3072                 lnet_net_lock(LNET_LOCK_EX);
3073                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3074                         break;
3075
3076                 /*
3077                  * Process all incoming discovery work requests.  When
3078                  * discovery must wait on a peer to change state, it
3079                  * is added to the tail of the ln_dc_working queue. A
3080                  * timestamp keeps track of when the peer was added,
3081                  * so we can time out discovery requests that take too
3082                  * long.
3083                  */
3084                 while (!list_empty(&the_lnet.ln_dc_request)) {
3085                         lp = list_first_entry(&the_lnet.ln_dc_request,
3086                                               struct lnet_peer, lp_dc_list);
3087                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3088                         /*
3089                          * set the time the peer was put on the dc_working
3090                          * queue. It shouldn't remain on the queue
3091                          * forever, in case the GET message (for ping)
3092                          * doesn't get a REPLY or the PUT message (for
3093                          * push) doesn't get an ACK.
3094                          */
3095                         lp->lp_last_queued = ktime_get_real_seconds();
3096                         lnet_net_unlock(LNET_LOCK_EX);
3097
3098                         /*
3099                          * Select an action depending on the state of
3100                          * the peer and whether discovery is disabled.
3101                          * The check whether discovery is disabled is
3102                          * done after the code that handles processing
3103                          * for arrived data, cleanup for failures, and
3104                          * forcing a Ping or Push.
3105                          */
3106                         spin_lock(&lp->lp_lock);
3107                         CDEBUG(D_NET, "peer %s state %#x\n",
3108                                 libcfs_nid2str(lp->lp_primary_nid),
3109                                 lp->lp_state);
3110                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3111                                 rc = lnet_peer_data_present(lp);
3112                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3113                                 rc = lnet_peer_ping_failed(lp);
3114                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3115                                 rc = lnet_peer_push_failed(lp);
3116                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3117                                 rc = lnet_peer_send_ping(lp);
3118                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3119                                 rc = lnet_peer_send_push(lp);
3120                         else if (lnet_peer_discovery_disabled)
3121                                 rc = lnet_peer_rediscover(lp);
3122                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3123                                 rc = lnet_peer_send_ping(lp);
3124                         else if (lnet_peer_needs_push(lp))
3125                                 rc = lnet_peer_send_push(lp);
3126                         else
3127                                 rc = lnet_peer_discovered(lp);
3128                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3129                                 libcfs_nid2str(lp->lp_primary_nid),
3130                                 lp->lp_state, rc);
3131                         spin_unlock(&lp->lp_lock);
3132
3133                         lnet_net_lock(LNET_LOCK_EX);
3134                         if (rc == LNET_REDISCOVER_PEER) {
3135                                 list_move(&lp->lp_dc_list,
3136                                           &the_lnet.ln_dc_request);
3137                         } else if (rc) {
3138                                 lnet_peer_discovery_error(lp, rc);
3139                         }
3140                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3141                                 lnet_peer_discovery_complete(lp);
3142                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3143                                 break;
3144                 }
3145
3146                 lnet_net_unlock(LNET_LOCK_EX);
3147         }
3148
3149         CDEBUG(D_NET, "stopping\n");
3150         /*
3151          * Clean up before telling lnet_peer_discovery_stop() that
3152          * we're done. Use wake_up() below to somewhat reduce the
3153          * size of the thundering herd if there are multiple threads
3154          * waiting on discovery of a single peer.
3155          */
3156         LNetEQFree(the_lnet.ln_dc_eqh);
3157         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3158
3159         /* Queue cleanup 1: stop all pending pings and pushes. */
3160         lnet_net_lock(LNET_LOCK_EX);
3161         while (!list_empty(&the_lnet.ln_dc_working)) {
3162                 lp = list_first_entry(&the_lnet.ln_dc_working,
3163                                       struct lnet_peer, lp_dc_list);
3164                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3165                 lnet_net_unlock(LNET_LOCK_EX);
3166                 lnet_peer_cancel_discovery(lp);
3167                 lnet_net_lock(LNET_LOCK_EX);
3168         }
3169         lnet_net_unlock(LNET_LOCK_EX);
3170
3171         /* Queue cleanup 2: wait for the expired queue to clear. */
3172         while (!list_empty(&the_lnet.ln_dc_expired))
3173                 schedule_timeout(cfs_time_seconds(1));
3174
3175         /* Queue cleanup 3: clear the request queue. */
3176         lnet_net_lock(LNET_LOCK_EX);
3177         while (!list_empty(&the_lnet.ln_dc_request)) {
3178                 lp = list_first_entry(&the_lnet.ln_dc_request,
3179                                       struct lnet_peer, lp_dc_list);
3180                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3181                 lnet_peer_discovery_complete(lp);
3182         }
3183         lnet_net_unlock(LNET_LOCK_EX);
3184
3185         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3186         wake_up(&the_lnet.ln_dc_waitq);
3187
3188         CDEBUG(D_NET, "stopped\n");
3189
3190         return 0;
3191 }
3192
3193 /* ln_api_mutex is held on entry. */
3194 int lnet_peer_discovery_start(void)
3195 {
3196         struct task_struct *task;
3197         int rc;
3198
3199         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3200                 return -EALREADY;
3201
3202         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3203         if (rc != 0) {
3204                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3205                 return rc;
3206         }
3207
3208         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3209         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3210         if (IS_ERR(task)) {
3211                 rc = PTR_ERR(task);
3212                 CERROR("Can't start peer discovery thread: %d\n", rc);
3213
3214                 LNetEQFree(the_lnet.ln_dc_eqh);
3215                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3216
3217                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3218         }
3219
3220         CDEBUG(D_NET, "discovery start: %d\n", rc);
3221
3222         return rc;
3223 }
3224
3225 /* ln_api_mutex is held on entry. */
3226 void lnet_peer_discovery_stop(void)
3227 {
3228         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3229                 return;
3230
3231         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3232         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3233         wake_up(&the_lnet.ln_dc_waitq);
3234
3235         wait_event(the_lnet.ln_dc_waitq,
3236                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3237
3238         LASSERT(list_empty(&the_lnet.ln_dc_request));
3239         LASSERT(list_empty(&the_lnet.ln_dc_working));
3240         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3241
3242         CDEBUG(D_NET, "discovery stopped\n");
3243 }
3244
3245 /* Debugging */
3246
3247 void
3248 lnet_debug_peer(lnet_nid_t nid)
3249 {
3250         char                    *aliveness = "NA";
3251         struct lnet_peer_ni     *lp;
3252         int                     cpt;
3253
3254         cpt = lnet_cpt_of_nid(nid, NULL);
3255         lnet_net_lock(cpt);
3256
3257         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3258         if (IS_ERR(lp)) {
3259                 lnet_net_unlock(cpt);
3260                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3261                 return;
3262         }
3263
3264         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3265                 aliveness = lp->lpni_alive ? "up" : "down";
3266
3267         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3268                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3269                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3270                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3271                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3272
3273         lnet_peer_ni_decref_locked(lp);
3274
3275         lnet_net_unlock(cpt);
3276 }
3277
3278 /* Gathering information for userspace. */
3279
3280 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3281                           char aliveness[LNET_MAX_STR_LEN],
3282                           __u32 *cpt_iter, __u32 *refcount,
3283                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3284                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3285                           __u32 *peer_tx_qnob)
3286 {
3287         struct lnet_peer_table          *peer_table;
3288         struct lnet_peer_ni             *lp;
3289         int                             j;
3290         int                             lncpt;
3291         bool                            found = false;
3292
3293         /* get the number of CPTs */
3294         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3295
3296         /* if the cpt number to be examined is >= the number of cpts in
3297          * the system then indicate that there are no more cpts to examin
3298          */
3299         if (*cpt_iter >= lncpt)
3300                 return -ENOENT;
3301
3302         /* get the current table */
3303         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3304         /* if the ptable is NULL then there are no more cpts to examine */
3305         if (peer_table == NULL)
3306                 return -ENOENT;
3307
3308         lnet_net_lock(*cpt_iter);
3309
3310         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3311                 struct list_head *peers = &peer_table->pt_hash[j];
3312
3313                 list_for_each_entry(lp, peers, lpni_hashlist) {
3314                         if (peer_index-- > 0)
3315                                 continue;
3316
3317                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3318                         if (lnet_isrouter(lp) ||
3319                                 lnet_peer_aliveness_enabled(lp))
3320                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3321                                          lp->lpni_alive ? "up" : "down");
3322
3323                         *nid = lp->lpni_nid;
3324                         *refcount = atomic_read(&lp->lpni_refcount);
3325                         *ni_peer_tx_credits =
3326                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3327                         *peer_tx_credits = lp->lpni_txcredits;
3328                         *peer_rtr_credits = lp->lpni_rtrcredits;
3329                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3330                         *peer_tx_qnob = lp->lpni_txqnob;
3331
3332                         found = true;
3333                 }
3334
3335         }
3336         lnet_net_unlock(*cpt_iter);
3337
3338         *cpt_iter = lncpt;
3339
3340         return found ? 0 : -ENOENT;
3341 }
3342
3343 /* ln_api_mutex is held, which keeps the peer list stable */
3344 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3345 {
3346         struct lnet_ioctl_element_stats *lpni_stats;
3347         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3348         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3349         struct lnet_peer_ni_credit_info *lpni_info;
3350         struct lnet_peer_ni *lpni;
3351         struct lnet_peer *lp;
3352         lnet_nid_t nid;
3353         __u32 size;
3354         int rc;
3355
3356         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3357
3358         if (!lp) {
3359                 rc = -ENOENT;
3360                 goto out;
3361         }
3362
3363         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3364                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3365         size *= lp->lp_nnis;
3366         if (size > cfg->prcfg_size) {
3367                 cfg->prcfg_size = size;
3368                 rc = -E2BIG;
3369                 goto out_lp_decref;
3370         }
3371
3372         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3373         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3374         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3375         cfg->prcfg_count = lp->lp_nnis;
3376         cfg->prcfg_size = size;
3377         cfg->prcfg_state = lp->lp_state;
3378
3379         /* Allocate helper buffers. */
3380         rc = -ENOMEM;
3381         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3382         if (!lpni_info)
3383                 goto out_lp_decref;
3384         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3385         if (!lpni_stats)
3386                 goto out_free_info;
3387         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3388         if (!lpni_msg_stats)
3389                 goto out_free_stats;
3390         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3391         if (!lpni_hstats)
3392                 goto out_free_msg_stats;
3393
3394
3395         lpni = NULL;
3396         rc = -EFAULT;
3397         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3398                 nid = lpni->lpni_nid;
3399                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3400                         goto out_free_hstats;
3401                 bulk += sizeof(nid);
3402
3403                 memset(lpni_info, 0, sizeof(*lpni_info));
3404                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3405                 if (lnet_isrouter(lpni) ||
3406                         lnet_peer_aliveness_enabled(lpni))
3407                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3408                                 lpni->lpni_alive ? "up" : "down");
3409
3410                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3411                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3412                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3413                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3414                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3415                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3416                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3417                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3418                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3419                         goto out_free_hstats;
3420                 bulk += sizeof(*lpni_info);
3421
3422                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3423                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3424                                                             LNET_STATS_TYPE_SEND);
3425                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3426                                                             LNET_STATS_TYPE_RECV);
3427                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3428                                                             LNET_STATS_TYPE_DROP);
3429                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3430                         goto out_free_hstats;
3431                 bulk += sizeof(*lpni_stats);
3432                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3433                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3434                         goto out_free_hstats;
3435                 bulk += sizeof(*lpni_msg_stats);
3436                 lpni_hstats->hlpni_network_timeout =
3437                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3438                 lpni_hstats->hlpni_remote_dropped =
3439                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3440                 lpni_hstats->hlpni_remote_timeout =
3441                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3442                 lpni_hstats->hlpni_remote_error =
3443                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3444                 lpni_hstats->hlpni_health_value =
3445                   atomic_read(&lpni->lpni_healthv);
3446                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3447                         goto out_free_hstats;
3448                 bulk += sizeof(*lpni_hstats);
3449         }
3450         rc = 0;
3451
3452 out_free_hstats:
3453         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3454 out_free_msg_stats:
3455         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3456 out_free_stats:
3457         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3458 out_free_info:
3459         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3460 out_lp_decref:
3461         lnet_peer_decref_locked(lp);
3462 out:
3463         return rc;
3464 }
3465
3466 void
3467 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3468 {
3469         /* the mt could've shutdown and cleaned up the queues */
3470         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3471                 return;
3472
3473         if (list_empty(&lpni->lpni_recovery) &&
3474             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3475                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3476                         libcfs_nid2str(lpni->lpni_nid),
3477                         atomic_read(&lpni->lpni_healthv));
3478                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3479                 lnet_peer_ni_addref_locked(lpni);
3480         }
3481 }
3482
3483 /* Call with the ln_api_mutex held */
3484 void
3485 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3486 {
3487         struct lnet_peer_table *ptable;
3488         struct lnet_peer *lp;
3489         struct lnet_peer_net *lpn;
3490         struct lnet_peer_ni *lpni;
3491         int lncpt;
3492         int cpt;
3493
3494         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3495                 return;
3496
3497         if (!all) {
3498                 lnet_net_lock(LNET_LOCK_EX);
3499                 lpni = lnet_find_peer_ni_locked(nid);
3500                 if (!lpni) {
3501                         lnet_net_unlock(LNET_LOCK_EX);
3502                         return;
3503                 }
3504                 atomic_set(&lpni->lpni_healthv, value);
3505                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3506                 lnet_peer_ni_decref_locked(lpni);
3507                 lnet_net_unlock(LNET_LOCK_EX);
3508                 return;
3509         }
3510
3511         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3512
3513         /*
3514          * Walk all the peers and reset the healhv for each one to the
3515          * maximum value.
3516          */
3517         lnet_net_lock(LNET_LOCK_EX);
3518         for (cpt = 0; cpt < lncpt; cpt++) {
3519                 ptable = the_lnet.ln_peer_tables[cpt];
3520                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3521                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3522                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3523                                                     lpni_peer_nis) {
3524                                         atomic_set(&lpni->lpni_healthv, value);
3525                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3526                                 }
3527                         }
3528                 }
3529         }
3530         lnet_net_unlock(LNET_LOCK_EX);
3531 }
3532