1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This file is part of Portals
7 * http://sourceforge.net/projects/sandiaportals/
9 * Portals is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Portals is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Portals; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
24 #define DEBUG_SUBSYSTEM S_LNET
25 #include <lnet/lib-lnet.h>
27 #if defined(__KERNEL__) && defined(LNET_ROUTER)
/* Module parameters controlling LNET router behaviour.
 * NOTE(review): this extract appears truncated (original line numbers are
 * non-contiguous); code lines are kept verbatim. */
29 static char *forwarding = "";
30 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
31 "Explicitly enable/disable forwarding between networks");
/* Router buffer pool sizes: tiny (0 payload), small (1 page), large
 * (multi-page, up to LNET_MTU).  Consumed by lnet_alloc_rtrpools(). */
33 static int tiny_router_buffers = 512;
34 CFS_MODULE_PARM(tiny_router_buffers, "i", int, 0444,
35 "# of 0 payload messages to buffer in the router");
36 static int small_router_buffers = 256;
37 CFS_MODULE_PARM(small_router_buffers, "i", int, 0444,
38 "# of small (1 page) messages to buffer in the router");
39 static int large_router_buffers = 32;
40 CFS_MODULE_PARM(large_router_buffers, "i", int, 0444,
41 "# of large messages to buffer in the router");
/* Peer-health options: auto_down gates LND "peer down" notifications in
 * lnet_notify(); check_routers_before_use is returned by
 * lnet_peers_start_down() and triggers lnet_wait_known_routerstate(). */
43 static int auto_down = 1;
44 CFS_MODULE_PARM(auto_down, "i", int, 0444,
45 "Automatically mark peers down on comms error");
47 static int check_routers_before_use = 0;
48 CFS_MODULE_PARM(check_routers_before_use, "i", int, 0444,
49 "Assume routers are down and ping them before use");
/* Router-pinger intervals and timeout (seconds); <= 0 disables a check.
 * Used by lnet_router_checker(). */
51 static int dead_router_check_interval = 0;
52 CFS_MODULE_PARM(dead_router_check_interval, "i", int, 0444,
53 "Seconds between dead router health checks (<= 0 to disable)");
55 static int live_router_check_interval = 0;
56 CFS_MODULE_PARM(live_router_check_interval, "i", int, 0444,
57 "Seconds between live router health checks (<= 0 to disable)");
59 static int router_ping_timeout = 50;
60 CFS_MODULE_PARM(router_ping_timeout, "i", int, 0444,
61 "Seconds to wait for the reply to a router health query");
/* Work item embedded in the upcall request; the enclosing struct
 * definition is not visible in this extract. */
65 work_struct_t kpru_tq;
/* Work-queue handler: runs the userspace upcall announcing that gateway
 * u->kpru_nid went up/down at u->kpru_when, then frees the request.
 * NOTE(review): several lines (argv/buffer setup, #ifdef __WINNT__ open)
 * are missing from this extract — confirm against the full source. */
72 kpr_do_upcall (void *arg)
74 kpr_upcall_t *u = (kpr_upcall_t *)arg;
84 u->kpru_alive ? "up" : "down",
88 snprintf (nidstr, sizeof(nidstr), "%s", libcfs_nid2str(u->kpru_nid));
89 snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
91 libcfs_run_upcall (argv);
93 #endif /* __WINNT__ */
95 LIBCFS_FREE(u, sizeof(*u));
/* Queue a kpr_do_upcall() work item reporting that gateway 'gw_nid'
 * went up ('alive' != 0) or down at time 'when'.  Uses an atomic
 * allocation because the caller may be in arbitrary (atomic) context;
 * on allocation failure the event is logged and dropped.
 * NOTE(review): the kpru_when assignment is not visible in this
 * extract — presumably set alongside kpru_nid/kpru_alive. */
99 kpr_upcall (lnet_nid_t gw_nid, int alive, time_t when)
101 /* May be in arbitrary context */
104 LIBCFS_ALLOC_ATOMIC(u, sizeof(*u));
106 CERROR ("Upcall out of memory: nid %s %s\n",
107 libcfs_nid2str(gw_nid), alive ? "up" : "down");
111 u->kpru_nid = gw_nid;
112 u->kpru_alive = alive;
115 prepare_work (&u->kpru_tq, kpr_do_upcall, u);
116 schedule_work (&u->kpru_tq);
/* Return non-zero if peers should initially be presumed dead, i.e. the
 * check_routers_before_use module parameter was set. */
120 lnet_peers_start_down(void)
122 return check_routers_before_use;
/* Record an aliveness update for peer 'lp'; called with the LNET lock
 * held (per the _locked suffix and callers in this file).  Stale
 * timestamps and duplicate state are ignored; otherwise the peer's
 * alive state is updated and a notification is flagged as outstanding
 * for lnet_do_notify() to deliver.
 * 'notifylnd' asks that the LND also be told (saved in lp_notifylnd). */
126 lnet_notify_locked(lnet_peer_t *lp, int notifylnd, int alive, time_t when)
128 if (when < lp->lp_timestamp) { /* out of date information */
129 CDEBUG(D_NET, "Out of date\n");
133 lp->lp_timestamp = when; /* update timestamp */
134 lp->lp_ping_deadline = 0; /* disable ping timeout */
136 if (lp->lp_alive_count != 0 && /* got old news */
137 (!lp->lp_alive) == (!alive)) { /* new date for old news */
138 CDEBUG(D_NET, "Old news\n");
142 /* Flag that notification is outstanding */
144 lp->lp_alive_count++;
145 lp->lp_alive = !(!alive); /* 1 bit! */
147 lp->lp_notifylnd = notifylnd;
149 CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
/* Deliver any outstanding aliveness notification for 'lp': makes the
 * userspace upcall via kpr_upcall() and, if requested, calls the LND's
 * lnd_notify() hook.  lp_notifying serializes delivery so only one
 * thread notifies at a time, preserving ordering. */
153 lnet_do_notify (lnet_peer_t *lp)
155 lnet_ni_t *ni = lp->lp_ni;
162 /* Notify only in 1 thread at any time to ensure ordered notification.
163 * NB individual events can be missed; the only guarantee is that you
164 * always get the most recent news */
166 if (lp->lp_notifying) {
171 lp->lp_notifying = 1;
173 while (lp->lp_notify) {
174 alive = lp->lp_alive;
175 when = lp->lp_timestamp;
176 lnd = lp->lp_notifylnd;
182 /* A new notification could happen now; I'll handle it when
183 * control returns to me */
186 CDEBUG(D_NET, "Upcall: NID %s is %s\n",
187 libcfs_nid2str(lp->lp_nid),
188 alive ? "alive" : "dead");
189 kpr_upcall(lp->lp_nid, alive, when);
191 if (ni->ni_lnd->lnd_notify != NULL)
192 (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
198 lp->lp_notifying = 0;
/* Public entry point (EXPORT_SYMBOL below): record that peer 'nid' was
 * seen alive/dead at time 'when'.  'ni' is NULL for userspace-sourced
 * notifications, otherwise the interface reporting the event.
 * Ignored cases visible here: cross-net notifications, timestamps in
 * the future, and LND "down" reports when auto_down is disabled.
 * Otherwise the peer is looked up and lnet_notify_locked() applied.
 * NOTE(review): lock acquisition/release lines are not visible in this
 * extract but are implied by the *_locked callees. */
204 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
206 lnet_peer_t *lp = NULL;
207 time_t now = cfs_time_current_sec();
209 LASSERT (!in_interrupt ());
211 CDEBUG (D_NET, "%s notifying %s: %s\n",
212 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
214 alive ? "up" : "down");
217 LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
218 CWARN ("Ignoring notification of %s %s by %s (different net)\n",
219 libcfs_nid2str(nid), alive ? "birth" : "death",
220 libcfs_nid2str(ni->ni_nid));
224 /* can't do predictions... */
226 CWARN ("Ignoring prediction from %s of %s %s "
227 "%ld seconds in the future\n",
228 (ni == NULL) ? "userspace" : libcfs_nid2str(ni->ni_nid),
229 libcfs_nid2str(nid), alive ? "up" : "down",
234 if (ni != NULL && !alive && /* LND telling me she's down */
235 !auto_down) { /* auto-down disabled */
236 CDEBUG(D_NET, "Auto-down disabled\n");
242 lp = lnet_find_peer_locked(nid);
246 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
250 lnet_notify_locked(lp, ni == NULL, alive, when);
258 lnet_peer_decref_locked(lp);
263 EXPORT_SYMBOL(lnet_notify);
/* Userspace (non-router-kernel) variant of lnet_notify(); its body is
 * not visible in this extract. */
268 lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, time_t when)
/* Take a router reference on 'lp'.  On the 0 -> 1 transition, insert it
 * into the_lnet.ln_routers in ascending lp_nid order (insertion sort),
 * take a peer ref on behalf of the list, and bump the routers version
 * so scanners (e.g. lnet_router_checker) notice the change.
 * Caller holds the LNET lock (per the _locked suffix). */
276 lnet_rtr_addref_locked(lnet_peer_t *lp)
278 LASSERT (lp->lp_refcount > 0);
279 LASSERT (lp->lp_rtr_refcount >= 0);
281 lp->lp_rtr_refcount++;
282 if (lp->lp_rtr_refcount == 1) {
283 struct list_head *pos;
285 /* a simple insertion sort */
286 list_for_each_prev(pos, &the_lnet.ln_routers) {
287 lnet_peer_t *rtr = list_entry(pos, lnet_peer_t,
290 if (rtr->lp_nid < lp->lp_nid)
294 list_add(&lp->lp_rtr_list, pos);
295 /* addref for the_lnet.ln_routers */
296 lnet_peer_addref_locked(lp);
297 the_lnet.ln_routers_version++;
/* Drop a router reference on 'lp'.  On the 1 -> 0 transition, remove it
 * from the_lnet.ln_routers, drop the list's peer ref, and bump the
 * routers version.  Caller holds the LNET lock. */
302 lnet_rtr_decref_locked(lnet_peer_t *lp)
304 LASSERT (lp->lp_refcount > 0);
305 LASSERT (lp->lp_rtr_refcount > 0);
307 lp->lp_rtr_refcount--;
308 if (lp->lp_rtr_refcount == 0) {
309 list_del(&lp->lp_rtr_list);
310 /* decref for the_lnet.ln_routers */
311 lnet_peer_decref_locked(lp);
312 the_lnet.ln_routers_version++;
/* Linear search of the_lnet.ln_remote_nets for the entry matching
 * 'net'.  Caller holds the LNET lock; must not be called during
 * shutdown (asserted).  Returns the match, or falls through when none
 * is found (return statements not visible in this extract). */
317 lnet_find_net_locked (__u32 net)
319 lnet_remotenet_t *rnet;
320 struct list_head *tmp;
322 LASSERT (!the_lnet.ln_shutdown);
324 list_for_each (tmp, &the_lnet.ln_remote_nets) {
325 rnet = list_entry(tmp, lnet_remotenet_t, lrn_list);
327 if (rnet->lrn_net == net)
/* Add a route to remote network 'net' via 'gateway' with the given hop
 * count.  Rejects malformed requests (wildcard NIDs/nets, loopback,
 * gateway on the destination net, hops outside 1..255); silently
 * ignores routes to local nets and unreachable gateways.  Allocates the
 * route and (possibly unneeded) remotenet up front, then under the lock
 * either installs a new remotenet or merges into the existing one:
 * longer routes are dropped, shorter ones supersede (old routes moved
 * to 'zombies' and freed below), equal-hop duplicates are a NOOP.
 * Newly-routed gateways are assumed alive and the LND notified. */
334 lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
336 struct list_head zombies;
338 lnet_remotenet_t *rnet;
339 lnet_remotenet_t *rnet2;
341 lnet_route_t *route2;
346 CDEBUG(D_NET, "Add route: net %s hops %u gw %s\n",
347 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
349 if (gateway == LNET_NID_ANY ||
350 LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
351 net == LNET_NIDNET(LNET_NID_ANY) ||
352 LNET_NETTYP(net) == LOLND ||
353 LNET_NIDNET(gateway) == net ||
354 hops < 1 || hops > 255)
357 if (lnet_islocalnet(net)) /* it's a local network */
358 return 0; /* ignore the route entry */
360 /* Assume net, route, all new */
361 LIBCFS_ALLOC(route, sizeof(*route));
362 LIBCFS_ALLOC(rnet, sizeof(*rnet));
363 if (route == NULL || rnet == NULL) {
364 CERROR("Out of memory creating route %s %d %s\n",
365 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
367 LIBCFS_FREE(route, sizeof(*route));
369 LIBCFS_FREE(rnet, sizeof(*rnet));
373 INIT_LIST_HEAD(&rnet->lrn_routes);
375 rnet->lrn_hops = hops;
379 rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
383 LIBCFS_FREE(route, sizeof(*route));
384 LIBCFS_FREE(rnet, sizeof(*rnet));
386 if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
387 return 0; /* ignore the route entry */
389 CERROR("Error %d creating route %s %d %s\n", rc,
390 libcfs_net2str(net), hops, libcfs_nid2str(gateway));
394 LASSERT (!the_lnet.ln_shutdown);
395 CFS_INIT_LIST_HEAD(&zombies);
397 rnet2 = lnet_find_net_locked(net);
400 list_add_tail(&rnet->lrn_list, &the_lnet.ln_remote_nets);
404 if (hops > rnet2->lrn_hops) {
405 /* New route is longer; ignore it */
407 } else if (hops < rnet2->lrn_hops) {
408 /* new route supersedes all currently known routes to this
410 list_add(&zombies, &rnet2->lrn_routes);
411 list_del_init(&rnet2->lrn_routes);
415 /* New route has the same hopcount as existing routes; search
416 * for a duplicate route (it's a NOOP if it is) */
417 list_for_each (e, &rnet2->lrn_routes) {
418 route2 = list_entry(e, lnet_route_t, lr_list);
420 if (route2->lr_gateway == route->lr_gateway) {
425 /* our lookups must be true */
426 LASSERT (route2->lr_gateway->lp_nid != gateway);
431 ni = route->lr_gateway->lp_ni;
432 lnet_ni_addref_locked(ni);
435 list_add_tail(&route->lr_list, &rnet2->lrn_routes);
436 the_lnet.ln_remote_nets_version++;
438 lnet_rtr_addref_locked(route->lr_gateway);
442 /* XXX Assume alive */
443 if (ni->ni_lnd->lnd_notify != NULL)
444 (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
448 lnet_peer_decref_locked(route->lr_gateway);
450 LIBCFS_FREE(route, sizeof(*route));
454 LIBCFS_FREE(rnet, sizeof(*rnet));
456 while (!list_empty(&zombies)) {
457 route = list_entry(zombies.next, lnet_route_t, lr_list);
458 list_del(&route->lr_list);
461 lnet_peer_decref_locked(route->lr_gateway);
463 LIBCFS_FREE(route, sizeof(*route));
/* Sanity-check the route table: for each remote net, verify all routes
 * exit via the same local interface (lp_ni); log an error for any net
 * reachable through two different NIs, which is unsupported. */
470 lnet_check_routes (void)
472 lnet_remotenet_t *rnet;
474 lnet_route_t *route2;
475 struct list_head *e1;
476 struct list_head *e2;
480 list_for_each (e1, &the_lnet.ln_remote_nets) {
481 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
484 list_for_each (e2, &rnet->lrn_routes) {
485 route = list_entry(e2, lnet_route_t, lr_list);
489 else if (route->lr_gateway->lp_ni !=
490 route2->lr_gateway->lp_ni) {
493 CERROR("Routes to %s via %s and %s not supported\n",
494 libcfs_net2str(rnet->lrn_net),
495 libcfs_nid2str(route->lr_gateway->lp_nid),
496 libcfs_nid2str(route2->lr_gateway->lp_nid));
/* Delete matching routes.  'net' and/or 'gw_nid' may be wildcards
 * (LNET_NIDNET(LNET_NID_ANY) / LNET_NID_ANY) to match everything; each
 * matching route is unlinked, its gateway's router and peer refs are
 * dropped, and a remotenet left with no routes is removed and freed. */
507 lnet_del_route (__u32 net, lnet_nid_t gw_nid)
509 lnet_remotenet_t *rnet;
511 struct list_head *e1;
512 struct list_head *e2;
515 CDEBUG(D_NET, "Del route: net %s : gw %s\n",
516 libcfs_net2str(net), libcfs_nid2str(gw_nid));
518 /* NB Caller may specify either all routes via the given gateway
519 * or a specific route entry (actual NIDs) */
524 list_for_each (e1, &the_lnet.ln_remote_nets) {
525 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
527 if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
528 net == rnet->lrn_net))
531 list_for_each (e2, &rnet->lrn_routes) {
532 route = list_entry(e2, lnet_route_t, lr_list);
534 if (!(gw_nid == LNET_NID_ANY ||
535 gw_nid == route->lr_gateway->lp_nid))
538 list_del(&route->lr_list);
539 the_lnet.ln_remote_nets_version++;
541 if (list_empty(&rnet->lrn_routes))
542 list_del(&rnet->lrn_list);
546 lnet_rtr_decref_locked(route->lr_gateway);
547 lnet_peer_decref_locked(route->lr_gateway);
550 LIBCFS_FREE(route, sizeof (*route));
553 LIBCFS_FREE(rnet, sizeof(*rnet));
/* Remove every route: delete with both net and gateway wildcarded. */
565 lnet_destroy_routes (void)
567 lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
/* Enumerator for the route table (used by the ioctl/proc interface):
 * walk all routes in order and report the 'idx'-th one's net, hop
 * count, gateway NID and aliveness through the out parameters.
 * NOTE(review): the idx-decrement/compare and return lines are not
 * visible in this extract. */
571 lnet_get_route (int idx, __u32 *net, __u32 *hops,
572 lnet_nid_t *gateway, __u32 *alive)
574 struct list_head *e1;
575 struct list_head *e2;
576 lnet_remotenet_t *rnet;
581 list_for_each (e1, &the_lnet.ln_remote_nets) {
582 rnet = list_entry(e1, lnet_remotenet_t, lrn_list);
584 list_for_each (e2, &rnet->lrn_routes) {
585 route = list_entry(e2, lnet_route_t, lr_list);
588 *net = rnet->lrn_net;
589 *hops = rnet->lrn_hops;
590 *gateway = route->lr_gateway->lp_nid;
591 *alive = route->lr_gateway->lp_alive;
602 #if defined(__KERNEL__) && defined(LNET_ROUTER)
/* EQ callback for router-checker pings.  Runs with LNET_LOCK held (see
 * caveat below), so it must not block or drop the lock.  On MD unlink
 * it signals the checker thread's shutdown handshake; otherwise it maps
 * a SEND/REPLY event back to the pinged router and updates its state:
 * any comms failure marks it down, a successful REPLY marks it up. */
604 lnet_router_checker_event (lnet_event_t *event)
606 /* CAVEAT EMPTOR: I'm called with LNET_LOCKed and I'm not allowed to
607 * drop it (that's how come I see _every_ event, even ones that would
612 if (event->unlinked) {
613 /* The router checker thread has unlinked the rc_md
615 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKING);
616 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKED;
617 mutex_up(&the_lnet.ln_rc_signal);
621 LASSERT (event->type == LNET_EVENT_SEND ||
622 event->type == LNET_EVENT_REPLY);
624 nid = (event->type == LNET_EVENT_SEND) ?
625 event->target.nid : event->initiator.nid;
627 lp = lnet_find_peer_locked(nid);
629 /* router may have been removed */
630 CDEBUG(D_NET, "Router %s not found\n", libcfs_nid2str(nid));
634 if (event->type == LNET_EVENT_SEND) /* re-enable another ping */
635 lp->lp_ping_notsent = 0;
637 if (lnet_isrouter(lp) && /* ignore if no longer a router */
638 (event->status != 0 ||
639 event->type == LNET_EVENT_REPLY)) {
641 /* A successful REPLY means the router is up. If _any_ comms
642 * to the router fail I assume it's down (this will happen if
643 * we ping alive routers to try to detect router death before
644 * apps get burned). */
646 lnet_notify_locked(lp, 1, (event->status == 0),
647 cfs_time_current_sec());
649 /* The router checker will wake up very shortly and do the
650 * actual notification.
651 * XXX If 'lp' stops being a router before then, it will still
652 * have the notification pending!!! */
655 /* This decref will NOT drop LNET_LOCK (it had to have 1 ref when it
656 * was in the peer table and I've not dropped the lock, so no-one else
657 * can have reduced the refcount) */
658 LASSERT(lp->lp_refcount > 1);
660 lnet_peer_decref_locked(lp);
/* Router-checker daemon thread.  Binds a truncating MD over a static
 * lnet_ping_info_t, signals its parent it has started, then loops once
 * per second over the_lnet.ln_routers: times out overdue pings (marking
 * the router down), delivers pending notifications, and issues a new
 * LNetGet() ping to each router whose live/dead check interval has
 * elapsed.  Restarts the scan if the router list version changes
 * mid-walk.  On stop, unlinks the MD; the event callback completes the
 * shutdown handshake via ln_rc_signal. */
664 lnet_router_checker(void *arg)
666 static lnet_ping_info_t pinginfo;
669 lnet_handle_md_t mdh;
671 struct list_head *entry;
673 lnet_process_id_t rtr_id;
676 cfs_daemonize("router_checker");
679 rtr_id.pid = LUSTRE_SRV_LNET_PID;
681 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
683 rc = LNetMDBind((lnet_md_t){.start = &pinginfo,
684 .length = sizeof(pinginfo),
685 .threshold = LNET_MD_THRESH_INF,
686 .options = LNET_MD_TRUNCATE,
687 .eq_handle = the_lnet.ln_rc_eqh},
692 CERROR("Can't bind MD: %d\n", rc);
693 the_lnet.ln_rc_state = rc;
694 mutex_up(&the_lnet.ln_rc_signal);
700 the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
701 mutex_up(&the_lnet.ln_rc_signal); /* let my parent go */
703 while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
708 version = the_lnet.ln_routers_version;
710 list_for_each (entry, &the_lnet.ln_routers) {
711 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
713 lnet_peer_addref_locked(rtr);
715 now = cfs_time_current_sec();
717 if (rtr->lp_ping_deadline != 0 && /* ping timed out? */
718 now > rtr->lp_ping_deadline)
719 lnet_notify_locked(rtr, 1, 0, now);
723 /* Run any outstanding notifications */
727 secs = live_router_check_interval;
729 secs = dead_router_check_interval;
735 !rtr->lp_ping_notsent &&
736 now > rtr->lp_ping_timestamp + secs) {
737 CDEBUG(D_NET, "Check: %s\n",
738 libcfs_nid2str(rtr->lp_nid));
741 rtr_id.nid = rtr->lp_nid;
742 rtr->lp_ping_notsent = 1;
743 rtr->lp_ping_timestamp = now;
745 if (rtr->lp_ping_deadline == 0)
746 rtr->lp_ping_deadline =
747 now + router_ping_timeout;
751 LNetGet(LNET_NID_ANY, mdh, rtr_id,
752 LNET_RESERVED_PORTAL,
753 LNET_PROTO_PING_MATCHBITS, 0);
757 lnet_peer_decref_locked(rtr);
759 if (version != the_lnet.ln_routers_version) {
760 /* the routers list has changed */
767 /* Calling cfs_pause() here always adds 1 to load average
768 * because kernel counts # active tasks as nr_running
769 * + nr_uninterruptible. */
770 set_current_state(CFS_TASK_INTERRUPTIBLE);
771 cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
772 cfs_time_seconds(1));
775 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_STOPTHREAD);
776 the_lnet.ln_rc_state = LNET_RC_STATE_UNLINKING;
778 rc = LNetMDUnlink(mdh);
781 /* The unlink event callback will signal final completion */
/* Block (polling once per second) until every known router has been
 * pinged at least once, i.e. no router still has lp_alive_count == 0.
 * Used at startup when check_routers_before_use is set. */
788 lnet_wait_known_routerstate(void)
791 struct list_head *entry;
798 list_for_each (entry, &the_lnet.ln_routers) {
799 rtr = list_entry(entry, lnet_peer_t, lp_rtr_list);
801 if (rtr->lp_alive_count == 0) {
812 cfs_pause(cfs_time_seconds(1));
/* Stop the router-checker thread: ask it to exit, wait on ln_rc_signal
 * until the MD-unlink event callback confirms final teardown, then free
 * the EQ and return the state machine to SHUTDOWN.  A no-op when the
 * checker was never started. */
817 lnet_router_checker_stop(void)
821 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING ||
822 the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
824 if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
827 the_lnet.ln_rc_state = LNET_RC_STATE_STOPTHREAD;
828 /* block until event callback signals exit */
829 mutex_down(&the_lnet.ln_rc_signal);
831 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_UNLINKED);
833 rc = LNetEQFree(the_lnet.ln_rc_eqh);
836 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
/* Start the router-checker thread if health checking is configured.
 * Validates parameter combinations (check_routers_before_use requires
 * dead_router_check_interval; both intervals <= 0 means nothing to do),
 * allocates the event queue, spawns lnet_router_checker(), and waits on
 * ln_rc_signal for it to report startup success/failure.  Optionally
 * blocks until all routers' initial state is known. */
840 lnet_router_checker_start(void)
844 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
846 if (check_routers_before_use &&
847 dead_router_check_interval <= 0) {
848 LCONSOLE_ERROR("'dead_router_check_interval' must be set if "
849 "'check_routers_before_use' is set\n");
853 if (live_router_check_interval <= 0 &&
854 dead_router_check_interval <= 0)
857 init_mutex_locked(&the_lnet.ln_rc_signal);
859 /* EQ size doesn't matter; the callback is guaranteed to get every
861 rc = LNetEQAlloc(1, lnet_router_checker_event,
862 &the_lnet.ln_rc_eqh);
864 CERROR("Can't allocate EQ: %d\n", rc);
868 rc = (int)cfs_kernel_thread(lnet_router_checker, NULL, 0);
870 CERROR("Can't start router checker thread: %d\n", rc);
874 mutex_down(&the_lnet.ln_rc_signal); /* wait for checker to startup */
876 rc = the_lnet.ln_rc_state;
878 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
882 LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
884 if (check_routers_before_use) {
885 /* Note that a helpful side-effect of pinging all known routers
886 * at startup is that it makes them drop stale connections they
887 * may have to a previous instance of me. */
888 lnet_wait_known_routerstate();
894 rc = LNetEQFree(the_lnet.ln_rc_eqh);
/* Free a router buffer: release each of its 'npages' pages, then (in
 * lines not visible here) presumably the descriptor itself — sz is the
 * flexible-array-sized allocation computed for that free. */
900 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
902 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
904 while (--npages >= 0)
905 cfs_free_page(rb->rb_kiov[npages].kiov_page);
/* Allocate one router buffer for pool 'rbp': the descriptor (sized with
 * a flexible kiov array) plus rbp_npages zeroed pages, each recorded as
 * a full-page kiov entry.  On page-allocation failure the pages already
 * obtained are freed (cleanup loop partially visible at line 927). */
911 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
913 int npages = rbp->rbp_npages;
914 int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
919 LIBCFS_ALLOC(rb, sz);
923 for (i = 0; i < npages; i++) {
924 page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
927 cfs_free_page(rb->rb_kiov[i].kiov_page);
933 rb->rb_kiov[i].kiov_len = CFS_PAGE_SIZE;
934 rb->rb_kiov[i].kiov_offset = 0;
935 rb->rb_kiov[i].kiov_page = page;
/* Drain and free every buffer in pool 'rbp'.  Requires no queued
 * messages and all credits returned (every buffer idle); afterwards the
 * pool's buffer and credit counts are reset to zero. */
942 lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
944 int npages = rbp->rbp_npages;
948 LASSERT (list_empty(&rbp->rbp_msgs));
949 LASSERT (rbp->rbp_credits == rbp->rbp_nbuffers);
951 while (!list_empty(&rbp->rbp_bufs)) {
952 LASSERT (rbp->rbp_credits > 0);
954 rb = list_entry(rbp->rbp_bufs.next,
955 lnet_rtrbuf_t, rb_list);
956 list_del(&rb->rb_list);
957 lnet_destroy_rtrbuf(rb, npages);
961 LASSERT (rbp->rbp_nbuffers == nbuffers);
962 LASSERT (rbp->rbp_credits == nbuffers);
964 rbp->rbp_nbuffers = rbp->rbp_credits = 0;
/* Populate pool 'rbp' with 'nbufs' buffers.  A no-op if already
 * populated (count must match).  Buffers are added directly to
 * rbp_bufs — legal only because routing is not yet enabled (asserted),
 * so no message can be blocked waiting for a buffer. */
968 lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
973 if (rbp->rbp_nbuffers != 0) {
974 LASSERT (rbp->rbp_nbuffers == nbufs);
978 for (i = 0; i < nbufs; i++) {
979 rb = lnet_new_rtrbuf(rbp);
982 CERROR("Failed to allocate %d router bufs of %d pages\n",
983 nbufs, rbp->rbp_npages);
989 rbp->rbp_mincredits++;
990 list_add(&rb->rb_list, &rbp->rbp_bufs);
992 /* No allocation "under fire" */
993 /* Otherwise we'd need code to schedule blocked msgs etc */
994 LASSERT (!the_lnet.ln_routing);
997 LASSERT (rbp->rbp_credits == nbufs);
/* Initialize an empty buffer pool that will hold buffers of 'npages'
 * pages each: empty message/buffer lists, zero credits. */
1002 lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
1004 CFS_INIT_LIST_HEAD(&rbp->rbp_msgs);
1005 CFS_INIT_LIST_HEAD(&rbp->rbp_bufs);
1007 rbp->rbp_npages = npages;
1008 rbp->rbp_credits = 0;
1009 rbp->rbp_mincredits = 0;
/* Free the buffers of all three router pools (tiny/small/large). */
1013 lnet_free_rtrpools(void)
1015 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
1016 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
1017 lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
/* Initialize the three router pools: pool 0 holds 0-page (tiny)
 * buffers, pool 1 single-page (small) buffers, pool 2 large buffers of
 * enough pages to cover LNET_MTU (rounded up to whole pages). */
1021 lnet_init_rtrpools(void)
1023 int small_pages = 1;
1024 int large_pages = (LNET_MTU + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
1026 lnet_rtrpool_init(&the_lnet.ln_rtrpools[0], 0);
1027 lnet_rtrpool_init(&the_lnet.ln_rtrpools[1], small_pages);
1028 lnet_rtrpool_init(&the_lnet.ln_rtrpools[2], large_pages);
/* Decide whether this node forwards (the 'forwarding' module parameter
 * overrides 'im_a_router'; anything other than "", "enabled" or
 * "disabled" is an error), then allocate the tiny/small/large buffer
 * pools from the *_router_buffers parameters (each must be > 0) and set
 * the_lnet.ln_routing.  On any failure the pools are freed again. */
1033 lnet_alloc_rtrpools(int im_a_router)
1037 if (!strcmp(forwarding, "")) {
1038 /* not set either way */
1041 } else if (!strcmp(forwarding, "disabled")) {
1042 /* explicitly disabled */
1044 } else if (!strcmp(forwarding, "enabled")) {
1045 /* explicitly enabled */
1047 LCONSOLE_ERROR("'forwarding' not set to either "
1048 "'enabled' or 'disabled'\n");
1052 if (tiny_router_buffers <= 0) {
1053 LCONSOLE_ERROR("tiny_router_buffers=%d invalid when "
1054 "routing enabled\n", tiny_router_buffers);
1059 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[0],
1060 tiny_router_buffers);
1064 if (small_router_buffers <= 0) {
1065 LCONSOLE_ERROR("small_router_buffers=%d invalid when "
1066 "routing enabled\n", small_router_buffers);
1071 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[1],
1072 small_router_buffers);
1076 if (large_router_buffers <= 0) {
1077 LCONSOLE_ERROR("large_router_buffers=%d invalid when "
1078 "routing enabled\n", large_router_buffers);
1083 rc = lnet_rtrpool_alloc_bufs(&the_lnet.ln_rtrpools[2],
1084 large_router_buffers);
1089 the_lnet.ln_routing = 1;
1095 lnet_free_rtrpools();
/* Userspace (non-LNET_ROUTER) stubs for the routing API; their bodies
 * are not visible in this extract.  NOTE(review): the last stub's
 * parameter is spelled "im_a_arouter" — likely a typo for
 * "im_a_router", harmless since the name is local to the stub. */
1102 lnet_peers_start_down(void)
1108 lnet_router_checker_stop(void)
1114 lnet_router_checker_start(void)
1120 lnet_free_rtrpools (void)
1125 lnet_init_rtrpools (void)
1130 lnet_alloc_rtrpools (int im_a_arouter)