* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2016, Intel Corporation.
+ * Copyright (c) 2010, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* NOTE: called by
* - ldlm_lock_enqueue
* - ldlm_reprocess_queue
- * - ldlm_lock_convert
*
* must be called with lr_lock held
*/
GOTO(out, rc = 1);
if (flags & LDLM_FL_BLOCK_GRANTED)
GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, &data);
- if (lock != NULL)
- GOTO(out, rc = 1);
lock = search_queue(&res->lr_waiting, &data);
if (lock != NULL)
GOTO(out, rc = 1);
RETURN(ERR_PTR(rc));
}
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+ __u64 *flags)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ enum ldlm_error rc = ELDLM_OK;
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ ldlm_processing_policy policy;
+ ENTRY;
+
+ policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+ policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+ if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+ res->lr_type != LDLM_FLOCK) {
+ rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+
+ if (!list_empty(&rpc_list))
+ ldlm_discard_bl_list(&rpc_list);
+
+ RETURN(rc);
+}
+#endif
+
/**
* Enqueue (request) a lock.
*
struct ldlm_lock *lock = *lockp;
struct ldlm_resource *res = lock->l_resource;
int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
- ldlm_processing_policy policy;
-#endif
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
ENTRY;
* more or less trusting the clients not to lie.
*
* FIXME (bug 268): Detect obvious lies by checking compatibility in
- * granted/converting queues. */
+ * granted queue. */
if (local) {
- if (*flags & LDLM_FL_BLOCK_CONV)
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
- else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- else
- ldlm_grant_lock(lock, NULL);
+ if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ else
+ ldlm_grant_lock(lock, NULL);
GOTO(out, rc = ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
- } else if (*flags & LDLM_FL_REPLAY) {
- if (*flags & LDLM_FL_BLOCK_CONV) {
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
- GOTO(out, rc = ELDLM_OK);
- } else if (*flags & LDLM_FL_BLOCK_WAIT) {
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ } else if (*flags & LDLM_FL_REPLAY) {
+ if (*flags & LDLM_FL_BLOCK_WAIT) {
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
GOTO(out, rc = ELDLM_OK);
- } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
- ldlm_grant_lock(lock, NULL);
+ } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
+ ldlm_grant_lock(lock, NULL);
GOTO(out, rc = ELDLM_OK);
- }
- /* If no flags, fall through to normal enqueue path. */
- }
+ }
+ /* If no flags, fall through to normal enqueue path. */
+ }
- policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
- GOTO(out, rc);
+ rc = ldlm_lock_enqueue_helper(lock, flags);
+ GOTO(out, rc);
#else
} else {
CERROR("This is client-side-only module, cannot handle "
__u64 flags;
int rc = LDLM_ITER_CONTINUE;
enum ldlm_error err;
+ struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
ENTRY;
check_res_locked(res);
LASSERT(intention == LDLM_PROCESS_RESCAN ||
intention == LDLM_PROCESS_RECOVERY);
+restart:
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, intention, &err, work_list);
+ rc = policy(pending, &flags, intention, &err, &rpc_list);
+ if (pending->l_granted_mode == pending->l_req_mode ||
+ res->lr_type == LDLM_FLOCK) {
+ list_splice(&rpc_list, work_list);
+ } else {
+ list_splice(&rpc_list, &bl_ast_list);
+ }
/*
* When this is called from recovery done, we always want
* to scan the whole list no matter what 'rc' is returned.
break;
}
+ if (!list_empty(&bl_ast_list)) {
+ unlock_res(res);
+
+ LASSERT(intention == LDLM_PROCESS_RECOVERY);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+ LDLM_WORK_BL_AST);
+
+ lock_res(res);
+ if (rc == -ERESTART)
+ GOTO(restart, rc);
+ }
+
+ if (!list_empty(&bl_ast_list))
+ ldlm_discard_bl_list(&bl_ast_list);
+
RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
}
* \param[in] lock The lock to be enqueued.
* \param[out] flags Lock flags for the lock to be enqueued.
* \param[in] rpc_list Conflicting locks list.
- * \param[in] grant_flags extra flags when granting a lock.
*
* \retval -ERESTART: Some lock was instantly canceled while sending
* blocking ASTs, caller needs to re-check conflicting
* \reval 0: Lock is successfully added in waiting list.
*/
int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
- struct list_head *rpc_list, __u64 grant_flags)
+ struct list_head *rpc_list)
{
struct ldlm_resource *res = lock->l_resource;
int rc;
RETURN(rc);
}
- *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+ *flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(0);
}
/**
* Try to grant all waiting locks on a resource.
*
- * Calls ldlm_reprocess_queue on converting and waiting queues.
+ * Calls ldlm_reprocess_queue on waiting queue.
*
* Typically called after some resource locks are cancelled to see
* if anything could be granted as a result of the cancellation.
RETURN_EXIT;
restart:
lock_res(res);
- rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
- intention);
- if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
- intention);
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_CP_AST);
- if (rc == -ERESTART) {
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_CP_AST);
+ if (rc == -ERESTART) {
LASSERT(list_empty(&rpc_list));
- goto restart;
- }
+ goto restart;
+ }
#else
ENTRY;
}
/**
- * Downgrade an exclusive lock.
+ * Downgrade an PW/EX lock to COS mode.
*
- * A fast variant of ldlm_lock_convert for convertion of exclusive locks. The
+ * A lock mode convertion from PW/EX mode to less conflict mode. The
* convertion may fail if lock was canceled before downgrade, but it doesn't
* indicate any problem, because such lock has no reader or writer, and will
* be released soon.
- * Used by Commit on Sharing (COS) code.
+ * Used by Commit on Sharing (COS) code only for now.
*
* \param lock A lock to convert
* \param new_mode new lock mode
*/
-void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
+void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
{
ENTRY;
EXIT;
}
-EXPORT_SYMBOL(ldlm_lock_downgrade);
-
-/**
- * Attempt to convert already granted lock to a different mode.
- *
- * While lock conversion is not currently used, future client-side
- * optimizations could take advantage of it to avoid discarding cached
- * pages on a file.
- */
-struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
- enum ldlm_mode new_mode, __u32 *flags)
-{
- struct list_head rpc_list;
- struct ldlm_resource *res;
- struct ldlm_namespace *ns;
- int granted = 0;
-#ifdef HAVE_SERVER_SUPPORT
- int old_mode;
- struct sl_insert_point prev;
-#endif
- struct ldlm_interval *node;
- ENTRY;
-
- INIT_LIST_HEAD(&rpc_list);
- /* Just return if mode is unchanged. */
- if (new_mode == lock->l_granted_mode) {
- *flags |= LDLM_FL_BLOCK_GRANTED;
- RETURN(lock->l_resource);
- }
-
- /* I can't check the type of lock here because the bitlock of lock
- * is not held here, so do the allocation blindly. -jay */
- OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
- if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */
- RETURN(NULL);
-
- LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
- "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
-
- lock_res_and_lock(lock);
-
- res = lock->l_resource;
- ns = ldlm_res_to_ns(res);
-
-#ifdef HAVE_SERVER_SUPPORT
- old_mode = lock->l_req_mode;
-#endif
- lock->l_req_mode = new_mode;
- if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
-#ifdef HAVE_SERVER_SUPPORT
- /* remember the lock position where the lock might be
- * added back to the granted list later and also
- * remember the join mode for skiplist fixing. */
- prev.res_link = lock->l_res_link.prev;
- prev.mode_link = lock->l_sl_mode.prev;
- prev.policy_link = lock->l_sl_policy.prev;
-#endif
- ldlm_resource_unlink_lock(lock);
- } else {
- ldlm_resource_unlink_lock(lock);
- if (res->lr_type == LDLM_EXTENT) {
- /* FIXME: ugly code, I have to attach the lock to a
- * interval node again since perhaps it will be granted
- * soon */
- INIT_LIST_HEAD(&node->li_group);
- ldlm_interval_attach(node, lock);
- node = NULL;
- }
- }
-
- /*
- * Remove old lock from the pool before adding the lock with new
- * mode below in ->policy()
- */
- ldlm_pool_del(&ns->ns_pool, lock);
-
- /* If this is a local resource, put it on the appropriate list. */
- if (ns_is_client(ldlm_res_to_ns(res))) {
- if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
- ldlm_resource_add_lock(res, &res->lr_converting, lock);
- } else {
- /* This should never happen, because of the way the
- * server handles conversions. */
- LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
- *flags);
- LBUG();
-
- ldlm_grant_lock(lock, &rpc_list);
- granted = 1;
- /* FIXME: completion handling not with lr_lock held ! */
- if (lock->l_completion_ast)
- lock->l_completion_ast(lock, 0, NULL);
- }
-#ifdef HAVE_SERVER_SUPPORT
- } else {
- int rc;
- enum ldlm_error err;
- __u64 pflags = 0;
- ldlm_processing_policy policy;
-
- policy = ldlm_processing_policy_table[res->lr_type];
- rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
- &rpc_list);
- if (rc == LDLM_ITER_STOP) {
- lock->l_req_mode = old_mode;
- if (res->lr_type == LDLM_EXTENT)
- ldlm_extent_add_lock(res, lock);
- else
- ldlm_granted_list_add_lock(lock, &prev);
-
- res = NULL;
- } else {
- *flags |= LDLM_FL_BLOCK_GRANTED;
- granted = 1;
- }
- }
-#else
- } else {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
- LBUG();
- }
-#endif
- unlock_res_and_lock(lock);
-
- if (granted)
- ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
- if (node)
- OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
- RETURN(res);
-}
+EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
/**
* Print lock with lock handle \a lockh description into debug log.
va_start(args, fmt);
if (exp && exp->exp_connection) {
- nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
+ nid = obd_export_nid2str(exp);
} else if (exp && exp->exp_obd != NULL) {
struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
- nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
+ nid = obd_import_nid2str(imp);
}
if (resource == NULL) {