From: pschwan Date: Sat, 19 Oct 2002 11:33:33 +0000 (+0000) Subject: - Replace per-namespace recursive lock with an ldlm-global lock, to close the X-Git-Tag: v1_7_100~4464 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=69b292d708e6d12e0849ee1293dded75a6e54d0b;p=fs%2Flustre-release.git - Replace per-namespace recursive lock with an ldlm-global lock, to close the race between ldlm_handle2lock and the lock destroy portion of ldlm_lock_put - Add a special interface for ldlm_handle2lock to resolve the following conflict between our invariants: 1. ldlm_handle2lock should never give a reference to a "dying" lock 2. ldlm_lock_decref needs to use ldlm_handle2lock to get a reference to a lock so that it can decrease the readers/writers refcount 3. The lock can't finish dying until readers/writers hits zero. That should be fixed now. --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 3bd8320..4324ee2 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -93,7 +93,6 @@ struct ldlm_namespace { struct list_head *ns_hash; /* hash table for ns */ __u32 ns_refcount; /* count of resources in the hash */ struct list_head ns_root_list; /* all root resources in ns */ - struct lustre_lock ns_lock; /* protects hash, refcount, list */ struct list_head ns_list_chain; /* position in global NS list */ struct proc_dir_entry *ns_proc_dir; @@ -266,6 +265,9 @@ do { \ #define LDLM_DEBUG_NOLOCK(format, a...) \ CDEBUG(D_DLMTRACE, "### " format "\n" , ## a) +/* For internal LDLM use */ +extern struct lustre_lock ldlm_everything_lock; + /* ldlm_extent.c */ int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *); int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *); @@ -281,10 +283,15 @@ void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie, ldlm_mode_t mode, void *data)); void ldlm_unregister_intent(void); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); -struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle); +struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int strict); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); void ldlm_cancel_callback(struct ldlm_lock *lock); +static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h) +{ + return __ldlm_handle2lock(h, 1); +} + #define LDLM_LOCK_PUT(lock) \ do { \ /*LDLM_DEBUG(lock, "put");*/ \ diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 1f952cd..16c24fd 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -79,11 +79,11 @@ int ldlm_extent_policy(struct ldlm_lock *lock, void *req_cookie, if (!res) LBUG(); - l_lock(&res->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); policy_internal(&res->lr_granted, req_ex, &new_ex, mode); policy_internal(&res->lr_converting, req_ex, &new_ex, mode); policy_internal(&res->lr_waiting, req_ex, &new_ex, mode); - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); memcpy(&lock->l_extent, &new_ex, sizeof(new_ex)); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 065edd9..97a8770 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -31,7 +31,7 @@ #include /* this lock protects ldlm_handle2lock's integrity */ -//static spinlock_t ldlm_handle_lock = SPIN_LOCK_UNLOCKED; +struct lustre_lock ldlm_everything_lock; /* lock types */ char *ldlm_lockname[] = { @@ -132,10 +132,10 @@ void ldlm_unregister_intent(void) */ struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_refc++; ldlm_resource_getref(lock->l_resource); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); return lock; } @@ -144,36 +144,35 @@ void ldlm_lock_put(struct ldlm_lock *lock) struct ldlm_namespace *ns = lock->l_resource->lr_namespace; ENTRY; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_refc--; //LDLM_DEBUG(lock, "after refc--"); if (lock->l_refc < 0) LBUG(); - if (ldlm_resource_put(lock->l_resource)) + if (ldlm_resource_put(lock->l_resource)) { + LASSERT(lock->l_refc == 0); lock->l_resource = NULL; + } if (lock->l_parent) LDLM_LOCK_PUT(lock->l_parent); if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) { - l_unlock(&ns->ns_lock); LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing"); - //spin_lock(&ldlm_handle_lock); spin_lock(&ns->ns_counter_lock); ns->ns_locks--; spin_unlock(&ns->ns_counter_lock); - lock->l_resource = NULL; lock->l_random = DEAD_HANDLE_MAGIC; if (lock->l_export && lock->l_export->exp_connection) ptlrpc_put_connection(lock->l_export->exp_connection); kmem_cache_free(ldlm_lock_slab, lock); - //spin_unlock(&ldlm_handle_lock); + l_unlock(&ldlm_everything_lock); CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 0).\n", sizeof(*lock), lock); } else - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); EXIT; } @@ -181,7 +180,7 @@ void ldlm_lock_put(struct ldlm_lock *lock) void ldlm_lock_destroy(struct ldlm_lock *lock) { ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (!list_empty(&lock->l_children)) { LDLM_DEBUG(lock, "still has children (%p)!", @@ -200,7 +199,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) } if (lock->l_flags & LDLM_FL_DESTROYED) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); EXIT; return; } @@ -215,7 +214,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) if (lock->l_export && lock->l_completion_ast) lock->l_completion_ast(lock, 0); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); LDLM_LOCK_PUT(lock); EXIT; } @@ -257,10 +256,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, spin_unlock(&resource->lr_namespace->ns_counter_lock); if (parent != NULL) { - l_lock(&parent->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_parent = parent; list_add(&lock->l_childof, &parent->l_children); - l_unlock(&parent->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); } CDEBUG(D_MALLOC, "kmalloced 'lock': %d at " @@ -277,11 +276,11 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) int type, i; ENTRY; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); if (memcmp(new_resid, lock->l_resource->lr_name, sizeof(lock->l_resource->lr_name)) == 0) { /* Nothing to do */ - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); RETURN(0); } @@ -305,7 +304,7 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) /* compensate for the initial get above.. */ ldlm_resource_put(lock->l_resource); - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); RETURN(0); } @@ -319,7 +318,8 @@ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh) lockh->cookie = lock->l_random; } -struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle) +struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, + int strict) { struct ldlm_lock *lock = NULL, *retval = NULL; ENTRY; @@ -327,31 +327,30 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle) if (!handle || !handle->addr) RETURN(NULL); - //spin_lock(&ldlm_handle_lock); + l_lock(&ldlm_everything_lock); lock = (struct ldlm_lock *)(unsigned long)(handle->addr); if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) { CERROR("bogus lock %p\n", lock); - GOTO(out2, retval); + GOTO(out, retval); } if (lock->l_random != handle->cookie) { CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64"\n", lock, lock->l_random, handle->cookie); - GOTO(out2, NULL); + GOTO(out, NULL); } if (!lock->l_resource) { CERROR("trying to lock bogus resource: lock %p\n", lock); LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock); - GOTO(out2, retval); + GOTO(out, retval); } if (!lock->l_resource->lr_namespace) { CERROR("trying to lock bogus namespace: lock %p\n", lock); LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock); - GOTO(out2, retval); + GOTO(out, retval); } - l_lock(&lock->l_resource->lr_namespace->ns_lock); - if (lock->l_flags & LDLM_FL_DESTROYED) { + if (strict && lock->l_flags & LDLM_FL_DESTROYED) { CERROR("lock already destroyed: lock %p\n", lock); LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock); GOTO(out, NULL); @@ -362,9 +361,7 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle) CERROR("lock disappeared below us!!! %p\n", lock); EXIT; out: - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - out2: - //spin_unlock(&ldlm_handle_lock); + l_unlock(&ldlm_everything_lock); return retval; } @@ -388,7 +385,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_ast_work *w; ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (new && (lock->l_flags & LDLM_FL_AST_SENT)) GOTO(out, 0); @@ -407,7 +404,7 @@ static void ldlm_add_ast_work_item(struct ldlm_lock *lock, w->w_lock = LDLM_LOCK_GET(lock); list_add(&w->w_list, lock->l_resource->lr_tmp); out: - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); return; } @@ -423,12 +420,12 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) /* only called for local locks */ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) lock->l_readers++; else lock->l_writers++; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); LDLM_LOCK_GET(lock); LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]); } @@ -436,14 +433,14 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) /* Args: unlocked lock */ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) { - struct ldlm_lock *lock = ldlm_handle2lock(lockh); + struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0); ENTRY; if (lock == NULL) LBUG(); LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) lock->l_readers--; else @@ -459,13 +456,13 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) "warning\n"); LDLM_DEBUG(lock, "final decref done on cbpending lock"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); /* FIXME: need a real 'desc' here */ lock->l_blocking_ast(lock, NULL, lock->l_data, lock->l_data_len, LDLM_CB_BLOCKING); } else - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ LDLM_LOCK_PUT(lock); /* matches the handle2lock above */ @@ -514,7 +511,7 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) int rc; ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted); /* FIXME: should we be sending ASTs to converting? */ @@ -522,7 +519,7 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) rc = ldlm_lock_compat_list (lock, send_cbs, &lock->l_resource->lr_converting); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); RETURN(rc); } @@ -534,7 +531,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock) struct ldlm_resource *res = lock->l_resource; ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); ldlm_resource_add_lock(res, &res->lr_granted, lock); lock->l_granted_mode = lock->l_req_mode; @@ -544,7 +541,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock) if (lock->l_completion_ast) { ldlm_add_ast_work_item(lock, NULL); } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); EXIT; } @@ -596,8 +593,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 * res_id, __u32 type, if (res == NULL) RETURN(0); - ns = res->lr_namespace; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); if ((lock = search_queue(&res->lr_granted, mode, cookie))) GOTO(out, rc = 1); @@ -609,7 +605,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 * res_id, __u32 type, EXIT; out: ldlm_resource_put(res); - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); if (lock) { ldlm_lock2handle(lock, lockh); @@ -689,7 +685,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, lock->l_cookie = cookie; lock->l_cookie_len = cookie_len; - l_lock(&res->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (local && lock->l_req_mode == lock->l_granted_mode) { /* The server returned a blocked lock, but it was granted before * we got a chance to actually enqueue it. We don't need to do @@ -736,7 +732,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, ldlm_grant_lock(lock); EXIT; out: - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); /* Don't set 'completion_ast' until here so that if the lock is granted * immediately we don't do an unnecessary completion call. */ lock->l_completion_ast = completion; @@ -804,7 +800,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) return; } - l_lock(&res->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); res->lr_tmp = &rpc_list; ldlm_reprocess_queue(res, &res->lr_converting); @@ -812,7 +808,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) ldlm_reprocess_queue(res, &res->lr_waiting); res->lr_tmp = NULL; - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); ldlm_run_ast_work(&rpc_list); EXIT; @@ -820,13 +816,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res) void ldlm_cancel_callback(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); if (!(lock->l_flags & LDLM_FL_CANCEL)) { lock->l_flags |= LDLM_FL_CANCEL; lock->l_blocking_ast(lock, NULL, lock->l_data, lock->l_data_len, LDLM_CB_CANCELING); } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); } void ldlm_lock_cancel(struct ldlm_lock *lock) @@ -838,7 +834,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) res = lock->l_resource; ns = res->lr_namespace; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); if (lock->l_readers || lock->l_writers) CDEBUG(D_INFO, "lock still has references (%d readers, %d " "writers)\n", lock->l_readers, lock->l_writers); @@ -848,7 +844,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) ldlm_del_waiting_lock(lock); ldlm_resource_unlink_lock(lock); ldlm_lock_destroy(lock); - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); EXIT; } @@ -880,7 +876,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, res = lock->l_resource; ns = res->lr_namespace; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_req_mode = new_mode; ldlm_resource_unlink_lock(lock); @@ -910,7 +906,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, *flags |= LDLM_FL_BLOCK_CONV; } - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); if (granted) ldlm_run_ast_work(&rpc_list); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index a6abc5e..21f8801 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -267,10 +267,10 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req) lock->l_export = req->rq_export; if (lock->l_export) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); list_add(&lock->l_export_chain, &lock->l_export->exp_ldlm_data.led_held_locks); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); } EXIT; @@ -399,10 +399,10 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req) LDLM_DEBUG(lock, "client blocking AST callback handler START"); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); if (do_ast) { LDLM_DEBUG(lock, "already unused, calling " @@ -441,7 +441,7 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req) LDLM_DEBUG(lock, "client completion callback handler START"); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); /* If we receive the completion AST before the actual enqueue returned, * then we might need to switch lock modes, resources, or extents. */ @@ -462,7 +462,7 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req) lock->l_resource->lr_tmp = &ast_list; ldlm_grant_lock(lock); lock->l_resource->lr_tmp = NULL; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); LDLM_LOCK_PUT(lock); @@ -669,6 +669,8 @@ static int __init ldlm_init(void) return -ENOMEM; } + l_lock_init(&ldlm_everything_lock); + return 0; } @@ -713,6 +715,7 @@ EXPORT_SYMBOL(ldlm_namespace_dump); EXPORT_SYMBOL(ldlm_cancel_locks_for_export); EXPORT_SYMBOL(l_lock); EXPORT_SYMBOL(l_unlock); +EXPORT_SYMBOL(ldlm_everything_lock); MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Lock Management Module v0.1"); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 90ca711..be8a581 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -426,10 +426,10 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) if (lock->l_connh) { LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); lock->l_flags |= LDLM_FL_CBPENDING; ldlm_cancel_callback(lock); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), LDLM_CANCEL, 1, &size, NULL); @@ -485,7 +485,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, RETURN(0); } - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); list_for_each(tmp, &res->lr_granted) { struct ldlm_lock *lock; lock = list_entry(tmp, struct ldlm_lock, l_res_link); @@ -506,7 +506,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, w->w_lock = LDLM_LOCK_GET(lock); list_add(&w->w_list, &list); } - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); list_for_each_safe(tmp, next, &list) { struct lustre_handle lockh; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index b7f386a..8a7e03d 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -79,7 +79,6 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) strcpy(ns->ns_name, name); INIT_LIST_HEAD(&ns->ns_root_list); - l_lock_init(&ns->ns_lock); ns->ns_refcount = 0; ns->ns_client = client; spin_lock_init(&ns->ns_counter_lock); @@ -159,7 +158,7 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only) { int i; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); for (i = 0; i < RES_HASH_SIZE; i++) { struct list_head *tmp, *pos; list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { @@ -171,6 +170,11 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only) cleanup_resource(res, &res->lr_converting, local_only); cleanup_resource(res, &res->lr_waiting, local_only); + /* XXX this is a bit counter-intuitive and should + * probably be cleaner: don't force cleanup if we're + * local_only (which is only used by recovery). We + * probably still have outstanding lock refs which + * reference these resources. -phil */ if (!ldlm_resource_put(res) && !local_only) { CERROR("Resource refcount nonzero (%d) after " "lock cleanup; forcing cleanup.\n", @@ -181,7 +185,7 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only) } } } - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); return ELDLM_OK; } @@ -313,7 +317,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, RETURN(NULL); } - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); bucket = ns->ns_hash + ldlm_hash_fn(parent, name); list_for_each(tmp, bucket) { @@ -330,7 +334,7 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, if (res == NULL && create) res = ldlm_resource_add(ns, parent, name, type); - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); RETURN(res); } @@ -350,11 +354,11 @@ int ldlm_resource_put(struct ldlm_resource *res) struct ldlm_namespace *ns = res->lr_namespace; ENTRY; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); if (atomic_read(&res->lr_refcount) != 0) { /* We lost the race. */ - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); goto out; } @@ -375,7 +379,7 @@ int ldlm_resource_put(struct ldlm_resource *res) list_del(&res->lr_childof); kmem_cache_free(ldlm_resource_slab, res); - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); spin_lock(&ns->ns_counter_lock); ns->ns_resources--; @@ -395,7 +399,7 @@ int ldlm_resource_put(struct ldlm_resource *res) void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock) { - l_lock(&res->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); ldlm_resource_dump(res); ldlm_lock_dump(lock); @@ -404,14 +408,14 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, LBUG(); list_add(&lock->l_res_link, head); - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); } void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_lock(&ldlm_everything_lock); list_del_init(&lock->l_res_link); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); } void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) @@ -440,7 +444,7 @@ void ldlm_namespace_dump(struct ldlm_namespace *ns) { struct list_head *tmp; - l_lock(&ns->ns_lock); + l_lock(&ldlm_everything_lock); CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name, ns->ns_refcount, ns->ns_client); @@ -452,7 +456,7 @@ void ldlm_namespace_dump(struct ldlm_namespace *ns) * them recursively. */ ldlm_resource_dump(res); } - l_unlock(&ns->ns_lock); + l_unlock(&ldlm_everything_lock); } void ldlm_resource_dump(struct ldlm_resource *res) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 1f60205..c44c087 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -443,10 +443,12 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(0); } - l_lock(&lock->l_resource->lr_namespace->ns_lock); + /* XXX layering violation! When this goes, so can the + * EXPORT_SYMBOL(ldlm_everything_lock) in ldlm/ldlm_lockd.c -phil*/ + l_lock(&ldlm_everything_lock); lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&ldlm_everything_lock); if (do_ast) { struct lustre_handle lockh;