&ns->ns_rpc_client);
INIT_LIST_HEAD(&ns->ns_root_list);
- ns->ns_lock = SPIN_LOCK_UNLOCKED;
+ l_lock_init(&ns->ns_lock);
ns->ns_refcount = 0;
ns->ns_client = client;
INIT_LIST_HEAD(bucket);
RETURN(ns);
- out:
+ out:
if (ns && ns->ns_hash)
vfree(ns->ns_hash);
if (ns && ns->ns_name)
OBD_FREE(ns->ns_name, strlen(name) + 1);
- if (ns)
+ if (ns)
OBD_FREE(ns, sizeof(*ns));
return NULL;
}
-static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
+extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
+
+static void cleanup_resource(struct ldlm_resource *res, struct list_head *q)
{
struct list_head *tmp, *pos;
int rc = 0, client = res->lr_namespace->ns_client;
list_for_each_safe(tmp, pos, q) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ ldlm_lock_get(lock);
if (client) {
- rc = ldlm_cli_cancel(lock->l_client, lock);
+ struct lustre_handle lockh;
+ ldlm_lock2handle(lock, &lockh);
+ /* can we get away without a connh here? */
+ rc = ldlm_cli_cancel(&lockh, NULL);
if (rc < 0) {
CERROR("ldlm_cli_cancel: %d\n", rc);
LBUG();
}
- if (rc == ELDLM_RESOURCE_FREED)
- rc = 1;
} else {
CERROR("Freeing a lock still held by a client node.\n");
- spin_lock(&lock->l_lock);
- ldlm_resource_del_lock(lock);
- ldlm_lock_free(lock);
-
- rc = ldlm_resource_put(res);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy(lock);
}
+ ldlm_lock_put(lock);
}
- RETURN(rc);
+ return;
}
int ldlm_namespace_free(struct ldlm_namespace *ns)
{
struct list_head *tmp, *pos;
- int i, rc;
+ int i;
if (!ns)
RETURN(ELDLM_OK);
- /* We should probably take the ns_lock, but then ldlm_resource_put
- * couldn't take it. Hmm. */
+
+ l_lock(&ns->ns_lock);
+
for (i = 0; i < RES_HASH_SIZE; i++) {
list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
struct ldlm_resource *res;
res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ ldlm_resource_getref(res);
- spin_lock(&res->lr_lock);
- rc = cleanup_resource(res, &res->lr_granted);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_converting);
- if (!rc)
- rc = cleanup_resource(res, &res->lr_waiting);
-
- if (rc == 0) {
+ cleanup_resource(res, &res->lr_granted);
+ cleanup_resource(res, &res->lr_converting);
+ cleanup_resource(res, &res->lr_waiting);
+
+ if (!ldlm_resource_put(res)) {
CERROR("Resource refcount nonzero (%d) after "
"lock cleanup; forcing cleanup.\n",
- res->lr_refcount);
+ atomic_read(&res->lr_refcount));
ldlm_resource_dump(res);
- res->lr_refcount = 1;
- rc = ldlm_resource_put(res);
+ atomic_set(&res->lr_refcount, 1);
+ ldlm_resource_put(res);
}
}
}
vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
- ptlrpc_cleanup_client(&ns->ns_rpc_client);
+ ptlrpc_cleanup_client(&ns->ns_rpc_client);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
INIT_LIST_HEAD(&res->lr_converting);
INIT_LIST_HEAD(&res->lr_waiting);
- res->lr_lock = SPIN_LOCK_UNLOCKED;
- res->lr_refcount = 1;
+ atomic_set(&res->lr_refcount, 1);
return res;
}
res->lr_namespace = ns;
ns->ns_refcount++;
- res->lr_type = type;
+ res->lr_type = type;
res->lr_most_restr = LCK_NL;
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
RETURN(NULL);
}
- spin_lock(&ns->ns_lock);
+ l_lock(&ns->ns_lock);
bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
list_for_each(tmp, bucket) {
if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
res = chk;
- spin_lock(&res->lr_lock);
- res->lr_refcount++;
- spin_unlock(&res->lr_lock);
+ atomic_inc(&res->lr_refcount);
EXIT;
break;
}
if (res == NULL && create)
res = ldlm_resource_add(ns, parent, name, type);
- spin_unlock(&ns->ns_lock);
+ l_unlock(&ns->ns_lock);
RETURN(res);
}
-/* Args: locked resource
- * Locks: takes and releases res->lr_lock
- * takes and releases ns->ns_lock iff res->lr_refcount falls to 0
- */
+struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
+{
+ atomic_inc(&res->lr_refcount);
+ return res;
+}
+
+/* Returns 1 if the resource was freed, 0 if it remains. */
int ldlm_resource_put(struct ldlm_resource *res)
{
int rc = 0;
- if (res->lr_refcount == 1) {
+ if (atomic_dec_and_test(&res->lr_refcount)) {
struct ldlm_namespace *ns = res->lr_namespace;
ENTRY;
- spin_unlock(&res->lr_lock);
- spin_lock(&ns->ns_lock);
- spin_lock(&res->lr_lock);
+ l_lock(&ns->ns_lock);
- if (res->lr_refcount != 1) {
- spin_unlock(&ns->ns_lock);
+ if (atomic_read(&res->lr_refcount) != 0) {
+ /* We lost the race. */
+ l_unlock(&ns->ns_lock);
goto out;
}
list_del(&res->lr_childof);
kmem_cache_free(ldlm_resource_slab, res);
- spin_unlock(&ns->ns_lock);
+ l_unlock(&ns->ns_lock);
rc = 1;
} else {
ENTRY;
out:
- res->lr_refcount--;
- if (res->lr_refcount < 0)
+ if (atomic_read(&res->lr_refcount) < 0)
LBUG();
}
- RETURN(rc);
+ RETURN(rc);
}
-/* Must be called with resource->lr_lock taken */
void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
struct ldlm_lock *lock)
{
- ldlm_resource_dump(res);
- ldlm_lock_dump(lock);
+ l_lock(&res->lr_namespace->ns_lock);
+
+ ldlm_resource_dump(res);
+ ldlm_lock_dump(lock);
+
if (!list_empty(&lock->l_res_link))
LBUG();
list_add(&lock->l_res_link, head);
- res->lr_refcount++;
-}
-
-/* Must be called with resource->lr_lock taken */
-int ldlm_resource_del_lock(struct ldlm_lock *lock)
-{
- if (!list_empty(&lock->l_res_link)) {
- list_del_init(&lock->l_res_link);
- return ldlm_resource_put(lock->l_resource);
- }
- return 0;
+ l_unlock(&res->lr_namespace->ns_lock);
}
-int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)
+void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
{
- LBUG();
- return 0;
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ list_del_init(&lock->l_res_link);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
(unsigned long long)res->lr_name[2]);
CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name,
- res->lr_refcount);
+ atomic_read(&res->lr_refcount));
CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
res->lr_namespace->ns_name);
CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);