}
/**
- * An implementation of cl_lock_operations::clo_delete() method. This is
- * invoked in "bottom-to-top" delete, when lock destruction starts from the
- * sub-lock (e.g, as a result of ldlm lock LRU policy).
+ * A helper function for lovsub_lock_delete() that deals with a given parent
+ * top-lock. Returns 1 if the sub-lock mutex was dropped (in which case the
+ * caller has to re-acquire it and restart the scan of the parents list),
+ * and 0 otherwise.
*/
-static void lovsub_lock_delete(const struct lu_env *env,
- const struct cl_lock_slice *slice)
+static int lovsub_lock_delete_one(const struct lu_env *env,
+ struct cl_lock *child, struct lov_lock *lov)
{
- struct lovsub_lock *sub = cl2lovsub_lock(slice);
- struct lov_lock *lov;
struct cl_lock *parent;
- struct lov_lock_link *scan;
- struct lov_lock_link *temp;
- struct lov_lock_sub *subdata;
-
- LASSERT(cl_lock_is_mutexed(slice->cls_lock));
+ int result;
ENTRY;
- list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list) {
- lov = scan->lll_super;
- subdata = &lov->lls_sub[scan->lll_idx];
- parent = lov->lls_cl.cls_lock;
- lovsub_parent_lock(env, lov);
- subdata->sub_got = subdata->sub_descr;
- lov_lock_unlink(env, scan, sub);
- CDEBUG(D_DLMTRACE, "%p %p %i %i\n", parent, sub,
- lov->lls_nr_filled, parent->cll_state);
- switch (parent->cll_state) {
- case CLS_NEW:
- case CLS_QUEUING:
- case CLS_ENQUEUED:
- case CLS_FREEING:
- cl_lock_signal(env, parent);
- break;
- case CLS_UNLOCKING:
- /*
- * Here lies a problem: a sub-lock is canceled while
- * top-lock is being unlocked. Top-lock cannot be
- * moved into CLS_NEW state, because unlocking has to
- * succeed eventually by placing lock into CLS_CACHED
- * (or failing it), see cl_unuse_try(). Nor can
- * top-lock be left in CLS_CACHED state, because lov
- * maintains an invariant that all sub-locks exist in
- * CLS_CACHED (this allows cached top-lock to be
- * reused immediately). Nor can we wait for top-lock
- * state to change, because this can be synchronous to
- * the current thread.
+ parent = lov->lls_cl.cls_lock;
+ result = 0;
+
+ switch (parent->cll_state) {
+ case CLS_NEW:
+ case CLS_QUEUING:
+ case CLS_ENQUEUED:
+ case CLS_FREEING:
+ cl_lock_signal(env, parent);
+ break;
+ case CLS_UNLOCKING:
+ /*
+ * Here lies a problem: a sub-lock is canceled while top-lock
+ * is being unlocked. Top-lock cannot be moved into CLS_NEW
+ * state, because unlocking has to succeed eventually by
+ * placing lock into CLS_CACHED (or failing it), see
+ * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
+ * state, because lov maintains an invariant that all
+ * sub-locks exist in CLS_CACHED (this allows cached top-lock
+ * to be reused immediately). Nor can we wait for top-lock
+ * state to change, because this can be synchronous to the
+ * current thread.
+ *
+ * We know for sure that lov_lock_unuse() will be called at
+ * least one more time to finish un-using, so leave a mark on
+ * the top-lock, which will be seen by the next call to
+ * lov_lock_unuse().
+ */
+ lov->lls_unuse_race = 1;
+ break;
+ case CLS_CACHED:
+ /*
+ * If a sub-lock is canceled, move its top-lock into CLS_NEW
+ * state to preserve the invariant that a top-lock in
+ * CLS_CACHED is immediately ready for re-use (i.e., has all
+ * sub-locks), and so that the next attempt to re-use the
+ * top-lock enqueues the missing sub-lock.
+ */
+ cl_lock_state_set(env, parent, CLS_NEW);
+ /*
+ * If the last sub-lock is canceled, destroy the top-lock
+ * (which is now `empty') proactively.
+ */
+ if (lov->lls_nr_filled == 0) {
+ /* ... but unfortunately, this cannot be done easily,
+ * as cancellation of a top-lock might acquire mutices
+ * of its other sub-locks, violating lock ordering,
+ * see cl_lock_{cancel,delete}() preconditions.
*
- * We know for sure that lov_lock_unuse() will be
- * called at least one more time to finish un-using,
- * so leave a mark on the top-lock, that will be seen
- * by the next call to lov_lock_unuse().
+ * To work around this, the mutex of this sub-lock is
+ * released, top-lock is destroyed, and sub-lock mutex
+ * acquired again. The list of parents has to be
+ * re-scanned from the beginning after this.
+ *
+ * Only do this if no mutices other than on @child and
+ * @parent are held by the current thread.
+ *
+ * TODO: The locking model here is too complex, because
+ * the lock may be canceled and deleted voluntarily:
+ * cl_lock_request
+ * -> osc_lock_enqueue_wait
+ * -> osc_lock_cancel_wait
+ * -> cl_lock_delete
+ * -> lovsub_lock_delete
+ * -> cl_lock_cancel/delete
+ * -> ...
+ *
+ * The better choice is to spawn a kernel thread for
+ * this purpose. -jay
*/
- lov->lls_unuse_race = 1;
- break;
- case CLS_CACHED:
- cl_lock_state_set(env, parent, CLS_NEW);
- if (lov->lls_nr_filled == 0) {
+ if (cl_lock_nr_mutexed(env) == 2) {
+ cl_lock_mutex_put(env, child);
cl_lock_cancel(env, parent);
cl_lock_delete(env, parent);
- cl_lock_signal(env, parent);
+ result = 1;
}
- break;
- case CLS_HELD:
- default:
- CERROR("Impossible state: %i\n", parent->cll_state);
- LBUG();
}
- lovsub_parent_unlock(env, lov);
+ break;
+ case CLS_HELD:
+ default:
+ CERROR("Impossible state: %i\n", parent->cll_state);
+ LBUG();
}
+
+ RETURN(result);
+}
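
The CLS_UNLOCKING case above only leaves a mark; the consuming side lives in
lov_lock_unuse() and is not part of this hunk. A rough sketch of what that
consumer might look like; the placement and error code are guesses, only the
lls_unuse_race field comes from the patch:

	/* hypothetical fragment of lov_lock_unuse(), after the top-lock
	 * mutex has been re-acquired */
	if (lov->lls_unuse_race) {
		/* a sub-lock was canceled while the top-lock was being
		 * unlocked: do not leave the top-lock in CLS_CACHED, fail
		 * the unuse so that the lock is re-enqueued on next use */
		lov->lls_unuse_race = 0;
		result = -ESTALE;	/* guessed error code */
	}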
+
+/**
+ * An implementation of cl_lock_operations::clo_delete() method. This is
+ * invoked in "bottom-to-top" delete, when lock destruction starts from the
+ * sub-lock (e.g., as a result of ldlm lock LRU policy).
+ */
+static void lovsub_lock_delete(const struct lu_env *env,
+ const struct cl_lock_slice *slice)
+{
+ struct cl_lock *child = slice->cls_lock;
+ struct lovsub_lock *sub = cl2lovsub_lock(slice);
+ int restart;
+
+ LASSERT(cl_lock_is_mutexed(child));
+
+ ENTRY;
+ /*
+ * Destruction of a sub-lock might take multiple iterations, because
+ * when the last sub-lock of a given top-lock is deleted, top-lock is
+ * canceled proactively, and this requires releasing the sub-lock
+ * mutex. Once the sub-lock mutex has been released, the list of its
+ * parents has to be re-scanned from the beginning.
+ */
+ do {
+ struct lov_lock *lov;
+ struct lov_lock_link *scan;
+ struct lov_lock_link *temp;
+ struct lov_lock_sub *subdata;
+
+ restart = 0;
+ list_for_each_entry_safe(scan, temp,
+ &sub->lss_parents, lll_list) {
+ lov = scan->lll_super;
+ subdata = &lov->lls_sub[scan->lll_idx];
+ lovsub_parent_lock(env, lov);
+ subdata->sub_got = subdata->sub_descr;
+ lov_lock_unlink(env, scan, sub);
+ restart = lovsub_lock_delete_one(env, child, lov);
+ lovsub_parent_unlock(env, lov);
+
+ if (restart) {
+ cl_lock_mutex_get(env, child);
+ break;
+ }
+ }
+ } while (restart);
EXIT;
}
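
The do/while loop in lovsub_lock_delete() is a restart pattern: whenever
handling one parent requires dropping the child's mutex, the scan of the
parents list has to start over, because the list may have changed while the
mutex was not held. A minimal sketch of that pattern with hypothetical types
and helpers (not the Lustre API):

	static void delete_bottom_up(struct child *child)
	{
		int restart;

		do {
			struct link *scan, *temp;

			restart = 0;
			list_for_each_entry_safe(scan, temp,
						 &child->parents, list) {
				struct parent *parent = scan->super;

				unlink(scan, child);
				if (parent_is_empty(parent)) {
					/* destroying the parent may take
					 * other mutexes, so drop the child
					 * mutex first to keep lock ordering */
					child_mutex_put(child);
					parent_destroy(parent);
					child_mutex_get(child);
					/* the parents list may have changed
					 * while the mutex was dropped */
					restart = 1;
					break;
				}
			}
		} while (restart);
	}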
#define CLT_PVEC_SIZE (14)
/**
+ * Possible levels of nesting. Currently there are two: "top" entities
+ * (files, extent locks) and "sub" entities (stripes and stripe locks).
+ * This is used only for debugging counters right now.
+ */
+enum clt_nesting_level {
+ CNL_TOP,
+ CNL_SUB,
+ CNL_NR
+};
+
+/**
+ * Counters used to check correctness of cl_lock interface usage.
+ */
+struct cl_thread_counters {
+ /**
+ * Number of outstanding calls to cl_lock_mutex_get() made by the
+ * current thread. For debugging.
+ */
+ int ctc_nr_locks_locked;
+ /** List of locked locks. */
+ struct lu_ref ctc_locks_locked;
+ /** Number of outstanding holds on locks. */
+ int ctc_nr_held;
+ /** Number of outstanding uses on locks. */
+ int ctc_nr_used;
+ /** Number of held extent locks. */
+ int ctc_nr_locks_acquired;
+};
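
Keeping a separate cl_thread_counters per nesting level lets per-thread
assertions distinguish top-lock and sub-lock activity; the bottom-to-top mutex
ordering check added to cl_lock_mutex_get() further down is one user. As a
small illustration, a hypothetical helper using only the fields declared
above:

	static int clt_only_sub_mutexes_held(const struct cl_thread_info *info)
	{
		/* true iff the current thread holds sub-lock mutexes but no
		 * top-lock mutexes */
		return info->clt_counters[CNL_TOP].ctc_nr_locks_locked == 0 &&
		       info->clt_counters[CNL_SUB].ctc_nr_locks_locked > 0;
	}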
+
+/**
* Thread local state internal for generic cl-code.
*/
struct cl_thread_info {
struct cl_lock_descr clt_descr;
struct cl_page_list clt_list;
/**
- * \name debugging.
- *
- * Counters used to check correctness of cl_lock interface usage.
- * @{
+ * Counters for every level of lock nesting.
*/
- /**
- * Number of outstanding calls to cl_lock_mutex_get() made by the
- * current thread. For debugging.
- */
- int clt_nr_locks_locked;
- /** List of locked locks. */
- struct lu_ref clt_locks_locked;
- /** Number of outstanding holds on the top-level locks. */
- int clt_nr_held;
- /** Number of outstanding uses on the top-level locks. */
- int clt_nr_used;
- /** Number of held top-level extent locks. */
- int clt_nr_locks_acquired;
+ struct cl_thread_counters clt_counters[CNL_NR];
/** @} debugging */
/*
scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
}
io->ci_state = CIS_UNLOCKED;
- LASSERT(cl_env_info(env)->clt_nr_locks_acquired == 0);
+ LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
EXIT;
}
EXPORT_SYMBOL(cl_io_unlock);
return result;
}
+/**
+ * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
+ */
+static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
+{
+ return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
+}
+
+/**
+ * Returns the set of counters for this lock, depending on its nesting level.
+ */
+static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
+ const struct cl_lock *lock)
+{
+ struct cl_thread_info *info;
+ enum clt_nesting_level nesting;
+
+ info = cl_env_info(env);
+ nesting = cl_lock_nesting(lock);
+ LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
+ return &info->clt_counters[nesting];
+}
+
#define RETIP ((unsigned long)__builtin_return_address(0))
#ifdef CONFIG_LOCKDEP
static void cl_lock_lockdep_acquire(const struct lu_env *env,
struct cl_lock *lock, __u32 enqflags)
{
- cl_env_info(env)->clt_nr_locks_acquired++;
+ cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
/* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
/* check: */ 2, RETIP);
static void cl_lock_lockdep_release(const struct lu_env *env,
struct cl_lock *lock)
{
- cl_env_info(env)->clt_nr_locks_acquired--;
+ cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
lock_release(&lock->dep_map, 0, RETIP);
}
}
EXPORT_SYMBOL(cl_lock_at);
-static void cl_lock_trace(struct cl_thread_info *info,
+static void cl_lock_trace(struct cl_thread_counters *counters,
const char *prefix, const struct cl_lock *lock)
{
CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
- lock->cll_depth, info->clt_nr_locks_locked);
+ lock->cll_depth, counters->ctc_nr_locks_locked);
}
static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
{
- struct cl_thread_info *info;
+ struct cl_thread_counters *counters;
- info = cl_env_info(env);
+ counters = cl_lock_counters(env, lock);
lock->cll_depth++;
- info->clt_nr_locks_locked++;
- lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
- cl_lock_trace(info, "got mutex", lock);
+ counters->ctc_nr_locks_locked++;
+ lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
+ cl_lock_trace(counters, "got mutex", lock);
}
/**
LINVRNT(lock->cll_depth > 0);
} else {
struct cl_object_header *hdr;
+ struct cl_thread_info *info;
+ int i;
LINVRNT(lock->cll_guarder != cfs_current());
hdr = cl_object_header(lock->cll_descr.cld_obj);
+ /*
+ * Check that mutices are taken in the bottom-to-top order.
+ */
+ info = cl_env_info(env);
+ for (i = 0; i < hdr->coh_nesting; ++i)
+ LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
lock->cll_guarder = cfs_current();
LINVRNT(lock->cll_depth == 0);
*/
void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
{
- struct cl_thread_info *info;
+ struct cl_thread_counters *counters;
LINVRNT(cl_lock_invariant(env, lock));
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(lock->cll_guarder == cfs_current());
LINVRNT(lock->cll_depth > 0);
- info = cl_env_info(env);
- LINVRNT(info->clt_nr_locks_locked > 0);
+ counters = cl_lock_counters(env, lock);
+ LINVRNT(counters->ctc_nr_locks_locked > 0);
- cl_lock_trace(info, "put mutex", lock);
- lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
- info->clt_nr_locks_locked--;
+ cl_lock_trace(counters, "put mutex", lock);
+ lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
+ counters->ctc_nr_locks_locked--;
if (--lock->cll_depth == 0) {
lock->cll_guarder = NULL;
mutex_unlock(&lock->cll_guard);
*/
int cl_lock_nr_mutexed(const struct lu_env *env)
{
- return cl_env_info(env)->clt_nr_locks_locked;
+ struct cl_thread_info *info;
+ int i;
+ int locked;
+
+ /*
+ * NOTE: if summation across all nesting levels (currently 2) proves
+ * too expensive, a summary counter can be added to
+ * struct cl_thread_info.
+ */
+ info = cl_env_info(env);
+ for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+ locked += info->clt_counters[i].ctc_nr_locks_locked;
+ return locked;
}
EXPORT_SYMBOL(cl_lock_nr_mutexed);
EXIT;
}
+/**
+ * Modifies the cl_lock::cll_holds counter for a given lock. For a top-lock
+ * (nesting == 0) this modification is also accounted for in the per-thread
+ * debugging counters. Sub-lock holds can be released by a thread different
+ * from the one that acquired them.
+ */
static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
int delta)
{
- struct cl_thread_info *cti;
- struct cl_object_header *hdr;
+ struct cl_thread_counters *counters;
+ enum clt_nesting_level nesting;
- cti = cl_env_info(env);
- hdr = cl_object_header(lock->cll_descr.cld_obj);
lock->cll_holds += delta;
- if (hdr->coh_nesting == 0) {
- cti->clt_nr_held += delta;
- LASSERT(cti->clt_nr_held >= 0);
+ nesting = cl_lock_nesting(lock);
+ if (nesting == CNL_TOP) {
+ counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+ counters->ctc_nr_held += delta;
+ LASSERT(counters->ctc_nr_held >= 0);
}
}
+/**
+ * Modifies the cl_lock::cll_users counter for a given lock. See
+ * cl_lock_hold_mod() for the explanation of the debugging code.
+ */
static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
int delta)
{
- struct cl_thread_info *cti;
- struct cl_object_header *hdr;
+ struct cl_thread_counters *counters;
+ enum clt_nesting_level nesting;
- cti = cl_env_info(env);
- hdr = cl_object_header(lock->cll_descr.cld_obj);
lock->cll_users += delta;
- if (hdr->coh_nesting == 0) {
- cti->clt_nr_used += delta;
- LASSERT(cti->clt_nr_used >= 0);
+ nesting = cl_lock_nesting(lock);
+ if (nesting == CNL_TOP) {
+ counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+ counters->ctc_nr_used += delta;
+ LASSERT(counters->ctc_nr_used >= 0);
}
}
* cl_lock_put() to finish it.
*
* \pre atomic_read(&lock->cll_ref) > 0
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ * cl_lock_nr_mutexed(env) == 1)
+ * [i.e., if a top-lock is deleted, mutices of no other locks can be
+ * held, as deletion of sub-locks might require releasing a top-lock
+ * mutex]
*
* \see cl_lock_operations::clo_delete()
* \see cl_lock::cll_holds
{
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
+ LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
+ cl_lock_nr_mutexed(env) == 1));
ENTRY;
if (lock->cll_holds == 0)
* time-out happens.
*
* \pre atomic_read(&lock->cll_ref) > 0
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ * cl_lock_nr_mutexed(env) == 1)
+ * [i.e., if a top-lock failed, mutices of no other locks can be held, as
+ * failing sub-locks might require releasing a top-lock mutex]
*
* \see clo_lock_delete()
* \see cl_lock::cll_holds
*
* Cancellation notification is delivered to layers at most once.
*
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ * cl_lock_nr_mutexed(env) == 1)
+ * [i.e., if a top-lock is canceled, mutices of no other locks can be
+ * held, as cancellation of sub-locks might require releasing a top-lock
+ * mutex]
+ *
* \see cl_lock_operations::clo_cancel()
* \see cl_lock::cll_holds
*/
{
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
+ LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
+ cl_lock_nr_mutexed(env) == 1));
+
ENTRY;
if (lock->cll_holds == 0)
cl_lock_cancel0(env, lock);
struct cl_thread_info *info;
info = cl0_key_init(ctx, key);
- if (!IS_ERR(info))
- lu_ref_init(&info->clt_locks_locked);
+ if (!IS_ERR(info)) {
+ int i;
+
+ for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+ lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
+ }
return info;
}
struct lu_context_key *key, void *data)
{
struct cl_thread_info *info;
+ int i;
info = data;
- lu_ref_fini(&info->clt_locks_locked);
+ for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+ lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
cl0_key_fini(ctx, key, data);
}
struct lu_context_key *key, void *data)
{
struct cl_thread_info *info = data;
+ int i;
- LASSERT(info->clt_nr_locks_locked == 0);
- LASSERT(info->clt_nr_held == 0);
- LASSERT(info->clt_nr_used == 0);
- LASSERT(info->clt_nr_locks_acquired == 0);
-
- lu_ref_fini(&info->clt_locks_locked);
- lu_ref_init(&info->clt_locks_locked);
+ for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
+ LASSERT(info->clt_counters[i].ctc_nr_held == 0);
+ LASSERT(info->clt_counters[i].ctc_nr_used == 0);
+ LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
+ LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
+ lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
+ lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
+ }
}
static struct lu_context_key cl_key = {