LU-11299 lnet: lnet_add/del_route()
fs/lustre-release.git: lnet/lnet/router.c
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 *
 *   This file is part of Lustre, https://wiki.whamcloud.com/
 *
 *   Portals is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Portals is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Portals; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/random.h>
#include <lnet/lib-lnet.h>

#define LNET_NRB_TINY_MIN       512     /* min value for each CPT */
#define LNET_NRB_TINY           (LNET_NRB_TINY_MIN * 4)
#define LNET_NRB_SMALL_MIN      4096    /* min value for each CPT */
#define LNET_NRB_SMALL          (LNET_NRB_SMALL_MIN * 4)
#define LNET_NRB_SMALL_PAGES    1
#define LNET_NRB_LARGE_MIN      256     /* min value for each CPT */
#define LNET_NRB_LARGE          (LNET_NRB_LARGE_MIN * 4)
#define LNET_NRB_LARGE_PAGES    ((LNET_MTU + PAGE_SIZE - 1) >> \
                                  PAGE_SHIFT)
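
/*
 * Worked example (illustrative, assuming 4 KiB pages and the usual
 * 1 MiB LNET_MTU): a large buffer must carry a full MTU, so
 *
 *      LNET_NRB_LARGE_PAGES = (1048576 + 4096 - 1) >> 12 = 256 pages
 *
 * while a small buffer is a single page and a tiny buffer carries no
 * payload pages at all.
 */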

static char *forwarding = "";
module_param(forwarding, charp, 0444);
MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");

static int tiny_router_buffers;
module_param(tiny_router_buffers, int, 0444);
MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
static int small_router_buffers;
module_param(small_router_buffers, int, 0444);
MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
static int large_router_buffers;
module_param(large_router_buffers, int, 0444);
MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
static int peer_buffer_credits;
module_param(peer_buffer_credits, int, 0444);
MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");

static int auto_down = 1;
module_param(auto_down, int, 0444);
MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");

int
lnet_peer_buffer_credits(struct lnet_net *net)
{
        /* NI option overrides LNet default */
        if (net->net_tunables.lct_peer_rtr_credits > 0)
                return net->net_tunables.lct_peer_rtr_credits;
        if (peer_buffer_credits > 0)
                return peer_buffer_credits;

        /* As an approximation, allow this peer the same number of router
         * buffers as it is allowed outstanding sends */
        return net->net_tunables.lct_peer_tx_credits;
}
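
/*
 * Example (hypothetical numbers): with lct_peer_rtr_credits = 0,
 * peer_buffer_credits = 0 and lct_peer_tx_credits = 8, the fallback
 * applies and this returns 8.  Setting the peer_buffer_credits module
 * parameter to 16 would override that, and a per-NI
 * lct_peer_rtr_credits of 32 would override both.
 */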

static int check_routers_before_use;
module_param(check_routers_before_use, int, 0444);
MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");

int avoid_asym_router_failure = 1;
module_param(avoid_asym_router_failure, int, 0644);
MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");

static int dead_router_check_interval = 60;
module_param(dead_router_check_interval, int, 0644);
MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");

static int live_router_check_interval = 60;
module_param(live_router_check_interval, int, 0644);
MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");

static int router_ping_timeout = 50;
module_param(router_ping_timeout, int, 0644);
MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");

int
lnet_peers_start_down(void)
{
        return check_routers_before_use;
}

void
lnet_notify_locked(struct lnet_peer_ni *lp, int notifylnd, int alive,
                   time64_t when)
{
        if (lp->lpni_timestamp > when) { /* out of date information */
                CDEBUG(D_NET, "Out of date\n");
                return;
        }

        /*
         * This function can be called with different cpt locks being
         * held. lpni_alive_count modification needs to be properly protected.
         * Significant reads to lpni_alive_count are also protected with
         * the same lock
         */
        spin_lock(&lp->lpni_lock);

        lp->lpni_timestamp = when; /* update timestamp */

        if (lp->lpni_alive_count != 0 &&          /* got old news */
            (!lp->lpni_alive) == (!alive)) {      /* new date for old news */
                spin_unlock(&lp->lpni_lock);
                CDEBUG(D_NET, "Old news\n");
                return;
        }

        /* Flag that notification is outstanding */

        lp->lpni_alive_count++;
        lp->lpni_alive = (alive) ? 1 : 0;
        lp->lpni_notify = 1;
        lp->lpni_notifylnd = notifylnd;
        if (lp->lpni_alive)
                lp->lpni_ping_feats = LNET_PING_FEAT_INVAL; /* reset */

        spin_unlock(&lp->lpni_lock);

        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lpni_nid), alive);
}

/*
 * This function will always be called with lp->lpni_cpt lock held.
 */
static void
lnet_ni_notify_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
{
        int alive;
        int notifylnd;

        /* Notify only in 1 thread at any time to ensure ordered notification.
         * NB individual events can be missed; the only guarantee is that you
         * always get the most recent news */

        spin_lock(&lp->lpni_lock);

        if (lp->lpni_notifying || ni == NULL) {
                spin_unlock(&lp->lpni_lock);
                return;
        }

        lp->lpni_notifying = 1;

        /*
         * lp->lpni_notify needs to be protected because it can be set in
         * lnet_notify_locked().
         */
        while (lp->lpni_notify) {
                alive     = lp->lpni_alive;
                notifylnd = lp->lpni_notifylnd;

                lp->lpni_notifylnd = 0;
                lp->lpni_notify    = 0;

                if (notifylnd && ni->ni_net->net_lnd->lnd_notify != NULL) {
                        spin_unlock(&lp->lpni_lock);
                        lnet_net_unlock(lp->lpni_cpt);

                        /* A new notification could happen now; I'll handle it
                         * when control returns to me */

                        (ni->ni_net->net_lnd->lnd_notify)(ni, lp->lpni_nid,
                                                          alive);

                        lnet_net_lock(lp->lpni_cpt);
                        spin_lock(&lp->lpni_lock);
                }
        }

        lp->lpni_notifying = 0;
        spin_unlock(&lp->lpni_lock);
}

static void
lnet_rtr_addref_locked(struct lnet_peer *lp)
{
        LASSERT(lp->lp_rtr_refcount >= 0);

        /* lnet_net_lock must be exclusively locked */
        lp->lp_rtr_refcount++;
        if (lp->lp_rtr_refcount == 1) {
                list_add_tail(&lp->lp_rtr_list, &the_lnet.ln_routers);
                /* addref for the_lnet.ln_routers */
                lnet_peer_addref_locked(lp);
                the_lnet.ln_routers_version++;
        }
}

static void
lnet_rtr_decref_locked(struct lnet_peer *lp)
{
        LASSERT(atomic_read(&lp->lp_refcount) > 0);
        LASSERT(lp->lp_rtr_refcount > 0);

        /* lnet_net_lock must be exclusively locked */
        lp->lp_rtr_refcount--;
        if (lp->lp_rtr_refcount == 0) {
                LASSERT(list_empty(&lp->lp_routes));

                list_del(&lp->lp_rtr_list);
                /* decref for the_lnet.ln_routers */
                lnet_peer_decref_locked(lp);
                the_lnet.ln_routers_version++;
        }
}

struct lnet_remotenet *
lnet_find_rnet_locked(__u32 net)
{
        struct lnet_remotenet *rnet;
        struct list_head *tmp;
        struct list_head *rn_list;

        LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);

        rn_list = lnet_net2rnethash(net);
        list_for_each(tmp, rn_list) {
                rnet = list_entry(tmp, struct lnet_remotenet, lrn_list);

                if (rnet->lrn_net == net)
                        return rnet;
        }
        return NULL;
}

static void lnet_shuffle_seed(void)
{
        static int seeded;
        struct lnet_ni *ni = NULL;

        if (seeded)
                return;

        /* Nodes with small feet have little entropy
         * the NID for this node gives the most entropy in the low bits */
        while ((ni = lnet_get_next_ni_locked(NULL, ni)))
                add_device_randomness(&ni->ni_nid, sizeof(ni->ni_nid));

        seeded = 1;
}

/* NB expects LNET_LOCK held */
static void
lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
{
        unsigned int len = 0;
        unsigned int offset = 0;
        struct list_head *e;

        lnet_shuffle_seed();

        list_for_each(e, &rnet->lrn_routes)
                len++;

        /*
         * Randomly adding routes to the list is done to ensure that when
         * different nodes are using the same list of routers, they end up
         * preferring different routers.
         */
        offset = cfs_rand() % (len + 1);
        list_for_each(e, &rnet->lrn_routes) {
                if (offset == 0)
                        break;
                offset--;
        }
        list_add(&route->lr_list, e);
        /*
         * force a router check on the gateway to make sure the route is
         * alive
         */
        route->lr_gateway->lp_rtrcheck_timestamp = 0;

        the_lnet.ln_remote_nets_version++;

        /* add the route on the gateway list */
        list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);

        /* take a router reference count on the gateway */
        lnet_rtr_addref_locked(route->lr_gateway);
}
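
/*
 * Sketch of the randomized insert above (hypothetical state): with
 * existing routes A -> B -> C and cfs_rand() yielding offset 2, the
 * walk stops at C and the new route D is linked in right after it:
 *
 *      A -> B -> C -> D
 *
 * while offset == len (3) leaves 'e' at the list head, placing D at
 * the front.  Nodes sharing one route configuration thus spread
 * their traffic across the available gateways.
 */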

int
lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
               unsigned int priority)
{
        struct list_head *route_entry;
        struct lnet_remotenet *rnet;
        struct lnet_remotenet *rnet2;
        struct lnet_route *route;
        struct lnet_peer_ni *lpni;
        struct lnet_peer *gw;
        int add_route;
        int rc;

        CDEBUG(D_NET, "Add route: remote net %s hops %d priority %u gw %s\n",
               libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));

        if (gateway == LNET_NID_ANY ||
            LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
            net == LNET_NIDNET(LNET_NID_ANY) ||
            LNET_NETTYP(net) == LOLND ||
            LNET_NIDNET(gateway) == net ||
            (hops != LNET_UNDEFINED_HOPS && (hops < 1 || hops > 255)))
                return -EINVAL;

        /* it's a local network */
        if (lnet_islocalnet(net))
                return -EEXIST;

        /* Assume net, route, all new */
        LIBCFS_ALLOC(route, sizeof(*route));
        LIBCFS_ALLOC(rnet, sizeof(*rnet));
        if (route == NULL || rnet == NULL) {
                CERROR("Out of memory creating route %s %d %s\n",
                       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
                if (route != NULL)
                        LIBCFS_FREE(route, sizeof(*route));
                if (rnet != NULL)
                        LIBCFS_FREE(rnet, sizeof(*rnet));
                return -ENOMEM;
        }

        INIT_LIST_HEAD(&rnet->lrn_routes);
        rnet->lrn_net = net;
        /* store the local and remote net that the route represents */
        route->lr_lnet = LNET_NIDNET(gateway);
        route->lr_net = net;
        route->lr_priority = priority;
        route->lr_hops = hops;

        lnet_net_lock(LNET_LOCK_EX);

        /*
         * lnet_nid2peerni_ex() grabs a ref on the lpni. We will need to
         * drop that ref once we're done.
         */
        lpni = lnet_nid2peerni_ex(gateway, LNET_LOCK_EX);
        if (IS_ERR(lpni)) {
                lnet_net_unlock(LNET_LOCK_EX);

                LIBCFS_FREE(route, sizeof(*route));
                LIBCFS_FREE(rnet, sizeof(*rnet));

                rc = PTR_ERR(lpni);
                CERROR("Error %d creating route %s %d %s\n", rc,
                        libcfs_net2str(net), hops,
                        libcfs_nid2str(gateway));
                return rc;
        }

        LASSERT(lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
        gw = lpni->lpni_peer_net->lpn_peer;

        route->lr_gateway = gw;

        rnet2 = lnet_find_rnet_locked(net);
        if (rnet2 == NULL) {
                /* new network */
                list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
                rnet2 = rnet;
        }

        /* Search for a duplicate route (adding a duplicate is a NOOP) */
        add_route = 1;
        list_for_each(route_entry, &rnet2->lrn_routes) {
                struct lnet_route *route2;

                route2 = list_entry(route_entry, struct lnet_route, lr_list);
                if (route2->lr_gateway == route->lr_gateway) {
                        add_route = 0;
                        break;
                }

                /* our lookups must be true */
                LASSERT(route2->lr_gateway->lp_primary_nid != gateway);
        }

        /*
         * It is possible to add multiple routes through the same peer,
         * but each will use a different NID of that peer. When the
         * gateway is discovered, discovery will consolidate the different
         * peers into one peer. In this case the discovery code will have
         * to move the routes from the peer that's being deleted to the
         * consolidated peer's lp_routes list.
         */
        if (add_route)
                lnet_add_route_to_rnet(rnet2, route);

        /* get rid of the reference on the lpni */
        lnet_peer_ni_decref_locked(lpni);
        lnet_net_unlock(LNET_LOCK_EX);

        rc = 0;

        if (!add_route) {
                rc = -EEXIST;
                LIBCFS_FREE(route, sizeof(*route));
        }

        if (rnet != rnet2)
                LIBCFS_FREE(rnet, sizeof(*rnet));

        /* kick start the monitor thread to handle the added route */
        wake_up(&the_lnet.ln_mt_waitq);

        return rc;
}
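
/*
 * Usage sketch (hypothetical NIDs): to reach remote net tcp1 through
 * a gateway whose NID sits on the local net tcp, a caller would do
 * something like
 *
 *      rc = lnet_add_route(libcfs_str2net("tcp1"), LNET_UNDEFINED_HOPS,
 *                          libcfs_str2nid("192.168.0.10@tcp"), 0);
 *
 * A second call with the same net/gateway pair returns -EEXIST and
 * leaves the original route in place.
 */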

static void
lnet_del_route_from_rnet(lnet_nid_t gw_nid, struct list_head *route_list,
                         struct list_head *zombies)
{
        struct lnet_peer *gateway;
        struct lnet_route *route;
        struct lnet_route *tmp;

        list_for_each_entry_safe(route, tmp, route_list, lr_list) {
                gateway = route->lr_gateway;
                if (gw_nid != LNET_NID_ANY &&
                    gw_nid != gateway->lp_primary_nid)
                        continue;

                /*
                 * move to zombie to delete outside the lock
                 * Note that this function is called with the
                 * ln_api_mutex held as well as the exclusive net
                 * lock. Adding to the remote net list happens
                 * under the same conditions. Same goes for the
                 * gateway router list
                 */
                list_move(&route->lr_list, zombies);
                the_lnet.ln_remote_nets_version++;

                list_del(&route->lr_gwlist);
                lnet_rtr_decref_locked(gateway);
        }
}

int
lnet_del_route(__u32 net, lnet_nid_t gw_nid)
{
        struct list_head rnet_zombies;
        struct lnet_remotenet *rnet;
        struct lnet_remotenet *tmp;
        struct list_head *rn_list;
        struct lnet_peer_ni *lpni;
        struct lnet_route *route;
        struct list_head zombies;
        struct lnet_peer *lp;
        int i = 0;

        INIT_LIST_HEAD(&rnet_zombies);
        INIT_LIST_HEAD(&zombies);

        CDEBUG(D_NET, "Del route: net %s : gw %s\n",
               libcfs_net2str(net), libcfs_nid2str(gw_nid));

        /* NB Caller may specify either all routes via the given gateway
         * (gw_nid == LNET_NID_ANY) or a specific route entry (actual NIDs) */

        lnet_net_lock(LNET_LOCK_EX);

        lpni = lnet_find_peer_ni_locked(gw_nid);
        if (lpni) {
                lp = lpni->lpni_peer_net->lpn_peer;
                LASSERT(lp);
                gw_nid = lp->lp_primary_nid;
                lnet_peer_ni_decref_locked(lpni);
        }

        if (net != LNET_NIDNET(LNET_NID_ANY)) {
                rnet = lnet_find_rnet_locked(net);
                if (!rnet) {
                        lnet_net_unlock(LNET_LOCK_EX);
                        return -ENOENT;
                }
                lnet_del_route_from_rnet(gw_nid, &rnet->lrn_routes,
                                         &zombies);
                if (list_empty(&rnet->lrn_routes))
                        list_move(&rnet->lrn_list, &rnet_zombies);
                goto delete_zombies;
        }

        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
                rn_list = &the_lnet.ln_remote_nets_hash[i];

                list_for_each_entry_safe(rnet, tmp, rn_list, lrn_list) {
                        lnet_del_route_from_rnet(gw_nid, &rnet->lrn_routes,
                                                 &zombies);
                        if (list_empty(&rnet->lrn_routes))
                                list_move(&rnet->lrn_list, &rnet_zombies);
                }
        }

delete_zombies:
        lnet_net_unlock(LNET_LOCK_EX);

        while (!list_empty(&zombies)) {
                route = list_first_entry(&zombies, struct lnet_route, lr_list);
                list_del(&route->lr_list);
                LIBCFS_FREE(route, sizeof(*route));
        }

        while (!list_empty(&rnet_zombies)) {
                rnet = list_first_entry(&rnet_zombies, struct lnet_remotenet,
                                        lrn_list);
                list_del(&rnet->lrn_list);
                LIBCFS_FREE(rnet, sizeof(*rnet));
        }

        return 0;
}
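
/*
 * Usage sketch: both arguments accept wildcards; for example
 *
 *      lnet_del_route(net, LNET_NID_ANY);
 *
 * removes every route to "net" regardless of gateway, and
 * lnet_destroy_routes() below wildcards both arguments to flush the
 * whole table.
 */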

void
lnet_destroy_routes(void)
{
        lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
}

int lnet_get_rtr_pool_cfg(int cpt, struct lnet_ioctl_pool_cfg *pool_cfg)
{
        struct lnet_rtrbufpool *rbp;
        int i, rc = -ENOENT, j;

        if (the_lnet.ln_rtrpools == NULL)
                return rc;

        cfs_percpt_for_each(rbp, i, the_lnet.ln_rtrpools) {
                if (i != cpt)
                        continue;

                lnet_net_lock(i);
                for (j = 0; j < LNET_NRBPOOLS; j++) {
                        pool_cfg->pl_pools[j].pl_npages = rbp[j].rbp_npages;
                        pool_cfg->pl_pools[j].pl_nbuffers = rbp[j].rbp_nbuffers;
                        pool_cfg->pl_pools[j].pl_credits = rbp[j].rbp_credits;
                        pool_cfg->pl_pools[j].pl_mincredits = rbp[j].rbp_mincredits;
                }
                lnet_net_unlock(i);
                rc = 0;
                break;
        }

        lnet_net_lock(LNET_LOCK_EX);
        pool_cfg->pl_routing = the_lnet.ln_routing;
        lnet_net_unlock(LNET_LOCK_EX);

        return rc;
}

int
lnet_get_route(int idx, __u32 *net, __u32 *hops,
               lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
{
        struct list_head *e1;
        struct list_head *e2;
        struct lnet_remotenet *rnet;
        struct lnet_route *route;
        int cpt;
        int i;
        struct list_head *rn_list;

        cpt = lnet_net_lock_current();

        for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
                rn_list = &the_lnet.ln_remote_nets_hash[i];
                list_for_each(e1, rn_list) {
                        rnet = list_entry(e1, struct lnet_remotenet, lrn_list);

                        list_for_each(e2, &rnet->lrn_routes) {
                                route = list_entry(e2, struct lnet_route,
                                                   lr_list);

                                if (idx-- == 0) {
                                        *net      = rnet->lrn_net;
                                        *hops     = route->lr_hops;
                                        *priority = route->lr_priority;
                                        *gateway  = route->lr_gateway->lp_primary_nid;
                                        *alive    = lnet_is_route_alive(route);
                                        lnet_net_unlock(cpt);
                                        return 0;
                                }
                        }
                }
        }

        lnet_net_unlock(cpt);
        return -ENOENT;
}
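
/*
 * Usage sketch: the index-based lookup above is intended for
 * iteration; a caller can walk the whole table with consecutive
 * indices until -ENOENT, e.g.
 *
 *      for (idx = 0; lnet_get_route(idx, &net, &hops, &gw,
 *                                   &alive, &prio) == 0; idx++)
 *              handle_one_route(net, hops, gw, alive, prio);
 *
 * (handle_one_route() is a hypothetical consumer.)  NB the table can
 * change between calls, so the walk is only advisory.
 */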

void
lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf)
{
        struct lnet_ni_status *stat;
        int nnis;
        int i;

        __swab32s(&pbuf->pb_info.pi_magic);
        __swab32s(&pbuf->pb_info.pi_features);
        __swab32s(&pbuf->pb_info.pi_pid);
        __swab32s(&pbuf->pb_info.pi_nnis);
        nnis = pbuf->pb_info.pi_nnis;
        if (nnis > pbuf->pb_nnis)
                nnis = pbuf->pb_nnis;
        for (i = 0; i < nnis; i++) {
                stat = &pbuf->pb_info.pi_ni[i];
                __swab64s(&stat->ns_nid);
                __swab32s(&stat->ns_status);
        }
}

/**
 * TODO: re-implement
 */
static void
lnet_parse_rc_info(struct lnet_rc_data *rcd)
{
        rcd = rcd;
}

static void
lnet_router_checker_event(struct lnet_event *event)
{
        struct lnet_rc_data *rcd = event->md.user_ptr;
        struct lnet_peer_ni *lp;

        LASSERT(rcd != NULL);

        if (event->unlinked) {
                LNetInvalidateMDHandle(&rcd->rcd_mdh);
                return;
        }

        LASSERT(event->type == LNET_EVENT_SEND ||
                event->type == LNET_EVENT_REPLY);

        lp = rcd->rcd_gateway;
        LASSERT(lp != NULL);

        /* NB: this is called while holding lnet_res_lock; a few places
         * need to hold both locks at the same time, so take care with
         * lock ordering */
        lnet_net_lock(lp->lpni_cpt);
        if (!lnet_isrouter(lp) || lp->lpni_rcd != rcd) {
                /* ignore if no longer a router or rcd is replaced */
                goto out;
        }

        if (event->type == LNET_EVENT_SEND) {
                if (event->status == 0)
                        goto out;
        }

        /* LNET_EVENT_REPLY */
        /* A successful REPLY means the router is up.  If _any_ comms
         * to the router fail I assume it's down (this will happen if
         * we ping alive routers to try to detect router death before
         * apps get burned). */

        lnet_notify_locked(lp, 1, !event->status, ktime_get_seconds());
        /* The router checker will wake up very shortly and do the
         * actual notification.
         * XXX If 'lp' stops being a router before then, it will still
         * have the notification pending!!! */

        if (avoid_asym_router_failure && event->status == 0)
                lnet_parse_rc_info(rcd);

 out:
        lnet_net_unlock(lp->lpni_cpt);
}

static void
lnet_wait_known_routerstate(void)
{
        struct lnet_peer *rtr;
        struct list_head *entry;
        int all_known;

        LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);

        for (;;) {
                int cpt = lnet_net_lock_current();

                all_known = 1;
                list_for_each(entry, &the_lnet.ln_routers) {
                        rtr = list_entry(entry, struct lnet_peer,
                                         lp_rtr_list);

                        spin_lock(&rtr->lp_lock);

                        if ((rtr->lp_state & LNET_PEER_DISCOVERED) == 0) {
                                all_known = 0;
                                spin_unlock(&rtr->lp_lock);
                                break;
                        }
                        spin_unlock(&rtr->lp_lock);
                }

                lnet_net_unlock(cpt);

                if (all_known)
                        return;

                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(1));
        }
}

/* TODO: reimplement */
void
lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net)
{
        struct lnet_route *rte;
        struct lnet_peer *lp;

        if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0)
                lp = gw->lpni_peer_net->lpn_peer;
        else
                return;

        list_for_each_entry(rte, &lp->lp_routes, lr_gwlist) {
                if (rte->lr_net == net) {
                        rte->lr_downis = 0;
                        break;
                }
        }
}

static void
lnet_update_ni_status_locked(void)
{
        struct lnet_ni *ni = NULL;
        time64_t now;
        time64_t timeout;

        LASSERT(the_lnet.ln_routing);

        timeout = router_ping_timeout +
                  MAX(live_router_check_interval, dead_router_check_interval);

        now = ktime_get_real_seconds();
        while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
                if (ni->ni_net->net_lnd->lnd_type == LOLND)
                        continue;

                if (now < ni->ni_last_alive + timeout)
                        continue;

                lnet_ni_lock(ni);
                /* re-check with lock */
                if (now < ni->ni_last_alive + timeout) {
                        lnet_ni_unlock(ni);
                        continue;
                }

                LASSERT(ni->ni_status != NULL);

                if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
                        CDEBUG(D_NET, "NI(%s:%lld) status changed to down\n",
                               libcfs_nid2str(ni->ni_nid), timeout);
                        /* NB: so far, this is the only place to set
                         * NI status to "down" */
                        ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
                }
                lnet_ni_unlock(ni);
        }
}

int lnet_router_pre_mt_start(void)
{
        int rc;

        if (check_routers_before_use &&
            dead_router_check_interval <= 0) {
                LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be set if 'check_routers_before_use' is set\n");
                return -EINVAL;
        }

        rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
        if (rc != 0) {
                CERROR("Can't allocate EQ(0): %d\n", rc);
                return -ENOMEM;
        }

        return 0;
}

void lnet_router_post_mt_start(void)
{
        if (check_routers_before_use) {
                /* Note that a helpful side-effect of pinging all known routers
                 * at startup is that it makes them drop stale connections they
                 * may have to a previous instance of me. */
                lnet_wait_known_routerstate();
        }
}

void
lnet_router_cleanup(void)
{
        int rc;

        rc = LNetEQFree(the_lnet.ln_rc_eqh);
        LASSERT(rc == 0);
}

void
lnet_prune_rc_data(int wait_unlink)
{
        wait_unlink = wait_unlink;
}

/*
 * This function is called from the monitor thread to check if there are
 * any active routers that need to be checked.
 */
inline bool
lnet_router_checker_active(void)
{
        if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
                return true;

        /* Router Checker thread needs to run when routing is enabled in
         * order to call lnet_update_ni_status_locked() */
        if (the_lnet.ln_routing)
                return true;

        /* if there are routers that need to be cleaned up then do so */
        if (!list_empty(&the_lnet.ln_rcd_deathrow) ||
            !list_empty(&the_lnet.ln_rcd_zombie))
                return true;

        return !list_empty(&the_lnet.ln_routers) &&
                (live_router_check_interval > 0 ||
                 dead_router_check_interval > 0);
}

void
lnet_check_routers(void)
{
        struct lnet_peer *rtr;
        struct list_head *entry;
        __u64 version;
        int cpt;

        cpt = lnet_net_lock_current();
rescan:
        version = the_lnet.ln_routers_version;

        list_for_each(entry, &the_lnet.ln_routers) {
                rtr = list_entry(entry, struct lnet_peer,
                                 lp_rtr_list);

                /* TODO use discovery to determine if router is alive */

                /* NB dropped lock */
                if (version != the_lnet.ln_routers_version) {
                        /* the routers list has changed */
                        goto rescan;
                }
        }

        if (the_lnet.ln_routing)
                lnet_update_ni_status_locked();

        lnet_net_unlock(cpt);

        lnet_prune_rc_data(0); /* don't wait for UNLINK */
}

void
lnet_destroy_rtrbuf(struct lnet_rtrbuf *rb, int npages)
{
        int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);

        while (--npages >= 0)
                __free_page(rb->rb_kiov[npages].kiov_page);

        LIBCFS_FREE(rb, sz);
}

static struct lnet_rtrbuf *
lnet_new_rtrbuf(struct lnet_rtrbufpool *rbp, int cpt)
{
        int npages = rbp->rbp_npages;
        int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
        struct page *page;
        struct lnet_rtrbuf *rb;
        int i;

        LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
        if (rb == NULL)
                return NULL;

        rb->rb_pool = rbp;

        for (i = 0; i < npages; i++) {
                page = cfs_page_cpt_alloc(lnet_cpt_table(), cpt,
                                          GFP_KERNEL | __GFP_ZERO);
                if (page == NULL) {
                        while (--i >= 0)
                                __free_page(rb->rb_kiov[i].kiov_page);

                        LIBCFS_FREE(rb, sz);
                        return NULL;
                }

                rb->rb_kiov[i].kiov_len = PAGE_SIZE;
                rb->rb_kiov[i].kiov_offset = 0;
                rb->rb_kiov[i].kiov_page = page;
        }

        return rb;
}

static void
lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
{
        int npages = rbp->rbp_npages;
        struct lnet_rtrbuf *rb;
        struct list_head tmp;

        if (rbp->rbp_nbuffers == 0) /* not initialized or already freed */
                return;

        INIT_LIST_HEAD(&tmp);

        lnet_net_lock(cpt);
        list_splice_init(&rbp->rbp_msgs, &tmp);
        lnet_drop_routed_msgs_locked(&tmp, cpt);
        list_splice_init(&rbp->rbp_bufs, &tmp);
        rbp->rbp_req_nbuffers = 0;
        rbp->rbp_nbuffers = rbp->rbp_credits = 0;
        rbp->rbp_mincredits = 0;
        lnet_net_unlock(cpt);

        /* Free buffers on the free list. */
        while (!list_empty(&tmp)) {
                rb = list_entry(tmp.next, struct lnet_rtrbuf, rb_list);
                list_del(&rb->rb_list);
                lnet_destroy_rtrbuf(rb, npages);
        }
}

static int
lnet_rtrpool_adjust_bufs(struct lnet_rtrbufpool *rbp, int nbufs, int cpt)
{
        struct list_head rb_list;
        struct lnet_rtrbuf *rb;
        int num_rb;
        int num_buffers = 0;
        int old_req_nbufs;
        int npages = rbp->rbp_npages;

        lnet_net_lock(cpt);
        /* If we are called for fewer buffers than already in the pool, we
         * just lower the req_nbuffers number and excess buffers will be
         * thrown away as they are returned to the free list.  Credits
         * then get adjusted as well.
         * If we already have enough buffers allocated to serve the
         * increase requested, then we can treat that the same way as we
         * do the decrease. */
        num_rb = nbufs - rbp->rbp_nbuffers;
        if (nbufs <= rbp->rbp_req_nbuffers || num_rb <= 0) {
                rbp->rbp_req_nbuffers = nbufs;
                lnet_net_unlock(cpt);
                return 0;
        }
        /* store the older value of rbp_req_nbuffers and then set it to
         * the new request to prevent lnet_return_rx_credits_locked() from
         * freeing buffers that we need to keep around */
        old_req_nbufs = rbp->rbp_req_nbuffers;
        rbp->rbp_req_nbuffers = nbufs;
        lnet_net_unlock(cpt);

        INIT_LIST_HEAD(&rb_list);

        /* allocate the buffers on a local list first.  If all buffers are
         * allocated successfully then join this list to the rbp buffer
         * list.  If not then free all allocated buffers. */
        while (num_rb-- > 0) {
                rb = lnet_new_rtrbuf(rbp, cpt);
                if (rb == NULL) {
                        CERROR("Failed to allocate %d route bufs of %d pages\n",
                               nbufs, npages);

                        lnet_net_lock(cpt);
                        rbp->rbp_req_nbuffers = old_req_nbufs;
                        lnet_net_unlock(cpt);

                        goto failed;
                }

                list_add(&rb->rb_list, &rb_list);
                num_buffers++;
        }

        lnet_net_lock(cpt);

        list_splice_tail(&rb_list, &rbp->rbp_bufs);
        rbp->rbp_nbuffers += num_buffers;
        rbp->rbp_credits += num_buffers;
        rbp->rbp_mincredits = rbp->rbp_credits;
        /* We need to schedule blocked messages using the newly
         * added buffers. */
        while (!list_empty(&rbp->rbp_bufs) &&
               !list_empty(&rbp->rbp_msgs))
                lnet_schedule_blocked_locked(rbp);

        lnet_net_unlock(cpt);

        return 0;

failed:
        while (!list_empty(&rb_list)) {
                rb = list_entry(rb_list.next, struct lnet_rtrbuf, rb_list);
                list_del(&rb->rb_list);
                lnet_destroy_rtrbuf(rb, npages);
        }

        return -ENOMEM;
}
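
/*
 * Example (hypothetical numbers): a pool holding 512 buffers asked
 * for 256 only has rbp_req_nbuffers lowered; the surplus is freed
 * lazily as buffers are returned to the free list.  Asked for 768,
 * num_rb = 256 new buffers are built on a private list first, so a
 * mid-way allocation failure leaves the pool exactly as it was.
 */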

static void
lnet_rtrpool_init(struct lnet_rtrbufpool *rbp, int npages)
{
        INIT_LIST_HEAD(&rbp->rbp_msgs);
        INIT_LIST_HEAD(&rbp->rbp_bufs);

        rbp->rbp_npages = npages;
        rbp->rbp_credits = 0;
        rbp->rbp_mincredits = 0;
}

void
lnet_rtrpools_free(int keep_pools)
{
        struct lnet_rtrbufpool *rtrp;
        int i;

        if (the_lnet.ln_rtrpools == NULL) /* uninitialized or freed */
                return;

        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
                lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
                lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
        }

        if (!keep_pools) {
                cfs_percpt_free(the_lnet.ln_rtrpools);
                the_lnet.ln_rtrpools = NULL;
        }
}

static int
lnet_nrb_tiny_calculate(void)
{
        int nrbs = LNET_NRB_TINY;

        if (tiny_router_buffers < 0) {
                LCONSOLE_ERROR_MSG(0x10c,
                                   "tiny_router_buffers=%d invalid when routing enabled\n",
                                   tiny_router_buffers);
                return -EINVAL;
        }

        if (tiny_router_buffers > 0)
                nrbs = tiny_router_buffers;

        nrbs /= LNET_CPT_NUMBER;
        return max(nrbs, LNET_NRB_TINY_MIN);
}
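
/*
 * Worked example (assuming 4 CPTs): with tiny_router_buffers unset,
 * the default LNET_NRB_TINY = 2048 splits into 512 per CPT, exactly
 * LNET_NRB_TINY_MIN; tiny_router_buffers=1024 would give 256 per CPT
 * and still be clamped up to 512 by the max() above.
 */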

static int
lnet_nrb_small_calculate(void)
{
        int nrbs = LNET_NRB_SMALL;

        if (small_router_buffers < 0) {
                LCONSOLE_ERROR_MSG(0x10c,
                                   "small_router_buffers=%d invalid when routing enabled\n",
                                   small_router_buffers);
                return -EINVAL;
        }

        if (small_router_buffers > 0)
                nrbs = small_router_buffers;

        nrbs /= LNET_CPT_NUMBER;
        return max(nrbs, LNET_NRB_SMALL_MIN);
}

static int
lnet_nrb_large_calculate(void)
{
        int nrbs = LNET_NRB_LARGE;

        if (large_router_buffers < 0) {
                LCONSOLE_ERROR_MSG(0x10c,
                                   "large_router_buffers=%d invalid when routing enabled\n",
                                   large_router_buffers);
                return -EINVAL;
        }

        if (large_router_buffers > 0)
                nrbs = large_router_buffers;

        nrbs /= LNET_CPT_NUMBER;
        return max(nrbs, LNET_NRB_LARGE_MIN);
}

int
lnet_rtrpools_alloc(int im_a_router)
{
        struct lnet_rtrbufpool *rtrp;
        int nrb_tiny;
        int nrb_small;
        int nrb_large;
        int rc;
        int i;

        if (!strcmp(forwarding, "")) {
                /* not set either way */
                if (!im_a_router)
                        return 0;
        } else if (!strcmp(forwarding, "disabled")) {
                /* explicitly disabled */
                return 0;
        } else if (!strcmp(forwarding, "enabled")) {
                /* explicitly enabled */
        } else {
                LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either 'enabled' or 'disabled'\n");
                return -EINVAL;
        }

        nrb_tiny = lnet_nrb_tiny_calculate();
        if (nrb_tiny < 0)
                return -EINVAL;

        nrb_small = lnet_nrb_small_calculate();
        if (nrb_small < 0)
                return -EINVAL;

        nrb_large = lnet_nrb_large_calculate();
        if (nrb_large < 0)
                return -EINVAL;

        the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
                                                LNET_NRBPOOLS *
                                                sizeof(struct lnet_rtrbufpool));
        if (the_lnet.ln_rtrpools == NULL) {
                LCONSOLE_ERROR_MSG(0x10c,
                                   "Failed to initialize router buffer pools\n");
                return -ENOMEM;
        }

        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
                                              nrb_tiny, i);
                if (rc != 0)
                        goto failed;

                lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
                                  LNET_NRB_SMALL_PAGES);
                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
                                              nrb_small, i);
                if (rc != 0)
                        goto failed;

                lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
                                  LNET_NRB_LARGE_PAGES);
                rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
                                              nrb_large, i);
                if (rc != 0)
                        goto failed;
        }

        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 1;
        lnet_net_unlock(LNET_LOCK_EX);
        wake_up(&the_lnet.ln_mt_waitq);
        return 0;

 failed:
        lnet_rtrpools_free(0);
        return rc;
}
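
/*
 * Usage sketch: forwarding and the pool sizes come from the module
 * parameters declared at the top of this file, e.g. something like
 *
 *      modprobe lnet forwarding=enabled tiny_router_buffers=2048 \
 *              small_router_buffers=16384 large_router_buffers=1024
 *
 * Any "forwarding" value other than "", "enabled" or "disabled"
 * fails here with -EINVAL before any pool memory is committed.
 */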

static int
lnet_rtrpools_adjust_helper(int tiny, int small, int large)
{
        int nrb = 0;
        int rc = 0;
        int i;
        struct lnet_rtrbufpool *rtrp;

        /* If the provided values for each buffer pool are different from the
         * configured values, we need to take action. */
        if (tiny >= 0) {
                tiny_router_buffers = tiny;
                nrb = lnet_nrb_tiny_calculate();
                cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                        rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
                                                      nrb, i);
                        if (rc != 0)
                                return rc;
                }
        }
        if (small >= 0) {
                small_router_buffers = small;
                nrb = lnet_nrb_small_calculate();
                cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                        rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
                                                      nrb, i);
                        if (rc != 0)
                                return rc;
                }
        }
        if (large >= 0) {
                large_router_buffers = large;
                nrb = lnet_nrb_large_calculate();
                cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                        rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
                                                      nrb, i);
                        if (rc != 0)
                                return rc;
                }
        }

        return 0;
}

int
lnet_rtrpools_adjust(int tiny, int small, int large)
{
        /* this function doesn't revert the changes if adding new buffers
         * failed.  It's up to the user space caller to revert the
         * changes. */

        if (!the_lnet.ln_routing)
                return 0;

        return lnet_rtrpools_adjust_helper(tiny, small, large);
}

int
lnet_rtrpools_enable(void)
{
        int rc = 0;

        if (the_lnet.ln_routing)
                return 0;

        if (the_lnet.ln_rtrpools == NULL)
                /* If routing is turned off, and we have never
                 * initialized the pools before, just call the
                 * standard buffer pool allocation routine as
                 * if we are just configuring this for the first
                 * time. */
                rc = lnet_rtrpools_alloc(1);
        else
                rc = lnet_rtrpools_adjust_helper(0, 0, 0);
        if (rc != 0)
                return rc;

        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 1;

        the_lnet.ln_ping_target->pb_info.pi_features &=
                ~LNET_PING_FEAT_RTE_DISABLED;
        lnet_net_unlock(LNET_LOCK_EX);

        return rc;
}

void
lnet_rtrpools_disable(void)
{
        if (!the_lnet.ln_routing)
                return;

        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 0;
        the_lnet.ln_ping_target->pb_info.pi_features |=
                LNET_PING_FEAT_RTE_DISABLED;

        tiny_router_buffers = 0;
        small_router_buffers = 0;
        large_router_buffers = 0;
        lnet_net_unlock(LNET_LOCK_EX);
        lnet_rtrpools_free(1);
}

int
lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, int alive, time64_t when)
{
        struct lnet_peer_ni *lp = NULL;
        time64_t now = ktime_get_seconds();
        int cpt = lnet_cpt_of_nid(nid, ni);

        LASSERT(!in_interrupt());

        CDEBUG(D_NET, "%s notifying %s: %s\n",
               (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
               libcfs_nid2str(nid),
               alive ? "up" : "down");

        if (ni != NULL &&
            LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
                CWARN("Ignoring notification of %s %s by %s (different net)\n",
                      libcfs_nid2str(nid), alive ? "birth" : "death",
                      libcfs_nid2str(ni->ni_nid));
                return -EINVAL;
        }

        /* can't do predictions... */
        if (when > now) {
                CWARN("Ignoring prediction from %s of %s %s %lld seconds in the future\n",
                      (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
                      libcfs_nid2str(nid), alive ? "up" : "down", when - now);
                return -EINVAL;
        }

        if (ni != NULL && !alive &&             /* LND telling me she's down */
            !auto_down) {                       /* auto-down disabled */
                CDEBUG(D_NET, "Auto-down disabled\n");
                return 0;
        }

        lnet_net_lock(cpt);

        if (the_lnet.ln_state != LNET_STATE_RUNNING) {
                lnet_net_unlock(cpt);
                return -ESHUTDOWN;
        }

        lp = lnet_find_peer_ni_locked(nid);
        if (lp == NULL) {
                /* nid not found */
                lnet_net_unlock(cpt);
                CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
                return 0;
        }

        /*
         * It is possible for this function to be called for the same peer
         * but with different NIs. We want to synchronize the notification
         * between the different calls. So we will use the lpni_cpt to
         * grab the net lock.
         */
        if (lp->lpni_cpt != cpt) {
                lnet_net_unlock(cpt);
                cpt = lp->lpni_cpt;
                lnet_net_lock(cpt);
        }

        /* We can't fully trust the LND to report the exact peer last_alive
         * when it notifies us about a dead peer. For example ksocklnd can
         * call us with when == _time_when_the_node_was_booted_ if
         * no connections were successfully established */
        if (ni != NULL && !alive && when < lp->lpni_last_alive)
                when = lp->lpni_last_alive;

        lnet_notify_locked(lp, ni == NULL, alive, when);

        if (ni != NULL)
                lnet_ni_notify_locked(ni, lp);

        lnet_peer_ni_decref_locked(lp);

        lnet_net_unlock(cpt);
        return 0;
}
EXPORT_SYMBOL(lnet_notify);
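
/*
 * Usage sketch: an LND typically reports a dead connection with
 * something like
 *
 *      lnet_notify(ni, peer_nid, 0, ktime_get_seconds());
 *
 * With the auto_down module parameter cleared, such death notices
 * from an LND are ignored above; notifications with "when" in the
 * future are always rejected with -EINVAL.
 */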