extern lnet_t the_lnet; /* THE network */
+#if !defined(__KERNEL__) || defined(LNET_USE_LIB_FREELIST)
+/* 1 CPT, simplify implementation... */
+# define LNET_CPT_MAX_BITS 0
+
+#else /* KERNEL and no freelist */
+
+# if (BITS_PER_LONG == 32)
+/* 2 CPTs; allowing more CPTs might put us under memory pressure */
+# define LNET_CPT_MAX_BITS 1
+
+# else /* 64-bit system */
+/*
+ * 256 CPTs for thousands of CPUs; allowing more CPTs might put us
+ * at risk of consuming all lh_cookie values.
+ */
+# define LNET_CPT_MAX_BITS 8
+# endif /* BITS_PER_LONG == 32 */
+#endif
+
+/* max allowed CPT number */
+#define LNET_CPT_MAX (1 << LNET_CPT_MAX_BITS)
+
+#define LNET_CPT_NUMBER (the_lnet.ln_cpt_number)
+#define LNET_CPT_BITS (the_lnet.ln_cpt_bits)
+#define LNET_CPT_MASK ((1ULL << LNET_CPT_BITS) - 1)
+
+/** exclusive lock */
+#define LNET_LOCK_EX CFS_PERCPT_LOCK_EX
+
/*
 * Return non-zero if @wh is the distinguished "none" wire handle.
 * A "none" handle has BOTH cookies set to LNET_WIRE_HANDLE_COOKIE_NONE.
 * (The previous body compared lnet_md_exhausted(md) against an
 * undeclared 'md' — clearly a mangled merge; restore the object-cookie
 * comparison.)
 */
static inline int lnet_is_wire_handle_none (lnet_handle_wire_t *wh)
{
	return (wh->wh_interface_cookie == LNET_WIRE_HANDLE_COOKIE_NONE &&
		wh->wh_object_cookie == LNET_WIRE_HANDLE_COOKIE_NONE);
}
-#ifdef __KERNEL__
+#define lnet_cpt_table() (the_lnet.ln_cpt_table)
+#define lnet_cpt_current() cfs_cpt_current(the_lnet.ln_cpt_table, 1)
+
+/* Return the CPT (CPU partition) index encoded in @cookie; the result
+ * is always a legal index in [0, LNET_CPT_NUMBER) */
+static inline int
+lnet_cpt_of_cookie(__u64 cookie)
+{
+	unsigned int cpt = (cookie >> LNET_COOKIE_TYPE_BITS) & LNET_CPT_MASK;
+
+	/* LNET_CPT_NUMBER doesn't have to be a power of 2, which means an
+	 * invalid cookie can carry an out-of-range cpt; reduce it modulo
+	 * LNET_CPT_NUMBER so the return value is always valid */
+	return cpt < LNET_CPT_NUMBER ? cpt : cpt % LNET_CPT_NUMBER;
+}
static inline void
-lnet_res_lock(void)
+lnet_res_lock(int cpt)
{
- cfs_spin_lock(&the_lnet.ln_res_lock);
+ cfs_percpt_lock(the_lnet.ln_res_lock, cpt);
}
static inline void
-lnet_res_unlock(void)
+lnet_res_unlock(int cpt)
{
- cfs_spin_unlock(&the_lnet.ln_res_lock);
+ cfs_percpt_unlock(the_lnet.ln_res_lock, cpt);
}
-#define LNET_LOCK() cfs_spin_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK() cfs_spin_unlock(&the_lnet.ln_lock)
-#define LNET_MUTEX_LOCK(m) cfs_mutex_lock(m)
-#define LNET_MUTEX_UNLOCK(m) cfs_mutex_unlock(m)
-#else
+/* Take the resource lock of the calling thread's current CPT and
+ * return that CPT index; the caller must pass the returned value to
+ * lnet_res_unlock() when done */
+static inline int
+lnet_res_lock_current(void)
+{
+	int cpt = lnet_cpt_current();
+
+	lnet_res_lock(cpt);
+	return cpt;
+}
+
+#ifdef __KERNEL__
+
+#define lnet_ptl_lock(ptl) cfs_spin_lock(&(ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl) cfs_spin_unlock(&(ptl)->ptl_lock)
+#define lnet_eq_wait_lock() cfs_spin_lock(&the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock() cfs_spin_unlock(&the_lnet.ln_eq_wait_lock)
+#define LNET_LOCK() cfs_spin_lock(&the_lnet.ln_lock)
+#define LNET_UNLOCK() cfs_spin_unlock(&the_lnet.ln_lock)
+#define LNET_MUTEX_LOCK(m) cfs_mutex_lock(m)
+#define LNET_MUTEX_UNLOCK(m) cfs_mutex_unlock(m)
+
+#else /* !__KERNEL__ */
+
# ifndef HAVE_LIBPTHREAD
#define LNET_SINGLE_THREADED_LOCK(l) \
do { \
#define LNET_MUTEX_LOCK(m) LNET_SINGLE_THREADED_LOCK(*(m))
#define LNET_MUTEX_UNLOCK(m) LNET_SINGLE_THREADED_UNLOCK(*(m))
-#define lnet_res_lock() \
- LNET_SINGLE_THREADED_LOCK(the_lnet.ln_res_lock)
-#define lnet_res_unlock() \
- LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_res_lock)
+#define lnet_ptl_lock(ptl) \
+ LNET_SINGLE_THREADED_LOCK((ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl) \
+ LNET_SINGLE_THREADED_UNLOCK((ptl)->ptl_lock)
+
+#define lnet_eq_wait_lock() \
+ LNET_SINGLE_THREADED_LOCK(the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock() \
+ LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_eq_wait_lock)
+
+# else /* HAVE_LIBPTHREAD */
-# else
#define LNET_LOCK() pthread_mutex_lock(&the_lnet.ln_lock)
#define LNET_UNLOCK() pthread_mutex_unlock(&the_lnet.ln_lock)
#define LNET_MUTEX_LOCK(m) pthread_mutex_lock(m)
#define LNET_MUTEX_UNLOCK(m) pthread_mutex_unlock(m)
-#define lnet_res_lock() pthread_mutex_lock(&the_lnet.ln_res_lock)
-#define lnet_res_unlock() pthread_mutex_unlock(&the_lnet.ln_res_lock)
-# endif
-#endif
+#define lnet_ptl_lock(ptl) pthread_mutex_lock(&(ptl)->ptl_lock)
+#define lnet_ptl_unlock(ptl) pthread_mutex_unlock(&(ptl)->ptl_lock)
+
+#define lnet_eq_wait_lock() pthread_mutex_lock(&the_lnet.ln_eq_wait_lock)
+#define lnet_eq_wait_unlock() pthread_mutex_unlock(&the_lnet.ln_eq_wait_lock)
+
+# endif /* HAVE_LIBPTHREAD */
+#endif /* __KERNEL__ */
#define MAX_PORTALS 64
struct lnet_res_container *rec = &the_lnet.ln_eq_container;
lnet_eq_t *eq;
- lnet_res_lock();
+ LASSERT(LNET_CPT_NUMBER == 1);
+
+ lnet_res_lock(0);
eq = (lnet_eq_t *)lnet_freelist_alloc(&rec->rec_freelist);
- lnet_res_unlock();
+ lnet_res_unlock(0);
return eq;
}
/* ALWAYS called with resource lock held */
struct lnet_res_container *rec = &the_lnet.ln_eq_container;
+ LASSERT(LNET_CPT_NUMBER == 1);
lnet_freelist_free(&rec->rec_freelist, eq);
}
static inline void
lnet_eq_free(lnet_eq_t *eq)
{
- lnet_res_lock();
+ lnet_res_lock(0);
lnet_eq_free_locked(eq);
- lnet_res_unlock();
+ lnet_res_unlock(0);
}
static inline lnet_libmd_t *
lnet_md_alloc (lnet_md_t *umd)
{
/* NEVER called with resource lock held */
- struct lnet_res_container *rec = &the_lnet.ln_md_container;
+ struct lnet_res_container *rec = the_lnet.ln_md_containers[0];
lnet_libmd_t *md;
- lnet_res_lock();
+ LASSERT(LNET_CPT_NUMBER == 1);
+
+ lnet_res_lock(0);
md = (lnet_libmd_t *)lnet_freelist_alloc(&rec->rec_freelist);
- lnet_res_unlock();
+ lnet_res_unlock(0);
if (md != NULL)
CFS_INIT_LIST_HEAD(&md->md_list);
lnet_md_free_locked(lnet_libmd_t *md)
{
/* ALWAYS called with resource lock held */
- struct lnet_res_container *rec = &the_lnet.ln_md_container;
+ struct lnet_res_container *rec = the_lnet.ln_md_containers[0];
+ LASSERT(LNET_CPT_NUMBER == 1);
lnet_freelist_free(&rec->rec_freelist, md);
}
static inline void
lnet_md_free(lnet_libmd_t *md)
{
- lnet_res_lock();
+ lnet_res_lock(0);
lnet_md_free_locked(md);
- lnet_res_unlock();
+ lnet_res_unlock(0);
}
static inline lnet_me_t *
lnet_me_alloc(void)
{
/* NEVER called with resource lock held */
- struct lnet_res_container *rec = &the_lnet.ln_me_container;
+ struct lnet_res_container *rec = the_lnet.ln_me_containers[0];
lnet_me_t *me;
- lnet_res_lock();
+ LASSERT(LNET_CPT_NUMBER == 1);
+
+ lnet_res_lock(0);
me = (lnet_me_t *)lnet_freelist_alloc(&rec->rec_freelist);
- lnet_res_unlock();
+ lnet_res_unlock(0);
return me;
}
lnet_me_free_locked(lnet_me_t *me)
{
/* ALWAYS called with resource lock held */
- struct lnet_res_container *rec = &the_lnet.ln_me_container;
+ struct lnet_res_container *rec = the_lnet.ln_me_containers[0];
+ LASSERT(LNET_CPT_NUMBER == 1);
lnet_freelist_free(&rec->rec_freelist, me);
}
static inline void
lnet_me_free(lnet_me_t *me)
{
- lnet_res_lock();
+ lnet_res_lock(0);
lnet_me_free_locked(me);
- lnet_res_unlock();
+ lnet_res_unlock(0);
}
static inline lnet_msg_t *
lnet_res_lh_invalidate(lnet_libhandle_t *lh)
{
/* ALWAYS called with resource lock held */
+ /* NB: cookie is still useful, don't reset it */
cfs_list_del(&lh->lh_hash_chain);
}
{
/* ALWAYS called with resource lock held */
lnet_libhandle_t *lh;
+ int cpt;
- lh = lnet_res_lh_lookup(&the_lnet.ln_md_container, handle->cookie);
+ cpt = lnet_cpt_of_cookie(handle->cookie);
+ lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt],
+ handle->cookie);
if (lh == NULL)
return NULL;
{
/* ALWAYS called with resource lock held */
lnet_libhandle_t *lh;
+ int cpt;
if (wh->wh_interface_cookie != the_lnet.ln_interface_cookie)
return NULL;
- lh = lnet_res_lh_lookup(&the_lnet.ln_md_container,
+ cpt = lnet_cpt_of_cookie(wh->wh_object_cookie);
+ lh = lnet_res_lh_lookup(the_lnet.ln_md_containers[cpt],
wh->wh_object_cookie);
if (lh == NULL)
return NULL;
{
/* ALWAYS called with resource lock held */
lnet_libhandle_t *lh;
+ int cpt;
- lh = lnet_res_lh_lookup(&the_lnet.ln_me_container, handle->cookie);
+ cpt = lnet_cpt_of_cookie(handle->cookie);
+ lh = lnet_res_lh_lookup(the_lnet.ln_me_containers[cpt],
+ handle->cookie);
if (lh == NULL)
return NULL;
}
#endif
+extern int lnet_cpt_of_nid(lnet_nid_t nid);
extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid);
extern lnet_ni_t *lnet_net2ni_locked (__u32 net);
static inline lnet_ni_t *
((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
typedef struct lnet_eq {
- cfs_list_t eq_list;
- lnet_libhandle_t eq_lh;
- lnet_seq_t eq_enq_seq;
- lnet_seq_t eq_deq_seq;
- unsigned int eq_size;
- lnet_event_t *eq_events;
- int eq_refcount;
- lnet_eq_handler_t eq_callback;
+ cfs_list_t eq_list;
+ lnet_libhandle_t eq_lh;
+ lnet_seq_t eq_enq_seq;
+ lnet_seq_t eq_deq_seq;
+ unsigned int eq_size;
+ lnet_eq_handler_t eq_callback;
+ lnet_event_t *eq_events;
+ int **eq_refs; /* percpt refcount for EQ */
} lnet_eq_t;
typedef struct lnet_me {
LNET_MATCHMD_OK = (1 << 1),
/* Must be discarded */
LNET_MATCHMD_DROP = (1 << 2),
+ /* match and buffer is exhausted */
+ LNET_MATCHMD_EXHAUSTED = (1 << 3),
+ /* match or drop */
+ LNET_MATCHMD_FINISH = (LNET_MATCHMD_OK | LNET_MATCHMD_DROP),
};
/* Options for lnet_portal_t::ptl_options */
/* reserved for upcoming patches, CPU partition ID */
unsigned int mt_cpt;
unsigned int mt_portal; /* portal index */
+	/* match table is set as "enabled" if there's a non-exhausted MD
+	 * attached on mt_mlist; it's only valid for wildcard portals */
+ unsigned int mt_enabled;
cfs_list_t mt_mlist; /* matching list */
cfs_list_t *mt_mhash; /* matching hash */
};
typedef struct lnet_portal {
+#ifdef __KERNEL__
+ cfs_spinlock_t ptl_lock;
+#else
+# ifndef HAVE_LIBPTHREAD
+ int ptl_lock;
+# else
+ pthread_mutex_t ptl_lock;
+# endif
+#endif
unsigned int ptl_index; /* portal ID, reserved */
/* flags on this portal: lazy, unique... */
unsigned int ptl_options;
- /* Now we only have single instance for each portal,
- * will have instance per CPT in upcoming patches */
- struct lnet_match_table *ptl_mtable;
+	/* list of messages which are stealing buffers */
+ cfs_list_t ptl_msg_stealing;
/* messages blocking for MD */
- cfs_list_t ptl_msgq;
+ cfs_list_t ptl_msg_delayed;
+ /* Match table for each CPT */
+ struct lnet_match_table **ptl_mtables;
+ /* spread rotor of incoming "PUT" */
+ int ptl_rotor;
+ /* # active entries for this portal */
+ int ptl_mt_nmaps;
+ /* array of active entries' cpu-partition-id */
+ int ptl_mt_maps[0];
} lnet_portal_t;
#define LNET_LH_HASH_BITS 12
typedef struct
{
- /* Stuff initialised at LNetInit() */
- int ln_init; /* LNetInit() called? */
- int ln_refcount; /* LNetNIInit/LNetNIFini counter */
- int ln_niinit_self; /* Have I called LNetNIInit myself? */
- /* shutdown in progress */
- int ln_shutdown;
- /* registered LNDs */
- cfs_list_t ln_lnds;
+ /* CPU partition table of LNet */
+ struct cfs_cpt_table *ln_cpt_table;
+ /* number of CPTs in ln_cpt_table */
+ unsigned int ln_cpt_number;
+ unsigned int ln_cpt_bits;
#ifdef __KERNEL__
cfs_spinlock_t ln_lock;
cfs_mutex_t ln_api_mutex;
cfs_mutex_t ln_lnd_mutex;
cfs_waitq_t ln_eq_waitq;
- cfs_spinlock_t ln_res_lock;
+ cfs_spinlock_t ln_eq_wait_lock;
#else
# ifndef HAVE_LIBPTHREAD
int ln_lock;
int ln_api_mutex;
int ln_lnd_mutex;
- int ln_res_lock;
+ int ln_eq_wait_lock;
# else
pthread_mutex_t ln_lock;
pthread_mutex_t ln_api_mutex;
pthread_mutex_t ln_lnd_mutex;
pthread_cond_t ln_eq_cond;
- pthread_mutex_t ln_res_lock;
+ pthread_mutex_t ln_eq_wait_lock;
# endif
#endif
+ struct cfs_percpt_lock *ln_res_lock;
/* ME container */
- struct lnet_res_container ln_me_container;
+ struct lnet_res_container **ln_me_containers;
/* MD container */
- struct lnet_res_container ln_md_container;
+ struct lnet_res_container **ln_md_containers;
/* Event Queue container */
struct lnet_res_container ln_eq_container;
/* the vector of portals */
lnet_portal_t **ln_portals;
+ int ln_init; /* LNetInit() called? */
+ /* LNetNIInit/LNetNIFini counter */
+ int ln_refcount;
+ /* Have I called LNetNIInit myself? */
+ int ln_niinit_self;
+ /* shutdown in progress */
+ int ln_shutdown;
+ /* registered LNDs */
+ cfs_list_t ln_lnds;
+
lnet_pid_t ln_pid; /* requested pid */
cfs_list_t ln_nis; /* LND instances */
* or after the last item in the list.
*/
typedef enum {
- LNET_INS_BEFORE,
- LNET_INS_AFTER
+ /** insert ME before current position or head of the list */
+ LNET_INS_BEFORE,
+ /** insert ME after current position or tail of the list */
+ LNET_INS_AFTER,
+ /** attach ME at tail of local CPU partition ME list */
+ LNET_INS_LOCAL
} lnet_ins_pos_t;
/** @} lnet_me */
lnet_init_locks(void)
{
cfs_spin_lock_init(&the_lnet.ln_lock);
- cfs_spin_lock_init(&the_lnet.ln_res_lock);
+ cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock);
cfs_waitq_init(&the_lnet.ln_eq_waitq);
cfs_mutex_init(&the_lnet.ln_lnd_mutex);
cfs_mutex_init(&the_lnet.ln_api_mutex);
void lnet_init_locks(void)
{
the_lnet.ln_lock = 0;
- the_lnet.ln_res_lock = 0;
+ the_lnet.ln_eq_wait_lock = 0;
the_lnet.ln_lnd_mutex = 0;
the_lnet.ln_api_mutex = 0;
}
LASSERT(the_lnet.ln_api_mutex == 0);
LASSERT(the_lnet.ln_lnd_mutex == 0);
LASSERT(the_lnet.ln_lock == 0);
- LASSERT(the_lnet.ln_res_lock == 0);
+ LASSERT(the_lnet.ln_eq_wait_lock == 0);
}
# else
{
pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
pthread_mutex_init(&the_lnet.ln_lock, NULL);
- pthread_mutex_init(&the_lnet.ln_res_lock, NULL);
+ pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL);
pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
}
pthread_mutex_destroy(&the_lnet.ln_api_mutex);
pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
pthread_mutex_destroy(&the_lnet.ln_lock);
- pthread_mutex_destroy(&the_lnet.ln_res_lock);
+ pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock);
pthread_cond_destroy(&the_lnet.ln_eq_cond);
}
# endif
#endif
+/* Initialize all LNet global locks: the simple (non-percpt) locks via
+ * lnet_init_locks() plus the per-CPT resource lock.  Returns 0 on
+ * success; on allocation failure the simple locks are finalized again
+ * and -ENOMEM is returned */
+static int
+lnet_create_locks(void)
+{
+	lnet_init_locks();
+
+	the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
+	if (the_lnet.ln_res_lock != NULL)
+		return 0;
+
+	lnet_fini_locks();
+	return -ENOMEM;
+}
+
+/* Counterpart of lnet_create_locks(): free the per-CPT resource lock
+ * (if allocated) and finalize the simple locks.  Safe to call even if
+ * lnet_create_locks() failed part-way */
+static void
+lnet_destroy_locks(void)
+{
+	if (the_lnet.ln_res_lock != NULL) {
+		cfs_percpt_lock_free(the_lnet.ln_res_lock);
+		the_lnet.ln_res_lock = NULL;
+	}
+
+	lnet_fini_locks();
+}
+
void lnet_assert_wire_constants (void)
{
/* Wire protocol assertions generated by 'wirecheck'
int
lnet_res_container_setup(struct lnet_res_container *rec,
- int type, int objnum, int objsz)
+ int cpt, int type, int objnum, int objsz)
{
int rc = 0;
int i;
if (rc != 0)
goto out;
#endif
- rec->rec_lh_cookie = type;
+ rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type;
/* Arbitrary choice of hash table size */
- LIBCFS_ALLOC(rec->rec_lh_hash,
- LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
+ LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt,
+ LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
if (rec->rec_lh_hash == NULL) {
rc = -ENOMEM;
goto out;
return rc;
}
+/* Clean up and free a per-CPT array of resource containers previously
+ * created by lnet_res_containers_create() */
+static void
+lnet_res_containers_destroy(struct lnet_res_container **recs)
+{
+	struct lnet_res_container *rec;
+	int i;
+
+	cfs_percpt_for_each(rec, i, recs)
+		lnet_res_container_cleanup(rec);
+
+	cfs_percpt_free(recs);
+}
+
+/* Allocate and set up one resource container per CPT for resources of
+ * @type (an LNET_COOKIE_TYPE_* value).  @objnum and @objsz are passed
+ * through to lnet_res_container_setup().  Returns the percpt array on
+ * success, or NULL on failure with all partially-created containers
+ * already destroyed */
+static struct lnet_res_container **
+lnet_res_containers_create(int type, int objnum, int objsz)
+{
+	struct lnet_res_container **recs;
+	struct lnet_res_container *rec;
+	int rc;
+	int i;
+
+	recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec));
+	if (recs == NULL) {
+		CERROR("Failed to allocate %s resource containers\n",
+		       lnet_res_type2str(type));
+		return NULL;
+	}
+
+	cfs_percpt_for_each(rec, i, recs) {
+		rc = lnet_res_container_setup(rec, i, type, objnum, objsz);
+		if (rc != 0) {
+			lnet_res_containers_destroy(recs);
+			return NULL;
+		}
+	}
+
+	return recs;
+}
+
lnet_libhandle_t *
lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
{
if ((cookie & (LNET_COOKIE_TYPES - 1)) != rec->rec_type)
return NULL;
- hash = cookie >> LNET_COOKIE_TYPE_BITS;
+ hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS);
head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK];
cfs_list_for_each_entry(lh, head, lh_hash_chain) {
lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh)
{
/* ALWAYS called with lnet_res_lock held */
- unsigned int ibits = LNET_COOKIE_TYPE_BITS;
+ unsigned int ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS;
unsigned int hash;
lh->lh_cookie = rec->rec_lh_cookie;
lnet_prepare(lnet_pid_t requested_pid)
{
/* Prepare to bring up the network */
- int rc = 0;
+ struct lnet_res_container **recs;
+ int rc = 0;
LASSERT (the_lnet.ln_refcount == 0);
if (rc != 0)
goto failed1;
- rc = lnet_res_container_setup(&the_lnet.ln_eq_container,
+ rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0,
LNET_COOKIE_TYPE_EQ, LNET_FL_MAX_EQS,
sizeof(lnet_eq_t));
- if (rc != 0) {
- CERROR("Failed to create EQ container for LNet: %d\n", rc);
+ if (rc != 0)
goto failed2;
- }
- /* NB: we will have instance of ME container per CPT soon */
- rc = lnet_res_container_setup(&the_lnet.ln_me_container,
- LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
- sizeof(lnet_me_t));
- if (rc != 0) {
- CERROR("Failed to create ME container for LNet: %d\n", rc);
+ recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
+ sizeof(lnet_me_t));
+ if (recs == NULL)
goto failed3;
- }
+
+ the_lnet.ln_me_containers = recs;
/* NB: we will have instance of MD container per CPT soon */
- rc = lnet_res_container_setup(&the_lnet.ln_md_container,
- LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
- sizeof(lnet_libmd_t));
- if (rc != 0) {
- CERROR("Failed to create MD container for LNet: %d\n", rc);
+ recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
+ sizeof(lnet_libmd_t));
+ if (recs == NULL)
goto failed3;
- }
+
+ the_lnet.ln_md_containers = recs;
rc = lnet_portals_create();
if (rc != 0) {
failed3:
/* NB: lnet_res_container_cleanup is safe to call for
* uninitialized container */
- lnet_res_container_cleanup(&the_lnet.ln_md_container);
- lnet_res_container_cleanup(&the_lnet.ln_me_container);
+ if (the_lnet.ln_md_containers != NULL) {
+ lnet_res_containers_destroy(the_lnet.ln_md_containers);
+ the_lnet.ln_md_containers = NULL;
+ }
+ if (the_lnet.ln_me_containers != NULL) {
+ lnet_res_containers_destroy(the_lnet.ln_me_containers);
+ the_lnet.ln_me_containers = NULL;
+ }
lnet_res_container_cleanup(&the_lnet.ln_eq_container);
failed2:
lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
lnet_portals_destroy();
- lnet_res_container_cleanup(&the_lnet.ln_md_container);
- lnet_res_container_cleanup(&the_lnet.ln_me_container);
+ if (the_lnet.ln_md_containers != NULL) {
+ lnet_res_containers_destroy(the_lnet.ln_md_containers);
+ the_lnet.ln_md_containers = NULL;
+ }
+
+ if (the_lnet.ln_me_containers != NULL) {
+ lnet_res_containers_destroy(the_lnet.ln_me_containers);
+ the_lnet.ln_me_containers = NULL;
+ }
+
lnet_res_container_cleanup(&the_lnet.ln_eq_container);
lnet_free_rtrpools();
return NULL;
}
+/* Hash @nid down to a CPT index; always returns a value in
+ * [0, LNET_CPT_NUMBER) */
+unsigned int
+lnet_nid_cpt_hash(lnet_nid_t nid)
+{
+	__u64 key = nid;
+	unsigned int val;
+
+	val = cfs_hash_long(key, LNET_CPT_BITS);
+	/* NB: LNET_CPT_NUMBER doesn't have to be PO2, so the hash may
+	 * exceed it; fold out-of-range values back in with modulo */
+	if (val < LNET_CPT_NUMBER)
+		return val;
+
+	return (unsigned int)((key + val + (val >> 1)) % LNET_CPT_NUMBER);
+}
+
+/* Map @nid to the CPT that should handle traffic for it: 0 when only
+ * one CPT exists, otherwise a deterministic hash of the NID */
+int
+lnet_cpt_of_nid(lnet_nid_t nid)
+{
+	if (LNET_CPT_NUMBER == 1)
+		return 0; /* the only one */
+
+	return lnet_nid_cpt_hash(nid);
+}
+EXPORT_SYMBOL(lnet_cpt_of_nid);
+
int
lnet_islocalnet (__u32 net)
{
int
LNetInit(void)
{
+ int rc;
+
lnet_assert_wire_constants ();
LASSERT (!the_lnet.ln_init);
memset(&the_lnet, 0, sizeof(the_lnet));
- lnet_init_locks();
+ /* refer to global cfs_cpt_table for now */
+ the_lnet.ln_cpt_table = cfs_cpt_table;
+ the_lnet.ln_cpt_number = cfs_cpt_number(cfs_cpt_table);
+
+ LASSERT(the_lnet.ln_cpt_number > 0);
+ if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
+ /* we are under risk of consuming all lh_cookie */
+ CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
+ "please change setting of CPT-table and retry\n",
+ the_lnet.ln_cpt_number, LNET_CPT_MAX);
+ return -1;
+ }
+
+ while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
+ the_lnet.ln_cpt_bits++;
+
+ rc = lnet_create_locks();
+ if (rc != 0) {
+ CERROR("Can't create LNet global locks: %d\n", rc);
+ return -1;
+ }
+
the_lnet.ln_refcount = 0;
the_lnet.ln_init = 1;
LNetInvalidateHandle(&the_lnet.ln_rc_eqh);
void
LNetFini(void)
{
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount == 0);
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount == 0);
- while (!cfs_list_empty(&the_lnet.ln_lnds))
- lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
- lnd_t, lnd_list));
- lnet_fini_locks();
+ while (!cfs_list_empty(&the_lnet.ln_lnds))
+ lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
+ lnd_t, lnd_list));
+ lnet_destroy_locks();
- the_lnet.ln_init = 0;
+ the_lnet.ln_init = 0;
}
/**
if (count != 0) {
LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
- if (eq->eq_events == NULL) {
- lnet_eq_free(eq);
- return -ENOMEM;
- }
+ if (eq->eq_events == NULL)
+ goto failed;
/* NB allocator has set all event sequence numbers to 0,
* so all them should be earlier than eq_deq_seq */
}
eq->eq_deq_seq = 1;
eq->eq_enq_seq = 1;
eq->eq_size = count;
- eq->eq_refcount = 0;
eq->eq_callback = callback;
- lnet_res_lock();
+ eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
+ sizeof(*eq->eq_refs[0]));
+ if (eq->eq_refs == NULL)
+ goto failed;
+
+ /* MUST hold both exclusive lnet_res_lock */
+ lnet_res_lock(LNET_LOCK_EX);
+ /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
+ * both EQ lookup and poll event with only lnet_eq_wait_lock */
+ lnet_eq_wait_lock();
lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
+ lnet_res_unlock(LNET_LOCK_EX);
lnet_eq2handle(handle, eq);
return 0;
+
+failed:
+ if (eq->eq_events != NULL)
+ LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t));
+
+ if (eq->eq_refs != NULL)
+ cfs_percpt_free(eq->eq_refs);
+
+ lnet_eq_free(eq);
+ return -ENOMEM;
}
/**
int
LNetEQFree(lnet_handle_eq_t eqh)
{
- lnet_eq_t *eq;
- int size;
- lnet_event_t *events;
-
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount > 0);
-
- lnet_res_lock();
+ struct lnet_eq *eq;
+ lnet_event_t *events = NULL;
+ int **refs = NULL;
+ int *ref;
+ int rc = 0;
+ int size = 0;
+ int i;
+
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount > 0);
+
+ lnet_res_lock(LNET_LOCK_EX);
+ /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
+ * both EQ lookup and poll event with only lnet_eq_wait_lock */
+ lnet_eq_wait_lock();
eq = lnet_handle2eq(&eqh);
if (eq == NULL) {
- lnet_res_unlock();
- return -ENOENT;
+ rc = -ENOENT;
+ goto out;
}
- if (eq->eq_refcount != 0) {
- CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
- eq->eq_refcount);
- lnet_res_unlock();
- return -EBUSY;
+ cfs_percpt_for_each(ref, i, eq->eq_refs) {
+ LASSERT(*ref >= 0);
+ if (*ref == 0)
+ continue;
+
+ CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
+ i, *ref);
+ rc = -EBUSY;
+ goto out;
}
/* stash for free after lock dropped */
events = eq->eq_events;
size = eq->eq_size;
+ refs = eq->eq_refs;
lnet_res_lh_invalidate(&eq->eq_lh);
cfs_list_del(&eq->eq_list);
lnet_eq_free_locked(eq);
-
- lnet_res_unlock();
+ out:
+ lnet_eq_wait_unlock();
+ lnet_res_unlock(LNET_LOCK_EX);
if (events != NULL)
LIBCFS_FREE(events, size * sizeof(lnet_event_t));
+ if (refs != NULL)
+ cfs_percpt_free(refs);
- return 0;
+ return rc;
}
void
lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
{
- /* MUST called with resource lock hold */
+	/* MUST be called with the resource lock held, but w/o lnet_eq_wait_lock */
int index;
if (eq->eq_size == 0) {
return;
}
+ lnet_eq_wait_lock();
ev->sequence = eq->eq_enq_seq++;
LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
pthread_cond_broadcast(&the_lnet.ln_eq_cond);
# endif
#endif
+ lnet_eq_wait_unlock();
}
int
lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev)
{
- int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
- lnet_event_t *new_event = &eq->eq_events[new_index];
- int rc;
- ENTRY;
+ int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+ lnet_event_t *new_event = &eq->eq_events[new_index];
+ int rc;
+ ENTRY;
- if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
- RETURN(0);
- }
+	/* must be called with lnet_eq_wait_lock held */
+ if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence))
+ RETURN(0);
/* We've got a new event... */
*ev = *new_event;
cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl);
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
if (tms < 0) {
cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE);
wait = tms != 0; /* might need to call here again */
*timeout_ms = tms;
- lnet_res_lock();
+ lnet_eq_wait_lock();
cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl);
return wait;
lnet_eq_cond_wait(struct timespec *ts)
{
if (ts == NULL) {
- pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_res_lock);
+ pthread_cond_wait(&the_lnet.ln_eq_cond,
+ &the_lnet.ln_eq_wait_lock);
} else {
pthread_cond_timedwait(&the_lnet.ln_eq_cond,
- &the_lnet.ln_res_lock, ts);
+ &the_lnet.ln_eq_wait_lock, ts);
}
}
# endif
if (the_lnet.ln_eq_waitni != NULL) {
/* I have a single NI that I have to call into, to get
* events queued, or to block. */
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
LNET_LOCK();
eq_waitni = the_lnet.ln_eq_waitni;
if (unlikely(eq_waitni == NULL)) {
LNET_UNLOCK();
- lnet_res_lock();
+ lnet_eq_wait_lock();
return -1;
}
}
lnet_ni_decref(eq_waitni);
- lnet_res_lock();
+ lnet_eq_wait_lock();
} else { /* w/o eq_waitni */
# ifndef HAVE_LIBPTHREAD
/* If I'm single-threaded, LNET fails at startup if it can't
if (neq < 1)
RETURN(-ENOENT);
- lnet_res_lock();
+ lnet_eq_wait_lock();
for (;;) {
#ifndef __KERNEL__
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
/* Recursion breaker */
if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
!LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
lnet_router_checker();
- lnet_res_lock();
+ lnet_eq_wait_lock();
#endif
for (i = 0; i < neq; i++) {
lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
if (eq == NULL) {
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
RETURN(-ENOENT);
}
rc = lnet_eq_dequeue_event(eq, event);
if (rc != 0) {
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
*which = i;
RETURN(rc);
}
break;
}
- lnet_res_unlock();
+ lnet_eq_wait_unlock();
RETURN(0);
}
CDEBUG(D_NET, "Unlinking md %p\n", md);
if (md->md_eq != NULL) {
- md->md_eq->eq_refcount--;
- LASSERT (md->md_eq->eq_refcount >= 0);
- }
+ int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
+
+ LASSERT(*md->md_eq->eq_refs[cpt] > 0);
+ (*md->md_eq->eq_refs[cpt])--;
+ }
- LASSERT (!cfs_list_empty(&md->md_list));
- cfs_list_del_init (&md->md_list);
+ LASSERT(!cfs_list_empty(&md->md_list));
+ cfs_list_del_init(&md->md_list);
lnet_md_free_locked(md);
}
/* must be called with resource lock held */
static int
-lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle)
+lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle, int cpt)
{
- struct lnet_res_container *container = &the_lnet.ln_md_container;
+ struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
/* NB we are passed an allocated, but inactive md.
* if we return success, caller may lnet_md_unlink() it.
if (md->md_eq == NULL)
return -ENOENT;
- md->md_eq->eq_refcount++;
+ (*md->md_eq->eq_refs[cpt])++;
}
lnet_res_lh_initialize(container, &md->md_lh);
LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
lnet_unlink_t unlink, lnet_handle_md_t *handle)
{
- CFS_LIST_HEAD (matches);
- CFS_LIST_HEAD (drops);
- lnet_me_t *me;
- lnet_libmd_t *md;
- int rc;
+ CFS_LIST_HEAD (matches);
+ CFS_LIST_HEAD (drops);
+ struct lnet_me *me;
+ struct lnet_libmd *md;
+ int cpt;
+ int rc;
LASSERT (the_lnet.ln_init);
LASSERT (the_lnet.ln_refcount > 0);
return -ENOMEM;
rc = lnet_md_build(md, &umd, unlink);
+ cpt = lnet_cpt_of_cookie(meh.cookie);
- lnet_res_lock();
+ lnet_res_lock(cpt);
if (rc != 0)
goto failed;
else if (me->me_md != NULL)
rc = -EBUSY;
else
- rc = lnet_md_link(md, umd.eq_handle);
+ rc = lnet_md_link(md, umd.eq_handle, cpt);
if (rc != 0)
goto failed;
lnet_md2handle(handle, md);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_drop_delayed_msg_list(&drops, "Bad match");
lnet_recv_delayed_msg_list(&matches);
failed:
lnet_md_free_locked(md);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return rc;
}
int
LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
{
- lnet_libmd_t *md;
- int rc;
+ lnet_libmd_t *md;
+ int cpt;
+ int rc;
LASSERT (the_lnet.ln_init);
LASSERT (the_lnet.ln_refcount > 0);
rc = lnet_md_build(md, &umd, unlink);
- lnet_res_lock();
+ cpt = lnet_res_lock_current();
if (rc != 0)
goto failed;
- rc = lnet_md_link(md, umd.eq_handle);
+ rc = lnet_md_link(md, umd.eq_handle, cpt);
if (rc != 0)
goto failed;
lnet_md2handle(handle, md);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return 0;
failed:
lnet_md_free_locked(md);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return rc;
}
int
LNetMDUnlink (lnet_handle_md_t mdh)
{
- lnet_event_t ev;
- lnet_libmd_t *md;
+ lnet_event_t ev;
+ lnet_libmd_t *md;
+ int cpt;
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount > 0);
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount > 0);
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(mdh.cookie);
+ lnet_res_lock(cpt);
md = lnet_handle2md(&mdh);
if (md == NULL) {
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return -ENOENT;
}
lnet_md_unlink(md);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return 0;
}
lnet_handle_me_t *handle)
{
struct lnet_match_table *mtable;
- lnet_me_t *me;
- cfs_list_t *head;
+ struct lnet_me *me;
+ cfs_list_t *head;
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount > 0);
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount > 0);
- if ((int)portal >= the_lnet.ln_nportals)
- return -EINVAL;
+ if ((int)portal >= the_lnet.ln_nportals)
+ return -EINVAL;
mtable = lnet_mt_of_attach(portal, match_id,
match_bits, ignore_bits, pos);
if (me == NULL)
return -ENOMEM;
- lnet_res_lock();
+ lnet_res_lock(mtable->mt_cpt);
me->me_portal = portal;
me->me_match_id = match_id;
me->me_unlink = unlink;
me->me_md = NULL;
- lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh);
+ lnet_res_lh_initialize(the_lnet.ln_me_containers[mtable->mt_cpt],
+ &me->me_lh);
head = lnet_mt_match_head(mtable, match_id, match_bits);
- LASSERT (head != NULL);
- if (pos == LNET_INS_AFTER)
- cfs_list_add_tail(&me->me_list, head);
- else
- cfs_list_add(&me->me_list, head);
-
- lnet_me2handle(handle, me);
+ if (pos == LNET_INS_AFTER || pos == LNET_INS_LOCAL)
+ cfs_list_add_tail(&me->me_list, head);
+ else
+ cfs_list_add(&me->me_list, head);
- lnet_res_unlock();
+ lnet_me2handle(handle, me);
- return 0;
+ lnet_res_unlock(mtable->mt_cpt);
+ return 0;
}
/**
lnet_unlink_t unlink, lnet_ins_pos_t pos,
lnet_handle_me_t *handle)
{
- lnet_me_t *current_me;
- lnet_me_t *new_me;
- lnet_portal_t *ptl;
+ struct lnet_me *current_me;
+ struct lnet_me *new_me;
+ struct lnet_portal *ptl;
+ int cpt;
+
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount > 0);
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount > 0);
+ if (pos == LNET_INS_LOCAL)
+ return -EPERM;
new_me = lnet_me_alloc();
if (new_me == NULL)
return -ENOMEM;
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(current_meh.cookie);
+
+ lnet_res_lock(cpt);
current_me = lnet_handle2me(¤t_meh);
if (current_me == NULL) {
lnet_me_free_locked(new_me);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return -ENOENT;
}
if (lnet_ptl_is_unique(ptl)) {
/* nosense to insertion on unique portal */
lnet_me_free_locked(new_me);
- lnet_res_unlock();
- return -EPERM;
+ lnet_res_unlock(cpt);
+ return -EPERM;
}
new_me->me_portal = current_me->me_portal;
new_me->me_unlink = unlink;
new_me->me_md = NULL;
- lnet_res_lh_initialize(&the_lnet.ln_me_container, &new_me->me_lh);
+ lnet_res_lh_initialize(the_lnet.ln_me_containers[cpt], &new_me->me_lh);
if (pos == LNET_INS_AFTER)
cfs_list_add(&new_me->me_list, ¤t_me->me_list);
lnet_me2handle(handle, new_me);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return 0;
}
int
LNetMEUnlink(lnet_handle_me_t meh)
{
- lnet_me_t *me;
- lnet_libmd_t *md;
- lnet_event_t ev;
+ lnet_me_t *me;
+ lnet_libmd_t *md;
+ lnet_event_t ev;
+ int cpt;
- LASSERT (the_lnet.ln_init);
- LASSERT (the_lnet.ln_refcount > 0);
+ LASSERT(the_lnet.ln_init);
+ LASSERT(the_lnet.ln_refcount > 0);
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(meh.cookie);
+ lnet_res_lock(cpt);
- me = lnet_handle2me(&meh);
- if (me == NULL) {
- lnet_res_unlock();
+ me = lnet_handle2me(&meh);
+ if (me == NULL) {
+ lnet_res_unlock(cpt);
return -ENOENT;
}
md->md_refcount == 0) {
lnet_build_unlink_event(md, &ev);
lnet_eq_enqueue_event(md->md_eq, &ev);
- }
+ }
- lnet_me_unlink(me);
+ lnet_me_unlink(me);
- lnet_res_unlock();
- return 0;
+ lnet_res_unlock(cpt);
+ return 0;
}
/* call with lnet_res_lock please */
lnet_libmd_t *md;
int rlength;
int mlength;
+ int cpt;
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
+ lnet_res_lock(cpt);
src.nid = hdr->src_nid;
src.pid = hdr->src_pid;
CERROR("REPLY MD also attached to portal %d\n",
md->md_me->me_portal);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return ENOENT; /* +ve: OK but no match */
}
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
mlength);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return ENOENT; /* +ve: OK but no match */
}
if (mlength != 0)
lnet_setpayloadbuffer(msg);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_build_msg_event(msg, LNET_EVENT_REPLY);
lnet_hdr_t *hdr = &msg->msg_hdr;
lnet_process_id_t src = {0};
lnet_libmd_t *md;
+ int cpt;
src.nid = hdr->src_nid;
src.pid = hdr->src_pid;
hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
+ lnet_res_lock(cpt);
/* NB handles only looked up by creator (no flips) */
md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
CERROR("Source MD also attached to portal %d\n",
md->md_me->me_portal);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
return ENOENT; /* +ve! */
}
lnet_msg_attach_md(msg, md, 0, 0);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_build_msg_event(msg, LNET_EVENT_ACK);
__u64 match_bits, unsigned int offset,
__u64 hdr_data)
{
- lnet_msg_t *msg;
- lnet_libmd_t *md;
- int rc;
+ struct lnet_msg *msg;
+ struct lnet_libmd *md;
+ int cpt;
+ int rc;
LASSERT (the_lnet.ln_init);
LASSERT (the_lnet.ln_refcount > 0);
}
msg->msg_vmflush = !!cfs_memory_pressure_get();
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(mdh.cookie);
+ lnet_res_lock(cpt);
md = lnet_handle2md(&mdh);
if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
if (md != NULL && md->md_me != NULL)
CERROR("Source MD also attached to portal %d\n",
md->md_me->me_portal);
-
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_msg_free(msg);
-
return -ENOENT;
}
LNET_WIRE_HANDLE_COOKIE_NONE;
}
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_build_msg_event(msg, LNET_EVENT_SEND);
* CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
* lnet_finalize() is called on it, so the LND must call this first */
- lnet_msg_t *msg = lnet_msg_alloc();
- lnet_libmd_t *getmd = getmsg->msg_md;
- lnet_process_id_t peer_id = getmsg->msg_target;
+ struct lnet_msg *msg = lnet_msg_alloc();
+ struct lnet_libmd *getmd = getmsg->msg_md;
+ lnet_process_id_t peer_id = getmsg->msg_target;
+ int cpt;
- LASSERT (!getmsg->msg_target_is_router);
- LASSERT (!getmsg->msg_routing);
+ LASSERT(!getmsg->msg_target_is_router);
+ LASSERT(!getmsg->msg_routing);
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
+ lnet_res_lock(cpt);
LASSERT (getmd->md_refcount > 0);
CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n",
libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
getmd);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
goto drop;
}
msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
LNET_LOCK();
lnet_msg_commit(msg, 0);
lnet_process_id_t target, unsigned int portal,
__u64 match_bits, unsigned int offset)
{
- lnet_msg_t *msg;
- lnet_libmd_t *md;
- int rc;
+ struct lnet_msg *msg;
+ struct lnet_libmd *md;
+ int cpt;
+ int rc;
LASSERT (the_lnet.ln_init);
LASSERT (the_lnet.ln_refcount > 0);
return -ENOMEM;
}
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(mdh.cookie);
+ lnet_res_lock(cpt);
md = lnet_handle2md(&mdh);
if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
CERROR("REPLY MD also attached to portal %d\n",
md->md_me->me_portal);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_msg_free(msg);
msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
/* NB handles only looked up by creator (no flips) */
- msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
- the_lnet.ln_interface_cookie;
- msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
- md->md_lh.lh_cookie;
+ msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
+ the_lnet.ln_interface_cookie;
+ msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
+ md->md_lh.lh_cookie;
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
lnet_build_msg_event(msg, LNET_EVENT_SEND);
{
struct lnet_msg_container *container;
int my_slot;
+ int cpt;
int i;
LASSERT (!cfs_in_interrupt ());
msg->msg_ev.status = status;
if (msg->msg_md != NULL) {
- lnet_res_lock();
+ cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+
+ lnet_res_lock(cpt);
lnet_msg_detach_md(msg, status);
- lnet_res_unlock();
+ lnet_res_unlock(cpt);
}
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
#include <lnet/lib-lnet.h>
+/* NB: add /proc interfaces in upcoming patches */
+int portal_rotor;
+CFS_MODULE_PARM(portal_rotor, "i", int, 0644,
+ "redirect PUTs to different cpu-partitions");
+
static int
lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
__u64 mbits, __u64 ignore_bits)
goto match;
/* unset, new portal */
- lnet_res_lock();
+ lnet_ptl_lock(ptl);
/* check again with lock */
if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
- lnet_res_unlock();
+ lnet_ptl_unlock(ptl);
goto match;
}
else
lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
- lnet_res_unlock();
+ lnet_ptl_unlock(ptl);
return 1;
return 1;
}
+/*
+ * Enable the match-table of CPT @cpt on wildcard portal @ptl and record
+ * @cpt in ptl_mt_maps[], keeping that array sorted in ascending order.
+ * NB: caller must hold both lnet_res_lock(cpt) and lnet_ptl_lock (see
+ * the assertion comment below); no locking is done here.
+ */
+static void
+lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
+{
+ struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
+ int i;
+
+ /* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
+ LASSERT(lnet_ptl_is_wildcard(ptl));
+
+ mtable->mt_enabled = 1;
+
+ /* append @cpt, then bubble it down into its sorted position */
+ ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
+ for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
+ LASSERT(ptl->ptl_mt_maps[i] != cpt); /* must not be mapped yet */
+ if (ptl->ptl_mt_maps[i] < cpt)
+ break;
+
+ /* swap to order */
+ ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
+ ptl->ptl_mt_maps[i] = cpt;
+ }
+
+ ptl->ptl_mt_nmaps++;
+}
+
+/*
+ * Disable the match-table of CPT @cpt on wildcard portal @ptl and remove
+ * @cpt from the sorted ptl_mt_maps[] array by shifting later entries left.
+ * A no-op when there is only one CPT: the sole match-table must stay
+ * enabled.  NB: caller must hold both lnet_res_lock(cpt) and
+ * lnet_ptl_lock; no locking is done here.
+ */
+static void
+lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
+{
+ struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
+ int i;
+
+ /* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
+ LASSERT(lnet_ptl_is_wildcard(ptl));
+
+ if (LNET_CPT_NUMBER == 1)
+ return; /* never disable the only match-table */
+
+ mtable->mt_enabled = 0;
+
+ LASSERT(ptl->ptl_mt_nmaps > 0 &&
+ ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);
+
+ /* remove it from mt_maps: entries at or after @cpt's slot are
+ * shifted one position left (the array is kept sorted, so the
+ * first entry >= @cpt is @cpt itself) */
+ ptl->ptl_mt_nmaps--;
+ for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
+ if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
+ ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
+ }
+}
+
static int
lnet_try_match_md(lnet_libmd_t *md,
struct lnet_match_info *info, struct lnet_msg *msg)
unsigned int mlength;
lnet_me_t *me = md->md_me;
- /* mismatched MD op */
- if ((md->md_options & info->mi_opc) == 0)
- return LNET_MATCHMD_NONE;
-
/* MD exhausted */
if (lnet_md_exhausted(md))
+ return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;
+
+ /* mismatched MD op */
+ if ((md->md_options & info->mi_opc) == 0)
return LNET_MATCHMD_NONE;
/* mismatched ME nid/pid? */
lnet_msg_attach_md(msg, md, offset, mlength);
md->md_offset = offset + mlength;
+ if (!lnet_md_exhausted(md))
+ return LNET_MATCHMD_OK;
+
/* Auto-unlink NOW, so the ME gets unlinked if required.
* We bumped md->md_refcount above so the MD just gets flagged
* for unlink when it is finalized. */
- if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
- lnet_md_exhausted(md)) {
+ if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0)
lnet_md_unlink(md);
- }
- return LNET_MATCHMD_OK;
+ return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
+}
+
+/*
+ * Pick the match-table that is fully determined by the portal type:
+ * the single table when there is only one CPT, or the NID-hashed table
+ * for a unique portal.  Returns NULL for a wildcard portal with multiple
+ * CPTs, in which case the caller must choose a table itself (by insert
+ * position or by rotor).  @mbits is currently unused here — presumably
+ * reserved for match-bits based hashing; TODO confirm.
+ */
+static struct lnet_match_table *
+lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits)
+{
+ if (LNET_CPT_NUMBER == 1)
+ return ptl->ptl_mtables[0]; /* the only one */
+
+ /* if it's a unique portal, return match-table hashed by NID */
+ return lnet_ptl_is_unique(ptl) ?
+ ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL;
}
struct lnet_match_table *
lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
__u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
{
- struct lnet_portal *ptl;
+ struct lnet_portal *ptl;
+ struct lnet_match_table *mtable;
+ /* NB: called w/o lock */
LASSERT(index < the_lnet.ln_nportals);
if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
return NULL;
ptl = the_lnet.ln_portals[index];
- /* NB: Now we only have one match-table for each portal,
- * and will have match-table per CPT in upcoming changes,
- * ME will be scattered to different match-tables based
- * on attaching information */
- return ptl->ptl_mtable;
+
+ mtable = lnet_match2mt(ptl, id, mbits);
+ if (mtable != NULL) /* unique portal or only one match-table */
+ return mtable;
+
+ /* it's a wildcard portal */
+ switch (pos) {
+ default:
+ return NULL;
+ case LNET_INS_BEFORE:
+ case LNET_INS_AFTER:
+ /* posted by no affinity thread, always hash to specific
+ * match-table to avoid buffer stealing which is heavy */
+ return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
+ case LNET_INS_LOCAL:
+ /* posted by cpu-affinity thread */
+ return ptl->ptl_mtables[lnet_cpt_current()];
+ }
}
struct lnet_match_table *
lnet_mt_of_match(unsigned int index, lnet_process_id_t id, __u64 mbits)
{
- struct lnet_portal *ptl;
+ struct lnet_match_table *mtable;
+ struct lnet_portal *ptl;
+ int nmaps;
+ int rotor;
+ int cpt;
+ /* NB: called w/o lock */
LASSERT(index < the_lnet.ln_nportals);
-
ptl = the_lnet.ln_portals[index];
- if (!lnet_ptl_is_unique(ptl) &&
- !lnet_ptl_is_wildcard(ptl) && !lnet_ptl_is_lazy(ptl))
- return NULL;
- /* NB: Now we only have one match-table for each portal,
- * and will have match-table per CPT in upcoming changes,
- * request will be scattered to different match-tables based
- * on matching information */
- return ptl->ptl_mtable;
+ LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));
+
+ mtable = lnet_match2mt(ptl, id, mbits);
+ if (mtable != NULL)
+ return mtable;
+
+ /* it's a wildcard portal */
+ if (!portal_rotor) {
+ cpt = lnet_cpt_current();
+ if (ptl->ptl_mtables[cpt]->mt_enabled)
+ return ptl->ptl_mtables[cpt];
+ }
+
+ rotor = ptl->ptl_rotor++;
+ cpt = rotor % LNET_CPT_NUMBER;
+
+ if (!ptl->ptl_mtables[cpt]->mt_enabled) {
+ /* is there any active entry for this portal? */
+ nmaps = ptl->ptl_mt_nmaps;
+ /* map to an active mtable to avoid heavy "stealing" */
+ if (nmaps != 0) {
+ /* NB: there is possibility that ptl_mt_maps is being
+ * changed because we are not under protection of
+ * lnet_ptl_lock, but it shouldn't hurt anything */
+ cpt = ptl->ptl_mt_maps[rotor % nmaps];
+ }
+ }
+
+ return ptl->ptl_mtables[cpt];
}
cfs_list_t *
if (lnet_ptl_is_wildcard(ptl)) {
return &mtable->mt_mlist;
- } else if (lnet_ptl_is_unique(ptl)) {
+ } else {
unsigned long hash = mbits + id.nid + id.pid;
+ LASSERT(lnet_ptl_is_unique(ptl));
hash = cfs_hash_long(hash, LNET_MT_HASH_BITS);
return &mtable->mt_mhash[hash];
}
-
- return NULL;
}
int
cfs_list_t *head;
lnet_me_t *me;
lnet_me_t *tmp;
+ int exhausted = 0;
int rc;
+ /* NB: only wildcard portal can return LNET_MATCHMD_EXHAUSTED */
+ if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
+ exhausted = LNET_MATCHMD_EXHAUSTED;
+
head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
- if (head == NULL) /* nobody posted anything on this portal */
- goto out;
cfs_list_for_each_entry_safe(me, tmp, head, me_list) {
/* ME attached but MD not attached yet */
LASSERT(me == me->me_md->md_me);
rc = lnet_try_match_md(me->me_md, info, msg);
- switch (rc) {
- default:
- LBUG();
-
- case LNET_MATCHMD_NONE:
- continue;
-
- case LNET_MATCHMD_OK:
- return LNET_MATCHMD_OK;
+ if ((rc & LNET_MATCHMD_EXHAUSTED) == 0)
+ exhausted = 0; /* mlist is not empty */
- case LNET_MATCHMD_DROP:
- return LNET_MATCHMD_DROP;
+ if ((rc & LNET_MATCHMD_FINISH) != 0) {
+ /* don't return EXHAUSTED bit because we don't know
+ * whether the mlist is empty or not */
+ return rc & ~LNET_MATCHMD_EXHAUSTED;
}
- /* not reached */
}
- out:
if (info->mi_opc == LNET_MD_OP_GET ||
!lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
- return LNET_MATCHMD_DROP;
+ return LNET_MATCHMD_DROP | exhausted;
+
+ return LNET_MATCHMD_NONE | exhausted;
+}
+
+/*
+ * Handle a message that arrived before the portal's match type has been
+ * set by any ME/buffer posting.  Returns 0 when the portal is already
+ * wildcard or unique (normal matching should proceed); otherwise returns
+ * LNET_MATCHMD_NONE after queueing the message on ptl_msg_delayed (lazy
+ * portal with rx_ready_delay set), or LNET_MATCHMD_DROP.  Takes
+ * lnet_ptl_lock only on the slow path.
+ */
+static int
+lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
+{
+ int rc;
+
+ /* message arrived before any buffer posting on this portal,
+ * simply delay or drop this message */
+ if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
+ return 0;
+
+ lnet_ptl_lock(ptl);
+ /* check it again with hold of lock: the type may have been set
+ * by a concurrent attach between the lockless test and here */
+ if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
+ lnet_ptl_unlock(ptl);
+ return 0;
+ }
+
+ if (lnet_ptl_is_lazy(ptl)) {
+ if (msg->msg_rx_ready_delay) {
+ msg->msg_rx_delayed = 1;
+ cfs_list_add_tail(&msg->msg_list,
+ &ptl->ptl_msg_delayed);
+ }
+ rc = LNET_MATCHMD_NONE;
+ } else {
+ rc = LNET_MATCHMD_DROP;
+ }
- return LNET_MATCHMD_NONE;
+ lnet_ptl_unlock(ptl);
+ return rc;
+}
+
+/*
+ * Slow-path match for a wildcard portal whose home match-table had
+ * nothing to offer: walk all CPTs (starting from the first mapped one)
+ * trying to "steal" a buffer from another CPT's match-table, and if
+ * nothing can be stolen either delay the message (lazy portal) or drop
+ * it.  Returns an LNET_MATCHMD_* code; may set msg->msg_rx_delayed.
+ * More expensive than a regular match — see the comment below — but
+ * expected to be rare.
+ */
+static int
+lnet_ptl_match_delay(struct lnet_portal *ptl,
+ struct lnet_match_info *info, struct lnet_msg *msg)
+{
+ int first = ptl->ptl_mt_maps[0]; /* read w/o lock */
+ int rc = 0;
+ int i;
+
+ /* steal buffer from other CPTs, and delay it if nothing to steal,
+ * this function is more expensive than a regular match, but we
+ * don't expect it can happen a lot */
+ LASSERT(lnet_ptl_is_wildcard(ptl));
+
+ for (i = 0; i < LNET_CPT_NUMBER; i++) {
+ struct lnet_match_table *mtable;
+ int cpt;
+
+ cpt = (first + i) % LNET_CPT_NUMBER;
+ mtable = ptl->ptl_mtables[cpt];
+ /* skip disabled tables, but always visit the first and the
+ * last iteration so the message can be queued or dropped */
+ if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
+ continue;
+
+ lnet_res_lock(cpt);
+ lnet_ptl_lock(ptl);
+
+ if (i == 0) { /* the first try, attach on stealing list */
+ cfs_list_add_tail(&msg->msg_list,
+ &ptl->ptl_msg_stealing);
+ }
+
+ if (!cfs_list_empty(&msg->msg_list)) { /* on stealing list */
+ rc = lnet_mt_match_md(mtable, info, msg);
+
+ if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 &&
+ mtable->mt_enabled)
+ lnet_ptl_disable_mt(ptl, cpt);
+
+ if ((rc & LNET_MATCHMD_FINISH) != 0)
+ cfs_list_del_init(&msg->msg_list);
+
+ } else {
+ /* could be matched by lnet_ptl_attach_md()
+ * which is called by another thread */
+ rc = msg->msg_md == NULL ?
+ LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
+ }
+
+ if (!cfs_list_empty(&msg->msg_list) && /* not matched yet */
+ (i == LNET_CPT_NUMBER - 1 || /* the last CPT */
+ ptl->ptl_mt_nmaps == 0 || /* no active CPT */
+ (ptl->ptl_mt_nmaps == 1 && /* the only active CPT */
+ ptl->ptl_mt_maps[0] == cpt))) {
+ /* nothing to steal, delay or drop */
+ cfs_list_del_init(&msg->msg_list);
+
+ if (lnet_ptl_is_lazy(ptl)) {
+ msg->msg_rx_delayed = 1;
+ cfs_list_add_tail(&msg->msg_list,
+ &ptl->ptl_msg_delayed);
+ rc = LNET_MATCHMD_NONE;
+ } else {
+ rc = LNET_MATCHMD_DROP;
+ }
+ }
+
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(cpt);
+
+ if ((rc & LNET_MATCHMD_FINISH) != 0 || msg->msg_rx_delayed)
+ break;
+ }
+
+ return rc;
+}
int
return LNET_MATCHMD_DROP;
}
+ ptl = the_lnet.ln_portals[info->mi_portal];
+ rc = lnet_ptl_match_early(ptl, msg);
+ if (rc != 0) /* matched or delayed early message */
+ return rc;
+
mtable = lnet_mt_of_match(info->mi_portal,
info->mi_id, info->mi_mbits);
- if (mtable == NULL) {
- CDEBUG(D_NET, "Drop early message from %s of length %d into "
- "portal %d MB="LPX64"\n",
- libcfs_id2str(info->mi_id), info->mi_rlength,
- info->mi_portal, info->mi_mbits);
- return LNET_MATCHMD_DROP;
- }
-
- ptl = the_lnet.ln_portals[info->mi_portal];
- lnet_res_lock();
+ lnet_res_lock(mtable->mt_cpt);
if (the_lnet.ln_shutdown) {
- rc = LNET_MATCHMD_DROP;
- goto out;
+ rc = LNET_MATCHMD_DROP;
+ goto out1;
}
rc = lnet_mt_match_md(mtable, info, msg);
- if (rc != LNET_MATCHMD_NONE) /* matched or dropping */
- goto out;
+ if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && mtable->mt_enabled) {
+ lnet_ptl_lock(ptl);
+ lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
+ lnet_ptl_unlock(ptl);
+ }
+
+ if ((rc & LNET_MATCHMD_FINISH) != 0) /* matched or dropping */
+ goto out1;
if (!msg->msg_rx_ready_delay)
- goto out;
+ goto out1;
+ LASSERT(lnet_ptl_is_lazy(ptl));
LASSERT(!msg->msg_rx_delayed);
- msg->msg_rx_delayed = 1;
- cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
-
- CDEBUG(D_NET,
- "Delaying %s from %s portal %d MB "LPX64" offset %d len %d\n",
- info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
- libcfs_id2str(info->mi_id), info->mi_portal,
- info->mi_mbits, info->mi_roffset, info->mi_rlength);
- out:
- lnet_res_unlock();
- return rc;
+
+ /* NB: we don't expect "delay" can happen a lot */
+ if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
+ lnet_ptl_lock(ptl);
+
+ msg->msg_rx_delayed = 1;
+ cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);
+
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(mtable->mt_cpt);
+
+ } else {
+ lnet_res_unlock(mtable->mt_cpt);
+ rc = lnet_ptl_match_delay(ptl, info, msg);
+ }
+
+ if (msg->msg_rx_delayed) {
+ CDEBUG(D_NET,
+ "Delaying %s from %s ptl %d MB "LPX64" off %d len %d\n",
+ info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
+ libcfs_id2str(info->mi_id), info->mi_portal,
+ info->mi_mbits, info->mi_roffset, info->mi_rlength);
+ }
+ goto out0;
+ out1:
+ lnet_res_unlock(mtable->mt_cpt);
+ out0:
+ /* EXHAUSTED bit is only meaningful for internal functions */
+ return rc & ~LNET_MATCHMD_EXHAUSTED;
}
void
cfs_list_t *matches, cfs_list_t *drops)
{
struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal];
+ struct lnet_match_table *mtable;
+ cfs_list_t *head;
lnet_msg_t *tmp;
lnet_msg_t *msg;
+ int exhausted = 0;
+ int cpt;
LASSERT(md->md_refcount == 0); /* a brand new MD */
me->me_md = md;
md->md_me = me;
- cfs_list_for_each_entry_safe(msg, tmp, &ptl->ptl_msgq, msg_list) {
+ cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
+ mtable = ptl->ptl_mtables[cpt];
+
+ if (cfs_list_empty(&ptl->ptl_msg_stealing) &&
+ cfs_list_empty(&ptl->ptl_msg_delayed) &&
+ mtable->mt_enabled)
+ return;
+
+ lnet_ptl_lock(ptl);
+ head = &ptl->ptl_msg_stealing;
+ again:
+ cfs_list_for_each_entry_safe(msg, tmp, head, msg_list) {
struct lnet_match_info info;
lnet_hdr_t *hdr;
int rc;
- LASSERT(msg->msg_rx_delayed);
+ LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);
hdr = &msg->msg_hdr;
info.mi_id.nid = hdr->src_nid;
rc = lnet_try_match_md(md, &info, msg);
- if (rc == LNET_MATCHMD_NONE)
+ exhausted = (rc & LNET_MATCHMD_EXHAUSTED) != 0;
+ if ((rc & LNET_MATCHMD_NONE) != 0) {
+ if (exhausted)
+ break;
continue;
+ }
/* Hurrah! This _is_ a match */
- cfs_list_del(&msg->msg_list);
+ LASSERT((rc & LNET_MATCHMD_FINISH) != 0);
+ cfs_list_del_init(&msg->msg_list);
- if (rc == LNET_MATCHMD_OK) {
+ if (head == &ptl->ptl_msg_stealing) {
+ if (exhausted)
+ break;
+ /* stealing thread will handle the message */
+ continue;
+ }
+
+ if ((rc & LNET_MATCHMD_OK) != 0) {
cfs_list_add_tail(&msg->msg_list, matches);
CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
info.mi_portal, info.mi_mbits,
info.mi_roffset, info.mi_rlength);
} else {
- LASSERT(rc == LNET_MATCHMD_DROP);
-
cfs_list_add_tail(&msg->msg_list, drops);
}
- if (lnet_md_exhausted(md))
+ if (exhausted)
break;
}
+
+ if (!exhausted && head == &ptl->ptl_msg_stealing) {
+ head = &ptl->ptl_msg_delayed;
+ goto again;
+ }
+
+ if (lnet_ptl_is_wildcard(ptl) && !exhausted && !mtable->mt_enabled)
+ lnet_ptl_enable_mt(ptl, cpt);
+
+ lnet_ptl_unlock(ptl);
}
void
lnet_ptl_cleanup(struct lnet_portal *ptl)
{
struct lnet_match_table *mtable;
+ int i;
- LASSERT(cfs_list_empty(&ptl->ptl_msgq));
-
- if (ptl->ptl_mtable == NULL) /* uninitialized portal */
+ if (ptl->ptl_mtables == NULL) /* uninitialized portal */
return;
- do { /* iterate over match-tables when we have percpt match-table */
+ LASSERT(cfs_list_empty(&ptl->ptl_msg_delayed));
+ LASSERT(cfs_list_empty(&ptl->ptl_msg_stealing));
+#ifndef __KERNEL__
+# ifdef HAVE_LIBPTHREAD
+ pthread_mutex_destroy(&ptl->ptl_lock);
+# endif
+#endif
+ cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
cfs_list_t *mhash;
lnet_me_t *me;
int j;
- mtable = ptl->ptl_mtable;
-
if (mtable->mt_mhash == NULL) /* uninitialized match-table */
continue;
}
LIBCFS_FREE(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
- } while (0);
+ }
- LIBCFS_FREE(ptl->ptl_mtable, sizeof(*mtable));
- ptl->ptl_mtable = NULL;
+ cfs_percpt_free(ptl->ptl_mtables);
+ ptl->ptl_mtables = NULL;
}
int
{
struct lnet_match_table *mtable;
cfs_list_t *mhash;
+ int i;
int j;
- ptl->ptl_index = index;
- CFS_INIT_LIST_HEAD(&ptl->ptl_msgq);
-
- LIBCFS_ALLOC(mtable, sizeof(*mtable));
- if (mtable == NULL) {
+ ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
+ sizeof(struct lnet_match_table));
+ if (ptl->ptl_mtables == NULL) {
CERROR("Failed to create match table for portal %d\n", index);
return -ENOMEM;
}
- ptl->ptl_mtable = mtable;
- do { /* iterate over match-tables when we have percpt match-table */
- LIBCFS_ALLOC(mhash, sizeof(*mhash) * LNET_MT_HASH_SIZE);
+ ptl->ptl_index = index;
+ CFS_INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
+ CFS_INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
+#ifdef __KERNEL__
+ cfs_spin_lock_init(&ptl->ptl_lock);
+#else
+# ifdef HAVE_LIBPTHREAD
+ pthread_mutex_init(&ptl->ptl_lock, NULL);
+# endif
+#endif
+ cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
+ LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
+ sizeof(*mhash) * LNET_MT_HASH_SIZE);
if (mhash == NULL) {
CERROR("Failed to create match hash for portal %d\n",
index);
CFS_INIT_LIST_HEAD(&mtable->mt_mlist);
mtable->mt_portal = index;
- } while (0);
+ mtable->mt_cpt = i;
+ }
return 0;
failed:
int size;
int i;
- size = sizeof(struct lnet_portal);
+ size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]);
the_lnet.ln_nportals = MAX_PORTALS;
the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, size);
CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
ptl = the_lnet.ln_portals[portal];
- lnet_res_lock();
+ lnet_res_lock(LNET_LOCK_EX);
+ lnet_ptl_lock(ptl);
+
lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
- lnet_res_unlock();
+
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(LNET_LOCK_EX);
return 0;
}
ptl = the_lnet.ln_portals[portal];
- lnet_res_lock();
+ lnet_res_lock(LNET_LOCK_EX);
+ lnet_ptl_lock(ptl);
if (!lnet_ptl_is_lazy(ptl)) {
- lnet_res_unlock();
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(LNET_LOCK_EX);
return 0;
}
CDEBUG(D_NET, "clearing portal %d lazy\n", portal);
/* grab all the blocked messages atomically */
- cfs_list_splice_init(&ptl->ptl_msgq, &zombies);
+ cfs_list_splice_init(&ptl->ptl_msg_delayed, &zombies);
lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
- lnet_res_unlock();
+ lnet_ptl_unlock(ptl);
+ lnet_res_unlock(LNET_LOCK_EX);
lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");