From 35c1f53f2403b16415cb445927c02d141eac8555 Mon Sep 17 00:00:00 2001 From: nikita Date: Sat, 18 Oct 2008 17:04:25 +0000 Subject: [PATCH] lu_ref support for ldlm_lock and ldlm_resource. See lu_ref patch. lu_ref fields ->l_reference and ->lr_reference are added to ldlm_lock and ldlm_resource. LDLM interface has to be changed, because code that releases a reference on a lock, has to "know" what reference this is. In the most frequent case lock = ldlm_handle2lock(handle); ... LDLM_LOCK_PUT(lock); no changes are required. When any other reference (received _not_ from ldlm_handle2lock()) is released, LDLM_LOCK_RELEASE() has to be called instead of LDLM_LOCK_PUT(). Arguably, changes are pervasive, and interface requires some discipline for proper use. On the other hand, it was very instrumental in finding a few leaked lock references. b=16450 --- lustre/ChangeLog | 26 ++++++++++++++++++++ lustre/include/lustre_dlm.h | 52 ++++++++++++++++++++++++++++++++++++--- lustre/ldlm/ldlm_extent.c | 4 +-- lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_lock.c | 54 ++++++++++++++++++++++++++++------------ lustre/ldlm/ldlm_lockd.c | 23 +++++++++-------- lustre/ldlm/ldlm_request.c | 60 +++++++++++++++++++++++++++++++++++---------- lustre/ldlm/ldlm_resource.c | 10 ++++++-- lustre/mdc/mdc_reint.c | 3 ++- lustre/mdt/mdt_handler.c | 18 +++++++++++--- lustre/obdfilter/filter.c | 8 +++--- lustre/osc/osc_request.c | 2 ++ 12 files changed, 208 insertions(+), 54 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 59a3afd..8d31936 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1540,6 +1540,32 @@ Bugzilla : 16450 Description: Kill unused ldlm_handle2lock_ns() function. Details : Kill unused ldlm_handle2lock_ns() function. +Severity : minor +Bugzilla : 16450 +Description: Kill unused ldlm_handle2lock_ns() function. +Details : Kill unused ldlm_handle2lock_ns() function. + +Severity : minor +Bugzilla : 16450 +Description: Add lu_ref support to ldlm_lock +Details : lu_ref support for ldlm_lock and ldlm_resource. See lu_ref patch. + lu_ref fields ->l_reference and ->lr_reference are added to ldlm_lock + and ldlm_resource. LDLM interface has to be changed, because code that + releases a reference on a lock, has to "know" what reference this is. + In the most frequent case + + lock = ldlm_handle2lock(handle); + ... + LDLM_LOCK_PUT(lock); + + no changes are required. When any other reference (received _not_ from + ldlm_handle2lock()) is released, LDLM_LOCK_RELEASE() has to be called + instead of LDLM_LOCK_PUT(). + + Arguably, changes are pervasive, and interface requires some discipline + for proper use. On the other hand, it was very instrumental in finding + a few leaked lock references. + -------------------------------------------------------------------------------- 2007-08-10 Cluster File Systems, Inc. diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index b7a4117..0e10778 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -53,6 +53,7 @@ #include #include /* for obd_export, for LDLM_DEBUG */ #include /* for interval_node{}, ldlm_extent */ +#include struct obd_ops; struct obd_device; @@ -699,6 +700,7 @@ struct ldlm_lock { */ struct list_head l_sl_mode; struct list_head l_sl_policy; + struct lu_ref l_reference; }; struct ldlm_resource { @@ -730,6 +732,10 @@ struct ldlm_resource { /* when the resource was considered as contended */ cfs_time_t lr_contention_time; + /** + * List of references to this resource. For debugging. + */ + struct lu_ref lr_reference; }; struct ldlm_ast_work { @@ -860,14 +866,26 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *, int flags); void ldlm_cancel_callback(struct ldlm_lock *); int ldlm_lock_set_data(struct lustre_handle *, void *data); int ldlm_lock_remove_from_lru(struct ldlm_lock *); -struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *, - const struct lustre_handle *); static inline struct ldlm_lock *ldlm_handle2lock(const struct lustre_handle *h) { return __ldlm_handle2lock(h, 0); } +#define LDLM_LOCK_REF_DEL(lock) \ + lu_ref_del(&lock->l_reference, "handle", cfs_current()) + +static inline struct ldlm_lock * +ldlm_handle2lock_long(const struct lustre_handle *h, int flags) +{ + struct ldlm_lock *lock; + + lock = __ldlm_handle2lock(h, flags); + if (lock != NULL) + LDLM_LOCK_REF_DEL(lock); + return lock; +} + static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, int buf_idx, int increase) @@ -880,8 +898,27 @@ static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, return 0; } +int ldlm_error2errno(ldlm_error_t error); +ldlm_error_t ldlm_errno2error(int err_no); /* don't call it `errno': this + * confuses user-space. */ + +/** + * Release a temporary lock reference obtained by ldlm_handle2lock() or + * __ldlm_handle2lock(). + */ #define LDLM_LOCK_PUT(lock) \ do { \ + LDLM_LOCK_REF_DEL(lock); \ + /*LDLM_DEBUG((lock), "put");*/ \ + ldlm_lock_put(lock); \ +} while (0) + +/** + * Release a lock reference obtained by some other means (see + * LDLM_LOCK_PUT()). + */ +#define LDLM_LOCK_RELEASE(lock) \ +do { \ /*LDLM_DEBUG((lock), "put");*/ \ ldlm_lock_put(lock); \ } while (0) @@ -901,7 +938,7 @@ do { \ if (c-- == 0) \ break; \ list_del_init(&_lock->member); \ - LDLM_LOCK_PUT(_lock); \ + LDLM_LOCK_RELEASE(_lock); \ } \ LASSERT(c <= 0); \ }) @@ -969,10 +1006,19 @@ void ldlm_resource_dump(int level, struct ldlm_resource *); int ldlm_lock_change_resource(struct ldlm_namespace *, struct ldlm_lock *, const struct ldlm_res_id *); +#define LDLM_RESOURCE_ADDREF(res) do { \ + lu_ref_add(&(res)->lr_reference, __FUNCTION__, cfs_current()); \ +} while (0) + +#define LDLM_RESOURCE_DELREF(res) do { \ + lu_ref_del(&(res)->lr_reference, __FUNCTION__, cfs_current()); \ +} while (0) + struct ldlm_callback_suite { ldlm_completion_callback lcs_completion; ldlm_blocking_callback lcs_blocking; ldlm_glimpse_callback lcs_glimpse; + ldlm_weigh_callback lcs_weigh; }; /* ldlm_request.c */ diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 3b6543e..1541416 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -623,9 +623,9 @@ static void discard_bl_list(struct list_head *bl_list) lock->l_flags &= ~LDLM_FL_AST_SENT; LASSERT(lock->l_bl_ast_run == 0); LASSERT(lock->l_blocking_lock); - LDLM_LOCK_PUT(lock->l_blocking_lock); + LDLM_LOCK_RELEASE(lock->l_blocking_lock); lock->l_blocking_lock = NULL; - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } EXIT; } diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 93daaf6..2e888ff 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -411,7 +411,7 @@ reprocess: /* insert new2 at lock */ ldlm_resource_add_lock(res, ownlocks, new2); - LDLM_LOCK_PUT(new2); + LDLM_LOCK_RELEASE(new2); break; } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 8e63622..9977c41 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -159,6 +159,7 @@ void ldlm_lock_put(struct ldlm_lock *lock) LASSERT(list_empty(&lock->l_pending_chain)); atomic_dec(&res->lr_namespace->ns_locks); + lu_ref_del(&res->lr_reference, "lock", lock); ldlm_resource_putref(res); lock->l_resource = NULL; if (lock->l_export) { @@ -170,6 +171,7 @@ void ldlm_lock_put(struct ldlm_lock *lock) OBD_FREE(lock->l_lvb_data, lock->l_lvb_len); ldlm_interval_free(ldlm_interval_detach(lock)); + lu_ref_fini(&lock->l_reference); OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle, ldlm_lock_free); } @@ -295,8 +297,10 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) unlock_res_and_lock(lock); /* drop reference from hashtable only for first destroy */ - if (first) - LDLM_LOCK_PUT(lock); + if (first) { + lu_ref_del(&lock->l_reference, "hash", lock); + LDLM_LOCK_RELEASE(lock); + } EXIT; } @@ -306,8 +310,10 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock) ENTRY; first = ldlm_lock_destroy_internal(lock); /* drop reference from hashtable only for first destroy */ - if (first) - LDLM_LOCK_PUT(lock); + if (first) { + lu_ref_del(&lock->l_reference, "hash", lock); + LDLM_LOCK_RELEASE(lock); + } EXIT; } @@ -337,6 +343,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) spin_lock_init(&lock->l_lock); lock->l_resource = ldlm_resource_getref(resource); + lu_ref_add(&resource->lr_reference, "lock", lock); atomic_set(&lock->l_refc, 2); CFS_INIT_LIST_HEAD(&lock->l_res_link); @@ -358,6 +365,8 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) CFS_INIT_LIST_HEAD(&lock->l_extents_list); spin_lock_init(&lock->l_extents_list_lock); CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list); + lu_ref_init(&lock->l_reference); + lu_ref_add(&lock->l_reference, "hash", lock); RETURN(lock); } @@ -389,6 +398,7 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, unlock_res_and_lock(lock); newres = ldlm_resource_get(ns, NULL, new_resid, type, 1); + lu_ref_add(&newres->lr_reference, "lock", lock); if (newres == NULL) RETURN(-ENOMEM); /* @@ -413,6 +423,7 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, unlock_res_and_lock(lock); /* ...and the flowers are still standing! */ + lu_ref_del(&oldres->lr_reference, "lock", lock); ldlm_resource_putref(oldres); RETURN(0); @@ -448,6 +459,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle, ns = lock->l_resource->lr_namespace; LASSERT(ns != NULL); + lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current()); lock_res_and_lock(lock); /* It's unlikely but possible that someone marked the lock as @@ -576,11 +588,16 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode) { ldlm_lock_remove_from_lru(lock); - if (mode & (LCK_NL | LCK_CR | LCK_PR)) + if (mode & (LCK_NL | LCK_CR | LCK_PR)) { lock->l_readers++; - if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) + lu_ref_add_atomic(&lock->l_reference, "reader", lock); + } + if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) { lock->l_writers++; + lu_ref_add_atomic(&lock->l_reference, "writer", lock); + } LDLM_LOCK_GET(lock); + lu_ref_add_atomic(&lock->l_reference, "user", lock); LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]); } @@ -601,14 +618,17 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode) LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); if (mode & (LCK_NL | LCK_CR | LCK_PR)) { LASSERT(lock->l_readers > 0); + lu_ref_del(&lock->l_reference, "reader", lock); lock->l_readers--; } if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) { LASSERT(lock->l_writers > 0); + lu_ref_del(&lock->l_reference, "writer", lock); lock->l_writers--; } - LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ + lu_ref_del(&lock->l_reference, "user", lock); + LDLM_LOCK_RELEASE(lock); /* matches the LDLM_LOCK_GET() in addref */ } void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) @@ -1076,6 +1096,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, RETURN(0); } + LDLM_RESOURCE_ADDREF(res); lock_res(res); lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags); @@ -1093,6 +1114,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, EXIT; out: unlock_res(res); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); if (lock) { @@ -1106,7 +1128,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, NULL); if (err) { if (flags & LDLM_FL_TEST_LOCK) - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); else ldlm_lock_decref_internal(lock, mode); rc = 0; @@ -1140,7 +1162,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, } if (flags & LDLM_FL_TEST_LOCK) - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/ LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res " @@ -1238,7 +1260,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, * work here is done. */ if (lock != *lockp) { ldlm_lock_destroy(lock); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } *flags |= LDLM_FL_LOCK_CHANGED; RETURN(0); @@ -1396,11 +1418,11 @@ ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) ldlm_lock2desc(lock->l_blocking_lock, &d); - LDLM_LOCK_PUT(lock->l_blocking_lock); + LDLM_LOCK_RELEASE(lock->l_blocking_lock); lock->l_blocking_lock = NULL; lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(1); } @@ -1438,7 +1460,7 @@ ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) completion_callback(lock, 0, (void *)arg); rc = 1; } - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(rc); } @@ -1458,7 +1480,7 @@ ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) desc.l_granted_mode = 0; lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(1); } @@ -1539,9 +1561,11 @@ void ldlm_reprocess_all_ns(struct ldlm_namespace *ns) ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); + LDLM_RESOURCE_ADDREF(res); rc = reprocess_one_queue(res, NULL); + LDLM_RESOURCE_DELREF(res); spin_lock(&ns->ns_hash_lock); tmp = tmp->next; ldlm_resource_putref_locked(res); @@ -1677,7 +1701,7 @@ void ldlm_cancel_locks_for_export_cb(void *obj, void *data) ldlm_lock_cancel(lock); ldlm_reprocess_all(res); ldlm_resource_putref(res); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } void ldlm_cancel_locks_for_export(struct obd_export *exp) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 8a7acd5..e4a3205 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -566,7 +566,7 @@ static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc) ? "blocking" : "completion"); } - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); if (rc == -ERESTART) atomic_set(&arg->restart, 1); @@ -1142,7 +1142,7 @@ existing_lock: if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) ldlm_reprocess_all(lock->l_resource); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)", @@ -1276,10 +1276,12 @@ int ldlm_request_cancel(struct ptlrpc_request *req, if (res != pres) { if (pres != NULL) { ldlm_reprocess_all(pres); + LDLM_RESOURCE_DELREF(pres); ldlm_resource_putref(pres); } if (res != NULL) { ldlm_resource_getref(res); + LDLM_RESOURCE_ADDREF(res); ldlm_res_lvbo_update(res, NULL, 0, 1); } pres = res; @@ -1289,6 +1291,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, } if (pres != NULL) { ldlm_reprocess_all(pres); + LDLM_RESOURCE_DELREF(pres); ldlm_resource_putref(pres); } LDLM_DEBUG_NOLOCK("server-side cancel handler END"); @@ -1354,7 +1357,7 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, } LDLM_DEBUG(lock, "client blocking callback handler END"); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); EXIT; } @@ -1384,7 +1387,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, /* bug 11300: the lock has already been granted */ unlock_res_and_lock(lock); LDLM_DEBUG(lock, "Double grant race happened"); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); EXIT; return; } @@ -1409,7 +1412,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, if (ldlm_lock_change_resource(ns, lock, &dlm_req->lock_desc.l_resource.lr_name) != 0) { LDLM_ERROR(lock, "Failed to allocate resource"); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); EXIT; return; } @@ -1448,7 +1451,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); EXIT; } @@ -1486,7 +1489,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, return; } unlock_res_and_lock(lock); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); EXIT; } @@ -1697,7 +1700,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CERROR("ldlm_cli_cancel: %d\n", rc); } - lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]); + lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0); if (!lock) { CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock " "disappeared\n", dlm_req->lock_handle[0].cookie); @@ -1718,7 +1721,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) LPX64" - lock disappeared\n", dlm_req->lock_handle[0].cookie); unlock_res_and_lock(lock); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); ldlm_callback_reply(req, -EINVAL); RETURN(0); } @@ -2081,7 +2084,7 @@ ldlm_export_lock_put(struct hlist_node *hnode) ENTRY; lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(lock); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 6e08c02..0d15a15 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -406,7 +406,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "client-side local enqueue END"); EXIT; out: - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); out_nolock: return err; } @@ -595,7 +595,7 @@ cleanup: failed_lock_cleanup(ns, lock, lockh, mode); /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */ LDLM_LOCK_PUT(lock); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); return rc; } @@ -778,7 +778,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_ENQUEUE); if (req == NULL) { failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(-ENOMEM); } req_passed_in = 0; @@ -837,7 +837,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, /* If ldlm_cli_enqueue_fini did not find the lock, we need to free * one reference that we took */ if (err == -ENOLCK) - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); else rc = err; @@ -1189,7 +1189,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) ENTRY; /* concurrent cancels on the same handle can happen */ - lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); + lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING); if (lock == NULL) { LDLM_DEBUG_NOLOCK("lock is already being destroyed\n"); RETURN(0); @@ -1197,7 +1197,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) rc = ldlm_cli_cancel_local(lock); if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) { - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); RETURN(rc < 0 ? rc : 0); } /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL @@ -1252,7 +1252,7 @@ static int ldlm_cancel_list(struct list_head *cancels, int count, int flags) if (rc == LDLM_FL_LOCAL_ONLY) { /* CANCEL RPC should not be sent to server. */ list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); count--; } @@ -1528,6 +1528,10 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, if (&lock->l_lru == &ns->ns_unused_list) break; + LDLM_LOCK_GET(lock); + spin_unlock(&ns->ns_unused_lock); + lu_ref_add(&lock->l_reference, __FUNCTION__, cfs_current()); + /* Pass the lock through the policy filter and see if it * should stay in lru. * @@ -1541,11 +1545,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, * old locks, but additionally chose them by * their weight. Big extent locks will stay in * the cache. */ - if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK) + if (pf(ns, lock, unused, added, count) == + LDLM_POLICY_KEEP_LOCK) { + lu_ref_del(&lock->l_reference, + __FUNCTION__, cfs_current()); + LDLM_LOCK_RELEASE(lock); + spin_lock(&ns->ns_unused_lock); break; - - LDLM_LOCK_GET(lock); /* dropped by bl thread */ - spin_unlock(&ns->ns_unused_lock); + } lock_res_and_lock(lock); /* Check flags again under the lock. */ @@ -1557,7 +1564,9 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, * cancel by itseft or the lock is matched * is already not unused. */ unlock_res_and_lock(lock); - LDLM_LOCK_PUT(lock); + lu_ref_del(&lock->l_reference, + __FUNCTION__, cfs_current()); + LDLM_LOCK_RELEASE(lock); spin_lock(&ns->ns_unused_lock); continue; } @@ -1587,6 +1596,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, LASSERT(list_empty(&lock->l_bl_ast)); list_add(&lock->l_bl_ast, cancels); unlock_res_and_lock(lock); + lu_ref_del(&lock->l_reference, __FUNCTION__, cfs_current()); spin_lock(&ns->ns_unused_lock); added++; unused--; @@ -1600,17 +1610,22 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, int count, int max, int flags) { + struct list_head disp = CFS_LIST_HEAD_INIT(disp); ldlm_cancel_lru_policy_t pf; struct ldlm_lock *lock; int added = 0, unused; + int loop_stop = 0; ENTRY; pf = ldlm_cancel_lru_policy(ns, flags); LASSERT(pf != NULL); spin_lock(&ns->ns_unused_lock); unused = ns->ns_nr_unused; + list_splice_init(&ns->ns_unused_list, &disp); + while (!list_empty(&disp)) { + lock = list_entry(disp.next, struct ldlm_lock, l_lru); + list_move_tail(&lock->l_lru, &ns->ns_unused_list); - list_for_each_entry(lock, &ns->ns_unused_list, l_lru) { /* For any flags, stop scanning if @max is reached. */ if (max && added >= max) break; @@ -1622,14 +1637,25 @@ int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, (lock->l_flags & LDLM_FL_BL_AST)) continue; + LDLM_LOCK_GET(lock); + spin_unlock(&ns->ns_unused_lock); + lu_ref_add(&lock->l_reference, __FUNCTION__, cfs_current()); + /* Pass the lock through the policy filter and see if it * should stay in lru. */ if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK) + loop_stop = 1; + + lu_ref_del(&lock->l_reference, __FUNCTION__, cfs_current()); + LDLM_LOCK_RELEASE(lock); + spin_lock(&ns->ns_unused_lock); + if (loop_stop) break; added++; unused--; } + list_splice(&disp, ns->ns_unused_list.prev); spin_unlock(&ns->ns_unused_lock); RETURN(added); } @@ -1790,12 +1816,14 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, RETURN(0); } + LDLM_RESOURCE_ADDREF(res); count = ldlm_cancel_resource_local(res, &cancels, policy, mode, 0, flags, opaque); rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); RETURN(0); } @@ -1844,6 +1872,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); + LDLM_RESOURCE_ADDREF(res); rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name, NULL, LCK_MINMODE, flags, opaque); @@ -1852,6 +1881,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n", res->lr_name.name[0], rc); + LDLM_RESOURCE_DELREF(res); spin_lock(&ns->ns_hash_lock); tmp = tmp->next; ldlm_resource_putref_locked(res); @@ -1940,9 +1970,11 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); + LDLM_RESOURCE_ADDREF(res); rc = iter(res, closure); + LDLM_RESOURCE_DELREF(res); spin_lock(&ns->ns_hash_lock); tmp = tmp->next; ldlm_resource_putref_locked(res); @@ -1974,7 +2006,9 @@ void ldlm_resource_iterate(struct ldlm_namespace *ns, return; } + LDLM_RESOURCE_ADDREF(res); ldlm_resource_foreach(res, iter, data); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); EXIT; } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index f256639..1b4e46e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -455,7 +455,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); if (lock->l_completion_ast) lock->l_completion_ast(lock, 0, NULL); - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); continue; } @@ -474,7 +474,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, "client node"); ldlm_lock_destroy(lock); } - LDLM_LOCK_PUT(lock); + LDLM_LOCK_RELEASE(lock); } while (1); EXIT; @@ -498,6 +498,7 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); + LDLM_RESOURCE_ADDREF(res); cleanup_resource(res, &res->lr_granted, flags); cleanup_resource(res, &res->lr_converting, flags); @@ -511,6 +512,7 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) * client gets blocking ast when lock gets distracted by * server. This is 1_4 branch solution, let's see how * will it behave. */ + LDLM_RESOURCE_DELREF(res); if (!ldlm_resource_putref_locked(res)) CDEBUG(D_INFO, "Namespace %s resource refcount nonzero " @@ -794,6 +796,7 @@ static struct ldlm_resource *ldlm_resource_new(void) atomic_set(&res->lr_refcount, 1); spin_lock_init(&res->lr_lock); + lu_ref_init(&res->lr_reference); /* one who creates the resource must unlock * the semaphore after lvb initialization */ @@ -965,6 +968,7 @@ void __ldlm_resource_putref_final(struct ldlm_resource *res) ldlm_namespace_put_locked(ns, 0); list_del_init(&res->lr_hash); list_del_init(&res->lr_childof); + lu_ref_fini(&res->lr_reference); ns->ns_resources--; if (ns->ns_resources == 0) @@ -1119,11 +1123,13 @@ void ldlm_namespace_dump(int level, struct ldlm_namespace *ns) ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); + LDLM_RESOURCE_ADDREF(res); lock_res(res); ldlm_resource_dump(level, res); unlock_res(res); + LDLM_RESOURCE_DELREF(res); spin_lock(&ns->ns_hash_lock); tmp = tmp->next; ldlm_resource_putref_locked(res); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 67effaf..12465cd 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -91,11 +91,12 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, NULL, &res_id, 0, 0); if (res == NULL) RETURN(0); - + LDLM_RESOURCE_ADDREF(res); /* Initialize ibits lock policy. */ policy.l_inodebits.bits = bits; count = ldlm_cancel_resource_local(res, cancels, &policy, mode, 0, 0, NULL); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); RETURN(count); } diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index d34b459..366355d 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2680,7 +2680,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, * lock. */ if (new_lock == NULL) - new_lock = ldlm_handle2lock(&lh->mlh_reg_lh); + new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0); if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) { lh->mlh_reg_lh.cookie = 0; @@ -2720,8 +2720,18 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, * Fixup the lock to be given to the client. */ lock_res_and_lock(new_lock); - new_lock->l_readers = 0; - new_lock->l_writers = 0; + /* Zero new_lock->l_readers and new_lock->l_writers without triggering + * possible blocking AST. */ + while (new_lock->l_readers > 0) { + lu_ref_del(&new_lock->l_reference, "reader", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_readers--; + } + while (new_lock->l_writers > 0) { + lu_ref_del(&new_lock->l_reference, "writer", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_writers--; + } new_lock->l_export = class_export_get(req->rq_export); new_lock->l_blocking_ast = lock->l_blocking_ast; @@ -2734,7 +2744,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, &new_lock->l_exp_hash); unlock_res_and_lock(new_lock); - LDLM_LOCK_PUT(new_lock); + LDLM_LOCK_RELEASE(new_lock); lh->mlh_reg_lh.cookie = 0; RETURN(ELDLM_LOCK_REPLACED); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index d66bb93..8d664db 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1652,7 +1652,7 @@ static enum interval_iter filter_intent_cb(struct interval_node *n, *v = LDLM_LOCK_GET(lck); } else if ((*v)->l_policy_data.l_extent.start < lck->l_policy_data.l_extent.start) { - LDLM_LOCK_PUT(*v); + LDLM_LOCK_RELEASE(*v); *v = LDLM_LOCK_GET(lck); } @@ -1721,7 +1721,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LASSERT(lock->l_flags & LDLM_FL_CP_REQD); lock->l_flags &= ~LDLM_FL_CP_REQD; list_del_init(&wlock->l_cp_ast); - LDLM_LOCK_PUT(wlock); + LDLM_LOCK_RELEASE(wlock); } /* The lock met with no resistance; we're finished. */ @@ -1817,7 +1817,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, unlock_res(res); out: - LDLM_LOCK_PUT(l); + LDLM_LOCK_RELEASE(l); RETURN(ELDLM_LOCK_ABORTED); } @@ -3316,7 +3316,9 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, &res_id, LDLM_EXTENT, 0); if (res != NULL) { + LDLM_RESOURCE_ADDREF(res); rc = ldlm_res_lvbo_update(res, NULL, 0, 0); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 33d31e5..9e8bcae 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -631,8 +631,10 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, if (res == NULL) RETURN(0); + LDLM_RESOURCE_ADDREF(res); count = ldlm_cancel_resource_local(res, cancels, NULL, mode, lock_flags, 0, NULL); + LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); RETURN(count); } -- 1.8.3.1