From f9132c2d4d214772318d5b02865a8196a23d7b5e Mon Sep 17 00:00:00 2001
From: jxiong
Date: Thu, 19 Nov 2009 03:01:03 +0000
Subject: [PATCH] b=19906 r=wangdi,ericm

cleanup patch, add more debug info.
---
 lustre/include/cl_object.h  |  4 +--
 lustre/lclient/lcommon_cl.c |  8 +++---
 lustre/lov/lovsub_lock.c    |  3 ++-
 lustre/obdclass/cl_lock.c   | 61 +++++++++++++++++++++++++++++++------------
 lustre/osc/osc_io.c         | 15 ++++++++++-
 lustre/osc/osc_lock.c       | 63 ++++++++++++++++++++++++++++-----------------
 6 files changed, 104 insertions(+), 50 deletions(-)

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index 20b14d2..ba66ba7 100644
--- a/lustre/include/cl_object.h
+++ b/lustre/include/cl_object.h
@@ -1414,7 +1414,7 @@ enum cl_lock_state {
          * state, it must wait for the lock.
          * See state diagram for details.
          */
-        CLS_INTRANSIT,
+        CLS_INTRANSIT,
         /**
          * Lock granted, not used.
          */
@@ -1667,7 +1667,7 @@ struct cl_lock_operations {
          * -ESTALE to indicate that lock cannot be returned to the cache, and
          * has to be re-initialized.
          *
-         * \see ccc_lock_unlock(), lov_lock_unlock(), osc_lock_unlock()
+         * \see ccc_lock_unuse(), lov_lock_unuse(), osc_lock_unuse()
          */
         int  (*clo_unuse)(const struct lu_env *env,
                           const struct cl_lock_slice *slice);
diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c
index dc2be2f..195ce87 100644
--- a/lustre/lclient/lcommon_cl.c
+++ b/lustre/lclient/lcommon_cl.c
@@ -629,9 +629,6 @@ int ccc_lock_fits_into(const struct lu_env *env,
         /*
          * Also, don't match incomplete write locks for read, otherwise read
          * would enqueue missing sub-locks in the write mode.
-         *
-         * XXX this is a candidate for generic locking policy, to be moved
-         * into cl_lock_lookup().
          */
         else if (need->cld_mode != descr->cld_mode)
                 result = lock->cll_state >= CLS_ENQUEUED;
@@ -693,8 +690,9 @@ void ccc_lock_state(const struct lu_env *env,
                         cl_inode_mtime(inode) = attr->cat_mtime;
                         cl_inode_atime(inode) = attr->cat_atime;
                         cl_inode_ctime(inode) = attr->cat_ctime;
-                } else
-                        CL_LOCK_DEBUG(D_ERROR, env, lock, "attr_get: %i\n", rc);
+                } else {
+                        CL_LOCK_DEBUG(D_INFO, env, lock, "attr_get: %i\n", rc);
+                }
                 cl_object_attr_unlock(obj);
                 cl_isize_unlock(inode, 0);
         }
diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c
index aab5aea..f682995 100644
--- a/lustre/lov/lovsub_lock.c
+++ b/lustre/lov/lovsub_lock.c
@@ -327,7 +327,7 @@ static int lovsub_lock_closure(const struct lu_env *env,
 static int lovsub_lock_delete_one(const struct lu_env *env,
                                   struct cl_lock *child, struct lov_lock *lov)
 {
-        struct cl_lock *parent;
+        struct cl_lock *parent;
         int             result;
         ENTRY;

@@ -411,6 +411,7 @@ static int lovsub_lock_delete_one(const struct lu_env *env,
                 }
                 break;
         case CLS_HELD:
+                CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
         default:
                 CERROR("Impossible state: %i\n", parent->cll_state);
                 LBUG();
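The CLS_HELD case added to lovsub_lock_delete_one() above deliberately falls through into the default arm, so the new CL_LOCK_DEBUG() line puts extra context in the log just before LBUG() fires. A minimal standalone sketch of that "log, then fall through to the crash" pattern; the enum and helpers are illustrative, not the Lustre ones:

#include <stdio.h>
#include <stdlib.h>

enum toy_state { TS_NEW, TS_HELD, TS_FREEING };

static void toy_delete(enum toy_state state)
{
        switch (state) {
        case TS_NEW:
                printf("deleting NEW lock\n");
                break;
        case TS_HELD:
                /* extra evidence logged before the crash below */
                fprintf(stderr, "deleting a HELD lock\n");
                /* fall through */
        default:
                fprintf(stderr, "impossible state: %i\n", state);
                abort();        /* stands in for LBUG() */
        }
}

int main(void)
{
        toy_delete(TS_NEW);     /* fine */
        toy_delete(TS_HELD);    /* logs, then aborts like LBUG() */
        return 0;
}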
diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index 8db4297..b85becc 100644
--- a/lustre/obdclass/cl_lock.c
+++ b/lustre/obdclass/cl_lock.c
@@ -127,6 +127,23 @@ static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
         return &info->clt_counters[nesting];
 }
 
+static void cl_lock_trace0(int level, const struct lu_env *env,
+                           const char *prefix, const struct cl_lock *lock,
+                           const char *func, const int line)
+{
+        struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
+        CDEBUG(level, "%s: %p@(%i %p %i %d %d %d %d %lx)"
+               "(%p/%d/%i) at %s():%d\n",
+               prefix, lock,
+               atomic_read(&lock->cll_ref), lock->cll_guarder, lock->cll_depth,
+               lock->cll_state, lock->cll_error, lock->cll_holds,
+               lock->cll_users, lock->cll_flags,
+               env, h->coh_nesting, cl_lock_nr_mutexed(env),
+               func, line);
+}
+#define cl_lock_trace(level, env, prefix, lock)                         \
+        cl_lock_trace0(level, env, prefix, lock, __FUNCTION__, __LINE__)
+
 #define RETIP ((unsigned long)__builtin_return_address(0))
 
 #ifdef CONFIG_LOCKDEP
@@ -244,6 +261,7 @@ static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(!cl_lock_is_mutexed(lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
         might_sleep();
         while (!list_empty(&lock->cll_layers)) {
                 struct cl_lock_slice *slice;
@@ -634,14 +652,6 @@ const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_at);
 
-static void cl_lock_trace(struct cl_thread_counters *counters,
-                          const char *prefix, const struct cl_lock *lock)
-{
-        CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
-               atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
-               lock->cll_depth, counters->ctc_nr_locks_locked);
-}
-
 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
 {
         struct cl_thread_counters *counters;
@@ -650,7 +660,7 @@ static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
         lock->cll_depth++;
         counters->ctc_nr_locks_locked++;
         lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
-        cl_lock_trace(counters, "got mutex", lock);
+        cl_lock_trace(D_TRACE, env, "got mutex", lock);
 }
 
 /**
@@ -742,7 +752,7 @@ void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
         counters = cl_lock_counters(env, lock);
         LINVRNT(counters->ctc_nr_locks_locked > 0);
 
-        cl_lock_trace(counters, "put mutex", lock);
+        cl_lock_trace(D_TRACE, env, "put mutex", lock);
         lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
         counters->ctc_nr_locks_locked--;
         if (--lock->cll_depth == 0) {
@@ -888,6 +898,7 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_holds > 0);
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
@@ -942,6 +953,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
         LASSERT(lock->cll_depth == 1);
         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
 
+        cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
         result = lock->cll_error;
         if (result == 0) {
                 cfs_waitlink_init(&waiter);
@@ -987,6 +999,7 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
 {
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
         cl_lock_state_signal(env, lock, lock->cll_state);
         EXIT;
 }
@@ -1072,8 +1085,9 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
         enum cl_lock_state state;
 
         ENTRY;
-        result = -ENOSYS;
+        cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
 
+        result = -ENOSYS;
         state = cl_lock_intransit(env, lock);
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_use != NULL) {
@@ -1168,6 +1182,7 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
         do {
                 result = 0;
 
@@ -1291,6 +1306,8 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
         enum cl_lock_state state = CLS_NEW;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
+
         if (lock->cll_state != CLS_INTRANSIT) {
                 if (lock->cll_users > 1) {
                         cl_lock_user_del(env, lock);
@@ -1398,6 +1415,7 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
         do {
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
@@ -1455,6 +1473,7 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(cl_lock_invariant(env, lock));
         LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
         LASSERT(lock->cll_holds > 0);
+        cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
 
         do {
                 result = cl_wait_try(env, lock);
@@ -1523,6 +1542,7 @@ int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
         /* don't allow object to change */
         LASSERT(obj == desc->cld_obj);
         LINVRNT(cl_lock_is_mutexed(lock));
@@ -1615,8 +1635,9 @@ EXPORT_SYMBOL(cl_lock_closure_build);
 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
                       struct cl_lock_closure *closure)
 {
-        int result;
+        int result = 0;
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
         if (!cl_lock_mutex_try(env, lock)) {
                 /*
                  * If lock->cll_inclosure is not empty, lock is already in
@@ -1658,6 +1679,7 @@ void cl_lock_disclosure(const struct lu_env *env,
         struct cl_lock *scan;
         struct cl_lock *temp;
 
+        cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
         list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
                 list_del_init(&scan->cll_inclosure);
                 cl_lock_mutex_put(env, scan);
@@ -1706,6 +1728,7 @@ void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
                      cl_lock_nr_mutexed(env) == 1));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_delete0(env, lock);
         else
@@ -1730,6 +1753,7 @@ void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
         LINVRNT(cl_lock_invariant(env, lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
         if (lock->cll_error == 0 && error != 0) {
                 lock->cll_error = error;
                 cl_lock_signal(env, lock);
@@ -1757,6 +1781,7 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(cl_lock_invariant(env, lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_cancel0(env, lock);
         else
@@ -2102,6 +2127,7 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                       "got (see bug 17665)\n");
                                 cl_unuse_locked(env, lock);
                         }
+                        cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
                         cl_lock_hold_release(env, lock, scope, source);
                         cl_lock_mutex_put(env, lock);
                         lu_ref_del(&lock->cll_reference, scope, source);
@@ -2158,6 +2184,7 @@ void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
 {
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
         cl_lock_mutex_get(env, lock);
         cl_lock_hold_release(env, lock, scope, source);
         cl_lock_mutex_put(env, lock);
@@ -2212,15 +2239,15 @@ EXPORT_SYMBOL(cl_lock_compatible);
 
 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
 {
         static const char *names[] = {
-                [CLM_PHANTOM] = "PHANTOM",
-                [CLM_READ]    = "READ",
-                [CLM_WRITE]   = "WRITE",
-                [CLM_GROUP]   = "GROUP"
+                [CLM_PHANTOM] = "P",
+                [CLM_READ]    = "R",
+                [CLM_WRITE]   = "W",
+                [CLM_GROUP]   = "G"
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];
         else
-                return "UNKNW";
+                return "U";
 }
 EXPORT_SYMBOL(cl_lock_mode_name);
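Most of the cl_lock.c changes funnel through the new cl_lock_trace() macro. Its point is the cl_lock_trace0()/macro split: the helper takes the call site as explicit arguments, and the macro fills in __FUNCTION__ and __LINE__ so every trace line identifies its caller for free. A standalone sketch of the same plumbing, with a toy struct in place of struct cl_lock:

#include <stdio.h>

struct toy_lock {
        int refs;
        int state;
};

static void toy_trace0(const char *prefix, const struct toy_lock *lock,
                       const char *func, int line)
{
        printf("%s: %p@(%i %i) at %s():%d\n",
               prefix, (const void *)lock, lock->refs, lock->state,
               func, line);
}

/* __FUNCTION__ is the GCC spelling used in this tree; __func__ is the
 * standard C99 equivalent. */
#define toy_trace(prefix, lock) \
        toy_trace0(prefix, lock, __FUNCTION__, __LINE__)

int main(void)
{
        struct toy_lock l = { .refs = 1, .state = 0 };

        toy_trace("got mutex", &l);     /* reports main() and this line */
        return 0;
}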
diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c
index 8680a7b..294f415 100644
--- a/lustre/osc/osc_io.c
+++ b/lustre/osc/osc_io.c
@@ -637,7 +637,20 @@ static void osc_req_attr_set(const struct lu_env *env,
                 opg = osc_cl_page_osc(apage);
                 apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
                 lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
-                LASSERT(lock != NULL);
+                if (lock == NULL) {
+                        struct cl_object_header *head;
+                        struct cl_lock          *scan;
+
+                        head = cl_object_header(apage->cp_obj);
+                        list_for_each_entry(scan, &head->coh_locks, cll_linkage)
+                                CL_LOCK_DEBUG(D_ERROR, env, scan,
+                                              "lock doesn't cover the page!\n");
+                        CL_PAGE_DEBUG(D_ERROR, env, apage,
+                                      "dump uncovered page!\n");
+                        libcfs_debug_dumpstack(NULL);
+                        LBUG();
+                }
+
                 olck = osc_lock_at(lock);
                 LASSERT(olck != NULL);
                 /* check for lockless io. */
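The osc_io.c hunk replaces a bare LASSERT() with a diagnostic path: when no lock covers the page, it dumps every lock on the object and the page itself before LBUG(), so the crash log explains the failure instead of just reporting it. A sketch of that dump-then-abort shape, using an illustrative linked list in place of coh_locks:

#include <stdio.h>
#include <stdlib.h>

struct toy_lock {
        long start, end;
        struct toy_lock *next;
};

static void die_uncovered(const struct toy_lock *locks, long page_index)
{
        const struct toy_lock *scan;

        /* dump every candidate, like the CL_LOCK_DEBUG() loop above */
        for (scan = locks; scan != NULL; scan = scan->next)
                fprintf(stderr, "lock [%ld, %ld] doesn't cover the page\n",
                        scan->start, scan->end);
        fprintf(stderr, "no lock covers page %ld\n", page_index);
        abort();                /* stands in for LBUG() */
}

int main(void)
{
        struct toy_lock b = { 100, 199, NULL };
        struct toy_lock a = { 0, 99, &b };

        die_uncovered(&a, 250); /* page 250 is covered by neither lock */
        return 0;
}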
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 6ca2014..e9d49fe 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -135,6 +135,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
 {
         struct ldlm_lock *dlmlock;
 
+        /* reset the osc lock's state because it might be enqueued again. */
+        olck->ols_state = OLS_NEW;
         spin_lock(&osc_ast_guard);
         dlmlock = olck->ols_lock;
         if (dlmlock == NULL) {
@@ -167,15 +169,28 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
         unlock_res_and_lock(dlmlock);
 
         /* release a reference taken in osc_lock_upcall0(). */
+        LASSERT(olck->ols_has_ref);
         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
         LDLM_LOCK_RELEASE(dlmlock);
+        olck->ols_has_ref = 0;
+}
+
+static int osc_lock_unhold(struct osc_lock *ols)
+{
+        int result = 0;
+
+        if (ols->ols_hold) {
+                ols->ols_hold = 0;
+                result = osc_cancel_base(&ols->ols_handle,
+                                         ols->ols_einfo.ei_mode);
+        }
+        return result;
 }
 
 static int osc_lock_unuse(const struct lu_env *env,
                           const struct cl_lock_slice *slice)
 {
         struct osc_lock *ols = cl2osc_lock(slice);
-        int result;
 
         LASSERT(ols->ols_state == OLS_GRANTED ||
                 ols->ols_state == OLS_UPCALL_RECEIVED);
@@ -193,10 +208,7 @@ static int osc_lock_unuse(const struct lu_env *env,
          * e.g., for liblustre) sees that lock is released.
          */
         ols->ols_state = OLS_RELEASED;
-        ols->ols_hold = 0;
-        result = osc_cancel_base(&ols->ols_handle, ols->ols_einfo.ei_mode);
-        ols->ols_has_ref = 0;
-        return result;
+        return osc_lock_unhold(ols);
 }
 
 static void osc_lock_fini(const struct lu_env *env,
@@ -211,8 +223,7 @@ static void osc_lock_fini(const struct lu_env *env,
          * to the lock), before reply from a server was received. In this case
          * lock is destroyed immediately after upcall.
          */
-        if (ols->ols_hold)
-                osc_lock_unuse(env, slice);
+        osc_lock_unhold(ols);
         LASSERT(ols->ols_lock == NULL);
 
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
@@ -462,11 +473,12 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
          * this.
          */
         ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
-        olck->ols_hold = olck->ols_has_ref = 1;
+        olck->ols_hold = 1;
 
         /* lock reference taken by ldlm_handle2lock_long() is owned by
          * osc_lock and released in osc_lock_detach() */
         lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
+        olck->ols_has_ref = 1;
 }
 
 /**
@@ -569,12 +581,11 @@ static void osc_lock_blocking(const struct lu_env *env,
         CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
         LASSERT(!osc_lock_is_lockless(olck));
 
-        if (olck->ols_hold)
-                /*
-                 * Lock might be still addref-ed here, if e.g., blocking ast
-                 * is sent for a failed lock.
-                 */
-                osc_lock_unuse(env, &olck->ols_cl);
+        /*
+         * Lock might be still addref-ed here, if e.g., blocking ast
+         * is sent for a failed lock.
+         */
+        osc_lock_unhold(olck);
 
         if (blocking && olck->ols_state < OLS_BLOCKED)
                 /*
@@ -771,7 +782,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                 LASSERT(dlmlock->l_lvb_data != NULL);
                 lock_res_and_lock(dlmlock);
                 olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
-                if (olck->ols_lock == NULL)
+                if (olck->ols_lock == NULL) {
                         /*
                          * upcall (osc_lock_upcall()) hasn't yet been
                          * called. Do nothing now, upcall will bind
@@ -781,12 +792,17 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                          * and ldlm_lock are always bound when
                          * osc_lock is in OLS_GRANTED state.
                          */
-                        ;
-                else if (dlmlock->l_granted_mode != LCK_MINMODE)
+                } else if (dlmlock->l_granted_mode ==
+                           dlmlock->l_req_mode) {
                         osc_lock_granted(env, olck, dlmlock, dlmrc);
+                }
                 unlock_res_and_lock(dlmlock);
-                if (dlmrc != 0)
+
+                if (dlmrc != 0) {
+                        CL_LOCK_DEBUG(D_ERROR, env, lock,
+                                      "dlmlock returned %d\n", dlmrc);
                         cl_lock_error(env, lock, dlmrc);
+                }
                 cl_lock_mutex_put(env, lock);
                 osc_ast_data_put(env, olck);
                 result = 0;
@@ -1343,13 +1359,14 @@ static int osc_lock_use(const struct lu_env *env,
         int rc;
 
         LASSERT(!olck->ols_hold);
+
         /*
          * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
          * flag is not set. This protects us from a concurrent blocking ast.
          */
         rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
         if (rc == 0) {
-                olck->ols_hold = olck->ols_has_ref = 1;
+                olck->ols_hold = 1;
                 olck->ols_state = OLS_GRANTED;
         } else {
                 struct cl_lock *lock;
@@ -1422,8 +1439,7 @@ static void osc_lock_cancel(const struct lu_env *env,
 
                 discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
                 result = osc_lock_flush(olck, discard);
-                if (olck->ols_hold)
-                        osc_lock_unuse(env, slice);
+                osc_lock_unhold(olck);
 
                 lock_res_and_lock(dlmlock);
                 /* Now that we're the only user of dlm read/write reference,
@@ -1521,8 +1537,7 @@ static void osc_lock_delete(const struct lu_env *env,
         LINVRNT(osc_lock_invariant(olck));
         LINVRNT(!osc_lock_has_pages(olck));
 
-        if (olck->ols_hold)
-                osc_lock_unuse(env, slice);
+        osc_lock_unhold(olck);
         osc_lock_detach(env, olck);
 }
 
@@ -1562,7 +1577,7 @@ static int osc_lock_print(const struct lu_env *env, void *cookie,
         /*
          * XXX print ldlm lock and einfo properly.
          */
-        (*p)(env, cookie, "%p %08x "LPU64" %d %p ",
+        (*p)(env, cookie, "%p %08x "LPX64" %d %p ",
              lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
              lock->ols_state, lock->ols_owner);
         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
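The recurring osc_lock.c change is the new osc_lock_unhold() helper: it clears ols_hold before calling osc_cancel_base(), which makes it idempotent and therefore safe to call from every teardown path (unuse, fini, blocking AST, cancel, delete) without the old `if (olck->ols_hold)` guard at each call site. A standalone sketch of that clear-flag-then-release shape, with illustrative types and a stub in place of osc_cancel_base():

#include <stdio.h>

struct toy_olock {
        int hold;       /* 1 while we own a reference */
        int handle;
};

static int toy_cancel_base(int handle)
{
        printf("reference %d dropped\n", handle);
        return 0;
}

/* Clearing the flag before the release call makes the helper safe to
 * run from several teardown paths: later calls are no-ops. */
static int toy_unhold(struct toy_olock *ols)
{
        int result = 0;

        if (ols->hold) {
                ols->hold = 0;
                result = toy_cancel_base(ols->handle);
        }
        return result;
}

int main(void)
{
        struct toy_olock ols = { 1, 42 };

        toy_unhold(&ols);       /* drops the reference */
        toy_unhold(&ols);       /* harmless: hold is already clear */
        return 0;
}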
-- 
1.8.3.1
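One last pattern worth noting from the cl_lock.c hunk above: cl_lock_mode_name() shortens the mode names to single letters so they fit the denser trace format, and the function itself is a designated-initializer string table with a bounds check. A standalone sketch with a toy enum, not the Lustre one:

#include <stdio.h>

enum toy_mode { TM_PHANTOM, TM_READ, TM_WRITE, TM_GROUP };

#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

/* Designated initializers keep the table in sync with the enum, and
 * the range check degrades unknown values to "U" instead of reading
 * past the end of the array. */
static const char *toy_mode_name(int mode)
{
        static const char *names[] = {
                [TM_PHANTOM] = "P",
                [TM_READ]    = "R",
                [TM_WRITE]   = "W",
                [TM_GROUP]   = "G"
        };

        if (mode >= 0 && mode < (int)ARRAY_SIZE(names))
                return names[mode];
        return "U";
}

int main(void)
{
        printf("%s %s\n", toy_mode_name(TM_WRITE), toy_mode_name(99));
        return 0;
}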