* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LDLM
#include <libcfs/libcfs.h>
-#include <linux/lustre_intent.h>
#include <obd_class.h>
#include "ldlm_internal.h"
[LDLM_FLOCK] = "FLK",
[LDLM_IBITS] = "IBT",
};
-EXPORT_SYMBOL(ldlm_typename);
-static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
+static ldlm_policy_wire_to_local_t ldlm_policy_wire_to_local[] = {
[LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
[LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
- [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
- [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
-};
-
-static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
- [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
- [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
- [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
+ [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire_to_local,
[LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};
ldlm_policy_data_t *lpolicy)
{
ldlm_policy_wire_to_local_t convert;
- int new_client;
- /** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
- new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
- if (new_client)
- convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
- else
- convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
+ convert = ldlm_policy_wire_to_local[type - LDLM_MIN_TYPE];
convert(wpolicy, lpolicy);
}
/**
* Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
+ *
+ * If \a last_use is non-zero, the lock is removed from the LRU only if
+ * it matches the lock's l_last_used.
+ *
+ * \retval 0 if \a last_use is set and either the lock is not in the LRU
+ *	     list or \a last_use does not match the lock's l_last_used;
+ *	     if \a last_use is zero, the lock was not in the LRU list.
+ * \retval 1 the lock was in the LRU list and has been removed.
*/
-int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
+int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, cfs_time_t last_use)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
- int rc;
+ int rc = 0;
ENTRY;
if (ldlm_is_ns_srv(lock)) {
}
spin_lock(&ns->ns_lock);
- rc = ldlm_lock_remove_from_lru_nolock(lock);
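+	/* Only remove the lock if it has not been used since the caller
+	 * sampled \a last_use; a lock touched in the LRU in the meantime
+	 * keeps its place. */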
+ if (last_use == 0 || last_use == lock->l_last_used)
+ rc = ldlm_lock_remove_from_lru_nolock(lock);
spin_unlock(&ns->ns_lock);
- EXIT;
- return rc;
+
+ RETURN(rc);
}
/**
* ldlm_lock_destroy, you can never drop your final references on this lock.
* Because it's not in the hash table anymore. -phil
*/
-int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
+static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
ENTRY;
OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}
-struct portals_handle_ops lock_handle_ops = {
+static struct portals_handle_ops lock_handle_ops = {
.hop_addref = lock_handle_addref,
.hop_free = lock_handle_free,
};
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_lock_change_resource);
/** \defgroup ldlm_handles LDLM HANDLES
* Ways to get hold of locks without any addresses.
&lock->l_policy_data,
&desc->l_policy_data);
}
-EXPORT_SYMBOL(ldlm_lock2desc);
/**
* Add a lock to list of conflicting locks to send AST to.
*
* Only add if we have not sent a blocking AST to the lock yet.
*/
-void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
- struct list_head *work_list)
+static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+ struct list_head *work_list)
{
if (!ldlm_is_ast_sent(lock)) {
LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
/**
* Add a lock to list of just granted locks to send completion AST to.
*/
-void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
+static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
+ struct list_head *work_list)
{
if (!ldlm_is_cp_reqd(lock)) {
ldlm_set_cp_reqd(lock);
*/
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
- lock = ldlm_handle2lock(lockh);
- LASSERT(lock != NULL);
- ldlm_lock_addref_internal(lock, mode);
- LDLM_LOCK_PUT(lock);
+ lock = ldlm_handle2lock(lockh);
+ LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+ ldlm_lock_addref_internal(lock, mode);
+ LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);
ldlm_set_cbpending(lock);
}
- if (!lock->l_readers && !lock->l_writers &&
- ldlm_is_cbpending(lock)) {
- /* If we received a blocked AST and this was the last reference,
- * run the callback. */
+ if (!lock->l_readers && !lock->l_writers &&
+ (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
+		/* If we received a blocking AST and this was the last
+		 * reference, run the callback.
+		 * Group locks are special:
+		 * they must not go into the LRU, and they are not called
+		 * back like non-group locks; instead they are released
+		 * manually. They hold an l_writers reference until that
+		 * manual release, so we remove them once they have no more
+		 * reader or writer references. - LU-6368 */
if (ldlm_is_ns_srv(lock) && lock->l_export)
CERROR("FL_CBPENDING set on non-local lock--just a "
"warning\n");
* \a lockh and mark it for subsequent cancellation once r/w refcount
* drops to zero instead of putting into LRU.
*
- * Typical usage is for GROUP locks which we cannot allow to be cached.
*/
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
}
/**
- * Search for a lock with given properties in a queue.
+ * Describes the criteria and result of a lock match; passed to
+ * itree_overlap_cb as its data argument.
+ */
+struct lock_match_data {
+ struct ldlm_lock *lmd_old;
+ struct ldlm_lock *lmd_lock;
+ ldlm_mode_t *lmd_mode;
+ ldlm_policy_data_t *lmd_policy;
+ __u64 lmd_flags;
+ int lmd_unref;
+};
+
+/**
+ * Check if the given \a lock meets the criteria for a match.
+ * A reference on the lock is taken if it matches.
*
- * \retval a referenced lock or NULL. See the flag descriptions below, in the
- * comment above ldlm_lock_match
+ * \param lock test-against this lock
+ * \param data parameters
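+ *
+ * \retval INTERVAL_ITER_STOP	\a lock matches; iteration may stop
+ * \retval INTERVAL_ITER_CONT	no match; continue iterating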
*/
-static struct ldlm_lock *search_queue(struct list_head *queue,
- ldlm_mode_t *mode,
- ldlm_policy_data_t *policy,
- struct ldlm_lock *old_lock,
- __u64 flags, int unref)
-{
- struct ldlm_lock *lock;
- struct list_head *tmp;
+static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
+{
+ ldlm_policy_data_t *lpol = &lock->l_policy_data;
+ ldlm_mode_t match;
+
+ if (lock == data->lmd_old)
+ return INTERVAL_ITER_STOP;
+
+	/* Check if this lock can be matched.
+	 * Used by LU-2919 (exclusive open) for open lease locks. */
+ if (ldlm_is_excl(lock))
+ return INTERVAL_ITER_CONT;
+
+	/* llite sometimes wants to match locks that will be
+	 * canceled when their users drop, but we allow it to match
+	 * if it passes in CBPENDING and the lock still has users.
+	 * This is generally only going to be used by children
+	 * whose parents already hold a lock, so forward progress
+	 * can still happen. */
+ if (ldlm_is_cbpending(lock) &&
+ !(data->lmd_flags & LDLM_FL_CBPENDING))
+ return INTERVAL_ITER_CONT;
+ if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+ lock->l_readers == 0 && lock->l_writers == 0)
+ return INTERVAL_ITER_CONT;
+
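+	/* The lock's mode must be in the caller's set of acceptable
+	 * modes. */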
+ if (!(lock->l_req_mode & *data->lmd_mode))
+ return INTERVAL_ITER_CONT;
+ match = lock->l_req_mode;
+
+ switch (lock->l_resource->lr_type) {
+ case LDLM_EXTENT:
+ if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+ lpol->l_extent.end < data->lmd_policy->l_extent.end)
+ return INTERVAL_ITER_CONT;
+
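+		/* A group lock matches only within the same gid, unless
+		 * the caller passes LDLM_GID_ANY to accept any group. */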
+ if (unlikely(match == LCK_GROUP) &&
+ data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
+ lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
+ return INTERVAL_ITER_CONT;
+ break;
+ case LDLM_IBITS:
+		/* We match if the existing lock has the same or a wider
+		 * set of bits. */
+ if ((lpol->l_inodebits.bits &
+ data->lmd_policy->l_inodebits.bits) !=
+ data->lmd_policy->l_inodebits.bits)
+ return INTERVAL_ITER_CONT;
+ break;
+ default:
+ ;
+ }
- list_for_each(tmp, queue) {
- ldlm_mode_t match;
+ if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
+ return INTERVAL_ITER_CONT;
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ if ((data->lmd_flags & LDLM_FL_LOCAL_ONLY) &&
+ !ldlm_is_local(lock))
+ return INTERVAL_ITER_CONT;
- if (lock == old_lock)
- break;
+ if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+ LDLM_LOCK_GET(lock);
+ ldlm_lock_touch_in_lru(lock);
+ } else {
+ ldlm_lock_addref_internal_nolock(lock, match);
+ }
- /* Check if this lock can be matched.
- * Used by LU-2919(exclusive open) for open lease lock */
- if (ldlm_is_excl(lock))
- continue;
+ *data->lmd_mode = match;
+ data->lmd_lock = lock;
- /* llite sometimes wants to match locks that will be
- * canceled when their users drop, but we allow it to match
- * if it passes in CBPENDING and the lock still has users.
- * this is generally only going to be used by children
- * whose parents already hold a lock so forward progress
- * can still happen. */
- if (ldlm_is_cbpending(lock) &&
- !(flags & LDLM_FL_CBPENDING))
- continue;
- if (!unref && ldlm_is_cbpending(lock) &&
- lock->l_readers == 0 && lock->l_writers == 0)
- continue;
+ return INTERVAL_ITER_STOP;
+}
- if (!(lock->l_req_mode & *mode))
- continue;
- match = lock->l_req_mode;
+static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
+{
+ struct ldlm_interval *node = to_ldlm_interval(in);
+ struct lock_match_data *data = args;
+ struct ldlm_lock *lock;
+ int rc;
- if (lock->l_resource->lr_type == LDLM_EXTENT &&
- (lock->l_policy_data.l_extent.start >
- policy->l_extent.start ||
- lock->l_policy_data.l_extent.end < policy->l_extent.end))
- continue;
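+	/* All locks sharing this interval are linked on the node's
+	 * li_group list; test each one against the match criteria. */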
+ list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+ rc = lock_matches(lock, data);
+ if (rc == INTERVAL_ITER_STOP)
+ return INTERVAL_ITER_STOP;
+ }
+ return INTERVAL_ITER_CONT;
+}
- if (unlikely(match == LCK_GROUP) &&
- lock->l_resource->lr_type == LDLM_EXTENT &&
- lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
- continue;
+/**
+ * Search for a lock with given parameters in interval trees.
+ *
+ * \param res search for a lock in this resource
+ * \param data parameters
+ *
+ * \retval a referenced lock or NULL.
+ */
+static struct ldlm_lock *search_itree(struct ldlm_resource *res,
+ struct lock_match_data *data)
+{
+ struct interval_node_extent ext = {
+ .start = data->lmd_policy->l_extent.start,
+ .end = data->lmd_policy->l_extent.end
+ };
+ int idx;
- /* We match if we have existing lock with same or wider set
- of bits. */
- if (lock->l_resource->lr_type == LDLM_IBITS &&
- ((lock->l_policy_data.l_inodebits.bits &
- policy->l_inodebits.bits) !=
- policy->l_inodebits.bits))
- continue;
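+	/* A resource keeps one interval tree per lock mode; skip trees
+	 * that are empty or whose mode does not intersect the request. */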
+ for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+ struct ldlm_interval_tree *tree = &res->lr_itree[idx];
- if (!unref && LDLM_HAVE_MASK(lock, GONE))
- continue;
+ if (tree->lit_root == NULL)
+ continue;
- if ((flags & LDLM_FL_LOCAL_ONLY) &&
- !ldlm_is_local(lock))
- continue;
+ if (!(tree->lit_mode & *data->lmd_mode))
+ continue;
+
+ interval_search(tree->lit_root, &ext,
+ itree_overlap_cb, data);
+ }
+ return data->lmd_lock;
+}
- if (flags & LDLM_FL_TEST_LOCK) {
- LDLM_LOCK_GET(lock);
- ldlm_lock_touch_in_lru(lock);
- } else {
- ldlm_lock_addref_internal_nolock(lock, match);
- }
- *mode = match;
- return lock;
- }
- return NULL;
+/**
+ * Search for a lock with given properties in a queue.
+ *
+ * \param queue search for a lock in this queue
+ * \param data parameters
+ *
+ * \retval a referenced lock or NULL.
+ */
+static struct ldlm_lock *search_queue(struct list_head *queue,
+ struct lock_match_data *data)
+{
+ struct ldlm_lock *lock;
+ int rc;
+
+ list_for_each_entry(lock, queue, l_res_link) {
+ rc = lock_matches(lock, data);
+ if (rc == INTERVAL_ITER_STOP)
+ return data->lmd_lock;
+ }
+ return NULL;
}
void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
ldlm_lock_fail_match_locked(lock);
unlock_res_and_lock(lock);
}
-EXPORT_SYMBOL(ldlm_lock_fail_match);
/**
* Mark lock as "matchable" by OST.
*/
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
const struct ldlm_res_id *res_id, ldlm_type_t type,
- ldlm_policy_data_t *policy, ldlm_mode_t mode,
- struct lustre_handle *lockh, int unref)
-{
- struct ldlm_resource *res;
- struct ldlm_lock *lock, *old_lock = NULL;
- int rc = 0;
- ENTRY;
+ ldlm_policy_data_t *policy, ldlm_mode_t mode,
+ struct lustre_handle *lockh, int unref)
+{
+ struct lock_match_data data = {
+ .lmd_old = NULL,
+ .lmd_lock = NULL,
+ .lmd_mode = &mode,
+ .lmd_policy = policy,
+ .lmd_flags = flags,
+ .lmd_unref = unref,
+ };
+ struct ldlm_resource *res;
+ struct ldlm_lock *lock;
+ int rc = 0;
+ ENTRY;
- if (ns == NULL) {
- old_lock = ldlm_handle2lock(lockh);
- LASSERT(old_lock);
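+	/* A NULL namespace means matching against an existing lock:
+	 * take the namespace, resource, type and mode from \a lockh. */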
+ if (ns == NULL) {
+ data.lmd_old = ldlm_handle2lock(lockh);
+ LASSERT(data.lmd_old != NULL);
- ns = ldlm_lock_to_ns(old_lock);
- res_id = &old_lock->l_resource->lr_name;
- type = old_lock->l_resource->lr_type;
- mode = old_lock->l_req_mode;
- }
+ ns = ldlm_lock_to_ns(data.lmd_old);
+ res_id = &data.lmd_old->l_resource->lr_name;
+ type = data.lmd_old->l_resource->lr_type;
+ *data.lmd_mode = data.lmd_old->l_req_mode;
+ }
res = ldlm_resource_get(ns, NULL, res_id, type, 0);
if (IS_ERR(res)) {
- LASSERT(old_lock == NULL);
+ LASSERT(data.lmd_old == NULL);
RETURN(0);
}
- LDLM_RESOURCE_ADDREF(res);
- lock_res(res);
+ LDLM_RESOURCE_ADDREF(res);
+ lock_res(res);
- lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
- if (flags & LDLM_FL_BLOCK_GRANTED)
- GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
- lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
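+	/* Extent locks are indexed in per-mode interval trees, so for
+	 * them the whole granted queue need not be scanned. */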
+ if (res->lr_type == LDLM_EXTENT)
+ lock = search_itree(res, &data);
+ else
+ lock = search_queue(&res->lr_granted, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
+ if (flags & LDLM_FL_BLOCK_GRANTED)
+ GOTO(out, rc = 0);
+ lock = search_queue(&res->lr_converting, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
+ lock = search_queue(&res->lr_waiting, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
EXIT;
out:
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[3] : policy->l_extent.end);
}
- if (old_lock)
- LDLM_LOCK_PUT(old_lock);
+ if (data.lmd_old != NULL)
+ LDLM_LOCK_PUT(data.lmd_old);
- return rc ? mode : 0;
+ return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
struct ldlm_interval *node = NULL;
ENTRY;
- lock->l_last_activity = cfs_time_current_sec();
/* policies are not executed on the client or during replay */
if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
&& !local && ns->ns_policy) {
}
}
- if (*flags & LDLM_FL_RESENT)
+ if (*flags & LDLM_FL_RESENT) {
+		/* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for the reply.
+		 * Always set LOCK_CHANGED.
+		 * Set BLOCK_GRANTED if the lock is not yet granted.
+		 * Take NO_TIMEOUT from the lock, as it is inherited through
+		 * LDLM_FL_INHERIT_MASK. */
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ if (lock->l_req_mode != lock->l_granted_mode)
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
RETURN(ELDLM_OK);
+ }
/* For a replaying lock, it might be already in granted list. So
* unlinking the lock will cause the interval node to be freed, we
/* The server returned a blocked lock, but it was granted
* before we got a chance to actually enqueue it. We don't
* need to do anything else. */
- *flags &= ~(LDLM_FL_BLOCK_GRANTED |
- LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
+ *flags &= ~LDLM_FL_BLOCKED_MASK;
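+	/* LDLM_FL_BLOCKED_MASK covers LDLM_FL_BLOCK_GRANTED,
+	 * LDLM_FL_BLOCK_CONV and LDLM_FL_BLOCK_WAIT. */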
GOTO(out, rc = ELDLM_OK);
}
return LDLM_ITER_CONTINUE;
}
-static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
struct hlist_node *hnode, void *arg)
{
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
}
EXIT;
}
-EXPORT_SYMBOL(ldlm_reprocess_all_ns);
/**
* Try to grant all waiting locks on a resource.
/* Releases cancel callback. */
ldlm_cancel_callback(lock);
- /* Yes, second time, just in case it was added again while we were
- * running with no res lock in ldlm_cancel_callback */
- if (ldlm_is_waited(lock))
- ldlm_del_waiting_lock(lock);
+ LASSERT(!ldlm_is_waited(lock));
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
* Iterator function for ldlm_cancel_locks_for_export.
* Cancels passed locks.
*/
-int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- struct hlist_node *hnode, void *data)
+static int
+ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
{
struct export_cl_data *ecl = (struct export_cl_data *)data;
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
RETURN(res);
}
-EXPORT_SYMBOL(ldlm_lock_convert);
/**
* Print lock with lock handle \a lockh description into debug log.