#include <linux/lustre_dlm.h>
#include <linux/lustre_mds.h>
-extern kmem_cache_t *ldlm_lock_slab;
+static kmem_cache_t *ldlm_lock_slab;
int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
[LDLM_MDSINTENT] ldlm_intent_policy
};
+/* Fill in 'lockh' so this lock can be found again via ldlm_handle2lock().
+ * The random l_cookie guards against stale or forged handles. */
+void ldlm_lock2handle(struct ldlm_lock *lock, struct ldlm_handle *lockh)
+{
+        /* BUG FIX: the parameter is named 'lockh'; the body referred to an
+         * undeclared identifier 'handle' and would not compile. */
+        lockh->addr = (__u64)(unsigned long)lock;
+        lockh->cookie = lock->l_cookie;
+}
+
+/* Resolve a wire handle back to a referenced lock.
+ *
+ * Returns the lock with l_refc incremented, or NULL if the handle is
+ * stale: bad cookie, destroyed lock, or an address that is not a live
+ * object in ldlm_lock_slab.
+ *
+ * BUG FIX: the declared return type was 'struct *' (invalid C), and the
+ * function returned 'handle' (a struct ldlm_handle *) instead of the
+ * lock.  Also, NULLing the return value and then unlocking had to use a
+ * separate variable so the failure path does not dereference NULL. */
+struct ldlm_lock *ldlm_handle2lock(struct ldlm_handle *handle)
+{
+        struct ldlm_lock *lock = NULL;
+        struct ldlm_lock *retval = NULL;
+        ENTRY;
+
+        if (!handle)
+                RETURN(NULL);
+        lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
+
+        /* validate the address before dereferencing anything in it */
+        if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
+                RETURN(NULL);
+
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_cookie != handle->cookie)
+                GOTO(out, retval = NULL);
+
+        if (lock->l_flags & LDLM_FL_DESTROYED)
+                GOTO(out, retval = NULL);
+
+        lock->l_refc++;
+        retval = lock;
+        EXIT;
+ out:
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return retval;
+}
+
+/* Take a reference on 'lock' and on its resource; returns 'lock'.
+ * Paired with ldlm_lock_put(). */
+struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock->l_refc++;
+        /* CONSISTENCY FIX: the resource-reference helper introduced by this
+         * patch is ldlm_resource_addref(); 'ldlm_resource_getref' is not
+         * defined anywhere in it (verify no such symbol exists elsewhere). */
+        ldlm_resource_addref(lock->l_resource);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return lock;
+}
+
+/* Drop a reference taken by ldlm_lock_get() / ldlm_handle2lock().  Also
+ * drops one resource reference and one parent-lock reference.  When the
+ * last reference goes and the lock was marked LDLM_FL_DESTROYED, the
+ * lock memory is returned to the slab. */
+void ldlm_lock_put(struct ldlm_lock *lock)
+{
+        /* cache the namespace-lock pointer: 'lock' may be freed below, and
+         * the namespace outlives it */
+        struct l_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
+        ENTRY;
+
+        /* BUG FIX: 'nslock' is already a struct l_lock *; the original
+         * passed '&nslock' (a struct l_lock **) to l_lock()/l_unlock(). */
+        l_lock(nslock);
+        lock->l_refc--;
+        if (lock->l_refc < 0)
+                LBUG();
+
+        ldlm_resource_put(lock->l_resource);
+        /* NOTE(review): recursing here re-enters l_lock() on the same
+         * namespace lock — assumes l_lock is recursion-safe; confirm. */
+        if (lock->l_parent)
+                ldlm_lock_put(lock->l_parent);
+
+        if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+                if (lock->l_connection)
+                        ptlrpc_put_connection(lock->l_connection);
+                kmem_cache_free(ldlm_lock_slab, lock);
+        }
+        l_unlock(nslock);
+        EXIT;
+        return;
+}
+
+/* Mark 'lock' destroyed and drop the caller's reference.  The lock must
+ * have no children, no reader/writer references, and must already be off
+ * every resource queue. */
+void ldlm_lock_destroy(struct ldlm_lock *lock)
+{
+        ENTRY;
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        if (!list_empty(&lock->l_children)) {
+                CERROR("lock %p still has children (%p)!\n", lock,
+                       lock->l_children.next);
+                ldlm_lock_dump(lock);
+                LBUG();
+        }
+        if (lock->l_readers || lock->l_writers) {
+                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+                       "writers)\n", lock->l_readers, lock->l_writers);
+                LBUG();
+        }
+
+        /* BUG FIX: there is no 'l_res' member and list_empty() takes an
+         * address — the resource-queue linkage is 'l_res_link'. */
+        if (!list_empty(&lock->l_res_link))
+                LBUG();
+
+        /* BUG FIX: OR the flag in rather than assigning, so flags such as
+         * LDLM_FL_AST_SENT set earlier are not clobbered. */
+        lock->l_flags |= LDLM_FL_DESTROYED;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_put(lock);
+        EXIT;
+        return;
+}
+/*
+ * Allocate and initialize a new lock on 'resource', optionally as a child
+ * of 'parent'.
+ *
+ * usage: pass in a resource on which you have done a get
+ *        pass in a parent lock on which you have done a get
+ *        do not put the resource or the parent — this function consumes
+ *        those references (they now belong to the new lock)
+ * returns: lock with refcount 1, or NULL on allocation failure
+ */
+static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
+                                       struct ldlm_resource *resource)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        if (resource == NULL)
+                LBUG();
+
+        lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
+        if (lock == NULL)
+                RETURN(NULL);
+
+        memset(lock, 0, sizeof(*lock));
+        /* random cookie authenticates handles built by ldlm_lock2handle() */
+        get_random_bytes(&lock->l_cookie, sizeof(__u64));
+
+        lock->l_resource = resource;
+        lock->l_refc = 1;
+        INIT_LIST_HEAD(&lock->l_children);
+        INIT_LIST_HEAD(&lock->l_res_link);
+        init_waitqueue_head(&lock->l_waitq);
+
+        if (parent != NULL) {
+                /* link into the parent's child list under its namespace lock */
+                l_lock(&parent->l_resource->lr_namespace->ns_lock);
+                lock->l_parent = parent;
+                list_add(&lock->l_childof, &parent->l_children);
+                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+        }
+        RETURN(lock);
+}
+
+/*
+ * Re-point 'lock' at the resource named by 'new_resid' (same type, same
+ * namespace), transferring every resource reference the lock holds from
+ * the old resource to the new one.  Returns 0, or -ENOMEM if the new
+ * resource cannot be created (currently unreachable past the LBUG).
+ */
+int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        int type, i;
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        /* cache the type before the old resource can be freed below */
+        type = lock->l_resource->lr_type;
+
+        /* Each of the lock's l_refc references pins the resource once; drop
+         * them all.  Only the final put may actually free it (rc == 1). */
+        for (i = 0; i < lock->l_refc; i++) {
+                int rc;
+                rc = ldlm_resource_put(lock->l_resource);
+                if (rc == 1 && i != lock->l_refc - 1)
+                        LBUG();
+        }
+
+        /* get-with-create returns the new resource holding one reference */
+        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+        if (lock->l_resource == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+
+        /* restore the remaining l_refc - 1 references on the new resource */
+        for (i = 1; i < lock->l_refc; i++)
+                ldlm_resource_addref(lock->l_resource);
+
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
+
static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
ldlm_mode_t mode, void *data)
{
CDEBUG(D_INFO, "remote intent: locking %d instead of"
"%ld\n", mds_rep->ino, (long)old_res);
- spin_lock(&lock->l_resource->lr_lock);
- if (!ldlm_resource_put(lock->l_resource))
- /* unlock it unless the resource was freed */
- spin_unlock(&lock->l_resource->lr_lock);
- lock->l_resource =
- ldlm_resource_get(ns, NULL, new_resid, type, 1);
+ ldlm_lock_change_resource(lock, new_resid);
if (lock->l_resource == NULL) {
LBUG();
RETURN(-ENOMEM);
return lockmode_compat(a->l_req_mode, b->l_req_mode);
}
-/* Args: referenced, unlocked parent (or NULL)
- * referenced, unlocked resource
- * Locks: parent->l_lock */
-static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
- struct ldlm_resource *resource)
-{
- struct ldlm_lock *lock;
-
- if (resource == NULL)
- LBUG();
-
- lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
- if (lock == NULL)
- return NULL;
-
- memset(lock, 0, sizeof(*lock));
- lock->l_resource = resource;
- INIT_LIST_HEAD(&lock->l_children);
- INIT_LIST_HEAD(&lock->l_res_link);
- init_waitqueue_head(&lock->l_waitq);
- lock->l_lock = SPIN_LOCK_UNLOCKED;
-
- if (parent != NULL) {
- spin_lock(&parent->l_lock);
- lock->l_parent = parent;
- list_add(&lock->l_childof, &parent->l_children);
- spin_unlock(&parent->l_lock);
- }
-
- return lock;
-}
-
/* Args: unreferenced, locked lock
*
* Caller must do its own ldlm_resource_put() on lock->l_resource */
void ldlm_lock_free(struct ldlm_lock *lock)
{
if (!list_empty(&lock->l_children)) {
- CERROR("lock %p still has children (%p)!\n", lock,
- lock->l_children.next);
+ CERROR("lock %p still has children!\n", lock);
ldlm_lock_dump(lock);
LBUG();
}
memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}
-/* Args: unlocked lock */
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
-{
- spin_lock(&lock->l_lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
- lock->l_readers++;
- else
- lock->l_writers++;
- spin_unlock(&lock->l_lock);
-}
-
-int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
{
struct ptlrpc_request *req = NULL;
ENTRY;
- spin_lock(&lock->l_lock);
- if (lock->l_flags & LDLM_FL_AST_SENT)
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ if (lock->l_flags & LDLM_FL_AST_SENT) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
RETURN(0);
+ }
lock->l_flags |= LDLM_FL_AST_SENT;
lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
- spin_unlock(&lock->l_lock);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
if (req != NULL) {
struct list_head *list = lock->l_resource->lr_tmp;
list_add(&req->rq_multi, list);
}
/* Args: unlocked lock */
+/* Account a reference of the given mode against 'lock' and pin the lock
+ * structure itself.  Paired with ldlm_lock_decref(). */
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        /* NL, CR and PR count as reader modes; all others as writers */
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+                lock->l_readers++;
+        else
+                lock->l_writers++;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        /* also take a struct reference (and a resource reference) for the
+         * duration of the caller's use */
+        ldlm_lock_get(lock);
+}
+
+/* Args: unlocked lock.
+ * Release a mode reference taken with ldlm_lock_addref().  If a blocking
+ * AST arrived while references were held, the final decref runs the
+ * blocking callback locally (client namespaces only). */
 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
 {
         ENTRY;
         if (lock == NULL)
                 LBUG();
-        spin_lock(&lock->l_lock);
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
         else
                 lock->l_writers--;
+
+        /* If we received a blocked AST and this was the last reference,
+         * run the callback. */
         if (!lock->l_readers && !lock->l_writers &&
-            lock->l_flags & LDLM_FL_DYING) {
-                /* Read this lock its rights. */
+            (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) {
                 if (!lock->l_resource->lr_namespace->ns_client) {
                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
                         LBUG();
                 CDEBUG(D_INFO, "final decref done on dying lock, "
                        "calling callback.\n");
-                spin_unlock(&lock->l_lock);
-                /* This function pointer is unfortunately overloaded. This
-                 * call will not result in an RPC. */
+                /* BUG FIX: must DROP the namespace lock before running the
+                 * callback — the previous revision called l_lock() again,
+                 * which self-deadlocks. */
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+                /* This function pointer is overloaded; called with a NULL
+                 * request pointer it does not result in an RPC. */
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
                                      lock->l_data_len, NULL);
         } else
-                spin_unlock(&lock->l_lock);
+                /* BUG FIX: likewise l_unlock, not l_lock, on this path */
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+        /* drop the struct reference taken by ldlm_lock_addref() */
+        ldlm_lock_put(lock);
+
         EXIT;
 }
-/* Args: unlocked lock */
static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
struct list_head *queue)
{
return rc;
}
-/* Args: unlocked lock */
+/* Check 'lock' against the granted and converting queues of its resource,
+ * sending blocking ASTs to conflicting holders when 'send_cbs' is set.
+ * Returns nonzero if any conflicting lock was found.  Takes the namespace
+ * lock internally, so callers need no locking of their own. */
 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
 {
         int rc;
         ENTRY;
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
         /* FIXME: should we be sending ASTs to converting? */
         rc |= _ldlm_lock_compat(lock, send_cbs,
                                 &lock->l_resource->lr_converting);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         RETURN(rc);
 }
-/* Args: locked lock, locked resource */
void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
{
struct ptlrpc_request *req = NULL;
struct lustre_handle *lockh)
{
struct ldlm_resource *res;
+ struct ldlm_namespace *ns;
int rc = 0;
ENTRY;
if (res == NULL)
RETURN(0);
- spin_lock(&res->lr_lock);
+ ns = res->lr_namespace;
+ l_lock(&ns->ns_lock);
+
if (search_queue(&res->lr_granted, mode, cookie, lockh))
GOTO(out, rc = 1);
if (search_queue(&res->lr_converting, mode, cookie, lockh))
EXIT;
out:
- if (!ldlm_resource_put(res))
- spin_unlock(&res->lr_lock);
+ ldlm_resource_put(res);
+ l_unlock(&ns->ns_lock);
+
return rc;
}
lock = ldlm_lock_new(parent_lock, res);
if (lock == NULL) {
- spin_lock(&res->lr_lock);
- if (!ldlm_resource_put(res))
- spin_unlock(&res->lr_lock);
+ ldlm_resource_put(res);
RETURN(-ENOMEM);
}
lock = lustre_handle2object(lockh);
res = lock->l_resource;
local = res->lr_namespace->ns_client;
- spin_lock(&res->lr_lock);
lock->l_blocking_ast = blocking;
/* We do this dancing with refcounts and locks because the
* policy function could send an RPC */
- res->lr_refcount++;
- spin_unlock(&res->lr_lock);
+ ldlm_resource_addref(res);
rc = policy(lock, cookie, lock->l_req_mode, NULL);
- spin_lock(&res->lr_lock);
- if (ldlm_resource_put(res))
- res = NULL;
+ if (ldlm_resource_put(res) && rc != ELDLM_LOCK_CHANGED)
+ /* ldlm_resource_put() should not destroy 'res' unless
+ * 'res' is no longer the resource for this lock. */
+ LBUG();
if (rc == ELDLM_LOCK_CHANGED) {
- if (res)
- spin_unlock(&res->lr_lock);
res = lock->l_resource;
- spin_lock(&res->lr_lock);
*flags |= LDLM_FL_LOCK_CHANGED;
} else if (rc == ELDLM_LOCK_ABORTED) {
- /* Abort. */
- if (res && !ldlm_resource_put(res))
- spin_unlock(&res->lr_lock);
- ldlm_lock_free(lock);
+ ldlm_lock_destroy(lock);
RETURN(rc);
}
}
/* The server returned a blocked lock, but it was granted before
* we got a chance to actually enqueue it. We don't need to do
* anything else. */
- GOTO(out_noput, ELDLM_OK);
+ GOTO(out, ELDLM_OK);
}
/* If this is a local resource, put it on the appropriate list. */
ldlm_grant_lock(res, lock);
EXIT;
out:
- /* We're called with a lock that has a referenced resource and is not on
- * any resource list. When we added it to a list, we incurred an extra
- * reference. */
- if (ldlm_resource_put(lock->l_resource))
- res = NULL;
- out_noput:
/* Don't set 'completion_ast' until here so that if the lock is granted
* immediately we don't do an unnecessary completion call. */
lock->l_completion_ast = completion;
- if (res)
- spin_unlock(&res->lr_lock);
return ELDLM_OK;
}
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
- /* the resource lock protects ldlm_lock_compat */
if (ldlm_lock_compat(pending, 1))
RETURN(1);
return;
}
- spin_lock(&res->lr_lock);
+ l_lock(&res->lr_namespace->ns_lock);
res->lr_tmp = &rpc_list;
ldlm_reprocess_queue(res, &res->lr_converting);
ldlm_reprocess_queue(res, &res->lr_waiting);
res->lr_tmp = NULL;
- spin_unlock(&res->lr_lock);
+ l_unlock(&res->lr_namespace->ns_lock);
ldlm_send_delayed_asts(&rpc_list);
EXIT;
+/* Cancel a lock in a local namespace: unlink it from its resource queue
+ * and destroy it.  Returns the lock's resource pointer.
+ * NOTE(review): 'res' is returned after ldlm_lock_destroy() may have
+ * dropped the last resource reference; callers must hold their own
+ * reference before dereferencing the return value — confirm at call
+ * sites. */
 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         ENTRY;
         res = lock->l_resource;
+        ns = res->lr_namespace;
-        spin_lock(&res->lr_lock);
-        spin_lock(&lock->l_lock);
-
+        l_lock(&ns->ns_lock);
+        /* cancelling with outstanding references is suspicious; log and
+         * proceed anyway */
         if (lock->l_readers || lock->l_writers)
                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                        "writers)\n", lock->l_readers, lock->l_writers);
-        if (ldlm_resource_del_lock(lock))
-                res = NULL; /* res was freed, nothing else to do. */
-        else
-                spin_unlock(&res->lr_lock);
-        ldlm_lock_free(lock);
+        ldlm_resource_del_lock(lock);
+        ldlm_lock_destroy(lock);
+
+        l_unlock(&ns->ns_lock);
         RETURN(res);
 }
{
struct ldlm_lock *lock;
struct ldlm_resource *res;
+ struct ldlm_namespace *ns;
ENTRY;
lock = lustre_handle2object(lockh);
res = lock->l_resource;
+ ns = res->lr_namespace;
- spin_lock(&res->lr_lock);
+ l_lock(&ns->ns_lock);
lock->l_req_mode = new_mode;
- list_del_init(&lock->l_res_link);
+ ldlm_resource_del_lock(lock);
/* If this is a local resource, put it on the appropriate list. */
if (res->lr_namespace->ns_client) {
list_add(&lock->l_res_link, res->lr_converting.prev);
}
- spin_unlock(&res->lr_lock);
+ l_unlock(&ns->ns_lock);
RETURN(res);
}
&ns->ns_rpc_client);
INIT_LIST_HEAD(&ns->ns_root_list);
- ns->ns_lock = SPIN_LOCK_UNLOCKED;
+ l_lock_init(&ns->ns_lock);
ns->ns_refcount = 0;
ns->ns_client = client;
INIT_LIST_HEAD(bucket);
RETURN(ns);
- out:
+ out:
if (ns && ns->ns_hash)
vfree(ns->ns_hash);
if (ns && ns->ns_name)
OBD_FREE(ns->ns_name, strlen(name) + 1);
- if (ns)
+ if (ns)
OBD_FREE(ns, sizeof(*ns));
return NULL;
}
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (client) {
- spin_unlock(&res->lr_lock);
rc = ldlm_cli_cancel(lock->l_client, lock);
if (rc < 0) {
CERROR("ldlm_cli_cancel: %d\n", rc);
}
if (rc == ELDLM_RESOURCE_FREED)
rc = 1;
- else
- spin_lock(&res->lr_lock);
} else {
CERROR("Freeing a lock still held by a client node.\n");
- spin_lock(&lock->l_lock);
ldlm_resource_del_lock(lock);
ldlm_lock_free(lock);
if (!ns)
RETURN(ELDLM_OK);
- /* We should probably take the ns_lock, but then ldlm_resource_put
- * couldn't take it. Hmm. */
+
+ l_lock(&ns->ns_lock);
+
for (i = 0; i < RES_HASH_SIZE; i++) {
list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
struct ldlm_resource *res;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
- spin_lock(&res->lr_lock);
rc = cleanup_resource(res, &res->lr_granted);
if (!rc)
rc = cleanup_resource(res, &res->lr_converting);
}
vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
- ptlrpc_cleanup_client(&ns->ns_rpc_client);
+ ptlrpc_cleanup_client(&ns->ns_rpc_client);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
- res->lr_lock = SPIN_LOCK_UNLOCKED;
- res->lr_refcount = 1;
+ atomic_set(&res->lr_refcount, 1);
return res;
}
res->lr_namespace = ns;
ns->ns_refcount++;
- res->lr_type = type;
+ res->lr_type = type;
res->lr_most_restr = LCK_NL;
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
RETURN(NULL);
}
- spin_lock(&ns->ns_lock);
+ l_lock(&ns->ns_lock);
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
list_for_each(tmp, bucket) {
if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
res = chk;
- spin_lock(&res->lr_lock);
- res->lr_refcount++;
- spin_unlock(&res->lr_lock);
+ atomic_inc(&res->lr_refcount);
EXIT;
break;
}
if (res == NULL && create)
res = ldlm_resource_add(ns, parent, name, type);
- spin_unlock(&ns->ns_lock);
+ l_unlock(&ns->ns_lock);
RETURN(res);
}
-/* Args: locked resource
- * Locks: takes and releases res->lr_lock
- * takes and releases ns->ns_lock iff res->lr_refcount falls to 0
- */
+/* Take an additional reference on 'res'.  Returns 'res' so the call can
+ * be chained.  Paired with ldlm_resource_put(). */
+struct ldlm_resource *ldlm_resource_addref(struct ldlm_resource *res)
+{
+        atomic_inc(&res->lr_refcount);
+        return res;
+}
+
+/* Returns 1 if the resource was freed, 0 if it remains.
+ * NOTE(review): between atomic_dec_and_test() hitting zero and this
+ * thread taking ns_lock, a concurrent ldlm_resource_addref() could
+ * resurrect a zero refcount; the re-check below handles that case, but
+ * a concurrent addref+put pair could still race to free twice — confirm
+ * all addref callers already hold a reference of their own. */
 int ldlm_resource_put(struct ldlm_resource *res)
 {
         int rc = 0;
-        if (res->lr_refcount == 1) {
+        if (atomic_dec_and_test(&res->lr_refcount)) {
                 struct ldlm_namespace *ns = res->lr_namespace;
                 ENTRY;
-                spin_unlock(&res->lr_lock);
-                spin_lock(&ns->ns_lock);
-                spin_lock(&res->lr_lock);
+                l_lock(&ns->ns_lock);
-                if (res->lr_refcount != 1) {
-                        spin_unlock(&ns->ns_lock);
+                /* re-check under the namespace lock: someone may have
+                 * re-referenced the resource while we raced here */
+                if (atomic_read(&res->lr_refcount) != 0) {
+                        /* We lost the race. */
+                        l_unlock(&ns->ns_lock);
                         goto out;
                 }
                 list_del(&res->lr_childof);
                 kmem_cache_free(ldlm_resource_slab, res);
-                spin_unlock(&ns->ns_lock);
+                l_unlock(&ns->ns_lock);
                 rc = 1;
         } else {
                 ENTRY;
 out:
-                res->lr_refcount--;
-                if (res->lr_refcount < 0)
+                if (atomic_read(&res->lr_refcount) < 0)
                         LBUG();
         }
-        RETURN(rc);
+        RETURN(rc);
 }
-/* Must be called with resource->lr_lock taken */
+/* Insert 'lock' at 'head' on one of res's queues.  The lock must not
+ * already be linked anywhere.  Takes the namespace lock internally, and —
+ * unlike the old code — no longer takes a resource reference here
+ * (reference accounting moved to ldlm_lock_get/put). */
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock)
 {
-        ldlm_resource_dump(res);
-        ldlm_lock_dump(lock);
+        l_lock(&res->lr_namespace->ns_lock);
+
+        ldlm_resource_dump(res);
+        ldlm_lock_dump(lock);
+
         if (!list_empty(&lock->l_res_link))
                 LBUG();
         list_add(&lock->l_res_link, head);
-        res->lr_refcount++;
+        l_unlock(&res->lr_namespace->ns_lock);
 }
-/* Must be called with resource->lr_lock taken */
-int ldlm_resource_del_lock(struct ldlm_lock *lock)
+/* Unlink 'lock' from whatever resource queue it is on (list_del_init is
+ * a no-op if it is not linked).  Takes the namespace lock internally. */
+void ldlm_resource_del_lock(struct ldlm_lock *lock)
 {
-        if (!list_empty(&lock->l_res_link)) {
-                list_del_init(&lock->l_res_link);
-                return ldlm_resource_put(lock->l_resource);
-        }
-        return 0;
+        /* BUG FIX: 'res' was used without ever being declared; derive the
+         * resource from the lock itself. */
+        struct ldlm_resource *res = lock->l_resource;
+
+        l_lock(&res->lr_namespace->ns_lock);
+        list_del_init(&lock->l_res_link);
+        l_unlock(&res->lr_namespace->ns_lock);
 }
int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)