From 8ee6cf665060a159b264a1c1562d44ada3c35f05 Mon Sep 17 00:00:00 2001
From: Eric Mei
Date: Thu, 18 Mar 2010 12:47:19 -0700
Subject: [PATCH] b=22147 fix echo client, don't send dlm lock with intent.

o=jay
r=wangdi
r=ericm
---
 lustre/obdclass/cl_lock.c    |  5 ---
 lustre/obdecho/echo_client.c |  4 +--
 lustre/osc/osc_cl_internal.h |  6 ++--
 lustre/osc/osc_lock.c        | 83 ++++++++------------------------------------
 4 files changed, 20 insertions(+), 78 deletions(-)

diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index 6535e3a..0726ab6 100644
--- a/lustre/obdclass/cl_lock.c
+++ b/lustre/obdclass/cl_lock.c
@@ -2093,14 +2093,10 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                 const char *scope, const void *source)
 {
         struct cl_lock       *lock;
-        const struct lu_fid  *fid;
         int                   rc;
-        int                   iter;
         __u32                 enqflags = need->cld_enq_flags;
 
         ENTRY;
-        fid = lu_object_fid(&io->ci_obj->co_lu);
-        iter = 0;
         do {
                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
                 if (!IS_ERR(lock)) {
@@ -2122,7 +2118,6 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                         lock = ERR_PTR(rc);
                 } else
                         rc = PTR_ERR(lock);
-                iter++;
         } while (rc == 0);
         RETURN(lock);
 }
diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c
index 2109aa8..7659c88 100644
--- a/lustre/obdecho/echo_client.c
+++ b/lustre/obdecho/echo_client.c
@@ -998,7 +998,7 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
         descr->cld_start = cl_index(obj, start);
         descr->cld_end   = cl_index(obj, end);
         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
-        descr->cld_enq_flags = CEF_ASYNC | enqflags;
+        descr->cld_enq_flags = enqflags;
         io->ci_obj = obj;
 
         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
@@ -1158,7 +1158,7 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
         rc = cl_echo_enqueue0(env, eco, offset,
                               offset + npages * CFS_PAGE_SIZE - 1,
                               rw == READ ? LCK_PW : LCK_PW, &lh.cookie,
-                              CILR_NEVER);
+                              CEF_NEVER);
         if (rc < 0)
                 GOTO(error_lock, rc);
 
diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h
index fb5f74d..d3445e2 100644
--- a/lustre/osc/osc_cl_internal.h
+++ b/lustre/osc/osc_cl_internal.h
@@ -245,9 +245,11 @@ struct osc_lock {
                                 ols_glimpse:1;
         /**
          * IO that owns this lock. This field is used for a dead-lock
-         * avoidance by osc_lock_enqueue().
+         * avoidance by osc_lock_enqueue_wait().
          *
-         * \see osc_deadlock_is_possible()
+         * XXX: unfortunately, the owner of a osc_lock is not unique,
+         * the lock may have multiple users, if the lock is granted and
+         * then matched.
          */
         struct osc_io           *ols_owner;
 };
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index bc552de..5718937 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -1108,68 +1108,9 @@
 }
 
 /**
- * Deadlock avoidance for osc_lock_enqueue(). Consider following scenario:
- *
- *     - Thread0: obtains PR:[0, 10]. Lock is busy.
- *
- *     - Thread1: enqueues PW:[5, 50]. Blocking ast is sent to
- *       PR:[0, 10], but cancellation of busy lock is postponed.
- *
- *     - Thread0: enqueue PR:[30, 40]. Lock is locally matched to
- *       PW:[5, 50], and thread0 waits for the lock completion never
- *       releasing PR:[0, 10]---deadlock.
- *
- * The second PR lock can be glimpse (it is to deal with that situation that
- * ll_glimpse_size() has second argument, preventing local match of
- * not-yet-granted locks, see bug 10295). Similar situation is possible in the
- * case of memory mapped user level buffer.
- *
- * To prevent this we can detect a situation when current "thread" or "io"
- * already holds a lock on this object and either add LDLM_FL_BLOCK_GRANTED to
- * the ols->ols_flags, or prevent local match with PW locks.
- */
-static int osc_deadlock_is_possible(const struct lu_env *env,
-                                    struct cl_lock *lock)
-{
-        struct cl_object        *obj;
-        struct cl_object_header *head;
-        struct cl_lock          *scan;
-        struct osc_io           *oio;
-
-        int result;
-
-        ENTRY;
-
-        LASSERT(cl_lock_is_mutexed(lock));
-
-        oio  = osc_env_io(env);
-        obj  = lock->cll_descr.cld_obj;
-        head = cl_object_header(obj);
-
-        result = 0;
-        cfs_spin_lock(&head->coh_lock_guard);
-        cfs_list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
-                if (scan != lock) {
-                        struct osc_lock *oscan;
-
-                        oscan = osc_lock_at(scan);
-                        LASSERT(oscan != NULL);
-                        if (oscan->ols_owner == oio) {
-                                result = 1;
-                                break;
-                        }
-                }
-        }
-        cfs_spin_unlock(&head->coh_lock_guard);
-        RETURN(result);
-}
-
-/**
  * Implementation of cl_lock_operations::clo_enqueue() method for osc
  * layer. This initiates ldlm enqueue:
  *
- *     - checks for possible dead-lock conditions (osc_deadlock_is_possible());
- *
  *     - cancels conflicting locks early (osc_lock_enqueue_wait());
  *
  *     - calls osc_enqueue_base() to do actual enqueue.
@@ -1201,8 +1142,6 @@ static int osc_lock_enqueue(const struct lu_env *env,
         osc_lock_build_res(env, obj, resname);
         osc_lock_build_policy(env, lock, policy);
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
-        if (osc_deadlock_is_possible(env, lock))
-                ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
                 ols->ols_glimpse = 1;
         if (!(enqflags & CEF_MUST))
@@ -1239,6 +1178,7 @@ static int osc_lock_enqueue(const struct lu_env *env,
                         }
                 } else {
                         ols->ols_state = OLS_GRANTED;
+                        ols->ols_owner = osc_env_io(env);
                 }
         }
         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
@@ -1524,20 +1464,17 @@ static int osc_lock_fits_into(const struct lu_env *env,
                  * will not release sublock1. Bang!
                  */
                 if (ols->ols_state < OLS_GRANTED ||
-                        ols->ols_state > OLS_RELEASED)
+                    ols->ols_state > OLS_RELEASED)
                         return 0;
         } else if (need->cld_enq_flags & CEF_MUST) {
-                /* 
+                /*
                  * If the lock hasn't ever enqueued, it can't be matched
                  * because enqueue process brings in many information
                  * which can be used to determine things such as lockless,
                  * CEF_MUST, etc.
                  */
-                if (ols->ols_state < OLS_GRANTED ||
-                    ols->ols_state > OLS_RELEASED)
-                        return 0;
                 if (ols->ols_state < OLS_UPCALL_RECEIVED &&
-                        ols->ols_locklessable)
+                    ols->ols_locklessable)
                         return 0;
         }
         return 1;
@@ -1614,7 +1551,7 @@ static void osc_lock_lockless_state(const struct lu_env *env,
         if (state == CLS_HELD) {
                 struct osc_io *oio = osc_env_io(env);
 
-                LASSERT(lock->ols_owner == NULL);
+                LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
                 lock->ols_owner = oio;
 
                 /* set the io to be lockless if this lock is for io's
@@ -1630,7 +1567,15 @@ static int osc_lock_lockless_fits_into(const struct lu_env *env,
                                        const struct cl_lock_descr *need,
                                        const struct cl_io *io)
 {
-        return 0;
+        struct osc_lock *lock = cl2osc_lock(slice);
+
+        if (!(need->cld_enq_flags & CEF_NEVER))
+                return 0;
+
+        /* To solve the problem of stacking echo client upon osc directly.
+         * see bug 22147 for details.
+         */
+        return (lock->ols_owner == osc_env_io(env));
 }
 
 static const struct cl_lock_operations osc_lock_lockless_ops = {
-- 
1.8.3.1
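
The core of the change is the new osc_lock_lockless_fits_into() body above: a lockless lock is only reused when the request explicitly forbids server-side locking (CEF_NEVER) and comes from the IO that already owns the lock, which is the echo-client-stacked-on-osc case tracked in bug 22147. The following is a minimal standalone sketch of that matching rule only; the types, the flag value, and the function names below are simplified stand-ins, not the real Lustre structures or API.

/*
 * Standalone sketch (not Lustre code) of the lockless-lock matching rule
 * introduced by this patch: reuse only with CEF_NEVER and the owning IO.
 */
#include <stdbool.h>
#include <stdio.h>

#define CEF_NEVER 0x10                  /* illustrative value only */

struct io_ctx { int unused; };          /* stands in for struct osc_io */

struct lockless_lock {                  /* stands in for struct osc_lock */
        const struct io_ctx *owner;     /* IO that owns this lockless lock */
};

static bool lockless_fits_into(const struct lockless_lock *lock,
                               unsigned int enq_flags,
                               const struct io_ctx *requesting_io)
{
        /* server-side locking must be explicitly forbidden ... */
        if (!(enq_flags & CEF_NEVER))
                return false;
        /* ... and only the owning IO may reuse the lock */
        return lock->owner == requesting_io;
}

int main(void)
{
        struct io_ctx io_a, io_b;
        struct lockless_lock lock = { .owner = &io_a };

        printf("owner io, CEF_NEVER: %d\n",
               lockless_fits_into(&lock, CEF_NEVER, &io_a));   /* 1 */
        printf("other io, CEF_NEVER: %d\n",
               lockless_fits_into(&lock, CEF_NEVER, &io_b));   /* 0 */
        printf("owner io, no flag  : %d\n",
               lockless_fits_into(&lock, 0, &io_a));           /* 0 */
        return 0;
}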