Whamcloud - gitweb
LU-2456 lnet: Dynamic LNet Configuration (DLC) dynamic routing 31/9831/8
authorAmir Shehata <amir.shehata@intel.com>
Fri, 28 Mar 2014 00:46:05 +0000 (17:46 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 15 Aug 2014 15:19:33 +0000 (15:19 +0000)
This is the second patch of a set of patches that enables DLC.

This patch adds the following features to LNET.  Currently these
features are not driven by user space.
- Enabling Routing on Demand.  The default number of router
  buffers are allocated.
- Disable Routing on demand. Unused router buffers are freed and
  used router buffers are freed when they are no longer in use.
  The following time routing is enabled the default router buffer
  values are used.  It has been decided that remembering the
  user set router buffer values should be remembered and re-set
  by user space scripts.
- Increase the number of router buffers on demand, by allocating
  new ones.
- Decrease the number of router buffers.  Exccess buffers are freed
  if they are not in use.  Otherwise they are freed once they are
  no longer in use.

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: Id07d4ad424d8f5ba72475d4149380afe2ac54e77
Reviewed-on: http://review.whamcloud.com/9831
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Doug Oucharek <doug.s.oucharek@intel.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/router.c

index ebdf5d9..122aeb4 100644 (file)
@@ -745,7 +745,11 @@ int lnet_get_route(int idx, __u32 *net, __u32 *hops,
 void lnet_proc_init(void);
 void lnet_proc_fini(void);
 int  lnet_rtrpools_alloc(int im_a_router);
-void lnet_rtrpools_free(void);
+void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages);
+int  lnet_rtrpools_adjust(int tiny, int small, int large);
+int lnet_rtrpools_enable(void);
+void lnet_rtrpools_disable(void);
+void lnet_rtrpools_free(int keep_pools);
 lnet_remotenet_t *lnet_find_net_locked (__u32 net);
 
 int lnet_islocalnid(lnet_nid_t nid);
@@ -765,6 +769,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid);
 void lnet_return_tx_credits_locked(lnet_msg_t *msg);
 void lnet_return_rx_credits_locked(lnet_msg_t *msg);
+void lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp);
+void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);
 
 /* portals functions */
 /* portals attributes */
index 8823d89..ddfc7f4 100644 (file)
@@ -450,6 +450,7 @@ typedef struct lnet_ni {
 #define LNET_PING_FEAT_INVAL           (0)             /* no feature */
 #define LNET_PING_FEAT_BASE            (1 << 0)        /* just a ping */
 #define LNET_PING_FEAT_NI_STATUS       (1 << 1)        /* return NI status */
+#define LNET_PING_FEAT_RTE_DISABLED    (1 << 2)        /* Routing enabled */
 
 #define LNET_PING_FEAT_MASK            (LNET_PING_FEAT_BASE | \
                                         LNET_PING_FEAT_NI_STATUS)
@@ -607,7 +608,12 @@ typedef struct {
 
 #define LNET_PEER_HASHSIZE   503                /* prime! */
 
-#define LNET_NRBPOOLS         3                 /* # different router buffer pools */
+#define LNET_TINY_BUF_IDX      0
+#define LNET_SMALL_BUF_IDX     1
+#define LNET_LARGE_BUF_IDX     2
+
+/* # different router buffer pools */
+#define LNET_NRBPOOLS          (LNET_LARGE_BUF_IDX + 1)
 
 enum {
        /* Didn't match anything */
index 3eb2ee7..53beecd 100644 (file)
@@ -834,7 +834,7 @@ lnet_unprepare (void)
 
        lnet_msg_containers_destroy();
        lnet_peer_tables_destroy();
-       lnet_rtrpools_free();
+       lnet_rtrpools_free(0);
 
        if (the_lnet.ln_counters != NULL) {
                cfs_percpt_free(the_lnet.ln_counters);
@@ -1755,19 +1755,21 @@ lnet_create_ping_info(void)
                 return -ENOMEM;
         }
 
-        pinfo->pi_nnis    = n;
-        pinfo->pi_pid     = the_lnet.ln_pid;
-        pinfo->pi_magic   = LNET_PROTO_PING_MAGIC;
+       pinfo->pi_nnis    = n;
+       pinfo->pi_pid     = the_lnet.ln_pid;
+       pinfo->pi_magic   = LNET_PROTO_PING_MAGIC;
        pinfo->pi_features = LNET_PING_FEAT_NI_STATUS;
+       if (!the_lnet.ln_routing)
+               pinfo->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
 
-        for (i = 0; i < n; i++) {
-                lnet_ni_status_t *ns = &pinfo->pi_ni[i];
+       for (i = 0; i < n; i++) {
+               lnet_ni_status_t *ns = &pinfo->pi_ni[i];
 
-                rc = LNetGetId(i, &id);
-                LASSERT (rc == 0);
+               rc = LNetGetId(i, &id);
+               LASSERT(rc == 0);
 
-                ns->ns_nid    = id.nid;
-                ns->ns_status = LNET_NI_STATUS_UP;
+               ns->ns_nid    = id.nid;
+               ns->ns_status = LNET_NI_STATUS_UP;
 
                lnet_net_lock(0);
 
index 2cd3dad..7657ea8 100644 (file)
@@ -981,13 +981,10 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
        rbp = lnet_msg2bufpool(msg);
 
        if (!msg->msg_rtrcredit) {
-               LASSERT((rbp->rbp_credits < 0) ==
-                       !list_empty(&rbp->rbp_msgs));
-
-                msg->msg_rtrcredit = 1;
-                rbp->rbp_credits--;
-                if (rbp->rbp_credits < rbp->rbp_mincredits)
-                        rbp->rbp_mincredits = rbp->rbp_credits;
+               msg->msg_rtrcredit = 1;
+               rbp->rbp_credits--;
+               if (rbp->rbp_credits < rbp->rbp_mincredits)
+                       rbp->rbp_mincredits = rbp->rbp_credits;
 
                if (rbp->rbp_credits < 0) {
                        /* must have checked eager_recv before here */
@@ -1075,6 +1072,45 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
         }
 }
 
+#ifdef __KERNEL__
+void
+lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
+{
+       lnet_msg_t      *msg;
+
+       if (list_empty(&rbp->rbp_msgs))
+               return;
+       msg = list_entry(rbp->rbp_msgs.next,
+                        lnet_msg_t, msg_list);
+       list_del(&msg->msg_list);
+
+       (void)lnet_post_routed_recv_locked(msg, 1);
+}
+#endif
+
+void
+lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
+{
+       lnet_msg_t       *msg;
+       lnet_msg_t       *tmp;
+       struct list_head drop;
+
+       INIT_LIST_HEAD(&drop);
+
+       list_splice_init(list, &drop);
+
+       lnet_net_unlock(cpt);
+
+       list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+               lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+                            0, 0, 0, msg->msg_hdr.payload_length);
+               list_del_init(&msg->msg_list);
+               lnet_finalize(NULL, msg, -ECANCELED);
+       }
+
+       lnet_net_lock(cpt);
+}
+
 void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
@@ -1082,39 +1118,49 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
 #ifdef __KERNEL__
        lnet_msg_t      *msg2;
 
-        if (msg->msg_rtrcredit) {
-                /* give back global router credits */
-                lnet_rtrbuf_t     *rb;
-                lnet_rtrbufpool_t *rbp;
+       if (msg->msg_rtrcredit) {
+               /* give back global router credits */
+               lnet_rtrbuf_t     *rb;
+               lnet_rtrbufpool_t *rbp;
 
-                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
-                 * there until it gets one allocated, or aborts the wait
-                 * itself */
-                LASSERT (msg->msg_kiov != NULL);
+               /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
+                * there until it gets one allocated, or aborts the wait
+                * itself */
+               LASSERT(msg->msg_kiov != NULL);
 
                rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
-                rbp = rb->rb_pool;
-                LASSERT (rbp == lnet_msg2bufpool(msg));
+               rbp = rb->rb_pool;
 
-                msg->msg_kiov = NULL;
-                msg->msg_rtrcredit = 0;
+               msg->msg_kiov = NULL;
+               msg->msg_rtrcredit = 0;
 
-                LASSERT((rbp->rbp_credits < 0) ==
-                       !list_empty(&rbp->rbp_msgs));
-                LASSERT((rbp->rbp_credits > 0) ==
+               LASSERT(rbp == lnet_msg2bufpool(msg));
+
+               LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));
 
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
-                rbp->rbp_credits++;
-                if (rbp->rbp_credits <= 0) {
-                       msg2 = list_entry(rbp->rbp_msgs.next,
-                                         lnet_msg_t, msg_list);
-                       list_del(&msg2->msg_list);
+               /* If routing is now turned off, we just drop this buffer and
+                * don't bother trying to return credits.  */
+               if (!the_lnet.ln_routing) {
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+                       goto routing_off;
+               }
 
-                       (void) lnet_post_routed_recv_locked(msg2, 1);
+               /* It is possible that a user has lowered the desired number of
+                * buffers in this pool.  Make sure we never put back
+                * more buffers than the stated number. */
+               if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
+                       /* Discard this buffer so we don't have too many. */
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+               } else {
+                       list_add(&rb->rb_list, &rbp->rbp_bufs);
+                       rbp->rbp_credits++;
+                       if (rbp->rbp_credits <= 0)
+                               lnet_schedule_blocked_locked(rbp);
                }
        }
 
+routing_off:
        if (msg->msg_peerrtrcredit) {
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
@@ -1123,7 +1169,13 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                        !list_empty(&rxpeer->lp_rtrq));
 
                rxpeer->lp_rtrcredits++;
-               if (rxpeer->lp_rtrcredits <= 0) {
+
+               /* drop all messages which are queued to be routed on that
+                * peer. */
+               if (!the_lnet.ln_routing) {
+                       lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+                                                    msg->msg_rx_cpt);
+               } else if (rxpeer->lp_rtrcredits <= 0) {
                        msg2 = list_entry(rxpeer->lp_rtrq.next,
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
@@ -1132,13 +1184,13 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                }
        }
 #else
-        LASSERT (!msg->msg_rtrcredit);
-        LASSERT (!msg->msg_peerrtrcredit);
+       LASSERT(!msg->msg_rtrcredit);
+       LASSERT(!msg->msg_peerrtrcredit);
 #endif
-        if (rxpeer != NULL) {
-                msg->msg_rxpeer = NULL;
-                lnet_peer_decref_locked(rxpeer);
-        }
+       if (rxpeer != NULL) {
+               msg->msg_rxpeer = NULL;
+               lnet_peer_decref_locked(rxpeer);
+       }
 }
 
 static int
@@ -1694,6 +1746,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
        int     rc = 0;
 
 #ifdef __KERNEL__
+       if (!the_lnet.ln_routing)
+               return -ECANCELED;
+
        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
                if (ni->ni_lnd->lnd_eager_recv == NULL) {
@@ -1803,8 +1858,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
        int             cpt;
        int             for_me;
        struct lnet_msg *msg;
-        lnet_pid_t     dest_pid;
-        lnet_nid_t     dest_nid;
+       lnet_pid_t     dest_pid;
+       lnet_nid_t     dest_nid;
        lnet_nid_t     src_nid;
        __u32          payload_length;
        __u32          type;
@@ -1815,128 +1870,129 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
        src_nid = le64_to_cpu(hdr->src_nid);
        dest_nid = le64_to_cpu(hdr->dest_nid);
        dest_pid = le32_to_cpu(hdr->dest_pid);
-        payload_length = le32_to_cpu(hdr->payload_length);
+       payload_length = le32_to_cpu(hdr->payload_length);
 
-        for_me = (ni->ni_nid == dest_nid);
+       for_me = (ni->ni_nid == dest_nid);
        cpt = lnet_cpt_of_nid(from_nid);
 
-        switch (type) {
-        case LNET_MSG_ACK:
-        case LNET_MSG_GET:
-                if (payload_length > 0) {
-                        CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
-                               libcfs_nid2str(from_nid),
-                               libcfs_nid2str(src_nid),
-                               lnet_msgtyp2str(type), payload_length);
-                        return -EPROTO;
-                }
-                break;
-
-        case LNET_MSG_PUT:
-        case LNET_MSG_REPLY:
-                if (payload_length > (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
-                        CERROR("%s, src %s: bad %s payload %d "
-                               "(%d max expected)\n",
-                               libcfs_nid2str(from_nid),
-                               libcfs_nid2str(src_nid),
-                               lnet_msgtyp2str(type),
-                               payload_length,
-                               for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
-                        return -EPROTO;
-                }
-                break;
+       switch (type) {
+       case LNET_MSG_ACK:
+       case LNET_MSG_GET:
+               if (payload_length > 0) {
+                       CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
+                              libcfs_nid2str(from_nid),
+                              libcfs_nid2str(src_nid),
+                              lnet_msgtyp2str(type), payload_length);
+                       return -EPROTO;
+               }
+               break;
+
+       case LNET_MSG_PUT:
+       case LNET_MSG_REPLY:
+               if (payload_length >
+                   (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
+                       CERROR("%s, src %s: bad %s payload %d "
+                              "(%d max expected)\n",
+                              libcfs_nid2str(from_nid),
+                              libcfs_nid2str(src_nid),
+                              lnet_msgtyp2str(type),
+                              payload_length,
+                              for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
+                       return -EPROTO;
+               }
+               break;
 
-        default:
-                CERROR("%s, src %s: Bad message type 0x%x\n",
-                       libcfs_nid2str(from_nid),
-                       libcfs_nid2str(src_nid), type);
-                return -EPROTO;
-        }
+       default:
+               CERROR("%s, src %s: Bad message type 0x%x\n",
+                      libcfs_nid2str(from_nid),
+                      libcfs_nid2str(src_nid), type);
+               return -EPROTO;
+       }
 
        if (the_lnet.ln_routing &&
            ni->ni_last_alive != cfs_time_current_sec()) {
-               lnet_ni_lock(ni);
-
                /* NB: so far here is the only place to set NI status to "up */
+               lnet_ni_lock(ni);
                ni->ni_last_alive = cfs_time_current_sec();
                if (ni->ni_status != NULL &&
                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
                lnet_ni_unlock(ni);
-        }
+       }
 
-        /* Regard a bad destination NID as a protocol error.  Senders should
-         * know what they're doing; if they don't they're misconfigured, buggy
-         * or malicious so we chop them off at the knees :) */
-
-        if (!for_me) {
-                if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
-                        /* should have gone direct */
-                        CERROR ("%s, src %s: Bad dest nid %s "
-                                "(should have been sent direct)\n",
-                                libcfs_nid2str(from_nid),
-                                libcfs_nid2str(src_nid),
-                                libcfs_nid2str(dest_nid));
-                        return -EPROTO;
-                }
+       /* Regard a bad destination NID as a protocol error.  Senders should
+        * know what they're doing; if they don't they're misconfigured, buggy
+        * or malicious so we chop them off at the knees :) */
 
-                if (lnet_islocalnid(dest_nid)) {
-                        /* dest is another local NI; sender should have used
-                         * this node's NID on its own network */
-                        CERROR ("%s, src %s: Bad dest nid %s "
-                                "(it's my nid but on a different network)\n",
-                                libcfs_nid2str(from_nid),
-                                libcfs_nid2str(src_nid),
-                                libcfs_nid2str(dest_nid));
-                        return -EPROTO;
-                }
+       if (!for_me) {
+               if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
+                       /* should have gone direct */
+                       CERROR("%s, src %s: Bad dest nid %s "
+                              "(should have been sent direct)\n",
+                               libcfs_nid2str(from_nid),
+                               libcfs_nid2str(src_nid),
+                               libcfs_nid2str(dest_nid));
+                       return -EPROTO;
+               }
 
-                if (rdma_req && type == LNET_MSG_GET) {
-                        CERROR ("%s, src %s: Bad optimized GET for %s "
-                                "(final destination must be me)\n",
-                                libcfs_nid2str(from_nid),
-                                libcfs_nid2str(src_nid),
-                                libcfs_nid2str(dest_nid));
-                        return -EPROTO;
-                }
+               if (lnet_islocalnid(dest_nid)) {
+                       /* dest is another local NI; sender should have used
+                        * this node's NID on its own network */
+                       CERROR("%s, src %s: Bad dest nid %s "
+                              "(it's my nid but on a different network)\n",
+                               libcfs_nid2str(from_nid),
+                               libcfs_nid2str(src_nid),
+                               libcfs_nid2str(dest_nid));
+                       return -EPROTO;
+               }
 
-                if (!the_lnet.ln_routing) {
-                        CERROR ("%s, src %s: Dropping message for %s "
-                                "(routing not enabled)\n",
-                                libcfs_nid2str(from_nid),
-                                libcfs_nid2str(src_nid),
-                                libcfs_nid2str(dest_nid));
-                        goto drop;
-                }
-        }
+               if (rdma_req && type == LNET_MSG_GET) {
+                       CERROR("%s, src %s: Bad optimized GET for %s "
+                              "(final destination must be me)\n",
+                               libcfs_nid2str(from_nid),
+                               libcfs_nid2str(src_nid),
+                               libcfs_nid2str(dest_nid));
+                       return -EPROTO;
+               }
 
-        /* Message looks OK; we're not going to return an error, so we MUST
-         * call back lnd_recv() come what may... */
+               if (!the_lnet.ln_routing) {
+                       CERROR("%s, src %s: Dropping message for %s "
+                              "(routing not enabled)\n",
+                               libcfs_nid2str(from_nid),
+                               libcfs_nid2str(src_nid),
+                               libcfs_nid2str(dest_nid));
+                       goto drop;
+               }
+       }
+
+       /* Message looks OK; we're not going to return an error, so we MUST
+        * call back lnd_recv() come what may... */
 
        if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
            fail_peer(src_nid, 0)) {                    /* shall we now? */
                CERROR("%s, src %s: Dropping %s to simulate failure\n",
                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
-               lnet_msgtyp2str(type));
+                      lnet_msgtyp2str(type));
                goto drop;
        }
 
-        msg = lnet_msg_alloc();
-        if (msg == NULL) {
-                CERROR("%s, src %s: Dropping %s (out of memory)\n",
-                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid), 
-                       lnet_msgtyp2str(type));
-                goto drop;
-        }
+       msg = lnet_msg_alloc();
+       if (msg == NULL) {
+               CERROR("%s, src %s: Dropping %s (out of memory)\n",
+                      libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+                      lnet_msgtyp2str(type));
+               goto drop;
+       }
 
-        /* msg zeroed in lnet_msg_alloc; i.e. flags all clear, pointers NULL etc */
+       /* msg zeroed in lnet_msg_alloc; i.e. flags all clear,
+        * pointers NULL etc */
 
-        msg->msg_type = type;
-        msg->msg_private = private;
-        msg->msg_receiving = 1;
-        msg->msg_len = msg->msg_wanted = payload_length;
-        msg->msg_offset = 0;
-        msg->msg_hdr = *hdr;
+       msg->msg_type = type;
+       msg->msg_private = private;
+       msg->msg_receiving = 1;
+       msg->msg_len = msg->msg_wanted = payload_length;
+       msg->msg_offset = 0;
+       msg->msg_hdr = *hdr;
        /* for building message event */
        msg->msg_from = from_nid;
        if (!for_me) {
@@ -1984,29 +2040,29 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
 
        lnet_net_unlock(cpt);
 
-        switch (type) {
-        case LNET_MSG_ACK:
-                rc = lnet_parse_ack(ni, msg);
-                break;
-        case LNET_MSG_PUT:
-                rc = lnet_parse_put(ni, msg);
-                break;
-        case LNET_MSG_GET:
-                rc = lnet_parse_get(ni, msg, rdma_req);
-                break;
-        case LNET_MSG_REPLY:
-                rc = lnet_parse_reply(ni, msg);
-                break;
-        default:
-                LASSERT(0);
+       switch (type) {
+       case LNET_MSG_ACK:
+               rc = lnet_parse_ack(ni, msg);
+               break;
+       case LNET_MSG_PUT:
+               rc = lnet_parse_put(ni, msg);
+               break;
+       case LNET_MSG_GET:
+               rc = lnet_parse_get(ni, msg, rdma_req);
+               break;
+       case LNET_MSG_REPLY:
+               rc = lnet_parse_reply(ni, msg);
+               break;
+       default:
+               LASSERT(0);
                rc = -EPROTO;
-                goto free_drop;  /* prevent an unused label if !kernel */
-        }
+               goto free_drop;  /* prevent an unused label if !kernel */
+       }
 
-        if (rc == 0)
-                return 0;
+       if (rc == 0)
+               return 0;
 
-        LASSERT (rc == ENOENT);
+       LASSERT(rc == ENOENT);
 
  free_drop:
        LASSERT(msg->msg_md == NULL);
index e041a37..f7bb49c 100644 (file)
 #define LNET_NRB_TINY          (LNET_NRB_TINY_MIN * 4)
 #define LNET_NRB_SMALL_MIN     4096    /* min value for each CPT */
 #define LNET_NRB_SMALL         (LNET_NRB_SMALL_MIN * 4)
+#define LNET_NRB_SMALL_PAGES   1
 #define LNET_NRB_LARGE_MIN     256     /* min value for each CPT */
 #define LNET_NRB_LARGE         (LNET_NRB_LARGE_MIN * 4)
+#define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \
+                                 PAGE_CACHE_SHIFT)
 
 static char *forwarding = "";
 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
@@ -304,44 +307,44 @@ int
 lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway,
               unsigned int priority)
 {
-       struct list_head *e;
-       lnet_remotenet_t *rnet;
-       lnet_remotenet_t *rnet2;
-       lnet_route_t     *route;
-       lnet_ni_t        *ni;
-       int               add_route;
-       int               rc;
+       struct list_head        *e;
+       lnet_remotenet_t        *rnet;
+       lnet_remotenet_t        *rnet2;
+       lnet_route_t            *route;
+       lnet_ni_t               *ni;
+       int                     add_route;
+       int                     rc;
 
        CDEBUG(D_NET, "Add route: net %s hops %u priority %u gw %s\n",
               libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));
 
-        if (gateway == LNET_NID_ANY ||
-            LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
-            net == LNET_NIDNET(LNET_NID_ANY) ||
-            LNET_NETTYP(net) == LOLND ||
-            LNET_NIDNET(gateway) == net ||
-            hops < 1 || hops > 255)
-                return (-EINVAL);
-
-        if (lnet_islocalnet(net))               /* it's a local network */
-                return 0;                       /* ignore the route entry */
-
-        /* Assume net, route, all new */
-        LIBCFS_ALLOC(route, sizeof(*route));
-        LIBCFS_ALLOC(rnet, sizeof(*rnet));
-        if (route == NULL || rnet == NULL) {
-                CERROR("Out of memory creating route %s %d %s\n",
-                       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
-                if (route != NULL)
-                        LIBCFS_FREE(route, sizeof(*route));
-                if (rnet != NULL)
-                        LIBCFS_FREE(rnet, sizeof(*rnet));
-                return -ENOMEM;
-        }
+       if (gateway == LNET_NID_ANY ||
+           LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
+           net == LNET_NIDNET(LNET_NID_ANY) ||
+           LNET_NETTYP(net) == LOLND ||
+           LNET_NIDNET(gateway) == net ||
+           hops < 1 || hops > 255)
+               return -EINVAL;
+
+       if (lnet_islocalnet(net))       /* it's a local network */
+               return 0;               /* ignore the route entry */
+
+       /* Assume net, route, all new */
+       LIBCFS_ALLOC(route, sizeof(*route));
+       LIBCFS_ALLOC(rnet, sizeof(*rnet));
+       if (route == NULL || rnet == NULL) {
+               CERROR("Out of memory creating route %s %d %s\n",
+                      libcfs_net2str(net), hops, libcfs_nid2str(gateway));
+               if (route != NULL)
+                       LIBCFS_FREE(route, sizeof(*route));
+               if (rnet != NULL)
+                       LIBCFS_FREE(rnet, sizeof(*rnet));
+               return -ENOMEM;
+       }
 
        INIT_LIST_HEAD(&rnet->lrn_routes);
-        rnet->lrn_net = net;
-        route->lr_hops = hops;
+       rnet->lrn_net = net;
+       route->lr_hops = hops;
        route->lr_net = net;
        route->lr_priority = priority;
 
@@ -354,17 +357,15 @@ lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway,
                LIBCFS_FREE(route, sizeof(*route));
                LIBCFS_FREE(rnet, sizeof(*rnet));
 
-               if (rc == -EHOSTUNREACH) { /* gateway is not on a local net */
-                       return 0;       /* ignore the route entry */
-               } else {
-                       CERROR("Error %d creating route %s %d %s\n", rc,
-                              libcfs_net2str(net), hops,
-                              libcfs_nid2str(gateway));
-               }
-                return rc;
-        }
+               if (rc == -EHOSTUNREACH) /* gateway is not on a local net. */
+                       return 0;        /* ignore the route entry */
+               CERROR("Error %d creating route %s %d %s\n", rc,
+                       libcfs_net2str(net), hops,
+                       libcfs_nid2str(gateway));
+               return rc;
+       }
 
-        LASSERT (!the_lnet.ln_shutdown);
+       LASSERT(!the_lnet.ln_shutdown);
 
        rnet2 = lnet_find_net_locked(net);
        if (rnet2 == NULL) {
@@ -378,16 +379,16 @@ lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway,
        list_for_each(e, &rnet2->lrn_routes) {
                lnet_route_t *route2 = list_entry(e, lnet_route_t, lr_list);
 
-                if (route2->lr_gateway == route->lr_gateway) {
-                        add_route = 0;
-                        break;
-                }
+               if (route2->lr_gateway == route->lr_gateway) {
+                       add_route = 0;
+                       break;
+               }
 
-                /* our lookups must be true */
-                LASSERT (route2->lr_gateway->lp_nid != gateway);
-        }
+               /* our lookups must be true */
+               LASSERT(route2->lr_gateway->lp_nid != gateway);
+       }
 
-        if (add_route) {
+       if (add_route) {
                lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
                lnet_add_route_to_rnet(rnet2, route);
 
@@ -580,7 +581,9 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
                                        *hops     = route->lr_hops;
                                        *priority = route->lr_priority;
                                        *gateway  = route->lr_gateway->lp_nid;
-                                       *alive    = route->lr_gateway->lp_alive;
+                                       *alive    =
+                                         route->lr_gateway->lp_alive &&
+                                          !route->lr_downis;
                                        lnet_net_unlock(cpt);
                                        return 0;
                                }
@@ -600,14 +603,14 @@ lnet_swap_pinginfo(lnet_ping_info_t *info)
 
        __swab32s(&info->pi_magic);
        __swab32s(&info->pi_features);
-        __swab32s(&info->pi_pid);
-        __swab32s(&info->pi_nnis);
-        for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
-                stat = &info->pi_ni[i];
-                __swab64s(&stat->ns_nid);
-                __swab32s(&stat->ns_status);
-        }
-        return;
+       __swab32s(&info->pi_pid);
+       __swab32s(&info->pi_nnis);
+       for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
+               stat = &info->pi_ni[i];
+               __swab64s(&stat->ns_nid);
+               __swab32s(&stat->ns_status);
+       }
+       return;
 }
 
 /**
@@ -619,7 +622,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
 {
        lnet_ping_info_t        *info = rcd->rcd_pinginfo;
        struct lnet_peer        *gw   = rcd->rcd_gateway;
-       lnet_route_t            *rtr;
+       lnet_route_t            *rte;
 
        if (!gw->lp_alive)
                return;
@@ -645,11 +648,16 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
        if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
                return; /* can't carry NI status info */
 
-       list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
+       list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
                int     down = 0;
                int     up = 0;
                int     i;
 
+               if ((gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) != 0) {
+                       rte->lr_downis = 1;
+                       continue;
+               }
+
                for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
                        lnet_ni_status_t *stat = &info->pi_ni[i];
                        lnet_nid_t       nid = stat->ns_nid;
@@ -670,7 +678,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                        }
 
                        if (stat->ns_status == LNET_NI_STATUS_UP) {
-                               if (LNET_NIDNET(nid) == rtr->lr_net) {
+                               if (LNET_NIDNET(nid) == rte->lr_net) {
                                        up = 1;
                                        break;
                                }
@@ -684,10 +692,10 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                }
 
                if (up) { /* ignore downed NIs if NI for dest network is up */
-                       rtr->lr_downis = 0;
+                       rte->lr_downis = 0;
                        continue;
                }
-               rtr->lr_downis = down;
+               rte->lr_downis = down;
        }
 }
 
@@ -1300,11 +1308,11 @@ lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
 lnet_rtrbuf_t *
 lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
 {
-        int            npages = rbp->rbp_npages;
-        int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
-        struct page   *page;
-        lnet_rtrbuf_t *rb;
-        int            i;
+       int            npages = rbp->rbp_npages;
+       int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
+       struct page   *page;
+       lnet_rtrbuf_t *rb;
+       int            i;
 
        LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
        if (rb == NULL)
@@ -1332,66 +1340,96 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
 }
 
 void
-lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
+lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt)
 {
-       int             npages = rbp->rbp_npages;
-       int             nbuffers = 0;
-       lnet_rtrbuf_t   *rb;
+       int              npages = rbp->rbp_npages;
+       lnet_rtrbuf_t    *rb;
+       struct list_head tmp;
 
        if (rbp->rbp_nbuffers == 0) /* not initialized or already freed */
                return;
 
-       LASSERT(list_empty(&rbp->rbp_msgs));
-       LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers);
+       INIT_LIST_HEAD(&tmp);
 
-       while (!list_empty(&rbp->rbp_bufs)) {
-               LASSERT(rbp->rbp_credits > 0);
+       lnet_net_lock(cpt);
+       lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+       list_splice_init(&rbp->rbp_bufs, &tmp);
+       rbp->rbp_nbuffers = rbp->rbp_credits = 0;
+       rbp->rbp_mincredits = 0;
+       lnet_net_unlock(cpt);
 
-               rb = list_entry(rbp->rbp_bufs.next,
-                               lnet_rtrbuf_t, rb_list);
+       /* Free buffers on the free list. */
+       while (!list_empty(&tmp)) {
+               rb = list_entry(tmp.next, lnet_rtrbuf_t, rb_list);
                list_del(&rb->rb_list);
                lnet_destroy_rtrbuf(rb, npages);
-               nbuffers++;
        }
-
-       LASSERT(rbp->rbp_nbuffers == nbuffers);
-       LASSERT(rbp->rbp_credits == nbuffers);
-
-       rbp->rbp_nbuffers = rbp->rbp_credits = 0;
 }
 
-int
-lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
+static int
+lnet_rtrpool_adjust_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
 {
-        lnet_rtrbuf_t *rb;
-        int            i;
+       struct list_head rb_list;
+       lnet_rtrbuf_t   *rb;
+       int             num_rb;
+       int             num_buffers = 0;
+       int             npages = rbp->rbp_npages;
 
-        if (rbp->rbp_nbuffers != 0) {
-                LASSERT (rbp->rbp_nbuffers == nbufs);
-                return 0;
-        }
+       /* If we are called for less buffers than already in the pool, we
+        * just lower the nbuffers number and excess buffers will be
+        * thrown away as they are returned to the free list.  Credits
+        * then get adjusted as well. */
+       if (nbufs <= rbp->rbp_nbuffers) {
+               lnet_net_lock(cpt);
+               rbp->rbp_nbuffers = nbufs;
+               lnet_net_unlock(cpt);
+               return 0;
+       }
 
-        for (i = 0; i < nbufs; i++) {
-               rb = lnet_new_rtrbuf(rbp, cpt);
+       INIT_LIST_HEAD(&rb_list);
 
-                if (rb == NULL) {
-                        CERROR("Failed to allocate %d router bufs of %d pages\n",
-                               nbufs, rbp->rbp_npages);
-                        return -ENOMEM;
-                }
+       /* allocate the buffers on a local list first.  If all buffers are
+        * allocated successfully then join this list to the rbp buffer
+        * list.  If not then free all allocated buffers. */
+       num_rb = rbp->rbp_nbuffers;
 
-               rbp->rbp_nbuffers++;
-               rbp->rbp_credits++;
-               rbp->rbp_mincredits++;
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
+       while (num_rb < nbufs) {
+               rb = lnet_new_rtrbuf(rbp, cpt);
+               if (rb == NULL) {
+                       CERROR("Failed to allocate %d route bufs of %d pages\n",
+                              nbufs, npages);
+                       goto failed;
+               }
 
-                /* No allocation "under fire" */
-                /* Otherwise we'd need code to schedule blocked msgs etc */
-               LASSERT(!the_lnet.ln_routing);
+               list_add(&rb->rb_list, &rb_list);
+               num_buffers++;
+               num_rb++;
        }
 
-       LASSERT(rbp->rbp_credits == nbufs);
+       lnet_net_lock(cpt);
+
+       list_splice_tail(&rb_list, &rbp->rbp_bufs);
+       rbp->rbp_nbuffers += num_buffers;
+       rbp->rbp_credits += num_buffers;
+       rbp->rbp_mincredits = rbp->rbp_credits;
+       /* We need to schedule blocked msg using the newly
+        * added buffers. */
+       while (!list_empty(&rbp->rbp_bufs) &&
+              !list_empty(&rbp->rbp_msgs))
+               lnet_schedule_blocked_locked(rbp);
+
+       lnet_net_unlock(cpt);
+
        return 0;
+
+failed:
+       while (!list_empty(&rb_list)) {
+               rb = list_entry(rb_list.next, lnet_rtrbuf_t, rb_list);
+               list_del(&rb->rb_list);
+               lnet_destroy_rtrbuf(rb, npages);
+       }
+
+       return -ENOMEM;
 }
 
 void
@@ -1406,7 +1444,7 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
 }
 
 void
-lnet_rtrpools_free(void)
+lnet_rtrpools_free(int keep_pools)
 {
        lnet_rtrbufpool_t *rtrp;
        int               i;
@@ -1415,17 +1453,19 @@ lnet_rtrpools_free(void)
                return;
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_free_bufs(&rtrp[0]);
-               lnet_rtrpool_free_bufs(&rtrp[1]);
-               lnet_rtrpool_free_bufs(&rtrp[2]);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
        }
 
-       cfs_percpt_free(the_lnet.ln_rtrpools);
-       the_lnet.ln_rtrpools = NULL;
+       if (!keep_pools) {
+               cfs_percpt_free(the_lnet.ln_rtrpools);
+               the_lnet.ln_rtrpools = NULL;
+       }
 }
 
 static int
-lnet_nrb_tiny_calculate(int npages)
+lnet_nrb_tiny_calculate(void)
 {
        int     nrbs = LNET_NRB_TINY;
 
@@ -1444,7 +1484,7 @@ lnet_nrb_tiny_calculate(int npages)
 }
 
 static int
-lnet_nrb_small_calculate(int npages)
+lnet_nrb_small_calculate(void)
 {
        int     nrbs = LNET_NRB_SMALL;
 
@@ -1463,7 +1503,7 @@ lnet_nrb_small_calculate(int npages)
 }
 
 static int
-lnet_nrb_large_calculate(int npages)
+lnet_nrb_large_calculate(void)
 {
        int     nrbs = LNET_NRB_LARGE;
 
@@ -1485,38 +1525,36 @@ int
 lnet_rtrpools_alloc(int im_a_router)
 {
        lnet_rtrbufpool_t *rtrp;
-       int     large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-       int     small_pages = 1;
        int     nrb_tiny;
        int     nrb_small;
        int     nrb_large;
        int     rc;
        int     i;
 
-        if (!strcmp(forwarding, "")) {
-                /* not set either way */
-                if (!im_a_router)
-                        return 0;
-        } else if (!strcmp(forwarding, "disabled")) {
-                /* explicitly disabled */
-                return 0;
-        } else if (!strcmp(forwarding, "enabled")) {
-                /* explicitly enabled */
-        } else {
-                LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
-                                   "'enabled' or 'disabled'\n");
-                return -EINVAL;
-        }
+       if (!strcmp(forwarding, "")) {
+               /* not set either way */
+               if (!im_a_router)
+                       return 0;
+       } else if (!strcmp(forwarding, "disabled")) {
+               /* explicitly disabled */
+               return 0;
+       } else if (!strcmp(forwarding, "enabled")) {
+               /* explicitly enabled */
+       } else {
+               LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either "
+                                  "'enabled' or 'disabled'\n");
+               return -EINVAL;
+       }
 
-       nrb_tiny = lnet_nrb_tiny_calculate(0);
+       nrb_tiny = lnet_nrb_tiny_calculate();
        if (nrb_tiny < 0)
                return -EINVAL;
 
-       nrb_small = lnet_nrb_small_calculate(small_pages);
+       nrb_small = lnet_nrb_small_calculate();
        if (nrb_small < 0)
                return -EINVAL;
 
-       nrb_large = lnet_nrb_large_calculate(large_pages);
+       nrb_large = lnet_nrb_large_calculate();
        if (nrb_large < 0)
                return -EINVAL;
 
@@ -1530,18 +1568,23 @@ lnet_rtrpools_alloc(int im_a_router)
        }
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_init(&rtrp[0], 0);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
+               lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                             nrb_tiny, i);
                if (rc != 0)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[1], small_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
+               lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
+                                 LNET_NRB_SMALL_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                             nrb_small, i);
                if (rc != 0)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[2], large_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
+               lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
+                                 LNET_NRB_LARGE_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                             nrb_large, i);
                if (rc != 0)
                        goto failed;
        }
@@ -1549,15 +1592,111 @@ lnet_rtrpools_alloc(int im_a_router)
        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 1;
        lnet_net_unlock(LNET_LOCK_EX);
-
        return 0;
 
  failed:
-       lnet_rtrpools_free();
+       lnet_rtrpools_free(0);
        return rc;
 }
 
 int
+lnet_rtrpools_adjust(int tiny, int small, int large)
+{
+       int nrb = 0;
+       int rc = 0;
+       int i;
+       lnet_rtrbufpool_t *rtrp;
+
+       /* this function doesn't revert the changes if adding new buffers
+        * failed.  It's up to the user space caller to revert the
+        * changes. */
+
+       if (!the_lnet.ln_routing)
+               return 0;
+
+       /* If the provided values for each buffer pool are different than the
+        * configured values, we need to take action. */
+       if (tiny >= 0 && tiny != tiny_router_buffers) {
+               tiny_router_buffers = tiny;
+               nrb = lnet_nrb_tiny_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                                     nrb, i);
+                       if (rc != 0)
+                               return rc;
+               }
+       }
+       if (small >= 0 && small != small_router_buffers) {
+               small_router_buffers = small;
+               nrb = lnet_nrb_small_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                                     nrb, i);
+                       if (rc != 0)
+                               return rc;
+               }
+       }
+       if (large >= 0 && large != large_router_buffers) {
+               large_router_buffers = large;
+               nrb = lnet_nrb_large_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                                     nrb, i);
+                       if (rc != 0)
+                               return rc;
+               }
+       }
+
+       return 0;
+}
+
+int
+lnet_rtrpools_enable(void)
+{
+       int rc;
+
+       if (the_lnet.ln_routing)
+               return 0;
+
+       if (the_lnet.ln_rtrpools == NULL)
+               /* If routing is turned off, and we have never
+                * initialized the pools before, just call the
+                * standard buffer pool allocation routine as
+                * if we are just configuring this for the first
+                * time. */
+               return lnet_rtrpools_alloc(1);
+
+       rc = lnet_rtrpools_adjust(0, 0, 0);
+       if (rc != 0)
+               return rc;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 1;
+
+       the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
+       lnet_net_unlock(LNET_LOCK_EX);
+
+       return 0;
+}
+
+void
+lnet_rtrpools_disable(void)
+{
+       if (!the_lnet.ln_routing)
+               return;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 0;
+       the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
+
+       tiny_router_buffers = 0;
+       small_router_buffers = 0;
+       large_router_buffers = 0;
+       lnet_net_unlock(LNET_LOCK_EX);
+       lnet_rtrpools_free(1);
+}
+
+int
 lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
 {
        struct lnet_peer        *lp = NULL;
@@ -1747,7 +1886,7 @@ lnet_get_tunables (void)
 }
 
 void
-lnet_rtrpools_free(void)
+lnet_rtrpools_free(int keep_pools)
 {
 }