Whamcloud - gitweb
LU-11297 lnet: invalidate recovery ping mdh
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #endif
41 #include <linux/uaccess.h>
42
43 #include <lnet/lib-lnet.h>
44 #include <uapi/linux/lnet/lnet-dlc.h>
45
46 /* Value indicating that recovery needs to re-check a peer immediately. */
47 #define LNET_REDISCOVER_PEER    (1)
48
49 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
50
51 static void
52 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
53 {
54         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
55                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
56                 lnet_peer_ni_decref_locked(lpni);
57         }
58 }
59
60 void
61 lnet_peer_net_added(struct lnet_net *net)
62 {
63         struct lnet_peer_ni *lpni, *tmp;
64
65         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
66                                  lpni_on_remote_peer_ni_list) {
67
68                 if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
69                         lpni->lpni_net = net;
70
71                         spin_lock(&lpni->lpni_lock);
72                         lpni->lpni_txcredits =
73                                 lpni->lpni_net->net_tunables.lct_peer_tx_credits;
74                         lpni->lpni_mintxcredits = lpni->lpni_txcredits;
75                         lpni->lpni_rtrcredits =
76                                 lnet_peer_buffer_credits(lpni->lpni_net);
77                         lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
78                         spin_unlock(&lpni->lpni_lock);
79
80                         lnet_peer_remove_from_remote_list(lpni);
81                 }
82         }
83 }
84
85 static void
86 lnet_peer_tables_destroy(void)
87 {
88         struct lnet_peer_table  *ptable;
89         struct list_head        *hash;
90         int                     i;
91         int                     j;
92
93         if (!the_lnet.ln_peer_tables)
94                 return;
95
96         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
97                 hash = ptable->pt_hash;
98                 if (!hash) /* not intialized */
99                         break;
100
101                 LASSERT(list_empty(&ptable->pt_zombie_list));
102
103                 ptable->pt_hash = NULL;
104                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
105                         LASSERT(list_empty(&hash[j]));
106
107                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
108         }
109
110         cfs_percpt_free(the_lnet.ln_peer_tables);
111         the_lnet.ln_peer_tables = NULL;
112 }
113
114 int
115 lnet_peer_tables_create(void)
116 {
117         struct lnet_peer_table  *ptable;
118         struct list_head        *hash;
119         int                     i;
120         int                     j;
121
122         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
123                                                    sizeof(*ptable));
124         if (the_lnet.ln_peer_tables == NULL) {
125                 CERROR("Failed to allocate cpu-partition peer tables\n");
126                 return -ENOMEM;
127         }
128
129         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
130                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
131                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
132                 if (hash == NULL) {
133                         CERROR("Failed to create peer hash table\n");
134                         lnet_peer_tables_destroy();
135                         return -ENOMEM;
136                 }
137
138                 spin_lock_init(&ptable->pt_zombie_lock);
139                 INIT_LIST_HEAD(&ptable->pt_zombie_list);
140
141                 INIT_LIST_HEAD(&ptable->pt_peer_list);
142
143                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
144                         INIT_LIST_HEAD(&hash[j]);
145                 ptable->pt_hash = hash; /* sign of initialization */
146         }
147
148         return 0;
149 }
150
151 static struct lnet_peer_ni *
152 lnet_peer_ni_alloc(lnet_nid_t nid)
153 {
154         struct lnet_peer_ni *lpni;
155         struct lnet_net *net;
156         int cpt;
157
158         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
159
160         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
161         if (!lpni)
162                 return NULL;
163
164         INIT_LIST_HEAD(&lpni->lpni_txq);
165         INIT_LIST_HEAD(&lpni->lpni_rtrq);
166         INIT_LIST_HEAD(&lpni->lpni_routes);
167         INIT_LIST_HEAD(&lpni->lpni_hashlist);
168         INIT_LIST_HEAD(&lpni->lpni_peer_nis);
169         INIT_LIST_HEAD(&lpni->lpni_recovery);
170         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
171         LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
172
173         spin_lock_init(&lpni->lpni_lock);
174
175         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
176         lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
177         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
178         lpni->lpni_nid = nid;
179         lpni->lpni_cpt = cpt;
180         atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
181
182         net = lnet_get_net_locked(LNET_NIDNET(nid));
183         lpni->lpni_net = net;
184         if (net) {
185                 lpni->lpni_txcredits = net->net_tunables.lct_peer_tx_credits;
186                 lpni->lpni_mintxcredits = lpni->lpni_txcredits;
187                 lpni->lpni_rtrcredits = lnet_peer_buffer_credits(net);
188                 lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
189         } else {
190                 /*
191                  * This peer_ni is not on a local network, so we
192                  * cannot add the credits here. In case the net is
193                  * added later, add the peer_ni to the remote peer ni
194                  * list so it can be easily found and revisited.
195                  */
196                 /* FIXME: per-net implementation instead? */
197                 atomic_inc(&lpni->lpni_refcount);
198                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
199                               &the_lnet.ln_remote_peer_ni_list);
200         }
201
202         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
203
204         return lpni;
205 }
206
207 static struct lnet_peer_net *
208 lnet_peer_net_alloc(__u32 net_id)
209 {
210         struct lnet_peer_net *lpn;
211
212         LIBCFS_CPT_ALLOC(lpn, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lpn));
213         if (!lpn)
214                 return NULL;
215
216         INIT_LIST_HEAD(&lpn->lpn_peer_nets);
217         INIT_LIST_HEAD(&lpn->lpn_peer_nis);
218         lpn->lpn_net_id = net_id;
219
220         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
221
222         return lpn;
223 }
224
225 void
226 lnet_destroy_peer_net_locked(struct lnet_peer_net *lpn)
227 {
228         struct lnet_peer *lp;
229
230         CDEBUG(D_NET, "%p net %s\n", lpn, libcfs_net2str(lpn->lpn_net_id));
231
232         LASSERT(atomic_read(&lpn->lpn_refcount) == 0);
233         LASSERT(list_empty(&lpn->lpn_peer_nis));
234         LASSERT(list_empty(&lpn->lpn_peer_nets));
235         lp = lpn->lpn_peer;
236         lpn->lpn_peer = NULL;
237         LIBCFS_FREE(lpn, sizeof(*lpn));
238
239         lnet_peer_decref_locked(lp);
240 }
241
242 static struct lnet_peer *
243 lnet_peer_alloc(lnet_nid_t nid)
244 {
245         struct lnet_peer *lp;
246
247         LIBCFS_CPT_ALLOC(lp, lnet_cpt_table(), CFS_CPT_ANY, sizeof(*lp));
248         if (!lp)
249                 return NULL;
250
251         INIT_LIST_HEAD(&lp->lp_peer_list);
252         INIT_LIST_HEAD(&lp->lp_peer_nets);
253         INIT_LIST_HEAD(&lp->lp_dc_list);
254         INIT_LIST_HEAD(&lp->lp_dc_pendq);
255         init_waitqueue_head(&lp->lp_dc_waitq);
256         spin_lock_init(&lp->lp_lock);
257         lp->lp_primary_nid = nid;
258         /*
259          * Turn off discovery for loopback peer. If you're creating a peer
260          * for the loopback interface then that was initiated when we
261          * attempted to send a message over the loopback. There is no need
262          * to ever use a different interface when sending messages to
263          * myself.
264          */
265         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
266                 lp->lp_state = LNET_PEER_NO_DISCOVERY;
267         lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
268
269         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
270
271         return lp;
272 }
273
274 void
275 lnet_destroy_peer_locked(struct lnet_peer *lp)
276 {
277         CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
278
279         LASSERT(atomic_read(&lp->lp_refcount) == 0);
280         LASSERT(list_empty(&lp->lp_peer_nets));
281         LASSERT(list_empty(&lp->lp_peer_list));
282         LASSERT(list_empty(&lp->lp_dc_list));
283
284         if (lp->lp_data)
285                 lnet_ping_buffer_decref(lp->lp_data);
286
287         /*
288          * if there are messages still on the pending queue, then make
289          * sure to queue them on the ln_msg_resend list so they can be
290          * resent at a later point if the discovery thread is still
291          * running.
292          * If the discovery thread has stopped, then the wakeup will be a
293          * no-op, and it is expected the lnet_shutdown_lndnets() will
294          * eventually be called, which will traverse this list and
295          * finalize the messages on the list.
296          * We can not resend them now because we're holding the cpt lock.
297          * Releasing the lock can cause an inconsistent state
298          */
299         spin_lock(&the_lnet.ln_msg_resend_lock);
300         list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
301         spin_unlock(&the_lnet.ln_msg_resend_lock);
302         wake_up(&the_lnet.ln_dc_waitq);
303
304         LIBCFS_FREE(lp, sizeof(*lp));
305 }
306
307 /*
308  * Detach a peer_ni from its peer_net. If this was the last peer_ni on
309  * that peer_net, detach the peer_net from the peer.
310  *
311  * Call with lnet_net_lock/EX held
312  */
313 static void
314 lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
315 {
316         struct lnet_peer_table *ptable;
317         struct lnet_peer_net *lpn;
318         struct lnet_peer *lp;
319
320         /*
321          * Belts and suspenders: gracefully handle teardown of a
322          * partially connected peer_ni.
323          */
324         lpn = lpni->lpni_peer_net;
325
326         list_del_init(&lpni->lpni_peer_nis);
327         /*
328          * If there are no lpni's left, we detach lpn from
329          * lp_peer_nets, so it cannot be found anymore.
330          */
331         if (list_empty(&lpn->lpn_peer_nis))
332                 list_del_init(&lpn->lpn_peer_nets);
333
334         /* Update peer NID count. */
335         lp = lpn->lpn_peer;
336         lp->lp_nnis--;
337
338         /*
339          * If there are no more peer nets, make the peer unfindable
340          * via the peer_tables.
341          *
342          * Otherwise, if the peer is DISCOVERED, tell discovery to
343          * take another look at it. This is a no-op if discovery for
344          * this peer did the detaching.
345          */
346         if (list_empty(&lp->lp_peer_nets)) {
347                 list_del_init(&lp->lp_peer_list);
348                 ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
349                 ptable->pt_peers--;
350         } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
351                 /* Discovery isn't running, nothing to do here. */
352         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
353                 lnet_peer_queue_for_discovery(lp);
354                 wake_up(&the_lnet.ln_dc_waitq);
355         }
356         CDEBUG(D_NET, "peer %s NID %s\n",
357                 libcfs_nid2str(lp->lp_primary_nid),
358                 libcfs_nid2str(lpni->lpni_nid));
359 }
360
361 /* called with lnet_net_lock LNET_LOCK_EX held */
362 static int
363 lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
364 {
365         struct lnet_peer_table *ptable = NULL;
366
367         /* don't remove a peer_ni if it's also a gateway */
368         if (lpni->lpni_rtr_refcount > 0) {
369                 CERROR("Peer NI %s is a gateway. Can not delete it\n",
370                        libcfs_nid2str(lpni->lpni_nid));
371                 return -EBUSY;
372         }
373
374         lnet_peer_remove_from_remote_list(lpni);
375
376         /* remove peer ni from the hash list. */
377         list_del_init(&lpni->lpni_hashlist);
378
379         /*
380          * indicate the peer is being deleted so the monitor thread can
381          * remove it from the recovery queue.
382          */
383         spin_lock(&lpni->lpni_lock);
384         lpni->lpni_state |= LNET_PEER_NI_DELETING;
385         spin_unlock(&lpni->lpni_lock);
386
387         /* decrement the ref count on the peer table */
388         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
389         LASSERT(ptable->pt_number > 0);
390         ptable->pt_number--;
391
392         /*
393          * The peer_ni can no longer be found with a lookup. But there
394          * can be current users, so keep track of it on the zombie
395          * list until the reference count has gone to zero.
396          *
397          * The last reference may be lost in a place where the
398          * lnet_net_lock locks only a single cpt, and that cpt may not
399          * be lpni->lpni_cpt. So the zombie list of lnet_peer_table
400          * has its own lock.
401          */
402         spin_lock(&ptable->pt_zombie_lock);
403         list_add(&lpni->lpni_hashlist, &ptable->pt_zombie_list);
404         ptable->pt_zombies++;
405         spin_unlock(&ptable->pt_zombie_lock);
406
407         /* no need to keep this peer_ni on the hierarchy anymore */
408         lnet_peer_detach_peer_ni_locked(lpni);
409
410         /* remove hashlist reference on peer_ni */
411         lnet_peer_ni_decref_locked(lpni);
412
413         return 0;
414 }
415
416 void lnet_peer_uninit(void)
417 {
418         struct lnet_peer_ni *lpni, *tmp;
419
420         lnet_net_lock(LNET_LOCK_EX);
421
422         /* remove all peer_nis from the remote peer and the hash list */
423         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
424                                  lpni_on_remote_peer_ni_list)
425                 lnet_peer_ni_del_locked(lpni);
426
427         lnet_peer_tables_destroy();
428
429         lnet_net_unlock(LNET_LOCK_EX);
430 }
431
432 static int
433 lnet_peer_del_locked(struct lnet_peer *peer)
434 {
435         struct lnet_peer_ni *lpni = NULL, *lpni2;
436         int rc = 0, rc2 = 0;
437
438         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(peer->lp_primary_nid));
439
440         lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
441         while (lpni != NULL) {
442                 lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
443                 rc = lnet_peer_ni_del_locked(lpni);
444                 if (rc != 0)
445                         rc2 = rc;
446                 lpni = lpni2;
447         }
448
449         return rc2;
450 }
451
452 static int
453 lnet_peer_del(struct lnet_peer *peer)
454 {
455         lnet_net_lock(LNET_LOCK_EX);
456         lnet_peer_del_locked(peer);
457         lnet_net_unlock(LNET_LOCK_EX);
458
459         return 0;
460 }
461
462 /*
463  * Delete a NID from a peer. Call with ln_api_mutex held.
464  *
465  * Error codes:
466  *  -EPERM:  Non-DLC deletion from DLC-configured peer.
467  *  -ENOENT: No lnet_peer_ni corresponding to the nid.
468  *  -ECHILD: The lnet_peer_ni isn't connected to the peer.
469  *  -EBUSY:  The lnet_peer_ni is the primary, and not the only peer_ni.
470  */
471 static int
472 lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
473 {
474         struct lnet_peer_ni *lpni;
475         lnet_nid_t primary_nid = lp->lp_primary_nid;
476         int rc = 0;
477
478         if (!(flags & LNET_PEER_CONFIGURED)) {
479                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
480                         rc = -EPERM;
481                         goto out;
482                 }
483         }
484         lpni = lnet_find_peer_ni_locked(nid);
485         if (!lpni) {
486                 rc = -ENOENT;
487                 goto out;
488         }
489         lnet_peer_ni_decref_locked(lpni);
490         if (lp != lpni->lpni_peer_net->lpn_peer) {
491                 rc = -ECHILD;
492                 goto out;
493         }
494
495         /*
496          * This function only allows deletion of the primary NID if it
497          * is the only NID.
498          */
499         if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
500                 rc = -EBUSY;
501                 goto out;
502         }
503
504         lnet_net_lock(LNET_LOCK_EX);
505
506         rc = lnet_peer_ni_del_locked(lpni);
507
508         lnet_net_unlock(LNET_LOCK_EX);
509
510 out:
511         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
512                libcfs_nid2str(primary_nid), libcfs_nid2str(nid), flags, rc);
513
514         return rc;
515 }
516
517 static void
518 lnet_peer_table_cleanup_locked(struct lnet_net *net,
519                                struct lnet_peer_table *ptable)
520 {
521         int                      i;
522         struct lnet_peer_ni     *next;
523         struct lnet_peer_ni     *lpni;
524         struct lnet_peer        *peer;
525
526         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
527                 list_for_each_entry_safe(lpni, next, &ptable->pt_hash[i],
528                                          lpni_hashlist) {
529                         if (net != NULL && net != lpni->lpni_net)
530                                 continue;
531
532                         peer = lpni->lpni_peer_net->lpn_peer;
533                         if (peer->lp_primary_nid != lpni->lpni_nid) {
534                                 lnet_peer_ni_del_locked(lpni);
535                                 continue;
536                         }
537                         /*
538                          * Removing the primary NID implies removing
539                          * the entire peer. Advance next beyond any
540                          * peer_ni that belongs to the same peer.
541                          */
542                         list_for_each_entry_from(next, &ptable->pt_hash[i],
543                                                  lpni_hashlist) {
544                                 if (next->lpni_peer_net->lpn_peer != peer)
545                                         break;
546                         }
547                         lnet_peer_del_locked(peer);
548                 }
549         }
550 }
551
552 static void
553 lnet_peer_ni_finalize_wait(struct lnet_peer_table *ptable)
554 {
555         int     i = 3;
556
557         spin_lock(&ptable->pt_zombie_lock);
558         while (ptable->pt_zombies) {
559                 spin_unlock(&ptable->pt_zombie_lock);
560
561                 if (is_power_of_2(i)) {
562                         CDEBUG(D_WARNING,
563                                "Waiting for %d zombies on peer table\n",
564                                ptable->pt_zombies);
565                 }
566                 set_current_state(TASK_UNINTERRUPTIBLE);
567                 schedule_timeout(cfs_time_seconds(1) >> 1);
568                 spin_lock(&ptable->pt_zombie_lock);
569         }
570         spin_unlock(&ptable->pt_zombie_lock);
571 }
572
573 static void
574 lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
575                                 struct lnet_peer_table *ptable)
576 {
577         struct lnet_peer_ni     *lp;
578         struct lnet_peer_ni     *tmp;
579         lnet_nid_t              lpni_nid;
580         int                     i;
581
582         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
583                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
584                                          lpni_hashlist) {
585                         if (net != lp->lpni_net)
586                                 continue;
587
588                         if (lp->lpni_rtr_refcount == 0)
589                                 continue;
590
591                         lpni_nid = lp->lpni_nid;
592
593                         lnet_net_unlock(LNET_LOCK_EX);
594                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
595                         lnet_net_lock(LNET_LOCK_EX);
596                 }
597         }
598 }
599
600 void
601 lnet_peer_tables_cleanup(struct lnet_net *net)
602 {
603         int i;
604         struct lnet_peer_table *ptable;
605
606         LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
607         /* If just deleting the peers for a NI, get rid of any routes these
608          * peers are gateways for. */
609         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
610                 lnet_net_lock(LNET_LOCK_EX);
611                 lnet_peer_table_del_rtrs_locked(net, ptable);
612                 lnet_net_unlock(LNET_LOCK_EX);
613         }
614
615         /* Start the cleanup process */
616         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
617                 lnet_net_lock(LNET_LOCK_EX);
618                 lnet_peer_table_cleanup_locked(net, ptable);
619                 lnet_net_unlock(LNET_LOCK_EX);
620         }
621
622         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables)
623                 lnet_peer_ni_finalize_wait(ptable);
624 }
625
626 static struct lnet_peer_ni *
627 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
628 {
629         struct list_head        *peers;
630         struct lnet_peer_ni     *lp;
631
632         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
633
634         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
635         list_for_each_entry(lp, peers, lpni_hashlist) {
636                 if (lp->lpni_nid == nid) {
637                         lnet_peer_ni_addref_locked(lp);
638                         return lp;
639                 }
640         }
641
642         return NULL;
643 }
644
645 struct lnet_peer_ni *
646 lnet_find_peer_ni_locked(lnet_nid_t nid)
647 {
648         struct lnet_peer_ni *lpni;
649         struct lnet_peer_table *ptable;
650         int cpt;
651
652         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
653
654         ptable = the_lnet.ln_peer_tables[cpt];
655         lpni = lnet_get_peer_ni_locked(ptable, nid);
656
657         return lpni;
658 }
659
660 struct lnet_peer *
661 lnet_find_peer(lnet_nid_t nid)
662 {
663         struct lnet_peer_ni *lpni;
664         struct lnet_peer *lp = NULL;
665         int cpt;
666
667         cpt = lnet_net_lock_current();
668         lpni = lnet_find_peer_ni_locked(nid);
669         if (lpni) {
670                 lp = lpni->lpni_peer_net->lpn_peer;
671                 lnet_peer_addref_locked(lp);
672                 lnet_peer_ni_decref_locked(lpni);
673         }
674         lnet_net_unlock(cpt);
675
676         return lp;
677 }
678
679 struct lnet_peer_ni *
680 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
681                              struct lnet_peer_net *peer_net,
682                              struct lnet_peer_ni *prev)
683 {
684         struct lnet_peer_ni *lpni;
685         struct lnet_peer_net *net = peer_net;
686
687         if (!prev) {
688                 if (!net) {
689                         if (list_empty(&peer->lp_peer_nets))
690                                 return NULL;
691
692                         net = list_entry(peer->lp_peer_nets.next,
693                                          struct lnet_peer_net,
694                                          lpn_peer_nets);
695                 }
696                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
697                                   lpni_peer_nis);
698
699                 return lpni;
700         }
701
702         if (prev->lpni_peer_nis.next == &prev->lpni_peer_net->lpn_peer_nis) {
703                 /*
704                  * if you reached the end of the peer ni list and the peer
705                  * net is specified then there are no more peer nis in that
706                  * net.
707                  */
708                 if (net)
709                         return NULL;
710
711                 /*
712                  * we reached the end of this net ni list. move to the
713                  * next net
714                  */
715                 if (prev->lpni_peer_net->lpn_peer_nets.next ==
716                     &peer->lp_peer_nets)
717                         /* no more nets and no more NIs. */
718                         return NULL;
719
720                 /* get the next net */
721                 net = list_entry(prev->lpni_peer_net->lpn_peer_nets.next,
722                                  struct lnet_peer_net,
723                                  lpn_peer_nets);
724                 /* get the ni on it */
725                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
726                                   lpni_peer_nis);
727
728                 return lpni;
729         }
730
731         /* there are more nis left */
732         lpni = list_entry(prev->lpni_peer_nis.next,
733                           struct lnet_peer_ni, lpni_peer_nis);
734
735         return lpni;
736 }
737
738 /* Call with the ln_api_mutex held */
739 int lnet_get_peer_list(u32 *countp, u32 *sizep, struct lnet_process_id __user *ids)
740 {
741         struct lnet_process_id id;
742         struct lnet_peer_table *ptable;
743         struct lnet_peer *lp;
744         __u32 count = 0;
745         __u32 size = 0;
746         int lncpt;
747         int cpt;
748         __u32 i;
749         int rc;
750
751         rc = -ESHUTDOWN;
752         if (the_lnet.ln_state != LNET_STATE_RUNNING)
753                 goto done;
754
755         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
756
757         /*
758          * Count the number of peers, and return E2BIG if the buffer
759          * is too small. We'll also return the desired size.
760          */
761         rc = -E2BIG;
762         for (cpt = 0; cpt < lncpt; cpt++) {
763                 ptable = the_lnet.ln_peer_tables[cpt];
764                 count += ptable->pt_peers;
765         }
766         size = count * sizeof(*ids);
767         if (size > *sizep)
768                 goto done;
769
770         /*
771          * Walk the peer lists and copy out the primary nids.
772          * This is safe because the peer lists are only modified
773          * while the ln_api_mutex is held. So we don't need to
774          * hold the lnet_net_lock as well, and can therefore
775          * directly call copy_to_user().
776          */
777         rc = -EFAULT;
778         memset(&id, 0, sizeof(id));
779         id.pid = LNET_PID_LUSTRE;
780         i = 0;
781         for (cpt = 0; cpt < lncpt; cpt++) {
782                 ptable = the_lnet.ln_peer_tables[cpt];
783                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
784                         if (i >= count)
785                                 goto done;
786                         id.nid = lp->lp_primary_nid;
787                         if (copy_to_user(&ids[i], &id, sizeof(id)))
788                                 goto done;
789                         i++;
790                 }
791         }
792         rc = 0;
793 done:
794         *countp = count;
795         *sizep = size;
796         return rc;
797 }
798
799 /*
800  * Start pushes to peers that need to be updated for a configuration
801  * change on this node.
802  */
803 void
804 lnet_push_update_to_peers(int force)
805 {
806         struct lnet_peer_table *ptable;
807         struct lnet_peer *lp;
808         int lncpt;
809         int cpt;
810
811         lnet_net_lock(LNET_LOCK_EX);
812         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
813         for (cpt = 0; cpt < lncpt; cpt++) {
814                 ptable = the_lnet.ln_peer_tables[cpt];
815                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
816                         if (force) {
817                                 spin_lock(&lp->lp_lock);
818                                 if (lp->lp_state & LNET_PEER_MULTI_RAIL)
819                                         lp->lp_state |= LNET_PEER_FORCE_PUSH;
820                                 spin_unlock(&lp->lp_lock);
821                         }
822                         if (lnet_peer_needs_push(lp))
823                                 lnet_peer_queue_for_discovery(lp);
824                 }
825         }
826         lnet_net_unlock(LNET_LOCK_EX);
827         wake_up(&the_lnet.ln_dc_waitq);
828 }
829
830 /*
831  * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
832  * this is a preferred point-to-point path. Call with lnet_net_lock in
833  * shared mmode.
834  */
835 bool
836 lnet_peer_is_pref_nid_locked(struct lnet_peer_ni *lpni, lnet_nid_t nid)
837 {
838         int i;
839
840         if (lpni->lpni_pref_nnids == 0)
841                 return false;
842         if (lpni->lpni_pref_nnids == 1)
843                 return lpni->lpni_pref.nid == nid;
844         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
845                 if (lpni->lpni_pref.nids[i] == nid)
846                         return true;
847         }
848         return false;
849 }
850
851 /*
852  * Set a single ni as preferred, provided no preferred ni is already
853  * defined. Only to be used for non-multi-rail peer_ni.
854  */
855 int
856 lnet_peer_ni_set_non_mr_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
857 {
858         int rc = 0;
859
860         spin_lock(&lpni->lpni_lock);
861         if (nid == LNET_NID_ANY) {
862                 rc = -EINVAL;
863         } else if (lpni->lpni_pref_nnids > 0) {
864                 rc = -EPERM;
865         } else if (lpni->lpni_pref_nnids == 0) {
866                 lpni->lpni_pref.nid = nid;
867                 lpni->lpni_pref_nnids = 1;
868                 lpni->lpni_state |= LNET_PEER_NI_NON_MR_PREF;
869         }
870         spin_unlock(&lpni->lpni_lock);
871
872         CDEBUG(D_NET, "peer %s nid %s: %d\n",
873                libcfs_nid2str(lpni->lpni_nid), libcfs_nid2str(nid), rc);
874         return rc;
875 }
876
877 /*
878  * Clear the preferred NID from a non-multi-rail peer_ni, provided
879  * this preference was set by lnet_peer_ni_set_non_mr_pref_nid().
880  */
881 int
882 lnet_peer_ni_clr_non_mr_pref_nid(struct lnet_peer_ni *lpni)
883 {
884         int rc = 0;
885
886         spin_lock(&lpni->lpni_lock);
887         if (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF) {
888                 lpni->lpni_pref_nnids = 0;
889                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
890         } else if (lpni->lpni_pref_nnids == 0) {
891                 rc = -ENOENT;
892         } else {
893                 rc = -EPERM;
894         }
895         spin_unlock(&lpni->lpni_lock);
896
897         CDEBUG(D_NET, "peer %s: %d\n",
898                libcfs_nid2str(lpni->lpni_nid), rc);
899         return rc;
900 }
901
902 /*
903  * Clear the preferred NIDs from a non-multi-rail peer.
904  */
905 void
906 lnet_peer_clr_non_mr_pref_nids(struct lnet_peer *lp)
907 {
908         struct lnet_peer_ni *lpni = NULL;
909
910         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
911                 lnet_peer_ni_clr_non_mr_pref_nid(lpni);
912 }
913
914 int
915 lnet_peer_add_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
916 {
917         lnet_nid_t *nids = NULL;
918         lnet_nid_t *oldnids = NULL;
919         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
920         int size;
921         int i;
922         int rc = 0;
923
924         if (nid == LNET_NID_ANY) {
925                 rc = -EINVAL;
926                 goto out;
927         }
928
929         if (lpni->lpni_pref_nnids == 1 && lpni->lpni_pref.nid == nid) {
930                 rc = -EEXIST;
931                 goto out;
932         }
933
934         /* A non-MR node may have only one preferred NI per peer_ni */
935         if (lpni->lpni_pref_nnids > 0) {
936                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
937                         rc = -EPERM;
938                         goto out;
939                 }
940         }
941
942         if (lpni->lpni_pref_nnids != 0) {
943                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
944                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
945                 if (!nids) {
946                         rc = -ENOMEM;
947                         goto out;
948                 }
949                 for (i = 0; i < lpni->lpni_pref_nnids; i++) {
950                         if (lpni->lpni_pref.nids[i] == nid) {
951                                 LIBCFS_FREE(nids, size);
952                                 rc = -EEXIST;
953                                 goto out;
954                         }
955                         nids[i] = lpni->lpni_pref.nids[i];
956                 }
957                 nids[i] = nid;
958         }
959
960         lnet_net_lock(LNET_LOCK_EX);
961         spin_lock(&lpni->lpni_lock);
962         if (lpni->lpni_pref_nnids == 0) {
963                 lpni->lpni_pref.nid = nid;
964         } else {
965                 oldnids = lpni->lpni_pref.nids;
966                 lpni->lpni_pref.nids = nids;
967         }
968         lpni->lpni_pref_nnids++;
969         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
970         spin_unlock(&lpni->lpni_lock);
971         lnet_net_unlock(LNET_LOCK_EX);
972
973         if (oldnids) {
974                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
975                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
976         }
977 out:
978         if (rc == -EEXIST && (lpni->lpni_state & LNET_PEER_NI_NON_MR_PREF)) {
979                 spin_lock(&lpni->lpni_lock);
980                 lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
981                 spin_unlock(&lpni->lpni_lock);
982         }
983         CDEBUG(D_NET, "peer %s nid %s: %d\n",
984                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
985         return rc;
986 }
987
988 int
989 lnet_peer_del_pref_nid(struct lnet_peer_ni *lpni, lnet_nid_t nid)
990 {
991         lnet_nid_t *nids = NULL;
992         lnet_nid_t *oldnids = NULL;
993         struct lnet_peer *lp = lpni->lpni_peer_net->lpn_peer;
994         int size;
995         int i, j;
996         int rc = 0;
997
998         if (lpni->lpni_pref_nnids == 0) {
999                 rc = -ENOENT;
1000                 goto out;
1001         }
1002
1003         if (lpni->lpni_pref_nnids == 1) {
1004                 if (lpni->lpni_pref.nid != nid) {
1005                         rc = -ENOENT;
1006                         goto out;
1007                 }
1008         } else if (lpni->lpni_pref_nnids == 2) {
1009                 if (lpni->lpni_pref.nids[0] != nid &&
1010                     lpni->lpni_pref.nids[1] != nid) {
1011                         rc = -ENOENT;
1012                         goto out;
1013                 }
1014         } else {
1015                 size = sizeof(*nids) * (lpni->lpni_pref_nnids - 1);
1016                 LIBCFS_CPT_ALLOC(nids, lnet_cpt_table(), lpni->lpni_cpt, size);
1017                 if (!nids) {
1018                         rc = -ENOMEM;
1019                         goto out;
1020                 }
1021                 for (i = 0, j = 0; i < lpni->lpni_pref_nnids; i++) {
1022                         if (lpni->lpni_pref.nids[i] != nid)
1023                                 continue;
1024                         nids[j++] = lpni->lpni_pref.nids[i];
1025                 }
1026                 /* Check if we actually removed a nid. */
1027                 if (j == lpni->lpni_pref_nnids) {
1028                         LIBCFS_FREE(nids, size);
1029                         rc = -ENOENT;
1030                         goto out;
1031                 }
1032         }
1033
1034         lnet_net_lock(LNET_LOCK_EX);
1035         spin_lock(&lpni->lpni_lock);
1036         if (lpni->lpni_pref_nnids == 1) {
1037                 lpni->lpni_pref.nid = LNET_NID_ANY;
1038         } else if (lpni->lpni_pref_nnids == 2) {
1039                 oldnids = lpni->lpni_pref.nids;
1040                 if (oldnids[0] == nid)
1041                         lpni->lpni_pref.nid = oldnids[1];
1042                 else
1043                         lpni->lpni_pref.nid = oldnids[2];
1044         } else {
1045                 oldnids = lpni->lpni_pref.nids;
1046                 lpni->lpni_pref.nids = nids;
1047         }
1048         lpni->lpni_pref_nnids--;
1049         lpni->lpni_state &= ~LNET_PEER_NI_NON_MR_PREF;
1050         spin_unlock(&lpni->lpni_lock);
1051         lnet_net_unlock(LNET_LOCK_EX);
1052
1053         if (oldnids) {
1054                 size = sizeof(*nids) * (lpni->lpni_pref_nnids + 1);
1055                 LIBCFS_FREE(oldnids, sizeof(*oldnids) * size);
1056         }
1057 out:
1058         CDEBUG(D_NET, "peer %s nid %s: %d\n",
1059                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid), rc);
1060         return rc;
1061 }
1062
1063 lnet_nid_t
1064 lnet_peer_primary_nid_locked(lnet_nid_t nid)
1065 {
1066         struct lnet_peer_ni *lpni;
1067         lnet_nid_t primary_nid = nid;
1068
1069         lpni = lnet_find_peer_ni_locked(nid);
1070         if (lpni) {
1071                 primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
1072                 lnet_peer_ni_decref_locked(lpni);
1073         }
1074
1075         return primary_nid;
1076 }
1077
1078 lnet_nid_t
1079 LNetPrimaryNID(lnet_nid_t nid)
1080 {
1081         struct lnet_peer *lp;
1082         struct lnet_peer_ni *lpni;
1083         lnet_nid_t primary_nid = nid;
1084         int rc = 0;
1085         int cpt;
1086
1087         cpt = lnet_net_lock_current();
1088         lpni = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
1089         if (IS_ERR(lpni)) {
1090                 rc = PTR_ERR(lpni);
1091                 goto out_unlock;
1092         }
1093         lp = lpni->lpni_peer_net->lpn_peer;
1094         while (!lnet_peer_is_uptodate(lp)) {
1095                 rc = lnet_discover_peer_locked(lpni, cpt, true);
1096                 if (rc)
1097                         goto out_decref;
1098                 lp = lpni->lpni_peer_net->lpn_peer;
1099         }
1100         primary_nid = lp->lp_primary_nid;
1101 out_decref:
1102         lnet_peer_ni_decref_locked(lpni);
1103 out_unlock:
1104         lnet_net_unlock(cpt);
1105
1106         CDEBUG(D_NET, "NID %s primary NID %s rc %d\n", libcfs_nid2str(nid),
1107                libcfs_nid2str(primary_nid), rc);
1108         return primary_nid;
1109 }
1110 EXPORT_SYMBOL(LNetPrimaryNID);
1111
1112 struct lnet_peer_net *
1113 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
1114 {
1115         struct lnet_peer_net *peer_net;
1116         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1117                 if (peer_net->lpn_net_id == net_id)
1118                         return peer_net;
1119         }
1120         return NULL;
1121 }
1122
1123 /*
1124  * Attach a peer_ni to a peer_net and peer. This function assumes
1125  * peer_ni is not already attached to the peer_net/peer. The peer_ni
1126  * may be attached to a different peer, in which case it will be
1127  * properly detached first. The whole operation is done atomically.
1128  *
1129  * Always returns 0.  This is the last function called from functions
1130  * that do return an int, so returning 0 here allows the compiler to
1131  * do a tail call.
1132  */
1133 static int
1134 lnet_peer_attach_peer_ni(struct lnet_peer *lp,
1135                                 struct lnet_peer_net *lpn,
1136                                 struct lnet_peer_ni *lpni,
1137                                 unsigned flags)
1138 {
1139         struct lnet_peer_table *ptable;
1140
1141         /* Install the new peer_ni */
1142         lnet_net_lock(LNET_LOCK_EX);
1143         /* Add peer_ni to global peer table hash, if necessary. */
1144         if (list_empty(&lpni->lpni_hashlist)) {
1145                 int hash = lnet_nid2peerhash(lpni->lpni_nid);
1146
1147                 ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1148                 list_add_tail(&lpni->lpni_hashlist, &ptable->pt_hash[hash]);
1149                 ptable->pt_version++;
1150                 ptable->pt_number++;
1151                 /* This is the 1st refcount on lpni. */
1152                 atomic_inc(&lpni->lpni_refcount);
1153         }
1154
1155         /* Detach the peer_ni from an existing peer, if necessary. */
1156         if (lpni->lpni_peer_net) {
1157                 LASSERT(lpni->lpni_peer_net != lpn);
1158                 LASSERT(lpni->lpni_peer_net->lpn_peer != lp);
1159                 lnet_peer_detach_peer_ni_locked(lpni);
1160                 lnet_peer_net_decref_locked(lpni->lpni_peer_net);
1161                 lpni->lpni_peer_net = NULL;
1162         }
1163
1164         /* Add peer_ni to peer_net */
1165         lpni->lpni_peer_net = lpn;
1166         list_add_tail(&lpni->lpni_peer_nis, &lpn->lpn_peer_nis);
1167         lnet_peer_net_addref_locked(lpn);
1168
1169         /* Add peer_net to peer */
1170         if (!lpn->lpn_peer) {
1171                 lpn->lpn_peer = lp;
1172                 list_add_tail(&lpn->lpn_peer_nets, &lp->lp_peer_nets);
1173                 lnet_peer_addref_locked(lp);
1174         }
1175
1176         /* Add peer to global peer list, if necessary */
1177         ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
1178         if (list_empty(&lp->lp_peer_list)) {
1179                 list_add_tail(&lp->lp_peer_list, &ptable->pt_peer_list);
1180                 ptable->pt_peers++;
1181         }
1182
1183
1184         /* Update peer state */
1185         spin_lock(&lp->lp_lock);
1186         if (flags & LNET_PEER_CONFIGURED) {
1187                 if (!(lp->lp_state & LNET_PEER_CONFIGURED))
1188                         lp->lp_state |= LNET_PEER_CONFIGURED;
1189         }
1190         if (flags & LNET_PEER_MULTI_RAIL) {
1191                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1192                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1193                         lnet_peer_clr_non_mr_pref_nids(lp);
1194                 }
1195         }
1196         spin_unlock(&lp->lp_lock);
1197
1198         lp->lp_nnis++;
1199         lnet_net_unlock(LNET_LOCK_EX);
1200
1201         CDEBUG(D_NET, "peer %s NID %s flags %#x\n",
1202                libcfs_nid2str(lp->lp_primary_nid),
1203                libcfs_nid2str(lpni->lpni_nid), flags);
1204
1205         return 0;
1206 }
1207
1208 /*
1209  * Create a new peer, with nid as its primary nid.
1210  *
1211  * Call with the lnet_api_mutex held.
1212  */
1213 static int
1214 lnet_peer_add(lnet_nid_t nid, unsigned flags)
1215 {
1216         struct lnet_peer *lp;
1217         struct lnet_peer_net *lpn;
1218         struct lnet_peer_ni *lpni;
1219         int rc = 0;
1220
1221         LASSERT(nid != LNET_NID_ANY);
1222
1223         /*
1224          * No need for the lnet_net_lock here, because the
1225          * lnet_api_mutex is held.
1226          */
1227         lpni = lnet_find_peer_ni_locked(nid);
1228         if (lpni) {
1229                 /* A peer with this NID already exists. */
1230                 lp = lpni->lpni_peer_net->lpn_peer;
1231                 lnet_peer_ni_decref_locked(lpni);
1232                 /*
1233                  * This is an error if the peer was configured and the
1234                  * primary NID differs or an attempt is made to change
1235                  * the Multi-Rail flag. Otherwise the assumption is
1236                  * that an existing peer is being modified.
1237                  */
1238                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1239                         if (lp->lp_primary_nid != nid)
1240                                 rc = -EEXIST;
1241                         else if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL)
1242                                 rc = -EPERM;
1243                         goto out;
1244                 }
1245                 /* Delete and recreate as a configured peer. */
1246                 lnet_peer_del(lp);
1247         }
1248
1249         /* Create peer, peer_net, and peer_ni. */
1250         rc = -ENOMEM;
1251         lp = lnet_peer_alloc(nid);
1252         if (!lp)
1253                 goto out;
1254         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1255         if (!lpn)
1256                 goto out_free_lp;
1257         lpni = lnet_peer_ni_alloc(nid);
1258         if (!lpni)
1259                 goto out_free_lpn;
1260
1261         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1262
1263 out_free_lpn:
1264         LIBCFS_FREE(lpn, sizeof(*lpn));
1265 out_free_lp:
1266         LIBCFS_FREE(lp, sizeof(*lp));
1267 out:
1268         CDEBUG(D_NET, "peer %s NID flags %#x: %d\n",
1269                libcfs_nid2str(nid), flags, rc);
1270         return rc;
1271 }
1272
1273 /*
1274  * Add a NID to a peer. Call with ln_api_mutex held.
1275  *
1276  * Error codes:
1277  *  -EPERM:    Non-DLC addition to a DLC-configured peer.
1278  *  -EEXIST:   The NID was configured by DLC for a different peer.
1279  *  -ENOMEM:   Out of memory.
1280  *  -ENOTUNIQ: Adding a second peer NID on a single network on a
1281  *             non-multi-rail peer.
1282  */
1283 static int
1284 lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1285 {
1286         struct lnet_peer_net *lpn;
1287         struct lnet_peer_ni *lpni;
1288         int rc = 0;
1289
1290         LASSERT(lp);
1291         LASSERT(nid != LNET_NID_ANY);
1292
1293         /* A configured peer can only be updated through configuration. */
1294         if (!(flags & LNET_PEER_CONFIGURED)) {
1295                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1296                         rc = -EPERM;
1297                         goto out;
1298                 }
1299         }
1300
1301         /*
1302          * The MULTI_RAIL flag can be set but not cleared, because
1303          * that would leave the peer struct in an invalid state.
1304          */
1305         if (flags & LNET_PEER_MULTI_RAIL) {
1306                 spin_lock(&lp->lp_lock);
1307                 if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1308                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1309                         lnet_peer_clr_non_mr_pref_nids(lp);
1310                 }
1311                 spin_unlock(&lp->lp_lock);
1312         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
1313                 rc = -EPERM;
1314                 goto out;
1315         }
1316
1317         lpni = lnet_find_peer_ni_locked(nid);
1318         if (lpni) {
1319                 /*
1320                  * A peer_ni already exists. This is only a problem if
1321                  * it is not connected to this peer and was configured
1322                  * by DLC.
1323                  */
1324                 lnet_peer_ni_decref_locked(lpni);
1325                 if (lpni->lpni_peer_net->lpn_peer == lp)
1326                         goto out;
1327                 if (lnet_peer_ni_is_configured(lpni)) {
1328                         rc = -EEXIST;
1329                         goto out;
1330                 }
1331                 /* If this is the primary NID, destroy the peer. */
1332                 if (lnet_peer_ni_is_primary(lpni)) {
1333                         lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
1334                         lpni = lnet_peer_ni_alloc(nid);
1335                         if (!lpni) {
1336                                 rc = -ENOMEM;
1337                                 goto out;
1338                         }
1339                 }
1340         } else {
1341                 lpni = lnet_peer_ni_alloc(nid);
1342                 if (!lpni) {
1343                         rc = -ENOMEM;
1344                         goto out;
1345                 }
1346         }
1347
1348         /*
1349          * Get the peer_net. Check that we're not adding a second
1350          * peer_ni on a peer_net of a non-multi-rail peer.
1351          */
1352         lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
1353         if (!lpn) {
1354                 lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1355                 if (!lpn) {
1356                         rc = -ENOMEM;
1357                         goto out_free_lpni;
1358                 }
1359         } else if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1360                 rc = -ENOTUNIQ;
1361                 goto out_free_lpni;
1362         }
1363
1364         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1365
1366 out_free_lpni:
1367         /* If the peer_ni was allocated above its peer_net pointer is NULL */
1368         if (!lpni->lpni_peer_net)
1369                 LIBCFS_FREE(lpni, sizeof(*lpni));
1370 out:
1371         CDEBUG(D_NET, "peer %s NID %s flags %#x: %d\n",
1372                libcfs_nid2str(lp->lp_primary_nid), libcfs_nid2str(nid),
1373                flags, rc);
1374         return rc;
1375 }
1376
1377 /*
1378  * Update the primary NID of a peer, if possible.
1379  *
1380  * Call with the lnet_api_mutex held.
1381  */
1382 static int
1383 lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
1384 {
1385         lnet_nid_t old = lp->lp_primary_nid;
1386         int rc = 0;
1387
1388         if (lp->lp_primary_nid == nid)
1389                 goto out;
1390         rc = lnet_peer_add_nid(lp, nid, flags);
1391         if (rc)
1392                 goto out;
1393         lp->lp_primary_nid = nid;
1394 out:
1395         CDEBUG(D_NET, "peer %s NID %s: %d\n",
1396                libcfs_nid2str(old), libcfs_nid2str(nid), rc);
1397         return rc;
1398 }
1399
1400 /*
1401  * lpni creation initiated due to traffic either sending or receiving.
1402  */
1403 static int
1404 lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
1405 {
1406         struct lnet_peer *lp;
1407         struct lnet_peer_net *lpn;
1408         struct lnet_peer_ni *lpni;
1409         unsigned flags = 0;
1410         int rc = 0;
1411
1412         if (nid == LNET_NID_ANY) {
1413                 rc = -EINVAL;
1414                 goto out;
1415         }
1416
1417         /* lnet_net_lock is not needed here because ln_api_lock is held */
1418         lpni = lnet_find_peer_ni_locked(nid);
1419         if (lpni) {
1420                 /*
1421                  * We must have raced with another thread. Since we
1422                  * know next to nothing about a peer_ni created by
1423                  * traffic, we just assume everything is ok and
1424                  * return.
1425                  */
1426                 lnet_peer_ni_decref_locked(lpni);
1427                 goto out;
1428         }
1429
1430         /* Create peer, peer_net, and peer_ni. */
1431         rc = -ENOMEM;
1432         lp = lnet_peer_alloc(nid);
1433         if (!lp)
1434                 goto out;
1435         lpn = lnet_peer_net_alloc(LNET_NIDNET(nid));
1436         if (!lpn)
1437                 goto out_free_lp;
1438         lpni = lnet_peer_ni_alloc(nid);
1439         if (!lpni)
1440                 goto out_free_lpn;
1441         if (pref != LNET_NID_ANY)
1442                 lnet_peer_ni_set_non_mr_pref_nid(lpni, pref);
1443
1444         return lnet_peer_attach_peer_ni(lp, lpn, lpni, flags);
1445
1446 out_free_lpn:
1447         LIBCFS_FREE(lpn, sizeof(*lpn));
1448 out_free_lp:
1449         LIBCFS_FREE(lp, sizeof(*lp));
1450 out:
1451         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(nid), rc);
1452         return rc;
1453 }
1454
1455 /*
1456  * Implementation of IOC_LIBCFS_ADD_PEER_NI.
1457  *
1458  * This API handles the following combinations:
1459  *   Create a peer with its primary NI if only the prim_nid is provided
1460  *   Add a NID to a peer identified by the prim_nid. The peer identified
1461  *   by the prim_nid must already exist.
1462  *   The peer being created may be non-MR.
1463  *
1464  * The caller must hold ln_api_mutex. This prevents the peer from
1465  * being created/modified/deleted by a different thread.
1466  */
1467 int
1468 lnet_add_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid, bool mr)
1469 {
1470         struct lnet_peer *lp = NULL;
1471         struct lnet_peer_ni *lpni;
1472         unsigned flags;
1473
1474         /* The prim_nid must always be specified */
1475         if (prim_nid == LNET_NID_ANY)
1476                 return -EINVAL;
1477
1478         flags = LNET_PEER_CONFIGURED;
1479         if (mr)
1480                 flags |= LNET_PEER_MULTI_RAIL;
1481
1482         /*
1483          * If nid isn't specified, we must create a new peer with
1484          * prim_nid as its primary nid.
1485          */
1486         if (nid == LNET_NID_ANY)
1487                 return lnet_peer_add(prim_nid, flags);
1488
1489         /* Look up the prim_nid, which must exist. */
1490         lpni = lnet_find_peer_ni_locked(prim_nid);
1491         if (!lpni)
1492                 return -ENOENT;
1493         lnet_peer_ni_decref_locked(lpni);
1494         lp = lpni->lpni_peer_net->lpn_peer;
1495
1496         /* Peer must have been configured. */
1497         if (!(lp->lp_state & LNET_PEER_CONFIGURED)) {
1498                 CDEBUG(D_NET, "peer %s was not configured\n",
1499                        libcfs_nid2str(prim_nid));
1500                 return -ENOENT;
1501         }
1502
1503         /* Primary NID must match */
1504         if (lp->lp_primary_nid != prim_nid) {
1505                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1506                        libcfs_nid2str(prim_nid),
1507                        libcfs_nid2str(lp->lp_primary_nid));
1508                 return -ENODEV;
1509         }
1510
1511         /* Multi-Rail flag must match. */
1512         if ((lp->lp_state ^ flags) & LNET_PEER_MULTI_RAIL) {
1513                 CDEBUG(D_NET, "multi-rail state mismatch for peer %s\n",
1514                        libcfs_nid2str(prim_nid));
1515                 return -EPERM;
1516         }
1517
1518         return lnet_peer_add_nid(lp, nid, flags);
1519 }
1520
1521 /*
1522  * Implementation of IOC_LIBCFS_DEL_PEER_NI.
1523  *
1524  * This API handles the following combinations:
1525  *   Delete a NI from a peer if both prim_nid and nid are provided.
1526  *   Delete a peer if only prim_nid is provided.
1527  *   Delete a peer if its primary nid is provided.
1528  *
1529  * The caller must hold ln_api_mutex. This prevents the peer from
1530  * being modified/deleted by a different thread.
1531  */
1532 int
1533 lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
1534 {
1535         struct lnet_peer *lp;
1536         struct lnet_peer_ni *lpni;
1537         unsigned flags;
1538
1539         if (prim_nid == LNET_NID_ANY)
1540                 return -EINVAL;
1541
1542         lpni = lnet_find_peer_ni_locked(prim_nid);
1543         if (!lpni)
1544                 return -ENOENT;
1545         lnet_peer_ni_decref_locked(lpni);
1546         lp = lpni->lpni_peer_net->lpn_peer;
1547
1548         if (prim_nid != lp->lp_primary_nid) {
1549                 CDEBUG(D_NET, "prim_nid %s is not primary for peer %s\n",
1550                        libcfs_nid2str(prim_nid),
1551                        libcfs_nid2str(lp->lp_primary_nid));
1552                 return -ENODEV;
1553         }
1554
1555         if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
1556                 return lnet_peer_del(lp);
1557
1558         flags = LNET_PEER_CONFIGURED;
1559         if (lp->lp_state & LNET_PEER_MULTI_RAIL)
1560                 flags |= LNET_PEER_MULTI_RAIL;
1561
1562         return lnet_peer_del_nid(lp, nid, flags);
1563 }
1564
1565 void
1566 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
1567 {
1568         struct lnet_peer_table *ptable;
1569         struct lnet_peer_net *lpn;
1570
1571         CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
1572
1573         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
1574         LASSERT(lpni->lpni_rtr_refcount == 0);
1575         LASSERT(list_empty(&lpni->lpni_txq));
1576         LASSERT(lpni->lpni_txqnob == 0);
1577         LASSERT(list_empty(&lpni->lpni_peer_nis));
1578         LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
1579
1580         lpn = lpni->lpni_peer_net;
1581         lpni->lpni_peer_net = NULL;
1582         lpni->lpni_net = NULL;
1583
1584         /* remove the peer ni from the zombie list */
1585         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
1586         spin_lock(&ptable->pt_zombie_lock);
1587         list_del_init(&lpni->lpni_hashlist);
1588         ptable->pt_zombies--;
1589         spin_unlock(&ptable->pt_zombie_lock);
1590
1591         if (lpni->lpni_pref_nnids > 1) {
1592                 LIBCFS_FREE(lpni->lpni_pref.nids,
1593                         sizeof(*lpni->lpni_pref.nids) * lpni->lpni_pref_nnids);
1594         }
1595         LIBCFS_FREE(lpni, sizeof(*lpni));
1596
1597         lnet_peer_net_decref_locked(lpn);
1598 }
1599
1600 struct lnet_peer_ni *
1601 lnet_nid2peerni_ex(lnet_nid_t nid, int cpt)
1602 {
1603         struct lnet_peer_ni *lpni = NULL;
1604         int rc;
1605
1606         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1607                 return ERR_PTR(-ESHUTDOWN);
1608
1609         /*
1610          * find if a peer_ni already exists.
1611          * If so then just return that.
1612          */
1613         lpni = lnet_find_peer_ni_locked(nid);
1614         if (lpni)
1615                 return lpni;
1616
1617         lnet_net_unlock(cpt);
1618
1619         rc = lnet_peer_ni_traffic_add(nid, LNET_NID_ANY);
1620         if (rc) {
1621                 lpni = ERR_PTR(rc);
1622                 goto out_net_relock;
1623         }
1624
1625         lpni = lnet_find_peer_ni_locked(nid);
1626         LASSERT(lpni);
1627
1628 out_net_relock:
1629         lnet_net_lock(cpt);
1630
1631         return lpni;
1632 }
1633
1634 /*
1635  * Get a peer_ni for the given nid, create it if necessary. Takes a
1636  * hold on the peer_ni.
1637  */
1638 struct lnet_peer_ni *
1639 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
1640 {
1641         struct lnet_peer_ni *lpni = NULL;
1642         int rc;
1643
1644         if (the_lnet.ln_state != LNET_STATE_RUNNING)
1645                 return ERR_PTR(-ESHUTDOWN);
1646
1647         /*
1648          * find if a peer_ni already exists.
1649          * If so then just return that.
1650          */
1651         lpni = lnet_find_peer_ni_locked(nid);
1652         if (lpni)
1653                 return lpni;
1654
1655         /*
1656          * Slow path:
1657          * use the lnet_api_mutex to serialize the creation of the peer_ni
1658          * and the creation/deletion of the local ni/net. When a local ni is
1659          * created, if there exists a set of peer_nis on that network,
1660          * they need to be traversed and updated. When a local NI is
1661          * deleted, which could result in a network being deleted, then
1662          * all peer nis on that network need to be removed as well.
1663          *
1664          * Creation through traffic should also be serialized with
1665          * creation through DLC.
1666          */
1667         lnet_net_unlock(cpt);
1668         mutex_lock(&the_lnet.ln_api_mutex);
1669         /*
1670          * Shutdown is only set under the ln_api_lock, so a single
1671          * check here is sufficent.
1672          */
1673         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1674                 lpni = ERR_PTR(-ESHUTDOWN);
1675                 goto out_mutex_unlock;
1676         }
1677
1678         rc = lnet_peer_ni_traffic_add(nid, pref);
1679         if (rc) {
1680                 lpni = ERR_PTR(rc);
1681                 goto out_mutex_unlock;
1682         }
1683
1684         lpni = lnet_find_peer_ni_locked(nid);
1685         LASSERT(lpni);
1686
1687 out_mutex_unlock:
1688         mutex_unlock(&the_lnet.ln_api_mutex);
1689         lnet_net_lock(cpt);
1690
1691         /* Lock has been dropped, check again for shutdown. */
1692         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
1693                 if (!IS_ERR(lpni))
1694                         lnet_peer_ni_decref_locked(lpni);
1695                 lpni = ERR_PTR(-ESHUTDOWN);
1696         }
1697
1698         return lpni;
1699 }
1700
1701 /*
1702  * Peer Discovery
1703  */
1704
1705 /*
1706  * Is a peer uptodate from the point of view of discovery?
1707  *
1708  * If it is currently being processed, obviously not.
1709  * A forced Ping or Push is also handled by the discovery thread.
1710  *
1711  * Otherwise look at whether the peer needs rediscovering.
1712  */
1713 bool
1714 lnet_peer_is_uptodate(struct lnet_peer *lp)
1715 {
1716         bool rc;
1717
1718         spin_lock(&lp->lp_lock);
1719         if (lp->lp_state & (LNET_PEER_DISCOVERING |
1720                             LNET_PEER_FORCE_PING |
1721                             LNET_PEER_FORCE_PUSH)) {
1722                 rc = false;
1723         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1724                 rc = true;
1725         } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
1726                 if (lnet_peer_discovery_disabled)
1727                         rc = true;
1728                 else
1729                         rc = false;
1730         } else if (lnet_peer_needs_push(lp)) {
1731                 rc = false;
1732         } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
1733                 if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
1734                         rc = true;
1735                 else
1736                         rc = false;
1737         } else {
1738                 rc = false;
1739         }
1740         spin_unlock(&lp->lp_lock);
1741
1742         return rc;
1743 }
1744
1745 /*
1746  * Queue a peer for the attention of the discovery thread.  Call with
1747  * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
1748  * -EALREADY if the peer was already queued.
1749  */
1750 static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
1751 {
1752         int rc;
1753
1754         spin_lock(&lp->lp_lock);
1755         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
1756                 lp->lp_state |= LNET_PEER_DISCOVERING;
1757         spin_unlock(&lp->lp_lock);
1758         if (list_empty(&lp->lp_dc_list)) {
1759                 lnet_peer_addref_locked(lp);
1760                 list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
1761                 wake_up(&the_lnet.ln_dc_waitq);
1762                 rc = 0;
1763         } else {
1764                 rc = -EALREADY;
1765         }
1766
1767         CDEBUG(D_NET, "Queue peer %s: %d\n",
1768                libcfs_nid2str(lp->lp_primary_nid), rc);
1769
1770         return rc;
1771 }
1772
1773 /*
1774  * Discovery of a peer is complete. Wake all waiters on the peer.
1775  * Call with lnet_net_lock/EX held.
1776  */
1777 static void lnet_peer_discovery_complete(struct lnet_peer *lp)
1778 {
1779         struct lnet_msg *msg, *tmp;
1780         int rc = 0;
1781         struct list_head pending_msgs;
1782
1783         INIT_LIST_HEAD(&pending_msgs);
1784
1785         CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
1786                libcfs_nid2str(lp->lp_primary_nid));
1787
1788         list_del_init(&lp->lp_dc_list);
1789         list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
1790         wake_up_all(&lp->lp_dc_waitq);
1791
1792         lnet_net_unlock(LNET_LOCK_EX);
1793
1794         /* iterate through all pending messages and send them again */
1795         list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
1796                 list_del_init(&msg->msg_list);
1797                 if (lp->lp_dc_error) {
1798                         lnet_finalize(msg, lp->lp_dc_error);
1799                         continue;
1800                 }
1801
1802                 CDEBUG(D_NET, "sending pending message %s to target %s\n",
1803                        lnet_msgtyp2str(msg->msg_type),
1804                        libcfs_id2str(msg->msg_target));
1805                 rc = lnet_send(msg->msg_src_nid_param, msg,
1806                                msg->msg_rtr_nid_param);
1807                 if (rc < 0) {
1808                         CNETERR("Error sending %s to %s: %d\n",
1809                                lnet_msgtyp2str(msg->msg_type),
1810                                libcfs_id2str(msg->msg_target), rc);
1811                         lnet_finalize(msg, rc);
1812                 }
1813         }
1814         lnet_net_lock(LNET_LOCK_EX);
1815         lnet_peer_decref_locked(lp);
1816 }
1817
1818 /*
1819  * Handle inbound push.
1820  * Like any event handler, called with lnet_res_lock/CPT held.
1821  */
1822 void lnet_peer_push_event(struct lnet_event *ev)
1823 {
1824         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1825         struct lnet_peer *lp;
1826
1827         /* lnet_find_peer() adds a refcount */
1828         lp = lnet_find_peer(ev->source.nid);
1829         if (!lp) {
1830                 CDEBUG(D_NET, "Push Put from unknown %s (source %s). Ignoring...\n",
1831                        libcfs_nid2str(ev->initiator.nid),
1832                        libcfs_nid2str(ev->source.nid));
1833                 return;
1834         }
1835
1836         /* Ensure peer state remains consistent while we modify it. */
1837         spin_lock(&lp->lp_lock);
1838
1839         /*
1840          * If some kind of error happened the contents of the message
1841          * cannot be used. Clear the NIDS_UPTODATE and set the
1842          * FORCE_PING flag to trigger a ping.
1843          */
1844         if (ev->status) {
1845                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1846                 lp->lp_state |= LNET_PEER_FORCE_PING;
1847                 CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
1848                        ev->status,
1849                        libcfs_nid2str(lp->lp_primary_nid),
1850                        libcfs_nid2str(ev->source.nid));
1851                 goto out;
1852         }
1853
1854         /*
1855          * A push with invalid or corrupted info. Clear the UPTODATE
1856          * flag to trigger a ping.
1857          */
1858         if (lnet_ping_info_validate(&pbuf->pb_info)) {
1859                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1860                 lp->lp_state |= LNET_PEER_FORCE_PING;
1861                 CDEBUG(D_NET, "Corrupted Push from %s\n",
1862                        libcfs_nid2str(lp->lp_primary_nid));
1863                 goto out;
1864         }
1865
1866         /*
1867          * Make sure we'll allocate the correct size ping buffer when
1868          * pinging the peer.
1869          */
1870         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
1871                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
1872
1873         /*
1874          * A non-Multi-Rail peer is not supposed to be capable of
1875          * sending a push.
1876          */
1877         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
1878                 CERROR("Push from non-Multi-Rail peer %s dropped\n",
1879                        libcfs_nid2str(lp->lp_primary_nid));
1880                 goto out;
1881         }
1882
1883         /*
1884          * Check the MULTIRAIL flag. Complain if the peer was DLC
1885          * configured without it.
1886          */
1887         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
1888                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
1889                         CERROR("Push says %s is Multi-Rail, DLC says not\n",
1890                                libcfs_nid2str(lp->lp_primary_nid));
1891                 } else {
1892                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
1893                         lnet_peer_clr_non_mr_pref_nids(lp);
1894                 }
1895         }
1896
1897         /*
1898          * The peer may have discovery disabled at its end. Set
1899          * NO_DISCOVERY as appropriate.
1900          */
1901         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
1902                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
1903                        libcfs_nid2str(lp->lp_primary_nid));
1904                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
1905         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
1906                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
1907                        libcfs_nid2str(lp->lp_primary_nid));
1908                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
1909         }
1910
1911         /*
1912          * Check for truncation of the Put message. Clear the
1913          * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
1914          * and tell discovery to allocate a bigger buffer.
1915          */
1916         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
1917                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
1918                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
1919                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1920                 lp->lp_state |= LNET_PEER_FORCE_PING;
1921                 CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
1922                        libcfs_nid2str(lp->lp_primary_nid),
1923                        pbuf->pb_info.pi_nnis);
1924                 goto out;
1925         }
1926
1927         /*
1928          * Check whether the Put data is stale. Stale data can just be
1929          * dropped.
1930          */
1931         if (pbuf->pb_info.pi_nnis > 1 &&
1932             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
1933             LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
1934                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1935                        libcfs_nid2str(lp->lp_primary_nid),
1936                        LNET_PING_BUFFER_SEQNO(pbuf),
1937                        lp->lp_peer_seqno);
1938                 goto out;
1939         }
1940
1941         /*
1942          * Check whether the Put data is new, in which case we clear
1943          * the UPTODATE flag and prepare to process it.
1944          *
1945          * If the Put data is current, and the peer is UPTODATE then
1946          * we assome everything is all right and drop the data as
1947          * stale.
1948          */
1949         if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
1950                 lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
1951                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1952         } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
1953                 CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
1954                        libcfs_nid2str(lp->lp_primary_nid),
1955                        LNET_PING_BUFFER_SEQNO(pbuf),
1956                        lp->lp_peer_seqno);
1957                 goto out;
1958         }
1959
1960         /*
1961          * If there is data present that hasn't been processed yet,
1962          * we'll replace it if the Put contained newer data and it
1963          * fits. We're racing with a Ping or earlier Push in this
1964          * case.
1965          */
1966         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
1967                 if (LNET_PING_BUFFER_SEQNO(pbuf) >
1968                         LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
1969                     pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
1970                         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1971                                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1972                         CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
1973                               libcfs_nid2str(lp->lp_primary_nid),
1974                               LNET_PING_BUFFER_SEQNO(pbuf),
1975                               LNET_PING_BUFFER_SEQNO(lp->lp_data));
1976                 }
1977                 goto out;
1978         }
1979
1980         /*
1981          * Allocate a buffer to copy the data. On a failure we drop
1982          * the Push and set FORCE_PING to force the discovery
1983          * thread to fix the problem by pinging the peer.
1984          */
1985         lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
1986         if (!lp->lp_data) {
1987                 lp->lp_state |= LNET_PEER_FORCE_PING;
1988                 CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
1989                        libcfs_nid2str(lp->lp_primary_nid),
1990                        LNET_PING_BUFFER_SEQNO(pbuf));
1991                 goto out;
1992         }
1993
1994         /* Success */
1995         memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
1996                LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
1997         lp->lp_state |= LNET_PEER_DATA_PRESENT;
1998         CDEBUG(D_NET, "Received Push %s %u\n",
1999                libcfs_nid2str(lp->lp_primary_nid),
2000                LNET_PING_BUFFER_SEQNO(pbuf));
2001
2002 out:
2003         /*
2004          * Queue the peer for discovery if not done, force it on the request
2005          * queue and wake the discovery thread if the peer was already queued,
2006          * because its status changed.
2007          */
2008         spin_unlock(&lp->lp_lock);
2009         lnet_net_lock(LNET_LOCK_EX);
2010         if (!lnet_peer_is_uptodate(lp) && lnet_peer_queue_for_discovery(lp)) {
2011                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2012                 wake_up(&the_lnet.ln_dc_waitq);
2013         }
2014         /* Drop refcount from lookup */
2015         lnet_peer_decref_locked(lp);
2016         lnet_net_unlock(LNET_LOCK_EX);
2017 }
2018
2019 /*
2020  * Clear the discovery error state, unless we're already discovering
2021  * this peer, in which case the error is current.
2022  */
2023 static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
2024 {
2025         spin_lock(&lp->lp_lock);
2026         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
2027                 lp->lp_dc_error = 0;
2028         spin_unlock(&lp->lp_lock);
2029 }
2030
2031 /*
2032  * Peer discovery slow path. The ln_api_mutex is held on entry, and
2033  * dropped/retaken within this function. An lnet_peer_ni is passed in
2034  * because discovery could tear down an lnet_peer.
2035  */
2036 int
2037 lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
2038 {
2039         DEFINE_WAIT(wait);
2040         struct lnet_peer *lp;
2041         int rc = 0;
2042
2043 again:
2044         lnet_net_unlock(cpt);
2045         lnet_net_lock(LNET_LOCK_EX);
2046         lp = lpni->lpni_peer_net->lpn_peer;
2047         lnet_peer_clear_discovery_error(lp);
2048
2049         /*
2050          * We're willing to be interrupted. The lpni can become a
2051          * zombie if we race with DLC, so we must check for that.
2052          */
2053         for (;;) {
2054                 prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
2055                 if (signal_pending(current))
2056                         break;
2057                 if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2058                         break;
2059                 if (lp->lp_dc_error)
2060                         break;
2061                 if (lnet_peer_is_uptodate(lp))
2062                         break;
2063                 lnet_peer_queue_for_discovery(lp);
2064                 /*
2065                  * if caller requested a non-blocking operation then
2066                  * return immediately. Once discovery is complete then the
2067                  * peer ref will be decremented and any pending messages
2068                  * that were stopped due to discovery will be transmitted.
2069                  */
2070                 if (!block)
2071                         break;
2072
2073                 lnet_peer_addref_locked(lp);
2074                 lnet_net_unlock(LNET_LOCK_EX);
2075                 schedule();
2076                 finish_wait(&lp->lp_dc_waitq, &wait);
2077                 lnet_net_lock(LNET_LOCK_EX);
2078                 lnet_peer_decref_locked(lp);
2079                 /* Peer may have changed */
2080                 lp = lpni->lpni_peer_net->lpn_peer;
2081         }
2082         finish_wait(&lp->lp_dc_waitq, &wait);
2083
2084         lnet_net_unlock(LNET_LOCK_EX);
2085         lnet_net_lock(cpt);
2086
2087         /*
2088          * If the peer has changed after we've discovered the older peer,
2089          * then we need to discovery the new peer to make sure the
2090          * interface information is up to date
2091          */
2092         if (lp != lpni->lpni_peer_net->lpn_peer)
2093                 goto again;
2094
2095         if (signal_pending(current))
2096                 rc = -EINTR;
2097         else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
2098                 rc = -ESHUTDOWN;
2099         else if (lp->lp_dc_error)
2100                 rc = lp->lp_dc_error;
2101         else if (!block)
2102                 CDEBUG(D_NET, "non-blocking discovery\n");
2103         else if (!lnet_peer_is_uptodate(lp))
2104                 goto again;
2105
2106         CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
2107                (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
2108                libcfs_nid2str(lpni->lpni_nid), rc,
2109                (!block) ? "pending discovery" : "discovery complete");
2110
2111         return rc;
2112 }
2113
2114 /* Handle an incoming ack for a push. */
2115 static void
2116 lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
2117 {
2118         struct lnet_ping_buffer *pbuf;
2119
2120         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2121         spin_lock(&lp->lp_lock);
2122         lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2123         lp->lp_push_error = ev->status;
2124         if (ev->status)
2125                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2126         else
2127                 lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2128         spin_unlock(&lp->lp_lock);
2129
2130         CDEBUG(D_NET, "peer %s ev->status %d\n",
2131                libcfs_nid2str(lp->lp_primary_nid), ev->status);
2132 }
2133
2134 /* Handle a Reply message. This is the reply to a Ping message. */
2135 static void
2136 lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
2137 {
2138         struct lnet_ping_buffer *pbuf;
2139         int rc;
2140
2141         spin_lock(&lp->lp_lock);
2142
2143         /*
2144          * If some kind of error happened the contents of message
2145          * cannot be used. Set PING_FAILED to trigger a retry.
2146          */
2147         if (ev->status) {
2148                 lp->lp_state |= LNET_PEER_PING_FAILED;
2149                 lp->lp_ping_error = ev->status;
2150                 CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
2151                        ev->status,
2152                        libcfs_nid2str(lp->lp_primary_nid),
2153                        libcfs_nid2str(ev->source.nid));
2154                 goto out;
2155         }
2156
2157         pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
2158         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
2159                 lnet_swap_pinginfo(pbuf);
2160
2161         /*
2162          * A reply with invalid or corrupted info. Set PING_FAILED to
2163          * trigger a retry.
2164          */
2165         rc = lnet_ping_info_validate(&pbuf->pb_info);
2166         if (rc) {
2167                 lp->lp_state |= LNET_PEER_PING_FAILED;
2168                 lp->lp_ping_error = 0;
2169                 CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
2170                        libcfs_nid2str(lp->lp_primary_nid), rc);
2171                 goto out;
2172         }
2173
2174         /*
2175          * Update the MULTI_RAIL flag based on the reply. If the peer
2176          * was configured with DLC then the setting should match what
2177          * DLC put in.
2178          */
2179         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
2180                 if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2181                         /* Everything's fine */
2182                 } else if (lp->lp_state & LNET_PEER_CONFIGURED) {
2183                         CWARN("Reply says %s is Multi-Rail, DLC says not\n",
2184                               libcfs_nid2str(lp->lp_primary_nid));
2185                 } else {
2186                         lp->lp_state |= LNET_PEER_MULTI_RAIL;
2187                         lnet_peer_clr_non_mr_pref_nids(lp);
2188                 }
2189         } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2190                 if (lp->lp_state & LNET_PEER_CONFIGURED) {
2191                         CWARN("DLC says %s is Multi-Rail, Reply says not\n",
2192                               libcfs_nid2str(lp->lp_primary_nid));
2193                 } else {
2194                         CERROR("Multi-Rail state vanished from %s\n",
2195                                libcfs_nid2str(lp->lp_primary_nid));
2196                         lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
2197                 }
2198         }
2199
2200         /*
2201          * Make sure we'll allocate the correct size ping buffer when
2202          * pinging the peer.
2203          */
2204         if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
2205                 lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
2206
2207         /*
2208          * The peer may have discovery disabled at its end. Set
2209          * NO_DISCOVERY as appropriate.
2210          */
2211         if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
2212                 CDEBUG(D_NET, "Peer %s has discovery disabled\n",
2213                        libcfs_nid2str(lp->lp_primary_nid));
2214                 lp->lp_state |= LNET_PEER_NO_DISCOVERY;
2215         } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
2216                 CDEBUG(D_NET, "Peer %s has discovery enabled\n",
2217                        libcfs_nid2str(lp->lp_primary_nid));
2218                 lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
2219         }
2220
2221         /*
2222          * Check for truncation of the Reply. Clear PING_SENT and set
2223          * PING_FAILED to trigger a retry.
2224          */
2225         if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
2226                 if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
2227                         the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
2228                 lp->lp_state |= LNET_PEER_PING_FAILED;
2229                 lp->lp_ping_error = 0;
2230                 CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
2231                        libcfs_nid2str(lp->lp_primary_nid),
2232                        pbuf->pb_info.pi_nnis);
2233                 goto out;
2234         }
2235
2236         /*
2237          * Check the sequence numbers in the reply. These are only
2238          * available if the reply came from a Multi-Rail peer.
2239          */
2240         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
2241             pbuf->pb_info.pi_nnis > 1 &&
2242             lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
2243                 if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
2244                         CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
2245                                 libcfs_nid2str(lp->lp_primary_nid),
2246                                 LNET_PING_BUFFER_SEQNO(pbuf),
2247                                 lp->lp_peer_seqno);
2248                         goto out;
2249                 }
2250
2251                 if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
2252                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2253         }
2254
2255         /* We're happy with the state of the data in the buffer. */
2256         CDEBUG(D_NET, "peer %s data present %u\n",
2257                libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
2258         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
2259                 lnet_ping_buffer_decref(lp->lp_data);
2260         else
2261                 lp->lp_state |= LNET_PEER_DATA_PRESENT;
2262         lnet_ping_buffer_addref(pbuf);
2263         lp->lp_data = pbuf;
2264 out:
2265         lp->lp_state &= ~LNET_PEER_PING_SENT;
2266         spin_unlock(&lp->lp_lock);
2267 }
2268
2269 /*
2270  * Send event handling. Only matters for error cases, where we clean
2271  * up state on the peer and peer_ni that would otherwise be updated in
2272  * the REPLY event handler for a successful Ping, and the ACK event
2273  * handler for a successful Push.
2274  */
2275 static int
2276 lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
2277 {
2278         int rc = 0;
2279
2280         if (!ev->status)
2281                 goto out;
2282
2283         spin_lock(&lp->lp_lock);
2284         if (ev->msg_type == LNET_MSG_GET) {
2285                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2286                 lp->lp_state |= LNET_PEER_PING_FAILED;
2287                 lp->lp_ping_error = ev->status;
2288         } else { /* ev->msg_type == LNET_MSG_PUT */
2289                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2290                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2291                 lp->lp_push_error = ev->status;
2292         }
2293         spin_unlock(&lp->lp_lock);
2294         rc = LNET_REDISCOVER_PEER;
2295 out:
2296         CDEBUG(D_NET, "%s Send to %s: %d\n",
2297                 (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
2298                 libcfs_nid2str(ev->target.nid), rc);
2299         return rc;
2300 }
2301
2302 /*
2303  * Unlink event handling. This event is only seen if a call to
2304  * LNetMDUnlink() caused the event to be unlinked. If this call was
2305  * made after the event was set up in LNetGet() or LNetPut() then we
2306  * assume the Ping or Push timed out.
2307  */
2308 static void
2309 lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
2310 {
2311         spin_lock(&lp->lp_lock);
2312         /* We've passed through LNetGet() */
2313         if (lp->lp_state & LNET_PEER_PING_SENT) {
2314                 lp->lp_state &= ~LNET_PEER_PING_SENT;
2315                 lp->lp_state |= LNET_PEER_PING_FAILED;
2316                 lp->lp_ping_error = -ETIMEDOUT;
2317                 CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
2318                         libcfs_nid2str(lp->lp_primary_nid));
2319         }
2320         /* We've passed through LNetPut() */
2321         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2322                 lp->lp_state &= ~LNET_PEER_PUSH_SENT;
2323                 lp->lp_state |= LNET_PEER_PUSH_FAILED;
2324                 lp->lp_push_error = -ETIMEDOUT;
2325                 CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
2326                         libcfs_nid2str(lp->lp_primary_nid));
2327         }
2328         spin_unlock(&lp->lp_lock);
2329 }
2330
2331 /*
2332  * Event handler for the discovery EQ.
2333  *
2334  * Called with lnet_res_lock(cpt) held. The cpt is the
2335  * lnet_cpt_of_cookie() of the md handle cookie.
2336  */
2337 static void lnet_discovery_event_handler(struct lnet_event *event)
2338 {
2339         struct lnet_peer *lp = event->md.user_ptr;
2340         struct lnet_ping_buffer *pbuf;
2341         int rc;
2342
2343         /* discovery needs to take another look */
2344         rc = LNET_REDISCOVER_PEER;
2345
2346         CDEBUG(D_NET, "Received event: %d\n", event->type);
2347
2348         switch (event->type) {
2349         case LNET_EVENT_ACK:
2350                 lnet_discovery_event_ack(lp, event);
2351                 break;
2352         case LNET_EVENT_REPLY:
2353                 lnet_discovery_event_reply(lp, event);
2354                 break;
2355         case LNET_EVENT_SEND:
2356                 /* Only send failure triggers a retry. */
2357                 rc = lnet_discovery_event_send(lp, event);
2358                 break;
2359         case LNET_EVENT_UNLINK:
2360                 /* LNetMDUnlink() was called */
2361                 lnet_discovery_event_unlink(lp, event);
2362                 break;
2363         default:
2364                 /* Invalid events. */
2365                 LBUG();
2366         }
2367         lnet_net_lock(LNET_LOCK_EX);
2368         if (event->unlinked) {
2369                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
2370                 lnet_ping_buffer_decref(pbuf);
2371                 lnet_peer_decref_locked(lp);
2372         }
2373
2374         /* put peer back at end of request queue, if discovery not already
2375          * done */
2376         if (rc == LNET_REDISCOVER_PEER && !lnet_peer_is_uptodate(lp)) {
2377                 list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2378                 wake_up(&the_lnet.ln_dc_waitq);
2379         }
2380         lnet_net_unlock(LNET_LOCK_EX);
2381 }
2382
2383 /*
2384  * Build a peer from incoming data.
2385  *
2386  * The NIDs in the incoming data are supposed to be structured as follows:
2387  *  - loopback
2388  *  - primary NID
2389  *  - other NIDs in same net
2390  *  - NIDs in second net
2391  *  - NIDs in third net
2392  *  - ...
2393  * This due to the way the list of NIDs in the data is created.
2394  *
2395  * Note that this function will mark the peer uptodate unless an
2396  * ENOMEM is encontered. All other errors are due to a conflict
2397  * between the DLC configuration and what discovery sees. We treat DLC
2398  * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
2399  * peer from becoming stuck in discovery.
2400  */
2401 static int lnet_peer_merge_data(struct lnet_peer *lp,
2402                                 struct lnet_ping_buffer *pbuf)
2403 {
2404         struct lnet_peer_ni *lpni;
2405         lnet_nid_t *curnis = NULL;
2406         lnet_nid_t *addnis = NULL;
2407         lnet_nid_t *delnis = NULL;
2408         unsigned flags;
2409         int ncurnis;
2410         int naddnis;
2411         int ndelnis;
2412         int nnis = 0;
2413         int i;
2414         int j;
2415         int rc;
2416
2417         flags = LNET_PEER_DISCOVERED;
2418         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2419                 flags |= LNET_PEER_MULTI_RAIL;
2420
2421         nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
2422         LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
2423         LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
2424         LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
2425         if (!curnis || !addnis || !delnis) {
2426                 rc = -ENOMEM;
2427                 goto out;
2428         }
2429         ncurnis = 0;
2430         naddnis = 0;
2431         ndelnis = 0;
2432
2433         /* Construct the list of NIDs present in peer. */
2434         lpni = NULL;
2435         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
2436                 curnis[ncurnis++] = lpni->lpni_nid;
2437
2438         /*
2439          * Check for NIDs in pbuf not present in curnis[].
2440          * The loop starts at 1 to skip the loopback NID.
2441          */
2442         for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
2443                 for (j = 0; j < ncurnis; j++)
2444                         if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
2445                                 break;
2446                 if (j == ncurnis)
2447                         addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
2448         }
2449         /*
2450          * Check for NIDs in curnis[] not present in pbuf.
2451          * The nested loop starts at 1 to skip the loopback NID.
2452          *
2453          * But never add the loopback NID to delnis[]: if it is
2454          * present in curnis[] then this peer is for this node.
2455          */
2456         for (i = 0; i < ncurnis; i++) {
2457                 if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
2458                         continue;
2459                 for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
2460                         if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
2461                                 break;
2462                 if (j == pbuf->pb_info.pi_nnis)
2463                         delnis[ndelnis++] = curnis[i];
2464         }
2465
2466         for (i = 0; i < naddnis; i++) {
2467                 rc = lnet_peer_add_nid(lp, addnis[i], flags);
2468                 if (rc) {
2469                         CERROR("Error adding NID %s to peer %s: %d\n",
2470                                libcfs_nid2str(addnis[i]),
2471                                libcfs_nid2str(lp->lp_primary_nid), rc);
2472                         if (rc == -ENOMEM)
2473                                 goto out;
2474                 }
2475         }
2476         for (i = 0; i < ndelnis; i++) {
2477                 rc = lnet_peer_del_nid(lp, delnis[i], flags);
2478                 if (rc) {
2479                         CERROR("Error deleting NID %s from peer %s: %d\n",
2480                                libcfs_nid2str(delnis[i]),
2481                                libcfs_nid2str(lp->lp_primary_nid), rc);
2482                         if (rc == -ENOMEM)
2483                                 goto out;
2484                 }
2485         }
2486         /*
2487          * Errors other than -ENOMEM are due to peers having been
2488          * configured with DLC. Ignore these because DLC overrides
2489          * Discovery.
2490          */
2491         rc = 0;
2492 out:
2493         LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
2494         LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
2495         LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
2496         lnet_ping_buffer_decref(pbuf);
2497         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2498
2499         if (rc) {
2500                 spin_lock(&lp->lp_lock);
2501                 lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
2502                 lp->lp_state |= LNET_PEER_FORCE_PING;
2503                 spin_unlock(&lp->lp_lock);
2504         }
2505         return rc;
2506 }
2507
2508 /*
2509  * The data in pbuf says lp is its primary peer, but the data was
2510  * received by a different peer. Try to update lp with the data.
2511  */
2512 static int
2513 lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
2514 {
2515         struct lnet_handle_md mdh;
2516
2517         /* Queue lp for discovery, and force it on the request queue. */
2518         lnet_net_lock(LNET_LOCK_EX);
2519         if (lnet_peer_queue_for_discovery(lp))
2520                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
2521         lnet_net_unlock(LNET_LOCK_EX);
2522
2523         LNetInvalidateMDHandle(&mdh);
2524
2525         /*
2526          * Decide whether we can move the peer to the DATA_PRESENT state.
2527          *
2528          * We replace stale data for a multi-rail peer, repair PING_FAILED
2529          * status, and preempt FORCE_PING.
2530          *
2531          * If after that we have DATA_PRESENT, we merge it into this peer.
2532          */
2533         spin_lock(&lp->lp_lock);
2534         if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
2535                 if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
2536                         lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
2537                 } else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2538                         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2539                         lnet_ping_buffer_decref(pbuf);
2540                         pbuf = lp->lp_data;
2541                         lp->lp_data = NULL;
2542                 }
2543         }
2544         if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
2545                 lnet_ping_buffer_decref(lp->lp_data);
2546                 lp->lp_data = NULL;
2547                 lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2548         }
2549         if (lp->lp_state & LNET_PEER_PING_FAILED) {
2550                 mdh = lp->lp_ping_mdh;
2551                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2552                 lp->lp_state &= ~LNET_PEER_PING_FAILED;
2553                 lp->lp_ping_error = 0;
2554         }
2555         if (lp->lp_state & LNET_PEER_FORCE_PING)
2556                 lp->lp_state &= ~LNET_PEER_FORCE_PING;
2557         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2558         spin_unlock(&lp->lp_lock);
2559
2560         if (!LNetMDHandleIsInvalid(mdh))
2561                 LNetMDUnlink(mdh);
2562
2563         if (pbuf)
2564                 return lnet_peer_merge_data(lp, pbuf);
2565
2566         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2567         return 0;
2568 }
2569
2570 /*
2571  * Update a peer using the data received.
2572  */
2573 static int lnet_peer_data_present(struct lnet_peer *lp)
2574 __must_hold(&lp->lp_lock)
2575 {
2576         struct lnet_ping_buffer *pbuf;
2577         struct lnet_peer_ni *lpni;
2578         lnet_nid_t nid = LNET_NID_ANY;
2579         unsigned flags;
2580         int rc = 0;
2581
2582         pbuf = lp->lp_data;
2583         lp->lp_data = NULL;
2584         lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
2585         lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
2586         spin_unlock(&lp->lp_lock);
2587
2588         /*
2589          * Modifications of peer structures are done while holding the
2590          * ln_api_mutex. A global lock is required because we may be
2591          * modifying multiple peer structures, and a mutex greatly
2592          * simplifies memory management.
2593          *
2594          * The actual changes to the data structures must also protect
2595          * against concurrent lookups, for which the lnet_net_lock in
2596          * LNET_LOCK_EX mode is used.
2597          */
2598         mutex_lock(&the_lnet.ln_api_mutex);
2599         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
2600                 rc = -ESHUTDOWN;
2601                 goto out;
2602         }
2603
2604         /*
2605          * If this peer is not on the peer list then it is being torn
2606          * down, and our reference count may be all that is keeping it
2607          * alive. Don't do any work on it.
2608          */
2609         if (list_empty(&lp->lp_peer_list))
2610                 goto out;
2611
2612         flags = LNET_PEER_DISCOVERED;
2613         if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
2614                 flags |= LNET_PEER_MULTI_RAIL;
2615
2616         /*
2617          * Check whether the primary NID in the message matches the
2618          * primary NID of the peer. If it does, update the peer, if
2619          * it it does not, check whether there is already a peer with
2620          * that primary NID. If no such peer exists, try to update
2621          * the primary NID of the current peer (allowed if it was
2622          * created due to message traffic) and complete the update.
2623          * If the peer did exist, hand off the data to it.
2624          *
2625          * The peer for the loopback interface is a special case: this
2626          * is the peer for the local node, and we want to set its
2627          * primary NID to the correct value here. Moreover, this peer
2628          * can show up with only the loopback NID in the ping buffer.
2629          */
2630         if (pbuf->pb_info.pi_nnis <= 1)
2631                 goto out;
2632         nid = pbuf->pb_info.pi_ni[1].ns_nid;
2633         if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
2634                 rc = lnet_peer_set_primary_nid(lp, nid, flags);
2635                 if (!rc)
2636                         rc = lnet_peer_merge_data(lp, pbuf);
2637         } else if (lp->lp_primary_nid == nid) {
2638                 rc = lnet_peer_merge_data(lp, pbuf);
2639         } else {
2640                 lpni = lnet_find_peer_ni_locked(nid);
2641                 if (!lpni) {
2642                         rc = lnet_peer_set_primary_nid(lp, nid, flags);
2643                         if (rc) {
2644                                 CERROR("Primary NID error %s versus %s: %d\n",
2645                                        libcfs_nid2str(lp->lp_primary_nid),
2646                                        libcfs_nid2str(nid), rc);
2647                         } else {
2648                                 rc = lnet_peer_merge_data(lp, pbuf);
2649                         }
2650                 } else {
2651                         rc = lnet_peer_set_primary_data(
2652                                 lpni->lpni_peer_net->lpn_peer, pbuf);
2653                         lnet_peer_ni_decref_locked(lpni);
2654                 }
2655         }
2656 out:
2657         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2658         mutex_unlock(&the_lnet.ln_api_mutex);
2659
2660         spin_lock(&lp->lp_lock);
2661         /* Tell discovery to re-check the peer immediately. */
2662         if (!rc)
2663                 rc = LNET_REDISCOVER_PEER;
2664         return rc;
2665 }
2666
2667 /*
2668  * A ping failed. Clear the PING_FAILED state and set the
2669  * FORCE_PING state, to ensure a retry even if discovery is
2670  * disabled. This avoids being left with incorrect state.
2671  */
2672 static int lnet_peer_ping_failed(struct lnet_peer *lp)
2673 __must_hold(&lp->lp_lock)
2674 {
2675         struct lnet_handle_md mdh;
2676         int rc;
2677
2678         mdh = lp->lp_ping_mdh;
2679         LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2680         lp->lp_state &= ~LNET_PEER_PING_FAILED;
2681         lp->lp_state |= LNET_PEER_FORCE_PING;
2682         rc = lp->lp_ping_error;
2683         lp->lp_ping_error = 0;
2684         spin_unlock(&lp->lp_lock);
2685
2686         if (!LNetMDHandleIsInvalid(mdh))
2687                 LNetMDUnlink(mdh);
2688
2689         CDEBUG(D_NET, "peer %s:%d\n",
2690                libcfs_nid2str(lp->lp_primary_nid), rc);
2691
2692         spin_lock(&lp->lp_lock);
2693         return rc ? rc : LNET_REDISCOVER_PEER;
2694 }
2695
2696 /*
2697  * Select NID to send a Ping or Push to.
2698  */
2699 static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
2700 {
2701         struct lnet_peer_ni *lpni;
2702
2703         /* Look for a direct-connected NID for this peer. */
2704         lpni = NULL;
2705         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2706                 if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
2707                         continue;
2708                 break;
2709         }
2710         if (lpni)
2711                 return lpni->lpni_nid;
2712
2713         /* Look for a routed-connected NID for this peer. */
2714         lpni = NULL;
2715         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
2716                 if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
2717                         continue;
2718                 break;
2719         }
2720         if (lpni)
2721                 return lpni->lpni_nid;
2722
2723         return LNET_NID_ANY;
2724 }
2725
2726 /* Active side of ping. */
2727 static int lnet_peer_send_ping(struct lnet_peer *lp)
2728 __must_hold(&lp->lp_lock)
2729 {
2730         lnet_nid_t pnid;
2731         int nnis;
2732         int rc;
2733         int cpt;
2734
2735         lp->lp_state |= LNET_PEER_PING_SENT;
2736         lp->lp_state &= ~LNET_PEER_FORCE_PING;
2737         spin_unlock(&lp->lp_lock);
2738
2739         cpt = lnet_net_lock_current();
2740         /* Refcount for MD. */
2741         lnet_peer_addref_locked(lp);
2742         pnid = lnet_peer_select_nid(lp);
2743         lnet_net_unlock(cpt);
2744
2745         nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
2746
2747         rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
2748                             the_lnet.ln_dc_eqh, false);
2749
2750         /*
2751          * if LNetMDBind in lnet_send_ping fails we need to decrement the
2752          * refcount on the peer, otherwise LNetMDUnlink will be called
2753          * which will eventually do that.
2754          */
2755         if (rc > 0) {
2756                 lnet_net_lock(cpt);
2757                 lnet_peer_decref_locked(lp);
2758                 lnet_net_unlock(cpt);
2759                 rc = -rc; /* change the rc to negative value */
2760                 goto fail_error;
2761         } else if (rc < 0) {
2762                 goto fail_error;
2763         }
2764
2765         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2766
2767         spin_lock(&lp->lp_lock);
2768         return 0;
2769
2770 fail_error:
2771         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2772         /*
2773          * The errors that get us here are considered hard errors and
2774          * cause Discovery to terminate. So we clear PING_SENT, but do
2775          * not set either PING_FAILED or FORCE_PING. In fact we need
2776          * to clear PING_FAILED, because the unlink event handler will
2777          * have set it if we called LNetMDUnlink() above.
2778          */
2779         spin_lock(&lp->lp_lock);
2780         lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
2781         return rc;
2782 }
2783
2784 /*
2785  * This function exists because you cannot call LNetMDUnlink() from an
2786  * event handler.
2787  */
2788 static int lnet_peer_push_failed(struct lnet_peer *lp)
2789 __must_hold(&lp->lp_lock)
2790 {
2791         struct lnet_handle_md mdh;
2792         int rc;
2793
2794         mdh = lp->lp_push_mdh;
2795         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2796         lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
2797         rc = lp->lp_push_error;
2798         lp->lp_push_error = 0;
2799         spin_unlock(&lp->lp_lock);
2800
2801         if (!LNetMDHandleIsInvalid(mdh))
2802                 LNetMDUnlink(mdh);
2803
2804         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2805         spin_lock(&lp->lp_lock);
2806         return rc ? rc : LNET_REDISCOVER_PEER;
2807 }
2808
2809 /* Active side of push. */
2810 static int lnet_peer_send_push(struct lnet_peer *lp)
2811 __must_hold(&lp->lp_lock)
2812 {
2813         struct lnet_ping_buffer *pbuf;
2814         struct lnet_process_id id;
2815         struct lnet_md md;
2816         int cpt;
2817         int rc;
2818
2819         /* Don't push to a non-multi-rail peer. */
2820         if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
2821                 lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2822                 return 0;
2823         }
2824
2825         lp->lp_state |= LNET_PEER_PUSH_SENT;
2826         lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
2827         spin_unlock(&lp->lp_lock);
2828
2829         cpt = lnet_net_lock_current();
2830         pbuf = the_lnet.ln_ping_target;
2831         lnet_ping_buffer_addref(pbuf);
2832         lnet_net_unlock(cpt);
2833
2834         /* Push source MD */
2835         md.start     = &pbuf->pb_info;
2836         md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
2837         md.threshold = 2; /* Put/Ack */
2838         md.max_size  = 0;
2839         md.options   = 0;
2840         md.eq_handle = the_lnet.ln_dc_eqh;
2841         md.user_ptr  = lp;
2842
2843         rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
2844         if (rc) {
2845                 lnet_ping_buffer_decref(pbuf);
2846                 CERROR("Can't bind push source MD: %d\n", rc);
2847                 goto fail_error;
2848         }
2849         cpt = lnet_net_lock_current();
2850         /* Refcount for MD. */
2851         lnet_peer_addref_locked(lp);
2852         id.pid = LNET_PID_LUSTRE;
2853         id.nid = lnet_peer_select_nid(lp);
2854         lnet_net_unlock(cpt);
2855
2856         if (id.nid == LNET_NID_ANY) {
2857                 rc = -EHOSTUNREACH;
2858                 goto fail_unlink;
2859         }
2860
2861         rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
2862                      LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
2863                      LNET_PROTO_PING_MATCHBITS, 0, 0);
2864
2865         if (rc)
2866                 goto fail_unlink;
2867
2868         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2869
2870         spin_lock(&lp->lp_lock);
2871         return 0;
2872
2873 fail_unlink:
2874         LNetMDUnlink(lp->lp_push_mdh);
2875         LNetInvalidateMDHandle(&lp->lp_push_mdh);
2876 fail_error:
2877         CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
2878         /*
2879          * The errors that get us here are considered hard errors and
2880          * cause Discovery to terminate. So we clear PUSH_SENT, but do
2881          * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
2882          * because the unlink event handler will have set it if we
2883          * called LNetMDUnlink() above.
2884          */
2885         spin_lock(&lp->lp_lock);
2886         lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
2887         return rc;
2888 }
2889
2890 /*
2891  * An unrecoverable error was encountered during discovery.
2892  * Set error status in peer and abort discovery.
2893  */
2894 static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
2895 {
2896         CDEBUG(D_NET, "Discovery error %s: %d\n",
2897                libcfs_nid2str(lp->lp_primary_nid), error);
2898
2899         spin_lock(&lp->lp_lock);
2900         lp->lp_dc_error = error;
2901         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2902         lp->lp_state |= LNET_PEER_REDISCOVER;
2903         spin_unlock(&lp->lp_lock);
2904 }
2905
2906 /*
2907  * Mark the peer as discovered.
2908  */
2909 static int lnet_peer_discovered(struct lnet_peer *lp)
2910 __must_hold(&lp->lp_lock)
2911 {
2912         lp->lp_state |= LNET_PEER_DISCOVERED;
2913         lp->lp_state &= ~(LNET_PEER_DISCOVERING |
2914                           LNET_PEER_REDISCOVER);
2915
2916         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2917
2918         return 0;
2919 }
2920
2921 /*
2922  * Mark the peer as to be rediscovered.
2923  */
2924 static int lnet_peer_rediscover(struct lnet_peer *lp)
2925 __must_hold(&lp->lp_lock)
2926 {
2927         lp->lp_state |= LNET_PEER_REDISCOVER;
2928         lp->lp_state &= ~LNET_PEER_DISCOVERING;
2929
2930         CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
2931
2932         return 0;
2933 }
2934
2935 /*
2936  * Discovering this peer is taking too long. Cancel any Ping or Push
2937  * that discovery is waiting on by unlinking the relevant MDs. The
2938  * lnet_discovery_event_handler() will proceed from here and complete
2939  * the cleanup.
2940  */
2941 static void lnet_peer_cancel_discovery(struct lnet_peer *lp)
2942 {
2943         struct lnet_handle_md ping_mdh;
2944         struct lnet_handle_md push_mdh;
2945
2946         LNetInvalidateMDHandle(&ping_mdh);
2947         LNetInvalidateMDHandle(&push_mdh);
2948
2949         spin_lock(&lp->lp_lock);
2950         if (lp->lp_state & LNET_PEER_PING_SENT) {
2951                 ping_mdh = lp->lp_ping_mdh;
2952                 LNetInvalidateMDHandle(&lp->lp_ping_mdh);
2953         }
2954         if (lp->lp_state & LNET_PEER_PUSH_SENT) {
2955                 push_mdh = lp->lp_push_mdh;
2956                 LNetInvalidateMDHandle(&lp->lp_push_mdh);
2957         }
2958         spin_unlock(&lp->lp_lock);
2959
2960         if (!LNetMDHandleIsInvalid(ping_mdh))
2961                 LNetMDUnlink(ping_mdh);
2962         if (!LNetMDHandleIsInvalid(push_mdh))
2963                 LNetMDUnlink(push_mdh);
2964 }
2965
2966 /*
2967  * Wait for work to be queued or some other change that must be
2968  * attended to. Returns non-zero if the discovery thread should shut
2969  * down.
2970  */
2971 static int lnet_peer_discovery_wait_for_work(void)
2972 {
2973         int cpt;
2974         int rc = 0;
2975
2976         DEFINE_WAIT(wait);
2977
2978         cpt = lnet_net_lock_current();
2979         for (;;) {
2980                 prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
2981                                 TASK_INTERRUPTIBLE);
2982                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
2983                         break;
2984                 if (lnet_push_target_resize_needed())
2985                         break;
2986                 if (!list_empty(&the_lnet.ln_dc_request))
2987                         break;
2988                 if (!list_empty(&the_lnet.ln_msg_resend))
2989                         break;
2990                 lnet_net_unlock(cpt);
2991
2992                 /*
2993                  * wakeup max every second to check if there are peers that
2994                  * have been stuck on the working queue for greater than
2995                  * the peer timeout.
2996                  */
2997                 schedule_timeout(cfs_time_seconds(1));
2998                 finish_wait(&the_lnet.ln_dc_waitq, &wait);
2999                 cpt = lnet_net_lock_current();
3000         }
3001         finish_wait(&the_lnet.ln_dc_waitq, &wait);
3002
3003         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3004                 rc = -ESHUTDOWN;
3005
3006         lnet_net_unlock(cpt);
3007
3008         CDEBUG(D_NET, "woken: %d\n", rc);
3009
3010         return rc;
3011 }
3012
3013 /*
3014  * Messages that were pending on a destroyed peer will be put on a global
3015  * resend list. The message resend list will be checked by
3016  * the discovery thread when it wakes up, and will resend messages. These
3017  * messages can still be sendable in the case the lpni which was the initial
3018  * cause of the message re-queue was transfered to another peer.
3019  *
3020  * It is possible that LNet could be shutdown while we're iterating
3021  * through the list. lnet_shudown_lndnets() will attempt to access the
3022  * resend list, but will have to wait until the spinlock is released, by
3023  * which time there shouldn't be any more messages on the resend list.
3024  * During shutdown lnet_send() will fail and lnet_finalize() will be called
3025  * for the messages so they can be released. The other case is that
3026  * lnet_shudown_lndnets() can finalize all the messages before this
3027  * function can visit the resend list, in which case this function will be
3028  * a no-op.
3029  */
3030 static void lnet_resend_msgs(void)
3031 {
3032         struct lnet_msg *msg, *tmp;
3033         struct list_head resend;
3034         int rc;
3035
3036         INIT_LIST_HEAD(&resend);
3037
3038         spin_lock(&the_lnet.ln_msg_resend_lock);
3039         list_splice(&the_lnet.ln_msg_resend, &resend);
3040         spin_unlock(&the_lnet.ln_msg_resend_lock);
3041
3042         list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
3043                 list_del_init(&msg->msg_list);
3044                 rc = lnet_send(msg->msg_src_nid_param, msg,
3045                                msg->msg_rtr_nid_param);
3046                 if (rc < 0) {
3047                         CNETERR("Error sending %s to %s: %d\n",
3048                                lnet_msgtyp2str(msg->msg_type),
3049                                libcfs_id2str(msg->msg_target), rc);
3050                         lnet_finalize(msg, rc);
3051                 }
3052         }
3053 }
3054
3055 /* The discovery thread. */
3056 static int lnet_peer_discovery(void *arg)
3057 {
3058         struct lnet_peer *lp;
3059         int rc;
3060
3061         CDEBUG(D_NET, "started\n");
3062         cfs_block_allsigs();
3063
3064         for (;;) {
3065                 if (lnet_peer_discovery_wait_for_work())
3066                         break;
3067
3068                 lnet_resend_msgs();
3069
3070                 if (lnet_push_target_resize_needed())
3071                         lnet_push_target_resize();
3072
3073                 lnet_net_lock(LNET_LOCK_EX);
3074                 if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3075                         break;
3076
3077                 /*
3078                  * Process all incoming discovery work requests.  When
3079                  * discovery must wait on a peer to change state, it
3080                  * is added to the tail of the ln_dc_working queue. A
3081                  * timestamp keeps track of when the peer was added,
3082                  * so we can time out discovery requests that take too
3083                  * long.
3084                  */
3085                 while (!list_empty(&the_lnet.ln_dc_request)) {
3086                         lp = list_first_entry(&the_lnet.ln_dc_request,
3087                                               struct lnet_peer, lp_dc_list);
3088                         list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
3089                         /*
3090                          * set the time the peer was put on the dc_working
3091                          * queue. It shouldn't remain on the queue
3092                          * forever, in case the GET message (for ping)
3093                          * doesn't get a REPLY or the PUT message (for
3094                          * push) doesn't get an ACK.
3095                          */
3096                         lp->lp_last_queued = ktime_get_real_seconds();
3097                         lnet_net_unlock(LNET_LOCK_EX);
3098
3099                         /*
3100                          * Select an action depending on the state of
3101                          * the peer and whether discovery is disabled.
3102                          * The check whether discovery is disabled is
3103                          * done after the code that handles processing
3104                          * for arrived data, cleanup for failures, and
3105                          * forcing a Ping or Push.
3106                          */
3107                         spin_lock(&lp->lp_lock);
3108                         CDEBUG(D_NET, "peer %s state %#x\n",
3109                                 libcfs_nid2str(lp->lp_primary_nid),
3110                                 lp->lp_state);
3111                         if (lp->lp_state & LNET_PEER_DATA_PRESENT)
3112                                 rc = lnet_peer_data_present(lp);
3113                         else if (lp->lp_state & LNET_PEER_PING_FAILED)
3114                                 rc = lnet_peer_ping_failed(lp);
3115                         else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
3116                                 rc = lnet_peer_push_failed(lp);
3117                         else if (lp->lp_state & LNET_PEER_FORCE_PING)
3118                                 rc = lnet_peer_send_ping(lp);
3119                         else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
3120                                 rc = lnet_peer_send_push(lp);
3121                         else if (lnet_peer_discovery_disabled)
3122                                 rc = lnet_peer_rediscover(lp);
3123                         else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
3124                                 rc = lnet_peer_send_ping(lp);
3125                         else if (lnet_peer_needs_push(lp))
3126                                 rc = lnet_peer_send_push(lp);
3127                         else
3128                                 rc = lnet_peer_discovered(lp);
3129                         CDEBUG(D_NET, "peer %s state %#x rc %d\n",
3130                                 libcfs_nid2str(lp->lp_primary_nid),
3131                                 lp->lp_state, rc);
3132                         spin_unlock(&lp->lp_lock);
3133
3134                         lnet_net_lock(LNET_LOCK_EX);
3135                         if (rc == LNET_REDISCOVER_PEER) {
3136                                 list_move(&lp->lp_dc_list,
3137                                           &the_lnet.ln_dc_request);
3138                         } else if (rc) {
3139                                 lnet_peer_discovery_error(lp, rc);
3140                         }
3141                         if (!(lp->lp_state & LNET_PEER_DISCOVERING))
3142                                 lnet_peer_discovery_complete(lp);
3143                         if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
3144                                 break;
3145                 }
3146
3147                 lnet_net_unlock(LNET_LOCK_EX);
3148         }
3149
3150         CDEBUG(D_NET, "stopping\n");
3151         /*
3152          * Clean up before telling lnet_peer_discovery_stop() that
3153          * we're done. Use wake_up() below to somewhat reduce the
3154          * size of the thundering herd if there are multiple threads
3155          * waiting on discovery of a single peer.
3156          */
3157         LNetEQFree(the_lnet.ln_dc_eqh);
3158         LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3159
3160         /* Queue cleanup 1: stop all pending pings and pushes. */
3161         lnet_net_lock(LNET_LOCK_EX);
3162         while (!list_empty(&the_lnet.ln_dc_working)) {
3163                 lp = list_first_entry(&the_lnet.ln_dc_working,
3164                                       struct lnet_peer, lp_dc_list);
3165                 list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
3166                 lnet_net_unlock(LNET_LOCK_EX);
3167                 lnet_peer_cancel_discovery(lp);
3168                 lnet_net_lock(LNET_LOCK_EX);
3169         }
3170         lnet_net_unlock(LNET_LOCK_EX);
3171
3172         /* Queue cleanup 2: wait for the expired queue to clear. */
3173         while (!list_empty(&the_lnet.ln_dc_expired))
3174                 schedule_timeout(cfs_time_seconds(1));
3175
3176         /* Queue cleanup 3: clear the request queue. */
3177         lnet_net_lock(LNET_LOCK_EX);
3178         while (!list_empty(&the_lnet.ln_dc_request)) {
3179                 lp = list_first_entry(&the_lnet.ln_dc_request,
3180                                       struct lnet_peer, lp_dc_list);
3181                 lnet_peer_discovery_error(lp, -ESHUTDOWN);
3182                 lnet_peer_discovery_complete(lp);
3183         }
3184         lnet_net_unlock(LNET_LOCK_EX);
3185
3186         the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3187         wake_up(&the_lnet.ln_dc_waitq);
3188
3189         CDEBUG(D_NET, "stopped\n");
3190
3191         return 0;
3192 }
3193
3194 /* ln_api_mutex is held on entry. */
3195 int lnet_peer_discovery_start(void)
3196 {
3197         struct task_struct *task;
3198         int rc;
3199
3200         if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
3201                 return -EALREADY;
3202
3203         rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
3204         if (rc != 0) {
3205                 CERROR("Can't allocate discovery EQ: %d\n", rc);
3206                 return rc;
3207         }
3208
3209         the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
3210         task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
3211         if (IS_ERR(task)) {
3212                 rc = PTR_ERR(task);
3213                 CERROR("Can't start peer discovery thread: %d\n", rc);
3214
3215                 LNetEQFree(the_lnet.ln_dc_eqh);
3216                 LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
3217
3218                 the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
3219         }
3220
3221         CDEBUG(D_NET, "discovery start: %d\n", rc);
3222
3223         return rc;
3224 }
3225
3226 /* ln_api_mutex is held on entry. */
3227 void lnet_peer_discovery_stop(void)
3228 {
3229         if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
3230                 return;
3231
3232         LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
3233         the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
3234         wake_up(&the_lnet.ln_dc_waitq);
3235
3236         wait_event(the_lnet.ln_dc_waitq,
3237                    the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
3238
3239         LASSERT(list_empty(&the_lnet.ln_dc_request));
3240         LASSERT(list_empty(&the_lnet.ln_dc_working));
3241         LASSERT(list_empty(&the_lnet.ln_dc_expired));
3242
3243         CDEBUG(D_NET, "discovery stopped\n");
3244 }
3245
3246 /* Debugging */
3247
3248 void
3249 lnet_debug_peer(lnet_nid_t nid)
3250 {
3251         char                    *aliveness = "NA";
3252         struct lnet_peer_ni     *lp;
3253         int                     cpt;
3254
3255         cpt = lnet_cpt_of_nid(nid, NULL);
3256         lnet_net_lock(cpt);
3257
3258         lp = lnet_nid2peerni_locked(nid, LNET_NID_ANY, cpt);
3259         if (IS_ERR(lp)) {
3260                 lnet_net_unlock(cpt);
3261                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
3262                 return;
3263         }
3264
3265         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
3266                 aliveness = lp->lpni_alive ? "up" : "down";
3267
3268         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
3269                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
3270                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
3271                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
3272                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
3273
3274         lnet_peer_ni_decref_locked(lp);
3275
3276         lnet_net_unlock(cpt);
3277 }
3278
3279 /* Gathering information for userspace. */
3280
3281 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
3282                           char aliveness[LNET_MAX_STR_LEN],
3283                           __u32 *cpt_iter, __u32 *refcount,
3284                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
3285                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
3286                           __u32 *peer_tx_qnob)
3287 {
3288         struct lnet_peer_table          *peer_table;
3289         struct lnet_peer_ni             *lp;
3290         int                             j;
3291         int                             lncpt;
3292         bool                            found = false;
3293
3294         /* get the number of CPTs */
3295         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3296
3297         /* if the cpt number to be examined is >= the number of cpts in
3298          * the system then indicate that there are no more cpts to examin
3299          */
3300         if (*cpt_iter >= lncpt)
3301                 return -ENOENT;
3302
3303         /* get the current table */
3304         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
3305         /* if the ptable is NULL then there are no more cpts to examine */
3306         if (peer_table == NULL)
3307                 return -ENOENT;
3308
3309         lnet_net_lock(*cpt_iter);
3310
3311         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
3312                 struct list_head *peers = &peer_table->pt_hash[j];
3313
3314                 list_for_each_entry(lp, peers, lpni_hashlist) {
3315                         if (peer_index-- > 0)
3316                                 continue;
3317
3318                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
3319                         if (lnet_isrouter(lp) ||
3320                                 lnet_peer_aliveness_enabled(lp))
3321                                 snprintf(aliveness, LNET_MAX_STR_LEN,
3322                                          lp->lpni_alive ? "up" : "down");
3323
3324                         *nid = lp->lpni_nid;
3325                         *refcount = atomic_read(&lp->lpni_refcount);
3326                         *ni_peer_tx_credits =
3327                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
3328                         *peer_tx_credits = lp->lpni_txcredits;
3329                         *peer_rtr_credits = lp->lpni_rtrcredits;
3330                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
3331                         *peer_tx_qnob = lp->lpni_txqnob;
3332
3333                         found = true;
3334                 }
3335
3336         }
3337         lnet_net_unlock(*cpt_iter);
3338
3339         *cpt_iter = lncpt;
3340
3341         return found ? 0 : -ENOENT;
3342 }
3343
3344 /* ln_api_mutex is held, which keeps the peer list stable */
3345 int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
3346 {
3347         struct lnet_ioctl_element_stats *lpni_stats;
3348         struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
3349         struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
3350         struct lnet_peer_ni_credit_info *lpni_info;
3351         struct lnet_peer_ni *lpni;
3352         struct lnet_peer *lp;
3353         lnet_nid_t nid;
3354         __u32 size;
3355         int rc;
3356
3357         lp = lnet_find_peer(cfg->prcfg_prim_nid);
3358
3359         if (!lp) {
3360                 rc = -ENOENT;
3361                 goto out;
3362         }
3363
3364         size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
3365                 + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
3366         size *= lp->lp_nnis;
3367         if (size > cfg->prcfg_size) {
3368                 cfg->prcfg_size = size;
3369                 rc = -E2BIG;
3370                 goto out_lp_decref;
3371         }
3372
3373         cfg->prcfg_prim_nid = lp->lp_primary_nid;
3374         cfg->prcfg_mr = lnet_peer_is_multi_rail(lp);
3375         cfg->prcfg_cfg_nid = lp->lp_primary_nid;
3376         cfg->prcfg_count = lp->lp_nnis;
3377         cfg->prcfg_size = size;
3378         cfg->prcfg_state = lp->lp_state;
3379
3380         /* Allocate helper buffers. */
3381         rc = -ENOMEM;
3382         LIBCFS_ALLOC(lpni_info, sizeof(*lpni_info));
3383         if (!lpni_info)
3384                 goto out_lp_decref;
3385         LIBCFS_ALLOC(lpni_stats, sizeof(*lpni_stats));
3386         if (!lpni_stats)
3387                 goto out_free_info;
3388         LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
3389         if (!lpni_msg_stats)
3390                 goto out_free_stats;
3391         LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
3392         if (!lpni_hstats)
3393                 goto out_free_msg_stats;
3394
3395
3396         lpni = NULL;
3397         rc = -EFAULT;
3398         while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
3399                 nid = lpni->lpni_nid;
3400                 if (copy_to_user(bulk, &nid, sizeof(nid)))
3401                         goto out_free_hstats;
3402                 bulk += sizeof(nid);
3403
3404                 memset(lpni_info, 0, sizeof(*lpni_info));
3405                 snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
3406                 if (lnet_isrouter(lpni) ||
3407                         lnet_peer_aliveness_enabled(lpni))
3408                         snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
3409                                 lpni->lpni_alive ? "up" : "down");
3410
3411                 lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
3412                 lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
3413                         lpni->lpni_net->net_tunables.lct_peer_tx_credits : 0;
3414                 lpni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
3415                 lpni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
3416                 lpni_info->cr_peer_min_rtr_credits = lpni->lpni_minrtrcredits;
3417                 lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
3418                 lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
3419                 if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
3420                         goto out_free_hstats;
3421                 bulk += sizeof(*lpni_info);
3422
3423                 memset(lpni_stats, 0, sizeof(*lpni_stats));
3424                 lpni_stats->iel_send_count = lnet_sum_stats(&lpni->lpni_stats,
3425                                                             LNET_STATS_TYPE_SEND);
3426                 lpni_stats->iel_recv_count = lnet_sum_stats(&lpni->lpni_stats,
3427                                                             LNET_STATS_TYPE_RECV);
3428                 lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
3429                                                             LNET_STATS_TYPE_DROP);
3430                 if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
3431                         goto out_free_hstats;
3432                 bulk += sizeof(*lpni_stats);
3433                 lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
3434                 if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
3435                         goto out_free_hstats;
3436                 bulk += sizeof(*lpni_msg_stats);
3437                 lpni_hstats->hlpni_network_timeout =
3438                   atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
3439                 lpni_hstats->hlpni_remote_dropped =
3440                   atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
3441                 lpni_hstats->hlpni_remote_timeout =
3442                   atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
3443                 lpni_hstats->hlpni_remote_error =
3444                   atomic_read(&lpni->lpni_hstats.hlt_remote_error);
3445                 lpni_hstats->hlpni_health_value =
3446                   atomic_read(&lpni->lpni_healthv);
3447                 if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
3448                         goto out_free_hstats;
3449                 bulk += sizeof(*lpni_hstats);
3450         }
3451         rc = 0;
3452
3453 out_free_hstats:
3454         LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
3455 out_free_msg_stats:
3456         LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
3457 out_free_stats:
3458         LIBCFS_FREE(lpni_stats, sizeof(*lpni_stats));
3459 out_free_info:
3460         LIBCFS_FREE(lpni_info, sizeof(*lpni_info));
3461 out_lp_decref:
3462         lnet_peer_decref_locked(lp);
3463 out:
3464         return rc;
3465 }
3466
3467 void
3468 lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
3469 {
3470         /* the mt could've shutdown and cleaned up the queues */
3471         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
3472                 return;
3473
3474         if (list_empty(&lpni->lpni_recovery) &&
3475             atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
3476                 CERROR("lpni %s added to recovery queue. Health = %d\n",
3477                         libcfs_nid2str(lpni->lpni_nid),
3478                         atomic_read(&lpni->lpni_healthv));
3479                 list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
3480                 lnet_peer_ni_addref_locked(lpni);
3481         }
3482 }
3483
3484 /* Call with the ln_api_mutex held */
3485 void
3486 lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
3487 {
3488         struct lnet_peer_table *ptable;
3489         struct lnet_peer *lp;
3490         struct lnet_peer_net *lpn;
3491         struct lnet_peer_ni *lpni;
3492         int lncpt;
3493         int cpt;
3494
3495         if (the_lnet.ln_state != LNET_STATE_RUNNING)
3496                 return;
3497
3498         if (!all) {
3499                 lnet_net_lock(LNET_LOCK_EX);
3500                 lpni = lnet_find_peer_ni_locked(nid);
3501                 if (!lpni) {
3502                         lnet_net_unlock(LNET_LOCK_EX);
3503                         return;
3504                 }
3505                 atomic_set(&lpni->lpni_healthv, value);
3506                 lnet_peer_ni_add_to_recoveryq_locked(lpni);
3507                 lnet_peer_ni_decref_locked(lpni);
3508                 lnet_net_unlock(LNET_LOCK_EX);
3509                 return;
3510         }
3511
3512         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
3513
3514         /*
3515          * Walk all the peers and reset the healhv for each one to the
3516          * maximum value.
3517          */
3518         lnet_net_lock(LNET_LOCK_EX);
3519         for (cpt = 0; cpt < lncpt; cpt++) {
3520                 ptable = the_lnet.ln_peer_tables[cpt];
3521                 list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
3522                         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
3523                                 list_for_each_entry(lpni, &lpn->lpn_peer_nis,
3524                                                     lpni_peer_nis) {
3525                                         atomic_set(&lpni->lpni_healthv, value);
3526                                         lnet_peer_ni_add_to_recoveryq_locked(lpni);
3527                                 }
3528                         }
3529                 }
3530         }
3531         lnet_net_unlock(LNET_LOCK_EX);
3532 }
3533