Whamcloud - gitweb
LU-56 lnet: Partitioned LNet networks
author: Liang Zhen <liang@whamcloud.com>
Wed, 13 Jun 2012 12:37:39 +0000 (20:37 +0800)
committer: Oleg Drokin <green@whamcloud.com>
Wed, 27 Jun 2012 03:59:16 +0000 (23:59 -0400)
We have already implemented partitioned LNet resources (MD/ME/EQ).
This patch creates partitioned data for other LNet objects:
- Peer-tables
  Peers are hashed into peer-table on different partitions by NID
- NI refcount and TX queue
  Each NI has a refcount and a TX queue (credits and delayed
  messages) for each partition
- counters for each partition

These objects are protected by the percpt lock lnet_t::ln_net_lock,
which replaces the original spinlock lnet_t::ln_lock; LNET_LOCK() and
LNET_UNLOCK() are kept as lnet_net_lock/unlock(LNET_LOCK_EX).

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I7c8c1359aca04a7f859672ccd3268f0282505dd5
Reviewed-on: http://review.whamcloud.com/3113
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
12 files changed:
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/acceptor.c
lnet/lnet/api-ni.c
lnet/lnet/config.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/module.c
lnet/lnet/peer.c
lnet/lnet/router.c
lnet/lnet/router_proc.c

index 1abeb33..cc991c2 100644 (file)
@@ -149,14 +149,38 @@ lnet_res_lock_current(void)
        return cpt;
 }
 
+static inline void
+lnet_net_lock(int cpt)
+{
+       cfs_percpt_lock(the_lnet.ln_net_lock, cpt);
+}
+
+static inline void
+lnet_net_unlock(int cpt)
+{
+       cfs_percpt_unlock(the_lnet.ln_net_lock, cpt);
+}
+
+static inline int
+lnet_net_lock_current(void)
+{
+       int cpt = lnet_cpt_current();
+
+       lnet_net_lock(cpt);
+       return cpt;
+}
+
+#define LNET_LOCK()            lnet_net_lock(LNET_LOCK_EX)
+#define LNET_UNLOCK()          lnet_net_unlock(LNET_LOCK_EX)
+
 #ifdef __KERNEL__
 
 #define lnet_ptl_lock(ptl)     cfs_spin_lock(&(ptl)->ptl_lock)
 #define lnet_ptl_unlock(ptl)   cfs_spin_unlock(&(ptl)->ptl_lock)
 #define lnet_eq_wait_lock()    cfs_spin_lock(&the_lnet.ln_eq_wait_lock)
 #define lnet_eq_wait_unlock()  cfs_spin_unlock(&the_lnet.ln_eq_wait_lock)
-#define LNET_LOCK()            cfs_spin_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK()          cfs_spin_unlock(&the_lnet.ln_lock)
+#define lnet_ni_lock(ni)       cfs_spin_lock(&(ni)->ni_lock)
+#define lnet_ni_unlock(ni)     cfs_spin_unlock(&(ni)->ni_lock)
 #define LNET_MUTEX_LOCK(m)     cfs_mutex_lock(m)
 #define LNET_MUTEX_UNLOCK(m)   cfs_mutex_unlock(m)
 
@@ -175,8 +199,6 @@ do {                                            \
         (l) = 0;                                \
 } while (0)
 
-#define LNET_LOCK()            LNET_SINGLE_THREADED_LOCK(the_lnet.ln_lock)
-#define LNET_UNLOCK()          LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_lock)
 #define LNET_MUTEX_LOCK(m)     LNET_SINGLE_THREADED_LOCK(*(m))
 #define LNET_MUTEX_UNLOCK(m)   LNET_SINGLE_THREADED_UNLOCK(*(m))
 
@@ -190,10 +212,13 @@ do {                                            \
 #define lnet_eq_wait_unlock()                  \
        LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_eq_wait_lock)
 
+#define lnet_ni_lock(ni)                       \
+       LNET_SINGLE_THREADED_LOCK((ni)->ni_lock)
+#define lnet_ni_unlock(ni)                     \
+       LNET_SINGLE_THREADED_UNLOCK((ni)->ni_lock)
+
 # else /* HAVE_LIBPTHREAD */
 
-#define LNET_LOCK()            pthread_mutex_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK()          pthread_mutex_unlock(&the_lnet.ln_lock)
 #define LNET_MUTEX_LOCK(m)     pthread_mutex_lock(m)
 #define LNET_MUTEX_UNLOCK(m)   pthread_mutex_unlock(m)
 
@@ -203,6 +228,9 @@ do {                                            \
 #define lnet_eq_wait_lock()    pthread_mutex_lock(&the_lnet.ln_eq_wait_lock)
 #define lnet_eq_wait_unlock()  pthread_mutex_unlock(&the_lnet.ln_eq_wait_lock)
 
+#define lnet_ni_lock(ni)       pthread_mutex_lock(&(ni)->ni_lock)
+#define lnet_ni_unlock(ni)     pthread_mutex_unlock(&(ni)->ni_lock)
+
 # endif /* HAVE_LIBPTHREAD */
 #endif /* __KERNEL__ */
 
@@ -353,12 +381,14 @@ static inline lnet_msg_t *
 lnet_msg_alloc (void)
 {
        /* NEVER called with network lock held */
-       struct lnet_msg_container *msc = &the_lnet.ln_msg_container;
+       struct lnet_msg_container *msc = the_lnet.ln_msg_containers[0];
        lnet_msg_t                *msg;
 
-       LNET_LOCK();
+       LASSERT(LNET_CPT_NUMBER == 1);
+
+       lnet_net_lock(0);
        msg = (lnet_msg_t *)lnet_freelist_alloc(&msc->msc_freelist);
-       LNET_UNLOCK();
+       lnet_net_unlock(0);
 
        if (msg != NULL) {
                /* NULL pointers, clear flags etc */
@@ -374,8 +404,9 @@ static inline void
 lnet_msg_free_locked(lnet_msg_t *msg)
 {
        /* ALWAYS called with network lock held */
-       struct lnet_msg_container *msc = &the_lnet.ln_msg_container;
+       struct lnet_msg_container *msc = the_lnet.ln_msg_containers[0];
 
+       LASSERT(LNET_CPT_NUMBER == 1);
        LASSERT(!msg->msg_onactivelist);
        lnet_freelist_free(&msc->msc_freelist, msg);
 }
@@ -383,9 +414,9 @@ lnet_msg_free_locked(lnet_msg_t *msg)
 static inline void
 lnet_msg_free (lnet_msg_t *msg)
 {
-       LNET_LOCK();
+       lnet_net_lock(0);
        lnet_msg_free_locked(msg);
-       LNET_UNLOCK();
+       lnet_net_unlock(0);
 }
 
 #else /* !LNET_USE_LIB_FREELIST */
@@ -624,37 +655,41 @@ lnet_isrouter(lnet_peer_t *lp)
 }
 
 static inline void
-lnet_ni_addref_locked(lnet_ni_t *ni)
+lnet_ni_addref_locked(lnet_ni_t *ni, int cpt)
 {
-        LASSERT (ni->ni_refcount > 0);
-        ni->ni_refcount++;
+       LASSERT(cpt >= 0 && cpt < LNET_CPT_NUMBER);
+       LASSERT(*ni->ni_refs[cpt] >= 0);
+
+       (*ni->ni_refs[cpt])++;
 }
 
 static inline void
 lnet_ni_addref(lnet_ni_t *ni)
 {
-        LNET_LOCK();
-        lnet_ni_addref_locked(ni);
-        LNET_UNLOCK();
+       lnet_net_lock(0);
+       lnet_ni_addref_locked(ni, 0);
+       lnet_net_unlock(0);
 }
 
 static inline void
-lnet_ni_decref_locked(lnet_ni_t *ni)
+lnet_ni_decref_locked(lnet_ni_t *ni, int cpt)
 {
-        LASSERT (ni->ni_refcount > 0);
-        ni->ni_refcount--;
-        if (ni->ni_refcount == 0)
-                cfs_list_add_tail(&ni->ni_list, &the_lnet.ln_zombie_nis);
+       LASSERT(cpt >= 0 && cpt < LNET_CPT_NUMBER);
+       LASSERT(*ni->ni_refs[cpt] > 0);
+
+       (*ni->ni_refs[cpt])--;
 }
 
 static inline void
 lnet_ni_decref(lnet_ni_t *ni)
 {
-        LNET_LOCK();
-        lnet_ni_decref_locked(ni);
-        LNET_UNLOCK();
+       lnet_net_lock(0);
+       lnet_ni_decref_locked(ni, 0);
+       lnet_net_unlock(0);
 }
 
+void lnet_ni_free(lnet_ni_t *ni);
+
 static inline int
 lnet_nid2peerhash(lnet_nid_t nid)
 {
@@ -692,18 +727,18 @@ lnet_set_msg_uid(lnet_ni_t *ni, lnet_msg_t *msg, lnet_uid_t uid)
 #endif
 
 extern int lnet_cpt_of_nid(lnet_nid_t nid);
-extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid);
-extern lnet_ni_t *lnet_net2ni_locked (__u32 net);
+extern lnet_ni_t *lnet_nid2ni_locked(lnet_nid_t nid, int cpt);
+extern lnet_ni_t *lnet_net2ni_locked(__u32 net, int cpt);
 static inline lnet_ni_t *
-lnet_net2ni (__u32 net)
+lnet_net2ni(__u32 net)
 {
-        lnet_ni_t *ni;
+       lnet_ni_t *ni;
 
-        LNET_LOCK();
-        ni = lnet_net2ni_locked(net);
-        LNET_UNLOCK();
+       lnet_net_lock(0);
+       ni = lnet_net2ni_locked(net, 0);
+       lnet_net_unlock(0);
 
-        return ni;
+       return ni;
 }
 
 int lnet_notify(lnet_ni_t *ni, lnet_nid_t peer, int alive, cfs_time_t when);
@@ -728,13 +763,13 @@ void lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
 void lnet_msg_detach_md(lnet_msg_t *msg, int status);
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
 void lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type);
-void lnet_msg_commit(lnet_msg_t *msg, int sending);
-void lnet_msg_decommit(lnet_msg_t *msg, int status);
+void lnet_msg_commit(lnet_msg_t *msg, int cpt);
+void lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status);
 
 void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
-int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
+int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid);
 void lnet_return_tx_credits_locked(lnet_msg_t *msg);
 void lnet_return_rx_credits_locked(lnet_msg_t *msg);
 
@@ -803,8 +838,10 @@ void lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int rc);
 void lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason);
 void lnet_recv_delayed_msg_list(cfs_list_t *head);
 
-int lnet_msg_container_setup(struct lnet_msg_container *container);
+int lnet_msg_container_setup(struct lnet_msg_container *container, int cpt);
 void lnet_msg_container_cleanup(struct lnet_msg_container *container);
+void lnet_msg_containers_destroy(void);
+int lnet_msg_containers_create(void);
 
 char *lnet_msgtyp2str (int type);
 void lnet_print_hdr (lnet_hdr_t * hdr);
@@ -926,11 +963,12 @@ int lnet_parse_ip2nets (char **networksp, char *ip2nets);
 int lnet_parse_routes (char *route_str, int *im_a_router);
 int lnet_parse_networks (cfs_list_t *nilist, char *networks);
 
-int lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid);
-lnet_peer_t *lnet_find_peer_locked (lnet_nid_t nid);
-void lnet_peer_table_cleanup(void);
-void lnet_peer_table_destroy(void);
-int lnet_peer_table_create(void);
+int lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid, int cpt);
+lnet_peer_t *lnet_find_peer_locked(struct lnet_peer_table *ptable,
+                                  lnet_nid_t nid);
+void lnet_peer_tables_cleanup(void);
+void lnet_peer_tables_destroy(void);
+int lnet_peer_tables_create(void);
 void lnet_debug_peer(lnet_nid_t nid);
 
 #ifndef __KERNEL__
index 4ba4a40..22ef8f7 100644 (file)
@@ -193,10 +193,14 @@ typedef struct lnet_msg {
 
        /* commited for sending */
        unsigned int            msg_tx_committed:1;
-       /* queued for tx credit */
-       unsigned int            msg_tx_delayed:1;
+       /* CPT # this message committed for sending */
+       unsigned int            msg_tx_cpt:15;
        /* commited for receiving */
        unsigned int            msg_rx_committed:1;
+       /* CPT # this message committed for receiving */
+       unsigned int            msg_rx_cpt:15;
+       /* queued for tx credit */
+       unsigned int            msg_tx_delayed:1;
        /* queued for RX buffer */
        unsigned int            msg_rx_delayed:1;
        /* ready for pending on RX delay list */
@@ -398,23 +402,39 @@ typedef struct {
         __u32      ns_unused;
 } WIRE_ATTR lnet_ni_status_t;
 
+struct lnet_tx_queue {
+       int                     tq_credits;     /* # tx credits free */
+       int                     tq_credits_min; /* lowest it's been */
+       int                     tq_credits_max; /* total # tx credits */
+       cfs_list_t              tq_delayed;     /* delayed TXs */
+};
+
 #define LNET_MAX_INTERFACES   16
 
 typedef struct lnet_ni {
-        cfs_list_t        ni_list;              /* chain on ln_nis */
-        cfs_list_t        ni_txq;               /* messages waiting for tx credits */
-        int               ni_maxtxcredits;      /* # tx credits  */
-        int               ni_txcredits;         /* # tx credits free */
-        int               ni_mintxcredits;      /* lowest it's been */
-        int               ni_peertxcredits;     /* # per-peer send credits */
-        int               ni_peerrtrcredits;    /* # per-peer router buffer credits */
-        int               ni_peertimeout;       /* seconds to consider peer dead */
-        lnet_nid_t        ni_nid;               /* interface's NID */
-        void             *ni_data;              /* instance-specific data */
-        lnd_t            *ni_lnd;               /* procedural interface */
-        int               ni_refcount;          /* reference count */
-       /* when I was last alive */
-       long                    ni_last_alive;
+#ifdef __KERNEL__
+       cfs_spinlock_t          ni_lock;
+#else
+# ifndef HAVE_LIBPTHREAD
+       int                     ni_lock;
+# else
+       pthread_mutex_t         ni_lock;
+# endif
+#endif
+       cfs_list_t              ni_list;        /* chain on ln_nis */
+       int                     ni_maxtxcredits; /* # tx credits  */
+       /* # per-peer send credits */
+       int                     ni_peertxcredits;
+       /* # per-peer router buffer credits */
+       int                     ni_peerrtrcredits;
+       /* seconds to consider peer dead */
+       int                     ni_peertimeout;
+       lnet_nid_t              ni_nid;         /* interface's NID */
+       void                    *ni_data;       /* instance-specific data */
+       lnd_t                   *ni_lnd;        /* procedural interface */
+       struct lnet_tx_queue    **ni_tx_queues; /* percpt TX queues */
+       int                     **ni_refs;      /* percpt reference count */
+       long                    ni_last_alive;  /* when I was last alive */
        lnet_ni_status_t        *ni_status;     /* my health status */
        /* equivalent interfaces to use */
        char                    *ni_interfaces[LNET_MAX_INTERFACES];
@@ -470,7 +490,9 @@ typedef struct lnet_peer {
         lnet_ni_t        *lp_ni;                /* interface peer is on */
         lnet_nid_t        lp_nid;               /* peer's NID */
         int               lp_refcount;          /* # refs */
-        int               lp_rtr_refcount;      /* # refs from lnet_route_t::lr_gateway */
+       int                     lp_cpt;         /* CPT this peer attached on */
+       /* # refs from lnet_route_t::lr_gateway */
+       int                     lp_rtr_refcount;
        /* returned RC ping version */
        unsigned int            lp_ping_version;
        cfs_list_t              lp_routes;      /* routers on this peer */
@@ -497,6 +519,7 @@ typedef struct {
        cfs_list_t              lr_gwlist;      /* chain on gateway */
        lnet_peer_t             *lr_gateway;    /* router node */
        __u32                   lr_net;         /* remote network number */
+       int                     lr_seq;         /* sequence for round-robin */
        unsigned int            lr_downis;      /* number of down NIs */
        unsigned int            lr_hops;        /* how far I am */
 } lnet_route_t;
@@ -656,90 +679,61 @@ typedef struct
        unsigned int                    ln_cpt_number;
        unsigned int                    ln_cpt_bits;
 
+       /* protect LNet resources (ME/MD/EQ) */
+       struct cfs_percpt_lock          *ln_res_lock;
+       /* # portals */
+       int                             ln_nportals;
+       /* the vector of portals */
+       lnet_portal_t                   **ln_portals;
+       /* ME container  */
+       struct lnet_res_container       **ln_me_containers;
+       /* MD container  */
+       struct lnet_res_container       **ln_md_containers;
+
+       /* Event Queue container */
+       struct lnet_res_container       ln_eq_container;
 #ifdef __KERNEL__
-       cfs_spinlock_t                  ln_lock;
-       cfs_mutex_t                     ln_api_mutex;
-       cfs_mutex_t                     ln_lnd_mutex;
        cfs_waitq_t                     ln_eq_waitq;
        cfs_spinlock_t                  ln_eq_wait_lock;
 #else
 # ifndef HAVE_LIBPTHREAD
-       int                             ln_lock;
-       int                             ln_api_mutex;
-       int                             ln_lnd_mutex;
        int                             ln_eq_wait_lock;
 # else
-       pthread_mutex_t                 ln_lock;
-       pthread_mutex_t                 ln_api_mutex;
-       pthread_mutex_t                 ln_lnd_mutex;
        pthread_cond_t                  ln_eq_cond;
        pthread_mutex_t                 ln_eq_wait_lock;
 # endif
 #endif
-       struct cfs_percpt_lock          *ln_res_lock;
-       /* ME container  */
-       struct lnet_res_container       **ln_me_containers;
-       /* MD container  */
-       struct lnet_res_container       **ln_md_containers;
-       /* Event Queue container */
-       struct lnet_res_container       ln_eq_container;
-
-       /* # portals */
-       int                             ln_nportals;
-       /* the vector of portals */
-       lnet_portal_t                   **ln_portals;
-
-       int                             ln_init;        /* LNetInit() called? */
-       /* LNetNIInit/LNetNIFini counter */
-       int                             ln_refcount;
-       /* Have I called LNetNIInit myself? */
-       int                             ln_niinit_self;
-       /* shutdown in progress */
-       int                             ln_shutdown;
-       /* registered LNDs */
-       cfs_list_t                      ln_lnds;
-
-        lnet_pid_t             ln_pid;              /* requested pid */
-
-        cfs_list_t             ln_nis;              /* LND instances */
-        lnet_ni_t             *ln_loni;             /* the loopback NI */
+       /* protect NI, peer table, credits, routers, rtrbuf... */
+       struct cfs_percpt_lock          *ln_net_lock;
+       /* message container for active/finalizing/freed message */
+       struct lnet_msg_container       **ln_msg_containers;
+       lnet_counters_t                 **ln_counters;
+       struct lnet_peer_table          **ln_peer_tables;
+       /* failure simulation */
+       cfs_list_t                      ln_test_peers;
+
+       cfs_list_t                      ln_nis;         /* LND instances */
+       /* dying LND instances */
+       cfs_list_t                      ln_nis_zombie;
+       lnet_ni_t                       *ln_loni;       /* the loopback NI */
        /* NI to wait for events in */
        lnet_ni_t                       *ln_eq_waitni;
 
-        cfs_list_t             ln_zombie_nis;       /* dying LND instances */
-        int                    ln_nzombie_nis;      /* # of NIs to wait for */
-
-        cfs_list_t             ln_remote_nets;      /* remote networks with routes to them */
-        __u64                  ln_remote_nets_version; /* validity stamp */
-
-        cfs_list_t             ln_routers;       /* list of all known routers */
-        __u64                  ln_routers_version;  /* validity stamp */
-
-        int                    ln_routing;          /* am I a router? */
+       /* remote networks with routes to them */
+       cfs_list_t                      ln_remote_nets;
+       /* validity stamp */
+       __u64                           ln_remote_nets_version;
+       /* list of all known routers */
+       cfs_list_t                      ln_routers;
+       /* validity stamp */
+       __u64                           ln_routers_version;
        /* router buffer pools */
-       lnet_rtrbufpool_t               *ln_rtrpools;
-
-        __u64                  ln_interface_cookie; /* uniquely identifies this ni in this epoch */
-
-        char                  *ln_network_tokens;   /* space for network names */
-        int                    ln_network_tokens_nob;
-
-        int                    ln_testprotocompat;  /* test protocol compatibility flags */
-
-        cfs_list_t             ln_test_peers;       /* failure simulation */
-
-       /* message container */
-       struct lnet_msg_container       ln_msg_container;
-       struct lnet_peer_table          *ln_peer_table;
-       lnet_counters_t                 *ln_counters;
+       lnet_rtrbufpool_t               **ln_rtrpools;
 
-        lnet_handle_md_t       ln_ping_target_md;
-        lnet_handle_eq_t       ln_ping_target_eq;
-        lnet_ping_info_t      *ln_ping_info;
+       lnet_handle_md_t                ln_ping_target_md;
+       lnet_handle_eq_t                ln_ping_target_eq;
+       lnet_ping_info_t                *ln_ping_info;
 
-#ifdef __KERNEL__
-        cfs_semaphore_t        ln_rc_signal;        /* serialise startup/shutdown */
-#endif
        /* router checker startup/shutdown state */
        int                             ln_rc_state;
        /* router checker's event queue */
@@ -748,14 +742,48 @@ typedef struct
        cfs_list_t                      ln_rcd_deathrow;
        /* rcd ready for free */
        cfs_list_t                      ln_rcd_zombie;
+#ifdef __KERNEL__
+       /* serialise startup/shutdown */
+       cfs_semaphore_t                 ln_rc_signal;
 
-#ifndef __KERNEL__
-        /* Temporary workaround to allow uOSS and test programs force
-         * server mode in userspace. The only place where we use it is
-         * lnet_prepare(). The only way to turn this flag on is to
-         * call lnet_server_mode() */
+       cfs_mutex_t                     ln_api_mutex;
+       cfs_mutex_t                     ln_lnd_mutex;
+#else
+# ifndef HAVE_LIBPTHREAD
+       int                             ln_api_mutex;
+       int                             ln_lnd_mutex;
+# else
+       pthread_mutex_t                 ln_api_mutex;
+       pthread_mutex_t                 ln_lnd_mutex;
+# endif
+#endif
+       int                             ln_init;        /* LNetInit() called? */
+       /* Have I called LNetNIInit myself? */
+       int                             ln_niinit_self;
+       /* LNetNIInit/LNetNIFini counter */
+       int                             ln_refcount;
+       /* shutdown in progress */
+       int                             ln_shutdown;
 
-        int                    ln_server_mode_flag;
+       int                             ln_routing;     /* am I a router? */
+       lnet_pid_t                      ln_pid;         /* requested pid */
+       /* uniquely identifies this ni in this epoch */
+       __u64                           ln_interface_cookie;
+       /* registered LNDs */
+       cfs_list_t                      ln_lnds;
+
+       /* space for network names */
+       char                            *ln_network_tokens;
+       int                             ln_network_tokens_nob;
+       /* test protocol compatibility flags */
+       int                             ln_testprotocompat;
+
+#ifndef __KERNEL__
+       /* Temporary workaround to allow uOSS and test programs force
+        * server mode in userspace. The only place where we use it is
+        * lnet_prepare(). The only way to turn this flag on is to
+        * call lnet_server_mode() */
+       int                             ln_server_mode_flag;
 #endif
 } lnet_t;
 
index 302697e..a28da78 100644 (file)
@@ -197,7 +197,7 @@ lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid,
 
                 if (the_lnet.ln_testprotocompat != 0) {
                         /* single-shot proto check */
-                        LNET_LOCK();
+                       lnet_net_lock(LNET_LOCK_EX);
                         if ((the_lnet.ln_testprotocompat & 4) != 0) {
                                 cr.acr_version++;
                                 the_lnet.ln_testprotocompat &= ~4;
@@ -206,7 +206,7 @@ lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid,
                                 cr.acr_magic = LNET_PROTO_MAGIC;
                                 the_lnet.ln_testprotocompat &= ~8;
                         }
-                        LNET_UNLOCK();
+                       lnet_net_unlock(LNET_LOCK_EX);
                 }
 
                 rc = libcfs_sock_write(sock, &cr, sizeof(cr),
index eb4e0b7..91450f1 100644 (file)
@@ -91,7 +91,6 @@ lnet_get_networks(void)
 void
 lnet_init_locks(void)
 {
-       cfs_spin_lock_init(&the_lnet.ln_lock);
        cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock);
        cfs_waitq_init(&the_lnet.ln_eq_waitq);
        cfs_mutex_init(&the_lnet.ln_lnd_mutex);
@@ -174,7 +173,6 @@ lnet_get_networks (void)
 
 void lnet_init_locks(void)
 {
-       the_lnet.ln_lock = 0;
        the_lnet.ln_eq_wait_lock = 0;
        the_lnet.ln_lnd_mutex = 0;
        the_lnet.ln_api_mutex = 0;
@@ -184,7 +182,6 @@ void lnet_fini_locks(void)
 {
        LASSERT(the_lnet.ln_api_mutex == 0);
        LASSERT(the_lnet.ln_lnd_mutex == 0);
-       LASSERT(the_lnet.ln_lock == 0);
        LASSERT(the_lnet.ln_eq_wait_lock == 0);
 }
 
@@ -193,7 +190,6 @@ void lnet_fini_locks(void)
 void lnet_init_locks(void)
 {
        pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
-       pthread_mutex_init(&the_lnet.ln_lock, NULL);
        pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL);
        pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
        pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
@@ -203,7 +199,6 @@ void lnet_fini_locks(void)
 {
        pthread_mutex_destroy(&the_lnet.ln_api_mutex);
        pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
-       pthread_mutex_destroy(&the_lnet.ln_lock);
        pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock);
        pthread_cond_destroy(&the_lnet.ln_eq_cond);
 }
@@ -217,9 +212,16 @@ lnet_create_locks(void)
        lnet_init_locks();
 
        the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
-       if (the_lnet.ln_res_lock != NULL)
-               return 0;
+       if (the_lnet.ln_res_lock == NULL)
+               goto failed;
+
+       the_lnet.ln_net_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
+       if (the_lnet.ln_net_lock == NULL)
+               goto failed;
 
+       return 0;
+
+ failed:
        lnet_fini_locks();
        return -ENOMEM;
 }
@@ -232,6 +234,11 @@ lnet_destroy_locks(void)
                the_lnet.ln_res_lock = NULL;
        }
 
+       if (the_lnet.ln_net_lock != NULL) {
+               cfs_percpt_lock_free(the_lnet.ln_net_lock);
+               the_lnet.ln_net_lock = NULL;
+       }
+
        lnet_fini_locks();
 }
 
@@ -381,12 +388,13 @@ void
 lnet_counters_get(lnet_counters_t *counters)
 {
        lnet_counters_t *ctr;
+       int             i;
 
        memset(counters, 0, sizeof(*counters));
 
-       LNET_LOCK();
-       ctr = the_lnet.ln_counters;
-       do {    /* iterate over counters of all CPTs in upcoming patches */
+       lnet_net_lock(LNET_LOCK_EX);
+
+       cfs_percpt_for_each(ctr, i, the_lnet.ln_counters) {
                counters->msgs_max     += ctr->msgs_max;
                counters->msgs_alloc   += ctr->msgs_alloc;
                counters->errors       += ctr->errors;
@@ -398,9 +406,9 @@ lnet_counters_get(lnet_counters_t *counters)
                counters->recv_length  += ctr->recv_length;
                counters->route_length += ctr->route_length;
                counters->drop_length  += ctr->drop_length;
-       } while (0);
 
-       LNET_UNLOCK();
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
 }
 EXPORT_SYMBOL(lnet_counters_get);
 
@@ -408,13 +416,14 @@ void
 lnet_counters_reset(void)
 {
        lnet_counters_t *counters;
+       int             i;
+
+       lnet_net_lock(LNET_LOCK_EX);
 
-       LNET_LOCK();
-       counters = the_lnet.ln_counters;
-       do {    /* iterate over counters of all CPTs in upcoming patches */
+       cfs_percpt_for_each(counters, i, the_lnet.ln_counters)
                memset(counters, 0, sizeof(lnet_counters_t));
-       } while (0);
-       LNET_UNLOCK();
+
+       lnet_net_unlock(LNET_LOCK_EX);
 }
 EXPORT_SYMBOL(lnet_counters_reset);
 
@@ -710,25 +719,26 @@ lnet_prepare(lnet_pid_t requested_pid)
 
        CFS_INIT_LIST_HEAD(&the_lnet.ln_test_peers);
        CFS_INIT_LIST_HEAD(&the_lnet.ln_nis);
-       CFS_INIT_LIST_HEAD(&the_lnet.ln_zombie_nis);
+       CFS_INIT_LIST_HEAD(&the_lnet.ln_nis_zombie);
        CFS_INIT_LIST_HEAD(&the_lnet.ln_remote_nets);
        CFS_INIT_LIST_HEAD(&the_lnet.ln_routers);
 
        the_lnet.ln_interface_cookie = lnet_create_interface_cookie();
 
-       LIBCFS_ALLOC(the_lnet.ln_counters, sizeof(lnet_counters_t));
+       the_lnet.ln_counters = cfs_percpt_alloc(lnet_cpt_table(),
+                                               sizeof(lnet_counters_t));
        if (the_lnet.ln_counters == NULL) {
                CERROR("Failed to allocate counters for LNet\n");
                rc = -ENOMEM;
                goto failed;
        }
 
-       rc = lnet_peer_table_create();
+       rc = lnet_peer_tables_create();
        if (rc != 0)
                goto failed;
 
        /* NB: we will have instance of message container per CPT soon */
-       rc = lnet_msg_container_setup(&the_lnet.ln_msg_container);
+       rc = lnet_msg_containers_create();
        if (rc != 0)
                goto failed;
 
@@ -774,13 +784,12 @@ lnet_unprepare (void)
          * descriptors, even those that appear committed to a network op (eg MD
          * with non-zero pending count) */
 
-        lnet_fail_nid(LNET_NID_ANY, 0);
+       lnet_fail_nid(LNET_NID_ANY, 0);
 
-        LASSERT (cfs_list_empty(&the_lnet.ln_test_peers));
-        LASSERT (the_lnet.ln_refcount == 0);
-        LASSERT (cfs_list_empty(&the_lnet.ln_nis));
-        LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
-        LASSERT (the_lnet.ln_nzombie_nis == 0);
+       LASSERT(the_lnet.ln_refcount == 0);
+       LASSERT(cfs_list_empty(&the_lnet.ln_test_peers));
+       LASSERT(cfs_list_empty(&the_lnet.ln_nis));
+       LASSERT(cfs_list_empty(&the_lnet.ln_nis_zombie));
 
        lnet_portals_destroy();
 
@@ -796,12 +805,12 @@ lnet_unprepare (void)
 
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
-       lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
-       lnet_peer_table_destroy();
+       lnet_msg_containers_destroy();
+       lnet_peer_tables_destroy();
        lnet_rtrpools_free();
 
        if (the_lnet.ln_counters != NULL) {
-               LIBCFS_FREE(the_lnet.ln_counters, sizeof(lnet_counters_t));
+               cfs_percpt_free(the_lnet.ln_counters);
                the_lnet.ln_counters = NULL;
        }
 
@@ -809,21 +818,23 @@ lnet_unprepare (void)
 }
 
 lnet_ni_t  *
-lnet_net2ni_locked (__u32 net)
+lnet_net2ni_locked(__u32 net, int cpt)
 {
-        cfs_list_t       *tmp;
-        lnet_ni_t        *ni;
+       cfs_list_t      *tmp;
+       lnet_ni_t       *ni;
 
-        cfs_list_for_each (tmp, &the_lnet.ln_nis) {
-                ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
+       LASSERT(cpt != LNET_LOCK_EX);
 
-                if (LNET_NIDNET(ni->ni_nid) == net) {
-                        lnet_ni_addref_locked(ni);
-                        return ni;
-                }
-        }
+       cfs_list_for_each(tmp, &the_lnet.ln_nis) {
+               ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
 
-        return NULL;
+               if (LNET_NIDNET(ni->ni_nid) == net) {
+                       lnet_ni_addref_locked(ni, cpt);
+                       return ni;
+               }
+       }
+
+       return NULL;
 }
 
 unsigned int
@@ -851,72 +862,91 @@ lnet_cpt_of_nid(lnet_nid_t nid)
 EXPORT_SYMBOL(lnet_cpt_of_nid);
 
 int
-lnet_islocalnet (__u32 net)
+lnet_islocalnet(__u32 net)
 {
-        lnet_ni_t        *ni;
+       struct lnet_ni  *ni;
+       int             cpt;
+
+       /* Return non-zero iff lnet_net2ni_locked() finds an NI on network
+        * @net.  The lookup runs under the net lock of the current
+        * partition, as returned by lnet_net_lock_current(). */
+       cpt = lnet_net_lock_current();
+
+       ni = lnet_net2ni_locked(net, cpt);
+       /* only existence matters: give back the ref the lookup took */
+       if (ni != NULL)
+               lnet_ni_decref_locked(ni, cpt);
 
-        LNET_LOCK();
-        ni = lnet_net2ni_locked(net);
-        if (ni != NULL)
-                lnet_ni_decref_locked(ni);
-        LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
-        return ni != NULL;
+       /* 'ni' is only compared against NULL here, never dereferenced */
+       return ni != NULL;
 }
 
 lnet_ni_t  *
-lnet_nid2ni_locked (lnet_nid_t nid)
+lnet_nid2ni_locked(lnet_nid_t nid, int cpt)
 {
-        cfs_list_t       *tmp;
-        lnet_ni_t        *ni;
+       struct lnet_ni  *ni;
+       cfs_list_t      *tmp;
 
-        cfs_list_for_each (tmp, &the_lnet.ln_nis) {
-                ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
+       /* Look up the NI on the_lnet.ln_nis whose ni_nid equals @nid.
+        * On a match the NI is returned after
+        * lnet_ni_addref_locked(ni, cpt); NULL is returned if nothing
+        * matches.  @cpt must name one partition, never LNET_LOCK_EX. */
+       LASSERT(cpt != LNET_LOCK_EX);
 
-                if (ni->ni_nid == nid) {
-                        lnet_ni_addref_locked(ni);
-                        return ni;
-                }
-        }
+       cfs_list_for_each(tmp, &the_lnet.ln_nis) {
+               ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
 
-        return NULL;
+               /* exact NID match, unlike the net-only lookup */
+               if (ni->ni_nid == nid) {
+                       lnet_ni_addref_locked(ni, cpt);
+                       return ni;
+               }
+       }
+
+       return NULL;
 }
 
 int
-lnet_islocalnid (lnet_nid_t nid)
+lnet_islocalnid(lnet_nid_t nid)
 {
-        lnet_ni_t     *ni;
+       struct lnet_ni  *ni;
+       int             cpt;
 
-        LNET_LOCK();
-        ni = lnet_nid2ni_locked(nid);
-        if (ni != NULL)
-                lnet_ni_decref_locked(ni);
-        LNET_UNLOCK();
+       /* Return non-zero iff lnet_nid2ni_locked() finds an NI whose NID
+        * is @nid.  The lookup runs under the net lock of the current
+        * partition; the ref taken by the lookup is dropped again before
+        * unlocking because only existence is reported. */
+       cpt = lnet_net_lock_current();
+       ni = lnet_nid2ni_locked(nid, cpt);
+       if (ni != NULL)
+               lnet_ni_decref_locked(ni, cpt);
+       lnet_net_unlock(cpt);
 
-        return ni != NULL;
+       /* 'ni' is only compared against NULL here, never dereferenced */
+       return ni != NULL;
 }
 
 int
 lnet_count_acceptor_nis (void)
 {
-        /* Return the # of NIs that need the acceptor. */
-        int            count = 0;
+       /* Return the # of NIs that need the acceptor, i.e. NIs on
+        * the_lnet.ln_nis whose LND provides an lnd_accept method.
+        * Always 0 when built with neither __KERNEL__ nor HAVE_LIBPTHREAD,
+        * because the counting code is compiled out. */
+       int             count = 0;
 #if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD)
-        cfs_list_t    *tmp;
-        lnet_ni_t     *ni;
+       cfs_list_t      *tmp;
+       struct lnet_ni  *ni;
+       int             cpt;
 
-        LNET_LOCK();
-        cfs_list_for_each (tmp, &the_lnet.ln_nis) {
-                ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
+       /* walk the NI list under the current partition's net lock */
+       cpt = lnet_net_lock_current();
+       cfs_list_for_each(tmp, &the_lnet.ln_nis) {
+               ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
 
-                if (ni->ni_lnd->lnd_accept != NULL)
-                        count++;
-        }
+               if (ni->ni_lnd->lnd_accept != NULL)
+                       count++;
+       }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
 #endif /* defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) */
-        return count;
+       return count;
+}
+
+/* Return the number of TX credits for one TX queue of @ni: an equal
+ * share of ni_maxtxcredits across LNET_CPT_NUMBER, raised to at least
+ * 8 * ni_peertxcredits, and finally capped at ni_maxtxcredits. */
+static int
+lnet_ni_tq_credits(lnet_ni_t *ni)
+{
+       int     credits;
+
+       /* start from an even split (integer division) */
+       credits = ni->ni_maxtxcredits / LNET_CPT_NUMBER;
+       /* lower bound: eight peers' worth of credits */
+       credits = max(credits, 8 * ni->ni_peertxcredits);
+       /* the cap is applied last, so it wins over the lower bound */
+       credits = min(credits, ni->ni_maxtxcredits);
+
+       return credits;
 }
 
 void
@@ -929,38 +959,36 @@ lnet_shutdown_lndnis (void)
         /* NB called holding the global mutex */
 
         /* All quiet on the API front */
-        LASSERT (!the_lnet.ln_shutdown);
-        LASSERT (the_lnet.ln_refcount == 0);
-        LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
-        LASSERT (the_lnet.ln_nzombie_nis == 0);
-        LASSERT (cfs_list_empty(&the_lnet.ln_remote_nets));
-
-        LNET_LOCK();
-        the_lnet.ln_shutdown = 1;               /* flag shutdown */
-
-        /* Unlink NIs from the global table */
-        while (!cfs_list_empty(&the_lnet.ln_nis)) {
-                ni = cfs_list_entry(the_lnet.ln_nis.next,
-                                    lnet_ni_t, ni_list);
-                cfs_list_del (&ni->ni_list);
-
-                the_lnet.ln_nzombie_nis++;
-                lnet_ni_decref_locked(ni); /* drop ln_nis' ref */
-        }
+       LASSERT(!the_lnet.ln_shutdown);
+       LASSERT(the_lnet.ln_refcount == 0);
+       LASSERT(cfs_list_empty(&the_lnet.ln_nis_zombie));
+       LASSERT(cfs_list_empty(&the_lnet.ln_remote_nets));
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_shutdown = 1;       /* flag shutdown */
+
+       /* Unlink NIs from the global table */
+       while (!cfs_list_empty(&the_lnet.ln_nis)) {
+               ni = cfs_list_entry(the_lnet.ln_nis.next,
+                                   lnet_ni_t, ni_list);
+               /* move it to zombie list and nobody can find it anymore */
+               cfs_list_move(&ni->ni_list, &the_lnet.ln_nis_zombie);
+               lnet_ni_decref_locked(ni, 0);   /* drop ln_nis' ref */
+       }
 
-        /* Drop the cached eqwait NI. */
+       /* Drop the cached eqwait NI. */
        if (the_lnet.ln_eq_waitni != NULL) {
-               lnet_ni_decref_locked(the_lnet.ln_eq_waitni);
+               lnet_ni_decref_locked(the_lnet.ln_eq_waitni, 0);
                the_lnet.ln_eq_waitni = NULL;
        }
 
-        /* Drop the cached loopback NI. */
-        if (the_lnet.ln_loni != NULL) {
-                lnet_ni_decref_locked(the_lnet.ln_loni);
-                the_lnet.ln_loni = NULL;
-        }
+       /* Drop the cached loopback NI. */
+       if (the_lnet.ln_loni != NULL) {
+               lnet_ni_decref_locked(the_lnet.ln_loni, 0);
+               the_lnet.ln_loni = NULL;
+       }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(LNET_LOCK_EX);
 
         /* Clear lazy portals and drop delayed messages which hold refs
          * on their lnet_msg_t::msg_rxpeer */
@@ -969,30 +997,42 @@ lnet_shutdown_lndnis (void)
 
         /* Clear the peer table and wait for all peers to go (they hold refs on
          * their NIs) */
-       lnet_peer_table_cleanup();
-
-        LNET_LOCK();
-        /* Now wait for the NI's I just nuked to show up on ln_zombie_nis
-         * and shut them down in guaranteed thread context */
-        i = 2;
-        while (the_lnet.ln_nzombie_nis != 0) {
-
-                while (cfs_list_empty(&the_lnet.ln_zombie_nis)) {
-                        LNET_UNLOCK();
-                        ++i;
-                        if ((i & (-i)) == i)
-                                CDEBUG(D_WARNING,"Waiting for %d zombie NIs\n",
-                                       the_lnet.ln_nzombie_nis);
-                        cfs_pause(cfs_time_seconds(1));
-                        LNET_LOCK();
-                }
+       lnet_peer_tables_cleanup();
+
+       lnet_net_lock(LNET_LOCK_EX);
+       /* Now wait for the NIs I just moved to ln_nis_zombie to drop
+        * all their refs and shut them down in guaranteed thread context */
+       i = 2;
+       while (!cfs_list_empty(&the_lnet.ln_nis_zombie)) {
+               int     *ref;
+               int     j;
+
+               ni = cfs_list_entry(the_lnet.ln_nis_zombie.next,
+                                   lnet_ni_t, ni_list);
+               cfs_list_del_init(&ni->ni_list);
+               cfs_percpt_for_each(ref, j, ni->ni_refs) {
+                       if (*ref == 0)
+                               continue;
+                       /* still busy, add it back to zombie list */
+                       cfs_list_add(&ni->ni_list, &the_lnet.ln_nis_zombie);
+                       break;
+               }
 
-                ni = cfs_list_entry(the_lnet.ln_zombie_nis.next,
-                                    lnet_ni_t, ni_list);
-                cfs_list_del(&ni->ni_list);
-                ni->ni_lnd->lnd_refcount--;
+               /* A non-empty ni_list means the NI was put back on the
+                * zombie list above because some partition still holds a
+                * ref.  This must be "if", not "while": nothing in here
+                * unlinks ni_list, so the refs are only re-examined by
+                * restarting the outer loop with "continue". */
+               if (!cfs_list_empty(&ni->ni_list)) {
+                       lnet_net_unlock(LNET_LOCK_EX);
+                       ++i;
+                       /* only warn when the retry count is a power of 2 */
+                       if ((i & (-i)) == i) {
+                               CDEBUG(D_WARNING,
+                                      "Waiting for zombie LNI %s\n",
+                                      libcfs_nid2str(ni->ni_nid));
+                       }
+                       /* sleep with the lock dropped, then retake it */
+                       cfs_pause(cfs_time_seconds(1));
+                       lnet_net_lock(LNET_LOCK_EX);
+                       continue;
+               }
 
-                LNET_UNLOCK();
+               ni->ni_lnd->lnd_refcount--;
+               lnet_net_unlock(LNET_LOCK_EX);
 
                 islo = ni->ni_lnd->lnd_type == LOLND;
 
@@ -1006,28 +1046,28 @@ lnet_shutdown_lndnis (void)
                         CDEBUG(D_LNI, "Removed LNI %s\n",
                                libcfs_nid2str(ni->ni_nid));
 
-                LIBCFS_FREE(ni, sizeof(*ni));
-
-                LNET_LOCK();
-                the_lnet.ln_nzombie_nis--;
-        }
+               lnet_ni_free(ni);
+               lnet_net_lock(LNET_LOCK_EX);
+       }
 
-        the_lnet.ln_shutdown = 0;
-        LNET_UNLOCK();
+       the_lnet.ln_shutdown = 0;
+       lnet_net_unlock(LNET_LOCK_EX);
 
-        if (the_lnet.ln_network_tokens != NULL) {
-                LIBCFS_FREE(the_lnet.ln_network_tokens,
-                            the_lnet.ln_network_tokens_nob);
-                the_lnet.ln_network_tokens = NULL;
-        }
+       if (the_lnet.ln_network_tokens != NULL) {
+               LIBCFS_FREE(the_lnet.ln_network_tokens,
+                           the_lnet.ln_network_tokens_nob);
+               the_lnet.ln_network_tokens = NULL;
+       }
 }
 
 int
 lnet_startup_lndnis (void)
 {
-        lnd_t             *lnd;
-        lnet_ni_t         *ni;
-        cfs_list_t         nilist;
+       lnd_t                   *lnd;
+       struct lnet_ni          *ni;
+       struct lnet_tx_queue    *tq;
+       cfs_list_t              nilist;
+       int                     i;
         int                rc = 0;
         int                lnd_type;
         int                nicount = 0;
@@ -1090,11 +1130,9 @@ lnet_startup_lndnis (void)
                 }
 #endif
 
-                ni->ni_refcount = 1;
-
-                LNET_LOCK();
-                lnd->lnd_refcount++;
-                LNET_UNLOCK();
+               lnet_net_lock(LNET_LOCK_EX);
+               lnd->lnd_refcount++;
+               lnet_net_unlock(LNET_LOCK_EX);
 
                 ni->ni_lnd = lnd;
 
@@ -1106,9 +1144,9 @@ lnet_startup_lndnis (void)
                         LCONSOLE_ERROR_MSG(0x105, "Error %d starting up LNI %s"
                                            "\n",
                                            rc, libcfs_lnd2str(lnd->lnd_type));
-                        LNET_LOCK();
-                        lnd->lnd_refcount--;
-                        LNET_UNLOCK();
+                       lnet_net_lock(LNET_LOCK_EX);
+                       lnd->lnd_refcount--;
+                       lnet_net_unlock(LNET_LOCK_EX);
                         goto failed;
                 }
 
@@ -1116,9 +1154,12 @@ lnet_startup_lndnis (void)
 
                 cfs_list_del(&ni->ni_list);
 
-                LNET_LOCK();
-                cfs_list_add_tail(&ni->ni_list, &the_lnet.ln_nis);
-                LNET_UNLOCK();
+               lnet_net_lock(LNET_LOCK_EX);
+               /* refcount for ln_nis */
+               lnet_ni_addref_locked(ni, 0);
+               cfs_list_add_tail(&ni->ni_list, &the_lnet.ln_nis);
+
+               lnet_net_unlock(LNET_LOCK_EX);
 
                 if (lnd->lnd_type == LOLND) {
                         lnet_ni_addref(ni);
@@ -1151,15 +1192,19 @@ lnet_startup_lndnis (void)
                         goto failed;
                 }
 
-                ni->ni_txcredits = ni->ni_mintxcredits = ni->ni_maxtxcredits;
+               cfs_percpt_for_each(tq, i, ni->ni_tx_queues) {
+                       tq->tq_credits_min =
+                       tq->tq_credits_max =
+                       tq->tq_credits = lnet_ni_tq_credits(ni);
+               }
 
-                CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
-                       libcfs_nid2str(ni->ni_nid),
-                       ni->ni_peertxcredits, ni->ni_txcredits,
-                       ni->ni_peerrtrcredits, ni->ni_peertimeout);
+               CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
+                      libcfs_nid2str(ni->ni_nid), ni->ni_peertxcredits,
+                      lnet_ni_tq_credits(ni) * LNET_CPT_NUMBER,
+                      ni->ni_peerrtrcredits, ni->ni_peertimeout);
 
-                nicount++;
-        }
+               nicount++;
+       }
 
        if (the_lnet.ln_eq_waitni != NULL && nicount > 1) {
                lnd_type = the_lnet.ln_eq_waitni->ni_lnd->lnd_type;
@@ -1177,10 +1222,10 @@ lnet_startup_lndnis (void)
         while (!cfs_list_empty(&nilist)) {
                 ni = cfs_list_entry(nilist.next, lnet_ni_t, ni_list);
                 cfs_list_del(&ni->ni_list);
-                LIBCFS_FREE(ni, sizeof(*ni));
-        }
+               lnet_ni_free(ni);
+       }
 
-        return -ENETDOWN;
+       return -ENETDOWN;
 }
 
 /**
@@ -1473,9 +1518,9 @@ LNetCtl(unsigned int cmd, void *arg)
                 return 0;
 
         case IOC_LIBCFS_TESTPROTOCOMPAT:
-                LNET_LOCK();
-                the_lnet.ln_testprotocompat = data->ioc_flags;
-                LNET_UNLOCK();
+               lnet_net_lock(LNET_LOCK_EX);
+               the_lnet.ln_testprotocompat = data->ioc_flags;
+               lnet_net_unlock(LNET_LOCK_EX);
                 return 0;
 
         case IOC_LIBCFS_PING:
@@ -1542,14 +1587,15 @@ LNetCtl(unsigned int cmd, void *arg)
 int
 LNetGetId(unsigned int index, lnet_process_id_t *id)
 {
-        lnet_ni_t        *ni;
-        cfs_list_t       *tmp;
-        int               rc = -ENOENT;
+       struct lnet_ni  *ni;
+       cfs_list_t      *tmp;
+       int             cpt;
+       int             rc = -ENOENT;
 
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
 
-        LNET_LOCK();
+       cpt = lnet_net_lock_current();
 
         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
                 if (index-- != 0)
@@ -1563,9 +1609,8 @@ LNetGetId(unsigned int index, lnet_process_id_t *id)
                 break;
         }
 
-        LNET_UNLOCK();
-
-        return rc;
+       lnet_net_unlock(cpt);
+       return rc;
 }
 
 /**
@@ -1618,15 +1663,18 @@ lnet_create_ping_info(void)
                 ns->ns_nid    = id.nid;
                 ns->ns_status = LNET_NI_STATUS_UP;
 
-                LNET_LOCK();
+               lnet_net_lock(0);
+
+               ni = lnet_nid2ni_locked(id.nid, 0);
+               LASSERT(ni != NULL);
 
-                ni = lnet_nid2ni_locked(id.nid);
-                LASSERT (ni != NULL);
-                LASSERT (ni->ni_status == NULL);
-                ni->ni_status = ns;
-                lnet_ni_decref_locked(ni);
+               lnet_ni_lock(ni);
+               LASSERT(ni->ni_status == NULL);
+               ni->ni_status = ns;
+               lnet_ni_unlock(ni);
 
-                LNET_UNLOCK();
+               lnet_ni_decref_locked(ni, 0);
+               lnet_net_unlock(0);
         }
 
         the_lnet.ln_ping_info = pinfo;
@@ -1636,15 +1684,17 @@ lnet_create_ping_info(void)
 static void
 lnet_destroy_ping_info(void)
 {
-        lnet_ni_t *ni;
+       struct lnet_ni  *ni;
 
-        LNET_LOCK();
+       lnet_net_lock(0);
 
-        cfs_list_for_each_entry (ni, &the_lnet.ln_nis, ni_list) {
-                ni->ni_status = NULL;
-        }
+       cfs_list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
+               lnet_ni_lock(ni);
+               ni->ni_status = NULL;
+               lnet_ni_unlock(ni);
+       }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(0);
 
         LIBCFS_FREE(the_lnet.ln_ping_info,
                     offsetof(lnet_ping_info_t,
index 9c4660e..a20044b 100644 (file)
@@ -91,10 +91,29 @@ lnet_net_unique(__u32 net, cfs_list_t *nilist)
         return 1;
 }
 
+/* Free @ni together with its ni_refs and ni_tx_queues per-partition
+ * arrays.  Either array may still be NULL: lnet_ni_alloc() jumps here
+ * when one of those allocations fails.  @ni itself is dereferenced
+ * unconditionally. */
+void
+lnet_ni_free(struct lnet_ni *ni)
+{
+       if (ni->ni_refs != NULL)
+               cfs_percpt_free(ni->ni_refs);
+
+       if (ni->ni_tx_queues != NULL)
+               cfs_percpt_free(ni->ni_tx_queues);
+
+       /* userspace pthread builds: destroy the mutex that
+        * lnet_ni_alloc() set up with pthread_mutex_init() */
+#ifndef __KERNEL__
+# ifdef HAVE_LIBPTHREAD
+       pthread_mutex_destroy(&ni->ni_lock);
+# endif
+#endif
+       LIBCFS_FREE(ni, sizeof(*ni));
+}
+
 lnet_ni_t *
-lnet_new_ni(__u32 net, cfs_list_t *nilist)
+lnet_ni_alloc(__u32 net, cfs_list_t *nilist)
 {
-        lnet_ni_t *ni;
+       struct lnet_tx_queue    *tq;
+       struct lnet_ni          *ni;
+       int                     i;
 
         if (!lnet_net_unique(net, nilist)) {
                 LCONSOLE_ERROR_MSG(0x111, "Duplicate network specified: %s\n",
@@ -109,16 +128,34 @@ lnet_new_ni(__u32 net, cfs_list_t *nilist)
                 return NULL;
         }
 
-        /* zero counters/flags, NULL pointers... */
-        memset(ni, 0, sizeof(*ni));
+#ifdef __KERNEL__
+       cfs_spin_lock_init(&ni->ni_lock);
+#else
+# ifdef HAVE_LIBPTHREAD
+       pthread_mutex_init(&ni->ni_lock, NULL);
+# endif
+#endif
+       ni->ni_refs = cfs_percpt_alloc(lnet_cpt_table(),
+                                      sizeof(*ni->ni_refs[0]));
+       if (ni->ni_refs == NULL)
+               goto failed;
+
+       ni->ni_tx_queues = cfs_percpt_alloc(lnet_cpt_table(),
+                                           sizeof(*ni->ni_tx_queues[0]));
+       if (ni->ni_tx_queues == NULL)
+               goto failed;
+
+       cfs_percpt_for_each(tq, i, ni->ni_tx_queues)
+               CFS_INIT_LIST_HEAD(&tq->tq_delayed);
 
         /* LND will fill in the address part of the NID */
         ni->ni_nid = LNET_MKNID(net, 0);
-        CFS_INIT_LIST_HEAD(&ni->ni_txq);
         ni->ni_last_alive = cfs_time_current();
-
         cfs_list_add_tail(&ni->ni_list, nilist);
         return ni;
+ failed:
+       lnet_ni_free(ni);
+       return NULL;
 }
 
 int
@@ -148,12 +185,12 @@ lnet_parse_networks(cfs_list_t *nilist, char *networks)
         the_lnet.ln_network_tokens_nob = tokensize;
         memcpy (tokens, networks, tokensize);
         str = tokens;
-        
-        /* Add in the loopback network */
-        ni = lnet_new_ni(LNET_MKNET(LOLND, 0), nilist);
-        if (ni == NULL)
-                goto failed;
-        
+
+       /* Add in the loopback network */
+       ni = lnet_ni_alloc(LNET_MKNET(LOLND, 0), nilist);
+       if (ni == NULL)
+               goto failed;
+
         while (str != NULL && *str != 0) {
                 char      *comma = strchr(str, ',');
                 char      *bracket = strchr(str, '(');
@@ -180,8 +217,8 @@ lnet_parse_networks(cfs_list_t *nilist, char *networks)
                                 goto failed;
                         }
 
-                        if (LNET_NETTYP(net) != LOLND && /* loopback is implicit */
-                            lnet_new_ni(net, nilist) == NULL)
+                       if (LNET_NETTYP(net) != LOLND && /* LO is implicit */
+                           lnet_ni_alloc(net, nilist) == NULL)
                                 goto failed;
 
                        str = comma;
@@ -197,7 +234,7 @@ lnet_parse_networks(cfs_list_t *nilist, char *networks)
                }
 
                 nnets++;
-                ni = lnet_new_ni(net, nilist);
+               ni = lnet_ni_alloc(net, nilist);
                 if (ni == NULL)
                         goto failed;
 
@@ -264,14 +301,14 @@ lnet_parse_networks(cfs_list_t *nilist, char *networks)
  failed:
         while (!cfs_list_empty(nilist)) {
                 ni = cfs_list_entry(nilist->next, lnet_ni_t, ni_list);
-                
-                cfs_list_del(&ni->ni_list);
-                LIBCFS_FREE(ni, sizeof(*ni));
-        }
+
+               cfs_list_del(&ni->ni_list);
+               lnet_ni_free(ni);
+       }
        LIBCFS_FREE(tokens, tokensize);
-        the_lnet.ln_network_tokens = NULL;
+       the_lnet.ln_network_tokens = NULL;
 
-        return -EINVAL;
+       return -EINVAL;
 }
 
 lnet_text_buf_t *
index dde4f45..2fcc98a 100644 (file)
@@ -399,17 +399,17 @@ lnet_eq_wait_locked(int *timeout_ms)
                 * events queued, or to block. */
                lnet_eq_wait_unlock();
 
-               LNET_LOCK();
+               lnet_net_lock(0);
                eq_waitni = the_lnet.ln_eq_waitni;
                if (unlikely(eq_waitni == NULL)) {
-                       LNET_UNLOCK();
+                       lnet_net_unlock(0);
 
                        lnet_eq_wait_lock();
                        return -1;
                }
 
-               lnet_ni_addref_locked(eq_waitni);
-               LNET_UNLOCK();
+               lnet_ni_addref_locked(eq_waitni, 0);
+               lnet_net_unlock(0);
 
                if (tms <= 0) { /* even for tms == 0 */
                        (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms);
index c1289b7..07b2d97 100644 (file)
@@ -54,6 +54,7 @@ lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
 
         LASSERT (the_lnet.ln_init);
 
+       /* NB: use lnet_net_lock(0) to serialize operations on test peers */
         if (threshold != 0) {
                 /* Adding a new entry */
                 LIBCFS_ALLOC(tp, sizeof(*tp));
@@ -63,16 +64,16 @@ lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
                 tp->tp_nid = nid;
                 tp->tp_threshold = threshold;
 
-                LNET_LOCK();
-                cfs_list_add_tail (&tp->tp_list, &the_lnet.ln_test_peers);
-                LNET_UNLOCK();
-                return 0;
-        }
+               lnet_net_lock(0);
+               cfs_list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
+               lnet_net_unlock(0);
+               return 0;
+       }
 
-        /* removing entries */
-        CFS_INIT_LIST_HEAD (&cull);
+       /* removing entries */
+       CFS_INIT_LIST_HEAD(&cull);
 
-        LNET_LOCK();
+       lnet_net_lock(0);
 
         cfs_list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
                 tp = cfs_list_entry (el, lnet_test_peer_t, tp_list);
@@ -86,7 +87,7 @@ lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
                 }
         }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(0);
 
         while (!cfs_list_empty (&cull)) {
                 tp = cfs_list_entry (cull.next, lnet_test_peer_t, tp_list);
@@ -108,7 +109,8 @@ fail_peer (lnet_nid_t nid, int outgoing)
 
         CFS_INIT_LIST_HEAD (&cull);
 
-        LNET_LOCK();
+       /* NB: use lnet_net_lock(0) to serialize operations on test peers */
+       lnet_net_lock(0);
 
         cfs_list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
                 tp = cfs_list_entry (el, lnet_test_peer_t, tp_list);
@@ -142,7 +144,7 @@ fail_peer (lnet_nid_t nid, int outgoing)
                 }
         }
 
-        LNET_UNLOCK ();
+       lnet_net_unlock(0);
 
         while (!cfs_list_empty (&cull)) {
                 tp = cfs_list_entry (cull.next, lnet_test_peer_t, tp_list);
@@ -706,27 +708,27 @@ lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
        return rc;
 }
 
-/* NB: caller shall hold a ref on 'lp' as I'd drop LNET_LOCK */
+/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
 void
 lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
 {
        cfs_time_t      last_alive = 0;
 
-        LASSERT (lnet_peer_aliveness_enabled(lp));
-        LASSERT (ni->ni_lnd->lnd_query != NULL);
-        LASSERT (the_lnet.ln_routing == 1);
+       LASSERT(lnet_peer_aliveness_enabled(lp));
+       LASSERT(ni->ni_lnd->lnd_query != NULL);
+       LASSERT(the_lnet.ln_routing == 1);
 
-        LNET_UNLOCK();
-        (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
-        LNET_LOCK();
+       /* call the LND's query method with the net lock of lp's
+        * partition (lp_cpt) dropped, and retake it afterwards */
+       lnet_net_unlock(lp->lp_cpt);
+       (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
+       lnet_net_lock(lp->lp_cpt);
 
-        lp->lp_last_query = cfs_time_current();
+       /* record the query time whether or not the LND reported anything */
+       lp->lp_last_query = cfs_time_current();
 
-        if (last_alive != 0) /* NI has updated timestamp */
-                lp->lp_last_alive = last_alive;
+       if (last_alive != 0) /* NI has updated timestamp */
+               lp->lp_last_alive = last_alive;
 }
 
-/* NB: always called with LNET_LOCK held */
+/* NB: always called with lnet_net_lock held */
 static inline int
 lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
 {
@@ -760,7 +762,7 @@ lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
 
 
 /* NB: returns 1 when alive, 0 when dead, negative when error;
- *     may drop the LNET_LOCK */
+ *     may drop the lnet_net_lock */
 int
 lnet_peer_alive_locked (lnet_peer_t *lp)
 {
@@ -808,34 +810,40 @@ lnet_peer_alive_locked (lnet_peer_t *lp)
 }
 
 int
-lnet_post_send_locked (lnet_msg_t *msg, int do_send)
+lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 {
-        /* lnet_send is going to LNET_UNLOCK immediately after this, so it sets
-         * do_send FALSE and I don't do the unlock/send/lock bit.  I return
-         * EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer appears dead, and
-         * 0 if sent or OK to send */
-        lnet_peer_t *lp = msg->msg_txpeer;
-        lnet_ni_t   *ni = lp->lp_ni;
-
-        /* non-lnet_send() callers have checked before */
+       /* lnet_send is going to lnet_net_unlock immediately after this,
+        * so it sets do_send FALSE and I don't do the unlock/send/lock bit.
+        * I return EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer
+        * appears dead, and 0 if sent or OK to send */
+       struct lnet_peer        *lp = msg->msg_txpeer;
+       struct lnet_ni          *ni = lp->lp_ni;
+       struct lnet_tx_queue    *tq;
+       int                     cpt;
+
+       /* non-lnet_send() callers have checked before */
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
+       LASSERT(msg->msg_tx_committed);
+
+       cpt = msg->msg_tx_cpt;
+       tq = ni->ni_tx_queues[cpt];
 
        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
            lnet_peer_alive_locked(lp) == 0) {
-               the_lnet.ln_counters->drop_count++;
-               the_lnet.ln_counters->drop_length += msg->msg_len;
-                LNET_UNLOCK();
+               the_lnet.ln_counters[cpt]->drop_count++;
+               the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
+               lnet_net_unlock(cpt);
 
-                CNETERR("Dropping message for %s: peer not alive\n",
-                        libcfs_id2str(msg->msg_target));
-                if (do_send)
-                        lnet_finalize(ni, msg, -EHOSTUNREACH);
+               CNETERR("Dropping message for %s: peer not alive\n",
+                       libcfs_id2str(msg->msg_target));
+               if (do_send)
+                       lnet_finalize(ni, msg, -EHOSTUNREACH);
 
-                LNET_LOCK();
-                return EHOSTUNREACH;
-        }
+               lnet_net_lock(cpt);
+               return EHOSTUNREACH;
+       }
 
         if (!msg->msg_peertxcredit) {
                 LASSERT ((lp->lp_txcredits < 0) ==
@@ -856,28 +864,28 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
         }
 
         if (!msg->msg_txcredit) {
-                LASSERT ((ni->ni_txcredits < 0) ==
-                         !cfs_list_empty(&ni->ni_txq));
+               LASSERT((tq->tq_credits < 0) ==
+                       !cfs_list_empty(&tq->tq_delayed));
 
-                msg->msg_txcredit = 1;
-                ni->ni_txcredits--;
+               msg->msg_txcredit = 1;
+               tq->tq_credits--;
 
-                if (ni->ni_txcredits < ni->ni_mintxcredits)
-                        ni->ni_mintxcredits = ni->ni_txcredits;
+               if (tq->tq_credits < tq->tq_credits_min)
+                       tq->tq_credits_min = tq->tq_credits;
 
-                if (ni->ni_txcredits < 0) {
+               if (tq->tq_credits < 0) {
                        msg->msg_tx_delayed = 1;
-                        cfs_list_add_tail(&msg->msg_list, &ni->ni_txq);
-                        return EAGAIN;
-                }
-        }
+                       cfs_list_add_tail(&msg->msg_list, &tq->tq_delayed);
+                       return EAGAIN;
+               }
+       }
 
-        if (do_send) {
-                LNET_UNLOCK();
-                lnet_ni_send(ni, msg);
-                LNET_LOCK();
-        }
-        return 0;
+       if (do_send) {
+               lnet_net_unlock(cpt);
+               lnet_ni_send(ni, msg);
+               lnet_net_lock(cpt);
+       }
+       return 0;
 }
 
 #ifdef __KERNEL__
@@ -885,23 +893,29 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 lnet_rtrbufpool_t *
 lnet_msg2bufpool(lnet_msg_t *msg)
 {
-        lnet_rtrbufpool_t *rbp = &the_lnet.ln_rtrpools[0];
+       lnet_rtrbufpool_t       *rbp;
+       int                     cpt;
 
-        LASSERT (msg->msg_len <= LNET_MTU);
-        while (msg->msg_len > (unsigned int)rbp->rbp_npages * CFS_PAGE_SIZE) {
-                rbp++;
-                LASSERT (rbp < &the_lnet.ln_rtrpools[LNET_NRBPOOLS]);
-        }
+       /* Return the router buffer pool to use for @msg: the first pool
+        * in the_lnet.ln_rtrpools[msg->msg_rx_cpt] whose buffer size
+        * (rbp_npages * CFS_PAGE_SIZE) is at least msg_len. */
+       LASSERT(msg->msg_rx_committed);
+
+       cpt = msg->msg_rx_cpt;
+       rbp = &the_lnet.ln_rtrpools[cpt][0];
+
+       LASSERT(msg->msg_len <= LNET_MTU);
+       /* step through this partition's pools in array order; the
+        * assertion fires if all LNET_NRBPOOLS of them are too small */
+       while (msg->msg_len > (unsigned int)rbp->rbp_npages * CFS_PAGE_SIZE) {
+               rbp++;
+               LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
+       }
 
-        return rbp;
+       return rbp;
 }
 
 int
 lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 {
-        /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
-         * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
-         * return EAGAIN if msg blocked and 0 if received or OK to receive */
+       /* lnet_parse is going to lnet_net_unlock immediately after this, so it
+        * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
+        * return EAGAIN if msg blocked and 0 if received or OK to receive */
         lnet_peer_t         *lp = msg->msg_rxpeer;
         lnet_rtrbufpool_t   *rbp;
         lnet_rtrbuf_t       *rb;
@@ -962,12 +976,14 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
         msg->msg_kiov = &rb->rb_kiov[0];
 
         if (do_recv) {
-                LNET_UNLOCK();
-                lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
-                             0, msg->msg_len, msg->msg_len);
-                LNET_LOCK();
-        }
-        return 0;
+               int cpt = msg->msg_rx_cpt;
+
+               lnet_net_unlock(cpt);
+               lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
+                            0, msg->msg_len, msg->msg_len);
+               lnet_net_lock(cpt);
+       }
+       return 0;
 }
 #endif
 
@@ -976,22 +992,24 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
 {
        lnet_peer_t     *txpeer = msg->msg_txpeer;
        lnet_msg_t      *msg2;
-       lnet_ni_t       *ni;
 
-        if (msg->msg_txcredit) {
-                /* give back NI txcredits */
-                msg->msg_txcredit = 0;
-                ni = txpeer->lp_ni;
+       if (msg->msg_txcredit) {
+               struct lnet_ni       *ni = txpeer->lp_ni;
+               struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
 
-                LASSERT((ni->ni_txcredits < 0) == !cfs_list_empty(&ni->ni_txq));
+               /* give back NI txcredits */
+               msg->msg_txcredit = 0;
 
-                ni->ni_txcredits++;
-                if (ni->ni_txcredits <= 0) {
-                        msg2 = cfs_list_entry(ni->ni_txq.next, lnet_msg_t,
-                                              msg_list);
-                        cfs_list_del(&msg2->msg_list);
+               LASSERT((tq->tq_credits < 0) ==
+                       !cfs_list_empty(&tq->tq_delayed));
+
+               tq->tq_credits++;
+               if (tq->tq_credits <= 0) {
+                       msg2 = cfs_list_entry(tq->tq_delayed.next,
+                                             lnet_msg_t, msg_list);
+                       cfs_list_del(&msg2->msg_list);
 
-                        LASSERT(msg2->msg_txpeer->lp_ni == ni);
+                       LASSERT(msg2->msg_txpeer->lp_ni == ni);
                        LASSERT(msg2->msg_tx_delayed);
 
                         (void) lnet_post_send_locked(msg2, 1);
@@ -1117,11 +1135,14 @@ lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
        if (p1->lp_txcredits < p2->lp_txcredits)
                return -1;
 
-       return 0;
+       if (r1->lr_seq - r2->lr_seq <= 0)
+               return 1;
+
+       return -1;
 }
 
 static lnet_peer_t *
-lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
+lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
 {
        lnet_remotenet_t        *rnet;
        lnet_route_t            *rtr;
@@ -1131,6 +1152,9 @@ lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
        struct lnet_peer        *lp;
        int                     rc;
 
+       /* If @rtr_nid is not LNET_NID_ANY, return the gateway whose NID is
+        * @rtr_nid when found; otherwise find the best gateway I can use */
+
        rnet = lnet_find_net_locked(LNET_NIDNET(target));
        if (rnet == NULL)
                return NULL;
@@ -1148,12 +1172,19 @@ lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
                if (ni != NULL && lp->lp_ni != ni)
                        continue;
 
+               if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
+                       return lp;
+
                if (lp_best == NULL) {
                        rtr_best = rtr_last = rtr;
                        lp_best = lp;
                        continue;
                }
 
+               /* no protection on the fields below, but it's harmless */
+               if (rtr_last->lr_seq - rtr->lr_seq < 0)
+                       rtr_last = rtr;
+
                rc = lnet_compare_routes(rtr, rtr_best);
                if (rc < 0)
                        continue;
@@ -1162,25 +1193,29 @@ lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
                lp_best = lp;
        }
 
-       if (rtr_best != NULL) {
-               /* Place selected route at the end of the route list to ensure
-                * fairness; everything else being equal... */
-               cfs_list_del(&rtr_best->lr_list);
-               cfs_list_add_tail(&rtr_best->lr_list, &rnet->lrn_routes);
-       }
-
+       /* set sequence number on the best router to the latest sequence + 1
+        * so we can round-robin all routers; it's racy and inaccurate but
+        * harmless and functional */
+       if (rtr_best != NULL)
+               rtr_best->lr_seq = rtr_last->lr_seq + 1;
        return lp_best;
 }
 
 int
-lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
+lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
 {
-        lnet_nid_t        dst_nid = msg->msg_target.nid;
-        lnet_ni_t        *src_ni;
-        lnet_ni_t        *local_ni;
-        lnet_peer_t      *lp;
-        int               rc;
+       lnet_nid_t              dst_nid = msg->msg_target.nid;
+       struct lnet_ni          *src_ni;
+       struct lnet_ni          *local_ni;
+       struct lnet_peer        *lp;
+       int                     cpt;
+       int                     cpt2;
+       int                     rc;
 
+       /* NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
+        * but we might want to use pre-determined router for ACK/REPLY
+        * in the future */
+       /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
         LASSERT (msg->msg_txpeer == NULL);
         LASSERT (!msg->msg_sending);
         LASSERT (!msg->msg_target_is_router);
@@ -1188,21 +1223,22 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
 
         msg->msg_sending = 1;
 
-        /* NB! ni != NULL == interface pre-determined (ACK/REPLY) */
-
-        LNET_LOCK();
+       LASSERT(!msg->msg_tx_committed);
+       cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
+ again:
+       lnet_net_lock(cpt);
 
-        if (the_lnet.ln_shutdown) {
-                LNET_UNLOCK();
-                return -ESHUTDOWN;
-        }
+       if (the_lnet.ln_shutdown) {
+               lnet_net_unlock(cpt);
+               return -ESHUTDOWN;
+       }
 
-        if (src_nid == LNET_NID_ANY) {
-                src_ni = NULL;
-        } else {
-                src_ni = lnet_nid2ni_locked(src_nid);
-                if (src_ni == NULL) {
-                        LNET_UNLOCK();
+       if (src_nid == LNET_NID_ANY) {
+               src_ni = NULL;
+       } else {
+               src_ni = lnet_nid2ni_locked(src_nid, cpt);
+               if (src_ni == NULL) {
+                       lnet_net_unlock(cpt);
                         LCONSOLE_WARN("Can't send to %s: src %s is not a "
                                       "local nid\n", libcfs_nid2str(dst_nid),
                                       libcfs_nid2str(src_nid));
@@ -1211,43 +1247,47 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                 LASSERT (!msg->msg_routing);
         }
 
-       lnet_msg_commit(msg, 1);
         /* Is this for someone on a local network? */
-        local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
+       local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
 
         if (local_ni != NULL) {
                 if (src_ni == NULL) {
                         src_ni = local_ni;
                         src_nid = src_ni->ni_nid;
                 } else if (src_ni == local_ni) {
-                        lnet_ni_decref_locked(local_ni);
-                } else {
-                        lnet_ni_decref_locked(local_ni);
-                        lnet_ni_decref_locked(src_ni);
-                        LNET_UNLOCK();
-                        LCONSOLE_WARN("No route to %s via from %s\n",
-                                      libcfs_nid2str(dst_nid),
-                                      libcfs_nid2str(src_nid));
-                        return -EINVAL;
-                }
+                       lnet_ni_decref_locked(local_ni, cpt);
+               } else {
+                       lnet_ni_decref_locked(local_ni, cpt);
+                       lnet_ni_decref_locked(src_ni, cpt);
+                       lnet_net_unlock(cpt);
+                       LCONSOLE_WARN("No route to %s via from %s\n",
+                                     libcfs_nid2str(dst_nid),
+                                     libcfs_nid2str(src_nid));
+                       return -EINVAL;
+               }
 
-                LASSERT (src_nid != LNET_NID_ANY);
+               LASSERT(src_nid != LNET_NID_ANY);
+               lnet_msg_commit(msg, cpt);
 
                 if (!msg->msg_routing)
                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
 
                 if (src_ni == the_lnet.ln_loni) {
                         /* No send credit hassles with LOLND */
-                        LNET_UNLOCK();
-                        lnet_ni_send(src_ni, msg);
-                        lnet_ni_decref(src_ni);
-                        return 0;
-                }
+                       lnet_net_unlock(cpt);
+                       lnet_ni_send(src_ni, msg);
+
+                       lnet_net_lock(cpt);
+                       lnet_ni_decref_locked(src_ni, cpt);
+                       lnet_net_unlock(cpt);
+                       return 0;
+               }
 
-                rc = lnet_nid2peer_locked(&lp, dst_nid);
-                lnet_ni_decref_locked(src_ni);  /* lp has ref on src_ni; lose mine */
-                if (rc != 0) {
-                        LNET_UNLOCK();
+               rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
+               /* lp has ref on src_ni; lose mine */
+               lnet_ni_decref_locked(src_ni, cpt);
+               if (rc != 0) {
+                       lnet_net_unlock(cpt);
                         LCONSOLE_WARN("Error %d finding peer %s\n", rc,
                                       libcfs_nid2str(dst_nid));
                         /* ENOMEM or shutting down */
@@ -1256,7 +1296,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                 LASSERT (lp->lp_ni == src_ni);
         } else {
 #ifndef __KERNEL__
-                LNET_UNLOCK();
+               lnet_net_unlock(cpt);
 
                 /* NB
                  * - once application finishes computation, check here to update
@@ -1266,14 +1306,14 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                 if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING)
                         lnet_router_checker();
 
-                LNET_LOCK();
+               lnet_net_lock(cpt);
 #endif
-                /* sending to a remote network */
-               lp = lnet_find_route_locked(src_ni, dst_nid);
-                if (lp == NULL) {
-                        if (src_ni != NULL)
-                                lnet_ni_decref_locked(src_ni);
-                        LNET_UNLOCK();
+               /* sending to a remote network */
+               lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
+               if (lp == NULL) {
+                       if (src_ni != NULL)
+                               lnet_ni_decref_locked(src_ni, cpt);
+                       lnet_net_unlock(cpt);
 
                         LCONSOLE_WARN("No route to %s via %s "
                                       "(all routers down)\n",
@@ -1282,6 +1322,23 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                         return -EHOSTUNREACH;
                 }
 
+               /* rtr_nid is LNET_NID_ANY or the NID of a pre-determined
+                * router. It's possible that rtr_nid isn't LNET_NID_ANY and
+                * lp isn't the pre-determined router; this can happen if the
+                * router table was changed while we had released the lock */
+               if (rtr_nid != lp->lp_nid) {
+                       cpt2 = lnet_cpt_of_nid(lp->lp_nid);
+                       if (cpt2 != cpt) {
+                               if (src_ni != NULL)
+                                       lnet_ni_decref_locked(src_ni, cpt);
+                               lnet_net_unlock(cpt);
+
+                               rtr_nid = lp->lp_nid;
+                               cpt = cpt2;
+                               goto again;
+                       }
+               }
+
                 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
                        libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
                        lnet_msgtyp2str(msg->msg_type), msg->msg_len);
@@ -1291,12 +1348,13 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                         src_nid = src_ni->ni_nid;
                 } else {
                         LASSERT (src_ni == lp->lp_ni);
-                        lnet_ni_decref_locked(src_ni);
-                }
+                       lnet_ni_decref_locked(src_ni, cpt);
+               }
 
-                lnet_peer_addref_locked(lp);
+               lnet_peer_addref_locked(lp);
 
-                LASSERT (src_nid != LNET_NID_ANY);
+               LASSERT(src_nid != LNET_NID_ANY);
+               lnet_msg_commit(msg, cpt);
 
                 if (!msg->msg_routing) {
                         /* I'm the source and now I know which NI to send on */
@@ -1317,7 +1375,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
 
         rc = lnet_post_send_locked(msg, 0);
-        LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
         if (rc == EHOSTUNREACH)
                 return -EHOSTUNREACH;
@@ -1329,12 +1387,12 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
 }
 
 static void
-lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
+lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
 {
-       LNET_LOCK();
-       the_lnet.ln_counters->drop_count++;
-       the_lnet.ln_counters->drop_length += nob;
-       LNET_UNLOCK();
+       lnet_net_lock(cpt);
+       the_lnet.ln_counters[cpt]->drop_count++;
+       the_lnet.ln_counters[cpt]->drop_length += nob;
+       lnet_net_unlock(cpt);
 
        lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
 }
@@ -1461,9 +1519,9 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
         msg->msg_receiving = 0;
 
-        rc = lnet_send(ni->ni_nid, msg);
-        if (rc < 0) {
-                /* didn't get as far as lnet_ni_send() */
+       rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
+       if (rc < 0) {
+               /* didn't get as far as lnet_ni_send() */
                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
                       libcfs_nid2str(ni->ni_nid),
                       libcfs_id2str(info.mi_id), rc);
@@ -1602,9 +1660,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
                if (ni->ni_lnd->lnd_eager_recv == NULL) {
                        msg->msg_rx_ready_delay = 1;
                } else {
-                       LNET_UNLOCK();
+                       lnet_net_unlock(msg->msg_rx_cpt);
                        rc = lnet_ni_eager_recv(ni, msg);
-                       LNET_LOCK();
+                       lnet_net_lock(msg->msg_rx_cpt);
                }
        }
 
@@ -1698,12 +1756,13 @@ lnet_print_hdr(lnet_hdr_t * hdr)
 }
 
 int
-lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, 
-           void *private, int rdma_req)
+lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
+          void *private, int rdma_req)
 {
-        int            rc = 0;
-        int            for_me;
-        lnet_msg_t    *msg;
+       int             rc = 0;
+       int             cpt;
+       int             for_me;
+       struct lnet_msg *msg;
         lnet_pid_t     dest_pid;
         lnet_nid_t     dest_nid;
         lnet_nid_t     src_nid;
@@ -1719,6 +1778,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         payload_length = le32_to_cpu(hdr->payload_length);
 
         for_me = (ni->ni_nid == dest_nid);
+       cpt = lnet_cpt_of_nid(from_nid);
 
         switch (type) {
         case LNET_MSG_ACK:
@@ -1755,14 +1815,14 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
 
        if (the_lnet.ln_routing &&
            ni->ni_last_alive != cfs_time_current_sec()) {
-               LNET_LOCK();
+               lnet_ni_lock(ni);
 
                /* NB: so far here is the only place to set NI status to "up */
                ni->ni_last_alive = cfs_time_current_sec();
                if (ni->ni_status != NULL &&
                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
-               LNET_UNLOCK();
+               lnet_ni_unlock(ni);
         }
 
         /* Regard a bad destination NID as a protocol error.  Senders should
@@ -1855,23 +1915,23 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                msg->msg_hdr.payload_length = payload_length;
        }
 
-        LNET_LOCK();
-        rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
-        if (rc != 0) {
-                LNET_UNLOCK();
-                CERROR("%s, src %s: Dropping %s "
-                       "(error %d looking up sender)\n",
-                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
-                       lnet_msgtyp2str(type), rc);
+       lnet_net_lock(cpt);
+       rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
+       if (rc != 0) {
+               lnet_net_unlock(cpt);
+               CERROR("%s, src %s: Dropping %s "
+                      "(error %d looking up sender)\n",
+                      libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+                      lnet_msgtyp2str(type), rc);
                lnet_msg_free(msg);
                goto drop;
        }
 
-       lnet_msg_commit(msg, 0);
+       lnet_msg_commit(msg, cpt);
 
        if (!for_me) {
                rc = lnet_parse_forward_locked(ni, msg);
-               LNET_UNLOCK();
+               lnet_net_unlock(cpt);
 
                if (rc < 0)
                        goto free_drop;
@@ -1882,7 +1942,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                return 0;
        }
 
-       LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
         switch (type) {
         case LNET_MSG_ACK:
@@ -1913,7 +1973,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
        lnet_finalize(ni, msg, rc);
 
  drop:
-       lnet_drop_message(ni, private, payload_length);
+       lnet_drop_message(ni, cpt, private, payload_length);
        return 0;
 }
 
@@ -1948,11 +2008,12 @@ lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason)
                 * until that's done */
 
                lnet_drop_message(msg->msg_rxpeer->lp_ni,
+                                 msg->msg_rxpeer->lp_cpt,
                                  msg->msg_private, msg->msg_len);
 
-               LNET_LOCK();
+               lnet_net_lock(msg->msg_rxpeer->lp_cpt);
                lnet_peer_decref_locked(msg->msg_rxpeer);
-               LNET_UNLOCK();
+               lnet_net_unlock(msg->msg_rxpeer->lp_cpt);
 
                lnet_msg_free(msg);
        }
@@ -2109,7 +2170,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
-        rc = lnet_send(self, msg);
+       rc = lnet_send(self, msg, LNET_NID_ANY);
         if (rc != 0) {
                 CNETERR( "Error sending PUT to %s: %d\n",
                        libcfs_id2str(target), rc);
@@ -2172,19 +2233,23 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
        lnet_res_unlock(cpt);
 
-       LNET_LOCK();
-       lnet_msg_commit(msg, 0);
-       LNET_UNLOCK();
+       cpt = lnet_cpt_of_nid(peer_id.nid);
+
+       lnet_net_lock(cpt);
+       lnet_msg_commit(msg, cpt);
+       lnet_net_unlock(cpt);
 
        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
        return msg;
 
  drop:
-       LNET_LOCK();
-       the_lnet.ln_counters->drop_count++;
-       the_lnet.ln_counters->drop_length += getmd->md_length;
-       LNET_UNLOCK ();
+       cpt = lnet_cpt_of_nid(peer_id.nid);
+
+       lnet_net_lock(cpt);
+       the_lnet.ln_counters[cpt]->drop_count++;
+       the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
+       lnet_net_unlock(cpt);
 
        if (msg != NULL)
                lnet_msg_free(msg);
@@ -2296,7 +2361,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
-        rc = lnet_send(self, msg);
+       rc = lnet_send(self, msg, LNET_NID_ANY);
         if (rc < 0) {
                 CNETERR( "Error sending GET to %s: %d\n",
                        libcfs_id2str(target), rc);
@@ -2322,14 +2387,15 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
  * \retval -EHOSTUNREACH If \a dstnid is not reachable.
  */
 int
-LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
+LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
 {
-        cfs_list_t       *e;
-        lnet_ni_t        *ni;
-        lnet_remotenet_t *rnet;
-        __u32             dstnet = LNET_NIDNET(dstnid);
-        int               hops;
-        __u32             order = 2;
+       cfs_list_t              *e;
+       struct lnet_ni          *ni;
+       lnet_remotenet_t        *rnet;
+       __u32                   dstnet = LNET_NIDNET(dstnid);
+       int                     hops;
+       int                     cpt;
+       __u32                   order = 2;
 
         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
@@ -2339,7 +2405,7 @@ LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
 
-        LNET_LOCK();
+       cpt = lnet_net_lock_current();
 
         cfs_list_for_each (e, &the_lnet.ln_nis) {
                 ni = cfs_list_entry(e, lnet_ni_t, ni_list);
@@ -2353,7 +2419,7 @@ LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                                 else
                                         *orderp = 1;
                         }
-                        LNET_UNLOCK();
+                       lnet_net_unlock(cpt);
 
                         return local_nid_dist_zero ? 0 : 1;
                 }
@@ -2363,7 +2429,7 @@ LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                                 *srcnidp = ni->ni_nid;
                         if (orderp != NULL)
                                 *orderp = order;
-                        LNET_UNLOCK();
+                       lnet_net_unlock(cpt);
                         return 1;
                 }
 
@@ -2392,14 +2458,14 @@ LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                                 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
                         if (orderp != NULL)
                                 *orderp = order;
-                        LNET_UNLOCK();
-                        return hops + 1;
-                }
-                order++;
-        }
+                       lnet_net_unlock(cpt);
+                       return hops + 1;
+               }
+               order++;
+       }
 
-        LNET_UNLOCK();
-        return -EHOSTUNREACH;
+       lnet_net_unlock(cpt);
+       return -EHOSTUNREACH;
 }
 
 /**
@@ -2434,6 +2500,7 @@ LNetSetAsync(lnet_process_id_t id, int nasync)
         int               maxnids = 256;
         int               rc = 0;
         int               rc2;
+       int                     cpt;
 
         /* Target on a local network? */
         ni = lnet_net2ni(LNET_NIDNET(id.nid));
@@ -2452,12 +2519,12 @@ LNetSetAsync(lnet_process_id_t id, int nasync)
         nnids = 0;
 
         /* Snapshot all the router NIDs */
-        LNET_LOCK();
-        rnet = lnet_find_net_locked(LNET_NIDNET(id.nid));
-        if (rnet != NULL) {
-                cfs_list_for_each(tmp, &rnet->lrn_routes) {
-                        if (nnids == maxnids) {
-                                LNET_UNLOCK();
+       cpt = lnet_net_lock_current();
+       rnet = lnet_find_net_locked(LNET_NIDNET(id.nid));
+       if (rnet != NULL) {
+               cfs_list_for_each(tmp, &rnet->lrn_routes) {
+                       if (nnids == maxnids) {
+                               lnet_net_unlock(cpt);
                                 LIBCFS_FREE(nids, maxnids * sizeof(*nids));
                                 maxnids *= 2;
                                 goto again;
@@ -2467,7 +2534,7 @@ LNetSetAsync(lnet_process_id_t id, int nasync)
                         nids[nnids++] = route->lr_gateway->lp_nid;
                 }
         }
-        LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
         /* set async on all the routers */
         while (nnids-- > 0) {
index 1d78ddb..6c705f0 100644 (file)
@@ -135,19 +135,27 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 }
 
 void
-lnet_msg_commit(lnet_msg_t *msg, int sending)
+lnet_msg_commit(lnet_msg_t *msg, int cpt)
 {
-       struct lnet_msg_container *container = &the_lnet.ln_msg_container;
-       lnet_counters_t           *counters  = the_lnet.ln_counters;
+       struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
+       lnet_counters_t           *counters  = the_lnet.ln_counters[cpt];
 
        /* routed message can be committed for both receiving and sending */
        LASSERT(!msg->msg_tx_committed);
 
-       if (msg->msg_rx_committed) { /* routed message, or reply for GET */
-               LASSERT(sending);
-               LASSERT(msg->msg_onactivelist);
+       if (msg->msg_sending) {
+               LASSERT(!msg->msg_receiving);
+
+               msg->msg_tx_cpt = cpt;
                msg->msg_tx_committed = 1;
-               return;
+               if (msg->msg_rx_committed) { /* routed message REPLY */
+                       LASSERT(msg->msg_onactivelist);
+                       return;
+               }
+       } else {
+               LASSERT(!msg->msg_sending);
+               msg->msg_rx_cpt = cpt;
+               msg->msg_rx_committed = 1;
        }
 
        LASSERT(!msg->msg_onactivelist);
@@ -157,23 +165,19 @@ lnet_msg_commit(lnet_msg_t *msg, int sending)
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
                counters->msgs_max = counters->msgs_alloc;
-
-       if (sending)
-               msg->msg_tx_committed = 1;
-       else
-               msg->msg_rx_committed = 1;
 }
 
 static void
-lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
-       lnet_event_t *ev = &msg->msg_ev;
+       lnet_counters_t *counters;
+       lnet_event_t    *ev = &msg->msg_ev;
 
        LASSERT(msg->msg_tx_committed);
        if (status != 0)
                goto out;
 
+       counters = the_lnet.ln_counters[msg->msg_tx_cpt];
        switch (ev->type) {
        default: /* routed message */
                LASSERT(msg->msg_routing);
@@ -215,12 +219,12 @@ lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
 }
 
 static void
-lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
-       lnet_event_t *ev = &msg->msg_ev;
+       lnet_counters_t *counters;
+       lnet_event_t    *ev = &msg->msg_ev;
 
-       LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
+       LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
        LASSERT(msg->msg_rx_committed);
 
        if (status != 0)
@@ -250,6 +254,7 @@ lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
                break;
        }
 
+       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        counters->recv_count++;
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
@@ -260,28 +265,44 @@ lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
 }
 
 void
-lnet_msg_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
+       int     cpt2 = cpt;
 
        LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
        LASSERT(msg->msg_onactivelist);
 
-       if (msg->msg_tx_committed) /* always decommit for sending first */
-               lnet_msg_tx_decommit(msg, status);
+       if (msg->msg_tx_committed) { /* always decommit for sending first */
+               LASSERT(cpt == msg->msg_tx_cpt);
+               lnet_msg_decommit_tx(msg, status);
+       }
 
-       if (msg->msg_rx_committed)
-               lnet_msg_rx_decommit(msg, status);
+       if (msg->msg_rx_committed) {
+               /* forwarding msg committed for both receiving and sending */
+               if (cpt != msg->msg_rx_cpt) {
+                       lnet_net_unlock(cpt);
+                       cpt2 = msg->msg_rx_cpt;
+                       lnet_net_lock(cpt2);
+               }
+               lnet_msg_decommit_rx(msg, status);
+       }
 
        cfs_list_del(&msg->msg_activelist);
        msg->msg_onactivelist = 0;
-       counters->msgs_alloc--;
+
+       the_lnet.ln_counters[cpt2]->msgs_alloc--;
+
+       if (cpt2 != cpt) {
+               lnet_net_unlock(cpt2);
+               lnet_net_lock(cpt);
+       }
 }
 
 void
 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
                   unsigned int offset, unsigned int mlen)
 {
+       /* NB: @offset and @mlen are only useful for receiving */
        /* Here, we attach the MD on lnet_msg and mark it busy and
         * decrementing its threshold. Come what may, the lnet_msg "owns"
         * the MD until a call to lnet_msg_detach_md or lnet_finalize()
@@ -329,7 +350,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
 }
 
 void
-lnet_complete_msg_locked(lnet_msg_t *msg)
+lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
         lnet_handle_wire_t ack_wmd;
         int                rc;
@@ -340,10 +361,10 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
         if (status == 0 && msg->msg_ack) {
                 /* Only send an ACK if the PUT completed successfully */
 
-               lnet_msg_decommit(msg, 0);
+               lnet_msg_decommit(msg, cpt, 0);
 
-                msg->msg_ack = 0;
-                LNET_UNLOCK();
+               msg->msg_ack = 0;
+               lnet_net_unlock(cpt);
 
                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
                 LASSERT(!msg->msg_routing);
@@ -356,32 +377,32 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
-                rc = lnet_send(msg->msg_ev.target.nid, msg);
+               /* NB: we probably want to use NID of msg::msg_from as 3rd
+                * parameter (router NID) if it's routed message */
+               rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
+
+               lnet_net_lock(cpt);
 
-                LNET_LOCK();
+               if (rc == 0)
+                       return;
+       } else if (status == 0 &&       /* OK so far */
+                  (msg->msg_routing && !msg->msg_sending)) {
+               /* not forwarded */
+               LASSERT(!msg->msg_receiving);   /* called back recv already */
+               lnet_net_unlock(cpt);
 
-                if (rc == 0)
-                        return;
-        } else if (status == 0 &&               /* OK so far */
-                   (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
-                
-                LASSERT (!msg->msg_receiving);  /* called back recv already */
-        
-                LNET_UNLOCK();
-                
-                rc = lnet_send(LNET_NID_ANY, msg);
+               rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
 
-                LNET_LOCK();
+               lnet_net_lock(cpt);
 
-                if (rc == 0)
-                        return;
-        }
+               if (rc == 0)
+                       return;
+       }
 
-       lnet_msg_decommit(msg, status);
+       lnet_msg_decommit(msg, cpt, status);
        lnet_msg_free_locked(msg);
 }
 
-
 void
 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
@@ -431,8 +452,15 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                return;
        }
 
-       LNET_LOCK();
-       container = &the_lnet.ln_msg_container;
+       /*
+        * NB: routed message can be committed for both receiving and sending,
+        * we should finalize in LIFO order and keep counters correct.
+        * (finalize sending first then finalize receiving)
+        */
+       cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
+       lnet_net_lock(cpt);
+
+       container = the_lnet.ln_msg_containers[cpt];
        cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
@@ -463,18 +491,18 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 
        while (!cfs_list_empty(&container->msc_finalizing)) {
                msg = cfs_list_entry(container->msc_finalizing.next,
-                                     lnet_msg_t, msg_list);
+                                    lnet_msg_t, msg_list);
 
-                cfs_list_del(&msg->msg_list);
+               cfs_list_del(&msg->msg_list);
 
-                /* NB drops and regains the lnet lock if it actually does
-                 * anything, so my finalizing friends can chomp along too */
-                lnet_complete_msg_locked(msg);
-        }
+               /* NB drops and regains the lnet lock if it actually does
+                * anything, so my finalizing friends can chomp along too */
+               lnet_complete_msg_locked(msg, cpt);
+       }
 
        container->msc_finalizers[my_slot] = NULL;
  out:
-       LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 }
 
 void
@@ -512,7 +540,7 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
 }
 
 int
-lnet_msg_container_setup(struct lnet_msg_container *container)
+lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 {
        int     rc;
 
@@ -535,11 +563,11 @@ lnet_msg_container_setup(struct lnet_msg_container *container)
        rc = 0;
 #endif
        /* number of CPUs */
-       container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
-                                                   CFS_CPT_ANY);
-       LIBCFS_ALLOC(container->msc_finalizers,
-                    container->msc_nfinalizers *
-                    sizeof(*container->msc_finalizers));
+       container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
+
+       LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
+                        container->msc_nfinalizers *
+                        sizeof(*container->msc_finalizers));
 
        if (container->msc_finalizers == NULL) {
                CERROR("Failed to allocate message finalizers\n");
@@ -549,3 +577,45 @@ lnet_msg_container_setup(struct lnet_msg_container *container)
 
        return rc;
 }
+
+void
+lnet_msg_containers_destroy(void)
+{
+       struct lnet_msg_container *container;
+       int     i;
+
+       if (the_lnet.ln_msg_containers == NULL)
+               return;
+
+       cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
+               lnet_msg_container_cleanup(container);
+
+       cfs_percpt_free(the_lnet.ln_msg_containers);
+       the_lnet.ln_msg_containers = NULL;
+}
+
+int
+lnet_msg_containers_create(void)
+{
+       struct lnet_msg_container *container;
+       int     rc;
+       int     i;
+
+       the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
+                                                     sizeof(*container));
+
+       if (the_lnet.ln_msg_containers == NULL) {
+               CERROR("Failed to allocate cpu-partition data for network\n");
+               return -ENOMEM;
+       }
+
+       cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
+               rc = lnet_msg_container_setup(container, i);
+               if (rc != 0) {
+                       lnet_msg_containers_destroy();
+                       return rc;
+               }
+       }
+
+       return 0;
+}
index 2327797..8f7ec08 100644 (file)
@@ -185,7 +185,6 @@ EXPORT_SYMBOL(lnet_parse);
 EXPORT_SYMBOL(lnet_create_reply_msg);
 EXPORT_SYMBOL(lnet_set_reply_msg_len);
 EXPORT_SYMBOL(lnet_msgtyp2str);
-EXPORT_SYMBOL(lnet_net2ni_locked);
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Portals v3.1");
index 419372f..b257624 100644 (file)
 #include <lnet/lib-lnet.h>
 
 int
-lnet_peer_table_create(void)
+lnet_peer_tables_create(void)
 {
        struct lnet_peer_table  *ptable;
        cfs_list_t              *hash;
+       int                     i;
        int                     j;
 
-       LIBCFS_ALLOC(ptable, sizeof(*ptable));
-       if (ptable == NULL) {
+       the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
+                                                  sizeof(*ptable));
+       if (the_lnet.ln_peer_tables == NULL) {
                CERROR("Failed to allocate cpu-partition peer tables\n");
                return -ENOMEM;
        }
 
-       the_lnet.ln_peer_table = ptable;
-
-       do { /* we will have per CPT peer-tables iterate them by then */
+       cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                CFS_INIT_LIST_HEAD(&ptable->pt_deathrow);
 
-               LIBCFS_ALLOC(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
+               LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
+                                LNET_PEER_HASH_SIZE * sizeof(*hash));
                if (hash == NULL) {
                        CERROR("Failed to create peer hash table\n");
-                       lnet_peer_table_destroy();
+                       lnet_peer_tables_destroy();
                        return -ENOMEM;
                }
 
                for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
                        CFS_INIT_LIST_HEAD(&hash[j]);
                ptable->pt_hash = hash; /* sign of initialization */
-       } while (0);
+       }
 
        return 0;
 }
 
 void
-lnet_peer_table_destroy(void)
+lnet_peer_tables_destroy(void)
 {
        struct lnet_peer_table  *ptable;
        cfs_list_t              *hash;
+       int                     i;
        int                     j;
 
-       if (the_lnet.ln_peer_table == NULL)
+       if (the_lnet.ln_peer_tables == NULL)
                return;
 
-       ptable = the_lnet.ln_peer_table;
-
-       do { /* we will have per CPT peer-tables iterate them by then */
+       cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                hash = ptable->pt_hash;
                if (hash == NULL) /* not intialized */
                        break;
@@ -95,23 +95,23 @@ lnet_peer_table_destroy(void)
                        LASSERT(cfs_list_empty(&hash[j]));
 
                LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
-       } while (0);
+       }
 
-       LIBCFS_FREE(ptable, sizeof(*ptable));
-       the_lnet.ln_peer_table = NULL;
+       cfs_percpt_free(the_lnet.ln_peer_tables);
+       the_lnet.ln_peer_tables = NULL;
 }
 
 void
-lnet_peer_table_cleanup(void)
+lnet_peer_tables_cleanup(void)
 {
        struct lnet_peer_table  *ptable;
+       int                     i;
        int                     j;
 
        LASSERT(the_lnet.ln_shutdown);  /* i.e. no new peers */
-       ptable = the_lnet.ln_peer_table;
 
-       do { /* we will have per CPT peer-tables iterate them by then */
-               LNET_LOCK();
+       cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
+               lnet_net_lock(i);
 
                for (j = 0; j < LNET_PEER_HASH_SIZE; j++) {
                        cfs_list_t *peers = &ptable->pt_hash[j];
@@ -126,17 +126,17 @@ lnet_peer_table_cleanup(void)
                        }
                }
 
-               LNET_UNLOCK();
-       } while (0);
+               lnet_net_unlock(i);
+       }
 
-       do { /* we will have per CPT peer-tables iterate them by then */
+       cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
                CFS_LIST_HEAD   (deathrow);
                lnet_peer_t     *lp;
 
-               LNET_LOCK();
+               lnet_net_lock(i);
 
                for (j = 3; ptable->pt_number != 0; j++) {
-                       LNET_UNLOCK();
+                       lnet_net_unlock(i);
 
                        if ((j & (j - 1)) == 0) {
                                CDEBUG(D_WARNING,
@@ -144,11 +144,11 @@ lnet_peer_table_cleanup(void)
                                       ptable->pt_number);
                        }
                        cfs_pause(cfs_time_seconds(1) / 2);
-                       LNET_LOCK();
+                       lnet_net_lock(i);
                }
                cfs_list_splice_init(&ptable->pt_deathrow, &deathrow);
 
-               LNET_UNLOCK();
+               lnet_net_unlock(i);
 
                while (!cfs_list_empty(&deathrow)) {
                        lp = cfs_list_entry(deathrow.next,
@@ -156,13 +156,13 @@ lnet_peer_table_cleanup(void)
                        cfs_list_del(&lp->lp_hashlist);
                        LIBCFS_FREE(lp, sizeof(*lp));
                }
-       } while (0);
+       }
 }
 
 void
 lnet_destroy_peer_locked(lnet_peer_t *lp)
 {
-       struct lnet_peer_table *ptable = the_lnet.ln_peer_table;
+       struct lnet_peer_table *ptable;
 
        LASSERT(lp->lp_refcount == 0);
        LASSERT(lp->lp_rtr_refcount == 0);
@@ -170,17 +170,18 @@ lnet_destroy_peer_locked(lnet_peer_t *lp)
        LASSERT(cfs_list_empty(&lp->lp_hashlist));
        LASSERT(lp->lp_txqnob == 0);
 
+       ptable = the_lnet.ln_peer_tables[lp->lp_cpt];
        LASSERT(ptable->pt_number > 0);
        ptable->pt_number--;
 
-       lnet_ni_decref_locked(lp->lp_ni);
+       lnet_ni_decref_locked(lp->lp_ni, lp->lp_cpt);
        lp->lp_ni = NULL;
 
        cfs_list_add(&lp->lp_hashlist, &ptable->pt_deathrow);
 }
 
 lnet_peer_t *
-lnet_find_peer_locked(lnet_nid_t nid)
+lnet_find_peer_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
 {
        cfs_list_t      *peers;
        lnet_peer_t     *lp;
@@ -188,7 +189,7 @@ lnet_find_peer_locked(lnet_nid_t nid)
        if (the_lnet.ln_shutdown)
                return NULL;
 
-       peers = &the_lnet.ln_peer_table->pt_hash[lnet_nid2peerhash(nid)];
+       peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
        cfs_list_for_each_entry(lp, peers, lp_hashlist) {
                if (lp->lp_nid == nid) {
                        lnet_peer_addref_locked(lp);
@@ -200,13 +201,18 @@ lnet_find_peer_locked(lnet_nid_t nid)
 }
 
 int
-lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid)
+lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid, int cpt)
 {
-       struct lnet_peer_table  *ptable = the_lnet.ln_peer_table;
+       struct lnet_peer_table  *ptable;
        lnet_peer_t             *lp = NULL;
        lnet_peer_t             *lp2;
+       int                     cpt2;
+
+       /* cpt can be LNET_LOCK_EX if it's called from router functions */
+       cpt2 = cpt != LNET_LOCK_EX ? cpt : lnet_cpt_of_nid(nid);
 
-        lp = lnet_find_peer_locked(nid);
+       ptable = the_lnet.ln_peer_tables[cpt2];
+       lp = lnet_find_peer_locked(ptable, nid);
         if (lp != NULL) {
                 *lpp = lp;
                 return 0;
@@ -218,7 +224,7 @@ lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid)
                cfs_list_del(&lp->lp_hashlist);
        }
 
-       LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 
        if (lp != NULL)
                memset(lp, 0, sizeof(*lp));
@@ -245,14 +251,15 @@ lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid)
         lp->lp_last_query = 0; /* haven't asked NI yet */
         lp->lp_ping_timestamp = 0;
        lp->lp_ping_version = LNET_PROTO_PING_UNKNOWN;
-        lp->lp_nid = nid;
-        lp->lp_refcount = 2;                    /* 1 for caller; 1 for hash */
-        lp->lp_rtr_refcount = 0;
+       lp->lp_nid = nid;
+       lp->lp_cpt = cpt2;
+       lp->lp_refcount = 2;                    /* 1 for caller; 1 for hash */
+       lp->lp_rtr_refcount = 0;
 
-        LNET_LOCK();
+       lnet_net_lock(cpt);
 
-        lp2 = lnet_find_peer_locked(nid);
-        if (lp2 != NULL) {
+       lp2 = lnet_find_peer_locked(ptable, nid);
+       if (lp2 != NULL) {
                cfs_list_add(&lp->lp_hashlist, &ptable->pt_deathrow);
 
                 if (the_lnet.ln_shutdown) {
@@ -264,9 +271,9 @@ lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid)
                 *lpp = lp2;
                 return 0;
         }
-                
-        lp->lp_ni = lnet_net2ni_locked(LNET_NIDNET(nid));
-        if (lp->lp_ni == NULL) {
+
+       lp->lp_ni = lnet_net2ni_locked(LNET_NIDNET(nid), cpt2);
+       if (lp->lp_ni == NULL) {
                cfs_list_add(&lp->lp_hashlist, &ptable->pt_deathrow);
 
                 *lpp = NULL;
@@ -293,15 +300,17 @@ lnet_nid2peer_locked(lnet_peer_t **lpp, lnet_nid_t nid)
 void
 lnet_debug_peer(lnet_nid_t nid)
 {
-        char        *aliveness = "NA";
-        int          rc;
-        lnet_peer_t *lp;
+       char            *aliveness = "NA";
+       lnet_peer_t     *lp;
+       int             rc;
+       int             cpt;
 
-        LNET_LOCK();
+       cpt = lnet_cpt_of_nid(nid);
+       lnet_net_lock(cpt);
 
-        rc = lnet_nid2peer_locked(&lp, nid);
-        if (rc != 0) {
-                LNET_UNLOCK();
+       rc = lnet_nid2peer_locked(&lp, nid, cpt);
+       if (rc != 0) {
+               lnet_net_unlock(cpt);
                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
                 return;
         }
@@ -317,5 +326,5 @@ lnet_debug_peer(lnet_nid_t nid)
 
         lnet_peer_decref_locked(lp);
 
-        LNET_UNLOCK();
+       lnet_net_unlock(cpt);
 }
index 6faf95b..2ba0a11 100644 (file)
 
 #if defined(__KERNEL__) && defined(LNET_ROUTER)
 
-#define LNET_NRB_TINY          1024
-#define LNET_NRB_SMALL         8192
-#define LNET_NRB_LARGE         512
+#define LNET_NRB_TINY_MIN      512     /* min value for each CPT */
+#define LNET_NRB_TINY          (LNET_NRB_TINY_MIN * 4)
+#define LNET_NRB_SMALL_MIN     4096    /* min value for each CPT */
+#define LNET_NRB_SMALL         (LNET_NRB_SMALL_MIN * 4)
+#define LNET_NRB_LARGE_MIN     256     /* min value for each CPT */
+#define LNET_NRB_LARGE         (LNET_NRB_LARGE_MIN * 4)
 
 static char *forwarding = "";
 CFS_MODULE_PARM(forwarding, "s", charp, 0444,
@@ -155,27 +158,28 @@ lnet_ni_notify_locked(lnet_ni_t *ni, lnet_peer_t *lp)
                 lp->lp_notify    = 0;
 
                 if (notifylnd && ni->ni_lnd->lnd_notify != NULL) {
-                        LNET_UNLOCK();
+                       lnet_net_unlock(lp->lp_cpt);
 
-                        /* A new notification could happen now; I'll handle it
-                         * when control returns to me */
+                       /* A new notification could happen now; I'll handle it
+                        * when control returns to me */
 
-                        (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
+                       (ni->ni_lnd->lnd_notify)(ni, lp->lp_nid, alive);
 
-                        LNET_LOCK();
-                }
-        }
+                       lnet_net_lock(lp->lp_cpt);
+               }
+       }
 
-        lp->lp_notifying = 0;
+       lp->lp_notifying = 0;
 }
 
 
 static void
 lnet_rtr_addref_locked(lnet_peer_t *lp)
 {
-        LASSERT (lp->lp_refcount > 0);
-        LASSERT (lp->lp_rtr_refcount >= 0);
+       LASSERT(lp->lp_refcount > 0);
+       LASSERT(lp->lp_rtr_refcount >= 0);
 
+       /* lnet_net_lock must be exclusively locked */
         lp->lp_rtr_refcount++;
         if (lp->lp_rtr_refcount == 1) {
                 cfs_list_t *pos;
@@ -199,15 +203,16 @@ lnet_rtr_addref_locked(lnet_peer_t *lp)
 static void
 lnet_rtr_decref_locked(lnet_peer_t *lp)
 {
-        LASSERT (lp->lp_refcount > 0);
-        LASSERT (lp->lp_rtr_refcount > 0);
+       LASSERT(lp->lp_refcount > 0);
+       LASSERT(lp->lp_rtr_refcount > 0);
 
-        lp->lp_rtr_refcount--;
-        if (lp->lp_rtr_refcount == 0) {
+       /* lnet_net_lock must be exclusively locked */
+       lp->lp_rtr_refcount--;
+       if (lp->lp_rtr_refcount == 0) {
                LASSERT(cfs_list_empty(&lp->lp_routes));
 
-                if (lp->lp_rcd != NULL) {
-                        cfs_list_add(&lp->lp_rcd->rcd_list,
+               if (lp->lp_rcd != NULL) {
+                       cfs_list_add(&lp->lp_rcd->rcd_list,
                                     &the_lnet.ln_rcd_deathrow);
                         lp->lp_rcd = NULL;
                 }
@@ -336,14 +341,14 @@ lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
         route->lr_hops = hops;
        route->lr_net = net;
 
-        LNET_LOCK();
+       lnet_net_lock(LNET_LOCK_EX);
 
-        rc = lnet_nid2peer_locked(&route->lr_gateway, gateway);
-        if (rc != 0) {
-                LNET_UNLOCK();
+       rc = lnet_nid2peer_locked(&route->lr_gateway, gateway, LNET_LOCK_EX);
+       if (rc != 0) {
+               lnet_net_unlock(LNET_LOCK_EX);
 
-                LIBCFS_FREE(route, sizeof(*route));
-                LIBCFS_FREE(rnet, sizeof(*rnet));
+               LIBCFS_FREE(route, sizeof(*route));
+               LIBCFS_FREE(rnet, sizeof(*rnet));
 
                if (rc == -EHOSTUNREACH) { /* gateway is not on a local net */
                         return 0;               /* ignore the route entry */
@@ -383,18 +388,18 @@ lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
                lnet_add_route_to_rnet(rnet2, route);
 
                ni = route->lr_gateway->lp_ni;
-               LNET_UNLOCK();
+               lnet_net_unlock(LNET_LOCK_EX);
 
                /* XXX Assume alive */
                if (ni->ni_lnd->lnd_notify != NULL)
                        (ni->ni_lnd->lnd_notify)(ni, gateway, 1);
 
-               LNET_LOCK();
+               lnet_net_lock(LNET_LOCK_EX);
        }
 
        /* -1 for notify or !add_route */
        lnet_peer_decref_locked(route->lr_gateway);
-       LNET_UNLOCK();
+       lnet_net_unlock(LNET_LOCK_EX);
 
        if (!add_route)
                LIBCFS_FREE(route, sizeof(*route));
@@ -406,26 +411,27 @@ lnet_add_route (__u32 net, unsigned int hops, lnet_nid_t gateway)
 }
 
 int
-lnet_check_routes (void)
+lnet_check_routes(void)
 {
-        lnet_remotenet_t    *rnet;
-        lnet_route_t        *route;
-        lnet_route_t        *route2;
-        cfs_list_t          *e1;
-        cfs_list_t          *e2;
+       lnet_remotenet_t        *rnet;
+       lnet_route_t            *route;
+       lnet_route_t            *route2;
+       cfs_list_t              *e1;
+       cfs_list_t              *e2;
+       int                     cpt;
 
-        LNET_LOCK();
+       cpt = lnet_net_lock_current();
 
-        cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
-                rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
+       cfs_list_for_each(e1, &the_lnet.ln_remote_nets) {
+               rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
 
-                route2 = NULL;
-                cfs_list_for_each (e2, &rnet->lrn_routes) {
+               route2 = NULL;
+               cfs_list_for_each(e2, &rnet->lrn_routes) {
                        lnet_nid_t      nid1;
                        lnet_nid_t      nid2;
                        int             net;
 
-                        route = cfs_list_entry(e2, lnet_route_t, lr_list);
+                       route = cfs_list_entry(e2, lnet_route_t, lr_list);
 
                        if (route2 == NULL) {
                                route2 = route;
@@ -440,17 +446,17 @@ lnet_check_routes (void)
                        nid2 = route2->lr_gateway->lp_nid;
                        net = rnet->lrn_net;
 
-                       LNET_UNLOCK();
+                       lnet_net_unlock(cpt);
 
                        CERROR("Routes to %s via %s and %s not supported\n",
                               libcfs_net2str(net), libcfs_nid2str(nid1),
                               libcfs_nid2str(nid2));
                        return -EINVAL;
-                }
-        }
+               }
+       }
 
-        LNET_UNLOCK();
-        return 0;
+       lnet_net_unlock(cpt);
+       return 0;
 }
 
 int
@@ -470,7 +476,7 @@ lnet_del_route (__u32 net, lnet_nid_t gw_nid)
          * or a specific route entry actual NIDs) */
 
  again:
-        LNET_LOCK();
+       lnet_net_lock(LNET_LOCK_EX);
 
         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
@@ -498,7 +504,8 @@ lnet_del_route (__u32 net, lnet_nid_t gw_nid)
 
                        lnet_rtr_decref_locked(gateway);
                        lnet_peer_decref_locked(gateway);
-                        LNET_UNLOCK();
+
+                       lnet_net_unlock(LNET_LOCK_EX);
 
                         LIBCFS_FREE(route, sizeof (*route));
 
@@ -510,8 +517,8 @@ lnet_del_route (__u32 net, lnet_nid_t gw_nid)
                 }
         }
 
-        LNET_UNLOCK();
-        return rc;
+       lnet_net_unlock(LNET_LOCK_EX);
+       return rc;
 }
 
 void
@@ -521,15 +528,16 @@ lnet_destroy_routes (void)
 }
 
 int
-lnet_get_route (int idx, __u32 *net, __u32 *hops,
-               lnet_nid_t *gateway, __u32 *alive)
+lnet_get_route(int idx, __u32 *net, __u32 *hops,
+              lnet_nid_t *gateway, __u32 *alive)
 {
-        cfs_list_t          *e1;
-        cfs_list_t          *e2;
-        lnet_remotenet_t    *rnet;
-        lnet_route_t        *route;
+       cfs_list_t              *e1;
+       cfs_list_t              *e2;
+       lnet_remotenet_t        *rnet;
+       lnet_route_t            *route;
+       int                     cpt;
 
-        LNET_LOCK();
+       cpt = lnet_net_lock_current();
 
         cfs_list_for_each (e1, &the_lnet.ln_remote_nets) {
                 rnet = cfs_list_entry(e1, lnet_remotenet_t, lrn_list);
@@ -542,14 +550,14 @@ lnet_get_route (int idx, __u32 *net, __u32 *hops,
                                 *hops    = route->lr_hops;
                                 *gateway = route->lr_gateway->lp_nid;
                                 *alive   = route->lr_gateway->lp_alive;
-                                LNET_UNLOCK();
-                                return 0;
-                        }
-                }
-        }
+                               lnet_net_unlock(cpt);
+                               return 0;
+                       }
+               }
+       }
 
-        LNET_UNLOCK();
-        return -ENOENT;
+       lnet_net_unlock(cpt);
+       return -ENOENT;
 }
 
 void
@@ -679,7 +687,10 @@ lnet_router_checker_event(lnet_event_t *event)
        lp = rcd->rcd_gateway;
        LASSERT(lp != NULL);
 
-       LNET_LOCK();
+        /* NB: this is called with lnet_res_lock held; a few places
+         * need to hold both locks at the same time, so please take
+         * care of lock ordering */
+       lnet_net_lock(lp->lp_cpt);
        if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
                /* ignore if no longer a router or rcd is replaced */
                goto out;
@@ -707,7 +718,7 @@ lnet_router_checker_event(lnet_event_t *event)
                lnet_parse_rc_info(rcd);
 
  out:
-       LNET_UNLOCK();
+       lnet_net_unlock(lp->lp_cpt);
 }
 
 void
@@ -720,7 +731,7 @@ lnet_wait_known_routerstate(void)
         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
 
         for (;;) {
-                LNET_LOCK();
+               int     cpt = lnet_net_lock_current();
 
                 all_known = 1;
                 cfs_list_for_each (entry, &the_lnet.ln_routers) {
@@ -732,7 +743,7 @@ lnet_wait_known_routerstate(void)
                         }
                 }
 
-                LNET_UNLOCK();
+               lnet_net_unlock(cpt);
 
                 if (all_known)
                         return;
@@ -764,6 +775,13 @@ lnet_update_ni_status_locked(void)
                if (now < ni->ni_last_alive + timeout)
                        continue;
 
+               lnet_ni_lock(ni);
+               /* re-check with lock */
+               if (now < ni->ni_last_alive + timeout) {
+                       lnet_ni_unlock(ni);
+                       continue;
+               }
+
                LASSERT(ni->ni_status != NULL);
 
                if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
@@ -773,20 +791,23 @@ lnet_update_ni_status_locked(void)
                         * NI status to "down" */
                        ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
                }
+               lnet_ni_unlock(ni);
        }
 }
 
 void
-lnet_destroy_rc_data (lnet_rc_data_t *rcd)
+lnet_destroy_rc_data(lnet_rc_data_t *rcd)
 {
        LASSERT(cfs_list_empty(&rcd->rcd_list));
        /* detached from network */
        LASSERT(LNetHandleIsInvalid(rcd->rcd_mdh));
 
        if (rcd->rcd_gateway != NULL) {
-               LNET_LOCK();
+               int cpt = rcd->rcd_gateway->lp_cpt;
+
+               lnet_net_lock(cpt);
                lnet_peer_decref_locked(rcd->rcd_gateway);
-               LNET_UNLOCK();
+               lnet_net_unlock(cpt);
        }
 
        if (rcd->rcd_pinginfo != NULL)
@@ -803,7 +824,7 @@ lnet_create_rc_data_locked(lnet_peer_t *gateway)
        int                     rc;
        int                     i;
 
-       LNET_UNLOCK();
+       lnet_net_unlock(gateway->lp_cpt);
 
        LIBCFS_ALLOC(rcd, sizeof(*rcd));
        if (rcd == NULL)
@@ -838,10 +859,10 @@ lnet_create_rc_data_locked(lnet_peer_t *gateway)
        }
        LASSERT(rc == 0);
 
-       LNET_LOCK();
+       lnet_net_lock(gateway->lp_cpt);
        /* router table changed or someone has created rcd for this gateway */
        if (!lnet_isrouter(gateway) || gateway->lp_rcd != NULL) {
-               LNET_UNLOCK();
+               lnet_net_unlock(gateway->lp_cpt);
                goto out;
        }
 
@@ -861,7 +882,7 @@ lnet_create_rc_data_locked(lnet_peer_t *gateway)
                lnet_destroy_rc_data(rcd);
        }
 
-       LNET_LOCK();
+       lnet_net_lock(gateway->lp_cpt);
        return gateway->lp_rcd;
 }
 
@@ -937,12 +958,12 @@ lnet_ping_router_locked (lnet_peer_t *rtr)
                                cfs_time_shift(router_ping_timeout);
                }
 
-                LNET_UNLOCK();
+               lnet_net_unlock(rtr->lp_cpt);
 
-                rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
-                             LNET_PROTO_PING_MATCHBITS, 0);
+               rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
+                            LNET_PROTO_PING_MATCHBITS, 0);
 
-                LNET_LOCK();
+               lnet_net_lock(rtr->lp_cpt);
                 if (rc != 0)
                         rtr->lp_ping_notsent = 0; /* no event pending */
         }
@@ -965,7 +986,7 @@ lnet_router_checker_start(void)
         LASSERT (check_routers_before_use);
         LASSERT (dead_router_check_interval > 0);
 
-        LNET_LOCK();
+       lnet_net_lock(0);
 
         /* As an approximation, allow each router the same number of
          * outstanding events as it is allowed outstanding sends */
@@ -982,7 +1003,7 @@ lnet_router_checker_start(void)
                 id.nid = rtr->lp_nid;
                 id.pid = LUSTRE_SRV_LNET_PID;
 
-                LNET_UNLOCK();
+               lnet_net_unlock(0);
 
                 rc = LNetSetAsync(id, 1);
                 if (rc != 0) {
@@ -991,12 +1012,12 @@ lnet_router_checker_start(void)
                         return rc;
                 }
 
-                LNET_LOCK();
-                /* NB router list doesn't change in userspace */
-                LASSERT (version == the_lnet.ln_routers_version);
-        }
+               lnet_net_lock(0);
+               /* NB router list doesn't change in userspace */
+               LASSERT(version == the_lnet.ln_routers_version);
+       }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(0);
 
         if (nrtr == 0) {
                 CDEBUG(D_NET,
@@ -1108,7 +1129,7 @@ lnet_prune_rc_data(int wait_unlink)
 
        CFS_INIT_LIST_HEAD(&head);
 
-       LNET_LOCK();
+       lnet_net_lock(LNET_LOCK_EX);
 
        if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
                /* router checker is stopping, prune all */
@@ -1128,12 +1149,12 @@ lnet_prune_rc_data(int wait_unlink)
        cfs_list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
 
        if (!cfs_list_empty(&head)) {
-               LNET_UNLOCK();
+               lnet_net_unlock(LNET_LOCK_EX);
 
                cfs_list_for_each_entry(rcd, &head, rcd_list)
                        LNetMDUnlink(rcd->rcd_mdh);
 
-               LNET_LOCK();
+               lnet_net_lock(LNET_LOCK_EX);
         }
 
        cfs_list_splice_init(&head, &the_lnet.ln_rcd_zombie);
@@ -1149,7 +1170,7 @@ lnet_prune_rc_data(int wait_unlink)
                wait_unlink = wait_unlink &&
                              !cfs_list_empty(&the_lnet.ln_rcd_zombie);
 
-               LNET_UNLOCK();
+               lnet_net_unlock(LNET_LOCK_EX);
 
                while (!cfs_list_empty(&head)) {
                        rcd = cfs_list_entry(head.next,
@@ -1166,7 +1187,7 @@ lnet_prune_rc_data(int wait_unlink)
                       "Waiting for rc buffers to unlink\n");
                cfs_pause(cfs_time_seconds(1) / 4);
 
-               LNET_LOCK();
+               lnet_net_lock(LNET_LOCK_EX);
        }
 }
 
@@ -1185,14 +1206,27 @@ lnet_router_checker(void *arg)
         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
 
         while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
-                __u64 version;
+               __u64   version;
+               int     cpt;
+               int     cpt2;
 
-                LNET_LOCK();
+               cpt = lnet_net_lock_current();
 rescan:
-                version = the_lnet.ln_routers_version;
+               version = the_lnet.ln_routers_version;
+
+               cfs_list_for_each(entry, &the_lnet.ln_routers) {
+                       rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
+
+                       cpt2 = lnet_cpt_of_nid(rtr->lp_nid);
+                       if (cpt != cpt2) {
+                               lnet_net_unlock(cpt);
+                               cpt = cpt2;
+                               lnet_net_lock(cpt);
+                               /* the routers list has changed */
+                               if (version != the_lnet.ln_routers_version)
+                                       goto rescan;
+                       }
 
-                cfs_list_for_each (entry, &the_lnet.ln_routers) {
-                        rtr = cfs_list_entry(entry, lnet_peer_t, lp_rtr_list);
                         lnet_ping_router_locked(rtr);
 
                         /* NB dropped lock */
@@ -1205,7 +1239,7 @@ rescan:
                if (the_lnet.ln_routing)
                        lnet_update_ni_status_locked();
 
-               LNET_UNLOCK();
+               lnet_net_unlock(cpt);
 
                lnet_prune_rc_data(0); /* don't wait for UNLINK */
 
@@ -1238,7 +1272,7 @@ lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
 }
 
 lnet_rtrbuf_t *
-lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
+lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
 {
         int            npages = rbp->rbp_npages;
         int            sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
@@ -1246,14 +1280,15 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp)
         lnet_rtrbuf_t *rb;
         int            i;
 
-        LIBCFS_ALLOC(rb, sz);
-        if (rb == NULL)
-                return NULL;
+       LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
+       if (rb == NULL)
+               return NULL;
 
-        rb->rb_pool = rbp;
+       rb->rb_pool = rbp;
 
-        for (i = 0; i < npages; i++) {
-                page = cfs_alloc_page(CFS_ALLOC_ZERO | CFS_ALLOC_STD);
+       for (i = 0; i < npages; i++) {
+               page = cfs_page_cpt_alloc(lnet_cpt_table(), cpt,
+                                         CFS_ALLOC_ZERO | CFS_ALLOC_STD);
                 if (page == NULL) {
                         while (--i >= 0)
                                 cfs_free_page(rb->rb_kiov[i].kiov_page);
@@ -1300,7 +1335,7 @@ lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
 }
 
 int
-lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
+lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
 {
         lnet_rtrbuf_t *rb;
         int            i;
@@ -1311,7 +1346,7 @@ lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs)
         }
 
         for (i = 0; i < nbufs; i++) {
-                rb = lnet_new_rtrbuf(rbp);
+               rb = lnet_new_rtrbuf(rbp, cpt);
 
                 if (rb == NULL) {
                         CERROR("Failed to allocate %d router bufs of %d pages\n",
@@ -1347,58 +1382,77 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
 void
 lnet_rtrpools_free(void)
 {
+       lnet_rtrbufpool_t *rtrp;        /* pool array of the CPT being visited */
+       int                i;           /* iterator index; presumably the CPT id */
+
        if (the_lnet.ln_rtrpools == NULL) /* uninitialized or freed */
                return;
 
-       lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[0]);
-       lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[1]);
-       lnet_rtrpool_free_bufs(&the_lnet.ln_rtrpools[2]);
+       cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) { /* every CPT */
+               lnet_rtrpool_free_bufs(&rtrp[0]);       /* tiny pool */
+               lnet_rtrpool_free_bufs(&rtrp[1]);       /* small pool */
+               lnet_rtrpool_free_bufs(&rtrp[2]);       /* large pool */
+       }
 
-       LIBCFS_FREE(the_lnet.ln_rtrpools,
-                   sizeof(lnet_rtrbufpool_t) * LNET_NRBPOOLS);
+       cfs_percpt_free(the_lnet.ln_rtrpools);  /* pairs with cfs_percpt_alloc */
        the_lnet.ln_rtrpools = NULL;
 }
 
 static int
 lnet_nrb_tiny_calculate(int npages)
 {
-       if (tiny_router_buffers > 0)
-               return tiny_router_buffers;
+       int     nrbs = LNET_NRB_TINY;   /* tiny-pool total; default if tunable is 0 */
 
-       if (tiny_router_buffers == 0)
-               return LNET_NRB_TINY;
+       if (tiny_router_buffers < 0) {  /* negative tunable: report, return -1 */
+               LCONSOLE_ERROR_MSG(0x10c,
+                                  "tiny_router_buffers=%d invalid when "
+                                  "routing enabled\n", tiny_router_buffers);
+               return -1;
+       }
+
+       if (tiny_router_buffers > 0)    /* explicit tunable overrides default */
+               nrbs = tiny_router_buffers;
 
-       LCONSOLE_ERROR_MSG(0x10c, "tiny_router_buffers=%d invalid when "
-                                 "routing enabled\n", tiny_router_buffers);
-       return -1;
+       nrbs /= LNET_CPT_NUMBER;        /* the total is divided by the CPT count */
+       return max(nrbs, LNET_NRB_TINY_MIN);    /* floor at the tiny minimum */
 }
 
 static int
 lnet_nrb_small_calculate(int npages)
 {
-       if (small_router_buffers > 0)
-               return tiny_router_buffers;
+       int     nrbs = LNET_NRB_SMALL;
 
-       if (small_router_buffers == 0)
-               return LNET_NRB_SMALL;
+       if (small_router_buffers < 0) {
+               LCONSOLE_ERROR_MSG(0x10c,
+                                  "small_router_buffers=%d invalid when "
+                                  "routing enabled\n", small_router_buffers);
+               return -1;
+       }
+
+       if (small_router_buffers > 0)
+               nrbs = small_router_buffers;
 
-       LCONSOLE_ERROR_MSG(0x10d, "small_router_buffers=%d invalid when "
-                                 "routing enabled\n", small_router_buffers);
-       return -1;
+       nrbs /= LNET_CPT_NUMBER;
+       return max(nrbs, LNET_NRB_SMALL_MIN);
 }
 
 static int
 lnet_nrb_large_calculate(int npages)
 {
-       if (large_router_buffers > 0)
-               return large_router_buffers;
+       int     nrbs = LNET_NRB_LARGE;
 
-       if (large_router_buffers == 0)
-               return LNET_NRB_LARGE;
+       if (large_router_buffers < 0) {
+               LCONSOLE_ERROR_MSG(0x10c,
+                                  "large_router_buffers=%d invalid when "
+                                  "routing enabled\n", large_router_buffers);
+               return -1;
+       }
 
-       LCONSOLE_ERROR_MSG(0x10e, "large_router_buffers=%d invalid when"
-                                 " routing enabled\n", large_router_buffers);
-       return -1;
+       if (large_router_buffers > 0)
+               nrbs = large_router_buffers;
+
+       nrbs /= LNET_CPT_NUMBER;
+       return max(nrbs, LNET_NRB_LARGE_MIN);
 }
 
 int
@@ -1411,6 +1465,7 @@ lnet_rtrpools_alloc(int im_a_router)
        int     nrb_small;
        int     nrb_large;
        int     rc;
+       int     i;
 
         if (!strcmp(forwarding, "")) {
                 /* not set either way */
@@ -1439,36 +1494,35 @@ lnet_rtrpools_alloc(int im_a_router)
        if (nrb_large < 0)
                return -EINVAL;
 
-       LIBCFS_ALLOC(the_lnet.ln_rtrpools,
-                    sizeof(lnet_rtrbufpool_t) * LNET_NRBPOOLS);
+       the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
+                                               LNET_NRBPOOLS *
+                                               sizeof(lnet_rtrbufpool_t));
        if (the_lnet.ln_rtrpools == NULL) {
                LCONSOLE_ERROR_MSG(0x10c,
                                   "Failed to initialize router buffe pool\n");
                return -ENOMEM;
        }
 
-       do {    /* iterate over rtrpools on all CPTs in upcoming patches */
-               rtrp = the_lnet.ln_rtrpools;
-
+       cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
                lnet_rtrpool_init(&rtrp[0], 0);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny);
+               rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
                if (rc != 0)
                        goto failed;
 
                lnet_rtrpool_init(&rtrp[1], small_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small);
+               rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
                if (rc != 0)
                        goto failed;
 
                lnet_rtrpool_init(&rtrp[2], large_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large);
+               rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
                if (rc != 0)
                        goto failed;
-       } while (0);
+       }
 
-       LNET_LOCK();
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 1;
-       LNET_UNLOCK();
+       lnet_net_unlock(LNET_LOCK_EX);
 
        return 0;
 
@@ -1478,10 +1532,11 @@ lnet_rtrpools_alloc(int im_a_router)
 }
 
 int
-lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
+lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
 {
-        lnet_peer_t *lp = NULL;
-        cfs_time_t   now = cfs_time_current();
+       struct lnet_peer        *lp = NULL;
+       cfs_time_t              now = cfs_time_current();
+       int                     cpt = lnet_cpt_of_nid(nid);
 
         LASSERT (!cfs_in_interrupt ());
 
@@ -1514,12 +1569,12 @@ lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
                 return 0;
         }
 
-        LNET_LOCK();
+       lnet_net_lock(cpt);
 
-        lp = lnet_find_peer_locked(nid);
-        if (lp == NULL) {
-                /* nid not found */
-                LNET_UNLOCK();
+       lp = lnet_find_peer_locked(the_lnet.ln_peer_tables[cpt], nid);
+       if (lp == NULL) {
+               /* nid not found */
+               lnet_net_unlock(cpt);
                 CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
                 return 0;
         }
@@ -1535,10 +1590,10 @@ lnet_notify (lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
 
        lnet_ni_notify_locked(ni, lp);
 
-        lnet_peer_decref_locked(lp);
+       lnet_peer_decref_locked(lp);
 
-        LNET_UNLOCK();
-        return 0;
+       lnet_net_unlock(cpt);
+       return 0;
 }
 EXPORT_SYMBOL(lnet_notify);
 
@@ -1580,12 +1635,14 @@ lnet_router_checker (void)
                         live_router_check_interval, dead_router_check_interval,
                         interval);
 
-        LNET_LOCK();
-        LASSERT (!running); /* recursion check */
-        running = 1;
-        LNET_UNLOCK();
+       LASSERT(LNET_CPT_NUMBER == 1);
 
-        last = now;
+       lnet_net_lock(0);
+       LASSERT(!running); /* recursion check */
+       running = 1;
+       lnet_net_unlock(0);
+
+       last = now;
 
        if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING)
                lnet_prune_rc_data(0); /* unlink all rcd and nowait */
@@ -1621,7 +1678,7 @@ lnet_router_checker (void)
 
         LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
 
-        LNET_LOCK();
+       lnet_net_lock(0);
 
         version = the_lnet.ln_routers_version;
         cfs_list_for_each_entry (rtr, &the_lnet.ln_routers, lp_rtr_list) {
@@ -1629,10 +1686,10 @@ lnet_router_checker (void)
                 LASSERT (version == the_lnet.ln_routers_version);
         }
 
-        LNET_UNLOCK();
+       lnet_net_unlock(0);
 
-        running = 0; /* lock only needed for the recursion check */
-        return;
+       running = 0; /* lock only needed for the recursion check */
+       return;
 }
 
 /* NB lnet_peers_start_down depends on me,
index 60e0dda..5f85792 100644 (file)
@@ -52,40 +52,52 @@ enum {
 #define PSDEV_LNET_NIS     CTL_UNNUMBERED
 #endif
 
+#define LNET_LOFFT_BITS                (sizeof(loff_t) * 8)
 /*
- * NB: we don't use the highest bit of *ppos because it's signed;
- *     next 9 bits is used to stash idx (assuming that
- *     LNET_PEER_HASHSIZE < 512)
+ * NB: max allowed LNET_CPT_BITS is 8 on 64-bit system and 2 on 32-bit system
  */
-#define LNET_LOFFT_BITS        (sizeof(loff_t) * 8)
-#define LNET_VERSION_BITS      MAX(((MIN(LNET_LOFFT_BITS, 64)) / 4), 8)
-#define LNET_PHASH_IDX_BITS    9
-#define LNET_PHASH_NUM_BITS    (LNET_LOFFT_BITS - 1 -\
-                                LNET_VERSION_BITS - LNET_PHASH_IDX_BITS)
-#define LNET_PHASH_BITS        (LNET_PHASH_IDX_BITS + LNET_PHASH_NUM_BITS)
-
-#define LNET_VERSION_BITMASK   ((1ULL << LNET_VERSION_BITS) - 1)
-#define LNET_PHASH_IDX_BITMASK ((1ULL << LNET_PHASH_IDX_BITS) - 1)
-#define LNET_PHASH_NUM_BITMASK ((1ULL << LNET_PHASH_NUM_BITS) - 1)
-
-#define LNET_VERSION_MASK      (LNET_VERSION_BITMASK << LNET_PHASH_BITS)
-#define LNET_PHASH_IDX_MASK    (LNET_PHASH_IDX_BITMASK << LNET_PHASH_NUM_BITS)
-#define LNET_PHASH_NUM_MASK    (LNET_PHASH_NUM_BITMASK)
-
-#define LNET_VERSION_GET(pos)   (int)(((pos) & LNET_VERSION_MASK) >> \
-                                     LNET_PHASH_BITS)
-#define LNET_PHASH_IDX_GET(pos) (int)(((pos) & LNET_PHASH_IDX_MASK) >> \
-                                      LNET_PHASH_NUM_BITS)
-#define LNET_PHASH_NUM_GET(pos) (int)((pos) & LNET_PHASH_NUM_MASK)
-#define LNET_VERSION_VALID_MASK(ver) \
-                                (unsigned int)((ver) & \
-                                 LNET_VERSION_BITMASK)
-#define LNET_PHASH_POS_MAKE(ver, idx, num)                                     \
-                                (((((loff_t)(ver)) & LNET_VERSION_BITMASK) <<  \
-                                   LNET_PHASH_BITS) |                          \
-                                 ((((loff_t)(idx)) & LNET_PHASH_IDX_BITMASK) <<\
-                                   LNET_PHASH_NUM_BITS) |                      \
-                                 ((num) & LNET_PHASH_NUM_BITMASK))
+#define LNET_PROC_CPT_BITS     LNET_CPT_BITS
+/* bits for change version: 16 (64-bit loff_t) or 8 (32-bit loff_t) */
+#define LNET_PROC_VER_BITS     MAX(((MIN(LNET_LOFFT_BITS, 64)) / 4), 8)
+
+#define LNET_PROC_HASH_BITS    LNET_PEER_HASH_BITS
+/*
+ * bits for peer hash offset
+ * NB: we don't use the highest bit of *ppos because it's signed
+ */
+#define LNET_PROC_HOFF_BITS    (LNET_LOFFT_BITS -       \
+                                LNET_PROC_CPT_BITS -    \
+                                LNET_PROC_VER_BITS -    \
+                                LNET_PROC_HASH_BITS - 1)
+/* bits for hash index + hash offset */
+#define LNET_PROC_HPOS_BITS    (LNET_PROC_HASH_BITS + LNET_PROC_HOFF_BITS)
+/* bits for hash index + hash offset + change version */
+#define LNET_PROC_VPOS_BITS    (LNET_PROC_HPOS_BITS + LNET_PROC_VER_BITS)
+
+#define LNET_PROC_CPT_MASK     ((1ULL << LNET_PROC_CPT_BITS) - 1)
+#define LNET_PROC_VER_MASK     ((1ULL << LNET_PROC_VER_BITS) - 1)
+#define LNET_PROC_HASH_MASK    ((1ULL << LNET_PROC_HASH_BITS) - 1)
+#define LNET_PROC_HOFF_MASK    ((1ULL << LNET_PROC_HOFF_BITS) - 1)
+
+#define LNET_PROC_CPT_GET(pos)                         \
+       (int)(((pos) >> LNET_PROC_VPOS_BITS) & LNET_PROC_CPT_MASK)
+
+#define LNET_PROC_VER_GET(pos)                         \
+       (int)(((pos) >> LNET_PROC_HPOS_BITS) & LNET_PROC_VER_MASK)
+
+#define LNET_PROC_HASH_GET(pos)                                \
+       (int)(((pos) >> LNET_PROC_HOFF_BITS) & LNET_PROC_HASH_MASK)
+
+#define LNET_PROC_HOFF_GET(pos)                                \
+       (int)((pos) & LNET_PROC_HOFF_MASK)
+
+#define LNET_PROC_POS_MAKE(cpt, ver, hash, off)                \
+       (((((loff_t)(cpt)) & LNET_PROC_CPT_MASK) << LNET_PROC_VPOS_BITS) |   \
+       ((((loff_t)(ver)) & LNET_PROC_VER_MASK) << LNET_PROC_HPOS_BITS) |   \
+       ((((loff_t)(hash)) & LNET_PROC_HASH_MASK) << LNET_PROC_HOFF_BITS) | \
+       ((off) & LNET_PROC_HOFF_MASK))
+
+#define LNET_PROC_VERSION(v)   ((unsigned int)((v) & LNET_PROC_VER_MASK))
 
 static int __proc_lnet_stats(void *data, int write,
                              loff_t pos, void *buffer, int nob)
@@ -140,18 +152,20 @@ DECLARE_PROC_HANDLER(proc_lnet_stats);
 
 int LL_PROC_PROTO(proc_lnet_routes)
 {
-        int        rc     = 0;
-        char      *tmpstr;
-        char      *s;
-        const int  tmpsiz = 256;
-        int        len;
-        int        ver;
-        int        num;
+       const int       tmpsiz = 256;
+       char            *tmpstr;
+       char            *s;
+       int             rc = 0;
+       int             len;
+       int             ver;
+       int             off;
 
-        DECLARE_LL_PROC_PPOS_DECL;
+       DECLARE_LL_PROC_PPOS_DECL;
+
+       CLASSERT(sizeof(loff_t) >= 4);
 
-        num = LNET_PHASH_NUM_GET(*ppos);
-        ver = LNET_VERSION_GET(*ppos);
+       off = LNET_PROC_HOFF_GET(*ppos);
+       ver = LNET_PROC_VER_GET(*ppos);
 
         LASSERT (!write);
 
@@ -173,21 +187,21 @@ int LL_PROC_PROTO(proc_lnet_routes)
                               "net", "hops", "state", "router");
                 LASSERT (tmpstr + tmpsiz - s > 0);
 
-                LNET_LOCK();
-                ver = (unsigned int)the_lnet.ln_remote_nets_version;
-                LNET_UNLOCK();
-                *ppos = LNET_PHASH_POS_MAKE(ver, 0, num);
-        } else {
-                cfs_list_t        *n;
-                cfs_list_t        *r;
-                lnet_route_t      *route = NULL;
-                lnet_remotenet_t  *rnet  = NULL;
-                int                skip  = num - 1;
-
-                LNET_LOCK();
-
-                if (ver != LNET_VERSION_VALID_MASK(the_lnet.ln_remote_nets_version)) {
-                        LNET_UNLOCK();
+               lnet_net_lock(0);
+               ver = (unsigned int)the_lnet.ln_remote_nets_version;
+               lnet_net_unlock(0);
+               *ppos = LNET_PROC_POS_MAKE(0, ver, 0, off);
+       } else {
+               cfs_list_t              *n;
+               cfs_list_t              *r;
+               lnet_route_t            *route = NULL;
+               lnet_remotenet_t        *rnet  = NULL;
+               int                     skip  = off - 1;
+
+               lnet_net_lock(0);
+
+               if (ver != LNET_PROC_VERSION(the_lnet.ln_remote_nets_version)) {
+                       lnet_net_unlock(0);
                         LIBCFS_FREE(tmpstr, tmpsiz);
                         return -ESTALE;
                 }
@@ -221,13 +235,15 @@ int LL_PROC_PROTO(proc_lnet_routes)
                         lnet_nid_t   nid   = route->lr_gateway->lp_nid;
                         int          alive = route->lr_gateway->lp_alive;
 
-                        s += snprintf(s, tmpstr + tmpsiz - s, "%-8s %4u %7s %s\n",
-                                      libcfs_net2str(net), hops,
-                                      alive ? "up" : "down", libcfs_nid2str(nid));
-                        LASSERT (tmpstr + tmpsiz - s > 0);
-                }
+                       s += snprintf(s, tmpstr + tmpsiz - s,
+                                     "%-8s %4u %7s %s\n",
+                                     libcfs_net2str(net), hops,
+                                     alive ? "up" : "down",
+                                     libcfs_nid2str(nid));
+                       LASSERT(tmpstr + tmpsiz - s > 0);
+               }
 
-                LNET_UNLOCK();
+               lnet_net_unlock(0);
         }
 
         len = s - tmpstr;     /* how many bytes was written */
@@ -238,8 +254,8 @@ int LL_PROC_PROTO(proc_lnet_routes)
                 if (cfs_copy_to_user(buffer, tmpstr, len))
                         rc = -EFAULT;
                 else {
-                        num += 1;
-                        *ppos = LNET_PHASH_POS_MAKE(ver, 0, num);
+                       off += 1;
+                       *ppos = LNET_PROC_POS_MAKE(0, ver, 0, off);
                 }
         }
 
@@ -259,12 +275,12 @@ int LL_PROC_PROTO(proc_lnet_routers)
         const int  tmpsiz = 256;
         int        len;
         int        ver;
-        int        num;
+       int        off;
 
-        DECLARE_LL_PROC_PPOS_DECL;
+       DECLARE_LL_PROC_PPOS_DECL;
 
-        num = LNET_PHASH_NUM_GET(*ppos);
-        ver = LNET_VERSION_GET(*ppos);
+       off = LNET_PROC_HOFF_GET(*ppos);
+       ver = LNET_PROC_VER_GET(*ppos);
 
         LASSERT (!write);
 
@@ -278,25 +294,27 @@ int LL_PROC_PROTO(proc_lnet_routers)
         s = tmpstr; /* points to current position in tmpstr[] */
 
         if (*ppos == 0) {
-                s += snprintf(s, tmpstr + tmpsiz - s,
-                              "%-4s %7s %9s %6s %12s %9s %8s %7s %s\n",
-                              "ref", "rtr_ref", "alive_cnt", "state", "last_ping",
-                              "ping_sent", "deadline", "down_ni", "router");
-                LASSERT (tmpstr + tmpsiz - s > 0);
-
-                LNET_LOCK();
-                ver = (unsigned int)the_lnet.ln_routers_version;
-                LNET_UNLOCK();
-                *ppos = LNET_PHASH_POS_MAKE(ver, 0, num);
-        } else {
-                cfs_list_t        *r;
-                lnet_peer_t       *peer = NULL;
-                int                skip = num - 1;
+               s += snprintf(s, tmpstr + tmpsiz - s,
+                             "%-4s %7s %9s %6s %12s %9s %8s %7s %s\n",
+                             "ref", "rtr_ref", "alive_cnt", "state",
+                             "last_ping", "ping_sent", "deadline",
+                             "down_ni", "router");
+               LASSERT(tmpstr + tmpsiz - s > 0);
+
+               lnet_net_lock(0);
+               ver = (unsigned int)the_lnet.ln_routers_version;
+               lnet_net_unlock(0);
+               *ppos = LNET_PROC_POS_MAKE(0, ver, 0, off);
+       } else {
+               cfs_list_t              *r;
+               struct lnet_peer        *peer = NULL;
+               int                     skip = off - 1;
+
+               lnet_net_lock(0);
+
+               if (ver != LNET_PROC_VERSION(the_lnet.ln_routers_version)) {
+                       lnet_net_unlock(0);
 
-                LNET_LOCK();
-
-                if (ver != LNET_VERSION_VALID_MASK(the_lnet.ln_routers_version)) {
-                        LNET_UNLOCK();
                         LIBCFS_FREE(tmpstr, tmpsiz);
                         return -ESTALE;
                 }
@@ -360,7 +378,7 @@ int LL_PROC_PROTO(proc_lnet_routers)
                         LASSERT (tmpstr + tmpsiz - s > 0);
                 }
 
-                LNET_UNLOCK();
+               lnet_net_unlock(0);
         }
 
         len = s - tmpstr;     /* how many bytes was written */
@@ -371,8 +389,8 @@ int LL_PROC_PROTO(proc_lnet_routers)
                 if (cfs_copy_to_user(buffer, tmpstr, len))
                         rc = -EFAULT;
                 else {
-                        num += 1;
-                        *ppos = LNET_PHASH_POS_MAKE(ver, 0, num);
+                       off += 1;
+                       *ppos = LNET_PROC_POS_MAKE(0, ver, 0, off);
                 }
         }
 
@@ -386,28 +404,25 @@ int LL_PROC_PROTO(proc_lnet_routers)
 
 int LL_PROC_PROTO(proc_lnet_peers)
 {
-       struct lnet_peer_table  *ptable = the_lnet.ln_peer_table;
-        int        rc = 0;
-        char      *tmpstr;
-        char      *s;
-        const int  tmpsiz      = 256;
-        int        len;
-        int        ver;
-        int        idx;
-        int        num;
-
-        DECLARE_LL_PROC_PPOS_DECL;
-
-        idx = LNET_PHASH_IDX_GET(*ppos);
-        num = LNET_PHASH_NUM_GET(*ppos);
-        ver = LNET_VERSION_GET(*ppos);
-
-        CLASSERT ((1ULL << LNET_PHASH_BITS) > LNET_PEER_HASHSIZE);
-
-        LASSERT (!write);
+       const int               tmpsiz  = 256;
+       struct lnet_peer_table  *ptable;
+       char                    *tmpstr;
+       char                    *s;
+       int                     cpt  = LNET_PROC_CPT_GET(*ppos);
+       int                     ver  = LNET_PROC_VER_GET(*ppos);
+       int                     hoff = LNET_PROC_HOFF_GET(*ppos);
+       int                     hash = LNET_PROC_HASH_GET(*ppos);
+       int                     rc = 0;
+       int                     len;
+
+       CLASSERT(LNET_PROC_HASH_BITS >= LNET_PEER_HASH_BITS);
+       LASSERT(!write);
+
+       if (*lenp == 0)
+               return 0;
 
-        if (*lenp == 0)
-                return 0;
+       if (cpt >= LNET_CPT_NUMBER)
+               return 0;
 
         LIBCFS_ALLOC(tmpstr, tmpsiz);
         if (tmpstr == NULL)
@@ -422,30 +437,29 @@ int LL_PROC_PROTO(proc_lnet_peers)
                               "rtr", "min", "tx", "min", "queue");
                 LASSERT (tmpstr + tmpsiz - s > 0);
 
-                LNET_LOCK();
-               ver = (unsigned int)ptable->pt_version;
-                LNET_UNLOCK();
-                *ppos = LNET_PHASH_POS_MAKE(ver, idx, num);
-
-                num++;
-        } else {
-                cfs_list_t        *p    = NULL;
-                lnet_peer_t       *peer = NULL;
-                int                skip = num - 1;
-
-                LNET_LOCK();
-
-               if (ver != LNET_VERSION_VALID_MASK(ptable->pt_version)) {
-                        LNET_UNLOCK();
-                        LIBCFS_FREE(tmpstr, tmpsiz);
-                        return -ESTALE;
-                }
-
-                while (idx < LNET_PEER_HASHSIZE) {
-                        if (p == NULL)
-                               p = ptable->pt_hash[idx].next;
-
-                       while (p != &ptable->pt_hash[idx]) {
+               hoff++;
+       } else {
+               struct lnet_peer        *peer   = NULL;
+               cfs_list_t              *p      = NULL;
+               int                     skip    = hoff - 1;
+
+ again:
+               lnet_net_lock(cpt);
+               ptable = the_lnet.ln_peer_tables[cpt];
+               if (hoff == 1)
+                       ver = LNET_PROC_VERSION(ptable->pt_version);
+
+               if (ver != LNET_PROC_VERSION(ptable->pt_version)) {
+                       lnet_net_unlock(cpt);
+                       LIBCFS_FREE(tmpstr, tmpsiz);
+                       return -ESTALE;
+               }
+
+               while (hash < LNET_PEER_HASH_SIZE) {
+                       if (p == NULL)
+                               p = ptable->pt_hash[hash].next;
+
+                       while (p != &ptable->pt_hash[hash]) {
                                 lnet_peer_t *lp = cfs_list_entry(p, lnet_peer_t,
                                                                  lp_hashlist);
                                 if (skip == 0) {
@@ -455,11 +469,11 @@ int LL_PROC_PROTO(proc_lnet_peers)
                                          * on next iteration if we've just
                                          * drained lp_hashlist */
                                        if (lp->lp_hashlist.next ==
-                                           &ptable->pt_hash[idx]) {
-                                                num = 1;
-                                                idx++;
-                                        } else {
-                                                num++;
+                                           &ptable->pt_hash[hash]) {
+                                               hoff = 1;
+                                               hash++;
+                                       } else {
+                                               hoff++;
                                         }
 
                                         break;
@@ -473,8 +487,8 @@ int LL_PROC_PROTO(proc_lnet_peers)
                                 break;
 
                         p = NULL;
-                        num = 1;
-                        idx++;
+                       hoff = 1;
+                       hash++;
                 }
 
                 if (peer != NULL) {
@@ -508,15 +522,26 @@ int LL_PROC_PROTO(proc_lnet_peers)
                                         lastalive = 9999;
                         }
 
+                       lnet_net_unlock(cpt);
+
                         s += snprintf(s, tmpstr + tmpsiz - s,
                                       "%-24s %4d %5s %5d %5d %5d %5d %5d %5d %d\n",
                                       libcfs_nid2str(nid), nrefs, aliveness,
                                       lastalive, maxcr, rtrcr, minrtrcr, txcr,
                                       mintxcr, txqnob);
                         LASSERT (tmpstr + tmpsiz - s > 0);
-                }
 
-                LNET_UNLOCK();
+               } else { /* peer is NULL */
+                       lnet_net_unlock(cpt);
+
+                       if (hash == LNET_PEER_HASH_SIZE &&
+                           cpt < LNET_CPT_NUMBER - 1) {
+                               cpt++;
+                               hash = 0;
+                               hoff = 1;
+                               goto again;
+                       }
+               }
         }
 
         len = s - tmpstr;     /* how many bytes was written */
@@ -527,7 +552,7 @@ int LL_PROC_PROTO(proc_lnet_peers)
                 if (cfs_copy_to_user(buffer, tmpstr, len))
                         rc = -EFAULT;
                 else
-                        *ppos = LNET_PHASH_POS_MAKE(ver, idx, num);
+                       *ppos = LNET_PROC_POS_MAKE(cpt, ver, hash, hoff);
         }
 
         LIBCFS_FREE(tmpstr, tmpsiz);
@@ -541,16 +566,18 @@ int LL_PROC_PROTO(proc_lnet_peers)
 static int __proc_lnet_buffers(void *data, int write,
                                loff_t pos, void *buffer, int nob)
 {
-
-        int              rc;
-        int              len;
-        char            *s;
-        char            *tmpstr;
-        const int        tmpsiz = 64 * (LNET_NRBPOOLS + 1); /* (4 %d) * 4 */
-        int              idx;
-
-        LASSERT (!write);
-
+       char            *s;
+       char            *tmpstr;
+       int             tmpsiz;
+       int             idx;
+       int             len;
+       int             rc;
+       int             i;
+
+       LASSERT(!write);
+
+       /* (4 %d) * 4 * LNET_CPT_NUMBER */
+       tmpsiz = 64 * (LNET_NRBPOOLS + 1) * LNET_CPT_NUMBER;
         LIBCFS_ALLOC(tmpstr, tmpsiz);
         if (tmpstr == NULL)
                 return -ENOMEM;
@@ -565,23 +592,21 @@ static int __proc_lnet_buffers(void *data, int write,
        if (the_lnet.ln_rtrpools == NULL)
                goto out; /* I'm not a router */
 
-        LNET_LOCK();
-
-        for (idx = 0; idx < LNET_NRBPOOLS; idx++) {
-                lnet_rtrbufpool_t *rbp = &the_lnet.ln_rtrpools[idx];
-
-                int npages = rbp->rbp_npages;
-                int nbuf   = rbp->rbp_nbuffers;
-                int cr     = rbp->rbp_credits;
-                int mincr  = rbp->rbp_mincredits;
-
-                s += snprintf(s, tmpstr + tmpsiz - s,
-                              "%5d %5d %7d %7d\n",
-                              npages, nbuf, cr, mincr);
-                LASSERT (tmpstr + tmpsiz - s > 0);
-        }
-
-        LNET_UNLOCK();
+       for (idx = 0; idx < LNET_NRBPOOLS; idx++) {
+               lnet_rtrbufpool_t *rbp;
+
+               lnet_net_lock(LNET_LOCK_EX);
+               cfs_percpt_for_each(rbp, i, the_lnet.ln_rtrpools) {
+                       s += snprintf(s, tmpstr + tmpsiz - s,
+                                     "%5d %5d %7d %7d\n",
+                                     rbp[idx].rbp_npages,
+                                     rbp[idx].rbp_nbuffers,
+                                     rbp[idx].rbp_credits,
+                                     rbp[idx].rbp_mincredits);
+                       LASSERT(tmpstr + tmpsiz - s > 0);
+               }
+               lnet_net_unlock(LNET_LOCK_EX);
+       }
 
  out:
         len = s - tmpstr;
@@ -600,10 +625,10 @@ DECLARE_PROC_HANDLER(proc_lnet_buffers);
 
 int LL_PROC_PROTO(proc_lnet_nis)
 {
+       int     tmpsiz = 128 * LNET_CPT_NUMBER;
         int        rc = 0;
         char      *tmpstr;
         char      *s;
-        const int  tmpsiz = 256;
         int        len;
 
         DECLARE_LL_PROC_PPOS_DECL;
@@ -630,7 +655,7 @@ int LL_PROC_PROTO(proc_lnet_nis)
                 lnet_ni_t         *ni   = NULL;
                 int                skip = *ppos - 1;
 
-                LNET_LOCK();
+               lnet_net_lock(0);
 
                 n = the_lnet.ln_nis.next;
 
@@ -647,36 +672,46 @@ int LL_PROC_PROTO(proc_lnet_nis)
                 }
 
                 if (ni != NULL) {
-                        cfs_time_t now = cfs_time_current();
-                        int        last_alive = -1;
-                        int        maxtxcr = ni->ni_maxtxcredits;
-                        int        txcr = ni->ni_txcredits;
-                        int        mintxcr = ni->ni_mintxcredits;
-                        int        npeertxcr = ni->ni_peertxcredits;
-                        int        npeerrtrcr = ni->ni_peerrtrcredits;
-                        lnet_nid_t nid = ni->ni_nid;
-                        int        nref = ni->ni_refcount;
-                        char      *stat;
-
-                        if (the_lnet.ln_routing)
-                                last_alive = cfs_duration_sec(cfs_time_sub(now,
-                                                            ni->ni_last_alive));
-                        if (ni->ni_lnd->lnd_type == LOLND)  /* @lo forever alive */
-                                last_alive = 0;
-
-                        LASSERT (ni->ni_status != NULL);
-                        stat = (ni->ni_status->ns_status == LNET_NI_STATUS_UP) ?
-                                                                  "up" : "down";
-
-                        s += snprintf(s, tmpstr + tmpsiz - s,
-                                      "%-24s %6s %5d %4d %4d %4d %5d %5d %5d\n",
-                                      libcfs_nid2str(nid), stat, last_alive, nref,
-                                      npeertxcr, npeerrtrcr, maxtxcr,
-                                      txcr, mintxcr);
-                        LASSERT (tmpstr + tmpsiz - s > 0);
-                }
+                       char    *stat;
+                       struct lnet_tx_queue    *tq;
+                       long    now = cfs_time_current_sec();
+                       int     last_alive = -1;
+                       int     i;
+
+                       if (the_lnet.ln_routing)
+                               last_alive = now - ni->ni_last_alive;
+
+                       /* @lo forever alive */
+                       if (ni->ni_lnd->lnd_type == LOLND)
+                               last_alive = 0;
+
+                       lnet_ni_lock(ni);
+                       LASSERT(ni->ni_status != NULL);
+                       stat = (ni->ni_status->ns_status ==
+                               LNET_NI_STATUS_UP) ? "up" : "down";
+                       lnet_ni_unlock(ni);
+
+                       /* we actually output credits information for
+                        * TX queue of each partition */
+                       cfs_percpt_for_each(tq, i, ni->ni_tx_queues) {
+                               if (i != 0)
+                                       lnet_net_lock(i);
+
+                               s += snprintf(s, tmpstr + tmpsiz - s,
+                                     "%-24s %6s %5d %4d %4d %4d %5d %5d %5d\n",
+                                     libcfs_nid2str(ni->ni_nid), stat,
+                                     last_alive, *ni->ni_refs[i],
+                                     ni->ni_peertxcredits,
+                                     ni->ni_peerrtrcredits,
+                                     tq->tq_credits_max,
+                                     tq->tq_credits, tq->tq_credits_min);
+                               if (i != 0)
+                                       lnet_net_unlock(i);
+                       }
+                       LASSERT(tmpstr + tmpsiz - s > 0);
+               }
 
-                LNET_UNLOCK();
+               lnet_net_unlock(0);
         }
 
         len = s - tmpstr;     /* how many bytes was written */