From a9c31485993bcaa19608bd1333170fc14330c87c Mon Sep 17 00:00:00 2001
From: johann
Date: Thu, 6 Dec 2007 12:41:56 +0000
Subject: [PATCH] Branch HEAD

b=13843
i=adilger
i=nathan

Dynamically create DLM blocking callback threads + fix an issue with
MacOS implementation of atomic_sub_and_test().
---
NOTE(review): line structure of this mangled patch restored from the diff
markers. Fixed liblustre's atomic_add_return(), which assigned ("= n")
instead of adding ("+= n"); with the broken definition atomic_inc_return()
always returned 1, defeating the bltd_num / blp_busy_threads accounting
introduced below. Two reconstruction caveats to verify against the original
commit: (1) the "#include <libcfs/list.h>" context line had its <> target
stripped and was reconstructed; (2) per the @@ -1892,25 +1961,19 header
arithmetic, CFS_INIT_LIST_HEAD(&blp->blp_list) is removed without being
re-added — confirm blp_list is initialized elsewhere, otherwise this is a
latent bug in the patch.

 lustre/include/liblustre.h |   2 +
 lustre/ldlm/ldlm_lockd.c   | 129 +++++++++++++++++++++++++++++++++------------
 2 files changed, 98 insertions(+), 33 deletions(-)

diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h
index feff186..0fbd5bd 100644
--- a/lustre/include/liblustre.h
+++ b/lustre/include/liblustre.h
@@ -735,6 +735,8 @@ typedef struct { volatile int counter; } atomic_t;
 #define atomic_inc(a) (((a)->counter)++)
 #define atomic_dec(a) do { (a)->counter--; } while (0)
 #define atomic_add(b,a) do {(a)->counter += b;} while (0)
+#define atomic_add_return(n,a) ((a)->counter += n)
+#define atomic_inc_return(a) atomic_add_return(1,a)
 #define atomic_sub(b,a) do {(a)->counter -= b;} while (0)
 #define atomic_sub_return(n,a) ((a)->counter -= n)
 #define atomic_dec_return(a) atomic_sub_return(1,a)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 7eb6127..ef065d51 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -40,6 +40,12 @@
 #include <libcfs/list.h>
 #include "ldlm_internal.h"
 
+#ifdef __KERNEL__
+static int ldlm_num_threads;
+CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
+                "number of DLM service threads to start");
+#endif
+
 extern cfs_mem_cache_t *ldlm_resource_slab;
 extern cfs_mem_cache_t *ldlm_lock_slab;
 static struct semaphore ldlm_ref_sem;
@@ -85,8 +91,11 @@ struct ldlm_bl_pool {
         spinlock_t              blp_lock;
         struct list_head        blp_list;
         cfs_waitq_t             blp_waitq;
-        atomic_t                blp_num_threads;
         struct completion       blp_comp;
+        atomic_t                blp_num_threads;
+        atomic_t                blp_busy_threads;
+        int                     blp_min_threads;
+        int                     blp_max_threads;
 };
 
 struct ldlm_bl_work_item {
@@ -1746,37 +1755,83 @@ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
         return blwi;
 }
 
+/* This only contains temporary data until the thread starts */
 struct ldlm_bl_thread_data {
-        int                     bltd_num;
+        char                    bltd_name[CFS_CURPROC_COMM_MAX];
         struct ldlm_bl_pool     *bltd_blp;
+        struct completion       bltd_comp;
+        int                     bltd_num;
 };
 
+static int ldlm_bl_thread_main(void *arg);
+
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+{
+        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
+        int rc;
+
+        init_completion(&bltd.bltd_comp);
+        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+        if (rc < 0) {
+                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
+                       atomic_read(&blp->blp_num_threads), rc);
+                return rc;
+        }
+        wait_for_completion(&bltd.bltd_comp);
+
+        return 0;
+}
+
 static int ldlm_bl_thread_main(void *arg)
 {
-        struct ldlm_bl_thread_data *bltd = arg;
-        struct ldlm_bl_pool *blp = bltd->bltd_blp;
+        struct ldlm_bl_pool *blp;
         ENTRY;
 
         {
-                char name[CFS_CURPROC_COMM_MAX];
-                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
-                         bltd->bltd_num);
-                cfs_daemonize(name);
-        }
+                struct ldlm_bl_thread_data *bltd = arg;
 
-        atomic_inc(&blp->blp_num_threads);
-        complete(&blp->blp_comp);
+                blp = bltd->bltd_blp;
+
+                bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
+                atomic_inc(&blp->blp_busy_threads);
+
+                snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
+                        "ldlm_bl_%02d", bltd->bltd_num);
+                cfs_daemonize(bltd->bltd_name);
 
-        while(1) {
+                complete(&bltd->bltd_comp);
+                /* cannot use bltd after this, it is only on caller's stack */
+        }
+
+        while (1) {
                 struct l_wait_info lwi = { 0 };
                 struct ldlm_bl_work_item *blwi = NULL;
 
-                l_wait_event_exclusive(blp->blp_waitq,
-                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
-                                       &lwi);
+                blwi = ldlm_bl_get_work(blp);
 
-                if (blwi->blwi_ns == NULL)
-                        break;
+                if (blwi == NULL) {
+                        int busy;
+
+                        atomic_dec(&blp->blp_busy_threads);
+                        l_wait_event_exclusive(blp->blp_waitq,
+                                         (blwi = ldlm_bl_get_work(blp)) != NULL,
+                                         &lwi);
+                        busy = atomic_inc_return(&blp->blp_busy_threads);
+
+                        if (blwi->blwi_ns == NULL)
+                                /* added by ldlm_cleanup() */
+                                break;
+
+                        /* Not fatal if racy and have a few too many threads */
+                        if (unlikely(busy < blp->blp_max_threads &&
+                                     busy >= atomic_read(&blp->blp_num_threads)))
+                                /* discard the return value, we tried */
+                                ldlm_bl_thread_start(blp);
+                } else {
+                        if (blwi->blwi_ns == NULL)
+                                /* added by ldlm_cleanup() */
+                                break;
+                }
 
                 if (blwi->blwi_count) {
                         /* The special case when we cancel locks in lru
@@ -1792,6 +1847,7 @@ static int ldlm_bl_thread_main(void *arg)
                 OBD_FREE(blwi, sizeof(*blwi));
         }
 
+        atomic_dec(&blp->blp_busy_threads);
         atomic_dec(&blp->blp_num_threads);
         complete(&blp->blp_comp);
         RETURN(0);
@@ -1839,6 +1895,8 @@ static int ldlm_setup(void)
 {
         struct ldlm_bl_pool *blp;
         int rc = 0;
+        int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
+        int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
 #ifdef __KERNEL__
         int i;
 #endif
@@ -1857,13 +1915,24 @@ static int ldlm_setup(void)
                 GOTO(out_free, rc);
 #endif
 
+#ifdef __KERNEL__
+        if (ldlm_num_threads) {
+                /* If ldlm_num_threads is set, it is the min and the max. */
+                if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
+                        ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
+                if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
+                        ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
+                ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
+        }
+#endif
+
         ldlm_state->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
                                 LDLM_CB_REPLY_PORTAL, ldlm_timeout * 900,
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir, NULL,
-                                LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+                                ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cb",
                                 LCT_MD_THREAD|LCT_DT_THREAD);
@@ -1878,7 +1947,7 @@ static int ldlm_setup(void)
                                 LDLM_CANCEL_REPLY_PORTAL, ldlm_timeout * 6000,
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir, NULL,
-                                LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+                                ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cn",
                                 LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
@@ -1892,25 +1961,19 @@ static int ldlm_setup(void)
                 GOTO(out_proc, rc = -ENOMEM);
         ldlm_state->ldlm_bl_pool = blp;
 
-        atomic_set(&blp->blp_num_threads, 0);
-        cfs_waitq_init(&blp->blp_waitq);
         spin_lock_init(&blp->blp_lock);
-        CFS_INIT_LIST_HEAD(&blp->blp_list);
+        cfs_waitq_init(&blp->blp_waitq);
+        atomic_set(&blp->blp_num_threads, 0);
+        atomic_set(&blp->blp_busy_threads, 0);
+        blp->blp_min_threads = ldlm_min_threads;
+        blp->blp_max_threads = ldlm_max_threads;
 
 #ifdef __KERNEL__
-        for (i = 0; i < LDLM_BL_THREADS; i++) {
-                struct ldlm_bl_thread_data bltd = {
-                        .bltd_num = i,
-                        .bltd_blp = blp,
-                };
-                init_completion(&blp->blp_comp);
-                rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
-                if (rc < 0) {
-                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
+        for (i = 0; i < blp->blp_min_threads; i++) {
+                rc = ldlm_bl_thread_start(blp);
+                if (rc < 0)
                         GOTO(out_thread, rc);
-                }
-                wait_for_completion(&blp->blp_comp);
         }
 
         rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
-- 
1.8.3.1