Whamcloud - gitweb
1ad2fc26139dfe67e85d151d648e3cdfb76e2d58
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
/*
 * A local net was added. Walk the peer_nis that were created before
 * their network existed locally; attach any whose NID is on the new
 * net and initialize their credits from the net's tunables.
 */
void
lnet_peer_net_added(struct lnet_net *net)
{
	struct lnet_peer_ni *lpni, *tmp;

	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list) {

		if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
			lpni->lpni_net = net;

			/* lpni_lock serializes the credit updates */
			spin_lock(&lpni->lpni_lock);
			lpni->lpni_txcredits =
				lpni->lpni_net->net_tunables.lct_peer_tx_credits;
			lpni->lpni_mintxcredits = lpni->lpni_txcredits;
			lpni->lpni_rtrcredits =
				lnet_peer_buffer_credits(lpni->lpni_net);
			lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
			spin_unlock(&lpni->lpni_lock);

			/* now local: drop it (and its ref) from the remote list */
			lnet_peer_remove_from_remote_list(lpni);
		}
	}
}
84
/*
 * Free the per-CPT peer hash tables. Every table is expected to be
 * empty by now: the zombie list and all hash chains are asserted
 * empty before the memory is released.
 */
static void
lnet_peer_tables_destroy(void)
{
	struct lnet_peer_table	*ptable;
	struct list_head	*hash;
	int			i;
	int			j;

	if (!the_lnet.ln_peer_tables)
		return;

	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		hash = ptable->pt_hash;
		if (!hash) /* not initialized */
			break;

		LASSERT(list_empty(&ptable->pt_zombie_list));

		/* clear pt_hash first so the table reads as torn down */
		ptable->pt_hash = NULL;
		for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
			LASSERT(list_empty(&hash[j]));

		LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
	}

	cfs_percpt_free(the_lnet.ln_peer_tables);
	the_lnet.ln_peer_tables = NULL;
}
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
/*
 * Allocate and initialize a peer_ni for @nid on the CPT derived from
 * the NID hash. Returns NULL on allocation failure.
 *
 * If the NID is not on a local net, credits cannot be initialized
 * here; the peer_ni is placed (with an extra reference) on the global
 * ln_remote_peer_ni_list so lnet_peer_net_added() can revisit it when
 * a matching net appears.
 */
static struct lnet_peer_ni *
lnet_peer_ni_alloc(lnet_nid_t nid)
{
	struct lnet_peer_ni *lpni;
	struct lnet_net *net;
	int cpt;

	cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);

	LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
	if (!lpni)
		return NULL;

	INIT_LIST_HEAD(&lpni->lpni_txq);
	INIT_LIST_HEAD(&lpni->lpni_rtrq);
	INIT_LIST_HEAD(&lpni->lpni_routes);
	INIT_LIST_HEAD(&lpni->lpni_hashlist);
	INIT_LIST_HEAD(&lpni->lpni_peer_nis);
	INIT_LIST_HEAD(&lpni->lpni_recovery);
	INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
	LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);

	spin_lock_init(&lpni->lpni_lock);

	lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
	lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
	lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
	lpni->lpni_nid = nid;
	lpni->lpni_cpt = cpt;
	atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);

	net = lnet_get_net_locked(LNET_NIDNET(nid));
	lpni->lpni_net = net;
	if (net) {
		lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
		lpni->lpni_mintxcredits = lpni->lpni_txcredits;
		lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
		lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
	} else {
		/*
		 * This peer_ni is not on a local network, so we
		 * cannot add the credits here. In case the net is
		 * added later, add the peer_ni to the remote peer ni
		 * list so it can be easily found and revisited.
		 */
		/* FIXME: per-net implementation instead? */
		atomic_inc(&lpni->lpni_refcount); /* ref held by the remote list */
		list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
			      &the_lnet.ln_remote_peer_ni_list);
	}

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	return lpni;
}
206
207 static struct lnet_peer_net *
208 lnet_peer_net_alloc(__u32 net_id)
209 {
210         struct lnet_peer_net *lpn;
211
212         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
213         if (!lpn)
214                 return NULL;
215
216         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
217         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
218         lpn->lpn_net_id = net_id;
219
220         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
221
222         return lpn;
223 }
224
/*
 * Free an unreferenced, fully detached peer_net and drop the
 * reference it held on its owning peer. The "_locked" convention
 * indicates the caller holds lnet_net_lock (decref is a _locked
 * operation).
 */
void
lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
{
	struct lnet_peer *lp;

	CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));

	/* must be unreferenced and detached before it can be freed */
	LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
	LASSERT(list_empty(&lpn->lpn_peer_nis));
	LASSERT(list_empty(&lpn->lpn_peer_nets));
	lp = lpn->lpn_peer;
	lpn->lpn_peer = NULL;
	LIBCFS_FREE(lpn, sizeof(*lpn));

	/* drop the back-reference the peer_net held on the peer */
	lnet_peer_decref_locked(lp);
}
241
242 static struct lnet_peer *
243 lnet_peer_alloc(lnet_nid_t nid)
244 {
245         struct lnet_peer *lp;
246
247         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
248         if (!lp)
249                 return NULL;
250
251         INIT_LIST_HEAD(&lp->lp_peer_list);
252         INIT_LIST_HEAD(&lp->lp_peer_nets);
253         INIT_LIST_HEAD(&lp->lp_dc_list);
254         INIT_LIST_HEAD(&lp->lp_dc_pendq);
255         init_waitqueue_head(&lp->lp_dc_waitq);
256         spin_lock_init(&lp->lp_lock);
257         lp->lp_primary_nid = nid;
258         /*
259          * Turn off discovery for loopback peer. If you're creating a peer
260          * for the loopback interface then that was initiated when we
261          * attempted to send a message over the loopback. There is no need
262          * to ever use a different interface when sending messages to
263          * myself.
264          */
265         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
266                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
267         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
268
269         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
270
271         return lp;
272 }
273
/*
 * Free a peer whose reference count has dropped to zero. Messages
 * still queued for discovery are moved to the global resend list
 * rather than finalized here (see comment below).
 */
void
lnet_destroy_peer_locked(struct lnet_peer *lp)
{
	CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));

	/* the peer must be fully detached before destruction */
	LASSERT(atomic_read(&lp->lp_refcount) == 0);
	LASSERT(list_empty(&lp->lp_peer_nets));
	LASSERT(list_empty(&lp->lp_peer_list));
	LASSERT(list_empty(&lp->lp_dc_list));

	if (lp->lp_data)
		lnet_ping_buffer_decref(lp->lp_data);

	/*
	 * if there are messages still on the pending queue, then make
	 * sure to queue them on the ln_msg_resend list so they can be
	 * resent at a later point if the discovery thread is still
	 * running.
	 * If the discovery thread has stopped, then the wakeup will be a
	 * no-op, and it is expected the lnet_shutdown_lndnets() will
	 * eventually be called, which will traverse this list and
	 * finalize the messages on the list.
	 * We can not resend them now because we're holding the cpt lock.
	 * Releasing the lock can cause an inconsistent state
	 */
	spin_lock(&the_lnet.ln_msg_resend_lock);
	spin_lock(&lp->lp_lock);
	list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
	spin_unlock(&lp->lp_lock);
	spin_unlock(&the_lnet.ln_msg_resend_lock);
	wake_up(&the_lnet.ln_dc_waitq);

	LIBCFS_FREE(lp, sizeof(*lp));
}
308
/*
 * Detach a peer_ni from its peer_net. If this was the last peer_ni on
 * that peer_net, detach the peer_net from the peer.
 *
 * Call with lnet_net_lock/EX held
 */
static void
lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;
	struct lnet_peer *lp;

	/*
	 * Belts and suspenders: gracefully handle teardown of a
	 * partially connected peer_ni.
	 */
	lpn = lpni->lpni_peer_net;

	list_del_init(&lpni->lpni_peer_nis);
	/*
	 * If there are no lpni's left, we detach lpn from
	 * lp_peer_nets, so it cannot be found anymore.
	 */
	if (list_empty(&lpn->lpn_peer_nis))
		list_del_init(&lpn->lpn_peer_nets);

	/* Update peer NID count. */
	lp = lpn->lpn_peer;
	lp->lp_nnis--;

	/*
	 * If there are no more peer nets, make the peer unfindable
	 * via the peer_tables.
	 *
	 * Otherwise, if the peer is DISCOVERED, tell discovery to
	 * take another look at it. This is a no-op if discovery for
	 * this peer did the detaching.
	 */
	if (list_empty(&lp->lp_peer_nets)) {
		list_del_init(&lp->lp_peer_list);
		ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
		ptable->pt_peers--;
	} else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
		/* Discovery isn't running, nothing to do here. */
	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
		/* re-queue so discovery re-evaluates the peer's NIDs */
		lnet_peer_queue_for_discovery(lp);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	CDEBUG(D_NET, "peer %s NID %s\n",
		libcfs_nid2str(lp->lp_primary_nid),
		libcfs_nid2str(lpni->lpni_nid));
}
362
/*
 * Delete a single peer_ni: unhash it, mark it DELETING for the
 * monitor thread, park it on the zombie list until its refcount
 * drains, and detach it from the peer hierarchy.
 *
 * Returns 0 on success, -EBUSY if the peer_ni is in use as a gateway.
 *
 * called with lnet_net_lock LNET_LOCK_EX held
 */
static int
lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable = NULL;

	/* don't remove a peer_ni if it's also a gateway */
	if (lpni->lpni_rtr_refcount > 0) {
		CERROR("Peer NI %s is a gateway. Can not delete it\n",
		       libcfs_nid2str(lpni->lpni_nid));
		return -EBUSY;
	}

	lnet_peer_remove_from_remote_list(lpni);

	/* remove peer ni from the hash list. */
	list_del_init(&lpni->lpni_hashlist);

	/*
	 * indicate the peer is being deleted so the monitor thread can
	 * remove it from the recovery queue.
	 */
	spin_lock(&lpni->lpni_lock);
	lpni->lpni_state |= LNET_PEER_NI_DELETING;
	spin_unlock(&lpni->lpni_lock);

	/* decrement the ref count on the peer table */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	LASSERT(ptable->pt_number > 0);
	ptable->pt_number--;

	/*
	 * The peer_ni can no longer be found with a lookup. But there
	 * can be current users, so keep track of it on the zombie
	 * list until the reference count has gone to zero.
	 *
	 * The last reference may be lost in a place where the
	 * lnet_net_lock locks only a single cpt, and that cpt may not
	 * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
	 * has its own lock.
	 */
	spin_lock(&ptable->pt_zombie_lock);
	list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
	ptable->pt_zombies++;
	spin_unlock(&ptable->pt_zombie_lock);

	/* no need to keep this peer_ni on the hierarchy anymore */
	lnet_peer_detach_peer_ni_locked(lpni);

	/* remove hashlist reference on peer_ni */
	lnet_peer_ni_decref_locked(lpni);

	return 0;
}
417
/*
 * Tear down all peer state at LNet shutdown: delete every remaining
 * peer_ni on the remote list, then free the peer tables.
 */
void lnet_peer_uninit(void)
{
	struct lnet_peer_ni *lpni, *tmp;

	lnet_net_lock(LNET_LOCK_EX);

	/* remove all peer_nis from the remote peer and the hash list */
	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
				 lpni_on_remote_peer_ni_list)
		lnet_peer_ni_del_locked(lpni);

	/* NOTE(review): lnet_peer_ni_del_locked() can return -EBUSY for
	 * gateway peer_nis and that status is ignored here - presumably
	 * routes are already gone at this point in shutdown; confirm. */

	lnet_peer_tables_destroy();

	lnet_net_unlock(LNET_LOCK_EX);
}
433
434 static int
435 lnet_peer_del_locked(struct lnet_peer *peer)
436 {
437         struct lnet_peer_ni *lpni = NULL, *lpni2;
438         int rc = 0, rc2 = 0;
439
440         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
441
442         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
443         while (lpni != NULL) {
444                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
445                 rc = lnet_peer_ni_del_locked(lpni);
446                 if (rc != 0)
447                         rc2 = rc;
448                 lpni = lpni2;
449         }
450
451         return rc2;
452 }
453
454 static int
455 lnet_peer_del(struct lnet_peer *peer)
456 {
457         lnet_net_lock(LNET_LOCK_EX);
458         lnet_peer_del_locked(peer);
459         lnet_net_unlock(LNET_LOCK_EX);
460
461         return 0;
462 }
463
/*
 * Delete a NID from a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:  Non-DLC deletion from DLC-configured peer.
 *  -ENOENT: No lnet_peer_ni corresponding to the nid.
 *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
 *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
 */
static int
lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = lp->lp_primary_nid;
	int rc = 0;

	/* only DLC (userspace config) may modify a DLC-configured peer */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}
	lpni = lnet_find_peer_ni_locked(nid);
	if (!lpni) {
		rc = -ENOENT;
		goto out;
	}
	/*
	 * NOTE(review): the lookup reference is dropped before lpni is
	 * dereferenced below - presumably safe because ln_api_mutex
	 * serializes peer removal, but confirm.
	 */
	lnet_peer_ni_decref_locked(lpni);
	if (lp != lpni->lpni_peer_net->lpn_peer) {
		rc = -ECHILD;
		goto out;
	}

	/*
	 * This function only allows deletion of the primary NID if it
	 * is the only NID.
	 */
	if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
		rc = -EBUSY;
		goto out;
	}

	lnet_net_lock(LNET_LOCK_EX);

	rc = lnet_peer_ni_del_locked(lpni);

	lnet_net_unlock(LNET_LOCK_EX);

out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);

	return rc;
}
518
/*
 * Delete from @ptable all peer_nis on @net, or all peer_nis if @net
 * is NULL. Deleting a peer's primary NID removes the entire peer, so
 * in that case the safe-iteration cursor must be advanced past the
 * rest of that peer's peer_nis first.
 *
 * Call with lnet_net_lock LNET_LOCK_EX held.
 */
static void
lnet_peer_table_cleanup_locked(struct lnet_net *net,
			       struct lnet_peer_table *ptable)
{
	int			 i;
	struct lnet_peer_ni	*next;
	struct lnet_peer_ni	*lpni;
	struct lnet_peer	*peer;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != NULL && net != lpni->lpni_net)
				continue;

			/* a non-primary NID can be deleted on its own */
			peer = lpni->lpni_peer_net->lpn_peer;
			if (peer->lp_primary_nid != lpni->lpni_nid) {
				lnet_peer_ni_del_locked(lpni);
				continue;
			}
			/*
			 * Removing the primary NID implies removing
			 * the entire peer. Advance next beyond any
			 * peer_ni that belongs to the same peer.
			 */
			list_for_each_entry_from(next, &ptable->pt_hash[i],
						 lpni_hashlist) {
				if (next->lpni_peer_net->lpn_peer != peer)
					break;
			}
			lnet_peer_del_locked(peer);
		}
	}
}
553
554 static void
555 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
556 {
557         int     i = 3;
558
559         spin_lock(&ptable->pt_zombie_lock);
560         while (ptable->pt_zombies) {
561                 spin_unlock(&ptable->pt_zombie_lock);
562
563                 if (is_power_of_2(i)) {
564                         CDEBUG(D_WARNING,
565                                "Waiting for %d zombies on peer table\n",
566                                ptable->pt_zombies);
567                 }
568                 set_current_state(TASK_UNINTERRUPTIBLE);
569                 schedule_timeout(cfs_time_seconds(1) >> 1);
570                 spin_lock(&ptable->pt_zombie_lock);
571         }
572         spin_unlock(&ptable->pt_zombie_lock);
573 }
574
/*
 * For every peer_ni on @net that is in use as a router gateway,
 * delete the routes that go through it.
 *
 * Called with lnet_net_lock LNET_LOCK_EX held; the lock is dropped
 * around each lnet_del_route() call.
 */
static void
lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
				struct lnet_peer_table *ptable)
{
	struct lnet_peer_ni	*lp;
	struct lnet_peer_ni	*tmp;
	lnet_nid_t		lpni_nid;
	int			i;

	for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
		list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
					 lpni_hashlist) {
			if (net != lp->lpni_net)
				continue;

			if (lp->lpni_rtr_refcount == 0)
				continue;

			lpni_nid = lp->lpni_nid;

			/*
			 * NOTE(review): dropping the lock mid-iteration
			 * means "tmp" may be stale when the lock is
			 * re-taken - confirm lnet_del_route() cannot
			 * remove other entries on this hash chain.
			 */
			lnet_net_unlock(LNET_LOCK_EX);
			lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
			lnet_net_lock(LNET_LOCK_EX);
		}
	}
}
601
/*
 * Clean up all peer state associated with @net; with @net == NULL
 * (shutdown only) clean up everything. Routes gatewayed by affected
 * peers are removed first, then the peers themselves are deleted,
 * and finally we wait for zombie peer_nis to be finalized.
 */
void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
	int i;
	struct lnet_peer_table *ptable;

	LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
	/* If just deleting the peers for a NI, get rid of any routes these
	 * peers are gateways for. */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_del_rtrs_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* Start the cleanup process */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_table_cleanup_locked(net, ptable);
		lnet_net_unlock(LNET_LOCK_EX);
	}

	/* block until all pending peer_ni finalizations complete */
	cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
		lnet_peer_ni_finalize_wait(ptable);
}
627
628 static struct lnet_peer_ni *
629 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
630 {
631         struct list_head        *peers;
632         struct lnet_peer_ni     *lp;
633
634         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
635
636         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
637         list_for_each_entry(lp, peers, lpni_hashlist) {
638                 if (lp->lpni_nid == nid) {
639                         lnet_peer_ni_addref_locked(lp);
640                         return lp;
641                 }
642         }
643
644         return NULL;
645 }
646
647 struct lnet_peer_ni *
648 lnet_find_peer_ni_locked(lnet_nid_t nid)
649 {
650         struct lnet_peer_ni *lpni;
651         struct lnet_peer_table *ptable;
652         int cpt;
653
654         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
655
656         ptable = the_lnet.ln_peer_tables[cpt];
657         lpni = lnet_get_peer_ni_locked(ptable, nid);
658
659         return lpni;
660 }
661
662 struct lnet_peer *
663 lnet_find_peer(lnet_nid_t nid)
664 {
665         struct lnet_peer_ni *lpni;
666         struct lnet_peer *lp = NULL;
667         int cpt;
668
669         cpt = lnet_net_lock_current();
670         lpni = lnet_find_peer_ni_locked(nid);
671         if (lpni) {
672                 lp = lpni->lpni_peer_net->lpn_peer;
673                 lnet_peer_addref_locked(lp);
674                 lnet_peer_ni_decref_locked(lpni);
675         }
676         lnet_net_unlock(cpt);
677
678         return lp;
679 }
680
/*
 * Iterate over the peer_nis of @peer. Pass @prev == NULL to get the
 * first peer_ni; pass the previous return value to advance. If
 * @peer_net is non-NULL, iteration is restricted to that peer_net.
 * Returns NULL when the iteration is exhausted.
 *
 * Call with lnet_net_lock held ("_locked" convention).
 */
struct lnet_peer_ni *
lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
			     struct lnet_peer_net *peer_net,
			     struct lnet_peer_ni *prev)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_net *net = peer_net;

	if (!prev) {
		if (!net) {
			if (list_empty(&peer->lp_peer_nets))
				return NULL;

			net = list_entry(peer->lp_peer_nets.next,
					 struct lnet_peer_net,
					 lpn_peer_nets);
		}
		/* NOTE(review): assumes a peer_net always holds at least
		 * one peer_ni - an empty lpn_peer_nis list would yield a
		 * bogus entry here; confirm that invariant. */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
		/*
		 * if you reached the end of the peer ni list and the peer
		 * net is specified then there are no more peer nis in that
		 * net.
		 */
		if (net)
			return NULL;

		/*
		 * we reached the end of this net ni list. move to the
		 * next net
		 */
		if (prev->lpni_peer_net->lpn_peer_nets.next ==
		    &peer->lp_peer_nets)
			/* no more nets and no more NIs. */
			return NULL;

		/* get the next net */
		net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
				 struct lnet_peer_net,
				 lpn_peer_nets);
		/* get the ni on it */
		lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
				  lpni_peer_nis);

		return lpni;
	}

	/* there are more nis left */
	lpni = list_entry(prev->lpni_peer_nis.next,
			  struct lnet_peer_ni, lpni_peer_nis);

	return lpni;
}
739
/*
 * Copy the primary NID of every peer into the user-space array @ids.
 * *countp and *sizep are always updated on return: *countp with the
 * number of peers, *sizep with the byte size of the full result.
 *
 * Returns 0 on success; -ESHUTDOWN if LNet is not running; -E2BIG if
 * the supplied buffer (original *sizep) is too small; -EFAULT if the
 * copy to user space fails.
 *
 * Call with the ln_api_mutex held.
 */
int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
{
	struct lnet_process_id id;
	struct lnet_peer_table *ptable;
	struct lnet_peer *lp;
	__u32 count = 0;
	__u32 size = 0;
	int lncpt;
	int cpt;
	__u32 i;
	int rc;

	rc = -ESHUTDOWN;
	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		goto done;

	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);

	/*
	 * Count the number of peers, and return E2BIG if the buffer
	 * is too small. We'll also return the desired size.
	 */
	rc = -E2BIG;
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		count += ptable->pt_peers;
	}
	size = count * sizeof(*ids);
	if (size > *sizep)
		goto done;

	/*
	 * Walk the peer lists and copy out the primary nids.
	 * This is safe because the peer lists are only modified
	 * while the ln_api_mutex is held. So we don't need to
	 * hold the lnet_net_lock as well, and can therefore
	 * directly call copy_to_user().
	 */
	rc = -EFAULT;
	memset(&id, 0, sizeof(id));
	id.pid = LNET_PID_LUSTRE;
	i = 0;
	for (cpt = 0; cpt < lncpt; cpt++) {
		ptable = the_lnet.ln_peer_tables[cpt];
		list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
			if (i >= count)
				goto done;
			id.nid = lp->lp_primary_nid;
			if (copy_to_user(&ids[i], &id, sizeof(id)))
				goto done;
			i++;
		}
	}
	rc = 0;
done:
	*countp = count;
	*sizep = size;
	return rc;
}
800
801 /*
802  * Start pushes to peers that need to be updated for a configuration
803  * change on this node.
804  */
805 void
806 lnet_push_update_to_peers(int force)
807 {
808         struct lnet_peer_table *ptable;
809         struct lnet_peer *lp;
810         int lncpt;
811         int cpt;
812
813         lnet_net_lock(LNET_LOCK_EX);
814         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
815         for (cpt = 0; cpt < lncpt; cpt++) {
816                 ptable = the_lnet.ln_peer_tables[cpt];
817                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
818                         if (force) {
819                                 spin_lock(&lp->lp_lock);
820                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
821                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
822                                 spin_unlock(&lp->lp_lock);
823                         }
824                         if (lnet_peer_needs_push(lp))
825                                 lnet_peer_queue_for_discovery(lp);
826                 }
827         }
828         lnet_net_unlock(LNET_LOCK_EX);
829         wake_up(&the_lnet.ln_dc_waitq);
830 }
831
832 /*
833  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
834  * this is a preferred point-to-point path. Call with lnet_net_lock in
835  * shared mmode.
836  */
837 bool
838 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
839 {
840         int i;
841
842         if (lpni->lpni_pref_nnids == 0)
843                 return false;
844         if (lpni->lpni_pref_nnids == 1)
845                 return lpni->lpni_pref.nid == nid;
846         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
847                 if (lpni->lpni_pref.nids[i] == nid)
848                         return true;
849         }
850         return false;
851 }
852
853 /*
854  * Set a single ni as preferred, provided no preferred ni is already
855  * defined. Only to be used for non-multi-rail peer_ni.
856  */
857 int
858 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
859 {
860         int rc = 0;
861
862         spin_lock(&lpni->lpni_lock);
863         if (nid == LNET_NID_ANY) {
864                 rc = -EINVAL;
865         } else if (lpni->lpni_pref_nnids > 0) {
866                 rc = -EPERM;
867         } else if (lpni->lpni_pref_nnids == 0) {
868                 lpni->lpni_pref.nid = nid;
869                 lpni->lpni_pref_nnids = 1;
870                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
871         }
872         spin_unlock(&lpni->lpni_lock);
873
874         CDEBUG(D_NET, "peer %s nid %s: %d\n",
875                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
876         return rc;
877 }
878
879 /*
880  * Clear the preferred NID from a non-multi-rail peer_ni, provided
881  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
882  */
883 int
884 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
885 {
886         int rc = 0;
887
888         spin_lock(&lpni->lpni_lock);
889         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
890                 lpni->lpni_pref_nnids = 0;
891                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
892         } else if (lpni->lpni_pref_nnids == 0) {
893                 rc = -ENOENT;
894         } else {
895                 rc = -EPERM;
896         }
897         spin_unlock(&lpni->lpni_lock);
898
899         CDEBUG(D_NET, "peer %s: %d\n",
900                libcfs_nid2str(lpni->lpni_nid), rc);
901         return rc;
902 }
903
904 /*
905  * Clear the preferred NIDs from a non-multi-rail peer.
906  */
907 void
908 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
909 {
910         struct lnet_peer_ni *lpni = NULL;
911
912         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
913                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
914 }
915
916 int
917 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
918 {
919         lnet_nid_t *nids = NULL;
920         lnet_nid_t *oldnids = NULL;
921         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
922         int size;
923         int i;
924         int rc = 0;
925
926         if (nid == LNET_NID_ANY) {
927                 rc = -EINVAL;
928                 goto out;
929         }
930
931         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
932                 rc = -EEXIST;
933                 goto out;
934         }
935
936         /* A non-MR node may have only one preferred NI per peer_ni */
937         if (lpni->lpni_pref_nnids > 0) {
938                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
939                         rc = -EPERM;
940                         goto out;
941                 }
942         }
943
944         if (lpni->lpni_pref_nnids != 0) {
945                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
946                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
947                 if (!nids) {
948                         rc = -ENOMEM;
949                         goto out;
950                 }
951                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
952                         if (lpni->lpni_pref.nids[i] == nid) {
953                                 LIBCFS_FREE(nids, size);
954                                 rc = -EEXIST;
955                                 goto out;
956                         }
957                         nids[i] = lpni->lpni_pref.nids[i];
958                 }
959                 nids[i] = nid;
960         }
961
962         lnet_net_lock(LNET_LOCK_EX);
963         spin_lock(&lpni->lpni_lock);
964         if (lpni->lpni_pref_nnids == 0) {
965                 lpni->lpni_pref.nid = nid;
966         } else {
967                 oldnids = lpni->lpni_pref.nids;
968                 lpni->lpni_pref.nids = nids;
969         }
970         lpni->lpni_pref_nnids++;
971         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
972         spin_unlock(&lpni->lpni_lock);
973         lnet_net_unlock(LNET_LOCK_EX);
974
975         if (oldnids) {
976                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
977                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
978         }
979 out:
980         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
981                 spin_lock(&lpni->lpni_lock);
982                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
983                 spin_unlock(&lpni->lpni_lock);
984         }
985         CDEBUG(D_NET, "peer %s nid %s: %d\n",
986                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
987         return rc;
988 }
989
990 int
991 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
992 {
993         lnet_nid_t *nids = NULL;
994         lnet_nid_t *oldnids = NULL;
995         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
996         int size;
997         int i, j;
998         int rc = 0;
999
1000         if (lpni->lpni_pref_nnids == 0) {
1001                 rc = -ENOENT;
1002                 goto out;
1003         }
1004
1005         if (lpni->lpni_pref_nnids == 1) {
1006                 if (lpni->lpni_pref.nid != nid) {
1007                         rc = -ENOENT;
1008                         goto out;
1009                 }
1010         } else if (lpni->lpni_pref_nnids == 2) {
1011                 if (lpni->lpni_pref.nids[0] != nid &&
1012                     lpni->lpni_pref.nids[1] != nid) {
1013                         rc = -ENOENT;
1014                         goto out;
1015                 }
1016         } else {
1017                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1018                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1019                 if (!nids) {
1020                         rc = -ENOMEM;
1021                         goto out;
1022                 }
1023                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1024                         if (lpni->lpni_pref.nids[i] != nid)
1025                                 continue;
1026                         nids[j++] = lpni->lpni_pref.nids[i];
1027                 }
1028                 /* Check if we actually removed a nid. */
1029                 if (j == lpni->lpni_pref_nnids) {
1030                         LIBCFS_FREE(nids, size);
1031                         rc = -ENOENT;
1032                         goto out;
1033                 }
1034         }
1035
1036         lnet_net_lock(LNET_LOCK_EX);
1037         spin_lock(&lpni->lpni_lock);
1038         if (lpni->lpni_pref_nnids == 1) {
1039                 lpni->lpni_pref.nid = LNET_NID_ANY;
1040         } else if (lpni->lpni_pref_nnids == 2) {
1041                 oldnids = lpni->lpni_pref.nids;
1042                 if (oldnids[0] == nid)
1043                         lpni->lpni_pref.nid = oldnids[1];
1044                 else
1045                         lpni->lpni_pref.nid = oldnids[2];
1046         } else {
1047                 oldnids = lpni->lpni_pref.nids;
1048                 lpni->lpni_pref.nids = nids;
1049         }
1050         lpni->lpni_pref_nnids--;
1051         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1052         spin_unlock(&lpni->lpni_lock);
1053         lnet_net_unlock(LNET_LOCK_EX);
1054
1055         if (oldnids) {
1056                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1057                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1058         }
1059 out:
1060         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1061                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1062         return rc;
1063 }
1064
1065 lnet_nid_t
1066 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1067 {
1068         struct lnet_peer_ni *lpni;
1069         lnet_nid_t primary_nid = nid;
1070
1071         lpni = lnet_find_peer_ni_locked(nid);
1072         if (lpni) {
1073                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1074                 lnet_peer_ni_decref_locked(lpni);
1075         }
1076
1077         return primary_nid;
1078 }
1079
/*
 * Return the primary NID of the peer that owns @nid, creating the
 * peer_ni (and thus the peer) if it does not exist, and running peer
 * discovery until the peer's information is up to date.
 *
 * On any failure @nid itself is returned.
 */
lnet_nid_t
LNetPrimaryNID(lnet_nid_t nid)
{
	struct lnet_peer *lp;
	struct lnet_peer_ni *lpni;
	lnet_nid_t primary_nid = nid;	/* fallback on lookup/discovery failure */
	int rc = 0;
	int cpt;

	cpt = lnet_net_lock_current();
	/* Find or create the peer_ni for this NID. */
	lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
	if (IS_ERR(lpni)) {
		rc = PTR_ERR(lpni);
		goto out_unlock;
	}
	lp = lpni->lpni_peer_net->lpn_peer;
	/*
	 * Loop until up to date, re-reading the peer pointer each pass:
	 * discovery may re-associate the lpni with a different peer
	 * struct (e.g. when NIDs get merged) before it settles.
	 */
	while (!lnet_peer_is_uptodate(lp)) {
		rc = lnet_discover_peer_locked(lpni, cpt, true);
		if (rc)
			goto out_decref;
		lp = lpni->lpni_peer_net->lpn_peer;
	}
	primary_nid = lp->lp_primary_nid;
out_decref:
	lnet_peer_ni_decref_locked(lpni);
out_unlock:
	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
	       libcfs_nid2str(primary_nid), rc);
	return primary_nid;
}
EXPORT_SYMBOL(LNetPrimaryNID);
1113
1114 struct lnet_peer_net *
1115 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1116 {
1117         struct lnet_peer_net *peer_net;
1118         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1119                 if (peer_net->lpn_net_id == net_id)
1120                         return peer_net;
1121         }
1122         return NULL;
1123 }
1124
/*
 * Attach a peer_ni to a peer_net and peer. This function assumes
 * peer_ni is not already attached to the peer_net/peer. The peer_ni
 * may be attached to a different peer, in which case it will be
 * properly detached first. The whole operation is done atomically
 * (under lnet_net_lock/EX).
 *
 * Ownership of @lp, @lpn and @lpni passes to the data structures;
 * callers must not free them after this returns.
 *
 * Always returns 0.  This is the last function called from functions
 * that do return an int, so returning 0 here allows the compiler to
 * do a tail call.
 */
static int
lnet_peer_attach_peer_ni(struct lnet_peer *lp,
				struct lnet_peer_net *lpn,
				struct lnet_peer_ni *lpni,
				unsigned flags)
{
	struct lnet_peer_table *ptable;

	/* Install the new peer_ni */
	lnet_net_lock(LNET_LOCK_EX);
	/* Add peer_ni to global peer table hash, if necessary. */
	if (list_empty(&lpni->lpni_hashlist)) {
		int hash = lnet_nid2peerhash(lpni->lpni_nid);

		ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
		list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
		ptable->pt_version++;
		ptable->pt_number++;
		/* This is the 1st refcount on lpni. */
		atomic_inc(&lpni->lpni_refcount);
	}

	/* Detach the peer_ni from an existing peer, if necessary. */
	if (lpni->lpni_peer_net) {
		/* Caller guarantees this is a move, never a re-attach. */
		LASSERT(lpni->lpni_peer_net != lpn);
		LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
		lnet_peer_detach_peer_ni_locked(lpni);
		lnet_peer_net_decref_locked(lpni->lpni_peer_net);
		lpni->lpni_peer_net = NULL;
	}

	/* Add peer_ni to peer_net */
	lpni->lpni_peer_net = lpn;
	list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
	lnet_peer_net_addref_locked(lpn);

	/* Add peer_net to peer, unless it is already attached. */
	if (!lpn->lpn_peer) {
		lpn->lpn_peer = lp;
		list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
		lnet_peer_addref_locked(lp);
	}

	/* Add peer to global peer list, if necessary */
	ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
	if (list_empty(&lp->lp_peer_list)) {
		list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
		ptable->pt_peers++;
	}


	/*
	 * Update peer state. CONFIGURED and MULTI_RAIL are only ever
	 * set here, never cleared.
	 */
	spin_lock(&lp->lp_lock);
	if (flags & LNET_PEER_CONFIGURED) {
		if (!(lp->lp_state & LNET_PEER_CONFIGURED))
			lp->lp_state |= LNET_PEER_CONFIGURED;
	}
	if (flags & LNET_PEER_MULTI_RAIL) {
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			/* non-MR preferred NIDs are meaningless on an MR peer */
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	}
	spin_unlock(&lp->lp_lock);

	lp->lp_nnis++;
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
	       libcfs_nid2str(lp->lp_primary_nid),
	       libcfs_nid2str(lpni->lpni_nid), flags);

	return 0;
}
1209
/*
 * Create a new peer, with nid as its primary nid.
 *
 * Call with the lnet_api_mutex held.
 *
 * Returns 0 on success, or:
 *  -EEXIST: a configured peer with a different primary NID owns @nid
 *  -EPERM:  attempt to toggle the Multi-Rail flag of a configured peer
 *  -ENOMEM: allocation failure
 */
static int
lnet_peer_add(lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(nid != LNET_NID_ANY);

	/*
	 * No need for the lnet_net_lock here, because the
	 * lnet_api_mutex is held.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/* A peer with this NID already exists. */
		lp = lpni->lpni_peer_net->lpn_peer;
		lnet_peer_ni_decref_locked(lpni);
		/*
		 * This is an error if the peer was configured and the
		 * primary NID differs or an attempt is made to change
		 * the Multi-Rail flag. Otherwise the assumption is
		 * that an existing peer is being modified.
		 */
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			if (lp->lp_primary_nid != nid)
				rc = -EEXIST;
			else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
				rc = -EPERM;
			goto out;
		}
		/* Delete and recreate as a configured peer. */
		lnet_peer_del(lp);
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;

	/* Attach takes ownership of lp/lpn/lpni and always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
	       libcfs_nid2str(nid), flags, rc);
	return rc;
}
1274
/*
 * Add a NID to a peer. Call with ln_api_mutex held.
 *
 * Error codes:
 *  -EPERM:    Non-DLC addition to a DLC-configured peer, or an attempt
 *             to clear the Multi-Rail flag of a Multi-Rail peer.
 *  -EEXIST:   The NID was configured by DLC for a different peer.
 *  -ENOMEM:   Out of memory.
 *  -ENOTUNIQ: Adding a second peer NID on a single network on a
 *             non-multi-rail peer.
 */
static int
lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
{
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	int rc = 0;

	LASSERT(lp);
	LASSERT(nid != LNET_NID_ANY);

	/* A configured peer can only be updated through configuration. */
	if (!(flags & LNET_PEER_CONFIGURED)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			rc = -EPERM;
			goto out;
		}
	}

	/*
	 * The MULTI_RAIL flag can be set but not cleared, because
	 * that would leave the peer struct in an invalid state.
	 */
	if (flags & LNET_PEER_MULTI_RAIL) {
		spin_lock(&lp->lp_lock);
		if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
		spin_unlock(&lp->lp_lock);
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		rc = -EPERM;
		goto out;
	}

	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * A peer_ni already exists. This is only a problem if
		 * it is not connected to this peer and was configured
		 * by DLC.
		 *
		 * NOTE(review): lpni is dereferenced after the decref
		 * below; presumably it stays valid because the peer
		 * table still holds a reference and ln_api_mutex blocks
		 * deletion — confirm (same pattern as lnet_add_peer_ni).
		 */
		lnet_peer_ni_decref_locked(lpni);
		if (lpni->lpni_peer_net->lpn_peer == lp)
			goto out;
		if (lnet_peer_ni_is_configured(lpni)) {
			rc = -EEXIST;
			goto out;
		}
		/* If this is the primary NID, destroy the peer. */
		if (lnet_peer_ni_is_primary(lpni)) {
			lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
			lpni = lnet_peer_ni_alloc(nid);
			if (!lpni) {
				rc = -ENOMEM;
				goto out;
			}
		}
	} else {
		lpni = lnet_peer_ni_alloc(nid);
		if (!lpni) {
			rc = -ENOMEM;
			goto out;
		}
	}

	/*
	 * Get the peer_net. Check that we're not adding a second
	 * peer_ni on a peer_net of a non-multi-rail peer.
	 */
	lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
	if (!lpn) {
		lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
		if (!lpn) {
			rc = -ENOMEM;
			goto out_free_lpni;
		}
	} else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
		rc = -ENOTUNIQ;
		goto out_free_lpni;
	}

	/* Attach takes ownership of lpn/lpni and always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpni:
	/* If the peer_ni was allocated above its peer_net pointer is NULL */
	if (!lpni->lpni_peer_net)
		LIBCFS_FREE(lpni, sizeof(*lpni));
out:
	CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
	       libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
	       flags, rc);
	return rc;
}
1378
1379 /*
1380  * Update the primary NID of a peer, if possible.
1381  *
1382  * Call with the lnet_api_mutex held.
1383  */
1384 static int
1385 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1386 {
1387         lnet_nid_t old = lp->lp_primary_nid;
1388         int rc = 0;
1389
1390         if (lp->lp_primary_nid == nid)
1391                 goto out;
1392         rc = lnet_peer_add_nid(lp, nid, flags);
1393         if (rc)
1394                 goto out;
1395         lp->lp_primary_nid = nid;
1396 out:
1397         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1398                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1399         return rc;
1400 }
1401
/*
 * lpni creation initiated due to traffic either sending or receiving.
 *
 * Creates the peer, peer_net and peer_ni for @nid if none exist yet,
 * optionally recording @pref as the non-MR preferred source NID.
 * Traffic-created peers carry no flags (not CONFIGURED, not
 * MULTI_RAIL).
 *
 * Call with ln_api_mutex held. Returns 0 on success (including the
 * race where another thread created the peer_ni first), -EINVAL for
 * LNET_NID_ANY, or -ENOMEM on allocation failure.
 */
static int
lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
{
	struct lnet_peer *lp;
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	unsigned flags = 0;	/* traffic-created peers are unconfigured */
	int rc = 0;

	if (nid == LNET_NID_ANY) {
		rc = -EINVAL;
		goto out;
	}

	/* lnet_net_lock is not needed here because ln_api_lock is held */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni) {
		/*
		 * We must have raced with another thread. Since we
		 * know next to nothing about a peer_ni created by
		 * traffic, we just assume everything is ok and
		 * return.
		 */
		lnet_peer_ni_decref_locked(lpni);
		goto out;
	}

	/* Create peer, peer_net, and peer_ni. */
	rc = -ENOMEM;
	lp = lnet_peer_alloc(nid);
	if (!lp)
		goto out;
	lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
	if (!lpn)
		goto out_free_lp;
	lpni = lnet_peer_ni_alloc(nid);
	if (!lpni)
		goto out_free_lpn;
	if (pref != LNET_NID_ANY)
		lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);

	/* Attach takes ownership of lp/lpn/lpni and always returns 0. */
	return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);

out_free_lpn:
	LIBCFS_FREE(lpn, sizeof(*lpn));
out_free_lp:
	LIBCFS_FREE(lp, sizeof(*lp));
out:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
	return rc;
}
1456
/*
 * Implementation of IOC_LIBCFS_ADD_PEER_NI.
 *
 * This API handles the following combinations:
 *   Create a peer with its primary NI if only the prim_nid is provided
 *   Add a NID to a peer identified by the prim_nid. The peer identified
 *   by the prim_nid must already exist.
 *   The peer being created may be non-MR.
 *
 * The caller must hold ln_api_mutex. This prevents the peer from
 * being created/modified/deleted by a different thread.
 *
 * Returns 0 on success; -EINVAL without a prim_nid; -ENOENT if the
 * prim_nid peer does not exist or was not configured; -ENODEV if
 * prim_nid is not the peer's primary NID; -EPERM on a Multi-Rail
 * flag mismatch; otherwise the error from lnet_peer_add_nid().
 */
int
lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
{
	struct lnet_peer *lp = NULL;
	struct lnet_peer_ni *lpni;
	unsigned flags;

	/* The prim_nid must always be specified */
	if (prim_nid == LNET_NID_ANY)
		return -EINVAL;

	flags = LNET_PEER_CONFIGURED;
	if (mr)
		flags |= LNET_PEER_MULTI_RAIL;

	/*
	 * If nid isn't specified, we must create a new peer with
	 * prim_nid as its primary nid.
	 */
	if (nid == LNET_NID_ANY)
		return lnet_peer_add(prim_nid, flags);

	/* Look up the prim_nid, which must exist. */
	lpni = lnet_find_peer_ni_locked(prim_nid);
	if (!lpni)
		return -ENOENT;
	/*
	 * NOTE(review): lpni is dereferenced after this decref;
	 * presumably safe because the peer table still holds a
	 * reference and ln_api_mutex blocks deletion — confirm.
	 */
	lnet_peer_ni_decref_locked(lpni);
	lp = lpni->lpni_peer_net->lpn_peer;

	/* Peer must have been configured. */
	if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
		CDEBUG(D_NET, "peer %s was not configured\n",
		       libcfs_nid2str(prim_nid));
		return -ENOENT;
	}

	/* Primary NID must match */
	if (lp->lp_primary_nid != prim_nid) {
		CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
		       libcfs_nid2str(prim_nid),
		       libcfs_nid2str(lp->lp_primary_nid));
		return -ENODEV;
	}

	/* Multi-Rail flag must match. */
	if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
		CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
		       libcfs_nid2str(prim_nid));
		return -EPERM;
	}

	return lnet_peer_add_nid(lp, nid, flags);
}
1522
/*
 * Implementation of IOC_LIBCFS_DEL_PEER_NI.
 *
 * This API handles the following combinations:
 *   Delete a NI from a peer if both prim_nid and nid are provided.
 *   Delete a peer if only prim_nid is provided.
 *   Delete a peer if its primary nid is provided.
 *
 * The caller must hold ln_api_mutex. This prevents the peer from
 * being modified/deleted by a different thread.
 *
 * Returns 0 on success; -EINVAL without a prim_nid; -ENOENT if no
 * peer_ni exists for prim_nid; -ENODEV if prim_nid is not the peer's
 * primary NID; otherwise the result of lnet_peer_del() or
 * lnet_peer_del_nid().
 */
int
lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
{
	struct lnet_peer *lp;
	struct lnet_peer_ni *lpni;
	unsigned flags;

	if (prim_nid == LNET_NID_ANY)
		return -EINVAL;

	lpni = lnet_find_peer_ni_locked(prim_nid);
	if (!lpni)
		return -ENOENT;
	/*
	 * NOTE(review): lpni is dereferenced after this decref;
	 * presumably safe because the peer table still holds a
	 * reference and ln_api_mutex blocks deletion — confirm.
	 */
	lnet_peer_ni_decref_locked(lpni);
	lp = lpni->lpni_peer_net->lpn_peer;

	if (prim_nid != lp->lp_primary_nid) {
		CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
		       libcfs_nid2str(prim_nid),
		       libcfs_nid2str(lp->lp_primary_nid));
		return -ENODEV;
	}

	/* Deleting the primary NID (or no NID) deletes the whole peer. */
	if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
		return lnet_peer_del(lp);

	flags = LNET_PEER_CONFIGURED;
	if (lp->lp_state & LNET_PEER_MULTI_RAIL)
		flags |= LNET_PEER_MULTI_RAIL;

	return lnet_peer_del_nid(lp, nid, flags);
}
1566
/*
 * Final destruction of a zombie peer_ni: unlink it from its zombie
 * list, free the preferred-NIDs array (if any) and the structure
 * itself, and drop its reference on the peer_net.
 *
 * Called once the last reference on the peer_ni has been dropped;
 * call with the lnet_net_lock held.
 */
void
lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
{
	struct lnet_peer_table *ptable;
	struct lnet_peer_net *lpn;

	CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));

	/* The peer_ni must be completely disused before destruction. */
	LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
	LASSERT(lpni->lpni_rtr_refcount == 0);
	LASSERT(list_empty(&lpni->lpni_txq));
	LASSERT(lpni->lpni_txqnob == 0);
	LASSERT(list_empty(&lpni->lpni_peer_nis));
	LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));

	/* Keep the peer_net pointer for the decref after the free. */
	lpn = lpni->lpni_peer_net;
	lpni->lpni_peer_net = NULL;
	lpni->lpni_net = NULL;

	/* remove the peer ni from the zombie list */
	ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
	spin_lock(&ptable->pt_zombie_lock);
	list_del_init(&lpni->lpni_hashlist);
	ptable->pt_zombies--;
	spin_unlock(&ptable->pt_zombie_lock);

	/* A single preferred NID is stored inline; only nnids > 1 has
	 * an allocated array to free. */
	if (lpni->lpni_pref_nnids > 1) {
		LIBCFS_FREE(lpni->lpni_pref.nids,
			sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
	}
	LIBCFS_FREE(lpni, sizeof(*lpni));

	lnet_peer_net_decref_locked(lpn);
}
1601
/*
 * Get a peer_ni for @nid, creating it via the traffic path if it does
 * not exist. Takes a hold on the returned peer_ni.
 *
 * Call with lnet_net_lock held for @cpt; the lock is dropped and
 * re-taken around creation. Returns an ERR_PTR on shutdown or if
 * creation fails. Unlike lnet_nid2peerni_locked(), this variant does
 * not take ln_api_mutex itself — presumably the caller already holds
 * it (NOTE(review): confirm against callers).
 */
struct lnet_peer_ni *
lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
{
	struct lnet_peer_ni *lpni = NULL;
	int rc;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni)
		return lpni;

	/* Slow path: drop the net lock to create the peer_ni. */
	lnet_net_unlock(cpt);

	rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
	if (rc) {
		lpni = ERR_PTR(rc);
		goto out_net_relock;
	}

	/* The lookup takes the hold returned to the caller. */
	lpni = lnet_find_peer_ni_locked(nid);
	LASSERT(lpni);

out_net_relock:
	lnet_net_lock(cpt);

	return lpni;
}
1635
/*
 * Get a peer_ni for the given nid, create it if necessary. Takes a
 * hold on the peer_ni.
 *
 * Call with lnet_net_lock held for @cpt. Returns an ERR_PTR on
 * shutdown or if creation fails.
 */
struct lnet_peer_ni *
lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
{
	struct lnet_peer_ni *lpni = NULL;
	int rc;

	if (the_lnet.ln_state != LNET_STATE_RUNNING)
		return ERR_PTR(-ESHUTDOWN);

	/*
	 * find if a peer_ni already exists.
	 * If so then just return that.
	 */
	lpni = lnet_find_peer_ni_locked(nid);
	if (lpni)
		return lpni;

	/*
	 * Slow path:
	 * use the lnet_api_mutex to serialize the creation of the peer_ni
	 * and the creation/deletion of the local ni/net. When a local ni is
	 * created, if there exists a set of peer_nis on that network,
	 * they need to be traversed and updated. When a local NI is
	 * deleted, which could result in a network being deleted, then
	 * all peer nis on that network need to be removed as well.
	 *
	 * Creation through traffic should also be serialized with
	 * creation through DLC.
	 */
	lnet_net_unlock(cpt);
	mutex_lock(&the_lnet.ln_api_mutex);
	/*
	 * Shutdown is only set under the ln_api_lock, so a single
	 * check here is sufficient.
	 */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		lpni = ERR_PTR(-ESHUTDOWN);
		goto out_mutex_unlock;
	}

	rc = lnet_peer_ni_traffic_add(nid, pref);
	if (rc) {
		lpni = ERR_PTR(rc);
		goto out_mutex_unlock;
	}

	/* The lookup takes the hold returned to the caller. */
	lpni = lnet_find_peer_ni_locked(nid);
	LASSERT(lpni);

out_mutex_unlock:
	mutex_unlock(&the_lnet.ln_api_mutex);
	lnet_net_lock(cpt);

	/* Lock has been dropped, check again for shutdown. */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		if (!IS_ERR(lpni))
			lnet_peer_ni_decref_locked(lpni);
		lpni = ERR_PTR(-ESHUTDOWN);
	}

	return lpni;
}
1702
1703 /*
1704  * Peer Discovery
1705  */
1706
1707 /*
1708  * Is a peer uptodate from the point of view of discovery?
1709  *
1710  * If it is currently being processed, obviously not.
1711  * A forced Ping or Push is also handled by the discovery thread.
1712  *
1713  * Otherwise look at whether the peer needs rediscovering.
1714  */
1715 bool
1716 lnet_peer_is_uptodate(struct lnet_peer *lp)
1717 {
1718         bool rc;
1719
1720         spin_lock(&lp->lp_lock);
1721         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1722                             LNET_PEER_FORCE_PING |
1723                             LNET_PEER_FORCE_PUSH)) {
1724                 rc = false;
1725         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1726                 rc = true;
1727         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1728                 if (lnet_peer_discovery_disabled)
1729                         rc = true;
1730                 else
1731                         rc = false;
1732         } else if (lnet_peer_needs_push(lp)) {
1733                 rc = false;
1734         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1735                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1736                         rc = true;
1737                 else
1738                         rc = false;
1739         } else {
1740                 rc = false;
1741         }
1742         spin_unlock(&lp->lp_lock);
1743
1744         return rc;
1745 }
1746
1747 /*
1748  * Queue a peer for the attention of the discovery thread.  Call with
1749  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1750  * -EALREADY if the peer was already queued.
1751  */
1752 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1753 {
1754         int rc;
1755
1756         spin_lock(&lp->lp_lock);
1757         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1758                 lp->lp_state |= LNET_PEER_DISCOVERING;
1759         spin_unlock(&lp->lp_lock);
1760         if (list_empty(&lp->lp_dc_list)) {
1761                 lnet_peer_addref_locked(lp);
1762                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1763                 wake_up(&the_lnet.ln_dc_waitq);
1764                 rc = 0;
1765         } else {
1766                 rc = -EALREADY;
1767         }
1768
1769         CDEBUG(D_NET, "Queue peer %s: %d\n",
1770                libcfs_nid2str(lp->lp_primary_nid), rc);
1771
1772         return rc;
1773 }
1774
/*
 * Discovery of a peer is complete. Wake all waiters on the peer.
 * Call with lnet_net_lock/EX held.
 *
 * Drops the reference taken when the peer was queued for discovery,
 * and resends (or finalizes, on discovery error) all messages that
 * were parked on lp_dc_pendq waiting for discovery to finish.
 */
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
{
	struct lnet_msg *msg, *tmp;
	int rc = 0;
	struct list_head pending_msgs;

	INIT_LIST_HEAD(&pending_msgs);

	CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
	       libcfs_nid2str(lp->lp_primary_nid));

	/* Remove the peer from the discovery thread's queues. */
	list_del_init(&lp->lp_dc_list);
	/* Steal the whole pending-message list under lp_lock. */
	spin_lock(&lp->lp_lock);
	list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
	spin_unlock(&lp->lp_lock);
	wake_up_all(&lp->lp_dc_waitq);

	/* lnet_send()/lnet_finalize() must not run under the net lock. */
	lnet_net_unlock(LNET_LOCK_EX);

	/* iterate through all pending messages and send them again */
	list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
		list_del_init(&msg->msg_list);
		/* NOTE(review): lp_dc_error is read here without lp_lock;
		 * presumably safe because discovery for lp has completed
		 * and only this thread consumes it — confirm. */
		if (lp->lp_dc_error) {
			/* Discovery failed; fail the queued message too. */
			lnet_finalize(msg, lp->lp_dc_error);
			continue;
		}

		CDEBUG(D_NET, "sending pending message %s to target %s\n",
		       lnet_msgtyp2str(msg->msg_type),
		       libcfs_id2str(msg->msg_target));
		rc = lnet_send(msg->msg_src_nid_param, msg,
			       msg->msg_rtr_nid_param);
		if (rc < 0) {
			CNETERR("Error sending %s to %s: %d\n",
			       lnet_msgtyp2str(msg->msg_type),
			       libcfs_id2str(msg->msg_target), rc);
			lnet_finalize(msg, rc);
		}
	}
	lnet_net_lock(LNET_LOCK_EX);
	/* Drop the ref taken by lnet_peer_queue_for_discovery(). */
	lnet_peer_decref_locked(lp);
}
1821
/*
 * Handle inbound push.
 * Like any event handler, called with lnet_res_lock/CPT held.
 *
 * Validates the pushed ping data, updates the peer's discovery-related
 * state flags accordingly, stashes the data in lp->lp_data for the
 * discovery thread to merge, and (re)queues the peer for discovery.
 */
void lnet_peer_push_event(struct lnet_event *ev)
{
	struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
	struct lnet_peer *lp;

	/* lnet_find_peer() adds a refcount */
	lp = lnet_find_peer(ev->source.nid);
	if (!lp) {
		CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
		       libcfs_nid2str(ev->initiator.nid),
		       libcfs_nid2str(ev->source.nid));
		return;
	}

	/* Ensure peer state remains consistent while we modify it. */
	spin_lock(&lp->lp_lock);

	/*
	 * If some kind of error happened the contents of the message
	 * cannot be used. Clear the NIDS_UPTODATE and set the
	 * FORCE_PING flag to trigger a ping.
	 */
	if (ev->status) {
		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
		lp->lp_state |= LNET_PEER_FORCE_PING;
		CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
		       ev->status,
		       libcfs_nid2str(lp->lp_primary_nid),
		       libcfs_nid2str(ev->source.nid));
		goto out;
	}

	/*
	 * A push with invalid or corrupted info. Clear the UPTODATE
	 * flag to trigger a ping.
	 */
	if (lnet_ping_info_validate(&pbuf->pb_info)) {
		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
		lp->lp_state |= LNET_PEER_FORCE_PING;
		CDEBUG(D_NET, "Corrupted Push from %s\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		goto out;
	}

	/*
	 * Make sure we'll allocate the correct size ping buffer when
	 * pinging the peer.
	 */
	if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
		lp->lp_data_nnis = pbuf->pb_info.pi_nnis;

	/*
	 * A non-Multi-Rail peer is not supposed to be capable of
	 * sending a push.
	 */
	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
		CERROR("Push from non-Multi-Rail peer %s dropped\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		goto out;
	}

	/*
	 * Check the MULTIRAIL flag. Complain if the peer was DLC
	 * configured without it.
	 */
	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			CERROR("Push says %s is Multi-Rail, DLC says not\n",
			       libcfs_nid2str(lp->lp_primary_nid));
		} else {
			/* Peer just proved itself Multi-Rail: drop any
			 * preferred NIDs recorded while it was non-MR. */
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	}

	/*
	 * The peer may have discovery disabled at its end. Set
	 * NO_DISCOVERY as appropriate.
	 */
	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
		CDEBUG(D_NET, "Peer %s has discovery disabled\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		lp->lp_state |= LNET_PEER_NO_DISCOVERY;
	} else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
		CDEBUG(D_NET, "Peer %s has discovery enabled\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
	}

	/*
	 * Check for truncation of the Put message. Clear the
	 * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
	 * and tell discovery to allocate a bigger buffer.
	 */
	if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
		if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
			the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
		lp->lp_state |= LNET_PEER_FORCE_PING;
		CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
		       libcfs_nid2str(lp->lp_primary_nid),
		       pbuf->pb_info.pi_nnis);
		goto out;
	}

	/* always assume new data */
	lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
	lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;

	/*
	 * If there is data present that hasn't been processed yet,
	 * we'll replace it if the Put contained newer data and it
	 * fits. We're racing with a Ping or earlier Push in this
	 * case.
	 */
	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
		if (LNET_PING_BUFFER_SEQNO(pbuf) >
			LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
		    pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
			memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
			       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
			CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
			      libcfs_nid2str(lp->lp_primary_nid),
			      LNET_PING_BUFFER_SEQNO(pbuf),
			      LNET_PING_BUFFER_SEQNO(lp->lp_data));
		}
		goto out;
	}

	/*
	 * Allocate a buffer to copy the data. On a failure we drop
	 * the Push and set FORCE_PING to force the discovery
	 * thread to fix the problem by pinging the peer.
	 * GFP_ATOMIC: we're holding lp_lock (a spinlock), so we
	 * cannot sleep here.
	 */
	lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
	if (!lp->lp_data) {
		lp->lp_state |= LNET_PEER_FORCE_PING;
		CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
		       libcfs_nid2str(lp->lp_primary_nid),
		       LNET_PING_BUFFER_SEQNO(pbuf));
		goto out;
	}

	/* Success */
	memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
	       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
	lp->lp_state |= LNET_PEER_DATA_PRESENT;
	CDEBUG(D_NET, "Received Push %s %u\n",
	       libcfs_nid2str(lp->lp_primary_nid),
	       LNET_PING_BUFFER_SEQNO(pbuf));

out:
	/*
	 * Queue the peer for discovery if not done, force it on the request
	 * queue and wake the discovery thread if the peer was already queued,
	 * because its status changed.
	 */
	spin_unlock(&lp->lp_lock);
	lnet_net_lock(LNET_LOCK_EX);
	if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	/* Drop refcount from lookup */
	lnet_peer_decref_locked(lp);
	lnet_net_unlock(LNET_LOCK_EX);
}
1993
1994 /*
1995  * Clear the discovery error state, unless we're already discovering
1996  * this peer, in which case the error is current.
1997  */
1998 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
1999 {
2000         spin_lock(&lp->lp_lock);
2001         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2002                 lp->lp_dc_error = 0;
2003         spin_unlock(&lp->lp_lock);
2004 }
2005
/*
 * Peer discovery slow path. The ln_api_mutex is held on entry, and
 * dropped/retaken within this function. An lnet_peer_ni is passed in
 * because discovery could tear down an lnet_peer.
 *
 * \param lpni	peer NI whose owning peer should be discovered
 * \param cpt	CPT of the lnet_net_lock the caller holds; it is
 *		exchanged for the exclusive lock around the wait
 * \param block	if false, queue the peer for discovery and return
 *		without waiting for discovery to complete
 *
 * \retval 0		discovery complete (or queued, when !block)
 * \retval -EINTR	interrupted by a signal while waiting
 * \retval -ESHUTDOWN	the discovery thread is not running
 * \retval other	negative lp_dc_error reported by discovery
 */
int
lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
{
	DEFINE_WAIT(wait);
	struct lnet_peer *lp;
	int rc = 0;

again:
	/* Trade the caller's CPT lock for the exclusive net lock. */
	lnet_net_unlock(cpt);
	lnet_net_lock(LNET_LOCK_EX);
	lp = lpni->lpni_peer_net->lpn_peer;
	lnet_peer_clear_discovery_error(lp);

	/*
	 * We're willing to be interrupted. The lpni can become a
	 * zombie if we race with DLC, so we must check for that.
	 */
	for (;;) {
		prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
		if (signal_pending(current))
			break;
		if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
			break;
		if (lp->lp_dc_error)
			break;
		if (lnet_peer_is_uptodate(lp))
			break;
		lnet_peer_queue_for_discovery(lp);
		/*
		 * if caller requested a non-blocking operation then
		 * return immediately. Once discovery is complete then the
		 * peer ref will be decremented and any pending messages
		 * that were stopped due to discovery will be transmitted.
		 */
		if (!block)
			break;

		/* Hold a ref across the sleep so discovery dropping its
		 * own ref cannot free lp while we're blocked. */
		lnet_peer_addref_locked(lp);
		lnet_net_unlock(LNET_LOCK_EX);
		schedule();
		finish_wait(&lp->lp_dc_waitq, &wait);
		lnet_net_lock(LNET_LOCK_EX);
		lnet_peer_decref_locked(lp);
		/* Peer may have changed */
		lp = lpni->lpni_peer_net->lpn_peer;
	}
	finish_wait(&lp->lp_dc_waitq, &wait);

	/* Restore the caller's CPT lock. */
	lnet_net_unlock(LNET_LOCK_EX);
	lnet_net_lock(cpt);

	/*
	 * If the peer has changed after we've discovered the older peer,
	 * then we need to discovery the new peer to make sure the
	 * interface information is up to date
	 */
	if (lp != lpni->lpni_peer_net->lpn_peer)
		goto again;

	if (signal_pending(current))
		rc = -EINTR;
	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
		rc = -ESHUTDOWN;
	else if (lp->lp_dc_error)
		rc = lp->lp_dc_error;
	else if (!block)
		CDEBUG(D_NET, "non-blocking discovery\n");
	else if (!lnet_peer_is_uptodate(lp))
		/* Woken but still stale (e.g. a new Push arrived): retry. */
		goto again;

	CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
	       (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
	       libcfs_nid2str(lpni->lpni_nid), rc,
	       (!block) ? "pending discovery" : "discovery complete");

	return rc;
}
2088
2089 /* Handle an incoming ack for a push. */
2090 static void
2091 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2092 {
2093         struct lnet_ping_buffer *pbuf;
2094
2095         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2096         spin_lock(&lp->lp_lock);
2097         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2098         lp->lp_push_error = ev->status;
2099         if (ev->status)
2100                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2101         else
2102                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2103         spin_unlock(&lp->lp_lock);
2104
2105         CDEBUG(D_NET, "peer %s ev->status %d\n",
2106                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2107 }
2108
/*
 * Handle a Reply message. This is the reply to a Ping message.
 *
 * Validates the received ping data and, on success, attaches it to
 * lp->lp_data (setting LNET_PEER_DATA_PRESENT) for the discovery
 * thread to merge. On any failure sets LNET_PEER_PING_FAILED so the
 * ping is retried. Always clears LNET_PEER_PING_SENT on exit.
 */
static void
lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
{
	struct lnet_ping_buffer *pbuf;
	int rc;

	spin_lock(&lp->lp_lock);

	/*
	 * If some kind of error happened the contents of message
	 * cannot be used. Set PING_FAILED to trigger a retry.
	 */
	if (ev->status) {
		lp->lp_state |= LNET_PEER_PING_FAILED;
		lp->lp_ping_error = ev->status;
		CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
		       ev->status,
		       libcfs_nid2str(lp->lp_primary_nid),
		       libcfs_nid2str(ev->source.nid));
		goto out;
	}

	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
	/* A byte-swapped magic means the peer has opposite endianness:
	 * convert the whole ping info in place. */
	if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
		lnet_swap_pinginfo(pbuf);

	/*
	 * A reply with invalid or corrupted info. Set PING_FAILED to
	 * trigger a retry.
	 */
	rc = lnet_ping_info_validate(&pbuf->pb_info);
	if (rc) {
		lp->lp_state |= LNET_PEER_PING_FAILED;
		/* Corruption is not a transport error: lp_ping_error is
		 * explicitly cleared rather than set. */
		lp->lp_ping_error = 0;
		CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
		       libcfs_nid2str(lp->lp_primary_nid), rc);
		goto out;
	}

	/*
	 * Update the MULTI_RAIL flag based on the reply. If the peer
	 * was configured with DLC then the setting should match what
	 * DLC put in.
	 */
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
		if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
			/* Everything's fine */
		} else if (lp->lp_state & LNET_PEER_CONFIGURED) {
			CWARN("Reply says %s is Multi-Rail, DLC says not\n",
			      libcfs_nid2str(lp->lp_primary_nid));
		} else {
			/* Peer proved itself MR: discard non-MR prefs. */
			lp->lp_state |= LNET_PEER_MULTI_RAIL;
			lnet_peer_clr_non_mr_pref_nids(lp);
		}
	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		if (lp->lp_state & LNET_PEER_CONFIGURED) {
			CWARN("DLC says %s is Multi-Rail, Reply says not\n",
			      libcfs_nid2str(lp->lp_primary_nid));
		} else {
			CERROR("Multi-Rail state vanished from %s\n",
			       libcfs_nid2str(lp->lp_primary_nid));
			lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
		}
	}

	/*
	 * Make sure we'll allocate the correct size ping buffer when
	 * pinging the peer.
	 */
	if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
		lp->lp_data_nnis = pbuf->pb_info.pi_nnis;

	/*
	 * The peer may have discovery disabled at its end. Set
	 * NO_DISCOVERY as appropriate.
	 */
	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
		CDEBUG(D_NET, "Peer %s has discovery disabled\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		lp->lp_state |= LNET_PEER_NO_DISCOVERY;
	} else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
		CDEBUG(D_NET, "Peer %s has discovery enabled\n",
		       libcfs_nid2str(lp->lp_primary_nid));
		lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
	}

	/*
	 * Check for truncation of the Reply. Clear PING_SENT and set
	 * PING_FAILED to trigger a retry.
	 */
	if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
		if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
			the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
		lp->lp_state |= LNET_PEER_PING_FAILED;
		lp->lp_ping_error = 0;
		CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
		       libcfs_nid2str(lp->lp_primary_nid),
		       pbuf->pb_info.pi_nnis);
		goto out;
	}

	/*
	 * Check the sequence numbers in the reply. These are only
	 * available if the reply came from a Multi-Rail peer.
	 * pi_ni[1] is checked because slot 0 carries the loopback NID
	 * and slot 1 the sender's primary NID (see the NID layout
	 * described above lnet_peer_merge_data()).
	 */
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
	    pbuf->pb_info.pi_nnis > 1 &&
	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
		if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno)
			CDEBUG(D_NET, "peer %s: seq# got %u have %u. peer rebooted?\n",
				libcfs_nid2str(lp->lp_primary_nid),
				LNET_PING_BUFFER_SEQNO(pbuf),
				lp->lp_peer_seqno);

		lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
	}

	/* We're happy with the state of the data in the buffer. */
	CDEBUG(D_NET, "peer %s data present %u\n",
	       libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
	/* Replace any previously stashed data with this reply. */
	if (lp->lp_state & LNET_PEER_DATA_PRESENT)
		lnet_ping_buffer_decref(lp->lp_data);
	else
		lp->lp_state |= LNET_PEER_DATA_PRESENT;
	lnet_ping_buffer_addref(pbuf);
	lp->lp_data = pbuf;
out:
	lp->lp_state &= ~LNET_PEER_PING_SENT;
	spin_unlock(&lp->lp_lock);
}
2240
2241 /*
2242  * Send event handling. Only matters for error cases, where we clean
2243  * up state on the peer and peer_ni that would otherwise be updated in
2244  * the REPLY event handler for a successful Ping, and the ACK event
2245  * handler for a successful Push.
2246  */
2247 static int
2248 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2249 {
2250         int rc = 0;
2251
2252         if (!ev->status)
2253                 goto out;
2254
2255         spin_lock(&lp->lp_lock);
2256         if (ev->msg_type == LNET_MSG_GET) {
2257                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2258                 lp->lp_state |= LNET_PEER_PING_FAILED;
2259                 lp->lp_ping_error = ev->status;
2260         } else { /* ev->msg_type == LNET_MSG_PUT */
2261                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2262                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2263                 lp->lp_push_error = ev->status;
2264         }
2265         spin_unlock(&lp->lp_lock);
2266         rc = LNET_REDISCOVER_PEER;
2267 out:
2268         CDEBUG(D_NET, "%s Send to %s: %d\n",
2269                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2270                 libcfs_nid2str(ev->target.nid), rc);
2271         return rc;
2272 }
2273
2274 /*
2275  * Unlink event handling. This event is only seen if a call to
2276  * LNetMDUnlink() caused the event to be unlinked. If this call was
2277  * made after the event was set up in LNetGet() or LNetPut() then we
2278  * assume the Ping or Push timed out.
2279  */
2280 static void
2281 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2282 {
2283         spin_lock(&lp->lp_lock);
2284         /* We've passed through LNetGet() */
2285         if (lp->lp_state & LNET_PEER_PING_SENT) {
2286                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2287                 lp->lp_state |= LNET_PEER_PING_FAILED;
2288                 lp->lp_ping_error = -ETIMEDOUT;
2289                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2290                         libcfs_nid2str(lp->lp_primary_nid));
2291         }
2292         /* We've passed through LNetPut() */
2293         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2294                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2295                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2296                 lp->lp_push_error = -ETIMEDOUT;
2297                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2298                         libcfs_nid2str(lp->lp_primary_nid));
2299         }
2300         spin_unlock(&lp->lp_lock);
2301 }
2302
/*
 * Event handler for the discovery EQ.
 *
 * Called with lnet_res_lock(cpt) held. The cpt is the
 * lnet_cpt_of_cookie() of the md handle cookie.
 *
 * Dispatches on the event type, then requeues the peer at the tail of
 * the discovery request queue unless the handler decided no further
 * action is needed (rc != LNET_REDISCOVER_PEER) or the peer is
 * already uptodate.
 */
static void lnet_discovery_event_handler(struct lnet_event *event)
{
	struct lnet_peer *lp = event->md.user_ptr;
	struct lnet_ping_buffer *pbuf;
	int rc;

	/* discovery needs to take another look */
	rc = LNET_REDISCOVER_PEER;

	CDEBUG(D_NET, "Received event: %d\n", event->type);

	switch (event->type) {
	case LNET_EVENT_ACK:
		lnet_discovery_event_ack(lp, event);
		break;
	case LNET_EVENT_REPLY:
		lnet_discovery_event_reply(lp, event);
		break;
	case LNET_EVENT_SEND:
		/* Only send failure triggers a retry. */
		rc = lnet_discovery_event_send(lp, event);
		break;
	case LNET_EVENT_UNLINK:
		/* LNetMDUnlink() was called */
		lnet_discovery_event_unlink(lp, event);
		break;
	default:
		/* Invalid events. */
		LBUG();
	}
	lnet_net_lock(LNET_LOCK_EX);
	if (event->unlinked) {
		/* The MD is gone: release the references it held on the
		 * ping buffer and on the peer. */
		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
		lnet_ping_buffer_decref(pbuf);
		/* NOTE(review): lp is dereferenced again below after this
		 * decref; presumably the discovery queue still holds a
		 * reference keeping lp alive — confirm. */
		lnet_peer_decref_locked(lp);
	}

	/* put peer back at end of request queue, if discovery not already
	 * done */
	if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
		list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
		wake_up(&the_lnet.ln_dc_waitq);
	}
	lnet_net_unlock(LNET_LOCK_EX);
}
2354
/*
 * Build a peer from incoming data.
 *
 * The NIDs in the incoming data are supposed to be structured as follows:
 *  - loopback
 *  - primary NID
 *  - other NIDs in same net
 *  - NIDs in second net
 *  - NIDs in third net
 *  - ...
 * This due to the way the list of NIDs in the data is created.
 *
 * Note that this function will mark the peer uptodate unless an
 * ENOMEM is encontered. All other errors are due to a conflict
 * between the DLC configuration and what discovery sees. We treat DLC
 * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
 * peer from becoming stuck in discovery.
 *
 * Consumes the caller's reference on pbuf: it is dropped on all paths.
 */
static int lnet_peer_merge_data(struct lnet_peer *lp,
				struct lnet_ping_buffer *pbuf)
{
	struct lnet_peer_ni *lpni;
	lnet_nid_t *curnis = NULL;	/* NIDs currently on the peer */
	lnet_nid_t *addnis = NULL;	/* NIDs in pbuf missing from peer */
	lnet_nid_t *delnis = NULL;	/* NIDs on peer missing from pbuf */
	unsigned flags;
	int ncurnis;
	int naddnis;
	int ndelnis;
	int nnis = 0;
	int i;
	int j;
	int rc;

	flags = LNET_PEER_DISCOVERED;
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
		flags |= LNET_PEER_MULTI_RAIL;

	/* Size all three scratch arrays for the larger NID list. */
	nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
	LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
	if (!curnis || !addnis || !delnis) {
		rc = -ENOMEM;
		goto out;
	}
	ncurnis = 0;
	naddnis = 0;
	ndelnis = 0;

	/* Construct the list of NIDs present in peer. */
	lpni = NULL;
	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
		curnis[ncurnis++] = lpni->lpni_nid;

	/*
	 * Check for NIDs in pbuf not present in curnis[].
	 * The loop starts at 1 to skip the loopback NID.
	 */
	for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
		for (j = 0; j < ncurnis; j++)
			if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
				break;
		if (j == ncurnis)
			addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
	}
	/*
	 * Check for NIDs in curnis[] not present in pbuf.
	 * The nested loop starts at 1 to skip the loopback NID.
	 *
	 * But never add the loopback NID to delnis[]: if it is
	 * present in curnis[] then this peer is for this node.
	 */
	for (i = 0; i < ncurnis; i++) {
		if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
			continue;
		for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
			if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
				break;
		if (j == pbuf->pb_info.pi_nnis)
			delnis[ndelnis++] = curnis[i];
	}

	/* Apply the additions, then the deletions. Per the note above,
	 * only -ENOMEM aborts the merge. */
	for (i = 0; i < naddnis; i++) {
		rc = lnet_peer_add_nid(lp, addnis[i], flags);
		if (rc) {
			CERROR("Error adding NID %s to peer %s: %d\n",
			       libcfs_nid2str(addnis[i]),
			       libcfs_nid2str(lp->lp_primary_nid), rc);
			if (rc == -ENOMEM)
				goto out;
		}
	}
	for (i = 0; i < ndelnis; i++) {
		rc = lnet_peer_del_nid(lp, delnis[i], flags);
		if (rc) {
			CERROR("Error deleting NID %s from peer %s: %d\n",
			       libcfs_nid2str(delnis[i]),
			       libcfs_nid2str(lp->lp_primary_nid), rc);
			if (rc == -ENOMEM)
				goto out;
		}
	}
	/*
	 * Errors other than -ENOMEM are due to peers having been
	 * configured with DLC. Ignore these because DLC overrides
	 * Discovery.
	 */
	rc = 0;
out:
	/* NOTE(review): assumes LIBCFS_FREE tolerates the NULL pointers
	 * left by a failed allocation above — confirm against libcfs. */
	LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
	LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
	lnet_ping_buffer_decref(pbuf);
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);

	/* On failure, force a fresh ping so discovery can recover. */
	if (rc) {
		spin_lock(&lp->lp_lock);
		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
		lp->lp_state |= LNET_PEER_FORCE_PING;
		spin_unlock(&lp->lp_lock);
	}
	return rc;
}
2479
/*
 * The data in pbuf says lp is its primary peer, but the data was
 * received by a different peer. Try to update lp with the data.
 *
 * Takes over the caller's reference on pbuf; it is either handed to
 * lnet_peer_merge_data() (which drops it) or dropped here when the
 * incoming data turns out to be stale.
 */
static int
lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
{
	struct lnet_handle_md mdh;

	/* Queue lp for discovery, and force it on the request queue. */
	lnet_net_lock(LNET_LOCK_EX);
	if (lnet_peer_queue_for_discovery(lp))
		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
	lnet_net_unlock(LNET_LOCK_EX);

	LNetInvalidateMDHandle(&mdh);

	/*
	 * Decide whether we can move the peer to the DATA_PRESENT state.
	 *
	 * We replace stale data for a multi-rail peer, repair PING_FAILED
	 * status, and preempt FORCE_PING.
	 *
	 * If after that we have DATA_PRESENT, we merge it into this peer.
	 */
	spin_lock(&lp->lp_lock);
	if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
		if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
			/* Incoming data is newer: keep it. */
			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
		} else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
			/* Incoming data is stale: merge the already
			 * present buffer instead. */
			lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
			lnet_ping_buffer_decref(pbuf);
			pbuf = lp->lp_data;
			lp->lp_data = NULL;
		}
	}
	/* Any buffer still attached to lp is superseded by pbuf. */
	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
		lnet_ping_buffer_decref(lp->lp_data);
		lp->lp_data = NULL;
		lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
	}
	/* This data repairs a failed ping: unlink its MD (below, after
	 * the spinlock is released). */
	if (lp->lp_state & LNET_PEER_PING_FAILED) {
		mdh = lp->lp_ping_mdh;
		LNetInvalidateMDHandle(&lp->lp_ping_mdh);
		lp->lp_state &= ~LNET_PEER_PING_FAILED;
		lp->lp_ping_error = 0;
	}
	if (lp->lp_state & LNET_PEER_FORCE_PING)
		lp->lp_state &= ~LNET_PEER_FORCE_PING;
	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
	spin_unlock(&lp->lp_lock);

	/* Unlink outside the spinlock. */
	if (!LNetMDHandleIsInvalid(mdh))
		LNetMDUnlink(mdh);

	if (pbuf)
		return lnet_peer_merge_data(lp, pbuf);

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
	return 0;
}
2541
2542 /*
2543  * Update a peer using the data received.
2544  */
static int lnet_peer_data_present(struct lnet_peer *lp)
__must_hold(&lp->lp_lock)
{
	struct lnet_ping_buffer *pbuf;
	struct lnet_peer_ni *lpni;
	lnet_nid_t nid = LNET_NID_ANY;
	unsigned flags;
	int rc = 0;

	/* Take ownership of the attached ping buffer. The peer lock is
	 * dropped here and re-taken just before returning (lock handoff
	 * back to the discovery thread loop).
	 */
	pbuf = lp->lp_data;
	lp->lp_data = NULL;
	lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
	spin_unlock(&lp->lp_lock);

	/*
	 * Modifications of peer structures are done while holding the
	 * ln_api_mutex. A global lock is required because we may be
	 * modifying multiple peer structures, and a mutex greatly
	 * simplifies memory management.
	 *
	 * The actual changes to the data structures must also protect
	 * against concurrent lookups, for which the lnet_net_lock in
	 * LNET_LOCK_EX mode is used.
	 */
	mutex_lock(&the_lnet.ln_api_mutex);
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		rc = -ESHUTDOWN;
		goto out;
	}

	/*
	 * If this peer is not on the peer list then it is being torn
	 * down, and our reference count may be all that is keeping it
	 * alive. Don't do any work on it.
	 */
	if (list_empty(&lp->lp_peer_list))
		goto out;

	flags = LNET_PEER_DISCOVERED;
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
		flags |= LNET_PEER_MULTI_RAIL;

	/*
	 * Check whether the primary NID in the message matches the
	 * primary NID of the peer. If it does, update the peer, if
	 * it does not, check whether there is already a peer with
	 * that primary NID. If no such peer exists, try to update
	 * the primary NID of the current peer (allowed if it was
	 * created due to message traffic) and complete the update.
	 * If the peer did exist, hand off the data to it.
	 *
	 * The peer for the loopback interface is a special case: this
	 * is the peer for the local node, and we want to set its
	 * primary NID to the correct value here. Moreover, this peer
	 * can show up with only the loopback NID in the ping buffer.
	 *
	 * NOTE(review): the goto-out paths above and the pi_nnis <= 1
	 * path below appear to skip lnet_ping_buffer_decref(pbuf) even
	 * though ownership was taken at the top — verify against the
	 * ping buffer refcounting rules.
	 */
	if (pbuf->pb_info.pi_nnis <= 1)
		goto out;
	/* pi_ni[0] is the loopback NID; pi_ni[1] is the primary NID. */
	nid = pbuf->pb_info.pi_ni[1].ns_nid;
	if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
		rc = lnet_peer_set_primary_nid(lp, nid, flags);
		if (!rc)
			rc = lnet_peer_merge_data(lp, pbuf);
	} else if (lp->lp_primary_nid == nid) {
		rc = lnet_peer_merge_data(lp, pbuf);
	} else {
		lpni = lnet_find_peer_ni_locked(nid);
		if (!lpni) {
			rc = lnet_peer_set_primary_nid(lp, nid, flags);
			if (rc) {
				CERROR("Primary NID error %s versus %s: %d\n",
				       libcfs_nid2str(lp->lp_primary_nid),
				       libcfs_nid2str(nid), rc);
			} else {
				rc = lnet_peer_merge_data(lp, pbuf);
			}
		} else {
			/* Another peer already owns that primary NID;
			 * hand the data over to it. */
			rc = lnet_peer_set_primary_data(
				lpni->lpni_peer_net->lpn_peer, pbuf);
			lnet_peer_ni_decref_locked(lpni);
		}
	}
out:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
	mutex_unlock(&the_lnet.ln_api_mutex);

	/* Re-acquire the peer lock for the caller (__must_hold). */
	spin_lock(&lp->lp_lock);
	/* Tell discovery to re-check the peer immediately. */
	if (!rc)
		rc = LNET_REDISCOVER_PEER;
	return rc;
}
2638
2639 /*
2640  * A ping failed. Clear the PING_FAILED state and set the
2641  * FORCE_PING state, to ensure a retry even if discovery is
2642  * disabled. This avoids being left with incorrect state.
2643  */
2644 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2645 __must_hold(&lp->lp_lock)
2646 {
2647         struct lnet_handle_md mdh;
2648         int rc;
2649
2650         mdh = lp->lp_ping_mdh;
2651         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2652         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2653         lp->lp_state |= LNET_PEER_FORCE_PING;
2654         rc = lp->lp_ping_error;
2655         lp->lp_ping_error = 0;
2656         spin_unlock(&lp->lp_lock);
2657
2658         if (!LNetMDHandleIsInvalid(mdh))
2659                 LNetMDUnlink(mdh);
2660
2661         CDEBUG(D_NET, "peer %s:%d\n",
2662                libcfs_nid2str(lp->lp_primary_nid), rc);
2663
2664         spin_lock(&lp->lp_lock);
2665         return rc ? rc : LNET_REDISCOVER_PEER;
2666 }
2667
2668 /*
2669  * Select NID to send a Ping or Push to.
2670  */
2671 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2672 {
2673         struct lnet_peer_ni *lpni;
2674
2675         /* Look for a direct-connected NID for this peer. */
2676         lpni = NULL;
2677         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2678                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2679                         continue;
2680                 break;
2681         }
2682         if (lpni)
2683                 return lpni->lpni_nid;
2684
2685         /* Look for a routed-connected NID for this peer. */
2686         lpni = NULL;
2687         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2688                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2689                         continue;
2690                 break;
2691         }
2692         if (lpni)
2693                 return lpni->lpni_nid;
2694
2695         return LNET_NID_ANY;
2696 }
2697
2698 /* Active side of ping. */
static int lnet_peer_send_ping(struct lnet_peer *lp)
__must_hold(&lp->lp_lock)
{
	lnet_nid_t pnid;
	int nnis;
	int rc;
	int cpt;

	/* Mark the ping as in flight before dropping the peer lock; the
	 * lock is re-taken on every return path (__must_hold handoff).
	 */
	lp->lp_state |= LNET_PEER_PING_SENT;
	lp->lp_state &= ~LNET_PEER_FORCE_PING;
	spin_unlock(&lp->lp_lock);

	cpt = lnet_net_lock_current();
	/* Refcount for MD. */
	lnet_peer_addref_locked(lp);
	pnid = lnet_peer_select_nid(lp);
	lnet_net_unlock(cpt);

	/* Size the reply buffer for the NID count the peer reported
	 * last time, but never below the interface minimum. */
	nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);

	rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
			    the_lnet.ln_dc_eqh, false);

	/*
	 * if LNetMDBind in lnet_send_ping fails we need to decrement the
	 * refcount on the peer, otherwise LNetMDUnlink will be called
	 * which will eventually do that.
	 */
	if (rc > 0) {
		/* Positive rc: MD bind failure, no unlink event will come;
		 * drop the refcount taken for the MD ourselves. */
		lnet_net_lock(cpt);
		lnet_peer_decref_locked(lp);
		lnet_net_unlock(cpt);
		rc = -rc; /* change the rc to negative value */
		goto fail_error;
	} else if (rc < 0) {
		goto fail_error;
	}

	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));

	spin_lock(&lp->lp_lock);
	return 0;

fail_error:
	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
	/*
	 * The errors that get us here are considered hard errors and
	 * cause Discovery to terminate. So we clear PING_SENT, but do
	 * not set either PING_FAILED or FORCE_PING. In fact we need
	 * to clear PING_FAILED, because the unlink event handler will
	 * have set it if we called LNetMDUnlink() above.
	 */
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
	return rc;
}
2755
2756 /*
2757  * This function exists because you cannot call LNetMDUnlink() from an
2758  * event handler.
2759  */
2760 static int lnet_peer_push_failed(struct lnet_peer *lp)
2761 __must_hold(&lp->lp_lock)
2762 {
2763         struct lnet_handle_md mdh;
2764         int rc;
2765
2766         mdh = lp->lp_push_mdh;
2767         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2768         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2769         rc = lp->lp_push_error;
2770         lp->lp_push_error = 0;
2771         spin_unlock(&lp->lp_lock);
2772
2773         if (!LNetMDHandleIsInvalid(mdh))
2774                 LNetMDUnlink(mdh);
2775
2776         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2777         spin_lock(&lp->lp_lock);
2778         return rc ? rc : LNET_REDISCOVER_PEER;
2779 }
2780
2781 /* Active side of push. */
2782 static int lnet_peer_send_push(struct lnet_peer *lp)
2783 __must_hold(&lp->lp_lock)
2784 {
2785         struct lnet_ping_buffer *pbuf;
2786         struct lnet_process_id id;
2787         struct lnet_md md;
2788         int cpt;
2789         int rc;
2790
2791         /* Don't push to a non-multi-rail peer. */
2792         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2793                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2794                 return 0;
2795         }
2796
2797         lp->lp_state |= LNET_PEER_PUSH_SENT;
2798         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2799         spin_unlock(&lp->lp_lock);
2800
2801         cpt = lnet_net_lock_current();
2802         pbuf = the_lnet.ln_ping_target;
2803         lnet_ping_buffer_addref(pbuf);
2804         lnet_net_unlock(cpt);
2805
2806         /* Push source MD */
2807         md.start     = &pbuf->pb_info;
2808         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2809         md.threshold = 2; /* Put/Ack */
2810         md.max_size  = 0;
2811         md.options   = 0;
2812         md.eq_handle = the_lnet.ln_dc_eqh;
2813         md.user_ptr  = lp;
2814
2815         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2816         if (rc) {
2817                 lnet_ping_buffer_decref(pbuf);
2818                 CERROR("Can't bind push source MD: %d\n", rc);
2819                 goto fail_error;
2820         }
2821         cpt = lnet_net_lock_current();
2822         /* Refcount for MD. */
2823         lnet_peer_addref_locked(lp);
2824         id.pid = LNET_PID_LUSTRE;
2825         id.nid = lnet_peer_select_nid(lp);
2826         lnet_net_unlock(cpt);
2827
2828         if (id.nid == LNET_NID_ANY) {
2829                 rc = -EHOSTUNREACH;
2830                 goto fail_unlink;
2831         }
2832
2833         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2834                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2835                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2836
2837         if (rc)
2838                 goto fail_unlink;
2839
2840         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2841
2842         spin_lock(&lp->lp_lock);
2843         return 0;
2844
2845 fail_unlink:
2846         LNetMDUnlink(lp->lp_push_mdh);
2847         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2848 fail_error:
2849         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2850         /*
2851          * The errors that get us here are considered hard errors and
2852          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2853          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2854          * because the unlink event handler will have set it if we
2855          * called LNetMDUnlink() above.
2856          */
2857         spin_lock(&lp->lp_lock);
2858         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2859         return rc;
2860 }
2861
2862 /*
2863  * An unrecoverable error was encountered during discovery.
2864  * Set error status in peer and abort discovery.
2865  */
2866 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2867 {
2868         CDEBUG(D_NET, "Discovery error %s: %d\n",
2869                libcfs_nid2str(lp->lp_primary_nid), error);
2870
2871         spin_lock(&lp->lp_lock);
2872         lp->lp_dc_error = error;
2873         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2874         lp->lp_state |= LNET_PEER_REDISCOVER;
2875         spin_unlock(&lp->lp_lock);
2876 }
2877
2878 /*
2879  * Mark the peer as discovered.
2880  */
2881 static int lnet_peer_discovered(struct lnet_peer *lp)
2882 __must_hold(&lp->lp_lock)
2883 {
2884         lp->lp_state |= LNET_PEER_DISCOVERED;
2885         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2886                           LNET_PEER_REDISCOVER);
2887
2888         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2889
2890         return 0;
2891 }
2892
2893 /*
2894  * Mark the peer as to be rediscovered.
2895  */
2896 static int lnet_peer_rediscover(struct lnet_peer *lp)
2897 __must_hold(&lp->lp_lock)
2898 {
2899         lp->lp_state |= LNET_PEER_REDISCOVER;
2900         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2901
2902         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2903
2904         return 0;
2905 }
2906
2907 /*
2908  * Discovering this peer is taking too long. Cancel any Ping or Push
2909  * that discovery is waiting on by unlinking the relevant MDs. The
2910  * lnet_discovery_event_handler() will proceed from here and complete
2911  * the cleanup.
2912  */
2913 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2914 {
2915         struct lnet_handle_md ping_mdh;
2916         struct lnet_handle_md push_mdh;
2917
2918         LNetInvalidateMDHandle(&ping_mdh);
2919         LNetInvalidateMDHandle(&push_mdh);
2920
2921         spin_lock(&lp->lp_lock);
2922         if (lp->lp_state & LNET_PEER_PING_SENT) {
2923                 ping_mdh = lp->lp_ping_mdh;
2924                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2925         }
2926         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2927                 push_mdh = lp->lp_push_mdh;
2928                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2929         }
2930         spin_unlock(&lp->lp_lock);
2931
2932         if (!LNetMDHandleIsInvalid(ping_mdh))
2933                 LNetMDUnlink(ping_mdh);
2934         if (!LNetMDHandleIsInvalid(push_mdh))
2935                 LNetMDUnlink(push_mdh);
2936 }
2937
2938 /*
2939  * Wait for work to be queued or some other change that must be
2940  * attended to. Returns non-zero if the discovery thread should shut
2941  * down.
2942  */
static int lnet_peer_discovery_wait_for_work(void)
{
	int cpt;
	int rc = 0;

	DEFINE_WAIT(wait);

	cpt = lnet_net_lock_current();
	for (;;) {
		/* Register on the waitq before checking the conditions so
		 * a wakeup between check and sleep is not lost. */
		prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
				TASK_INTERRUPTIBLE);
		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
			break;
		if (lnet_push_target_resize_needed())
			break;
		if (!list_empty(&the_lnet.ln_dc_request))
			break;
		if (!list_empty(&the_lnet.ln_msg_resend))
			break;
		lnet_net_unlock(cpt);

		/*
		 * wakeup max every second to check if there are peers that
		 * have been stuck on the working queue for greater than
		 * the peer timeout.
		 */
		schedule_timeout(cfs_time_seconds(1));
		finish_wait(&the_lnet.ln_dc_waitq, &wait);
		cpt = lnet_net_lock_current();
	}
	/* Also needed on the break paths, which skip the in-loop call. */
	finish_wait(&the_lnet.ln_dc_waitq, &wait);

	/* Non-zero return tells the discovery thread to shut down. */
	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
		rc = -ESHUTDOWN;

	lnet_net_unlock(cpt);

	CDEBUG(D_NET, "woken: %d\n", rc);

	return rc;
}
2984
2985 /*
2986  * Messages that were pending on a destroyed peer will be put on a global
2987  * resend list. The message resend list will be checked by
2988  * the discovery thread when it wakes up, and will resend messages. These
2989  * messages can still be sendable in the case the lpni which was the initial
2990  * cause of the message re-queue was transfered to another peer.
2991  *
2992  * It is possible that LNet could be shutdown while we're iterating
2993  * through the list. lnet_shudown_lndnets() will attempt to access the
2994  * resend list, but will have to wait until the spinlock is released, by
2995  * which time there shouldn't be any more messages on the resend list.
2996  * During shutdown lnet_send() will fail and lnet_finalize() will be called
2997  * for the messages so they can be released. The other case is that
2998  * lnet_shudown_lndnets() can finalize all the messages before this
2999  * function can visit the resend list, in which case this function will be
3000  * a no-op.
3001  */
3002 static void lnet_resend_msgs(void)
3003 {
3004         struct lnet_msg *msg, *tmp;
3005         struct list_head resend;
3006         int rc;
3007
3008         INIT_LIST_HEAD(&resend);
3009
3010         spin_lock(&the_lnet.ln_msg_resend_lock);
3011         list_splice(&the_lnet.ln_msg_resend, &resend);
3012         spin_unlock(&the_lnet.ln_msg_resend_lock);
3013
3014         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3015                 list_del_init(&msg->msg_list);
3016                 rc = lnet_send(msg->msg_src_nid_param, msg,
3017                                msg->msg_rtr_nid_param);
3018                 if (rc < 0) {
3019                         CNETERR("Error sending %s to %s: %d\n",
3020                                lnet_msgtyp2str(msg->msg_type),
3021                                libcfs_id2str(msg->msg_target), rc);
3022                         lnet_finalize(msg, rc);
3023                 }
3024         }
3025 }
3026
3027 /* The discovery thread. */
3028 static int lnet_peer_discovery(void *arg)
3029 {
3030         struct lnet_peer *lp;
3031         int rc;
3032
3033         CDEBUG(D_NET, "started\n");
3034         cfs_block_allsigs();
3035
3036         for (;;) {
3037                 if (lnet_peer_discovery_wait_for_work())
3038                         break;
3039
3040                 lnet_resend_msgs();
3041
3042                 if (lnet_push_target_resize_needed())
3043                         lnet_push_target_resize();
3044
3045                 lnet_net_lock(LNET_LOCK_EX);
3046                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3047                         break;
3048
3049                 /*
3050                  * Process all incoming discovery work requests.  When
3051                  * discovery must wait on a peer to change state, it
3052                  * is added to the tail of the ln_dc_working queue. A
3053                  * timestamp keeps track of when the peer was added,
3054                  * so we can time out discovery requests that take too
3055                  * long.
3056                  */
3057                 while (!list_empty(&the_lnet.ln_dc_request)) {
3058                         lp = list_first_entry(&the_lnet.ln_dc_request,
3059                                               struct lnet_peer, lp_dc_list);
3060                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3061                         /*
3062                          * set the time the peer was put on the dc_working
3063                          * queue. It shouldn't remain on the queue
3064                          * forever, in case the GET message (for ping)
3065                          * doesn't get a REPLY or the PUT message (for
3066                          * push) doesn't get an ACK.
3067                          */
3068                         lp->lp_last_queued = ktime_get_real_seconds();
3069                         lnet_net_unlock(LNET_LOCK_EX);
3070
3071                         /*
3072                          * Select an action depending on the state of
3073                          * the peer and whether discovery is disabled.
3074                          * The check whether discovery is disabled is
3075                          * done after the code that handles processing
3076                          * for arrived data, cleanup for failures, and
3077                          * forcing a Ping or Push.
3078                          */
3079                         spin_lock(&lp->lp_lock);
3080                         CDEBUG(D_NET, "peer %s state %#x\n",
3081                                 libcfs_nid2str(lp->lp_primary_nid),
3082                                 lp->lp_state);
3083                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3084                                 rc = lnet_peer_data_present(lp);
3085                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3086                                 rc = lnet_peer_ping_failed(lp);
3087                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3088                                 rc = lnet_peer_push_failed(lp);
3089                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3090                                 rc = lnet_peer_send_ping(lp);
3091                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3092                                 rc = lnet_peer_send_push(lp);
3093                         else if (lnet_peer_discovery_disabled)
3094                                 rc = lnet_peer_rediscover(lp);
3095                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3096                                 rc = lnet_peer_send_ping(lp);
3097                         else if (lnet_peer_needs_push(lp))
3098                                 rc = lnet_peer_send_push(lp);
3099                         else
3100                                 rc = lnet_peer_discovered(lp);
3101                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3102                                 libcfs_nid2str(lp->lp_primary_nid),
3103                                 lp->lp_state, rc);
3104                         spin_unlock(&lp->lp_lock);
3105
3106                         lnet_net_lock(LNET_LOCK_EX);
3107                         if (rc == LNET_REDISCOVER_PEER) {
3108                                 list_move(&lp->lp_dc_list,
3109                                           &the_lnet.ln_dc_request);
3110                         } else if (rc) {
3111                                 lnet_peer_discovery_error(lp, rc);
3112                         }
3113                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3114                                 lnet_peer_discovery_complete(lp);
3115                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3116                                 break;
3117                 }
3118
3119                 lnet_net_unlock(LNET_LOCK_EX);
3120         }
3121
3122         CDEBUG(D_NET, "stopping\n");
3123         /*
3124          * Clean up before telling lnet_peer_discovery_stop() that
3125          * we're done. Use wake_up() below to somewhat reduce the
3126          * size of the thundering herd if there are multiple threads
3127          * waiting on discovery of a single peer.
3128          */
3129
3130         /* Queue cleanup 1: stop all pending pings and pushes. */
3131         lnet_net_lock(LNET_LOCK_EX);
3132         while (!list_empty(&the_lnet.ln_dc_working)) {
3133                 lp = list_first_entry(&the_lnet.ln_dc_working,
3134                                       struct lnet_peer, lp_dc_list);
3135                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3136                 lnet_net_unlock(LNET_LOCK_EX);
3137                 lnet_peer_cancel_discovery(lp);
3138                 lnet_net_lock(LNET_LOCK_EX);
3139         }
3140         lnet_net_unlock(LNET_LOCK_EX);
3141
3142         /* Queue cleanup 2: wait for the expired queue to clear. */
3143         while (!list_empty(&the_lnet.ln_dc_expired))
3144                 schedule_timeout(cfs_time_seconds(1));
3145
3146         /* Queue cleanup 3: clear the request queue. */
3147         lnet_net_lock(LNET_LOCK_EX);
3148         while (!list_empty(&the_lnet.ln_dc_request)) {
3149                 lp = list_first_entry(&the_lnet.ln_dc_request,
3150                                       struct lnet_peer, lp_dc_list);
3151                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3152                 lnet_peer_discovery_complete(lp);
3153         }
3154         lnet_net_unlock(LNET_LOCK_EX);
3155
3156         LNetEQFree(the_lnet.ln_dc_eqh);
3157         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3158
3159         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3160         wake_up(&the_lnet.ln_dc_waitq);
3161
3162         CDEBUG(D_NET, "stopped\n");
3163
3164         return 0;
3165 }
3166
3167 /* ln_api_mutex is held on entry. */
3168 int lnet_peer_discovery_start(void)
3169 {
3170         struct task_struct *task;
3171         int rc;
3172
3173         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3174                 return -EALREADY;
3175
3176         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3177         if (rc != 0) {
3178                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3179                 return rc;
3180         }
3181
3182         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3183         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3184         if (IS_ERR(task)) {
3185                 rc = PTR_ERR(task);
3186                 CERROR("Can't start peer discovery thread: %d\n", rc);
3187
3188                 LNetEQFree(the_lnet.ln_dc_eqh);
3189                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3190
3191                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3192         }
3193
3194         CDEBUG(D_NET, "discovery start: %d\n", rc);
3195
3196         return rc;
3197 }
3198
3199 /* ln_api_mutex is held on entry. */
void lnet_peer_discovery_stop(void)
{
	/* Nothing to do if the thread was never started. */
	if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
		return;

	LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
	the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
	wake_up(&the_lnet.ln_dc_waitq);

	/* Block until the discovery thread finishes its cleanup and
	 * publishes the SHUTDOWN state. */
	wait_event(the_lnet.ln_dc_waitq,
		   the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);

	/* The thread must have drained all three discovery queues. */
	LASSERT(list_empty(&the_lnet.ln_dc_request));
	LASSERT(list_empty(&the_lnet.ln_dc_working));
	LASSERT(list_empty(&the_lnet.ln_dc_expired));

	CDEBUG(D_NET, "discovery stopped\n");
}
3218
3219 /* Debugging */
3220
3221 void
3222 lnet_debug_peer(lnet_nid_t nid)
3223 {
3224         char                    *aliveness = "NA";
3225         struct lnet_peer_ni     *lp;
3226         int                     cpt;
3227
3228         cpt = lnet_cpt_of_nid(nid, NULL);
3229         lnet_net_lock(cpt);
3230
3231         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3232         if (IS_ERR(lp)) {
3233                 lnet_net_unlock(cpt);
3234                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3235                 return;
3236         }
3237
3238         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3239                 aliveness = lp->lpni_alive ? "up" : "down";
3240
3241         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3242                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3243                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3244                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3245                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3246
3247         lnet_peer_ni_decref_locked(lp);
3248
3249         lnet_net_unlock(cpt);
3250 }
3251
3252 /* Gathering information for userspace. */
3253
3254 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3255                           char aliveness[LNET_MAX_STR_LEN],
3256                           __u32 *cpt_iter, __u32 *refcount,
3257                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3258                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3259                           __u32 *peer_tx_qnob)
3260 {
3261         struct lnet_peer_table          *peer_table;
3262         struct lnet_peer_ni             *lp;
3263         int                             j;
3264         int                             lncpt;
3265         bool                            found = false;
3266
3267         /* get the number of CPTs */
3268         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3269
3270         /* if the cpt number to be examined is >= the number of cpts in
3271          * the system then indicate that there are no more cpts to examin
3272          */
3273         if (*cpt_iter >= lncpt)
3274                 return -ENOENT;
3275
3276         /* get the current table */
3277         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3278         /* if the ptable is NULL then there are no more cpts to examine */
3279         if (peer_table == NULL)
3280                 return -ENOENT;
3281
3282         lnet_net_lock(*cpt_iter);
3283
3284         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3285                 struct list_head *peers = &peer_table->pt_hash[j];
3286
3287                 list_for_each_entry(lp, peers, lpni_hashlist) {
3288                         if (peer_index-- > 0)
3289                                 continue;
3290
3291                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3292                         if (lnet_isrouter(lp) ||
3293                                 lnet_peer_aliveness_enabled(lp))
3294                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3295                                          lp->lpni_alive ? "up" : "down");
3296
3297                         *nid = lp->lpni_nid;
3298                         *refcount = atomic_read(&lp->lpni_refcount);
3299                         *ni_peer_tx_credits =
3300                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3301                         *peer_tx_credits = lp->lpni_txcredits;
3302                         *peer_rtr_credits = lp->lpni_rtrcredits;
3303                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3304                         *peer_tx_qnob = lp->lpni_txqnob;
3305
3306                         found = true;
3307                 }
3308
3309         }
3310         lnet_net_unlock(*cpt_iter);
3311
3312         *cpt_iter = lncpt;
3313
3314         return found ? 0 : -ENOENT;
3315 }
3316
3317 /* ln_api_mutex is held, which keeps the peer list stable */
3318 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3319 {
3320         struct lnet_ioctl_element_stats *lpni_stats;
3321         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3322         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3323         struct lnet_peer_ni_credit_info *lpni_info;
3324         struct lnet_peer_ni *lpni;
3325         struct lnet_peer *lp;
3326         lnet_nid_t nid;
3327         __u32 size;
3328         int rc;
3329
3330         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3331
3332         if (!lp) {
3333                 rc = -ENOENT;
3334                 goto out;
3335         }
3336
3337         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3338                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3339         size *= lp->lp_nnis;
3340         if (size > cfg->prcfg_size) {
3341                 cfg->prcfg_size = size;
3342                 rc = -E2BIG;
3343                 goto out_lp_decref;
3344         }
3345
3346         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3347         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3348         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3349         cfg->prcfg_count = lp->lp_nnis;
3350         cfg->prcfg_size = size;
3351         cfg->prcfg_state = lp->lp_state;
3352
3353         /* Allocate helper buffers. */
3354         rc = -ENOMEM;
3355         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3356         if (!lpni_info)
3357                 goto out_lp_decref;
3358         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3359         if (!lpni_stats)
3360                 goto out_free_info;
3361         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3362         if (!lpni_msg_stats)
3363                 goto out_free_stats;
3364         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3365         if (!lpni_hstats)
3366                 goto out_free_msg_stats;
3367
3368
3369         lpni = NULL;
3370         rc = -EFAULT;
3371         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3372                 nid = lpni->lpni_nid;
3373                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3374                         goto out_free_hstats;
3375                 bulk += sizeof(nid);
3376
3377                 memset(lpni_info, 0, sizeof(*lpni_info));
3378                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3379                 if (lnet_isrouter(lpni) ||
3380                         lnet_peer_aliveness_enabled(lpni))
3381                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3382                                 lpni->lpni_alive ? "up" : "down");
3383
3384                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3385                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3386                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3387                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3388                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3389                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3390                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3391                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3392                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3393                         goto out_free_hstats;
3394                 bulk += sizeof(*lpni_info);
3395
3396                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3397                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3398                                                             LNET_STATS_TYPE_SEND);
3399                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3400                                                             LNET_STATS_TYPE_RECV);
3401                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3402                                                             LNET_STATS_TYPE_DROP);
3403                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3404                         goto out_free_hstats;
3405                 bulk += sizeof(*lpni_stats);
3406                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3407                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3408                         goto out_free_hstats;
3409                 bulk += sizeof(*lpni_msg_stats);
3410                 lpni_hstats->hlpni_network_timeout =
3411                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3412                 lpni_hstats->hlpni_remote_dropped =
3413                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3414                 lpni_hstats->hlpni_remote_timeout =
3415                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3416                 lpni_hstats->hlpni_remote_error =
3417                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3418                 lpni_hstats->hlpni_health_value =
3419                   atomic_read(&lpni->lpni_healthv);
3420                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3421                         goto out_free_hstats;
3422                 bulk += sizeof(*lpni_hstats);
3423         }
3424         rc = 0;
3425
3426 out_free_hstats:
3427         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3428 out_free_msg_stats:
3429         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3430 out_free_stats:
3431         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3432 out_free_info:
3433         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3434 out_lp_decref:
3435         lnet_peer_decref_locked(lp);
3436 out:
3437         return rc;
3438 }
3439
3440 void
3441 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3442 {
3443         /* the mt could've shutdown and cleaned up the queues */
3444         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3445                 return;
3446
3447         if (list_empty(&lpni->lpni_recovery) &&
3448             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3449                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3450                         libcfs_nid2str(lpni->lpni_nid),
3451                         atomic_read(&lpni->lpni_healthv));
3452                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3453                 lnet_peer_ni_addref_locked(lpni);
3454         }
3455 }
3456
3457 /* Call with the ln_api_mutex held */
3458 void
3459 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3460 {
3461         struct lnet_peer_table *ptable;
3462         struct lnet_peer *lp;
3463         struct lnet_peer_net *lpn;
3464         struct lnet_peer_ni *lpni;
3465         int lncpt;
3466         int cpt;
3467
3468         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3469                 return;
3470
3471         if (!all) {
3472                 lnet_net_lock(LNET_LOCK_EX);
3473                 lpni = lnet_find_peer_ni_locked(nid);
3474                 if (!lpni) {
3475                         lnet_net_unlock(LNET_LOCK_EX);
3476                         return;
3477                 }
3478                 atomic_set(&lpni->lpni_healthv, value);
3479                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3480                 lnet_peer_ni_decref_locked(lpni);
3481                 lnet_net_unlock(LNET_LOCK_EX);
3482                 return;
3483         }
3484
3485         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3486
3487         /*
3488          * Walk all the peers and reset the healhv for each one to the
3489          * maximum value.
3490          */
3491         lnet_net_lock(LNET_LOCK_EX);
3492         for (cpt = 0; cpt < lncpt; cpt++) {
3493                 ptable = the_lnet.ln_peer_tables[cpt];
3494                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3495                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3496                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3497                                                     lpni_peer_nis) {
3498                                         atomic_set(&lpni->lpni_healthv, value);
3499                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3500                                 }
3501                         }
3502                 }
3503         }
3504         lnet_net_unlock(LNET_LOCK_EX);
3505 }
3506