#include <linux/obd_class.h>
/* this lock protects ldlm_handle2lock's integrity */
-//static spinlock_t ldlm_handle_lock = SPIN_LOCK_UNLOCKED;
+struct lustre_lock ldlm_everything_lock;
/* lock types */
char *ldlm_lockname[] = {
*/
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ /* This patch replaces the per-namespace ns_lock with one global
+  * ldlm_everything_lock; refcount updates are serialized under it. */
+ l_lock(&ldlm_everything_lock);
+ /* Bump the lock refcount and pin its resource together, atomically
+  * with respect to other holders of the global lock. */
 lock->l_refc++;
 ldlm_resource_getref(lock->l_resource);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
 return lock;
}
struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
ENTRY;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
lock->l_refc--;
//LDLM_DEBUG(lock, "after refc--");
if (lock->l_refc < 0)
LBUG();
- if (ldlm_resource_put(lock->l_resource))
+ if (ldlm_resource_put(lock->l_resource)) {
+ LASSERT(lock->l_refc == 0);
lock->l_resource = NULL;
+ }
if (lock->l_parent)
LDLM_LOCK_PUT(lock->l_parent);
if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
- l_unlock(&ns->ns_lock);
LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
- //spin_lock(&ldlm_handle_lock);
spin_lock(&ns->ns_counter_lock);
ns->ns_locks--;
spin_unlock(&ns->ns_counter_lock);
- lock->l_resource = NULL;
lock->l_random = DEAD_HANDLE_MAGIC;
if (lock->l_export && lock->l_export->exp_connection)
ptlrpc_put_connection(lock->l_export->exp_connection);
kmem_cache_free(ldlm_lock_slab, lock);
- //spin_unlock(&ldlm_handle_lock);
+ l_unlock(&ldlm_everything_lock);
CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 0).\n",
sizeof(*lock), lock);
} else
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
EXIT;
}
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ /* Single global lock instead of the per-namespace ns_lock. */
+ l_lock(&ldlm_everything_lock);
if (!list_empty(&lock->l_children)) {
LDLM_DEBUG(lock, "still has children (%p)!",
}
+ /* Destroy is idempotent: a second call on an already-destroyed
+  * lock just drops the global lock and returns. */
if (lock->l_flags & LDLM_FL_DESTROYED) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
EXIT;
return;
}
if (lock->l_export && lock->l_completion_ast)
lock->l_completion_ast(lock, 0);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
+ /* Drop the reference this destroy consumes; the final put frees. */
LDLM_LOCK_PUT(lock);
EXIT;
}
spin_unlock(&resource->lr_namespace->ns_counter_lock);
if (parent != NULL) {
- l_lock(&parent->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
lock->l_parent = parent;
list_add(&lock->l_childof, &parent->l_children);
- l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
}
CDEBUG(D_MALLOC, "kmalloced 'lock': %d at "
int type, i;
ENTRY;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
if (memcmp(new_resid, lock->l_resource->lr_name,
sizeof(lock->l_resource->lr_name)) == 0) {
/* Nothing to do */
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
RETURN(0);
}
/* compensate for the initial get above.. */
ldlm_resource_put(lock->l_resource);
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
RETURN(0);
}
lockh->cookie = lock->l_random;
}
-struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
+/* Translate a wire handle into a lock pointer.  With strict == 0 a lock
+ * already flagged LDLM_FL_DESTROYED is still returned (the decref path
+ * relies on this); strict != 0 rejects destroyed locks.  NOTE(review):
+ * return-value/refcount behavior of the elided tail should be confirmed
+ * against the full function body. */
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle,
+ int strict)
{
struct ldlm_lock *lock = NULL, *retval = NULL;
ENTRY;
if (!handle || !handle->addr)
RETURN(NULL);
- //spin_lock(&ldlm_handle_lock);
+ /* The global lock now also guards handle -> lock translation,
+  * replacing the old dedicated ldlm_handle_lock spinlock, so a
+  * single out: label suffices for all failure paths below. */
+ l_lock(&ldlm_everything_lock);
lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
+ /* Sanity-check the raw pointer came from our slab before touching it. */
if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
CERROR("bogus lock %p\n", lock);
- GOTO(out2, retval);
+ GOTO(out, retval);
}
if (lock->l_random != handle->cookie) {
CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64"\n",
lock, lock->l_random, handle->cookie);
- GOTO(out2, NULL);
+ GOTO(out, NULL);
}
if (!lock->l_resource) {
CERROR("trying to lock bogus resource: lock %p\n", lock);
LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
- GOTO(out2, retval);
+ GOTO(out, retval);
}
if (!lock->l_resource->lr_namespace) {
CERROR("trying to lock bogus namespace: lock %p\n", lock);
LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
- GOTO(out2, retval);
+ GOTO(out, retval);
}
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (lock->l_flags & LDLM_FL_DESTROYED) {
+ /* Only the strict variant refuses a destroyed lock. */
+ if (strict && lock->l_flags & LDLM_FL_DESTROYED) {
CERROR("lock already destroyed: lock %p\n", lock);
LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
GOTO(out, NULL);
CERROR("lock disappeared below us!!! %p\n", lock);
EXIT;
out:
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- out2:
- //spin_unlock(&ldlm_handle_lock);
+ l_unlock(&ldlm_everything_lock);
return retval;
}
struct ldlm_ast_work *w;
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
if (new && (lock->l_flags & LDLM_FL_AST_SENT))
GOTO(out, 0);
w->w_lock = LDLM_LOCK_GET(lock);
list_add(&w->w_list, lock->l_resource->lr_tmp);
out:
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
return;
}
/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
+ /* NL/CR/PR modes count as readers; any other mode as a writer. */
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
lock->l_readers++;
else
lock->l_writers++;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
+ /* Each addref pins the lock; matched by a put in the decref path. */
LDLM_LOCK_GET(lock);
LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
/* Args: unlocked lock */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
ENTRY;
if (lock == NULL)
LBUG();
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
lock->l_readers--;
else
"warning\n");
LDLM_DEBUG(lock, "final decref done on cbpending lock");
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
/* FIXME: need a real 'desc' here */
lock->l_blocking_ast(lock, NULL, lock->l_data,
lock->l_data_len, LDLM_CB_BLOCKING);
} else
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
int rc;
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
rc = ldlm_lock_compat_list(lock, send_cbs,
&lock->l_resource->lr_granted);
/* FIXME: should we be sending ASTs to converting? */
rc = ldlm_lock_compat_list
(lock, send_cbs, &lock->l_resource->lr_converting);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
RETURN(rc);
}
struct ldlm_resource *res = lock->l_resource;
ENTRY;
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
ldlm_resource_add_lock(res, &res->lr_granted, lock);
lock->l_granted_mode = lock->l_req_mode;
if (lock->l_completion_ast) {
ldlm_add_ast_work_item(lock, NULL);
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
EXIT;
}
if (res == NULL)
RETURN(0);
- ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
if ((lock = search_queue(&res->lr_granted, mode, cookie)))
GOTO(out, rc = 1);
EXIT;
out:
ldlm_resource_put(res);
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
if (lock) {
ldlm_lock2handle(lock, lockh);
lock->l_cookie = cookie;
lock->l_cookie_len = cookie_len;
- l_lock(&res->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
if (local && lock->l_req_mode == lock->l_granted_mode) {
/* The server returned a blocked lock, but it was granted before
* we got a chance to actually enqueue it. We don't need to do
ldlm_grant_lock(lock);
EXIT;
out:
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
/* Don't set 'completion_ast' until here so that if the lock is granted
* immediately we don't do an unnecessary completion call. */
lock->l_completion_ast = completion;
return;
}
- l_lock(&res->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
res->lr_tmp = &rpc_list;
ldlm_reprocess_queue(res, &res->lr_converting);
ldlm_reprocess_queue(res, &res->lr_waiting);
res->lr_tmp = NULL;
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
ldlm_run_ast_work(&rpc_list);
EXIT;
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
+ /* Run the blocking AST in CANCELING mode exactly once; the
+  * LDLM_FL_CANCEL flag makes any repeated call a no-op. */
if (!(lock->l_flags & LDLM_FL_CANCEL)) {
lock->l_flags |= LDLM_FL_CANCEL;
lock->l_blocking_ast(lock, NULL, lock->l_data,
lock->l_data_len, LDLM_CB_CANCELING);
}
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
}
void ldlm_lock_cancel(struct ldlm_lock *lock)
res = lock->l_resource;
ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
+ /* Outstanding reader/writer references are only reported, not
+  * enforced, before the lock is torn down. */
if (lock->l_readers || lock->l_writers)
CDEBUG(D_INFO, "lock still has references (%d readers, %d "
"writers)\n", lock->l_readers, lock->l_writers);
+ /* Remove from the waiting-lock timer list and the resource queue,
+  * then mark destroyed — all under the one global lock. */
ldlm_del_waiting_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
EXIT;
}
res = lock->l_resource;
ns = res->lr_namespace;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
lock->l_req_mode = new_mode;
ldlm_resource_unlink_lock(lock);
*flags |= LDLM_FL_BLOCK_CONV;
}
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
if (granted)
ldlm_run_ast_work(&rpc_list);
strcpy(ns->ns_name, name);
INIT_LIST_HEAD(&ns->ns_root_list);
- l_lock_init(&ns->ns_lock);
ns->ns_refcount = 0;
ns->ns_client = client;
spin_lock_init(&ns->ns_counter_lock);
{
int i;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
for (i = 0; i < RES_HASH_SIZE; i++) {
struct list_head *tmp, *pos;
list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
cleanup_resource(res, &res->lr_converting, local_only);
cleanup_resource(res, &res->lr_waiting, local_only);
+ /* XXX this is a bit counter-intuitive and should
+ * probably be cleaner: don't force cleanup if we're
+ * local_only (which is only used by recovery). We
+ * probably still have outstanding lock refs which
+ * reference these resources. -phil */
if (!ldlm_resource_put(res) && !local_only) {
CERROR("Resource refcount nonzero (%d) after "
"lock cleanup; forcing cleanup.\n",
}
}
}
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
return ELDLM_OK;
}
RETURN(NULL);
}
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
list_for_each(tmp, bucket) {
if (res == NULL && create)
res = ldlm_resource_add(ns, parent, name, type);
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
RETURN(res);
}
struct ldlm_namespace *ns = res->lr_namespace;
ENTRY;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
if (atomic_read(&res->lr_refcount) != 0) {
/* We lost the race. */
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
goto out;
}
list_del(&res->lr_childof);
kmem_cache_free(ldlm_resource_slab, res);
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
spin_lock(&ns->ns_counter_lock);
ns->ns_resources--;
void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
struct ldlm_lock *lock)
{
- l_lock(&res->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
+ /* NOTE(review): the guard condition for this dump-and-LBUG error
+  * path is elided in this hunk — presumably it fires when the lock
+  * is already on a resource list; confirm against the full source. */
ldlm_resource_dump(res);
ldlm_lock_dump(lock);
LBUG();
list_add(&lock->l_res_link, head);
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
}
void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
{
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ldlm_everything_lock);
+ /* list_del_init leaves l_res_link self-linked, so unlinking an
+  * already-unlinked lock a second time is harmless. */
list_del_init(&lock->l_res_link);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ldlm_everything_lock);
}
void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
{
struct list_head *tmp;
- l_lock(&ns->ns_lock);
+ l_lock(&ldlm_everything_lock);
CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name,
ns->ns_refcount, ns->ns_client);
* them recursively. */
ldlm_resource_dump(res);
}
- l_unlock(&ns->ns_lock);
+ l_unlock(&ldlm_everything_lock);
}
void ldlm_resource_dump(struct ldlm_resource *res)