From 0bd27be7f20a671e7128f341a070838a2bd318dc Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Wed, 26 Oct 2011 13:48:17 -0600 Subject: [PATCH] LU-571 ldlm: add parallel ast flow control Commit {hash: 8c83e7d75989ef527e43a824a0dbe46bffabd07d} removed the parallel AST limit on the server. However, if there are too many locks to revoke, it will have to allocate too many RPCs. Return to having an upper limit on the number of AST RPCs in flight by adding a flow control algorithm that allows a configurable upper limit on the number of RPCs in flight. Change-Id: Ifb68991acf7a33119b334447aec50f7717ed546e Signed-off-by: Jinshan Xiong Reviewed-on: http://review.whamcloud.com/1608 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 1 + lustre/ldlm/ldlm_internal.h | 9 ++++---- lustre/ldlm/ldlm_lock.c | 55 ++++++++++++--------------------------------- lustre/ldlm/ldlm_lockd.c | 8 +++---- lustre/ldlm/ldlm_resource.c | 1 + 5 files changed, 25 insertions(+), 49 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 91c715c..cf9c70d 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -75,6 +75,7 @@ struct obd_device; #endif #define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000)) #define LDLM_CTIME_AGE_LIMIT (10) +#define LDLM_DEFAULT_PARALLEL_AST_LIMIT 1024 typedef enum { ELDLM_OK = 0, diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 12ff5ce..f6bb70e 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -104,10 +104,11 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns); /* ldlm_lock.c */ struct ldlm_cb_set_arg { - struct ptlrpc_request_set *set; - cfs_atomic_t restart; - int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ - int rpcs; /* # of rpcs in set */ + int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ + unsigned int threshold; /* threshold to wake up the waiting proc */ + cfs_atomic_t rpcs; /* # of inflight rpcs in set */ + cfs_atomic_t restart; + cfs_waitq_t waitq; }; typedef enum { diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 9378385..eaaaa43 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1455,33 +1455,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, RETURN(rc); } -/* Helper function for ldlm_run_ast_work(). - * - * Send an existing rpc set specified by @arg->set and then - * destroy it. Create new one if @do_create flag is set. */ -static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create) -{ - int rc = 0; - ENTRY; - - if (arg->set) { - ptlrpc_set_wait(arg->set); - if (arg->type == LDLM_BL_CALLBACK) - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); - ptlrpc_set_destroy(arg->set); - arg->set = NULL; - arg->rpcs = 0; - } - - if (do_create) { - arg->set = ptlrpc_prep_set(); - if (arg->set == NULL) - rc = -ENOMEM; - } - - RETURN(rc); -} - static int ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) { @@ -1574,18 +1547,18 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type) { struct ldlm_cb_set_arg arg = { 0 }; + struct l_wait_info lwi = { 0 }; cfs_list_t *tmp, *pos; int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg); unsigned int max_ast_count; - int rc; ENTRY; if (cfs_list_empty(rpc_list)) RETURN(0); - rc = ldlm_deliver_cb_set(&arg, 1); - if (rc != 0) - RETURN(rc); + cfs_atomic_set(&arg.restart, 0); + cfs_atomic_set(&arg.rpcs, 0); + cfs_waitq_init(&arg.waitq); switch (ast_type) { case LDLM_WORK_BL_AST: @@ -1605,22 +1578,22 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, } max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX; + arg.threshold = max_ast_count; cfs_list_for_each_safe(tmp, pos, rpc_list) { (void)work_ast_lock(tmp, &arg); - if (arg.rpcs > max_ast_count) { - rc = ldlm_deliver_cb_set(&arg, 1); - if (rc != 0) - break; - } - } + if (cfs_atomic_read(&arg.rpcs) < max_ast_count) + continue; - (void)ldlm_deliver_cb_set(&arg, 0); + l_wait_event(arg.waitq, + cfs_atomic_read(&arg.rpcs) < arg.threshold, + &lwi); + } - if (rc == 0 && cfs_atomic_read(&arg.restart)) - rc = -ERESTART; + arg.threshold = 1; + l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi); - RETURN(rc); + RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0); } static int reprocess_one_queue(struct ldlm_resource *res, void *closure) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index af7c6c4..d009a1f 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -650,7 +650,6 @@ static int ldlm_cb_interpret(const struct lu_env *env, struct ldlm_cb_async_args *ca = data; struct ldlm_lock *lock = ca->ca_lock; struct ldlm_cb_set_arg *arg = ca->ca_set_arg; - struct ptlrpc_request_set *set = arg->set; ENTRY; LASSERT(lock != NULL); @@ -663,7 +662,8 @@ static int ldlm_cb_interpret(const struct lu_env *env, } LDLM_LOCK_RELEASE(lock); - cfs_waitq_signal(&set->set_waitq); + if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold) + cfs_waitq_signal(&arg->waitq); RETURN(0); } @@ -682,8 +682,8 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, cfs_atomic_inc(&arg->restart); } else { LDLM_LOCK_GET(lock); - ptlrpc_set_add_req(arg->set, req); - ++arg->rpcs; + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + cfs_atomic_inc(&arg->rpcs); } RETURN(rc); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index eb28062..4d6dab4 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -619,6 +619,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name, ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS; ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS; + ns->ns_max_parallel_ast = LDLM_DEFAULT_PARALLEL_AST_LIMIT; ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; -- 1.8.3.1