Whamcloud - gitweb
LU-18229 ldlm: BL_AST|CANCELLING lock still can be batched 89/56389/4
author Vitaly Fertman <c17818@cray.com>
Tue, 17 Sep 2024 18:07:18 +0000 (21:07 +0300)
committer Oleg Drokin <green@whamcloud.com>
Wed, 22 Jan 2025 18:40:54 +0000 (18:40 +0000)
The current code cancels BL_AST locks which are also CANCELLING
individually (one lock handle per RPC), because their l_bl_ast link is
already in use by another list. Such locks can still be batched.

A small cleanup in mdc_rename(): drop the explicit ldlm_cli_cancel_list()
call, as it is already made by mdc_prep_elc_req()->ldlm_prep_elc_req();
this aligns mdc_rename() with the other mdc_prep_elc_req() callers.

HPE-bug-id: LUS-12470
Fixes: b65374d9 ("LU-16285 ldlm: send the cancel RPC asap")
Signed-off-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Change-Id: I218d38bc56a885845c48a3c982840b35b132f213
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56389
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andriy Skulysh <andriy.skulysh@hpe.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c

index 32da69e..a8dcc1a 100644 (file)
@@ -1835,6 +1835,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
 int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                               enum ldlm_cancel_flags flags);
 int ldlm_cli_cancel_list(struct list_head *head, int count,
+                        struct ldlm_lock *primary,
                         struct ptlrpc_request *req,
                         enum ldlm_cancel_flags flags);
 
index 4692a3d..ba3ae64 100644 (file)
@@ -2953,7 +2953,7 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
                count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                   blwi->blwi_count,
                                                   LCF_BL_AST);
-               ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+               ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, NULL,
                                     blwi->blwi_flags);
        } else if (blwi->blwi_lock) {
                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
index d2e6c1f..9f4d32d 100644 (file)
@@ -899,9 +899,9 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                        dlm->lock_count = canceloff;
                }
                /* Pack into the request @pack lock handles. */
-               ldlm_cli_cancel_list(cancels, pack, req, 0);
+               ldlm_cli_cancel_list(cancels, pack, NULL, req, 0);
                /* Prepare and send separate cancel RPC for others. */
-               ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
+               ldlm_cli_cancel_list(cancels, count - pack, NULL, NULL, 0);
        } else {
                ldlm_lock_list_put(cancels, l_bl_ast, count);
        }
@@ -1332,13 +1332,13 @@ static inline int __ldlm_pack_lock(struct ldlm_lock *lock,
 
        return 1;
 }
-#define ldlm_cancel_pack(req, head, count) \
-               _ldlm_cancel_pack(req, NULL, head, count)
+
 /**
- * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
+ * Pack \a count locks in \a lock and \a head into ldlm_request buffer of
+ * the request \a req.
  */
-static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
-                            struct list_head *head, int count)
+static int ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
+                           struct list_head *head, int count)
 {
        struct ldlm_request *dlm;
        int max, packed = 0;
@@ -1362,20 +1362,25 @@ static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
         */
        if (lock) { /* only pack one lock */
                packed = __ldlm_pack_lock(lock, dlm);
-       } else {
+               count--;
+       }
+
+       if (count) { /* pack the list as well if given */
                list_for_each_entry(lock, head, l_bl_ast) {
-                       if (!count--)
-                               break;
                        packed += __ldlm_pack_lock(lock, dlm);
+                       if (!--count)
+                               break;
                }
        }
+
        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
+       LASSERT(count == 0);
        RETURN(packed);
 }
 
 /**
  * Prepare and send a batched cancel RPC. It will include \a count lock
- * handles of locks given in \a cancels list.
+ * handles of locks given in \a lock and the \a head list.
  */
 int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
                        struct list_head *head, int count,
@@ -1383,14 +1388,14 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
 {
        struct ptlrpc_request *req = NULL;
        struct obd_import *imp;
-       int free, sent = 0;
+       int free;
        int rc = 0;
 
        ENTRY;
 
        LASSERT(exp != NULL);
        LASSERT(count > 0);
-       LASSERT(head == NULL || lock == NULL);
+       LASSERT(head != NULL);
 
        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
 
@@ -1445,18 +1450,16 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);
 
-               rc = _ldlm_cancel_pack(req, lock, head, count);
+               rc = ldlm_cancel_pack(req, lock, head, count);
                if (rc == 0) {
                        ptlrpc_req_put(req);
-                       sent = count;
                        GOTO(out, rc);
                }
 
                ptlrpc_request_set_replen(req);
                if (flags & LCF_ASYNC) {
                        ptlrpcd_add_req(req);
-                       sent = count;
-                       GOTO(out, 0);
+                       GOTO(out, rc = 0);
                }
 
                rc = ptlrpc_queue_wait(req);
@@ -1467,6 +1470,19 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
                        rc = 0;
                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                           req->rq_import_generation == imp->imp_generation) {
+                       struct ldlm_lock *pos;
+
+                       list_for_each_entry(pos, head, l_bl_ast) {
+                               lock_res_and_lock(pos);
+                               ldlm_clear_ast_sent(pos);
+                               unlock_res_and_lock(pos);
+                       }
+                       if (lock) {
+                               lock_res_and_lock(lock);
+                               ldlm_clear_ast_sent(lock);
+                               unlock_res_and_lock(lock);
+                       }
+
                        ptlrpc_req_put(req);
                        continue;
                } else if (rc != ELDLM_OK) {
@@ -1476,14 +1492,13 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
                                     rc);
                        break;
                }
-               sent = count;
                break;
        }
 
        ptlrpc_req_put(req);
        EXIT;
 out:
-       return sent ? sent : rc;
+       return rc ? : count;
 }
 
 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
@@ -1603,7 +1618,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                    enum ldlm_cancel_flags flags)
 {
        struct obd_export *exp;
-       int avail, count = 1, separate = 0;
+       int avail, count = 1;
        enum ldlm_lru_flags lru_flags = 0;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
@@ -1627,8 +1642,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                        ldlm_lock_put(lock);
                        RETURN(0);
                }
-               if (ldlm_is_canceling(lock))
-                       separate = 1;
        } else if (ldlm_is_canceling(lock)) {
                /* Lock is being canceled and the caller doesn't want to wait */
                if (flags & LCF_ASYNC) {
@@ -1658,17 +1671,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
        }
 
        exp = lock->l_conn_export;
-       /* If a lock has been taken from lru for a batched cancel and a later
-        * BL_AST came, send a CANCEL RPC individually for it right away, not
-        * waiting for the batch to be handled. */
-       if (separate) {
-               ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);
-               ldlm_lock_put(lock);
-               RETURN(0);
-       }
-
-       LASSERT(list_empty(&lock->l_bl_ast));
-       list_add(&lock->l_bl_ast, &cancels);
        /*
         * This is a LDLM_CANCEL RPC which goes to canceld portal,
         * so we can cancel other LRU locks here and send them all
@@ -1684,7 +1686,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                               LCF_BL_AST, lru_flags);
        }
-       ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+       ldlm_cli_cancel_list(&cancels, count, lock, NULL, flags);
 
        RETURN(0);
 }
@@ -1734,7 +1736,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
        }
        if (bl_ast > 0) {
                count -= bl_ast;
-               ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags);
+               ldlm_cli_cancel_list(&head, bl_ast, NULL, NULL, cancel_flags);
        }
 
        RETURN(count);
@@ -2199,27 +2201,43 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
 EXPORT_SYMBOL(ldlm_cancel_resource_local);
 
 /**
- * Cancel client-side locks from a list and send/prepare cancel RPCs to the
- * server.
- * If \a req is NULL, send CANCEL request to server with handles of locks
- * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
- * separately per lock.
- * If \a req is not NULL, put handles of locks in \a cancels into the request
- * buffer at the offset \a off.
- * Destroy \a cancels at the end.
+ * Cancel client-side locks and send/prepare cancel RPCs to the server.
+ *
+ * Locks are passed as a \a cancels list and/or as a \a primary lock.
+ *
+ * \a count must not be larger than the total number of locks passed in \a
+ * primary and \a cancels. Thus, it includes the \a primary (if passed), but
+ * may not include all the locks in \a cancels (if passed).
+ *
+ * If \a req is NULL, a new CANCEL RPC is created and sent with \a count lh
+ * packed into it, starting with the \a primary one.
+ * If EARLY_CANCEL is not supported, \a count CANCEL RPCs are created and
+ * sent, with one lh in each request.
+ *
+ * If \a req is not NULL, \a count lh just need to be placed into the
+ * RMF_DLM_REQ buffer of the given req.
+ *
+ * Destroy \a count of locks at the end, starting with the \a primary one.
  */
 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
+                        struct ldlm_lock *primary,
                         struct ptlrpc_request *req,
                         enum ldlm_cancel_flags flags)
 {
-       struct ldlm_lock *lock;
+       struct ldlm_lock *lock = primary;
+       struct obd_export *export;
        int res = 0;
-
        ENTRY;
 
-       if (list_empty(cancels) || count == 0)
+       if (count == 0)
                RETURN(0);
 
+       LASSERT(primary || !list_empty(cancels));
+       if (lock == NULL)
+               lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
+       export = lock->l_conn_export;
+       LASSERT(export != NULL);
+
        /*
         * XXX: requests (both batched and not) could be sent in parallel.
         * Usually it is enough to have just 1 RPC, but it is possible that
@@ -2228,21 +2246,18 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
         * the feature.
         */
        while (count > 0) {
-               LASSERT(!list_empty(cancels));
-               lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
-               LASSERT(lock->l_conn_export);
-
-               if (exp_connect_cancelset(lock->l_conn_export)) {
-                       res = count;
+               LASSERT(primary || !list_empty(cancels));
+               if (exp_connect_cancelset(export)) {
                        if (req)
-                               ldlm_cancel_pack(req, cancels, count);
+                               res = ldlm_cancel_pack(req, primary,
+                                                      cancels, count);
                        else
-                               res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                         NULL, cancels, count,
-                                                         flags);
+                               res = ldlm_cli_cancel_req(export,
+                                                         primary, cancels,
+                                                         count, flags);
                } else {
-                       res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                 NULL, cancels, 1, flags);
+                       res = ldlm_cli_cancel_req(export, primary,
+                                                 cancels, 1, flags);
                }
 
                if (res < 0) {
@@ -2252,6 +2267,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                }
 
                count -= res;
+               if (primary && res) {
+                       ldlm_lock_put(primary);
+                       primary = NULL;
+                       res--;
+               }
                ldlm_lock_list_put(cancels, l_bl_ast, res);
        }
        LASSERT(count == 0);
@@ -2287,7 +2307,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
        count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                           0, flags | LCF_BL_AST, opaque);
-       rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+       rc = ldlm_cli_cancel_list(&cancels, count, NULL, NULL, flags);
        if (rc != ELDLM_OK)
                CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
                       PLDLMRES(res), rc);
index 6faa624..47ce251 100644 (file)
@@ -494,9 +494,6 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
        if (rc)
                GOTO(err_put_sepol, rc);
 
-       if (exp_connect_cancelset(exp) && req)
-               ldlm_cli_cancel_list(&cancels, count, req, 0);
-
        if (op_data->op_cli_flags & CLI_MIGRATE)
                mdc_migrate_pack(&req->rq_pill, op_data, old, oldlen);
        else
index 9db02c3..2e4c8d0 100644 (file)
@@ -2014,7 +2014,7 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        /* Cancel existing locks */
        count = mdc_hsm_request_lock_to_cancel(exp, hur, &cancels);
-       ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+       ldlm_cli_cancel_list(&cancels, count, NULL, NULL, 0);
        mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
 
        /* Copy hsm_request struct */