#include <libcfs/list.h>
#include "ldlm_internal.h"
+#ifdef __KERNEL__
+static int ldlm_num_threads;
+CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
+ "number of DLM service threads to start");
+#endif
+
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
static struct semaphore ldlm_ref_sem;
spinlock_t blp_lock;
struct list_head blp_list;
cfs_waitq_t blp_waitq;
- atomic_t blp_num_threads;
struct completion blp_comp;
+ atomic_t blp_num_threads;
+ atomic_t blp_busy_threads;
+ int blp_min_threads;
+ int blp_max_threads;
};
struct ldlm_bl_work_item {
return rc;
}
+/* Reply interpreter for async blocking/completion AST RPCs queued on the
+ * shared ptlrpc set by ldlm_bl_and_cp_ast_fini(). On error it refreshes
+ * the lvbo when the client raced a cancel, and folds -ERESTART into
+ * arg->restart so the caller reruns ldlm_reprocess_queue. Always
+ * returns 0: the error is consumed here, not propagated. */
+static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+ struct ldlm_cb_set_arg *arg;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ LASSERT(data != NULL);
+
+ /* stashed by ldlm_server_{blocking,completion}_ast before send */
+ arg = req->rq_async_args.pointer_arg[0];
+ lock = req->rq_async_args.pointer_arg[1];
+ LASSERT(lock != NULL);
+ if (rc != 0) {
+ /* If client canceled the lock but the cancel has not
+ * been received yet, we need to update lvbo to have the
+ * proper attributes cached. */
+ if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
+ ldlm_res_lvbo_update(lock->l_resource, NULL,
+ 0, 1);
+ rc = ldlm_handle_ast_error(lock, req, rc,
+ arg->type == LDLM_BL_CALLBACK
+ ? "blocking" : "completion");
+ }
+
+ /* drop the reference taken in ldlm_bl_and_cp_ast_fini() */
+ LDLM_LOCK_PUT(lock);
+
+ if (rc == -ERESTART)
+ atomic_set(&arg->restart, 1);
+
+ RETURN(0);
+}
+
+/* Common tail for the server blocking and completion ASTs. For an
+ * instant cancel the RPC is sent synchronously (reply not awaited) and
+ * the request freed here, flagging a reprocess restart on success.
+ * Otherwise a lock reference is taken (released later in
+ * ldlm_cb_interpret()) and the request is queued on the shared set,
+ * so the AST completes asynchronously. Returns the ptl_send_rpc()
+ * status in the instant-cancel case, 0 otherwise. */
+static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
+{
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc(req, 1);
+ ptlrpc_req_finished(req);
+ if (rc == 0)
+ /* If we cancelled the lock, we need to restart
+ * ldlm_reprocess_queue */
+ atomic_set(&arg->restart, 1);
+ } else {
+ /* balanced by LDLM_LOCK_PUT() in ldlm_cb_interpret() */
+ LDLM_LOCK_GET(lock);
+ ptlrpc_set_add_req(arg->set, req);
+ }
+
+ RETURN(rc);
+}
+
/*
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
struct ldlm_lock_desc *desc,
void *data, int flag)
{
+ struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
- int instant_cancel = 0, rc = 0;
+ int instant_cancel = 0, rc;
ENTRY;
if (flag == LDLM_CB_CANCELING) {
}
LASSERT(lock);
+ LASSERT(data != NULL);
if (lock->l_export->exp_obd->obd_recovering != 0) {
LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
ldlm_lock_dump(D_ERROR, lock, 0);
if (req == NULL)
RETURN(-ENOMEM);
+ req->rq_async_args.pointer_arg[0] = arg;
+ req->rq_async_args.pointer_arg[1] = lock;
+ req->rq_interpret_reply = ldlm_cb_interpret;
+ req->rq_no_resend = 1;
+
lock_res(lock->l_resource);
if (lock->l_granted_mode != lock->l_req_mode) {
/* this blocking AST will be communicated as part of the
lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- if (unlikely(instant_cancel)) {
- rc = ptl_send_rpc(req, 1);
- } else {
- rc = ptlrpc_queue_wait(req);
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- }
- if (rc != 0) {
- /* If client canceled the lock but the cancel has not been
- * recieved yet, we need to update lvbo to have the proper
- * attributes cached. */
- if (rc == -EINVAL)
- ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
- rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
- }
-
- ptlrpc_req_finished(req);
-
- /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
- if (!rc && instant_cancel)
- rc = -ERESTART;
+ rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
+ struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
struct timeval granted_time;
long total_enqueue_wait;
int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
- int rc = 0, buffers = 2, instant_cancel = 0;
+ int rc, buffers = 2, instant_cancel = 0;
ENTRY;
LASSERT(lock != NULL);
+ LASSERT(data != NULL);
do_gettimeofday(&granted_time);
total_enqueue_wait = cfs_timeval_sub(&granted_time,
if (req == NULL)
RETURN(-ENOMEM);
+ req->rq_async_args.pointer_arg[0] = arg;
+ req->rq_async_args.pointer_arg[1] = lock;
+ req->rq_interpret_reply = ldlm_cb_interpret;
+ req->rq_no_resend = 1;
+
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
body->lock_handle[0] = lock->l_remote_handle;
body->lock_flags = flags;
lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- if (rc != 0)
- rc = ldlm_handle_ast_error(lock, req, rc, "completion");
-
- ptlrpc_req_finished(req);
-
- /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */
- if (!rc && instant_cancel)
- rc = -ERESTART;
+ rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
lock = find_existing_lock(req->rq_export,
&dlm_req->lock_handle[0]);
if (lock != NULL) {
- DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64,
- lock->l_handle.h_cookie);
+ DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
+ LPX64, lock->l_handle.h_cookie);
GOTO(existing_lock, rc = 0);
}
}
LDLM_CONVERT - LDLM_FIRST_OPC);
rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc) {
- CERROR("out of memory\n");
- RETURN(-ENOMEM);
- }
+ if (rc)
+ RETURN(rc);
+
dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
sizeof(*dlm_rep));
dlm_rep->lock_flags = dlm_req->lock_flags;
LDLM_CANCEL - LDLM_FIRST_OPC);
rc = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc) {
- CERROR("out of memory\n");
- RETURN(-ENOMEM);
- }
+ if (rc)
+ RETURN(rc);
if (!ldlm_request_cancel(req, dlm_req, 0))
req->rq_status = ESTALE;
return blwi;
}
+/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
- int bltd_num;
+ char bltd_name[CFS_CURPROC_COMM_MAX]; /* thread name for cfs_daemonize() */
struct ldlm_bl_pool *bltd_blp;
+ struct completion bltd_comp; /* signalled once startup data is copied */
+ int bltd_num; /* thread index, assigned by the thread itself */
};
+static int ldlm_bl_thread_main(void *arg);
+
+/* Spawn one blocking-callback worker thread for @blp and block until it
+ * has copied its startup data off our stack: @bltd lives only on this
+ * stack frame, so we must not return before bltd_comp is completed by
+ * the new thread. Returns 0 on success or the negative kernel-thread
+ * creation error. */
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+{
+ struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
+ int rc;
+
+ init_completion(&bltd.bltd_comp);
+ rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+ if (rc < 0) {
+ CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
+ atomic_read(&blp->blp_num_threads), rc);
+ return rc;
+ }
+ /* handshake: wait until ldlm_bl_thread_main() is done with bltd */
+ wait_for_completion(&bltd.bltd_comp);
+
+ return 0;
+}
+
+/* Main loop of a blocking-callback worker. The startup block below
+ * copies everything it needs out of the caller-stack bltd, signals the
+ * spawner, then services work items until ldlm_cleanup() enqueues a
+ * poison item (blwi_ns == NULL). Idle threads sleep exclusively on
+ * blp_waitq; when all existing threads are busy a new one is started,
+ * bounded by blp_max_threads. */
static int ldlm_bl_thread_main(void *arg)
{
- struct ldlm_bl_thread_data *bltd = arg;
- struct ldlm_bl_pool *blp = bltd->bltd_blp;
+ struct ldlm_bl_pool *blp;
ENTRY;
{
- char name[CFS_CURPROC_COMM_MAX];
- snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
- bltd->bltd_num);
- cfs_daemonize(name);
- }
+ struct ldlm_bl_thread_data *bltd = arg;
- atomic_inc(&blp->blp_num_threads);
- complete(&blp->blp_comp);
+ blp = bltd->bltd_blp;
+
+ /* claim a thread index; start as "busy" until first idle wait */
+ bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
+ atomic_inc(&blp->blp_busy_threads);
+
+ snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
+ "ldlm_bl_%02d", bltd->bltd_num);
+ cfs_daemonize(bltd->bltd_name);
- while(1) {
+ complete(&bltd->bltd_comp);
+ /* cannot use bltd after this, it is only on caller's stack */
+ }
+
+ while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
- l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
+ /* fast path: work already queued, no need to sleep */
+ blwi = ldlm_bl_get_work(blp);
- if (blwi->blwi_ns == NULL)
- break;
+ if (blwi == NULL) {
+ int busy;
+
+ /* going idle: drop busy count while we sleep */
+ atomic_dec(&blp->blp_busy_threads);
+ l_wait_event_exclusive(blp->blp_waitq,
+ (blwi = ldlm_bl_get_work(blp)) != NULL,
+ &lwi);
+ busy = atomic_inc_return(&blp->blp_busy_threads);
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= atomic_read(&blp->blp_num_threads)))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+ } else {
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+ }
if (blwi->blwi_count) {
/* The special case when we cancel locks in lru
OBD_FREE(blwi, sizeof(*blwi));
}
+ atomic_dec(&blp->blp_busy_threads);
atomic_dec(&blp->blp_num_threads);
complete(&blp->blp_comp);
RETURN(0);
#endif
-static int ldlm_setup(ldlm_side_t client);
-static int ldlm_cleanup(ldlm_side_t client, int force);
+static int ldlm_setup(void);
+static int ldlm_cleanup(int force);
-int ldlm_get_ref(ldlm_side_t client)
+/* Take a reference on the LDLM subsystem; the first reference boots the
+ * whole service via ldlm_setup(). Refcounting is serialized by
+ * ldlm_ref_sem. Returns 0 or the ldlm_setup() error (in which case the
+ * reference is not kept). */
+int ldlm_get_ref(void)
{
int rc = 0;
ENTRY;
mutex_down(&ldlm_ref_sem);
if (++ldlm_refcount == 1) {
- rc = ldlm_setup(client);
+ rc = ldlm_setup();
if (rc)
ldlm_refcount--;
}
RETURN(rc);
}
-void ldlm_put_ref(ldlm_side_t client, int force)
+/* Drop an LDLM reference; the last reference tears the service down via
+ * ldlm_cleanup(@force). Serialized by ldlm_ref_sem; cleanup failures
+ * are logged and leave the refcount held. */
+void ldlm_put_ref(int force)
{
ENTRY;
mutex_down(&ldlm_ref_sem);
if (ldlm_refcount == 1) {
- int rc = ldlm_cleanup(client, force);
+ int rc = ldlm_cleanup(force);
if (rc)
CERROR("ldlm_cleanup failed: %d\n", rc);
else
EXIT;
}
-static int ldlm_setup(ldlm_side_t client)
+static int ldlm_setup(void)
{
struct ldlm_bl_pool *blp;
int rc = 0;
+ int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
+ int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
#ifdef __KERNEL__
int i;
#endif
GOTO(out_free, rc);
#endif
+#ifdef __KERNEL__
+ if (ldlm_num_threads) {
+ /* If ldlm_num_threads is set, it is the min and the max. */
+ if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
+ ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
+ if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
+ ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
+ ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
+ }
+#endif
+
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
LDLM_CB_REPLY_PORTAL, ldlm_timeout * 900,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
- LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+ ldlm_min_threads, ldlm_max_threads,
"ldlm_cb",
LCT_MD_THREAD|LCT_DT_THREAD);
LDLM_CANCEL_REPLY_PORTAL, ldlm_timeout * 6000,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
- LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
+ ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
GOTO(out_proc, rc = -ENOMEM);
ldlm_state->ldlm_bl_pool = blp;
- atomic_set(&blp->blp_num_threads, 0);
- cfs_waitq_init(&blp->blp_waitq);
spin_lock_init(&blp->blp_lock);
-
CFS_INIT_LIST_HEAD(&blp->blp_list);
+ cfs_waitq_init(&blp->blp_waitq);
+ atomic_set(&blp->blp_num_threads, 0);
+ atomic_set(&blp->blp_busy_threads, 0);
+ blp->blp_min_threads = ldlm_min_threads;
+ blp->blp_max_threads = ldlm_max_threads;
#ifdef __KERNEL__
- for (i = 0; i < LDLM_BL_THREADS; i++) {
- struct ldlm_bl_thread_data bltd = {
- .bltd_num = i,
- .bltd_blp = blp,
- };
- init_completion(&blp->blp_comp);
- rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
- if (rc < 0) {
- CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
+ for (i = 0; i < blp->blp_min_threads; i++) {
+ rc = ldlm_bl_thread_start(blp);
+ if (rc < 0)
GOTO(out_thread, rc);
- }
- wait_for_completion(&blp->blp_comp);
}
rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
#endif
#ifdef __KERNEL__
- rc = ldlm_pools_init(client);
+ rc = ldlm_pools_init();
if (rc)
GOTO(out_thread, rc);
#endif
return rc;
}
-static int ldlm_cleanup(ldlm_side_t client, int force)
+static int ldlm_cleanup(int force)
{
#ifdef __KERNEL__
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;