Whamcloud - gitweb
LU-506 FC15: ctl_name & strategy removed from ctl_table.
[fs/lustre-release.git] / lnet / lnet / router.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
26
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
28
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31                 "Explicitly enable/disable forwarding between networks");
32
33 static int tiny_router_buffers = 1024;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35                 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 8192;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38                 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 512;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41                 "# of large messages to buffer in the router");
42 static int peer_buffer_credits = 0;
43 CFS_MODULE_PARM(peer_buffer_credits, "i", int, 0444,
44                 "# router buffer credits per peer");
45
46 static int auto_down = 1;
47 CFS_MODULE_PARM(auto_down, "i", int, 0444,
48                 "Automatically mark peers down on comms error");
49
50 int
51 lnet_peer_buffer_credits(lnet_ni_t *ni)
52 {
53         /* NI option overrides LNet default */
54         if (ni->ni_peerrtrcredits > 0)
55                 return ni->ni_peerrtrcredits;
56         if (peer_buffer_credits > 0)
57                 return peer_buffer_credits;
58
59         /* As an approximation, allow this peer the same number of router
60          * buffers as it is allowed outstanding sends */
61         return ni->ni_peertxcredits;
62 }
63
64 /* forward ref's */
65 static int lnet_router_checker(void *);
66 #else
67
68 int
69 lnet_peer_buffer_credits(lnet_ni_t *ni)
70 {
71         return 0;
72 }
73
74 #endif
75
76 static int check_routers_before_use = 0;
77 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
78                 "Assume routers are down and ping them before use");
79
80 static int avoid_asym_router_failure = 0;
81 CFS_MODULE_PARM(avoid_asym_router_failure, "i", int, 0444,
82                 "Avoid asymmetrical failures: reserved, use at your own risk");
83
84 static int dead_router_check_interval = 0;
85 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
86                 "Seconds between dead router health checks (<= 0 to disable)");
87
88 static int live_router_check_interval = 0;
89 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
90                 "Seconds between live router health checks (<= 0 to disable)");
91
92 static int router_ping_timeout = 50;
93 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
94                 "Seconds to wait for the reply to a router health query");
95
96 int
97 lnet_peers_start_down(void)
98 {
99         return check_routers_before_use;
100 }
101
102 void
103 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, cfs_time_t when)
104 {
105         if (cfs_time_before(when, lp->lp_timestamp)) { /* out of date information */
106                 CDEBUG(D_NET, "Out of date\n");
107                 return;
108         }
109
110         lp->lp_timestamp = when;                /* update timestamp */
111         lp->lp_ping_deadline = 0;               /* disable ping timeout */
112
113         if (lp->lp_alive_count != 0 &&          /* got old news */
114             (!lp->lp_alive) == (!alive)) {      /* new date for old news */
115                 CDEBUG(D_NET, "Old news\n");
116                 return;
117         }
118
119         /* Flag that notification is outstanding */
120
121         lp->lp_alive_count++;
122         lp->lp_alive = !(!alive);               /* 1 bit! */
123         lp->lp_notify = 1;
124         lp->lp_notifylnd |= notifylnd;
125
126         CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
127 }
128
129 void
130 lnet_do_notify (lnet_peer_t *lp)
131 {
132         lnet_ni_t *ni = lp->lp_ni;
133         int        alive;
134         int        notifylnd;
135
136         LNET_LOCK();
137
138         /* Notify only in 1 thread at any time to ensure ordered notification.
139          * NB individual events can be missed; the only guarantee is that you
140          * always get the most recent news */
141
142         if (lp->lp_notifying) {
143                 LNET_UNLOCK();
144                 return;
145         }
146
147         lp->lp_notifying = 1;
148
149         while (lp->lp_notify) {
150                 alive     = lp->lp_alive;
151                 notifylnd = lp->lp_notifylnd;
152
153                 lp->lp_notifylnd = 0;
154                 lp->lp_notify    = 0;
155
156                 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
157                         LNET_UNLOCK();
158
159                         /* A new notification could happen now; I'll handle it
160                          * when control returns to me */
161
162                         (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
163
164                         LNET_LOCK();
165                 }
166         }
167
168         lp->lp_notifying = 0;
169
170         LNET_UNLOCK();
171 }
172
173
174 static void
175 lnet_rtr_addref_locked(lnet_peer_t *lp)
176 {
177         LASSERT (lp->lp_refcount > 0);
178         LASSERT (lp->lp_rtr_refcount >= 0);
179
180         lp->lp_rtr_refcount++;
181         if (lp->lp_rtr_refcount == 1) {
182                 cfs_list_t *pos;
183
184                 /* a simple insertion sort */
185                 cfs_list_for_each_prev(pos, &the_lnet.ln_routers) {
186                         lnet_peer_t *rtr = cfs_list_entry(pos, lnet_peer_t,
187                                                           lp_rtr_list);
188
189                         if (rtr->lp_nid < lp->lp_nid)
190                                 break;
191                 }
192
193                 cfs_list_add(&lp->lp_rtr_list, pos);
194                 /* addref for the_lnet.ln_routers */
195                 lnet_peer_addref_locked(lp);
196                 the_lnet.ln_routers_version++;
197         }
198 }
199
200 static void
201 lnet_rtr_decref_locked(lnet_peer_t *lp)
202 {
203         LASSERT (lp->lp_refcount > 0);
204         LASSERT (lp->lp_rtr_refcount > 0);
205
206         lp->lp_rtr_refcount--;
207         if (lp->lp_rtr_refcount == 0) {
208                 if (lp->lp_rcd != NULL) {
209                         cfs_list_add(&lp->lp_rcd->rcd_list,
210                                      &the_lnet.ln_zombie_rcd);
211                         lp->lp_rcd = NULL;
212                 }
213
214                 cfs_list_del(&lp->lp_rtr_list);
215                 /* decref for the_lnet.ln_routers */
216                 lnet_peer_decref_locked(lp);
217                 the_lnet.ln_routers_version++;
218         }
219 }
220
221 lnet_remotenet_t *
222 lnet_find_net_locked (__u32 net)
223 {
224         lnet_remotenet_t *rnet;
225         cfs_list_t       *tmp;
226
227         LASSERT (!the_lnet.ln_shutdown);
228
229         cfs_list_for_each (tmp, &the_lnet.ln_remote_nets) {
230                 rnet = cfs_list_entry(tmp, lnet_remotenet_t, lrn_list);
231
232                 if (rnet->lrn_net == net)
233                         return rnet;
234         }
235         return NULL;
236 }
237
238 /* NB expects LNET_LOCK held */
239 void
240 lnet_add_route_to_rnet (lnet_remotenet_t *rnet, lnet_route_t *route)
241 {
242         unsigned int      len = 0;
243         unsigned int      offset = 0;
244         cfs_list_t       *e;
245         extern __u64 lnet_create_interface_cookie(void);
246
247         cfs_list_for_each (e, &rnet->lrn_routes) {
248                 len++;
249         }
250
251         /* FIXME use Lustre random function when it's moved to libcfs.
252          * See bug 18751 */
253         /* len+1 positions to add a new entry, also prevents division by 0 */
254         offset = ((unsigned int) lnet_create_interface_cookie()) % (len + 1);
255         cfs_list_for_each (e, &rnet->lrn_routes) {
256                 if (offset == 0)
257                         break;
258                 offset--;
259         }
260         cfs_list_add(&route->lr_list, e);
261
262         the_lnet.ln_remote_nets_version++;
263         lnet_rtr_addref_locked(route->lr_gateway);
264 }
265
266 int
267 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
268 {
269         cfs_list_t          *e;
270         lnet_remotenet_t    *rnet;
271         lnet_remotenet_t    *rnet2;
272         lnet_route_t        *route;
273         lnet_ni_t           *ni;
274         int                  add_route;
275         int                  rc;
276
277         CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
278                libcfs_net2str(net), hops, libcfs_nid2str(gateway));
279
280         if (gateway == LNET_NID_ANY ||
281             LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
282             net == LNET_NIDNET(LNET_NID_ANY) ||
283             LNET_NETTYP(net) == LOLND ||
284             LNET_NIDNET(gateway) == net ||
285             hops < 1 || hops > 255)
286                 return (-EINVAL);
287
288         if (lnet_islocalnet(net))               /* it's a local network */
289                 return 0;                       /* ignore the route entry */
290
291         /* Assume net, route, all new */
292         LIBCFS_ALLOC(route, sizeof(*route));
293         LIBCFS_ALLOC(rnet, sizeof(*rnet));
294         if (route == NULL || rnet == NULL) {
295                 CERROR("Out of memory creating route %s %d %s\n",
296                        libcfs_net2str(net), hops, libcfs_nid2str(gateway));
297                 if (route != NULL)
298                         LIBCFS_FREE(route, sizeof(*route));
299                 if (rnet != NULL)
300                         LIBCFS_FREE(rnet, sizeof(*rnet));
301                 return -ENOMEM;
302         }
303
304         CFS_INIT_LIST_HEAD(&rnet->lrn_routes);
305         rnet->lrn_net = net;
306         route->lr_hops = hops;
307
308         LNET_LOCK();
309
310         rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
311         if (rc != 0) {
312                 LNET_UNLOCK();
313
314                 LIBCFS_FREE(route, sizeof(*route));
315                 LIBCFS_FREE(rnet, sizeof(*rnet));
316
317                 if (rc == -EHOSTUNREACH)        /* gateway is not on a local net */
318                         return 0;               /* ignore the route entry */
319
320                 CERROR("Error %d creating route %s %d %s\n", rc,
321                        libcfs_net2str(net), hops, libcfs_nid2str(gateway));
322                 return rc;
323         }
324
325         LASSERT (!the_lnet.ln_shutdown);
326
327         rnet2 = lnet_find_net_locked(net);
328         if (rnet2 == NULL) {
329                 /* new network */
330                 cfs_list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
331                 rnet2 = rnet;
332         }
333
334         /* Search for a duplicate route (it's a NOOP if it is) */
335         add_route = 1;
336         cfs_list_for_each (e, &rnet2->lrn_routes) {
337                 lnet_route_t *route2 = cfs_list_entry(e, lnet_route_t, lr_list);
338
339                 if (route2->lr_gateway == route->lr_gateway) {
340                         add_route = 0;
341                         break;
342                 }
343
344                 /* our lookups must be true */
345                 LASSERT (route2->lr_gateway->lp_nid != gateway);
346         }
347
348         if (add_route) {
349                 ni = route->lr_gateway->lp_ni;
350                 lnet_ni_addref_locked(ni);
351
352                 lnet_add_route_to_rnet(rnet2, route);
353                 LNET_UNLOCK();
354
355                 /* XXX Assume alive */
356                 if (ni->ni_lnd->lnd_notify != NULL)
357                         (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
358
359                 lnet_ni_decref(ni);
360         } else {
361                 lnet_peer_decref_locked(route->lr_gateway);
362                 LNET_UNLOCK();
363                 LIBCFS_FREE(route, sizeof(*route));
364         }
365
366         if (rnet != rnet2)
367                 LIBCFS_FREE(rnet, sizeof(*rnet));
368
369         return 0;
370 }
371
372 int
373 lnet_check_routes (void)
374 {
375         lnet_remotenet_t    *rnet;
376         lnet_route_t        *route;
377         lnet_route_t        *route2;
378         cfs_list_t          *e1;
379         cfs_list_t          *e2;
380
381         LNET_LOCK();
382
383         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
384                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
385
386                 route2 = NULL;
387                 cfs_list_for_each (e2, &rnet->lrn_routes) {
388                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
389
390                         if (route2 == NULL)
391                                 route2 = route;
392                         else if (route->lr_gateway->lp_ni !=
393                                  route2->lr_gateway->lp_ni) {
394                                 LNET_UNLOCK();
395
396                                 CERROR("Routes to %s via %s and %s not supported\n",
397                                        libcfs_net2str(rnet->lrn_net),
398                                        libcfs_nid2str(route->lr_gateway->lp_nid),
399                                        libcfs_nid2str(route2->lr_gateway->lp_nid));
400                                 return -EINVAL;
401                         }
402                 }
403         }
404
405         LNET_UNLOCK();
406         return 0;
407 }
408
409 int
410 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
411 {
412         lnet_remotenet_t    *rnet;
413         lnet_route_t        *route;
414         cfs_list_t          *e1;
415         cfs_list_t          *e2;
416         int                  rc = -ENOENT;
417
418         CDEBUG(D_NET, "Del route: net %s : gw %s\n",
419                libcfs_net2str(net), libcfs_nid2str(gw_nid));
420
421         /* NB Caller may specify either all routes via the given gateway
422          * or a specific route entry actual NIDs) */
423
424  again:
425         LNET_LOCK();
426
427         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
428                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
429
430                 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
431                       net == rnet->lrn_net))
432                         continue;
433
434                 cfs_list_for_each (e2, &rnet->lrn_routes) {
435                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
436
437                         if (!(gw_nid == LNET_NID_ANY ||
438                               gw_nid == route->lr_gateway->lp_nid))
439                                 continue;
440
441                         cfs_list_del(&route->lr_list);
442                         the_lnet.ln_remote_nets_version++;
443
444                         if (cfs_list_empty(&rnet->lrn_routes))
445                                 cfs_list_del(&rnet->lrn_list);
446                         else
447                                 rnet = NULL;
448
449                         lnet_rtr_decref_locked(route->lr_gateway);
450                         lnet_peer_decref_locked(route->lr_gateway);
451                         LNET_UNLOCK();
452
453                         LIBCFS_FREE(route, sizeof (*route));
454
455                         if (rnet != NULL)
456                                 LIBCFS_FREE(rnet, sizeof(*rnet));
457
458                         rc = 0;
459                         goto again;
460                 }
461         }
462
463         LNET_UNLOCK();
464         return rc;
465 }
466
467 void
468 lnet_destroy_routes (void)
469 {
470         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
471 }
472
473 int
474 lnet_get_route (int idx, __u32 *net, __u32 *hops,
475                lnet_nid_t *gateway, __u32 *alive)
476 {
477         cfs_list_t          *e1;
478         cfs_list_t          *e2;
479         lnet_remotenet_t    *rnet;
480         lnet_route_t        *route;
481
482         LNET_LOCK();
483
484         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
485                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
486
487                 cfs_list_for_each (e2, &rnet->lrn_routes) {
488                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
489
490                         if (idx-- == 0) {
491                                 *net     = rnet->lrn_net;
492                                 *hops    = route->lr_hops;
493                                 *gateway = route->lr_gateway->lp_nid;
494                                 *alive   = route->lr_gateway->lp_alive;
495                                 LNET_UNLOCK();
496                                 return 0;
497                         }
498                 }
499         }
500
501         LNET_UNLOCK();
502         return -ENOENT;
503 }
504
505 void
506 lnet_swap_pinginfo(lnet_ping_info_t *info)
507 {
508         int               i;
509         lnet_ni_status_t *stat;
510
511         __swab32s(&info->pi_version);
512         __swab32s(&info->pi_pid);
513         __swab32s(&info->pi_nnis);
514         for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
515                 stat = &info->pi_ni[i];
516                 __swab64s(&stat->ns_nid);
517                 __swab32s(&stat->ns_status);
518         }
519         return;
520 }
521
522 /* Returns # of down NIs, or negative error codes; ignore downed NIs
523  * if a NI in 'net' is up */
524 int
525 lnet_router_down_ni(lnet_peer_t *rtr, __u32 net)
526 {
527         int               i;
528         int               down = 0;
529         int               ptl_up = 0;
530         int               ptl_down = 0;
531         lnet_ping_info_t *info;
532
533         if (!avoid_asym_router_failure)
534                 return -ENOENT;
535
536         if (rtr->lp_rcd == NULL)
537                 return -EINVAL;
538
539         if (!rtr->lp_alive)
540                 return -EINVAL;  /* stale lp_rcd */
541
542         info = rtr->lp_rcd->rcd_pinginfo;
543         LASSERT (info != NULL);
544
545         /* NB always racing with network! */
546         if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) {
547                 lnet_swap_pinginfo(info);
548         } else if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
549                 CNETERR("%s: Unexpected magic %08x\n",
550                         libcfs_nid2str(rtr->lp_nid), info->pi_magic);
551                 return -EPROTO;
552         }
553
554         if (info->pi_version == LNET_PROTO_PING_VERSION1)
555                 return -ENOENT;  /* v1 doesn't carry NI status info */
556
557         if (info->pi_version != LNET_PROTO_PING_VERSION) {
558                 CNETERR("%s: Unexpected version 0x%x\n",
559                         libcfs_nid2str(rtr->lp_nid), info->pi_version);
560                 return -EPROTO;
561         }
562
563         for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
564                 lnet_ni_status_t *stat = &info->pi_ni[i];
565                 lnet_nid_t        nid = stat->ns_nid;
566
567                 if (nid == LNET_NID_ANY) {
568                         CNETERR("%s: unexpected LNET_NID_ANY\n",
569                                 libcfs_nid2str(rtr->lp_nid));
570                         return -EPROTO;
571                 }
572
573                 if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
574                         continue;
575
576                 if (stat->ns_status == LNET_NI_STATUS_DOWN) {
577                         if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
578                                 ptl_down = 1;
579                         else
580                                 down++;
581                         continue;
582                 }
583
584                 if (stat->ns_status != LNET_NI_STATUS_UP) {
585                         CNETERR("%s: Unexpected status 0x%x\n",
586                                 libcfs_nid2str(rtr->lp_nid), stat->ns_status);
587                         return -EPROTO;
588                 }
589
590                 /* ignore downed NIs if there's a NI up for dest network */
591                 if (LNET_NIDNET(nid) == net)
592                         return 0;
593
594                 if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
595                         ptl_up = 1;
596         }
597
598         /* ptl NIs are considered down only when they're all down */
599         return down + (ptl_up ? 0 : ptl_down);
600 }
601
602 void
603 lnet_wait_known_routerstate(void)
604 {
605         lnet_peer_t         *rtr;
606         cfs_list_t          *entry;
607         int                  all_known;
608
609         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
610
611         for (;;) {
612                 LNET_LOCK();
613
614                 all_known = 1;
615                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
616                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
617
618                         if (rtr->lp_alive_count == 0) {
619                                 all_known = 0;
620                                 break;
621                         }
622                 }
623
624                 LNET_UNLOCK();
625
626                 if (all_known)
627                         return;
628
629 #ifndef __KERNEL__
630                 lnet_router_checker();
631 #endif
632                 cfs_pause(cfs_time_seconds(1));
633         }
634 }
635
636 static void
637 lnet_router_checker_event (lnet_event_t *event)
638 {
639         /* CAVEAT EMPTOR: I'm called with LNET_LOCKed and I'm not allowed to
640          * drop it (that's how come I see _every_ event, even ones that would
641          * overflow my EQ) */
642         lnet_rc_data_t *rcd = event->md.user_ptr;
643         lnet_peer_t    *lp;
644         lnet_nid_t      nid;
645
646         if (event->unlinked) {
647                 if (rcd != NULL) {
648                         LNetInvalidateHandle(&rcd->rcd_mdh);
649                         return;
650                 }
651
652                 /* The router checker thread has unlinked the default rc_md
653                  * and exited. */
654                 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING);
655                 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKED;
656 #ifdef __KERNEL__
657                 cfs_mutex_up(&the_lnet.ln_rc_signal);
658 #endif
659                 return;
660         }
661
662         LASSERT (event->type == LNET_EVENT_SEND ||
663                  event->type == LNET_EVENT_REPLY);
664
665         nid = (event->type == LNET_EVENT_SEND) ?
666               event->target.nid : event->initiator.nid;
667
668         lp = lnet_find_peer_locked(nid);
669         if (lp == NULL) {
670                 /* router may have been removed */
671                 CDEBUG(D_NET, "Router %s not found\n", libcfs_nid2str(nid));
672                 return;
673         }
674
675         if (event->type == LNET_EVENT_SEND)     /* re-enable another ping */
676                 lp->lp_ping_notsent = 0;
677
678         if (lnet_isrouter(lp) &&                /* ignore if no longer a router */
679             (event->status != 0 ||
680              event->type == LNET_EVENT_REPLY)) {
681
682                 /* A successful REPLY means the router is up.  If _any_ comms
683                  * to the router fail I assume it's down (this will happen if
684                  * we ping alive routers to try to detect router death before
685                  * apps get burned). */
686
687                 lnet_notify_locked(lp, 1, (event->status == 0),
688                                    cfs_time_current());
689
690                 /* The router checker will wake up very shortly and do the
691                  * actual notification.  
692                  * XXX If 'lp' stops being a router before then, it will still
693                  * have the notification pending!!! */
694         }
695
696         /* This decref will NOT drop LNET_LOCK (it had to have 1 ref when it
697          * was in the peer table and I've not dropped the lock, so no-one else
698          * can have reduced the refcount) */
699         LASSERT(lp->lp_refcount > 1);
700
701         lnet_peer_decref_locked(lp);
702 }
703
704 void
705 lnet_update_ni_status(void)
706 {
707         cfs_time_t now = cfs_time_current();
708         lnet_ni_t *ni;
709         int        status;
710         int        timeout;
711
712         LASSERT (the_lnet.ln_routing);
713
714         timeout = router_ping_timeout +
715                   MAX(live_router_check_interval, dead_router_check_interval);
716
717         LNET_LOCK();
718
719         cfs_list_for_each_entry (ni, &the_lnet.ln_nis, ni_list) {
720                 lnet_ni_status_t *ns = ni->ni_status;
721
722                 LASSERT (ns != NULL);
723
724                 status = LNET_NI_STATUS_UP;
725                 if (ni->ni_lnd->lnd_type != LOLND &&  /* @lo forever alive */
726                     cfs_time_after(now, cfs_time_add(ni->ni_last_alive,
727                                                      cfs_time_seconds(timeout))))
728                         status = LNET_NI_STATUS_DOWN;
729
730                 if (ns->ns_status != status) {
731                         ns->ns_status = status;
732                         CDEBUG(D_NET, "NI(%s:%d) status changed to %s\n",
733                                libcfs_nid2str(ni->ni_nid), timeout,
734                                status == LNET_NI_STATUS_UP ? "up" : "down");
735                 }
736         }
737
738         LNET_UNLOCK();
739 }
740
741 void
742 lnet_destroy_rc_data (lnet_rc_data_t *rcd)
743 {
744         LASSERT (cfs_list_empty(&rcd->rcd_list));
745         /* detached from network */
746         LASSERT (LNetHandleIsInvalid(rcd->rcd_mdh));
747
748         LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
749         LIBCFS_FREE(rcd, sizeof(*rcd));
750         return;
751 }
752
753 lnet_rc_data_t *
754 lnet_create_rc_data (void)
755 {
756         int               i;
757         int               rc;
758         lnet_ping_info_t *pi;
759         lnet_rc_data_t   *rcd;
760
761         LIBCFS_ALLOC(rcd, sizeof(*rcd));
762         if (rcd == NULL)
763                 return NULL;
764
765         LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
766         if (pi == NULL) {
767                 LIBCFS_FREE(rcd, sizeof(*rcd));
768                 return NULL;
769         }
770
771         memset(pi, 0, LNET_PINGINFO_SIZE);
772         for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
773                 pi->pi_ni[i].ns_nid = LNET_NID_ANY;
774                 pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
775         }
776         rcd->rcd_pinginfo = pi;
777         LNetInvalidateHandle(&rcd->rcd_mdh);
778         CFS_INIT_LIST_HEAD(&rcd->rcd_list);
779
780         LASSERT (!LNetHandleIsInvalid(the_lnet.ln_rc_eqh));
781         rc = LNetMDBind((lnet_md_t){.start     = pi,
782                                     .user_ptr  = rcd,
783                                     .length    = LNET_PINGINFO_SIZE,
784                                     .threshold = LNET_MD_THRESH_INF,
785                                     .options   = LNET_MD_TRUNCATE,
786                                     .eq_handle = the_lnet.ln_rc_eqh},
787                         LNET_UNLINK,
788                         &rcd->rcd_mdh);
789         if (rc < 0) {
790                 CERROR("Can't bind MD: %d\n", rc);
791                 lnet_destroy_rc_data(rcd);
792                 return NULL;
793         }
794         LASSERT (rc == 0);
795         return rcd;
796 }
797
798 static int
799 lnet_router_check_interval (lnet_peer_t *rtr)
800 {
801         int secs;
802
803         secs = rtr->lp_alive ? live_router_check_interval :
804                                dead_router_check_interval;
805         if (secs < 0)
806                 secs = 0;
807
808         return secs;
809 }
810
811 static void
812 lnet_ping_router_locked (lnet_peer_t *rtr)
813 {
814         int             newrcd = 0;
815         lnet_rc_data_t *rcd = NULL;
816         cfs_time_t      now = cfs_time_current();
817         int             secs;
818
819         lnet_peer_addref_locked(rtr);
820
821         if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
822             cfs_time_after(now, rtr->lp_ping_deadline))
823                 lnet_notify_locked(rtr, 1, 0, now);
824
825         if (avoid_asym_router_failure && rtr->lp_rcd == NULL)
826                 newrcd = 1;
827
828         LNET_UNLOCK();
829
830         /* Run any outstanding notifications */
831         lnet_do_notify(rtr);
832
833         if (newrcd)
834                 rcd = lnet_create_rc_data();
835
836         LNET_LOCK();
837
838         if (!lnet_isrouter(rtr)) {
839                 lnet_peer_decref_locked(rtr);
840                 if (rcd != NULL)
841                         cfs_list_add(&rcd->rcd_list, &the_lnet.ln_zombie_rcd);
842                 return; /* router table changed! */
843         }
844
845         if (rcd != NULL) {
846                 LASSERT (rtr->lp_rcd == NULL);
847                 rtr->lp_rcd = rcd;
848         }
849
850         secs = lnet_router_check_interval(rtr);
851
852         CDEBUG(D_NET,
853                "rtr %s %d: deadline %lu ping_notsent %d alive %d "
854                "alive_count %d lp_ping_timestamp %lu\n",
855                libcfs_nid2str(rtr->lp_nid), secs,
856                rtr->lp_ping_deadline, rtr->lp_ping_notsent,
857                rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
858
859         if (secs != 0 && !rtr->lp_ping_notsent &&
860             cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
861                                              cfs_time_seconds(secs)))) {
862                 int               rc;
863                 lnet_process_id_t id;
864                 lnet_handle_md_t  mdh;
865
866                 id.nid = rtr->lp_nid;
867                 id.pid = LUSTRE_SRV_LNET_PID;
868                 CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
869
870                 rtr->lp_ping_notsent   = 1;
871                 rtr->lp_ping_timestamp = now;
872                 mdh = (rtr->lp_rcd == NULL) ? the_lnet.ln_rc_mdh :
873                                               rtr->lp_rcd->rcd_mdh;
874
875                 if (rtr->lp_ping_deadline == 0)
876                         rtr->lp_ping_deadline = cfs_time_shift(router_ping_timeout);
877
878                 LNET_UNLOCK();
879
880                 rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
881                              LNET_PROTO_PING_MATCHBITS, 0);
882
883                 LNET_LOCK();
884                 if (rc != 0)
885                         rtr->lp_ping_notsent = 0; /* no event pending */
886         }
887
888         lnet_peer_decref_locked(rtr);
889         return;
890 }
891
892 int
893 lnet_router_checker_start(void)
894 {
895         static lnet_ping_info_t pinginfo;
896
897         lnet_md_t    md;
898         int          rc;
899         int          eqsz;
900 #ifndef __KERNEL__
901         lnet_peer_t *rtr;
902         __u64        version;
903         int          nrtr = 0;
904         int          router_checker_max_eqsize = 10240;
905
906         LASSERT (check_routers_before_use);
907         LASSERT (dead_router_check_interval > 0);
908
909         LNET_LOCK();
910
911         /* As an approximation, allow each router the same number of
912          * outstanding events as it is allowed outstanding sends */
913         eqsz = 0;
914         version = the_lnet.ln_routers_version;
915         cfs_list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
916                 lnet_ni_t         *ni = rtr->lp_ni;
917                 lnet_process_id_t  id;
918
919                 nrtr++;
920                 eqsz += ni->ni_peertxcredits;
921
922                 /* one async ping reply per router */
923                 id.nid = rtr->lp_nid;
924                 id.pid = LUSTRE_SRV_LNET_PID;
925
926                 LNET_UNLOCK();
927
928                 rc = LNetSetAsync(id, 1);
929                 if (rc != 0) {
930                         CWARN("LNetSetAsync %s failed: %d\n",
931                               libcfs_id2str(id), rc);
932                         return rc;
933                 }
934
935                 LNET_LOCK();
936                 /* NB router list doesn't change in userspace */
937                 LASSERT (version == the_lnet.ln_routers_version);
938         }
939
940         LNET_UNLOCK();
941
942         if (nrtr == 0) {
943                 CDEBUG(D_NET,
944                        "No router found, not starting router checker\n");
945                 return 0;
946         }
947
948         /* at least allow a SENT and a REPLY per router */
949         if (router_checker_max_eqsize < 2 * nrtr)
950                 router_checker_max_eqsize = 2 * nrtr;
951
952         LASSERT (eqsz > 0);
953         if (eqsz > router_checker_max_eqsize)
954                 eqsz = router_checker_max_eqsize;
955 #endif
956
957         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
958
959         if (check_routers_before_use &&
960             dead_router_check_interval <= 0) {
961                 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
962                                    " set if 'check_routers_before_use' is set"
963                                    "\n");
964                 return -EINVAL;
965         }
966
967         if (!the_lnet.ln_routing &&
968             live_router_check_interval <= 0 &&
969             dead_router_check_interval <= 0)
970                 return 0;
971
972 #ifdef __KERNEL__
973         cfs_init_mutex_locked(&the_lnet.ln_rc_signal);
974         /* EQ size doesn't matter; the callback is guaranteed to get every
975          * event */
976         eqsz = 1;
977         rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
978                          &the_lnet.ln_rc_eqh);
979 #else
980         rc = LNetEQAlloc(eqsz, LNET_EQ_HANDLER_NONE,
981                          &the_lnet.ln_rc_eqh);
982 #endif
983         if (rc != 0) {
984                 CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
985                 return -ENOMEM;
986         }
987
988         memset(&md, 0, sizeof(md));
989         md.user_ptr  = NULL;
990         md.start     = &pinginfo;
991         md.length    = sizeof(pinginfo);
992         md.options   = LNET_MD_TRUNCATE;
993         md.threshold = LNET_MD_THRESH_INF;
994         md.eq_handle = the_lnet.ln_rc_eqh;
995         rc = LNetMDBind(md, LNET_UNLINK, &the_lnet.ln_rc_mdh);
996         if (rc < 0) {
997                 CERROR("Can't bind MD: %d\n", rc);
998                 rc = LNetEQFree(the_lnet.ln_rc_eqh);
999                 LASSERT (rc == 0);
1000                 return -ENOMEM;
1001         }
1002         LASSERT (rc == 0);
1003
1004         the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1005 #ifdef __KERNEL__
1006         rc = cfs_create_thread(lnet_router_checker, NULL, 0);
1007         if (rc < 0) {
1008                 CERROR("Can't start router checker thread: %d\n", rc);
1009                 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
1010                 rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
1011                 LASSERT (rc == 0);
1012                 /* block until event callback signals exit */
1013                 cfs_mutex_down(&the_lnet.ln_rc_signal);
1014                 rc = LNetEQFree(the_lnet.ln_rc_eqh);
1015                 LASSERT (rc == 0);
1016                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1017                 return -ENOMEM;
1018         }
1019 #endif
1020
1021         if (check_routers_before_use) {
1022                 /* Note that a helpful side-effect of pinging all known routers
1023                  * at startup is that it makes them drop stale connections they
1024                  * may have to a previous instance of me. */
1025                 lnet_wait_known_routerstate();
1026         }
1027
1028         return 0;
1029 }
1030
1031 void
1032 lnet_router_checker_stop (void)
1033 {
1034         int rc;
1035
1036         if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1037                 return;
1038
1039         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1040         the_lnet.ln_rc_state = LNET_RC_STATE_STOPTHREAD;
1041
1042 #ifdef __KERNEL__
1043         /* block until event callback signals exit */
1044         cfs_mutex_down(&the_lnet.ln_rc_signal);
1045 #else
1046         while (the_lnet.ln_rc_state != LNET_RC_STATE_UNLINKED) {
1047                 lnet_router_checker();
1048                 cfs_pause(cfs_time_seconds(1));
1049         }
1050 #endif
1051         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED);
1052
1053         rc = LNetEQFree(the_lnet.ln_rc_eqh);
1054         LASSERT (rc == 0);
1055         the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1056         return;
1057 }
1058
1059 #if defined(__KERNEL__) && defined(LNET_ROUTER)
1060
1061 static void
1062 lnet_prune_zombie_rcd (int wait_unlink)
1063 {
1064         lnet_rc_data_t   *rcd;
1065         lnet_rc_data_t   *tmp;
1066         cfs_list_t        free_rcd;
1067         int               i;
1068         __u64             version;
1069
1070         CFS_INIT_LIST_HEAD(&free_rcd);
1071
1072         LNET_LOCK();
1073 rescan:
1074         version = the_lnet.ln_routers_version;
1075         cfs_list_for_each_entry_safe (rcd, tmp, &the_lnet.ln_zombie_rcd,
1076                                       rcd_list) {
1077                 if (LNetHandleIsInvalid(rcd->rcd_mdh)) {
1078                         cfs_list_del(&rcd->rcd_list);
1079                         cfs_list_add(&rcd->rcd_list, &free_rcd);
1080                         continue;
1081                 }
1082
1083                 LNET_UNLOCK();
1084
1085                 LNetMDUnlink(rcd->rcd_mdh);
1086
1087                 LNET_LOCK();
1088                 if (version != the_lnet.ln_routers_version)
1089                         goto rescan;
1090         }
1091
1092         i = 2;
1093         while (wait_unlink && !cfs_list_empty(&the_lnet.ln_zombie_rcd)) {
1094                 rcd = cfs_list_entry(the_lnet.ln_zombie_rcd.next,
1095                                      lnet_rc_data_t, rcd_list);
1096                 if (LNetHandleIsInvalid(rcd->rcd_mdh)) {
1097                         cfs_list_del(&rcd->rcd_list);
1098                         cfs_list_add(&rcd->rcd_list, &free_rcd);
1099                         continue;
1100                 }
1101
1102                 LNET_UNLOCK();
1103
1104                 LNetMDUnlink(rcd->rcd_mdh);
1105
1106                 i++;
1107                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
1108                        "Waiting for rc buffers to unlink\n");
1109                 cfs_pause(cfs_time_seconds(1));
1110
1111                 LNET_LOCK();
1112         }
1113
1114         LNET_UNLOCK();
1115
1116         while (!cfs_list_empty(&free_rcd)) {
1117                 rcd = cfs_list_entry(free_rcd.next, lnet_rc_data_t, rcd_list);
1118                 cfs_list_del_init(&rcd->rcd_list);
1119                 lnet_destroy_rc_data(rcd);
1120         }
1121         return;
1122 }
1123
1124 static int
1125 lnet_router_checker(void *arg)
1126 {
1127         int                rc;
1128         lnet_peer_t       *rtr;
1129         cfs_list_t        *entry;
1130         lnet_process_id_t  rtr_id;
1131
1132         cfs_daemonize("router_checker");
1133         cfs_block_allsigs();
1134
1135         rtr_id.pid = LUSTRE_SRV_LNET_PID;
1136
1137         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1138
1139         while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
1140                 __u64 version;
1141
1142                 LNET_LOCK();
1143 rescan:
1144                 version = the_lnet.ln_routers_version;
1145
1146                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
1147                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
1148                         lnet_ping_router_locked(rtr);
1149
1150                         /* NB dropped lock */
1151                         if (version != the_lnet.ln_routers_version) {
1152                                 /* the routers list has changed */
1153                                 goto rescan;
1154                         }
1155                 }
1156
1157                 LNET_UNLOCK();
1158
1159                 if (the_lnet.ln_routing)
1160                         lnet_update_ni_status();
1161
1162                 lnet_prune_zombie_rcd(0); /* don't wait for UNLINK */
1163
1164                 /* Call cfs_pause() here always adds 1 to load average 
1165                  * because kernel counts # active tasks as nr_running 
1166                  * + nr_uninterruptible. */
1167                 cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
1168                                                    cfs_time_seconds(1));
1169         }
1170
1171         LNET_LOCK();
1172
1173         cfs_list_for_each (entry, &the_lnet.ln_routers) {
1174                 rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
1175
1176                 if (rtr->lp_rcd == NULL)
1177                         continue;
1178
1179                 LASSERT (cfs_list_empty(&rtr->lp_rcd->rcd_list));
1180                 cfs_list_add(&rtr->lp_rcd->rcd_list, &the_lnet.ln_zombie_rcd);
1181                 rtr->lp_rcd = NULL;
1182         }
1183
1184         LNET_UNLOCK();
1185
1186         lnet_prune_zombie_rcd(1); /* wait for UNLINK */
1187
1188         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD);
1189         the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
1190
1191         rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
1192         LASSERT (rc == 0);
1193
1194         /* The unlink event callback will signal final completion */
1195         return 0;
1196 }
1197
1198 void
1199 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
1200 {
1201         int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1202
1203         while (--npages >= 0)
1204                 cfs_free_page(rb->rb_kiov[npages].kiov_page);
1205
1206         LIBCFS_FREE(rb, sz);
1207 }
1208
1209 lnet_rtrbuf_t *
1210 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
1211 {
1212         int            npages = rbp->rbp_npages;
1213         int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1214         struct page   *page;
1215         lnet_rtrbuf_t *rb;
1216         int            i;
1217
1218         LIBCFS_ALLOC(rb, sz);
1219         if (rb == NULL)
1220                 return NULL;
1221
1222         rb->rb_pool = rbp;
1223
1224         for (i = 0; i < npages; i++) {
1225                 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
1226                 if (page == NULL) {
1227                         while (--i >= 0)
1228                                 cfs_free_page(rb->rb_kiov[i].kiov_page);
1229
1230                         LIBCFS_FREE(rb, sz);
1231                         return NULL;
1232                 }
1233
1234                 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
1235                 rb->rb_kiov[i].kiov_offset = 0;
1236                 rb->rb_kiov[i].kiov_page = page;
1237         }
1238
1239         return rb;
1240 }
1241
1242 void
1243 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
1244 {
1245         int            npages = rbp->rbp_npages;
1246         int            nbuffers = 0;
1247         lnet_rtrbuf_t *rb;
1248
1249         LASSERT (cfs_list_empty(&rbp->rbp_msgs));
1250         LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
1251
1252         while (!cfs_list_empty(&rbp->rbp_bufs)) {
1253                 LASSERT (rbp->rbp_credits > 0);
1254
1255                 rb = cfs_list_entry(rbp->rbp_bufs.next,
1256                                     lnet_rtrbuf_t, rb_list);
1257                 cfs_list_del(&rb->rb_list);
1258                 lnet_destroy_rtrbuf(rb, npages);
1259                 nbuffers++;
1260         }
1261
1262         LASSERT (rbp->rbp_nbuffers == nbuffers);
1263         LASSERT (rbp->rbp_credits == nbuffers);
1264
1265         rbp->rbp_nbuffers = rbp->rbp_credits = 0;
1266 }
1267
1268 int
1269 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
1270 {
1271         lnet_rtrbuf_t *rb;
1272         int            i;
1273
1274         if (rbp->rbp_nbuffers != 0) {
1275                 LASSERT (rbp->rbp_nbuffers == nbufs);
1276                 return 0;
1277         }
1278
1279         for (i = 0; i < nbufs; i++) {
1280                 rb = lnet_new_rtrbuf(rbp);
1281
1282                 if (rb == NULL) {
1283                         CERROR("Failed to allocate %d router bufs of %d pages\n",
1284                                nbufs, rbp->rbp_npages);
1285                         return -ENOMEM;
1286                 }
1287
1288                 rbp->rbp_nbuffers++;
1289                 rbp->rbp_credits++;
1290                 rbp->rbp_mincredits++;
1291                 cfs_list_add(&rb->rb_list, &rbp->rbp_bufs);
1292
1293                 /* No allocation "under fire" */
1294                 /* Otherwise we'd need code to schedule blocked msgs etc */
1295                 LASSERT (!the_lnet.ln_routing);
1296         }
1297
1298         LASSERT (rbp->rbp_credits == nbufs);
1299         return 0;
1300 }
1301
1302 void
1303 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1304 {
1305         CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
1306         CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
1307
1308         rbp->rbp_npages = npages;
1309         rbp->rbp_credits = 0;
1310         rbp->rbp_mincredits = 0;
1311 }
1312
1313 void
1314 lnet_free_rtrpools(void)
1315 {
1316         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
1317         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
1318         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
1319 }
1320
1321 void
1322 lnet_init_rtrpools(void)
1323 {
1324         int small_pages = 1;
1325         int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1326
1327         lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1328         lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1329         lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
1330 }
1331
1332
1333 int
1334 lnet_alloc_rtrpools(int im_a_router)
1335 {
1336         int       rc;
1337
1338         if (!strcmp(forwarding, "")) {
1339                 /* not set either way */
1340                 if (!im_a_router)
1341                         return 0;
1342         } else if (!strcmp(forwarding, "disabled")) {
1343                 /* explicitly disabled */
1344                 return 0;
1345         } else if (!strcmp(forwarding, "enabled")) {
1346                 /* explicitly enabled */
1347         } else {
1348                 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1349                                    "'enabled' or 'disabled'\n");
1350                 return -EINVAL;
1351         }
1352
1353         if (tiny_router_buffers <= 0) {
1354                 LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
1355                                    "routing enabled\n", tiny_router_buffers);
1356                 rc = -EINVAL;
1357                 goto failed;
1358         }
1359
1360         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1361                                      tiny_router_buffers);
1362         if (rc != 0)
1363                 goto failed;
1364
1365         if (small_router_buffers <= 0) {
1366                 LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when"
1367                                    " routing enabled\n", small_router_buffers);
1368                 rc = -EINVAL;
1369                 goto failed;
1370         }
1371
1372         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1373                                      small_router_buffers);
1374         if (rc != 0)
1375                 goto failed;
1376
1377         if (large_router_buffers <= 0) {
1378                 LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
1379                                    " routing enabled\n", large_router_buffers);
1380                 rc = -EINVAL;
1381                 goto failed;
1382         }
1383
1384         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1385                                      large_router_buffers);
1386         if (rc != 0)
1387                 goto failed;
1388
1389         LNET_LOCK();
1390         the_lnet.ln_routing = 1;
1391         LNET_UNLOCK();
1392
1393         return 0;
1394
1395  failed:
1396         lnet_free_rtrpools();
1397         return rc;
1398 }
1399
1400 int
1401 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1402 {
1403         lnet_peer_t *lp = NULL;
1404         cfs_time_t   now = cfs_time_current();
1405
1406         LASSERT (!cfs_in_interrupt ());
1407
1408         CDEBUG (D_NET, "%s notifying %s: %s\n",
1409                 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1410                 libcfs_nid2str(nid),
1411                 alive ? "up" : "down");
1412
1413         if (ni != NULL &&
1414             LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1415                 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
1416                         libcfs_nid2str(nid), alive ? "birth" : "death",
1417                         libcfs_nid2str(ni->ni_nid));
1418                 return -EINVAL;
1419         }
1420
1421         /* can't do predictions... */
1422         if (cfs_time_after(when, now)) {
1423                 CWARN ("Ignoring prediction from %s of %s %s "
1424                        "%ld seconds in the future\n",
1425                        (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1426                        libcfs_nid2str(nid), alive ? "up" : "down",
1427                        cfs_duration_sec(cfs_time_sub(when, now)));
1428                 return -EINVAL;
1429         }
1430
1431         if (ni != NULL && !alive &&             /* LND telling me she's down */
1432             !auto_down) {                       /* auto-down disabled */
1433                 CDEBUG(D_NET, "Auto-down disabled\n");
1434                 return 0;
1435         }
1436
1437         LNET_LOCK();
1438
1439         lp = lnet_find_peer_locked(nid);
1440         if (lp == NULL) {
1441                 /* nid not found */
1442                 LNET_UNLOCK();
1443                 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1444                 return 0;
1445         }
1446
1447         /* We can't fully trust LND on reporting exact peer last_alive
1448          * if he notifies us about dead peer. For example ksocklnd can
1449          * call us with when == _time_when_the_node_was_booted_ if
1450          * no connections were successfully established */
1451         if (ni != NULL && !alive && when < lp->lp_last_alive)
1452                 when = lp->lp_last_alive;
1453
1454         lnet_notify_locked(lp, ni == NULL, alive, when);
1455
1456         LNET_UNLOCK();
1457
1458         lnet_do_notify(lp);
1459
1460         LNET_LOCK();
1461
1462         lnet_peer_decref_locked(lp);
1463
1464         LNET_UNLOCK();
1465         return 0;
1466 }
1467 EXPORT_SYMBOL(lnet_notify);
1468
/* Intentionally empty in this build variant: there is nothing to fetch. */
void
lnet_get_tunables (void)
{
}
1474
1475 #else
1476
1477 int
1478 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1479 {
1480         return -EOPNOTSUPP;
1481 }
1482
1483 void
1484 lnet_router_checker (void)
1485 {
1486         static time_t last = 0;
1487         static int    running = 0;
1488
1489         time_t            now = cfs_time_current_sec();
1490         int               interval = now - last;
1491         int               rc;
1492         __u64             version;
1493         lnet_peer_t      *rtr;
1494
1495         /* It's no use to call me again within a sec - all intervals and
1496          * timeouts are measured in seconds */
1497         if (last != 0 && interval < 2)
1498                 return;
1499
1500         if (last != 0 &&
1501             interval > MAX(live_router_check_interval,
1502                            dead_router_check_interval))
1503                 CNETERR("Checker(%d/%d) not called for %d seconds\n",
1504                         live_router_check_interval, dead_router_check_interval,
1505                         interval);
1506
1507         LNET_LOCK();
1508         LASSERT (!running); /* recursion check */
1509         running = 1;
1510         LNET_UNLOCK();
1511
1512         last = now;
1513
1514         if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD) {
1515                 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
1516                 rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
1517                 LASSERT (rc == 0);
1518         }
1519
1520         /* consume all pending events */
1521         while (1) {
1522                 int          i;
1523                 lnet_event_t ev;
1524
1525                 /* NB ln_rc_eqh must be the 1st in 'eventqs' otherwise the
1526                  * recursion breaker in LNetEQPoll would fail */
1527                 rc = LNetEQPoll(&the_lnet.ln_rc_eqh, 1, 0, &ev, &i);
1528                 if (rc == 0)   /* no event pending */
1529                         break;
1530
1531                 /* NB a lost SENT prevents me from pinging a router again */
1532                 if (rc == -EOVERFLOW) {
1533                         CERROR("Dropped an event!!!\n");
1534                         abort();
1535                 }
1536
1537                 LASSERT (rc == 1);
1538
1539                 LNET_LOCK();
1540                 lnet_router_checker_event(&ev);
1541                 LNET_UNLOCK();
1542         }
1543
1544         if (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED ||
1545             the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING) {
1546                 running = 0;
1547                 return;
1548         }
1549
1550         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1551
1552         LNET_LOCK();
1553
1554         version = the_lnet.ln_routers_version;
1555         cfs_list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
1556                 lnet_ping_router_locked(rtr);
1557                 LASSERT (version == the_lnet.ln_routers_version);
1558         }
1559
1560         LNET_UNLOCK();
1561
1562         running = 0; /* lock only needed for the recursion check */
1563         return;
1564 }
1565
1566 /* NB lnet_peers_start_down depends on me,
1567  * so must be called before any peer creation */
1568 void
1569 lnet_get_tunables (void)
1570 {
1571         char *s;
1572
1573         s = getenv("LNET_ROUTER_PING_TIMEOUT");
1574         if (s != NULL) router_ping_timeout = atoi(s);
1575
1576         s = getenv("LNET_LIVE_ROUTER_CHECK_INTERVAL");
1577         if (s != NULL) live_router_check_interval = atoi(s);
1578
1579         s = getenv("LNET_DEAD_ROUTER_CHECK_INTERVAL");
1580         if (s != NULL) dead_router_check_interval = atoi(s);
1581
1582         /* This replaces old lnd_notify mechanism */
1583         check_routers_before_use = 1;
1584         if (dead_router_check_interval <= 0)
1585                 dead_router_check_interval = 30;
1586 }
1587
/* Empty stub for this build variant: there is nothing to free. */
void
lnet_free_rtrpools (void)
{
        return;
}
1592
/* Empty stub for this build variant: there is nothing to set up. */
void
lnet_init_rtrpools (void)
{
        return;
}
1597
/*
 * Stub for this build variant: nothing is allocated, 'im_a_arouter' is
 * ignored and success (0) is always reported.
 */
int
lnet_alloc_rtrpools (int im_a_arouter)
{
        const int rc = 0;

        return rc;
}
1603
1604 #endif