* discard dirty data, rather than writing back. */
if (ldlm_is_ast_discard_data(new))
ldlm_set_discard_data(lock);
- LASSERT(list_empty(&lock->l_bl_ast));
- list_add(&lock->l_bl_ast, work_list);
- LDLM_LOCK_GET(lock);
- LASSERT(lock->l_blocking_lock == NULL);
- lock->l_blocking_lock = LDLM_LOCK_GET(new);
- }
+
+ /* Lock can be converted from a blocking state back to granted
+ * after lock convert or COS downgrade but still be in an
+ * older bl_list because it is controlled only by
+ * ldlm_work_bl_ast_lock(), let it be processed there.
+ */
+ if (list_empty(&lock->l_bl_ast)) {
+ list_add(&lock->l_bl_ast, work_list);
+ LDLM_LOCK_GET(lock);
+ }
+ LASSERT(lock->l_blocking_lock == NULL);
+ lock->l_blocking_lock = LDLM_LOCK_GET(new);
+ }
}
/**
RETURN(0);
}
-/* Discard a single lock from bl_list, may be used by
- * lock convert handler when lock is returned to the granted
- * state.
- */
-void ldlm_discard_bl_lock(struct ldlm_lock *lock)
-{
- check_res_locked(lock->l_resource);
-
- LASSERT(!list_empty(&lock->l_bl_ast));
- list_del_init(&lock->l_bl_ast);
- LASSERT(ldlm_is_ast_sent(lock));
- ldlm_clear_ast_sent(lock);
- LASSERT(lock->l_bl_ast_run == 0);
- LASSERT(lock->l_blocking_lock);
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- LDLM_LOCK_RELEASE(lock);
-}
-
-/* Clear the blocking lock, the race is possible between ldlm_handle_convert0()
- * and ldlm_work_bl_ast_lock(), so this is done under lock with check for NULL.
- */
-void ldlm_clear_blocking_lock(struct ldlm_lock *lock)
-{
- if (lock->l_blocking_lock) {
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- }
-}
-
/**
* Discard all AST work items from list.
*
ENTRY;
- list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast)
- ldlm_discard_bl_lock(lock);
+ list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast) {
+ LASSERT(!list_empty(&lock->l_bl_ast));
+ list_del_init(&lock->l_bl_ast);
+ ldlm_clear_ast_sent(lock);
+ LASSERT(lock->l_bl_ast_run == 0);
+ ldlm_clear_blocking_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ }
EXIT;
}
-#endif
-
/**
* Process a call to blocking AST callback for a lock in ast_work list
*/
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
struct ldlm_cb_set_arg *arg = opaq;
- struct ldlm_lock_desc d;
- int rc;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
+ struct ldlm_lock_desc d;
+ struct ldlm_bl_desc bld;
+ int rc;
+
ENTRY;
if (list_empty(arg->list))
lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
+ /* nobody should touch l_bl_ast but some locks in the list may become
+ * granted after lock convert or COS downgrade, these locks should be
+ * just skipped here and removed from the list.
+ */
+ lock_res_and_lock(lock);
+ list_del_init(&lock->l_bl_ast);
+
+ /* lock is not blocking lock anymore, but was kept in the list because
+ * it can managed only here.
+ */
+ if (!ldlm_is_ast_sent(lock)) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ RETURN(0);
+ }
+
LASSERT(lock->l_blocking_lock);
ldlm_lock2desc(lock->l_blocking_lock, &d);
/* copy blocking lock ibits in cancel_bits as well,
d.l_policy_data.l_inodebits.cancel_bits =
lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
- /* nobody should touch l_bl_ast */
- lock_res_and_lock(lock);
- list_del_init(&lock->l_bl_ast);
+ /* Blocking lock is being destroyed here but some information about it
+ * may be needed inside l_blocking_ast() function below,
+ * e.g. in mdt_blocking_ast(). So save needed data in bl_desc.
+ */
+ bld.bl_same_client = lock->l_client_cookie ==
+ lock->l_blocking_lock->l_client_cookie;
+ bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+ arg->bl_desc = &bld;
LASSERT(ldlm_is_ast_sent(lock));
LASSERT(lock->l_bl_ast_run == 0);
lock->l_bl_ast_run++;
- unlock_res_and_lock(lock);
-
- rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
-#ifdef HAVE_SERVER_SUPPORT
- lock_res_and_lock(lock);
ldlm_clear_blocking_lock(lock);
unlock_res_and_lock(lock);
-#endif
- LDLM_LOCK_RELEASE(lock);
-
- RETURN(rc);
-}
-
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
-static int
-ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
-{
- struct ldlm_cb_set_arg *arg = opaq;
- int rc = 0;
- struct ldlm_lock *lock;
- ldlm_completion_callback completion_callback;
- ENTRY;
-
- if (list_empty(arg->list))
- RETURN(-ENOENT);
-
- lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
-
- /* It's possible to receive a completion AST before we've set
- * the l_completion_ast pointer: either because the AST arrived
- * before the reply, or simply because there's a small race
- * window between receiving the reply and finishing the local
- * enqueue. (bug 842)
- *
- * This can't happen with the blocking_ast, however, because we
- * will never call the local blocking_ast until we drop our
- * reader/writer reference, which we won't do until we get the
- * reply and finish enqueueing. */
- /* nobody should touch l_cp_ast */
- lock_res_and_lock(lock);
- list_del_init(&lock->l_cp_ast);
- LASSERT(ldlm_is_cp_reqd(lock));
- /* save l_completion_ast since it can be changed by
- * mds_intent_policy(), see bug 14225 */
- completion_callback = lock->l_completion_ast;
- ldlm_clear_cp_reqd(lock);
- unlock_res_and_lock(lock);
+ rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
- if (completion_callback != NULL)
- rc = completion_callback(lock, 0, (void *)arg);
LDLM_LOCK_RELEASE(lock);
RETURN(rc);
RETURN(rc);
}
+#endif
+
+/**
+ * Process a call to completion AST callback for a lock in ast_work list
+ */
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+ struct ldlm_cb_set_arg *arg = opaq;
+ struct ldlm_lock *lock;
+ ldlm_completion_callback completion_callback;
+ int rc = 0;
+
+ ENTRY;
+
+ if (list_empty(arg->list))
+ RETURN(-ENOENT);
+
+ lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+ /* It's possible to receive a completion AST before we've set
+ * the l_completion_ast pointer: either because the AST arrived
+ * before the reply, or simply because there's a small race
+ * window between receiving the reply and finishing the local
+ * enqueue. (bug 842)
+ *
+ * This can't happen with the blocking_ast, however, because we
+ * will never call the local blocking_ast until we drop our
+ * reader/writer reference, which we won't do until we get the
+ * reply and finish enqueueing. */
+
+ /* nobody should touch l_cp_ast */
+ lock_res_and_lock(lock);
+ list_del_init(&lock->l_cp_ast);
+ LASSERT(ldlm_is_cp_reqd(lock));
+ /* save l_completion_ast since it can be changed by
+ * mds_intent_policy(), see bug 14225 */
+ completion_callback = lock->l_completion_ast;
+ ldlm_clear_cp_reqd(lock);
+ unlock_res_and_lock(lock);
+
+ if (completion_callback != NULL)
+ rc = completion_callback(lock, 0, (void *)arg);
+ LDLM_LOCK_RELEASE(lock);
+
+ RETURN(rc);
+}
/**
* Process list of locks in need of ASTs being sent.
ldlm_desc_ast_t ast_type)
{
struct ldlm_cb_set_arg *arg;
- set_producer_func work_ast_lock;
- int rc;
+ set_producer_func work_ast_lock;
+ int rc;
if (list_empty(rpc_list))
RETURN(0);
arg->list = rpc_list;
switch (ast_type) {
- case LDLM_WORK_BL_AST:
- arg->type = LDLM_BL_CALLBACK;
- work_ast_lock = ldlm_work_bl_ast_lock;
- break;
- case LDLM_WORK_CP_AST:
- arg->type = LDLM_CP_CALLBACK;
- work_ast_lock = ldlm_work_cp_ast_lock;
- break;
- case LDLM_WORK_REVOKE_AST:
- arg->type = LDLM_BL_CALLBACK;
- work_ast_lock = ldlm_work_revoke_ast_lock;
- break;
- case LDLM_WORK_GL_AST:
- arg->type = LDLM_GL_CALLBACK;
- work_ast_lock = ldlm_work_gl_ast_lock;
- break;
- default:
- LBUG();
+ case LDLM_WORK_CP_AST:
+ arg->type = LDLM_CP_CALLBACK;
+ work_ast_lock = ldlm_work_cp_ast_lock;
+ break;
+#ifdef HAVE_SERVER_SUPPORT
+ case LDLM_WORK_BL_AST:
+ arg->type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_bl_ast_lock;
+ break;
+ case LDLM_WORK_REVOKE_AST:
+ arg->type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_revoke_ast_lock;
+ break;
+ case LDLM_WORK_GL_AST:
+ arg->type = LDLM_GL_CALLBACK;
+ work_ast_lock = ldlm_work_gl_ast_lock;
+ break;
+#endif
+ default:
+ LBUG();
}
/* We create a ptlrpc request set with flow control extension.
* convertion may fail if lock was canceled before downgrade, but it doesn't
* indicate any problem, because such lock has no reader or writer, and will
* be released soon.
- * Used by Commit on Sharing (COS) code only for now.
+ *
+ * Used by Commit on Sharing (COS) code to force object changes commit in case
+ * of conflict. Converted lock is considered as new lock and all blocking AST
+ * things are cleared, so any pending or new blocked lock on that lock will
+ * cause new call to blocking_ast and force resource object commit.
*
* \param lock A lock to convert
* \param new_mode new lock mode
*/
void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
{
+#ifdef HAVE_SERVER_SUPPORT
ENTRY;
LASSERT(new_mode == LCK_COS);
* ldlm_grant_lock() called below.
*/
ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+
+ /* Consider downgraded lock as a new lock and clear all states
+ * related to a previous blocking AST processing.
+ */
+ ldlm_clear_blocking_data(lock);
+
lock->l_req_mode = new_mode;
ldlm_grant_lock(lock, NULL);
-
unlock_res_and_lock(lock);
ldlm_reprocess_all(lock->l_resource);
EXIT;
+#endif
}
EXPORT_SYMBOL(ldlm_lock_mode_downgrade);