Whamcloud - gitweb
LU-16285 ldlm: send the cancel RPC asap 27/49527/8
authorYang Sheng <ys@whamcloud.com>
Sat, 14 Jan 2023 17:56:14 +0000 (01:56 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 27 Jan 2023 00:35:13 +0000 (00:35 +0000)
This patch tries to send the cancel RPC ASAP when a bl_ast
is received from the server. The existing problem is that
the lock could have been added to the regular queue, for
other reasons, before the bl_ast arrived. That prevents the
lock from being canceled in a timely manner. The other
problem is that we collect many locks into one RPC to save
network traffic, but this process could take a long time
while dirty pages are being flushed.

 - The lock canceling will be processed even if the lock has
   been added to the bl queue when the bl_ast arrived, unless
   the cancel RPC has already been sent.
 - Send the cancel RPC immediately for a bl_ast lock. Don't
   try to add more locks in that case.

Signed-off-by: Yang Sheng <ys@whamcloud.com>
Change-Id: Ie5efff3f1ed4e46448371185a0c08968233e7644
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/49527
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Qian Yingjin <qian@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index e40a79b..9825227 100644 (file)
@@ -727,6 +727,7 @@ enum ldlm_cancel_flags {
        LCF_ASYNC       = 0x1, /* Cancel locks asynchronously. */
        LCF_LOCAL       = 0x2, /* Cancel locks locally, not notifing server */
        LCF_BL_AST      = 0x4, /* Cancel LDLM_FL_BL_AST locks in the same RPC */
+       LCF_ONE_LOCK    = 0x8, /* Cancel locks pack only one lock. */
 };
 
 struct ldlm_flock {
@@ -1764,7 +1765,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                    union ldlm_policy_data *policy,
                                    enum ldlm_mode mode,
                                    enum ldlm_cancel_flags flags, void *opaque);
-int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head,
+int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
                        int count, enum ldlm_cancel_flags flags);
 int ldlm_cancel_resource_local(struct ldlm_resource *res,
                               struct list_head *cancels,
index e116e9e..a5b20b5 100644 (file)
@@ -2472,8 +2472,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 * we can tell the server we have no lock. Otherwise, we
                 * should send cancel after dropping the cache.
                 */
-               if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
-                    ldlm_is_failed(lock)) {
+               if (ldlm_is_ast_sent(lock) || ldlm_is_failed(lock)) {
                        LDLM_DEBUG(lock,
                                   "callback on lock %llx - lock disappeared",
                                   dlm_req->lock_handle[0].cookie);
@@ -2508,7 +2507,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
-               CDEBUG(D_INODE, "blocking ast\n");
+               LDLM_DEBUG(lock, "blocking ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
                if (!ldlm_is_cancel_on_block(lock)) {
                        rc = ldlm_callback_reply(req, 0);
@@ -2520,14 +2519,14 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
-               CDEBUG(D_INODE, "completion ast\n");
+               LDLM_DEBUG(lock, "completion ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
                rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
                        ldlm_callback_reply(req, rc);
                break;
        case LDLM_GL_CALLBACK:
-               CDEBUG(D_INODE, "glimpse ast\n");
+               LDLM_DEBUG(lock, "glimpse ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
index 8d609d1..a51e555 100644 (file)
@@ -1265,14 +1265,33 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
        RETURN(rc);
 }
 
+static inline int __ldlm_pack_lock(struct ldlm_lock *lock,
+                                  struct ldlm_request *dlm)
+{
+       LASSERT(lock->l_conn_export);
+       lock_res_and_lock(lock);
+       if (ldlm_is_ast_sent(lock)) {
+               unlock_res_and_lock(lock);
+               return 0;
+       }
+       ldlm_set_ast_sent(lock);
+       unlock_res_and_lock(lock);
+
+       /* Pack the lock handle to the given request buffer. */
+       LDLM_DEBUG(lock, "packing");
+       dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+
+       return 1;
+}
+#define ldlm_cancel_pack(req, head, count) \
+               _ldlm_cancel_pack(req, NULL, head, count)
 /**
  * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
  */
-static void ldlm_cancel_pack(struct ptlrpc_request *req,
+static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
                             struct list_head *head, int count)
 {
        struct ldlm_request *dlm;
-       struct ldlm_lock *lock;
        int max, packed = 0;
 
        ENTRY;
@@ -1292,24 +1311,24 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
         * so that the server cancel would call filter_lvbo_update() less
         * frequently.
         */
-       list_for_each_entry(lock, head, l_bl_ast) {
-               if (!count--)
-                       break;
-               LASSERT(lock->l_conn_export);
-               /* Pack the lock handle to the given request buffer. */
-               LDLM_DEBUG(lock, "packing");
-               dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
-               packed++;
+       if (lock) { /* only pack one lock */
+               packed = __ldlm_pack_lock(lock, dlm);
+       } else {
+               list_for_each_entry(lock, head, l_bl_ast) {
+                       if (!count--)
+                               break;
+                       packed += __ldlm_pack_lock(lock, dlm);
+               }
        }
        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
-       EXIT;
+       RETURN(packed);
 }
 
 /**
  * Prepare and send a batched cancel RPC. It will include \a count lock
  * handles of locks given in \a cancels list.
  */
-int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
+int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
                        int count, enum ldlm_cancel_flags flags)
 {
        struct ptlrpc_request *req = NULL;
@@ -1375,7 +1394,15 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);
 
-               ldlm_cancel_pack(req, cancels, count);
+               if (flags & LCF_ONE_LOCK)
+                       rc = _ldlm_cancel_pack(req, ptr, NULL, count);
+               else
+                       rc = _ldlm_cancel_pack(req, NULL, ptr, count);
+               if (rc == 0) {
+                       ptlrpc_req_finished(req);
+                       sent = count;
+                       GOTO(out, rc);
+               }
 
                ptlrpc_request_set_replen(req);
                if (flags & LCF_ASYNC) {
@@ -1525,10 +1552,10 @@ EXPORT_SYMBOL(ldlm_cli_convert);
  * Lock must not have any readers or writers by this time.
  */
 int ldlm_cli_cancel(const struct lustre_handle *lockh,
-                   enum ldlm_cancel_flags cancel_flags)
+                   enum ldlm_cancel_flags flags)
 {
        struct obd_export *exp;
-       int avail, count = 1;
+       int avail, count = 1, bl_ast = 0;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
@@ -1545,9 +1572,16 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
        lock_res_and_lock(lock);
        LASSERT(!ldlm_is_converting(lock));
 
-       /* Lock is being canceled and the caller doesn't want to wait */
-       if (ldlm_is_canceling(lock)) {
-               if (cancel_flags & LCF_ASYNC) {
+       if (ldlm_is_bl_ast(lock)) {
+               if (ldlm_is_ast_sent(lock)) {
+                       unlock_res_and_lock(lock);
+                       LDLM_LOCK_RELEASE(lock);
+                       RETURN(0);
+               }
+               bl_ast = 1;
+       } else if (ldlm_is_canceling(lock)) {
+               /* Lock is being canceled and the caller doesn't want to wait */
+               if (flags & LCF_ASYNC) {
                        unlock_res_and_lock(lock);
                } else {
                        unlock_res_and_lock(lock);
@@ -1560,24 +1594,30 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
        ldlm_set_canceling(lock);
        unlock_res_and_lock(lock);
 
-       if (cancel_flags & LCF_LOCAL)
+       if (flags & LCF_LOCAL)
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_LOCAL_CANCEL_PAUSE,
                                 cfs_fail_val);
 
        rc = ldlm_cli_cancel_local(lock);
-       if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
+       if (rc == LDLM_FL_LOCAL_ONLY || flags & LCF_LOCAL) {
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
        }
-       /*
-        * Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
-        * RPC which goes to canceld portal, so we can cancel other LRU locks
-        * here and send them all as one LDLM_CANCEL RPC.
-        */
-       LASSERT(list_empty(&lock->l_bl_ast));
-       list_add(&lock->l_bl_ast, &cancels);
 
        exp = lock->l_conn_export;
+       if (bl_ast) { /* Send RPC immedaitly for LDLM_FL_BL_AST */
+               ldlm_cli_cancel_req(exp, lock, count, flags | LCF_ONE_LOCK);
+               LDLM_LOCK_RELEASE(lock);
+               RETURN(0);
+       }
+
+       LASSERT(list_empty(&lock->l_bl_ast));
+       list_add(&lock->l_bl_ast, &cancels);
+       /*
+        * This is a LDLM_CANCEL RPC which goes to canceld portal,
+        * so we can cancel other LRU locks here and send them all
+        * as one LDLM_CANCEL RPC.
+        */
        if (exp_connect_cancelset(exp)) {
                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
                                                  &RQF_LDLM_CANCEL,
@@ -1588,7 +1628,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                               LCF_BL_AST, 0);
        }
-       ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
+       ldlm_cli_cancel_list(&cancels, count, NULL, flags);
+
        RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_cli_cancel);