Whamcloud - gitweb
New tag 2.15.63
[fs/lustre-release.git] / lnet / lnet / router.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
4  *
5  * Copyright (c) 2011, 2017, Intel Corporation.
6  */
7
8 /* This file is part of Lustre, http://www.lustre.org/ */
9
10 #define DEBUG_SUBSYSTEM S_LNET
11
12 #include <linux/random.h>
13 #include <lnet/lib-lnet.h>
14
/* Router buffer pool sizing.  Each pool has a per-CPT minimum and a
 * default total of 4x that minimum.  Tiny buffers carry 0-payload
 * messages, small buffers one page, large buffers up to LNET_MTU. */
#define LNET_NRB_TINY_MIN	512	/* min value for each CPT */
#define LNET_NRB_TINY		(LNET_NRB_TINY_MIN * 4)
#define LNET_NRB_SMALL_MIN	4096	/* min value for each CPT */
#define LNET_NRB_SMALL		(LNET_NRB_SMALL_MIN * 4)
#define LNET_NRB_SMALL_PAGES	1
#define LNET_NRB_LARGE_MIN	256	/* min value for each CPT */
#define LNET_NRB_LARGE		(LNET_NRB_LARGE_MIN * 4)
/* pages needed to hold one LNET_MTU-sized payload, rounded up */
#define LNET_NRB_LARGE_PAGES	((LNET_MTU + PAGE_SIZE - 1) >> \
				  PAGE_SHIFT)
24
/* Forwarding override string; empty by default (parsed elsewhere, not
 * visible in this chunk). */
static char *forwarding = "";
module_param(forwarding, charp, 0444);
MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");

/* Router buffer pool sizes; 0 means "use built-in defaults" —
 * presumably the LNET_NRB_* values above (TODO confirm in the pool
 * allocation code, which is outside this chunk). */
static int tiny_router_buffers;
module_param(tiny_router_buffers, int, 0444);
MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
static int small_router_buffers;
module_param(small_router_buffers, int, 0444);
MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
static int large_router_buffers;
module_param(large_router_buffers, int, 0444);
MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
/* Per-peer router buffer credits; <= 0 falls back to the per-net
 * tunable / tx credits (see lnet_peer_buffer_credits()). */
static int peer_buffer_credits;
module_param(peer_buffer_credits, int, 0444);
MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");

static int auto_down = 1;
module_param(auto_down, int, 0444);
MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");
45
46 int
47 lnet_peer_buffer_credits(struct lnet_net *net)
48 {
49         /* NI option overrides LNet default */
50         if (net->net_tunables.lct_peer_rtr_credits > 0)
51                 return net->net_tunables.lct_peer_rtr_credits;
52         if (peer_buffer_credits > 0)
53                 return peer_buffer_credits;
54
55         /* As an approximation, allow this peer the same number of router
56          * buffers as it is allowed outstanding sends */
57         return net->net_tunables.lct_peer_tx_credits;
58 }
59
/* When set, peers start in the "down" state and must be pinged before
 * use (exported via lnet_peers_start_down()). */
static int check_routers_before_use;
module_param(check_routers_before_use, int, 0444);
MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");

int avoid_asym_router_failure = 1;
module_param(avoid_asym_router_failure, int, 0644);
MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");

/* INT_MIN marks the deprecated interval parameters as "not set", so
 * use of the old names can be detected (handling is outside this
 * chunk). */
int dead_router_check_interval = INT_MIN;
module_param(dead_router_check_interval, int, 0444);
MODULE_PARM_DESC(dead_router_check_interval, "(DEPRECATED - Use alive_router_check_interval)");

int live_router_check_interval = INT_MIN;
module_param(live_router_check_interval, int, 0444);
MODULE_PARM_DESC(live_router_check_interval, "(DEPRECATED - Use alive_router_check_interval)");

int alive_router_check_interval = 60;
module_param(alive_router_check_interval, int, 0644);
MODULE_PARM_DESC(alive_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");

static int router_ping_timeout = 50;
module_param(router_ping_timeout, int, 0644);
MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");

/*
 * A value between 0 and 100. 0 meaning that even if router's interfaces
 * have the worse health still consider the gateway usable.
 * 100 means that at least one interface on the route's remote net is 100%
 * healthy to consider the route alive.
 * The default is set to 100 to ensure we maintain the original behavior.
 */
unsigned int router_sensitivity_percentage = 100;
static int rtr_sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp);
/* Custom param ops so writes go through rtr_sensitivity_set(), which
 * validates the range and serializes the store. */
static struct kernel_param_ops param_ops_rtr_sensitivity = {
	.set = rtr_sensitivity_set,
	.get = param_get_int,
};
#define param_check_rtr_sensitivity(name, p) \
		__param_check(name, p, int)
/* Older kernels lack kernel_param_ops; fall back to module_param_call */
#ifdef HAVE_KERNEL_PARAM_OPS
module_param(router_sensitivity_percentage, rtr_sensitivity, S_IRUGO|S_IWUSR);
#else
module_param_call(router_sensitivity_percentage, rtr_sensitivity_set, param_get_int,
		  &router_sensitivity_percentage, S_IRUGO|S_IWUSR);
#endif
MODULE_PARM_DESC(router_sensitivity_percentage,
		"How healthy a gateway should be to be used in percent");

/* Forward declarations for route-list helpers defined later in this file */
static void lnet_add_route_to_rnet(struct lnet_remotenet *rnet,
				   struct lnet_route *route);
static void lnet_del_route_from_rnet(struct lnet_nid *gw_nid,
				     struct list_head *route_list,
				     struct list_head *zombies);
113
114 static int
115 rtr_sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
116 {
117         int rc;
118         unsigned *sen = (unsigned *)kp->arg;
119         unsigned long value;
120
121         rc = kstrtoul(val, 0, &value);
122         if (rc) {
123                 CERROR("Invalid module parameter value for 'router_sensitivity_percentage'\n");
124                 return rc;
125         }
126
127         if (value < 0 || value > 100) {
128                 CERROR("Invalid value: %lu for 'router_sensitivity_percentage'\n", value);
129                 return -EINVAL;
130         }
131
132         /*
133          * The purpose of locking the api_mutex here is to ensure that
134          * the correct value ends up stored properly.
135          */
136         mutex_lock(&the_lnet.ln_api_mutex);
137
138         *sen = value;
139
140         mutex_unlock(&the_lnet.ln_api_mutex);
141
142         return 0;
143 }
144
/* Detach @route's gateway's routes from their remote net and either
 * re-home the route on peer @lp or dispose of the removed routes.
 *
 * If @rt_list is non-NULL the removed routes are collected there for
 * the caller; otherwise a local zombie list is used.  When @lp is
 * non-NULL the first removed route is re-attached with @lp as its
 * gateway; when @lp is NULL and no @rt_list was given, the removed
 * routes are freed before returning.
 */
void
lnet_move_route(struct lnet_route *route, struct lnet_peer *lp,
		struct list_head *rt_list)
__must_hold(&the_lnet.ln_api_mutex)
{
	struct lnet_remotenet *rnet;
	struct list_head zombies;
	struct list_head *l;

	INIT_LIST_HEAD(&zombies);

	/* collect removed routes on the caller's list when provided */
	if (rt_list)
		l = rt_list;
	else
		l = &zombies;

	rnet = lnet_find_rnet_locked(route->lr_net);
	LASSERT(rnet);

	CDEBUG(D_NET, "deleting route %s->%s\n",
	       libcfs_net2str(route->lr_net),
	       libcfs_nidstr(&route->lr_nid));

	/*
	 * use the gateway's lp_primary_nid to delete the route as the
	 * lr_nid can be a constituent NID of the peer
	 */
	lnet_del_route_from_rnet(
		&route->lr_gateway->lp_primary_nid,
		&rnet->lrn_routes, l);

	if (lp) {
		/* re-home the removed route on the new gateway peer */
		route = list_first_entry(l, struct lnet_route,
					 lr_list);
		route->lr_gateway = lp;
		lnet_add_route_to_rnet(rnet, route);
	} else {
		/* no new gateway: free the zombies unless the caller
		 * asked to keep them via rt_list */
		while (!list_empty(l) && !rt_list) {
			route = list_first_entry(l, struct lnet_route,
						 lr_list);
			list_del(&route->lr_list);
			LIBCFS_FREE(route, sizeof(*route));
		}
	}
}
190
/* Transfer all routes and queued router traffic from peer @src to peer
 * @target.  Used when discovery consolidates two peer records that
 * turn out to be the same gateway.
 *
 * For each @src route, if @target already has a route to the same
 * remote net with better-or-equal priority/hops, the @src route is
 * dropped; otherwise the @target duplicate is dropped and the @src
 * route is moved over.  Finally @target is added to the global router
 * list if it is not already on it.
 */
void
lnet_rtr_transfer_to_peer(struct lnet_peer *src, struct lnet_peer *target)
{
	struct lnet_route *route;
	struct lnet_route *tmp, *tmp2;

	lnet_net_lock(LNET_LOCK_EX);
	CDEBUG(D_NET, "transfering routes from %s -> %s\n",
	       libcfs_nidstr(&src->lp_primary_nid),
	       libcfs_nidstr(&target->lp_primary_nid));
	/* debug dump of every route currently owned by @src */
	list_for_each_entry(route, &src->lp_routes, lr_gwlist) {
		CDEBUG(D_NET, "%s: %s->%s\n",
		       libcfs_nidstr(&src->lp_primary_nid),
		       libcfs_net2str(route->lr_net),
		       libcfs_nidstr(&route->lr_nid));
	}
	/* hand over any messages queued for routing via @src */
	list_splice_init(&src->lp_rtrq, &target->lp_rtrq);
	list_for_each_entry_safe(route, tmp, &src->lp_routes, lr_gwlist) {
		struct lnet_route *r2;
		bool present = false;
		/* safe iteration: lnet_move_route() may unlink r2 */
		list_for_each_entry_safe(r2, tmp2, &target->lp_routes, lr_gwlist) {
			if (route->lr_net == r2->lr_net) {
				if (route->lr_priority >= r2->lr_priority)
					present = true;
				else if (route->lr_hops >= r2->lr_hops)
					present = true;
				else
					lnet_move_route(r2, NULL, NULL);
			}
		}
		if (present)
			lnet_move_route(route, NULL, NULL);
		else
			lnet_move_route(route, target, NULL);
	}

	/* ensure @target appears on the global router list */
	if (list_empty(&target->lp_rtr_list)) {
		lnet_peer_addref_locked(target);
		list_add_tail(&target->lp_rtr_list, &the_lnet.ln_routers);
	}

	the_lnet.ln_routers_version++;
	lnet_net_unlock(LNET_LOCK_EX);
}
235
236 int
237 lnet_peers_start_down(void)
238 {
239         return check_routers_before_use;
240 }
241
242 /*
243  * The peer_net of a gateway is alive if at least one of the peer_ni's on
244  * that peer_net is alive.
245  */
246 static bool
247 lnet_is_gateway_net_alive(struct lnet_peer_net *lpn)
248 {
249         struct lnet_peer_ni *lpni;
250
251         list_for_each_entry(lpni, &lpn->lpn_peer_nis, lpni_peer_nis) {
252                 if (lnet_is_peer_ni_alive(lpni))
253                         return true;
254         }
255
256         return false;
257 }
258
259 /*
260  * a gateway is alive only if all its nets are alive
261  * called with cpt lock held
262  */
263 bool lnet_is_gateway_alive(struct lnet_peer *gw)
264 {
265         struct lnet_peer_net *lpn;
266
267         if (!gw->lp_alive)
268                 return false;
269
270         list_for_each_entry(lpn, &gw->lp_peer_nets, lpn_peer_nets) {
271                 if (!lnet_is_gateway_net_alive(lpn))
272                         return false;
273         }
274
275         return true;
276 }
277
/*
 * lnet_is_route_alive() needs to be called with cpt lock held
 * A route is alive if the gateway can route between the local network and
 * the remote network of the route.
 * This means at least one NI is alive on each of the local and remote
 * networks of the gateway.
 */
bool lnet_is_route_alive(struct lnet_route *route)
{
	struct lnet_peer *gw = route->lr_gateway;
	struct lnet_peer_net *llpn;
	struct lnet_peer_net *rlpn;

	/* If the gateway is down then all routes are considered down */
	if (!gw->lp_alive)
		return false;

	/*
	 * if discovery is disabled then rely on the cached aliveness
	 * information. This is handicapped information which we log when
	 * we receive the discovery ping response. The most uptodate
	 * aliveness information can only be obtained when discovery is
	 * enabled.
	 */
	if (lnet_is_discovery_disabled(gw))
		return atomic_read(&route->lr_alive) == 1;

	/*
	 * check the gateway's interfaces on the local network
	 */
	llpn = lnet_peer_get_net_locked(gw, route->lr_lnet);
	if (!llpn)
		return false;

	if (!lnet_is_gateway_net_alive(llpn))
		return false;

	/*
	 * For single hop routes avoid_asym_router_failure dictates
	 * that the remote net must exist on the gateway. For multi-hop
	 * routes the next-hop will not have the remote net.
	 */
	if (avoid_asym_router_failure &&
	    (route->lr_hops == 1 || route->lr_single_hop)) {
		rlpn = lnet_peer_get_net_locked(gw, route->lr_net);
		if (!rlpn)
			return false;
		if (!lnet_is_gateway_net_alive(rlpn))
			return false;
	}

	/* the gateway must have routing enabled; lp_state is protected
	 * by lp_lock (the error message is logged after dropping it) */
	spin_lock(&gw->lp_lock);
	if (!(gw->lp_state & LNET_PEER_ROUTER_ENABLED)) {
		spin_unlock(&gw->lp_lock);
		if (gw->lp_rtr_refcount > 0)
			CERROR("peer %s is being used as a gateway but routing feature is not turned on\n",
			       libcfs_nidstr(&gw->lp_primary_nid));
		return false;
	}
	spin_unlock(&gw->lp_lock);

	return true;
}
341
/* Move routes whose lr_nid no longer resolves to a peer_ni on
 * @orig_lp over to @new_lp's route list.  Called when discovery
 * consolidates gateway peers. */
void
lnet_consolidate_routes_locked(struct lnet_peer *orig_lp,
			       struct lnet_peer *new_lp)
{
	struct lnet_peer_ni *lpni;
	struct lnet_route *route;

	/*
	 * Although a route is correlated with a peer, but when it's added
	 * a specific NID is used. That NID refers to a peer_ni within
	 * a peer. There could be other peer_nis on the same net, which
	 * can be used to send to that gateway. However when we are
	 * consolidating gateways because of discovery, the nid used to
	 * add the route might've moved between gateway peers. In this
	 * case we want to move the route to the new gateway as well. The
	 * intent here is not to confuse the user who added the route.
	 */
	list_for_each_entry(route, &orig_lp->lp_routes, lr_gwlist) {
		lpni = lnet_peer_ni_get_locked(orig_lp, &route->lr_nid);
		if (!lpni) {
			/* lr_nid no longer belongs to orig_lp: re-home
			 * the route under the exclusive net lock */
			lnet_net_lock(LNET_LOCK_EX);
			list_move(&route->lr_gwlist, &new_lp->lp_routes);
			lnet_net_unlock(LNET_LOCK_EX);
		}
	}
}
368
369 static inline void
370 lnet_check_route_inconsistency(struct lnet_route *route)
371 {
372         if (!route->lr_single_hop && route->lr_hops == 1 &&
373             avoid_asym_router_failure) {
374                 CWARN("route %s->%s is detected to be multi-hop but hop count is set to %d\n",
375                         libcfs_net2str(route->lr_net),
376                         libcfs_nidstr(&route->lr_gateway->lp_primary_nid),
377                         (int) route->lr_hops);
378         }
379 }
380
/* Routes are added and removed under both ln_api_mutex and net_lock/EX
 * Since we are not modifying anything we simply require the ln_api_mutex be
 * held so that things are not modified underneath us
 */
void
lnet_router_discovery_ping_reply(struct lnet_peer *lp,
				 struct lnet_ping_buffer *pbuf)
__must_hold(&the_lnet.ln_api_mutex)
{
	struct lnet_ping_iter piter;
	struct lnet_peer_net *llpn;
	struct lnet_route *route;
	struct lnet_nid nid;
	bool single_hop = false;
	bool net_up = false;
	u32 *stp;

	/* gateway reports routing disabled: all its routes are dead */
	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_RTE_DISABLED) {
		CERROR("Peer %s is being used as a gateway but routing feature is not turned on\n",
		       libcfs_nidstr(&lp->lp_primary_nid));
		list_for_each_entry(route, &lp->lp_routes, lr_gwlist)
			lnet_set_route_aliveness(route, false);
		return;
	}

	CDEBUG(D_NET, "Processing reply for gw: %s nnis %d\n",
	       libcfs_nidstr(&lp->lp_primary_nid), pbuf->pb_info.pi_nnis);

	/* examine the ping response to determine if the routes on that
	 * gateway should be declared alive.
	 * The route is alive if:
	 *  1. local network to reach the route is alive and
	 *  2. route is single hop, avoid_async_router_failure is set and
	 *     there exists at least one NI on the route's remote net
	 */
	list_for_each_entry(route, &lp->lp_routes, lr_gwlist) {
		/* gateway has no interfaces on our local net: dead */
		llpn = lnet_peer_get_net_locked(lp, route->lr_lnet);
		if (!llpn) {
			lnet_set_route_aliveness(route, false);
			continue;
		}

		if (!lnet_is_gateway_net_alive(llpn)) {
			lnet_set_route_aliveness(route, false);
			continue;
		}

		/* scan the ping info: the route is single-hop if the
		 * gateway reports a NID on the route's remote net, and
		 * that net is up if any such NID has UP status */
		single_hop = net_up = false;
		for (stp = ping_iter_first(&piter, pbuf, &nid);
		     stp;
		     stp = ping_iter_next(&piter, &nid)) {
			if (route->lr_net == LNET_NID_NET(&nid)) {
				single_hop = true;
				if (*stp == LNET_NI_STATUS_UP) {
					net_up = true;
					break;
				}
			}
		}

		route->lr_single_hop = single_hop;
		if (avoid_asym_router_failure &&
		    (route->lr_hops == 1 || route->lr_single_hop))
			lnet_set_route_aliveness(route, net_up);
		else
			lnet_set_route_aliveness(route, true);

		/*
		 * warn that the route is configured as single-hop but it
		 * really is multi-hop as far as we can tell.
		 */
		lnet_check_route_inconsistency(route);
	}
}
455
/* Finalize router discovery on gateway @lp: clear the in-progress
 * state bit, set the discovered bit, and record success/failure in
 * lp_alive.  On discovery failure, all of the gateway's interfaces
 * and routes are marked down. */
void
lnet_router_discovery_complete(struct lnet_peer *lp)
{
	struct lnet_peer_ni *lpni = NULL;
	struct lnet_route *route;

	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_RTR_DISCOVERY;
	lp->lp_state |= LNET_PEER_RTR_DISCOVERED;
	/* the gateway is alive only if discovery succeeded */
	lp->lp_alive = lp->lp_dc_error == 0;
	spin_unlock(&lp->lp_lock);

	/* discovery succeeded: nothing more to do */
	if (!lp->lp_dc_error)
		return;

	/*
	 * We do not send messages directly to the remote interfaces
	 * of an LNet router. As such, we rely on the PING response
	 * to determine the up/down status of these interfaces. If
	 * a PING response is not receieved, or some other problem with
	 * discovery occurs that prevents us from getting this status,
	 * we assume all interfaces are down until we're able to
	 * determine otherwise.
	 */
	CDEBUG(D_NET, "%s: Router discovery failed %d\n",
	       libcfs_nidstr(&lp->lp_primary_nid), lp->lp_dc_error);
	/* mark every constituent peer_ni down ... */
	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
		lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;

	/* ... and every route through this gateway dead */
	list_for_each_entry(route, &lp->lp_routes, lr_gwlist)
		lnet_set_route_aliveness(route, false);
}
488
489 static void
490 lnet_rtr_addref_locked(struct lnet_peer *lp)
491 {
492         LASSERT(lp->lp_rtr_refcount >= 0);
493
494         /* lnet_net_lock must be exclusively locked */
495         lp->lp_rtr_refcount++;
496         if (lp->lp_rtr_refcount == 1) {
497                 list_add_tail(&lp->lp_rtr_list, &the_lnet.ln_routers);
498                 /* addref for the_lnet.ln_routers */
499                 lnet_peer_addref_locked(lp);
500                 the_lnet.ln_routers_version++;
501         }
502 }
503
504 static void
505 lnet_rtr_decref_locked(struct lnet_peer *lp)
506 {
507         LASSERT(atomic_read(&lp->lp_refcount) > 0);
508         LASSERT(lp->lp_rtr_refcount > 0);
509
510         /* lnet_net_lock must be exclusively locked */
511         lp->lp_rtr_refcount--;
512         if (lp->lp_rtr_refcount == 0) {
513                 LASSERT(list_empty(&lp->lp_routes));
514
515                 list_del(&lp->lp_rtr_list);
516                 /* decref for the_lnet.ln_routers */
517                 lnet_peer_decref_locked(lp);
518                 the_lnet.ln_routers_version++;
519         }
520 }
521
522 struct lnet_remotenet *
523 lnet_find_rnet_locked(__u32 net)
524 {
525         struct lnet_remotenet *rnet;
526         struct list_head *rn_list;
527
528         LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
529
530         rn_list = lnet_net2rnethash(net);
531         list_for_each_entry(rnet, rn_list, lrn_list) {
532                 if (rnet->lrn_net == net)
533                         return rnet;
534         }
535         return NULL;
536 }
537
538 static void lnet_shuffle_seed(void)
539 {
540         static int seeded;
541         struct lnet_ni *ni = NULL;
542
543         if (seeded)
544                 return;
545
546         /* Nodes with small feet have little entropy
547          * the NID for this node gives the most entropy in the low bits */
548         while ((ni = lnet_get_next_ni_locked(NULL, ni)))
549                 add_device_randomness(&ni->ni_nid, sizeof(ni->ni_nid));
550
551         seeded = 1;
552 }
553
/* Insert @route at a random position in @rnet's route list, attach it
 * to its gateway's route list and take a router ref on the gateway.
 * NB expects LNET_LOCK held */
static void
lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
{
	struct lnet_peer_net *lpn;
	unsigned int offset = 0;
	unsigned int len = 0;
	struct list_head *e;
	time64_t now;

	lnet_shuffle_seed();

	/* count the routes already on this remote net */
	list_for_each(e, &rnet->lrn_routes)
		len++;

	/*
	 * Randomly adding routes to the list is done to ensure that when
	 * different nodes are using the same list of routers, they end up
	 * preferring different routers.
	 */
	offset = get_random_u32_below(len + 1);
	/* advance cursor 'e' by 'offset' entries; if offset == len the
	 * loop completes and 'e' is the list head, i.e. insert at tail */
	list_for_each(e, &rnet->lrn_routes) {
		if (offset == 0)
			break;
		offset--;
	}
	list_add(&route->lr_list, e);
	/*
	 * force a router check on the gateway to make sure the route is
	 * alive
	 */
	now = ktime_get_real_seconds();
	list_for_each_entry(lpn, &route->lr_gateway->lp_peer_nets,
			    lpn_peer_nets) {
		lpn->lpn_next_ping = now;
	}

	the_lnet.ln_remote_nets_version++;

	/* add the route on the gateway list */
	list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);

	/* take a router reference count on the gateway */
	lnet_rtr_addref_locked(route->lr_gateway);
}
599
/* Add a route to remote @net via @gateway with the given @hops,
 * @priority and health @sensitivity.
 *
 * Returns 0 on success, -EINVAL for invalid arguments, -EEXIST if the
 * net is local or an identical route already exists, -EHOSTUNREACH if
 * no local interface exists on the gateway's net, -ENOMEM on
 * allocation failure, or the lnet_nid2peerni_ex() error.
 */
int
lnet_add_route(__u32 net, __u32 hops, struct lnet_nid *gateway,
	       __u32 priority, __u32 sensitivity)
__must_hold(&the_lnet.ln_api_mutex)
{
	struct list_head *route_entry;
	struct lnet_remotenet *rnet;
	struct lnet_remotenet *rnet2;
	struct lnet_route *route;
	struct lnet_peer_ni *lpni;
	struct lnet_peer *gw;
	int add_route;
	int rc;

	CDEBUG(D_NET, "Add route: remote net %s hops %d priority %u gw %s\n",
	       libcfs_net2str(net), hops, priority, libcfs_nidstr(gateway));

	/* reject wildcard/loopback gateways, wildcard/loopback nets,
	 * a gateway that lives on the target net, and out-of-range hop
	 * counts (valid: 1-255 or LNET_UNDEFINED_HOPS) */
	if (LNET_NID_IS_ANY(gateway) ||
	    nid_is_lo0(gateway) ||
	    net == LNET_NET_ANY ||
	    LNET_NETTYP(net) == LOLND ||
	    LNET_NID_NET(gateway) == net ||
	    (hops != LNET_UNDEFINED_HOPS && (hops < 1 || hops > 255)))
		return -EINVAL;

	/* it's a local network */
	if (lnet_islocalnet(net))
		return -EEXIST;

	if (!lnet_islocalnet(LNET_NID_NET(gateway))) {
		CERROR("Cannot add route with gateway %s. There is no local interface configured on LNet %s\n",
		       libcfs_nidstr(gateway),
		       libcfs_net2str(LNET_NID_NET(gateway)));
		return -EHOSTUNREACH;
	}

	/* Assume net, route, all new */
	LIBCFS_ALLOC(route, sizeof(*route));
	LIBCFS_ALLOC(rnet, sizeof(*rnet));
	if (route == NULL || rnet == NULL) {
		CERROR("Out of memory creating route %s %d %s\n",
		       libcfs_net2str(net), hops, libcfs_nidstr(gateway));
		if (route != NULL)
			LIBCFS_FREE(route, sizeof(*route));
		if (rnet != NULL)
			LIBCFS_FREE(rnet, sizeof(*rnet));
		return -ENOMEM;
	}

	INIT_LIST_HEAD(&rnet->lrn_routes);
	rnet->lrn_net = net;
	/* store the local and remote net that the route represents */
	route->lr_lnet = LNET_NID_NET(gateway);
	route->lr_net = net;
	route->lr_nid = *gateway;
	route->lr_priority = priority;
	route->lr_hops = hops;
	/* initial aliveness follows the check_routers_before_use policy */
	if (lnet_peers_start_down())
		atomic_set(&route->lr_alive, 0);
	else
		atomic_set(&route->lr_alive, 1);

	lnet_net_lock(LNET_LOCK_EX);

	/*
	 * lnet_nid2peerni_ex() grabs a ref on the lpni. We will need to
	 * lose that once we're done
	 */
	lpni = lnet_nid2peerni_ex(gateway);
	if (IS_ERR(lpni)) {
		lnet_net_unlock(LNET_LOCK_EX);

		LIBCFS_FREE(route, sizeof(*route));
		LIBCFS_FREE(rnet, sizeof(*rnet));

		rc = PTR_ERR(lpni);
		CERROR("Error %d creating route %s %d %s\n", rc,
			libcfs_net2str(net), hops,
			libcfs_nidstr(gateway));
		return rc;
	}

	LASSERT(lpni);
	LASSERT(lpni->lpni_peer_net);
	LASSERT(lpni->lpni_peer_net->lpn_peer);
	gw = lpni->lpni_peer_net->lpn_peer;

	route->lr_gateway = gw;

	/* use the existing remote net if there is one, otherwise hash
	 * in the one we just allocated */
	rnet2 = lnet_find_rnet_locked(net);
	if (rnet2 == NULL) {
		/* new network */
		list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
		rnet2 = rnet;
	}

	/* Search for a duplicate route (it's a NOOP if it is) */
	add_route = 1;
	list_for_each(route_entry, &rnet2->lrn_routes) {
		struct lnet_route *route2;

		route2 = list_entry(route_entry, struct lnet_route, lr_list);
		if (route2->lr_gateway == route->lr_gateway) {
			add_route = 0;
			break;
		}

		/* our lookups must be true */
		LASSERT(!nid_same(&route2->lr_gateway->lp_primary_nid,
				  gateway));
	}

	/*
	 * It is possible to add multiple routes through the same peer,
	 * but it'll be using a different NID of that peer. When the
	 * gateway is discovered, discovery will consolidate the different
	 * peers into one peer. In this case the discovery code will have
	 * to move the routes from the peer that's being deleted to the
	 * consolidated peer lp_routes list
	 */
	if (add_route) {
		gw->lp_health_sensitivity = sensitivity;
		lnet_add_route_to_rnet(rnet2, route);
		if (lnet_peer_discovery_disabled)
			CWARN("Consider turning discovery on to enable full Multi-Rail routing functionality\n");
	}

	/*
	 * get rid of the reference on the lpni.
	 */
	lnet_peer_ni_decref_locked(lpni);
	lnet_net_unlock(LNET_LOCK_EX);

	/* If avoid_asym_router_failure is enabled and hop count is not
	 * set to 1 for a route that is actually single-hop, then the
	 * feature will fail to prevent the router from being selected
	 * if it is missing a NI on the remote network due to misconfiguration.
	 */
	if (avoid_asym_router_failure && hops == LNET_UNDEFINED_HOPS)
		CWARN("Use hops = 1 for a single-hop route when avoid_asym_router_failure feature is enabled\n");

	rc = 0;

	/* free whatever we allocated but did not end up using */
	if (!add_route) {
		rc = -EEXIST;
		LIBCFS_FREE(route, sizeof(*route));
	}

	if (rnet != rnet2)
		LIBCFS_FREE(rnet, sizeof(*rnet));

	/* kick start the monitor thread to handle the added route */
	complete(&the_lnet.ln_mt_wait_complete);

	return rc;
}
756
/* Unlink from @route_list every route whose gateway's primary NID
 * matches @gw_nid (or all routes when @gw_nid is NULL) and collect
 * them on @zombies for the caller to free outside the lock.  Drops the
 * router reference held on each affected gateway. */
void
lnet_del_route_from_rnet(struct lnet_nid *gw_nid,
			 struct list_head *route_list,
			 struct list_head *zombies)
{
	struct lnet_peer *gateway;
	struct lnet_route *route;
	struct lnet_route *tmp;

	list_for_each_entry_safe(route, tmp, route_list, lr_list) {
		gateway = route->lr_gateway;
		/* NULL gw_nid matches every route */
		if (gw_nid && !nid_same(gw_nid, &gateway->lp_primary_nid))
			continue;

		/*
		 * move to zombie to delete outside the lock
		 * Note that this function is called with the
		 * ln_api_mutex held as well as the exclusive net
		 * lock. Adding to the remote net list happens
		 * under the same conditions. Same goes for the
		 * gateway router list
		 */
		list_move(&route->lr_list, zombies);
		the_lnet.ln_remote_nets_version++;

		list_del(&route->lr_gwlist);
		lnet_rtr_decref_locked(gateway);
	}
}
786
/* Delete routes to remote net @net via gateway @gw.
 *
 * @net may be LNET_NET_ANY to match routes to all remote nets, and
 * @gw may be NULL to match routes via any gateway; a specific value
 * restricts deletion to matching entries.
 *
 * Return: 0 on success, -ENOENT when @net names a specific net that
 * has no route entry.
 *
 * Routes (and remote nets left with no routes) are moved onto local
 * zombie lists while holding the exclusive net lock and are freed
 * only after the lock is dropped.
 */
int
lnet_del_route(__u32 net, struct lnet_nid *gw)
__must_hold(&the_lnet.ln_api_mutex)
{
	LIST_HEAD(rnet_zombies);
	struct lnet_remotenet *rnet;
	struct lnet_remotenet *tmp;
	struct list_head *rn_list;
	struct lnet_peer_ni *lpni;
	struct lnet_route *route;
	struct lnet_nid gw_nid;
	LIST_HEAD(zombies);
	struct lnet_peer *lp = NULL;
	int i = 0;

	CDEBUG(D_NET, "Del route: net %s : gw %s\n",
	       libcfs_net2str(net), libcfs_nidstr(gw));

	/* NB Caller may specify either all routes via the given gateway
	 * (gw) or a specific route entry (actual NIDs) */

	lnet_net_lock(LNET_LOCK_EX);

	/* Normalize @gw to the gateway peer's primary NID so it matches
	 * the NID the route entries are keyed on.  The lpni reference
	 * taken here is dropped below. */
	if (gw)
		lpni = lnet_peer_ni_find_locked(gw);
	else
		lpni = NULL;
	if (lpni) {
		lp = lpni->lpni_peer_net->lpn_peer;
		LASSERT(lp);
		gw_nid = lp->lp_primary_nid;
		gw = &gw_nid;
	}

	if (net != LNET_NET_ANY) {
		rnet = lnet_find_rnet_locked(net);
		if (!rnet) {
			if (lpni)
				lnet_peer_ni_decref_locked(lpni);
			lnet_net_unlock(LNET_LOCK_EX);
			return -ENOENT;
		}
		lnet_del_route_from_rnet(gw, &rnet->lrn_routes,
					 &zombies);
		/* a remote net with no routes left is itself deleted */
		if (list_empty(&rnet->lrn_routes))
			list_move(&rnet->lrn_list, &rnet_zombies);
		goto delete_zombies;
	}

	/* LNET_NET_ANY: walk every hash chain of remote nets */
	for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
		rn_list = &the_lnet.ln_remote_nets_hash[i];

		list_for_each_entry_safe(rnet, tmp, rn_list, lrn_list) {
			lnet_del_route_from_rnet(gw, &rnet->lrn_routes,
						 &zombies);
			if (list_empty(&rnet->lrn_routes))
				list_move(&rnet->lrn_list, &rnet_zombies);
		}
	}

delete_zombies:
	/*
	 * check if there are any routes remaining on the gateway
	 * If there are no more routes make sure to set the peer's
	 * lp_disc_net_id to 0 (invalid), in case we add more routes in
	 * the future on that gateway, then we start our discovery process
	 * from scratch
	 */
	if (lpni) {
		if (list_empty(&lp->lp_routes))
			lp->lp_disc_net_id = 0;
		lnet_peer_ni_decref_locked(lpni);
	}

	lnet_net_unlock(LNET_LOCK_EX);

	/* free the zombies outside the lock */
	while (!list_empty(&zombies)) {
		route = list_first_entry(&zombies, struct lnet_route, lr_list);
		list_del(&route->lr_list);
		LIBCFS_FREE(route, sizeof(*route));
	}

	while (!list_empty(&rnet_zombies)) {
		rnet = list_first_entry(&rnet_zombies, struct lnet_remotenet,
					lrn_list);
		list_del(&rnet->lrn_list);
		LIBCFS_FREE(rnet, sizeof(*rnet));
	}

	return 0;
}
878
/* Drop every route to every remote net via any gateway. */
void
lnet_destroy_routes(void)
{
	lnet_del_route(LNET_NET_ANY, NULL);
}
884
885 int lnet_get_rtr_pool_cfg(int cpt, struct lnet_ioctl_pool_cfg *pool_cfg)
886 {
887         struct lnet_rtrbufpool *rbp;
888         int i, rc = -ENOENT, j;
889
890         if (the_lnet.ln_rtrpools == NULL)
891                 return rc;
892
893
894         cfs_percpt_for_each(rbp, i, the_lnet.ln_rtrpools) {
895                 if (i != cpt)
896                         continue;
897
898                 lnet_net_lock(i);
899                 for (j = 0; j < LNET_NRBPOOLS; j++) {
900                         pool_cfg->pl_pools[j].pl_npages = rbp[j].rbp_npages;
901                         pool_cfg->pl_pools[j].pl_nbuffers = rbp[j].rbp_nbuffers;
902                         pool_cfg->pl_pools[j].pl_credits = rbp[j].rbp_credits;
903                         pool_cfg->pl_pools[j].pl_mincredits = rbp[j].rbp_mincredits;
904                 }
905                 lnet_net_unlock(i);
906                 rc = 0;
907                 break;
908         }
909
910         lnet_net_lock(LNET_LOCK_EX);
911         pool_cfg->pl_routing = the_lnet.ln_routing;
912         lnet_net_unlock(LNET_LOCK_EX);
913
914         return rc;
915 }
916
917 int
918 lnet_get_route(int idx, __u32 *net, __u32 *hops, lnet_nid_t *gateway,
919                __u32 *flags, __u32 *priority, __u32 *sensitivity)
920 {
921         struct lnet_remotenet *rnet;
922         struct list_head *rn_list;
923         struct lnet_route *route;
924         int cpt;
925         int i;
926
927         cpt = lnet_net_lock_current();
928
929         for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
930                 rn_list = &the_lnet.ln_remote_nets_hash[i];
931                 list_for_each_entry(rnet, rn_list, lrn_list) {
932                         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
933                                 if (idx-- == 0) {
934                                         *net      = rnet->lrn_net;
935                                         *gateway  = lnet_nid_to_nid4(&route->lr_nid);
936                                         *hops     = route->lr_hops;
937                                         *priority = route->lr_priority;
938                                         *sensitivity = route->lr_gateway->
939                                                 lp_health_sensitivity;
940                                         if (lnet_is_route_alive(route))
941                                                 *flags |= LNET_RT_ALIVE;
942                                         else
943                                                 *flags &= ~LNET_RT_ALIVE;
944                                         if (route->lr_single_hop)
945                                                 *flags &= ~LNET_RT_MULTI_HOP;
946                                         else
947                                                 *flags |= LNET_RT_MULTI_HOP;
948                                         lnet_net_unlock(cpt);
949                                         return 0;
950                                 }
951                         }
952                 }
953         }
954
955         lnet_net_unlock(cpt);
956         return -ENOENT;
957 }
958
959 static void
960 lnet_wait_known_routerstate(void)
961 {
962         struct lnet_peer *rtr;
963         int all_known;
964
965         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
966
967         for (;;) {
968                 int cpt = lnet_net_lock_current();
969
970                 all_known = 1;
971                 list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
972                         spin_lock(&rtr->lp_lock);
973
974                         if ((rtr->lp_state & LNET_PEER_RTR_DISCOVERED) == 0) {
975                                 all_known = 0;
976                                 spin_unlock(&rtr->lp_lock);
977                                 break;
978                         }
979                         spin_unlock(&rtr->lp_lock);
980                 }
981
982                 lnet_net_unlock(cpt);
983
984                 if (all_known)
985                         return;
986
987                 schedule_timeout_uninterruptible(cfs_time_seconds(1));
988         }
989 }
990
991 static inline bool
992 lnet_net_set_status_locked(struct lnet_net *net, __u32 status)
993 {
994         struct lnet_ni *ni;
995         bool update = false;
996
997         list_for_each_entry(ni, &net->net_ni_list, ni_netlist)
998                 if (lnet_ni_set_status(ni, status))
999                         update = true;
1000
1001         return update;
1002 }
1003
/* Scan all local nets and mark NIs down where appropriate.
 *
 * A net that has seen no traffic for router_ping_timeout +
 * alive_router_check_interval seconds has all its NIs marked
 * LNET_NI_STATUS_DOWN.  Independently of the traffic timeout, any NI
 * with a fatal error flagged is also marked down.
 *
 * Returns true if any NI status changed, in which case the caller
 * should push an update to peers.  Only called while routing is
 * enabled, with a net lock held by the caller.
 */
static bool
lnet_update_ni_status_locked(void)
{
	struct lnet_net *net;
	struct lnet_ni *ni;
	bool push = false;
	time64_t now;
	time64_t timeout;

	LASSERT(the_lnet.ln_routing);

	timeout = router_ping_timeout + alive_router_check_interval;

	now = ktime_get_seconds();
	list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
		/* the loopback net never times out */
		if (net->net_lnd->lnd_type == LOLND)
			continue;

		if (now < net->net_last_alive + timeout)
			goto check_ni_fatal;

		spin_lock(&net->net_lock);
		/* re-check with lock */
		if (now < net->net_last_alive + timeout) {
			spin_unlock(&net->net_lock);
			goto check_ni_fatal;
		}
		spin_unlock(&net->net_lock);

		/*
		 * if the net didn't receive any traffic for past the
		 * timeout on any of its constituent NIs, then mark all
		 * the NIs down.
		 */
		if (lnet_net_set_status_locked(net, LNET_NI_STATUS_DOWN)) {
			push = true;
			continue;
		}

check_ni_fatal:
		list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
			/* lnet_ni_set_status() will perform the same check of
			 * ni_status while holding the ni lock. We can safely
			 * check ni_status without that lock because it is only
			 * written to under net_lock/EX and our caller is
			 * holding a net lock.
			 */
			if (atomic_read(&ni->ni_fatal_error_on) &&
			    ni->ni_status &&
			    *ni->ni_status != LNET_NI_STATUS_DOWN &&
			    lnet_ni_set_status(ni, LNET_NI_STATUS_DOWN))
				push = true;
		}
	}

	return push;
}
1061
1062 void lnet_wait_router_start(void)
1063 {
1064         if (check_routers_before_use) {
1065                 /* Note that a helpful side-effect of pinging all known routers
1066                  * at startup is that it makes them drop stale connections they
1067                  * may have to a previous instance of me. */
1068                 lnet_wait_known_routerstate();
1069         }
1070 }
1071
1072 /*
1073  * This function is called from the monitor thread to check if there are
1074  * any active routers that need to be checked.
1075  */
1076 bool lnet_router_checker_active(void)
1077 {
1078         /* Router Checker thread needs to run when routing is enabled in
1079          * order to call lnet_update_ni_status_locked() */
1080         if (the_lnet.ln_routing)
1081                 return true;
1082
1083         return !list_empty(&the_lnet.ln_routers) &&
1084                 alive_router_check_interval > 0;
1085 }
1086
1087 void
1088 lnet_check_routers(void)
1089 {
1090         struct lnet_peer_net *first_lpn;
1091         struct lnet_peer_net *lpn;
1092         struct lnet_peer_ni *lpni;
1093         struct lnet_peer *rtr;
1094         bool push = false;
1095         bool needs_ping;
1096         bool found_lpn;
1097         __u64 version;
1098         __u32 net_id;
1099         time64_t now;
1100         int cpt;
1101         int rc;
1102
1103         cpt = lnet_net_lock_current();
1104 rescan:
1105         version = the_lnet.ln_routers_version;
1106
1107         list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
1108                 /* If we're currently discovering the peer then don't
1109                  * issue another discovery
1110                  */
1111                 if (rtr->lp_state & LNET_PEER_RTR_DISCOVERY)
1112                         continue;
1113
1114                 now = ktime_get_real_seconds();
1115
1116                 /* find the next local peer net which needs to be ping'd */
1117                 needs_ping = false;
1118                 first_lpn = NULL;
1119                 found_lpn = false;
1120                 net_id = rtr->lp_disc_net_id;
1121                 do {
1122                         lpn = lnet_get_next_peer_net_locked(rtr, net_id);
1123                         if (!lpn) {
1124                                 CERROR("gateway %s has no networks\n",
1125                                 libcfs_nidstr(&rtr->lp_primary_nid));
1126                                 break;
1127                         }
1128
1129                         /* We looped back to the first peer net */
1130                         if (first_lpn == lpn)
1131                                 break;
1132                         if (!first_lpn)
1133                                 first_lpn = lpn;
1134
1135                         net_id = lpn->lpn_net_id;
1136                         if (!lnet_islocalnet_locked(net_id))
1137                                 continue;
1138
1139                         found_lpn = true;
1140
1141                         CDEBUG(D_NET, "rtr %s(%p) %s(%p) next ping %lld\n",
1142                                libcfs_nidstr(&rtr->lp_primary_nid), rtr,
1143                                libcfs_net2str(net_id), lpn,
1144                                lpn->lpn_next_ping);
1145
1146                         needs_ping = now >= lpn->lpn_next_ping;
1147
1148                 } while (!needs_ping);
1149
1150                 if (!found_lpn || !lpn) {
1151                         CERROR("no local network found for gateway %s\n",
1152                                libcfs_nidstr(&rtr->lp_primary_nid));
1153                         continue;
1154                 }
1155
1156                 if (!needs_ping)
1157                         continue;
1158
1159                 spin_lock(&rtr->lp_lock);
1160                 /* make sure we fully discover the router */
1161                 rtr->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
1162                 rtr->lp_state |= LNET_PEER_FORCE_PING | LNET_PEER_FORCE_PUSH |
1163                         LNET_PEER_RTR_DISCOVERY;
1164                 spin_unlock(&rtr->lp_lock);
1165
1166                 /* find the peer_ni associated with the primary NID */
1167                 lpni = lnet_peer_ni_get_locked(rtr, &rtr->lp_primary_nid);
1168                 if (!lpni) {
1169                         CDEBUG(D_NET, "Expected to find an lpni for %s, but non found\n",
1170                                libcfs_nidstr(&rtr->lp_primary_nid));
1171                         continue;
1172                 }
1173                 lnet_peer_ni_addref_locked(lpni);
1174
1175                 /* specify the net to use */
1176                 rtr->lp_disc_net_id = lpn->lpn_net_id;
1177
1178                 /* discover the router */
1179                 CDEBUG(D_NET, "discover %s, cpt = %d\n",
1180                        libcfs_nidstr(&lpni->lpni_nid), cpt);
1181                 rc = lnet_discover_peer_locked(lpni, cpt, false);
1182
1183                 /* drop ref taken above */
1184                 lnet_peer_ni_decref_locked(lpni);
1185
1186                 if (!rc)
1187                         lpn->lpn_next_ping = now + alive_router_check_interval;
1188                 else
1189                         CERROR("Failed to discover router %s\n",
1190                                libcfs_nidstr(&rtr->lp_primary_nid));
1191
1192                 /* NB cpt lock was dropped in lnet_discover_peer_locked() */
1193                 if (version != the_lnet.ln_routers_version) {
1194                         /* the routers list has changed */
1195                         goto rescan;
1196                 }
1197         }
1198
1199         if (the_lnet.ln_routing)
1200                 push = lnet_update_ni_status_locked();
1201
1202         lnet_net_unlock(cpt);
1203
1204         /* if the status of the ni changed update the peers */
1205         if (push)
1206                 lnet_push_update_to_peers(1);
1207 }
1208
1209 void
1210 lnet_destroy_rtrbuf(struct lnet_rtrbuf *rb, int npages)
1211 {
1212         int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1213
1214         while (--npages >= 0)
1215                 __free_page(rb->rb_kiov[npages].bv_page);
1216
1217         LIBCFS_FREE(rb, sz);
1218 }
1219
1220 static struct lnet_rtrbuf *
1221 lnet_new_rtrbuf(struct lnet_rtrbufpool *rbp, int cpt)
1222 {
1223         int            npages = rbp->rbp_npages;
1224         int            sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1225         struct page   *page;
1226         struct lnet_rtrbuf *rb;
1227         int            i;
1228
1229         LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
1230         if (rb == NULL)
1231                 return NULL;
1232
1233         rb->rb_pool = rbp;
1234
1235         for (i = 0; i < npages; i++) {
1236                 page = cfs_page_cpt_alloc(lnet_cpt_table(), cpt, GFP_KERNEL |
1237                                           __GFP_ZERO | __GFP_NORETRY);
1238                 if (page == NULL) {
1239                         while (--i >= 0)
1240                                 __free_page(rb->rb_kiov[i].bv_page);
1241
1242                         LIBCFS_FREE(rb, sz);
1243                         return NULL;
1244                 }
1245
1246                 rb->rb_kiov[i].bv_len = PAGE_SIZE;
1247                 rb->rb_kiov[i].bv_offset = 0;
1248                 rb->rb_kiov[i].bv_page = page;
1249         }
1250
1251         return rb;
1252 }
1253
/* Release all buffers of pool @rbp on CPT @cpt.
 *
 * Queued messages still waiting for a buffer are dropped and the free
 * buffers detached, all under the net lock; the pool counters are
 * reset there too, so the actual freeing can happen outside the lock.
 */
static void
lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
{
	int npages = rbp->rbp_npages;
	struct lnet_rtrbuf *rb;
	LIST_HEAD(tmp);

	if (rbp->rbp_nbuffers == 0) /* not initialized or already freed */
		return;

	lnet_net_lock(cpt);
	/* drop blocked routed messages, then detach the free buffers */
	list_splice_init(&rbp->rbp_msgs, &tmp);
	lnet_drop_routed_msgs_locked(&tmp, cpt);
	list_splice_init(&rbp->rbp_bufs, &tmp);
	rbp->rbp_req_nbuffers = 0;
	rbp->rbp_nbuffers = rbp->rbp_credits = 0;
	rbp->rbp_mincredits = 0;
	lnet_net_unlock(cpt);

	/* Free buffers on the free list. */
	while (!list_empty(&tmp)) {
		rb = list_first_entry(&tmp, struct lnet_rtrbuf, rb_list);
		list_del(&rb->rb_list);
		lnet_destroy_rtrbuf(rb, npages);
	}
}
1280
/* Grow (or logically shrink) pool @rbp on CPT @cpt to @nbufs buffers.
 *
 * Shrinking is lazy: only rbp_req_nbuffers is lowered and excess
 * buffers are discarded as they are returned to the free list.
 * Growing allocates the additional buffers on a private list first,
 * then splices them in under the net lock and reschedules any
 * messages blocked waiting for a buffer.
 *
 * Return: 0 on success, -ENOMEM if any buffer allocation fails (in
 * which case rbp_req_nbuffers is restored and nothing is added).
 */
static int
lnet_rtrpool_adjust_bufs(struct lnet_rtrbufpool *rbp, int nbufs, int cpt)
{
	LIST_HEAD(rb_list);
	struct lnet_rtrbuf *rb;
	int             num_rb;
	int             num_buffers = 0;
	int             old_req_nbufs;
	int             npages = rbp->rbp_npages;

	lnet_net_lock(cpt);
	/* If we are called for less buffers than already in the pool, we
	 * just lower the req_nbuffers number and excess buffers will be
	 * thrown away as they are returned to the free list.  Credits
	 * then get adjusted as well.
	 * If we already have enough buffers allocated to serve the
	 * increase requested, then we can treat that the same way as we
	 * do the decrease. */
	num_rb = nbufs - rbp->rbp_nbuffers;
	if (nbufs <= rbp->rbp_req_nbuffers || num_rb <= 0) {
		rbp->rbp_req_nbuffers = nbufs;
		lnet_net_unlock(cpt);
		return 0;
	}
	/* store the older value of rbp_req_nbuffers and then set it to
	 * the new request to prevent lnet_return_rx_credits_locked() from
	 * freeing buffers that we need to keep around */
	old_req_nbufs = rbp->rbp_req_nbuffers;
	rbp->rbp_req_nbuffers = nbufs;
	lnet_net_unlock(cpt);

	/* allocate the buffers on a local list first.  If all buffers are
	 * allocated successfully then join this list to the rbp buffer
	 * list.  If not then free all allocated buffers. */
	while (num_rb-- > 0) {
		rb = lnet_new_rtrbuf(rbp, cpt);
		if (rb == NULL) {
			CERROR("lnet: error allocating %ux%u page router buffers on CPT %u: rc = %d\n",
			       nbufs, npages, cpt, -ENOMEM);

			/* undo the reservation made above */
			lnet_net_lock(cpt);
			rbp->rbp_req_nbuffers = old_req_nbufs;
			lnet_net_unlock(cpt);

			goto failed;
		}

		list_add(&rb->rb_list, &rb_list);
		num_buffers++;
	}

	lnet_net_lock(cpt);

	list_splice_tail(&rb_list, &rbp->rbp_bufs);
	rbp->rbp_nbuffers += num_buffers;
	rbp->rbp_credits += num_buffers;
	rbp->rbp_mincredits = rbp->rbp_credits;
	/* We need to schedule blocked msg using the newly
	 * added buffers. */
	while (!list_empty(&rbp->rbp_bufs) &&
	       !list_empty(&rbp->rbp_msgs))
		lnet_schedule_blocked_locked(rbp);

	lnet_net_unlock(cpt);

	return 0;

failed:
	while ((rb = list_first_entry_or_null(&rb_list,
					      struct lnet_rtrbuf,
					      rb_list)) != NULL) {
		list_del(&rb->rb_list);
		lnet_destroy_rtrbuf(rb, npages);
	}

	return -ENOMEM;
}
1358
1359 static void
1360 lnet_rtrpool_init(struct lnet_rtrbufpool *rbp, int npages)
1361 {
1362         INIT_LIST_HEAD(&rbp->rbp_msgs);
1363         INIT_LIST_HEAD(&rbp->rbp_bufs);
1364
1365         rbp->rbp_npages = npages;
1366         rbp->rbp_credits = 0;
1367         rbp->rbp_mincredits = 0;
1368 }
1369
1370 void
1371 lnet_rtrpools_free(int keep_pools)
1372 {
1373         struct lnet_rtrbufpool *rtrp;
1374         int               i;
1375
1376         if (the_lnet.ln_rtrpools == NULL) /* uninitialized or freed */
1377                 return;
1378
1379         cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1380                 lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
1381                 lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
1382                 lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
1383         }
1384
1385         if (!keep_pools) {
1386                 cfs_percpt_free(the_lnet.ln_rtrpools);
1387                 the_lnet.ln_rtrpools = NULL;
1388         }
1389 }
1390
1391 static int
1392 lnet_nrb_tiny_calculate(void)
1393 {
1394         int nrbs = LNET_NRB_TINY;
1395
1396         if (tiny_router_buffers < 0) {
1397                 LCONSOLE_ERROR_MSG(0x10c,
1398                                    "tiny_router_buffers=%d invalid when "
1399                                    "routing enabled\n", tiny_router_buffers);
1400                 return -EINVAL;
1401         }
1402
1403         if (tiny_router_buffers > 0) {
1404                 if (tiny_router_buffers < LNET_NRB_TINY_MIN)
1405                         CWARN("tiny_router_buffers=%d less than recommended minimum %d\n",
1406                               tiny_router_buffers, LNET_NRB_TINY_MIN);
1407                 nrbs = tiny_router_buffers;
1408         }
1409
1410         nrbs /= LNET_CPT_NUMBER;
1411         return max(nrbs, 1);
1412 }
1413
1414 static int
1415 lnet_nrb_small_calculate(void)
1416 {
1417         int nrbs = LNET_NRB_SMALL;
1418
1419         if (small_router_buffers < 0) {
1420                 LCONSOLE_ERROR_MSG(0x10c,
1421                                    "small_router_buffers=%d invalid when "
1422                                    "routing enabled\n", small_router_buffers);
1423                 return -EINVAL;
1424         }
1425
1426         if (small_router_buffers > 0) {
1427                 if (small_router_buffers < LNET_NRB_SMALL_MIN)
1428                         CWARN("small_router_buffers=%d less than recommended minimum %d\n",
1429                               small_router_buffers, LNET_NRB_SMALL_MIN);
1430                 nrbs = small_router_buffers;
1431         }
1432
1433         nrbs /= LNET_CPT_NUMBER;
1434         return max(nrbs, 1);
1435 }
1436
1437 static int
1438 lnet_nrb_large_calculate(void)
1439 {
1440         int nrbs = LNET_NRB_LARGE;
1441
1442         if (large_router_buffers < 0) {
1443                 LCONSOLE_ERROR_MSG(0x10c,
1444                                    "large_router_buffers=%d invalid when "
1445                                    "routing enabled\n", large_router_buffers);
1446                 return -EINVAL;
1447         }
1448
1449         if (large_router_buffers > 0) {
1450                 if (large_router_buffers < LNET_NRB_LARGE_MIN)
1451                         CWARN("large_router_buffers=%d less than recommended minimum %d\n",
1452                               large_router_buffers, LNET_NRB_LARGE_MIN);
1453                 nrbs = large_router_buffers;
1454         }
1455
1456         nrbs /= LNET_CPT_NUMBER;
1457         return max(nrbs, 1);
1458 }
1459
/* Allocate and populate the router buffer pools and enable routing.
 *
 * @im_a_router: whether this node was configured as a router; only
 * consulted when the "forwarding" module parameter is unset.
 *
 * Return: 0 on success (including the no-op cases where forwarding is
 * disabled), -EINVAL for a bad "forwarding" value or bad buffer
 * counts, -ENOMEM on allocation failure (all pools are torn down).
 */
int
lnet_rtrpools_alloc(int im_a_router)
{
	struct lnet_rtrbufpool *rtrp;
	int     nrb_tiny;
	int     nrb_small;
	int     nrb_large;
	int     rc;
	int     i;

	/* decide whether to forward based on the "forwarding" module
	 * parameter, falling back to the router configuration */
	if (!strcmp(forwarding, "")) {
		/* not set either way */
		if (!im_a_router)
			return 0;
	} else if (!strcmp(forwarding, "disabled")) {
		/* explicitly disabled */
		return 0;
	} else if (!strcmp(forwarding, "enabled")) {
		/* explicitly enabled */
	} else {
		rc = -EINVAL;
		LCONSOLE_ERROR_MSG(0x10b,
				   "lnet: forwarding='%s' not set to either 'enabled' or 'disabled': rc = %d\n",
				   forwarding, rc);
		return rc;
	}

	nrb_tiny = lnet_nrb_tiny_calculate();
	if (nrb_tiny < 0)
		return -EINVAL;

	nrb_small = lnet_nrb_small_calculate();
	if (nrb_small < 0)
		return -EINVAL;

	nrb_large = lnet_nrb_large_calculate();
	if (nrb_large < 0)
		return -EINVAL;

	/* one array of LNET_NRBPOOLS pools per CPT */
	the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
						LNET_NRBPOOLS *
						sizeof(struct lnet_rtrbufpool));
	if (the_lnet.ln_rtrpools == NULL) {
		rc = -ENOMEM;
		LCONSOLE_ERROR_MSG(0x10c,
			"lnet: error allocating router buffer pool: rc = %d\n",
			rc);
		return rc;
	}

	cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
		lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
		rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
					      nrb_tiny, i);
		if (rc)
			goto failed;

		lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
				  LNET_NRB_SMALL_PAGES);
		rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
					      nrb_small, i);
		if (rc)
			goto failed;

		lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
				  LNET_NRB_LARGE_PAGES);
		rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
					      nrb_large, i);
		if (rc)
			goto failed;
	}

	/* pools are ready; flip routing on and wake the monitor thread */
	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_routing = 1;
	lnet_net_unlock(LNET_LOCK_EX);
	complete(&the_lnet.ln_mt_wait_complete);
	return 0;

 failed:
	lnet_rtrpools_free(0);
	return rc;
}
1542
1543 static int
1544 lnet_rtrpools_adjust_helper(int tiny, int small, int large)
1545 {
1546         int nrb = 0;
1547         int rc = 0;
1548         int i;
1549         struct lnet_rtrbufpool *rtrp;
1550
1551         /* If the provided values for each buffer pool are different than the
1552          * configured values, we need to take action. */
1553         if (tiny >= 0) {
1554                 tiny_router_buffers = tiny;
1555                 nrb = lnet_nrb_tiny_calculate();
1556                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1557                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
1558                                                       nrb, i);
1559                         if (rc != 0)
1560                                 return rc;
1561                 }
1562         }
1563         if (small >= 0) {
1564                 small_router_buffers = small;
1565                 nrb = lnet_nrb_small_calculate();
1566                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1567                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
1568                                                       nrb, i);
1569                         if (rc != 0)
1570                                 return rc;
1571                 }
1572         }
1573         if (large >= 0) {
1574                 large_router_buffers = large;
1575                 nrb = lnet_nrb_large_calculate();
1576                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1577                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
1578                                                       nrb, i);
1579                         if (rc != 0)
1580                                 return rc;
1581                 }
1582         }
1583
1584         return 0;
1585 }
1586
/* Resize the router buffer pools at runtime.  Negative values leave
 * the corresponding pool unchanged.  No-op unless routing is enabled.
 * Returns 0 on success or an error from the resize. */
int
lnet_rtrpools_adjust(int tiny, int small, int large)
{
	/* this function doesn't revert the changes if adding new buffers
	 * failed.  It's up to the user space caller to revert the
	 * changes. */

	if (!the_lnet.ln_routing)
		return 0;

	return lnet_rtrpools_adjust_helper(tiny, small, large);
}
1599
/* Turn routing on.
 *
 * Allocates the buffer pools on first use, or re-validates the
 * existing pools against the current module parameters, then sets
 * ln_routing and clears the routing-disabled feature bit in the ping
 * target so peers learn we forward.  No-op if routing is already on.
 *
 * Returns 0 on success or the pool allocation/adjust error.
 */
int
lnet_rtrpools_enable(void)
{
	int rc = 0;

	if (the_lnet.ln_routing)
		return 0;

	if (the_lnet.ln_rtrpools == NULL)
		/* If routing is turned off, and we have never
		 * initialized the pools before, just call the
		 * standard buffer pool allocation routine as
		 * if we are just configuring this for the first
		 * time. */
		rc = lnet_rtrpools_alloc(1);
	else
		rc = lnet_rtrpools_adjust_helper(0, 0, 0);
	if (rc != 0)
		return rc;

	/* flip the routing flag and advertise it via the ping target */
	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_routing = 1;

	the_lnet.ln_ping_target->pb_info.pi_features &=
		~LNET_PING_FEAT_RTE_DISABLED;
	lnet_net_unlock(LNET_LOCK_EX);

	if (lnet_peer_discovery_disabled)
		CWARN("Consider turning discovery on to enable full "
		      "Multi-Rail routing functionality\n");

	return rc;
}
1633
/* Turn routing off.
 *
 * Clears ln_routing, sets the routing-disabled feature bit in the
 * ping target so peers learn we no longer forward, resets the buffer
 * count parameters and frees the buffers while keeping the pool
 * descriptors for a possible later re-enable.  No-op if routing is
 * already off.
 */
void
lnet_rtrpools_disable(void)
{
	if (!the_lnet.ln_routing)
		return;

	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_routing = 0;
	the_lnet.ln_ping_target->pb_info.pi_features |=
		LNET_PING_FEAT_RTE_DISABLED;

	tiny_router_buffers = 0;
	small_router_buffers = 0;
	large_router_buffers = 0;
	lnet_net_unlock(LNET_LOCK_EX);
	/* keep_pools: descriptors survive for lnet_rtrpools_enable() */
	lnet_rtrpools_free(1);
}
1651
1652 static inline void
1653 lnet_notify_peer_down(struct lnet_ni *ni, struct lnet_nid *nid)
1654 {
1655         if (ni->ni_net->net_lnd->lnd_notify_peer_down != NULL)
1656                 (ni->ni_net->net_lnd->lnd_notify_peer_down)(nid);
1657 }
1658
/*
 * Handle an aliveness notification for a peer, coming either from an
 * LND or from userspace, and update peer-NI health and (for gateways)
 * route aliveness accordingly.
 *
 * ni: local NI used to communicate with the peer; NULL means the
 *     notification came from userspace
 * nid: peer NID
 * alive: true if peer is alive, false otherwise
 * reset: reset health value. This is requested by the LND.
 * when: notification time.
 *
 * Returns 0 on success (an unknown NID is silently ignored), -EINVAL
 * for a cross-net or future-dated notification, -ESHUTDOWN if LNet is
 * not running.
 */
int
lnet_notify(struct lnet_ni *ni, struct lnet_nid *nid, bool alive, bool reset,
	    time64_t when)
{
	struct lnet_peer_ni *lpni = NULL;
	struct lnet_route *route;
	struct lnet_peer *lp;
	time64_t now = ktime_get_seconds();
	int cpt;

	LASSERT(!in_interrupt());

	CDEBUG(D_NET, "%s notifying %s: %s\n",
	       (ni == NULL) ? "userspace" : libcfs_nidstr(&ni->ni_nid),
	       libcfs_nidstr(nid), alive ? "up" : "down");

	/* the notifying NI and the peer NID must share a network */
	if (ni != NULL &&
	    LNET_NID_NET(&ni->ni_nid) != LNET_NID_NET(nid)) {
		CWARN("Ignoring notification of %s %s by %s (different net)\n",
		      libcfs_nidstr(nid), alive ? "birth" : "death",
		      libcfs_nidstr(&ni->ni_nid));
		return -EINVAL;
	}

	/* can't do predictions... */
	if (when > now) {
		CWARN("Ignoring prediction from %s of %s %s %lld seconds in the future\n",
			ni ? libcfs_nidstr(&ni->ni_nid) :  "userspace",
			libcfs_nidstr(nid), alive ? "up" : "down", when - now);
		return -EINVAL;
	}

	if (ni != NULL && !alive &&             /* LND telling me she's down */
	    !auto_down) {                       /* auto-down disabled */
		CDEBUG(D_NET, "Auto-down disabled\n");
		return 0;
	}

	/* must lock 0 since this is used for synchronization */
	lnet_net_lock(0);

	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		lnet_net_unlock(0);
		return -ESHUTDOWN;
	}

	/* NB: takes a reference on lpni, dropped at the end of this
	 * function */
	lpni = lnet_peer_ni_find_locked(nid);
	if (lpni == NULL) {
		/* nid not found */
		lnet_net_unlock(0);
		CDEBUG(D_NET, "%s not found\n", libcfs_nidstr(nid));
		return 0;
	}

	/* Update peer-NI health: a "reset" pins it to the extreme
	 * (fully healthy, or status down), otherwise an alive
	 * notification just nudges the health value up. */
	if (alive) {
		if (reset) {
			lpni->lpni_ns_status = LNET_NI_STATUS_UP;
			lnet_set_lpni_healthv_locked(lpni,
						     LNET_MAX_HEALTH_VALUE);
		} else {
			lnet_inc_lpni_healthv_locked(lpni);
		}
	} else if (reset) {
		lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;
	}

	/* recalculate aliveness */
	alive = lnet_is_peer_ni_alive(lpni);

	lp = lpni->lpni_peer_net->lpn_peer;
	/* If this is an LNet router then update route aliveness */
	if (lp->lp_rtr_refcount) {
		if (reset)
			/* reset flag indicates gateway peer went up or down */
			lp->lp_alive = alive;

		/* If discovery is disabled, locally or on the gateway, then
		 * any routes using lpni as next-hop need to be updated
		 *
		 * NB: We can get many notifications while a route is down, so
		 * we try and avoid the expensive net_lock/EX here for the
		 * common case of receiving duplicate lnet_notify() calls (i.e.
		 * only grab EX lock when we actually need to update the route
		 * aliveness).
		 */
		if (lnet_is_discovery_disabled(lp)) {
			list_for_each_entry(route, &lp->lp_routes, lr_gwlist) {
				if (nid_same(&route->lr_nid, &lpni->lpni_nid))
					lnet_set_route_aliveness(route, alive);
			}
		}
	}

	lnet_net_unlock(0);

	/* tell the LND about the death, outside of any net lock */
	if (ni != NULL && !alive)
		lnet_notify_peer_down(ni, &lpni->lpni_nid);

	/* done with lpni: drop the reference from the lookup above,
	 * under the peer NI's own CPT lock */
	cpt = lpni->lpni_cpt;
	lnet_net_lock(cpt);
	lnet_peer_ni_decref_locked(lpni);
	lnet_net_unlock(cpt);

	return 0;
}
EXPORT_SYMBOL(lnet_notify);