From ae0d69437e35961c257f076da6dcc1842a55456d Mon Sep 17 00:00:00 2001 From: Johann Lombardi Date: Thu, 6 Sep 2012 14:09:05 +0200 Subject: [PATCH] LU-1842 ldlm: support for sending GL ASTs to multiple locks Traditionally, a glimpse callback is sent to one single lock which is the highest PW extent lock. The new quota code uses glimpse callbacks in a different manner since it can issue multiple glimpse ASTs to different slaves (as done for blocking callbacks for instance) to notify those slaves of the new qunit value. This patch modifies the glimpse callback path to use ldlm_run_ast_work() to issue multiple glimpse ASTs in one shot (as done currently for other lock callbacks). Signed-off-by: Johann Lombardi Change-Id: I972630fcf3158dd1d361392913f8086f1ebaa39f Reviewed-on: http://review.whamcloud.com/3887 Reviewed-by: Niu Yawei Tested-by: Hudson Reviewed-by: Fan Yong Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 10 ++++ lustre/ldlm/ldlm_internal.h | 16 ++++--- lustre/ldlm/ldlm_lock.c | 31 +++++++++++++ lustre/ldlm/ldlm_lockd.c | 110 +++++++++++++++++++++++++++++--------------- lustre/obdfilter/filter.c | 18 +++++++- lustre/ofd/ofd_dlm.c | 19 ++++++-- 6 files changed, 155 insertions(+), 49 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index f70034a..480bced 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -568,6 +568,15 @@ typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags, typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data); typedef unsigned long (*ldlm_weigh_callback)(struct ldlm_lock *lock); +struct ldlm_glimpse_work { + struct ldlm_lock *gl_lock; /* lock to glimpse */ + cfs_list_t gl_list; /* linkage to other gl work structs */ + __u32 gl_flags;/* see LDLM_GL_WORK_* below */ +}; + +/* the ldlm_glimpse_work is allocated on the stack and should not be freed */ +#define LDLM_GL_WORK_NOFREE 0x1 + /* Interval node data for each 
LDLM_EXTENT lock */ struct ldlm_interval { struct interval_node li_node; /* node for tree mgmt */ @@ -994,6 +1003,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data); +int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list); int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback, ldlm_blocking_callback, ldlm_glimpse_callback); int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 237152d..65c1331 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -95,16 +95,17 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns); /* ldlm_lock.c */ struct ldlm_cb_set_arg { - struct ptlrpc_request_set *set; - int type; /* LDLM_{CP,BL}_CALLBACK */ - cfs_atomic_t restart; - cfs_list_t *list; + struct ptlrpc_request_set *set; + int type; /* LDLM_{CP,BL,GL}_CALLBACK */ + cfs_atomic_t restart; + cfs_list_t *list; }; typedef enum { - LDLM_WORK_BL_AST, - LDLM_WORK_CP_AST, - LDLM_WORK_REVOKE_AST + LDLM_WORK_BL_AST, + LDLM_WORK_CP_AST, + LDLM_WORK_REVOKE_AST, + LDLM_WORK_GL_AST } ldlm_desc_ast_t; void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list); @@ -127,6 +128,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, #endif int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type); +int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq); int ldlm_lock_remove_from_lru(struct ldlm_lock *lock); int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock); void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index bb17c8a..93ccbee 100644 --- a/lustre/ldlm/ldlm_lock.c +++ 
b/lustre/ldlm/ldlm_lock.c @@ -1618,6 +1618,33 @@ ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq) RETURN(rc); } +int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq) +{ + struct ldlm_cb_set_arg *arg = opaq; + struct ldlm_glimpse_work *gl_work; + struct ldlm_lock *lock; + int rc = 0; + ENTRY; + + if (cfs_list_empty(arg->list)) + RETURN(-ENOENT); + + gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work, + gl_list); + cfs_list_del_init(&gl_work->gl_list); + + lock = gl_work->gl_lock; + if (lock->l_glimpse_ast(lock, (void*)arg) == 0) + rc = 1; + + LDLM_LOCK_RELEASE(lock); + + if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0) + OBD_FREE_PTR(gl_work); + + RETURN(rc); +} + int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type) { @@ -1648,6 +1675,10 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, arg->type = LDLM_BL_CALLBACK; work_ast_lock = ldlm_work_revoke_ast_lock; break; + case LDLM_WORK_GL_AST: + arg->type = LDLM_GL_CALLBACK; + work_ast_lock = ldlm_work_gl_ast_lock; + break; default: LBUG(); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 740eae7..6a6eabc 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -699,22 +699,54 @@ static int ldlm_cb_interpret(const struct lu_env *env, ENTRY; LASSERT(lock != NULL); - if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, - arg->type == LDLM_BL_CALLBACK - ? "blocking" : "completion"); - if (rc == -ERESTART) - cfs_atomic_inc(&arg->restart); - } + + switch (arg->type) { + case LDLM_GL_CALLBACK: + /* Update the LVB from disk if the AST failed + * (this is a legal race) + * + * - Glimpse callback of local lock just returns + * -ELDLM_NO_LOCK_DATA. + * - Glimpse callback of remote lock might return + * -ELDLM_NO_LOCK_DATA when inode is cleared. 
LU-274 + */ + if (rc == -ELDLM_NO_LOCK_DATA) { + LDLM_DEBUG(lock, "lost race - client has a lock but no " + "inode"); + ldlm_res_lvbo_update(lock->l_resource, NULL, 1); + } else if (rc != 0) { + rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); + } else { + rc = ldlm_res_lvbo_update(lock->l_resource, req, 1); + } + break; + case LDLM_BL_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); + break; + case LDLM_CP_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "completion"); + break; + default: + LDLM_ERROR(lock, "invalid opcode for lock callback %d", + arg->type); + LBUG(); + } + + /* release extra reference taken in ldlm_ast_fini() */ LDLM_LOCK_RELEASE(lock); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); + RETURN(0); } -static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, - struct ldlm_cb_set_arg *arg, - struct ldlm_lock *lock, - int instant_cancel) +static inline int ldlm_ast_fini(struct ptlrpc_request *req, + struct ldlm_cb_set_arg *arg, + struct ldlm_lock *lock, + int instant_cancel) { int rc = 0; ENTRY; @@ -853,7 +885,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } @@ -968,17 +1000,18 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ldlm_resource *res = lock->l_resource; - struct ldlm_request *body; - struct ptlrpc_request *req; - int rc; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request 
*body; + struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; + int rc; ENTRY; LASSERT(lock != NULL); @@ -994,44 +1027,44 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + /* server namespace, doesn't need lock */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lock->l_resource->lr_lvb_len); - res = lock->l_resource; ptlrpc_request_set_replen(req); - req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_request_alloc_pack already set timeout */ if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_interpret_reply = ldlm_cb_interpret; + if (lock->l_export && lock->l_export->exp_nid_stats && lock->l_export->exp_nid_stats->nid_ldlm_stats) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_GL_CALLBACK - LDLM_FIRST_OPC); - rc = ptlrpc_queue_wait(req); - /* Update the LVB from disk if the AST failed (this is a legal race) - * - * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA. - * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA - * when inode is cleared. 
LU-274 - */ - if (rc == -ELDLM_NO_LOCK_DATA) { - LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - ldlm_res_lvbo_update(res, NULL, 1); - } else if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); - } else { - rc = ldlm_res_lvbo_update(res, req, 1); - } + rc = ldlm_ast_fini(req, arg, lock, 0); - ptlrpc_req_finished(req); - if (rc == -ERESTART) - ldlm_reprocess_all(res); + RETURN(rc); +} - RETURN(rc); +int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list) +{ + int rc; + ENTRY; + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list, + LDLM_WORK_GL_AST); + if (rc == -ERESTART) + ldlm_reprocess_all(res); + + RETURN(rc); } static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req, @@ -2922,6 +2955,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list); EXPORT_SYMBOL(ldlm_server_blocking_ast); EXPORT_SYMBOL(ldlm_server_completion_ast); EXPORT_SYMBOL(ldlm_server_glimpse_ast); +EXPORT_SYMBOL(ldlm_glimpse_locks); EXPORT_SYMBOL(ldlm_handle_enqueue); EXPORT_SYMBOL(ldlm_handle_enqueue0); EXPORT_SYMBOL(ldlm_handle_cancel); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 23a992e..ef25f36 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1698,6 +1698,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns, __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), [DLM_LOCKREPLY_OFF] = sizeof(*rep), [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb) }; + struct ldlm_glimpse_work gl_work; + CFS_LIST_HEAD(gl_list); ENTRY; policy = ldlm_get_processing_policy(res); @@ -1820,7 +1822,21 @@ static int filter_intent_policy(struct ldlm_namespace *ns, } LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l); - rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ + + /* Populate the gl_work structure. 
+ * Grab additional reference on the lock which will be released in + * ldlm_work_gl_ast_lock() */ + gl_work.gl_lock = LDLM_LOCK_GET(l); + /* The glimpse callback is sent to one single extent lock. As a result, + * the gl_work list is just composed of one element */ + cfs_list_add_tail(&gl_work.gl_list, &gl_list); + /* the ldlm_glimpse_work structure is allocated on the stack */ + gl_work.gl_flags = LDLM_GL_WORK_NOFREE; + + rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */ + + if (!cfs_list_empty(&gl_list)) + LDLM_LOCK_RELEASE(l); lock_res(res); *reply_lvb = *res_lvb; diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c index 34790c5..22effa0 100644 --- a/lustre/ofd/ofd_dlm.c +++ b/lustre/ofd/ofd_dlm.c @@ -107,7 +107,8 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, [DLM_LOCKREPLY_OFF] = sizeof(*rep), [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb) }; - + struct ldlm_glimpse_work gl_work; + CFS_LIST_HEAD(gl_list); ENTRY; policy = ldlm_get_processing_policy(res); @@ -226,8 +227,20 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, goto out; } - LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l); - rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ + /* Populate the gl_work structure. + * Grab additional reference on the lock which will be released in + * ldlm_work_gl_ast_lock() */ + gl_work.gl_lock = LDLM_LOCK_GET(l); + /* The glimpse callback is sent to one single extent lock. As a result, + * the gl_work list is just composed of one element */ + cfs_list_add_tail(&gl_work.gl_list, &gl_list); + /* the ldlm_glimpse_work structure is allocated on the stack */ + gl_work.gl_flags = LDLM_GL_WORK_NOFREE; + + rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */ + + if (!cfs_list_empty(&gl_list)) + LDLM_LOCK_RELEASE(l); lock_res(res); *reply_lvb = *res_lvb; -- 1.8.3.1