X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_resource.c;h=a198bf3979d2d8228eb88e2730bd171326cc3a2a;hb=59e2fe88fcd4b033642f9c76cdf15ecfe1a54cb6;hp=600219baef22a4d0f580470bf19207d2b8d17241;hpb=8dfe224dba672732bf6ea8d9be61dceda4af955d;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 600219b..a198bf3 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -9,102 +9,178 @@ * by Cluster File Systems, Inc. */ -#define EXPORT_SYMTAB #define DEBUG_SUBSYSTEM S_LDLM #include +#include kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab; -struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, - __u32 local) +spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED; +struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list); +static struct proc_dir_entry *ldlm_ns_proc_dir = NULL; + +int ldlm_proc_setup(struct obd_device *obd) +{ + ENTRY; + + if (obd->obd_proc_entry == NULL) + RETURN(-EINVAL); + + ldlm_ns_proc_dir = proc_mkdir("namespaces", obd->obd_proc_entry); + if (ldlm_ns_proc_dir == NULL) { + CERROR("Couldn't create /proc/lustre/ldlm/namespaces\n"); + RETURN(-EPERM); + } + RETURN(0); +} + +void ldlm_proc_cleanup(struct obd_device *obd) +{ + proc_lustre_remove_obd_entry("namespaces", obd); +} + +struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) { - struct ldlm_namespace *ns; + struct ldlm_namespace *ns = NULL; struct list_head *bucket; OBD_ALLOC(ns, sizeof(*ns)); if (!ns) { LBUG(); - RETURN(NULL); + GOTO(out, NULL); } + ns->ns_hash = vmalloc(sizeof(*ns->ns_hash) * RES_HASH_SIZE); if (!ns->ns_hash) { - OBD_FREE(ns, sizeof(*ns)); LBUG(); - RETURN(NULL); + GOTO(out, ns); + } + obd_memory += sizeof(*ns->ns_hash) * RES_HASH_SIZE; + + OBD_ALLOC(ns->ns_name, strlen(name) + 1); + if (!ns->ns_name) { + LBUG(); + GOTO(out, ns); } + strcpy(ns->ns_name, name); - ns->ns_obddev = obddev; INIT_LIST_HEAD(&ns->ns_root_list); - ns->ns_lock = SPIN_LOCK_UNLOCKED; + l_lock_init(&ns->ns_lock); ns->ns_refcount = 0; - ns->ns_local = local; + ns->ns_client = client; for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) INIT_LIST_HEAD(bucket); - return ns; + spin_lock(&ldlm_namespace_lock); + list_add(&ns->ns_list_chain, &ldlm_namespace_list); + ns->ns_proc_dir = proc_mkdir(ns->ns_name, ldlm_ns_proc_dir); + if (ns->ns_proc_dir == NULL) + CERROR("Unable to create proc directory for namespace.\n"); + spin_unlock(&ldlm_namespace_lock); + + RETURN(ns); + + out: + if (ns && ns->ns_hash) { + vfree(ns->ns_hash); + obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE; + } + if (ns && ns->ns_name) + OBD_FREE(ns->ns_name, strlen(name) + 1); + if (ns) + OBD_FREE(ns, sizeof(*ns)); + return NULL; } -static int cleanup_resource(struct ldlm_resource *res, struct list_head *q) +extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock); + +static void cleanup_resource(struct ldlm_resource *res, struct list_head *q) { struct list_head *tmp, *pos; - int rc = 0; + int rc = 0, client = res->lr_namespace->ns_client; + ENTRY; list_for_each_safe(tmp, pos, q) { struct ldlm_lock *lock; - - if (rc) { - /* Res was already cleaned up. */ - LBUG(); - } - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - spin_lock(&lock->l_lock); - ldlm_resource_del_lock(lock); - ldlm_lock_free(lock); + LDLM_LOCK_GET(lock); + + if (client) { + struct lustre_handle lockh; + ldlm_lock2handle(lock, &lockh); + /* can we get away without a connh here? */ + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) { + /* It failed remotely, but we'll force it to + * cleanup locally. */ + CERROR("ldlm_cli_cancel: %d\n", rc); + ldlm_lock_cancel(lock); + } + } else { + LDLM_DEBUG(lock, "Freeing a lock still held by a " + "client node.\n"); - rc = ldlm_resource_put(res); + ldlm_resource_unlink_lock(lock); + ldlm_lock_destroy(lock); + } + LDLM_LOCK_PUT(lock); } - - return rc; } int ldlm_namespace_free(struct ldlm_namespace *ns) { struct list_head *tmp, *pos; - int i, rc; + int i; + + if (!ns) + RETURN(ELDLM_OK); + + spin_lock(&ldlm_namespace_lock); + list_del(&ns->ns_list_chain); + remove_proc_entry(ns->ns_name, ldlm_ns_proc_dir); + spin_unlock(&ldlm_namespace_lock); + + l_lock(&ns->ns_lock); - /* We should probably take the ns_lock, but then ldlm_resource_put - * couldn't take it. Hmm. */ for (i = 0; i < RES_HASH_SIZE; i++) { list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { struct ldlm_resource *res; res = list_entry(tmp, struct ldlm_resource, lr_hash); - - spin_lock(&res->lr_lock); - rc = cleanup_resource(res, &res->lr_granted); - if (!rc) - rc = cleanup_resource(res, &res->lr_converting); - if (!rc) - rc = cleanup_resource(res, &res->lr_waiting); - - if (rc == 0) { - CERROR("Resource refcount nonzero after lock " - "cleanup; forcing cleanup.\n"); - res->lr_refcount = 1; - rc = ldlm_resource_put(res); + ldlm_resource_getref(res); + + cleanup_resource(res, &res->lr_granted); + cleanup_resource(res, &res->lr_converting); + cleanup_resource(res, &res->lr_waiting); + + if (!ldlm_resource_put(res)) { + CERROR("Resource refcount nonzero (%d) after " + "lock cleanup; forcing cleanup.\n", + atomic_read(&res->lr_refcount)); + ldlm_resource_dump(res); + atomic_set(&res->lr_refcount, 1); + ldlm_resource_put(res); } } } - vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */); + vfree(ns->ns_hash /* , sizeof(*ns->ns_hash) * RES_HASH_SIZE */); + obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE; + OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1); OBD_FREE(ns, sizeof(*ns)); return ELDLM_OK; } +int ldlm_client_free(struct obd_export *exp) +{ + struct ldlm_export_data *led = &exp->exp_ldlm_data; + ptlrpc_cleanup_client(&led->led_client); + RETURN(0); +} + static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name) { __u32 hash = 0; @@ -135,8 +211,7 @@ static struct ldlm_resource *ldlm_resource_new(void) INIT_LIST_HEAD(&res->lr_converting); INIT_LIST_HEAD(&res->lr_waiting); - res->lr_lock = SPIN_LOCK_UNLOCKED; - res->lr_refcount = 1; + atomic_set(&res->lr_refcount, 1); return res; } @@ -151,7 +226,7 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *res; ENTRY; - if (type < 0 || type > LDLM_MAX_TYPE) { + if (type < LDLM_MIN_TYPE || type > LDLM_MAX_TYPE) { LBUG(); RETURN(NULL); } @@ -166,16 +241,15 @@ static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns, res->lr_namespace = ns; ns->ns_refcount++; - res->lr_type = type; + res->lr_type = type; res->lr_most_restr = LCK_NL; bucket = ns->ns_hash + ldlm_hash_fn(parent, name); list_add(&res->lr_hash, bucket); - if (parent == NULL) { - res->lr_parent = res; - list_add(&res->lr_rootlink, &ns->ns_root_list); - } else { + if (parent == NULL) + list_add(&res->lr_childof, &ns->ns_root_list); + else { res->lr_parent = parent; list_add(&res->lr_childof, &parent->lr_children); } @@ -193,13 +267,14 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct list_head *bucket; struct list_head *tmp = bucket; struct ldlm_resource *res = NULL; - ENTRY; - if (ns->ns_hash == NULL) + if (ns == NULL || ns->ns_hash == NULL) { + LBUG(); RETURN(NULL); + } - spin_lock(&ns->ns_lock); + l_lock(&ns->ns_lock); bucket = ns->ns_hash + ldlm_hash_fn(parent, name); list_for_each(tmp, bucket) { @@ -208,9 +283,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) { res = chk; - spin_lock(&res->lr_lock); - res->lr_refcount++; - spin_unlock(&res->lr_lock); + atomic_inc(&res->lr_refcount); EXIT; break; } @@ -218,29 +291,31 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, if (res == NULL && create) res = ldlm_resource_add(ns, parent, name, type); - spin_unlock(&ns->ns_lock); + l_unlock(&ns->ns_lock); RETURN(res); } -/* Args: locked resource - * Locks: takes and releases res->lr_lock - * takes and releases ns->ns_lock iff res->lr_refcount falls to 0 - */ +struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res) +{ + atomic_inc(&res->lr_refcount); + return res; +} + +/* Returns 1 if the resource was freed, 0 if it remains. */ int ldlm_resource_put(struct ldlm_resource *res) { int rc = 0; - if (res->lr_refcount == 1) { + if (atomic_dec_and_test(&res->lr_refcount)) { struct ldlm_namespace *ns = res->lr_namespace; ENTRY; - spin_unlock(&res->lr_lock); - spin_lock(&ns->ns_lock); - spin_lock(&res->lr_lock); + l_lock(&ns->ns_lock); - if (res->lr_refcount != 1) { - spin_unlock(&ns->ns_lock); + if (atomic_read(&res->lr_refcount) != 0) { + /* We lost the race. */ + l_unlock(&ns->ns_lock); goto out; } @@ -258,42 +333,41 @@ int ldlm_resource_put(struct ldlm_resource *res) ns->ns_refcount--; list_del(&res->lr_hash); - list_del(&res->lr_rootlink); list_del(&res->lr_childof); kmem_cache_free(ldlm_resource_slab, res); - spin_unlock(&ns->ns_lock); + l_unlock(&ns->ns_lock); rc = 1; } else { ENTRY; out: - res->lr_refcount--; - if (res->lr_refcount < 0) + if (atomic_read(&res->lr_refcount) < 0) LBUG(); } - RETURN(rc); + RETURN(rc); } -/* Must be called with resource->lr_lock taken */ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock) { + l_lock(&res->lr_namespace->ns_lock); + + ldlm_resource_dump(res); + ldlm_lock_dump(lock); + + if (!list_empty(&lock->l_res_link)) + LBUG(); + list_add(&lock->l_res_link, head); - res->lr_refcount++; + l_unlock(&res->lr_namespace->ns_lock); } -/* Must be called with resource->lr_lock taken */ -void ldlm_resource_del_lock(struct ldlm_lock *lock) +void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { + l_lock(&lock->l_resource->lr_namespace->ns_lock); list_del_init(&lock->l_res_link); - lock->l_resource->lr_refcount--; -} - -int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) -{ - LBUG(); - return 0; + l_unlock(&lock->l_resource->lr_namespace->ns_lock); } void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) @@ -303,6 +377,40 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version)); } +void ldlm_dump_all_namespaces(void) +{ + struct list_head *tmp; + + spin_lock(&ldlm_namespace_lock); + + list_for_each(tmp, &ldlm_namespace_list) { + struct ldlm_namespace *ns; + ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain); + ldlm_namespace_dump(ns); + } + + spin_unlock(&ldlm_namespace_lock); +} + +void ldlm_namespace_dump(struct ldlm_namespace *ns) +{ + struct list_head *tmp; + + l_lock(&ns->ns_lock); + CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name, + ns->ns_refcount, ns->ns_client); + + list_for_each(tmp, &ns->ns_root_list) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_childof); + + /* Once we have resources with children, this should really dump + * them recursively. */ + ldlm_resource_dump(res); + } + l_unlock(&ns->ns_lock); +} + void ldlm_resource_dump(struct ldlm_resource *res) { struct list_head *tmp; @@ -316,8 +424,10 @@ void ldlm_resource_dump(struct ldlm_resource *res) (unsigned long long)res->lr_name[1], (unsigned long long)res->lr_name[2]); - CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name); - CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace); + CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name, + atomic_read(&res->lr_refcount)); + CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace, + res->lr_namespace->ns_name); CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root); CDEBUG(D_OTHER, "Granted locks:\n");