From 8c83e7d75989ef527e43a824a0dbe46bffabd07d Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Thu, 11 Aug 2011 16:56:06 -0700 Subject: [PATCH] LU-571 ldlm: Remove parallel AST limitation Imperative Recovery needs to notify clients as soon as possible. However, limiting the AST count to 200 will definitely block it. This may result in the recovery window passing before the MGS notifies all clients. In this patch, I revised the above limitation to be configurable, and there is no limit by default. Change-Id: I141da9dbd8b282a76252da15b93c432ac5253bfd Signed-off-by: Jinshan Xiong Reviewed-on: http://review.whamcloud.com/1190 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 6 +++ lustre/ldlm/ldlm_extent.c | 5 ++- lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_inodebits.c | 3 +- lustre/ldlm/ldlm_internal.h | 12 +++--- lustre/ldlm/ldlm_lock.c | 92 +++++++++++++++++++++++--------------------- lustre/ldlm/ldlm_lockd.c | 30 +++++++-------- lustre/ldlm/ldlm_plain.c | 3 +- lustre/ldlm/ldlm_resource.c | 7 ++++ 9 files changed, 89 insertions(+), 71 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index c8f321d..5c31761 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -508,6 +508,12 @@ struct ldlm_namespace { * Limit size of nolock requests, in bytes. */ unsigned ns_max_nolock_size; + + /** + * Limit of parallel AST RPC count. 
+ */ + unsigned ns_max_parallel_ast; + /* callback to cancel locks before replaying it during recovery */ ldlm_cancel_for_recovery ns_cancel_for_recovery; /** diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 310cc42..fe730f5 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -711,12 +711,13 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (cfs_list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_BL_AST); if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) && !ns_is_client(ldlm_res_to_ns(res))) class_fail_export(lock->l_export); - + lock_res(res); if (rc == -ERESTART) { diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index b598615..7e8a754 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -460,7 +460,7 @@ restart: &rpc_list); unlock_res_and_lock(req); - rc = ldlm_run_ast_work(&rpc_list, + rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); lock_res_and_lock(req); if (rc == -ERESTART) diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 53f065c..2223d58 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -183,7 +183,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, if (cfs_list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index e6d184d..f205ea6 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -97,14 +97,11 @@ void ldlm_namespace_free_prior(struct ldlm_namespace 
*ns, void ldlm_namespace_free_post(struct ldlm_namespace *ns); /* ldlm_lock.c */ -/* Number of blocking/completion callbacks that will be sent in - * parallel (see bug 11301). */ -#define PARALLEL_AST_LIMIT 200 - struct ldlm_cb_set_arg { struct ptlrpc_request_set *set; - cfs_atomic_t restart; - __u32 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ + cfs_atomic_t restart; + int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ + int rpcs; /* # of rpcs in set */ }; typedef enum { @@ -129,7 +126,8 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, cfs_list_t *work_list); int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, cfs_list_t *work_list); -int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type); +int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, + ldlm_desc_ast_t ast_type); int ldlm_lock_remove_from_lru(struct ldlm_lock *lock); int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock); void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 15d8126..2123535 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1420,20 +1420,27 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, * * Send an existing rpc set specified by @arg->set and then * destroy it. Create new one if @do_create flag is set. 
*/ -static void -ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create) +static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create) { + int rc = 0; ENTRY; - ptlrpc_set_wait(arg->set); - if (arg->type == LDLM_BL_CALLBACK) - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); - ptlrpc_set_destroy(arg->set); + if (arg->set) { + ptlrpc_set_wait(arg->set); + if (arg->type == LDLM_BL_CALLBACK) + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); + ptlrpc_set_destroy(arg->set); + arg->set = NULL; + arg->rpcs = 0; + } - if (do_create) + if (do_create) { arg->set = ptlrpc_prep_set(); + if (arg->set == NULL) + rc = -ENOMEM; + } - EXIT; + RETURN(rc); } static int @@ -1442,6 +1449,7 @@ ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) struct ldlm_lock_desc d; struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_bl_ast); + int rc; ENTRY; /* nobody should touch l_bl_ast */ @@ -1456,13 +1464,13 @@ ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) ldlm_lock2desc(lock->l_blocking_lock, &d); - lock->l_blocking_ast(lock, &d, (void *)arg, - LDLM_CB_BLOCKING); + rc = lock->l_blocking_ast(lock, &d, (void *)arg, + LDLM_CB_BLOCKING); LDLM_LOCK_RELEASE(lock->l_blocking_lock); lock->l_blocking_lock = NULL; LDLM_LOCK_RELEASE(lock); - RETURN(1); + RETURN(rc); } static int @@ -1494,10 +1502,8 @@ ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) lock->l_flags &= ~LDLM_FL_CP_REQD; unlock_res_and_lock(lock); - if (completion_callback != NULL) { - completion_callback(lock, 0, (void *)arg); - rc = 1; - } + if (completion_callback != NULL) + rc = completion_callback(lock, 0, (void *)arg); LDLM_LOCK_RELEASE(lock); RETURN(rc); @@ -1509,6 +1515,7 @@ ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) struct ldlm_lock_desc desc; struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_rk_ast); + int rc; ENTRY; cfs_list_del_init(&lock->l_rk_ast); @@ -1518,27 +1525,29 @@ 
ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) desc.l_req_mode = LCK_EX; desc.l_granted_mode = 0; - lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); + rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); LDLM_LOCK_RELEASE(lock); - RETURN(1); + RETURN(rc); } -int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type) +int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, + ldlm_desc_ast_t ast_type) { - struct ldlm_cb_set_arg arg; + struct ldlm_cb_set_arg arg = { 0 }; cfs_list_t *tmp, *pos; int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg); - int ast_count; + unsigned int max_ast_count; + int rc; ENTRY; if (cfs_list_empty(rpc_list)) RETURN(0); - arg.set = ptlrpc_prep_set(); - if (NULL == arg.set) - RETURN(-ERESTART); - cfs_atomic_set(&arg.restart, 0); + rc = ldlm_deliver_cb_set(&arg, 1); + if (rc != 0) + RETURN(rc); + switch (ast_type) { case LDLM_WORK_BL_AST: arg.type = LDLM_BL_CALLBACK; @@ -1556,29 +1565,23 @@ int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type) LBUG(); } - ast_count = 0; + max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX; + cfs_list_for_each_safe(tmp, pos, rpc_list) { - ast_count += work_ast_lock(tmp, &arg); - - /* Send the request set if it exceeds the PARALLEL_AST_LIMIT, - * and create a new set for requests that remained in - * @rpc_list */ - if (unlikely(ast_count == PARALLEL_AST_LIMIT)) { - ldlm_send_and_maybe_create_set(&arg, 1); - ast_count = 0; + (void)work_ast_lock(tmp, &arg); + if (arg.rpcs > max_ast_count) { + rc = ldlm_deliver_cb_set(&arg, 1); + if (rc != 0) + break; } } - if (ast_count > 0) - ldlm_send_and_maybe_create_set(&arg, 0); - else - /* In case when number of ASTs is multiply of - * PARALLEL_AST_LIMIT or @rpc_list was initially empty, - * @arg.set must be destroyed here, otherwise we get - * write memory leaking. 
*/ - ptlrpc_set_destroy(arg.set); + (void)ldlm_deliver_cb_set(&arg, 0); - RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0); + if (rc == 0 && cfs_atomic_read(&arg.restart)) + rc = -ERESTART; + + RETURN(rc); } static int reprocess_one_queue(struct ldlm_resource *res, void *closure) @@ -1628,7 +1631,8 @@ void ldlm_reprocess_all(struct ldlm_resource *res) ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_CP_AST); if (rc == -ERESTART) { LASSERT(cfs_list_empty(&rpc_list)); goto restart; @@ -1873,7 +1877,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, unlock_res_and_lock(lock); if (granted) - ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); + ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); if (node) OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); RETURN(res); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 2990edc..1b80090 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -649,9 +649,10 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, static int ldlm_cb_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { - struct ldlm_cb_async_args *ca = data; - struct ldlm_cb_set_arg *arg = ca->ca_set_arg; - struct ldlm_lock *lock = ca->ca_lock; + struct ldlm_cb_async_args *ca = data; + struct ldlm_lock *lock = ca->ca_lock; + struct ldlm_cb_set_arg *arg = ca->ca_set_arg; + struct ptlrpc_request_set *set = arg->set; ENTRY; LASSERT(lock != NULL); @@ -659,17 +660,16 @@ static int ldlm_cb_interpret(const struct lu_env *env, rc = ldlm_handle_ast_error(lock, req, rc, arg->type == LDLM_BL_CALLBACK ? 
"blocking" : "completion"); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); } - LDLM_LOCK_RELEASE(lock); - if (rc == -ERESTART) - cfs_atomic_set(&arg->restart, 1); - + cfs_waitq_signal(&set->set_waitq); RETURN(0); } -static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, +static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, struct ldlm_cb_set_arg *arg, struct ldlm_lock *lock, int instant_cancel) @@ -681,12 +681,11 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, rc = ptl_send_rpc(req, 1); ptlrpc_req_finished(req); if (rc == 0) - /* If we cancelled the lock, we need to restart - * ldlm_reprocess_queue */ - cfs_atomic_set(&arg->restart, 1); + cfs_atomic_inc(&arg->restart); } else { LDLM_LOCK_GET(lock); ptlrpc_set_add_req(arg->set, req); + ++arg->rpcs; } RETURN(rc); @@ -810,7 +809,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -925,7 +924,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -1604,7 +1603,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, * l_ast_data */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2); - ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST); + ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST); LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); @@ -2147,7 +2146,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp) CFS_INIT_LIST_HEAD(&rpc_list); cfs_hash_for_each_empty(exp->exp_lock_hash, 
ldlm_revoke_lock_cb, &rpc_list); - ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST); + ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list, + LDLM_WORK_REVOKE_AST); EXIT; } diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 7417f3d..486885c 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -150,7 +150,8 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (cfs_list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index e6eb905..e11eacf 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -365,6 +365,13 @@ int ldlm_namespace_proc_register(struct ldlm_namespace *ns) lock_vars[0].read_fptr = lprocfs_rd_uint; lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/max_parallel_ast", + ldlm_ns_name(ns)); + lock_vars[0].data = &ns->ns_max_parallel_ast; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } return 0; } -- 1.8.3.1