Whamcloud - gitweb
LU-56 lnet: Partitioned LNet resources (ME/MD/EQ)
[fs/lustre-release.git] / lnet / lnet / router.c
1 /*
2  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Copyright (c) 2011, Whamcloud, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
26
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
28
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31                 "Explicitly enable/disable forwarding between networks");
32
33 static int tiny_router_buffers = 1024;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35                 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 8192;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38                 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 512;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41                 "# of large messages to buffer in the router");
42 static int peer_buffer_credits = 0;
43 CFS_MODULE_PARM(peer_buffer_credits, "i", int, 0444,
44                 "# router buffer credits per peer");
45
46 static int auto_down = 1;
47 CFS_MODULE_PARM(auto_down, "i", int, 0444,
48                 "Automatically mark peers down on comms error");
49
50 int
51 lnet_peer_buffer_credits(lnet_ni_t *ni)
52 {
53         /* NI option overrides LNet default */
54         if (ni->ni_peerrtrcredits > 0)
55                 return ni->ni_peerrtrcredits;
56         if (peer_buffer_credits > 0)
57                 return peer_buffer_credits;
58
59         /* As an approximation, allow this peer the same number of router
60          * buffers as it is allowed outstanding sends */
61         return ni->ni_peertxcredits;
62 }
63
64 /* forward ref's */
65 static int lnet_router_checker(void *);
66 #else
67
68 int
69 lnet_peer_buffer_credits(lnet_ni_t *ni)
70 {
71         return 0;
72 }
73
74 #endif
75
76 static int check_routers_before_use = 0;
77 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
78                 "Assume routers are down and ping them before use");
79
80 static int avoid_asym_router_failure = 0;
81 CFS_MODULE_PARM(avoid_asym_router_failure, "i", int, 0444,
82                 "Avoid asymmetrical failures: reserved, use at your own risk");
83
84 static int dead_router_check_interval = 0;
85 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
86                 "Seconds between dead router health checks (<= 0 to disable)");
87
88 static int live_router_check_interval = 0;
89 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
90                 "Seconds between live router health checks (<= 0 to disable)");
91
92 static int router_ping_timeout = 50;
93 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
94                 "Seconds to wait for the reply to a router health query");
95
96 int
97 lnet_peers_start_down(void)
98 {
99         return check_routers_before_use;
100 }
101
102 void
103 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, cfs_time_t when)
104 {
105         if (cfs_time_before(when, lp->lp_timestamp)) { /* out of date information */
106                 CDEBUG(D_NET, "Out of date\n");
107                 return;
108         }
109
110         lp->lp_timestamp = when;                /* update timestamp */
111         lp->lp_ping_deadline = 0;               /* disable ping timeout */
112
113         if (lp->lp_alive_count != 0 &&          /* got old news */
114             (!lp->lp_alive) == (!alive)) {      /* new date for old news */
115                 CDEBUG(D_NET, "Old news\n");
116                 return;
117         }
118
119         /* Flag that notification is outstanding */
120
121         lp->lp_alive_count++;
122         lp->lp_alive = !(!alive);               /* 1 bit! */
123         lp->lp_notify = 1;
124         lp->lp_notifylnd |= notifylnd;
125         if (lp->lp_alive)
126                 lp->lp_ping_version = LNET_PROTO_PING_UNKNOWN; /* reset */
127
128         CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
129 }
130
131 void
132 lnet_ni_notify_locked(lnet_ni_t *ni, lnet_peer_t *lp)
133 {
134         int        alive;
135         int        notifylnd;
136
137         /* Notify only in 1 thread at any time to ensure ordered notification.
138          * NB individual events can be missed; the only guarantee is that you
139          * always get the most recent news */
140
141         if (lp->lp_notifying)
142                 return;
143
144         lp->lp_notifying = 1;
145
146         while (lp->lp_notify) {
147                 alive     = lp->lp_alive;
148                 notifylnd = lp->lp_notifylnd;
149
150                 lp->lp_notifylnd = 0;
151                 lp->lp_notify    = 0;
152
153                 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
154                         LNET_UNLOCK();
155
156                         /* A new notification could happen now; I'll handle it
157                          * when control returns to me */
158
159                         (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
160
161                         LNET_LOCK();
162                 }
163         }
164
165         lp->lp_notifying = 0;
166 }
167
168
169 static void
170 lnet_rtr_addref_locked(lnet_peer_t *lp)
171 {
172         LASSERT (lp->lp_refcount > 0);
173         LASSERT (lp->lp_rtr_refcount >= 0);
174
175         lp->lp_rtr_refcount++;
176         if (lp->lp_rtr_refcount == 1) {
177                 cfs_list_t *pos;
178
179                 /* a simple insertion sort */
180                 cfs_list_for_each_prev(pos, &the_lnet.ln_routers) {
181                         lnet_peer_t *rtr = cfs_list_entry(pos, lnet_peer_t,
182                                                           lp_rtr_list);
183
184                         if (rtr->lp_nid < lp->lp_nid)
185                                 break;
186                 }
187
188                 cfs_list_add(&lp->lp_rtr_list, pos);
189                 /* addref for the_lnet.ln_routers */
190                 lnet_peer_addref_locked(lp);
191                 the_lnet.ln_routers_version++;
192         }
193 }
194
195 static void
196 lnet_rtr_decref_locked(lnet_peer_t *lp)
197 {
198         LASSERT (lp->lp_refcount > 0);
199         LASSERT (lp->lp_rtr_refcount > 0);
200
201         lp->lp_rtr_refcount--;
202         if (lp->lp_rtr_refcount == 0) {
203                 LASSERT(cfs_list_empty(&lp->lp_routes));
204
205                 if (lp->lp_rcd != NULL) {
206                         cfs_list_add(&lp->lp_rcd->rcd_list,
207                                      &the_lnet.ln_rcd_deathrow);
208                         lp->lp_rcd = NULL;
209                 }
210
211                 cfs_list_del(&lp->lp_rtr_list);
212                 /* decref for the_lnet.ln_routers */
213                 lnet_peer_decref_locked(lp);
214                 the_lnet.ln_routers_version++;
215         }
216 }
217
218 lnet_remotenet_t *
219 lnet_find_net_locked (__u32 net)
220 {
221         lnet_remotenet_t *rnet;
222         cfs_list_t       *tmp;
223
224         LASSERT (!the_lnet.ln_shutdown);
225
226         cfs_list_for_each (tmp, &the_lnet.ln_remote_nets) {
227                 rnet = cfs_list_entry(tmp, lnet_remotenet_t, lrn_list);
228
229                 if (rnet->lrn_net == net)
230                         return rnet;
231         }
232         return NULL;
233 }
234
235 static void lnet_shuffle_seed(void)
236 {
237         static int seeded = 0;
238         int lnd_type, seed[2];
239         struct timeval tv;
240         lnet_ni_t *ni;
241         cfs_list_t *tmp;
242
243         if (seeded)
244                 return;
245
246         cfs_get_random_bytes(seed, sizeof(seed));
247
248         /* Nodes with small feet have little entropy
249          * the NID for this node gives the most entropy in the low bits */
250         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
251                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
252                 lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
253
254                 if (lnd_type != LOLND)
255                         seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
256         }
257
258         cfs_gettimeofday(&tv);
259         cfs_srand(tv.tv_sec ^ seed[0], tv.tv_usec ^ seed[1]);
260         seeded = 1;
261         return;
262 }
263
264 /* NB expects LNET_LOCK held */
265 void
266 lnet_add_route_to_rnet (lnet_remotenet_t *rnet, lnet_route_t *route)
267 {
268         unsigned int      len = 0;
269         unsigned int      offset = 0;
270         cfs_list_t       *e;
271
272         lnet_shuffle_seed();
273
274         cfs_list_for_each (e, &rnet->lrn_routes) {
275                 len++;
276         }
277
278         /* len+1 positions to add a new entry, also prevents division by 0 */
279         offset = cfs_rand() % (len + 1);
280         cfs_list_for_each (e, &rnet->lrn_routes) {
281                 if (offset == 0)
282                         break;
283                 offset--;
284         }
285         cfs_list_add(&route->lr_list, e);
286         cfs_list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
287
288         the_lnet.ln_remote_nets_version++;
289         lnet_rtr_addref_locked(route->lr_gateway);
290 }
291
292 int
293 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
294 {
295         cfs_list_t          *e;
296         lnet_remotenet_t    *rnet;
297         lnet_remotenet_t    *rnet2;
298         lnet_route_t        *route;
299         lnet_ni_t           *ni;
300         int                  add_route;
301         int                  rc;
302
303         CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
304                libcfs_net2str(net), hops, libcfs_nid2str(gateway));
305
306         if (gateway == LNET_NID_ANY ||
307             LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
308             net == LNET_NIDNET(LNET_NID_ANY) ||
309             LNET_NETTYP(net) == LOLND ||
310             LNET_NIDNET(gateway) == net ||
311             hops < 1 || hops > 255)
312                 return (-EINVAL);
313
314         if (lnet_islocalnet(net))               /* it's a local network */
315                 return 0;                       /* ignore the route entry */
316
317         /* Assume net, route, all new */
318         LIBCFS_ALLOC(route, sizeof(*route));
319         LIBCFS_ALLOC(rnet, sizeof(*rnet));
320         if (route == NULL || rnet == NULL) {
321                 CERROR("Out of memory creating route %s %d %s\n",
322                        libcfs_net2str(net), hops, libcfs_nid2str(gateway));
323                 if (route != NULL)
324                         LIBCFS_FREE(route, sizeof(*route));
325                 if (rnet != NULL)
326                         LIBCFS_FREE(rnet, sizeof(*rnet));
327                 return -ENOMEM;
328         }
329
330         CFS_INIT_LIST_HEAD(&rnet->lrn_routes);
331         rnet->lrn_net = net;
332         route->lr_hops = hops;
333         route->lr_net = net;
334
335         LNET_LOCK();
336
337         rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
338         if (rc != 0) {
339                 LNET_UNLOCK();
340
341                 LIBCFS_FREE(route, sizeof(*route));
342                 LIBCFS_FREE(rnet, sizeof(*rnet));
343
344                 if (rc == -EHOSTUNREACH) { /* gateway is not on a local net */
345                         return 0;               /* ignore the route entry */
346                 } else {
347                         CERROR("Error %d creating route %s %d %s\n", rc,
348                                libcfs_net2str(net), hops,
349                                libcfs_nid2str(gateway));
350                 }
351                 return rc;
352         }
353
354         LASSERT (!the_lnet.ln_shutdown);
355
356         rnet2 = lnet_find_net_locked(net);
357         if (rnet2 == NULL) {
358                 /* new network */
359                 cfs_list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
360                 rnet2 = rnet;
361         }
362
363         /* Search for a duplicate route (it's a NOOP if it is) */
364         add_route = 1;
365         cfs_list_for_each (e, &rnet2->lrn_routes) {
366                 lnet_route_t *route2 = cfs_list_entry(e, lnet_route_t, lr_list);
367
368                 if (route2->lr_gateway == route->lr_gateway) {
369                         add_route = 0;
370                         break;
371                 }
372
373                 /* our lookups must be true */
374                 LASSERT (route2->lr_gateway->lp_nid != gateway);
375         }
376
377         if (add_route) {
378                 lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
379                 lnet_add_route_to_rnet(rnet2, route);
380
381                 ni = route->lr_gateway->lp_ni;
382                 LNET_UNLOCK();
383
384                 /* XXX Assume alive */
385                 if (ni->ni_lnd->lnd_notify != NULL)
386                         (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
387
388                 LNET_LOCK();
389         }
390
391         /* -1 for notify or !add_route */
392         lnet_peer_decref_locked(route->lr_gateway);
393         LNET_UNLOCK();
394
395         if (!add_route)
396                 LIBCFS_FREE(route, sizeof(*route));
397
398         if (rnet != rnet2)
399                 LIBCFS_FREE(rnet, sizeof(*rnet));
400
401         return 0;
402 }
403
404 int
405 lnet_check_routes (void)
406 {
407         lnet_remotenet_t    *rnet;
408         lnet_route_t        *route;
409         lnet_route_t        *route2;
410         cfs_list_t          *e1;
411         cfs_list_t          *e2;
412
413         LNET_LOCK();
414
415         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
416                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
417
418                 route2 = NULL;
419                 cfs_list_for_each (e2, &rnet->lrn_routes) {
420                         lnet_nid_t      nid1;
421                         lnet_nid_t      nid2;
422                         int             net;
423
424                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
425
426                         if (route2 == NULL) {
427                                 route2 = route;
428                                 continue;
429                         }
430
431                         if (route->lr_gateway->lp_ni ==
432                             route2->lr_gateway->lp_ni)
433                                 continue;
434
435                         nid1 = route->lr_gateway->lp_nid;
436                         nid2 = route2->lr_gateway->lp_nid;
437                         net = rnet->lrn_net;
438
439                         LNET_UNLOCK();
440
441                         CERROR("Routes to %s via %s and %s not supported\n",
442                                libcfs_net2str(net), libcfs_nid2str(nid1),
443                                libcfs_nid2str(nid2));
444                         return -EINVAL;
445                 }
446         }
447
448         LNET_UNLOCK();
449         return 0;
450 }
451
452 int
453 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
454 {
455         struct lnet_peer        *gateway;
456         lnet_remotenet_t    *rnet;
457         lnet_route_t        *route;
458         cfs_list_t          *e1;
459         cfs_list_t          *e2;
460         int                  rc = -ENOENT;
461
462         CDEBUG(D_NET, "Del route: net %s : gw %s\n",
463                libcfs_net2str(net), libcfs_nid2str(gw_nid));
464
465         /* NB Caller may specify either all routes via the given gateway
466          * or a specific route entry actual NIDs) */
467
468  again:
469         LNET_LOCK();
470
471         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
472                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
473
474                 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
475                       net == rnet->lrn_net))
476                         continue;
477
478                 cfs_list_for_each (e2, &rnet->lrn_routes) {
479                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
480
481                         gateway = route->lr_gateway;
482                         if (!(gw_nid == LNET_NID_ANY ||
483                               gw_nid == gateway->lp_nid))
484                                 continue;
485
486                         cfs_list_del(&route->lr_list);
487                         cfs_list_del(&route->lr_gwlist);
488                         the_lnet.ln_remote_nets_version++;
489
490                         if (cfs_list_empty(&rnet->lrn_routes))
491                                 cfs_list_del(&rnet->lrn_list);
492                         else
493                                 rnet = NULL;
494
495                         lnet_rtr_decref_locked(gateway);
496                         lnet_peer_decref_locked(gateway);
497                         LNET_UNLOCK();
498
499                         LIBCFS_FREE(route, sizeof (*route));
500
501                         if (rnet != NULL)
502                                 LIBCFS_FREE(rnet, sizeof(*rnet));
503
504                         rc = 0;
505                         goto again;
506                 }
507         }
508
509         LNET_UNLOCK();
510         return rc;
511 }
512
513 void
514 lnet_destroy_routes (void)
515 {
516         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
517 }
518
519 int
520 lnet_get_route (int idx, __u32 *net, __u32 *hops,
521                lnet_nid_t *gateway, __u32 *alive)
522 {
523         cfs_list_t          *e1;
524         cfs_list_t          *e2;
525         lnet_remotenet_t    *rnet;
526         lnet_route_t        *route;
527
528         LNET_LOCK();
529
530         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
531                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
532
533                 cfs_list_for_each (e2, &rnet->lrn_routes) {
534                         route = cfs_list_entry(e2, lnet_route_t, lr_list);
535
536                         if (idx-- == 0) {
537                                 *net     = rnet->lrn_net;
538                                 *hops    = route->lr_hops;
539                                 *gateway = route->lr_gateway->lp_nid;
540                                 *alive   = route->lr_gateway->lp_alive;
541                                 LNET_UNLOCK();
542                                 return 0;
543                         }
544                 }
545         }
546
547         LNET_UNLOCK();
548         return -ENOENT;
549 }
550
551 void
552 lnet_swap_pinginfo(lnet_ping_info_t *info)
553 {
554         int               i;
555         lnet_ni_status_t *stat;
556
557         __swab32s(&info->pi_magic);
558         __swab32s(&info->pi_version);
559         __swab32s(&info->pi_pid);
560         __swab32s(&info->pi_nnis);
561         for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
562                 stat = &info->pi_ni[i];
563                 __swab64s(&stat->ns_nid);
564                 __swab32s(&stat->ns_status);
565         }
566         return;
567 }
568
569 /**
570  * parse router-checker pinginfo, record number of down NIs for remote
571  * networks on that router.
572  */
573 static void
574 lnet_parse_rc_info(lnet_rc_data_t *rcd)
575 {
576         lnet_ping_info_t        *info = rcd->rcd_pinginfo;
577         struct lnet_peer        *gw   = rcd->rcd_gateway;
578         lnet_route_t            *rtr;
579
580         if (!gw->lp_alive)
581                 return;
582
583         if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
584                 lnet_swap_pinginfo(info);
585
586         /* NB always racing with network! */
587         if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
588                 CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
589                        libcfs_nid2str(gw->lp_nid), info->pi_magic);
590                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
591                 return;
592         }
593
594         gw->lp_ping_version = info->pi_version;
595         if (gw->lp_ping_version == LNET_PROTO_PING_VERSION_1)
596                 return; /* v1 doesn't carry NI status info */
597
598         if (gw->lp_ping_version != LNET_PROTO_PING_VERSION) {
599                 CDEBUG(D_NET, "%s: Unexpected version 0x%x\n",
600                        libcfs_nid2str(gw->lp_nid), gw->lp_ping_version);
601                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
602                 return;
603         }
604
605         cfs_list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
606                 int     ptl_status = LNET_NI_STATUS_INVALID;
607                 int     down = 0;
608                 int     up = 0;
609                 int     i;
610
611                 for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
612                         lnet_ni_status_t *stat = &info->pi_ni[i];
613                         lnet_nid_t       nid = stat->ns_nid;
614
615                         if (nid == LNET_NID_ANY) {
616                                 CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
617                                        libcfs_nid2str(gw->lp_nid));
618                                 gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
619                                 return;
620                         }
621
622                         if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
623                                 continue;
624
625                         if (stat->ns_status == LNET_NI_STATUS_DOWN) {
626                                 if (LNET_NETTYP(LNET_NIDNET(nid)) != PTLLND)
627                                         down++;
628                                 else if (ptl_status != LNET_NI_STATUS_UP)
629                                         ptl_status = LNET_NI_STATUS_DOWN;
630                                 continue;
631                         }
632
633                         if (stat->ns_status == LNET_NI_STATUS_UP) {
634                                 if (LNET_NIDNET(nid) == rtr->lr_net) {
635                                         up = 1;
636                                         break;
637                                 }
638                                 /* ptl NIs are considered down only when
639                                  * they're all down */
640                                 if (LNET_NETTYP(LNET_NIDNET(nid)) == PTLLND)
641                                         ptl_status = LNET_NI_STATUS_UP;
642                                 continue;
643                         }
644
645                         CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
646                                libcfs_nid2str(gw->lp_nid), stat->ns_status);
647                         gw->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
648                         return;
649                 }
650
651                 if (up) { /* ignore downed NIs if NI for dest network is up */
652                         rtr->lr_downis = 0;
653                         continue;
654                 }
655                 rtr->lr_downis = down + (ptl_status == LNET_NI_STATUS_DOWN);
656         }
657 }
658
659 static void
660 lnet_router_checker_event(lnet_event_t *event)
661 {
662         lnet_rc_data_t          *rcd = event->md.user_ptr;
663         struct lnet_peer        *lp;
664
665         LASSERT(rcd != NULL);
666
667         if (event->unlinked) {
668                 LNetInvalidateHandle(&rcd->rcd_mdh);
669                 return;
670         }
671
672         LASSERT(event->type == LNET_EVENT_SEND ||
673                 event->type == LNET_EVENT_REPLY);
674
675         lp = rcd->rcd_gateway;
676         LASSERT(lp != NULL);
677
678         LNET_LOCK();
679         if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
680                 /* ignore if no longer a router or rcd is replaced */
681                 goto out;
682         }
683
684         if (event->type == LNET_EVENT_SEND) {
685                 lp->lp_ping_notsent = 0;
686                 if (event->status == 0)
687                         goto out;
688         }
689
690         /* LNET_EVENT_REPLY */
691         /* A successful REPLY means the router is up.  If _any_ comms
692          * to the router fail I assume it's down (this will happen if
693          * we ping alive routers to try to detect router death before
694          * apps get burned). */
695
696         lnet_notify_locked(lp, 1, (event->status == 0), cfs_time_current());
697         /* The router checker will wake up very shortly and do the
698          * actual notification.
699          * XXX If 'lp' stops being a router before then, it will still
700          * have the notification pending!!! */
701
702         if (avoid_asym_router_failure && event->status == 0)
703                 lnet_parse_rc_info(rcd);
704
705  out:
706         LNET_UNLOCK();
707 }
708
709 void
710 lnet_wait_known_routerstate(void)
711 {
712         lnet_peer_t         *rtr;
713         cfs_list_t          *entry;
714         int                  all_known;
715
716         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
717
718         for (;;) {
719                 LNET_LOCK();
720
721                 all_known = 1;
722                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
723                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
724
725                         if (rtr->lp_alive_count == 0) {
726                                 all_known = 0;
727                                 break;
728                         }
729                 }
730
731                 LNET_UNLOCK();
732
733                 if (all_known)
734                         return;
735
736 #ifndef __KERNEL__
737                 lnet_router_checker();
738 #endif
739                 cfs_pause(cfs_time_seconds(1));
740         }
741 }
742
743 void
744 lnet_update_ni_status_locked(void)
745 {
746         lnet_ni_t       *ni;
747         long            now;
748         int             timeout;
749
750         LASSERT(the_lnet.ln_routing);
751
752         timeout = router_ping_timeout +
753                   MAX(live_router_check_interval, dead_router_check_interval);
754
755         now = cfs_time_current_sec();
756         cfs_list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
757                 if (ni->ni_lnd->lnd_type == LOLND)
758                         continue;
759
760                 if (now < ni->ni_last_alive + timeout)
761                         continue;
762
763                 LASSERT(ni->ni_status != NULL);
764
765                 if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
766                         CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
767                                libcfs_nid2str(ni->ni_nid), timeout);
768                         /* NB: so far, this is the only place to set
769                          * NI status to "down" */
770                         ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
771                 }
772         }
773 }
774
775 void
776 lnet_destroy_rc_data (lnet_rc_data_t *rcd)
777 {
778         LASSERT(cfs_list_empty(&rcd->rcd_list));
779         /* detached from network */
780         LASSERT(LNetHandleIsInvalid(rcd->rcd_mdh));
781
782         if (rcd->rcd_gateway != NULL) {
783                 LNET_LOCK();
784                 lnet_peer_decref_locked(rcd->rcd_gateway);
785                 LNET_UNLOCK();
786         }
787
788         if (rcd->rcd_pinginfo != NULL)
789                 LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
790
791         LIBCFS_FREE(rcd, sizeof(*rcd));
792 }
793
794 lnet_rc_data_t *
795 lnet_create_rc_data_locked(lnet_peer_t *gateway)
796 {
797         lnet_rc_data_t          *rcd = NULL;
798         lnet_ping_info_t        *pi;
799         int                     rc;
800         int                     i;
801
802         LNET_UNLOCK();
803
804         LIBCFS_ALLOC(rcd, sizeof(*rcd));
805         if (rcd == NULL)
806                 goto out;
807
808         LNetInvalidateHandle(&rcd->rcd_mdh);
809         CFS_INIT_LIST_HEAD(&rcd->rcd_list);
810
811         LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
812         if (pi == NULL)
813                 goto out;
814
815         memset(pi, 0, LNET_PINGINFO_SIZE);
816         for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
817                 pi->pi_ni[i].ns_nid = LNET_NID_ANY;
818                 pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
819         }
820         rcd->rcd_pinginfo = pi;
821
822         LASSERT (!LNetHandleIsInvalid(the_lnet.ln_rc_eqh));
823         rc = LNetMDBind((lnet_md_t){.start     = pi,
824                                     .user_ptr  = rcd,
825                                     .length    = LNET_PINGINFO_SIZE,
826                                     .threshold = LNET_MD_THRESH_INF,
827                                     .options   = LNET_MD_TRUNCATE,
828                                     .eq_handle = the_lnet.ln_rc_eqh},
829                         LNET_UNLINK,
830                         &rcd->rcd_mdh);
831         if (rc < 0) {
832                 CERROR("Can't bind MD: %d\n", rc);
833                 goto out;
834         }
835         LASSERT(rc == 0);
836
837         LNET_LOCK();
838         /* router table changed or someone has created rcd for this gateway */
839         if (!lnet_isrouter(gateway) || gateway->lp_rcd != NULL) {
840                 LNET_UNLOCK();
841                 goto out;
842         }
843
844         lnet_peer_addref_locked(gateway);
845         rcd->rcd_gateway = gateway;
846         gateway->lp_rcd = rcd;
847         gateway->lp_ping_notsent = 0;
848
849         return rcd;
850
851  out:
852         if (rcd != NULL) {
853                 if (!LNetHandleIsInvalid(rcd->rcd_mdh)) {
854                         rc = LNetMDUnlink(rcd->rcd_mdh);
855                         LASSERT(rc == 0);
856                 }
857                 lnet_destroy_rc_data(rcd);
858         }
859
860         LNET_LOCK();
861         return gateway->lp_rcd;
862 }
863
864 static int
865 lnet_router_check_interval (lnet_peer_t *rtr)
866 {
867         int secs;
868
869         secs = rtr->lp_alive ? live_router_check_interval :
870                                dead_router_check_interval;
871         if (secs < 0)
872                 secs = 0;
873
874         return secs;
875 }
876
877 static void
878 lnet_ping_router_locked (lnet_peer_t *rtr)
879 {
880         lnet_rc_data_t *rcd = NULL;
881         cfs_time_t      now = cfs_time_current();
882         int             secs;
883
884         lnet_peer_addref_locked(rtr);
885
886         if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
887             cfs_time_after(now, rtr->lp_ping_deadline))
888                 lnet_notify_locked(rtr, 1, 0, now);
889
890         /* Run any outstanding notifications */
891         lnet_ni_notify_locked(rtr->lp_ni, rtr);
892
893         if (!lnet_isrouter(rtr) ||
894             the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
895                 /* router table changed or router checker is shutting down */
896                 lnet_peer_decref_locked(rtr);
897                 return;
898         }
899
900         rcd = rtr->lp_rcd != NULL ?
901               rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
902
903         if (rcd == NULL)
904                 return;
905
906         secs = lnet_router_check_interval(rtr);
907
908         CDEBUG(D_NET,
909                "rtr %s %d: deadline %lu ping_notsent %d alive %d "
910                "alive_count %d lp_ping_timestamp %lu\n",
911                libcfs_nid2str(rtr->lp_nid), secs,
912                rtr->lp_ping_deadline, rtr->lp_ping_notsent,
913                rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
914
915         if (secs != 0 && !rtr->lp_ping_notsent &&
916             cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
917                                              cfs_time_seconds(secs)))) {
918                 int               rc;
919                 lnet_process_id_t id;
920                 lnet_handle_md_t  mdh;
921
922                 id.nid = rtr->lp_nid;
923                 id.pid = LUSTRE_SRV_LNET_PID;
924                 CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
925
926                 rtr->lp_ping_notsent   = 1;
927                 rtr->lp_ping_timestamp = now;
928
929                 mdh = rcd->rcd_mdh;
930
931                 if (rtr->lp_ping_deadline == 0) {
932                         rtr->lp_ping_deadline = \
933                                 cfs_time_shift(router_ping_timeout);
934                 }
935
936                 LNET_UNLOCK();
937
938                 rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
939                              LNET_PROTO_PING_MATCHBITS, 0);
940
941                 LNET_LOCK();
942                 if (rc != 0)
943                         rtr->lp_ping_notsent = 0; /* no event pending */
944         }
945
946         lnet_peer_decref_locked(rtr);
947         return;
948 }
949
950 int
951 lnet_router_checker_start(void)
952 {
953         int          rc;
954         int          eqsz;
955 #ifndef __KERNEL__
956         lnet_peer_t *rtr;
957         __u64        version;
958         int          nrtr = 0;
959         int          router_checker_max_eqsize = 10240;
960
961         LASSERT (check_routers_before_use);
962         LASSERT (dead_router_check_interval > 0);
963
964         LNET_LOCK();
965
966         /* As an approximation, allow each router the same number of
967          * outstanding events as it is allowed outstanding sends */
968         eqsz = 0;
969         version = the_lnet.ln_routers_version;
970         cfs_list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
971                 lnet_ni_t         *ni = rtr->lp_ni;
972                 lnet_process_id_t  id;
973
974                 nrtr++;
975                 eqsz += ni->ni_peertxcredits;
976
977                 /* one async ping reply per router */
978                 id.nid = rtr->lp_nid;
979                 id.pid = LUSTRE_SRV_LNET_PID;
980
981                 LNET_UNLOCK();
982
983                 rc = LNetSetAsync(id, 1);
984                 if (rc != 0) {
985                         CWARN("LNetSetAsync %s failed: %d\n",
986                               libcfs_id2str(id), rc);
987                         return rc;
988                 }
989
990                 LNET_LOCK();
991                 /* NB router list doesn't change in userspace */
992                 LASSERT (version == the_lnet.ln_routers_version);
993         }
994
995         LNET_UNLOCK();
996
997         if (nrtr == 0) {
998                 CDEBUG(D_NET,
999                        "No router found, not starting router checker\n");
1000                 return 0;
1001         }
1002
1003         /* at least allow a SENT and a REPLY per router */
1004         if (router_checker_max_eqsize < 2 * nrtr)
1005                 router_checker_max_eqsize = 2 * nrtr;
1006
1007         LASSERT (eqsz > 0);
1008         if (eqsz > router_checker_max_eqsize)
1009                 eqsz = router_checker_max_eqsize;
1010 #endif
1011
1012         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1013
1014         if (check_routers_before_use &&
1015             dead_router_check_interval <= 0) {
1016                 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
1017                                    " set if 'check_routers_before_use' is set"
1018                                    "\n");
1019                 return -EINVAL;
1020         }
1021
1022         if (!the_lnet.ln_routing &&
1023             live_router_check_interval <= 0 &&
1024             dead_router_check_interval <= 0)
1025                 return 0;
1026
1027 #ifdef __KERNEL__
1028         cfs_sema_init(&the_lnet.ln_rc_signal, 0);
1029         /* EQ size doesn't matter; the callback is guaranteed to get every
1030          * event */
1031         eqsz = 0;
1032         rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
1033                          &the_lnet.ln_rc_eqh);
1034 #else
1035         rc = LNetEQAlloc(eqsz, LNET_EQ_HANDLER_NONE,
1036                          &the_lnet.ln_rc_eqh);
1037 #endif
1038         if (rc != 0) {
1039                 CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
1040                 return -ENOMEM;
1041         }
1042
1043         the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1044 #ifdef __KERNEL__
1045         rc = cfs_create_thread(lnet_router_checker, NULL, 0);
1046         if (rc < 0) {
1047                 CERROR("Can't start router checker thread: %d\n", rc);
1048                 /* block until event callback signals exit */
1049                 cfs_down(&the_lnet.ln_rc_signal);
1050                 rc = LNetEQFree(the_lnet.ln_rc_eqh);
1051                 LASSERT (rc == 0);
1052                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1053                 return -ENOMEM;
1054         }
1055 #endif
1056
1057         if (check_routers_before_use) {
1058                 /* Note that a helpful side-effect of pinging all known routers
1059                  * at startup is that it makes them drop stale connections they
1060                  * may have to a previous instance of me. */
1061                 lnet_wait_known_routerstate();
1062         }
1063
1064         return 0;
1065 }
1066
1067 void
1068 lnet_router_checker_stop (void)
1069 {
1070         int rc;
1071
1072         if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1073                 return;
1074
1075         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1076         the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
1077
1078 #ifdef __KERNEL__
1079         /* block until event callback signals exit */
1080         cfs_down(&the_lnet.ln_rc_signal);
1081 #else
1082         lnet_router_checker();
1083 #endif
1084         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1085
1086         rc = LNetEQFree(the_lnet.ln_rc_eqh);
1087         LASSERT (rc == 0);
1088         return;
1089 }
1090
1091 static void
1092 lnet_prune_rc_data(int wait_unlink)
1093 {
1094         lnet_rc_data_t          *rcd;
1095         lnet_rc_data_t          *tmp;
1096         lnet_peer_t             *lp;
1097         cfs_list_t              head;
1098         int                     i = 2;
1099
1100         if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
1101                    cfs_list_empty(&the_lnet.ln_rcd_deathrow) &&
1102                    cfs_list_empty(&the_lnet.ln_rcd_zombie)))
1103                 return;
1104
1105         CFS_INIT_LIST_HEAD(&head);
1106
1107         LNET_LOCK();
1108
1109         if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1110                 /* router checker is stopping, prune all */
1111                 cfs_list_for_each_entry(lp, &the_lnet.ln_routers,
1112                                         lp_rtr_list) {
1113                         if (lp->lp_rcd == NULL)
1114                                 continue;
1115
1116                         LASSERT(cfs_list_empty(&lp->lp_rcd->rcd_list));
1117                         cfs_list_add(&lp->lp_rcd->rcd_list,
1118                                      &the_lnet.ln_rcd_deathrow);
1119                         lp->lp_rcd = NULL;
1120                 }
1121         }
1122
1123         /* unlink all RCDs on deathrow list */
1124         cfs_list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
1125
1126         if (!cfs_list_empty(&head)) {
1127                 LNET_UNLOCK();
1128
1129                 cfs_list_for_each_entry(rcd, &head, rcd_list)
1130                         LNetMDUnlink(rcd->rcd_mdh);
1131
1132                 LNET_LOCK();
1133         }
1134
1135         cfs_list_splice_init(&head, &the_lnet.ln_rcd_zombie);
1136
1137         /* release all zombie RCDs */
1138         while (!cfs_list_empty(&the_lnet.ln_rcd_zombie)) {
1139                 cfs_list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
1140                                              rcd_list) {
1141                         if (!LNetHandleIsInvalid(rcd->rcd_mdh))
1142                                 cfs_list_move(&rcd->rcd_list, &head);
1143                 }
1144
1145                 wait_unlink = wait_unlink &&
1146                               !cfs_list_empty(&the_lnet.ln_rcd_zombie);
1147
1148                 LNET_UNLOCK();
1149
1150                 while (!cfs_list_empty(&head)) {
1151                         rcd = cfs_list_entry(head.next,
1152                                              lnet_rc_data_t, rcd_list);
1153                         cfs_list_del_init(&rcd->rcd_list);
1154                         lnet_destroy_rc_data(rcd);
1155                 }
1156
1157                 if (!wait_unlink)
1158                         break;
1159
1160                 i++;
1161                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
1162                        "Waiting for rc buffers to unlink\n");
1163                 cfs_pause(cfs_time_seconds(1) / 4);
1164
1165                 LNET_LOCK();
1166         }
1167 }
1168
1169
1170 #if defined(__KERNEL__) && defined(LNET_ROUTER)
1171
1172 static int
1173 lnet_router_checker(void *arg)
1174 {
1175         lnet_peer_t       *rtr;
1176         cfs_list_t        *entry;
1177
1178         cfs_daemonize("router_checker");
1179         cfs_block_allsigs();
1180
1181         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1182
1183         while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
1184                 __u64 version;
1185
1186                 LNET_LOCK();
1187 rescan:
1188                 version = the_lnet.ln_routers_version;
1189
1190                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
1191                         rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
1192                         lnet_ping_router_locked(rtr);
1193
1194                         /* NB dropped lock */
1195                         if (version != the_lnet.ln_routers_version) {
1196                                 /* the routers list has changed */
1197                                 goto rescan;
1198                         }
1199                 }
1200
1201                 if (the_lnet.ln_routing)
1202                         lnet_update_ni_status_locked();
1203
1204                 LNET_UNLOCK();
1205
1206                 lnet_prune_rc_data(0); /* don't wait for UNLINK */
1207
1208                 /* Call cfs_pause() here always adds 1 to load average 
1209                  * because kernel counts # active tasks as nr_running 
1210                  * + nr_uninterruptible. */
1211                 cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
1212                                                    cfs_time_seconds(1));
1213         }
1214
1215         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING);
1216
1217         lnet_prune_rc_data(1); /* wait for UNLINK */
1218
1219         the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1220         cfs_up(&the_lnet.ln_rc_signal);
1221         /* The unlink event callback will signal final completion */
1222         return 0;
1223 }
1224
1225 void
1226 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
1227 {
1228         int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1229
1230         while (--npages >= 0)
1231                 cfs_free_page(rb->rb_kiov[npages].kiov_page);
1232
1233         LIBCFS_FREE(rb, sz);
1234 }
1235
1236 lnet_rtrbuf_t *
1237 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
1238 {
1239         int            npages = rbp->rbp_npages;
1240         int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
1241         struct page   *page;
1242         lnet_rtrbuf_t *rb;
1243         int            i;
1244
1245         LIBCFS_ALLOC(rb, sz);
1246         if (rb == NULL)
1247                 return NULL;
1248
1249         rb->rb_pool = rbp;
1250
1251         for (i = 0; i < npages; i++) {
1252                 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
1253                 if (page == NULL) {
1254                         while (--i >= 0)
1255                                 cfs_free_page(rb->rb_kiov[i].kiov_page);
1256
1257                         LIBCFS_FREE(rb, sz);
1258                         return NULL;
1259                 }
1260
1261                 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
1262                 rb->rb_kiov[i].kiov_offset = 0;
1263                 rb->rb_kiov[i].kiov_page = page;
1264         }
1265
1266         return rb;
1267 }
1268
1269 void
1270 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
1271 {
1272         int            npages = rbp->rbp_npages;
1273         int            nbuffers = 0;
1274         lnet_rtrbuf_t *rb;
1275
1276         LASSERT (cfs_list_empty(&rbp->rbp_msgs));
1277         LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
1278
1279         while (!cfs_list_empty(&rbp->rbp_bufs)) {
1280                 LASSERT (rbp->rbp_credits > 0);
1281
1282                 rb = cfs_list_entry(rbp->rbp_bufs.next,
1283                                     lnet_rtrbuf_t, rb_list);
1284                 cfs_list_del(&rb->rb_list);
1285                 lnet_destroy_rtrbuf(rb, npages);
1286                 nbuffers++;
1287         }
1288
1289         LASSERT (rbp->rbp_nbuffers == nbuffers);
1290         LASSERT (rbp->rbp_credits == nbuffers);
1291
1292         rbp->rbp_nbuffers = rbp->rbp_credits = 0;
1293 }
1294
1295 int
1296 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
1297 {
1298         lnet_rtrbuf_t *rb;
1299         int            i;
1300
1301         if (rbp->rbp_nbuffers != 0) {
1302                 LASSERT (rbp->rbp_nbuffers == nbufs);
1303                 return 0;
1304         }
1305
1306         for (i = 0; i < nbufs; i++) {
1307                 rb = lnet_new_rtrbuf(rbp);
1308
1309                 if (rb == NULL) {
1310                         CERROR("Failed to allocate %d router bufs of %d pages\n",
1311                                nbufs, rbp->rbp_npages);
1312                         return -ENOMEM;
1313                 }
1314
1315                 rbp->rbp_nbuffers++;
1316                 rbp->rbp_credits++;
1317                 rbp->rbp_mincredits++;
1318                 cfs_list_add(&rb->rb_list, &rbp->rbp_bufs);
1319
1320                 /* No allocation "under fire" */
1321                 /* Otherwise we'd need code to schedule blocked msgs etc */
1322                 LASSERT (!the_lnet.ln_routing);
1323         }
1324
1325         LASSERT (rbp->rbp_credits == nbufs);
1326         return 0;
1327 }
1328
1329 void
1330 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1331 {
1332         CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
1333         CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
1334
1335         rbp->rbp_npages = npages;
1336         rbp->rbp_credits = 0;
1337         rbp->rbp_mincredits = 0;
1338 }
1339
1340 void
1341 lnet_free_rtrpools(void)
1342 {
1343         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
1344         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
1345         lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
1346 }
1347
1348 void
1349 lnet_init_rtrpools(void)
1350 {
1351         int small_pages = 1;
1352         int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1353
1354         lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1355         lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1356         lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
1357 }
1358
1359
1360 int
1361 lnet_alloc_rtrpools(int im_a_router)
1362 {
1363         int       rc;
1364
1365         if (!strcmp(forwarding, "")) {
1366                 /* not set either way */
1367                 if (!im_a_router)
1368                         return 0;
1369         } else if (!strcmp(forwarding, "disabled")) {
1370                 /* explicitly disabled */
1371                 return 0;
1372         } else if (!strcmp(forwarding, "enabled")) {
1373                 /* explicitly enabled */
1374         } else {
1375                 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1376                                    "'enabled' or 'disabled'\n");
1377                 return -EINVAL;
1378         }
1379
1380         if (tiny_router_buffers <= 0) {
1381                 LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
1382                                    "routing enabled\n", tiny_router_buffers);
1383                 rc = -EINVAL;
1384                 goto failed;
1385         }
1386
1387         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1388                                      tiny_router_buffers);
1389         if (rc != 0)
1390                 goto failed;
1391
1392         if (small_router_buffers <= 0) {
1393                 LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when"
1394                                    " routing enabled\n", small_router_buffers);
1395                 rc = -EINVAL;
1396                 goto failed;
1397         }
1398
1399         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1400                                      small_router_buffers);
1401         if (rc != 0)
1402                 goto failed;
1403
1404         if (large_router_buffers <= 0) {
1405                 LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
1406                                    " routing enabled\n", large_router_buffers);
1407                 rc = -EINVAL;
1408                 goto failed;
1409         }
1410
1411         rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1412                                      large_router_buffers);
1413         if (rc != 0)
1414                 goto failed;
1415
1416         LNET_LOCK();
1417         the_lnet.ln_routing = 1;
1418         LNET_UNLOCK();
1419
1420         return 0;
1421
1422  failed:
1423         lnet_free_rtrpools();
1424         return rc;
1425 }
1426
1427 int
1428 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1429 {
1430         lnet_peer_t *lp = NULL;
1431         cfs_time_t   now = cfs_time_current();
1432
1433         LASSERT (!cfs_in_interrupt ());
1434
1435         CDEBUG (D_NET, "%s notifying %s: %s\n",
1436                 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1437                 libcfs_nid2str(nid),
1438                 alive ? "up" : "down");
1439
1440         if (ni != NULL &&
1441             LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1442                 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
1443                         libcfs_nid2str(nid), alive ? "birth" : "death",
1444                         libcfs_nid2str(ni->ni_nid));
1445                 return -EINVAL;
1446         }
1447
1448         /* can't do predictions... */
1449         if (cfs_time_after(when, now)) {
1450                 CWARN ("Ignoring prediction from %s of %s %s "
1451                        "%ld seconds in the future\n",
1452                        (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1453                        libcfs_nid2str(nid), alive ? "up" : "down",
1454                        cfs_duration_sec(cfs_time_sub(when, now)));
1455                 return -EINVAL;
1456         }
1457
1458         if (ni != NULL && !alive &&             /* LND telling me she's down */
1459             !auto_down) {                       /* auto-down disabled */
1460                 CDEBUG(D_NET, "Auto-down disabled\n");
1461                 return 0;
1462         }
1463
1464         LNET_LOCK();
1465
1466         lp = lnet_find_peer_locked(nid);
1467         if (lp == NULL) {
1468                 /* nid not found */
1469                 LNET_UNLOCK();
1470                 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1471                 return 0;
1472         }
1473
1474         /* We can't fully trust LND on reporting exact peer last_alive
1475          * if he notifies us about dead peer. For example ksocklnd can
1476          * call us with when == _time_when_the_node_was_booted_ if
1477          * no connections were successfully established */
1478         if (ni != NULL && !alive && when < lp->lp_last_alive)
1479                 when = lp->lp_last_alive;
1480
1481         lnet_notify_locked(lp, ni == NULL, alive, when);
1482
1483         lnet_ni_notify_locked(ni, lp);
1484
1485         lnet_peer_decref_locked(lp);
1486
1487         LNET_UNLOCK();
1488         return 0;
1489 }
1490 EXPORT_SYMBOL(lnet_notify);
1491
/*
 * Nothing to do in this build: this variant reads no tunables at run time.
 */
void
lnet_get_tunables(void)
{
}
1497
1498 #else
1499
1500 int
1501 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
1502 {
1503         return -EOPNOTSUPP;
1504 }
1505
1506 void
1507 lnet_router_checker (void)
1508 {
1509         static time_t last = 0;
1510         static int    running = 0;
1511
1512         time_t            now = cfs_time_current_sec();
1513         int               interval = now - last;
1514         int               rc;
1515         __u64             version;
1516         lnet_peer_t      *rtr;
1517
1518         /* It's no use to call me again within a sec - all intervals and
1519          * timeouts are measured in seconds */
1520         if (last != 0 && interval < 2)
1521                 return;
1522
1523         if (last != 0 &&
1524             interval > MAX(live_router_check_interval,
1525                            dead_router_check_interval))
1526                 CNETERR("Checker(%d/%d) not called for %d seconds\n",
1527                         live_router_check_interval, dead_router_check_interval,
1528                         interval);
1529
1530         LNET_LOCK();
1531         LASSERT (!running); /* recursion check */
1532         running = 1;
1533         LNET_UNLOCK();
1534
1535         last = now;
1536
1537         if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING)
1538                 lnet_prune_rc_data(0); /* unlink all rcd and nowait */
1539
1540         /* consume all pending events */
1541         while (1) {
1542                 int          i;
1543                 lnet_event_t ev;
1544
1545                 /* NB ln_rc_eqh must be the 1st in 'eventqs' otherwise the
1546                  * recursion breaker in LNetEQPoll would fail */
1547                 rc = LNetEQPoll(&the_lnet.ln_rc_eqh, 1, 0, &ev, &i);
1548                 if (rc == 0)   /* no event pending */
1549                         break;
1550
1551                 /* NB a lost SENT prevents me from pinging a router again */
1552                 if (rc == -EOVERFLOW) {
1553                         CERROR("Dropped an event!!!\n");
1554                         abort();
1555                 }
1556
1557                 LASSERT (rc == 1);
1558
1559                 lnet_router_checker_event(&ev);
1560         }
1561
1562         if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING) {
1563                 lnet_prune_rc_data(1); /* release rcd */
1564                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1565                 running = 0;
1566                 return;
1567         }
1568
1569         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1570
1571         LNET_LOCK();
1572
1573         version = the_lnet.ln_routers_version;
1574         cfs_list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
1575                 lnet_ping_router_locked(rtr);
1576                 LASSERT (version == the_lnet.ln_routers_version);
1577         }
1578
1579         LNET_UNLOCK();
1580
1581         running = 0; /* lock only needed for the recursion check */
1582         return;
1583 }
1584
1585 /* NB lnet_peers_start_down depends on me,
1586  * so must be called before any peer creation */
1587 void
1588 lnet_get_tunables (void)
1589 {
1590         char *s;
1591
1592         s = getenv("LNET_ROUTER_PING_TIMEOUT");
1593         if (s != NULL) router_ping_timeout = atoi(s);
1594
1595         s = getenv("LNET_LIVE_ROUTER_CHECK_INTERVAL");
1596         if (s != NULL) live_router_check_interval = atoi(s);
1597
1598         s = getenv("LNET_DEAD_ROUTER_CHECK_INTERVAL");
1599         if (s != NULL) dead_router_check_interval = atoi(s);
1600
1601         /* This replaces old lnd_notify mechanism */
1602         check_routers_before_use = 1;
1603         if (dead_router_check_interval <= 0)
1604                 dead_router_check_interval = 30;
1605 }
1606
/*
 * Stub: this build variant has no router buffer pools to free.
 */
void
lnet_free_rtrpools(void)
{
        /* intentionally empty */
}
1611
/*
 * Stub: this build variant has no router buffer pools to initialise.
 */
void
lnet_init_rtrpools(void)
{
        /* intentionally empty */
}
1616
/*
 * Stub: this build variant allocates no router buffer pools.
 * \a im_a_router is ignored; always returns 0.
 */
int
lnet_alloc_rtrpools (int im_a_router)
{
        return 0;
}
1622
1623 #endif