From 05c7ee7e1f5214fba9c1a86aa6854b5a7902bdca Mon Sep 17 00:00:00 2001 From: huangwei Date: Mon, 2 Jul 2007 09:59:34 +0000 Subject: [PATCH] b=11880 r=alex,oleg landing patch from 11880. --- lustre/ldlm/ldlm_flock.c | 43 ++++++++++++++++++++++++++++++------------- lustre/ldlm/ldlm_internal.h | 2 ++ lustre/ldlm/ldlm_lock.c | 28 ++++++++++++++++++---------- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 80f87e0..4dba55b 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -88,7 +88,10 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) if (flags == LDLM_FL_WAIT_NOREPROC) { /* client side - set a flag to prevent sending a CANCEL */ lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING; - ldlm_lock_decref_internal(lock, mode); + + /* when reaching here, it is under lock_res_and_lock(). Thus, + need call the nolock version of ldlm_lock_decref_internal*/ + ldlm_lock_decref_internal_nolock(lock, mode); } ldlm_lock_destroy_nolock(lock); @@ -137,6 +140,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, int local = ns->ns_client; int added = (mode == LCK_NL); int overlaps = 0; + int splitted = 0; ENTRY; CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64 @@ -155,6 +159,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, req->l_blocking_ast = ldlm_flock_blocking_ast; } +reprocess: if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) { /* This loop determines where this processes locks start * in the resource lr_granted list. */ @@ -334,15 +339,22 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* XXX - if ldlm_lock_new() can sleep we should * release the ns_lock, allocate the new lock, * and restart processing this lock. */ - new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK, + if (!new2) { + unlock_res_and_lock(req); + new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); - if (!new2) { - ldlm_flock_destroy(req, lock->l_granted_mode, *flags); - *err = -ENOLCK; - RETURN(LDLM_ITER_STOP); + lock_res_and_lock(req); + if (!new2) { + ldlm_flock_destroy(req, lock->l_granted_mode, *flags); + *err = -ENOLCK; + RETURN(LDLM_ITER_STOP); + } + goto reprocess; } + splitted = 1; + new2->l_granted_mode = lock->l_granted_mode; new2->l_policy_data.l_flock.pid = new->l_policy_data.l_flock.pid; @@ -360,8 +372,9 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, &new2->l_export->exp_ldlm_data.led_held_locks); spin_unlock(&new2->l_export->exp_ldlm_data.led_lock); } - if (*flags == LDLM_FL_WAIT_NOREPROC) - ldlm_lock_addref_internal(new2, lock->l_granted_mode); + if (*flags == LDLM_FL_WAIT_NOREPROC) { + ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); + } /* insert new2 at lock */ ldlm_resource_add_lock(res, ownlocks, new2); @@ -369,6 +382,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, break; } + /* if new2 is created but never used, destroy it*/ + if (splitted == 0 && new2 != NULL) + ldlm_lock_destroy_nolock(new2); + /* At this point we're granting the lock request. */ req->l_granted_mode = req->l_req_mode; @@ -398,9 +415,9 @@ restart: ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); - unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); - lock_res(res); + unlock_res_and_lock(req); + rc = ldlm_run_cp_ast_work(&rpc_list); + lock_res_and_lock(req); if (rc == -ERESTART) GOTO(restart, -ERESTART); } @@ -500,7 +517,7 @@ granted: LDLM_DEBUG(lock, "client-side enqueue granted"); ns = lock->l_resource->lr_namespace; - lock_res(lock->l_resource); + lock_res_and_lock(lock); /* take lock off the deadlock detection waitq. */ list_del_init(&lock->l_flock_waitq); @@ -535,7 +552,7 @@ granted: if (flags == 0) cfs_waitq_signal(&lock->l_waitq); } - unlock_res(lock->l_resource); + unlock_res_and_lock(lock); RETURN(0); } EXPORT_SYMBOL(ldlm_flock_completion_ast); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 1b9ac7d..a11a85a 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -30,7 +30,9 @@ ldlm_lock_create(struct ldlm_namespace *ns, struct ldlm_res_id, ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, void *cookie, int *flags); void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); +void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode); +void ldlm_lock_decref_internal_nolock(struct ldlm_lock *, __u32 mode); void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, struct list_head *work_list); int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 12995ee..e95d2c3 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -545,15 +545,12 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) unlock_res_and_lock(lock); } -void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) +/* only called in ldlm_flock_destroy and for local locks. + * for LDLM_FLOCK type locks, l_blocking_ast is null, and + * ldlm_lock_remove_from_lru() does nothing, it is safe + * for ldlm_flock_destroy usage by dropping some code */ +void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode) { - struct ldlm_namespace *ns; - ENTRY; - - lock_res_and_lock(lock); - - ns = lock->l_resource->lr_namespace; - LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); if (mode & (LCK_NL | LCK_CR | LCK_PR)) { LASSERT(lock->l_readers > 0); @@ -564,6 +561,19 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) lock->l_writers--; } + LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ +} +void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) +{ + struct ldlm_namespace *ns; + ENTRY; + + lock_res_and_lock(lock); + + ns = lock->l_resource->lr_namespace; + + ldlm_lock_decref_internal_nolock(lock, mode); + if (lock->l_flags & LDLM_FL_LOCAL && !lock->l_readers && !lock->l_writers) { /* If this is a local lock on a server namespace and this was @@ -609,8 +619,6 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) unlock_res_and_lock(lock); } - LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ - EXIT; } -- 1.8.3.1