From: alex
Date: Wed, 27 Jul 2005 18:54:34 +0000 (+0000)
Subject: b=7200
X-Git-Tag: 1.4.10~850
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=11e0902cb38306ccb570ae2aab6348f64bdb9825;p=fs%2Flustre-release.git

b=7200
- protect lock->l_resource from concurrent ldlm_lock_change_resource()
---

diff --git a/lustre/cmobd/cm_oss_reint.c b/lustre/cmobd/cm_oss_reint.c
index 81b616a..3db865a 100644
--- a/lustre/cmobd/cm_oss_reint.c
+++ b/lustre/cmobd/cm_oss_reint.c
@@ -141,7 +141,7 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation! -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
@@ -149,13 +149,13 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
          * correct blocking function anymore. So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != cache_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h
index 2ad9b21..dad6885 100644
--- a/lustre/include/linux/lustre_dlm.h
+++ b/lustre/include/linux/lustre_dlm.h
@@ -106,9 +106,12 @@ typedef enum {
 #define LDLM_FL_CLEANED        0x800000
 
 /* optimization hint: LDLM can run blocking callback from current context
- * w/o involving separate thread. in order to decrease cs rate -bzzz */
+ * w/o involving separate thread. in order to decrease cs rate */
 #define LDLM_FL_ATOMIC_CB      0x1000000
 
+/* while this flag is set, the lock can't change resource */
+#define LDLM_FL_LOCK_PROTECT   0x4000000
+#define LDLM_FL_LOCK_PROTECT_BIT  26
+
 /* The blocking callback is overloaded to perform two functions. These flags
  * indicate which operation should be performed. */
@@ -307,6 +310,7 @@ struct ldlm_lock {
         unsigned long         l_callback_timeout;
 
         __u32                 l_pid;   /* pid which created this lock */
+        __u32                 l_pidb;  /* who holds LOCK_PROTECT_BIT */
 
         struct list_head      l_tmp;
 
@@ -681,5 +685,7 @@ static inline void check_res_locked(struct ldlm_resource *res)
         LASSERT_SPIN_LOCKED(&res->lr_lock);
 }
 
+struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock);
+void unlock_res_and_lock(struct ldlm_lock *lock);
 
 #endif
diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c
index 746b485..fb41ccb 100644
--- a/lustre/ldlm/l_lock.c
+++ b/lustre/ldlm/l_lock.c
@@ -48,3 +48,48 @@
 #include 
 #include 
 
+/*
+ * ldlm locking uses resource to serialize access to locks
+ * but there is a case when we change resource of lock upon
+ * enqueue reply. we rely on that lock->l_resource = new_res
+ * is atomic
+ */
+struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+
+        if (!res->lr_namespace->ns_client) {
+                /* on server-side resource of lock doesn't change */
+                lock_res(res);
+                return res;
+        }
+
+        bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+        LASSERT(lock->l_pidb == 0);
+        res = lock->l_resource;
+        lock->l_pidb = current->pid;
+        lock_res(res);
+        return res;
+}
+
+void unlock_bitlock(struct ldlm_lock *lock)
+{
+        LASSERT(lock->l_pidb == current->pid);
+        lock->l_pidb = 0;
+        bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags);
+}
+
+void unlock_res_and_lock(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+
+        if (!res->lr_namespace->ns_client) {
+                /* on server-side resource of lock doesn't change */
+                unlock_res(res);
+                return;
+        }
+
+        unlock_res(res);
+        unlock_bitlock(lock);
+}
+
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index d73b52a..c545c89 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -129,25 +129,24 @@ void ldlm_lock_put(struct ldlm_lock *lock)
         LASSERT(lock->l_resource != LP_POISON);
         LASSERT(atomic_read(&lock->l_refc) > 0);
         if (atomic_dec_and_test(&lock->l_refc)) {
-                struct ldlm_resource *res = lock->l_resource;
-                struct ldlm_namespace *ns = res->lr_namespace;
+                struct ldlm_resource *res;
 
                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
-                LASSERT(lock->l_resource != LP_POISON);
 
-                lock_res(res);
+                lock_res_and_lock(lock);
+                res = lock->l_resource;
                 LASSERT(lock->l_destroyed);
                 LASSERT(list_empty(&lock->l_res_link));
 
                 if (lock->l_parent)
                         LDLM_LOCK_PUT(lock->l_parent);
-                unlock_res(res);
+                unlock_res_and_lock(lock);
 
-                ldlm_resource_putref(lock->l_resource);
+                atomic_dec(&res->lr_namespace->ns_locks);
+                ldlm_resource_putref(res);
                 lock->l_resource = NULL;
                 if (lock->l_export)
                         class_export_put(lock->l_export);
-                atomic_dec(&ns->ns_locks);
 
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
@@ -181,7 +180,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         if (!list_empty(&lock->l_children)) {
                 LDLM_ERROR(lock, "still has children (%p)!",
@@ -203,7 +202,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         if (lock->l_destroyed) {
                 LASSERT(list_empty(&lock->l_lru));
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 EXIT;
                 return;
         }
@@ -232,7 +231,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
                 lock->l_completion_ast(lock, 0);
 #endif
 
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -275,6 +274,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_cp_ast);
         init_waitqueue_head(&lock->l_waitq);
         lock->l_blocking_lock = NULL;
+        lock->l_pidb = 0;
 
         atomic_inc(&resource->lr_namespace->ns_locks);
 
@@ -291,17 +291,23 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         RETURN(lock);
 }
 
+void unlock_bitlock(struct ldlm_lock *lock);
+
 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                               struct ldlm_res_id new_resid)
 {
         struct ldlm_resource *oldres = lock->l_resource;
+        struct ldlm_resource *newres;
+        int type;
         ENTRY;
 
-        lock_res(oldres);
+        LASSERT(ns->ns_client != 0);
+
+        lock_res_and_lock(lock);
         if (memcmp(&new_resid, &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) == 0) {
                 /* Nothing to do */
-                unlock_res(oldres);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
@@ -310,15 +316,18 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
         /* This function assumes that the lock isn't on any lists */
         LASSERT(list_empty(&lock->l_res_link));
 
-        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
-                                             lock->l_resource->lr_type,
-                                             1);
-        if (lock->l_resource == NULL) {
+        type = oldres->lr_type;
+        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+        if (newres == NULL) {
                 LBUG();
                 RETURN(-ENOMEM);
         }
 
+        lock_res(newres);
+        lock->l_resource = newres;
+        unlock_res(newres);
         unlock_res(oldres);
+        unlock_bitlock(lock);
 
         /* ...and the flowers are still standing! */
         ldlm_resource_putref(oldres);
@@ -355,19 +364,19 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         ns = lock->l_resource->lr_namespace;
         LASSERT(ns != NULL);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /* It's unlikely but possible that someone marked the lock as
          * destroyed after we did handle2object on it */
         if (lock->l_destroyed) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
 
         if (flags && (lock->l_flags & flags)) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
@@ -375,7 +384,7 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         if (flags)
                 lock->l_flags |= flags;
 
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         retval = lock;
         EXIT;
 out:
@@ -465,9 +474,9 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
 /* only called for local locks */
 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
 {
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         ldlm_lock_addref_internal_nolock(lock, mode);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 }
 
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
@@ -475,9 +484,9 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
         struct ldlm_namespace *ns;
         ENTRY;
 
-        ns = lock->l_resource->lr_namespace;
+        lock_res_and_lock(lock);
 
-        lock_res(lock->l_resource);
+        ns = lock->l_resource->lr_namespace;
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
@@ -509,7 +518,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                     ldlm_bl_to_thread(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
@@ -523,10 +532,10 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                 ns->ns_nr_unused++;
                 spin_unlock(&ns->ns_unused_lock);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 ldlm_cancel_lru(ns, LDLM_ASYNC);
         } else {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
         }
 
         LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
@@ -552,9 +561,9 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
         LASSERT(lock != NULL);
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
@@ -654,10 +663,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 {
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CAN_MATCH;
         wake_up(&lock->l_waitq);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 }
 
 /* Can be called in two ways:
@@ -859,7 +868,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 LASSERT(rc == ELDLM_OK);
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it. We don't need to do
@@ -911,7 +920,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 policy(lock, flags, 1, &rc, NULL);
         EXIT;
 out:
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         return rc;
 }
@@ -958,14 +967,14 @@ int ldlm_run_bl_ast_work(struct list_head *rpc_list)
                         list_entry(tmp, struct ldlm_lock, l_bl_ast);
 
                 /* nobody should touch l_bl_ast */
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 list_del_init(&lock->l_bl_ast);
 
                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                 LASSERT(lock->l_bl_ast_run == 0);
                 LASSERT(lock->l_blocking_lock);
                 lock->l_bl_ast_run++;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 ldlm_lock2desc(lock->l_blocking_lock, &d);
@@ -1005,11 +1014,11 @@ int ldlm_run_cp_ast_work(struct list_head *rpc_list)
                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
 
                 /* nobody should touch l_cp_ast */
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 list_del_init(&lock->l_cp_ast);
                 LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                 lock->l_flags &= ~LDLM_FL_CP_REQD;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 if (lock->l_completion_ast != NULL)
                         rc = lock->l_completion_ast(lock, 0, 0);
@@ -1094,10 +1103,10 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                 lock->l_flags |= LDLM_FL_CANCEL;
                 if (lock->l_blocking_ast) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                              LDLM_CB_CANCELING);
-                        lock_res(lock->l_resource);
+                        lock_res_and_lock(lock);
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
@@ -1110,12 +1119,12 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         struct ldlm_namespace *ns;
         ENTRY;
 
+        ldlm_del_waiting_lock(lock);
+
+        lock_res_and_lock(lock);
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        ldlm_del_waiting_lock(lock);
-
-        lock_res(res);
         /* Please do not, no matter how tempting, remove this LBUG without
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
@@ -1126,7 +1135,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_cancel_callback(lock);
 
         ldlm_resource_unlink_lock(lock);
-        unlock_res(res);
+        unlock_res_and_lock(lock);
 
         ldlm_lock_destroy(lock);
@@ -1189,11 +1198,11 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
 
+        lock_res_and_lock(lock);
+
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        lock_res(res);
-
         old_mode = lock->l_req_mode;
         lock->l_req_mode = new_mode;
         ldlm_resource_unlink_lock(lock);
@@ -1229,7 +1238,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         granted = 1;
                 }
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (granted)
                 ldlm_run_cp_ast_work(&rpc_list);
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index a7275d0..f4fae3c 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -416,11 +416,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_granted_mode != lock->l_req_mode) {
                 /* this blocking AST will be communicated as part of the
                  * completion AST instead */
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                 ptlrpc_req_finished(req);
                 RETURN(0);
@@ -428,7 +428,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         if (lock->l_destroyed) {
                 /* What's the point? */
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 ptlrpc_req_finished(req);
                 RETURN(0);
         }
@@ -444,7 +444,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (lock->l_granted_mode == lock->l_req_mode)
                 ldlm_add_waiting_lock(lock);
 
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
@@ -481,12 +481,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_resource->lr_lvb_len) {
                 buffers = 2;
                 size[1] = lock->l_resource->lr_lvb_len;
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
@@ -505,10 +505,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                      lock->l_resource->lr_lvb_len);
 
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
@@ -519,12 +519,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
 
         /* We only send real blocking ASTs after the lock is granted */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 body->lock_flags |= LDLM_FL_AST_SENT;
                 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
@@ -537,7 +537,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_resource *res;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int rc = 0, size = sizeof(*body);
@@ -556,9 +556,10 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                sizeof(body->lock_handle1));
         ldlm_lock2desc(lock, &body->lock_desc);
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         size = lock->l_resource->lr_lvb_len;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
+        res = lock->l_resource;
 
         req->rq_replen = lustre_msg_size(1, &size);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
@@ -671,12 +672,12 @@ existing_lock:
                 cookie = req;
         } else {
                 int buffers = 1;
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 if (lock->l_resource->lr_lvb_len) {
                         size[1] = lock->l_resource->lr_lvb_len;
                         buffers = 2;
                 }
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
@@ -705,13 +706,13 @@ existing_lock:
 
         /* We never send a blocking AST until the lock is granted, but
          * we can tell it right now */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                 if (lock->l_granted_mode == lock->l_req_mode)
                         ldlm_add_waiting_lock(lock);
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         EXIT;
 out:
@@ -730,7 +731,7 @@ existing_lock:
                                    "(err=%d, rc=%d)", err, rc);
 
                 if (rc == 0) {
-                        lock_res(lock->l_resource);
+                        lock_res_and_lock(lock);
                         size[1] = lock->l_resource->lr_lvb_len;
                         if (size[1] > 0) {
                                 void *lvb = lustre_msg_buf(req->rq_repmsg,
@@ -741,7 +742,7 @@ existing_lock:
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        size[1]);
                         }
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                 } else {
                         ldlm_lock_destroy(lock);
                 }
@@ -878,10 +879,10 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
 
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 LDLM_DEBUG(lock, "already unused, calling "
@@ -904,13 +905,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res = lock->l_resource;
         LIST_HEAD(ast_list);
         ENTRY;
 
         LDLM_DEBUG(lock, "client completion callback handler START");
 
-        lock_res(res);
+        lock_res_and_lock(lock);
 
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
@@ -929,11 +929,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                    &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) != 0) {
-                unlock_res(res);
+                unlock_res_and_lock(lock);
                 ldlm_lock_change_resource(ns, lock,
                                           dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
-                lock_res(res);
+                CERROR("change resource!\n");
+                lock_res_and_lock(lock);
         }
 
         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
@@ -954,7 +955,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         ldlm_grant_lock(lock, &ast_list);
-        unlock_res(res);
+        unlock_res_and_lock(lock);
 
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         LDLM_LOCK_PUT(lock);
@@ -986,18 +987,18 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                 ptlrpc_error(req);
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 if (ldlm_bl_to_thread(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
                 EXIT;
                 return;
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -1186,7 +1187,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         }
 
         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
+        lock_res_and_lock(lock);
         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+        unlock_res_and_lock(lock);
 
         /* We want the ost thread to get this reply so that it can respond
          * to ost requests (write cache writeback) that might be triggered
@@ -1654,3 +1657,9 @@ EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
+
+/* l_lock.c */
+EXPORT_SYMBOL(lock_res_and_lock);
+EXPORT_SYMBOL(unlock_res_and_lock);
+
+
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 2a9d8a8..b57f1f7 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -214,9 +214,9 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                 struct lustre_handle *lockh, int mode)
 {
         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
 
         ldlm_lock_decref_and_cancel(lockh, mode);
@@ -402,9 +402,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         }
 
         if ((*flags) & LDLM_FL_AST_SENT) {
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
         }
@@ -573,11 +573,11 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         LDLM_DEBUG(lock, "client-side cancel");
         /* Set this flag to prevent others from getting new references*/
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY;
         ldlm_cancel_callback(lock);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (local_only) {
                 CDEBUG(D_INFO, "not sending request (at caller's "
@@ -676,7 +676,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 spin_unlock(&ns->ns_unused_lock);
 
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 ldlm_lock_remove_from_lru(lock);
 
                 /* Setting the CBPENDING flag is a little misleading, but
@@ -694,7 +694,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
                 if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock))
                         list_add(&lock->l_tmp, &cblist);
 
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 spin_lock(&ns->ns_unused_lock);
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 54233fc..1f9673c 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -806,7 +806,7 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
 
                 down(&lli->lli_size_sem);
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 kms = ldlm_extent_shift_kms(lock,
                                             lsm->lsm_oinfo[stripe].loi_kms);
 
@@ -814,7 +814,7 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 up(&lli->lli_size_sem);
                 //ll_try_done_writing(inode);
 iput:
@@ -862,14 +862,14 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
 
                 down(&inode->i_sem);
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                 kms = ldlm_extent_shift_kms(NULL, kms);
                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 up(&inode->i_sem);
         }
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index ac174a5..380c4db 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -1013,7 +1013,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
         struct inode *inode = NULL;
         /* NOTE: we depend on atomic igrab() -bzzz */
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
         if (lock->l_ast_data) {
                 struct ll_inode_info *lli = ll_i2info(lock->l_ast_data);
                 if (lli->lli_inode_magic == LLI_INODE_MAGIC) {
@@ -1026,7 +1026,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
                         inode = NULL;
                 }
         }
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         return inode;
 }
diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c
index 96b87f0..7b08a84 100644
--- a/lustre/mdc/mdc_locks.c
+++ b/lustre/mdc/mdc_locks.c
@@ -138,7 +138,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         lock = ldlm_handle2lock(lockh);
 
         LASSERT(lock != NULL);
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -152,7 +152,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 
         EXIT;
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index ef135b5..63ba972 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -805,7 +805,7 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         }
 
         /* XXX layering violation! -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /*
          * get this: if mds_blocking_ast is racing with mds_intent_policy, such
@@ -814,13 +814,13 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
          * blocking function anymore. So check, and return early, if so.
          */
         if (lock->l_blocking_ast != mds_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -2698,26 +2698,26 @@ static void mds_revoke_export_locks(struct obd_export *exp)
 
         spin_lock(&exp->exp_ldlm_data.led_lock);
         list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
-                lock_res(lock->l_resource);
+                lock_res_and_lock(lock);
 
                 if (lock->l_req_mode != lock->l_granted_mode) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 LASSERT(lock->l_resource);
                 if (lock->l_resource->lr_type != LDLM_IBITS &&
                     lock->l_resource->lr_type != LDLM_PLAIN) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 if (lock->l_flags & LDLM_FL_AST_SENT) {
-                        unlock_res(lock->l_resource);
+                        unlock_res_and_lock(lock);
                         continue;
                 }
 
                 lock->l_flags |= LDLM_FL_AST_SENT;
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
 
                 /* the desc just pretend to exclusive */
                 ldlm_lock2desc(lock, &desc);
@@ -4140,7 +4140,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         }
 
         /* Fixup the lock to be given to the client */
-        lock_res(new_lock->l_resource);
+        lock_res_and_lock(new_lock);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
@@ -4159,7 +4159,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
-        unlock_res(new_lock->l_resource);
+        unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
 
         RETURN(ELDLM_LOCK_REPLACED);
diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index f236898..1652e69 100644
--- a/lustre/obdfilter/filter.c
+++ b/lustre/obdfilter/filter.c
@@ -1068,20 +1068,20 @@ static int filter_blocking_ast(struct ldlm_lock *lock,
         }
 
         /* XXX layering violation! -phil */
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore. So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != filter_blocking_ast) {
-                unlock_res(lock->l_resource);
+                unlock_res_and_lock(lock);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
 
         if (do_ast) {
                 struct lustre_handle lockh;
@@ -1308,7 +1308,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
 
-        lock_res(res);
+        lock_res_and_lock(lock);
+        res = lock->l_resource;
         rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
 
         /* FIXME: we should change the policy function slightly, to not make
@@ -1325,7 +1326,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         if (rc == LDLM_ITER_CONTINUE) {
                 /* The lock met with no resistance; we're finished. */
-                unlock_res(res);
+                unlock_res_and_lock(lock);
                 RETURN(ELDLM_LOCK_REPLACED);
         }
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index a0d4323..9dc9d44 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2377,7 +2377,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
                 return;
         }
 
-        lock_res(lock->l_resource);
+        lock_res_and_lock(lock);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2393,7 +2393,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
         }
 #endif
         lock->l_ast_data = data;
-        unlock_res(lock->l_resource);
+        unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 }
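
For context: the heart of the patch is the protocol in lustre/ldlm/l_lock.c. On a client namespace, lock->l_resource can be swapped by ldlm_lock_change_resource() when the enqueue reply moves the lock to a different resource, so the old "lock_res(lock->l_resource)" idiom is racy: the pointer may change between reading it and acquiring the resource spinlock. lock_res_and_lock() closes the race by first taking a per-lock bit spinlock (LDLM_FL_LOCK_PROTECT_BIT, bit 26 of l_flags), which pins l_resource, and only then locking the resource; ldlm_lock_change_resource() holds the same bit lock across the pointer swap, and the new l_pidb field records the holder's pid solely for the LASSERTs. On the server side a lock's resource never changes, so the ns_client test keeps the plain lock_res() fast path. Below is a minimal, self-contained userspace sketch of the same pattern; all names are hypothetical, and pthread mutexes stand in for both the bit spinlock and lr_lock - this is an illustration of the idea, not the Lustre code:

/*
 * Userspace sketch of the LOCK_PROTECT pattern (hypothetical names).
 * An obj points at a res; each res has its own lock; the res an obj
 * belongs to may be changed at runtime.  Locking o->o_res directly
 * would be racy, so a per-object guard pins the pointer first.
 *
 * Build: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdio.h>

struct res {
        pthread_mutex_t r_lock;         /* plays the lr_lock role */
        int             r_id;
};

struct obj {
        pthread_mutex_t  o_guard;       /* plays the bit-spinlock role */
        struct res      *o_res;         /* may only change under o_guard */
};

/* analogue of lock_res_and_lock(): pin the pointer, then lock the res */
static struct res *lock_obj_res(struct obj *o)
{
        pthread_mutex_lock(&o->o_guard);    /* o->o_res is now stable */
        pthread_mutex_lock(&o->o_res->r_lock);
        return o->o_res;
}

/* analogue of unlock_res_and_lock() */
static void unlock_obj_res(struct obj *o)
{
        pthread_mutex_unlock(&o->o_res->r_lock);
        pthread_mutex_unlock(&o->o_guard);
}

/* analogue of ldlm_lock_change_resource(): swap under guard + both locks */
static void change_obj_res(struct obj *o, struct res *newres)
{
        struct res *oldres = lock_obj_res(o);

        pthread_mutex_lock(&newres->r_lock);
        o->o_res = newres;                  /* the guarded pointer swap */
        pthread_mutex_unlock(&newres->r_lock);

        /* cannot use unlock_obj_res(): o->o_res no longer names oldres */
        pthread_mutex_unlock(&oldres->r_lock);
        pthread_mutex_unlock(&o->o_guard);
}

int main(void)
{
        struct res r1 = { PTHREAD_MUTEX_INITIALIZER, 1 };
        struct res r2 = { PTHREAD_MUTEX_INITIALIZER, 2 };
        struct obj o  = { PTHREAD_MUTEX_INITIALIZER, &r1 };

        change_obj_res(&o, &r2);

        struct res *r = lock_obj_res(&o);
        printf("obj now on res %d\n", r->r_id);     /* prints 2 */
        unlock_obj_res(&o);
        return 0;
}

The sketch also shows why ldlm_lock_change_resource() ends with unlock_res(oldres) followed by unlock_bitlock(lock) rather than unlock_res_and_lock(): after the swap, lock->l_resource already points at the new resource, whose spinlock has been dropped, so the old resource has to be unlocked by name.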
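
The guard itself costs no extra lock word in struct ldlm_lock: it is a single bit of the existing l_flags (LDLM_FL_LOCK_PROTECT == 0x4000000 == 1 << 26), driven by the kernel's bit_spin_lock()/bit_spin_unlock(), which is why the struct only grows by the __u32 l_pidb debug field. A rough C11 approximation of what those primitives do, under a deliberately simplified model (the real kernel versions also disable preemption and operate on unsigned long bitmaps):

#include <stdatomic.h>
#include <sched.h>

/* spin until this thread is the one that flips the bit from 0 to 1 */
static void bit_spin_lock_sketch(int bit, atomic_ulong *word)
{
        unsigned long mask = 1UL << bit;

        while (atomic_fetch_or(word, mask) & mask)
                sched_yield();          /* someone else holds the bit */
}

static void bit_spin_unlock_sketch(int bit, atomic_ulong *word)
{
        atomic_fetch_and(word, ~(1UL << bit));
}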