Whamcloud - gitweb
LU-11102 ldlm: don't skip bl_ast for local lock 58/33458/10
authorMikhail Pershin <mpershin@whamcloud.com>
Tue, 18 Sep 2018 11:00:33 +0000 (14:00 +0300)
committerOleg Drokin <green@whamcloud.com>
Sat, 17 Nov 2018 01:29:26 +0000 (01:29 +0000)
The previous commit 954cc675 skips bl_ast for local lock but
there are cases on MDT when local lock can become a client lock,
see mdt_intent_lock_replace(). In that case the client should be
notified if lock is a blocking lock.

Patch reverts commit 954cc675 and provides alternative solution.
During downgrade to COS the lock renews own blocking AST states
and start reprocessing. Any new lock conflict will cause new
blocking AST and related async commit as needed.

Test-Parameters: mdssizegb=20 testlist=racer,racer,racer
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: I41adab5c805a59fdbeade8ae3556556b779dc3c0
Reviewed-on: https://review.whamcloud.com/33458
Reviewed-by: Vitaly Fertman <c17818@cray.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/mdt/mdt_handler.c

index 738154b..c5660da 100644 (file)
@@ -609,6 +609,11 @@ struct ldlm_glimpse_work {
        void                    *gl_interpret_data;
 };
 
        void                    *gl_interpret_data;
 };
 
+struct ldlm_bl_desc {
+       unsigned int bl_same_client:1,
+                    bl_cos_incompat:1;
+};
+
 struct ldlm_cb_set_arg {
        struct ptlrpc_request_set       *set;
        int                              type; /* LDLM_{CP,BL,GL}_CALLBACK */
 struct ldlm_cb_set_arg {
        struct ptlrpc_request_set       *set;
        int                              type; /* LDLM_{CP,BL,GL}_CALLBACK */
@@ -617,6 +622,7 @@ struct ldlm_cb_set_arg {
        union ldlm_gl_desc              *gl_desc; /* glimpse AST descriptor */
        ptlrpc_interpterer_t             gl_interpret_reply;
        void                            *gl_interpret_data;
        union ldlm_gl_desc              *gl_desc; /* glimpse AST descriptor */
        ptlrpc_interpterer_t             gl_interpret_reply;
        void                            *gl_interpret_data;
+       struct ldlm_bl_desc             *bl_desc;
 };
 
 struct ldlm_cb_async_args {
 };
 
 struct ldlm_cb_async_args {
index d6ededf..fd1c62c 100644 (file)
 #include "ldlm_internal.h"
 
 #ifdef HAVE_SERVER_SUPPORT
 #include "ldlm_internal.h"
 
 #ifdef HAVE_SERVER_SUPPORT
-/*
- * local lock will be canceled after use, and it should run blocking ast only
- * when it should trigger Commit-on-Sharing, otherwise if the blocking ast
- * is run and does nothing, it will prevent subsequent operations to trigger
- * Commit-on-Sharing, see LU-11102.
- */
-static bool ldlm_should_run_bl_ast(const struct ldlm_lock *lock,
-                                  const struct ldlm_lock *req)
-{
-       /* no blocking ast */
-       if (!lock->l_blocking_ast)
-               return false;
-
-       /* not local lock */
-       if (!ldlm_is_local(lock))
-               return true;
-
-       /* should trigger Commit-on-Sharing */
-       if ((lock->l_req_mode & LCK_COS))
-               return true;
-
-       /* local read lock will be canceld after use */
-       if (!(lock->l_req_mode & (LCK_PW | LCK_EX)))
-               return false;
-
-       /* if CoS enabled, check if @req is from different client */
-       if (ldlm_is_cos_enabled(req))
-               return lock->l_client_cookie != req->l_client_cookie;
-
-       /* check if @req is COS incompatible */
-       if (ldlm_is_cos_incompat(req))
-               return true;
-
-       return false;
-}
 
 /**
  * Determine if the lock is compatible with all locks on the queue.
 
 /**
  * Determine if the lock is compatible with all locks on the queue.
@@ -204,12 +169,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 
                                /* Add locks of the policy group to @work_list
                                 * as blocking locks for @req */
 
                                /* Add locks of the policy group to @work_list
                                 * as blocking locks for @req */
-                               if (ldlm_should_run_bl_ast(lock, req))
+                               if (lock->l_blocking_ast)
                                        ldlm_add_ast_work_item(lock, req,
                                                               work_list);
                                head = &lock->l_sl_policy;
                                list_for_each_entry(lock, head, l_sl_policy)
                                        ldlm_add_ast_work_item(lock, req,
                                                               work_list);
                                head = &lock->l_sl_policy;
                                list_for_each_entry(lock, head, l_sl_policy)
-                                       if (ldlm_should_run_bl_ast(lock, req))
+                                       if (lock->l_blocking_ast)
                                                ldlm_add_ast_work_item(lock,
                                                                req, work_list);
                        }
                                                ldlm_add_ast_work_item(lock,
                                                                req, work_list);
                        }
index 97e9333..3b7e55d 100644 (file)
@@ -162,9 +162,8 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                              struct list_head *rpc_list);
 void ldlm_discard_bl_list(struct list_head *bl_list);
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                              struct list_head *rpc_list);
 void ldlm_discard_bl_list(struct list_head *bl_list);
-void ldlm_discard_bl_lock(struct ldlm_lock *lock);
 void ldlm_clear_blocking_lock(struct ldlm_lock *lock);
 void ldlm_clear_blocking_lock(struct ldlm_lock *lock);
-
+void ldlm_clear_blocking_data(struct ldlm_lock *lock);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                      ldlm_desc_ast_t ast_type);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                      ldlm_desc_ast_t ast_type);
index 3c44552..5e7afe6 100644 (file)
@@ -665,12 +665,19 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
-               LASSERT(list_empty(&lock->l_bl_ast));
-               list_add(&lock->l_bl_ast, work_list);
-                LDLM_LOCK_GET(lock);
-                LASSERT(lock->l_blocking_lock == NULL);
-                lock->l_blocking_lock = LDLM_LOCK_GET(new);
-        }
+
+               /* Lock can be converted from a blocking state back to granted
+                * after lock convert or COS downgrade but still be in an
+                * older bl_list because it is controlled only by
+                * ldlm_work_bl_ast_lock(), let it be processed there.
+                */
+               if (list_empty(&lock->l_bl_ast)) {
+                       list_add(&lock->l_bl_ast, work_list);
+                       LDLM_LOCK_GET(lock);
+               }
+               LASSERT(lock->l_blocking_lock == NULL);
+               lock->l_blocking_lock = LDLM_LOCK_GET(new);
+       }
 }
 
 /**
 }
 
 /**
@@ -2008,36 +2015,6 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-/* Discard a single lock from bl_list, may be used by
- * lock convert handler when lock is returned to the granted
- * state.
- */
-void ldlm_discard_bl_lock(struct ldlm_lock *lock)
-{
-       check_res_locked(lock->l_resource);
-
-       LASSERT(!list_empty(&lock->l_bl_ast));
-       list_del_init(&lock->l_bl_ast);
-       LASSERT(ldlm_is_ast_sent(lock));
-       ldlm_clear_ast_sent(lock);
-       LASSERT(lock->l_bl_ast_run == 0);
-       LASSERT(lock->l_blocking_lock);
-       LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-       lock->l_blocking_lock = NULL;
-       LDLM_LOCK_RELEASE(lock);
-}
-
-/* Clear the blocking lock, the race is possible between ldlm_handle_convert0()
- * and ldlm_work_bl_ast_lock(), so this is done under lock with check for NULL.
- */
-void ldlm_clear_blocking_lock(struct ldlm_lock *lock)
-{
-       if (lock->l_blocking_lock) {
-               LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-               lock->l_blocking_lock = NULL;
-       }
-}
-
 /**
  * Discard all AST work items from list.
  *
 /**
  * Discard all AST work items from list.
  *
@@ -2050,13 +2027,17 @@ void ldlm_discard_bl_list(struct list_head *bl_list)
 
        ENTRY;
 
 
        ENTRY;
 
-       list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast)
-               ldlm_discard_bl_lock(lock);
+       list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast) {
+               LASSERT(!list_empty(&lock->l_bl_ast));
+               list_del_init(&lock->l_bl_ast);
+               ldlm_clear_ast_sent(lock);
+               LASSERT(lock->l_bl_ast_run == 0);
+               ldlm_clear_blocking_lock(lock);
+               LDLM_LOCK_RELEASE(lock);
+       }
        EXIT;
 }
 
        EXIT;
 }
 
-#endif
-
 /**
  * Process a call to blocking AST callback for a lock in ast_work list
  */
 /**
  * Process a call to blocking AST callback for a lock in ast_work list
  */
@@ -2064,9 +2045,11 @@ static int
 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
        struct ldlm_cb_set_arg *arg = opaq;
 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
        struct ldlm_cb_set_arg *arg = opaq;
-       struct ldlm_lock_desc   d;
-       int                     rc;
-       struct ldlm_lock       *lock;
+       struct ldlm_lock *lock;
+       struct ldlm_lock_desc d;
+       struct ldlm_bl_desc bld;
+       int rc;
+
        ENTRY;
 
        if (list_empty(arg->list))
        ENTRY;
 
        if (list_empty(arg->list))
@@ -2074,6 +2057,22 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 
        lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
 
        lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
+       /* nobody should touch l_bl_ast but some locks in the list may become
+        * granted after lock convert or COS downgrade, these locks should be
+        * just skipped here and removed from the list.
+        */
+       lock_res_and_lock(lock);
+       list_del_init(&lock->l_bl_ast);
+
+       /* lock is not blocking lock anymore, but was kept in the list because
+        * it can managed only here.
+        */
+       if (!ldlm_is_ast_sent(lock)) {
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_RELEASE(lock);
+               RETURN(0);
+       }
+
        LASSERT(lock->l_blocking_lock);
        ldlm_lock2desc(lock->l_blocking_lock, &d);
        /* copy blocking lock ibits in cancel_bits as well,
        LASSERT(lock->l_blocking_lock);
        ldlm_lock2desc(lock->l_blocking_lock, &d);
        /* copy blocking lock ibits in cancel_bits as well,
@@ -2084,66 +2083,23 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        d.l_policy_data.l_inodebits.cancel_bits =
                lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
 
        d.l_policy_data.l_inodebits.cancel_bits =
                lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
 
-       /* nobody should touch l_bl_ast */
-       lock_res_and_lock(lock);
-       list_del_init(&lock->l_bl_ast);
+       /* Blocking lock is being destroyed here but some information about it
+        * may be needed inside l_blocking_ast() function below,
+        * e.g. in mdt_blocking_ast(). So save needed data in bl_desc.
+        */
+       bld.bl_same_client = lock->l_client_cookie ==
+                            lock->l_blocking_lock->l_client_cookie;
+       bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+       arg->bl_desc = &bld;
 
        LASSERT(ldlm_is_ast_sent(lock));
        LASSERT(lock->l_bl_ast_run == 0);
        lock->l_bl_ast_run++;
 
        LASSERT(ldlm_is_ast_sent(lock));
        LASSERT(lock->l_bl_ast_run == 0);
        lock->l_bl_ast_run++;
-       unlock_res_and_lock(lock);
-
-       rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
-#ifdef HAVE_SERVER_SUPPORT
-       lock_res_and_lock(lock);
        ldlm_clear_blocking_lock(lock);
        unlock_res_and_lock(lock);
        ldlm_clear_blocking_lock(lock);
        unlock_res_and_lock(lock);
-#endif
-       LDLM_LOCK_RELEASE(lock);
-
-       RETURN(rc);
-}
-
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
-static int
-ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
-{
-       struct ldlm_cb_set_arg  *arg = opaq;
-       int                      rc = 0;
-       struct ldlm_lock        *lock;
-       ldlm_completion_callback completion_callback;
-       ENTRY;
-
-       if (list_empty(arg->list))
-               RETURN(-ENOENT);
-
-       lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
-
-       /* It's possible to receive a completion AST before we've set
-        * the l_completion_ast pointer: either because the AST arrived
-        * before the reply, or simply because there's a small race
-        * window between receiving the reply and finishing the local
-        * enqueue. (bug 842)
-        *
-        * This can't happen with the blocking_ast, however, because we
-        * will never call the local blocking_ast until we drop our
-        * reader/writer reference, which we won't do until we get the
-        * reply and finish enqueueing. */
 
 
-       /* nobody should touch l_cp_ast */
-       lock_res_and_lock(lock);
-       list_del_init(&lock->l_cp_ast);
-       LASSERT(ldlm_is_cp_reqd(lock));
-       /* save l_completion_ast since it can be changed by
-        * mds_intent_policy(), see bug 14225 */
-       completion_callback = lock->l_completion_ast;
-       ldlm_clear_cp_reqd(lock);
-       unlock_res_and_lock(lock);
+       rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
 
 
-       if (completion_callback != NULL)
-               rc = completion_callback(lock, 0, (void *)arg);
        LDLM_LOCK_RELEASE(lock);
 
        RETURN(rc);
        LDLM_LOCK_RELEASE(lock);
 
        RETURN(rc);
@@ -2215,6 +2171,53 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
+#endif
+
+/**
+ * Process a call to completion AST callback for a lock in ast_work list
+ */
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+       struct ldlm_cb_set_arg *arg = opaq;
+       struct ldlm_lock *lock;
+       ldlm_completion_callback completion_callback;
+       int rc = 0;
+
+       ENTRY;
+
+       if (list_empty(arg->list))
+               RETURN(-ENOENT);
+
+       lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+       /* It's possible to receive a completion AST before we've set
+        * the l_completion_ast pointer: either because the AST arrived
+        * before the reply, or simply because there's a small race
+        * window between receiving the reply and finishing the local
+        * enqueue. (bug 842)
+        *
+        * This can't happen with the blocking_ast, however, because we
+        * will never call the local blocking_ast until we drop our
+        * reader/writer reference, which we won't do until we get the
+        * reply and finish enqueueing. */
+
+       /* nobody should touch l_cp_ast */
+       lock_res_and_lock(lock);
+       list_del_init(&lock->l_cp_ast);
+       LASSERT(ldlm_is_cp_reqd(lock));
+       /* save l_completion_ast since it can be changed by
+        * mds_intent_policy(), see bug 14225 */
+       completion_callback = lock->l_completion_ast;
+       ldlm_clear_cp_reqd(lock);
+       unlock_res_and_lock(lock);
+
+       if (completion_callback != NULL)
+               rc = completion_callback(lock, 0, (void *)arg);
+       LDLM_LOCK_RELEASE(lock);
+
+       RETURN(rc);
+}
 
 /**
  * Process list of locks in need of ASTs being sent.
 
 /**
  * Process list of locks in need of ASTs being sent.
@@ -2226,8 +2229,8 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                      ldlm_desc_ast_t ast_type)
 {
        struct ldlm_cb_set_arg *arg;
                      ldlm_desc_ast_t ast_type)
 {
        struct ldlm_cb_set_arg *arg;
-       set_producer_func       work_ast_lock;
-       int                     rc;
+       set_producer_func work_ast_lock;
+       int rc;
 
        if (list_empty(rpc_list))
                RETURN(0);
 
        if (list_empty(rpc_list))
                RETURN(0);
@@ -2240,24 +2243,26 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
        arg->list = rpc_list;
 
        switch (ast_type) {
        arg->list = rpc_list;
 
        switch (ast_type) {
-               case LDLM_WORK_BL_AST:
-                       arg->type = LDLM_BL_CALLBACK;
-                       work_ast_lock = ldlm_work_bl_ast_lock;
-                       break;
-               case LDLM_WORK_CP_AST:
-                       arg->type = LDLM_CP_CALLBACK;
-                       work_ast_lock = ldlm_work_cp_ast_lock;
-                       break;
-               case LDLM_WORK_REVOKE_AST:
-                       arg->type = LDLM_BL_CALLBACK;
-                       work_ast_lock = ldlm_work_revoke_ast_lock;
-                       break;
-               case LDLM_WORK_GL_AST:
-                       arg->type = LDLM_GL_CALLBACK;
-                       work_ast_lock = ldlm_work_gl_ast_lock;
-                       break;
-               default:
-                       LBUG();
+       case LDLM_WORK_CP_AST:
+               arg->type = LDLM_CP_CALLBACK;
+               work_ast_lock = ldlm_work_cp_ast_lock;
+               break;
+#ifdef HAVE_SERVER_SUPPORT
+       case LDLM_WORK_BL_AST:
+               arg->type = LDLM_BL_CALLBACK;
+               work_ast_lock = ldlm_work_bl_ast_lock;
+               break;
+       case LDLM_WORK_REVOKE_AST:
+               arg->type = LDLM_BL_CALLBACK;
+               work_ast_lock = ldlm_work_revoke_ast_lock;
+               break;
+       case LDLM_WORK_GL_AST:
+               arg->type = LDLM_GL_CALLBACK;
+               work_ast_lock = ldlm_work_gl_ast_lock;
+               break;
+#endif
+       default:
+               LBUG();
        }
 
        /* We create a ptlrpc request set with flow control extension.
        }
 
        /* We create a ptlrpc request set with flow control extension.
@@ -2616,13 +2621,18 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  * convertion may fail if lock was canceled before downgrade, but it doesn't
  * indicate any problem, because such lock has no reader or writer, and will
  * be released soon.
  * convertion may fail if lock was canceled before downgrade, but it doesn't
  * indicate any problem, because such lock has no reader or writer, and will
  * be released soon.
- * Used by Commit on Sharing (COS) code only for now.
+ *
+ * Used by Commit on Sharing (COS) code to force object changes commit in case
+ * of conflict. Converted lock is considered as new lock and all blocking AST
+ * things are cleared, so any pending or new blocked lock on that lock will
+ * cause new call to blocking_ast and force resource object commit.
  *
  * \param lock A lock to convert
  * \param new_mode new lock mode
  */
 void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 {
  *
  * \param lock A lock to convert
  * \param new_mode new lock mode
  */
 void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 {
+#ifdef HAVE_SERVER_SUPPORT
        ENTRY;
 
        LASSERT(new_mode == LCK_COS);
        ENTRY;
 
        LASSERT(new_mode == LCK_COS);
@@ -2643,14 +2653,20 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+
+       /* Consider downgraded lock as a new lock and clear all states
+        * related to a previous blocking AST processing.
+        */
+       ldlm_clear_blocking_data(lock);
+
        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
-
        unlock_res_and_lock(lock);
 
        ldlm_reprocess_all(lock->l_resource);
 
        EXIT;
        unlock_res_and_lock(lock);
 
        ldlm_reprocess_all(lock->l_resource);
 
        EXIT;
+#endif
 }
 EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
 
 }
 EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
 
index 5789e8c..5362e8d 100644 (file)
@@ -1490,6 +1490,31 @@ retry:
         return rc;
 }
 
         return rc;
 }
 
+/* Clear the blocking lock, the race is possible between ldlm_handle_convert0()
+ * and ldlm_work_bl_ast_lock(), so this is done under lock with check for NULL.
+ */
+void ldlm_clear_blocking_lock(struct ldlm_lock *lock)
+{
+       if (lock->l_blocking_lock) {
+               LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+               lock->l_blocking_lock = NULL;
+       }
+}
+
+/* A lock can be converted to new ibits or mode and should be considered
+ * as new lock. Clear all states related to a previous blocking AST
+ * processing so new conflicts will cause new blocking ASTs.
+ *
+ * This is used during lock convert below and lock downgrade to COS mode in
+ * ldlm_lock_mode_downgrade().
+ */
+void ldlm_clear_blocking_data(struct ldlm_lock *lock)
+{
+       ldlm_clear_ast_sent(lock);
+       lock->l_bl_ast_run = 0;
+       ldlm_clear_blocking_lock(lock);
+}
+
 /**
  * Main LDLM entry point for server code to process lock conversion requests.
  */
 /**
  * Main LDLM entry point for server code to process lock conversion requests.
  */
@@ -1543,20 +1568,8 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                        ldlm_clear_cbpending(lock);
                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
                        ldlm_inodebits_drop(lock, bits & ~new);
                        ldlm_clear_cbpending(lock);
                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
                        ldlm_inodebits_drop(lock, bits & ~new);
-                       /* if lock is in a bl_ast list, remove it from the list
-                        * here before reprocessing.
-                        */
-                       if (!list_empty(&lock->l_bl_ast)) {
-                               ldlm_discard_bl_lock(lock);
-                       } else {
-                               /* in this case lock was taken from bl_ast list
-                                * already by ldlm_work_bl_ast_lock() and lock
-                                * must clear only some remaining states.
-                                */
-                               ldlm_clear_ast_sent(lock);
-                               lock->l_bl_ast_run = 0;
-                               ldlm_clear_blocking_lock(lock);
-                       }
+
+                       ldlm_clear_blocking_data(lock);
                        unlock_res_and_lock(lock);
 
                        ldlm_reprocess_all(lock->l_resource);
                        unlock_res_and_lock(lock);
 
                        ldlm_reprocess_all(lock->l_resource);
index fd46f71..94a675a 100644 (file)
@@ -2658,6 +2658,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 {
        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
 {
        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       struct ldlm_cb_set_arg *arg = data;
        bool commit_async = false;
        int rc;
        ENTRY;
        bool commit_async = false;
        int rc;
        ENTRY;
@@ -2670,17 +2671,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                unlock_res_and_lock(lock);
                RETURN(0);
        }
                unlock_res_and_lock(lock);
                RETURN(0);
        }
-       /* There is no lock conflict if l_blocking_lock == NULL,
-        * it indicates a blocking ast sent from ldlm_lock_decref_internal
-        * when the last reference to a local lock was released */
-       if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
-           lock->l_blocking_lock != NULL) {
+
+       /* A blocking ast may be sent from ldlm_lock_decref_internal
+        * when the last reference to a local lock was released and
+        * during blocking event from ldlm_work_bl_ast_lock().
+        * The 'data' parameter is l_ast_data in the first case and
+        * callback arguments in the second one. Distinguish them by that.
+        */
+       if (!data || data == lock->l_ast_data || !arg->bl_desc)
+               goto skip_cos_checks;
+
+       if (lock->l_req_mode & (LCK_PW | LCK_EX)) {
                if (mdt_cos_is_enabled(mdt)) {
                if (mdt_cos_is_enabled(mdt)) {
-                       if (lock->l_client_cookie !=
-                           lock->l_blocking_lock->l_client_cookie)
+                       if (!arg->bl_desc->bl_same_client)
                                mdt_set_lock_sync(lock);
                } else if (mdt_slc_is_enabled(mdt) &&
                                mdt_set_lock_sync(lock);
                } else if (mdt_slc_is_enabled(mdt) &&
-                          ldlm_is_cos_incompat(lock->l_blocking_lock)) {
+                          arg->bl_desc->bl_cos_incompat) {
                        mdt_set_lock_sync(lock);
                        /*
                         * we may do extra commit here, but there is a small
                        mdt_set_lock_sync(lock);
                        /*
                         * we may do extra commit here, but there is a small
@@ -2694,11 +2700,11 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                         */
                        commit_async = true;
                }
                         */
                        commit_async = true;
                }
-       } else if (lock->l_req_mode == LCK_COS &&
-                  lock->l_blocking_lock != NULL) {
+       } else if (lock->l_req_mode == LCK_COS) {
                commit_async = true;
        }
 
                commit_async = true;
        }
 
+skip_cos_checks:
        rc = ldlm_blocking_ast_nocheck(lock);
 
        if (commit_async) {
        rc = ldlm_blocking_ast_nocheck(lock);
 
        if (commit_async) {