From: pschwan
Date: Mon, 24 Jun 2002 03:17:07 +0000 (+0000)
Subject: First swing at DLM lock cleanup. Do not update your tree if you value
X-Git-Tag: 0.4.2~67
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=e781b979552f276100a54648725830e2ffd34fc1;p=fs%2Flustre-release.git

First swing at DLM lock cleanup. Do not update your tree if you value
compilation.
---

diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 4c922d9..ac10c5f 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -18,7 +18,7 @@
 #include
 #include
 
-extern kmem_cache_t *ldlm_lock_slab;
+static kmem_cache_t *ldlm_lock_slab;
 
 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
@@ -38,6 +38,163 @@ ldlm_res_policy ldlm_res_policy_table [] = {
         [LDLM_MDSINTENT] ldlm_intent_policy
 };
 
+void ldlm_lock2handle(struct ldlm_lock *lock, struct ldlm_handle *lockh)
+{
+        lockh->addr = (__u64)(unsigned long)lock;
+        lockh->cookie = lock->l_cookie;
+}
+
+struct ldlm_lock *ldlm_handle2lock(struct ldlm_handle *handle)
+{
+        struct ldlm_lock *lock = NULL;
+        ENTRY;
+
+        if (!handle)
+                RETURN(NULL);
+        lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
+
+        if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
+                RETURN(NULL);
+
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_cookie != handle->cookie)
+                GOTO(out, handle = NULL);
+
+        if (lock->l_flags & LDLM_FL_DESTROYED)
+                GOTO(out, handle = NULL);
+
+        lock->l_refc++;
+        EXIT;
+ out:
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return handle ? lock : NULL;
+}
+
+struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock->l_refc++;
+        ldlm_resource_getref(lock->l_resource);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return lock;
+}
+
+void ldlm_lock_put(struct ldlm_lock *lock)
+{
+        struct l_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
+        ENTRY;
+
+        l_lock(nslock);
+        lock->l_refc--;
+        if (lock->l_refc < 0)
+                LBUG();
+
+        ldlm_resource_put(lock->l_resource);
+        if (lock->l_parent)
+                ldlm_lock_put(lock->l_parent);
+
+        if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+                if (lock->l_connection)
+                        ptlrpc_put_connection(lock->l_connection);
+                kmem_cache_free(ldlm_lock_slab, lock);
+        }
+        l_unlock(nslock);
+        EXIT;
+        return;
+}
+
+void ldlm_lock_destroy(struct ldlm_lock *lock)
+{
+        ENTRY;
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        if (!list_empty(&lock->l_children)) {
+                CERROR("lock %p still has children (%p)!\n", lock,
+                       lock->l_children.next);
+                ldlm_lock_dump(lock);
+                LBUG();
+        }
+        if (lock->l_readers || lock->l_writers) {
+                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+                       "writers)\n", lock->l_readers, lock->l_writers);
+                LBUG();
+        }
+
+        if (!list_empty(&lock->l_res_link))
+                LBUG();
+
+        lock->l_flags |= LDLM_FL_DESTROYED;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_put(lock);
+        EXIT;
+        return;
+}
+/*
+   usage: pass in a resource on which you have done a get
+          pass in a parent lock on which you have done a get
+          do not put the resource or the parent
+   returns: lock with refcount 1
+*/
+static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
+                                       struct ldlm_resource *resource)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        if (resource == NULL)
+                LBUG();
+
+        lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
+        if (lock == NULL)
+                RETURN(NULL);
+
+        memset(lock, 0, sizeof(*lock));
+        get_random_bytes(&lock->l_cookie, sizeof(__u64));
+        lock->l_resource = resource;
+        lock->l_refc = 1;
+        INIT_LIST_HEAD(&lock->l_children);
+        INIT_LIST_HEAD(&lock->l_res_link);
+        init_waitqueue_head(&lock->l_waitq);
+
+        if (parent != NULL) {
+                l_lock(&parent->l_resource->lr_namespace->ns_lock);
+                lock->l_parent = parent;
+                list_add(&lock->l_childof, &parent->l_children);
+                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+        }
+        RETURN(lock);
+}
+
+int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        int type, i;
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        type = lock->l_resource->lr_type;
+
+        for (i = 0; i < lock->l_refc; i++) {
+                int rc;
+                rc = ldlm_resource_put(lock->l_resource);
+                if (rc == 1 && i != lock->l_refc - 1)
+                        LBUG();
+        }
+
+        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+        if (lock->l_resource == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+
+        for (i = 1; i < lock->l_refc; i++)
+                ldlm_resource_addref(lock->l_resource);
+
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
+
 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                               ldlm_mode_t mode, void *data)
 {
@@ -164,13 +321,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                         CDEBUG(D_INFO, "remote intent: locking %d instead of"
                                "%ld\n", mds_rep->ino, (long)old_res);
 
-                        spin_lock(&lock->l_resource->lr_lock);
-                        if (!ldlm_resource_put(lock->l_resource))
-                                /* unlock it unless the resource was freed */
-                                spin_unlock(&lock->l_resource->lr_lock);
-
-                        lock->l_resource =
-                                ldlm_resource_get(ns, NULL, new_resid, type, 1);
+                        ldlm_lock_change_resource(lock, new_resid);
                         if (lock->l_resource == NULL) {
                                 LBUG();
                                 RETURN(-ENOMEM);
@@ -196,46 +348,13 @@ static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
         return lockmode_compat(a->l_req_mode, b->l_req_mode);
 }
 
-/* Args: referenced, unlocked parent (or NULL)
- *       referenced, unlocked resource
- * Locks: parent->l_lock */
-static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
-                                       struct ldlm_resource *resource)
-{
-        struct ldlm_lock *lock;
-
-        if (resource == NULL)
-                LBUG();
-
-        lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
-        if (lock == NULL)
-                return NULL;
-
-        memset(lock, 0, sizeof(*lock));
-        lock->l_resource = resource;
-        INIT_LIST_HEAD(&lock->l_children);
-        INIT_LIST_HEAD(&lock->l_res_link);
-        init_waitqueue_head(&lock->l_waitq);
-        lock->l_lock = SPIN_LOCK_UNLOCKED;
-
-        if (parent != NULL) {
-                spin_lock(&parent->l_lock);
-                lock->l_parent = parent;
-                list_add(&lock->l_childof, &parent->l_children);
-                spin_unlock(&parent->l_lock);
-        }
-
-        return lock;
-}
-
 /* Args: unreferenced, locked lock
  *
  * Caller must do its own ldlm_resource_put() on lock->l_resource */
 void ldlm_lock_free(struct ldlm_lock *lock)
 {
         if (!list_empty(&lock->l_children)) {
-                CERROR("lock %p still has children (%p)!\n", lock,
-                       lock->l_children.next);
+                CERROR("lock %p still has children!\n", lock);
                 ldlm_lock_dump(lock);
                 LBUG();
         }
@@ -258,30 +377,22 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
 }
 
-/* Args: unlocked lock */
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
-{
-        spin_lock(&lock->l_lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
-                lock->l_readers++;
-        else
-                lock->l_writers++;
-        spin_unlock(&lock->l_lock);
-}
-
-int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
-        spin_lock(&lock->l_lock);
-        if (lock->l_flags & LDLM_FL_AST_SENT)
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 RETURN(0);
+        }
 
         lock->l_flags |= LDLM_FL_AST_SENT;
 
         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
-        spin_unlock(&lock->l_lock);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         if (req != NULL) {
                 struct list_head *list = lock->l_resource->lr_tmp;
                 list_add(&req->rq_multi, list);
@@ -290,6 +401,18 @@ int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
 }
 
 /* Args: unlocked lock */
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+                lock->l_readers++;
+        else
+                lock->l_writers++;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_get(lock);
+}
+
+/* Args: unlocked lock */
 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
 {
         ENTRY;
@@ -297,14 +420,16 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
         if (lock == NULL)
                 LBUG();
 
-        spin_lock(&lock->l_lock);
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
         else
                 lock->l_writers--;
+
+        /* If we received a blocked AST and this was the last reference,
+         * run the callback. */
         if (!lock->l_readers && !lock->l_writers &&
-            lock->l_flags & LDLM_FL_DYING) {
-                /* Read this lock its rights. */
+            (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) {
                 if (!lock->l_resource->lr_namespace->ns_client) {
                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
                         LBUG();
@@ -312,17 +437,18 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
                 CDEBUG(D_INFO, "final decref done on dying lock, "
                        "calling callback.\n");
 
-                spin_unlock(&lock->l_lock);
-                /* This function pointer is unfortunately overloaded.  This
-                 * call will not result in an RPC. */
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
                                      lock->l_data_len, NULL);
         } else
-                spin_unlock(&lock->l_lock);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+        ldlm_lock_put(lock);
+        EXIT;
 }
 
-/* Args: unlocked lock */
 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                              struct list_head *queue)
 {
@@ -365,21 +491,21 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
         return rc;
 }
 
-/* Args: unlocked lock */
 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
 {
         int rc;
         ENTRY;
 
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
         /* FIXME: should we be sending ASTs to converting? */
         rc |= _ldlm_lock_compat(lock, send_cbs,
                                 &lock->l_resource->lr_converting);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         RETURN(rc);
 }
 
-/* Args: locked lock, locked resource */
 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req = NULL;
@@ -445,6 +571,7 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
                           struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         int rc = 0;
         ENTRY;
 
@@ -452,7 +579,9 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
         if (res == NULL)
                 RETURN(0);
 
-        spin_lock(&res->lr_lock);
+        ns = res->lr_namespace;
+        l_lock(&ns->ns_lock);
+
         if (search_queue(&res->lr_granted, mode, cookie, lockh))
                 GOTO(out, rc = 1);
         if (search_queue(&res->lr_converting, mode, cookie, lockh))
@@ -462,8 +591,9 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 
         EXIT;
  out:
-        if (!ldlm_resource_put(res))
-                spin_unlock(&res->lr_lock);
+        ldlm_resource_put(res);
+        l_unlock(&ns->ns_lock);
+
         return rc;
 }
 
@@ -490,9 +620,7 @@ ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
 
         lock = ldlm_lock_new(parent_lock, res);
         if (lock == NULL) {
-                spin_lock(&res->lr_lock);
-                if (!ldlm_resource_put(res))
-                        spin_unlock(&res->lr_lock);
+                ldlm_resource_put(res);
                 RETURN(-ENOMEM);
         }
 
@@ -521,7 +649,6 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
         lock = lustre_handle2object(lockh);
         res = lock->l_resource;
         local = res->lr_namespace->ns_client;
-        spin_lock(&res->lr_lock);
 
         lock->l_blocking_ast = blocking;
 
@@ -534,26 +661,20 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
 
                 /* We do this dancing with refcounts and locks because the
                  * policy function could send an RPC */
-                res->lr_refcount++;
-                spin_unlock(&res->lr_lock);
+                ldlm_resource_addref(res);
 
                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
 
-                spin_lock(&res->lr_lock);
-                if (ldlm_resource_put(res))
-                        res = NULL;
+                if (ldlm_resource_put(res) && rc != ELDLM_LOCK_CHANGED)
+                        /* ldlm_resource_put() should not destroy 'res' unless
+                         * 'res' is no longer the resource for this lock. */
+                        LBUG();
 
                 if (rc == ELDLM_LOCK_CHANGED) {
-                        if (res)
-                                spin_unlock(&res->lr_lock);
                         res = lock->l_resource;
-                        spin_lock(&res->lr_lock);
                         *flags |= LDLM_FL_LOCK_CHANGED;
                 } else if (rc == ELDLM_LOCK_ABORTED) {
-                        /* Abort. */
-                        if (res && !ldlm_resource_put(res))
-                                spin_unlock(&res->lr_lock);
-                        ldlm_lock_free(lock);
+                        ldlm_lock_destroy(lock);
                         RETURN(rc);
                 }
         }
@@ -565,7 +686,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
                  * anything else. */
-                GOTO(out_noput, ELDLM_OK);
+                GOTO(out, ELDLM_OK);
         }
 
         /* If this is a local resource, put it on the appropriate list. */
@@ -602,17 +723,9 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
                 ldlm_grant_lock(res, lock);
 
         EXIT;
  out:
-        /* We're called with a lock that has a referenced resource and is not on
-         * any resource list.  When we added it to a list, we incurred an extra
-         * reference. */
-        if (ldlm_resource_put(lock->l_resource))
-                res = NULL;
- out_noput:
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
-        if (res)
-                spin_unlock(&res->lr_lock);
         return ELDLM_OK;
 }
 
@@ -629,7 +742,6 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
-                /* the resource lock protects ldlm_lock_compat */
                 if (ldlm_lock_compat(pending, 1))
                         RETURN(1);
 
@@ -676,7 +788,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 return;
         }
 
-        spin_lock(&res->lr_lock);
+        l_lock(&res->lr_namespace->ns_lock);
         res->lr_tmp = &rpc_list;
 
         ldlm_reprocess_queue(res, &res->lr_converting);
@@ -684,7 +796,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         ldlm_reprocess_queue(res, &res->lr_waiting);
         res->lr_tmp = NULL;
 
-        spin_unlock(&res->lr_lock);
+        l_unlock(&res->lr_namespace->ns_lock);
 
         ldlm_send_delayed_asts(&rpc_list);
         EXIT;
@@ -694,22 +806,21 @@ struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         ENTRY;
 
         res = lock->l_resource;
+        ns = res->lr_namespace;
 
-        spin_lock(&res->lr_lock);
-        spin_lock(&lock->l_lock);
-
+        l_lock(&ns->ns_lock);
         if (lock->l_readers || lock->l_writers)
                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                        "writers)\n", lock->l_readers, lock->l_writers);
 
-        if (ldlm_resource_del_lock(lock))
-                res = NULL; /* res was freed, nothing else to do. */
-        else
-                spin_unlock(&res->lr_lock);
-        ldlm_lock_free(lock);
+        ldlm_resource_del_lock(lock);
+        ldlm_lock_destroy(lock);
+
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
 
@@ -720,15 +831,17 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
 {
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         ENTRY;
 
         lock = lustre_handle2object(lockh);
         res = lock->l_resource;
+        ns = res->lr_namespace;
 
-        spin_lock(&res->lr_lock);
+        l_lock(&ns->ns_lock);
 
         lock->l_req_mode = new_mode;
-        list_del_init(&lock->l_res_link);
+        ldlm_resource_del_lock(lock);
 
         /* If this is a local resource, put it on the appropriate list. */
         if (res->lr_namespace->ns_client) {
@@ -743,7 +856,7 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
                 list_add(&lock->l_res_link, res->lr_converting.prev);
         }
 
-        spin_unlock(&res->lr_lock);
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 7d09521..0094dc3 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -43,7 +43,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
                               &ns->ns_rpc_client);
 
         INIT_LIST_HEAD(&ns->ns_root_list);
-        ns->ns_lock = SPIN_LOCK_UNLOCKED;
+        l_lock_init(&ns->ns_lock);
         ns->ns_refcount = 0;
         ns->ns_client = client;
 
@@ -52,12 +52,12 @@
                 INIT_LIST_HEAD(bucket);
 
         RETURN(ns);
- out:
+ out:
         if (ns && ns->ns_hash)
                 vfree(ns->ns_hash);
         if (ns && ns->ns_name)
                 OBD_FREE(ns->ns_name, strlen(name) + 1);
-        if (ns)
+        if (ns)
                 OBD_FREE(ns, sizeof(*ns));
         return NULL;
 }
@@ -73,7 +73,6 @@ static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (client) {
-                        spin_unlock(&res->lr_lock);
                         rc = ldlm_cli_cancel(lock->l_client, lock);
                         if (rc < 0) {
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
@@ -81,12 +80,9 @@
                         }
                         if (rc == ELDLM_RESOURCE_FREED)
                                 rc = 1;
-                        else
-                                spin_lock(&res->lr_lock);
                 } else {
                         CERROR("Freeing a lock still held by a client node.\n");
-                        spin_lock(&lock->l_lock);
                         ldlm_resource_del_lock(lock);
                         ldlm_lock_free(lock);
 
@@ -104,14 +100,14 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
         if (!ns)
                 RETURN(ELDLM_OK);
 
-        /* We should probably take the ns_lock, but then ldlm_resource_put
-         * couldn't take it.  Hmm. */
+
+        l_lock(&ns->ns_lock);
+
         for (i = 0; i < RES_HASH_SIZE; i++) {
                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
                         struct ldlm_resource *res;
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
 
-                        spin_lock(&res->lr_lock);
                         rc = cleanup_resource(res, &res->lr_granted);
                         if (!rc)
                                 rc = cleanup_resource(res, &res->lr_converting);
@@ -130,7 +126,7 @@
         }
 
         vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
-        ptlrpc_cleanup_client(&ns->ns_rpc_client);
+        ptlrpc_cleanup_client(&ns->ns_rpc_client);
         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
         OBD_FREE(ns, sizeof(*ns));
 
@@ -167,8 +163,7 @@ static struct ldlm_resource *ldlm_resource_new(void)
         INIT_LIST_HEAD(&res->lr_converting);
         INIT_LIST_HEAD(&res->lr_waiting);
 
-        res->lr_lock = SPIN_LOCK_UNLOCKED;
-        res->lr_refcount = 1;
+        atomic_set(&res->lr_refcount, 1);
 
         return res;
 }
@@ -198,7 +193,7 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
 
         res->lr_namespace = ns;
         ns->ns_refcount++;
-        res->lr_type = type;
+        res->lr_type = type;
         res->lr_most_restr = LCK_NL;
 
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
@@ -232,7 +227,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                 RETURN(NULL);
         }
 
-        spin_lock(&ns->ns_lock);
+        l_lock(&ns->ns_lock);
 
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
         list_for_each(tmp, bucket) {
@@ -241,9 +236,7 @@
 
                 if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
                         res = chk;
-                        spin_lock(&res->lr_lock);
-                        res->lr_refcount++;
-                        spin_unlock(&res->lr_lock);
+                        atomic_inc(&res->lr_refcount);
                         EXIT;
                         break;
                 }
@@ -251,29 +244,31 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
         if (res == NULL && create)
                 res = ldlm_resource_add(ns, parent, name, type);
 
-        spin_unlock(&ns->ns_lock);
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
 
-/* Args: locked resource
- * Locks: takes and releases res->lr_lock
- *        takes and releases ns->ns_lock iff res->lr_refcount falls to 0
- */
+struct ldlm_resource *ldlm_resource_addref(struct ldlm_resource *res)
+{
+        atomic_inc(&res->lr_refcount);
+        return res;
+}
+
+/* Returns 1 if the resource was freed, 0 if it remains. */
 int ldlm_resource_put(struct ldlm_resource *res)
 {
         int rc = 0;
 
-        if (res->lr_refcount == 1) {
+        if (atomic_dec_and_test(&res->lr_refcount)) {
                 struct ldlm_namespace *ns = res->lr_namespace;
                 ENTRY;
 
-                spin_unlock(&res->lr_lock);
-                spin_lock(&ns->ns_lock);
-                spin_lock(&res->lr_lock);
+                l_lock(&ns->ns_lock);
 
-                if (res->lr_refcount != 1) {
-                        spin_unlock(&ns->ns_lock);
+                if (atomic_read(&res->lr_refcount) != 0) {
+                        /* We lost the race. */
+                        l_unlock(&ns->ns_lock);
                         goto out;
                 }
 
@@ -295,40 +290,38 @@ int ldlm_resource_put(struct ldlm_resource *res)
 
                 list_del(&res->lr_childof);
 
                 kmem_cache_free(ldlm_resource_slab, res);
-                spin_unlock(&ns->ns_lock);
+                l_unlock(&ns->ns_lock);
                 rc = 1;
         } else {
                 ENTRY;
         out:
-                res->lr_refcount--;
-                if (res->lr_refcount < 0)
+                if (atomic_read(&res->lr_refcount) < 0)
                         LBUG();
         }
 
-        RETURN(rc);
+        RETURN(rc);
 }
 
-/* Must be called with resource->lr_lock taken */
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock)
 {
-        ldlm_resource_dump(res);
-        ldlm_lock_dump(lock);
+        l_lock(&res->lr_namespace->ns_lock);
+
+        ldlm_resource_dump(res);
+        ldlm_lock_dump(lock);
+
         if (!list_empty(&lock->l_res_link))
                 LBUG();
 
         list_add(&lock->l_res_link, head);
-        res->lr_refcount++;
+        l_unlock(&res->lr_namespace->ns_lock);
 }
 
-/* Must be called with resource->lr_lock taken */
-int ldlm_resource_del_lock(struct ldlm_lock *lock)
+void ldlm_resource_del_lock(struct ldlm_lock *lock)
 {
-        if (!list_empty(&lock->l_res_link)) {
-                list_del_init(&lock->l_res_link);
-                return ldlm_resource_put(lock->l_resource);
-        }
-        return 0;
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        list_del_init(&lock->l_res_link);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)