LU-1061 agl: cl_locks_prune() waits for the last user
fs/lustre-release.git: lustre/obdclass/cl_lock.c
index c3dd3c4..4202603 100644
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -54,7 +57,7 @@
 #include "cl_internal.h"
 
 /** Lock class of cl_lock::cll_guard */
-static struct lock_class_key cl_lock_guard_class;
+static cfs_lock_class_key_t cl_lock_guard_class;
 static cfs_mem_cache_t *cl_lock_kmem;
 
 static struct lu_kmem_descr cl_lock_caches[] = {
@@ -77,10 +80,8 @@ static struct lu_kmem_descr cl_lock_caches[] = {
 static int cl_lock_invariant_trusted(const struct lu_env *env,
                                      const struct cl_lock *lock)
 {
-        return
-                cl_is_lock(lock) &&
-                ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
-                atomic_read(&lock->cll_ref) >= lock->cll_holds &&
+        return  ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
+                cfs_atomic_read(&lock->cll_ref) >= lock->cll_holds &&
                 lock->cll_holds >= lock->cll_users &&
                 lock->cll_holds >= 0 &&
                 lock->cll_users >= 0 &&
@@ -97,17 +98,57 @@ static int cl_lock_invariant(const struct lu_env *env,
 {
         int result;
 
-        result = atomic_read(&lock->cll_ref) > 0 &&
+        result = cfs_atomic_read(&lock->cll_ref) > 0 &&
                 cl_lock_invariant_trusted(env, lock);
         if (!result && env != NULL)
                 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
         return result;
 }
 
+/**
+ * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
+ */
+static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
+{
+        return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
+}
+
+/**
+ * Returns a set of counters for this lock, depending on a lock nesting.
+ */
+static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
+                                                   const struct cl_lock *lock)
+{
+        struct cl_thread_info *info;
+        enum clt_nesting_level nesting;
+
+        info = cl_env_info(env);
+        nesting = cl_lock_nesting(lock);
+        LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
+        return &info->clt_counters[nesting];
+}
+
+static void cl_lock_trace0(int level, const struct lu_env *env,
+                           const char *prefix, const struct cl_lock *lock,
+                           const char *func, const int line)
+{
+        struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
+        CDEBUG(level, "%s: %p@(%d %p %d %d %d %d %d %lx)"
+                      "(%p/%d/%d) at %s():%d\n",
+               prefix, lock, cfs_atomic_read(&lock->cll_ref),
+               lock->cll_guarder, lock->cll_depth,
+               lock->cll_state, lock->cll_error, lock->cll_holds,
+               lock->cll_users, lock->cll_flags,
+               env, h->coh_nesting, cl_lock_nr_mutexed(env),
+               func, line);
+}
+#define cl_lock_trace(level, env, prefix, lock)                         \
+        cl_lock_trace0(level, env, prefix, lock, __FUNCTION__, __LINE__)
+
 #define RETIP ((unsigned long)__builtin_return_address(0))
 
 #ifdef CONFIG_LOCKDEP
-static struct lock_class_key cl_lock_key;
+static cfs_lock_class_key_t cl_lock_key;
 
 static void cl_lock_lockdep_init(struct cl_lock *lock)
 {
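For reference, the new helpers cl_lock_nesting() and cl_lock_counters() index into a per-thread array of counters kept in cl_thread_info. The sketch below shows the shape those helpers assume; it is illustrative only, the authoritative definitions (and exact field set) live in cl_object.h, and CNL_SUB/CNL_NR are assumed names for the non-top nesting levels.

/* Illustrative sketch, not part of the patch: assumed layout behind
 * cl_lock_counters(); see cl_object.h for the real definitions. */
enum clt_nesting_level {
        CNL_TOP,        /* counters for top-lock mutexes (nesting == 0) */
        CNL_SUB,        /* assumed name: counters for sub-lock mutexes  */
        CNL_NR          /* assumed name: number of nesting levels       */
};

struct cl_thread_counters {
        int           ctc_nr_held;           /* top-lock holds by this thread */
        int           ctc_nr_used;           /* top-lock users by this thread */
        int           ctc_nr_locks_acquired; /* lockdep-visible acquisitions  */
        int           ctc_nr_locks_locked;   /* cl_lock mutexes currently held */
        struct lu_ref ctc_locks_locked;      /* lu_ref tracking those mutexes */
};

struct cl_thread_info {
        /* ... other per-thread CLIO state ... */
        struct cl_thread_counters clt_counters[CNL_NR];
};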
@@ -117,16 +158,20 @@ static void cl_lock_lockdep_init(struct cl_lock *lock)
 static void cl_lock_lockdep_acquire(const struct lu_env *env,
                                     struct cl_lock *lock, __u32 enqflags)
 {
-        cl_env_info(env)->clt_nr_locks_acquired++;
+        cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
+#ifdef HAVE_LOCK_MAP_ACQUIRE
+        lock_map_acquire(&lock->dep_map);
+#else  /* HAVE_LOCK_MAP_ACQUIRE */
         lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
                      /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
                      /* check: */ 2, RETIP);
+#endif /* HAVE_LOCK_MAP_ACQUIRE */
 }
 
 static void cl_lock_lockdep_release(const struct lu_env *env,
                                     struct cl_lock *lock)
 {
-        cl_env_info(env)->clt_nr_locks_acquired--;
+        cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
         lock_release(&lock->dep_map, 0, RETIP);
 }
 
@@ -158,7 +203,7 @@ void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
 {
         ENTRY;
         slice->cls_lock = lock;
-        list_add_tail(&slice->cls_linkage, &lock->cll_layers);
+        cfs_list_add_tail(&slice->cls_linkage, &lock->cll_layers);
         slice->cls_obj = obj;
         slice->cls_ops = ops;
         EXIT;
@@ -171,12 +216,18 @@ EXPORT_SYMBOL(cl_lock_slice_add);
  */
 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
 {
-        LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
-        LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
+        LINVRNT(need == CLM_READ || need == CLM_WRITE ||
+                need == CLM_PHANTOM || need == CLM_GROUP);
+        LINVRNT(has == CLM_READ || has == CLM_WRITE ||
+                has == CLM_PHANTOM || has == CLM_GROUP);
         CLASSERT(CLM_PHANTOM < CLM_READ);
         CLASSERT(CLM_READ < CLM_WRITE);
+        CLASSERT(CLM_WRITE < CLM_GROUP);
 
-        return need <= has;
+        if (has != CLM_GROUP)
+                return need <= has;
+        else
+                return need == has;
 }
 EXPORT_SYMBOL(cl_lock_mode_match);
 
@@ -189,7 +240,8 @@ int cl_lock_ext_match(const struct cl_lock_descr *has,
         return
                 has->cld_start <= need->cld_start &&
                 has->cld_end >= need->cld_end &&
-                cl_lock_mode_match(has->cld_mode, need->cld_mode);
+                cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
+                (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
 }
 EXPORT_SYMBOL(cl_lock_ext_match);
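As a quick illustration of the CLM_GROUP rules above (not part of the patch; the outcomes follow directly from cl_lock_mode_match() and cl_lock_ext_match()):

/* Illustration only: consequences of the group-mode matching rules. */
static void group_mode_match_examples(void)
{
        /* An ordinary write lock still satisfies a read request. */
        LASSERT(cl_lock_mode_match(CLM_WRITE, CLM_READ));

        /* A group lock satisfies only another group request ... */
        LASSERT(!cl_lock_mode_match(CLM_GROUP, CLM_READ));
        LASSERT(cl_lock_mode_match(CLM_GROUP, CLM_GROUP));

        /* ... and, for group locks, cl_lock_ext_match() additionally
         * requires that cld_gid of "has" and "need" be equal. */
}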
 
@@ -210,26 +262,26 @@ static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
 {
         struct cl_object *obj = lock->cll_descr.cld_obj;
 
-        LASSERT(cl_is_lock(lock));
         LINVRNT(!cl_lock_is_mutexed(lock));
 
         ENTRY;
-        might_sleep();
-        while (!list_empty(&lock->cll_layers)) {
+        cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
+        cfs_might_sleep();
+        while (!cfs_list_empty(&lock->cll_layers)) {
                 struct cl_lock_slice *slice;
 
-                slice = list_entry(lock->cll_layers.next, struct cl_lock_slice,
-                                   cls_linkage);
-                list_del_init(lock->cll_layers.next);
+                slice = cfs_list_entry(lock->cll_layers.next,
+                                       struct cl_lock_slice, cls_linkage);
+                cfs_list_del_init(lock->cll_layers.next);
                 slice->cls_ops->clo_fini(env, slice);
         }
-        atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
-        atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
+        cfs_atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
+        cfs_atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
         lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
         cl_object_put(env, obj);
         lu_ref_fini(&lock->cll_reference);
         lu_ref_fini(&lock->cll_holders);
-        mutex_destroy(&lock->cll_guard);
+        cfs_mutex_destroy(&lock->cll_guard);
         OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
         EXIT;
 }
@@ -246,25 +298,23 @@ static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
 {
         struct cl_object        *obj;
-        struct cl_object_header *head;
         struct cl_site          *site;
 
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
         obj = lock->cll_descr.cld_obj;
         LINVRNT(obj != NULL);
-        head = cl_object_header(obj);
         site = cl_object_site(obj);
 
-        CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
-               atomic_read(&lock->cll_ref), lock, RETIP);
+        CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
+               cfs_atomic_read(&lock->cll_ref), lock, RETIP);
 
-        if (atomic_dec_and_test(&lock->cll_ref)) {
+        if (cfs_atomic_dec_and_test(&lock->cll_ref)) {
                 if (lock->cll_state == CLS_FREEING) {
-                        LASSERT(list_empty(&lock->cll_linkage));
+                        LASSERT(cfs_list_empty(&lock->cll_linkage));
                         cl_lock_free(env, lock);
                 }
-                atomic_dec(&site->cs_locks.cs_busy);
+                cfs_atomic_dec(&site->cs_locks.cs_busy);
         }
         EXIT;
 }
@@ -281,9 +331,9 @@ EXPORT_SYMBOL(cl_lock_put);
 void cl_lock_get(struct cl_lock *lock)
 {
         LINVRNT(cl_lock_invariant(NULL, lock));
-        CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
-               atomic_read(&lock->cll_ref), lock, RETIP);
-        atomic_inc(&lock->cll_ref);
+        CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
+               cfs_atomic_read(&lock->cll_ref), lock, RETIP);
+        cfs_atomic_inc(&lock->cll_ref);
 }
 EXPORT_SYMBOL(cl_lock_get);
 
@@ -300,11 +350,10 @@ void cl_lock_get_trust(struct cl_lock *lock)
 {
         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
 
-        LASSERT(cl_is_lock(lock));
-        CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
-               atomic_read(&lock->cll_ref), lock, RETIP);
-        if (atomic_inc_return(&lock->cll_ref) == 1)
-                atomic_inc(&site->cs_locks.cs_busy);
+        CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
+               cfs_atomic_read(&lock->cll_ref), lock, RETIP);
+        if (cfs_atomic_inc_return(&lock->cll_ref) == 1)
+                cfs_atomic_inc(&site->cs_locks.cs_busy);
 }
 EXPORT_SYMBOL(cl_lock_get_trust);
 
@@ -317,6 +366,7 @@ EXPORT_SYMBOL(cl_lock_get_trust);
 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
 {
         cl_lock_mutex_get(env, lock);
+        cl_lock_cancel(env, lock);
         cl_lock_delete(env, lock);
         cl_lock_mutex_put(env, lock);
         cl_lock_put(env, lock);
@@ -334,7 +384,7 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
         ENTRY;
         OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, CFS_ALLOC_IO);
         if (lock != NULL) {
-                atomic_set(&lock->cll_ref, 1);
+                cfs_atomic_set(&lock->cll_ref, 1);
                 lock->cll_descr = *descr;
                 lock->cll_state = CLS_NEW;
                 cl_object_get(obj);
@@ -345,15 +395,16 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
                 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
                 lu_ref_init(&lock->cll_reference);
                 lu_ref_init(&lock->cll_holders);
-                mutex_init(&lock->cll_guard);
-                lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
+                cfs_mutex_init(&lock->cll_guard);
+                cfs_lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
                 cfs_waitq_init(&lock->cll_wq);
                 head = obj->co_lu.lo_header;
-                atomic_inc(&site->cs_locks_state[CLS_NEW]);
-                atomic_inc(&site->cs_locks.cs_total);
-                atomic_inc(&site->cs_locks.cs_created);
+                cfs_atomic_inc(&site->cs_locks_state[CLS_NEW]);
+                cfs_atomic_inc(&site->cs_locks.cs_total);
+                cfs_atomic_inc(&site->cs_locks.cs_created);
                 cl_lock_lockdep_init(lock);
-                list_for_each_entry(obj, &head->loh_layers, co_lu.lo_linkage) {
+                cfs_list_for_each_entry(obj, &head->loh_layers,
+                                        co_lu.lo_linkage) {
                         int err;
 
                         err = obj->co_ops->coo_lock_init(env, obj, lock, io);
@@ -369,6 +420,57 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
 }
 
 /**
+ * Transfer the lock into INTRANSIT state and return the original state.
+ *
+ * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
+ * \post state: CLS_INTRANSIT
+ * \see CLS_INTRANSIT
+ */
+enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
+                                     struct cl_lock *lock)
+{
+        enum cl_lock_state state = lock->cll_state;
+
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
+                 "Malformed lock state %d.\n", state);
+
+        cl_lock_state_set(env, lock, CLS_INTRANSIT);
+        lock->cll_intransit_owner = cfs_current();
+        cl_lock_hold_add(env, lock, "intransit", cfs_current());
+        return state;
+}
+EXPORT_SYMBOL(cl_lock_intransit);
+
+/**
+ * Exit the INTRANSIT state and restore the lock to its original state.
+ */
+void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
+                       enum cl_lock_state state)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERT(lock->cll_intransit_owner == cfs_current());
+
+        lock->cll_intransit_owner = NULL;
+        cl_lock_state_set(env, lock, state);
+        cl_lock_unhold(env, lock, "intransit", cfs_current());
+}
+EXPORT_SYMBOL(cl_lock_extransit);
+
+/**
+ * Check whether the lock is in the INTRANSIT state.
+ */
+int cl_lock_is_intransit(struct cl_lock *lock)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        return lock->cll_state == CLS_INTRANSIT &&
+               lock->cll_intransit_owner != cfs_current();
+}
+EXPORT_SYMBOL(cl_lock_is_intransit);
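The three helpers above implement a hand-over-hand state transition: the owner parks the lock in CLS_INTRANSIT so that other threads back off (see the cl_lock_is_intransit() checks added to cl_wait_try() and the CLS_INTRANSIT cases in cl_enqueue_try() and cl_lock_peek() below) while layer methods, which may drop and retake the lock mutex, run. A minimal sketch of the calling pattern, modelled on the new cl_use_try()/cl_unuse_try() bodies further down; do_layer_transition() is a hypothetical stand-in for the clo_use()/clo_unuse() iteration.

/* Sketch only: intended INTRANSIT calling pattern (error paths trimmed). */
static int transition_example(const struct lu_env *env, struct cl_lock *lock)
{
        enum cl_lock_state saved;
        int result;

        LASSERT(cl_lock_is_mutexed(lock));

        /* Record the current state and take an "intransit" hold; from now
         * on observers see CLS_INTRANSIT and wait instead of acting on a
         * half-updated lock. */
        saved = cl_lock_intransit(env, lock);

        /* Hypothetical helper: walk the layers, possibly dropping and
         * retaking the lock mutex (as clo_use()/clo_unuse() may do). */
        result = do_layer_transition(env, lock);

        /* Leave INTRANSIT: either the state reached on success, or the
         * saved state to back off on failure (cf. cl_use_try(.., atomic)). */
        cl_lock_extransit(env, lock, result == 0 ? CLS_HELD : saved);
        return result;
}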
+/**
  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
  * cover multiple stripes and can trigger cascading timeouts.
@@ -382,7 +484,7 @@ static int cl_lock_fits_into(const struct lu_env *env,
 
         LINVRNT(cl_lock_invariant_trusted(env, lock));
         ENTRY;
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_fits_into != NULL &&
                     !slice->cls_ops->clo_fits_into(env, slice, need, io))
                         RETURN(0);
@@ -404,23 +506,21 @@ static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
         head = cl_object_header(obj);
         site = cl_object_site(obj);
         LINVRNT_SPIN_LOCKED(&head->coh_lock_guard);
-        atomic_inc(&site->cs_locks.cs_lookup);
-        list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
+        cfs_atomic_inc(&site->cs_locks.cs_lookup);
+        cfs_list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
                 int matched;
 
-                LASSERT(cl_is_lock(lock));
                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
-                        lock->cll_state < CLS_FREEING &&
-                        !(lock->cll_flags & CLF_CANCELLED) &&
-                        cl_lock_fits_into(env, lock, need, io);
-                CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
+                          lock->cll_state < CLS_FREEING &&
+                          lock->cll_error == 0 &&
+                          !(lock->cll_flags & CLF_CANCELLED) &&
+                          cl_lock_fits_into(env, lock, need, io);
+                CDEBUG(D_DLMTRACE, "has: "DDESCR"(%d) need: "DDESCR": %d\n",
                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
                        matched);
                 if (matched) {
                         cl_lock_get_trust(lock);
-                        /* move the lock to the LRU head */
-                        list_move(&lock->cll_linkage, &head->coh_locks);
-                        atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
+                        cfs_atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
                         RETURN(lock);
                 }
         }
@@ -452,23 +552,24 @@ static struct cl_lock *cl_lock_find(const struct lu_env *env,
         head = cl_object_header(obj);
         site = cl_object_site(obj);
 
-        spin_lock(&head->coh_lock_guard);
+        cfs_spin_lock(&head->coh_lock_guard);
         lock = cl_lock_lookup(env, obj, io, need);
-        spin_unlock(&head->coh_lock_guard);
+        cfs_spin_unlock(&head->coh_lock_guard);
 
         if (lock == NULL) {
                 lock = cl_lock_alloc(env, obj, io, need);
                 if (!IS_ERR(lock)) {
                         struct cl_lock *ghost;
 
-                        spin_lock(&head->coh_lock_guard);
+                        cfs_spin_lock(&head->coh_lock_guard);
                         ghost = cl_lock_lookup(env, obj, io, need);
                         if (ghost == NULL) {
-                                list_add(&lock->cll_linkage, &head->coh_locks);
-                                spin_unlock(&head->coh_lock_guard);
-                                atomic_inc(&site->cs_locks.cs_busy);
+                                cfs_list_add_tail(&lock->cll_linkage,
+                                                  &head->coh_locks);
+                                cfs_spin_unlock(&head->coh_lock_guard);
+                                cfs_atomic_inc(&site->cs_locks.cs_busy);
                         } else {
-                                spin_unlock(&head->coh_lock_guard);
+                                cfs_spin_unlock(&head->coh_lock_guard);
                                 /*
                                  * Other threads can acquire references to the
                                  * top-lock through its sub-locks. Hence, it
@@ -494,31 +595,39 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
         struct cl_object_header *head;
         struct cl_object        *obj;
         struct cl_lock          *lock;
+        int ok;
 
         obj  = need->cld_obj;
         head = cl_object_header(obj);
 
-        spin_lock(&head->coh_lock_guard);
+        cfs_spin_lock(&head->coh_lock_guard);
         lock = cl_lock_lookup(env, obj, io, need);
-        spin_unlock(&head->coh_lock_guard);
+        cfs_spin_unlock(&head->coh_lock_guard);
 
-        if (lock != NULL) {
-                int ok;
+        if (lock == NULL)
+                return NULL;
 
-                cl_lock_mutex_get(env, lock);
-                if (lock->cll_state == CLS_CACHED)
-                        cl_use_try(env, lock);
-                ok = lock->cll_state == CLS_HELD;
-                if (ok) {
-                        cl_lock_hold_add(env, lock, scope, source);
-                        cl_lock_user_add(env, lock);
-                }
-                cl_lock_mutex_put(env, lock);
-                if (!ok) {
-                        cl_lock_put(env, lock);
-                        lock = NULL;
-                }
+        cl_lock_mutex_get(env, lock);
+        if (lock->cll_state == CLS_INTRANSIT)
+                cl_lock_state_wait(env, lock); /* Don't care return value. */
+        if (lock->cll_state == CLS_CACHED) {
+                int result;
+                result = cl_use_try(env, lock, 1);
+                if (result < 0)
+                        cl_lock_error(env, lock, result);
+        }
+        ok = lock->cll_state == CLS_HELD;
+        if (ok) {
+                cl_lock_hold_add(env, lock, scope, source);
+                cl_lock_user_add(env, lock);
+                cl_lock_put(env, lock);
+        }
+        cl_lock_mutex_put(env, lock);
+        if (!ok) {
+                cl_lock_put(env, lock);
+                lock = NULL;
         }
+
         return lock;
 }
 EXPORT_SYMBOL(cl_lock_peek);
@@ -537,7 +646,7 @@ const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
         LINVRNT(cl_lock_invariant_trusted(NULL, lock));
         ENTRY;
 
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
                         RETURN(slice);
         }
@@ -545,23 +654,15 @@ const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_at);
 
-static void cl_lock_trace(struct cl_thread_info *info,
-                          const char *prefix, const struct cl_lock *lock)
-{
-        CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
-               atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
-               lock->cll_depth, info->clt_nr_locks_locked);
-}
-
 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
 {
-        struct cl_thread_info *info;
+        struct cl_thread_counters *counters;
 
-        info = cl_env_info(env);
+        counters = cl_lock_counters(env, lock);
         lock->cll_depth++;
-        info->clt_nr_locks_locked++;
-        lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
-        cl_lock_trace(info, "got mutex", lock);
+        counters->ctc_nr_locks_locked++;
+        lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
+        cl_lock_trace(D_TRACE, env, "got mutex", lock);
 }
 
 /**
@@ -583,10 +684,18 @@ void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
                 LINVRNT(lock->cll_depth > 0);
         } else {
                 struct cl_object_header *hdr;
+                struct cl_thread_info   *info;
+                int i;
 
                 LINVRNT(lock->cll_guarder != cfs_current());
                 hdr = cl_object_header(lock->cll_descr.cld_obj);
-                mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
+                /*
+                 * Check that mutexes are taken in the bottom-to-top order.
+                 */
+                info = cl_env_info(env);
+                for (i = 0; i < hdr->coh_nesting; ++i)
+                        LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
+                cfs_mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
                 lock->cll_guarder = cfs_current();
                 LINVRNT(lock->cll_depth == 0);
         }
@@ -616,7 +725,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
         if (lock->cll_guarder == cfs_current()) {
                 LINVRNT(lock->cll_depth > 0);
                 cl_lock_mutex_tail(env, lock);
-        } else if (mutex_trylock(&lock->cll_guard)) {
+        } else if (cfs_mutex_trylock(&lock->cll_guard)) {
                 LINVRNT(lock->cll_depth == 0);
                 lock->cll_guarder = cfs_current();
                 cl_lock_mutex_tail(env, lock);
@@ -627,7 +736,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
 EXPORT_SYMBOL(cl_lock_mutex_try);
 
 /**
- * Unlocks cl_lock object.
+ * Unlocks cl_lock object.
  *
  * \pre cl_lock_is_mutexed(lock)
  *
@@ -635,22 +744,22 @@ EXPORT_SYMBOL(cl_lock_mutex_try);
  */
 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
 {
-        struct cl_thread_info *info;
+        struct cl_thread_counters *counters;
 
         LINVRNT(cl_lock_invariant(env, lock));
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(lock->cll_guarder == cfs_current());
         LINVRNT(lock->cll_depth > 0);
 
-        info = cl_env_info(env);
-        LINVRNT(info->clt_nr_locks_locked > 0);
+        counters = cl_lock_counters(env, lock);
+        LINVRNT(counters->ctc_nr_locks_locked > 0);
 
-        cl_lock_trace(info, "put mutex", lock);
-        lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
-        info->clt_nr_locks_locked--;
+        cl_lock_trace(D_TRACE, env, "put mutex", lock);
+        lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
+        counters->ctc_nr_locks_locked--;
         if (--lock->cll_depth == 0) {
                 lock->cll_guarder = NULL;
-                mutex_unlock(&lock->cll_guard);
+                cfs_mutex_unlock(&lock->cll_guard);
         }
 }
 EXPORT_SYMBOL(cl_lock_mutex_put);
@@ -669,7 +778,19 @@ EXPORT_SYMBOL(cl_lock_is_mutexed);
  */
 int cl_lock_nr_mutexed(const struct lu_env *env)
 {
-        return cl_env_info(env)->clt_nr_locks_locked;
+        struct cl_thread_info *info;
+        int i;
+        int locked;
+
+        /*
+         * NOTE: if summation across all nesting levels (currently 2) proves
+         *       too expensive, a summary counter can be added to
+         *       struct cl_thread_info.
+         */
+        info = cl_env_info(env);
+        for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+                locked += info->clt_counters[i].ctc_nr_locks_locked;
+        return locked;
 }
 EXPORT_SYMBOL(cl_lock_nr_mutexed);
 
@@ -682,8 +803,8 @@ static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
                 const struct cl_lock_slice *slice;
 
                 lock->cll_flags |= CLF_CANCELLED;
-                list_for_each_entry_reverse(slice, &lock->cll_layers,
-                                            cls_linkage) {
+                cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
+                                                cls_linkage) {
                         if (slice->cls_ops->clo_cancel != NULL)
                                 slice->cls_ops->clo_cancel(env, slice);
                 }
@@ -701,26 +822,21 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
 
         ENTRY;
         if (lock->cll_state < CLS_FREEING) {
+                LASSERT(lock->cll_state != CLS_INTRANSIT);
                 cl_lock_state_set(env, lock, CLS_FREEING);
 
                 head = cl_object_header(lock->cll_descr.cld_obj);
 
-                spin_lock(&head->coh_lock_guard);
-                list_del_init(&lock->cll_linkage);
-                /*
-                 * No locks, no pages. This is only valid for bottom sub-locks
-                 * and head->coh_nesting == 1 check assumes two level top-sub
-                 * hierarchy.
-                 */
-                LASSERT(ergo(head->coh_nesting == 1 &&
-                             list_empty(&head->coh_locks), !head->coh_pages));
-                spin_unlock(&head->coh_lock_guard);
+                cfs_spin_lock(&head->coh_lock_guard);
+                cfs_list_del_init(&lock->cll_linkage);
+
+                cfs_spin_unlock(&head->coh_lock_guard);
                 /*
                  * From now on, no new references to this lock can be acquired
                  * by cl_lock_lookup().
                  */
-                list_for_each_entry_reverse(slice, &lock->cll_layers,
-                                            cls_linkage) {
+                cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
+                                                cls_linkage) {
                         if (slice->cls_ops->clo_delete != NULL)
                                 slice->cls_ops->clo_delete(env, slice);
                 }
@@ -737,33 +853,43 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
         EXIT;
 }
 
+/**
+ * Mod(ifie)s cl_lock::cll_holds counter for a given lock. Also, for a
+ * top-lock (nesting == 0) accounts for this modification in the per-thread
+ * debugging counters. Sub-lock holds can be released by a thread different
+ * from the one that acquired them.
+ */
 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
                              int delta)
 {
-        struct cl_thread_info   *cti;
-        struct cl_object_header *hdr;
+        struct cl_thread_counters *counters;
+        enum clt_nesting_level     nesting;
 
-        cti = cl_env_info(env);
-        hdr = cl_object_header(lock->cll_descr.cld_obj);
         lock->cll_holds += delta;
-        if (hdr->coh_nesting == 0) {
-                cti->clt_nr_held += delta;
-                LASSERT(cti->clt_nr_held >= 0);
+        nesting = cl_lock_nesting(lock);
+        if (nesting == CNL_TOP) {
+                counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+                counters->ctc_nr_held += delta;
+                LASSERT(counters->ctc_nr_held >= 0);
         }
 }
 
+/**
+ * Mod(ifie)s cl_lock::cll_users counter for a given lock. See
+ * cl_lock_hold_mod() for the explanation of the debugging code.
+ */
 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
                              int delta)
 {
-        struct cl_thread_info   *cti;
-        struct cl_object_header *hdr;
+        struct cl_thread_counters *counters;
+        enum clt_nesting_level     nesting;
 
-        cti = cl_env_info(env);
-        hdr = cl_object_header(lock->cll_descr.cld_obj);
         lock->cll_users += delta;
-        if (hdr->coh_nesting == 0) {
-                cti->clt_nr_used += delta;
-                LASSERT(cti->clt_nr_used >= 0);
+        nesting = cl_lock_nesting(lock);
+        if (nesting == CNL_TOP) {
+                counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+                counters->ctc_nr_used += delta;
+                LASSERT(counters->ctc_nr_used >= 0);
         }
 }
 
@@ -775,13 +901,15 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_holds > 0);
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
-                if (lock->cll_descr.cld_mode == CLM_PHANTOM)
+                if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
+                    lock->cll_descr.cld_mode == CLM_GROUP)
                         /*
-                         * If lock is still phantom when user is done with
-                         * it---destroy the lock.
+                         * If the lock is still a phantom or group lock
+                         * when the user is done with it, destroy the lock.
                          */
                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
                 if (lock->cll_flags & CLF_CANCELPEND) {
@@ -797,7 +925,6 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         EXIT;
 }
 
-
 /**
  * Waits until lock state is changed.
  *
@@ -820,6 +947,7 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
 {
         cfs_waitlink_t waiter;
+        cfs_sigset_t blocked;
         int result;
 
         ENTRY;
@@ -828,22 +956,30 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
         LASSERT(lock->cll_depth == 1);
         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
 
+        cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
         result = lock->cll_error;
-        if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
+        if (result == 0) {
+                /* To avoid being interrupted by the 'non-fatal' signals
+                 * (SIGCHLD, for instance), we block them temporarily.
+                 * LU-305 */
+                blocked = cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
                 cfs_waitlink_init(&waiter);
                 cfs_waitq_add(&lock->cll_wq, &waiter);
-                set_current_state(CFS_TASK_INTERRUPTIBLE);
+                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                 cl_lock_mutex_put(env, lock);
 
                 LASSERT(cl_lock_nr_mutexed(env) == 0);
                 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
 
                 cl_lock_mutex_get(env, lock);
-                set_current_state(CFS_TASK_RUNNING);
+                cfs_set_current_state(CFS_TASK_RUNNING);
                 cfs_waitq_del(&lock->cll_wq, &waiter);
                 result = cfs_signal_pending() ? -EINTR : 0;
+
+                /* Restore old blocked signals */
+                cfs_restore_sigs(blocked);
         }
-        lock->cll_flags &= ~CLF_STATE;
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_lock_state_wait);
@@ -857,10 +993,9 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
 
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
                 if (slice->cls_ops->clo_state != NULL)
                         slice->cls_ops->clo_state(env, slice, state);
-        lock->cll_flags |= CLF_STATE;
         cfs_waitq_broadcast(&lock->cll_wq);
         EXIT;
 }
@@ -875,6 +1010,7 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
 {
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
         cl_lock_state_signal(env, lock, lock->cll_state);
         EXIT;
 }
@@ -899,13 +1035,14 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_state <= state ||
                 (lock->cll_state == CLS_CACHED &&
                  (state == CLS_HELD || /* lock found in cache */
-                  state == CLS_NEW     /* sub-lock canceled */)) ||
-                /* sub-lock canceled during unlocking */
-                (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
+                  state == CLS_NEW  ||   /* sub-lock canceled */
+                  state == CLS_INTRANSIT)) ||
+                /* lock is in transit state */
+                lock->cll_state == CLS_INTRANSIT);
 
         if (lock->cll_state != state) {
-                atomic_dec(&site->cs_locks_state[lock->cll_state]);
-                atomic_inc(&site->cs_locks_state[state]);
+                cfs_atomic_dec(&site->cs_locks_state[lock->cll_state]);
+                cfs_atomic_inc(&site->cs_locks_state[state]);
 
                 cl_lock_state_signal(env, lock, state);
                 lock->cll_state = state;
@@ -914,18 +1051,55 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_state_set);
 
+static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
+{
+        const struct cl_lock_slice *slice;
+        int result;
+
+        do {
+                result = 0;
+
+                LINVRNT(cl_lock_is_mutexed(lock));
+                LINVRNT(cl_lock_invariant(env, lock));
+                LASSERT(lock->cll_state == CLS_INTRANSIT);
+
+                result = -ENOSYS;
+                cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
+                                                cls_linkage) {
+                        if (slice->cls_ops->clo_unuse != NULL) {
+                                result = slice->cls_ops->clo_unuse(env, slice);
+                                if (result != 0)
+                                        break;
+                        }
+                }
+                LASSERT(result != -ENOSYS);
+        } while (result == CLO_REPEAT);
+
+        return result;
+}
+
 /**
  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
+ * If @atomic is 1, the lock must be unused on failure so that the whole
+ * "use" operation stays atomic.
  */
-int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
+int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
 {
-        int result;
         const struct cl_lock_slice *slice;
+        int result;
+        enum cl_lock_state state;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
+
+        LASSERT(lock->cll_state == CLS_CACHED);
+        if (lock->cll_error)
+                RETURN(lock->cll_error);
+
         result = -ENOSYS;
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+        state = cl_lock_intransit(env, lock);
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_use != NULL) {
                         result = slice->cls_ops->clo_use(env, slice);
                         if (result != 0)
@@ -933,8 +1107,34 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
                 }
         }
         LASSERT(result != -ENOSYS);
-        if (result == 0)
-                cl_lock_state_set(env, lock, CLS_HELD);
+
+        LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
+                 lock->cll_state);
+
+        if (result == 0) {
+                state = CLS_HELD;
+        } else {
+                if (result == -ESTALE) {
+                        /*
+                         * -ESTALE means the sublock is being cancelled
+                         * at this moment; set the lock state to CLS_NEW
+                         * and ask the caller to repeat.
+                         */
+                        state = CLS_NEW;
+                        result = CLO_REPEAT;
+                }
+
+                /* @atomic means back-off-on-failure. */
+                if (atomic) {
+                        int rc;
+                        rc = cl_unuse_try_internal(env, lock);
+                        /* Vet the results. */
+                        if (rc < 0 && result > 0)
+                                result = rc;
+                }
+
+        }
+        cl_lock_extransit(env, lock, state);
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_use_try);
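The @atomic flag only changes what happens on failure. Below is a hedged caller-side sketch of the back-off mode, mirroring the new cl_lock_peek() path above; use_cached_lock() is an illustrative wrapper, not a function in this patch. cl_enqueue_try() below passes atomic == 0 because its own state machine re-drives the lock on CLO_REPEAT/CLO_WAIT.

/* Sketch only: using a cached lock with back-off-on-failure semantics. */
static int use_cached_lock(const struct lu_env *env, struct cl_lock *lock)
{
        int result = 0;

        LASSERT(cl_lock_is_mutexed(lock));
        if (lock->cll_state == CLS_CACHED) {
                /* atomic == 1: if any layer fails, the layers already used
                 * are unused again, so the lock never stays half-way
                 * between CLS_CACHED and CLS_HELD. */
                result = cl_use_try(env, lock, 1);
                if (result < 0)
                        cl_lock_error(env, lock, result);
        }
        return result;
}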
@@ -952,7 +1152,7 @@ static int cl_enqueue_kick(const struct lu_env *env,
 
         ENTRY;
         result = -ENOSYS;
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_enqueue != NULL) {
                         result = slice->cls_ops->clo_enqueue(env,
                                                              slice, io, flags);
@@ -983,6 +1183,7 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
         do {
                 result = 0;
 
@@ -1000,14 +1201,13 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
                         if (result == 0)
                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
                         break;
-                case CLS_UNLOCKING:
-                        /* wait until unlocking finishes, and enqueue lock
-                         * afresh. */
+                case CLS_INTRANSIT:
+                        LASSERT(cl_lock_is_intransit(lock));
                         result = CLO_WAIT;
                         break;
                 case CLS_CACHED:
                         /* yank lock from the cache. */
-                        result = cl_use_try(env, lock);
+                        result = cl_use_try(env, lock, 0);
                         break;
                 case CLS_ENQUEUED:
                 case CLS_HELD:
@@ -1029,6 +1229,51 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_enqueue_try);
 
+/**
+ * Cancel the conflicting lock found during previous enqueue.
+ *
+ * \retval 0 conflicting lock has been canceled.
+ * \retval -ve error code.
+ */
+int cl_lock_enqueue_wait(const struct lu_env *env,
+                         struct cl_lock *lock,
+                         int keep_mutex)
+{
+        struct cl_lock  *conflict;
+        int              rc = 0;
+        ENTRY;
+
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(lock->cll_state == CLS_QUEUING);
+        LASSERT(lock->cll_conflict != NULL);
+
+        conflict = lock->cll_conflict;
+        lock->cll_conflict = NULL;
+
+        cl_lock_mutex_put(env, lock);
+        LASSERT(cl_lock_nr_mutexed(env) == 0);
+
+        cl_lock_mutex_get(env, conflict);
+        cl_lock_cancel(env, conflict);
+        cl_lock_delete(env, conflict);
+
+        while (conflict->cll_state != CLS_FREEING) {
+                rc = cl_lock_state_wait(env, conflict);
+                if (rc != 0)
+                        break;
+        }
+        cl_lock_mutex_put(env, conflict);
+        lu_ref_del(&conflict->cll_reference, "cancel-wait", lock);
+        cl_lock_put(env, conflict);
+
+        if (keep_mutex)
+                cl_lock_mutex_get(env, lock);
+
+        LASSERT(rc <= 0);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(cl_lock_enqueue_wait);
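cl_lock_enqueue_wait() consumes a conflict recorded by a lower layer during enqueue: it expects cll_conflict to hold a referenced lock whose cll_reference carries a "cancel-wait" lu_ref scoped to the waiting lock. The producer lives in the osc layer and is not part of this file; the sketch below only shows the hand-off implied by the reference handling above, with record_conflict() as an illustrative name.

/* Sketch only: how a layer's clo_enqueue() is expected to hand a
 * conflicting top-lock over to cl_lock_enqueue_wait(). */
static int record_conflict(struct cl_lock *lock, struct cl_lock *conflict)
{
        LASSERT(cl_lock_is_mutexed(lock));
        LASSERT(lock->cll_conflict == NULL);

        cl_lock_get(conflict);
        /* Scope and source must match the lu_ref_del() in
         * cl_lock_enqueue_wait(). */
        lu_ref_add(&conflict->cll_reference, "cancel-wait", lock);
        lock->cll_conflict = conflict;

        /* CLO_WAIT makes cl_enqueue_locked() notice cll_conflict and call
         * cl_lock_enqueue_wait() instead of cl_lock_state_wait(). */
        return CLO_WAIT;
}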
+
 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
                              struct cl_io *io, __u32 enqflags)
 {
@@ -1044,7 +1289,10 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
         do {
                 result = cl_enqueue_try(env, lock, io, enqflags);
                 if (result == CLO_WAIT) {
-                        result = cl_lock_state_wait(env, lock);
+                        if (lock->cll_conflict != NULL)
+                                result = cl_lock_enqueue_wait(env, lock, 1);
+                        else
+                                result = cl_lock_state_wait(env, lock);
                         if (result == 0)
                                 continue;
                 }
@@ -1052,10 +1300,10 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
         } while (1);
         if (result != 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
         }
-        LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
+        LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL),
+                     lock->cll_state == CLS_ENQUEUED ||
                      lock->cll_state == CLS_HELD));
         RETURN(result);
 }
@@ -1093,8 +1341,9 @@ EXPORT_SYMBOL(cl_enqueue);
  *
  * This function is called repeatedly by cl_unuse() until either lock is
  * unlocked, or error occurs.
+ * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
  *
- * \ppre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
+ * \pre  lock->cll_state == CLS_HELD
  *
  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
  *
@@ -1103,57 +1352,31 @@ EXPORT_SYMBOL(cl_enqueue);
  */
 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
 {
-        const struct cl_lock_slice *slice;
         int                         result;
+        enum cl_lock_state          state = CLS_NEW;
 
         ENTRY;
-        if (lock->cll_state != CLS_UNLOCKING) {
-                if (lock->cll_users > 1) {
-                        cl_lock_user_del(env, lock);
-                        RETURN(0);
-                }
-                /*
-                 * New lock users (->cll_users) are not protecting unlocking
-                 * from proceeding. From this point, lock eventually reaches
-                 * CLS_CACHED, is reinitialized to CLS_NEW or fails into
-                 * CLS_FREEING.
-                 */
-                cl_lock_state_set(env, lock, CLS_UNLOCKING);
-        }
-        do {
-                result = 0;
+        cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
 
-                if (lock->cll_error != 0)
-                        break;
+        LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
+        if (lock->cll_users > 1) {
+                cl_lock_user_del(env, lock);
+                RETURN(0);
+        }
 
-                LINVRNT(cl_lock_is_mutexed(lock));
-                LINVRNT(cl_lock_invariant(env, lock));
-                LASSERT(lock->cll_state == CLS_UNLOCKING);
-                LASSERT(lock->cll_users > 0);
-                LASSERT(lock->cll_holds > 0);
+        /*
+         * New lock users (->cll_users) are not protecting unlocking
+         * from proceeding. From this point, lock eventually reaches
+         * CLS_CACHED, is reinitialized to CLS_NEW or fails into
+         * CLS_FREEING.
+         */
+        state = cl_lock_intransit(env, lock);
 
-                result = -ENOSYS;
-                list_for_each_entry_reverse(slice, &lock->cll_layers,
-                                            cls_linkage) {
-                        if (slice->cls_ops->clo_unuse != NULL) {
-                                result = slice->cls_ops->clo_unuse(env, slice);
-                                if (result != 0)
-                                        break;
-                        }
-                }
-                LASSERT(result != -ENOSYS);
-        } while (result == CLO_REPEAT);
-        if (result != CLO_WAIT)
-                /*
-                 * Once there is no more need to iterate ->clo_unuse() calls,
-                 * remove lock user. This is done even if unrecoverable error
-                 * happened during unlocking, because nothing else can be
-                 * done.
-                 */
-                cl_lock_user_del(env, lock);
+        result = cl_unuse_try_internal(env, lock);
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERT(result != CLO_WAIT);
+        cl_lock_user_del(env, lock);
         if (result == 0 || result == -ESTALE) {
-                enum cl_lock_state state;
-
                 /*
                  * Return lock back to the cache. This is the only
                  * place where lock is moved into CLS_CACHED state.
@@ -1163,8 +1386,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * re-initialized. This happens e.g., when a sub-lock was
                  * canceled while unlocking was in progress.
                  */
-                state = result == 0 ? CLS_CACHED : CLS_NEW;
-                cl_lock_state_set(env, lock, state);
+                if (state == CLS_HELD && result == 0)
+                        state = CLS_CACHED;
+                else
+                        state = CLS_NEW;
+                cl_lock_extransit(env, lock, state);
 
                 /*
                  * Hide -ESTALE error.
@@ -1176,7 +1402,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * pages won't be written to OSTs. -jay
                  */
                 result = 0;
+        } else {
+                CERROR("result = %d, this is unlikely!\n", result);
+                cl_lock_extransit(env, lock, state);
         }
+
         result = result ?: lock->cll_error;
         if (result < 0)
                 cl_lock_error(env, lock, result);
@@ -1186,19 +1416,13 @@ EXPORT_SYMBOL(cl_unuse_try);
 
 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
 {
+        int result;
         ENTRY;
-        LASSERT(lock->cll_state <= CLS_HELD);
-        do {
-                int result;
 
-                result = cl_unuse_try(env, lock);
-                if (result == CLO_WAIT) {
-                        result = cl_lock_state_wait(env, lock);
-                        if (result == 0)
-                                continue;
-                }
-                break;
-        } while (1);
+        result = cl_unuse_try(env, lock);
+        if (result)
+                CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
+
         EXIT;
 }
 
@@ -1232,23 +1456,31 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
         int                         result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
         do {
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
-                        lock->cll_state == CLS_HELD);
+                        lock->cll_state == CLS_HELD ||
+                        lock->cll_state == CLS_INTRANSIT);
                 LASSERT(lock->cll_users > 0);
                 LASSERT(lock->cll_holds > 0);
 
                 result = 0;
                 if (lock->cll_error != 0)
                         break;
+
+                if (cl_lock_is_intransit(lock)) {
+                        result = CLO_WAIT;
+                        break;
+                }
+
                 if (lock->cll_state == CLS_HELD)
                         /* nothing to do */
                         break;
 
                 result = -ENOSYS;
-                list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+                cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                         if (slice->cls_ops->clo_wait != NULL) {
                                 result = slice->cls_ops->clo_wait(env, slice);
                                 if (result != 0)
@@ -1256,8 +1488,10 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
                         }
                 }
                 LASSERT(result != -ENOSYS);
-                if (result == 0)
+                if (result == 0) {
+                        LASSERT(lock->cll_state != CLS_INTRANSIT);
                         cl_lock_state_set(env, lock, CLS_HELD);
+                }
         } while (result == CLO_REPEAT);
         RETURN(result ?: lock->cll_error);
 }
@@ -1280,7 +1514,8 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         cl_lock_mutex_get(env, lock);
 
         LINVRNT(cl_lock_invariant(env, lock));
-        LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
+        LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
+                 "Wrong state %d \n", lock->cll_state);
         LASSERT(lock->cll_holds > 0);
 
         do {
@@ -1294,10 +1529,10 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         } while (1);
         if (result < 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
                 cl_lock_lockdep_release(env, lock);
         }
+        cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
         cl_lock_mutex_put(env, lock);
         LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
         RETURN(result);
@@ -1319,7 +1554,7 @@ unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(cl_lock_invariant(env, lock));
 
         pound = 0;
-        list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_weigh != NULL) {
                         ounce = slice->cls_ops->clo_weigh(env, slice);
                         pound += ounce;
@@ -1350,12 +1585,13 @@ int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
         /* don't allow object to change */
         LASSERT(obj == desc->cld_obj);
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
 
-        list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_modify != NULL) {
                         result = slice->cls_ops->clo_modify(env, slice, desc);
                         if (result != 0)
@@ -1369,9 +1605,9 @@ int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
          * now. If locks were indexed according to their extent and/or mode,
          * that index would have to be updated here.
          */
-        spin_lock(&hdr->coh_lock_guard);
+        cfs_spin_lock(&hdr->coh_lock_guard);
         lock->cll_descr = *desc;
-        spin_unlock(&hdr->coh_lock_guard);
+        cfs_spin_unlock(&hdr->coh_lock_guard);
         RETURN(0);
 }
 EXPORT_SYMBOL(cl_lock_modify);
@@ -1417,7 +1653,7 @@ int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
 
         result = cl_lock_enclosure(env, lock, closure);
         if (result == 0) {
-                list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+                cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                         if (slice->cls_ops->clo_closure != NULL) {
                                 result = slice->cls_ops->clo_closure(env, slice,
                                                                      closure);
@@ -1442,17 +1678,18 @@ EXPORT_SYMBOL(cl_lock_closure_build);
 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
                       struct cl_lock_closure *closure)
 {
-        int result;
+        int result = 0;
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
         if (!cl_lock_mutex_try(env, lock)) {
                 /*
                  * If lock->cll_inclosure is not empty, lock is already in
                  * this closure.
                  */
-                if (list_empty(&lock->cll_inclosure)) {
+                if (cfs_list_empty(&lock->cll_inclosure)) {
                         cl_lock_get_trust(lock);
                         lu_ref_add(&lock->cll_reference, "closure", closure);
-                        list_add(&lock->cll_inclosure, &closure->clc_list);
+                        cfs_list_add(&lock->cll_inclosure, &closure->clc_list);
                         closure->clc_nr++;
                 } else
                         cl_lock_mutex_put(env, lock);
@@ -1485,8 +1722,10 @@ void cl_lock_disclosure(const struct lu_env *env,
         struct cl_lock *scan;
         struct cl_lock *temp;
 
-        list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
-                list_del_init(&scan->cll_inclosure);
+        cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
+        cfs_list_for_each_entry_safe(scan, temp, &closure->clc_list,
+                                     cll_inclosure){
+                cfs_list_del_init(&scan->cll_inclosure);
                 cl_lock_mutex_put(env, scan);
                 lu_ref_del(&scan->cll_reference, "closure", closure);
                 cl_lock_put(env, scan);
@@ -1500,7 +1739,7 @@ EXPORT_SYMBOL(cl_lock_disclosure);
 void cl_lock_closure_fini(struct cl_lock_closure *closure)
 {
         LASSERT(closure->clc_nr == 0);
-        LASSERT(list_empty(&closure->clc_list));
+        LASSERT(cfs_list_empty(&closure->clc_list));
 }
 EXPORT_SYMBOL(cl_lock_closure_fini);
 
@@ -1516,6 +1755,11 @@ EXPORT_SYMBOL(cl_lock_closure_fini);
  * cl_lock_put() to finish it.
  *
  * \pre atomic_read(&lock->cll_ref) > 0
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ *           cl_lock_nr_mutexed(env) == 1)
+ *      [i.e., if a top-lock is deleted, mutices of no other locks can be
+ *      held, as deletion of sub-locks might require releasing a top-lock
+ *      mutex]
  *
  * \see cl_lock_operations::clo_delete()
  * \see cl_lock::cll_holds
@@ -1524,8 +1768,11 @@ void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
+        LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
+                     cl_lock_nr_mutexed(env) == 1));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_delete0(env, lock);
         else
@@ -1550,6 +1797,7 @@ void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
         LINVRNT(cl_lock_invariant(env, lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
         if (lock->cll_error == 0 && error != 0) {
                 lock->cll_error = error;
                 cl_lock_signal(env, lock);
@@ -1575,7 +1823,9 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
+
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_cancel0(env, lock);
         else
@@ -1606,11 +1856,16 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
                                     * not PHANTOM */
         need->cld_start = need->cld_end = page->cp_index;
+        need->cld_enq_flags = 0;
 
-        spin_lock(&head->coh_lock_guard);
-        list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
+        cfs_spin_lock(&head->coh_lock_guard);
+        /* It is fine to match any group lock since there can be only one
+         * with a unique gid, and it conflicts with all other lock modes too */
+        cfs_list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
                 if (scan != except &&
-                    cl_lock_ext_match(&scan->cll_descr, need) &&
+                    (scan->cll_descr.cld_mode == CLM_GROUP ||
+                    cl_lock_ext_match(&scan->cll_descr, need)) &&
+                    scan->cll_state >= CLS_HELD &&
                     scan->cll_state < CLS_FREEING &&
                     /*
                      * This check is racy as the lock can be canceled right
@@ -1626,74 +1881,85 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
                         break;
                 }
         }
-        spin_unlock(&head->coh_lock_guard);
+        cfs_spin_unlock(&head->coh_lock_guard);
         RETURN(lock);
 }
 EXPORT_SYMBOL(cl_lock_at_page);
 
 /**
- * Returns a list of pages protected (only) by a given lock.
- *
- * Scans an extent of page radix tree, corresponding to the \a lock and queues
- * all pages that are not protected by locks other than \a lock into \a queue.
+ * Calculate the page offset at the layer of @lock.
+ * At the time of this writing, @page is a top page and @lock is a sub-lock.
  */
-void cl_lock_page_list_fixup(const struct lu_env *env,
-                             struct cl_io *io, struct cl_lock *lock,
-                             struct cl_page_list *queue)
+static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
 {
-        struct cl_page        *page;
-        struct cl_page        *temp;
-        struct cl_page_list   *plist = &cl_env_info(env)->clt_list;
-
-        LINVRNT(cl_lock_invariant(env, lock));
-        ENTRY;
+        struct lu_device_type *dtype;
+        const struct cl_page_slice *slice;
 
-        /* Now, we have a list of cl_pages under the \a lock, we need
-         * to check if some of pages are covered by other ldlm lock.
-         * If this is the case, they aren't needed to be written out this time.
-         *
-         * For example, we have A:[0,200] & B:[100,300] PW locks on client, now
-         * the latter is to be canceled, this means other client is
-         * reading/writing [200,300] since A won't canceled. Actually
-         * we just need to write the pages covered by [200,300]. This is safe,
-         * since [100,200] is also protected lock A.
-         */
+        dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
+        slice = cl_page_at(page, dtype);
+        LASSERT(slice != NULL);
+        return slice->cpl_page->cp_index;
+}
 
-        cl_page_list_init(plist);
-        cl_page_list_for_each_safe(page, temp, queue) {
-                pgoff_t                idx = page->cp_index;
-                struct cl_lock        *found;
-                struct cl_lock_descr  *descr;
-
-                /* The algorithm counts on the index-ascending page index. */
-                LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
-                        page->cp_index < temp->cp_index));
-
-                found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
-                                        page, lock, 0, 0);
-                if (found == NULL)
-                        continue;
-
-                descr = &found->cll_descr;
-                list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
-                                              cp_batch) {
-                        idx = page->cp_index;
-                        if (descr->cld_start > idx || descr->cld_end < idx)
-                                break;
-                        cl_page_list_move(plist, queue, page);
+/**
+ * Check whether page @page is covered by another lock; if not, discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                                struct cl_page *page, void *cbdata)
+{
+        struct cl_thread_info *info = cl_env_info(env);
+        struct cl_lock *lock = cbdata;
+        pgoff_t index = pgoff_at_lock(page, lock);
+
+        if (index >= info->clt_fn_index) {
+                struct cl_lock *tmp;
+
+                /* refresh non-overlapped index */
+                tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj, page, lock,
+                                      1, 0);
+                if (tmp != NULL) {
+                        /* Cache the first-non-overlapped index so as to skip
+                         * all pages within [index, clt_fn_index). This
+                         * is safe because if tmp lock is canceled, it will
+                         * discard these pages. */
+                        info->clt_fn_index = tmp->cll_descr.cld_end + 1;
+                        if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                                info->clt_fn_index = CL_PAGE_EOF;
+                        cl_lock_put(env, tmp);
+                } else { /* discard the page */
+                        cl_page_own(env, io, page);
+                        cl_page_unmap(env, io, page);
+                        cl_page_discard(env, io, page);
+                        cl_page_disown(env, io, page);
                 }
-                cl_lock_put(env, found);
         }
 
-        /* The pages in plist are covered by other locks, don't handle them
-         * this time.
-         */
-        if (io != NULL)
-                cl_page_list_disown(env, io, plist);
-        cl_page_list_fini(env, plist);
-        EXIT;
+        info->clt_next_index = index + 1;
+        return CLP_GANG_OKAY;
+}
+
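check_and_discard_cb() above queries the lock list only when the page index has reached clt_fn_index, the cached end of the last covering lock found; every index below that bound is already known to be protected and is skipped without another lookup. Below is a stand-alone sketch of that caching idea; covering_end() and discard_page() are hypothetical helpers, not Lustre functions.

/* Sketch: scan a range of page indices and call the expensive
 * "who else covers this index?" query only when we step past the
 * answer we cached last time.  All names are invented. */
extern unsigned long covering_end(unsigned long index); /* 0 = not covered */
extern void discard_page(unsigned long index);

static void scan_range(unsigned long start, unsigned long end)
{
        unsigned long fn_index = start; /* first index that needs a query */
        unsigned long index;

        for (index = start; index <= end; index++) {
                unsigned long cov;

                if (index < fn_index)
                        continue;               /* known to be covered */

                cov = covering_end(index);
                if (cov != 0)
                        fn_index = cov + 1;     /* cache the new bound */
                else
                        discard_page(index);    /* nobody else covers it */
        }
}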
+static int pageout_cb(const struct lu_env *env, struct cl_io *io,
+                      struct cl_page *page, void *cbdata)
+{
+        struct cl_thread_info *info  = cl_env_info(env);
+        struct cl_page_list   *queue = &info->clt_queue.c2_qin;
+        struct cl_lock        *lock  = cbdata;
+        typeof(cl_page_own)   *page_own;
+        int rc = CLP_GANG_OKAY;
+
+        page_own = queue->pl_nr ? cl_page_own_try : cl_page_own;
+        if (page_own(env, io, page) == 0) {
+                cl_page_list_add(queue, page);
+                info->clt_next_index = pgoff_at_lock(page, lock) + 1;
+        } else if (page->cp_state != CPS_FREEING) {
+                /* cl_page_own() won't fail unless
+                 * the page is being freed. */
+                LASSERT(queue->pl_nr != 0);
+                rc = CLP_GANG_AGAIN;
+        }
+
+        return rc;
 }
-EXPORT_SYMBOL(cl_lock_page_list_fixup);
 
 /**
  * Invalidate pages protected by the given lock, sending them out to the
@@ -1724,35 +1990,58 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         struct cl_io          *io    = &info->clt_io;
         struct cl_2queue      *queue = &info->clt_queue;
         struct cl_lock_descr  *descr = &lock->cll_descr;
-        int                      result;
-        int                      rc0;
-        int                      rc1;
+        cl_page_gang_cb_t      cb;
+        long page_count;
+        int res;
+        int result;
 
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
 
         io->ci_obj = cl_object_top(descr->cld_obj);
         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
-        if (result == 0) {
+        if (result != 0)
+                GOTO(out, result);
 
+        cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : pageout_cb;
+        info->clt_fn_index = info->clt_next_index = descr->cld_start;
+        do {
                 cl_2queue_init(queue);
-                cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
-                                    descr->cld_end, &queue->c2_qin);
-                if (queue->c2_qin.pl_nr > 0) {
+                res = cl_page_gang_lookup(env, descr->cld_obj, io,
+                                          info->clt_next_index, descr->cld_end,
+                                          cb, (void *)lock);
+                page_count = queue->c2_qin.pl_nr;
+                if (page_count > 0) {
+                        /* must be writeback case */
+                        LASSERTF(descr->cld_mode >= CLM_WRITE, "lock mode %s\n",
+                                 cl_lock_mode_name(descr->cld_mode));
+
                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
                         if (!discard) {
-                                rc0 = cl_io_submit_rw(env, io,
-                                                      CRT_WRITE, queue);
-                                rc1 = cl_page_list_own(env, io,
-                                                       &queue->c2_qout);
-                                result = result ?: rc0 ?: rc1;
+                                long timeout = 600; /* 10 minutes. */
+                                /* For debugging: if this request cannot
+                                 * be finished within 10 minutes, the
+                                 * timeout should surface the stall.
+                                 */
+                                result = cl_io_submit_sync(env, io, CRT_WRITE,
+                                                           queue, CRP_CANCEL,
+                                                           timeout);
+                                if (result)
+                                        CWARN("Writing %lu pages error: %d\n",
+                                              page_count, result);
                         }
-                        cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
                         cl_2queue_discard(env, io, queue);
                         cl_2queue_disown(env, io, queue);
+                        cl_2queue_fini(env, queue);
                 }
-                cl_2queue_fini(env, queue);
-        }
+
+                if (info->clt_next_index > descr->cld_end)
+                        break;
+
+                if (res == CLP_GANG_RESCHED)
+                        cfs_cond_resched();
+        } while (res != CLP_GANG_OKAY);
+out:
         cl_io_fini(env, io);
         RETURN(result);
 }
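cl_lock_page_out() is now a batched scan: cl_page_gang_lookup() collects pages from info->clt_next_index up to the lock's end through a callback (check_and_discard_cb for CLM_READ locks, pageout_cb otherwise), the gathered batch is written out synchronously with a 10-minute warning timeout, and the loop resumes after the last processed index, calling cfs_cond_resched() when the lookup asks for a reschedule. The loop below is a generic, self-contained sketch of that scan-submit-resume structure; batch_lookup(), submit_batch(), yield_cpu() and the GANG_* constants are placeholders, not the real cl_page API.

/* Sketch of a resumable batched scan: look up a batch starting at
 * "next", process it, remember where to resume, yield when told to,
 * and stop once the whole range has been visited.  Names illustrative. */
enum gang_rc { GANG_OKAY, GANG_AGAIN, GANG_RESCHED };

extern enum gang_rc batch_lookup(unsigned long start, unsigned long end,
                                 unsigned long *next, int *nr_found);
extern int submit_batch(int nr_found);          /* e.g. synchronous write */
extern void yield_cpu(void);

static int flush_range(unsigned long start, unsigned long end)
{
        unsigned long next = start;
        enum gang_rc res;
        int result = 0;

        do {
                int nr = 0;

                res = batch_lookup(next, end, &next, &nr);
                if (nr > 0)
                        result = submit_batch(nr);
                if (next > end)
                        break;                  /* range fully covered */
                if (res == GANG_RESCHED)
                        yield_cpu();            /* be fair under load */
        } while (res != GANG_OKAY);

        return result;
}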
@@ -1780,17 +2069,29 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
         LASSERT(ergo(!cancel,
                      head->coh_tree.rnode == NULL && head->coh_pages == 0));
 
-        spin_lock(&head->coh_lock_guard);
-        while (!list_empty(&head->coh_locks)) {
+        cfs_spin_lock(&head->coh_lock_guard);
+        while (!cfs_list_empty(&head->coh_locks)) {
                 lock = container_of(head->coh_locks.next,
                                     struct cl_lock, cll_linkage);
                 cl_lock_get_trust(lock);
-                spin_unlock(&head->coh_lock_guard);
+                cfs_spin_unlock(&head->coh_lock_guard);
                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
+
+again:
                 cl_lock_mutex_get(env, lock);
                 if (lock->cll_state < CLS_FREEING) {
                         LASSERT(lock->cll_holds == 0);
-                        LASSERT(lock->cll_users == 0);
+                        LASSERT(lock->cll_users <= 1);
+                        if (unlikely(lock->cll_users == 1)) {
+                                struct l_wait_info lwi = { 0 };
+
+                                cl_lock_mutex_put(env, lock);
+                                l_wait_event(lock->cll_wq,
+                                             lock->cll_users == 0,
+                                             &lwi);
+                                goto again;
+                        }
+
                         if (cancel)
                                 cl_lock_cancel(env, lock);
                         cl_lock_delete(env, lock);
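The "again:" block above is the core of LU-1061: with asynchronous glimpse locks a lock being pruned may still have one user, so instead of asserting cll_users == 0 the code drops the lock mutex, sleeps on cll_wq until the user count reaches zero (the reworked cl_lock_user_del() further down issues the matching cfs_waitq_broadcast()), and then re-takes the mutex and re-checks the state before cancelling and deleting. The pthread sketch below shows that release-wait-retake-recheck shape in isolation; it is only an analogy for the libcfs waitqueue, and all names are invented.

/* Sketch: a pruner waits for the last user to go away before tearing
 * the object down, re-checking after every wake-up; the user side
 * broadcasts when the count hits zero.  Plain pthreads stand in for
 * the lock mutex and cll_wq here. */
#include <pthread.h>

struct obj {
        pthread_mutex_t guard;          /* cf. the lock mutex */
        pthread_cond_t  wq;             /* cf. cll_wq */
        int             users;          /* cf. cll_users */
        int             freeing;        /* cf. CLS_FREEING */
};

static void obj_user_del(struct obj *o)         /* cf. cl_lock_user_del() */
{
        pthread_mutex_lock(&o->guard);
        if (--o->users == 0)
                pthread_cond_broadcast(&o->wq); /* wake a waiting pruner */
        pthread_mutex_unlock(&o->guard);
}

static void obj_prune(struct obj *o)            /* cf. cl_locks_prune() */
{
        pthread_mutex_lock(&o->guard);
        while (!o->freeing && o->users > 0)
                pthread_cond_wait(&o->wq, &o->guard);   /* drops guard */
        if (!o->freeing)
                o->freeing = 1;         /* safe: no users are left */
        pthread_mutex_unlock(&o->guard);
}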
@@ -1798,27 +2099,13 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
                 cl_lock_mutex_put(env, lock);
                 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
                 cl_lock_put(env, lock);
-                spin_lock(&head->coh_lock_guard);
+                cfs_spin_lock(&head->coh_lock_guard);
         }
-        spin_unlock(&head->coh_lock_guard);
+        cfs_spin_unlock(&head->coh_lock_guard);
         EXIT;
 }
 EXPORT_SYMBOL(cl_locks_prune);
 
-/**
- * Returns true if \a addr is an address of an allocated cl_lock. Used in
- * assertions. This check is optimistically imprecise, i.e., it occasionally
- * returns true for the incorrect addresses, but if it returns false, then the
- * address is guaranteed to be incorrect. (Should be named cl_lockp().)
- *
- * \see cl_is_page()
- */
-int cl_is_lock(const void *addr)
-{
-        return cfs_mem_is_in_cache(addr, cl_lock_kmem);
-}
-EXPORT_SYMBOL(cl_is_lock);
-
 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
                                           const struct cl_io *io,
                                           const struct cl_lock_descr *need,
@@ -1833,7 +2120,8 @@ static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
                 if (IS_ERR(lock))
                         break;
                 cl_lock_mutex_get(env, lock);
-                if (lock->cll_state < CLS_FREEING) {
+                if (lock->cll_state < CLS_FREEING &&
+                    !(lock->cll_flags & CLF_CANCELLED)) {
                         cl_lock_hold_mod(env, lock, +1);
                         lu_ref_add(&lock->cll_holders, scope, source);
                         lu_ref_add(&lock->cll_reference, scope, source);
@@ -1873,45 +2161,43 @@ EXPORT_SYMBOL(cl_lock_hold);
  */
 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                 const struct cl_lock_descr *need,
-                                __u32 enqflags,
                                 const char *scope, const void *source)
 {
         struct cl_lock       *lock;
-        const struct lu_fid  *fid;
         int                   rc;
-        int                   iter;
-        int warn;
+        __u32                 enqflags = need->cld_enq_flags;
 
         ENTRY;
-        fid = lu_object_fid(&io->ci_obj->co_lu);
-        iter = 0;
         do {
-                warn = iter >= 16 && IS_PO2(iter);
-                CDEBUG(warn ? D_WARNING : D_DLMTRACE,
-                       DDESCR"@"DFID" %i %08x `%s'\n",
-                       PDESCR(need), PFID(fid), iter, enqflags, scope);
                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
-                if (!IS_ERR(lock)) {
-                        rc = cl_enqueue_locked(env, lock, io, enqflags);
-                        if (rc == 0) {
-                                if (cl_lock_fits_into(env, lock, need, io)) {
+                if (IS_ERR(lock))
+                        break;
+
+                rc = cl_enqueue_locked(env, lock, io, enqflags);
+                if (rc == 0) {
+                        if (cl_lock_fits_into(env, lock, need, io)) {
+                                if (!(enqflags & CEF_AGL)) {
                                         cl_lock_mutex_put(env, lock);
-                                        cl_lock_lockdep_acquire(env,
-                                                                lock, enqflags);
+                                        cl_lock_lockdep_acquire(env, lock,
+                                                                enqflags);
                                         break;
-                                } else if (warn)
-                                        CL_LOCK_DEBUG(D_WARNING, env, lock,
-                                                      "got (see bug 17665)\n");
-                                cl_unuse_locked(env, lock);
+                                }
+                                rc = 1;
                         }
-                        cl_lock_hold_release(env, lock, scope, source);
-                        cl_lock_mutex_put(env, lock);
-                        lu_ref_del(&lock->cll_reference, scope, source);
-                        cl_lock_put(env, lock);
+                        cl_unuse_locked(env, lock);
+                }
+                cl_lock_trace(D_DLMTRACE, env,
+                              rc <= 0 ? "enqueue failed" : "agl succeed", lock);
+                cl_lock_hold_release(env, lock, scope, source);
+                cl_lock_mutex_put(env, lock);
+                lu_ref_del(&lock->cll_reference, scope, source);
+                cl_lock_put(env, lock);
+                if (rc > 0) {
+                        LASSERT(enqflags & CEF_AGL);
+                        lock = NULL;
+                } else if (rc != 0) {
                         lock = ERR_PTR(rc);
-                } else
-                        rc = PTR_ERR(lock);
-                iter++;
+                }
         } while (rc == 0);
         RETURN(lock);
 }
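cl_lock_request() now takes the enqueue flags from need->cld_enq_flags and folds the outcome into one convention: rc == 0 with a lock that fits is an ordinary success, rc == 1 can only happen for CEF_AGL requests and makes the function return NULL (the glimpse was queued, there is nothing to hand back), a negative rc becomes ERR_PTR(rc), and rc == 0 without a fitting lock simply retries. The sketch below illustrates a three-way result convention of this shape with an invented helper; try_request() and its encoding are illustrative, not the actual Lustre calls.

/* Sketch of the result convention: retry while the helper reports a
 * recoverable mismatch, hand back NULL for a purely asynchronous
 * success, and report errors distinctly.  Names are invented. */
#include <errno.h>
#include <stddef.h>

struct lock;
/* assumed helper: 0 = usable lock stored in *out, 1 = async request
 * accepted (nothing to return), -EAGAIN = found lock did not fit,
 * other negative values = hard failure */
extern int try_request(struct lock **out);

static struct lock *request(int *errp)
{
        struct lock *lock;
        int rc;

        do {
                lock = NULL;
                rc = try_request(&lock);
        } while (rc == -EAGAIN);        /* mismatch: start over */

        *errp = rc < 0 ? rc : 0;        /* the real code uses ERR_PTR(rc) */
        return rc == 0 ? lock : NULL;   /* rc > 0: async, caller gets NULL */
}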
@@ -1960,6 +2246,7 @@ void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
 {
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
         cl_lock_mutex_get(env, lock);
         cl_lock_hold_release(env, lock, scope, source);
         cl_lock_mutex_put(env, lock);
@@ -1980,7 +2267,7 @@ void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
 }
 EXPORT_SYMBOL(cl_lock_user_add);
 
-int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
+void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
@@ -1988,40 +2275,24 @@ int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
 
         ENTRY;
         cl_lock_used_mod(env, lock, -1);
-        RETURN(lock->cll_users == 0);
+        if (lock->cll_users == 0)
+                cfs_waitq_broadcast(&lock->cll_wq);
+        EXIT;
 }
 EXPORT_SYMBOL(cl_lock_user_del);
 
-/**
- * Check if two lock's mode are compatible.
- *
- * This returns true iff en-queuing \a lock2 won't cause cancellation of \a
- * lock1 even when these locks overlap.
- */
-int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
-{
-        enum cl_lock_mode mode1;
-        enum cl_lock_mode mode2;
-
-        ENTRY;
-        mode1 = lock1->cll_descr.cld_mode;
-        mode2 = lock2->cll_descr.cld_mode;
-        RETURN(mode2 == CLM_PHANTOM ||
-               (mode1 == CLM_READ && mode2 == CLM_READ));
-}
-EXPORT_SYMBOL(cl_lock_compatible);
-
 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
 {
         static const char *names[] = {
-                [CLM_PHANTOM] = "PHANTOM",
-                [CLM_READ]    = "READ",
-                [CLM_WRITE]   = "WRITE"
+                [CLM_PHANTOM] = "P",
+                [CLM_READ]    = "R",
+                [CLM_WRITE]   = "W",
+                [CLM_GROUP]   = "G"
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];
         else
-                return "UNKNW";
+                return "U";
 }
 EXPORT_SYMBOL(cl_lock_mode_name);
 
@@ -2047,13 +2318,13 @@ void cl_lock_print(const struct lu_env *env, void *cookie,
 {
         const struct cl_lock_slice *slice;
         (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
-                   lock, atomic_read(&lock->cll_ref),
+                   lock, cfs_atomic_read(&lock->cll_ref),
                    lock->cll_state, lock->cll_error, lock->cll_holds,
                    lock->cll_users, lock->cll_flags);
         cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
         (*printer)(env, cookie, " {\n");
 
-        list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
+        cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 (*printer)(env, cookie, "    %s@%p: ",
                            slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
                            slice);