*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LDLM
#include <libcfs/libcfs.h>
+
+#include <lustre_swab.h>
#include <obd_class.h>
+
#include "ldlm_internal.h"
/* lock types */
[LDLM_FLOCK] = "FLK",
[LDLM_IBITS] = "IBT",
};
-EXPORT_SYMBOL(ldlm_typename);
-static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
+static ldlm_policy_wire_to_local_t ldlm_policy_wire_to_local[] = {
[LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
[LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
- [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
- [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
-};
-
-static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
- [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
- [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
- [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
+ [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire_to_local,
[LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};
/**
 * Converts lock policy from the local format to the on-the-wire lock_desc format
*/
-void ldlm_convert_policy_to_wire(ldlm_type_t type,
- const ldlm_policy_data_t *lpolicy,
- ldlm_wire_policy_data_t *wpolicy)
+void ldlm_convert_policy_to_wire(enum ldlm_type type,
+ const union ldlm_policy_data *lpolicy,
+ union ldlm_wire_policy_data *wpolicy)
{
- ldlm_policy_local_to_wire_t convert;
+ ldlm_policy_local_to_wire_t convert;
- convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
+ convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
- convert(lpolicy, wpolicy);
+ convert(lpolicy, wpolicy);
}
/**
 * Converts lock policy from the on-the-wire lock_desc format to the local format
*/
-void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
- const ldlm_wire_policy_data_t *wpolicy,
- ldlm_policy_data_t *lpolicy)
+void ldlm_convert_policy_to_local(struct obd_export *exp, enum ldlm_type type,
+ const union ldlm_wire_policy_data *wpolicy,
+ union ldlm_policy_data *lpolicy)
{
ldlm_policy_wire_to_local_t convert;
- int new_client;
- /** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
- new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
- if (new_client)
- convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
- else
- convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
+ convert = ldlm_policy_wire_to_local[type - LDLM_MIN_TYPE];
convert(wpolicy, lpolicy);
}
-char *ldlm_it2str(int it)
-{
- switch (it) {
- case IT_OPEN:
- return "open";
- case IT_CREAT:
- return "creat";
- case (IT_OPEN | IT_CREAT):
- return "open|creat";
- case IT_READDIR:
- return "readdir";
- case IT_GETATTR:
- return "getattr";
- case IT_LOOKUP:
- return "lookup";
- case IT_UNLINK:
- return "unlink";
- case IT_GETXATTR:
- return "getxattr";
- case IT_LAYOUT:
- return "layout";
- default:
- CERROR("Unknown intent %d\n", it);
- return "UNKNOWN";
- }
+const char *ldlm_it2str(enum ldlm_intent_flags it)
+{
+ switch (it) {
+ case IT_OPEN:
+ return "open";
+ case IT_CREAT:
+ return "creat";
+ case (IT_OPEN | IT_CREAT):
+ return "open|creat";
+ case IT_READDIR:
+ return "readdir";
+ case IT_GETATTR:
+ return "getattr";
+ case IT_LOOKUP:
+ return "lookup";
+ case IT_UNLINK:
+ return "unlink";
+ case IT_GETXATTR:
+ return "getxattr";
+ case IT_LAYOUT:
+ return "layout";
+ default:
+ CERROR("Unknown intent 0x%08x\n", it);
+ return "UNKNOWN";
+ }
}
EXPORT_SYMBOL(ldlm_it2str);
res = lock->l_resource;
LASSERT(ldlm_is_destroyed(lock));
+ LASSERT(list_empty(&lock->l_exp_list));
LASSERT(list_empty(&lock->l_res_link));
LASSERT(list_empty(&lock->l_pending_chain));
/**
* Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
+ *
+ * If \a last_use is non-zero, the lock is removed from the LRU only if
+ * \a last_use matches the lock's l_last_used.
+ *
+ * \retval 0 the lock was not in the LRU list, or \a last_use was non-zero
+ *           and did not match the lock's l_last_used
+ * \retval 1 the lock was in the LRU list and has been removed
*/
-int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
+int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, cfs_time_t last_use)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
- int rc;
+ int rc = 0;
ENTRY;
if (ldlm_is_ns_srv(lock)) {
}
spin_lock(&ns->ns_lock);
- rc = ldlm_lock_remove_from_lru_nolock(lock);
+ if (last_use == 0 || last_use == lock->l_last_used)
+ rc = ldlm_lock_remove_from_lru_nolock(lock);
spin_unlock(&ns->ns_lock);
- EXIT;
- return rc;
+
+ RETURN(rc);
}
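+
+/*
+ * Illustrative use only (hypothetical caller, not part of this change):
+ * sample l_last_used before dropping the lock, then remove it from the
+ * LRU only if it was not used again in the meantime:
+ *
+ *	cfs_time_t last_use = lock->l_last_used;
+ *	...
+ *	if (ldlm_lock_remove_from_lru_check(lock, last_use) == 0)
+ *		continue;	(raced: the lock was used again or is gone)
+ */
+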
/**
* ldlm_lock_destroy, you can never drop your final references on this lock.
* Because it's not in the hash table anymore. -phil
*/
-int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
+static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
ENTRY;
ldlm_lock_remove_from_lru(lock);
class_handle_unhash(&lock->l_handle);
-#if 0
- /* Wake anyone waiting for this lock */
- /* FIXME: I should probably add yet another flag, instead of using
- * l_export to only call this on clients */
- if (lock->l_export)
- class_export_put(lock->l_export);
- lock->l_export = NULL;
- if (lock->l_export && lock->l_completion_ast)
- lock->l_completion_ast(lock, 0);
-#endif
EXIT;
return 1;
}
OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}
-struct portals_handle_ops lock_handle_ops = {
+static struct portals_handle_ops lock_handle_ops = {
.hop_addref = lock_handle_addref,
.hop_free = lock_handle_free,
};
RETURN(0);
}
-EXPORT_SYMBOL(ldlm_lock_change_resource);
/** \defgroup ldlm_handles LDLM HANDLES
* Ways to get hold of locks without any addresses.
if (lock == NULL)
RETURN(NULL);
+ if (lock->l_export != NULL && lock->l_export->exp_failed) {
+ CDEBUG(D_INFO, "lock export failed: lock %p, exp %p\n",
+ lock, lock->l_export);
+ LDLM_LOCK_PUT(lock);
+ RETURN(NULL);
+ }
+
/* It's unlikely but possible that someone marked the lock as
* destroyed after we did handle2object on it */
if ((flags == 0) && !ldlm_is_destroyed(lock)) {
&lock->l_policy_data,
&desc->l_policy_data);
}
-EXPORT_SYMBOL(ldlm_lock2desc);
/**
* Add a lock to list of conflicting locks to send AST to.
*
* Only add if we have not sent a blocking AST to the lock yet.
*/
-void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
- struct list_head *work_list)
+static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+ struct list_head *work_list)
{
if (!ldlm_is_ast_sent(lock)) {
LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
/**
* Add a lock to list of just granted locks to send completion AST to.
*/
-void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
+static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
+ struct list_head *work_list)
{
if (!ldlm_is_cp_reqd(lock)) {
ldlm_set_cp_reqd(lock);
* r/w reference type is determined by \a mode
* Calls ldlm_lock_addref_internal.
*/
-void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
+void ldlm_lock_addref(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
struct ldlm_lock *lock;
lock = ldlm_handle2lock(lockh);
- LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+ LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
ldlm_lock_addref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
* Removes lock from LRU if it is there.
* Assumes the LDLM lock is already locked.
*/
-void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock,
+ enum ldlm_mode mode)
{
ldlm_lock_remove_from_lru(lock);
if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
*
* \retval -EAGAIN lock is being canceled.
*/
-int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
+int ldlm_lock_addref_try(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
struct ldlm_lock *lock;
int result;
* Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
* Only called for local locks.
*/
-void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
{
- lock_res_and_lock(lock);
- ldlm_lock_addref_internal_nolock(lock, mode);
- unlock_res_and_lock(lock);
+ lock_res_and_lock(lock);
+ ldlm_lock_addref_internal_nolock(lock, mode);
+ unlock_res_and_lock(lock);
}
/**
 * Does NOT add lock to LRU if no r/w references left to accommodate flock locks
* that cannot be placed in LRU.
*/
-void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
+ enum ldlm_mode mode)
{
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
* on the namespace.
* For blocked LDLM locks if r/w count drops to zero, blocking_ast is called.
*/
-void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
{
struct ldlm_namespace *ns;
ENTRY;
ldlm_lock_decref_internal_nolock(lock, mode);
- if (ldlm_is_local(lock) &&
- !lock->l_readers && !lock->l_writers) {
- /* If this is a local lock on a server namespace and this was
- * the last reference, cancel the lock. */
- CDEBUG(D_INFO, "forcing cancel of local lock\n");
+ if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
+ !lock->l_readers && !lock->l_writers) {
+ /* If this is a local lock on a server namespace and this was
+ * the last reference, cancel the lock.
+ *
+ * Group locks are special:
+		 * They must not go in the LRU, and they are not called back
+		 * like non-group locks; instead they are released manually.
+		 * They hold an l_writers reference until they are manually
+		 * released, so we remove them once they have no more reader
+		 * or writer references. - LU-6368 */
ldlm_set_cbpending(lock);
- }
+ }
- if (!lock->l_readers && !lock->l_writers &&
- ldlm_is_cbpending(lock)) {
- /* If we received a blocked AST and this was the last reference,
- * run the callback. */
+ if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+		/* If we received a blocking AST and this was the last
+		 * reference, run the callback. */
if (ldlm_is_ns_srv(lock) && lock->l_export)
CERROR("FL_CBPENDING set on non-local lock--just a "
"warning\n");
/**
* Decrease reader/writer refcount for LDLM lock with handle \a lockh
*/
-void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
+void ldlm_lock_decref(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
- LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+ LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
ldlm_lock_decref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
* \a lockh and mark it for subsequent cancellation once r/w refcount
* drops to zero instead of putting into LRU.
*
- * Typical usage is for GROUP locks which we cannot allow to be cached.
*/
-void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
+void ldlm_lock_decref_and_cancel(const struct lustre_handle *lockh,
+ enum ldlm_mode mode)
{
struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
ENTRY;
if (work_list && lock->l_completion_ast != NULL)
ldlm_add_ast_work_item(lock, NULL, work_list);
- /* We should not add locks to granted list in the following cases:
- * - this is an UNLOCK but not a real lock;
- * - this is a TEST lock;
- * - this is a F_CANCELLK lock (async flock has req_mode == 0)
- * - this is a deadlock (flock cannot be granted) */
- if (lock->l_req_mode == 0 ||
- lock->l_req_mode == LCK_NL ||
- ldlm_is_test_lock(lock) ||
- ldlm_is_flock_deadlock(lock))
- RETURN_EXIT;
-
if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
ldlm_grant_lock_with_skiplist(lock);
else if (res->lr_type == LDLM_EXTENT)
ldlm_extent_add_lock(res, lock);
- else
- ldlm_resource_add_lock(res, &res->lr_granted, lock);
-
- if (lock->l_granted_mode < res->lr_most_restr)
- res->lr_most_restr = lock->l_granted_mode;
+ else if (res->lr_type == LDLM_FLOCK) {
+ /* We should not add locks to granted list in the following
+ * cases:
+ * - this is an UNLOCK but not a real lock;
+ * - this is a TEST lock;
+ * - this is a F_CANCELLK lock (async flock has req_mode == 0)
+ * - this is a deadlock (flock cannot be granted) */
+ if (lock->l_req_mode == 0 ||
+ lock->l_req_mode == LCK_NL ||
+ ldlm_is_test_lock(lock) ||
+ ldlm_is_flock_deadlock(lock))
+ RETURN_EXIT;
+ ldlm_resource_add_lock(res, &res->lr_granted, lock);
+ } else {
+ LBUG();
+ }
ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
EXIT;
}
/**
- * Search for a lock with given properties in a queue.
+ * Matching criteria for a lock search; also carries the result.
+ * Passed to itree_overlap_cb() as its callback data.
+ */
+struct lock_match_data {
+ struct ldlm_lock *lmd_old;
+ struct ldlm_lock *lmd_lock;
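+	/* in: modes to match against; out: the mode actually matched */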
+ enum ldlm_mode *lmd_mode;
+ union ldlm_policy_data *lmd_policy;
+ __u64 lmd_flags;
+ int lmd_unref;
+};
+
+/**
+ * Check if the given \a lock meets the criteria for a match.
+ * A reference on the lock is taken if matched.
*
- * \retval a referenced lock or NULL. See the flag descriptions below, in the
- * comment above ldlm_lock_match
+ * \param lock	the candidate lock to test against
+ * \param data	match parameters
+ *
+ * \retval INTERVAL_ITER_STOP	stop searching: a match was found and
+ *				referenced, or the old lock was reached
+ * \retval INTERVAL_ITER_CONT	\a lock does not match; keep searching
*/
-static struct ldlm_lock *search_queue(struct list_head *queue,
- ldlm_mode_t *mode,
- ldlm_policy_data_t *policy,
- struct ldlm_lock *old_lock,
- __u64 flags, int unref)
-{
- struct ldlm_lock *lock;
- struct list_head *tmp;
+static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
+{
+ union ldlm_policy_data *lpol = &lock->l_policy_data;
+ enum ldlm_mode match;
+
+ if (lock == data->lmd_old)
+ return INTERVAL_ITER_STOP;
+
+	/* Check if this lock can be matched.
+	 * Used by LU-2919 (exclusive open) for open lease locks. */
+ if (ldlm_is_excl(lock))
+ return INTERVAL_ITER_CONT;
+
+	/* llite sometimes wants to match locks that will be
+	 * canceled when their users drop, but we allow it to match
+	 * if it passes in CBPENDING and the lock still has users.
+	 * This is generally only going to be used by children
+	 * whose parents already hold a lock, so forward progress
+	 * can still happen. */
+ if (ldlm_is_cbpending(lock) &&
+ !(data->lmd_flags & LDLM_FL_CBPENDING))
+ return INTERVAL_ITER_CONT;
+ if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+ lock->l_readers == 0 && lock->l_writers == 0)
+ return INTERVAL_ITER_CONT;
+
+ if (!(lock->l_req_mode & *data->lmd_mode))
+ return INTERVAL_ITER_CONT;
+ match = lock->l_req_mode;
+
+ switch (lock->l_resource->lr_type) {
+ case LDLM_EXTENT:
+ if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+ lpol->l_extent.end < data->lmd_policy->l_extent.end)
+ return INTERVAL_ITER_CONT;
- list_for_each(tmp, queue) {
- ldlm_mode_t match;
+ if (unlikely(match == LCK_GROUP) &&
+ data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
+ lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
+ return INTERVAL_ITER_CONT;
+ break;
+ case LDLM_IBITS:
+		/* We match if we have an existing lock with the same or a
+		 * wider set of bits. */
+ if ((lpol->l_inodebits.bits &
+ data->lmd_policy->l_inodebits.bits) !=
+ data->lmd_policy->l_inodebits.bits)
+ return INTERVAL_ITER_CONT;
+ break;
+	default:
+		break;
+ }
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+	/* Skip locks that are already gone or being destroyed, unless the
+	 * caller explicitly asked for unreferenced matches. */
+	if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
+		return INTERVAL_ITER_CONT;
- if (lock == old_lock)
- break;
+ if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
+ return INTERVAL_ITER_CONT;
- /* Check if this lock can be matched.
- * Used by LU-2919(exclusive open) for open lease lock */
- if (ldlm_is_excl(lock))
- continue;
+ if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+ LDLM_LOCK_GET(lock);
+ ldlm_lock_touch_in_lru(lock);
+ } else {
+ ldlm_lock_addref_internal_nolock(lock, match);
+ }
- /* llite sometimes wants to match locks that will be
- * canceled when their users drop, but we allow it to match
- * if it passes in CBPENDING and the lock still has users.
- * this is generally only going to be used by children
- * whose parents already hold a lock so forward progress
- * can still happen. */
- if (ldlm_is_cbpending(lock) &&
- !(flags & LDLM_FL_CBPENDING))
- continue;
- if (!unref && ldlm_is_cbpending(lock) &&
- lock->l_readers == 0 && lock->l_writers == 0)
- continue;
+ *data->lmd_mode = match;
+ data->lmd_lock = lock;
- if (!(lock->l_req_mode & *mode))
- continue;
- match = lock->l_req_mode;
+ return INTERVAL_ITER_STOP;
+}
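+
+/**
+ * interval_search() callback: run lock_matches() on every lock in the
+ * node's lock group, stopping the tree walk as soon as one matches.
+ */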
- if (lock->l_resource->lr_type == LDLM_EXTENT &&
- (lock->l_policy_data.l_extent.start >
- policy->l_extent.start ||
- lock->l_policy_data.l_extent.end < policy->l_extent.end))
- continue;
+static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
+{
+ struct ldlm_interval *node = to_ldlm_interval(in);
+ struct lock_match_data *data = args;
+ struct ldlm_lock *lock;
+ int rc;
- if (unlikely(match == LCK_GROUP) &&
- lock->l_resource->lr_type == LDLM_EXTENT &&
- policy->l_extent.gid != LDLM_GID_ANY &&
- lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
+ list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+ rc = lock_matches(lock, data);
+ if (rc == INTERVAL_ITER_STOP)
+ return INTERVAL_ITER_STOP;
+ }
+ return INTERVAL_ITER_CONT;
+}
+
+/**
+ * Search for a lock with given parameters in interval trees.
+ *
+ * \param res search for a lock in this resource
+ * \param data parameters
+ *
+ * \retval a referenced lock or NULL.
+ */
+static struct ldlm_lock *search_itree(struct ldlm_resource *res,
+ struct lock_match_data *data)
+{
+ struct interval_node_extent ext = {
+ .start = data->lmd_policy->l_extent.start,
+ .end = data->lmd_policy->l_extent.end
+ };
+ int idx;
+
+ for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+ struct ldlm_interval_tree *tree = &res->lr_itree[idx];
+
+ if (tree->lit_root == NULL)
continue;
- /* We match if we have existing lock with same or wider set
- of bits. */
- if (lock->l_resource->lr_type == LDLM_IBITS &&
- ((lock->l_policy_data.l_inodebits.bits &
- policy->l_inodebits.bits) !=
- policy->l_inodebits.bits))
- continue;
+ if (!(tree->lit_mode & *data->lmd_mode))
+ continue;
- if (!unref && LDLM_HAVE_MASK(lock, GONE))
- continue;
+ interval_search(tree->lit_root, &ext,
+ itree_overlap_cb, data);
+ }
+ return data->lmd_lock;
+}
- if ((flags & LDLM_FL_LOCAL_ONLY) &&
- !ldlm_is_local(lock))
- continue;
- if (flags & LDLM_FL_TEST_LOCK) {
- LDLM_LOCK_GET(lock);
- ldlm_lock_touch_in_lru(lock);
- } else {
- ldlm_lock_addref_internal_nolock(lock, match);
- }
- *mode = match;
- return lock;
- }
+/**
+ * Search for a lock with given properties in a queue.
+ *
+ * \param queue search for a lock in this queue
+ * \param data parameters
+ *
+ * \retval a referenced lock or NULL.
+ */
+static struct ldlm_lock *search_queue(struct list_head *queue,
+ struct lock_match_data *data)
+{
+ struct ldlm_lock *lock;
+ int rc;
- return NULL;
+ list_for_each_entry(lock, queue, l_res_link) {
+ rc = lock_matches(lock, data);
+ if (rc == INTERVAL_ITER_STOP)
+ return data->lmd_lock;
+ }
+ return NULL;
}
void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
ldlm_lock_fail_match_locked(lock);
unlock_res_and_lock(lock);
}
-EXPORT_SYMBOL(ldlm_lock_fail_match);
/**
* Mark lock as "matchable" by OST.
* keep caller code unchanged), the context failure will be discovered by
* caller sometime later.
*/
-ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
- const struct ldlm_res_id *res_id, ldlm_type_t type,
- ldlm_policy_data_t *policy, ldlm_mode_t mode,
- struct lustre_handle *lockh, int unref)
-{
- struct ldlm_resource *res;
- struct ldlm_lock *lock, *old_lock = NULL;
- int rc = 0;
- ENTRY;
+enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
+ const struct ldlm_res_id *res_id,
+ enum ldlm_type type,
+ union ldlm_policy_data *policy,
+ enum ldlm_mode mode,
+ struct lustre_handle *lockh, int unref)
+{
+ struct lock_match_data data = {
+ .lmd_old = NULL,
+ .lmd_lock = NULL,
+ .lmd_mode = &mode,
+ .lmd_policy = policy,
+ .lmd_flags = flags,
+ .lmd_unref = unref,
+ };
+ struct ldlm_resource *res;
+ struct ldlm_lock *lock;
+ int rc = 0;
+ ENTRY;
- if (ns == NULL) {
- old_lock = ldlm_handle2lock(lockh);
- LASSERT(old_lock);
+ if (ns == NULL) {
+ data.lmd_old = ldlm_handle2lock(lockh);
+ LASSERT(data.lmd_old != NULL);
- ns = ldlm_lock_to_ns(old_lock);
- res_id = &old_lock->l_resource->lr_name;
- type = old_lock->l_resource->lr_type;
- mode = old_lock->l_req_mode;
- }
+ ns = ldlm_lock_to_ns(data.lmd_old);
+ res_id = &data.lmd_old->l_resource->lr_name;
+ type = data.lmd_old->l_resource->lr_type;
+ *data.lmd_mode = data.lmd_old->l_req_mode;
+ }
res = ldlm_resource_get(ns, NULL, res_id, type, 0);
if (IS_ERR(res)) {
- LASSERT(old_lock == NULL);
+ LASSERT(data.lmd_old == NULL);
RETURN(0);
}
- LDLM_RESOURCE_ADDREF(res);
- lock_res(res);
-
- lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
- if (flags & LDLM_FL_BLOCK_GRANTED)
- GOTO(out, rc = 0);
- lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
- lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
- flags, unref);
- if (lock != NULL)
- GOTO(out, rc = 1);
+ LDLM_RESOURCE_ADDREF(res);
+ lock_res(res);
+
+ if (res->lr_type == LDLM_EXTENT)
+ lock = search_itree(res, &data);
+ else
+ lock = search_queue(&res->lr_granted, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
+ if (flags & LDLM_FL_BLOCK_GRANTED)
+ GOTO(out, rc = 0);
+ lock = search_queue(&res->lr_converting, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
+ lock = search_queue(&res->lr_waiting, &data);
+ if (lock != NULL)
+ GOTO(out, rc = 1);
EXIT;
out:
}
out2:
if (rc) {
- LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
+ LDLM_DEBUG(lock, "matched (%llu %llu)",
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[2] : policy->l_extent.start,
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
} else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
- LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
+ "%llu/%llu (%llu %llu)", ns,
type, mode, res_id->name[0], res_id->name[1],
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[2] :policy->l_extent.start,
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[3] : policy->l_extent.end);
}
- if (old_lock)
- LDLM_LOCK_PUT(old_lock);
+ if (data.lmd_old != NULL)
+ LDLM_LOCK_PUT(data.lmd_old);
- return rc ? mode : 0;
+ return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
-ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
- __u64 *bits)
+enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh,
+ __u64 *bits)
{
- struct ldlm_lock *lock;
- ldlm_mode_t mode = 0;
- ENTRY;
+ struct ldlm_lock *lock;
+ enum ldlm_mode mode = 0;
+ ENTRY;
- lock = ldlm_handle2lock(lockh);
- if (lock != NULL) {
- lock_res_and_lock(lock);
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL) {
+ lock_res_and_lock(lock);
if (LDLM_HAVE_MASK(lock, GONE))
- GOTO(out, mode);
+ GOTO(out, mode);
if (ldlm_is_cbpending(lock) &&
lock->l_readers == 0 && lock->l_writers == 0)
memcpy(data, lvb, size);
break;
default:
- LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
+ LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
libcfs_debug_dumpstack(NULL);
RETURN(-EINVAL);
}
*/
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
- ldlm_type_t type,
- ldlm_mode_t mode,
+ enum ldlm_type type,
+ enum ldlm_mode mode,
const struct ldlm_callback_suite *cbs,
void *data, __u32 lvb_len,
enum lvb_type lvb_type)
* set, skip all the enqueueing and delegate lock processing to intent policy
* function.
*/
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
- struct ldlm_lock **lockp,
- void *cookie, __u64 *flags)
+enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp,
+ void *cookie, __u64 *flags)
{
- struct ldlm_lock *lock = *lockp;
- struct ldlm_resource *res = lock->l_resource;
- int local = ns_is_client(ldlm_res_to_ns(res));
+ struct ldlm_lock *lock = *lockp;
+ struct ldlm_resource *res = lock->l_resource;
+ int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
- ldlm_processing_policy policy;
+ ldlm_processing_policy policy;
#endif
- ldlm_error_t rc = ELDLM_OK;
- struct ldlm_interval *node = NULL;
- ENTRY;
+ enum ldlm_error rc = ELDLM_OK;
+ struct ldlm_interval *node = NULL;
+ ENTRY;
/* policies are not executed on the client or during replay */
if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
}
*flags |= LDLM_FL_LOCK_CHANGED;
RETURN(0);
+ } else if (rc != ELDLM_OK &&
+ lock->l_req_mode == lock->l_granted_mode) {
+ LASSERT(*flags & LDLM_FL_RESENT);
+		/* It may happen that ns_policy returns an error in the
+		 * resend case; the object may have been unlinked, or some
+		 * other error may have occurred. It is unclear whether the
+		 * lock reached the client in the original reply, so just
+		 * leave the lock on the server and do not return it to the
+		 * client again. Thanks to LU-6529, the server will not OOM. */
+ RETURN(rc);
} else if (rc != ELDLM_OK ||
(rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
ldlm_lock_destroy(lock);
ldlm_set_ast_discard_data(lock);
if (*flags & LDLM_FL_TEST_LOCK)
ldlm_set_test_lock(lock);
+ if (*flags & LDLM_FL_COS_INCOMPAT)
+ ldlm_set_cos_incompat(lock);
+ if (*flags & LDLM_FL_COS_ENABLED)
+ ldlm_set_cos_enabled(lock);
/* This distinction between local lock trees is very important; a client
* namespace only has information about locks taken by that client, and
}
policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, 1, &rc, NULL);
+ policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
GOTO(out, rc);
#else
} else {
* Must be called with resource lock held.
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
- struct list_head *work_list)
+ struct list_head *work_list,
+ enum ldlm_process_intention intention)
{
struct list_head *tmp, *pos;
- ldlm_processing_policy policy;
+ ldlm_processing_policy policy;
__u64 flags;
- int rc = LDLM_ITER_CONTINUE;
- ldlm_error_t err;
- ENTRY;
+ int rc = LDLM_ITER_CONTINUE;
+ enum ldlm_error err;
+ ENTRY;
- check_res_locked(res);
+ check_res_locked(res);
- policy = ldlm_processing_policy_table[res->lr_type];
- LASSERT(policy);
+ policy = ldlm_processing_policy_table[res->lr_type];
+ LASSERT(policy);
+ LASSERT(intention == LDLM_PROCESS_RESCAN ||
+ intention == LDLM_PROCESS_RECOVERY);
list_for_each_safe(tmp, pos, queue) {
- struct ldlm_lock *pending;
+ struct ldlm_lock *pending;
+
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, 0, &err, work_list);
- if (rc != LDLM_ITER_CONTINUE)
- break;
+ rc = policy(pending, &flags, intention, &err, work_list);
+ /*
+		 * When called after recovery is done, we always want to
+		 * scan the whole list, no matter what 'rc' is returned.
+ */
+ if (rc != LDLM_ITER_CONTINUE &&
+ intention == LDLM_PROCESS_RESCAN)
+ break;
}
- RETURN(rc);
+ RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
}
+
+/**
+ * Conflicting locks were detected for a lock to be enqueued: add the lock
+ * to the waiting list and send blocking ASTs to the conflicting locks.
+ *
+ * \param[in]  lock		The lock to be enqueued.
+ * \param[out] flags		Lock flags for the lock to be enqueued.
+ * \param[in]  rpc_list		Conflicting locks list.
+ * \param[in]  grant_flags	Extra flags when granting a lock.
+ *
+ * \retval -ERESTART:	Some lock was instantly canceled while sending
+ *			blocking ASTs; the caller needs to re-check the
+ *			conflicting locks.
+ * \retval -EAGAIN:	Lock was destroyed; the caller should return the error.
+ * \retval 0:		Lock was successfully added to the waiting list.
+ */
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+ struct list_head *rpc_list, __u64 grant_flags)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ int rc;
+ ENTRY;
+
+ check_res_locked(res);
+
+ /* If either of the compat_queue()s returned failure, then we
+ * have ASTs to send and must go onto the waiting list.
+ *
+ * bug 2322: we used to unlink and re-add here, which was a
+ * terrible folly -- if we goto restart, we could get
+ * re-ordered! Causes deadlock, because ASTs aren't sent! */
+ if (list_empty(&lock->l_res_link))
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ unlock_res(res);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
+ LDLM_WORK_BL_AST);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+ !ns_is_client(ldlm_res_to_ns(res)))
+ class_fail_export(lock->l_export);
+
+ lock_res(res);
+ if (rc == -ERESTART) {
+ /* 15715: The lock was granted and destroyed after
+ * resource lock was dropped. Interval node was freed
+ * in ldlm_lock_destroy. Anyway, this always happens
+ * when a client is being evicted. So it would be
+ * ok to return an error. -jay */
+ if (ldlm_is_destroyed(lock))
+ RETURN(-EAGAIN);
+
+ /* lock was granted while resource was unlocked. */
+ if (lock->l_granted_mode == lock->l_req_mode) {
+ /* bug 11300: if the lock has been granted,
+ * break earlier because otherwise, we will go
+ * to restart and ldlm_resource_unlink will be
+ * called and it causes the interval node to be
+ * freed. Then we will fail at
+ * ldlm_extent_add_lock() */
+ *flags &= ~LDLM_FL_BLOCKED_MASK;
+ RETURN(0);
+ }
+
+ RETURN(rc);
+ }
+ *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+
+ RETURN(0);
+}
+
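+/*
+ * Illustrative caller (hypothetical, not part of this change): a processing
+ * policy that collected conflicting locks in rpc_list might do:
+ *
+ *	rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
+ *	if (rc == -ERESTART)
+ *		goto restart;	(re-scan the queues for conflicts)
+ *	else if (rc < 0)
+ *		GOTO(out, rc);	(lock was destroyed, return the error)
+ */
+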
+/**
+ * Discard all AST work items from list.
+ *
+ * If for whatever reason we do not want to send ASTs to conflicting locks
+ * anymore, disassemble the list with this function.
+ */
+void ldlm_discard_bl_list(struct list_head *bl_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(pos, tmp, bl_list) {
+ struct ldlm_lock *lock =
+ list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+ list_del_init(&lock->l_bl_ast);
+ LASSERT(ldlm_is_ast_sent(lock));
+ ldlm_clear_ast_sent(lock);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ LDLM_LOCK_RELEASE(lock);
+ }
+ EXIT;
+}
+
#endif
/**
/* transfer the glimpse descriptor to ldlm_cb_set_arg */
arg->gl_desc = gl_work->gl_desc;
+ arg->gl_interpret_reply = gl_work->gl_interpret_reply;
+ arg->gl_interpret_data = gl_work->gl_interpret_data;
/* invoke the actual glimpse callback */
if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
return rc;
}
-static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
-{
- ldlm_reprocess_all(res);
- return LDLM_ITER_CONTINUE;
-}
-
-static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- struct hlist_node *hnode, void *arg)
-{
- struct ldlm_resource *res = cfs_hash_object(hs, hnode);
- int rc;
-
- rc = reprocess_one_queue(res, arg);
-
- return rc == LDLM_ITER_STOP;
-}
-
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
-{
- ENTRY;
-
- if (ns != NULL) {
- cfs_hash_for_each_nolock(ns->ns_rs_hash,
- ldlm_reprocess_res, NULL);
- }
- EXIT;
-}
-EXPORT_SYMBOL(ldlm_reprocess_all_ns);
-
/**
* Try to grant all waiting locks on a resource.
*
* Typically called after some resource locks are cancelled to see
* if anything could be granted as a result of the cancellation.
*/
-void ldlm_reprocess_all(struct ldlm_resource *res)
+static void __ldlm_reprocess_all(struct ldlm_resource *res,
+ enum ldlm_process_intention intention)
{
struct list_head rpc_list;
#ifdef HAVE_SERVER_SUPPORT
+ struct obd_device *obd;
int rc;
ENTRY;
return;
}
+ /* Disable reprocess during lock replay stage but allow during
+ * request replay stage.
+ */
+ obd = ldlm_res_to_ns(res)->ns_obd;
+ if (obd->obd_recovering &&
+ atomic_read(&obd->obd_req_replay_clients) == 0)
+ RETURN_EXIT;
restart:
- lock_res(res);
- rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
- if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
- unlock_res(res);
+ lock_res(res);
+ rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
+ intention);
+ if (rc == LDLM_ITER_CONTINUE)
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
+ intention);
+ unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
LDLM_WORK_CP_AST);
#endif
EXIT;
}
+
+void ldlm_reprocess_all(struct ldlm_resource *res)
+{
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+}
EXPORT_SYMBOL(ldlm_reprocess_all);
+static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *arg)
+{
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+
+	/* This is called only once, after recovery is done. LU-8306. */
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+ return 0;
+}
+
+/**
+ * Iterate through all resources on a namespace attempting to grant waiting
+ * locks.
+ */
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
+{
+ ENTRY;
+
+ if (ns != NULL) {
+ cfs_hash_for_each_nolock(ns->ns_rs_hash,
+ ldlm_reprocess_res, NULL, 0);
+ }
+ EXIT;
+}
+
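+/**
+ * Test whether blocking AST handling for \a lock has completed
+ * (LDLM_FL_BL_DONE). If a lockless read says no, re-check under the
+ * resource lock, since the canceller may set the flag concurrently.
+ */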
+static bool is_bl_done(struct ldlm_lock *lock)
+{
+ bool bl_done = true;
+
+ if (!ldlm_is_bl_done(lock)) {
+ lock_res_and_lock(lock);
+ bl_done = ldlm_is_bl_done(lock);
+ unlock_res_and_lock(lock);
+ }
+
+ return bl_done;
+}
+
/**
* Helper function to call blocking AST for LDLM lock \a lock in a
* "cancelling" mode.
} else {
LDLM_DEBUG(lock, "no blocking ast");
}
- }
- ldlm_set_bl_done(lock);
+
+ /* only canceller can set bl_done bit */
+ ldlm_set_bl_done(lock);
+ wake_up_all(&lock->l_waitq);
+ } else if (!ldlm_is_bl_done(lock)) {
+ struct l_wait_info lwi = { 0 };
+
+		/* The lock is guaranteed to have been canceled by the
+		 * time this function returns. */
+ unlock_res_and_lock(lock);
+ l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+ lock_res_and_lock(lock);
+ }
}
/**
/**
* Set opaque data into the lock that only makes sense to upper layer.
*/
-int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
+int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
{
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
int rc = -EINVAL;
int ecl_loop;
};
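+
+/**
+ * Cancel \a lock on behalf of export \a exp: update the resource LVB,
+ * cancel the lock, and reprocess the resource unless the obd is stopping.
+ * Progress is counted in \a ecl and logged at power-of-two intervals.
+ */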
+static void ldlm_cancel_lock_for_export(struct obd_export *exp,
+ struct ldlm_lock *lock,
+ struct export_cl_data *ecl)
+{
+ struct ldlm_resource *res;
+
+ res = ldlm_resource_getref(lock->l_resource);
+
+ ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_lock_cancel(lock);
+ if (!exp->exp_obd->obd_stopping)
+ ldlm_reprocess_all(res);
+ ldlm_resource_putref(res);
+
+ ecl->ecl_loop++;
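+	/* (x & -x) == x holds only when ecl_loop is a power of two
+	 * (1, 2, 4, 8, ...), which throttles the log output. */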
+ if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
+ CDEBUG(D_INFO, "Export %p, %d locks cancelled.\n",
+ exp, ecl->ecl_loop);
+ }
+}
+
/**
- * Iterator function for ldlm_cancel_locks_for_export.
+ * Iterator function for ldlm_export_cancel_locks.
* Cancels passed locks.
*/
-int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
- struct hlist_node *hnode, void *data)
+static int
+ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
{
struct export_cl_data *ecl = (struct export_cl_data *)data;
struct obd_export *exp = ecl->ecl_exp;
- struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
- struct ldlm_resource *res;
+ struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
- res = ldlm_resource_getref(lock->l_resource);
- LDLM_LOCK_GET(lock);
+ LDLM_LOCK_GET(lock);
+ ldlm_cancel_lock_for_export(exp, lock, ecl);
+ LDLM_LOCK_RELEASE(lock);
- LDLM_DEBUG(lock, "export %p", exp);
- ldlm_res_lvbo_update(res, NULL, 1);
- ldlm_lock_cancel(lock);
- ldlm_reprocess_all(res);
- ldlm_resource_putref(res);
- LDLM_LOCK_RELEASE(lock);
+ return 0;
+}
- ecl->ecl_loop++;
- if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
- CDEBUG(D_INFO,
- "Cancel lock %p for export %p (loop %d), still have "
- "%d locks left on hash table.\n",
- lock, exp, ecl->ecl_loop,
- atomic_read(&hs->hs_count));
+/**
+ * Cancel all blocked locks for given export.
+ *
+ * Typically called on client disconnection/eviction
+ */
+int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
+{
+ struct export_cl_data ecl = {
+ .ecl_exp = exp,
+ .ecl_loop = 0,
+ };
+
+ while (!list_empty(&exp->exp_bl_list)) {
+ struct ldlm_lock *lock;
+
+ spin_lock_bh(&exp->exp_bl_list_lock);
+ if (!list_empty(&exp->exp_bl_list)) {
+ lock = list_entry(exp->exp_bl_list.next,
+ struct ldlm_lock, l_exp_list);
+ LDLM_LOCK_GET(lock);
+ list_del_init(&lock->l_exp_list);
+ } else {
+ lock = NULL;
+ }
+ spin_unlock_bh(&exp->exp_bl_list_lock);
+
+ if (lock == NULL)
+ break;
+
+ ldlm_cancel_lock_for_export(exp, lock, &ecl);
+ LDLM_LOCK_RELEASE(lock);
}
- return 0;
+ CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
+	       "%d left on hash table.\n", exp, ecl.ecl_loop,
+ atomic_read(&exp->exp_lock_hash->hs_count));
+
+ return ecl.ecl_loop;
}
/**
* Cancel all locks for given export.
*
- * Typically called on client disconnection/eviction
+ * Typically called after client disconnection/eviction
*/
-void ldlm_cancel_locks_for_export(struct obd_export *exp)
+int ldlm_export_cancel_locks(struct obd_export *exp)
{
struct export_cl_data ecl = {
.ecl_exp = exp,
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_cancel_locks_for_export_cb, &ecl);
+
+ CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
+	       "%d left on hash table.\n", exp, ecl.ecl_loop,
+ atomic_read(&exp->exp_lock_hash->hs_count));
+
+ if (ecl.ecl_loop > 0 &&
+ atomic_read(&exp->exp_lock_hash->hs_count) == 0 &&
+ exp->exp_obd->obd_stopping)
+ ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+
+ return ecl.ecl_loop;
}
/**
* Downgrade an exclusive lock.
*
- * A fast variant of ldlm_lock_convert for convertion of exclusive
- * locks. The convertion is always successful.
+ * A fast variant of ldlm_lock_convert for conversion of exclusive locks. The
+ * conversion may fail if the lock was canceled before the downgrade, but this
+ * does not indicate a problem: such a lock has no readers or writers and will
+ * be released soon.
* Used by Commit on Sharing (COS) code.
*
* \param lock A lock to convert
* \param new_mode new lock mode
*/
-void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
+void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
{
- ENTRY;
+ ENTRY;
- LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
- LASSERT(new_mode == LCK_COS);
+ LASSERT(new_mode == LCK_COS);
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- /*
- * Remove the lock from pool as it will be added again in
- * ldlm_grant_lock() called below.
- */
- ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+ lock_res_and_lock(lock);
- lock->l_req_mode = new_mode;
- ldlm_grant_lock(lock, NULL);
- unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource);
+ if (!(lock->l_granted_mode & (LCK_PW | LCK_EX))) {
+ unlock_res_and_lock(lock);
- EXIT;
+ LASSERT(lock->l_granted_mode == LCK_MINMODE);
+ LDLM_DEBUG(lock, "lock was canceled before downgrade");
+ RETURN_EXIT;
+ }
+
+ ldlm_resource_unlink_lock(lock);
+ /*
+ * Remove the lock from pool as it will be added again in
+ * ldlm_grant_lock() called below.
+ */
+ ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+ lock->l_req_mode = new_mode;
+ ldlm_grant_lock(lock, NULL);
+
+ unlock_res_and_lock(lock);
+
+ ldlm_reprocess_all(lock->l_resource);
+
+ EXIT;
}
EXPORT_SYMBOL(ldlm_lock_downgrade);
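+
+/*
+ * Illustrative only: the Commit on Sharing code keeps a still-referenced
+ * EX/PW lock cached in a weaker mode instead of cancelling it, e.g.:
+ *
+ *	ldlm_lock_downgrade(lock, LCK_COS);
+ */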
* optimizations could take advantage of it to avoid discarding cached
* pages on a file.
*/
-struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
- __u32 *flags)
+struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
+ enum ldlm_mode new_mode, __u32 *flags)
{
struct list_head rpc_list;
struct ldlm_resource *res;
lock->l_completion_ast(lock, 0, NULL);
}
#ifdef HAVE_SERVER_SUPPORT
- } else {
- int rc;
- ldlm_error_t err;
+ } else {
+ int rc;
+ enum ldlm_error err;
__u64 pflags = 0;
- ldlm_processing_policy policy;
+ ldlm_processing_policy policy;
+
policy = ldlm_processing_policy_table[res->lr_type];
- rc = policy(lock, &pflags, 0, &err, &rpc_list);
+ rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
+ &rpc_list);
if (rc == LDLM_ITER_STOP) {
lock->l_req_mode = old_mode;
if (res->lr_type == LDLM_EXTENT)
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
RETURN(res);
}
-EXPORT_SYMBOL(ldlm_lock_convert);
/**
* Print lock with lock handle \a lockh description into debug log.
*
* Used when printing all locks on a resource for debug purposes.
*/
-void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
+void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
{
va_list args;
struct obd_export *exp = lock->l_export;
- struct ldlm_resource *resource = lock->l_resource;
+ struct ldlm_resource *resource = NULL;
char *nid = "local";
+	/* on the server side, the resource of a lock does not change */
+ if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
+ if (lock->l_resource != NULL)
+ resource = ldlm_resource_getref(lock->l_resource);
+ } else if (spin_trylock(&lock->l_lock)) {
+ if (lock->l_resource != NULL)
+ resource = ldlm_resource_getref(lock->l_resource);
+ spin_unlock(&lock->l_lock);
+ }
+
va_start(args, fmt);
if (exp && exp->exp_connection) {
if (resource == NULL) {
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
- "remote: "LPX64" expref: %d pid: %u timeout: %lu "
+ " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s "
+ "remote: %#llx expref: %d pid: %u timeout: %lu "
"lvb_type: %d\n",
lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
switch (resource->lr_type) {
case LDLM_EXTENT:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
- "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
- LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" rrc: %d type: %s [%llu->%llu] "
+ "(req %llu->%llu) flags: %#llx nid: %s remote: "
+ "%#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
case LDLM_FLOCK:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
"res: "DLDLMRES" rrc: %d type: %s pid: %d "
- "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
- "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
+ "[%llu->%llu] flags: %#llx nid: %s "
+ "remote: %#llx expref: %d pid: %u timeout: %lu\n",
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
case LDLM_IBITS:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
- "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" bits %#llx/%#llx rrc: %d type: %s "
+ "flags: %#llx nid: %s remote: %#llx expref: %d "
"pid: %u timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
ldlm_lockname[lock->l_req_mode],
PLDLMRES(resource),
lock->l_policy_data.l_inodebits.bits,
+ lock->l_policy_data.l_inodebits.try_bits,
atomic_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
lock->l_flags, nid, lock->l_remote_handle.cookie,
default:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
- "nid: %s remote: "LPX64" expref: %d pid: %u "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" rrc: %d type: %s flags: %#llx "
+ "nid: %s remote: %#llx expref: %d pid: %u "
"timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
break;
}
va_end(args);
+ ldlm_resource_putref(resource);
}
EXPORT_SYMBOL(_ldlm_lock_debug);