From 1072318b4d89cddba00e9adeb939249f485f2d13 Mon Sep 17 00:00:00 2001 From: jxiong Date: Thu, 19 Nov 2009 03:39:07 +0000 Subject: [PATCH] b=19906 r=wangdi,ericm Make racer work at the client side. --- lustre/include/cl_object.h | 14 ++--- lustre/lclient/glimpse.c | 6 +-- lustre/lclient/lcommon_cl.c | 2 +- lustre/lclient/lcommon_misc.c | 5 +- lustre/llite/vvp_io.c | 3 +- lustre/lov/lov_lock.c | 88 +++++++++++++++++++++++++++--- lustre/lov/lovsub_lock.c | 6 ++- lustre/obdclass/cl_io.c | 8 ++- lustre/obdclass/cl_lock.c | 121 ++++++++++++++++++------------------------ lustre/obdecho/echo_client.c | 4 +- lustre/osc/osc_lock.c | 48 +++++++++++------ 11 files changed, 189 insertions(+), 116 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index ba66ba7..563edd1 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1301,6 +1301,11 @@ struct cl_lock_descr { __u64 cld_gid; /** Lock mode. */ enum cl_lock_mode cld_mode; + /** + * flags to enqueue lock. A combination of bit-flags from + * enum cl_enq_flags. + */ + __u32 cld_enq_flags; }; #define DDESCR "%s(%d):[%lu, %lu]" @@ -1666,6 +1671,7 @@ struct cl_lock_operations { * usual return values of lock state-machine methods, this can return * -ESTALE to indicate that lock cannot be returned to the cache, and * has to be re-initialized. + * unuse is a one-shot operation, so it must NOT return CLO_WAIT. * * \see ccc_lock_unuse(), lov_lock_unuse(), osc_lock_unuse() */ @@ -2156,11 +2162,6 @@ struct cl_io_lock_link { struct list_head cill_linkage; struct cl_lock_descr cill_descr; struct cl_lock *cill_lock; - /** - * flags to enqueue lock for this IO. A combination of bit-flags from - * enum cl_enq_flags. - */ - __u32 cill_enq_flags; /** optional destructor */ void (*cill_fini)(const struct lu_env *env, struct cl_io_lock_link *link); @@ -2763,7 +2764,6 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, const char *scope, const void *source); struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, const struct cl_lock_descr *need, - __u32 enqflags, const char *scope, const void *source); struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, struct cl_page *page, struct cl_lock *except, @@ -2901,7 +2901,7 @@ void cl_io_end (const struct lu_env *env, struct cl_io *io); int cl_io_lock_add (const struct lu_env *env, struct cl_io *io, struct cl_io_lock_link *link); int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, - struct cl_lock_descr *descr, int enqflags); + struct cl_lock_descr *descr); int cl_io_read_page (const struct lu_env *env, struct cl_io *io, struct cl_page *page); int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io, diff --git a/lustre/lclient/glimpse.c b/lustre/lclient/glimpse.c index ed81f15..1527635 100644 --- a/lustre/lclient/glimpse.c +++ b/lustre/lclient/glimpse.c @@ -118,6 +118,7 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, *descr = whole_file; descr->cld_obj = clob; descr->cld_mode = CLM_PHANTOM; + descr->cld_enq_flags = CEF_ASYNC | CEF_MUST; cio->cui_glimpse = 1; /* * CEF_ASYNC is used because glimpse sub-locks cannot @@ -127,9 +128,8 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, * CEF_MUST protects glimpse lock from conversion into * a lockless mode. 
*/ - lock = cl_lock_request(env, io, descr, - CEF_ASYNC|CEF_MUST, - "glimpse", cfs_current()); + lock = cl_lock_request(env, io, descr, "glimpse", + cfs_current()); cio->cui_glimpse = 0; if (!IS_ERR(lock)) { result = cl_wait(env, lock); diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c index 195ce87..3d8ea67 100644 --- a/lustre/lclient/lcommon_cl.c +++ b/lustre/lclient/lcommon_cl.c @@ -736,8 +736,8 @@ int ccc_io_one_lock_index(const struct lu_env *env, struct cl_io *io, descr->cld_obj = obj; descr->cld_start = start; descr->cld_end = end; + descr->cld_enq_flags = enqflags; - cio->cui_link.cill_enq_flags = enqflags; cl_io_lock_add(env, io, &cio->cui_link); RETURN(0); } diff --git a/lustre/lclient/lcommon_misc.c b/lustre/lclient/lcommon_misc.c index 24e896d..ee4fef3 100644 --- a/lustre/lclient/lcommon_misc.c +++ b/lustre/lclient/lcommon_misc.c @@ -154,8 +154,9 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock, descr->cld_mode = CLM_GROUP; enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0); - lock = cl_lock_request(env, io, descr, enqflags, - GROUPLOCK_SCOPE, cfs_current()); + descr->cld_enq_flags = enqflags; + + lock = cl_lock_request(env, io, descr, GROUPLOCK_SCOPE, cfs_current()); if (IS_ERR(lock)) { cl_io_fini(env, io); cl_env_put(env, &refcheck); diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index a602672..5107795 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -178,7 +178,8 @@ static int vvp_mmap_locks(const struct lu_env *env, policy.l_extent.start); descr->cld_end = cl_index(descr->cld_obj, policy.l_extent.end); - result = cl_io_lock_alloc_add(env, io, descr, flags); + descr->cld_enq_flags = flags; + result = cl_io_lock_alloc_add(env, io, descr); if (result < 0) RETURN(result); diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 4a02d03..fae9437 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -216,7 +216,8 @@ static int lov_sublock_lock(const struct lu_env *env, LASSERT(cl_lock_is_mutexed(child)); sublock->lss_active = parent; - if (unlikely(child->cll_state == CLS_FREEING)) { + if (unlikely((child->cll_state == CLS_FREEING) || + (child->cll_flags & CLF_CANCELLED))) { struct lov_lock_link *link; /* * we could race with lock deletion which temporarily @@ -345,6 +346,7 @@ static int lov_lock_sub_init(const struct lu_env *env, descr->cld_end = cl_index(descr->cld_obj, end); descr->cld_mode = parent->cll_descr.cld_mode; descr->cld_gid = parent->cll_descr.cld_gid; + descr->cld_enq_flags = parent->cll_descr.cld_enq_flags; /* XXX has no effect */ lck->lls_sub[nr].sub_got = *descr; lck->lls_sub[nr].sub_stripe = i; @@ -366,6 +368,7 @@ static int lov_lock_sub_init(const struct lu_env *env, result = PTR_ERR(sublock); break; } + cl_lock_get_trust(sublock); cl_lock_mutex_get(env, sublock); cl_lock_mutex_get(env, parent); /* @@ -383,6 +386,7 @@ static int lov_lock_sub_init(const struct lu_env *env, "lov-parent", parent); } cl_lock_mutex_put(env, sublock); + cl_lock_put(env, sublock); } } /* @@ -536,10 +540,11 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent, cl_lock_mutex_get(env, parent); if (!IS_ERR(sublock)) { + cl_lock_get_trust(sublock); if (parent->cll_state == CLS_QUEUING && - lck->lls_sub[idx].sub_lock == NULL) + lck->lls_sub[idx].sub_lock == NULL) { lov_sublock_adopt(env, lck, sublock, idx, link); - else { + } else { OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem); /* other thread allocated sub-lock, or enqueue is no * longer going on */ @@ -548,6 
+553,7 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent, cl_lock_mutex_get(env, parent); } cl_lock_mutex_put(env, sublock); + cl_lock_put(env, sublock); result = CLO_REPEAT; } else result = PTR_ERR(sublock); @@ -659,15 +665,11 @@ static int lov_lock_unuse(const struct lu_env *env, if (lls->sub_flags & LSF_HELD) { LASSERT(sublock->cll_state == CLS_HELD); rc = cl_unuse_try(subenv->lse_env, sublock); - if (rc != CLO_WAIT) - rc = lov_sublock_release(env, lck, - i, 0, rc); + rc = lov_sublock_release(env, lck, i, 0, rc); } lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); - if (result < 0) - break; } if (result == 0 && lck->lls_cancel_race) { @@ -678,6 +680,75 @@ static int lov_lock_unuse(const struct lu_env *env, RETURN(result); } + +static void lov_lock_cancel(const struct lu_env *env, + const struct cl_lock_slice *slice) +{ + struct lov_lock *lck = cl2lov_lock(slice); + struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock); + int i; + int result; + + ENTRY; + + for (result = 0, i = 0; i < lck->lls_nr; ++i) { + int rc; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; + + /* top-lock state cannot change concurrently, because single + * thread (one that released the last hold) carries unlocking + * to the completion. */ + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; + if (sub == NULL) + continue; + + sublock = sub->lss_cl.cls_lock; + rc = lov_sublock_lock(env, lck, lls, closure, &subenv); + if (rc == 0) { + if (!(lls->sub_flags & LSF_HELD)) { + lov_sublock_unlock(env, sub, closure, subenv); + continue; + } + + switch(sublock->cll_state) { + case CLS_HELD: + rc = cl_unuse_try(subenv->lse_env, + sublock); + lov_sublock_release(env, lck, i, 0, 0); + break; + case CLS_ENQUEUED: + /* TODO: it's not a good idea to cancel this + * lock because it's innocent. But it's + * acceptable. The better way would be to + * define a new lock method to unhold the + * dlm lock. 
*/ + cl_lock_cancel(env, sublock); + default: + lov_sublock_release(env, lck, i, 1, 0); + break; + } + lov_sublock_unlock(env, sub, closure, subenv); + } + + if (rc == CLO_REPEAT) { + --i; + continue; + } + + result = lov_subresult(result, rc); + } + + if (result) + CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock, + "lov_lock_cancel fails with %d.\n", result); + + cl_lock_closure_fini(closure); +} + static int lov_lock_wait(const struct lu_env *env, const struct cl_lock_slice *slice) { @@ -1049,6 +1120,7 @@ static const struct cl_lock_operations lov_lock_ops = { .clo_wait = lov_lock_wait, .clo_use = lov_lock_use, .clo_unuse = lov_lock_unuse, + .clo_cancel = lov_lock_cancel, .clo_fits_into = lov_lock_fits_into, .clo_delete = lov_lock_delete, .clo_print = lov_lock_print diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c index f682995..0d75bbe 100644 --- a/lustre/lov/lovsub_lock.c +++ b/lustre/lov/lovsub_lock.c @@ -331,9 +331,11 @@ static int lovsub_lock_delete_one(const struct lu_env *env, int result; ENTRY; - parent = lov->lls_cl.cls_lock; - result = 0; + parent = lov->lls_cl.cls_lock; + if (parent->cll_error) + RETURN(0); + result = 0; switch (parent->cll_state) { case CLS_NEW: case CLS_QUEUING: diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index fffe551..e386396 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -323,12 +323,11 @@ static int cl_lockset_lock_one(const struct lu_env *env, ENTRY; - lock = cl_lock_request(env, io, &link->cill_descr, link->cill_enq_flags, - "io", io); + lock = cl_lock_request(env, io, &link->cill_descr, "io", io); if (!IS_ERR(lock)) { link->cill_lock = lock; list_move(&link->cill_linkage, &set->cls_curr); - if (!(link->cill_enq_flags & CEF_ASYNC)) { + if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) { result = cl_wait(env, lock); if (result == 0) list_move(&link->cill_linkage, &set->cls_done); @@ -573,7 +572,7 @@ static void cl_free_io_lock_link(const struct lu_env *env, * Allocates new lock link, and uses it to add a lock to a lockset. 
*/ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, - struct cl_lock_descr *descr, int enqflags) + struct cl_lock_descr *descr) { struct cl_io_lock_link *link; int result; @@ -582,7 +581,6 @@ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, OBD_ALLOC_PTR(link); if (link != NULL) { link->cill_descr = *descr; - link->cill_enq_flags = enqflags; link->cill_fini = cl_free_io_lock_link; result = cl_io_lock_add(env, io, link); if (result) /* lock match */ diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index b85becc..50d44f5 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -365,6 +365,7 @@ EXPORT_SYMBOL(cl_lock_get_trust); static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock) { cl_lock_mutex_get(env, lock); + cl_lock_cancel(env, lock); cl_lock_delete(env, lock); cl_lock_mutex_put(env, lock); cl_lock_put(env, lock); @@ -509,9 +510,10 @@ static struct cl_lock *cl_lock_lookup(const struct lu_env *env, LASSERT(cl_is_lock(lock)); matched = cl_lock_ext_match(&lock->cll_descr, need) && - lock->cll_state < CLS_FREEING && - !(lock->cll_flags & CLF_CANCELLED) && - cl_lock_fits_into(env, lock, need, io); + lock->cll_state < CLS_FREEING && + lock->cll_error == 0 && + !(lock->cll_flags & CLF_CANCELLED) && + cl_lock_fits_into(env, lock, need, io); CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n", PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need), matched); @@ -820,6 +822,7 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock) ENTRY; if (lock->cll_state < CLS_FREEING) { + LASSERT(lock->cll_state != CLS_INTRANSIT); cl_lock_state_set(env, lock, CLS_FREEING); head = cl_object_header(lock->cll_descr.cld_obj); @@ -1048,9 +1051,6 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock) do { result = 0; - if (lock->cll_error != 0) - break; - LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); LASSERT(lock->cll_state == CLS_INTRANSIT); @@ -1069,7 +1069,7 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock) LASSERT(result != -ENOSYS); } while (result == CLO_REPEAT); - return result ?: lock->cll_error; + return result; } /** @@ -1087,6 +1087,10 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic) ENTRY; cl_lock_trace(D_DLMTRACE, env, "use lock", lock); + LASSERT(lock->cll_state == CLS_CACHED); + if (lock->cll_error) + RETURN(lock->cll_error); + result = -ENOSYS; state = cl_lock_intransit(env, lock); list_for_each_entry(slice, &lock->cll_layers, cls_linkage) { @@ -1098,7 +1102,8 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic) } LASSERT(result != -ENOSYS); - LASSERT(lock->cll_state == CLS_INTRANSIT); + LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n", + lock->cll_state); if (result == 0) { state = CLS_HELD; @@ -1116,17 +1121,7 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic) /* @atomic means back-off-on-failure. */ if (atomic) { int rc; - - do { - rc = cl_unuse_try_internal(env, lock); - if (rc == 0) - break; - if (rc == CLO_WAIT) - rc = cl_lock_state_wait(env, lock); - if (rc < 0) - break; - } while(1); - + rc = cl_unuse_try_internal(env, lock); /* Vet the results. 
*/ if (rc < 0 && result > 0) result = rc; @@ -1251,8 +1246,7 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock, } while (1); if (result != 0) { cl_lock_user_del(env, lock); - if (result != -EINTR) - cl_lock_error(env, lock, result); + cl_lock_error(env, lock, result); } LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD)); @@ -1292,8 +1286,9 @@ EXPORT_SYMBOL(cl_enqueue); * * This function is called repeatedly by cl_unuse() until either lock is * unlocked, or error occurs. + * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT. * - * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock) + * \pre lock->cll_state == CLS_HELD * * \post ergo(result == 0, lock->cll_state == CLS_CACHED) * @@ -1308,30 +1303,24 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) ENTRY; cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock); - if (lock->cll_state != CLS_INTRANSIT) { - if (lock->cll_users > 1) { - cl_lock_user_del(env, lock); - RETURN(0); - } - /* - * New lock users (->cll_users) are not protecting unlocking - * from proceeding. From this point, lock eventually reaches - * CLS_CACHED, is reinitialized to CLS_NEW or fails into - * CLS_FREEING. - */ - state = cl_lock_intransit(env, lock); + LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED); + if (lock->cll_users > 1) { + cl_lock_user_del(env, lock); + RETURN(0); } + /* + * New lock users (->cll_users) are not protecting unlocking + * from proceeding. From this point, lock eventually reaches + * CLS_CACHED, is reinitialized to CLS_NEW or fails into + * CLS_FREEING. + */ + state = cl_lock_intransit(env, lock); + result = cl_unuse_try_internal(env, lock); LASSERT(lock->cll_state == CLS_INTRANSIT); - if (result != CLO_WAIT) - /* - * Once there is no more need to iterate ->clo_unuse() calls, - * remove lock user. This is done even if unrecoverable error - * happened during unlocking, because nothing else can be - * done. - */ - cl_lock_user_del(env, lock); + LASSERT(result != CLO_WAIT); + cl_lock_user_del(env, lock); if (result == 0 || result == -ESTALE) { /* * Return lock back to the cache. This is the only @@ -1342,7 +1331,10 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * re-initialized. This happens e.g., when a sub-lock was * canceled while unlocking was in progress. */ - state = result == 0 ? 
CLS_CACHED : CLS_NEW; + if (state == CLS_HELD && result == 0) + state = CLS_CACHED; + else + state = CLS_NEW; cl_lock_extransit(env, lock, state); /* @@ -1356,7 +1348,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) */ result = 0; } else { - CWARN("result = %d, this is unlikely!\n", result); + CERROR("result = %d, this is unlikely!\n", result); cl_lock_extransit(env, lock, state); } @@ -1369,19 +1361,13 @@ EXPORT_SYMBOL(cl_unuse_try); static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock) { + int result; ENTRY; - LASSERT(lock->cll_state <= CLS_HELD); - do { - int result; - result = cl_unuse_try(env, lock); - if (result == CLO_WAIT) { - result = cl_lock_state_wait(env, lock); - if (result == 0) - continue; - } - break; - } while (1); + result = cl_unuse_try(env, lock); + if (result) + CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result); + EXIT; } @@ -1447,8 +1433,10 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock) } } LASSERT(result != -ENOSYS); - if (result == 0) + if (result == 0) { + LASSERT(lock->cll_state != CLS_INTRANSIT); cl_lock_state_set(env, lock, CLS_HELD); + } } while (result == CLO_REPEAT); RETURN(result ?: lock->cll_error); } @@ -1471,7 +1459,8 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock) cl_lock_mutex_get(env, lock); LINVRNT(cl_lock_invariant(env, lock)); - LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD); + LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD, + "Wrong state %d \n", lock->cll_state); LASSERT(lock->cll_holds > 0); cl_lock_trace(D_DLMTRACE, env, "wait lock", lock); @@ -1486,8 +1475,7 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock) } while (1); if (result < 0) { cl_lock_user_del(env, lock); - if (result != -EINTR) - cl_lock_error(env, lock, result); + cl_lock_error(env, lock, result); cl_lock_lockdep_release(env, lock); } cl_lock_mutex_put(env, lock); @@ -1812,6 +1800,7 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but * not PHANTOM */ need->cld_start = need->cld_end = page->cp_index; + need->cld_enq_flags = 0; spin_lock(&head->coh_lock_guard); /* It is fine to match any group lock since there could be only one @@ -2056,7 +2045,8 @@ static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env, if (IS_ERR(lock)) break; cl_lock_mutex_get(env, lock); - if (lock->cll_state < CLS_FREEING) { + if (lock->cll_state < CLS_FREEING && + !(lock->cll_flags & CLF_CANCELLED)) { cl_lock_hold_mod(env, lock, +1); lu_ref_add(&lock->cll_holders, scope, source); lu_ref_add(&lock->cll_reference, scope, source); @@ -2096,23 +2086,18 @@ EXPORT_SYMBOL(cl_lock_hold); */ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, const struct cl_lock_descr *need, - __u32 enqflags, const char *scope, const void *source) { struct cl_lock *lock; const struct lu_fid *fid; int rc; int iter; - int warn; + __u32 enqflags = need->cld_enq_flags; ENTRY; fid = lu_object_fid(&io->ci_obj->co_lu); iter = 0; do { - warn = iter >= 16 && IS_PO2(iter); - CDEBUG(warn ? 
D_WARNING : D_DLMTRACE, - DDESCR"@"DFID" %i %08x `%s'\n", - PDESCR(need), PFID(fid), iter, enqflags, scope); lock = cl_lock_hold_mutex(env, io, need, scope, source); if (!IS_ERR(lock)) { rc = cl_enqueue_locked(env, lock, io, enqflags); @@ -2122,9 +2107,7 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, cl_lock_lockdep_acquire(env, lock, enqflags); break; - } else if (warn) - CL_LOCK_DEBUG(D_WARNING, env, lock, - "got (see bug 17665)\n"); + } cl_unuse_locked(env, lock); } cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 504196a..3f54f2e 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -995,10 +995,10 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco, descr->cld_start = cl_index(obj, start); descr->cld_end = cl_index(obj, end); descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ; + descr->cld_enq_flags = CEF_ASYNC | enqflags; io->ci_obj = obj; - lck = cl_lock_request(env, io, descr, CEF_ASYNC | enqflags, - "ec enqueue", eco); + lck = cl_lock_request(env, io, descr, "ec enqueue", eco); if (lck) { struct echo_client_obd *ec = eco->eo_dev->ed_ec; struct echo_lock *el; diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index e9d49fe..b9e711b 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1591,24 +1591,40 @@ static int osc_lock_fits_into(const struct lu_env *env, { struct osc_lock *ols = cl2osc_lock(slice); - /* If the lock hasn't ever enqueued, it can't be matched because - * enqueue process brings in many information which can be used to - * determine things such as lockless, CEF_MUST, etc. - */ - if (ols->ols_state < OLS_ENQUEUED) - return 0; - - /* Don't match this lock if the lock is able to become lockless lock. - * This is because the new lock might be covering a mmap region and - * so that it must have a cached at the local side. */ - if (ols->ols_state < OLS_UPCALL_RECEIVED && ols->ols_locklessable) - return 0; - - /* If the lock is going to be canceled, no reason to match it as well */ - if (ols->ols_state > OLS_RELEASED) + if (need->cld_enq_flags & CEF_NEVER) return 0; - /* go for it. */ + if (need->cld_mode == CLM_PHANTOM) { + /* + * Note: the QUEUED lock can't be matched here, otherwise + * it might cause the deadlocks. + * In read_process, + * P1: enqueued read lock, create sublock1 + * P2: enqueued write lock, create sublock2(conflicted + * with sublock1). + * P1: Grant read lock. + * P1: enqueued glimpse lock(with holding sublock1_read), + * matched with sublock2, waiting sublock2 to be granted. + * But sublock2 can not be granted, because P1 + * will not release sublock1. Bang! + */ + if (ols->ols_state < OLS_GRANTED || + ols->ols_state > OLS_RELEASED) + return 0; + } else if (need->cld_enq_flags & CEF_MUST) { + /* + * If the lock hasn't ever enqueued, it can't be matched + * because enqueue process brings in many information + * which can be used to determine things such as lockless, + * CEF_MUST, etc. + */ + if (ols->ols_state < OLS_GRANTED || + ols->ols_state > OLS_RELEASED) + return 0; + if (ols->ols_state < OLS_UPCALL_RECEIVED && + ols->ols_locklessable) + return 0; + } return 1; } -- 1.8.3.1
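
Note on the interface change in this patch: the enqueue flags move from the per-IO lock link (cill_enq_flags in struct cl_io_lock_link) into the lock descriptor itself (cld_enq_flags in struct cl_lock_descr), so cl_lock_request() and cl_io_lock_alloc_add() lose their enqflags argument and callers such as cl_glimpse_lock(), cl_get_grouplock(), vvp_mmap_locks() and cl_echo_enqueue0() fill descr->cld_enq_flags instead. Below is a minimal standalone C sketch of the resulting calling convention; the *_model names are simplified stand-ins, not the real Lustre definitions.

/*
 * Standalone model of the cld_enq_flags change; the struct and function
 * names are simplified stand-ins, not the real Lustre definitions.
 */
#include <stdio.h>

enum cl_enq_flags_model {
        MODEL_CEF_ASYNC = 1 << 0,       /* do not block the caller waiting for grant */
        MODEL_CEF_MUST  = 1 << 1        /* never convert into a lockless lock */
};

struct cl_lock_descr_model {
        unsigned long cld_start;
        unsigned long cld_end;
        unsigned int  cld_enq_flags;    /* previously carried in cl_io_lock_link */
};

/* After the patch the enqueue flags travel inside the descriptor, so the
 * request function no longer needs a separate enqflags argument. */
static void lock_request_model(const struct cl_lock_descr_model *descr)
{
        printf("enqueue [%lu, %lu] flags %#x\n",
               descr->cld_start, descr->cld_end, descr->cld_enq_flags);
        if (!(descr->cld_enq_flags & MODEL_CEF_ASYNC))
                printf("caller would block in cl_wait()\n");
}

int main(void)
{
        /* Mirrors cl_glimpse_lock(): a whole-file descriptor enqueued
         * with CEF_ASYNC | CEF_MUST. */
        struct cl_lock_descr_model glimpse = {
                .cld_start     = 0,
                .cld_end       = ~0UL,
                .cld_enq_flags = MODEL_CEF_ASYNC | MODEL_CEF_MUST
        };

        lock_request_model(&glimpse);
        return 0;
}

Because the flags now travel with the descriptor, any code that copies a descriptor sees them as well: lov_lock_sub_init() copies them into each sub-lock descriptor, although the patch itself marks that particular copy "XXX has no effect" for now.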
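
The behavioural change behind most of the cl_lock.c hunks is the cl_unuse_try() contract: unuse is now a one-shot operation that must never return CLO_WAIT, which is why cl_unuse_locked() drops its wait-and-retry loop, cl_use_try() no longer loops around cl_unuse_try_internal(), and lov_lock_unuse() stops special-casing CLO_WAIT from its sub-locks. The sketch below models only the resulting state transition as the patch has it (a lock that was CLS_HELD and unused cleanly returns to CLS_CACHED, everything else is reinitialised to CLS_NEW); the names are illustrative stand-ins for the cl_lock state machine.

/*
 * Model of the post-patch cl_unuse_try() result handling; the states and
 * names below are simplified stand-ins for the cl_lock state machine.
 */
#include <assert.h>

enum lock_state_model { M_CLS_NEW, M_CLS_ENQUEUED, M_CLS_HELD, M_CLS_CACHED };

#define MODEL_ESTALE 116        /* stands in for the -ESTALE case */

/* "from" is the state the lock had before going in-transit (CLS_HELD or
 * CLS_ENQUEUED); "rc" is what the layers' ->clo_unuse() returned.  By the
 * new contract rc is never CLO_WAIT. */
static enum lock_state_model unuse_next_state(enum lock_state_model from, int rc)
{
        if (rc == 0 || rc == -MODEL_ESTALE) {
                /* Only a lock that was actually held and unused cleanly may
                 * return to the cache; everything else is reinitialised. */
                if (from == M_CLS_HELD && rc == 0)
                        return M_CLS_CACHED;
                return M_CLS_NEW;
        }
        /* Unexpected error: the real code logs it with CERROR and puts the
         * lock back into the state it started from. */
        return from;
}

int main(void)
{
        assert(unuse_next_state(M_CLS_HELD, 0) == M_CLS_CACHED);
        assert(unuse_next_state(M_CLS_HELD, -MODEL_ESTALE) == M_CLS_NEW);
        assert(unuse_next_state(M_CLS_ENQUEUED, 0) == M_CLS_NEW);
        return 0;
}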
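
lov_lock_cancel() is the new clo_cancel method for striped locks: it walks every sub-lock and picks a teardown path from the sub-lock state. A CLS_HELD sub-lock is unused and then released, a CLS_ENQUEUED one is cancelled outright before release (the in-line TODO concedes this is heavy-handed but acceptable), anything else is only released, and CLO_REPEAT from lov_sublock_lock() retries the same stripe index. A compact model of that dispatch, with made-up names:

/*
 * Model of the per-sub-lock dispatch inside the new lov_lock_cancel();
 * the states, actions and names are simplified stand-ins.
 */
#include <stdio.h>

enum sub_state_model { M_CLS_ENQUEUED, M_CLS_HELD, M_CLS_CACHED, M_CLS_FREEING };

enum cancel_action_model {
        M_ACT_UNUSE_THEN_RELEASE,       /* CLS_HELD: unuse, then drop the hold */
        M_ACT_CANCEL_THEN_RELEASE,      /* CLS_ENQUEUED: cancel before release */
        M_ACT_RELEASE_ONLY              /* any other state */
};

static enum cancel_action_model cancel_action_for(enum sub_state_model state)
{
        switch (state) {
        case M_CLS_HELD:
                return M_ACT_UNUSE_THEN_RELEASE;
        case M_CLS_ENQUEUED:
                /* The patch cancels an enqueued ("innocent") sub-lock here;
                 * a dedicated unhold method is left as a TODO. */
                return M_ACT_CANCEL_THEN_RELEASE;
        default:
                return M_ACT_RELEASE_ONLY;
        }
}

int main(void)
{
        printf("HELD -> %d, ENQUEUED -> %d, CACHED -> %d\n",
               cancel_action_for(M_CLS_HELD),
               cancel_action_for(M_CLS_ENQUEUED),
               cancel_action_for(M_CLS_CACHED));
        return 0;
}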
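
Finally, osc_lock_fits_into() is rewritten around the request rather than the cached lock: CEF_NEVER requests never match, a CLM_PHANTOM (glimpse) request only matches a lock that is already granted and not yet being torn down -- matching a merely enqueued lock can produce the two-process deadlock described in the comment -- and CEF_MUST requests additionally refuse locks that may still become lockless. A small model of that policy, with illustrative names standing in for the osc_lock state enum and the CEF flags:

/*
 * Model of the rewritten osc_lock_fits_into() matching policy; the enum
 * values and flag names are illustrative stand-ins, not Lustre definitions.
 */
#include <stdbool.h>
#include <stdio.h>

enum ols_state_model {
        M_OLS_NEW, M_OLS_ENQUEUE_WAIT, M_OLS_UPCALL_RECEIVED,
        M_OLS_ENQUEUED, M_OLS_GRANTED, M_OLS_RELEASED, M_OLS_CANCELLED
};

#define M_CEF_MUST      (1u << 0)
#define M_CEF_NEVER     (1u << 1)

static bool fits_into_model(enum ols_state_model state, bool locklessable,
                            bool phantom, unsigned int enq_flags)
{
        if (enq_flags & M_CEF_NEVER)
                return false;

        if (phantom) {
                /* A glimpse (CLM_PHANTOM) request may only match a lock that
                 * is already granted and not yet torn down; matching a merely
                 * enqueued lock can deadlock as described in the patch. */
                if (state < M_OLS_GRANTED || state > M_OLS_RELEASED)
                        return false;
        } else if (enq_flags & M_CEF_MUST) {
                /* CEF_MUST needs the information a completed enqueue brings,
                 * and must not reuse a lock that may still go lockless. */
                if (state < M_OLS_GRANTED || state > M_OLS_RELEASED)
                        return false;
                if (state < M_OLS_UPCALL_RECEIVED && locklessable)
                        return false;
        }
        return true;
}

int main(void)
{
        printf("enqueued lock vs glimpse: %d\n",
               fits_into_model(M_OLS_ENQUEUED, false, true, 0));        /* 0 */
        printf("granted lock vs glimpse:  %d\n",
               fits_into_model(M_OLS_GRANTED, false, true, 0));         /* 1 */
        return 0;
}

In this sketch the lockless check is largely redundant after the granted-window test, because the model enum places M_OLS_UPCALL_RECEIVED before M_OLS_GRANTED; it is kept only to mirror the structure of the patched function.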