Whamcloud - gitweb
86d24ad921ff5d5b47aec73ab18592ef3eb0716d
[fs/lustre-release.git] / lnet / lnet / router.c
1 /*
2  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Copyright (c) 2011, Whamcloud, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
26
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
28
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31                 "Explicitly enable/disable forwarding between networks");
32
33 static int tiny_router_buffers = 1024;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35                 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 8192;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38                 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 512;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41                 "# of large messages to buffer in the router");
42 static int peer_buffer_credits = 0;
43 CFS_MODULE_PARM(peer_buffer_credits, "i", int, 0444,
44                 "# router buffer credits per peer");
45
46 static int auto_down = 1;
47 CFS_MODULE_PARM(auto_down, "i", int, 0444,
48                 "Automatically mark peers down on comms error");
49
50 int
51 lnet_peer_buffer_credits(lnet_ni_t *ni)
52 {
53         /* NI option overrides LNet default */
54         if (ni->ni_peerrtrcredits > 0)
55                 return ni->ni_peerrtrcredits;
56         if (peer_buffer_credits > 0)
57                 return peer_buffer_credits;
58
59         /* As an approximation, allow this peer the same number of router
60          * buffers as it is allowed outstanding sends */
61         return ni->ni_peertxcredits;
62 }
63
64 /* forward ref's */
65 static int lnet_router_checker(void *);
66 #else
67
68 int
69 lnet_peer_buffer_credits(lnet_ni_t *ni)
70 {
71         return 0;
72 }
73
74 #endif
75
76 static int check_routers_before_use = 0;
77 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
78                 "Assume routers are down and ping them before use");
79
80 static int avoid_asym_router_failure = 0;
81 CFS_MODULE_PARM(avoid_asym_router_failure, "i", int, 0444,
82                 "Avoid asymmetrical failures: reserved, use at your own risk");
83
84 static int dead_router_check_interval = 0;
85 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
86                 "Seconds between dead router health checks (<= 0 to disable)");
87
88 static int live_router_check_interval = 0;
89 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
90                 "Seconds between live router health checks (<= 0 to disable)");
91
92 static int router_ping_timeout = 50;
93 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
94                 "Seconds to wait for the reply to a router health query");
95
96 int
97 lnet_peers_start_down(void)
98 {
99         return check_routers_before_use;
100 }
101
102 void
103 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, cfs_time_t when)
104 {
105         if (cfs_time_before(when, lp->lp_timestamp)) { /* out of date information */
106                 CDEBUG(D_NET, "Out of date\n");
107                 return;
108         }
109
110         lp->lp_timestamp = when;                /* update timestamp */
111         lp->lp_ping_deadline = 0;               /* disable ping timeout */
112
113         if (lp->lp_alive_count != 0 &&          /* got old news */
114             (!lp->lp_alive) == (!alive)) {      /* new date for old news */
115                 CDEBUG(D_NET, "Old news\n");
116                 return;
117         }
118
119         /* Flag that notification is outstanding */
120
121         lp->lp_alive_count++;
122         lp->lp_alive = !(!alive);               /* 1 bit! */
123         lp->lp_notify = 1;
124         lp->lp_notifylnd |= notifylnd;
125         if (lp->lp_alive)
126                 lp->lp_ping_version = LNET_PROTO_PING_UNKNOWN; /* reset */
127
128         CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
129 }
130
131 void
132 lnet_ni_notify_locked(lnet_ni_t *ni, lnet_peer_t *lp)
133 {
134         int        alive;
135         int        notifylnd;
136
137         /* Notify only in 1 thread at any time to ensure ordered notification.
138          * NB individual events can be missed; the only guarantee is that you
139          * always get the most recent news */
140
141         if (lp->lp_notifying)
142                 return;
143
144         lp->lp_notifying = 1;
145
146         while (lp->lp_notify) {
147                 alive     = lp->lp_alive;
148                 notifylnd = lp->lp_notifylnd;
149
150                 lp->lp_notifylnd = 0;
151                 lp->lp_notify    = 0;
152
153                 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
154                         LNET_UNLOCK();
155
156                         /* A new notification could happen now; I'll handle it
157                          * when control returns to me */
158
159                         (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
160
161                         LNET_LOCK();
162                 }
163         }
164
165         lp->lp_notifying = 0;
166 }
167
168
169 static void
170 lnet_rtr_addref_locked(lnet_peer_t *lp)
171 {
172         LASSERT (lp->lp_refcount > 0);
173         LASSERT (lp->lp_rtr_refcount >= 0);
174
175         lp->lp_rtr_refcount++;
176         if (lp->lp_rtr_refcount == 1) {
177                 cfs_list_t *pos;
178
179                 /* a simple insertion sort */
180                 cfs_list_for_each_prev(pos, &the_lnet.ln_routers) {
181                         lnet_peer_t *rtr = cfs_list_entry(pos, lnet_peer_t,
182                                                           lp_rtr_list);
183
184                         if (rtr->lp_nid < lp->lp_nid)
185                                 break;
186                 }
187
188                 cfs_list_add(&lp->lp_rtr_list, pos);
189                 /* addref for the_lnet.ln_routers */
190                 lnet_peer_addref_locked(lp);
191                 the_lnet.ln_routers_version++;
192         }
193 }
194
195 static void
196 lnet_rtr_decref_locked(lnet_peer_t *lp)
197 {
198         LASSERT (lp->lp_refcount > 0);
199         LASSERT (lp->lp_rtr_refcount > 0);
200
201         lp->lp_rtr_refcount--;
202         if (lp->lp_rtr_refcount == 0) {
203                 LASSERT(cfs_list_empty(&lp->lp_routes));
204
205                 if (lp->lp_rcd != NULL) {
206                         cfs_list_add(&lp->lp_rcd->rcd_list,
207                                      &the_lnet.ln_rcd_deathrow);
208                         lp->lp_rcd = NULL;
209                 }
210
211                 cfs_list_del(&lp->lp_rtr_list);
212                 /* decref for the_lnet.ln_routers */
213                 lnet_peer_decref_locked(lp);
214                 the_lnet.ln_routers_version++;
215         }
216 }
217
218 lnet_remotenet_t *
219 lnet_find_net_locked (__u32 net)
220 {
221         lnet_remotenet_t *rnet;
222         cfs_list_t       *tmp;
223
224         LASSERT (!the_lnet.ln_shutdown);
225
226         cfs_list_for_each (tmp, &the_lnet.ln_remote_nets) {
227                 rnet = cfs_list_entry(tmp, lnet_remotenet_t, lrn_list);
228
229                 if (rnet->lrn_net == net)
230                         return rnet;
231         }
232         return NULL;
233 }
234
235 static void lnet_shuffle_seed(void)
236 {
237         static int seeded = 0;
238         int lnd_type, seed[2];
239         struct timeval tv;
240         lnet_ni_t *ni;
241         cfs_list_t *tmp;
242
243         if (seeded)
244                 return;
245
246         cfs_get_random_bytes(seed, sizeof(seed));
247
248         /* Nodes with small feet have little entropy
249          * the NID for this node gives the most entropy in the low bits */
250         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
251                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
252                 lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
253
254                 if (lnd_type != LOLND)
255                         seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
256         }
257
258         cfs_gettimeofday(&tv);
259         cfs_srand(tv.tv_sec ^ seed[0], tv.tv_usec ^ seed[1]);
260         seeded = 1;
261         return;
262 }
263
264 /* NB expects LNET_LOCK held */
265 void
266 lnet_add_route_to_rnet (lnet_remotenet_t *rnet, lnet_route_t *route)
267 {
268         unsigned int      len = 0;
269         unsigned int      offset = 0;
270         cfs_list_t       *e;
271
272         lnet_shuffle_seed();
273
274         cfs_list_for_each (e, &rnet->lrn_routes) {
275                 len++;
276         }
277
278         /* len+1 positions to add a new entry, also prevents division by 0 */
279         offset = cfs_rand() % (len + 1);
280         cfs_list_for_each (e, &rnet->lrn_routes) {
281                 if (offset == 0)
282                         break;
283                 offset--;
284         }
285         cfs_list_add(&route->lr_list, e);
286         cfs_list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
287
288         the_lnet.ln_remote_nets_version++;
289         lnet_rtr_addref_locked(route->lr_gateway);
290 }
291
292 int
293 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
294 {
295         cfs_list_t          *e;
296         lnet_remotenet_t    *rnet;
297         lnet_remotenet_t    *rnet2;
298         lnet_route_t        *route;
299         lnet_ni_t           *ni;
300         int                  add_route;
301         int                  rc;
302
303         CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
304                libcfs_net2str(net), hops, libcfs_nid2str(gateway));
305
306         if (gateway == LNET_NID_ANY ||
307             LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
308             net == LNET_NIDNET(LNET_NID_ANY) ||
309             LNET_NETTYP(net) == LOLND ||
310             LNET_NIDNET(gateway) == net ||
311             hops < 1 || hops > 255)
312                 return (-EINVAL);
313
314         if (lnet_islocalnet(net))               /* it's a local network */
315                 return 0;                       /* ignore the route entry */
316
317         /* Assume net, route, all new */
318         LIBCFS_ALLOC(route, sizeof(*route));
319         LIBCFS_ALLOC(rnet, sizeof(*rnet));
320         if (route == NULL || rnet == NULL) {
321                 CERROR("Out of memory creating route %s %d %s\n",
322                        libcfs_net2str(net), hops, libcfs_nid2str(gateway));
323                 if (route != NULL)
324                         LIBCFS_FREE(route, sizeof(*route));
325                 if (rnet != NULL)
326                         LIBCFS_FREE(rnet, sizeof(*rnet));
327                 return -ENOMEM;
328         }
329
330         CFS_INIT_LIST_HEAD(&rnet->lrn_routes);
331         rnet->lrn_net = net;
332         route->lr_hops = hops;
333         route->lr_net = net;
334
335         LNET_LOCK();
336
337         rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
338         if (rc != 0) {
339                 LNET_UNLOCK();
340
341                 LIBCFS_FREE(route, sizeof(*route));
342                 LIBCFS_FREE(rnet, sizeof(*rnet));
343
344                 if (rc == -EHOSTUNREACH) { /* gateway is not on a local net */
345                         return 0;               /* ignore the route entry */
346                 } else {
347                         CERROR("Error %d creating route %s %d %s\n", rc,
348                                libcfs_net2str(net), hops,
349                                libcfs_nid2str(gateway));
350                 }
351                 return rc;
352         }
353
354         LASSERT (!the_lnet.ln_shutdown);
355
356         rnet2 = lnet_find_net_locked(net);
357         if (rnet2 == NULL) {
358                 /* new network */
359                 cfs_list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
360                 rnet2 = rnet;
361         }
362
363         /* Search for a duplicate route (it's a NOOP if it is) */
364         add_route = 1;
365         cfs_list_for_each (e, &rnet2->lrn_routes) {
366                 lnet_route_t *route2 = cfs_list_entry(e, lnet_route_t, lr_list);
367
368                 if (route2->lr_gateway == route->lr_gateway) {
369                         add_route = 0;
370                         break;
371                 }
372
373                 /* our lookups must be true */
374                 LASSERT (route2->lr_gateway->lp_nid != gateway);
375         }
376
377         if (add_route) {
378                 lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
379                 lnet_add_route_to_rnet(rnet2, route);
380
381                 ni = route->lr_gateway->lp_ni;
382                 LNET_UNLOCK();
383
384                 /* XXX Assume alive */
385                 if (ni->ni_lnd->lnd_notify != NULL)
386                         (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
387
388                 LNET_LOCK();
389         }
390
391         /* -1 for notify or !add_route */
392         lnet_peer_decref_locked(route->lr_gateway);
393         LNET_UNLOCK();
394
395         if (!add_route)
396                 LIBCFS_FREE(route, sizeof(*route));
397
398         if (rnet != rnet2)
399                 LIBCFS_FREE(rnet, sizeof(*rnet));
400
401         return 0;
402 }
403
404 int
405 lnet_check_routes (void)
406 {
407         lnet_remotenet_t    *rnet;
408         lnet_route_t        *route;
409         lnet_route_t        *route2;
410         cfs_list_t          *e1;
411         cfs_list_t          *e2;
412
413         LNET_LOCK();
414
415         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
416                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
417
418                 route2 = NULL;
419                 cfs_list_for_each (e2, &rnet->lrn_routes) {
420                         lnet_nid_t      nid1;
421                         lnet_nid_t      nid2;
422                         int             net;
423
424                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
425
426                         if (route2 == NULL) {
427                                 route2 = route;
428                                 continue;
429                         }
430
431                         if (route->lr_gateway->lp_ni ==
432                             route2->lr_gateway->lp_ni)
433                                 continue;
434
435                         nid1 = route->lr_gateway->lp_nid;
436                         nid2 = route2->lr_gateway->lp_nid;
437                         net = rnet->lrn_net;
438
439                         LNET_UNLOCK();
440
441                         CERROR("Routes to %s via %s and %s not supported\n",
442                                libcfs_net2str(net), libcfs_nid2str(nid1),
443                                libcfs_nid2str(nid2));
444                         return -EINVAL;
445                 }
446         }
447
448         LNET_UNLOCK();
449         return 0;
450 }
451
452 int
453 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
454 {
455         struct lnet_peer        *gateway;
456         lnet_remotenet_t    *rnet;
457         lnet_route_t        *route;
458         cfs_list_t          *e1;
459         cfs_list_t          *e2;
460         int                  rc = -ENOENT;
461
462         CDEBUG(D_NET, "Del route: net %s : gw %s\n",
463                libcfs_net2str(net), libcfs_nid2str(gw_nid));
464
465         /* NB Caller may specify either all routes via the given gateway
466          * or a specific route entry actual NIDs) */
467
468  again:
469         LNET_LOCK();
470
471         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
472                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
473
474                 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
475                       net == rnet->lrn_net))
476                         continue;
477
478                 cfs_list_for_each (e2, &rnet->lrn_routes) {
479                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
480
481                         gateway = route->lr_gateway;
482                         if (!(gw_nid == LNET_NID_ANY ||
483                               gw_nid == gateway->lp_nid))
484                                 continue;
485
486                         cfs_list_del(&route->lr_list);
487                         cfs_list_del(&route->lr_gwlist);
488                         the_lnet.ln_remote_nets_version++;
489
490                         if (cfs_list_empty(&rnet->lrn_routes))
491                                 cfs_list_del(&rnet->lrn_list);
492                         else
493                                 rnet = NULL;
494
495                         lnet_rtr_decref_locked(gateway);
496                         lnet_peer_decref_locked(gateway);
497                         LNET_UNLOCK();
498
499                         LIBCFS_FREE(route, sizeof (*route));
500
501                         if (rnet != NULL)
502                                 LIBCFS_FREE(rnet, sizeof(*rnet));
503
504                         rc = 0;
505                         goto again;
506                 }
507         }
508
509         LNET_UNLOCK();
510         return rc;
511 }
512
513 void
514 lnet_destroy_routes (void)
515 {
516         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
517 }
518
519 int
520 lnet_get_route (int idx, __u32 *net, __u32 *hops,
521                lnet_nid_t *gateway, __u32 *alive)
522 {
523         cfs_list_t          *e1;
524         cfs_list_t          *e2;
525         lnet_remotenet_t    *rnet;
526         lnet_route_t        *route;
527
528         LNET_LOCK();
529
530         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
531                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
532
533                 cfs_list_for_each (e2, &rnet->lrn_routes) {
534                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
535
536                         if (idx-- == 0) {
537                                 *net     = rnet->lrn_net;
538                                 *hops    = route->lr_hops;
539                                 *gateway = route->lr_gateway->lp_nid;
540                                 *alive   = route->lr_gateway->lp_alive;
541                                 LNET_UNLOCK();
542                                 return 0;
543                         }
544                 }
545         }
546
547         LNET_UNLOCK();
548         return -ENOENT;
549 }
550
551 void
552 lnet_swap_pinginfo(lnet_ping_info_t *info)
553 {
554         int               i;
555         lnet_ni_status_t *stat;
556
557         __swab32s(&info->pi_magic);
558         __swab32s(&info->pi_version);
559         __swab32s(&info->pi_pid);
560         __swab32s(&info->pi_nnis);
561         for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
562                 stat = &info->pi_ni[i];
563                 __swab64s(&stat->ns_nid);
564                 __swab32s(&stat->ns_status);
565         }
566         return;
567 }
568
569 /**
570  * parse router-checker pinginfo, record number of down NIs for remote
571  * networks on that router.
572  */
573 static void
574 lnet_parse_rc_info(lnet_rc_data_t *rcd)
575 {
576         lnet_ping_info_t        *info = rcd->rcd_pinginfo;
577         struct lnet_peer        *gw   = rcd->rcd_gateway;
578         lnet_route_t            *rtr;
579
580         if (!gw->lp_alive)
581                 return;
582
583         if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
584                 lnet_swap_pinginfo(info);
585
586         /* NB always racing with network! */
587         if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
588                 CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
589                        libcfs_nid2str(gw->lp_nid), info->pi_magic);
590                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
591                 return;
592         }
593
594         gw->lp_ping_version = info->pi_version;
595         if (gw->lp_ping_version == LNET_PROTO_PING_VERSION_1)
596                 return; /* v1 doesn't carry NI status info */
597
598         if (gw->lp_ping_version != LNET_PROTO_PING_VERSION) {
599                 CDEBUG(D_NET, "%s: Unexpected version 0x%x\n",
600                        libcfs_nid2str(gw->lp_nid), gw->lp_ping_version);
601                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
602                 return;
603         }
604
605         cfs_list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
606                 int     ptl_status = LNET_NI_STATUS_INVALID;
607                 int     down = 0;
608                 int     up = 0;
609                 int     i;
610
611                 for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
612                         lnet_ni_status_t *stat = &info->pi_ni[i];
613                         lnet_nid_t       nid = stat->ns_nid;
614
615                         if (nid == LNET_NID_ANY) {
616                                 CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
617                                        libcfs_nid2str(gw->lp_nid));
618                                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
619                                 return;
620                         }
621
622                         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
623                                 continue;
624
625                         if (stat->ns_status == LNET_NI_STATUS_DOWN) {
626                                 if (LNET_NETTYP(LNET_NIDNET(nid)) != PTLLND)
627                                         down++;
628                                 else if (ptl_status != LNET_NI_STATUS_UP)
629                                         ptl_status = LNET_NI_STATUS_DOWN;
630                                 continue;
631                         }
632
633                         if (stat->ns_status == LNET_NI_STATUS_UP) {
634                                 if (LNET_NIDNET(nid) == rtr->lr_net) {
635                                         up = 1;
636                                         break;
637                                 }
638                                 /* ptl NIs are considered down only when
639                                  * they're all down */
640                                 if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
641                                         ptl_status = LNET_NI_STATUS_UP;
642                                 continue;
643                         }
644
645                         CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
646                                libcfs_nid2str(gw->lp_nid), stat->ns_status);
647                         gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
648                         return;
649                 }
650
651                 if (up) { /* ignore downed NIs if NI for dest network is up */
652                         rtr->lr_downis = 0;
653                         continue;
654                 }
655                 rtr->lr_downis = down + (ptl_status == LNET_NI_STATUS_DOWN);
656         }
657 }
658
659 static void
660 lnet_router_checker_event(lnet_event_t *event)
661 {
662         /* CAVEAT EMPTOR: I'm called with lnet_res_locked */
663         lnet_rc_data_t          *rcd = event->md.user_ptr;
664         struct lnet_peer        *lp;
665
666         LASSERT(rcd != NULL);
667
668         if (event->unlinked) {
669                 LNetInvalidateHandle(&rcd->rcd_mdh);
670                 return;
671         }
672
673         LASSERT(event->type == LNET_EVENT_SEND ||
674                 event->type == LNET_EVENT_REPLY);
675
676         lp = rcd->rcd_gateway;
677         LASSERT(lp != NULL);
678
679         if (!lnet_isrouter(lp)) /* ignore if no longer a router */
680                 return;
681
682         if (event->type == LNET_EVENT_SEND) {
683                 lp->lp_ping_notsent = 0; /* NB: re-enable another ping */
684                 if (event->status == 0)
685                         return;
686         }
687
688         /* LNET_EVENT_REPLY */
689         /* A successful REPLY means the router is up.  If _any_ comms
690          * to the router fail I assume it's down (this will happen if
691          * we ping alive routers to try to detect router death before
692          * apps get burned). */
693
694         lnet_notify_locked(lp, 1, (event->status == 0), cfs_time_current());
695         /* The router checker will wake up very shortly and do the
696          * actual notification.
697          * XXX If 'lp' stops being a router before then, it will still
698          * have the notification pending!!! */
699
700         if (avoid_asym_router_failure && event->status == 0)
701                 lnet_parse_rc_info(rcd);
702 }
703
704 void
705 lnet_wait_known_routerstate(void)
706 {
707         lnet_peer_t         *rtr;
708         cfs_list_t          *entry;
709         int                  all_known;
710
711         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
712
713         for (;;) {
714                 LNET_LOCK();
715
716                 all_known = 1;
717                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
718                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
719
720                         if (rtr->lp_alive_count == 0) {
721                                 all_known = 0;
722                                 break;
723                         }
724                 }
725
726                 LNET_UNLOCK();
727
728                 if (all_known)
729                         return;
730
731 #ifndef __KERNEL__
732                 lnet_router_checker();
733 #endif
734                 cfs_pause(cfs_time_seconds(1));
735         }
736 }
737
738 void
739 lnet_update_ni_status_locked(void)
740 {
741         lnet_ni_t       *ni;
742         long            now;
743         int             timeout;
744
745         LASSERT(the_lnet.ln_routing);
746
747         timeout = router_ping_timeout +
748                   MAX(live_router_check_interval, dead_router_check_interval);
749
750         now = cfs_time_current_sec();
751         cfs_list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
752                 if (ni->ni_lnd->lnd_type == LOLND)
753                         continue;
754
755                 if (now < ni->ni_last_alive + timeout)
756                         continue;
757
758                 LASSERT(ni->ni_status != NULL);
759
760                 if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
761                         CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
762                                libcfs_nid2str(ni->ni_nid), timeout);
763                         /* NB: so far, this is the only place to set
764                          * NI status to "down" */
765                         ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
766                 }
767         }
768 }
769
770 void
771 lnet_destroy_rc_data (lnet_rc_data_t *rcd)
772 {
773         LASSERT(cfs_list_empty(&rcd->rcd_list));
774         /* detached from network */
775         LASSERT(LNetHandleIsInvalid(rcd->rcd_mdh));
776
777         if (rcd->rcd_gateway != NULL) {
778                 LNET_LOCK();
779                 lnet_peer_decref_locked(rcd->rcd_gateway);
780                 LNET_UNLOCK();
781         }
782
783         if (rcd->rcd_pinginfo != NULL)
784                 LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
785
786         LIBCFS_FREE(rcd, sizeof(*rcd));
787 }
788
789 lnet_rc_data_t *
790 lnet_create_rc_data_locked(lnet_peer_t *gateway)
791 {
792         lnet_rc_data_t          *rcd = NULL;
793         lnet_ping_info_t        *pi;
794         int                     rc;
795         int                     i;
796
797         LNET_UNLOCK();
798
799         LIBCFS_ALLOC(rcd, sizeof(*rcd));
800         if (rcd == NULL)
801                 goto out;
802
803         LNetInvalidateHandle(&rcd->rcd_mdh);
804         CFS_INIT_LIST_HEAD(&rcd->rcd_list);
805
806         LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
807         if (pi == NULL)
808                 goto out;
809
810         memset(pi, 0, LNET_PINGINFO_SIZE);
811         for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
812                 pi->pi_ni[i].ns_nid = LNET_NID_ANY;
813                 pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
814         }
815         rcd->rcd_pinginfo = pi;
816
817         LASSERT (!LNetHandleIsInvalid(the_lnet.ln_rc_eqh));
818         rc = LNetMDBind((lnet_md_t){.start     = pi,
819                                     .user_ptr  = rcd,
820                                     .length    = LNET_PINGINFO_SIZE,
821                                     .threshold = LNET_MD_THRESH_INF,
822                                     .options   = LNET_MD_TRUNCATE,
823                                     .eq_handle = the_lnet.ln_rc_eqh},
824                         LNET_UNLINK,
825                         &rcd->rcd_mdh);
826         if (rc < 0) {
827                 CERROR("Can't bind MD: %d\n", rc);
828                 goto out;
829         }
830         LASSERT(rc == 0);
831
832         LNET_LOCK();
833         /* router table changed or someone has created rcd for this gateway */
834         if (!lnet_isrouter(gateway) || gateway->lp_rcd != NULL) {
835                 LNET_UNLOCK();
836                 goto out;
837         }
838
839         lnet_peer_addref_locked(gateway);
840         rcd->rcd_gateway = gateway;
841         gateway->lp_rcd = rcd;
842         return rcd;
843
844  out:
845         if (rcd != NULL) {
846                 if (!LNetHandleIsInvalid(rcd->rcd_mdh)) {
847                         rc = LNetMDUnlink(rcd->rcd_mdh);
848                         LASSERT(rc == 0);
849                 }
850                 lnet_destroy_rc_data(rcd);
851         }
852
853         LNET_LOCK();
854         return gateway->lp_rcd;
855 }
856
857 static int
858 lnet_router_check_interval (lnet_peer_t *rtr)
859 {
860         int secs;
861
862         secs = rtr->lp_alive ? live_router_check_interval :
863                                dead_router_check_interval;
864         if (secs < 0)
865                 secs = 0;
866
867         return secs;
868 }
869
870 static void
871 lnet_ping_router_locked (lnet_peer_t *rtr)
872 {
873         lnet_rc_data_t *rcd = NULL;
874         cfs_time_t      now = cfs_time_current();
875         int             secs;
876
877         lnet_peer_addref_locked(rtr);
878
879         if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
880             cfs_time_after(now, rtr->lp_ping_deadline))
881                 lnet_notify_locked(rtr, 1, 0, now);
882
883         /* Run any outstanding notifications */
884         lnet_ni_notify_locked(rtr->lp_ni, rtr);
885
886         if (!lnet_isrouter(rtr) ||
887             the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
888                 /* router table changed or router checker is shutting down */
889                 lnet_peer_decref_locked(rtr);
890                 return;
891         }
892
893         rcd = rtr->lp_rcd != NULL ?
894               rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
895
896         if (rcd == NULL)
897                 return;
898
899         secs = lnet_router_check_interval(rtr);
900
901         CDEBUG(D_NET,
902                "rtr %s %d: deadline %lu ping_notsent %d alive %d "
903                "alive_count %d lp_ping_timestamp %lu\n",
904                libcfs_nid2str(rtr->lp_nid), secs,
905                rtr->lp_ping_deadline, rtr->lp_ping_notsent,
906                rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
907
908         if (secs != 0 && !rtr->lp_ping_notsent &&
909             cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
910                                              cfs_time_seconds(secs)))) {
911                 int               rc;
912                 lnet_process_id_t id;
913                 lnet_handle_md_t  mdh;
914
915                 id.nid = rtr->lp_nid;
916                 id.pid = LUSTRE_SRV_LNET_PID;
917                 CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
918
919                 rtr->lp_ping_notsent   = 1;
920                 rtr->lp_ping_timestamp = now;
921
922                 mdh = rcd->rcd_mdh;
923
924                 if (rtr->lp_ping_deadline == 0) {
925                         rtr->lp_ping_deadline = \
926                                 cfs_time_shift(router_ping_timeout);
927                 }
928
929                 LNET_UNLOCK();
930
931                 rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
932                              LNET_PROTO_PING_MATCHBITS, 0);
933
934                 LNET_LOCK();
935                 if (rc != 0)
936                         rtr->lp_ping_notsent = 0; /* no event pending */
937         }
938
939         lnet_peer_decref_locked(rtr);
940         return;
941 }
942
943 int
944 lnet_router_checker_start(void)
945 {
946         int          rc;
947         int          eqsz;
948 #ifndef __KERNEL__
949         lnet_peer_t *rtr;
950         __u64        version;
951         int          nrtr = 0;
952         int          router_checker_max_eqsize = 10240;
953
954         LASSERT (check_routers_before_use);
955         LASSERT (dead_router_check_interval > 0);
956
957         LNET_LOCK();
958
959         /* As an approximation, allow each router the same number of
960          * outstanding events as it is allowed outstanding sends */
961         eqsz = 0;
962         version = the_lnet.ln_routers_version;
963         cfs_list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
964                 lnet_ni_t         *ni = rtr->lp_ni;
965                 lnet_process_id_t  id;
966
967                 nrtr++;
968                 eqsz += ni->ni_peertxcredits;
969
970                 /* one async ping reply per router */
971                 id.nid = rtr->lp_nid;
972                 id.pid = LUSTRE_SRV_LNET_PID;
973
974                 LNET_UNLOCK();
975
976                 rc = LNetSetAsync(id, 1);
977                 if (rc != 0) {
978                         CWARN("LNetSetAsync %s failed: %d\n",
979                               libcfs_id2str(id), rc);
980                         return rc;
981                 }
982
983                 LNET_LOCK();
984                 /* NB router list doesn't change in userspace */
985                 LASSERT (version == the_lnet.ln_routers_version);
986         }
987
988         LNET_UNLOCK();
989
990         if (nrtr == 0) {
991                 CDEBUG(D_NET,
992                        "No router found, not starting router checker\n");
993                 return 0;
994         }
995
996         /* at least allow a SENT and a REPLY per router */
997         if (router_checker_max_eqsize < 2 * nrtr)
998                 router_checker_max_eqsize = 2 * nrtr;
999
1000         LASSERT (eqsz > 0);
1001         if (eqsz > router_checker_max_eqsize)
1002                 eqsz = router_checker_max_eqsize;
1003 #endif
1004
1005         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1006
1007         if (check_routers_before_use &&
1008             dead_router_check_interval <= 0) {
1009                 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
1010                                    " set if 'check_routers_before_use' is set"
1011                                    "\n");
1012                 return -EINVAL;
1013         }
1014
1015         if (!the_lnet.ln_routing &&
1016             live_router_check_interval <= 0 &&
1017             dead_router_check_interval <= 0)
1018                 return 0;
1019
1020 #ifdef __KERNEL__
1021         cfs_sema_init(&the_lnet.ln_rc_signal, 0);
1022         /* EQ size doesn't matter; the callback is guaranteed to get every
1023          * event */
1024         eqsz = 0;
1025         rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
1026                          &the_lnet.ln_rc_eqh);
1027 #else
1028         rc = LNetEQAlloc(eqsz, LNET_EQ_HANDLER_NONE,
1029                          &the_lnet.ln_rc_eqh);
1030 #endif
1031         if (rc != 0) {
1032                 CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
1033                 return -ENOMEM;
1034         }
1035
1036         the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1037 #ifdef __KERNEL__
1038         rc = cfs_create_thread(lnet_router_checker, NULL, 0);
1039         if (rc < 0) {
1040                 CERROR("Can't start router checker thread: %d\n", rc);
1041                 /* block until event callback signals exit */
1042                 cfs_down(&the_lnet.ln_rc_signal);
1043                 rc = LNetEQFree(the_lnet.ln_rc_eqh);
1044                 LASSERT (rc == 0);
1045                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1046                 return -ENOMEM;
1047         }
1048 #endif
1049
1050         if (check_routers_before_use) {
1051                 /* Note that a helpful side-effect of pinging all known routers
1052                  * at startup is that it makes them drop stale connections they
1053                  * may have to a previous instance of me. */
1054                 lnet_wait_known_routerstate();
1055         }
1056
1057         return 0;
1058 }
1059
1060 void
1061 lnet_router_checker_stop (void)
1062 {
1063         int rc;
1064
1065         if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1066                 return;
1067
1068         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1069         the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
1070
1071 #ifdef __KERNEL__
1072         /* block until event callback signals exit */
1073         cfs_down(&the_lnet.ln_rc_signal);
1074 #else
1075         lnet_router_checker();
1076 #endif
1077         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1078
1079         rc = LNetEQFree(the_lnet.ln_rc_eqh);
1080         LASSERT (rc == 0);
1081         return;
1082 }
1083
1084 static void
1085 lnet_prune_rc_data(int wait_unlink)
1086 {
1087         lnet_rc_data_t          *rcd;
1088         lnet_rc_data_t          *tmp;
1089         lnet_peer_t             *lp;
1090         cfs_list_t              head;
1091         int                     i = 2;
1092
1093         if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
1094                    cfs_list_empty(&the_lnet.ln_rcd_deathrow) &&
1095                    cfs_list_empty(&the_lnet.ln_rcd_zombie)))
1096                 return;
1097
1098         CFS_INIT_LIST_HEAD(&head);
1099
1100         LNET_LOCK();
1101
1102         if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1103                 /* router checker is stopping, prune all */
1104                 cfs_list_for_each_entry(lp, &the_lnet.ln_routers,
1105                                         lp_rtr_list) {
1106                         if (lp->lp_rcd == NULL)
1107                                 continue;
1108
1109                         LASSERT(cfs_list_empty(&lp->lp_rcd->rcd_list));
1110                         cfs_list_add(&lp->lp_rcd->rcd_list,
1111                                      &the_lnet.ln_rcd_deathrow);
1112                         lp->lp_rcd = NULL;
1113                 }
1114         }
1115
1116         /* unlink all RCDs on deathrow list */
1117         cfs_list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
1118
1119         if (!cfs_list_empty(&head)) {
1120                 LNET_UNLOCK();
1121
1122                 cfs_list_for_each_entry(rcd, &head, rcd_list)
1123                         LNetMDUnlink(rcd->rcd_mdh);
1124
1125                 LNET_LOCK();
1126         }
1127
1128         cfs_list_splice_init(&head, &the_lnet.ln_rcd_zombie);
1129
1130         /* release all zombie RCDs */
1131         while (!cfs_list_empty(&the_lnet.ln_rcd_zombie)) {
1132                 cfs_list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
1133                                              rcd_list) {
1134                         if (!LNetHandleIsInvalid(rcd->rcd_mdh))
1135                                 cfs_list_move(&rcd->rcd_list, &head);
1136                 }
1137
1138                 wait_unlink = wait_unlink &&
1139                               !cfs_list_empty(&the_lnet.ln_rcd_zombie);
1140
1141                 LNET_UNLOCK();
1142
1143                 while (!cfs_list_empty(&head)) {
1144                         rcd = cfs_list_entry(head.next,
1145                                              lnet_rc_data_t, rcd_list);
1146                         cfs_list_del_init(&rcd->rcd_list);
1147                         lnet_destroy_rc_data(rcd);
1148                 }
1149
1150                 if (!wait_unlink)
1151                         break;
1152
1153                 i++;
1154                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
1155                        "Waiting for rc buffers to unlink\n");
1156                 cfs_pause(cfs_time_seconds(1) / 4);
1157
1158                 LNET_LOCK();
1159         }
1160 }
1161
1162
1163 #if defined(__KERNEL__) && defined(LNET_ROUTER)
1164
1165 static int
1166 lnet_router_checker(void *arg)
1167 {
1168         lnet_peer_t       *rtr;
1169         cfs_list_t        *entry;
1170
1171         cfs_daemonize("router_checker");
1172         cfs_block_allsigs();
1173
1174         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1175
1176         while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
1177                 __u64 version;
1178
1179                 LNET_LOCK();
1180 rescan:
1181                 version = the_lnet.ln_routers_version;
1182
1183                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
1184                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
1185                         lnet_ping_router_locked(rtr);
1186
1187                         /* NB dropped lock */
1188                         if (version != the_lnet.ln_routers_version) {
1189                                 /* the routers list has changed */
1190                                 goto rescan;
1191                         }
1192                 }
1193
1194                 if (the_lnet.ln_routing)
1195                         lnet_update_ni_status_locked();
1196
1197                 LNET_UNLOCK();
1198
1199                 lnet_prune_rc_data(0); /* don't wait for UNLINK */
1200
1201                 /* Call cfs_pause() here always adds 1 to load average 
1202                  * because kernel counts # active tasks as nr_running 
1203                  * + nr_uninterruptible. */
1204                 cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
1205                                                    cfs_time_seconds(1));
1206         }
1207
1208         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING);
1209
1210         lnet_prune_rc_data(1); /* wait for UNLINK */
1211
1212         the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1213         cfs_up(&the_lnet.ln_rc_signal);
1214         /* The unlink event callback will signal final completion */
1215         return 0;
1216 }
1217
1218 void
1219 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
1220 {
1221         int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1222
1223         while (--npages >= 0)
1224                 cfs_free_page(rb->rb_kiov[npages].kiov_page);
1225
1226         LIBCFS_FREE(rb, sz);
1227 }
1228
1229 lnet_rtrbuf_t *
1230 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
1231 {
1232         int            npages = rbp->rbp_npages;
1233         int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1234         struct page   *page;
1235         lnet_rtrbuf_t *rb;
1236         int            i;
1237
1238         LIBCFS_ALLOC(rb, sz);
1239         if (rb == NULL)
1240                 return NULL;
1241
1242         rb->rb_pool = rbp;
1243
1244         for (i = 0; i < npages; i++) {
1245                 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
1246                 if (page == NULL) {
1247                         while (--i >= 0)
1248                                 cfs_free_page(rb->rb_kiov[i].kiov_page);
1249
1250                         LIBCFS_FREE(rb, sz);
1251                         return NULL;
1252                 }
1253
1254                 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
1255                 rb->rb_kiov[i].kiov_offset = 0;
1256                 rb->rb_kiov[i].kiov_page = page;
1257         }
1258
1259         return rb;
1260 }
1261
1262 void
1263 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
1264 {
1265         int            npages = rbp->rbp_npages;
1266         int            nbuffers = 0;
1267         lnet_rtrbuf_t *rb;
1268
1269         LASSERT (cfs_list_empty(&rbp->rbp_msgs));
1270         LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
1271
1272         while (!cfs_list_empty(&rbp->rbp_bufs)) {
1273                 LASSERT (rbp->rbp_credits > 0);
1274
1275                 rb = cfs_list_entry(rbp->rbp_bufs.next,
1276                                     lnet_rtrbuf_t, rb_list);
1277                 cfs_list_del(&rb->rb_list);
1278                 lnet_destroy_rtrbuf(rb, npages);
1279                 nbuffers++;
1280         }
1281
1282         LASSERT (rbp->rbp_nbuffers == nbuffers);
1283         LASSERT (rbp->rbp_credits == nbuffers);
1284
1285         rbp->rbp_nbuffers = rbp->rbp_credits = 0;
1286 }
1287
1288 int
1289 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
1290 {
1291         lnet_rtrbuf_t *rb;
1292         int            i;
1293
1294         if (rbp->rbp_nbuffers != 0) {
1295                 LASSERT (rbp->rbp_nbuffers == nbufs);
1296                 return 0;
1297         }
1298
1299         for (i = 0; i < nbufs; i++) {
1300                 rb = lnet_new_rtrbuf(rbp);
1301
1302                 if (rb == NULL) {
1303                         CERROR("Failed to allocate %d router bufs of %d pages\n",
1304                                nbufs, rbp->rbp_npages);
1305                         return -ENOMEM;
1306                 }
1307
1308                 rbp->rbp_nbuffers++;
1309                 rbp->rbp_credits++;
1310                 rbp->rbp_mincredits++;
1311                 cfs_list_add(&rb->rb_list, &rbp->rbp_bufs);
1312
1313                 /* No allocation "under fire" */
1314                 /* Otherwise we'd need code to schedule blocked msgs etc */
1315                 LASSERT (!the_lnet.ln_routing);
1316         }
1317
1318         LASSERT (rbp->rbp_credits == nbufs);
1319         return 0;
1320 }
1321
1322 void
1323 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1324 {
1325         CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
1326         CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
1327
1328         rbp->rbp_npages = npages;
1329         rbp->rbp_credits = 0;
1330         rbp->rbp_mincredits = 0;
1331 }
1332
1333 void
1334 lnet_free_rtrpools(void)
1335 {
1336         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
1337         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
1338         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
1339 }
1340
1341 void
1342 lnet_init_rtrpools(void)
1343 {
1344         int small_pages = 1;
1345         int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1346
1347         lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1348         lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1349         lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
1350 }
1351
1352
1353 int
1354 lnet_alloc_rtrpools(int im_a_router)
1355 {
1356         int       rc;
1357
1358         if (!strcmp(forwarding, "")) {
1359                 /* not set either way */
1360                 if (!im_a_router)
1361                         return 0;
1362         } else if (!strcmp(forwarding, "disabled")) {
1363                 /* explicitly disabled */
1364                 return 0;
1365         } else if (!strcmp(forwarding, "enabled")) {
1366                 /* explicitly enabled */
1367         } else {
1368                 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1369                                    "'enabled' or 'disabled'\n");
1370                 return -EINVAL;
1371         }
1372
1373         if (tiny_router_buffers <= 0) {
1374                 LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
1375                                    "routing enabled\n", tiny_router_buffers);
1376                 rc = -EINVAL;
1377                 goto failed;
1378         }
1379
1380         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1381                                      tiny_router_buffers);
1382         if (rc != 0)
1383                 goto failed;
1384
1385         if (small_router_buffers <= 0) {
1386                 LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when"
1387                                    " routing enabled\n", small_router_buffers);
1388                 rc = -EINVAL;
1389                 goto failed;
1390         }
1391
1392         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1393                                      small_router_buffers);
1394         if (rc != 0)
1395                 goto failed;
1396
1397         if (large_router_buffers <= 0) {
1398                 LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
1399                                    " routing enabled\n", large_router_buffers);
1400                 rc = -EINVAL;
1401                 goto failed;
1402         }
1403
1404         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1405                                      large_router_buffers);
1406         if (rc != 0)
1407                 goto failed;
1408
1409         LNET_LOCK();
1410         the_lnet.ln_routing = 1;
1411         LNET_UNLOCK();
1412
1413         return 0;
1414
1415  failed:
1416         lnet_free_rtrpools();
1417         return rc;
1418 }
1419
1420 int
1421 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1422 {
1423         lnet_peer_t *lp = NULL;
1424         cfs_time_t   now = cfs_time_current();
1425
1426         LASSERT (!cfs_in_interrupt ());
1427
1428         CDEBUG (D_NET, "%s notifying %s: %s\n",
1429                 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1430                 libcfs_nid2str(nid),
1431                 alive ? "up" : "down");
1432
1433         if (ni != NULL &&
1434             LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1435                 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
1436                         libcfs_nid2str(nid), alive ? "birth" : "death",
1437                         libcfs_nid2str(ni->ni_nid));
1438                 return -EINVAL;
1439         }
1440
1441         /* can't do predictions... */
1442         if (cfs_time_after(when, now)) {
1443                 CWARN ("Ignoring prediction from %s of %s %s "
1444                        "%ld seconds in the future\n",
1445                        (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1446                        libcfs_nid2str(nid), alive ? "up" : "down",
1447                        cfs_duration_sec(cfs_time_sub(when, now)));
1448                 return -EINVAL;
1449         }
1450
1451         if (ni != NULL && !alive &&             /* LND telling me she's down */
1452             !auto_down) {                       /* auto-down disabled */
1453                 CDEBUG(D_NET, "Auto-down disabled\n");
1454                 return 0;
1455         }
1456
1457         LNET_LOCK();
1458
1459         lp = lnet_find_peer_locked(nid);
1460         if (lp == NULL) {
1461                 /* nid not found */
1462                 LNET_UNLOCK();
1463                 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1464                 return 0;
1465         }
1466
1467         /* We can't fully trust LND on reporting exact peer last_alive
1468          * if he notifies us about dead peer. For example ksocklnd can
1469          * call us with when == _time_when_the_node_was_booted_ if
1470          * no connections were successfully established */
1471         if (ni != NULL && !alive && when < lp->lp_last_alive)
1472                 when = lp->lp_last_alive;
1473
1474         lnet_notify_locked(lp, ni == NULL, alive, when);
1475
1476         lnet_ni_notify_locked(ni, lp);
1477
1478         lnet_peer_decref_locked(lp);
1479
1480         LNET_UNLOCK();
1481         return 0;
1482 }
1483 EXPORT_SYMBOL(lnet_notify);
1484
/*
 * Kernel router build: nothing to read here, so this is a no-op.
 */
void
lnet_get_tunables(void)
{
}
1490
1491 #else
1492
1493 int
1494 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1495 {
1496         return -EOPNOTSUPP;
1497 }
1498
1499 void
1500 lnet_router_checker (void)
1501 {
1502         static time_t last = 0;
1503         static int    running = 0;
1504
1505         time_t            now = cfs_time_current_sec();
1506         int               interval = now - last;
1507         int               rc;
1508         __u64             version;
1509         lnet_peer_t      *rtr;
1510
1511         /* It's no use to call me again within a sec - all intervals and
1512          * timeouts are measured in seconds */
1513         if (last != 0 && interval < 2)
1514                 return;
1515
1516         if (last != 0 &&
1517             interval > MAX(live_router_check_interval,
1518                            dead_router_check_interval))
1519                 CNETERR("Checker(%d/%d) not called for %d seconds\n",
1520                         live_router_check_interval, dead_router_check_interval,
1521                         interval);
1522
1523         LNET_LOCK();
1524         LASSERT (!running); /* recursion check */
1525         running = 1;
1526         LNET_UNLOCK();
1527
1528         last = now;
1529
1530         if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING)
1531                 lnet_prune_rc_data(0); /* unlink all rcd and nowait */
1532
1533         /* consume all pending events */
1534         while (1) {
1535                 int          i;
1536                 lnet_event_t ev;
1537
1538                 /* NB ln_rc_eqh must be the 1st in 'eventqs' otherwise the
1539                  * recursion breaker in LNetEQPoll would fail */
1540                 rc = LNetEQPoll(&the_lnet.ln_rc_eqh, 1, 0, &ev, &i);
1541                 if (rc == 0)   /* no event pending */
1542                         break;
1543
1544                 /* NB a lost SENT prevents me from pinging a router again */
1545                 if (rc == -EOVERFLOW) {
1546                         CERROR("Dropped an event!!!\n");
1547                         abort();
1548                 }
1549
1550                 LASSERT (rc == 1);
1551
1552                 LNET_LOCK();
1553                 lnet_router_checker_event(&ev);
1554                 LNET_UNLOCK();
1555         }
1556
1557         if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING) {
1558                 lnet_prune_rc_data(1); /* release rcd */
1559                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1560                 running = 0;
1561                 return;
1562         }
1563
1564         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1565
1566         LNET_LOCK();
1567
1568         version = the_lnet.ln_routers_version;
1569         cfs_list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
1570                 lnet_ping_router_locked(rtr);
1571                 LASSERT (version == the_lnet.ln_routers_version);
1572         }
1573
1574         LNET_UNLOCK();
1575
1576         running = 0; /* lock only needed for the recursion check */
1577         return;
1578 }
1579
1580 /* NB lnet_peers_start_down depends on me,
1581  * so must be called before any peer creation */
1582 void
1583 lnet_get_tunables (void)
1584 {
1585         char *s;
1586
1587         s = getenv("LNET_ROUTER_PING_TIMEOUT");
1588         if (s != NULL) router_ping_timeout = atoi(s);
1589
1590         s = getenv("LNET_LIVE_ROUTER_CHECK_INTERVAL");
1591         if (s != NULL) live_router_check_interval = atoi(s);
1592
1593         s = getenv("LNET_DEAD_ROUTER_CHECK_INTERVAL");
1594         if (s != NULL) dead_router_check_interval = atoi(s);
1595
1596         /* This replaces old lnd_notify mechanism */
1597         check_routers_before_use = 1;
1598         if (dead_router_check_interval <= 0)
1599                 dead_router_check_interval = 30;
1600 }
1601
/*
 * No router buffer pools exist in this build; nothing to free.
 */
void
lnet_free_rtrpools(void)
{
}
1606
/*
 * No router buffer pools exist in this build; nothing to initialise.
 */
void
lnet_init_rtrpools(void)
{
}
1611
/*
 * No router buffer pools exist in this build, so there is nothing to
 * allocate.
 *
 * \retval 0  always; the argument is ignored
 */
int
lnet_alloc_rtrpools(int im_a_arouter)
{
        return 0;
}
1617
1618 #endif