From 6b810c5a75e9a8772b4ac391ce3d0f4b535ac15f Mon Sep 17 00:00:00 2001 From: deen Date: Tue, 20 Nov 2007 16:34:01 +0000 Subject: [PATCH] Instead of sending blocking and completion callbacks as separated requests, adding them to a set and sending in parallel. b=11301 i=green i=tappro --- lustre/ChangeLog | 6 +++ lustre/ldlm/ldlm_internal.h | 11 +++++ lustre/ldlm/ldlm_lock.c | 97 +++++++++++++++++++++++++++++++++-------- lustre/ldlm/ldlm_lockd.c | 103 +++++++++++++++++++++++++++++++------------- 4 files changed, 168 insertions(+), 49 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index a0f3f1e..84d35a0 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -492,6 +492,12 @@ Description: touch file failed when fs is not full Details : OST in recovery should not be discarded by MDS in alloc_qos(), otherwise we can get ENOSP while fs is not full. +Severity : normal +Bugzilla : 11301 +Description: parallel lock callbacks +Details : Instead of sending blocking and completion callbacks as separated + requests, adding them to a set and sending in parallel. + -------------------------------------------------------------------------------- 2007-08-10 Cluster File Systems, Inc. diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 89b63c8..5b4b0dd 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -48,6 +48,17 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, struct ldlm_lock *new); /* ldlm_lock.c */ + +/* Number of blocking/completion callbacks that will be sent in + * parallel (see bug 11301). 
*/ +#define PARALLEL_AST_LIMIT 200 + +struct ldlm_cb_set_arg { + struct ptlrpc_request_set *set; + atomic_t restart; + __u16 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */ +}; + void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list); struct ldlm_lock * ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 4f9be34..c7ec85a 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1268,13 +1268,38 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, RETURN(rc); } +/* Helper function for pair ldlm_run_{bl,cp}_ast_work(). + * + * Send an existing rpc set specified by @arg->set and then + * destroy it. Create new one if @do_create flag is set. */ +static void +ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create) +{ + int rc; + + rc = ptlrpc_set_wait(arg->set); + if (arg->type == LDLM_BL_CALLBACK) + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); + ptlrpc_set_destroy(arg->set); + + if (do_create) + arg->set = ptlrpc_prep_set(); +} + int ldlm_run_bl_ast_work(struct list_head *rpc_list) { + struct ldlm_cb_set_arg arg; struct list_head *tmp, *pos; struct ldlm_lock_desc d; - int rc = 0, retval = 0; + int ast_count; + int rc = 0; ENTRY; + arg.set = ptlrpc_prep_set(); + atomic_set(&arg.restart, 0); + arg.type = LDLM_BL_CALLBACK; + + ast_count = 0; list_for_each_safe(tmp, pos, rpc_list) { struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); @@ -1293,24 +1318,44 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list) LDLM_LOCK_PUT(lock->l_blocking_lock); lock->l_blocking_lock = NULL; - rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING); - - if (rc == -ERESTART) - retval = rc; - else if (rc) - CDEBUG(D_DLMTRACE, "Failed AST - should clean & " - "disconnect client\n"); + rc = lock->l_blocking_ast(lock, &d, (void *)&arg, + LDLM_CB_BLOCKING); LDLM_LOCK_PUT(lock); + ast_count++; + + /* Send the 
request set if it exceeds the PARALLEL_AST_LIMIT, + * and create a new set for requests that remained in + * @rpc_list */ + if (unlikely(ast_count == PARALLEL_AST_LIMIT)) { + ldlm_send_and_maybe_create_set(&arg, 1); + ast_count = 0; + } } - RETURN(retval); + + if (ast_count > 0) + ldlm_send_and_maybe_create_set(&arg, 0); + else + /* In case the number of ASTs is a multiple of + * PARALLEL_AST_LIMIT or @rpc_list was initially empty, + * @arg.set must be destroyed here, otherwise we get + * a memory leak. */ + ptlrpc_set_destroy(arg.set); + + RETURN(atomic_read(&arg.restart) ? -ERESTART : 0); } int ldlm_run_cp_ast_work(struct list_head *rpc_list) { + struct ldlm_cb_set_arg arg; struct list_head *tmp, *pos; - int rc = 0, retval = 0; + int ast_count; + int rc = 0; ENTRY; + arg.set = ptlrpc_prep_set(); + atomic_set(&arg.restart, 0); + arg.type = LDLM_CP_CALLBACK; + /* It's possible to receive a completion AST before we've set * the l_completion_ast pointer: either because the AST arrived * before the reply, or simply because there's a small race @@ -1322,6 +1367,7 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list) * reader/writer reference, which we won't do until we get the * reply and finish enqueueing. 
*/ + ast_count = 0; list_for_each_safe(tmp, pos, rpc_list) { struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_cp_ast); @@ -1333,16 +1379,31 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list) lock->l_flags &= ~LDLM_FL_CP_REQD; unlock_res_and_lock(lock); - if (lock->l_completion_ast != NULL) - rc = lock->l_completion_ast(lock, 0, 0); - if (rc == -ERESTART) - retval = rc; - else if (rc) - CDEBUG(D_DLMTRACE, "Failed AST - should clean & " - "disconnect client\n"); + if (lock->l_completion_ast != NULL) { + rc = lock->l_completion_ast(lock, 0, (void *)&arg); + ast_count++; + } LDLM_LOCK_PUT(lock); + + /* Send the request set if it exceeds the PARALLEL_AST_LIMIT, + * and create a new set for requests that remained in + * @rpc_list */ + if (unlikely(ast_count == PARALLEL_AST_LIMIT)) { + ldlm_send_and_maybe_create_set(&arg, 1); + ast_count = 0; + } } - RETURN(retval); + + if (ast_count > 0) + ldlm_send_and_maybe_create_set(&arg, 0); + else + /* In case the number of ASTs is a multiple of + * PARALLEL_AST_LIMIT or @rpc_list was initially empty, + * @arg.set must be destroyed here, otherwise we get + * a memory leak. */ + ptlrpc_set_destroy(arg.set); + + RETURN(atomic_read(&arg.restart) ? 
-ERESTART : 0); } static int reprocess_one_queue(struct ldlm_resource *res, void *closure) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 08b2ead..35b08ca 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -495,6 +495,60 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, return rc; } +static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc) +{ + struct ldlm_cb_set_arg *arg; + struct ldlm_lock *lock; + ENTRY; + + LASSERT(data != NULL); + + arg = req->rq_async_args.pointer_arg[0]; + lock = req->rq_async_args.pointer_arg[1]; + LASSERT(lock != NULL); + if (rc != 0) { + /* If client canceled the lock but the cancel has not + * been received yet, we need to update lvbo to have the + * proper attributes cached. */ + if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK) + ldlm_res_lvbo_update(lock->l_resource, NULL, + 0, 1); + rc = ldlm_handle_ast_error(lock, req, rc, + arg->type == LDLM_BL_CALLBACK + ? "blocking" : "completion"); + } + + LDLM_LOCK_PUT(lock); + + if (rc == -ERESTART) + atomic_set(&arg->restart, 1); + + RETURN(0); +} + +static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, + struct ldlm_cb_set_arg *arg, + struct ldlm_lock *lock, + int instant_cancel) +{ + int rc = 0; + ENTRY; + + if (unlikely(instant_cancel)) { + rc = ptl_send_rpc(req, 1); + ptlrpc_req_finished(req); + if (rc == 0) + /* If we cancelled the lock, we need to restart + * ldlm_reprocess_queue */ + atomic_set(&arg->restart, 1); + } else { + LDLM_LOCK_GET(lock); + ptlrpc_set_add_req(arg->set, req); + } + + RETURN(rc); +} + /* * ->l_blocking_ast() method for server-side locks. This is invoked when newly * enqueued server lock conflicts with given one. 
@@ -506,11 +560,12 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { + struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data; struct ldlm_request *body; struct ptlrpc_request *req; int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*body) }; - int instant_cancel = 0, rc = 0; + int instant_cancel = 0, rc; ENTRY; if (flag == LDLM_CB_CANCELING) { @@ -519,6 +574,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, } LASSERT(lock); + LASSERT(data != NULL); if (lock->l_export->exp_obd->obd_recovering != 0) { LDLM_ERROR(lock, "BUG 6063: lock collide during recovery"); ldlm_lock_dump(D_ERROR, lock, 0); @@ -530,6 +586,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, if (req == NULL) RETURN(-ENOMEM); + req->rq_async_args.pointer_arg[0] = arg; + req->rq_async_args.pointer_arg[1] = lock; + req->rq_interpret_reply = ldlm_cb_interpret; + req->rq_no_resend = 1; + lock_res(lock->l_resource); if (lock->l_granted_mode != lock->l_req_mode) { /* this blocking AST will be communicated as part of the @@ -574,42 +635,25 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - if (unlikely(instant_cancel)) { - rc = ptl_send_rpc(req, 1); - } else { - rc = ptlrpc_queue_wait(req); - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); - } - if (rc != 0) { - /* If client canceled the lock but the cancel has not been - * recieved yet, we need to update lvbo to have the proper - * attributes cached. 
*/ - if (rc == -EINVAL) - ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1); - rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); - } - - ptlrpc_req_finished(req); - - /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */ - if (!rc && instant_cancel) - rc = -ERESTART; + rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) { + struct ldlm_cb_set_arg *arg = (struct ldlm_cb_set_arg *)data; struct ldlm_request *body; struct ptlrpc_request *req; struct timeval granted_time; long total_enqueue_wait; int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREQ_OFF] = sizeof(*body) }; - int rc = 0, buffers = 2, instant_cancel = 0; + int rc, buffers = 2, instant_cancel = 0; ENTRY; LASSERT(lock != NULL); + LASSERT(data != NULL); do_gettimeofday(&granted_time); total_enqueue_wait = cfs_timeval_sub(&granted_time, @@ -632,6 +676,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (req == NULL) RETURN(-ENOMEM); + req->rq_async_args.pointer_arg[0] = arg; + req->rq_async_args.pointer_arg[1] = lock; + req->rq_interpret_reply = ldlm_cb_interpret; + req->rq_no_resend = 1; + body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); body->lock_handle[0] = lock->l_remote_handle; body->lock_flags = flags; @@ -683,15 +732,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ptlrpc_queue_wait(req); - if (rc != 0) - rc = ldlm_handle_ast_error(lock, req, rc, "completion"); - - ptlrpc_req_finished(req); - - /* If we cancelled the lock, we need to restart ldlm_reprocess_queue */ - if (!rc && instant_cancel) - rc = -ERESTART; + rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } -- 1.8.3.1