From 3bec867c84029e86f7cecf6e92cc4b62e1290e73 Mon Sep 17 00:00:00 2001 From: deen Date: Wed, 13 Feb 2008 19:17:25 +0000 Subject: [PATCH] Changes to parallel callbacks code to avoid duplication and resolve problem with ldlm_revoke_export_locks(). b=11301 i=green i=fanyong --- lustre/ldlm/ldlm_extent.c | 2 +- lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_inodebits.c | 2 +- lustre/ldlm/ldlm_internal.h | 9 ++- lustre/ldlm/ldlm_lock.c | 176 +++++++++++++++++++++++-------------------- lustre/ldlm/ldlm_lockd.c | 19 +---- lustre/ldlm/ldlm_plain.c | 2 +- 7 files changed, 110 insertions(+), 102 deletions(-) diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 82fbeed..3ff8e75 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -414,7 +414,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); + rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 6cb71e7..919097b 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -399,7 +399,7 @@ restart: &rpc_list); unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); + rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 97f941f..34c6295 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -157,7 +157,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); + rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index ebfbaac..741190aa 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -66,6 +66,12 @@ struct ldlm_cb_set_arg { __u32 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ }; +typedef enum { + LDLM_WORK_BL_AST, + LDLM_WORK_CP_AST, + LDLM_WORK_REVOKE_AST +} ldlm_desc_ast_t; + void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list); struct ldlm_lock * ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, @@ -80,8 +86,7 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, struct list_head *work_list); int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, struct list_head *work_list); -int ldlm_run_bl_ast_work(struct list_head *rpc_list); -int ldlm_run_cp_ast_work(struct list_head *rpc_list); +int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type); int ldlm_lock_remove_from_lru(struct ldlm_lock *lock); int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock); void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 88e1ef2..abc77f6 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1270,94 +1270,62 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, RETURN(rc); } -/* Helper function for pair ldlm_run_{bl,cp}_ast_work(). +/* Helper function for ldlm_run_ast_work(). * * Send an existing rpc set specified by @arg->set and then * destroy it. Create new one if @do_create flag is set. */ static void ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create) { - int rc; + ENTRY; - rc = ptlrpc_set_wait(arg->set); + ptlrpc_set_wait(arg->set); if (arg->type == LDLM_BL_CALLBACK) OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); ptlrpc_set_destroy(arg->set); if (do_create) arg->set = ptlrpc_prep_set(); + + EXIT; } -int ldlm_run_bl_ast_work(struct list_head *rpc_list) +static int +ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) { - struct ldlm_cb_set_arg arg; - struct list_head *tmp, *pos; struct ldlm_lock_desc d; - int ast_count; - int rc = 0; + struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); ENTRY; - arg.set = ptlrpc_prep_set(); - atomic_set(&arg.restart, 0); - arg.type = LDLM_BL_CALLBACK; - - ast_count = 0; - list_for_each_safe(tmp, pos, rpc_list) { - struct ldlm_lock *lock = - list_entry(tmp, struct ldlm_lock, l_bl_ast); - - /* nobody should touch l_bl_ast */ - lock_res_and_lock(lock); - list_del_init(&lock->l_bl_ast); - - LASSERT(lock->l_flags & LDLM_FL_AST_SENT); - LASSERT(lock->l_bl_ast_run == 0); - LASSERT(lock->l_blocking_lock); - lock->l_bl_ast_run++; - unlock_res_and_lock(lock); - - ldlm_lock2desc(lock->l_blocking_lock, &d); + /* nobody should touch l_bl_ast */ + lock_res_and_lock(lock); + list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock->l_blocking_lock); - lock->l_blocking_lock = NULL; - rc = lock->l_blocking_ast(lock, &d, (void *)&arg, - LDLM_CB_BLOCKING); - LDLM_LOCK_PUT(lock); - ast_count++; + LASSERT(lock->l_flags & LDLM_FL_AST_SENT); + LASSERT(lock->l_bl_ast_run == 0); + LASSERT(lock->l_blocking_lock); + lock->l_bl_ast_run++; + unlock_res_and_lock(lock); - /* Send the request set if it exceeds the PARALLEL_AST_LIMIT, - * and create a new set for requests that remained in - * @rpc_list */ - if (unlikely(ast_count == PARALLEL_AST_LIMIT)) { - ldlm_send_and_maybe_create_set(&arg, 1); - ast_count = 0; - } - } + ldlm_lock2desc(lock->l_blocking_lock, &d); - if (ast_count > 0) - ldlm_send_and_maybe_create_set(&arg, 0); - else - /* In case when number of ASTs is multiply of - * PARALLEL_AST_LIMIT or @rpc_list was initially empty, - * @arg.set must be destroyed here, otherwise we get - * write memory leaking. */ - ptlrpc_set_destroy(arg.set); + LDLM_LOCK_PUT(lock->l_blocking_lock); + lock->l_blocking_lock = NULL; + lock->l_blocking_ast(lock, &d, (void *)arg, + LDLM_CB_BLOCKING); + LDLM_LOCK_PUT(lock); - RETURN(atomic_read(&arg.restart) ? -ERESTART : 0); + RETURN(1); } -int ldlm_run_cp_ast_work(struct list_head *rpc_list) +static int +ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) { - struct ldlm_cb_set_arg arg; - struct list_head *tmp, *pos; - int ast_count; + struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_cp_ast); + ldlm_completion_callback completion_callback; int rc = 0; ENTRY; - arg.set = ptlrpc_prep_set(); - atomic_set(&arg.restart, 0); - arg.type = LDLM_CP_CALLBACK; - /* It's possible to receive a completion AST before we've set * the l_completion_ast pointer: either because the AST arrived * before the reply, or simply because there's a small race @@ -1369,27 +1337,75 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list) * reader/writer reference, which we won't do until we get the * reply and finish enqueueing. */ + /* nobody should touch l_cp_ast */ + lock_res_and_lock(lock); + list_del_init(&lock->l_cp_ast); + LASSERT(lock->l_flags & LDLM_FL_CP_REQD); + /* save l_completion_ast since it can be changed by + * mds_intent_policy(), see bug 14225 */ + completion_callback = lock->l_completion_ast; + lock->l_flags &= ~LDLM_FL_CP_REQD; + unlock_res_and_lock(lock); + + if (completion_callback != NULL) { + completion_callback(lock, 0, (void *)arg); + rc = 1; + } + LDLM_LOCK_PUT(lock); + + RETURN(rc); +} + +static int +ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) +{ + struct ldlm_lock_desc desc; + struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_export_chain); + ENTRY; + + list_del_init(&lock->l_export_chain); + + /* the desc just pretend to exclusive */ + ldlm_lock2desc(lock, &desc); + desc.l_req_mode = LCK_EX; + desc.l_granted_mode = 0; + + lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); + LDLM_LOCK_PUT(lock); + + RETURN(1); +} + +int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type) +{ + struct ldlm_cb_set_arg arg; + struct list_head *tmp, *pos; + int (*work_ast_lock)(struct list_head *tmp, struct ldlm_cb_set_arg *arg); + int ast_count; + ENTRY; + + arg.set = ptlrpc_prep_set(); + atomic_set(&arg.restart, 0); + switch (ast_type) { + case LDLM_WORK_BL_AST: + arg.type = LDLM_BL_CALLBACK; + work_ast_lock = ldlm_work_bl_ast_lock; + break; + case LDLM_WORK_CP_AST: + arg.type = LDLM_CP_CALLBACK; + work_ast_lock = ldlm_work_cp_ast_lock; + break; + case LDLM_WORK_REVOKE_AST: + arg.type = LDLM_BL_CALLBACK; + work_ast_lock = ldlm_work_revoke_ast_lock; + break; + default: + LBUG(); + } + ast_count = 0; list_for_each_safe(tmp, pos, rpc_list) { - struct ldlm_lock *lock = - list_entry(tmp, struct ldlm_lock, l_cp_ast); - ldlm_completion_callback completion_callback; - - /* nobody should touch l_cp_ast */ - lock_res_and_lock(lock); - list_del_init(&lock->l_cp_ast); - LASSERT(lock->l_flags & LDLM_FL_CP_REQD); - /* save l_completion_ast since it can be changed by - * mds_intent_policy(), see bug 14225 */ - completion_callback = lock->l_completion_ast; - lock->l_flags &= ~LDLM_FL_CP_REQD; - unlock_res_and_lock(lock); - - if (completion_callback != NULL) { - rc = completion_callback(lock, 0, (void *)&arg); - ast_count++; - } - LDLM_LOCK_PUT(lock); + ast_count += work_ast_lock(tmp, &arg); /* Send the request set if it exceeds the PARALLEL_AST_LIMIT, * and create a new set for requests that remained in @@ -1471,7 +1487,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); unlock_res(res); - rc = ldlm_run_cp_ast_work(&rpc_list); + rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); if (rc == -ERESTART) { LASSERT(list_empty(&rpc_list)); goto restart; @@ -1719,7 +1735,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, unlock_res_and_lock(lock); if (granted) - ldlm_run_cp_ast_work(&rpc_list); + ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); RETURN(res); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 8ef4562..8c2ea6c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1389,7 +1389,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); - ldlm_run_cp_ast_work(&ast_list); + ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST); LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); @@ -1760,7 +1760,6 @@ void ldlm_revoke_export_locks(struct obd_export *exp) struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks; struct list_head rpc_list; struct ldlm_lock *lock, *next; - struct ldlm_lock_desc desc; ENTRY; INIT_LIST_HEAD(&rpc_list); @@ -1791,26 +1790,14 @@ void ldlm_revoke_export_locks(struct obd_export *exp) lock->l_flags |= LDLM_FL_AST_SENT; list_move(&lock->l_export_chain, &rpc_list); + LDLM_LOCK_GET(lock); unlock_res_and_lock(lock); } spin_unlock(&exp->exp_ldlm_data.led_lock); - while (!list_empty(&rpc_list)) { - lock = list_entry(rpc_list.next, struct ldlm_lock, - l_export_chain); - list_del_init(&lock->l_export_chain); - - /* the desc just pretend to exclusive */ - ldlm_lock2desc(lock, &desc); - desc.l_req_mode = LCK_EX; - desc.l_granted_mode = 0; + ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST); - LDLM_LOCK_GET(lock); - lock->l_blocking_ast(lock, &desc, lock->l_ast_data, - LDLM_CB_BLOCKING); - LDLM_LOCK_PUT(lock); - } EXIT; } diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index c6b2c89..6ced357 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -132,7 +132,7 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); + rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); -- 1.8.3.1