1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
6 * This file is part of Portals
7 * http://sourceforge.net/projects/sandiaportals/
9 * Portals is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Portals is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Portals; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
/* Router/forwarding tunables for the kernel router build (LNET_ROUTER).
 * NOTE(review): this listing embeds original source line numbers and the
 * numbering gaps mark dropped lines — verify against upstream lnet/router.c. */
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31 "Explicitly enable/disable forwarding between networks");
/* Router buffer pool sizing: tiny = zero payload, small = 1 page,
 * large = LNET_MTU-sized (see lnet_init_rtrpools()). */
33 static int tiny_router_buffers = 1024;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 8192;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 512;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41 "# of large messages to buffer in the router");
/* 0 => lnet_peer_buffer_credits() falls back to ni_peertxcredits */
42 static int peer_buffer_credits = 0;
43 CFS_MODULE_PARM(peer_buffer_credits, "i", int, 0444,
44 "# router buffer credits per peer");
/* Consulted by lnet_notify(): when clear, LND "peer down" reports are ignored */
46 static int auto_down = 1;
47 CFS_MODULE_PARM(auto_down, "i", int, 0444,
48 "Automatically mark peers down on comms error");
51 lnet_peer_buffer_credits(lnet_ni_t *ni)
53 /* NI option overrides LNet default */
54 if (ni->ni_peerrtrcredits > 0)
55 return ni->ni_peerrtrcredits;
56 if (peer_buffer_credits > 0)
57 return peer_buffer_credits;
59 /* As an approximation, allow this peer the same number of router
60 * buffers as it is allowed outstanding sends */
61 return ni->ni_peertxcredits;
/* Forward declaration of the kernel router-checker thread entry point. */
65 static int lnet_router_checker(void *);
/* NOTE(review): signature of the non-router build's lnet_peer_buffer_credits();
 * its body and the surrounding #else are missing from this capture. */
69 lnet_peer_buffer_credits(lnet_ni_t *ni)
/* Router health-check tunables, consumed by the router checker below.
 * NOTE(review): numbering gaps mark lines dropped from this capture. */
76 static int check_routers_before_use = 0;
77 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
78 "Assume routers are down and ping them before use");
/* <= 0 disables the corresponding periodic ping (see
 * lnet_router_check_interval()). */
80 static int dead_router_check_interval = 0;
81 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
82 "Seconds between dead router health checks (<= 0 to disable)");
84 static int live_router_check_interval = 0;
85 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
86 "Seconds between live router health checks (<= 0 to disable)");
/* A ping outstanding longer than this marks the router down
 * (lp_ping_deadline in lnet_ping_router_locked()). */
88 static int router_ping_timeout = 50;
89 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
90 "Seconds to wait for the reply to a router health query");
93 lnet_peers_start_down(void)
95 return check_routers_before_use;
99 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, time_t when)
101 if (when < lp->lp_timestamp) { /* out of date information */
102 CDEBUG(D_NET, "Out of date\n");
106 lp->lp_timestamp = when; /* update timestamp */
107 lp->lp_ping_deadline = 0; /* disable ping timeout */
109 if (lp->lp_alive_count != 0 && /* got old news */
110 (!lp->lp_alive) == (!alive)) { /* new date for old news */
111 CDEBUG(D_NET, "Old news\n");
115 /* Flag that notification is outstanding */
117 lp->lp_alive_count++;
118 lp->lp_alive = !(!alive); /* 1 bit! */
120 lp->lp_notifylnd |= notifylnd;
122 CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
/*
 * Drain the pending aliveness notification for @lp, invoking the LND's
 * lnd_notify() hook whenever lp_notifylnd was set by lnet_notify_locked().
 * Only one thread notifies a given peer at a time (lp_notifying) so the
 * LND sees state changes in order; individual events may be coalesced.
 * NOTE(review): numbering gaps show LNET_LOCK/UNLOCK calls, local variable
 * declarations, returns and braces were dropped from this capture —
 * left byte-identical; verify against upstream lnet/router.c.
 */
126 lnet_do_notify (lnet_peer_t *lp)
128 lnet_ni_t *ni = lp->lp_ni;
134 /* Notify only in 1 thread at any time to ensure ordered notification.
135 * NB individual events can be missed; the only guarantee is that you
136 * always get the most recent news */
138 if (lp->lp_notifying) {
143 lp->lp_notifying = 1;
145 while (lp->lp_notify) {
146 alive = lp->lp_alive;
147 notifylnd = lp->lp_notifylnd;
149 lp->lp_notifylnd = 0;
152 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
155 /* A new notification could happen now; I'll handle it
156 * when control returns to me */
158 (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
164 lp->lp_notifying = 0;
171 lnet_rtr_addref_locked(lnet_peer_t *lp)
173 LASSERT (lp->lp_refcount > 0);
174 LASSERT (lp->lp_rtr_refcount >= 0);
176 lp->lp_rtr_refcount++;
177 if (lp->lp_rtr_refcount == 1) {
178 struct list_head *pos;
180 /* a simple insertion sort */
181 list_for_each_prev(pos, &the_lnet.ln_routers) {
182 lnet_peer_t *rtr = list_entry(pos, lnet_peer_t,
185 if (rtr->lp_nid < lp->lp_nid)
189 list_add(&lp->lp_rtr_list, pos);
190 /* addref for the_lnet.ln_routers */
191 lnet_peer_addref_locked(lp);
192 the_lnet.ln_routers_version++;
197 lnet_rtr_decref_locked(lnet_peer_t *lp)
199 LASSERT (lp->lp_refcount > 0);
200 LASSERT (lp->lp_rtr_refcount > 0);
202 lp->lp_rtr_refcount--;
203 if (lp->lp_rtr_refcount == 0) {
204 list_del(&lp->lp_rtr_list);
205 /* decref for the_lnet.ln_routers */
206 lnet_peer_decref_locked(lp);
207 the_lnet.ln_routers_version++;
212 lnet_find_net_locked (__u32 net)
214 lnet_remotenet_t *rnet;
215 struct list_head *tmp;
217 LASSERT (!the_lnet.ln_shutdown);
219 list_for_each (tmp, &the_lnet.ln_remote_nets) {
220 rnet = list_entry(tmp, lnet_remotenet_t, lrn_list);
222 if (rnet->lrn_net == net)
/*
 * Add a route to remote network @net via gateway @gateway with hop count
 * @hops.  Validates the parameters, allocates route and remotenet
 * descriptors up front, resolves the gateway to a peer, then merges the
 * route into the table: a longer route is ignored, a shorter one replaces
 * (zombifies) all existing routes to @net, and an equal-hop duplicate is
 * a no-op.  All routes to one remote net must use gateways on the same
 * local NI.  On success the gateway's LND is told the gateway is alive.
 * NOTE(review): numbering gaps show locking, gotos/labels, returns and
 * braces were dropped from this capture — left byte-identical; verify
 * against upstream lnet/router.c.
 */
229 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
231 struct list_head zombies;
233 lnet_remotenet_t *rnet;
234 lnet_remotenet_t *rnet2;
236 lnet_route_t *route2;
241 CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
242 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
/* Reject: wildcard/loopback gateway, wildcard/loopback net,
 * gateway on the destination net itself, or hop count out of range. */
244 if (gateway == LNET_NID_ANY ||
245 LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
246 net == LNET_NIDNET(LNET_NID_ANY) ||
247 LNET_NETTYP(net) == LOLND ||
248 LNET_NIDNET(gateway) == net ||
249 hops < 1 || hops > 255)
252 if (lnet_islocalnet(net)) /* it's a local network */
253 return 0; /* ignore the route entry */
255 /* Assume net, route, all new */
256 LIBCFS_ALLOC(route, sizeof(*route));
257 LIBCFS_ALLOC(rnet, sizeof(*rnet));
258 if (route == NULL || rnet == NULL) {
259 CERROR("Out of memory creating route %s %d %s\n",
260 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
262 LIBCFS_FREE(route, sizeof(*route));
264 LIBCFS_FREE(rnet, sizeof(*rnet));
268 CFS_INIT_LIST_HEAD(&rnet->lrn_routes);
270 rnet->lrn_hops = hops;
274 rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
/* Gateway lookup failed: release the descriptors allocated above. */
278 LIBCFS_FREE(route, sizeof(*route));
279 LIBCFS_FREE(rnet, sizeof(*rnet));
281 if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
282 return 0; /* ignore the route entry */
284 CERROR("Error %d creating route %s %d %s\n", rc,
285 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
289 LASSERT (!the_lnet.ln_shutdown);
290 CFS_INIT_LIST_HEAD(&zombies);
292 rnet2 = lnet_find_net_locked(net);
/* First route to this remote net: install the new remotenet descriptor. */
295 list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
299 if (hops > rnet2->lrn_hops) {
300 /* New route is longer; ignore it */
302 } else if (hops < rnet2->lrn_hops) {
303 /* new route supersedes all currently known routes to this
305 list_add(&zombies, &rnet2->lrn_routes);
306 list_del_init(&rnet2->lrn_routes);
310 /* New route has the same hopcount as existing routes; search
311 * for a duplicate route (it's a NOOP if it is) */
312 list_for_each (e, &rnet2->lrn_routes) {
313 route2 = list_entry(e, lnet_route_t, lr_list);
315 if (route2->lr_gateway == route->lr_gateway) {
320 /* our lookups must be true */
321 LASSERT (route2->lr_gateway->lp_nid != gateway);
326 ni = route->lr_gateway->lp_ni;
327 lnet_ni_addref_locked(ni);
/* Commit the new route and pin the gateway as a router. */
330 list_add_tail(&route->lr_list, &rnet2->lrn_routes);
331 the_lnet.ln_remote_nets_version++;
333 lnet_rtr_addref_locked(route->lr_gateway);
337 /* XXX Assume alive */
338 if (ni->ni_lnd->lnd_notify != NULL)
339 (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
/* Duplicate-route path: discard the unused descriptors. */
343 lnet_peer_decref_locked(route->lr_gateway);
345 LIBCFS_FREE(route, sizeof(*route));
349 LIBCFS_FREE(rnet, sizeof(*rnet));
/* Retire routes superseded by a shorter one. */
351 while (!list_empty(&zombies)) {
352 route = list_entry(zombies.next, lnet_route_t, lr_list);
353 list_del(&route->lr_list);
356 lnet_rtr_decref_locked(route->lr_gateway);
357 lnet_peer_decref_locked(route->lr_gateway);
359 LIBCFS_FREE(route, sizeof(*route));
/*
 * Sanity-check the route table: for each remote net, every route's
 * gateway must live on the same local NI, since routing to one net via
 * multiple local interfaces is not supported.  Logs an error for the
 * first violation found.
 * NOTE(review): numbering gaps show the route2 selection logic, locking,
 * returns and braces were dropped from this capture — left byte-identical;
 * verify against upstream lnet/router.c.
 */
366 lnet_check_routes (void)
368 lnet_remotenet_t *rnet;
370 lnet_route_t *route2;
371 struct list_head *e1;
372 struct list_head *e2;
376 list_for_each (e1, &the_lnet.ln_remote_nets) {
377 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
380 list_for_each (e2, &rnet->lrn_routes) {
381 route = list_entry(e2, lnet_route_t, lr_list);
385 else if (route->lr_gateway->lp_ni !=
386 route2->lr_gateway->lp_ni) {
389 CERROR("Routes to %s via %s and %s not supported\n",
390 libcfs_net2str(rnet->lrn_net),
391 libcfs_nid2str(route->lr_gateway->lp_nid),
392 libcfs_nid2str(route2->lr_gateway->lp_nid));
/*
 * Delete route(s).  @net may be LNET_NIDNET(LNET_NID_ANY) to match all
 * remote nets and @gw_nid may be LNET_NID_ANY to match all gateways, so
 * a single call can remove one specific route or every route through a
 * gateway.  A remote net left with no routes is removed too; matching
 * routes drop their router/peer refs and are freed.
 * NOTE(review): numbering gaps show locking, the rescan-after-delete
 * control flow, returns and braces were dropped from this capture —
 * left byte-identical; verify against upstream lnet/router.c.
 */
403 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
405 lnet_remotenet_t *rnet;
407 struct list_head *e1;
408 struct list_head *e2;
411 CDEBUG(D_NET, "Del route: net %s : gw %s\n",
412 libcfs_net2str(net), libcfs_nid2str(gw_nid));
414 /* NB Caller may specify either all routes via the given gateway
415 * or a specific route entry actual NIDs) */
420 list_for_each (e1, &the_lnet.ln_remote_nets) {
421 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
423 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
424 net == rnet->lrn_net))
427 list_for_each (e2, &rnet->lrn_routes) {
428 route = list_entry(e2, lnet_route_t, lr_list);
430 if (!(gw_nid == LNET_NID_ANY ||
431 gw_nid == route->lr_gateway->lp_nid))
434 list_del(&route->lr_list);
435 the_lnet.ln_remote_nets_version++;
437 if (list_empty(&rnet->lrn_routes))
438 list_del(&rnet->lrn_list);
442 lnet_rtr_decref_locked(route->lr_gateway);
443 lnet_peer_decref_locked(route->lr_gateway);
446 LIBCFS_FREE(route, sizeof (*route));
449 LIBCFS_FREE(rnet, sizeof(*rnet));
461 lnet_destroy_routes (void)
463 lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
/*
 * Enumerate the route table: walk all remote nets and their routes and
 * report the @idx'th entry's net, hop count, gateway NID and gateway
 * aliveness through the out parameters.
 * NOTE(review): numbering gaps show the idx-match test, locking, returns
 * and braces were dropped from this capture — left byte-identical;
 * verify against upstream lnet/router.c.
 */
467 lnet_get_route (int idx, __u32 *net, __u32 *hops,
468 lnet_nid_t *gateway, __u32 *alive)
470 struct list_head *e1;
471 struct list_head *e2;
472 lnet_remotenet_t *rnet;
477 list_for_each (e1, &the_lnet.ln_remote_nets) {
478 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
480 list_for_each (e2, &rnet->lrn_routes) {
481 route = list_entry(e2, lnet_route_t, lr_list);
484 *net = rnet->lrn_net;
485 *hops = rnet->lrn_hops;
486 *gateway = route->lr_gateway->lp_nid;
487 *alive = route->lr_gateway->lp_alive;
/*
 * Block until every known router has been pinged at least once
 * (lp_alive_count != 0), so aliveness state is known before peers are
 * used.  Polls the router list, driving lnet_router_checker() and
 * pausing a second between scans.  Requires the checker to be RUNNING.
 * NOTE(review): numbering gaps show locking, the all-known bookkeeping,
 * returns and braces were dropped from this capture — left byte-identical;
 * verify against upstream lnet/router.c.
 */
499 lnet_wait_known_routerstate(void)
502 struct list_head *entry;
505 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
511 list_for_each (entry, &the_lnet.ln_routers) {
512 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
514 if (rtr->lp_alive_count == 0) {
526 lnet_router_checker();
528 cfs_pause(cfs_time_seconds(1));
/*
 * Event callback for router health pings (runs with LNET_LOCK held, so
 * it must not drop it).  Handles MD unlink completion (signalling the
 * shutdown path), re-arms pings on SEND, and interprets REPLY/errors:
 * any comms failure marks the router down, a successful REPLY marks it
 * up, via lnet_notify_locked().
 * NOTE(review): numbering gaps show early returns, local declarations
 * and braces were dropped from this capture — left byte-identical;
 * verify against upstream lnet/router.c.
 */
533 lnet_router_checker_event (lnet_event_t *event)
535 /* CAVEAT EMPTOR: I'm called with LNET_LOCKed and I'm not allowed to
536 * drop it (that's how come I see _every_ event, even ones that would
541 if (event->unlinked) {
542 /* The router checker thread has unlinked the rc_md
544 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING);
545 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKED;
547 mutex_up(&the_lnet.ln_rc_signal);
552 LASSERT (event->type == LNET_EVENT_SEND ||
553 event->type == LNET_EVENT_REPLY);
/* SEND events name the target; REPLY events name the initiator. */
555 nid = (event->type == LNET_EVENT_SEND) ?
556 event->target.nid : event->initiator.nid;
558 lp = lnet_find_peer_locked(nid);
560 /* router may have been removed */
561 CDEBUG(D_NET, "Router %s not found\n", libcfs_nid2str(nid));
565 if (event->type == LNET_EVENT_SEND) /* re-enable another ping */
566 lp->lp_ping_notsent = 0;
568 if (lnet_isrouter(lp) && /* ignore if no longer a router */
569 (event->status != 0 ||
570 event->type == LNET_EVENT_REPLY)) {
572 /* A successful REPLY means the router is up. If _any_ comms
573 * to the router fail I assume it's down (this will happen if
574 * we ping alive routers to try to detect router death before
575 * apps get burned). */
577 lnet_notify_locked(lp, 1, (event->status == 0),
578 cfs_time_current_sec());
580 /* The router checker will wake up very shortly and do the
581 * actual notification.
582 * XXX If 'lp' stops being a router before then, it will still
583 * have the notification pending!!! */
586 /* This decref will NOT drop LNET_LOCK (it had to have 1 ref when it
587 * was in the peer table and I've not dropped the lock, so no-one else
588 * can have reduced the refcount) */
589 LASSERT(lp->lp_refcount > 1);
591 lnet_peer_decref_locked(lp);
595 lnet_router_check_interval (lnet_peer_t *rtr)
599 secs = rtr->lp_alive ? live_router_check_interval :
600 dead_router_check_interval;
/*
 * Health-check one router (caller holds LNET_LOCK).  Times out an
 * overdue ping by marking the router down, runs pending notifications,
 * and if the check interval has elapsed and no ping is in flight, sends
 * a new ping (LNetGet on the reserved ping portal) and arms the ping
 * deadline.  Takes/drops a peer ref around the work.
 * NOTE(review): numbering gaps show lock drops around notification,
 * local declarations and braces were dropped from this capture —
 * left byte-identical; verify against upstream lnet/router.c.
 */
608 lnet_ping_router_locked (lnet_peer_t *rtr)
610 lnet_process_id_t id;
612 time_t now = cfs_time_current_sec();
614 lnet_peer_addref_locked(rtr);
616 if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
617 now > rtr->lp_ping_deadline)
618 lnet_notify_locked(rtr, 1, 0, now);
622 /* Run any outstanding notifications */
627 secs = lnet_router_check_interval(rtr);
630 "rtr %s %d: deadline %lu ping_notsent %d alive %d "
631 "alive_count %d lp_ping_timestamp %lu\n",
632 libcfs_nid2str(rtr->lp_nid), secs,
633 rtr->lp_ping_deadline, rtr->lp_ping_notsent,
634 rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
/* secs == 0 means checking is disabled for this router's state. */
636 if (secs != 0 && !rtr->lp_ping_notsent &&
637 now > rtr->lp_ping_timestamp + secs) {
638 id.nid = rtr->lp_nid;
639 id.pid = LUSTRE_SRV_LNET_PID;
640 CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
642 rtr->lp_ping_notsent = 1;
643 rtr->lp_ping_timestamp = now;
/* Arm the timeout only if no ping is already outstanding. */
645 if (rtr->lp_ping_deadline == 0)
646 rtr->lp_ping_deadline = now + router_ping_timeout;
650 LNetGet(LNET_NID_ANY, the_lnet.ln_rc_mdh, id,
651 LNET_RESERVED_PORTAL, LNET_PROTO_PING_MATCHBITS, 0);
656 lnet_peer_decref_locked(rtr);
/*
 * Start the router checker: validate the tunables, size the event queue
 * (per-router tx credits, bounded by router_checker_max_eqsize, with at
 * least SEND+REPLY per router), allocate the EQ, bind the shared ping
 * MD, then (kernel) spawn the checker thread.  If requested, waits until
 * every router's state is known before returning.
 * NOTE(review): numbering gaps show #ifdef __KERNEL__ branches, error
 * returns and braces were dropped from this capture (the two LNetEQAlloc
 * calls are the kernel and userspace halves of one #ifdef) — left
 * byte-identical; verify against upstream lnet/router.c.
 */
661 lnet_router_checker_start(void)
663 static lnet_ping_info_t pinginfo;
672 int router_checker_max_eqsize = 10240;
674 LASSERT (check_routers_before_use);
675 LASSERT (dead_router_check_interval > 0);
679 /* As an approximation, allow each router the same number of
680 * outstanding events as it is allowed outstanding sends */
682 version = the_lnet.ln_routers_version;
683 list_for_each_entry(rtr, &the_lnet.ln_routers, lp_rtr_list) {
684 lnet_ni_t *ni = rtr->lp_ni;
685 lnet_process_id_t id;
688 eqsz += ni->ni_peertxcredits;
690 /* one async ping reply per router */
691 id.nid = rtr->lp_nid;
692 id.pid = LUSTRE_SRV_LNET_PID;
696 rc = LNetSetAsync(id, 1);
698 CWARN("LNetSetAsync %s failed: %d\n",
699 libcfs_id2str(id), rc);
704 /* NB router list doesn't change in userspace */
705 LASSERT (version == the_lnet.ln_routers_version);
712 "No router found, not starting router checker\n");
716 /* at least allow a SENT and a REPLY per router */
717 if (router_checker_max_eqsize < 2 * nrtr)
718 router_checker_max_eqsize = 2 * nrtr;
721 if (eqsz > router_checker_max_eqsize)
722 eqsz = router_checker_max_eqsize;
725 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
727 if (check_routers_before_use &&
728 dead_router_check_interval <= 0) {
729 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
730 " set if 'check_routers_before_use' is set"
735 if (live_router_check_interval <= 0 &&
736 dead_router_check_interval <= 0)
740 init_mutex_locked(&the_lnet.ln_rc_signal);
741 /* EQ size doesn't matter; the callback is guaranteed to get every
744 rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
745 &the_lnet.ln_rc_eqh);
747 rc = LNetEQAlloc(eqsz, LNET_EQ_HANDLER_NONE,
748 &the_lnet.ln_rc_eqh);
751 CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
/* Bind one infinite-threshold MD for all ping replies; TRUNCATE
 * because only liveness, not the payload, matters here. */
755 memset(&md, 0, sizeof(md));
756 md.start = &pinginfo;
757 md.length = sizeof(pinginfo);
758 md.options = LNET_MD_TRUNCATE;
759 md.threshold = LNET_MD_THRESH_INF;
760 md.eq_handle = the_lnet.ln_rc_eqh;
761 rc = LNetMDBind(md, LNET_UNLINK, &the_lnet.ln_rc_mdh);
763 CERROR("Can't bind MD: %d\n", rc);
764 rc = LNetEQFree(the_lnet.ln_rc_eqh);
770 the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
772 rc = (int)cfs_kernel_thread(lnet_router_checker, NULL, 0);
774 CERROR("Can't start router checker thread: %d\n", rc);
775 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
776 rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
778 /* block until event callback signals exit */
779 mutex_down(&the_lnet.ln_rc_signal);
780 rc = LNetEQFree(the_lnet.ln_rc_eqh);
782 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
787 if (check_routers_before_use) {
788 /* Note that a helpful side-effect of pinging all known routers
789 * at startup is that it makes them drop stale connections they
790 * may have to a previous instance of me. */
791 lnet_wait_known_routerstate();
/*
 * Stop the router checker: signal the thread (STOPTHREAD), wait for the
 * MD unlink event to mark UNLINKED (kernel: blocks on ln_rc_signal;
 * userspace: polls by calling lnet_router_checker()), then free the EQ
 * and return to SHUTDOWN.  A no-op if already shut down.
 * NOTE(review): numbering gaps show #ifdef __KERNEL__/#else lines,
 * returns and braces were dropped from this capture — left
 * byte-identical; verify against upstream lnet/router.c.
 */
798 lnet_router_checker_stop (void)
802 if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
805 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
806 the_lnet.ln_rc_state = LNET_RC_STATE_STOPTHREAD;
809 /* block until event callback signals exit */
810 mutex_down(&the_lnet.ln_rc_signal);
812 while (the_lnet.ln_rc_state != LNET_RC_STATE_UNLINKED) {
813 lnet_router_checker();
814 cfs_pause(cfs_time_seconds(1));
817 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED);
819 rc = LNetEQFree(the_lnet.ln_rc_eqh);
821 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
#if defined(__KERNEL__) && defined(LNET_ROUTER)
/*
 * Kernel router-checker thread body: once per second, walk the router
 * list pinging each router via lnet_ping_router_locked(); restart the
 * walk if the routers version changed (the lock is dropped inside the
 * loop).  On STOPTHREAD, unlink the ping MD; the unlink event callback
 * signals final completion.
 * NOTE(review): numbering gaps show locking, goto-rescan control flow
 * and braces were dropped from this capture — left byte-identical;
 * verify against upstream lnet/router.c.
 */
828 lnet_router_checker(void *arg)
832 struct list_head *entry;
833 lnet_process_id_t rtr_id;
835 cfs_daemonize("router_checker");
838 rtr_id.pid = LUSTRE_SRV_LNET_PID;
840 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
842 while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
847 version = the_lnet.ln_routers_version;
849 list_for_each (entry, &the_lnet.ln_routers) {
850 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
851 lnet_ping_router_locked(rtr);
853 /* NB dropped lock */
854 if (version != the_lnet.ln_routers_version) {
855 /* the routers list has changed */
862 /* Call cfs_pause() here always adds 1 to load average
863 * because kernel counts # active tasks as nr_running
864 * + nr_uninterruptible. */
865 cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
866 cfs_time_seconds(1));
869 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD);
870 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
872 rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
875 /* The unlink event callback will signal final completion */
880 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
882 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
884 while (--npages >= 0)
885 cfs_free_page(rb->rb_kiov[npages].kiov_page);
/*
 * Allocate one router buffer for pool @rbp: the descriptor (sized by its
 * flexible kiov array) plus rbp_npages zeroed pages, each filling one
 * full-page kiov entry.
 * NOTE(review): numbering gaps show the allocation-failure cleanup path,
 * returns and braces were dropped from this capture (L909's
 * cfs_free_page is part of that cleanup) — left byte-identical; verify
 * against upstream lnet/router.c.
 */
891 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
893 int npages = rbp->rbp_npages;
894 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
899 LIBCFS_ALLOC(rb, sz);
905 for (i = 0; i < npages; i++) {
906 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
/* Partial-failure cleanup: free the pages already attached. */
909 cfs_free_page(rb->rb_kiov[i].kiov_page);
915 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
916 rb->rb_kiov[i].kiov_offset = 0;
917 rb->rb_kiov[i].kiov_page = page;
/*
 * Release every buffer in pool @rbp.  Requires the pool idle: no queued
 * messages and all credits returned.  Destroys each buffer, checks the
 * buffer/credit accounting, then zeroes the counters.
 * NOTE(review): numbering gaps show the "nbuffers" local declaration and
 * its increment, plus braces, were dropped from this capture — left
 * byte-identical; verify against upstream lnet/router.c.
 */
924 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
926 int npages = rbp->rbp_npages;
930 LASSERT (list_empty(&rbp->rbp_msgs));
931 LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
933 while (!list_empty(&rbp->rbp_bufs)) {
934 LASSERT (rbp->rbp_credits > 0);
936 rb = list_entry(rbp->rbp_bufs.next,
937 lnet_rtrbuf_t, rb_list);
938 list_del(&rb->rb_list);
939 lnet_destroy_rtrbuf(rb, npages);
943 LASSERT (rbp->rbp_nbuffers == nbuffers);
944 LASSERT (rbp->rbp_credits == nbuffers);
946 rbp->rbp_nbuffers = rbp->rbp_credits = 0;
/*
 * Populate pool @rbp with @nbufs buffers via lnet_new_rtrbuf(), updating
 * the credit accounting as each is added.  A pool already populated must
 * already hold exactly @nbufs buffers.  Allocation must happen before
 * routing starts (asserted) since there is no code to schedule blocked
 * messages on newly added buffers.
 * NOTE(review): numbering gaps show the NULL-check/partial-failure path,
 * credit increments, returns and braces were dropped from this capture —
 * left byte-identical; verify against upstream lnet/router.c.
 */
950 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
955 if (rbp->rbp_nbuffers != 0) {
956 LASSERT (rbp->rbp_nbuffers == nbufs);
960 for (i = 0; i < nbufs; i++) {
961 rb = lnet_new_rtrbuf(rbp);
964 CERROR("Failed to allocate %d router bufs of %d pages\n",
965 nbufs, rbp->rbp_npages);
971 rbp->rbp_mincredits++;
972 list_add(&rb->rb_list, &rbp->rbp_bufs);
974 /* No allocation "under fire" */
975 /* Otherwise we'd need code to schedule blocked msgs etc */
976 LASSERT (!the_lnet.ln_routing);
979 LASSERT (rbp->rbp_credits == nbufs);
984 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
986 CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
987 CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
989 rbp->rbp_npages = npages;
990 rbp->rbp_credits = 0;
991 rbp->rbp_mincredits = 0;
995 lnet_free_rtrpools(void)
997 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
998 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
999 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
1003 lnet_init_rtrpools(void)
1005 int small_pages = 1;
1006 int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1008 lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1009 lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1010 lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
/*
 * Decide whether to enable routing ("forwarding" parameter overrides
 * @im_a_router; unset means follow @im_a_router) and, if enabled,
 * validate the three buffer-count tunables and populate the pools.
 * On any failure the pools are torn down via lnet_free_rtrpools().
 * Sets the_lnet.ln_routing on success.
 * NOTE(review): numbering gaps show early returns, goto failed/cleanup
 * control flow and braces were dropped from this capture — left
 * byte-identical; verify against upstream lnet/router.c.
 */
1015 lnet_alloc_rtrpools(int im_a_router)
1019 if (!strcmp(forwarding, "")) {
1020 /* not set either way */
1023 } else if (!strcmp(forwarding, "disabled")) {
1024 /* explicitly disabled */
1026 } else if (!strcmp(forwarding, "enabled")) {
1027 /* explicitly enabled */
1029 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1030 "'enabled' or 'disabled'\n");
1034 if (tiny_router_buffers <= 0) {
1035 LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
1036 "routing enabled\n", tiny_router_buffers);
1041 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1042 tiny_router_buffers);
1046 if (small_router_buffers <= 0) {
1047 LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when"
1048 " routing enabled\n", small_router_buffers);
1053 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1054 small_router_buffers);
1058 if (large_router_buffers <= 0) {
1059 LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
1060 " routing enabled\n", large_router_buffers);
1065 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1066 large_router_buffers);
1071 the_lnet.ln_routing = 1;
/* Failure path: release whatever was allocated. */
1077 lnet_free_rtrpools();
/*
 * External entry point for aliveness notifications, from an LND
 * (@ni != NULL) or from userspace (@ni == NULL).  Rejects cross-net
 * reports and future timestamps; honours the auto_down tunable for LND
 * "down" reports; distrusts LND last_alive for dead peers (e.g.
 * ksocklnd may report boot time when no connection ever succeeded).
 * Records the state via lnet_notify_locked() and then (per the comments
 * upstream) lets lnet_do_notify() push it to the LND.
 * NOTE(review): numbering gaps show locking, early returns and braces
 * were dropped from this capture — left byte-identical; verify against
 * upstream lnet/router.c.
 */
1082 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
1084 lnet_peer_t *lp = NULL;
1085 time_t now = cfs_time_current_sec();
1087 LASSERT (!in_interrupt ());
1089 CDEBUG (D_NET, "%s notifying %s: %s\n",
1090 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1091 libcfs_nid2str(nid),
1092 alive ? "up" : "down");
1095 LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
1096 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
1097 libcfs_nid2str(nid), alive ? "birth" : "death",
1098 libcfs_nid2str(ni->ni_nid));
1102 /* can't do predictions... */
1104 CWARN ("Ignoring prediction from %s of %s %s "
1105 "%ld seconds in the future\n",
1106 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
1107 libcfs_nid2str(nid), alive ? "up" : "down",
1112 if (ni != NULL && !alive && /* LND telling me she's down */
1113 !auto_down) { /* auto-down disabled */
1114 CDEBUG(D_NET, "Auto-down disabled\n");
1120 lp = lnet_find_peer_locked(nid);
1124 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
1128 /* We can't fully trust LND on reporting exact peer last_alive
1129 * if he notifies us about dead peer. For example ksocklnd can
1130 * call us with when == _time_when_the_node_was_booted_ if
1131 * no connections were successfully established */
1132 if (ni != NULL && !alive && when < lp->lp_last_alive)
1133 when = lp->lp_last_alive;
1135 lnet_notify_locked(lp, ni == NULL, alive, when);
1143 lnet_peer_decref_locked(lp);
1148 EXPORT_SYMBOL(lnet_notify);
/* NOTE(review): bare signatures whose bodies (and the surrounding
 * #ifdef __KERNEL__/#else lines) are missing from this capture:
 * the kernel lnet_get_tunables() and the non-router lnet_notify(). */
1151 lnet_get_tunables (void)
1159 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
/*
 * Userspace (single-threaded) router checker, called periodically from
 * other code paths.  Rate-limited to at most once per second; warns if
 * not called within the configured check intervals.  On STOPTHREAD it
 * unlinks the ping MD; otherwise it drains pending EQ events into
 * lnet_router_checker_event() (re-entering by hand since there is no
 * event thread), then pings each router.
 * NOTE(review): numbering gaps show locking, early returns and braces
 * were dropped from this capture — left byte-identical; verify against
 * upstream lnet/router.c.
 */
1165 lnet_router_checker (void)
1167 static time_t last = 0;
1168 static int running = 0;
1170 time_t now = cfs_time_current_sec();
1171 int interval = now - last;
1176 /* It's no use to call me again within a sec - all intervals and
1177 * timeouts are measured in seconds */
1178 if (last != 0 && interval < 2)
1182 interval > MAX(live_router_check_interval,
1183 dead_router_check_interval))
1184 CDEBUG(D_NETERROR, "Checker(%d/%d) not called for %d seconds\n",
1185 live_router_check_interval, dead_router_check_interval,
1189 LASSERT (!running); /* recursion check */
1195 if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD) {
1196 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
1197 rc = LNetMDUnlink(the_lnet.ln_rc_mdh);
1201 /* consume all pending events */
1206 /* NB ln_rc_eqh must be the 1st in 'eventqs' otherwise the
1207 * recursion breaker in LNetEQPoll would fail */
1208 rc = LNetEQPoll(&the_lnet.ln_rc_eqh, 1, 0, &ev, &i);
1209 if (rc == 0) /* no event pending */
1212 /* NB a lost SENT prevents me from pinging a router again */
1213 if (rc == -EOVERFLOW) {
1214 CERROR("Dropped an event!!!\n");
1221 lnet_router_checker_event(&ev);
1225 if (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED ||
1226 the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING) {
1231 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1235 version = the_lnet.ln_routers_version;
1236 list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
1237 lnet_ping_router_locked(rtr);
1238 LASSERT (version == the_lnet.ln_routers_version);
1243 running = 0; /* lock only needed for the recursion check */
1247 /* NB lnet_peers_start_down depends on me,
1248 * so must be called before any peer creation */
1250 lnet_get_tunables (void)
1254 s = getenv("LNET_ROUTER_PING_TIMEOUT");
1255 if (s != NULL) router_ping_timeout = atoi(s);
1257 s = getenv("LNET_LIVE_ROUTER_CHECK_INTERVAL");
1258 if (s != NULL) live_router_check_interval = atoi(s);
1260 s = getenv("LNET_DEAD_ROUTER_CHECK_INTERVAL");
1261 if (s != NULL) dead_router_check_interval = atoi(s);
1263 /* This replaces old lnd_notify mechanism */
1264 check_routers_before_use = 1;
1265 if (dead_router_check_interval <= 0)
1266 dead_router_check_interval = 30;
/* NOTE(review): userspace stubs for the router-pool API; their (likely
 * empty) bodies are missing from this capture.  "im_a_arouter" is the
 * upstream spelling — left untouched since the parameter name is unused
 * in a stub. */
1270 lnet_free_rtrpools (void)
1275 lnet_init_rtrpools (void)
1280 lnet_alloc_rtrpools (int im_a_arouter)