From: Liang Zhen Date: Mon, 11 Jun 2012 14:28:23 +0000 (+0800) Subject: LU-56 lnet: Partitioned LNet resources (ME/MD/EQ) X-Git-Tag: 2.2.58~6 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=279bbc81e03dc74d273ec12b4d9e703ca94404c4 LU-56 lnet: Partitioned LNet resources (ME/MD/EQ) We already have a new lock lnet_res_lock to protect LNet resources, but it's still a global lock and could have performance issues. This patch created partitioned data for LNet, resources are spread across different partitions. Also, lnet_res_lock is not a single spinlock anymore, it's a percpt lock now, which means LNet only needs to lock one partition at a time while operating MD/ME belonging to that partition. There are a few things that are still serialized by exclusive lock: - EQ allocation/free - LNetEQPoll (non-zero size EQ) - delay message on lazy portal - Stealing MD between partitions. These operations are either rare or deprecated so they shouldn't become a performance problem. Signed-off-by: Liang Zhen Change-Id: If5e88b92dd508b84c0fd91725b3aaed424dd3108 Reviewed-on: http://review.whamcloud.com/3078 Reviewed-by: Bobi Jam Tested-by: Hudson Reviewed-by: Doug Oucharek Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 9bf8099..6f16f49 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -56,6 +56,35 @@ extern lnet_t the_lnet; /* THE network */ +#if !defined(__KERNEL__) || defined(LNET_USE_LIB_FREELIST) +/* 1 CPT, simplify implementation... */ +# define LNET_CPT_MAX_BITS 0 + +#else /* KERNEL and no freelist */ + +# if (BITS_PER_LONG == 32) +/* 2 CPTs, allowing more CPTs might make us under memory pressure */ +# define LNET_CPT_MAX_BITS 1 + +# else /* 64-bit system */ +/* + * 256 CPTs for thousands of CPUs, allowing more CPTs might make us + * under risk of consuming all lh_cookie. 
+ */ +# define LNET_CPT_MAX_BITS 8 +# endif /* BITS_PER_LONG == 32 */ +#endif + +/* max allowed CPT number */ +#define LNET_CPT_MAX (1 << LNET_CPT_MAX_BITS) + +#define LNET_CPT_NUMBER (the_lnet.ln_cpt_number) +#define LNET_CPT_BITS (the_lnet.ln_cpt_bits) +#define LNET_CPT_MASK ((1ULL << LNET_CPT_BITS) - 1) + +/** exclusive lock */ +#define LNET_LOCK_EX CFS_PERCPT_LOCK_EX + static inline int lnet_is_wire_handle_none (lnet_handle_wire_t *wh) { return (wh->wh_interface_cookie == LNET_WIRE_HANDLE_COOKIE_NONE && @@ -86,25 +115,53 @@ static inline int lnet_md_unlinkable (lnet_libmd_t *md) lnet_md_exhausted(md)); } -#ifdef __KERNEL__ +#define lnet_cpt_table() (the_lnet.ln_cpt_table) +#define lnet_cpt_current() cfs_cpt_current(the_lnet.ln_cpt_table, 1) + +static inline int +lnet_cpt_of_cookie(__u64 cookie) +{ + unsigned int cpt = (cookie >> LNET_COOKIE_TYPE_BITS) & LNET_CPT_MASK; + + /* LNET_CPT_NUMBER doesn't have to be power2, which means we can + * get illegal cpt from it's invalid cookie */ + return cpt < LNET_CPT_NUMBER ? 
cpt : cpt % LNET_CPT_NUMBER; +} static inline void -lnet_res_lock(void) +lnet_res_lock(int cpt) { - cfs_spin_lock(&the_lnet.ln_res_lock); + cfs_percpt_lock(the_lnet.ln_res_lock, cpt); } static inline void -lnet_res_unlock(void) +lnet_res_unlock(int cpt) { - cfs_spin_unlock(&the_lnet.ln_res_lock); + cfs_percpt_unlock(the_lnet.ln_res_lock, cpt); } -#define LNET_LOCK() cfs_spin_lock(&the_lnet.ln_lock) -#define LNET_UNLOCK() cfs_spin_unlock(&the_lnet.ln_lock) -#define LNET_MUTEX_LOCK(m) cfs_mutex_lock(m) -#define LNET_MUTEX_UNLOCK(m) cfs_mutex_unlock(m) -#else +static inline int +lnet_res_lock_current(void) +{ + int cpt = lnet_cpt_current(); + + lnet_res_lock(cpt); + return cpt; +} + +#ifdef __KERNEL__ + +#define lnet_ptl_lock(ptl) cfs_spin_lock(&(ptl)->ptl_lock) +#define lnet_ptl_unlock(ptl) cfs_spin_unlock(&(ptl)->ptl_lock) +#define lnet_eq_wait_lock() cfs_spin_lock(&the_lnet.ln_eq_wait_lock) +#define lnet_eq_wait_unlock() cfs_spin_unlock(&the_lnet.ln_eq_wait_lock) +#define LNET_LOCK() cfs_spin_lock(&the_lnet.ln_lock) +#define LNET_UNLOCK() cfs_spin_unlock(&the_lnet.ln_lock) +#define LNET_MUTEX_LOCK(m) cfs_mutex_lock(m) +#define LNET_MUTEX_UNLOCK(m) cfs_mutex_unlock(m) + +#else /* !__KERNEL__ */ + # ifndef HAVE_LIBPTHREAD #define LNET_SINGLE_THREADED_LOCK(l) \ do { \ @@ -123,21 +180,31 @@ do { \ #define LNET_MUTEX_LOCK(m) LNET_SINGLE_THREADED_LOCK(*(m)) #define LNET_MUTEX_UNLOCK(m) LNET_SINGLE_THREADED_UNLOCK(*(m)) -#define lnet_res_lock() \ - LNET_SINGLE_THREADED_LOCK(the_lnet.ln_res_lock) -#define lnet_res_unlock() \ - LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_res_lock) +#define lnet_ptl_lock(ptl) \ + LNET_SINGLE_THREADED_LOCK((ptl)->ptl_lock) +#define lnet_ptl_unlock(ptl) \ + LNET_SINGLE_THREADED_UNLOCK((ptl)->ptl_lock) + +#define lnet_eq_wait_lock() \ + LNET_SINGLE_THREADED_LOCK(the_lnet.ln_eq_wait_lock) +#define lnet_eq_wait_unlock() \ + LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_eq_wait_lock) + +# else /* HAVE_LIBPTHREAD */ -# else #define LNET_LOCK() 
pthread_mutex_lock(&the_lnet.ln_lock) #define LNET_UNLOCK() pthread_mutex_unlock(&the_lnet.ln_lock) #define LNET_MUTEX_LOCK(m) pthread_mutex_lock(m) #define LNET_MUTEX_UNLOCK(m) pthread_mutex_unlock(m) -#define lnet_res_lock() pthread_mutex_lock(&the_lnet.ln_res_lock) -#define lnet_res_unlock() pthread_mutex_unlock(&the_lnet.ln_res_lock) -# endif -#endif +#define lnet_ptl_lock(ptl) pthread_mutex_lock(&(ptl)->ptl_lock) +#define lnet_ptl_unlock(ptl) pthread_mutex_unlock(&(ptl)->ptl_lock) + +#define lnet_eq_wait_lock() pthread_mutex_lock(&the_lnet.ln_eq_wait_lock) +#define lnet_eq_wait_unlock() pthread_mutex_unlock(&the_lnet.ln_eq_wait_lock) + +# endif /* HAVE_LIBPTHREAD */ +#endif /* __KERNEL__ */ #define MAX_PORTALS 64 @@ -184,9 +251,11 @@ lnet_eq_alloc (void) struct lnet_res_container *rec = &the_lnet.ln_eq_container; lnet_eq_t *eq; - lnet_res_lock(); + LASSERT(LNET_CPT_NUMBER == 1); + + lnet_res_lock(0); eq = (lnet_eq_t *)lnet_freelist_alloc(&rec->rec_freelist); - lnet_res_unlock(); + lnet_res_unlock(0); return eq; } @@ -197,27 +266,30 @@ lnet_eq_free_locked(lnet_eq_t *eq) /* ALWAYS called with resource lock held */ struct lnet_res_container *rec = &the_lnet.ln_eq_container; + LASSERT(LNET_CPT_NUMBER == 1); lnet_freelist_free(&rec->rec_freelist, eq); } static inline void lnet_eq_free(lnet_eq_t *eq) { - lnet_res_lock(); + lnet_res_lock(0); lnet_eq_free_locked(eq); - lnet_res_unlock(); + lnet_res_unlock(0); } static inline lnet_libmd_t * lnet_md_alloc (lnet_md_t *umd) { /* NEVER called with resource lock held */ - struct lnet_res_container *rec = &the_lnet.ln_md_container; + struct lnet_res_container *rec = the_lnet.ln_md_containers[0]; lnet_libmd_t *md; - lnet_res_lock(); + LASSERT(LNET_CPT_NUMBER == 1); + + lnet_res_lock(0); md = (lnet_libmd_t *)lnet_freelist_alloc(&rec->rec_freelist); - lnet_res_unlock(); + lnet_res_unlock(0); if (md != NULL) CFS_INIT_LIST_HEAD(&md->md_list); @@ -229,29 +301,32 @@ static inline void lnet_md_free_locked(lnet_libmd_t *md) { /* 
ALWAYS called with resource lock held */ - struct lnet_res_container *rec = &the_lnet.ln_md_container; + struct lnet_res_container *rec = the_lnet.ln_md_containers[0]; + LASSERT(LNET_CPT_NUMBER == 1); lnet_freelist_free(&rec->rec_freelist, md); } static inline void lnet_md_free(lnet_libmd_t *md) { - lnet_res_lock(); + lnet_res_lock(0); lnet_md_free_locked(md); - lnet_res_unlock(); + lnet_res_unlock(0); } static inline lnet_me_t * lnet_me_alloc(void) { /* NEVER called with resource lock held */ - struct lnet_res_container *rec = &the_lnet.ln_me_container; + struct lnet_res_container *rec = the_lnet.ln_me_containers[0]; lnet_me_t *me; - lnet_res_lock(); + LASSERT(LNET_CPT_NUMBER == 1); + + lnet_res_lock(0); me = (lnet_me_t *)lnet_freelist_alloc(&rec->rec_freelist); - lnet_res_unlock(); + lnet_res_unlock(0); return me; } @@ -260,17 +335,18 @@ static inline void lnet_me_free_locked(lnet_me_t *me) { /* ALWAYS called with resource lock held */ - struct lnet_res_container *rec = &the_lnet.ln_me_container; + struct lnet_res_container *rec = the_lnet.ln_me_containers[0]; + LASSERT(LNET_CPT_NUMBER == 1); lnet_freelist_free(&rec->rec_freelist, me); } static inline void lnet_me_free(lnet_me_t *me) { - lnet_res_lock(); + lnet_res_lock(0); lnet_me_free_locked(me); - lnet_res_unlock(); + lnet_res_unlock(0); } static inline lnet_msg_t * @@ -432,6 +508,7 @@ static inline void lnet_res_lh_invalidate(lnet_libhandle_t *lh) { /* ALWAYS called with resource lock held */ + /* NB: cookie is still useful, don't reset it */ cfs_list_del(&lh->lh_hash_chain); } @@ -470,8 +547,11 @@ lnet_handle2md(lnet_handle_md_t *handle) { /* ALWAYS called with resource lock held */ lnet_libhandle_t *lh; + int cpt; - lh = lnet_res_lh_lookup(&the_lnet.ln_md_container, handle->cookie); + cpt = lnet_cpt_of_cookie(handle->cookie); + lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt], + handle->cookie); if (lh == NULL) return NULL; @@ -483,11 +563,13 @@ lnet_wire_handle2md(lnet_handle_wire_t *wh) { /* ALWAYS 
called with resource lock held */ lnet_libhandle_t *lh; + int cpt; if (wh->wh_interface_cookie != the_lnet.ln_interface_cookie) return NULL; - lh = lnet_res_lh_lookup(&the_lnet.ln_md_container, + cpt = lnet_cpt_of_cookie(wh->wh_object_cookie); + lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt], wh->wh_object_cookie); if (lh == NULL) return NULL; @@ -506,8 +588,11 @@ lnet_handle2me(lnet_handle_me_t *handle) { /* ALWAYS called with resource lock held */ lnet_libhandle_t *lh; + int cpt; - lh = lnet_res_lh_lookup(&the_lnet.ln_me_container, handle->cookie); + cpt = lnet_cpt_of_cookie(handle->cookie); + lh = lnet_res_lh_lookup(the_lnet.ln_me_containers[cpt], + handle->cookie); if (lh == NULL) return NULL; @@ -606,6 +691,7 @@ lnet_set_msg_uid(lnet_ni_t *ni, lnet_msg_t *msg, lnet_uid_t uid) } #endif +extern int lnet_cpt_of_nid(lnet_nid_t nid); extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid); extern lnet_ni_t *lnet_net2ni_locked (__u32 net); static inline lnet_ni_t * diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 2e789fa..63ef7c3 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -241,14 +241,14 @@ typedef struct lnet_libhandle { ((type *)((char *)(ptr)-(char *)(&((type *)0)->member))) typedef struct lnet_eq { - cfs_list_t eq_list; - lnet_libhandle_t eq_lh; - lnet_seq_t eq_enq_seq; - lnet_seq_t eq_deq_seq; - unsigned int eq_size; - lnet_event_t *eq_events; - int eq_refcount; - lnet_eq_handler_t eq_callback; + cfs_list_t eq_list; + lnet_libhandle_t eq_lh; + lnet_seq_t eq_enq_seq; + lnet_seq_t eq_deq_seq; + unsigned int eq_size; + lnet_eq_handler_t eq_callback; + lnet_event_t *eq_events; + int **eq_refs; /* percpt refcount for EQ */ } lnet_eq_t; typedef struct lnet_me { @@ -549,6 +549,10 @@ enum { LNET_MATCHMD_OK = (1 << 1), /* Must be discarded */ LNET_MATCHMD_DROP = (1 << 2), + /* match and buffer is exhausted */ + LNET_MATCHMD_EXHAUSTED = (1 << 3), + /* match or drop */ + LNET_MATCHMD_FINISH = 
(LNET_MATCHMD_OK | LNET_MATCHMD_DROP), }; /* Options for lnet_portal_t::ptl_options */ @@ -575,19 +579,38 @@ struct lnet_match_table { /* reserved for upcoming patches, CPU partition ID */ unsigned int mt_cpt; unsigned int mt_portal; /* portal index */ + /* match table is set as "enabled" if there's non-exhausted MD + * attached on mt_mlist, it's only valide for wildcard portal */ + unsigned int mt_enabled; cfs_list_t mt_mlist; /* matching list */ cfs_list_t *mt_mhash; /* matching hash */ }; typedef struct lnet_portal { +#ifdef __KERNEL__ + cfs_spinlock_t ptl_lock; +#else +# ifndef HAVE_LIBPTHREAD + int ptl_lock; +# else + pthread_mutex_t ptl_lock; +# endif +#endif unsigned int ptl_index; /* portal ID, reserved */ /* flags on this portal: lazy, unique... */ unsigned int ptl_options; - /* Now we only have single instance for each portal, - * will have instance per CPT in upcoming patches */ - struct lnet_match_table *ptl_mtable; + /* list of messags which are stealing buffer */ + cfs_list_t ptl_msg_stealing; /* messages blocking for MD */ - cfs_list_t ptl_msgq; + cfs_list_t ptl_msg_delayed; + /* Match table for each CPT */ + struct lnet_match_table **ptl_mtables; + /* spread rotor of incoming "PUT" */ + int ptl_rotor; + /* # active entries for this portal */ + int ptl_mt_nmaps; + /* array of active entries' cpu-partition-id */ + int ptl_mt_maps[0]; } lnet_portal_t; #define LNET_LH_HASH_BITS 12 @@ -627,39 +650,37 @@ struct lnet_msg_container { typedef struct { - /* Stuff initialised at LNetInit() */ - int ln_init; /* LNetInit() called? */ - int ln_refcount; /* LNetNIInit/LNetNIFini counter */ - int ln_niinit_self; /* Have I called LNetNIInit myself? 
*/ - /* shutdown in progress */ - int ln_shutdown; - /* registered LNDs */ - cfs_list_t ln_lnds; + /* CPU partition table of LNet */ + struct cfs_cpt_table *ln_cpt_table; + /* number of CPTs in ln_cpt_table */ + unsigned int ln_cpt_number; + unsigned int ln_cpt_bits; #ifdef __KERNEL__ cfs_spinlock_t ln_lock; cfs_mutex_t ln_api_mutex; cfs_mutex_t ln_lnd_mutex; cfs_waitq_t ln_eq_waitq; - cfs_spinlock_t ln_res_lock; + cfs_spinlock_t ln_eq_wait_lock; #else # ifndef HAVE_LIBPTHREAD int ln_lock; int ln_api_mutex; int ln_lnd_mutex; - int ln_res_lock; + int ln_eq_wait_lock; # else pthread_mutex_t ln_lock; pthread_mutex_t ln_api_mutex; pthread_mutex_t ln_lnd_mutex; pthread_cond_t ln_eq_cond; - pthread_mutex_t ln_res_lock; + pthread_mutex_t ln_eq_wait_lock; # endif #endif + struct cfs_percpt_lock *ln_res_lock; /* ME container */ - struct lnet_res_container ln_me_container; + struct lnet_res_container **ln_me_containers; /* MD container */ - struct lnet_res_container ln_md_container; + struct lnet_res_container **ln_md_containers; /* Event Queue container */ struct lnet_res_container ln_eq_container; @@ -668,6 +689,16 @@ typedef struct /* the vector of portals */ lnet_portal_t **ln_portals; + int ln_init; /* LNetInit() called? */ + /* LNetNIInit/LNetNIFini counter */ + int ln_refcount; + /* Have I called LNetNIInit myself? */ + int ln_niinit_self; + /* shutdown in progress */ + int ln_shutdown; + /* registered LNDs */ + cfs_list_t ln_lnds; + lnet_pid_t ln_pid; /* requested pid */ cfs_list_t ln_nis; /* LND instances */ diff --git a/lnet/include/lnet/types.h b/lnet/include/lnet/types.h index 9f9cd8f..5a0270f 100644 --- a/lnet/include/lnet/types.h +++ b/lnet/include/lnet/types.h @@ -159,8 +159,12 @@ typedef enum { * or after the last item in the list. 
*/ typedef enum { - LNET_INS_BEFORE, - LNET_INS_AFTER + /** insert ME before current position or head of the list */ + LNET_INS_BEFORE, + /** insert ME after current position or tail of the list */ + LNET_INS_AFTER, + /** attach ME at tail of local CPU partition ME list */ + LNET_INS_LOCAL } lnet_ins_pos_t; /** @} lnet_me */ diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c index b88704e..a2e9df8 100644 --- a/lnet/lnet/api-ni.c +++ b/lnet/lnet/api-ni.c @@ -92,7 +92,7 @@ void lnet_init_locks(void) { cfs_spin_lock_init(&the_lnet.ln_lock); - cfs_spin_lock_init(&the_lnet.ln_res_lock); + cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock); cfs_waitq_init(&the_lnet.ln_eq_waitq); cfs_mutex_init(&the_lnet.ln_lnd_mutex); cfs_mutex_init(&the_lnet.ln_api_mutex); @@ -175,7 +175,7 @@ lnet_get_networks (void) void lnet_init_locks(void) { the_lnet.ln_lock = 0; - the_lnet.ln_res_lock = 0; + the_lnet.ln_eq_wait_lock = 0; the_lnet.ln_lnd_mutex = 0; the_lnet.ln_api_mutex = 0; } @@ -185,7 +185,7 @@ void lnet_fini_locks(void) LASSERT(the_lnet.ln_api_mutex == 0); LASSERT(the_lnet.ln_lnd_mutex == 0); LASSERT(the_lnet.ln_lock == 0); - LASSERT(the_lnet.ln_res_lock == 0); + LASSERT(the_lnet.ln_eq_wait_lock == 0); } # else @@ -194,7 +194,7 @@ void lnet_init_locks(void) { pthread_cond_init(&the_lnet.ln_eq_cond, NULL); pthread_mutex_init(&the_lnet.ln_lock, NULL); - pthread_mutex_init(&the_lnet.ln_res_lock, NULL); + pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL); pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL); pthread_mutex_init(&the_lnet.ln_api_mutex, NULL); } @@ -204,13 +204,37 @@ void lnet_fini_locks(void) pthread_mutex_destroy(&the_lnet.ln_api_mutex); pthread_mutex_destroy(&the_lnet.ln_lnd_mutex); pthread_mutex_destroy(&the_lnet.ln_lock); - pthread_mutex_destroy(&the_lnet.ln_res_lock); + pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock); pthread_cond_destroy(&the_lnet.ln_eq_cond); } # endif #endif +static int +lnet_create_locks(void) +{ + lnet_init_locks(); + + the_lnet.ln_res_lock = 
cfs_percpt_lock_alloc(lnet_cpt_table()); + if (the_lnet.ln_res_lock != NULL) + return 0; + + lnet_fini_locks(); + return -ENOMEM; +} + +static void +lnet_destroy_locks(void) +{ + if (the_lnet.ln_res_lock != NULL) { + cfs_percpt_lock_free(the_lnet.ln_res_lock); + the_lnet.ln_res_lock = NULL; + } + + lnet_fini_locks(); +} + void lnet_assert_wire_constants (void) { /* Wire protocol assertions generated by 'wirecheck' @@ -486,7 +510,7 @@ lnet_res_container_cleanup(struct lnet_res_container *rec) int lnet_res_container_setup(struct lnet_res_container *rec, - int type, int objnum, int objsz) + int cpt, int type, int objnum, int objsz) { int rc = 0; int i; @@ -502,11 +526,11 @@ lnet_res_container_setup(struct lnet_res_container *rec, if (rc != 0) goto out; #endif - rec->rec_lh_cookie = type; + rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type; /* Arbitrary choice of hash table size */ - LIBCFS_ALLOC(rec->rec_lh_hash, - LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0])); + LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt, + LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0])); if (rec->rec_lh_hash == NULL) { rc = -ENOMEM; goto out; @@ -524,6 +548,44 @@ out: return rc; } +static void +lnet_res_containers_destroy(struct lnet_res_container **recs) +{ + struct lnet_res_container *rec; + int i; + + cfs_percpt_for_each(rec, i, recs) + lnet_res_container_cleanup(rec); + + cfs_percpt_free(recs); +} + +static struct lnet_res_container ** +lnet_res_containers_create(int type, int objnum, int objsz) +{ + struct lnet_res_container **recs; + struct lnet_res_container *rec; + int rc; + int i; + + recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec)); + if (recs == NULL) { + CERROR("Failed to allocate %s resource containers\n", + lnet_res_type2str(type)); + return NULL; + } + + cfs_percpt_for_each(rec, i, recs) { + rc = lnet_res_container_setup(rec, i, type, objnum, objsz); + if (rc != 0) { + lnet_res_containers_destroy(recs); + return NULL; + } + } + + return recs; +} + 
lnet_libhandle_t * lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie) { @@ -535,7 +597,7 @@ lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie) if ((cookie & (LNET_COOKIE_TYPES - 1)) != rec->rec_type) return NULL; - hash = cookie >> LNET_COOKIE_TYPE_BITS; + hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS); head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK]; cfs_list_for_each_entry(lh, head, lh_hash_chain) { @@ -550,7 +612,7 @@ void lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh) { /* ALWAYS called with lnet_res_lock held */ - unsigned int ibits = LNET_COOKIE_TYPE_BITS; + unsigned int ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS; unsigned int hash; lh->lh_cookie = rec->rec_lh_cookie; @@ -578,7 +640,8 @@ int lnet_prepare(lnet_pid_t requested_pid) { /* Prepare to bring up the network */ - int rc = 0; + struct lnet_res_container **recs; + int rc = 0; LASSERT (the_lnet.ln_refcount == 0); @@ -624,31 +687,26 @@ lnet_prepare(lnet_pid_t requested_pid) if (rc != 0) goto failed1; - rc = lnet_res_container_setup(&the_lnet.ln_eq_container, + rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0, LNET_COOKIE_TYPE_EQ, LNET_FL_MAX_EQS, sizeof(lnet_eq_t)); - if (rc != 0) { - CERROR("Failed to create EQ container for LNet: %d\n", rc); + if (rc != 0) goto failed2; - } - /* NB: we will have instance of ME container per CPT soon */ - rc = lnet_res_container_setup(&the_lnet.ln_me_container, - LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES, - sizeof(lnet_me_t)); - if (rc != 0) { - CERROR("Failed to create ME container for LNet: %d\n", rc); + recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES, + sizeof(lnet_me_t)); + if (recs == NULL) goto failed3; - } + + the_lnet.ln_me_containers = recs; /* NB: we will have instance of MD container per CPT soon */ - rc = lnet_res_container_setup(&the_lnet.ln_md_container, - LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS, - sizeof(lnet_libmd_t)); - if (rc != 0) { - CERROR("Failed to 
create MD container for LNet: %d\n", rc); + recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS, + sizeof(lnet_libmd_t)); + if (recs == NULL) goto failed3; - } + + the_lnet.ln_md_containers = recs; rc = lnet_portals_create(); if (rc != 0) { @@ -661,8 +719,14 @@ lnet_prepare(lnet_pid_t requested_pid) failed3: /* NB: lnet_res_container_cleanup is safe to call for * uninitialized container */ - lnet_res_container_cleanup(&the_lnet.ln_md_container); - lnet_res_container_cleanup(&the_lnet.ln_me_container); + if (the_lnet.ln_md_containers != NULL) { + lnet_res_containers_destroy(the_lnet.ln_md_containers); + the_lnet.ln_md_containers = NULL; + } + if (the_lnet.ln_me_containers != NULL) { + lnet_res_containers_destroy(the_lnet.ln_me_containers); + the_lnet.ln_me_containers = NULL; + } lnet_res_container_cleanup(&the_lnet.ln_eq_container); failed2: lnet_msg_container_cleanup(&the_lnet.ln_msg_container); @@ -690,8 +754,16 @@ lnet_unprepare (void) lnet_portals_destroy(); - lnet_res_container_cleanup(&the_lnet.ln_md_container); - lnet_res_container_cleanup(&the_lnet.ln_me_container); + if (the_lnet.ln_md_containers != NULL) { + lnet_res_containers_destroy(the_lnet.ln_md_containers); + the_lnet.ln_md_containers = NULL; + } + + if (the_lnet.ln_me_containers != NULL) { + lnet_res_containers_destroy(the_lnet.ln_me_containers); + the_lnet.ln_me_containers = NULL; + } + lnet_res_container_cleanup(&the_lnet.ln_eq_container); lnet_free_rtrpools(); @@ -719,6 +791,30 @@ lnet_net2ni_locked (__u32 net) return NULL; } +unsigned int +lnet_nid_cpt_hash(lnet_nid_t nid) +{ + __u64 key = nid; + unsigned int val; + + val = cfs_hash_long(key, LNET_CPT_BITS); + /* NB: LNET_CP_NUMBER doesn't have to be PO2 */ + if (val < LNET_CPT_NUMBER) + return val; + + return (unsigned int)((key + val + (val >> 1)) % LNET_CPT_NUMBER); +} + +int +lnet_cpt_of_nid(lnet_nid_t nid) +{ + if (LNET_CPT_NUMBER == 1) + return 0; /* the only one */ + + return lnet_nid_cpt_hash(nid); +} 
+EXPORT_SYMBOL(lnet_cpt_of_nid); + int lnet_islocalnet (__u32 net) { @@ -1065,12 +1161,35 @@ lnet_startup_lndnis (void) int LNetInit(void) { + int rc; + lnet_assert_wire_constants (); LASSERT (!the_lnet.ln_init); memset(&the_lnet, 0, sizeof(the_lnet)); - lnet_init_locks(); + /* refer to global cfs_cpt_table for now */ + the_lnet.ln_cpt_table = cfs_cpt_table; + the_lnet.ln_cpt_number = cfs_cpt_number(cfs_cpt_table); + + LASSERT(the_lnet.ln_cpt_number > 0); + if (the_lnet.ln_cpt_number > LNET_CPT_MAX) { + /* we are under risk of consuming all lh_cookie */ + CERROR("Can't have %d CPTs for LNet (max allowed is %d), " + "please change setting of CPT-table and retry\n", + the_lnet.ln_cpt_number, LNET_CPT_MAX); + return -1; + } + + while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number) + the_lnet.ln_cpt_bits++; + + rc = lnet_create_locks(); + if (rc != 0) { + CERROR("Can't create LNet global locks: %d\n", rc); + return -1; + } + the_lnet.ln_refcount = 0; the_lnet.ln_init = 1; LNetInvalidateHandle(&the_lnet.ln_rc_eqh); @@ -1108,15 +1227,15 @@ LNetInit(void) void LNetFini(void) { - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount == 0); + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount == 0); - while (!cfs_list_empty(&the_lnet.ln_lnds)) - lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next, - lnd_t, lnd_list)); - lnet_fini_locks(); + while (!cfs_list_empty(&the_lnet.ln_lnds)) + lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next, + lnd_t, lnd_list)); + lnet_destroy_locks(); - the_lnet.ln_init = 0; + the_lnet.ln_init = 0; } /** diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c index 059afa2..dde4f45 100644 --- a/lnet/lnet/lib-eq.c +++ b/lnet/lnet/lib-eq.c @@ -97,10 +97,8 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, if (count != 0) { LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t)); - if (eq->eq_events == NULL) { - lnet_eq_free(eq); - return -ENOMEM; - } + if (eq->eq_events == NULL) + goto failed; /* NB 
allocator has set all event sequence numbers to 0, * so all them should be earlier than eq_deq_seq */ } @@ -108,18 +106,37 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, eq->eq_deq_seq = 1; eq->eq_enq_seq = 1; eq->eq_size = count; - eq->eq_refcount = 0; eq->eq_callback = callback; - lnet_res_lock(); + eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(*eq->eq_refs[0])); + if (eq->eq_refs == NULL) + goto failed; + + /* MUST hold both exclusive lnet_res_lock */ + lnet_res_lock(LNET_LOCK_EX); + /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do + * both EQ lookup and poll event with only lnet_eq_wait_lock */ + lnet_eq_wait_lock(); lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh); cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active); - lnet_res_unlock(); + lnet_eq_wait_unlock(); + lnet_res_unlock(LNET_LOCK_EX); lnet_eq2handle(handle, eq); return 0; + +failed: + if (eq->eq_events != NULL) + LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t)); + + if (eq->eq_refs != NULL) + cfs_percpt_free(eq->eq_refs); + + lnet_eq_free(eq); + return -ENOMEM; } /** @@ -135,48 +152,63 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, int LNetEQFree(lnet_handle_eq_t eqh) { - lnet_eq_t *eq; - int size; - lnet_event_t *events; - - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); - - lnet_res_lock(); + struct lnet_eq *eq; + lnet_event_t *events = NULL; + int **refs = NULL; + int *ref; + int rc = 0; + int size = 0; + int i; + + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); + + lnet_res_lock(LNET_LOCK_EX); + /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do + * both EQ lookup and poll event with only lnet_eq_wait_lock */ + lnet_eq_wait_lock(); eq = lnet_handle2eq(&eqh); if (eq == NULL) { - lnet_res_unlock(); - return -ENOENT; + rc = -ENOENT; + goto out; } - if (eq->eq_refcount != 0) { - CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n", - eq->eq_refcount); - 
lnet_res_unlock(); - return -EBUSY; + cfs_percpt_for_each(ref, i, eq->eq_refs) { + LASSERT(*ref >= 0); + if (*ref == 0) + continue; + + CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n", + i, *ref); + rc = -EBUSY; + goto out; } /* stash for free after lock dropped */ events = eq->eq_events; size = eq->eq_size; + refs = eq->eq_refs; lnet_res_lh_invalidate(&eq->eq_lh); cfs_list_del(&eq->eq_list); lnet_eq_free_locked(eq); - - lnet_res_unlock(); + out: + lnet_eq_wait_unlock(); + lnet_res_unlock(LNET_LOCK_EX); if (events != NULL) LIBCFS_FREE(events, size * sizeof(lnet_event_t)); + if (refs != NULL) + cfs_percpt_free(refs); - return 0; + return rc; } void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) { - /* MUST called with resource lock hold */ + /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */ int index; if (eq->eq_size == 0) { @@ -185,6 +217,7 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) return; } + lnet_eq_wait_lock(); ev->sequence = eq->eq_enq_seq++; LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size)); @@ -207,19 +240,20 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) pthread_cond_broadcast(&the_lnet.ln_eq_cond); # endif #endif + lnet_eq_wait_unlock(); } int lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev) { - int new_index = eq->eq_deq_seq & (eq->eq_size - 1); - lnet_event_t *new_event = &eq->eq_events[new_index]; - int rc; - ENTRY; + int new_index = eq->eq_deq_seq & (eq->eq_size - 1); + lnet_event_t *new_event = &eq->eq_events[new_index]; + int rc; + ENTRY; - if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) { - RETURN(0); - } + /* must called with lnet_eq_wait_lock hold */ + if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence)) + RETURN(0); /* We've got a new event... 
*/ *ev = *new_event; @@ -309,7 +343,7 @@ lnet_eq_wait_locked(int *timeout_ms) cfs_set_current_state(CFS_TASK_INTERRUPTIBLE); cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl); - lnet_res_unlock(); + lnet_eq_wait_unlock(); if (tms < 0) { cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE); @@ -329,7 +363,7 @@ lnet_eq_wait_locked(int *timeout_ms) wait = tms != 0; /* might need to call here again */ *timeout_ms = tms; - lnet_res_lock(); + lnet_eq_wait_lock(); cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl); return wait; @@ -342,10 +376,11 @@ static void lnet_eq_cond_wait(struct timespec *ts) { if (ts == NULL) { - pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_res_lock); + pthread_cond_wait(&the_lnet.ln_eq_cond, + &the_lnet.ln_eq_wait_lock); } else { pthread_cond_timedwait(&the_lnet.ln_eq_cond, - &the_lnet.ln_res_lock, ts); + &the_lnet.ln_eq_wait_lock, ts); } } # endif @@ -362,14 +397,14 @@ lnet_eq_wait_locked(int *timeout_ms) if (the_lnet.ln_eq_waitni != NULL) { /* I have a single NI that I have to call into, to get * events queued, or to block. 
*/ - lnet_res_unlock(); + lnet_eq_wait_unlock(); LNET_LOCK(); eq_waitni = the_lnet.ln_eq_waitni; if (unlikely(eq_waitni == NULL)) { LNET_UNLOCK(); - lnet_res_lock(); + lnet_eq_wait_lock(); return -1; } @@ -392,7 +427,7 @@ lnet_eq_wait_locked(int *timeout_ms) } lnet_ni_decref(eq_waitni); - lnet_res_lock(); + lnet_eq_wait_lock(); } else { /* w/o eq_waitni */ # ifndef HAVE_LIBPTHREAD /* If I'm single-threaded, LNET fails at startup if it can't @@ -480,30 +515,30 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms, if (neq < 1) RETURN(-ENOENT); - lnet_res_lock(); + lnet_eq_wait_lock(); for (;;) { #ifndef __KERNEL__ - lnet_res_unlock(); + lnet_eq_wait_unlock(); /* Recursion breaker */ if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING && !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh)) lnet_router_checker(); - lnet_res_lock(); + lnet_eq_wait_lock(); #endif for (i = 0; i < neq; i++) { lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]); if (eq == NULL) { - lnet_res_unlock(); + lnet_eq_wait_unlock(); RETURN(-ENOENT); } rc = lnet_eq_dequeue_event(eq, event); if (rc != 0) { - lnet_res_unlock(); + lnet_eq_wait_unlock(); *which = i; RETURN(rc); } @@ -524,6 +559,6 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms, break; } - lnet_res_unlock(); + lnet_eq_wait_unlock(); RETURN(0); } diff --git a/lnet/lnet/lib-md.c b/lnet/lnet/lib-md.c index dbb007e..96b4660 100644 --- a/lnet/lnet/lib-md.c +++ b/lnet/lnet/lib-md.c @@ -71,12 +71,14 @@ lnet_md_unlink(lnet_libmd_t *md) CDEBUG(D_NET, "Unlinking md %p\n", md); if (md->md_eq != NULL) { - md->md_eq->eq_refcount--; - LASSERT (md->md_eq->eq_refcount >= 0); - } + int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie); + + LASSERT(*md->md_eq->eq_refs[cpt] > 0); + (*md->md_eq->eq_refs[cpt])--; + } - LASSERT (!cfs_list_empty(&md->md_list)); - cfs_list_del_init (&md->md_list); + LASSERT(!cfs_list_empty(&md->md_list)); + cfs_list_del_init(&md->md_list); lnet_md_free_locked(md); } @@ -163,9 +165,9 @@ lnet_md_build(lnet_libmd_t 
*lmd, lnet_md_t *umd, int unlink) /* must be called with resource lock held */ static int -lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle) +lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle, int cpt) { - struct lnet_res_container *container = &the_lnet.ln_md_container; + struct lnet_res_container *container = the_lnet.ln_md_containers[cpt]; /* NB we are passed an allocated, but inactive md. * if we return success, caller may lnet_md_unlink() it. @@ -185,7 +187,7 @@ lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle) if (md->md_eq == NULL) return -ENOENT; - md->md_eq->eq_refcount++; + (*md->md_eq->eq_refs[cpt])++; } lnet_res_lh_initialize(container, &md->md_lh); @@ -263,11 +265,12 @@ int LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle) { - CFS_LIST_HEAD (matches); - CFS_LIST_HEAD (drops); - lnet_me_t *me; - lnet_libmd_t *md; - int rc; + CFS_LIST_HEAD (matches); + CFS_LIST_HEAD (drops); + struct lnet_me *me; + struct lnet_libmd *md; + int cpt; + int rc; LASSERT (the_lnet.ln_init); LASSERT (the_lnet.ln_refcount > 0); @@ -285,8 +288,9 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, return -ENOMEM; rc = lnet_md_build(md, &umd, unlink); + cpt = lnet_cpt_of_cookie(meh.cookie); - lnet_res_lock(); + lnet_res_lock(cpt); if (rc != 0) goto failed; @@ -296,7 +300,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, else if (me->me_md != NULL) rc = -EBUSY; else - rc = lnet_md_link(md, umd.eq_handle); + rc = lnet_md_link(md, umd.eq_handle, cpt); if (rc != 0) goto failed; @@ -307,7 +311,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, lnet_md2handle(handle, md); - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_drop_delayed_msg_list(&drops, "Bad match"); lnet_recv_delayed_msg_list(&matches); @@ -317,7 +321,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, failed: lnet_md_free_locked(md); - lnet_res_unlock(); + lnet_res_unlock(cpt); return rc; } @@ -340,8 +344,9 @@ 
LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd, int LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle) { - lnet_libmd_t *md; - int rc; + lnet_libmd_t *md; + int cpt; + int rc; LASSERT (the_lnet.ln_init); LASSERT (the_lnet.ln_refcount > 0); @@ -360,23 +365,23 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle) rc = lnet_md_build(md, &umd, unlink); - lnet_res_lock(); + cpt = lnet_res_lock_current(); if (rc != 0) goto failed; - rc = lnet_md_link(md, umd.eq_handle); + rc = lnet_md_link(md, umd.eq_handle, cpt); if (rc != 0) goto failed; lnet_md2handle(handle, md); - lnet_res_unlock(); + lnet_res_unlock(cpt); return 0; failed: lnet_md_free_locked(md); - lnet_res_unlock(); + lnet_res_unlock(cpt); return rc; } @@ -412,17 +417,19 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle) int LNetMDUnlink (lnet_handle_md_t mdh) { - lnet_event_t ev; - lnet_libmd_t *md; + lnet_event_t ev; + lnet_libmd_t *md; + int cpt; - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(mdh.cookie); + lnet_res_lock(cpt); md = lnet_handle2md(&mdh); if (md == NULL) { - lnet_res_unlock(); + lnet_res_unlock(cpt); return -ENOENT; } @@ -438,6 +445,6 @@ LNetMDUnlink (lnet_handle_md_t mdh) lnet_md_unlink(md); - lnet_res_unlock(); + lnet_res_unlock(cpt); return 0; } diff --git a/lnet/lnet/lib-me.c b/lnet/lnet/lib-me.c index d4f325d..4324961 100644 --- a/lnet/lnet/lib-me.c +++ b/lnet/lnet/lib-me.c @@ -78,14 +78,14 @@ LNetMEAttach(unsigned int portal, lnet_handle_me_t *handle) { struct lnet_match_table *mtable; - lnet_me_t *me; - cfs_list_t *head; + struct lnet_me *me; + cfs_list_t *head; - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); - if ((int)portal >= the_lnet.ln_nportals) - return -EINVAL; + if ((int)portal >= 
the_lnet.ln_nportals) + return -EINVAL; mtable = lnet_mt_of_attach(portal, match_id, match_bits, ignore_bits, pos); @@ -96,7 +96,7 @@ LNetMEAttach(unsigned int portal, if (me == NULL) return -ENOMEM; - lnet_res_lock(); + lnet_res_lock(mtable->mt_cpt); me->me_portal = portal; me->me_match_id = match_id; @@ -105,20 +105,19 @@ LNetMEAttach(unsigned int portal, me->me_unlink = unlink; me->me_md = NULL; - lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh); + lnet_res_lh_initialize(the_lnet.ln_me_containers[mtable->mt_cpt], + &me->me_lh); head = lnet_mt_match_head(mtable, match_id, match_bits); - LASSERT (head != NULL); - if (pos == LNET_INS_AFTER) - cfs_list_add_tail(&me->me_list, head); - else - cfs_list_add(&me->me_list, head); - - lnet_me2handle(handle, me); + if (pos == LNET_INS_AFTER || pos == LNET_INS_LOCAL) + cfs_list_add_tail(&me->me_list, head); + else + cfs_list_add(&me->me_list, head); - lnet_res_unlock(); + lnet_me2handle(handle, me); - return 0; + lnet_res_unlock(mtable->mt_cpt); + return 0; } /** @@ -145,24 +144,30 @@ LNetMEInsert(lnet_handle_me_t current_meh, lnet_unlink_t unlink, lnet_ins_pos_t pos, lnet_handle_me_t *handle) { - lnet_me_t *current_me; - lnet_me_t *new_me; - lnet_portal_t *ptl; + struct lnet_me *current_me; + struct lnet_me *new_me; + struct lnet_portal *ptl; + int cpt; + + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); + if (pos == LNET_INS_LOCAL) + return -EPERM; new_me = lnet_me_alloc(); if (new_me == NULL) return -ENOMEM; - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(current_meh.cookie); + + lnet_res_lock(cpt); current_me = lnet_handle2me(¤t_meh); if (current_me == NULL) { lnet_me_free_locked(new_me); - lnet_res_unlock(); + lnet_res_unlock(cpt); return -ENOENT; } @@ -172,8 +177,8 @@ LNetMEInsert(lnet_handle_me_t current_meh, if (lnet_ptl_is_unique(ptl)) { /* nosense to insertion on unique portal */ lnet_me_free_locked(new_me); - 
lnet_res_unlock(); - return -EPERM; + lnet_res_unlock(cpt); + return -EPERM; } new_me->me_portal = current_me->me_portal; @@ -183,7 +188,7 @@ LNetMEInsert(lnet_handle_me_t current_meh, new_me->me_unlink = unlink; new_me->me_md = NULL; - lnet_res_lh_initialize(&the_lnet.ln_me_container, &new_me->me_lh); + lnet_res_lh_initialize(the_lnet.ln_me_containers[cpt], &new_me->me_lh); if (pos == LNET_INS_AFTER) cfs_list_add(&new_me->me_list, ¤t_me->me_list); @@ -192,7 +197,7 @@ LNetMEInsert(lnet_handle_me_t current_meh, lnet_me2handle(handle, new_me); - lnet_res_unlock(); + lnet_res_unlock(cpt); return 0; } @@ -214,18 +219,20 @@ LNetMEInsert(lnet_handle_me_t current_meh, int LNetMEUnlink(lnet_handle_me_t meh) { - lnet_me_t *me; - lnet_libmd_t *md; - lnet_event_t ev; + lnet_me_t *me; + lnet_libmd_t *md; + lnet_event_t ev; + int cpt; - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(meh.cookie); + lnet_res_lock(cpt); - me = lnet_handle2me(&meh); - if (me == NULL) { - lnet_res_unlock(); + me = lnet_handle2me(&meh); + if (me == NULL) { + lnet_res_unlock(cpt); return -ENOENT; } @@ -235,12 +242,12 @@ LNetMEUnlink(lnet_handle_me_t meh) md->md_refcount == 0) { lnet_build_unlink_event(md, &ev); lnet_eq_enqueue_event(md->md_eq, &ev); - } + } - lnet_me_unlink(me); + lnet_me_unlink(me); - lnet_res_unlock(); - return 0; + lnet_res_unlock(cpt); + return 0; } /* call with lnet_res_lock please */ diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 183bb72..099bcd0 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -1483,8 +1483,10 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg) lnet_libmd_t *md; int rlength; int mlength; + int cpt; - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie); + lnet_res_lock(cpt); src.nid = hdr->src_nid; src.pid = hdr->src_pid; @@ -1502,7 +1504,7 @@ lnet_parse_reply(lnet_ni_t 
*ni, lnet_msg_t *msg) CERROR("REPLY MD also attached to portal %d\n", md->md_me->me_portal); - lnet_res_unlock(); + lnet_res_unlock(cpt); return ENOENT; /* +ve: OK but no match */ } @@ -1518,7 +1520,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg) libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rlength, hdr->msg.reply.dst_wmd.wh_object_cookie, mlength); - lnet_res_unlock(); + lnet_res_unlock(cpt); return ENOENT; /* +ve: OK but no match */ } @@ -1531,7 +1533,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg) if (mlength != 0) lnet_setpayloadbuffer(msg); - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_build_msg_event(msg, LNET_EVENT_REPLY); @@ -1545,6 +1547,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg) lnet_hdr_t *hdr = &msg->msg_hdr; lnet_process_id_t src = {0}; lnet_libmd_t *md; + int cpt; src.nid = hdr->src_nid; src.pid = hdr->src_pid; @@ -1553,7 +1556,8 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg) hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits); hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength); - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie); + lnet_res_lock(cpt); /* NB handles only looked up by creator (no flips) */ md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd); @@ -1569,7 +1573,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg) CERROR("Source MD also attached to portal %d\n", md->md_me->me_portal); - lnet_res_unlock(); + lnet_res_unlock(cpt); return ENOENT; /* +ve! 
*/ } @@ -1579,7 +1583,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg) lnet_msg_attach_md(msg, md, 0, 0); - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_build_msg_event(msg, LNET_EVENT_ACK); @@ -2036,9 +2040,10 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, __u64 match_bits, unsigned int offset, __u64 hdr_data) { - lnet_msg_t *msg; - lnet_libmd_t *md; - int rc; + struct lnet_msg *msg; + struct lnet_libmd *md; + int cpt; + int rc; LASSERT (the_lnet.ln_init); LASSERT (the_lnet.ln_refcount > 0); @@ -2059,7 +2064,8 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, } msg->msg_vmflush = !!cfs_memory_pressure_get(); - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(mdh.cookie); + lnet_res_lock(cpt); md = lnet_handle2md(&mdh); if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) { @@ -2069,11 +2075,9 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, if (md != NULL && md->md_me != NULL) CERROR("Source MD also attached to portal %d\n", md->md_me->me_portal); - - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_msg_free(msg); - return -ENOENT; } @@ -2101,7 +2105,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, LNET_WIRE_HANDLE_COOKIE_NONE; } - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_build_msg_event(msg, LNET_EVENT_SEND); @@ -2126,14 +2130,16 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg) * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when * lnet_finalize() is called on it, so the LND must call this first */ - lnet_msg_t *msg = lnet_msg_alloc(); - lnet_libmd_t *getmd = getmsg->msg_md; - lnet_process_id_t peer_id = getmsg->msg_target; + struct lnet_msg *msg = lnet_msg_alloc(); + struct lnet_libmd *getmd = getmsg->msg_md; + lnet_process_id_t peer_id = getmsg->msg_target; + int cpt; - LASSERT (!getmsg->msg_target_is_router); - LASSERT (!getmsg->msg_routing); + LASSERT(!getmsg->msg_target_is_router); + LASSERT(!getmsg->msg_routing); - 
lnet_res_lock(); + cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie); + lnet_res_lock(cpt); LASSERT (getmd->md_refcount > 0); @@ -2147,7 +2153,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg) CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n", libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd); - lnet_res_unlock(); + lnet_res_unlock(cpt); goto drop; } @@ -2164,7 +2170,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg) msg->msg_receiving = 1; /* required by lnet_msg_attach_md */ lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length); - lnet_res_unlock(); + lnet_res_unlock(cpt); LNET_LOCK(); lnet_msg_commit(msg, 0); @@ -2227,9 +2233,10 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, lnet_process_id_t target, unsigned int portal, __u64 match_bits, unsigned int offset) { - lnet_msg_t *msg; - lnet_libmd_t *md; - int rc; + struct lnet_msg *msg; + struct lnet_libmd *md; + int cpt; + int rc; LASSERT (the_lnet.ln_init); LASSERT (the_lnet.ln_refcount > 0); @@ -2249,7 +2256,8 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, return -ENOMEM; } - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(mdh.cookie); + lnet_res_lock(cpt); md = lnet_handle2md(&mdh); if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) { @@ -2260,7 +2268,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, CERROR("REPLY MD also attached to portal %d\n", md->md_me->me_portal); - lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_msg_free(msg); @@ -2279,12 +2287,12 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length); /* NB handles only looked up by creator (no flips) */ - msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie = - the_lnet.ln_interface_cookie; - msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = - md->md_lh.lh_cookie; + msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie = + the_lnet.ln_interface_cookie; + msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = + md->md_lh.lh_cookie; - 
lnet_res_unlock(); + lnet_res_unlock(cpt); lnet_build_msg_event(msg, LNET_EVENT_SEND); diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index c3cb58b..f03df6b 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -387,6 +387,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) { struct lnet_msg_container *container; int my_slot; + int cpt; int i; LASSERT (!cfs_in_interrupt ()); @@ -416,9 +417,11 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) msg->msg_ev.status = status; if (msg->msg_md != NULL) { - lnet_res_lock(); + cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); + + lnet_res_lock(cpt); lnet_msg_detach_md(msg, status); - lnet_res_unlock(); + lnet_res_unlock(cpt); } if (!msg->msg_tx_committed && !msg->msg_rx_committed) { diff --git a/lnet/lnet/lib-ptl.c b/lnet/lnet/lib-ptl.c index b1ce5a8..2ccb0cf 100644 --- a/lnet/lnet/lib-ptl.c +++ b/lnet/lnet/lib-ptl.c @@ -38,6 +38,11 @@ #include +/* NB: add /proc interfaces in upcoming patches */ +int portal_rotor; +CFS_MODULE_PARM(portal_rotor, "i", int, 0644, + "redirect PUTs to different cpu-partitions"); + static int lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id, __u64 mbits, __u64 ignore_bits) @@ -56,10 +61,10 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id, goto match; /* unset, new portal */ - lnet_res_lock(); + lnet_ptl_lock(ptl); /* check again with lock */ if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) { - lnet_res_unlock(); + lnet_ptl_unlock(ptl); goto match; } @@ -69,7 +74,7 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id, else lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD); - lnet_res_unlock(); + lnet_ptl_unlock(ptl); return 1; @@ -80,6 +85,56 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id, return 1; } +static void +lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt) +{ + struct lnet_match_table *mtable = ptl->ptl_mtables[cpt]; + int i; + + /* with hold of both 
lnet_res_lock(cpt) and lnet_ptl_lock */ + LASSERT(lnet_ptl_is_wildcard(ptl)); + + mtable->mt_enabled = 1; + + ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt; + for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) { + LASSERT(ptl->ptl_mt_maps[i] != cpt); + if (ptl->ptl_mt_maps[i] < cpt) + break; + + /* swap to order */ + ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i]; + ptl->ptl_mt_maps[i] = cpt; + } + + ptl->ptl_mt_nmaps++; +} + +static void +lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt) +{ + struct lnet_match_table *mtable = ptl->ptl_mtables[cpt]; + int i; + + /* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */ + LASSERT(lnet_ptl_is_wildcard(ptl)); + + if (LNET_CPT_NUMBER == 1) + return; /* never disable the only match-table */ + + mtable->mt_enabled = 0; + + LASSERT(ptl->ptl_mt_nmaps > 0 && + ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER); + + /* remove it from mt_maps */ + ptl->ptl_mt_nmaps--; + for (i = 0; i < ptl->ptl_mt_nmaps; i++) { + if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */ + ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1]; + } +} + static int lnet_try_match_md(lnet_libmd_t *md, struct lnet_match_info *info, struct lnet_msg *msg) @@ -90,12 +145,12 @@ lnet_try_match_md(lnet_libmd_t *md, unsigned int mlength; lnet_me_t *me = md->md_me; - /* mismatched MD op */ - if ((md->md_options & info->mi_opc) == 0) - return LNET_MATCHMD_NONE; - /* MD exhausted */ if (lnet_md_exhausted(md)) + return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED; + + /* mismatched MD op */ + if ((md->md_options & info->mi_opc) == 0) return LNET_MATCHMD_NONE; /* mismatched ME nid/pid? */ @@ -147,53 +202,105 @@ lnet_try_match_md(lnet_libmd_t *md, lnet_msg_attach_md(msg, md, offset, mlength); md->md_offset = offset + mlength; + if (!lnet_md_exhausted(md)) + return LNET_MATCHMD_OK; + /* Auto-unlink NOW, so the ME gets unlinked if required. * We bumped md->md_refcount above so the MD just gets flagged * for unlink when it is finalized. 
*/ - if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 && - lnet_md_exhausted(md)) { + if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0) lnet_md_unlink(md); - } - return LNET_MATCHMD_OK; + return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED; +} + +static struct lnet_match_table * +lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits) +{ + if (LNET_CPT_NUMBER == 1) + return ptl->ptl_mtables[0]; /* the only one */ + + /* if it's a unique portal, return match-table hashed by NID */ + return lnet_ptl_is_unique(ptl) ? + ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL; } struct lnet_match_table * lnet_mt_of_attach(unsigned int index, lnet_process_id_t id, __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos) { - struct lnet_portal *ptl; + struct lnet_portal *ptl; + struct lnet_match_table *mtable; + /* NB: called w/o lock */ LASSERT(index < the_lnet.ln_nportals); if (!lnet_ptl_match_type(index, id, mbits, ignore_bits)) return NULL; ptl = the_lnet.ln_portals[index]; - /* NB: Now we only have one match-table for each portal, - * and will have match-table per CPT in upcoming changes, - * ME will be scattered to different match-tables based - * on attaching information */ - return ptl->ptl_mtable; + + mtable = lnet_match2mt(ptl, id, mbits); + if (mtable != NULL) /* unique portal or only one match-table */ + return mtable; + + /* it's a wildcard portal */ + switch (pos) { + default: + return NULL; + case LNET_INS_BEFORE: + case LNET_INS_AFTER: + /* posted by no affinity thread, always hash to specific + * match-table to avoid buffer stealing which is heavy */ + return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER]; + case LNET_INS_LOCAL: + /* posted by cpu-affinity thread */ + return ptl->ptl_mtables[lnet_cpt_current()]; + } } struct lnet_match_table * lnet_mt_of_match(unsigned int index, lnet_process_id_t id, __u64 mbits) { - struct lnet_portal *ptl; + struct lnet_match_table *mtable; + struct lnet_portal *ptl; + int nmaps; + int rotor; + int cpt; + /* 
NB: called w/o lock */ LASSERT(index < the_lnet.ln_nportals); - ptl = the_lnet.ln_portals[index]; - if (!lnet_ptl_is_unique(ptl) && - !lnet_ptl_is_wildcard(ptl) && !lnet_ptl_is_lazy(ptl)) - return NULL; - /* NB: Now we only have one match-table for each portal, - * and will have match-table per CPT in upcoming changes, - * request will be scattered to different match-tables based - * on matching information */ - return ptl->ptl_mtable; + LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)); + + mtable = lnet_match2mt(ptl, id, mbits); + if (mtable != NULL) + return mtable; + + /* it's a wildcard portal */ + if (!portal_rotor) { + cpt = lnet_cpt_current(); + if (ptl->ptl_mtables[cpt]->mt_enabled) + return ptl->ptl_mtables[cpt]; + } + + rotor = ptl->ptl_rotor++; + cpt = rotor % LNET_CPT_NUMBER; + + if (!ptl->ptl_mtables[cpt]->mt_enabled) { + /* is there any active entry for this portal? */ + nmaps = ptl->ptl_mt_nmaps; + /* map to an active mtable to avoid heavy "stealing" */ + if (nmaps != 0) { + /* NB: there is possibility that ptl_mt_maps is being + * changed because we are not under protection of + * lnet_ptl_lock, but it shouldn't hurt anything */ + cpt = ptl->ptl_mt_maps[rotor % nmaps]; + } + } + + return ptl->ptl_mtables[cpt]; } cfs_list_t * @@ -205,14 +312,13 @@ lnet_mt_match_head(struct lnet_match_table *mtable, if (lnet_ptl_is_wildcard(ptl)) { return &mtable->mt_mlist; - } else if (lnet_ptl_is_unique(ptl)) { + } else { unsigned long hash = mbits + id.nid + id.pid; + LASSERT(lnet_ptl_is_unique(ptl)); hash = cfs_hash_long(hash, LNET_MT_HASH_BITS); return &mtable->mt_mhash[hash]; } - - return NULL; } int @@ -222,11 +328,14 @@ lnet_mt_match_md(struct lnet_match_table *mtable, cfs_list_t *head; lnet_me_t *me; lnet_me_t *tmp; + int exhausted = 0; int rc; + /* NB: only wildcard portal can return LNET_MATCHMD_EXHAUSTED */ + if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal])) + exhausted = LNET_MATCHMD_EXHAUSTED; + head = 
lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits); - if (head == NULL) /* nobody posted anything on this portal */ - goto out; cfs_list_for_each_entry_safe(me, tmp, head, me_list) { /* ME attached but MD not attached yet */ @@ -236,28 +345,128 @@ lnet_mt_match_md(struct lnet_match_table *mtable, LASSERT(me == me->me_md->md_me); rc = lnet_try_match_md(me->me_md, info, msg); - switch (rc) { - default: - LBUG(); - - case LNET_MATCHMD_NONE: - continue; - - case LNET_MATCHMD_OK: - return LNET_MATCHMD_OK; + if ((rc & LNET_MATCHMD_EXHAUSTED) == 0) + exhausted = 0; /* mlist is not empty */ - case LNET_MATCHMD_DROP: - return LNET_MATCHMD_DROP; + if ((rc & LNET_MATCHMD_FINISH) != 0) { + /* don't return EXHAUSTED bit because we don't know + * whether the mlist is empty or not */ + return rc & ~LNET_MATCHMD_EXHAUSTED; } - /* not reached */ } - out: if (info->mi_opc == LNET_MD_OP_GET || !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal])) - return LNET_MATCHMD_DROP; + return LNET_MATCHMD_DROP | exhausted; + + return LNET_MATCHMD_NONE | exhausted; +} + +static int +lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg) +{ + int rc; + + /* message arrived before any buffer posting on this portal, + * simply delay or drop this message */ + if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl))) + return 0; + + lnet_ptl_lock(ptl); + /* check it again with hold of lock */ + if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) { + lnet_ptl_unlock(ptl); + return 0; + } + + if (lnet_ptl_is_lazy(ptl)) { + if (msg->msg_rx_ready_delay) { + msg->msg_rx_delayed = 1; + cfs_list_add_tail(&msg->msg_list, + &ptl->ptl_msg_delayed); + } + rc = LNET_MATCHMD_NONE; + } else { + rc = LNET_MATCHMD_DROP; + } - return LNET_MATCHMD_NONE; + lnet_ptl_unlock(ptl); + return rc; +} + +static int +lnet_ptl_match_delay(struct lnet_portal *ptl, + struct lnet_match_info *info, struct lnet_msg *msg) +{ + int first = ptl->ptl_mt_maps[0]; /* read w/o lock */ + int rc = 0; + int 
i; + + /* steal buffer from other CPTs, and delay it if nothing to steal, + * this function is more expensive than a regular match, but we + * don't expect it can happen a lot */ + LASSERT(lnet_ptl_is_wildcard(ptl)); + + for (i = 0; i < LNET_CPT_NUMBER; i++) { + struct lnet_match_table *mtable; + int cpt; + + cpt = (first + i) % LNET_CPT_NUMBER; + mtable = ptl->ptl_mtables[cpt]; + if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled) + continue; + + lnet_res_lock(cpt); + lnet_ptl_lock(ptl); + + if (i == 0) { /* the first try, attach on stealing list */ + cfs_list_add_tail(&msg->msg_list, + &ptl->ptl_msg_stealing); + } + + if (!cfs_list_empty(&msg->msg_list)) { /* on stealing list */ + rc = lnet_mt_match_md(mtable, info, msg); + + if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && + mtable->mt_enabled) + lnet_ptl_disable_mt(ptl, cpt); + + if ((rc & LNET_MATCHMD_FINISH) != 0) + cfs_list_del_init(&msg->msg_list); + + } else { + /* could be matched by lnet_ptl_attach_md() + * which is called by another thread */ + rc = msg->msg_md == NULL ? 
+ LNET_MATCHMD_DROP : LNET_MATCHMD_OK; + } + + if (!cfs_list_empty(&msg->msg_list) && /* not matched yet */ + (i == LNET_CPT_NUMBER - 1 || /* the last CPT */ + ptl->ptl_mt_nmaps == 0 || /* no active CPT */ + (ptl->ptl_mt_nmaps == 1 && /* the only active CPT */ + ptl->ptl_mt_maps[0] == cpt))) { + /* nothing to steal, delay or drop */ + cfs_list_del_init(&msg->msg_list); + + if (lnet_ptl_is_lazy(ptl)) { + msg->msg_rx_delayed = 1; + cfs_list_add_tail(&msg->msg_list, + &ptl->ptl_msg_delayed); + rc = LNET_MATCHMD_NONE; + } else { + rc = LNET_MATCHMD_DROP; + } + } + + lnet_ptl_unlock(ptl); + lnet_res_unlock(cpt); + + if ((rc & LNET_MATCHMD_FINISH) != 0 || msg->msg_rx_delayed) + break; + } + + return rc; } int @@ -277,43 +486,64 @@ lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg) return LNET_MATCHMD_DROP; } + ptl = the_lnet.ln_portals[info->mi_portal]; + rc = lnet_ptl_match_early(ptl, msg); + if (rc != 0) /* matched or delayed early message */ + return rc; + mtable = lnet_mt_of_match(info->mi_portal, info->mi_id, info->mi_mbits); - if (mtable == NULL) { - CDEBUG(D_NET, "Drop early message from %s of length %d into " - "portal %d MB="LPX64"\n", - libcfs_id2str(info->mi_id), info->mi_rlength, - info->mi_portal, info->mi_mbits); - return LNET_MATCHMD_DROP; - } - - ptl = the_lnet.ln_portals[info->mi_portal]; - lnet_res_lock(); + lnet_res_lock(mtable->mt_cpt); if (the_lnet.ln_shutdown) { - rc = LNET_MATCHMD_DROP; - goto out; + rc = LNET_MATCHMD_DROP; + goto out1; } rc = lnet_mt_match_md(mtable, info, msg); - if (rc != LNET_MATCHMD_NONE) /* matched or dropping */ - goto out; + if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && mtable->mt_enabled) { + lnet_ptl_lock(ptl); + lnet_ptl_disable_mt(ptl, mtable->mt_cpt); + lnet_ptl_unlock(ptl); + } + + if ((rc & LNET_MATCHMD_FINISH) != 0) /* matched or dropping */ + goto out1; if (!msg->msg_rx_ready_delay) - goto out; + goto out1; + LASSERT(lnet_ptl_is_lazy(ptl)); LASSERT(!msg->msg_rx_delayed); - msg->msg_rx_delayed = 1; - 
cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq); - - CDEBUG(D_NET, - "Delaying %s from %s portal %d MB "LPX64" offset %d len %d\n", - info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET", - libcfs_id2str(info->mi_id), info->mi_portal, - info->mi_mbits, info->mi_roffset, info->mi_rlength); - out: - lnet_res_unlock(); - return rc; + + /* NB: we don't expect "delay" can happen a lot */ + if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) { + lnet_ptl_lock(ptl); + + msg->msg_rx_delayed = 1; + cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed); + + lnet_ptl_unlock(ptl); + lnet_res_unlock(mtable->mt_cpt); + + } else { + lnet_res_unlock(mtable->mt_cpt); + rc = lnet_ptl_match_delay(ptl, info, msg); + } + + if (msg->msg_rx_delayed) { + CDEBUG(D_NET, + "Delaying %s from %s ptl %d MB "LPX64" off %d len %d\n", + info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET", + libcfs_id2str(info->mi_id), info->mi_portal, + info->mi_mbits, info->mi_roffset, info->mi_rlength); + } + goto out0; + out1: + lnet_res_unlock(mtable->mt_cpt); + out0: + /* EXHAUSTED bit is only meaningful for internal functions */ + return rc & ~LNET_MATCHMD_EXHAUSTED; } void @@ -331,20 +561,35 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md, cfs_list_t *matches, cfs_list_t *drops) { struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal]; + struct lnet_match_table *mtable; + cfs_list_t *head; lnet_msg_t *tmp; lnet_msg_t *msg; + int exhausted = 0; + int cpt; LASSERT(md->md_refcount == 0); /* a brand new MD */ me->me_md = md; md->md_me = me; - cfs_list_for_each_entry_safe(msg, tmp, &ptl->ptl_msgq, msg_list) { + cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie); + mtable = ptl->ptl_mtables[cpt]; + + if (cfs_list_empty(&ptl->ptl_msg_stealing) && + cfs_list_empty(&ptl->ptl_msg_delayed) && + mtable->mt_enabled) + return; + + lnet_ptl_lock(ptl); + head = &ptl->ptl_msg_stealing; + again: + cfs_list_for_each_entry_safe(msg, tmp, head, msg_list) { struct lnet_match_info info; lnet_hdr_t *hdr; int rc; - 
LASSERT(msg->msg_rx_delayed); + LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing); hdr = &msg->msg_hdr; info.mi_id.nid = hdr->src_nid; @@ -357,13 +602,25 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md, rc = lnet_try_match_md(md, &info, msg); - if (rc == LNET_MATCHMD_NONE) + exhausted = (rc & LNET_MATCHMD_EXHAUSTED) != 0; + if ((rc & LNET_MATCHMD_NONE) != 0) { + if (exhausted) + break; continue; + } /* Hurrah! This _is_ a match */ - cfs_list_del(&msg->msg_list); + LASSERT((rc & LNET_MATCHMD_FINISH) != 0); + cfs_list_del_init(&msg->msg_list); - if (rc == LNET_MATCHMD_OK) { + if (head == &ptl->ptl_msg_stealing) { + if (exhausted) + break; + /* stealing thread will handle the message */ + continue; + } + + if ((rc & LNET_MATCHMD_OK) != 0) { cfs_list_add_tail(&msg->msg_list, matches); CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d " @@ -372,33 +629,45 @@ lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md, info.mi_portal, info.mi_mbits, info.mi_roffset, info.mi_rlength); } else { - LASSERT(rc == LNET_MATCHMD_DROP); - cfs_list_add_tail(&msg->msg_list, drops); } - if (lnet_md_exhausted(md)) + if (exhausted) break; } + + if (!exhausted && head == &ptl->ptl_msg_stealing) { + head = &ptl->ptl_msg_delayed; + goto again; + } + + if (lnet_ptl_is_wildcard(ptl) && !exhausted && !mtable->mt_enabled) + lnet_ptl_enable_mt(ptl, cpt); + + lnet_ptl_unlock(ptl); } void lnet_ptl_cleanup(struct lnet_portal *ptl) { struct lnet_match_table *mtable; + int i; - LASSERT(cfs_list_empty(&ptl->ptl_msgq)); - - if (ptl->ptl_mtable == NULL) /* uninitialized portal */ + if (ptl->ptl_mtables == NULL) /* uninitialized portal */ return; - do { /* iterate over match-tables when we have percpt match-table */ + LASSERT(cfs_list_empty(&ptl->ptl_msg_delayed)); + LASSERT(cfs_list_empty(&ptl->ptl_msg_stealing)); +#ifndef __KERNEL__ +# ifdef HAVE_LIBPTHREAD + pthread_mutex_destroy(&ptl->ptl_lock); +# endif +#endif + cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) { cfs_list_t *mhash; 
lnet_me_t *me; int j; - mtable = ptl->ptl_mtable; - if (mtable->mt_mhash == NULL) /* uninitialized match-table */ continue; @@ -423,10 +692,10 @@ lnet_ptl_cleanup(struct lnet_portal *ptl) } LIBCFS_FREE(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE); - } while (0); + } - LIBCFS_FREE(ptl->ptl_mtable, sizeof(*mtable)); - ptl->ptl_mtable = NULL; + cfs_percpt_free(ptl->ptl_mtables); + ptl->ptl_mtables = NULL; } int @@ -434,20 +703,29 @@ lnet_ptl_setup(struct lnet_portal *ptl, int index) { struct lnet_match_table *mtable; cfs_list_t *mhash; + int i; int j; - ptl->ptl_index = index; - CFS_INIT_LIST_HEAD(&ptl->ptl_msgq); - - LIBCFS_ALLOC(mtable, sizeof(*mtable)); - if (mtable == NULL) { + ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(struct lnet_match_table)); + if (ptl->ptl_mtables == NULL) { CERROR("Failed to create match table for portal %d\n", index); return -ENOMEM; } - ptl->ptl_mtable = mtable; - do { /* iterate over match-tables when we have percpt match-table */ - LIBCFS_ALLOC(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE); + ptl->ptl_index = index; + CFS_INIT_LIST_HEAD(&ptl->ptl_msg_delayed); + CFS_INIT_LIST_HEAD(&ptl->ptl_msg_stealing); +#ifdef __KERNEL__ + cfs_spin_lock_init(&ptl->ptl_lock); +#else +# ifdef HAVE_LIBPTHREAD + pthread_mutex_init(&ptl->ptl_lock, NULL); +# endif +#endif + cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) { + LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i, + sizeof(*mhash) * LNET_MT_HASH_SIZE); if (mhash == NULL) { CERROR("Failed to create match hash for portal %d\n", index); @@ -460,7 +738,8 @@ lnet_ptl_setup(struct lnet_portal *ptl, int index) CFS_INIT_LIST_HEAD(&mtable->mt_mlist); mtable->mt_portal = index; - } while (0); + mtable->mt_cpt = i; + } return 0; failed: @@ -489,7 +768,7 @@ lnet_portals_create(void) int size; int i; - size = sizeof(struct lnet_portal); + size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]); the_lnet.ln_nportals = MAX_PORTALS; the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, 
size); @@ -547,9 +826,13 @@ LNetSetLazyPortal(int portal) CDEBUG(D_NET, "Setting portal %d lazy\n", portal); ptl = the_lnet.ln_portals[portal]; - lnet_res_lock(); + lnet_res_lock(LNET_LOCK_EX); + lnet_ptl_lock(ptl); + lnet_ptl_setopt(ptl, LNET_PTL_LAZY); - lnet_res_unlock(); + + lnet_ptl_unlock(ptl); + lnet_res_unlock(LNET_LOCK_EX); return 0; } @@ -574,10 +857,12 @@ LNetClearLazyPortal(int portal) ptl = the_lnet.ln_portals[portal]; - lnet_res_lock(); + lnet_res_lock(LNET_LOCK_EX); + lnet_ptl_lock(ptl); if (!lnet_ptl_is_lazy(ptl)) { - lnet_res_unlock(); + lnet_ptl_unlock(ptl); + lnet_res_unlock(LNET_LOCK_EX); return 0; } @@ -587,11 +872,12 @@ LNetClearLazyPortal(int portal) CDEBUG(D_NET, "clearing portal %d lazy\n", portal); /* grab all the blocked messages atomically */ - cfs_list_splice_init(&ptl->ptl_msgq, &zombies); + cfs_list_splice_init(&ptl->ptl_msg_delayed, &zombies); lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY); - lnet_res_unlock(); + lnet_ptl_unlock(ptl); + lnet_res_unlock(LNET_LOCK_EX); lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");