From 9d79f92076b6a9ca735dbe4420c122f47d240263 Mon Sep 17 00:00:00 2001 From: Vitaly Fertman Date: Tue, 28 Feb 2023 04:45:15 +0300 Subject: [PATCH] LU-16285 ldlm: BL_AST lock cancel still can be batched The previous patch makes BLAST locks to be cancelled separately. However the main problem is flushing the data under the other batched locks, thus still possible to batch it with those with no data. Could be optimized for not yet CANCELLING locks only, otherwise it is already in the l_bl_ast list. Fixes: b65374d9 ("LU-16285 ldlm: send the cancel RPC asap") Signed-off-by: Vitaly Fertman Change-Id: Ie4a7c7f3e0f5462290f72af7c3b2ff410a31f5e7 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/50158 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Yang Sheng Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 6 +++--- lustre/ldlm/ldlm_lockd.c | 3 ++- lustre/ldlm/ldlm_request.c | 35 +++++++++++++++++++++-------------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 9825227..f58d582 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -727,7 +727,6 @@ enum ldlm_cancel_flags { LCF_ASYNC = 0x1, /* Cancel locks asynchronously. */ LCF_LOCAL = 0x2, /* Cancel locks locally, not notifing server */ LCF_BL_AST = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */ - LCF_ONE_LOCK = 0x8, /* Cancel locks pack only one lock. */ }; struct ldlm_flock { @@ -1765,8 +1764,9 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, union ldlm_policy_data *policy, enum ldlm_mode mode, enum ldlm_cancel_flags flags, void *opaque); -int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr, - int count, enum ldlm_cancel_flags flags); +int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, + struct list_head *head, int count, + enum ldlm_cancel_flags flags); int ldlm_cancel_resource_local(struct ldlm_resource *res, struct list_head *cancels, union ldlm_policy_data *policy, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index cb85fac..8ae0a22 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -2472,7 +2472,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) * we can tell the server we have no lock. Otherwise, we * should send cancel after dropping the cache. */ - if (ldlm_is_ast_sent(lock) || ldlm_is_failed(lock)) { + if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) || + ldlm_is_failed(lock)) { LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared", dlm_req->lock_handle[0].cookie); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a6f4a75..20d39bc 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1328,8 +1328,9 @@ static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock, * Prepare and send a batched cancel RPC. It will include \a count lock * handles of locks given in \a cancels list. */ -int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr, - int count, enum ldlm_cancel_flags flags) +int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, + struct list_head *head, int count, + enum ldlm_cancel_flags flags) { struct ptlrpc_request *req = NULL; struct obd_import *imp; @@ -1340,6 +1341,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr, LASSERT(exp != NULL); LASSERT(count > 0); + LASSERT(head == NULL || lock == NULL); CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val); @@ -1394,10 +1396,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr, req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; ptlrpc_at_set_req_timeout(req); - if (flags & LCF_ONE_LOCK) - rc = _ldlm_cancel_pack(req, ptr, NULL, count); - else - rc = _ldlm_cancel_pack(req, NULL, ptr, count); + rc = _ldlm_cancel_pack(req, lock, head, count); if (rc == 0) { ptlrpc_req_finished(req); sent = count; @@ -1555,7 +1554,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, enum ldlm_cancel_flags flags) { struct obd_export *exp; - int avail, count = 1, bl_ast = 0; + int avail, count = 1, separate = 0; + enum ldlm_lru_flags lru_flags = 0; __u64 rc = 0; struct ldlm_namespace *ns; struct ldlm_lock *lock; @@ -1578,7 +1578,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, LDLM_LOCK_RELEASE(lock); RETURN(0); } - bl_ast = 1; + if (ldlm_is_canceling(lock)) + separate = 1; } else if (ldlm_is_canceling(lock)) { /* Lock is being canceled and the caller doesn't want to wait */ if (flags & LCF_ASYNC) { @@ -1602,11 +1603,17 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, if (rc == LDLM_FL_LOCAL_ONLY || flags & LCF_LOCAL) { LDLM_LOCK_RELEASE(lock); RETURN(0); + } else if (rc == LDLM_FL_BL_AST) { + /* BL_AST lock must not wait. */ + lru_flags |= LDLM_LRU_FLAG_NO_WAIT; } exp = lock->l_conn_export; - if (bl_ast) { /* Send RPC immedaitly for LDLM_FL_BL_AST */ - ldlm_cli_cancel_req(exp, lock, count, flags | LCF_ONE_LOCK); + /* If a lock has been taken from lru for a batched cancel and a later + * BL_AST came, send a CANCEL RPC individually for it right away, not + * waiting for the batch to be handled. */ + if (separate) { + ldlm_cli_cancel_req(exp, lock, NULL, 1, flags); LDLM_LOCK_RELEASE(lock); RETURN(0); } @@ -1626,7 +1633,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, ns = ldlm_lock_to_ns(lock); count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1, - LCF_BL_AST, 0); + LCF_BL_AST, lru_flags); } ldlm_cli_cancel_list(&cancels, count, NULL, flags); @@ -1678,7 +1685,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count, } if (bl_ast > 0) { count -= bl_ast; - ldlm_cli_cancel_list(&head, bl_ast, NULL, 0); + ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags); } RETURN(count); @@ -2187,11 +2194,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, ldlm_cancel_pack(req, cancels, count); else res = ldlm_cli_cancel_req(lock->l_conn_export, - cancels, count, + NULL, cancels, count, flags); } else { res = ldlm_cli_cancel_req(lock->l_conn_export, - cancels, 1, flags); + NULL, cancels, 1, flags); } if (res < 0) { -- 1.8.3.1