Whamcloud - gitweb
LU-16285 ldlm: BL_AST lock cancel still can be batched 58/50158/3
authorVitaly Fertman <c17818@cray.com>
Tue, 28 Feb 2023 01:45:15 +0000 (04:45 +0300)
committerOleg Drokin <green@whamcloud.com>
Wed, 8 Mar 2023 03:29:29 +0000 (03:29 +0000)
The previous patch makes BLAST locks to be cancelled separately.
However the main problem is flushing the data under the other batched
locks, thus still possible to batch it with those with no data.
Could be optimized for not yet CANCELLING locks only, otherwise it is
already in the l_bl_ast list.

Fixes: b65374d9 ("LU-16285 ldlm: send the cancel RPC asap")
Signed-off-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Change-Id: Ie4a7c7f3e0f5462290f72af7c3b2ff410a31f5e7
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/50158
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Yang Sheng <ys@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index 9825227..f58d582 100644 (file)
@@ -727,7 +727,6 @@ enum ldlm_cancel_flags {
        LCF_ASYNC       = 0x1, /* Cancel locks asynchronously. */
        LCF_LOCAL       = 0x2, /* Cancel locks locally, not notifing server */
        LCF_BL_AST      = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
-       LCF_ONE_LOCK    = 0x8, /* Cancel locks pack only one lock. */
 };
 
 struct ldlm_flock {
@@ -1765,8 +1764,9 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                    union ldlm_policy_data *policy,
                                    enum ldlm_mode mode,
                                    enum ldlm_cancel_flags flags, void *opaque);
-int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
-                       int count, enum ldlm_cancel_flags flags);
+int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
+                       struct list_head *head, int count,
+                       enum ldlm_cancel_flags flags);
 int ldlm_cancel_resource_local(struct ldlm_resource *res,
                               struct list_head *cancels,
                               union ldlm_policy_data *policy,
index cb85fac..8ae0a22 100644 (file)
@@ -2472,7 +2472,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 * we can tell the server we have no lock. Otherwise, we
                 * should send cancel after dropping the cache.
                 */
-               if (ldlm_is_ast_sent(lock) || ldlm_is_failed(lock)) {
+               if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
+                    ldlm_is_failed(lock)) {
                        LDLM_DEBUG(lock,
                                   "callback on lock %llx - lock disappeared",
                                   dlm_req->lock_handle[0].cookie);
index a6f4a75..20d39bc 100644 (file)
@@ -1328,8 +1328,9 @@ static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
  * Prepare and send a batched cancel RPC. It will include \a count lock
  * handles of locks given in \a cancels list.
  */
-int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
-                       int count, enum ldlm_cancel_flags flags)
+int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
+                       struct list_head *head, int count,
+                       enum ldlm_cancel_flags flags)
 {
        struct ptlrpc_request *req = NULL;
        struct obd_import *imp;
@@ -1340,6 +1341,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
 
        LASSERT(exp != NULL);
        LASSERT(count > 0);
+       LASSERT(head == NULL || lock == NULL);
 
        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
 
@@ -1394,10 +1396,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);
 
-               if (flags & LCF_ONE_LOCK)
-                       rc = _ldlm_cancel_pack(req, ptr, NULL, count);
-               else
-                       rc = _ldlm_cancel_pack(req, NULL, ptr, count);
+               rc = _ldlm_cancel_pack(req, lock, head, count);
                if (rc == 0) {
                        ptlrpc_req_finished(req);
                        sent = count;
@@ -1555,7 +1554,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags flags)
 {
        struct obd_export *exp;
-       int avail, count = 1, bl_ast = 0;
+       int avail, count = 1, separate = 0;
+       enum ldlm_lru_flags lru_flags = 0;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
@@ -1578,7 +1578,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                        LDLM_LOCK_RELEASE(lock);
                        RETURN(0);
                }
-               bl_ast = 1;
+               if (ldlm_is_canceling(lock))
+                       separate = 1;
        } else if (ldlm_is_canceling(lock)) {
                /* Lock is being canceled and the caller doesn't want to wait */
                if (flags & LCF_ASYNC) {
@@ -1602,11 +1603,17 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
        if (rc == LDLM_FL_LOCAL_ONLY || flags & LCF_LOCAL) {
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
+       } else if (rc == LDLM_FL_BL_AST) {
+               /* BL_AST lock must not wait. */
+               lru_flags |= LDLM_LRU_FLAG_NO_WAIT;
        }
 
        exp = lock->l_conn_export;
-       if (bl_ast) { /* Send RPC immedaitly for LDLM_FL_BL_AST */
-               ldlm_cli_cancel_req(exp, lock, count, flags | LCF_ONE_LOCK);
+       /* If a lock has been taken from lru for a batched cancel and a later
+        * BL_AST came, send a CANCEL RPC individually for it right away, not
+        * waiting for the batch to be handled. */
+       if (separate) {
+               ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
        }
@@ -1626,7 +1633,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 
                ns = ldlm_lock_to_ns(lock);
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                              LCF_BL_AST, 0);
+                                              LCF_BL_AST, lru_flags);
        }
        ldlm_cli_cancel_list(&cancels, count, NULL, flags);
 
@@ -1678,7 +1685,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
        }
        if (bl_ast > 0) {
                count -= bl_ast;
-               ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
+               ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags);
        }
 
        RETURN(count);
@@ -2187,11 +2194,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                                ldlm_cancel_pack(req, cancels, count);
                        else
                                res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                         cancels, count,
+                                                         NULL, cancels, count,
                                                          flags);
                } else {
                        res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                 cancels, 1, flags);
+                                                 NULL, cancels, 1, flags);
                }
 
                if (res < 0) {