From 1c9115f919109936576193a4d20baeccedb8be41 Mon Sep 17 00:00:00 2001 From: Vitaly Fertman Date: Tue, 17 Sep 2024 21:07:18 +0300 Subject: [PATCH] LU-18229 ldlm: BL_AST|CANCELLING lock still can be batched The current code causes BL_AST locks which are also CANCELLING to be cancelled individually (one lh per RPC), because they are already in the l_bl_list. This could still be optimised. A small cleanup in mdc_rename(): ldlm_cli_cancel_list() is already called by mdc_prep_elc_req()->ldlm_prep_elc_req(), aligned with other mdc_prep_elc_req() calls. HPE-bug-id: LUS-12470 Fixes: b65374d9 ("LU-16285 ldlm: send the cancel RPC asap") Signed-off-by: Vitaly Fertman Change-Id: I218d38bc56a885845c48a3c982840b35b132f213 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56389 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andriy Skulysh Reviewed-by: Alexander Boyko Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 1 + lustre/ldlm/ldlm_lockd.c | 2 +- lustre/ldlm/ldlm_request.c | 138 +++++++++++++++++++++++++------------------- lustre/mdc/mdc_reint.c | 3 - lustre/mdc/mdc_request.c | 2 +- 5 files changed, 82 insertions(+), 64 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 32da69e..a8dcc1a 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -1835,6 +1835,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, int ldlm_cli_cancel_list_local(struct list_head *cancels, int count, enum ldlm_cancel_flags flags); int ldlm_cli_cancel_list(struct list_head *head, int count, + struct ldlm_lock *primary, struct ptlrpc_request *req, enum ldlm_cancel_flags flags); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 4692a3d..ba3ae64 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -2953,7 +2953,7 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp, count = ldlm_cli_cancel_list_local(&blwi->blwi_head, blwi->blwi_count, LCF_BL_AST); - 
ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, + ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, NULL, blwi->blwi_flags); } else if (blwi->blwi_lock) { ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index d2e6c1f..9f4d32d 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -899,9 +899,9 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, dlm->lock_count = canceloff; } /* Pack into the request @pack lock handles. */ - ldlm_cli_cancel_list(cancels, pack, req, 0); + ldlm_cli_cancel_list(cancels, pack, NULL, req, 0); /* Prepare and send separate cancel RPC for others. */ - ldlm_cli_cancel_list(cancels, count - pack, NULL, 0); + ldlm_cli_cancel_list(cancels, count - pack, NULL, NULL, 0); } else { ldlm_lock_list_put(cancels, l_bl_ast, count); } @@ -1332,13 +1332,13 @@ static inline int __ldlm_pack_lock(struct ldlm_lock *lock, return 1; } -#define ldlm_cancel_pack(req, head, count) \ - _ldlm_cancel_pack(req, NULL, head, count) + /** - * Pack \a count locks in \a head into ldlm_request buffer of request \a req. + * Pack \a count locks in \a lock and \a head into ldlm_request buffer of + * the request \a req. 
*/ -static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock, - struct list_head *head, int count) +static int ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock, + struct list_head *head, int count) { struct ldlm_request *dlm; int max, packed = 0; @@ -1362,20 +1362,25 @@ static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock, */ if (lock) { /* only pack one lock */ packed = __ldlm_pack_lock(lock, dlm); - } else { + count--; + } + + if (count) { /* pack the list as well if given */ list_for_each_entry(lock, head, l_bl_ast) { - if (!count--) - break; packed += __ldlm_pack_lock(lock, dlm); + if (!--count) + break; } } + CDEBUG(D_DLMTRACE, "%d locks packed\n", packed); + LASSERT(count == 0); RETURN(packed); } /** * Prepare and send a batched cancel RPC. It will include \a count lock - * handles of locks given in \a cancels list. + * handles of locks given in \a lock and \a cancels list. */ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, struct list_head *head, int count, @@ -1383,14 +1388,14 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, { struct ptlrpc_request *req = NULL; struct obd_import *imp; - int free, sent = 0; + int free; int rc = 0; ENTRY; LASSERT(exp != NULL); LASSERT(count > 0); - LASSERT(head == NULL || lock == NULL); + LASSERT(head != NULL); CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val); @@ -1445,18 +1450,16 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; ptlrpc_at_set_req_timeout(req); - rc = _ldlm_cancel_pack(req, lock, head, count); + rc = ldlm_cancel_pack(req, lock, head, count); if (rc == 0) { ptlrpc_req_put(req); - sent = count; GOTO(out, rc); } ptlrpc_request_set_replen(req); if (flags & LCF_ASYNC) { ptlrpcd_add_req(req); - sent = count; - GOTO(out, 0); + GOTO(out, rc = 0); } rc = ptlrpc_queue_wait(req); @@ -1467,6 +1470,19 @@ int 
ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, rc = 0; } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/ req->rq_import_generation == imp->imp_generation) { + struct ldlm_lock *pos; + + list_for_each_entry(pos, head, l_bl_ast) { + lock_res_and_lock(pos); + ldlm_clear_ast_sent(pos); + unlock_res_and_lock(pos); + } + if (lock) { + lock_res_and_lock(lock); + ldlm_clear_ast_sent(lock); + unlock_res_and_lock(lock); + } + ptlrpc_req_put(req); continue; } else if (rc != ELDLM_OK) { @@ -1476,14 +1492,13 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock, rc); break; } - sent = count; break; } ptlrpc_req_put(req); EXIT; out: - return sent ? sent : rc; + return rc ? : count; } static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) @@ -1603,7 +1618,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, enum ldlm_cancel_flags flags) { struct obd_export *exp; - int avail, count = 1, separate = 0; + int avail, count = 1; enum ldlm_lru_flags lru_flags = 0; __u64 rc = 0; struct ldlm_namespace *ns; @@ -1627,8 +1642,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, ldlm_lock_put(lock); RETURN(0); } - if (ldlm_is_canceling(lock)) - separate = 1; } else if (ldlm_is_canceling(lock)) { /* Lock is being canceled and the caller doesn't want to wait */ if (flags & LCF_ASYNC) { @@ -1658,17 +1671,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, } exp = lock->l_conn_export; - /* If a lock has been taken from lru for a batched cancel and a later - * BL_AST came, send a CANCEL RPC individually for it right away, not - * waiting for the batch to be handled. 
*/ - if (separate) { - ldlm_cli_cancel_req(exp, lock, NULL, 1, flags); - ldlm_lock_put(lock); - RETURN(0); - } - - LASSERT(list_empty(&lock->l_bl_ast)); - list_add(&lock->l_bl_ast, &cancels); /* * This is a LDLM_CANCEL RPC which goes to canceld portal, * so we can cancel other LRU locks here and send them all @@ -1684,7 +1686,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh, count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1, LCF_BL_AST, lru_flags); } - ldlm_cli_cancel_list(&cancels, count, NULL, flags); + ldlm_cli_cancel_list(&cancels, count, lock, NULL, flags); RETURN(0); } @@ -1734,7 +1736,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count, } if (bl_ast > 0) { count -= bl_ast; - ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags); + ldlm_cli_cancel_list(&head, bl_ast, NULL, NULL, cancel_flags); } RETURN(count); @@ -2199,27 +2201,43 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, EXPORT_SYMBOL(ldlm_cancel_resource_local); /** - * Cancel client-side locks from a list and send/prepare cancel RPCs to the - * server. - * If \a req is NULL, send CANCEL request to server with handles of locks - * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests - * separately per lock. - * If \a req is not NULL, put handles of locks in \a cancels into the request - * buffer at the offset \a off. - * Destroy \a cancels at the end. + * Cancel client-side locks and send/prepare cancel RPCs to the server. + * + * Locks are passed as a \a cancels list of locks and/or as a \a primary lock. + * + * \a count must not be larger than the total number of locks passed in \a + * primary and \a cancels. Thus, it includes the \a primary (if passed), but + * may not include all the locks in \a cancels (if passed). + * + * If \a req is NULL, a new CANCEL RPC is created and sent with \a count lh + * packed into it, starting with the \a primary one. 
+ * If EARLY_CANCEL is not supported, \a count of CANCEL RPCs are created and + * sent with one lh in each request. + * + * If \a req is not NULL, \a count of lh just need to be placed into the + * RMF_DLM_REQ buffer of the given req. + * + * Destroy \a count of locks at the end, starting with the \a primary one. */ int ldlm_cli_cancel_list(struct list_head *cancels, int count, + struct ldlm_lock *primary, struct ptlrpc_request *req, enum ldlm_cancel_flags flags) { - struct ldlm_lock *lock; + struct ldlm_lock *lock = primary; + struct obd_export *export; int res = 0; - ENTRY; - if (list_empty(cancels) || count == 0) + if (count == 0) RETURN(0); + LASSERT(primary || !list_empty(cancels)); + if (lock == NULL) + lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast); + export = lock->l_conn_export; + LASSERT(export != NULL); + /* * XXX: requests (both batched and not) could be sent in parallel. * Usually it is enough to have just 1 RPC, but it is possible that @@ -2228,21 +2246,18 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, * the feature. 
*/ while (count > 0) { - LASSERT(!list_empty(cancels)); - lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast); - LASSERT(lock->l_conn_export); - - if (exp_connect_cancelset(lock->l_conn_export)) { - res = count; + LASSERT(primary || !list_empty(cancels)); + if (exp_connect_cancelset(export)) { if (req) - ldlm_cancel_pack(req, cancels, count); + res = ldlm_cancel_pack(req, primary, + cancels, count); else - res = ldlm_cli_cancel_req(lock->l_conn_export, - NULL, cancels, count, - flags); + res = ldlm_cli_cancel_req(export, + primary, cancels, + count, flags); } else { - res = ldlm_cli_cancel_req(lock->l_conn_export, - NULL, cancels, 1, flags); + res = ldlm_cli_cancel_req(export, primary, + cancels, 1, flags); } if (res < 0) { @@ -2252,6 +2267,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, } count -= res; + if (primary && res) { + ldlm_lock_put(primary); + primary = NULL; + res--; + } ldlm_lock_list_put(cancels, l_bl_ast, res); } LASSERT(count == 0); @@ -2287,7 +2307,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, count = ldlm_cancel_resource_local(res, &cancels, policy, mode, 0, flags | LCF_BL_AST, opaque); - rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, NULL, flags); if (rc != ELDLM_OK) CERROR("canceling unused lock "DLDLMRES": rc = %d\n", PLDLMRES(res), rc); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 6faa624..47ce251 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -494,9 +494,6 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, if (rc) GOTO(err_put_sepol, rc); - if (exp_connect_cancelset(exp) && req) - ldlm_cli_cancel_list(&cancels, count, req, 0); - if (op_data->op_cli_flags & CLI_MIGRATE) mdc_migrate_pack(&req->rq_pill, op_data, old, oldlen); else diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 9db02c3..2e4c8d0 100644 --- a/lustre/mdc/mdc_request.c +++ 
b/lustre/mdc/mdc_request.c @@ -2014,7 +2014,7 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, /* Cancel existing locks */ count = mdc_hsm_request_lock_to_cancel(exp, hur, &cancels); - ldlm_cli_cancel_list(&cancels, count, NULL, 0); + ldlm_cli_cancel_list(&cancels, count, NULL, NULL, 0); mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0); /* Copy hsm_request struct */ -- 1.8.3.1