Whamcloud - gitweb
LU-56 lnet: Partitioned LNet resources (ME/MD/EQ)
authorLiang Zhen <liang@whamcloud.com>
Mon, 11 Jun 2012 14:28:23 +0000 (22:28 +0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 26 Jun 2012 16:09:48 +0000 (12:09 -0400)
We already have a new lock, lnet_res_lock, to protect LNet resources,
but it is still a global lock and could become a performance issue.
This patch creates partitioned data for LNet: resources are
spread across different partitions. Also, lnet_res_lock is no longer
a single spinlock; it is a percpt lock now, which means
LNet only needs to lock one partition at a time while operating on
MD/ME belonging to that partition.

A few things are still serialized by the exclusive lock:
- EQ allocation/free
- LNetEQPoll (non-zero size EQ)
- delay message on lazy portal
- Stealing MD between partitions.

These operations are either rare or deprecated, so they shouldn't
become a performance problem.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: If5e88b92dd508b84c0fd91725b3aaed424dd3108
Reviewed-on: http://review.whamcloud.com/3078
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/lib-ptl.c

index 9bf8099..6f16f49 100644 (file)
 
 extern lnet_t  the_lnet;                        /* THE network */
 
+#if !defined(__KERNEL__) || defined(LNET_USE_LIB_FREELIST)
+/* 1 CPT, simplify implementation... */
+# define LNET_CPT_MAX_BITS      0
+
+#else /* KERNEL and no freelist */
+
+# if (BITS_PER_LONG == 32)
+/* 2 CPTs, allowing more CPTs might make us under memory pressure */
+#  define LNET_CPT_MAX_BITS     1
+
+# else /* 64-bit system */
+/*
+ * 256 CPTs for thousands of CPUs; allowing more CPTs might put us
+ * at risk of consuming all lh_cookie.
+ */
+#  define LNET_CPT_MAX_BITS     8
+# endif /* BITS_PER_LONG == 32 */
+#endif
+
+/* max allowed CPT number */
+#define LNET_CPT_MAX            (1 << LNET_CPT_MAX_BITS)
+
+#define LNET_CPT_NUMBER         (the_lnet.ln_cpt_number)
+#define LNET_CPT_BITS           (the_lnet.ln_cpt_bits)
+#define LNET_CPT_MASK           ((1ULL << LNET_CPT_BITS) - 1)
+
+/** exclusive lock */
+#define LNET_LOCK_EX            CFS_PERCPT_LOCK_EX
+
 static inline int lnet_is_wire_handle_none (lnet_handle_wire_t *wh)
 {
         return (wh->wh_interface_cookie == LNET_WIRE_HANDLE_COOKIE_NONE &&
@@ -86,25 +115,53 @@ static inline int lnet_md_unlinkable (lnet_libmd_t *md)
                 lnet_md_exhausted(md));
 }
 
-#ifdef __KERNEL__
+#define lnet_cpt_table()       (the_lnet.ln_cpt_table)
+#define lnet_cpt_current()     cfs_cpt_current(the_lnet.ln_cpt_table, 1)
+
+static inline int
+lnet_cpt_of_cookie(__u64 cookie)
+{
+       unsigned int cpt = (cookie >> LNET_COOKIE_TYPE_BITS) & LNET_CPT_MASK;
+
+       /* LNET_CPT_NUMBER doesn't have to be power2, which means we can
+        * get illegal cpt from it's invalid cookie */
+       return cpt < LNET_CPT_NUMBER ? cpt : cpt % LNET_CPT_NUMBER;
+}
 
 static inline void
-lnet_res_lock(void)
+lnet_res_lock(int cpt)
 {
-       cfs_spin_lock(&the_lnet.ln_res_lock);
+       cfs_percpt_lock(the_lnet.ln_res_lock, cpt);
 }
 
 static inline void
-lnet_res_unlock(void)
+lnet_res_unlock(int cpt)
 {
-       cfs_spin_unlock(&the_lnet.ln_res_lock);
+       cfs_percpt_unlock(the_lnet.ln_res_lock, cpt);
 }
 
-#define LNET_LOCK()        cfs_spin_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK()      cfs_spin_unlock(&the_lnet.ln_lock)
-#define LNET_MUTEX_LOCK(m)   cfs_mutex_lock(m)
-#define LNET_MUTEX_UNLOCK(m) cfs_mutex_unlock(m)
-#else
+static inline int
+lnet_res_lock_current(void)
+{
+       int cpt = lnet_cpt_current();
+
+       lnet_res_lock(cpt);
+       return cpt;
+}
+
+#ifdef __KERNEL__
+
+#define lnet_ptl_lock(ptl)     cfs_spin_lock(&(ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl)   cfs_spin_unlock(&(ptl)->ptl_lock)
+#define lnet_eq_wait_lock()    cfs_spin_lock(&the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock()  cfs_spin_unlock(&the_lnet.ln_eq_wait_lock)
+#define LNET_LOCK()            cfs_spin_lock(&the_lnet.ln_lock)
+#define LNET_UNLOCK()          cfs_spin_unlock(&the_lnet.ln_lock)
+#define LNET_MUTEX_LOCK(m)     cfs_mutex_lock(m)
+#define LNET_MUTEX_UNLOCK(m)   cfs_mutex_unlock(m)
+
+#else /* !__KERNEL__ */
+
 # ifndef HAVE_LIBPTHREAD
 #define LNET_SINGLE_THREADED_LOCK(l)            \
 do {                                            \
@@ -123,21 +180,31 @@ do {                                            \
 #define LNET_MUTEX_LOCK(m)     LNET_SINGLE_THREADED_LOCK(*(m))
 #define LNET_MUTEX_UNLOCK(m)   LNET_SINGLE_THREADED_UNLOCK(*(m))
 
-#define lnet_res_lock()                                \
-       LNET_SINGLE_THREADED_LOCK(the_lnet.ln_res_lock)
-#define lnet_res_unlock()                      \
-       LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_res_lock)
+#define lnet_ptl_lock(ptl)                     \
+       LNET_SINGLE_THREADED_LOCK((ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl)                   \
+       LNET_SINGLE_THREADED_UNLOCK((ptl)->ptl_lock)
+
+#define lnet_eq_wait_lock()                    \
+       LNET_SINGLE_THREADED_LOCK(the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock()                  \
+       LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_eq_wait_lock)
+
+# else /* HAVE_LIBPTHREAD */
 
-# else
 #define LNET_LOCK()            pthread_mutex_lock(&the_lnet.ln_lock)
 #define LNET_UNLOCK()          pthread_mutex_unlock(&the_lnet.ln_lock)
 #define LNET_MUTEX_LOCK(m)     pthread_mutex_lock(m)
 #define LNET_MUTEX_UNLOCK(m)   pthread_mutex_unlock(m)
-#define lnet_res_lock()                pthread_mutex_lock(&the_lnet.ln_res_lock)
-#define lnet_res_unlock()      pthread_mutex_unlock(&the_lnet.ln_res_lock)
 
-# endif
-#endif
+#define lnet_ptl_lock(ptl)     pthread_mutex_lock(&(ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl)   pthread_mutex_unlock(&(ptl)->ptl_lock)
+
+#define lnet_eq_wait_lock()    pthread_mutex_lock(&the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock()  pthread_mutex_unlock(&the_lnet.ln_eq_wait_lock)
+
+# endif /* HAVE_LIBPTHREAD */
+#endif /* __KERNEL__ */
 
 #define MAX_PORTALS     64
 
@@ -184,9 +251,11 @@ lnet_eq_alloc (void)
        struct lnet_res_container *rec = &the_lnet.ln_eq_container;
        lnet_eq_t                 *eq;
 
-       lnet_res_lock();
+       LASSERT(LNET_CPT_NUMBER == 1);
+
+       lnet_res_lock(0);
        eq = (lnet_eq_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 
        return eq;
 }
@@ -197,27 +266,30 @@ lnet_eq_free_locked(lnet_eq_t *eq)
        /* ALWAYS called with resource lock held */
        struct lnet_res_container *rec = &the_lnet.ln_eq_container;
 
+       LASSERT(LNET_CPT_NUMBER == 1);
        lnet_freelist_free(&rec->rec_freelist, eq);
 }
 
 static inline void
 lnet_eq_free(lnet_eq_t *eq)
 {
-       lnet_res_lock();
+       lnet_res_lock(0);
        lnet_eq_free_locked(eq);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 }
 
 static inline lnet_libmd_t *
 lnet_md_alloc (lnet_md_t *umd)
 {
        /* NEVER called with resource lock held */
-       struct lnet_res_container *rec = &the_lnet.ln_md_container;
+       struct lnet_res_container *rec = the_lnet.ln_md_containers[0];
        lnet_libmd_t              *md;
 
-       lnet_res_lock();
+       LASSERT(LNET_CPT_NUMBER == 1);
+
+       lnet_res_lock(0);
        md = (lnet_libmd_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 
        if (md != NULL)
                CFS_INIT_LIST_HEAD(&md->md_list);
@@ -229,29 +301,32 @@ static inline void
 lnet_md_free_locked(lnet_libmd_t *md)
 {
        /* ALWAYS called with resource lock held */
-       struct lnet_res_container *rec = &the_lnet.ln_md_container;
+       struct lnet_res_container *rec = the_lnet.ln_md_containers[0];
 
+       LASSERT(LNET_CPT_NUMBER == 1);
        lnet_freelist_free(&rec->rec_freelist, md);
 }
 
 static inline void
 lnet_md_free(lnet_libmd_t *md)
 {
-       lnet_res_lock();
+       lnet_res_lock(0);
        lnet_md_free_locked(md);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 }
 
 static inline lnet_me_t *
 lnet_me_alloc(void)
 {
        /* NEVER called with resource lock held */
-       struct lnet_res_container *rec = &the_lnet.ln_me_container;
+       struct lnet_res_container *rec = the_lnet.ln_me_containers[0];
        lnet_me_t                 *me;
 
-       lnet_res_lock();
+       LASSERT(LNET_CPT_NUMBER == 1);
+
+       lnet_res_lock(0);
        me = (lnet_me_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 
        return me;
 }
@@ -260,17 +335,18 @@ static inline void
 lnet_me_free_locked(lnet_me_t *me)
 {
        /* ALWAYS called with resource lock held */
-       struct lnet_res_container *rec = &the_lnet.ln_me_container;
+       struct lnet_res_container *rec = the_lnet.ln_me_containers[0];
 
+       LASSERT(LNET_CPT_NUMBER == 1);
        lnet_freelist_free(&rec->rec_freelist, me);
 }
 
 static inline void
 lnet_me_free(lnet_me_t *me)
 {
-       lnet_res_lock();
+       lnet_res_lock(0);
        lnet_me_free_locked(me);
-       lnet_res_unlock();
+       lnet_res_unlock(0);
 }
 
 static inline lnet_msg_t *
@@ -432,6 +508,7 @@ static inline void
 lnet_res_lh_invalidate(lnet_libhandle_t *lh)
 {
        /* ALWAYS called with resource lock held */
+       /* NB: cookie is still useful, don't reset it */
        cfs_list_del(&lh->lh_hash_chain);
 }
 
@@ -470,8 +547,11 @@ lnet_handle2md(lnet_handle_md_t *handle)
 {
        /* ALWAYS called with resource lock held */
        lnet_libhandle_t *lh;
+       int              cpt;
 
-       lh = lnet_res_lh_lookup(&the_lnet.ln_md_container, handle->cookie);
+       cpt = lnet_cpt_of_cookie(handle->cookie);
+       lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt],
+                               handle->cookie);
        if (lh == NULL)
                return NULL;
 
@@ -483,11 +563,13 @@ lnet_wire_handle2md(lnet_handle_wire_t *wh)
 {
        /* ALWAYS called with resource lock held */
        lnet_libhandle_t *lh;
+       int              cpt;
 
        if (wh->wh_interface_cookie != the_lnet.ln_interface_cookie)
                return NULL;
 
-       lh = lnet_res_lh_lookup(&the_lnet.ln_md_container,
+       cpt = lnet_cpt_of_cookie(wh->wh_object_cookie);
+       lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt],
                                wh->wh_object_cookie);
        if (lh == NULL)
                return NULL;
@@ -506,8 +588,11 @@ lnet_handle2me(lnet_handle_me_t *handle)
 {
        /* ALWAYS called with resource lock held */
        lnet_libhandle_t *lh;
+       int              cpt;
 
-       lh = lnet_res_lh_lookup(&the_lnet.ln_me_container, handle->cookie);
+       cpt = lnet_cpt_of_cookie(handle->cookie);
+       lh = lnet_res_lh_lookup(the_lnet.ln_me_containers[cpt],
+                               handle->cookie);
        if (lh == NULL)
                return NULL;
 
@@ -606,6 +691,7 @@ lnet_set_msg_uid(lnet_ni_t *ni, lnet_msg_t *msg, lnet_uid_t uid)
 }
 #endif
 
+extern int lnet_cpt_of_nid(lnet_nid_t nid);
 extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid);
 extern lnet_ni_t *lnet_net2ni_locked (__u32 net);
 static inline lnet_ni_t *
index 2e789fa..63ef7c3 100644 (file)
@@ -241,14 +241,14 @@ typedef struct lnet_libhandle {
         ((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
 
 typedef struct lnet_eq {
-        cfs_list_t            eq_list;
-        lnet_libhandle_t      eq_lh;
-        lnet_seq_t            eq_enq_seq;
-        lnet_seq_t            eq_deq_seq;
-        unsigned int          eq_size;
-        lnet_event_t         *eq_events;
-        int                   eq_refcount;
-        lnet_eq_handler_t     eq_callback;
+       cfs_list_t              eq_list;
+       lnet_libhandle_t        eq_lh;
+       lnet_seq_t              eq_enq_seq;
+       lnet_seq_t              eq_deq_seq;
+       unsigned int            eq_size;
+       lnet_eq_handler_t       eq_callback;
+       lnet_event_t            *eq_events;
+       int                     **eq_refs;      /* percpt refcount for EQ */
 } lnet_eq_t;
 
 typedef struct lnet_me {
@@ -549,6 +549,10 @@ enum {
        LNET_MATCHMD_OK         = (1 << 1),
        /* Must be discarded */
        LNET_MATCHMD_DROP       = (1 << 2),
+       /* match and buffer is exhausted */
+       LNET_MATCHMD_EXHAUSTED  = (1 << 3),
+       /* match or drop */
+       LNET_MATCHMD_FINISH     = (LNET_MATCHMD_OK | LNET_MATCHMD_DROP),
 };
 
 /* Options for lnet_portal_t::ptl_options */
@@ -575,19 +579,38 @@ struct lnet_match_table {
        /* reserved for upcoming patches, CPU partition ID */
        unsigned int            mt_cpt;
        unsigned int            mt_portal;      /* portal index */
+       /* match table is set as "enabled" if there's non-exhausted MD
+        * attached on mt_mlist, it's only valide for wildcard portal */
+       unsigned int            mt_enabled;
        cfs_list_t              mt_mlist;       /* matching list */
        cfs_list_t              *mt_mhash;      /* matching hash */
 };
 
 typedef struct lnet_portal {
+#ifdef __KERNEL__
+       cfs_spinlock_t          ptl_lock;
+#else
+# ifndef HAVE_LIBPTHREAD
+       int                     ptl_lock;
+# else
+       pthread_mutex_t         ptl_lock;
+# endif
+#endif
        unsigned int            ptl_index;      /* portal ID, reserved */
        /* flags on this portal: lazy, unique... */
        unsigned int            ptl_options;
-       /* Now we only have single instance for each portal,
-        * will have instance per CPT in upcoming patches */
-       struct lnet_match_table *ptl_mtable;
+       /* list of messags which are stealing buffer */
+       cfs_list_t              ptl_msg_stealing;
        /* messages blocking for MD */
-       cfs_list_t              ptl_msgq;
+       cfs_list_t              ptl_msg_delayed;
+       /* Match table for each CPT */
+       struct lnet_match_table **ptl_mtables;
+       /* spread rotor of incoming "PUT" */
+       int                     ptl_rotor;
+       /* # active entries for this portal */
+       int                     ptl_mt_nmaps;
+       /* array of active entries' cpu-partition-id */
+       int                     ptl_mt_maps[0];
 } lnet_portal_t;
 
 #define LNET_LH_HASH_BITS      12
@@ -627,39 +650,37 @@ struct lnet_msg_container {
 
 typedef struct
 {
-        /* Stuff initialised at LNetInit() */
-        int                    ln_init;             /* LNetInit() called? */
-        int                    ln_refcount;         /* LNetNIInit/LNetNIFini counter */
-        int                    ln_niinit_self;      /* Have I called LNetNIInit myself? */
-       /* shutdown in progress */
-       int                             ln_shutdown;
-       /* registered LNDs */
-       cfs_list_t                      ln_lnds;
+       /* CPU partition table of LNet */
+       struct cfs_cpt_table            *ln_cpt_table;
+       /* number of CPTs in ln_cpt_table */
+       unsigned int                    ln_cpt_number;
+       unsigned int                    ln_cpt_bits;
 
 #ifdef __KERNEL__
        cfs_spinlock_t                  ln_lock;
        cfs_mutex_t                     ln_api_mutex;
        cfs_mutex_t                     ln_lnd_mutex;
        cfs_waitq_t                     ln_eq_waitq;
-       cfs_spinlock_t                  ln_res_lock;
+       cfs_spinlock_t                  ln_eq_wait_lock;
 #else
 # ifndef HAVE_LIBPTHREAD
        int                             ln_lock;
        int                             ln_api_mutex;
        int                             ln_lnd_mutex;
-       int                             ln_res_lock;
+       int                             ln_eq_wait_lock;
 # else
        pthread_mutex_t                 ln_lock;
        pthread_mutex_t                 ln_api_mutex;
        pthread_mutex_t                 ln_lnd_mutex;
        pthread_cond_t                  ln_eq_cond;
-       pthread_mutex_t                 ln_res_lock;
+       pthread_mutex_t                 ln_eq_wait_lock;
 # endif
 #endif
+       struct cfs_percpt_lock          *ln_res_lock;
        /* ME container  */
-       struct lnet_res_container       ln_me_container;
+       struct lnet_res_container       **ln_me_containers;
        /* MD container  */
-       struct lnet_res_container       ln_md_container;
+       struct lnet_res_container       **ln_md_containers;
        /* Event Queue container */
        struct lnet_res_container       ln_eq_container;
 
@@ -668,6 +689,16 @@ typedef struct
        /* the vector of portals */
        lnet_portal_t                   **ln_portals;
 
+       int                             ln_init;        /* LNetInit() called? */
+       /* LNetNIInit/LNetNIFini counter */
+       int                             ln_refcount;
+       /* Have I called LNetNIInit myself? */
+       int                             ln_niinit_self;
+       /* shutdown in progress */
+       int                             ln_shutdown;
+       /* registered LNDs */
+       cfs_list_t                      ln_lnds;
+
         lnet_pid_t             ln_pid;              /* requested pid */
 
         cfs_list_t             ln_nis;              /* LND instances */
index 9f9cd8f..5a0270f 100644 (file)
@@ -159,8 +159,12 @@ typedef enum {
  * or after the last item in the list.
  */
 typedef enum {
-        LNET_INS_BEFORE,
-        LNET_INS_AFTER
+       /** insert ME before current position or head of the list */
+       LNET_INS_BEFORE,
+       /** insert ME after current position or tail of the list */
+       LNET_INS_AFTER,
+       /** attach ME at tail of local CPU partition ME list */
+       LNET_INS_LOCAL
 } lnet_ins_pos_t;
 
 /** @} lnet_me */
index b88704e..a2e9df8 100644 (file)
@@ -92,7 +92,7 @@ void
 lnet_init_locks(void)
 {
        cfs_spin_lock_init(&the_lnet.ln_lock);
-       cfs_spin_lock_init(&the_lnet.ln_res_lock);
+       cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock);
        cfs_waitq_init(&the_lnet.ln_eq_waitq);
        cfs_mutex_init(&the_lnet.ln_lnd_mutex);
        cfs_mutex_init(&the_lnet.ln_api_mutex);
@@ -175,7 +175,7 @@ lnet_get_networks (void)
 void lnet_init_locks(void)
 {
        the_lnet.ln_lock = 0;
-       the_lnet.ln_res_lock = 0;
+       the_lnet.ln_eq_wait_lock = 0;
        the_lnet.ln_lnd_mutex = 0;
        the_lnet.ln_api_mutex = 0;
 }
@@ -185,7 +185,7 @@ void lnet_fini_locks(void)
        LASSERT(the_lnet.ln_api_mutex == 0);
        LASSERT(the_lnet.ln_lnd_mutex == 0);
        LASSERT(the_lnet.ln_lock == 0);
-       LASSERT(the_lnet.ln_res_lock == 0);
+       LASSERT(the_lnet.ln_eq_wait_lock == 0);
 }
 
 # else
@@ -194,7 +194,7 @@ void lnet_init_locks(void)
 {
        pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
        pthread_mutex_init(&the_lnet.ln_lock, NULL);
-       pthread_mutex_init(&the_lnet.ln_res_lock, NULL);
+       pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL);
        pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
        pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
 }
@@ -204,13 +204,37 @@ void lnet_fini_locks(void)
        pthread_mutex_destroy(&the_lnet.ln_api_mutex);
        pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
        pthread_mutex_destroy(&the_lnet.ln_lock);
-       pthread_mutex_destroy(&the_lnet.ln_res_lock);
+       pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock);
        pthread_cond_destroy(&the_lnet.ln_eq_cond);
 }
 
 # endif
 #endif
 
+static int
+lnet_create_locks(void)
+{
+       lnet_init_locks();
+
+       the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
+       if (the_lnet.ln_res_lock != NULL)
+               return 0;
+
+       lnet_fini_locks();
+       return -ENOMEM;
+}
+
+static void
+lnet_destroy_locks(void)
+{
+       if (the_lnet.ln_res_lock != NULL) {
+               cfs_percpt_lock_free(the_lnet.ln_res_lock);
+               the_lnet.ln_res_lock = NULL;
+       }
+
+       lnet_fini_locks();
+}
+
 void lnet_assert_wire_constants (void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
@@ -486,7 +510,7 @@ lnet_res_container_cleanup(struct lnet_res_container *rec)
 
 int
 lnet_res_container_setup(struct lnet_res_container *rec,
-                        int type, int objnum, int objsz)
+                        int cpt, int type, int objnum, int objsz)
 {
        int     rc = 0;
        int     i;
@@ -502,11 +526,11 @@ lnet_res_container_setup(struct lnet_res_container *rec,
        if (rc != 0)
                goto out;
 #endif
-       rec->rec_lh_cookie = type;
+       rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type;
 
        /* Arbitrary choice of hash table size */
-       LIBCFS_ALLOC(rec->rec_lh_hash,
-                    LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
+       LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt,
+                        LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
        if (rec->rec_lh_hash == NULL) {
                rc = -ENOMEM;
                goto out;
@@ -524,6 +548,44 @@ out:
        return rc;
 }
 
+static void
+lnet_res_containers_destroy(struct lnet_res_container **recs)
+{
+       struct lnet_res_container       *rec;
+       int                             i;
+
+       cfs_percpt_for_each(rec, i, recs)
+               lnet_res_container_cleanup(rec);
+
+       cfs_percpt_free(recs);
+}
+
+static struct lnet_res_container **
+lnet_res_containers_create(int type, int objnum, int objsz)
+{
+       struct lnet_res_container       **recs;
+       struct lnet_res_container       *rec;
+       int                             rc;
+       int                             i;
+
+       recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec));
+       if (recs == NULL) {
+               CERROR("Failed to allocate %s resource containers\n",
+                      lnet_res_type2str(type));
+               return NULL;
+       }
+
+       cfs_percpt_for_each(rec, i, recs) {
+               rc = lnet_res_container_setup(rec, i, type, objnum, objsz);
+               if (rc != 0) {
+                       lnet_res_containers_destroy(recs);
+                       return NULL;
+               }
+       }
+
+       return recs;
+}
+
 lnet_libhandle_t *
 lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
 {
@@ -535,7 +597,7 @@ lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
        if ((cookie & (LNET_COOKIE_TYPES - 1)) != rec->rec_type)
                return NULL;
 
-       hash = cookie >> LNET_COOKIE_TYPE_BITS;
+       hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS);
        head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK];
 
        cfs_list_for_each_entry(lh, head, lh_hash_chain) {
@@ -550,7 +612,7 @@ void
 lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh)
 {
        /* ALWAYS called with lnet_res_lock held */
-       unsigned int    ibits = LNET_COOKIE_TYPE_BITS;
+       unsigned int    ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS;
        unsigned int    hash;
 
        lh->lh_cookie = rec->rec_lh_cookie;
@@ -578,7 +640,8 @@ int
 lnet_prepare(lnet_pid_t requested_pid)
 {
         /* Prepare to bring up the network */
-        int               rc = 0;
+       struct lnet_res_container **recs;
+       int                       rc = 0;
 
         LASSERT (the_lnet.ln_refcount == 0);
 
@@ -624,31 +687,26 @@ lnet_prepare(lnet_pid_t requested_pid)
        if (rc != 0)
                goto failed1;
 
-       rc = lnet_res_container_setup(&the_lnet.ln_eq_container,
+       rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0,
                                      LNET_COOKIE_TYPE_EQ, LNET_FL_MAX_EQS,
                                      sizeof(lnet_eq_t));
-       if (rc != 0) {
-               CERROR("Failed to create EQ container for LNet: %d\n", rc);
+       if (rc != 0)
                goto failed2;
-       }
 
-       /* NB: we will have instance of ME container per CPT soon */
-       rc = lnet_res_container_setup(&the_lnet.ln_me_container,
-                                     LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
-                                     sizeof(lnet_me_t));
-       if (rc != 0) {
-               CERROR("Failed to create ME container for LNet: %d\n", rc);
+       recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
+                                         sizeof(lnet_me_t));
+       if (recs == NULL)
                goto failed3;
-       }
+
+       the_lnet.ln_me_containers = recs;
 
        /* NB: we will have instance of MD container per CPT soon */
-       rc = lnet_res_container_setup(&the_lnet.ln_md_container,
-                                     LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
-                                     sizeof(lnet_libmd_t));
-       if (rc != 0) {
-               CERROR("Failed to create MD container for LNet: %d\n", rc);
+       recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
+                                         sizeof(lnet_libmd_t));
+       if (recs == NULL)
                goto failed3;
-       }
+
+       the_lnet.ln_md_containers = recs;
 
        rc = lnet_portals_create();
        if (rc != 0) {
@@ -661,8 +719,14 @@ lnet_prepare(lnet_pid_t requested_pid)
  failed3:
        /* NB: lnet_res_container_cleanup is safe to call for
         * uninitialized container */
-       lnet_res_container_cleanup(&the_lnet.ln_md_container);
-       lnet_res_container_cleanup(&the_lnet.ln_me_container);
+       if (the_lnet.ln_md_containers != NULL) {
+               lnet_res_containers_destroy(the_lnet.ln_md_containers);
+               the_lnet.ln_md_containers = NULL;
+       }
+       if (the_lnet.ln_me_containers != NULL) {
+               lnet_res_containers_destroy(the_lnet.ln_me_containers);
+               the_lnet.ln_me_containers = NULL;
+       }
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
  failed2:
        lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
@@ -690,8 +754,16 @@ lnet_unprepare (void)
 
        lnet_portals_destroy();
 
-       lnet_res_container_cleanup(&the_lnet.ln_md_container);
-       lnet_res_container_cleanup(&the_lnet.ln_me_container);
+       if (the_lnet.ln_md_containers != NULL) {
+               lnet_res_containers_destroy(the_lnet.ln_md_containers);
+               the_lnet.ln_md_containers = NULL;
+       }
+
+       if (the_lnet.ln_me_containers != NULL) {
+               lnet_res_containers_destroy(the_lnet.ln_me_containers);
+               the_lnet.ln_me_containers = NULL;
+       }
+
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
         lnet_free_rtrpools();
@@ -719,6 +791,30 @@ lnet_net2ni_locked (__u32 net)
         return NULL;
 }
 
+unsigned int
+lnet_nid_cpt_hash(lnet_nid_t nid)
+{
+       __u64           key = nid;
+       unsigned int    val;
+
+       val = cfs_hash_long(key, LNET_CPT_BITS);
+       /* NB: LNET_CP_NUMBER doesn't have to be PO2 */
+       if (val < LNET_CPT_NUMBER)
+               return val;
+
+       return (unsigned int)((key + val + (val >> 1)) % LNET_CPT_NUMBER);
+}
+
+int
+lnet_cpt_of_nid(lnet_nid_t nid)
+{
+       if (LNET_CPT_NUMBER == 1)
+               return 0; /* the only one */
+
+       return lnet_nid_cpt_hash(nid);
+}
+EXPORT_SYMBOL(lnet_cpt_of_nid);
+
 int
 lnet_islocalnet (__u32 net)
 {
@@ -1065,12 +1161,35 @@ lnet_startup_lndnis (void)
 int
 LNetInit(void)
 {
+       int     rc;
+
         lnet_assert_wire_constants ();
         LASSERT (!the_lnet.ln_init);
 
         memset(&the_lnet, 0, sizeof(the_lnet));
 
-        lnet_init_locks();
+       /* refer to global cfs_cpt_table for now */
+       the_lnet.ln_cpt_table   = cfs_cpt_table;
+       the_lnet.ln_cpt_number  = cfs_cpt_number(cfs_cpt_table);
+
+       LASSERT(the_lnet.ln_cpt_number > 0);
+       if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
+               /* we are under risk of consuming all lh_cookie */
+               CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
+                      "please change setting of CPT-table and retry\n",
+                      the_lnet.ln_cpt_number, LNET_CPT_MAX);
+               return -1;
+       }
+
+       while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
+               the_lnet.ln_cpt_bits++;
+
+       rc = lnet_create_locks();
+       if (rc != 0) {
+               CERROR("Can't create LNet global locks: %d\n", rc);
+               return -1;
+       }
+
         the_lnet.ln_refcount = 0;
         the_lnet.ln_init = 1;
         LNetInvalidateHandle(&the_lnet.ln_rc_eqh);
@@ -1108,15 +1227,15 @@ LNetInit(void)
 void
 LNetFini(void)
 {
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount == 0);
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount == 0);
 
-        while (!cfs_list_empty(&the_lnet.ln_lnds))
-                lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
-                                                   lnd_t, lnd_list));
-        lnet_fini_locks();
+       while (!cfs_list_empty(&the_lnet.ln_lnds))
+               lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
+                                                  lnd_t, lnd_list));
+       lnet_destroy_locks();
 
-        the_lnet.ln_init = 0;
+       the_lnet.ln_init = 0;
 }
 
 /**
index 059afa2..dde4f45 100644 (file)
@@ -97,10 +97,8 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
 
        if (count != 0) {
                LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
-               if (eq->eq_events == NULL) {
-                       lnet_eq_free(eq);
-                       return -ENOMEM;
-               }
+               if (eq->eq_events == NULL)
+                       goto failed;
                /* NB allocator has set all event sequence numbers to 0,
                 * so all them should be earlier than eq_deq_seq */
        }
@@ -108,18 +106,37 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
         eq->eq_deq_seq = 1;
         eq->eq_enq_seq = 1;
         eq->eq_size = count;
-        eq->eq_refcount = 0;
         eq->eq_callback = callback;
 
-       lnet_res_lock();
+       eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
+                                      sizeof(*eq->eq_refs[0]));
+       if (eq->eq_refs == NULL)
+               goto failed;
+
+       /* MUST hold both exclusive lnet_res_lock */
+       lnet_res_lock(LNET_LOCK_EX);
+       /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
+        * both EQ lookup and poll event with only lnet_eq_wait_lock */
+       lnet_eq_wait_lock();
 
        lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
        cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
 
-       lnet_res_unlock();
+       lnet_eq_wait_unlock();
+       lnet_res_unlock(LNET_LOCK_EX);
 
        lnet_eq2handle(handle, eq);
        return 0;
+
+failed:
+       if (eq->eq_events != NULL)
+               LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t));
+
+       if (eq->eq_refs != NULL)
+               cfs_percpt_free(eq->eq_refs);
+
+       lnet_eq_free(eq);
+       return -ENOMEM;
 }
 
 /**
@@ -135,48 +152,63 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
 int
 LNetEQFree(lnet_handle_eq_t eqh)
 {
-        lnet_eq_t     *eq;
-        int            size;
-        lnet_event_t  *events;
-
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
-
-       lnet_res_lock();
+       struct lnet_eq  *eq;
+       lnet_event_t    *events = NULL;
+       int             **refs = NULL;
+       int             *ref;
+       int             rc = 0;
+       int             size = 0;
+       int             i;
+
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
+
+       lnet_res_lock(LNET_LOCK_EX);
+       /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
+        * both EQ lookup and poll event with only lnet_eq_wait_lock */
+       lnet_eq_wait_lock();
 
        eq = lnet_handle2eq(&eqh);
        if (eq == NULL) {
-               lnet_res_unlock();
-               return -ENOENT;
+               rc = -ENOENT;
+               goto out;
        }
 
-       if (eq->eq_refcount != 0) {
-               CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
-                      eq->eq_refcount);
-               lnet_res_unlock();
-               return -EBUSY;
+       cfs_percpt_for_each(ref, i, eq->eq_refs) {
+               LASSERT(*ref >= 0);
+               if (*ref == 0)
+                       continue;
+
+               CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
+                      i, *ref);
+               rc = -EBUSY;
+               goto out;
        }
 
        /* stash for free after lock dropped */
        events  = eq->eq_events;
        size    = eq->eq_size;
+       refs    = eq->eq_refs;
 
        lnet_res_lh_invalidate(&eq->eq_lh);
        cfs_list_del(&eq->eq_list);
        lnet_eq_free_locked(eq);
-
-       lnet_res_unlock();
+ out:
+       lnet_eq_wait_unlock();
+       lnet_res_unlock(LNET_LOCK_EX);
 
        if (events != NULL)
                LIBCFS_FREE(events, size * sizeof(lnet_event_t));
+       if (refs != NULL)
+               cfs_percpt_free(refs);
 
-       return 0;
+       return rc;
 }
 
 void
 lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
 {
-       /* MUST called with resource lock hold */
+       /* MUST be called with resource lock held but w/o lnet_eq_wait_lock */
        int index;
 
        if (eq->eq_size == 0) {
@@ -185,6 +217,7 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
                return;
        }
 
+       lnet_eq_wait_lock();
        ev->sequence = eq->eq_enq_seq++;
 
        LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
@@ -207,19 +240,20 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
        pthread_cond_broadcast(&the_lnet.ln_eq_cond);
 # endif
 #endif
+       lnet_eq_wait_unlock();
 }
 
 int
 lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev)
 {
-        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
-        lnet_event_t *new_event = &eq->eq_events[new_index];
-        int           rc;
-        ENTRY;
+       int             new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+       lnet_event_t    *new_event = &eq->eq_events[new_index];
+       int             rc;
+       ENTRY;
 
-        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
-                RETURN(0);
-        }
+       /* must be called with lnet_eq_wait_lock held */
+       if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence))
+               RETURN(0);
 
         /* We've got a new event... */
         *ev = *new_event;
@@ -309,7 +343,7 @@ lnet_eq_wait_locked(int *timeout_ms)
        cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
        cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl);
 
-       lnet_res_unlock();
+       lnet_eq_wait_unlock();
 
        if (tms < 0) {
                cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE);
@@ -329,7 +363,7 @@ lnet_eq_wait_locked(int *timeout_ms)
        wait = tms != 0; /* might need to call here again */
        *timeout_ms = tms;
 
-       lnet_res_lock();
+       lnet_eq_wait_lock();
        cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl);
 
        return wait;
@@ -342,10 +376,11 @@ static void
 lnet_eq_cond_wait(struct timespec *ts)
 {
        if (ts == NULL) {
-               pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_res_lock);
+               pthread_cond_wait(&the_lnet.ln_eq_cond,
+                                 &the_lnet.ln_eq_wait_lock);
        } else {
                pthread_cond_timedwait(&the_lnet.ln_eq_cond,
-                                      &the_lnet.ln_res_lock, ts);
+                                      &the_lnet.ln_eq_wait_lock, ts);
        }
 }
 # endif
@@ -362,14 +397,14 @@ lnet_eq_wait_locked(int *timeout_ms)
        if (the_lnet.ln_eq_waitni != NULL) {
                /* I have a single NI that I have to call into, to get
                 * events queued, or to block. */
-               lnet_res_unlock();
+               lnet_eq_wait_unlock();
 
                LNET_LOCK();
                eq_waitni = the_lnet.ln_eq_waitni;
                if (unlikely(eq_waitni == NULL)) {
                        LNET_UNLOCK();
 
-                       lnet_res_lock();
+                       lnet_eq_wait_lock();
                        return -1;
                }
 
@@ -392,7 +427,7 @@ lnet_eq_wait_locked(int *timeout_ms)
                }
 
                lnet_ni_decref(eq_waitni);
-               lnet_res_lock();
+               lnet_eq_wait_lock();
        } else { /* w/o eq_waitni */
 # ifndef HAVE_LIBPTHREAD
                /* If I'm single-threaded, LNET fails at startup if it can't
@@ -480,30 +515,30 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
         if (neq < 1)
                 RETURN(-ENOENT);
 
-       lnet_res_lock();
+       lnet_eq_wait_lock();
 
         for (;;) {
 #ifndef __KERNEL__
-               lnet_res_unlock();
+               lnet_eq_wait_unlock();
 
                /* Recursion breaker */
                if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
                    !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
                        lnet_router_checker();
 
-               lnet_res_lock();
+               lnet_eq_wait_lock();
 #endif
                for (i = 0; i < neq; i++) {
                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
 
                        if (eq == NULL) {
-                               lnet_res_unlock();
+                               lnet_eq_wait_unlock();
                                RETURN(-ENOENT);
                        }
 
                        rc = lnet_eq_dequeue_event(eq, event);
                        if (rc != 0) {
-                               lnet_res_unlock();
+                               lnet_eq_wait_unlock();
                                *which = i;
                                RETURN(rc);
                        }
@@ -524,6 +559,6 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
                        break;
        }
 
-       lnet_res_unlock();
+       lnet_eq_wait_unlock();
        RETURN(0);
 }
index dbb007e..96b4660 100644 (file)
@@ -71,12 +71,14 @@ lnet_md_unlink(lnet_libmd_t *md)
         CDEBUG(D_NET, "Unlinking md %p\n", md);
 
         if (md->md_eq != NULL) {
-                md->md_eq->eq_refcount--;
-                LASSERT (md->md_eq->eq_refcount >= 0);
-        }
+               int     cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
+
+               LASSERT(*md->md_eq->eq_refs[cpt] > 0);
+               (*md->md_eq->eq_refs[cpt])--;
+       }
 
-        LASSERT (!cfs_list_empty(&md->md_list));
-        cfs_list_del_init (&md->md_list);
+       LASSERT(!cfs_list_empty(&md->md_list));
+       cfs_list_del_init(&md->md_list);
        lnet_md_free_locked(md);
 }
 
@@ -163,9 +165,9 @@ lnet_md_build(lnet_libmd_t *lmd, lnet_md_t *umd, int unlink)
 
 /* must be called with resource lock held */
 static int
-lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle)
+lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle, int cpt)
 {
-       struct lnet_res_container *container = &the_lnet.ln_md_container;
+       struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
 
        /* NB we are passed an allocated, but inactive md.
         * if we return success, caller may lnet_md_unlink() it.
@@ -185,7 +187,7 @@ lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle)
                if (md->md_eq == NULL)
                        return -ENOENT;
 
-               md->md_eq->eq_refcount++;
+               (*md->md_eq->eq_refs[cpt])++;
        }
 
        lnet_res_lh_initialize(container, &md->md_lh);
@@ -263,11 +265,12 @@ int
 LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
              lnet_unlink_t unlink, lnet_handle_md_t *handle)
 {
-       CFS_LIST_HEAD   (matches);
-       CFS_LIST_HEAD   (drops);
-        lnet_me_t     *me;
-        lnet_libmd_t  *md;
-        int            rc;
+       CFS_LIST_HEAD           (matches);
+       CFS_LIST_HEAD           (drops);
+       struct lnet_me          *me;
+       struct lnet_libmd       *md;
+       int                     cpt;
+       int                     rc;
 
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
@@ -285,8 +288,9 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
                 return -ENOMEM;
 
        rc = lnet_md_build(md, &umd, unlink);
+       cpt = lnet_cpt_of_cookie(meh.cookie);
 
-       lnet_res_lock();
+       lnet_res_lock(cpt);
        if (rc != 0)
                goto failed;
 
@@ -296,7 +300,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
        else if (me->me_md != NULL)
                 rc = -EBUSY;
        else
-               rc = lnet_md_link(md, umd.eq_handle);
+               rc = lnet_md_link(md, umd.eq_handle, cpt);
 
        if (rc != 0)
                goto failed;
@@ -307,7 +311,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
 
        lnet_md2handle(handle, md);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        lnet_drop_delayed_msg_list(&drops, "Bad match");
        lnet_recv_delayed_msg_list(&matches);
@@ -317,7 +321,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
  failed:
        lnet_md_free_locked(md);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
        return rc;
 }
 
@@ -340,8 +344,9 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
 int
 LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
 {
-        lnet_libmd_t  *md;
-        int            rc;
+       lnet_libmd_t    *md;
+       int             cpt;
+       int             rc;
 
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
@@ -360,23 +365,23 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
 
        rc = lnet_md_build(md, &umd, unlink);
 
-       lnet_res_lock();
+       cpt = lnet_res_lock_current();
        if (rc != 0)
                goto failed;
 
-       rc = lnet_md_link(md, umd.eq_handle);
+       rc = lnet_md_link(md, umd.eq_handle, cpt);
        if (rc != 0)
                goto failed;
 
        lnet_md2handle(handle, md);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
        return 0;
 
  failed:
        lnet_md_free_locked(md);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
        return rc;
 }
 
@@ -412,17 +417,19 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
 int
 LNetMDUnlink (lnet_handle_md_t mdh)
 {
-        lnet_event_t     ev;
-        lnet_libmd_t    *md;
+       lnet_event_t    ev;
+       lnet_libmd_t    *md;
+       int             cpt;
 
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(mdh.cookie);
+       lnet_res_lock(cpt);
 
        md = lnet_handle2md(&mdh);
        if (md == NULL) {
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                 return -ENOENT;
         }
 
@@ -438,6 +445,6 @@ LNetMDUnlink (lnet_handle_md_t mdh)
 
         lnet_md_unlink(md);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
        return 0;
 }
index d4f325d..4324961 100644 (file)
@@ -78,14 +78,14 @@ LNetMEAttach(unsigned int portal,
              lnet_handle_me_t *handle)
 {
        struct lnet_match_table *mtable;
-        lnet_me_t        *me;
-        cfs_list_t       *head;
+       struct lnet_me          *me;
+       cfs_list_t              *head;
 
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
 
-        if ((int)portal >= the_lnet.ln_nportals)
-                return -EINVAL;
+       if ((int)portal >= the_lnet.ln_nportals)
+               return -EINVAL;
 
        mtable = lnet_mt_of_attach(portal, match_id,
                                   match_bits, ignore_bits, pos);
@@ -96,7 +96,7 @@ LNetMEAttach(unsigned int portal,
         if (me == NULL)
                 return -ENOMEM;
 
-       lnet_res_lock();
+       lnet_res_lock(mtable->mt_cpt);
 
         me->me_portal = portal;
         me->me_match_id = match_id;
@@ -105,20 +105,19 @@ LNetMEAttach(unsigned int portal,
         me->me_unlink = unlink;
         me->me_md = NULL;
 
-       lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh);
+       lnet_res_lh_initialize(the_lnet.ln_me_containers[mtable->mt_cpt],
+                              &me->me_lh);
        head = lnet_mt_match_head(mtable, match_id, match_bits);
-        LASSERT (head != NULL);
 
-        if (pos == LNET_INS_AFTER)
-                cfs_list_add_tail(&me->me_list, head);
-        else
-                cfs_list_add(&me->me_list, head);
-
-        lnet_me2handle(handle, me);
+       if (pos == LNET_INS_AFTER || pos == LNET_INS_LOCAL)
+               cfs_list_add_tail(&me->me_list, head);
+       else
+               cfs_list_add(&me->me_list, head);
 
-       lnet_res_unlock();
+       lnet_me2handle(handle, me);
 
-        return 0;
+       lnet_res_unlock(mtable->mt_cpt);
+       return 0;
 }
 
 /**
@@ -145,24 +144,30 @@ LNetMEInsert(lnet_handle_me_t current_meh,
              lnet_unlink_t unlink, lnet_ins_pos_t pos,
              lnet_handle_me_t *handle)
 {
-        lnet_me_t     *current_me;
-        lnet_me_t     *new_me;
-        lnet_portal_t *ptl;
+       struct lnet_me          *current_me;
+       struct lnet_me          *new_me;
+       struct lnet_portal      *ptl;
+       int                     cpt;
+
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
 
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
+       if (pos == LNET_INS_LOCAL)
+               return -EPERM;
 
         new_me = lnet_me_alloc();
         if (new_me == NULL)
                 return -ENOMEM;
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(current_meh.cookie);
+
+       lnet_res_lock(cpt);
 
        current_me = lnet_handle2me(&current_meh);
        if (current_me == NULL) {
                lnet_me_free_locked(new_me);
 
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                return -ENOENT;
        }
 
@@ -172,8 +177,8 @@ LNetMEInsert(lnet_handle_me_t current_meh,
        if (lnet_ptl_is_unique(ptl)) {
                 /* nosense to insertion on unique portal */
                lnet_me_free_locked(new_me);
-               lnet_res_unlock();
-                return -EPERM;
+               lnet_res_unlock(cpt);
+               return -EPERM;
         }
 
         new_me->me_portal = current_me->me_portal;
@@ -183,7 +188,7 @@ LNetMEInsert(lnet_handle_me_t current_meh,
         new_me->me_unlink = unlink;
         new_me->me_md = NULL;
 
-       lnet_res_lh_initialize(&the_lnet.ln_me_container, &new_me->me_lh);
+       lnet_res_lh_initialize(the_lnet.ln_me_containers[cpt], &new_me->me_lh);
 
         if (pos == LNET_INS_AFTER)
                 cfs_list_add(&new_me->me_list, &current_me->me_list);
@@ -192,7 +197,7 @@ LNetMEInsert(lnet_handle_me_t current_meh,
 
         lnet_me2handle(handle, new_me);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        return 0;
 }
@@ -214,18 +219,20 @@ LNetMEInsert(lnet_handle_me_t current_meh,
 int
 LNetMEUnlink(lnet_handle_me_t meh)
 {
-        lnet_me_t    *me;
-        lnet_libmd_t *md;
-        lnet_event_t  ev;
+       lnet_me_t       *me;
+       lnet_libmd_t    *md;
+       lnet_event_t    ev;
+       int             cpt;
 
-        LASSERT (the_lnet.ln_init);
-        LASSERT (the_lnet.ln_refcount > 0);
+       LASSERT(the_lnet.ln_init);
+       LASSERT(the_lnet.ln_refcount > 0);
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(meh.cookie);
+       lnet_res_lock(cpt);
 
-        me = lnet_handle2me(&meh);
-        if (me == NULL) {
-               lnet_res_unlock();
+       me = lnet_handle2me(&meh);
+       if (me == NULL) {
+               lnet_res_unlock(cpt);
                 return -ENOENT;
         }
 
@@ -235,12 +242,12 @@ LNetMEUnlink(lnet_handle_me_t meh)
             md->md_refcount == 0) {
                 lnet_build_unlink_event(md, &ev);
                lnet_eq_enqueue_event(md->md_eq, &ev);
-        }
+       }
 
-        lnet_me_unlink(me);
+       lnet_me_unlink(me);
 
-       lnet_res_unlock();
-        return 0;
+       lnet_res_unlock(cpt);
+       return 0;
 }
 
 /* call with lnet_res_lock please */
index 183bb72..099bcd0 100644 (file)
@@ -1483,8 +1483,10 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
         lnet_libmd_t     *md;
         int               rlength;
         int               mlength;
+       int                     cpt;
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
+       lnet_res_lock(cpt);
 
         src.nid = hdr->src_nid;
         src.pid = hdr->src_pid;
@@ -1502,7 +1504,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                         CERROR("REPLY MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                 return ENOENT;                  /* +ve: OK but no match */
         }
 
@@ -1518,7 +1520,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
                         mlength);
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                 return ENOENT;          /* +ve: OK but no match */
         }
 
@@ -1531,7 +1533,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
         if (mlength != 0)
                 lnet_setpayloadbuffer(msg);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
@@ -1545,6 +1547,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
         lnet_hdr_t       *hdr = &msg->msg_hdr;
         lnet_process_id_t src = {0};
         lnet_libmd_t     *md;
+       int                     cpt;
 
         src.nid = hdr->src_nid;
         src.pid = hdr->src_pid;
@@ -1553,7 +1556,8 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
+       lnet_res_lock(cpt);
 
         /* NB handles only looked up by creator (no flips) */
         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
@@ -1569,7 +1573,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
                         CERROR("Source MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                 return ENOENT;                  /* +ve! */
         }
 
@@ -1579,7 +1583,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
 
        lnet_msg_attach_md(msg, md, 0, 0);
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        lnet_build_msg_event(msg, LNET_EVENT_ACK);
 
@@ -2036,9 +2040,10 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
         __u64 match_bits, unsigned int offset,
         __u64 hdr_data)
 {
-        lnet_msg_t       *msg;
-        lnet_libmd_t     *md;
-        int               rc;
+       struct lnet_msg         *msg;
+       struct lnet_libmd       *md;
+       int                     cpt;
+       int                     rc;
 
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
@@ -2059,7 +2064,8 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
         }
         msg->msg_vmflush = !!cfs_memory_pressure_get();
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(mdh.cookie);
+       lnet_res_lock(cpt);
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
@@ -2069,11 +2075,9 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                 if (md != NULL && md->md_me != NULL)
                         CERROR("Source MD also attached to portal %d\n",
                                md->md_me->me_portal);
-
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
 
                lnet_msg_free(msg);
-
                 return -ENOENT;
         }
 
@@ -2101,7 +2105,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         LNET_WIRE_HANDLE_COOKIE_NONE;
         }
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
@@ -2126,14 +2130,16 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
          * lnet_finalize() is called on it, so the LND must call this first */
 
-        lnet_msg_t        *msg = lnet_msg_alloc();
-        lnet_libmd_t      *getmd = getmsg->msg_md;
-        lnet_process_id_t  peer_id = getmsg->msg_target;
+       struct lnet_msg         *msg = lnet_msg_alloc();
+       struct lnet_libmd       *getmd = getmsg->msg_md;
+       lnet_process_id_t       peer_id = getmsg->msg_target;
+       int                     cpt;
 
-        LASSERT (!getmsg->msg_target_is_router);
-        LASSERT (!getmsg->msg_routing);
+       LASSERT(!getmsg->msg_target_is_router);
+       LASSERT(!getmsg->msg_routing);
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
+       lnet_res_lock(cpt);
 
         LASSERT (getmd->md_refcount > 0);
 
@@ -2147,7 +2153,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
                 CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n",
                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), 
                         getmd);
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
                goto drop;
         }
 
@@ -2164,7 +2170,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
        msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
 
        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        LNET_LOCK();
        lnet_msg_commit(msg, 0);
@@ -2227,9 +2233,10 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         lnet_process_id_t target, unsigned int portal, 
         __u64 match_bits, unsigned int offset)
 {
-        lnet_msg_t       *msg;
-        lnet_libmd_t     *md;
-        int               rc;
+       struct lnet_msg         *msg;
+       struct lnet_libmd       *md;
+       int                     cpt;
+       int                     rc;
 
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
@@ -2249,7 +2256,8 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
                 return -ENOMEM;
         }
 
-       lnet_res_lock();
+       cpt = lnet_cpt_of_cookie(mdh.cookie);
+       lnet_res_lock(cpt);
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
@@ -2260,7 +2268,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
                         CERROR("REPLY MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
 
                lnet_msg_free(msg);
 
@@ -2279,12 +2287,12 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
 
         /* NB handles only looked up by creator (no flips) */
-        msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie = 
-                the_lnet.ln_interface_cookie;
-        msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
-                md->md_lh.lh_cookie;
+       msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
+               the_lnet.ln_interface_cookie;
+       msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
+               md->md_lh.lh_cookie;
 
-       lnet_res_unlock();
+       lnet_res_unlock(cpt);
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
index c3cb58b..f03df6b 100644 (file)
@@ -387,6 +387,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container       *container;
        int                             my_slot;
+       int                             cpt;
        int                             i;
 
         LASSERT (!cfs_in_interrupt ());
@@ -416,9 +417,11 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
         msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
-               lnet_res_lock();
+               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+
+               lnet_res_lock(cpt);
                lnet_msg_detach_md(msg, status);
-               lnet_res_unlock();
+               lnet_res_unlock(cpt);
        }
 
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
index b1ce5a8..2ccb0cf 100644 (file)
 
 #include <lnet/lib-lnet.h>
 
+/* NB: add /proc interfaces in upcoming patches */
+int    portal_rotor;
+CFS_MODULE_PARM(portal_rotor, "i", int, 0644,
+               "redirect PUTs to different cpu-partitions");
+
 static int
 lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
                    __u64 mbits, __u64 ignore_bits)
@@ -56,10 +61,10 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
                goto match;
 
        /* unset, new portal */
-       lnet_res_lock();
+       lnet_ptl_lock(ptl);
        /* check again with lock */
        if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
-               lnet_res_unlock();
+               lnet_ptl_unlock(ptl);
                goto match;
        }
 
@@ -69,7 +74,7 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
        else
                lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
 
-       lnet_res_unlock();
+       lnet_ptl_unlock(ptl);
 
        return 1;
 
@@ -80,6 +85,56 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
        return 1;
 }
 
+static void
+lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
+{
+       struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
+       int                     i;
+
+       /* caller must hold both lnet_res_lock(cpt) and lnet_ptl_lock */
+       LASSERT(lnet_ptl_is_wildcard(ptl));
+
+       mtable->mt_enabled = 1;
+
+       ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
+       for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
+               LASSERT(ptl->ptl_mt_maps[i] != cpt);
+               if (ptl->ptl_mt_maps[i] < cpt)
+                       break;
+
+               /* swap to keep ptl_mt_maps in ascending order */
+               ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
+               ptl->ptl_mt_maps[i] = cpt;
+       }
+
+       ptl->ptl_mt_nmaps++;
+}
+
+static void
+lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
+{
+       struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
+       int                     i;
+
+       /* caller must hold both lnet_res_lock(cpt) and lnet_ptl_lock */
+       LASSERT(lnet_ptl_is_wildcard(ptl));
+
+       if (LNET_CPT_NUMBER == 1)
+               return; /* never disable the only match-table */
+
+       mtable->mt_enabled = 0;
+
+       LASSERT(ptl->ptl_mt_nmaps > 0 &&
+               ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);
+
+       /* remove it from mt_maps */
+       ptl->ptl_mt_nmaps--;
+       for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
+               if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
+                       ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
+       }
+}
+
 static int
 lnet_try_match_md(lnet_libmd_t *md,
                  struct lnet_match_info *info, struct lnet_msg *msg)
@@ -90,12 +145,12 @@ lnet_try_match_md(lnet_libmd_t *md,
        unsigned int    mlength;
        lnet_me_t       *me = md->md_me;
 
-       /* mismatched MD op */
-       if ((md->md_options & info->mi_opc) == 0)
-               return LNET_MATCHMD_NONE;
-
        /* MD exhausted */
        if (lnet_md_exhausted(md))
+               return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;
+
+       /* mismatched MD op */
+       if ((md->md_options & info->mi_opc) == 0)
                return LNET_MATCHMD_NONE;
 
        /* mismatched ME nid/pid? */
@@ -147,53 +202,105 @@ lnet_try_match_md(lnet_libmd_t *md,
        lnet_msg_attach_md(msg, md, offset, mlength);
        md->md_offset = offset + mlength;
 
+       if (!lnet_md_exhausted(md))
+               return LNET_MATCHMD_OK;
+
        /* Auto-unlink NOW, so the ME gets unlinked if required.
         * We bumped md->md_refcount above so the MD just gets flagged
         * for unlink when it is finalized. */
-       if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
-           lnet_md_exhausted(md)) {
+       if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0)
                lnet_md_unlink(md);
-       }
 
-       return LNET_MATCHMD_OK;
+       return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
+}
+
+static struct lnet_match_table *
+lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits)
+{
+       if (LNET_CPT_NUMBER == 1)
+               return ptl->ptl_mtables[0]; /* only one match-table exists */
+
+       /* unique portal: table selected by id.nid; otherwise NULL */
+       return lnet_ptl_is_unique(ptl) ?
+              ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL;
 }
 
 struct lnet_match_table *
 lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
                  __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
 {
-       struct lnet_portal *ptl;
+       struct lnet_portal      *ptl;
+       struct lnet_match_table *mtable;
 
+       /* NB: called without lock; picks the match-table to attach to */
        LASSERT(index < the_lnet.ln_nportals);
 
        if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
                return NULL;
 
        ptl = the_lnet.ln_portals[index];
-       /* NB: Now we only have one match-table for each portal,
-        * and will have match-table per CPT in upcoming changes,
-        * ME will be scattered to different match-tables based
-        * on attaching information */
-       return ptl->ptl_mtable;
+
+       mtable = lnet_match2mt(ptl, id, mbits);
+       if (mtable != NULL) /* unique portal or only one match-table */
+               return mtable;
+
+       /* wildcard portal: the insert position decides the match-table */
+       switch (pos) {
+       default:
+               return NULL;
+       case LNET_INS_BEFORE:
+       case LNET_INS_AFTER:
+               /* posted by a thread without affinity: always map to one
+                * fixed match-table to avoid expensive buffer stealing */
+               return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
+       case LNET_INS_LOCAL:
+               /* posted by a cpu-affinity thread: use the current CPT */
+               return ptl->ptl_mtables[lnet_cpt_current()];
+       }
 }
 
 struct lnet_match_table *
 lnet_mt_of_match(unsigned int index, lnet_process_id_t id, __u64 mbits)
 {
-       struct lnet_portal *ptl;
+       struct lnet_match_table *mtable;
+       struct lnet_portal      *ptl;
+       int                     nmaps;
+       int                     rotor;
+       int                     cpt;
 
+       /* NB: called w/o lock */
        LASSERT(index < the_lnet.ln_nportals);
-
        ptl = the_lnet.ln_portals[index];
-       if (!lnet_ptl_is_unique(ptl) &&
-           !lnet_ptl_is_wildcard(ptl) && !lnet_ptl_is_lazy(ptl))
-               return NULL;
 
-       /* NB: Now we only have one match-table for each portal,
-        * and will have match-table per CPT in upcoming changes,
-        * request will be scattered to different match-tables based
-        * on matching information */
-       return ptl->ptl_mtable;
+       LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));
+
+       mtable = lnet_match2mt(ptl, id, mbits);
+       if (mtable != NULL)
+               return mtable;
+
+       /* it's a wildcard portal */
+       if (!portal_rotor) {
+               cpt = lnet_cpt_current();
+               if (ptl->ptl_mtables[cpt]->mt_enabled)
+                       return ptl->ptl_mtables[cpt];
+       }
+
+       rotor = ptl->ptl_rotor++;
+       cpt = rotor % LNET_CPT_NUMBER;
+
+       if (!ptl->ptl_mtables[cpt]->mt_enabled) {
+               /* is there any active entry for this portal? */
+               nmaps = ptl->ptl_mt_nmaps;
+               /* map to an active mtable to avoid heavy "stealing" */
+               if (nmaps != 0) {
+                       /* NB: there is possibility that ptl_mt_maps is being
+                        * changed because we are not under protection of
+                        * lnet_ptl_lock, but it shouldn't hurt anything */
+                       cpt = ptl->ptl_mt_maps[rotor % nmaps];
+               }
+       }
+
+       return ptl->ptl_mtables[cpt];
 }
 
 cfs_list_t *
@@ -205,14 +312,13 @@ lnet_mt_match_head(struct lnet_match_table *mtable,
        if (lnet_ptl_is_wildcard(ptl)) {
                return &mtable->mt_mlist;
 
-       } else if (lnet_ptl_is_unique(ptl)) {
+       } else {
                unsigned long hash = mbits + id.nid + id.pid;
 
+               LASSERT(lnet_ptl_is_unique(ptl));
                hash = cfs_hash_long(hash, LNET_MT_HASH_BITS);
                return &mtable->mt_mhash[hash];
        }
-
-       return NULL;
 }
 
 int
@@ -222,11 +328,14 @@ lnet_mt_match_md(struct lnet_match_table *mtable,
        cfs_list_t              *head;
        lnet_me_t               *me;
        lnet_me_t               *tmp;
+       int                     exhausted = 0;
        int                     rc;
 
+       /* NB: only wildcard portal can return LNET_MATCHMD_EXHAUSTED */
+       if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
+               exhausted = LNET_MATCHMD_EXHAUSTED;
+
        head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
-       if (head == NULL) /* nobody posted anything on this portal */
-               goto out;
 
        cfs_list_for_each_entry_safe(me, tmp, head, me_list) {
                /* ME attached but MD not attached yet */
@@ -236,28 +345,128 @@ lnet_mt_match_md(struct lnet_match_table *mtable,
                LASSERT(me == me->me_md->md_me);
 
                rc = lnet_try_match_md(me->me_md, info, msg);
-               switch (rc) {
-               default:
-                       LBUG();
-
-               case LNET_MATCHMD_NONE:
-                       continue;
-
-               case LNET_MATCHMD_OK:
-                       return LNET_MATCHMD_OK;
+               if ((rc & LNET_MATCHMD_EXHAUSTED) == 0)
+                       exhausted = 0; /* mlist is not empty */
 
-               case LNET_MATCHMD_DROP:
-                       return LNET_MATCHMD_DROP;
+               if ((rc & LNET_MATCHMD_FINISH) != 0) {
+                       /* don't return EXHAUSTED bit because we don't know
+                        * whether the mlist is empty or not */
+                       return rc & ~LNET_MATCHMD_EXHAUSTED;
                }
-               /* not reached */
        }
 
- out:
        if (info->mi_opc == LNET_MD_OP_GET ||
            !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
-               return LNET_MATCHMD_DROP;
+               return LNET_MATCHMD_DROP | exhausted;
+
+       return LNET_MATCHMD_NONE | exhausted;
+}
+
+static int
+lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
+{
+       int     rc;
+
+       /* message arrived before any buffer was posted on this portal:
+        * delay or drop it; returns 0 if portal is wildcard or unique */
+       if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
+               return 0;
+
+       lnet_ptl_lock(ptl);
+       /* check it again while holding the lock */
+       if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
+               lnet_ptl_unlock(ptl);
+               return 0;
+       }
+
+       if (lnet_ptl_is_lazy(ptl)) {
+               if (msg->msg_rx_ready_delay) {
+                       msg->msg_rx_delayed = 1;
+                       cfs_list_add_tail(&msg->msg_list,
+                                         &ptl->ptl_msg_delayed);
+               }
+               rc = LNET_MATCHMD_NONE;
+       } else {
+               rc = LNET_MATCHMD_DROP;
+       }
 
-       return LNET_MATCHMD_NONE;
+       lnet_ptl_unlock(ptl);
+       return rc;
+}
+
+static int
+lnet_ptl_match_delay(struct lnet_portal *ptl,
+                    struct lnet_match_info *info, struct lnet_msg *msg)
+{
+       int     first = ptl->ptl_mt_maps[0]; /* read without lock */
+       int     rc = 0;
+       int     i;
+
+       /* steal a buffer from other CPTs, or delay/drop the message if
+        * there is nothing to steal; this is more expensive than a
+        * regular match, but we don't expect it to happen often */
+       LASSERT(lnet_ptl_is_wildcard(ptl));
+
+       for (i = 0; i < LNET_CPT_NUMBER; i++) {
+               struct lnet_match_table *mtable;
+               int                     cpt;
+
+               cpt = (first + i) % LNET_CPT_NUMBER;
+               mtable = ptl->ptl_mtables[cpt];
+               if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
+                       continue; /* skip disabled tables in between */
+
+               lnet_res_lock(cpt);
+               lnet_ptl_lock(ptl);
+
+               if (i == 0) { /* first try: attach msg to stealing list */
+                       cfs_list_add_tail(&msg->msg_list,
+                                         &ptl->ptl_msg_stealing);
+               }
+
+               if (!cfs_list_empty(&msg->msg_list)) { /* still on stealing list */
+                       rc = lnet_mt_match_md(mtable, info, msg);
+
+                       if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 &&
+                           mtable->mt_enabled)
+                               lnet_ptl_disable_mt(ptl, cpt);
+
+                       if ((rc & LNET_MATCHMD_FINISH) != 0)
+                               cfs_list_del_init(&msg->msg_list);
+
+               } else {
+                       /* already off the stealing list: could have been
+                        * matched by lnet_ptl_attach_md() in another thread */
+                       rc = msg->msg_md == NULL ?
+                            LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
+               }
+
+               if (!cfs_list_empty(&msg->msg_list) && /* not matched yet */
+                   (i == LNET_CPT_NUMBER - 1 || /* the last CPT */
+                    ptl->ptl_mt_nmaps == 0 ||   /* no active CPT */
+                    (ptl->ptl_mt_nmaps == 1 &&  /* the only active CPT */
+                     ptl->ptl_mt_maps[0] == cpt))) {
+                       /* nothing to steal, delay or drop */
+                       cfs_list_del_init(&msg->msg_list);
+
+                       if (lnet_ptl_is_lazy(ptl)) {
+                               msg->msg_rx_delayed = 1;
+                               cfs_list_add_tail(&msg->msg_list,
+                                                 &ptl->ptl_msg_delayed);
+                               rc = LNET_MATCHMD_NONE;
+                       } else {
+                               rc = LNET_MATCHMD_DROP;
+                       }
+               }
+
+               lnet_ptl_unlock(ptl);
+               lnet_res_unlock(cpt);
+
+               if ((rc & LNET_MATCHMD_FINISH) != 0 || msg->msg_rx_delayed)
+                       break;
+       }
+
+       return rc;
 }
 
 int
@@ -277,43 +486,64 @@ lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg)
                return LNET_MATCHMD_DROP;
        }
 
+       ptl = the_lnet.ln_portals[info->mi_portal];
+       rc = lnet_ptl_match_early(ptl, msg);
+       if (rc != 0) /* matched or delayed early message */
+               return rc;
+
        mtable = lnet_mt_of_match(info->mi_portal,
                                  info->mi_id, info->mi_mbits);
-       if (mtable == NULL) {
-               CDEBUG(D_NET, "Drop early message from %s of length %d into "
-                             "portal %d MB="LPX64"\n",
-                             libcfs_id2str(info->mi_id), info->mi_rlength,
-                             info->mi_portal, info->mi_mbits);
-               return LNET_MATCHMD_DROP;
-       }
-
-       ptl = the_lnet.ln_portals[info->mi_portal];
-       lnet_res_lock();
+       lnet_res_lock(mtable->mt_cpt);
 
        if (the_lnet.ln_shutdown) {
-               rc =  LNET_MATCHMD_DROP;
-               goto out;
+               rc = LNET_MATCHMD_DROP;
+               goto out1;
        }
 
        rc = lnet_mt_match_md(mtable, info, msg);
-       if (rc != LNET_MATCHMD_NONE) /* matched or dropping */
-               goto out;
+       if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && mtable->mt_enabled) {
+               lnet_ptl_lock(ptl);
+               lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
+               lnet_ptl_unlock(ptl);
+       }
+
+       if ((rc & LNET_MATCHMD_FINISH) != 0)    /* matched or dropping */
+               goto out1;
 
        if (!msg->msg_rx_ready_delay)
-               goto out;
+               goto out1;
 
+       LASSERT(lnet_ptl_is_lazy(ptl));
        LASSERT(!msg->msg_rx_delayed);
-       msg->msg_rx_delayed = 1;
-       cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
-
-       CDEBUG(D_NET,
-              "Delaying %s from %s portal %d MB "LPX64" offset %d len %d\n",
-              info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
-              libcfs_id2str(info->mi_id), info->mi_portal,
-              info->mi_mbits, info->mi_roffset, info->mi_rlength);
- out:
-       lnet_res_unlock();
-       return rc;
+
+       /* NB: we don't expect "delay" can happen a lot */
+       if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
+               lnet_ptl_lock(ptl);
+
+               msg->msg_rx_delayed = 1;
+               cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);
+
+               lnet_ptl_unlock(ptl);
+               lnet_res_unlock(mtable->mt_cpt);
+
+       } else  {
+               lnet_res_unlock(mtable->mt_cpt);
+               rc = lnet_ptl_match_delay(ptl, info, msg);
+       }
+
+       if (msg->msg_rx_delayed) {
+               CDEBUG(D_NET,
+                      "Delaying %s from %s ptl %d MB "LPX64" off %d len %d\n",
+                      info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
+                      libcfs_id2str(info->mi_id), info->mi_portal,
+                      info->mi_mbits, info->mi_roffset, info->mi_rlength);
+       }
+       goto out0;
+ out1:
+       lnet_res_unlock(mtable->mt_cpt);
+ out0:
+       /* EXHAUSTED bit is only meaningful for internal functions */
+       return rc & ~LNET_MATCHMD_EXHAUSTED;
 }
 
 void
@@ -331,20 +561,35 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
                   cfs_list_t *matches, cfs_list_t *drops)
 {
        struct lnet_portal      *ptl = the_lnet.ln_portals[me->me_portal];
+       struct lnet_match_table *mtable;
+       cfs_list_t              *head;
        lnet_msg_t              *tmp;
        lnet_msg_t              *msg;
+       int                     exhausted = 0;
+       int                     cpt;
 
        LASSERT(md->md_refcount == 0); /* a brand new MD */
 
        me->me_md = md;
        md->md_me = me;
 
-       cfs_list_for_each_entry_safe(msg, tmp, &ptl->ptl_msgq, msg_list) {
+       cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
+       mtable = ptl->ptl_mtables[cpt];
+
+       if (cfs_list_empty(&ptl->ptl_msg_stealing) &&
+           cfs_list_empty(&ptl->ptl_msg_delayed) &&
+           mtable->mt_enabled)
+               return;
+
+       lnet_ptl_lock(ptl);
+       head = &ptl->ptl_msg_stealing;
+ again:
+       cfs_list_for_each_entry_safe(msg, tmp, head, msg_list) {
                struct lnet_match_info  info;
                lnet_hdr_t              *hdr;
                int                     rc;
 
-               LASSERT(msg->msg_rx_delayed);
+               LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);
 
                hdr   = &msg->msg_hdr;
                info.mi_id.nid  = hdr->src_nid;
@@ -357,13 +602,25 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
 
                rc = lnet_try_match_md(md, &info, msg);
 
-               if (rc == LNET_MATCHMD_NONE)
+               exhausted = (rc & LNET_MATCHMD_EXHAUSTED) != 0;
+               if ((rc & LNET_MATCHMD_NONE) != 0) {
+                       if (exhausted)
+                               break;
                        continue;
+               }
 
                /* Hurrah! This _is_ a match */
-               cfs_list_del(&msg->msg_list);
+               LASSERT((rc & LNET_MATCHMD_FINISH) != 0);
+               cfs_list_del_init(&msg->msg_list);
 
-               if (rc == LNET_MATCHMD_OK) {
+               if (head == &ptl->ptl_msg_stealing) {
+                       if (exhausted)
+                               break;
+                       /* stealing thread will handle the message */
+                       continue;
+               }
+
+               if ((rc & LNET_MATCHMD_OK) != 0) {
                        cfs_list_add_tail(&msg->msg_list, matches);
 
                        CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
@@ -372,33 +629,45 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
                               info.mi_portal, info.mi_mbits,
                               info.mi_roffset, info.mi_rlength);
                } else {
-                       LASSERT(rc == LNET_MATCHMD_DROP);
-
                        cfs_list_add_tail(&msg->msg_list, drops);
                }
 
-               if (lnet_md_exhausted(md))
+               if (exhausted)
                        break;
        }
+
+       if (!exhausted && head == &ptl->ptl_msg_stealing) {
+               head = &ptl->ptl_msg_delayed;
+               goto again;
+       }
+
+       if (lnet_ptl_is_wildcard(ptl) && !exhausted && !mtable->mt_enabled)
+               lnet_ptl_enable_mt(ptl, cpt);
+
+       lnet_ptl_unlock(ptl);
 }
 
 void
 lnet_ptl_cleanup(struct lnet_portal *ptl)
 {
        struct lnet_match_table *mtable;
+       int                     i;
 
-       LASSERT(cfs_list_empty(&ptl->ptl_msgq));
-
-       if (ptl->ptl_mtable == NULL) /* uninitialized portal */
+       if (ptl->ptl_mtables == NULL) /* uninitialized portal */
                return;
 
-       do { /* iterate over match-tables when we have percpt match-table */
+       LASSERT(cfs_list_empty(&ptl->ptl_msg_delayed));
+       LASSERT(cfs_list_empty(&ptl->ptl_msg_stealing));
+#ifndef __KERNEL__
+# ifdef HAVE_LIBPTHREAD
+       pthread_mutex_destroy(&ptl->ptl_lock);
+# endif
+#endif
+       cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
                cfs_list_t      *mhash;
                lnet_me_t       *me;
                int             j;
 
-               mtable = ptl->ptl_mtable;
-
                if (mtable->mt_mhash == NULL) /* uninitialized match-table */
                        continue;
 
@@ -423,10 +692,10 @@ lnet_ptl_cleanup(struct lnet_portal *ptl)
                }
 
                LIBCFS_FREE(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
-       } while (0);
+       }
 
-       LIBCFS_FREE(ptl->ptl_mtable, sizeof(*mtable));
-       ptl->ptl_mtable = NULL;
+       cfs_percpt_free(ptl->ptl_mtables);
+       ptl->ptl_mtables = NULL;
 }
 
 int
@@ -434,20 +703,29 @@ lnet_ptl_setup(struct lnet_portal *ptl, int index)
 {
        struct lnet_match_table *mtable;
        cfs_list_t              *mhash;
+       int                     i;
        int                     j;
 
-       ptl->ptl_index = index;
-       CFS_INIT_LIST_HEAD(&ptl->ptl_msgq);
-
-       LIBCFS_ALLOC(mtable, sizeof(*mtable));
-       if (mtable == NULL) {
+       ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
+                                           sizeof(struct lnet_match_table));
+       if (ptl->ptl_mtables == NULL) {
                CERROR("Failed to create match table for portal %d\n", index);
                return -ENOMEM;
        }
 
-       ptl->ptl_mtable = mtable;
-       do { /* iterate over match-tables when we have percpt match-table */
-               LIBCFS_ALLOC(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
+       ptl->ptl_index = index;
+       CFS_INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
+       CFS_INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
+#ifdef __KERNEL__
+       cfs_spin_lock_init(&ptl->ptl_lock);
+#else
+# ifdef HAVE_LIBPTHREAD
+       pthread_mutex_init(&ptl->ptl_lock, NULL);
+# endif
+#endif
+       cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
+               LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
+                                sizeof(*mhash) * LNET_MT_HASH_SIZE);
                if (mhash == NULL) {
                        CERROR("Failed to create match hash for portal %d\n",
                               index);
@@ -460,7 +738,8 @@ lnet_ptl_setup(struct lnet_portal *ptl, int index)
 
                CFS_INIT_LIST_HEAD(&mtable->mt_mlist);
                mtable->mt_portal = index;
-       } while (0);
+               mtable->mt_cpt = i;
+       }
 
        return 0;
  failed:
@@ -489,7 +768,7 @@ lnet_portals_create(void)
        int     size;
        int     i;
 
-       size = sizeof(struct lnet_portal);
+       size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]);
 
        the_lnet.ln_nportals = MAX_PORTALS;
        the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, size);
@@ -547,9 +826,13 @@ LNetSetLazyPortal(int portal)
        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
        ptl = the_lnet.ln_portals[portal];
 
-       lnet_res_lock();
+       lnet_res_lock(LNET_LOCK_EX);
+       lnet_ptl_lock(ptl);
+
        lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
-       lnet_res_unlock();
+
+       lnet_ptl_unlock(ptl);
+       lnet_res_unlock(LNET_LOCK_EX);
 
        return 0;
 }
@@ -574,10 +857,12 @@ LNetClearLazyPortal(int portal)
 
        ptl = the_lnet.ln_portals[portal];
 
-       lnet_res_lock();
+       lnet_res_lock(LNET_LOCK_EX);
+       lnet_ptl_lock(ptl);
 
        if (!lnet_ptl_is_lazy(ptl)) {
-               lnet_res_unlock();
+               lnet_ptl_unlock(ptl);
+               lnet_res_unlock(LNET_LOCK_EX);
                return 0;
        }
 
@@ -587,11 +872,12 @@ LNetClearLazyPortal(int portal)
                CDEBUG(D_NET, "clearing portal %d lazy\n", portal);
 
        /* grab all the blocked messages atomically */
-       cfs_list_splice_init(&ptl->ptl_msgq, &zombies);
+       cfs_list_splice_init(&ptl->ptl_msg_delayed, &zombies);
 
        lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
 
-       lnet_res_unlock();
+       lnet_ptl_unlock(ptl);
+       lnet_res_unlock(LNET_LOCK_EX);
 
        lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");