1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
6 * This file is part of Portals
7 * http://sourceforge.net/projects/sandiaportals/
9 * Portals is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Portals is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Portals; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
/* Module parameters controlling router buffer pools and router health
 * checking.  All are read-only after module load (permission 0444). */
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31 "Explicitly enable/disable forwarding between networks");
/* Router buffer pool sizes: tiny (0-payload), small (1 page), large (LNET_MTU) */
33 static int tiny_router_buffers = 1024;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 8192;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 512;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41 "# of large messages to buffer in the router");
/* 0 means "derive from the NI's peer tx credits"; see lnet_peer_buffer_credits() */
42 static int peer_buffer_credits = 0;
43 CFS_MODULE_PARM(peer_buffer_credits, "i", int, 0444,
44 "# router buffer credits per peer");
46 static int auto_down = 1;
47 CFS_MODULE_PARM(auto_down, "i", int, 0444,
48 "Automatically mark peers down on comms error");
50 static int check_routers_before_use = 0;
51 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
52 "Assume routers are down and ping them before use");
54 static int dead_router_check_interval = 0;
55 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
56 "Seconds between dead router health checks (<= 0 to disable)");
58 static int live_router_check_interval = 0;
59 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
60 "Seconds between live router health checks (<= 0 to disable)");
62 static int router_ping_timeout = 50;
63 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
64 "Seconds to wait for the reply to a router health query");
/* Return non-zero if peers should initially be considered down until a
 * successful ping proves otherwise (mirrors check_routers_before_use). */
67 lnet_peers_start_down(void)
69 return check_routers_before_use;
/* Number of router buffer credits to grant each peer on NI 'ni'.
 * Precedence: per-NI setting (ni_peerrtrcredits), then the
 * peer_buffer_credits module parameter, then the peer tx credit count. */
73 lnet_peer_buffer_credits(lnet_ni_t *ni)
75 /* NI option overrides LNet default */
76 if (ni->ni_peerrtrcredits > 0)
77 return ni->ni_peerrtrcredits;
78 if (peer_buffer_credits > 0)
79 return peer_buffer_credits;
81 /* As an approximation, allow this peer the same number of router
82 * buffers as it is allowed outstanding sends */
83 return ni->ni_peertxcredits;
/* Record a liveness change for peer 'lp'; called with the LNET lock held.
 * Stale information (older 'when') and redundant news (aliveness unchanged
 * after the first report) are ignored.  Otherwise lp_alive is updated and
 * notification is flagged as outstanding for lnet_do_notify() to deliver.
 * 'notifylnd' requests that the LND also be told via lnd_notify. */
87 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, time_t when)
89 if (when < lp->lp_timestamp) { /* out of date information */
90 CDEBUG(D_NET, "Out of date\n");
94 lp->lp_timestamp = when; /* update timestamp */
95 lp->lp_ping_deadline = 0; /* disable ping timeout */
97 if (lp->lp_alive_count != 0 && /* got old news */
98 (!lp->lp_alive) == (!alive)) { /* new date for old news */
99 CDEBUG(D_NET, "Old news\n");
103 /* Flag that notification is outstanding */
105 lp->lp_alive_count++;
106 lp->lp_alive = !(!alive); /* 1 bit! */
108 lp->lp_notifylnd |= notifylnd;
110 CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
/* Deliver any pending liveness notification for 'lp' to its LND.
 * lp_notifying serializes delivery to one thread at a time; the loop
 * re-checks lp_notify because lnd_notify may trigger new notifications. */
114 lnet_do_notify (lnet_peer_t *lp)
116 lnet_ni_t *ni = lp->lp_ni;
122 /* Notify only in 1 thread at any time to ensure ordered notification.
123 * NB individual events can be missed; the only guarantee is that you
124 * always get the most recent news */
126 if (lp->lp_notifying) {
131 lp->lp_notifying = 1;
133 while (lp->lp_notify) {
134 alive = lp->lp_alive;
135 notifylnd = lp->lp_notifylnd;
137 lp->lp_notifylnd = 0;
140 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
143 /* A new notification could happen now; I'll handle it
144 * when control returns to me */
146 (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
152 lp->lp_notifying = 0;
/* Public entry point: note that peer 'nid' became alive/dead at time 'when'.
 * Called either by an LND ('ni' != NULL) or from userspace ('ni' == NULL).
 * Sanity-checks the report (same net, not in the future, auto_down policy),
 * then records it via lnet_notify_locked(). */
158 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
160 lnet_peer_t *lp = NULL;
161 time_t now = cfs_time_current_sec();
163 LASSERT (!in_interrupt ());
165 CDEBUG (D_NET, "%s notifying %s: %s\n",
166 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
168 alive ? "up" : "down");
/* An LND may only report about peers on its own network */
171 LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
172 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
173 libcfs_nid2str(nid), alive ? "birth" : "death",
174 libcfs_nid2str(ni->ni_nid));
178 /* can't do predictions... */
180 CWARN ("Ignoring prediction from %s of %s %s "
181 "%ld seconds in the future\n",
182 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
183 libcfs_nid2str(nid), alive ? "up" : "down",
188 if (ni != NULL && !alive && /* LND telling me she's down */
189 !auto_down) { /* auto-down disabled */
190 CDEBUG(D_NET, "Auto-down disabled\n");
196 lp = lnet_find_peer_locked(nid);
/* Peer may have been removed already; nothing to record */
200 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
204 /* We can't fully trust LND on reporting exact peer last_alive
205 * if he notifies us about dead peer. For example ksocklnd can
206 * call us with when == _time_when_the_node_was_booted_ if
207 * no connections were successfully established */
208 if (ni != NULL && !alive && when < lp->lp_last_alive)
209 when = lp->lp_last_alive;
211 lnet_notify_locked(lp, ni == NULL, alive, when);
219 lnet_peer_decref_locked(lp);
224 EXPORT_SYMBOL(lnet_notify);
/* Stubs of the notification entry points for the non-router/userspace
 * build (the #else branch of the __KERNEL__ && LNET_ROUTER conditional). */
229 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
235 lnet_notify_locked (lnet_peer_t *lp, int notifylnd, int alive, time_t when)
/* Take a router reference on peer 'lp'.  On the 0->1 transition, insert
 * the peer into the global router list (the_lnet.ln_routers) keeping it
 * sorted by NID, take a peer ref on its behalf, and bump the routers
 * version so iterators know the list changed.  LNET lock must be held. */
243 lnet_rtr_addref_locked(lnet_peer_t *lp)
245 LASSERT (lp->lp_refcount > 0);
246 LASSERT (lp->lp_rtr_refcount >= 0);
248 lp->lp_rtr_refcount++;
249 if (lp->lp_rtr_refcount == 1) {
250 struct list_head *pos;
252 /* a simple insertion sort */
253 list_for_each_prev(pos, &the_lnet.ln_routers) {
254 lnet_peer_t *rtr = list_entry(pos, lnet_peer_t,
257 if (rtr->lp_nid < lp->lp_nid)
261 list_add(&lp->lp_rtr_list, pos);
262 /* addref for the_lnet.ln_routers */
263 lnet_peer_addref_locked(lp);
264 the_lnet.ln_routers_version++;
/* Drop a router reference on peer 'lp'.  On the 1->0 transition, remove
 * it from the global router list, drop the list's peer ref, and bump the
 * routers version.  LNET lock must be held. */
269 lnet_rtr_decref_locked(lnet_peer_t *lp)
271 LASSERT (lp->lp_refcount > 0);
272 LASSERT (lp->lp_rtr_refcount > 0);
274 lp->lp_rtr_refcount--;
275 if (lp->lp_rtr_refcount == 0) {
276 list_del(&lp->lp_rtr_list);
277 /* decref for the_lnet.ln_routers */
278 lnet_peer_decref_locked(lp);
279 the_lnet.ln_routers_version++;
/* Look up the remote-net descriptor for network 'net' in
 * the_lnet.ln_remote_nets.  LNET lock must be held; must not be called
 * during shutdown. */
284 lnet_find_net_locked (__u32 net)
286 lnet_remotenet_t *rnet;
287 struct list_head *tmp;
289 LASSERT (!the_lnet.ln_shutdown);
291 list_for_each (tmp, &the_lnet.ln_remote_nets) {
292 rnet = list_entry(tmp, lnet_remotenet_t, lrn_list);
294 if (rnet->lrn_net == net)
/* Add a route to network 'net' via 'gateway' with the given hop count.
 * Rejects malformed routes (loopback, gateway on the destination net,
 * hops outside 1..255); silently ignores routes to local nets and
 * unreachable gateways.  A shorter route supersedes all existing routes
 * to the same net; equal-hop duplicates are NOOPs. */
301 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
303 struct list_head zombies;
305 lnet_remotenet_t *rnet;
306 lnet_remotenet_t *rnet2;
308 lnet_route_t *route2;
313 CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
314 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
/* Reject obviously invalid routes */
316 if (gateway == LNET_NID_ANY ||
317 LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
318 net == LNET_NIDNET(LNET_NID_ANY) ||
319 LNET_NETTYP(net) == LOLND ||
320 LNET_NIDNET(gateway) == net ||
321 hops < 1 || hops > 255)
324 if (lnet_islocalnet(net)) /* it's a local network */
325 return 0; /* ignore the route entry */
327 /* Assume net, route, all new */
328 LIBCFS_ALLOC(route, sizeof(*route));
329 LIBCFS_ALLOC(rnet, sizeof(*rnet));
330 if (route == NULL || rnet == NULL) {
331 CERROR("Out of memory creating route %s %d %s\n",
332 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
334 LIBCFS_FREE(route, sizeof(*route));
336 LIBCFS_FREE(rnet, sizeof(*rnet));
340 CFS_INIT_LIST_HEAD(&rnet->lrn_routes);
342 rnet->lrn_hops = hops;
/* Resolve the gateway NID to a peer (creates one if needed) */
346 rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
350 LIBCFS_FREE(route, sizeof(*route));
351 LIBCFS_FREE(rnet, sizeof(*rnet));
353 if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
354 return 0; /* ignore the route entry */
356 CERROR("Error %d creating route %s %d %s\n", rc,
357 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
361 LASSERT (!the_lnet.ln_shutdown);
362 CFS_INIT_LIST_HEAD(&zombies);
364 rnet2 = lnet_find_net_locked(net);
/* First route to this net: publish the new remote-net descriptor */
367 list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
371 if (hops > rnet2->lrn_hops) {
372 /* New route is longer; ignore it */
374 } else if (hops < rnet2->lrn_hops) {
375 /* new route supersedes all currently known routes to this
377 list_add(&zombies, &rnet2->lrn_routes);
378 list_del_init(&rnet2->lrn_routes);
382 /* New route has the same hopcount as existing routes; search
383 * for a duplicate route (it's a NOOP if it is) */
384 list_for_each (e, &rnet2->lrn_routes) {
385 route2 = list_entry(e, lnet_route_t, lr_list);
387 if (route2->lr_gateway == route->lr_gateway) {
392 /* our lookups must be true */
393 LASSERT (route2->lr_gateway->lp_nid != gateway);
398 ni = route->lr_gateway->lp_ni;
399 lnet_ni_addref_locked(ni);
/* Install the new route and take a router ref on its gateway */
402 list_add_tail(&route->lr_list, &rnet2->lrn_routes);
403 the_lnet.ln_remote_nets_version++;
405 lnet_rtr_addref_locked(route->lr_gateway);
409 /* XXX Assume alive */
410 if (ni->ni_lnd->lnd_notify != NULL)
411 (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
/* Failure/duplicate path: release what we allocated */
415 lnet_peer_decref_locked(route->lr_gateway);
417 LIBCFS_FREE(route, sizeof(*route));
421 LIBCFS_FREE(rnet, sizeof(*rnet));
/* Free the routes superseded by the shorter new route */
423 while (!list_empty(&zombies)) {
424 route = list_entry(zombies.next, lnet_route_t, lr_list);
425 list_del(&route->lr_list);
428 lnet_rtr_decref_locked(route->lr_gateway);
429 lnet_peer_decref_locked(route->lr_gateway);
431 LIBCFS_FREE(route, sizeof(*route));
/* Verify that all routes to any single remote net leave via the same
 * local NI; routing a net over multiple interfaces is not supported and
 * is reported as an error. */
438 lnet_check_routes (void)
440 lnet_remotenet_t *rnet;
442 lnet_route_t *route2;
443 struct list_head *e1;
444 struct list_head *e2;
448 list_for_each (e1, &the_lnet.ln_remote_nets) {
449 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
/* Compare each route against the first route seen for this net */
452 list_for_each (e2, &rnet->lrn_routes) {
453 route = list_entry(e2, lnet_route_t, lr_list);
457 else if (route->lr_gateway->lp_ni !=
458 route2->lr_gateway->lp_ni) {
461 CERROR("Routes to %s via %s and %s not supported\n",
462 libcfs_net2str(rnet->lrn_net),
463 libcfs_nid2str(route->lr_gateway->lp_nid),
464 libcfs_nid2str(route2->lr_gateway->lp_nid));
/* Delete route(s).  'net' may be LNET_NIDNET(LNET_NID_ANY) to match all
 * nets and 'gw_nid' may be LNET_NID_ANY to match all gateways; an empty
 * remote net is removed along with its last route. */
475 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
477 lnet_remotenet_t *rnet;
479 struct list_head *e1;
480 struct list_head *e2;
483 CDEBUG(D_NET, "Del route: net %s : gw %s\n",
484 libcfs_net2str(net), libcfs_nid2str(gw_nid));
486 /* NB Caller may specify either all routes via the given gateway
487 * or a specific route entry (actual NIDs) */
492 list_for_each (e1, &the_lnet.ln_remote_nets) {
493 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
495 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
496 net == rnet->lrn_net))
499 list_for_each (e2, &rnet->lrn_routes) {
500 route = list_entry(e2, lnet_route_t, lr_list);
502 if (!(gw_nid == LNET_NID_ANY ||
503 gw_nid == route->lr_gateway->lp_nid))
506 list_del(&route->lr_list);
507 the_lnet.ln_remote_nets_version++;
509 if (list_empty(&rnet->lrn_routes))
510 list_del(&rnet->lrn_list);
/* Drop router + peer refs taken when the route was added */
514 lnet_rtr_decref_locked(route->lr_gateway);
515 lnet_peer_decref_locked(route->lr_gateway);
518 LIBCFS_FREE(route, sizeof (*route));
521 LIBCFS_FREE(rnet, sizeof(*rnet));
/* Remove every configured route (wildcard net and gateway). */
533 lnet_destroy_routes (void)
535 lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
/* Return the idx'th route (in iteration order over remote nets, then
 * routes within each net), filling in net/hops/gateway/alive for the
 * caller (used by the routes ioctl/proc interface). */
539 lnet_get_route (int idx, __u32 *net, __u32 *hops,
540 lnet_nid_t *gateway, __u32 *alive)
542 struct list_head *e1;
543 struct list_head *e2;
544 lnet_remotenet_t *rnet;
549 list_for_each (e1, &the_lnet.ln_remote_nets) {
550 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
552 list_for_each (e2, &rnet->lrn_routes) {
553 route = list_entry(e2, lnet_route_t, lr_list);
/* idx has counted down to this route: report it */
556 *net = rnet->lrn_net;
557 *hops = rnet->lrn_hops;
558 *gateway = route->lr_gateway->lp_nid;
559 *alive = route->lr_gateway->lp_alive;
570 #if defined(__KERNEL__) && defined(LNET_ROUTER)
/* Event callback for the router checker's ping MD.  Runs under LNET_LOCK.
 * SEND events re-arm pinging; a REPLY (or any failed comms) updates the
 * router's liveness via lnet_notify_locked().  The unlink event signals
 * the checker thread that shutdown can complete. */
572 lnet_router_checker_event (lnet_event_t *event)
574 /* CAVEAT EMPTOR: I'm called with LNET_LOCKed and I'm not allowed to
575 * drop it (that's how come I see _every_ event, even ones that would
580 if (event->unlinked) {
581 /* The router checker thread has unlinked the rc_md
583 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING);
584 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKED;
585 mutex_up(&the_lnet.ln_rc_signal);
589 LASSERT (event->type == LNET_EVENT_SEND ||
590 event->type == LNET_EVENT_REPLY);
/* SEND events carry the target; REPLY events carry the initiator */
592 nid = (event->type == LNET_EVENT_SEND) ?
593 event->target.nid : event->initiator.nid;
595 lp = lnet_find_peer_locked(nid);
597 /* router may have been removed */
598 CDEBUG(D_NET, "Router %s not found\n", libcfs_nid2str(nid));
602 if (event->type == LNET_EVENT_SEND) /* re-enable another ping */
603 lp->lp_ping_notsent = 0;
605 if (lnet_isrouter(lp) && /* ignore if no longer a router */
606 (event->status != 0 ||
607 event->type == LNET_EVENT_REPLY)) {
609 /* A successful REPLY means the router is up. If _any_ comms
610 * to the router fail I assume it's down (this will happen if
611 * we ping alive routers to try to detect router death before
612 * apps get burned). */
614 lnet_notify_locked(lp, 1, (event->status == 0),
615 cfs_time_current_sec());
617 /* The router checker will wake up very shortly and do the
618 * actual notification.
619 * XXX If 'lp' stops being a router before then, it will still
620 * have the notification pending!!! */
623 /* This decref will NOT drop LNET_LOCK (it had to have 1 ref when it
624 * was in the peer table and I've not dropped the lock, so no-one else
625 * can have reduced the refcount) */
626 LASSERT(lp->lp_refcount > 1);
628 lnet_peer_decref_locked(lp);
/* Router checker daemon.  Binds a ping MD, signals its parent that
 * startup finished, then loops once a second pinging routers whose
 * check interval has elapsed and expiring pings that missed their
 * deadline.  On shutdown it unlinks the MD; the event callback signals
 * final completion. */
632 lnet_router_checker(void *arg)
634 static lnet_ping_info_t pinginfo;
637 lnet_handle_md_t mdh;
640 struct list_head *entry;
642 lnet_process_id_t rtr_id;
645 cfs_daemonize("router_checker");
648 rtr_id.pid = LUSTRE_SRV_LNET_PID;
650 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
652 /* initialize md content */
653 md.start = &pinginfo;
654 md.length = sizeof(pinginfo);
655 md.threshold = LNET_MD_THRESH_INF;
/* NOTE(review): trailing comma here (not ';') chains into following
 * initializers via the comma operator — verify against the full source
 * that this is intentional and not a typo for a semicolon. */
657 md.options = LNET_MD_TRUNCATE,
659 md.eq_handle = the_lnet.ln_rc_eqh;
661 rc = LNetMDBind(md, LNET_UNLINK, &mdh);
664 CERROR("Can't bind MD: %d\n", rc);
665 the_lnet.ln_rc_state = rc;
666 mutex_up(&the_lnet.ln_rc_signal);
672 the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
673 mutex_up(&the_lnet.ln_rc_signal); /* let my parent go */
675 while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
/* Snapshot the routers version; a change means restart the scan */
680 version = the_lnet.ln_routers_version;
682 list_for_each (entry, &the_lnet.ln_routers) {
683 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
685 lnet_peer_addref_locked(rtr);
687 now = cfs_time_current_sec();
689 if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
690 now > rtr->lp_ping_deadline)
691 lnet_notify_locked(rtr, 1, 0, now);
695 /* Run any outstanding notifications */
/* Pick the check interval matching the router's current state */
699 secs = live_router_check_interval;
701 secs = dead_router_check_interval;
707 !rtr->lp_ping_notsent &&
708 now > rtr->lp_ping_timestamp + secs) {
709 CDEBUG(D_NET, "Check: %s\n",
710 libcfs_nid2str(rtr->lp_nid));
713 rtr_id.nid = rtr->lp_nid;
714 rtr->lp_ping_notsent = 1;
715 rtr->lp_ping_timestamp = now;
717 if (rtr->lp_ping_deadline == 0)
718 rtr->lp_ping_deadline =
719 now + router_ping_timeout;
723 LNetGet(LNET_NID_ANY, mdh, rtr_id,
724 LNET_RESERVED_PORTAL,
725 LNET_PROTO_PING_MATCHBITS, 0);
729 lnet_peer_decref_locked(rtr);
731 if (version != the_lnet.ln_routers_version) {
732 /* the routers list has changed */
739 /* Call cfs_pause() here always adds 1 to load average
740 * because kernel counts # active tasks as nr_running
741 * + nr_uninterruptible. */
742 cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
743 cfs_time_seconds(1));
746 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD);
747 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
749 rc = LNetMDUnlink(mdh);
752 /* The unlink event callback will signal final completion */
/* Block until every known router has been pinged at least once
 * (lp_alive_count != 0), polling the router list once a second. */
758 lnet_wait_known_routerstate(void)
761 struct list_head *entry;
768 list_for_each (entry, &the_lnet.ln_routers) {
769 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
771 if (rtr->lp_alive_count == 0) {
782 cfs_pause(cfs_time_seconds(1));
/* Stop the router checker thread: request STOPTHREAD, wait for the
 * unlink event callback to signal completion, then free the EQ and
 * return the state machine to SHUTDOWN.  No-op if never started. */
787 lnet_router_checker_stop(void)
791 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING ||
792 the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
794 if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
797 the_lnet.ln_rc_state = LNET_RC_STATE_STOPTHREAD;
798 /* block until event callback signals exit */
799 mutex_down(&the_lnet.ln_rc_signal);
801 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED);
803 rc = LNetEQFree(the_lnet.ln_rc_eqh);
806 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
/* Start the router checker: validate module parameters, allocate the
 * event queue, spawn the checker thread and wait for it to report
 * startup.  Optionally blocks until all routers' states are known
 * (check_routers_before_use).  Returns 0 or a negative errno-style rc. */
810 lnet_router_checker_start(void)
814 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
/* check_routers_before_use needs dead-router checking to make progress */
816 if (check_routers_before_use &&
817 dead_router_check_interval <= 0) {
818 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
819 " set if 'check_routers_before_use' is set"
/* Neither interval enabled: the checker has nothing to do */
824 if (live_router_check_interval <= 0 &&
825 dead_router_check_interval <= 0)
828 init_mutex_locked(&the_lnet.ln_rc_signal);
830 /* EQ size doesn't matter; the callback is guaranteed to get every
832 rc = LNetEQAlloc(1, lnet_router_checker_event,
833 &the_lnet.ln_rc_eqh);
835 CERROR("Can't allocate EQ: %d\n", rc);
839 rc = (int)cfs_kernel_thread(lnet_router_checker, NULL, 0);
841 CERROR("Can't start router checker thread: %d\n", rc);
845 mutex_down(&the_lnet.ln_rc_signal); /* wait for checker to startup */
/* The thread reports startup failure through ln_rc_state */
847 rc = the_lnet.ln_rc_state;
849 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
853 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
855 if (check_routers_before_use) {
856 /* Note that a helpful side-effect of pinging all known routers
857 * at startup is that it makes them drop stale connections they
858 * may have to a previous instance of me. */
859 lnet_wait_known_routerstate();
/* Error path: tear down the EQ allocated above */
865 rc = LNetEQFree(the_lnet.ln_rc_eqh);
/* Free a router buffer and its 'npages' attached pages. */
871 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
873 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
875 while (--npages >= 0)
876 cfs_free_page(rb->rb_kiov[npages].kiov_page);
/* Allocate a router buffer for pool 'rbp': the descriptor (sized via its
 * flexible kiov array) plus rbp_npages zeroed pages.  On page allocation
 * failure the pages already allocated are released. */
882 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
884 int npages = rbp->rbp_npages;
885 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
890 LIBCFS_ALLOC(rb, sz);
896 for (i = 0; i < npages; i++) {
897 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
/* Partial failure: free the pages filled in so far */
900 cfs_free_page(rb->rb_kiov[i].kiov_page);
906 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
907 rb->rb_kiov[i].kiov_offset = 0;
908 rb->rb_kiov[i].kiov_page = page;
/* Free all buffers in pool 'rbp'.  All credits must be back in the pool
 * (no buffers in use, no queued messages) before this is called. */
915 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
917 int npages = rbp->rbp_npages;
921 LASSERT (list_empty(&rbp->rbp_msgs));
922 LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
924 while (!list_empty(&rbp->rbp_bufs)) {
925 LASSERT (rbp->rbp_credits > 0);
927 rb = list_entry(rbp->rbp_bufs.next,
928 lnet_rtrbuf_t, rb_list);
929 list_del(&rb->rb_list);
930 lnet_destroy_rtrbuf(rb, npages);
934 LASSERT (rbp->rbp_nbuffers == nbuffers);
935 LASSERT (rbp->rbp_credits == nbuffers);
937 rbp->rbp_nbuffers = rbp->rbp_credits = 0;
/* Populate pool 'rbp' with 'nbufs' router buffers.  A no-op if the pool
 * is already populated (must then hold exactly nbufs).  Must be called
 * before routing is enabled — no allocation "under fire". */
941 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
946 if (rbp->rbp_nbuffers != 0) {
947 LASSERT (rbp->rbp_nbuffers == nbufs);
951 for (i = 0; i < nbufs; i++) {
952 rb = lnet_new_rtrbuf(rbp);
955 CERROR("Failed to allocate %d router bufs of %d pages\n",
956 nbufs, rbp->rbp_npages);
962 rbp->rbp_mincredits++;
963 list_add(&rb->rb_list, &rbp->rbp_bufs);
965 /* No allocation "under fire" */
966 /* Otherwise we'd need code to schedule blocked msgs etc */
967 LASSERT (!the_lnet.ln_routing);
970 LASSERT (rbp->rbp_credits == nbufs);
/* Initialize pool 'rbp' for buffers of 'npages' pages each; no buffers
 * are allocated here (see lnet_rtrpool_alloc_bufs). */
975 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
977 CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
978 CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
980 rbp->rbp_npages = npages;
981 rbp->rbp_credits = 0;
982 rbp->rbp_mincredits = 0;
/* Free the buffers of all three router pools (tiny/small/large). */
986 lnet_free_rtrpools(void)
988 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
989 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
990 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
/* Initialize the three router pools: 0-page (tiny), small, and large
 * (enough pages to hold an LNET_MTU payload). */
994 lnet_init_rtrpools(void)
997 int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
999 lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1000 lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1001 lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
/* Decide whether this node routes (the 'forwarding' module parameter
 * overrides 'im_a_router'), validate the buffer-count parameters, and
 * populate all three router pools.  Sets the_lnet.ln_routing on success;
 * frees any partially-allocated pools on failure. */
1006 lnet_alloc_rtrpools(int im_a_router)
1010 if (!strcmp(forwarding, "")) {
1011 /* not set either way */
1014 } else if (!strcmp(forwarding, "disabled")) {
1015 /* explicitly disabled */
1017 } else if (!strcmp(forwarding, "enabled")) {
1018 /* explicitly enabled */
1020 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
1021 "'enabled' or 'disabled'\n");
1025 if (tiny_router_buffers <= 0) {
1026 LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
1027 "routing enabled\n", tiny_router_buffers);
1032 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1033 tiny_router_buffers);
1037 if (small_router_buffers <= 0) {
1038 LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when"
1039 " routing enabled\n", small_router_buffers);
1044 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1045 small_router_buffers);
1049 if (large_router_buffers <= 0) {
1050 LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
1051 " routing enabled\n", large_router_buffers);
1056 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1057 large_router_buffers);
/* All pools populated: enable routing */
1062 the_lnet.ln_routing = 1;
/* Failure path: release whatever pools were populated */
1068 lnet_free_rtrpools();
/* Stubs for the non-router/userspace build: router checking and router
 * buffer pools are compiled out, so these are no-op placeholders. */
1075 lnet_peers_start_down(void)
1081 lnet_peer_buffer_credits(lnet_ni_t *ni)
1087 lnet_router_checker_stop(void)
1093 lnet_router_checker_start(void)
1099 lnet_free_rtrpools (void)
1104 lnet_init_rtrpools (void)
1109 lnet_alloc_rtrpools (int im_a_arouter)