Whamcloud - gitweb
First swing at DLM lock cleanup. Do not update your tree if you value
authorpschwan <pschwan>
Mon, 24 Jun 2002 03:17:07 +0000 (03:17 +0000)
committerpschwan <pschwan>
Mon, 24 Jun 2002 03:17:07 +0000 (03:17 +0000)
compilation.

lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_resource.c

index 4c922d9..ac10c5f 100644 (file)
@@ -18,7 +18,7 @@
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
 
-extern kmem_cache_t *ldlm_lock_slab;
+static kmem_cache_t *ldlm_lock_slab;
 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
 
@@ -38,6 +38,163 @@ ldlm_res_policy ldlm_res_policy_table [] = {
         [LDLM_MDSINTENT] ldlm_intent_policy
 };
 
+void ldlm_lock2handle(struct ldlm_lock *lock, struct ldlm_handle *lockh)
+{
+        handle->addr = (__u64)(unsigned long)lock;
+        handle->cookie = lock->l_cookie;
+}
+
+struct *ldlm_handle2lock(struct ldlm_handle *handle)
+{
+        struct ldlm_lock *lock = NULL;
+        ENTRY;
+
+        if (!handle)
+                RETURN(NULL);
+        lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
+
+        if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
+                RETURN(NULL);
+
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_cookie != handle->cookie)
+                GOTO(out, handle = NULL);
+
+        if (lock->l_flags & LDLM_FL_DESTROYED)
+                GOTO(out, handle = NULL);
+
+        lock->l_refc++;
+        EXIT;
+ out:
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return  handle;
+}
+
+struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock->l_refc++;
+        ldlm_resource_getref(lock->l_resource);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        return lock;
+}
+
+void ldlm_lock_put(struct ldlm_lock *lock)
+{
+        struct l_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
+        ENTRY;
+
+        l_lock(&nslock);
+        lock->l_refc--;
+        if (lock->l_refc < 0)
+                LBUG();
+
+        ldlm_resource_put(lock->l_resource);
+        if (lock->l_parent)
+                ldlm_lock_put(lock->l_parent);
+
+        if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+                if (lock->l_connection)
+                        ptlrpc_put_connection(lock->l_connection);
+                kmem_cache_free(ldlm_lock_slab, lock);
+        }
+        l_unlock(&nslock);
+        EXIT;
+        return;
+}
+
+void ldlm_lock_destroy(struct ldlm_lock *lock)
+{
+        ENTRY;
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        if (!list_empty(&lock->l_children)) {
+                CERROR("lock %p still has children (%p)!\n", lock,
+                       lock->l_children.next);
+                ldlm_lock_dump(lock);
+                LBUG();
+        }
+        if (lock->l_readers || lock->l_writers) {
+                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
+                       "writers)\n", lock->l_readers, lock->l_writers);
+                LBUG();
+        }
+
+        if (!list_empty(lock->l_res))
+                LBUG();
+
+        lock->l_flags = LDLM_FL_DESTROYED;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_put(lock);
+        EXIT;
+        return;
+}
+/*
+   usage: pass in a resource on which you have done get
+          pass in a parent lock on which you have done a get
+          do not put the resource or the parent
+   returns: lock with refcount 1
+*/
+static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
+                                       struct ldlm_resource *resource)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        if (resource == NULL)
+                LBUG();
+
+        lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
+        if (lock == NULL)
+                RETURN(NULL);
+
+        memset(lock, 0, sizeof(*lock));
+        get_random_bytes(&lock->l_cookie, sizeof(__u64));
+
+        lock->l_resource = resource;
+        lock->l_refc = 1;
+        INIT_LIST_HEAD(&lock->l_children);
+        INIT_LIST_HEAD(&lock->l_res_link);
+        init_waitqueue_head(&lock->l_waitq);
+
+        if (parent != NULL) {
+                l_lock(&parent->l_resource->lr_namespace->ns_lock);
+                lock->l_parent = parent;
+                list_add(&lock->l_childof, &parent->l_children);
+                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+        }
+        RETURN(lock);
+}
+
+int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+{
+        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+        int type, i;
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        type = lock->l_resource->lr_type;
+
+        for (i = 0; i < lock->l_refc; i++) {
+                int rc;
+                rc = ldlm_resource_put(lock->l_resource);
+                if (rc == 1 && i != lock->l_refc - 1)
+                        LBUG();
+        }
+
+        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+        if (lock->l_resource == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+
+        for (i = 1; i < lock->l_refc; i++)
+                ldlm_resource_addref(lock->l_resource);
+
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
+
 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                               ldlm_mode_t mode, void *data)
 {
@@ -164,13 +321,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
                        "%ld\n", mds_rep->ino, (long)old_res);
-                spin_lock(&lock->l_resource->lr_lock);
-                if (!ldlm_resource_put(lock->l_resource))
-                        /* unlock it unless the resource was freed */
-                        spin_unlock(&lock->l_resource->lr_lock);
 
-                lock->l_resource =
-                        ldlm_resource_get(ns, NULL, new_resid, type, 1);
+                ldlm_lock_change_resource(lock, new_resid);
                 if (lock->l_resource == NULL) {
                         LBUG();
                         RETURN(-ENOMEM);
@@ -196,46 +348,13 @@ static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
         return lockmode_compat(a->l_req_mode, b->l_req_mode);
 }
 
-/* Args: referenced, unlocked parent (or NULL)
- *       referenced, unlocked resource
- * Locks: parent->l_lock */
-static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
-                                       struct ldlm_resource *resource)
-{
-        struct ldlm_lock *lock;
-
-        if (resource == NULL)
-                LBUG();
-
-        lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
-        if (lock == NULL)
-                return NULL;
-
-        memset(lock, 0, sizeof(*lock));
-        lock->l_resource = resource;
-        INIT_LIST_HEAD(&lock->l_children);
-        INIT_LIST_HEAD(&lock->l_res_link);
-        init_waitqueue_head(&lock->l_waitq);
-        lock->l_lock = SPIN_LOCK_UNLOCKED;
-
-        if (parent != NULL) {
-                spin_lock(&parent->l_lock);
-                lock->l_parent = parent;
-                list_add(&lock->l_childof, &parent->l_children);
-                spin_unlock(&parent->l_lock);
-        }
-
-        return lock;
-}
-
 /* Args: unreferenced, locked lock
  *
  * Caller must do its own ldlm_resource_put() on lock->l_resource */
 void ldlm_lock_free(struct ldlm_lock *lock)
 {
         if (!list_empty(&lock->l_children)) {
-                CERROR("lock %p still has children (%p)!\n", lock,
-                       lock->l_children.next);
+                CERROR("lock %p still has children!\n", lock);
                 ldlm_lock_dump(lock);
                 LBUG();
         }
@@ -258,30 +377,22 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
 }
 
-/* Args: unlocked lock */
-void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
-{
-        spin_lock(&lock->l_lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
-                lock->l_readers++;
-        else
-                lock->l_writers++;
-        spin_unlock(&lock->l_lock);
-}
-
-int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
-        spin_lock(&lock->l_lock);
-        if (lock->l_flags & LDLM_FL_AST_SENT)
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 RETURN(0);
+        }
 
         lock->l_flags |= LDLM_FL_AST_SENT;
 
         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
-        spin_unlock(&lock->l_lock);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         if (req != NULL) {
                 struct list_head *list = lock->l_resource->lr_tmp;
                 list_add(&req->rq_multi, list);
@@ -290,6 +401,18 @@ int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
 }
 
 /* Args: unlocked lock */
+void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
+{
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+                lock->l_readers++;
+        else
+                lock->l_writers++;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        ldlm_lock_get(lock);
+}
+
+/* Args: unlocked lock */
 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
 {
         ENTRY;
@@ -297,14 +420,16 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
         if (lock == NULL)
                 LBUG();
 
-        spin_lock(&lock->l_lock);
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
         else
                 lock->l_writers--;
+
+        /* If we received a blocked AST and this was the last reference,
+         * run the callback. */
         if (!lock->l_readers && !lock->l_writers &&
-            lock->l_flags & LDLM_FL_DYING) {
-                /* Read this lock its rights. */
+            (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) {
                 if (!lock->l_resource->lr_namespace->ns_client) {
                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
                         LBUG();
@@ -312,17 +437,18 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
 
                 CDEBUG(D_INFO, "final decref done on dying lock, "
                        "calling callback.\n");
-                spin_unlock(&lock->l_lock);
-                /* This function pointer is unfortunately overloaded.  This
-                 * call will not result in an RPC. */
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
                                      lock->l_data_len, NULL);
         } else
-                spin_unlock(&lock->l_lock);
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        ldlm_lock_put(lock);
+
         EXIT;
 }
 
-/* Args: unlocked lock */
 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                              struct list_head *queue)
 {
@@ -365,21 +491,21 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
         return rc;
 }
 
-/* Args: unlocked lock */
 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
 {
         int rc;
         ENTRY;
 
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
         /* FIXME: should we be sending ASTs to converting? */
         rc |= _ldlm_lock_compat(lock, send_cbs,
                                 &lock->l_resource->lr_converting);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         RETURN(rc);
 }
 
-/* Args: locked lock, locked resource */
 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req = NULL;
@@ -445,6 +571,7 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
                           struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         int rc = 0;
         ENTRY;
 
@@ -452,7 +579,9 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
         if (res == NULL)
                 RETURN(0);
 
-        spin_lock(&res->lr_lock);
+        ns = res->lr_namespace;
+        l_lock(&ns->ns_lock);
+
         if (search_queue(&res->lr_granted, mode, cookie, lockh))
                 GOTO(out, rc = 1);
         if (search_queue(&res->lr_converting, mode, cookie, lockh))
@@ -462,8 +591,9 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 
         EXIT;
  out:
-        if (!ldlm_resource_put(res))
-                spin_unlock(&res->lr_lock);
+        ldlm_resource_put(res);
+        l_unlock(&ns->ns_lock);
+
         return rc;
 }
 
@@ -490,9 +620,7 @@ ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
 
         lock = ldlm_lock_new(parent_lock, res);
         if (lock == NULL) {
-                spin_lock(&res->lr_lock);
-                if (!ldlm_resource_put(res))
-                        spin_unlock(&res->lr_lock);
+                ldlm_resource_put(res);
                 RETURN(-ENOMEM);
         }
 
@@ -521,7 +649,6 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
         lock = lustre_handle2object(lockh);
         res = lock->l_resource;
         local = res->lr_namespace->ns_client;
-        spin_lock(&res->lr_lock);
 
         lock->l_blocking_ast = blocking;
 
@@ -534,26 +661,20 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
 
                 /* We do this dancing with refcounts and locks because the
                  * policy function could send an RPC */
-                res->lr_refcount++;
-                spin_unlock(&res->lr_lock);
+                ldlm_resource_addref(res);
 
                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
 
-                spin_lock(&res->lr_lock);
-                if (ldlm_resource_put(res))
-                        res = NULL;
+                if (ldlm_resource_put(res) && rc != ELDLM_LOCK_CHANGED)
+                        /* ldlm_resource_put() should not destroy 'res' unless
+                         * 'res' is no longer the resource for this lock. */
+                        LBUG();
 
                 if (rc == ELDLM_LOCK_CHANGED) {
-                        if (res)
-                                spin_unlock(&res->lr_lock);
                         res = lock->l_resource;
-                        spin_lock(&res->lr_lock);
                         *flags |= LDLM_FL_LOCK_CHANGED;
                 } else if (rc == ELDLM_LOCK_ABORTED) {
-                        /* Abort. */
-                        if (res && !ldlm_resource_put(res))
-                                spin_unlock(&res->lr_lock);
-                        ldlm_lock_free(lock);
+                        ldlm_lock_destroy(lock);
                         RETURN(rc);
                 }
         }
@@ -565,7 +686,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
                  * anything else. */
-                GOTO(out_noput, ELDLM_OK);
+                GOTO(out, ELDLM_OK);
         }
 
         /* If this is a local resource, put it on the appropriate list. */
@@ -602,17 +723,9 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
         ldlm_grant_lock(res, lock);
         EXIT;
  out:
-        /* We're called with a lock that has a referenced resource and is not on
-         * any resource list.  When we added it to a list, we incurred an extra
-         * reference. */
-        if (ldlm_resource_put(lock->l_resource))
-                res = NULL;
- out_noput:
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
-        if (res)
-                spin_unlock(&res->lr_lock);
         return ELDLM_OK;
 }
 
@@ -629,7 +742,6 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
-                /* the resource lock protects ldlm_lock_compat */
                 if (ldlm_lock_compat(pending, 1))
                         RETURN(1);
 
@@ -676,7 +788,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 return;
         }
 
-        spin_lock(&res->lr_lock);
+        l_lock(&res->lr_namespace->ns_lock);
         res->lr_tmp = &rpc_list;
 
         ldlm_reprocess_queue(res, &res->lr_converting);
@@ -684,7 +796,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 ldlm_reprocess_queue(res, &res->lr_waiting);
 
         res->lr_tmp = NULL;
-        spin_unlock(&res->lr_lock);
+        l_unlock(&res->lr_namespace->ns_lock);
 
         ldlm_send_delayed_asts(&rpc_list);
         EXIT;
@@ -694,22 +806,21 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         ENTRY;
 
         res = lock->l_resource;
+        ns = res->lr_namespace;
 
-        spin_lock(&res->lr_lock);
-        spin_lock(&lock->l_lock);
-
+        l_lock(&ns->ns_lock);
         if (lock->l_readers || lock->l_writers)
                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                        "writers)\n", lock->l_readers, lock->l_writers);
 
-        if (ldlm_resource_del_lock(lock))
-                res = NULL; /* res was freed, nothing else to do. */
-        else
-                spin_unlock(&res->lr_lock);
-        ldlm_lock_free(lock);
+        ldlm_resource_del_lock(lock);
+        ldlm_lock_destroy(lock);
+
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
@@ -720,15 +831,17 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
 {
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
+        struct ldlm_namespace *ns;
         ENTRY;
 
         lock = lustre_handle2object(lockh);
         res = lock->l_resource;
+        ns = res->lr_namespace;
 
-        spin_lock(&res->lr_lock);
+        l_lock(&ns->ns_lock);
 
         lock->l_req_mode = new_mode;
-        list_del_init(&lock->l_res_link);
+        ldlm_resource_del_lock(lock);
 
         /* If this is a local resource, put it on the appropriate list. */
         if (res->lr_namespace->ns_client) {
@@ -743,7 +856,7 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
                 list_add(&lock->l_res_link, res->lr_converting.prev);
         }
 
-        spin_unlock(&res->lr_lock);
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
index 7d09521..0094dc3 100644 (file)
@@ -43,7 +43,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
                            &ns->ns_rpc_client);
 
         INIT_LIST_HEAD(&ns->ns_root_list);
-        ns->ns_lock = SPIN_LOCK_UNLOCKED;
+        l_lock_init(&ns->ns_lock);
         ns->ns_refcount = 0;
         ns->ns_client = client;
 
@@ -52,12 +52,12 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
                 INIT_LIST_HEAD(bucket);
         RETURN(ns);
 
- out: 
+ out:
         if (ns && ns->ns_hash)
                 vfree(ns->ns_hash);
         if (ns && ns->ns_name)
                 OBD_FREE(ns->ns_name, strlen(name) + 1);
-        if (ns) 
+        if (ns)
                 OBD_FREE(ns, sizeof(*ns));
         return NULL;
 }
@@ -73,7 +73,6 @@ static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (client) {
-                        spin_unlock(&res->lr_lock);
                         rc = ldlm_cli_cancel(lock->l_client, lock);
                         if (rc < 0) {
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
@@ -81,12 +80,9 @@ static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
                         }
                         if (rc == ELDLM_RESOURCE_FREED)
                                 rc = 1;
-                        else
-                                spin_lock(&res->lr_lock);
                 } else {
                         CERROR("Freeing a lock still held by a client node.\n");
 
-                        spin_lock(&lock->l_lock);
                         ldlm_resource_del_lock(lock);
                         ldlm_lock_free(lock);
 
@@ -104,14 +100,14 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
 
         if (!ns)
                 RETURN(ELDLM_OK);
-        /* We should probably take the ns_lock, but then ldlm_resource_put
-         * couldn't take it.  Hmm. */
+
+        l_lock(&ns->ns_lock);
+
         for (i = 0; i < RES_HASH_SIZE; i++) {
                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
                         struct ldlm_resource *res;
                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
 
-                        spin_lock(&res->lr_lock);
                         rc = cleanup_resource(res, &res->lr_granted);
                         if (!rc)
                                 rc = cleanup_resource(res, &res->lr_converting);
@@ -130,7 +126,7 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
         }
 
         vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
-        ptlrpc_cleanup_client(&ns->ns_rpc_client); 
+        ptlrpc_cleanup_client(&ns->ns_rpc_client);
         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
         OBD_FREE(ns, sizeof(*ns));
 
@@ -167,8 +163,7 @@ static struct ldlm_resource *ldlm_resource_new(void)
         INIT_LIST_HEAD(&res->lr_converting);
         INIT_LIST_HEAD(&res->lr_waiting);
 
-        res->lr_lock = SPIN_LOCK_UNLOCKED;
-        res->lr_refcount = 1;
+        atomic_set(&res->lr_refcount, 1);
 
         return res;
 }
@@ -198,7 +193,7 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
         res->lr_namespace = ns;
         ns->ns_refcount++;
 
-        res->lr_type = type; 
+        res->lr_type = type;
         res->lr_most_restr = LCK_NL;
 
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
@@ -232,7 +227,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                 RETURN(NULL);
         }
 
-        spin_lock(&ns->ns_lock);
+        l_lock(&ns->ns_lock);
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
 
         list_for_each(tmp, bucket) {
@@ -241,9 +236,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
 
                 if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
                         res = chk;
-                        spin_lock(&res->lr_lock);
-                        res->lr_refcount++;
-                        spin_unlock(&res->lr_lock);
+                        atomic_inc(&res->lr_refcount);
                         EXIT;
                         break;
                 }
@@ -251,29 +244,31 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
 
         if (res == NULL && create)
                 res = ldlm_resource_add(ns, parent, name, type);
-        spin_unlock(&ns->ns_lock);
+        l_unlock(&ns->ns_lock);
 
         RETURN(res);
 }
 
-/* Args: locked resource
- * Locks: takes and releases res->lr_lock
- *        takes and releases ns->ns_lock iff res->lr_refcount falls to 0
- */
+struct ldlm_resource *ldlm_resource_addref(struct ldlm_resource *res)
+{
+        atomic_inc(&res->lr_refcount);
+        return res;
+}
+
+/* Returns 1 if the resource was freed, 0 if it remains. */
 int ldlm_resource_put(struct ldlm_resource *res)
 {
         int rc = 0;
 
-        if (res->lr_refcount == 1) {
+        if (atomic_dec_and_test(&res->lr_refcount)) {
                 struct ldlm_namespace *ns = res->lr_namespace;
                 ENTRY;
 
-                spin_unlock(&res->lr_lock);
-                spin_lock(&ns->ns_lock);
-                spin_lock(&res->lr_lock);
+                l_lock(&ns->ns_lock);
 
-                if (res->lr_refcount != 1) {
-                        spin_unlock(&ns->ns_lock);
+                if (atomic_read(&res->lr_refcount) != 0) {
+                        /* We lost the race. */
+                        l_unlock(&ns->ns_lock);
                         goto out;
                 }
 
@@ -295,40 +290,38 @@ int ldlm_resource_put(struct ldlm_resource *res)
                 list_del(&res->lr_childof);
 
                 kmem_cache_free(ldlm_resource_slab, res);
-                spin_unlock(&ns->ns_lock);
+                l_unlock(&ns->ns_lock);
                 rc = 1;
         } else {
                 ENTRY;
         out:
-                res->lr_refcount--;
-                if (res->lr_refcount < 0)
+                if (atomic_read(&res->lr_refcount) < 0)
                         LBUG();
         }
 
-        RETURN(rc); 
+        RETURN(rc);
 }
 
-/* Must be called with resource->lr_lock taken */
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock)
 {
-        ldlm_resource_dump(res); 
-        ldlm_lock_dump(lock); 
+        l_lock(&res->lr_namespace->ns_lock);
+
+        ldlm_resource_dump(res);
+        ldlm_lock_dump(lock);
+
         if (!list_empty(&lock->l_res_link))
                 LBUG();
 
         list_add(&lock->l_res_link, head);
-        res->lr_refcount++;
+        l_unlock(&res->lr_namespace->ns_lock);
 }
 
-/* Must be called with resource->lr_lock taken */
-int ldlm_resource_del_lock(struct ldlm_lock *lock)
+void ldlm_resource_del_lock(struct ldlm_lock *lock)
 {
-        if (!list_empty(&lock->l_res_link)) {
-                list_del_init(&lock->l_res_link);
-                return ldlm_resource_put(lock->l_resource);
-        }
-        return 0;
+        l_lock(&res->lr_namespace->ns_lock);
+        list_del_init(&lock->l_res_link);
+        l_unlock(&res->lr_namespace->ns_lock);
 }
 
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)