*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/ldlm/ldlm_lock.c
*
[LCK_CR] = "CR",
[LCK_NL] = "NL",
[LCK_GROUP] = "GROUP",
- [LCK_COS] = "COS"
+ [LCK_COS] = "COS",
+ [LCK_TXN] = "TXN"
};
EXPORT_SYMBOL(ldlm_lockname);
ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
- return ldlm_processing_policy_table[res->lr_type];
+ return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
/* Register a namespace-wide intent policy callback. */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
	ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);
-/*
- * REFCOUNTED LOCK OBJECTS
- */
+/* REFCOUNTED LOCK OBJECTS */
/**
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
refcount_inc(&lock->l_handle.h_ref);
- return lock;
+ return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);
*/
void ldlm_lock_put(struct ldlm_lock *lock)
{
- ENTRY;
+ ENTRY;
- LASSERT(lock->l_resource != LP_POISON);
+ LASSERT(lock->l_resource != LP_POISON);
LASSERT(refcount_read(&lock->l_handle.h_ref) > 0);
if (refcount_dec_and_test(&lock->l_handle.h_ref)) {
- struct ldlm_resource *res;
+ struct ldlm_resource *res;
- LDLM_DEBUG(lock,
- "final lock_put on destroyed lock, freeing it.");
+ LDLM_DEBUG(lock,
+ "final lock_put on destroyed lock, freeing it.");
- res = lock->l_resource;
+ res = lock->l_resource;
LASSERT(ldlm_is_destroyed(lock));
LASSERT(list_empty(&lock->l_exp_list));
LASSERT(list_empty(&lock->l_res_link));
LASSERT(list_empty(&lock->l_pending_chain));
- lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
- LDLM_NSS_LOCKS);
- lu_ref_del(&res->lr_reference, "lock", lock);
- if (lock->l_export) {
- class_export_lock_put(lock->l_export, lock);
- lock->l_export = NULL;
- }
+ lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
+ LDLM_NSS_LOCKS);
+ lu_ref_del(&res->lr_reference, "lock", lock);
+ if (lock->l_export) {
+ class_export_lock_put(lock->l_export, lock);
+ lock->l_export = NULL;
+ }
- if (lock->l_lvb_data != NULL)
- OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
+ if (lock->l_lvb_data != NULL)
+ OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
- if (res->lr_type == LDLM_EXTENT) {
+ if (res->lr_type == LDLM_EXTENT || res->lr_type == LDLM_FLOCK) {
ldlm_interval_free(ldlm_interval_detach(lock));
} else if (res->lr_type == LDLM_IBITS) {
if (lock->l_ibits_node != NULL)
}
ldlm_resource_putref(res);
lock->l_resource = NULL;
- lu_ref_fini(&lock->l_reference);
- call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
- }
+ lu_ref_fini(&lock->l_reference);
+ if (lock->l_flags & BIT(63))
+ /* Performance testing - bypassing RCU removes overhead */
+ lock_handle_free(&lock->l_handle.h_rcu);
+ else
+ call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
+ }
- EXIT;
+ EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
int rc = 0;
+
if (!list_empty(&lock->l_lru)) {
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
RETURN(rc);
}
-/**
- * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
- */
+/* Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked. */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
ns->ns_nr_unused++;
}
-/**
- * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
- * first.
- */
-void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
+/* Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks first */
+static void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
*/
static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
- ENTRY;
+ ENTRY;
- if (lock->l_readers || lock->l_writers) {
- LDLM_ERROR(lock, "lock still has references");
- LBUG();
- }
+ if (lock->l_readers || lock->l_writers) {
+ LDLM_ERROR(lock, "lock still has references");
+ LBUG();
+ }
if (!list_empty(&lock->l_res_link)) {
- LDLM_ERROR(lock, "lock still on resource");
- LBUG();
- }
+ LDLM_ERROR(lock, "lock still on resource");
+ LBUG();
+ }
if (ldlm_is_destroyed(lock)) {
LASSERT(list_empty(&lock->l_lru));
return 0;
}
ldlm_set_destroyed(lock);
+ wake_up(&lock->l_waitq);
if (lock->l_export && lock->l_export->exp_lock_hash) {
- /* NB: it's safe to call cfs_hash_del() even lock isn't
- * in exp_lock_hash. */
- /* In the function below, .hs_keycmp resolves to
- * ldlm_export_lock_keycmp() */
- /* coverity[overrun-buffer-val] */
+ /* Safe to call cfs_hash_del as lock isn't in exp_lock_hash. */
+ /* below, .hs_keycmp resolves to ldlm_export_lock_keycmp() */
cfs_hash_del(lock->l_export->exp_lock_hash,
&lock->l_remote_handle, &lock->l_exp_hash);
}
- ldlm_lock_remove_from_lru(lock);
- class_handle_unhash(&lock->l_handle);
+ ldlm_lock_remove_from_lru(lock);
+ class_handle_unhash(&lock->l_handle);
- EXIT;
- return 1;
+ EXIT;
+ return 1;
}
-/**
- * Destroys a LDLM lock \a lock. Performs necessary locking first.
- */
+/* Destroys a LDLM lock \a lock. Performs necessary locking first. */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
- int first;
- ENTRY;
- lock_res_and_lock(lock);
- first = ldlm_lock_destroy_internal(lock);
- unlock_res_and_lock(lock);
+ int first;
+
+ ENTRY;
+ lock_res_and_lock(lock);
+ first = ldlm_lock_destroy_internal(lock);
+ unlock_res_and_lock(lock);
- /* drop reference from hashtable only for first destroy */
- if (first) {
- lu_ref_del(&lock->l_reference, "hash", lock);
- LDLM_LOCK_RELEASE(lock);
- }
- EXIT;
+ /* drop reference from hashtable only for first destroy */
+ if (first) {
+ lu_ref_del(&lock->l_reference, "hash", lock);
+ LDLM_LOCK_RELEASE(lock);
+ }
+ EXIT;
}
-/**
- * Destroys a LDLM lock \a lock that is already locked.
- */
+/* Destroys a LDLM lock \a lock that is already locked. */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
- int first;
- ENTRY;
- first = ldlm_lock_destroy_internal(lock);
- /* drop reference from hashtable only for first destroy */
- if (first) {
- lu_ref_del(&lock->l_reference, "hash", lock);
- LDLM_LOCK_RELEASE(lock);
- }
- EXIT;
+ int first;
+
+ ENTRY;
+ first = ldlm_lock_destroy_internal(lock);
+ /* drop reference from hashtable only for first destroy */
+ if (first) {
+ lu_ref_del(&lock->l_reference, "hash", lock);
+ LDLM_LOCK_RELEASE(lock);
+ }
+ EXIT;
}
static const char lock_handle_owner[] = "ldlm";
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
struct ldlm_lock *lock;
+
ENTRY;
if (resource == NULL)
if (lock == NULL)
RETURN(NULL);
- spin_lock_init(&lock->l_lock);
- lock->l_resource = resource;
+ RCU_INIT_POINTER(lock->l_resource, resource);
lu_ref_add(&resource->lr_reference, "lock", lock);
refcount_set(&lock->l_handle.h_ref, 2);
INIT_HLIST_NODE(&lock->l_exp_hash);
INIT_HLIST_NODE(&lock->l_exp_flock_hash);
- lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
- LDLM_NSS_LOCKS);
+ lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+ LDLM_NSS_LOCKS);
INIT_HLIST_NODE(&lock->l_handle.h_link);
class_handle_hash(&lock->l_handle, lock_handle_owner);
- lu_ref_init(&lock->l_reference);
- lu_ref_add(&lock->l_reference, "hash", lock);
- lock->l_callback_timeout = 0;
+ lu_ref_init(&lock->l_reference);
+ lu_ref_add(&lock->l_reference, "hash", lock);
+ lock->l_callback_timestamp = 0;
lock->l_activity = 0;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
INIT_LIST_HEAD(&lock->l_exp_refs_link);
- lock->l_exp_refs_nr = 0;
- lock->l_exp_refs_target = NULL;
+ lock->l_exp_refs_nr = 0;
+ lock->l_exp_refs_target = NULL;
#endif
INIT_LIST_HEAD(&lock->l_exp_list);
- RETURN(lock);
+ RETURN(lock);
}
+struct ldlm_lock *ldlm_lock_new_testing(struct ldlm_resource *resource)
+{
+ struct ldlm_lock *lock = ldlm_lock_new(resource);
+ int rc;
+
+ if (!lock)
+ return NULL;
+ lock->l_flags |= BIT(63);
+ switch (resource->lr_type) {
+ case LDLM_EXTENT:
+ rc = ldlm_extent_alloc_lock(lock);
+ break;
+ case LDLM_IBITS:
+ rc = ldlm_inodebits_alloc_lock(lock);
+ break;
+ default:
+ rc = 0;
+ }
+
+ if (!rc)
+ return lock;
+ ldlm_lock_destroy(lock);
+ LDLM_LOCK_RELEASE(lock);
+ return NULL;
+}
+EXPORT_SYMBOL(ldlm_lock_new_testing);
+
/**
* Moves LDLM lock \a lock to another resource.
* This is used on client when server returns some other lock than requested
* (typically as a result of intent operation)
*/
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
- const struct ldlm_res_id *new_resid)
+ const struct ldlm_res_id *new_resid)
{
- struct ldlm_resource *oldres = lock->l_resource;
- struct ldlm_resource *newres;
- int type;
- ENTRY;
+ struct ldlm_resource *oldres;
+ struct ldlm_resource *newres;
+ int type;
- LASSERT(ns_is_client(ns));
+ ENTRY;
+
+ LASSERT(ns_is_client(ns));
- lock_res_and_lock(lock);
- if (memcmp(new_resid, &lock->l_resource->lr_name,
- sizeof(lock->l_resource->lr_name)) == 0) {
- /* Nothing to do */
- unlock_res_and_lock(lock);
- RETURN(0);
- }
+ oldres = lock_res_and_lock(lock);
+ if (memcmp(new_resid, &oldres->lr_name,
+ sizeof(oldres->lr_name)) == 0) {
+ /* Nothing to do */
+ unlock_res_and_lock(lock);
+ RETURN(0);
+ }
- LASSERT(new_resid->name[0] != 0);
+ LASSERT(new_resid->name[0] != 0);
- /* This function assumes that the lock isn't on any lists */
+ /* This function assumes that the lock isn't on any lists */
LASSERT(list_empty(&lock->l_res_link));
- type = oldres->lr_type;
- unlock_res_and_lock(lock);
+ type = oldres->lr_type;
+ unlock_res_and_lock(lock);
- newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+ newres = ldlm_resource_get(ns, new_resid, type, 1);
if (IS_ERR(newres))
RETURN(PTR_ERR(newres));
- lu_ref_add(&newres->lr_reference, "lock", lock);
- /*
- * To flip the lock from the old to the new resource, lock, oldres and
- * newres have to be locked. Resource spin-locks are nested within
- * lock->l_lock, and are taken in the memory address order to avoid
- * dead-locks.
- */
- spin_lock(&lock->l_lock);
- oldres = lock->l_resource;
- if (oldres < newres) {
- lock_res(oldres);
- lock_res_nested(newres, LRT_NEW);
- } else {
- lock_res(newres);
- lock_res_nested(oldres, LRT_NEW);
- }
- LASSERT(memcmp(new_resid, &oldres->lr_name,
- sizeof oldres->lr_name) != 0);
- lock->l_resource = newres;
- unlock_res(oldres);
- unlock_res_and_lock(lock);
-
- /* ...and the flowers are still standing! */
- lu_ref_del(&oldres->lr_reference, "lock", lock);
- ldlm_resource_putref(oldres);
-
- RETURN(0);
+ lu_ref_add(&newres->lr_reference, "lock", lock);
+ /*
+ * To flip the lock from the old to the new resource, oldres
+ * and newres have to be locked. Resource spin-locks are taken
+ * in the memory address order to avoid dead-locks.
+ * As this is the only circumstance where ->l_resource
+ * can change, and this cannot race with itself, it is safe
+ * to access lock->l_resource without being careful about locking.
+ */
+ oldres = lock->l_resource;
+ if (oldres < newres) {
+ lock_res(oldres);
+ lock_res_nested(newres, LRT_NEW);
+ } else {
+ lock_res(newres);
+ lock_res_nested(oldres, LRT_NEW);
+ }
+ LASSERT(memcmp(new_resid, &oldres->lr_name,
+ sizeof(oldres->lr_name)) != 0);
+ rcu_assign_pointer(lock->l_resource, newres);
+ unlock_res(oldres);
+ unlock_res(newres);
+
+ /* ...and the flowers are still standing! */
+ lu_ref_del(&oldres->lr_reference, "lock", lock);
+ ldlm_resource_putref(oldres);
+
+ RETURN(0);
}
/** \defgroup ldlm_handles LDLM HANDLES
__u64 flags)
{
struct ldlm_lock *lock;
+
ENTRY;
LASSERT(handle);
}
/* It's unlikely but possible that someone marked the lock as
- * destroyed after we did handle2object on it */
+ * destroyed after we did handle2object on it
+ */
if ((flags == 0) && !ldlm_is_destroyed(lock)) {
- lu_ref_add(&lock->l_reference, "handle", current);
+ lu_ref_add_atomic(&lock->l_reference, "handle", lock);
RETURN(lock);
}
LASSERT(lock->l_resource != NULL);
- lu_ref_add_atomic(&lock->l_reference, "handle", current);
+ lu_ref_add_atomic(&lock->l_reference, "handle", lock);
if (unlikely(ldlm_is_destroyed(lock))) {
unlock_res_and_lock(lock);
CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
ldlm_set_ast_sent(lock);
/* If the enqueuing client said so, tell the AST recipient to
- * discard dirty data, rather than writing back. */
+ * discard dirty data, rather than writing back.
+ */
if (ldlm_is_ast_discard_data(new))
ldlm_set_discard_data(lock);
}
}
-/**
- * Add a lock to list of just granted locks to send completion AST to.
- */
+/* Add a lock to list of just granted locks to send completion AST to. */
static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
				  struct list_head *work_list)
{
	if (!ldlm_is_cp_reqd(lock)) {
		ldlm_set_cp_reqd(lock);
		LDLM_DEBUG(lock, "lock granted; sending completion AST.");
		LASSERT(list_empty(&lock->l_cp_ast));
		list_add(&lock->l_cp_ast, work_list);
		/* reference is dropped after the completion AST is sent */
		LDLM_LOCK_GET(lock);
	}
}
/**
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
			    struct list_head *work_list)
{
	ENTRY;
	check_res_locked(lock->l_resource);
	/* a conflicting \a new lock means a blocking AST; otherwise the
	 * lock was just granted and needs a completion AST
	 */
	if (new)
		ldlm_add_bl_work_item(lock, new, work_list);
	else
		ldlm_add_cp_work_item(lock, work_list);
	EXIT;
}
/**
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock,
enum ldlm_mode mode)
{
- ldlm_lock_remove_from_lru(lock);
- if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
- lock->l_readers++;
- lu_ref_add_atomic(&lock->l_reference, "reader", lock);
- }
- if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
- lock->l_writers++;
- lu_ref_add_atomic(&lock->l_reference, "writer", lock);
- }
- LDLM_LOCK_GET(lock);
- lu_ref_add_atomic(&lock->l_reference, "user", lock);
- LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
+ ldlm_lock_remove_from_lru(lock);
+ if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
+ lock->l_readers++;
+ lu_ref_add_atomic(&lock->l_reference, "reader", lock);
+ }
+ if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS | LCK_TXN)) {
+ lock->l_writers++;
+ lu_ref_add_atomic(&lock->l_reference, "writer", lock);
+ }
+ LDLM_LOCK_GET(lock);
+ lu_ref_add_atomic(&lock->l_reference, "user", lock);
+ LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
/**
*/
int ldlm_lock_addref_try(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
- struct ldlm_lock *lock;
- int result;
+ struct ldlm_lock *lock;
+ int result;
- result = -EAGAIN;
- lock = ldlm_handle2lock(lockh);
- if (lock != NULL) {
- lock_res_and_lock(lock);
- if (lock->l_readers != 0 || lock->l_writers != 0 ||
+ result = -EAGAIN;
+ lock = ldlm_handle2lock(lockh);
+ if (lock != NULL) {
+ lock_res_and_lock(lock);
+ if (lock->l_readers != 0 || lock->l_writers != 0 ||
!ldlm_is_cbpending(lock)) {
- ldlm_lock_addref_internal_nolock(lock, mode);
- result = 0;
- }
- unlock_res_and_lock(lock);
- LDLM_LOCK_PUT(lock);
- }
- return result;
+ ldlm_lock_addref_internal_nolock(lock, mode);
+ result = 0;
+ }
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
enum ldlm_mode mode)
{
- LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
- if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
- LASSERT(lock->l_readers > 0);
- lu_ref_del(&lock->l_reference, "reader", lock);
- lock->l_readers--;
- }
- if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
- LASSERT(lock->l_writers > 0);
- lu_ref_del(&lock->l_reference, "writer", lock);
- lock->l_writers--;
- }
+ LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
+ if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
+ LASSERT(lock->l_readers > 0);
+ lu_ref_del(&lock->l_reference, "reader", lock);
+ lock->l_readers--;
+ }
+ if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS | LCK_TXN)) {
+ LASSERT(lock->l_writers > 0);
+ lu_ref_del(&lock->l_reference, "writer", lock);
+ lock->l_writers--;
+ }
- lu_ref_del(&lock->l_reference, "user", lock);
- LDLM_LOCK_RELEASE(lock); /* matches the LDLM_LOCK_GET() in addref */
+ lu_ref_del(&lock->l_reference, "user", lock);
+ LDLM_LOCK_RELEASE(lock); /* matches the LDLM_LOCK_GET() in addref */
}
/**
*/
void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
{
- struct ldlm_namespace *ns;
- ENTRY;
+ struct ldlm_namespace *ns;
- lock_res_and_lock(lock);
+ ENTRY;
- ns = ldlm_lock_to_ns(lock);
+ lock_res_and_lock(lock);
- ldlm_lock_decref_internal_nolock(lock, mode);
+ ns = ldlm_lock_to_ns(lock);
+
+ ldlm_lock_decref_internal_nolock(lock, mode);
if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
!lock->l_readers && !lock->l_writers) {
* like non-group locks, instead they are manually released.
* They have an l_writers reference which they keep until
* they are manually released, so we remove them when they have
- * no more reader or writer references. - LU-6368 */
+ * no more reader or writer references. - LU-6368
+ */
ldlm_set_cbpending(lock);
}
if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+ unsigned int mask = D_DLMTRACE;
+
/* If we received a blocked AST and this was the last reference,
- * run the callback. */
+ * run the callback.
+ */
if (ldlm_is_ns_srv(lock) && lock->l_export)
- CERROR("FL_CBPENDING set on non-local lock--just a "
- "warning\n");
+ mask |= D_WARNING;
+ LDLM_DEBUG_LIMIT(mask, lock,
+ "final decref done on %sCBPENDING lock",
+ mask & D_WARNING ? "non-local " : "");
- LDLM_DEBUG(lock, "final decref done on cbpending lock");
-
- LDLM_LOCK_GET(lock); /* dropped by bl thread */
- ldlm_lock_remove_from_lru(lock);
- unlock_res_and_lock(lock);
+ LDLM_LOCK_GET(lock); /* dropped by bl thread */
+ ldlm_lock_remove_from_lru(lock);
+ unlock_res_and_lock(lock);
if (ldlm_is_fail_loc(lock))
- OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+ CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
- if (ldlm_is_atomic_cb(lock) ||
- ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
- ldlm_handle_bl_callback(ns, NULL, lock);
- } else if (ns_is_client(ns) &&
- !lock->l_readers && !lock->l_writers &&
+ if (ldlm_is_atomic_cb(lock) || ldlm_is_local(lock) ||
+ ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
+ ldlm_handle_bl_callback(ns, NULL, lock);
+ } else if (ns_is_client(ns) &&
+ !lock->l_readers && !lock->l_writers &&
!ldlm_is_no_lru(lock) &&
!ldlm_is_bl_ast(lock) &&
!ldlm_is_converting(lock)) {
- LDLM_DEBUG(lock, "add lock into lru list");
-
- /* If this is a client-side namespace and this was the last
- * reference, put it on the LRU. */
- ldlm_lock_add_to_lru(lock);
- unlock_res_and_lock(lock);
+ /* If this is a client-side namespace and this was the last
+ * reference, put it on the LRU.
+ */
+ ldlm_lock_add_to_lru(lock);
+ unlock_res_and_lock(lock);
+ LDLM_DEBUG(lock, "add lock into lru list");
if (ldlm_is_fail_loc(lock))
- OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+ CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
- /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
- * are not supported by the server, otherwise, it is done on
- * enqueue. */
- if (!exp_connect_cancelset(lock->l_conn_export) &&
- !ns_connect_lru_resize(ns))
- ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
- } else {
- LDLM_DEBUG(lock, "do not add lock into lru list");
- unlock_res_and_lock(lock);
- }
+ ldlm_pool_recalc(&ns->ns_pool, true);
+ } else {
+ LDLM_DEBUG(lock, "do not add lock into lru list");
+ unlock_res_and_lock(lock);
+ }
- EXIT;
+ EXIT;
}
/**
*/
void ldlm_lock_decref(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
	struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

	LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
	ldlm_lock_decref_internal(lock, mode);
	LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
void ldlm_lock_decref_and_cancel(const struct lustre_handle *lockh,
enum ldlm_mode mode)
{
- struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
- ENTRY;
+ struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
- LASSERT(lock != NULL);
+ ENTRY;
+
+ LASSERT(lock != NULL);
- LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
- lock_res_and_lock(lock);
+ LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
+ lock_res_and_lock(lock);
ldlm_set_cbpending(lock);
- unlock_res_and_lock(lock);
- ldlm_lock_decref_internal(lock, mode);
- LDLM_LOCK_PUT(lock);
+ unlock_res_and_lock(lock);
+ ldlm_lock_decref_internal(lock, mode);
+ LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
* - ldlm_grant_lock_with_skiplist
*/
static void search_granted_lock(struct list_head *queue,
- struct ldlm_lock *req,
- struct sl_insert_point *prev)
+ struct ldlm_lock *req,
+ struct sl_insert_point *prev)
{
- struct list_head *tmp;
- struct ldlm_lock *lock, *mode_end, *policy_end;
- ENTRY;
-
- list_for_each(tmp, queue) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ struct ldlm_lock *lock, *mode_end, *policy_end;
+ ENTRY;
+ list_for_each_entry(lock, queue, l_res_link) {
mode_end = list_entry(lock->l_sl_mode.prev,
- struct ldlm_lock, l_sl_mode);
-
- if (lock->l_req_mode != req->l_req_mode) {
- /* jump to last lock of mode group */
- tmp = &mode_end->l_res_link;
- continue;
- }
-
- /* suitable mode group is found */
- if (lock->l_resource->lr_type == LDLM_PLAIN) {
- /* insert point is last lock of the mode group */
- prev->res_link = &mode_end->l_res_link;
- prev->mode_link = &mode_end->l_sl_mode;
- prev->policy_link = &req->l_sl_policy;
- EXIT;
- return;
- } else if (lock->l_resource->lr_type == LDLM_IBITS) {
- for (;;) {
- policy_end =
+ struct ldlm_lock, l_sl_mode);
+
+ if (lock->l_req_mode != req->l_req_mode) {
+ /* jump to last lock of mode group */
+ lock = mode_end;
+ continue;
+ }
+
+ /* suitable mode group is found */
+ if (lock->l_resource->lr_type == LDLM_PLAIN) {
+ /* insert point is last lock of the mode group */
+ prev->res_link = &mode_end->l_res_link;
+ prev->mode_link = &mode_end->l_sl_mode;
+ prev->policy_link = &req->l_sl_policy;
+ EXIT;
+ return;
+ } else if (lock->l_resource->lr_type == LDLM_IBITS) {
+ for (;;) {
+ policy_end =
list_entry(lock->l_sl_policy.prev,
- struct ldlm_lock,
- l_sl_policy);
-
- if (lock->l_policy_data.l_inodebits.bits ==
- req->l_policy_data.l_inodebits.bits) {
- /* insert point is last lock of
- * the policy group */
- prev->res_link =
- &policy_end->l_res_link;
- prev->mode_link =
- &policy_end->l_sl_mode;
- prev->policy_link =
- &policy_end->l_sl_policy;
- EXIT;
- return;
- }
-
- if (policy_end == mode_end)
- /* done with mode group */
- break;
-
- /* go to next policy group within mode group */
- tmp = policy_end->l_res_link.next;
- lock = list_entry(tmp, struct ldlm_lock,
- l_res_link);
- } /* loop over policy groups within the mode group */
-
- /* insert point is last lock of the mode group,
- * new policy group is started */
- prev->res_link = &mode_end->l_res_link;
- prev->mode_link = &mode_end->l_sl_mode;
- prev->policy_link = &req->l_sl_policy;
- EXIT;
- return;
- } else {
- LDLM_ERROR(lock,"is not LDLM_PLAIN or LDLM_IBITS lock");
- LBUG();
- }
- }
-
- /* insert point is last lock on the queue,
- * new mode group and new policy group are started */
- prev->res_link = queue->prev;
- prev->mode_link = &req->l_sl_mode;
- prev->policy_link = &req->l_sl_policy;
- EXIT;
+ struct ldlm_lock,
+ l_sl_policy);
+
+ if (lock->l_policy_data.l_inodebits.bits ==
+ req->l_policy_data.l_inodebits.bits) {
+ /* inserting last lock of policy grp */
+ prev->res_link =
+ &policy_end->l_res_link;
+ prev->mode_link =
+ &policy_end->l_sl_mode;
+ prev->policy_link =
+ &policy_end->l_sl_policy;
+ EXIT;
+ return;
+ }
+
+ if (policy_end == mode_end)
+ /* done with mode group */
+ break;
+
+ /* go to next policy group within mode group */
+ lock = list_next_entry(policy_end, l_res_link);
+ } /* loop over policy groups within the mode group */
+
+ /* insert point is last lock of the mode group,
+ * new policy group is started
+ */
+ prev->res_link = &mode_end->l_res_link;
+ prev->mode_link = &mode_end->l_sl_mode;
+ prev->policy_link = &req->l_sl_policy;
+ EXIT;
+ return;
+ }
+ LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
+ LBUG();
+ }
+
+ /* insert point is last lock on the queue,
+ * new mode group and new policy group are started
+ */
+ prev->res_link = queue->prev;
+ prev->mode_link = &req->l_sl_mode;
+ prev->policy_link = &req->l_sl_policy;
+ EXIT;
}
/**
* \a prev.
*/
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
- struct sl_insert_point *prev)
+ struct sl_insert_point *prev)
{
- struct ldlm_resource *res = lock->l_resource;
- ENTRY;
+ struct ldlm_resource *res = lock->l_resource;
- check_res_locked(res);
+ ENTRY;
- ldlm_resource_dump(D_INFO, res);
- LDLM_DEBUG(lock, "About to add lock:");
+ check_res_locked(res);
+
+ ldlm_resource_dump(D_INFO, res);
+ LDLM_DEBUG(lock, "About to add lock:");
if (ldlm_is_destroyed(lock)) {
- CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
- return;
- }
+ CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
+ return;
+ }
LASSERT(list_empty(&lock->l_res_link));
LASSERT(list_empty(&lock->l_sl_mode));
if (&lock->l_sl_policy != prev->policy_link)
list_add(&lock->l_sl_policy, prev->policy_link);
- EXIT;
+ EXIT;
}
/**
*/
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
- struct ldlm_resource *res = lock->l_resource;
- ENTRY;
+ struct ldlm_resource *res = lock->l_resource;
+
+ ENTRY;
- check_res_locked(res);
+ check_res_locked(res);
- lock->l_granted_mode = lock->l_req_mode;
+ lock->l_granted_mode = lock->l_req_mode;
if (work_list && lock->l_completion_ast != NULL)
ldlm_add_ast_work_item(lock, NULL, work_list);
- if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
- ldlm_grant_lock_with_skiplist(lock);
- else if (res->lr_type == LDLM_EXTENT)
- ldlm_extent_add_lock(res, lock);
+ if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
+ ldlm_grant_lock_with_skiplist(lock);
+ else if (res->lr_type == LDLM_EXTENT)
+ ldlm_extent_add_lock(res, lock);
else if (res->lr_type == LDLM_FLOCK) {
/* We should not add locks to granted list in the following
* cases:
* - this is an UNLOCK but not a real lock;
* - this is a TEST lock;
* - this is a F_CANCELLK lock (async flock has req_mode == 0)
- * - this is a deadlock (flock cannot be granted) */
+ * - this is a deadlock (flock cannot be granted)
+ */
if (lock->l_req_mode == 0 ||
lock->l_req_mode == LCK_NL ||
ldlm_is_test_lock(lock) ||
LBUG();
}
- ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
- EXIT;
+ ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
+ EXIT;
}
/**
* Check if the given @lock meets the criteria for a match.
* A reference on the lock is taken if matched.
*
- * \param lock test-against this lock
- * \param data parameters
+ * @lock test-against this lock
+ * @data parameters
+ *
+ * RETURN returns true if @lock matches @data, false otherwise
*/
-static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
+static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
{
union ldlm_policy_data *lpol = &lock->l_policy_data;
enum ldlm_mode match = LCK_MINMODE;
if (lock == data->lmd_old)
- return INTERVAL_ITER_STOP;
+ return true;
/* Check if this lock can be matched.
- * Used by LU-2919(exclusive open) for open lease lock */
+ * Used by LU-2919(exclusive open) for open lease lock
+ */
if (ldlm_is_excl(lock))
- return INTERVAL_ITER_CONT;
+ return false;
/* llite sometimes wants to match locks that will be
* canceled when their users drop, but we allow it to match
* if it passes in CBPENDING and the lock still has users.
* this is generally only going to be used by children
* whose parents already hold a lock so forward progress
- * can still happen. */
+ * can still happen.
+ */
if (ldlm_is_cbpending(lock) &&
- !(data->lmd_flags & LDLM_FL_CBPENDING))
- return INTERVAL_ITER_CONT;
- if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+ !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+ !(data->lmd_match & LDLM_MATCH_GROUP))
+ return false;
+
+ if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+ ldlm_is_cbpending(lock) &&
lock->l_readers == 0 && lock->l_writers == 0)
- return INTERVAL_ITER_CONT;
+ return false;
if (!(lock->l_req_mode & *data->lmd_mode))
- return INTERVAL_ITER_CONT;
+ return false;
/* When we search for ast_data, we are not doing a traditional match,
* so we don't worry about IBITS or extent matching.
*/
- if (data->lmd_has_ast_data) {
+ if (data->lmd_match & (LDLM_MATCH_AST | LDLM_MATCH_AST_ANY)) {
if (!lock->l_ast_data)
- return INTERVAL_ITER_CONT;
+ return false;
- goto matched;
+ if (data->lmd_match & LDLM_MATCH_AST_ANY)
+ goto matched;
}
match = lock->l_req_mode;
switch (lock->l_resource->lr_type) {
case LDLM_EXTENT:
- if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
- lpol->l_extent.end < data->lmd_policy->l_extent.end)
- return INTERVAL_ITER_CONT;
+ if (!(data->lmd_match & LDLM_MATCH_RIGHT) &&
+ (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+ lpol->l_extent.end < data->lmd_policy->l_extent.end))
+ return false;
if (unlikely(match == LCK_GROUP) &&
data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
- return INTERVAL_ITER_CONT;
+ return false;
break;
case LDLM_IBITS:
- /* We match if we have existing lock with same or wider set
- of bits. */
+ /* We match with existing lock with same or wider set of bits */
if ((lpol->l_inodebits.bits &
data->lmd_policy->l_inodebits.bits) !=
data->lmd_policy->l_inodebits.bits)
- return INTERVAL_ITER_CONT;
+ return false;
+
+ if (unlikely(match == LCK_GROUP) &&
+ data->lmd_policy->l_inodebits.li_gid != LDLM_GID_ANY &&
+ lpol->l_inodebits.li_gid !=
+ data->lmd_policy->l_inodebits.li_gid)
+ return false;
break;
default:
- ;
+ break;
}
- /* We match if we have existing lock with same or wider set
- of bits. */
- if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
- return INTERVAL_ITER_CONT;
+ /* We match if we have existing lock with same or wider set of bits. */
+ if (!(data->lmd_match & LDLM_MATCH_UNREF) && LDLM_HAVE_MASK(lock, GONE))
+ return false;
if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
- return INTERVAL_ITER_CONT;
+ return false;
/* Filter locks by skipping flags */
if (data->lmd_skip_flags & lock->l_flags)
- return INTERVAL_ITER_CONT;
+ return false;
matched:
- if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+ /**
+ * In case the lock is a CBPENDING grouplock, just pin it and return,
+ * we need to wait until it gets to DESTROYED.
+ */
+ if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+ (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
LDLM_LOCK_GET(lock);
ldlm_lock_touch_in_lru(lock);
} else {
*data->lmd_mode = match;
data->lmd_lock = lock;
- return INTERVAL_ITER_STOP;
+ return true;
}
static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
struct ldlm_interval *node = to_ldlm_interval(in);
struct ldlm_match_data *data = args;
struct ldlm_lock *lock;
- int rc;
list_for_each_entry(lock, &node->li_group, l_sl_policy) {
- rc = lock_matches(lock, data);
- if (rc == INTERVAL_ITER_STOP)
+ if (lock_matches(lock, data))
return INTERVAL_ITER_STOP;
}
return INTERVAL_ITER_CONT;
data->lmd_lock = NULL;
+ if (data->lmd_match & LDLM_MATCH_RIGHT)
+ ext.end = OBD_OBJECT_EOF;
+
for (idx = 0; idx < LCK_MODE_NUM; idx++) {
struct ldlm_interval_tree *tree = &res->lr_itree[idx];
struct ldlm_match_data *data)
{
struct ldlm_lock *lock;
- int rc;
data->lmd_lock = NULL;
- list_for_each_entry(lock, queue, l_res_link) {
- rc = lock_matches(lock, data);
- if (rc == INTERVAL_ITER_STOP)
+ list_for_each_entry(lock, queue, l_res_link)
+ if (lock_matches(lock, data))
return data->lmd_lock;
- }
return NULL;
}
{
if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
- wake_up_all(&lock->l_waitq);
+ wake_up(&lock->l_waitq);
}
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
/**
 * Fail any threads matching against \a lock.
 *
 * Wrapper that takes the resource/lock spinlocks and delegates to
 * ldlm_lock_fail_match_locked(), which sets LDLM_FL_FAIL_NOTIFIED
 * (once only) and wakes up waiters sleeping on lock->l_waitq.
 *
 * NOTE(review): this span was unresolved diff residue (interleaved
 * old/new lines); resolved here to the post-patch version.
 */
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_fail_match_locked(lock);
	unlock_res_and_lock(lock);
}
/**
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
ldlm_set_lvb_ready(lock);
- wake_up_all(&lock->l_waitq);
+ wake_up(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
*/
/**
 * Allow threads to match \a lock.
 *
 * Wrapper that takes the resource/lock spinlocks and delegates to
 * ldlm_lock_allow_match_locked(), which marks the lock's LVB as ready
 * and wakes up any waiters sleeping on lock->l_waitq.
 *
 * NOTE(review): this span was unresolved diff residue (interleaved
 * old/new lines); resolved here to the post-patch version.
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_allow_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);
enum ldlm_type type,
union ldlm_policy_data *policy,
enum ldlm_mode mode,
- struct lustre_handle *lockh, int unref)
+ struct lustre_handle *lockh,
+ enum ldlm_match_flags match_flags)
{
struct ldlm_match_data data = {
.lmd_old = NULL,
.lmd_policy = policy,
.lmd_flags = flags,
.lmd_skip_flags = skip_flags,
- .lmd_unref = unref,
- .lmd_has_ast_data = false,
+ .lmd_match = match_flags,
};
struct ldlm_resource *res;
struct ldlm_lock *lock;
+ struct ldlm_lock *group_lock;
int matched;
ENTRY;
*data.lmd_mode = data.lmd_old->l_req_mode;
}
- res = ldlm_resource_get(ns, NULL, res_id, type, 0);
+ res = ldlm_resource_get(ns, res_id, type, 0);
if (IS_ERR(res)) {
LASSERT(data.lmd_old == NULL);
RETURN(0);
}
+repeat:
+ group_lock = NULL;
LDLM_RESOURCE_ADDREF(res);
lock_res(res);
if (res->lr_type == LDLM_EXTENT)
if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
lock = search_queue(&res->lr_waiting, &data);
matched = lock ? mode : 0;
+
+ if (lock && ldlm_is_cbpending(lock) &&
+ (data.lmd_match & LDLM_MATCH_GROUP))
+ group_lock = lock;
unlock_res(res);
LDLM_RESOURCE_DELREF(res);
+
+ if (group_lock) {
+ l_wait_event_abortable(group_lock->l_waitq,
+ ldlm_is_destroyed(lock));
+ LDLM_LOCK_RELEASE(lock);
+ goto repeat;
+ }
ldlm_resource_putref(res);
if (lock) {
{
struct ldlm_lock *lock;
enum ldlm_mode mode = 0;
+
ENTRY;
lock = ldlm_handle2lock(lockh);
GOTO(out, mode);
if (ldlm_is_cbpending(lock) &&
- lock->l_readers == 0 && lock->l_writers == 0)
- GOTO(out, mode);
+ lock->l_readers == 0 && lock->l_writers == 0)
+ GOTO(out, mode);
- if (bits)
- *bits = lock->l_policy_data.l_inodebits.bits;
- mode = lock->l_granted_mode;
- ldlm_lock_addref_internal_nolock(lock, mode);
- }
+ if (bits)
+ *bits = lock->l_policy_data.l_inodebits.bits;
+ mode = lock->l_granted_mode;
+ ldlm_lock_addref_internal_nolock(lock, mode);
+ }
- EXIT;
+ EXIT;
out:
- if (lock != NULL) {
- unlock_res_and_lock(lock);
- LDLM_LOCK_PUT(lock);
- }
- return mode;
+ if (lock != NULL) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
enum req_location loc, void *data, int size)
{
void *lvb;
+
ENTRY;
LASSERT(data != NULL);
memcpy(data, lvb, size);
} else {
- LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
+ LDLM_ERROR(lock,
+ "Replied unexpected lquota LVB size %d",
size);
RETURN(-EINVAL);
}
break;
default:
LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
- libcfs_debug_dumpstack(NULL);
+ dump_stack();
RETURN(-EINVAL);
}
RETURN(0);
}
-/**
- * Create and fill in new LDLM lock with specified properties.
- * Returns a referenced lock
+/* Create and fill in new LDLM lock with specified properties.
+ * Returns: a referenced lock
*/
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
struct ldlm_lock *lock;
struct ldlm_resource *res;
int rc;
+
ENTRY;
- res = ldlm_resource_get(ns, NULL, res_id, type, 1);
+ res = ldlm_resource_get(ns, res_id, type, 1);
if (IS_ERR(res))
RETURN(ERR_CAST(res));
lock = ldlm_lock_new(res);
- if (lock == NULL)
+ if (!lock) {
+ ldlm_resource_putref(res);
RETURN(ERR_PTR(-ENOMEM));
+ }
lock->l_req_mode = mode;
lock->l_ast_data = data;
- lock->l_pid = current_pid();
+ lock->l_pid = current->pid;
if (ns_is_server(ns))
ldlm_set_ns_srv(lock);
if (cbs) {
switch (type) {
case LDLM_EXTENT:
+ case LDLM_FLOCK:
rc = ldlm_extent_alloc_lock(lock);
break;
case LDLM_IBITS:
}
lock->l_lvb_type = lvb_type;
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
GOTO(out, rc = -ENOENT);
RETURN(lock);
ENTRY;
policy = ldlm_get_processing_policy(res);
-restart:
policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
- res->lr_type != LDLM_FLOCK) {
+ res->lr_type != LDLM_FLOCK)
rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
- if (rc == -ERESTART)
- GOTO(restart, rc);
- }
if (!list_empty(&rpc_list))
ldlm_discard_bl_list(&rpc_list);
void *cookie, __u64 *flags)
{
struct ldlm_lock *lock = *lockp;
- struct ldlm_resource *res = lock->l_resource;
- int local = ns_is_client(ldlm_res_to_ns(res));
+ struct ldlm_resource *res;
+ int local = ns_is_client(ns);
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
#ifdef HAVE_SERVER_SUPPORT
#endif
ENTRY;
- /* policies are not executed on the client or during replay */
- if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
- && !local && ns->ns_policy) {
+ /* policies are not executed on the client or during replay */
+ if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
+ && !local && ns->ns_policy) {
rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
*flags, NULL);
- if (rc == ELDLM_LOCK_REPLACED) {
- /* The lock that was returned has already been granted,
- * and placed into lockp. If it's not the same as the
- * one we passed in, then destroy the old one and our
- * work here is done. */
- if (lock != *lockp) {
- ldlm_lock_destroy(lock);
- LDLM_LOCK_RELEASE(lock);
- }
- *flags |= LDLM_FL_LOCK_CHANGED;
- RETURN(0);
+ if (rc == ELDLM_LOCK_REPLACED) {
+ /* The lock that was returned has already been granted,
+ * and placed into lockp. If it's not the same as the
+ * one we passed in, then destroy the old one and our
+ * work here is done.
+ */
+ if (lock != *lockp) {
+ ldlm_lock_destroy(lock);
+ LDLM_LOCK_RELEASE(lock);
+ }
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ RETURN(0);
} else if (rc != ELDLM_OK &&
ldlm_is_granted(lock)) {
LASSERT(*flags & LDLM_FL_RESENT);
* error occurs. It is unclear if lock reached the
* client in the original reply, just leave the lock on
* server, not returning it again to client. Due to
- * LU-6529, the server will not OOM. */
+ * LU-6529, the server will not OOM.
+ */
RETURN(rc);
- } else if (rc != ELDLM_OK ||
- (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
- ldlm_lock_destroy(lock);
- RETURN(rc);
- }
- }
+ } else if (rc != ELDLM_OK ||
+ (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
+ ldlm_lock_destroy(lock);
+ RETURN(rc);
+ }
+ }
if (*flags & LDLM_FL_RESENT) {
/* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for reply.
* Set LOCK_CHANGED always.
* Check if the lock is granted for BLOCK_GRANTED.
* Take NO_TIMEOUT from the lock as it is inherited through
- * LDLM_FL_INHERIT_MASK */
+ * LDLM_FL_INHERIT_MASK
+ */
*flags |= LDLM_FL_LOCK_CHANGED;
if (!ldlm_is_granted(lock))
*flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(ELDLM_OK);
}
+#ifdef HAVE_SERVER_SUPPORT
/* For a replaying lock, it might be already in granted list. So
* unlinking the lock will cause the interval node to be freed, we
* have to allocate the interval node early otherwise we can't regrant
- * this lock in the future. - jay */
- if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
+ * this lock in the future. - jay
+ *
+ * The only time the ldlm_resource changes for the ldlm_lock is when
+ * ldlm_lock_change_resource() is called and that only happens for
+ * the Lustre client case.
+ */
+ if (!local && (*flags & LDLM_FL_REPLAY) &&
+ lock->l_resource->lr_type == LDLM_EXTENT)
OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
-#ifdef HAVE_SERVER_SUPPORT
- reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+ reconstruct = !local && lock->l_resource->lr_type == LDLM_FLOCK &&
!(*flags & LDLM_FL_TEST_LOCK);
if (reconstruct) {
rc = req_can_reconstruct(cookie, NULL);
RETURN(rc);
}
}
-#endif
- lock_res_and_lock(lock);
+ if (!local && lock->l_resource->lr_type == LDLM_FLOCK) {
+ struct ldlm_flock_node *fn = &lock->l_resource->lr_flock_node;
+
+ if (lock->l_req_mode == LCK_NL) {
+ atomic_inc(&fn->lfn_unlock_pending);
+ res = lock_res_and_lock(lock);
+ atomic_dec(&fn->lfn_unlock_pending);
+ } else {
+ res = lock_res_and_lock(lock);
+
+ while (atomic_read(&fn->lfn_unlock_pending)) {
+ unlock_res_and_lock(lock);
+ cond_resched();
+ lock_res_and_lock(lock);
+ }
+ }
+ } else
+#endif
+ {
+ res = lock_res_and_lock(lock);
+ }
if (local && ldlm_is_granted(lock)) {
- /* The server returned a blocked lock, but it was granted
- * before we got a chance to actually enqueue it. We don't
- * need to do anything else. */
- *flags &= ~LDLM_FL_BLOCKED_MASK;
+ /* The server returned a blocked lock, but it was granted
+ * before we got a chance to actually enqueue it. We don't
+ * need to do anything else.
+ */
+ *flags &= ~LDLM_FL_BLOCKED_MASK;
GOTO(out, rc = ELDLM_OK);
- }
+ }
- ldlm_resource_unlink_lock(lock);
- if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
- if (node == NULL) {
- ldlm_lock_destroy_nolock(lock);
- GOTO(out, rc = -ENOMEM);
- }
+ ldlm_resource_unlink_lock(lock);
+ if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
+ if (node == NULL) {
+ ldlm_lock_destroy_nolock(lock);
+ GOTO(out, rc = -ENOMEM);
+ }
INIT_LIST_HEAD(&node->li_group);
- ldlm_interval_attach(node, lock);
- node = NULL;
- }
+ ldlm_interval_attach(node, lock);
+ node = NULL;
+ }
/* Some flags from the enqueue want to make it into the AST, via the
- * lock's l_flags. */
+ * lock's l_flags.
+ */
if (*flags & LDLM_FL_AST_DISCARD_DATA)
ldlm_set_ast_discard_data(lock);
if (*flags & LDLM_FL_TEST_LOCK)
ldlm_set_test_lock(lock);
- if (*flags & LDLM_FL_COS_INCOMPAT)
- ldlm_set_cos_incompat(lock);
- if (*flags & LDLM_FL_COS_ENABLED)
- ldlm_set_cos_enabled(lock);
/* This distinction between local lock trees is very important; a client
* namespace only has information about locks taken by that client, and
* more or less trusting the clients not to lie.
*
* FIXME (bug 268): Detect obvious lies by checking compatibility in
- * granted queue. */
- if (local) {
+ * granted queue.
+ */
+ if (local) {
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
else
rc = ldlm_lock_enqueue_helper(lock, flags);
GOTO(out, rc);
#else
- } else {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
- LBUG();
- }
+ } else {
+ CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
+ LBUG();
+ }
#endif
out:
- unlock_res_and_lock(lock);
+ unlock_res_and_lock(lock);
#ifdef HAVE_SERVER_SUPPORT
if (reconstruct) {
req, 0, NULL, false, 0);
}
#endif
- if (node)
- OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
- return rc;
+ if (node)
+ OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
+ return rc;
}
#ifdef HAVE_SERVER_SUPPORT
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
struct list_head *work_list,
- enum ldlm_process_intention intention,
- struct ldlm_lock *hint)
+ enum ldlm_process_intention intention, __u64 hint)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
- CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
+ CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
- flags = 0;
+ flags = 0;
rc = policy(pending, &flags, intention, &err, &rpc_list);
if (pending->l_granted_mode == pending->l_req_mode ||
res->lr_type == LDLM_FLOCK) {
if (rc != LDLM_ITER_CONTINUE &&
intention == LDLM_PROCESS_RESCAN)
break;
- }
+ }
if (!list_empty(&bl_ast_list)) {
unlock_res(res);
if (!list_empty(&bl_ast_list))
ldlm_discard_bl_list(&bl_ast_list);
- RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
+ RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
}
/**
* \param[in] rpc_list Conflicting locks list.
*
* \retval -ERESTART: Some lock was instantly canceled while sending
- * blocking ASTs, caller needs to re-check conflicting
- * locks.
+ * blocking ASTs, caller needs to re-check conflicting
+ * locks.
* \retval -EAGAIN: Lock was destroyed, caller should return error.
* \reval 0: Lock is successfully added in waiting list.
*/
{
struct ldlm_resource *res = lock->l_resource;
int rc;
+
ENTRY;
check_res_locked(res);
*
* bug 2322: we used to unlink and re-add here, which was a
* terrible folly -- if we goto restart, we could get
- * re-ordered! Causes deadlock, because ASTs aren't sent! */
+ * re-ordered! Causes deadlock, because ASTs aren't sent!
+ */
if (list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
LDLM_WORK_BL_AST);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
!ns_is_client(ldlm_res_to_ns(res)))
class_fail_export(lock->l_export);
+ if (rc == -ERESTART)
+ ldlm_reprocess_all(res, 0);
+
lock_res(res);
if (rc == -ERESTART) {
/* 15715: The lock was granted and destroyed after
* resource lock was dropped. Interval node was freed
* in ldlm_lock_destroy. Anyway, this always happens
* when a client is being evicted. So it would be
- * ok to return an error. -jay */
+ * ok to return an error. -jay
+ */
if (ldlm_is_destroyed(lock))
RETURN(-EAGAIN);
* to restart and ldlm_resource_unlink will be
* called and it causes the interval node to be
* freed. Then we will fail at
- * ldlm_extent_add_lock() */
+ * ldlm_extent_add_lock()
+ */
*flags &= ~LDLM_FL_BLOCKED_MASK;
- RETURN(0);
}
- RETURN(rc);
}
*flags |= LDLM_FL_BLOCK_GRANTED;
EXIT;
}
-/**
- * Process a call to blocking AST callback for a lock in ast_work list
- */
+/* Process a call to blocking AST callback for a lock in ast_work list */
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
if (list_empty(arg->list))
RETURN(-ENOENT);
- lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
+ lock = list_first_entry(arg->list, struct ldlm_lock, l_bl_ast);
/* nobody should touch l_bl_ast but some locks in the list may become
* granted after lock convert or COS downgrade, these locks should be
*/
bld.bl_same_client = lock->l_client_cookie ==
lock->l_blocking_lock->l_client_cookie;
- bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+ /* if two locks are initiated from the same MDT, transactions are
+ * independent, or the request lock mode isn't EX|PW, no need to trigger
+ * CoS because current lock will be downgraded to TXN mode soon, then
+ * the blocking lock can be granted.
+ */
+ if (lock->l_blocking_lock->l_policy_data.l_inodebits.li_initiator_id ==
+ lock->l_policy_data.l_inodebits.li_initiator_id ||
+ !(lock->l_blocking_lock->l_req_mode & (LCK_EX | LCK_PW)))
+ bld.bl_txn_dependent = false;
+ else
+ bld.bl_txn_dependent = true;
arg->bl_desc = &bld;
LASSERT(ldlm_is_ast_sent(lock));
RETURN(rc);
}
-/**
- * Process a call to revocation AST callback for a lock in ast_work list
- */
+/* Process a call to revocation AST callback for a lock in ast_work list */
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
struct ldlm_lock_desc desc;
int rc;
struct ldlm_lock *lock;
+
ENTRY;
if (list_empty(arg->list))
RETURN(-ENOENT);
- lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
+ lock = list_first_entry(arg->list, struct ldlm_lock, l_rk_ast);
list_del_init(&lock->l_rk_ast);
/* the desc just pretend to exclusive */
desc.l_req_mode = LCK_EX;
desc.l_granted_mode = 0;
- rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+ rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
LDLM_LOCK_RELEASE(lock);
RETURN(rc);
}
-/**
- * Process a call to glimpse AST callback for a lock in ast_work list
- */
+/* Process a call to glimpse AST callback for a lock in ast_work list */
int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
struct ldlm_cb_set_arg *arg = opaq;
struct ldlm_glimpse_work *gl_work;
struct ldlm_lock *lock;
int rc = 0;
+
ENTRY;
if (list_empty(arg->list))
RETURN(-ENOENT);
- gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
- gl_list);
+ gl_work = list_first_entry(arg->list, struct ldlm_glimpse_work,
+ gl_list);
list_del_init(&gl_work->gl_list);
lock = gl_work->gl_lock;
arg->gl_interpret_data = gl_work->gl_interpret_data;
/* invoke the actual glimpse callback */
- if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
- rc = 1;
+ rc = lock->l_glimpse_ast(lock, (void *)arg);
+ if (rc == 0)
+ rc = 1; /* update LVB if this is server lock */
+ else if (rc == -ELDLM_NO_LOCK_DATA)
+ ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
LDLM_LOCK_RELEASE(lock);
if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
}
#endif
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
+/* Process a call to completion AST callback for a lock in ast_work list */
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
if (list_empty(arg->list))
RETURN(-ENOENT);
- lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+ lock = list_first_entry(arg->list, struct ldlm_lock, l_cp_ast);
/* It's possible to receive a completion AST before we've set
* the l_completion_ast pointer: either because the AST arrived
* This can't happen with the blocking_ast, however, because we
* will never call the local blocking_ast until we drop our
* reader/writer reference, which we won't do until we get the
- * reply and finish enqueueing. */
+ * reply and finish enqueueing.
+ */
/* nobody should touch l_cp_ast */
lock_res_and_lock(lock);
list_del_init(&lock->l_cp_ast);
LASSERT(ldlm_is_cp_reqd(lock));
/* save l_completion_ast since it can be changed by
- * mds_intent_policy(), see bug 14225 */
+ * mds_intent_policy(), see bug 14225
+ */
completion_callback = lock->l_completion_ast;
ldlm_clear_cp_reqd(lock);
unlock_res_and_lock(lock);
/* We create a ptlrpc request set with flow control extension.
* This request set will use the work_ast_lock function to produce new
* requests and will send a new request each time one completes in order
- * to keep the number of requests in flight to ns_max_parallel_ast */
+ * to keep the number of requests in flight to ns_max_parallel_ast
+ */
arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
work_ast_lock, arg);
if (arg->set == NULL)
*/
static void __ldlm_reprocess_all(struct ldlm_resource *res,
enum ldlm_process_intention intention,
- struct ldlm_lock *hint)
+ __u64 hint)
{
LIST_HEAD(rpc_list);
#ifdef HAVE_SERVER_SUPPORT
LDLM_WORK_CP_AST);
if (rc == -ERESTART) {
LASSERT(list_empty(&rpc_list));
+ hint = 0;
goto restart;
}
#else
ENTRY;
if (!ns_is_client(ldlm_res_to_ns(res))) {
- CERROR("This is client-side-only module, cannot handle "
- "LDLM_NAMESPACE_SERVER resource type lock.\n");
+ CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
LBUG();
}
#endif
EXIT;
}
-void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
+void ldlm_reprocess_all(struct ldlm_resource *res, __u64 hint)
{
__ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
}
struct ldlm_resource *res = cfs_hash_object(hs, hnode);
/* This is only called once after recovery done. LU-8306. */
- __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, 0);
return 0;
}
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
+/* Iterate on all resources on namespace attempting to grant waiting locks. */
void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
{
ENTRY;
EXIT;
}
-/**
- * Helper function to call blocking AST for LDLM lock \a lock in a
- * "cancelling" mode.
- */
+/* Helper to call blocking AST for LDLM lock \a lock in a "cancelling" mode. */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
check_res_locked(lock->l_resource);
/* only canceller can set bl_done bit */
ldlm_set_bl_done(lock);
- wake_up_all(&lock->l_waitq);
+ wake_up(&lock->l_waitq);
} else if (!ldlm_is_bl_done(lock)) {
- /* The lock is guaranteed to have been canceled once
- * returning from this function. */
+ /* lock is guaranteed to be canceled returning from function. */
unlock_res_and_lock(lock);
wait_event_idle(lock->l_waitq, is_bl_done(lock));
lock_res_and_lock(lock);
}
}
-/**
- * Remove skiplist-enabled LDLM lock \a req from granted list
- */
+/* Remove skiplist-enabled LDLM lock \a req from granted list */
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
- if (req->l_resource->lr_type != LDLM_PLAIN &&
- req->l_resource->lr_type != LDLM_IBITS)
- return;
+ if (req->l_resource->lr_type != LDLM_PLAIN &&
+ req->l_resource->lr_type != LDLM_IBITS)
+ return;
list_del_init(&req->l_sl_policy);
list_del_init(&req->l_sl_mode);
}
-/**
- * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
- */
+/* Attempts to cancel LDLM lock \a lock that has no reader/writer references. */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
- struct ldlm_resource *res;
- struct ldlm_namespace *ns;
- ENTRY;
+ struct ldlm_resource *res;
+ struct ldlm_namespace *ns;
- lock_res_and_lock(lock);
+ ENTRY;
- res = lock->l_resource;
- ns = ldlm_res_to_ns(res);
+ lock_res_and_lock(lock);
- /* Please do not, no matter how tempting, remove this LBUG without
- * talking to me first. -phik */
- if (lock->l_readers || lock->l_writers) {
- LDLM_ERROR(lock, "lock still has references");
+ res = lock->l_resource;
+ ns = ldlm_res_to_ns(res);
+
+ /* Please do not remove this LBUG without talking to me first. -phik */
+ if (lock->l_readers || lock->l_writers) {
+ LDLM_ERROR(lock, "lock still has references");
unlock_res_and_lock(lock);
- LBUG();
- }
+ LBUG();
+ }
if (ldlm_is_waited(lock))
ldlm_del_waiting_lock(lock);
- /* Releases cancel callback. */
- ldlm_cancel_callback(lock);
+ /* Releases cancel callback. */
+ ldlm_cancel_callback(lock);
/* Yes, second time, just in case it was added again while we were
- * running with no res lock in ldlm_cancel_callback */
+ * running with no res lock in ldlm_cancel_callback
+ */
if (ldlm_is_waited(lock))
ldlm_del_waiting_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy_nolock(lock);
if (ldlm_is_granted(lock))
ldlm_pool_del(&ns->ns_pool, lock);
- /* Make sure we will not be called again for same lock what is possible
- * if not to zero out lock->l_granted_mode */
- lock->l_granted_mode = LCK_MINMODE;
- unlock_res_and_lock(lock);
+ /* should not be called again for same lock(zero out l_granted_mode) */
+ lock->l_granted_mode = LCK_MINMODE;
+ unlock_res_and_lock(lock);
- EXIT;
+ EXIT;
}
EXPORT_SYMBOL(ldlm_lock_cancel);
-/**
- * Set opaque data into the lock that only makes sense to upper layer.
- */
+/* Set opaque data into the lock that only makes sense to upper layer. */
int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
{
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- int rc = -EINVAL;
- ENTRY;
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ int rc = -EINVAL;
- if (lock) {
- if (lock->l_ast_data == NULL)
- lock->l_ast_data = data;
- if (lock->l_ast_data == data)
- rc = 0;
- LDLM_LOCK_PUT(lock);
- }
- RETURN(rc);
+ ENTRY;
+
+ if (lock) {
+ if (lock->l_ast_data == NULL)
+ lock->l_ast_data = data;
+ if (lock->l_ast_data == data)
+ rc = 0;
+ LDLM_LOCK_PUT(lock);
+ }
+ RETURN(rc);
}
EXPORT_SYMBOL(ldlm_lock_set_data);
ldlm_lvbo_update(res, lock, NULL, 1);
ldlm_lock_cancel(lock);
if (!exp->exp_obd->obd_stopping)
- ldlm_reprocess_all(res, lock);
+ ldlm_reprocess_all(res, lock->l_policy_data.l_inodebits.bits);
ldlm_resource_putref(res);
ecl->ecl_loop++;
spin_lock_bh(&exp->exp_bl_list_lock);
if (!list_empty(&exp->exp_bl_list)) {
- lock = list_entry(exp->exp_bl_list.next,
- struct ldlm_lock, l_exp_list);
+ lock = list_first_entry(&exp->exp_bl_list,
+ struct ldlm_lock, l_exp_list);
LDLM_LOCK_GET(lock);
list_del_init(&lock->l_exp_list);
} else {
lu_env_fini(&env);
- CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
- "left on hash table %d.\n", exp, ecl.ecl_loop,
- atomic_read(&exp->exp_lock_hash->hs_count));
+ CDEBUG(D_DLMTRACE,
+ "Export %p, canceled %d locks, left on hash table %d.\n", exp,
+ ecl.ecl_loop, atomic_read(&exp->exp_lock_hash->hs_count));
return ecl.ecl_loop;
}
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_cancel_locks_for_export_cb, &ecl);
- CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
- "left on hash table %d.\n", exp, ecl.ecl_loop,
- atomic_read(&exp->exp_lock_hash->hs_count));
+ CDEBUG(D_DLMTRACE,
+ "Export %p, canceled %d locks, left on hash table %d.\n", exp,
+ ecl.ecl_loop, atomic_read(&exp->exp_lock_hash->hs_count));
if (ecl.ecl_loop > 0 &&
atomic_read(&exp->exp_lock_hash->hs_count) == 0 &&
}
/**
- * Downgrade an PW/EX lock to COS | CR mode.
+ * Downgrade an PW/EX lock to COS, TXN or CR mode.
*
* A lock mode convertion from PW/EX mode to less conflict mode. The
* convertion may fail if lock was canceled before downgrade, but it doesn't
* things are cleared, so any pending or new blocked lock on that lock will
* cause new call to blocking_ast and force resource object commit.
*
+ * Used by DNE to force commit upon operation dependency.
+ *
* Also used by layout_change to replace EX lock to CR lock.
*
* \param lock A lock to convert
#ifdef HAVE_SERVER_SUPPORT
ENTRY;
- LASSERT(new_mode == LCK_COS || new_mode == LCK_CR);
+ LASSERT(new_mode == LCK_COS || new_mode == LCK_TXN ||
+ new_mode == LCK_CR);
lock_res_and_lock(lock);
ldlm_grant_lock(lock, NULL);
unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource, lock);
+ ldlm_reprocess_all(lock->l_resource,
+ lock->l_policy_data.l_inodebits.bits);
EXIT;
#endif
*/
void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
- if (!((libcfs_debug | D_ERROR) & level))
- return;
+ if (!((libcfs_debug | D_ERROR) & level))
+ return;
- lock = ldlm_handle2lock(lockh);
- if (lock == NULL)
- return;
+ lock = ldlm_handle2lock(lockh);
+ if (lock == NULL)
+ return;
- LDLM_DEBUG_LIMIT(level, lock, "###");
+ LDLM_DEBUG_LIMIT(level, lock, "###");
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_dump_handle);
* Helper function.
*/
void _ldlm_lock_debug(struct ldlm_lock *lock,
- struct libcfs_debug_msg_data *msgdata,
- const char *fmt, ...)
+ struct libcfs_debug_msg_data *msgdata,
+ const char *fmt, ...)
{
- va_list args;
- struct obd_export *exp = lock->l_export;
+ va_list args;
+ struct obd_export *exp = lock->l_export;
struct ldlm_resource *resource = NULL;
struct va_format vaf;
- char *nid = "local";
+ char *nid = "local";
- /* on server-side resource of lock doesn't change */
- if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
- if (lock->l_resource != NULL)
- resource = ldlm_resource_getref(lock->l_resource);
- } else if (spin_trylock(&lock->l_lock)) {
- if (lock->l_resource != NULL)
- resource = ldlm_resource_getref(lock->l_resource);
- spin_unlock(&lock->l_lock);
- }
+ rcu_read_lock();
+ resource = rcu_dereference(lock->l_resource);
+ if (resource && !refcount_inc_not_zero(&resource->lr_refcount))
+ resource = NULL;
+ rcu_read_unlock();
- va_start(args, fmt);
+ va_start(args, fmt);
vaf.fmt = fmt;
vaf.va = &args;
- if (exp && exp->exp_connection) {
+ if (exp && exp->exp_connection) {
nid = obd_export_nid2str(exp);
- } else if (exp && exp->exp_obd != NULL) {
- struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+ } else if (exp && exp->exp_obd != NULL) {
+ struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+
nid = obd_import_nid2str(imp);
- }
+ }
- if (resource == NULL) {
+ if (resource == NULL) {
libcfs_debug_msg(msgdata,
"%pV ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
&vaf,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
- va_end(args);
- return;
- }
+ va_end(args);
+ return;
+ }
switch (resource->lr_type) {
case LDLM_EXTENT:
libcfs_debug_msg(msgdata,
- "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+ "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
PLDLMRES(resource),
- atomic_read(&resource->lr_refcount),
+ refcount_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
lock->l_policy_data.l_extent.start,
lock->l_policy_data.l_extent.end,
lock->l_req_extent.start, lock->l_req_extent.end,
+ lock->l_req_extent.gid,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
PLDLMRES(resource),
- atomic_read(&resource->lr_refcount),
+ refcount_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
lock->l_policy_data.l_flock.pid,
lock->l_policy_data.l_flock.start,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
- lock->l_pid, lock->l_callback_timeout);
+ lock->l_pid, lock->l_callback_timestamp);
break;
case LDLM_IBITS:
- libcfs_debug_msg(msgdata,
- "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+ if (!lock->l_remote_handle.cookie)
+ libcfs_debug_msg(msgdata,
+ "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx pid: %u initiator: MDT%d\n",
+ &vaf,
+ ldlm_lock_to_ns_name(lock),
+ lock, lock->l_handle.h_cookie,
+ refcount_read(&lock->l_handle.h_ref),
+ lock->l_readers, lock->l_writers,
+ ldlm_lockname[lock->l_granted_mode],
+ ldlm_lockname[lock->l_req_mode],
+ PLDLMRES(resource),
+ lock->l_policy_data.l_inodebits.bits,
+ lock->l_policy_data.l_inodebits.try_bits,
+ refcount_read(&resource->lr_refcount),
+ ldlm_typename[resource->lr_type],
+ lock->l_flags, lock->l_pid,
+ lock->l_policy_data.l_inodebits.li_initiator_id);
+ else
+ libcfs_debug_msg(msgdata,
+ "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
PLDLMRES(resource),
lock->l_policy_data.l_inodebits.bits,
lock->l_policy_data.l_inodebits.try_bits,
- atomic_read(&resource->lr_refcount),
+ refcount_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
+ lock->l_policy_data.l_inodebits.li_gid,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
ldlm_lockname[lock->l_granted_mode],
ldlm_lockname[lock->l_req_mode],
PLDLMRES(resource),
- atomic_read(&resource->lr_refcount),
+ refcount_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
- lock->l_pid, lock->l_callback_timeout,
+ lock->l_pid, lock->l_callback_timestamp,
lock->l_lvb_type);
break;
}