#include <libcfs/list.h>
#include "ldlm_internal.h"
+#ifdef __KERNEL__
+static int ldlm_num_threads;
+CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
+ "number of DLM service threads to start");
+#endif
+
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
static struct semaphore ldlm_ref_sem;
spinlock_t blp_lock;
struct list_head blp_list;
cfs_waitq_t blp_waitq;
- atomic_t blp_num_threads;
struct completion blp_comp;
+ atomic_t blp_num_threads;
+ atomic_t blp_busy_threads;
+ int blp_min_threads;
+ int blp_max_threads;
};
struct ldlm_bl_work_item {
return blwi;
}
+/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
- int bltd_num;
+ char bltd_name[CFS_CURPROC_COMM_MAX];
struct ldlm_bl_pool *bltd_blp;
+ struct completion bltd_comp;
+ int bltd_num;
};
+static int ldlm_bl_thread_main(void *arg);
+
+/* Start one blocking-callback (bl) thread for pool @blp.
+ *
+ * bltd exists only on this stack frame: the new thread must copy what it
+ * needs out of it and then signal bltd_comp, so we block below until it
+ * has done so (see the "cannot use bltd after this" note in
+ * ldlm_bl_thread_main).  Returns 0 on success, or the negative rc from
+ * cfs_kernel_thread() on failure to spawn. */
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+{
+ struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
+ int rc;
+
+ init_completion(&bltd.bltd_comp);
+ rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+ if (rc < 0) {
+ CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
+ atomic_read(&blp->blp_num_threads), rc);
+ return rc;
+ }
+ /* Do not return (and so pop bltd off the stack) until the child has
+ * finished reading it. */
+ wait_for_completion(&bltd.bltd_comp);
+
+ return 0;
+}
+
static int ldlm_bl_thread_main(void *arg)
{
- struct ldlm_bl_thread_data *bltd = arg;
- struct ldlm_bl_pool *blp = bltd->bltd_blp;
+ struct ldlm_bl_pool *blp;
ENTRY;
{
- char name[CFS_CURPROC_COMM_MAX];
- snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
- bltd->bltd_num);
- cfs_daemonize(name);
- }
+ struct ldlm_bl_thread_data *bltd = arg;
- atomic_inc(&blp->blp_num_threads);
- complete(&blp->blp_comp);
+ blp = bltd->bltd_blp;
+
+ bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
+ atomic_inc(&blp->blp_busy_threads);
+
+ snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
+ "ldlm_bl_%02d", bltd->bltd_num);
+ cfs_daemonize(bltd->bltd_name);
- while(1) {
+ complete(&bltd->bltd_comp);
+ /* cannot use bltd after this, it is only on caller's stack */
+ }
+
+ while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
- l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
+ blwi = ldlm_bl_get_work(blp);
- if (blwi->blwi_ns == NULL)
- break;
+ if (blwi == NULL) {
+ int busy;
+
+ atomic_dec(&blp->blp_busy_threads);
+ l_wait_event_exclusive(blp->blp_waitq,
+ (blwi = ldlm_bl_get_work(blp)) != NULL,
+ &lwi);
+ busy = atomic_inc_return(&blp->blp_busy_threads);
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= atomic_read(&blp->blp_num_threads)))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+ } else {
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+ }
if (blwi->blwi_count) {
/* The special case when we cancel locks in lru
OBD_FREE(blwi, sizeof(*blwi));
}
+ atomic_dec(&blp->blp_busy_threads);
atomic_dec(&blp->blp_num_threads);
complete(&blp->blp_comp);
RETURN(0);
{
struct ldlm_bl_pool *blp;
int rc = 0;
+ int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
+ int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
#ifdef __KERNEL__
int i;
#endif
GOTO(out_free, rc);
#endif
+#ifdef __KERNEL__
+ if (ldlm_num_threads) {
+ /* If ldlm_num_threads is set, it is the min and the max. */
+ if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
+ ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
+ if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
+ ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
+ ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
+ }
+#endif
+
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
LDLM_CB_REPLY_PORTAL, ldlm_timeout * 900,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
- LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+ ldlm_min_threads, ldlm_max_threads,
"ldlm_cb",
LCT_MD_THREAD|LCT_DT_THREAD);
LDLM_CANCEL_REPLY_PORTAL, ldlm_timeout * 6000,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
- LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+ ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
GOTO(out_proc, rc = -ENOMEM);
ldlm_state->ldlm_bl_pool = blp;
- atomic_set(&blp->blp_num_threads, 0);
- cfs_waitq_init(&blp->blp_waitq);
spin_lock_init(&blp->blp_lock);
-
CFS_INIT_LIST_HEAD(&blp->blp_list);
+ cfs_waitq_init(&blp->blp_waitq);
+ atomic_set(&blp->blp_num_threads, 0);
+ atomic_set(&blp->blp_busy_threads, 0);
+ blp->blp_min_threads = ldlm_min_threads;
+ blp->blp_max_threads = ldlm_max_threads;
#ifdef __KERNEL__
- for (i = 0; i < LDLM_BL_THREADS; i++) {
- struct ldlm_bl_thread_data bltd = {
- .bltd_num = i,
- .bltd_blp = blp,
- };
- init_completion(&blp->blp_comp);
- rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
- if (rc < 0) {
- CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
+ for (i = 0; i < blp->blp_min_threads; i++) {
+ rc = ldlm_bl_thread_start(blp);
+ if (rc < 0)
GOTO(out_thread, rc);
- }
- wait_for_completion(&blp->blp_comp);
}
rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);