LU-17744 ldiskfs: mballoc stats fixes
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 94445d3..c41a2f1 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ldlm/ldlm_lock.c
  *
@@ -57,7 +56,8 @@ char *ldlm_lockname[] = {
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
-       [LCK_COS] = "COS"
+       [LCK_COS] = "COS",
+       [LCK_TXN] = "TXN"
 };
 EXPORT_SYMBOL(ldlm_lockname);
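
The ldlm_lockname[] table uses designated initializers indexed by the
enum ldlm_mode value itself (each mode is a distinct power-of-two bit),
so the new LCK_TXN entry keeps the transaction lock mode printable by
the debug macros used throughout this file. A minimal sketch of the
lookup (hypothetical helper):

        static inline const char *ldlm_mode_name(enum ldlm_mode mode)
        {
                /* each mode is a power-of-two flag, usable as an index */
                return ldlm_lockname[mode];
        }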
 
@@ -146,7 +146,7 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = {
 
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
 {
-        return ldlm_processing_policy_table[res->lr_type];
+       return ldlm_processing_policy_table[res->lr_type];
 }
 EXPORT_SYMBOL(ldlm_get_processing_policy);
 
@@ -166,13 +166,11 @@ ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res)
 
 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
 {
-        ns->ns_policy = arg;
+       ns->ns_policy = arg;
 }
 EXPORT_SYMBOL(ldlm_register_intent);
 
-/*
- * REFCOUNTED LOCK OBJECTS
- */
+/* REFCOUNTED LOCK OBJECTS */
 
 
 /**
@@ -186,7 +184,7 @@ EXPORT_SYMBOL(ldlm_register_intent);
 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
 {
        refcount_inc(&lock->l_handle.h_ref);
-        return lock;
+       return lock;
 }
 EXPORT_SYMBOL(ldlm_lock_get);
 
@@ -206,34 +204,34 @@ static void lock_handle_free(struct rcu_head *rcu)
  */
 void ldlm_lock_put(struct ldlm_lock *lock)
 {
-        ENTRY;
+       ENTRY;
 
-        LASSERT(lock->l_resource != LP_POISON);
+       LASSERT(lock->l_resource != LP_POISON);
        LASSERT(refcount_read(&lock->l_handle.h_ref) > 0);
        if (refcount_dec_and_test(&lock->l_handle.h_ref)) {
-                struct ldlm_resource *res;
+               struct ldlm_resource *res;
 
-                LDLM_DEBUG(lock,
-                           "final lock_put on destroyed lock, freeing it.");
+               LDLM_DEBUG(lock,
+                          "final lock_put on destroyed lock, freeing it.");
 
-                res = lock->l_resource;
+               res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(list_empty(&lock->l_exp_list));
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));
 
-                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
-                                     LDLM_NSS_LOCKS);
-                lu_ref_del(&res->lr_reference, "lock", lock);
-                if (lock->l_export) {
-                        class_export_lock_put(lock->l_export, lock);
-                        lock->l_export = NULL;
-                }
+               lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
+                                    LDLM_NSS_LOCKS);
+               lu_ref_del(&res->lr_reference, "lock", lock);
+               if (lock->l_export) {
+                       class_export_lock_put(lock->l_export, lock);
+                       lock->l_export = NULL;
+               }
 
-                if (lock->l_lvb_data != NULL)
-                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
+               if (lock->l_lvb_data != NULL)
+                       OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
 
-               if (res->lr_type == LDLM_EXTENT) {
+               if (res->lr_type == LDLM_EXTENT || res->lr_type == LDLM_FLOCK) {
                        ldlm_interval_free(ldlm_interval_detach(lock));
                } else if (res->lr_type == LDLM_IBITS) {
                        if (lock->l_ibits_node != NULL)
@@ -242,11 +240,15 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                }
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
-                lu_ref_fini(&lock->l_reference);
-               call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
-        }
+               lu_ref_fini(&lock->l_reference);
+               if (lock->l_flags & BIT(63))
+                       /* Performance testing (see ldlm_lock_new_testing());
+                        * bypassing RCU removes grace-period overhead */
+                       lock_handle_free(&lock->l_handle.h_rcu);
+               else
+                       call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
+       }
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(ldlm_lock_put);
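
Deferring the final free through call_rcu() lets a concurrent handle
lookup that is still inside an RCU read-side critical section finish
safely before the memory is reclaimed; the BIT(63) branch bypasses the
grace period for locks created by ldlm_lock_new_testing() below. A
minimal sketch of the same deferred-free idiom, using a hypothetical
structure:

        struct foo {
                struct rcu_head f_rcu;
                int f_data;
        };

        static void foo_free_rcu(struct rcu_head *head)
        {
                /* runs after a grace period: no RCU reader sees it now */
                kfree(container_of(head, struct foo, f_rcu));
        }

        static void foo_put(struct foo *f)
        {
                call_rcu(&f->f_rcu, foo_free_rcu);
        }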
 
@@ -256,6 +258,7 @@ EXPORT_SYMBOL(ldlm_lock_put);
 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
 {
        int rc = 0;
+
        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
@@ -301,9 +304,7 @@ int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use)
        RETURN(rc);
 }
 
-/**
- * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
- */
+/* Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked. */
 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
@@ -316,11 +317,8 @@ void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
        ns->ns_nr_unused++;
 }
 
-/**
- * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
- * first.
- */
-void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
+/* Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks first */
+static void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
 {
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
@@ -376,17 +374,17 @@ void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
  */
 static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
 {
-        ENTRY;
+       ENTRY;
 
-        if (lock->l_readers || lock->l_writers) {
-                LDLM_ERROR(lock, "lock still has references");
-                LBUG();
-        }
+       if (lock->l_readers || lock->l_writers) {
+               LDLM_ERROR(lock, "lock still has references");
+               LBUG();
+       }
 
        if (!list_empty(&lock->l_res_link)) {
-                LDLM_ERROR(lock, "lock still on resource");
-                LBUG();
-        }
+               LDLM_ERROR(lock, "lock still on resource");
+               LBUG();
+       }
 
        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
@@ -394,57 +392,53 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
                return 0;
        }
        ldlm_set_destroyed(lock);
+       wake_up(&lock->l_waitq);
 
        if (lock->l_export && lock->l_export->exp_lock_hash) {
-               /* NB: it's safe to call cfs_hash_del() even lock isn't
-                * in exp_lock_hash. */
-               /* In the function below, .hs_keycmp resolves to
-                * ldlm_export_lock_keycmp() */
-               /* coverity[overrun-buffer-val] */
+               /* NB: safe to call cfs_hash_del() even if the lock isn't
+                * in exp_lock_hash; .hs_keycmp is ldlm_export_lock_keycmp() */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }
 
-        ldlm_lock_remove_from_lru(lock);
-        class_handle_unhash(&lock->l_handle);
+       ldlm_lock_remove_from_lru(lock);
+       class_handle_unhash(&lock->l_handle);
 
-        EXIT;
-        return 1;
+       EXIT;
+       return 1;
 }
 
-/**
- * Destroys a LDLM lock \a lock. Performs necessary locking first.
- */
+/* Destroys an LDLM lock \a lock. Performs necessary locking first. */
 void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
-        int first;
-        ENTRY;
-        lock_res_and_lock(lock);
-        first = ldlm_lock_destroy_internal(lock);
-        unlock_res_and_lock(lock);
+       int first;
+
+       ENTRY;
+       lock_res_and_lock(lock);
+       first = ldlm_lock_destroy_internal(lock);
+       unlock_res_and_lock(lock);
 
-        /* drop reference from hashtable only for first destroy */
-        if (first) {
-                lu_ref_del(&lock->l_reference, "hash", lock);
-                LDLM_LOCK_RELEASE(lock);
-        }
-        EXIT;
+       /* drop reference from hashtable only for first destroy */
+       if (first) {
+               lu_ref_del(&lock->l_reference, "hash", lock);
+               LDLM_LOCK_RELEASE(lock);
+       }
+       EXIT;
 }
 
-/**
- * Destroys a LDLM lock \a lock that is already locked.
- */
+/* Destroys an LDLM lock \a lock that is already locked. */
 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
 {
-        int first;
-        ENTRY;
-        first = ldlm_lock_destroy_internal(lock);
-        /* drop reference from hashtable only for first destroy */
-        if (first) {
-                lu_ref_del(&lock->l_reference, "hash", lock);
-                LDLM_LOCK_RELEASE(lock);
-        }
-        EXIT;
+       int first;
+
+       ENTRY;
+       first = ldlm_lock_destroy_internal(lock);
+       /* drop reference from hashtable only for first destroy */
+       if (first) {
+               lu_ref_del(&lock->l_reference, "hash", lock);
+               LDLM_LOCK_RELEASE(lock);
+       }
+       EXIT;
 }
 
 static const char lock_handle_owner[] = "ldlm";
@@ -460,6 +454,7 @@ static const char lock_handle_owner[] = "ldlm";
 static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 {
        struct ldlm_lock *lock;
+
        ENTRY;
 
        if (resource == NULL)
@@ -469,8 +464,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        if (lock == NULL)
                RETURN(NULL);
 
-       spin_lock_init(&lock->l_lock);
-       lock->l_resource = resource;
+       RCU_INIT_POINTER(lock->l_resource, resource);
        lu_ref_add(&resource->lr_reference, "lock", lock);
 
        refcount_set(&lock->l_handle.h_ref, 2);
@@ -487,88 +481,117 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);
 
-        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
-                             LDLM_NSS_LOCKS);
+       lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+                            LDLM_NSS_LOCKS);
        INIT_HLIST_NODE(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_owner);
 
-        lu_ref_init(&lock->l_reference);
-        lu_ref_add(&lock->l_reference, "hash", lock);
-        lock->l_callback_timeout = 0;
+       lu_ref_init(&lock->l_reference);
+       lu_ref_add(&lock->l_reference, "hash", lock);
+       lock->l_callback_timestamp = 0;
        lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
-        lock->l_exp_refs_nr = 0;
-        lock->l_exp_refs_target = NULL;
+       lock->l_exp_refs_nr = 0;
+       lock->l_exp_refs_target = NULL;
 #endif
        INIT_LIST_HEAD(&lock->l_exp_list);
 
-        RETURN(lock);
+       RETURN(lock);
 }
 
+struct ldlm_lock *ldlm_lock_new_testing(struct ldlm_resource *resource)
+{
+       struct ldlm_lock *lock = ldlm_lock_new(resource);
+       int rc;
+
+       if (!lock)
+               return NULL;
+       lock->l_flags |= BIT(63);
+       switch (resource->lr_type) {
+       case LDLM_EXTENT:
+               rc = ldlm_extent_alloc_lock(lock);
+               break;
+       case LDLM_IBITS:
+               rc = ldlm_inodebits_alloc_lock(lock);
+               break;
+       default:
+               rc = 0;
+       }
+
+       if (!rc)
+               return lock;
+       ldlm_lock_destroy(lock);
+       LDLM_LOCK_RELEASE(lock);
+       return NULL;
+}
+EXPORT_SYMBOL(ldlm_lock_new_testing);
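
ldlm_lock_new_testing() backs the in-kernel performance tests: BIT(63)
marks the lock so that the final ldlm_lock_put() above frees its handle
synchronously, keeping RCU grace periods out of the measured path. A
hedged usage sketch (test-harness code; res assumed to be a valid,
referenced resource):

        lock = ldlm_lock_new_testing(res);
        if (lock) {
                /* ... drive the extent/ibits code paths under test ... */
                ldlm_lock_destroy(lock);
                LDLM_LOCK_RELEASE(lock);
        }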
+
 /**
  * Moves LDLM lock \a lock to another resource.
  * This is used on client when server returns some other lock than requested
  * (typically as a result of intent operation)
  */
 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-                              const struct ldlm_res_id *new_resid)
+                             const struct ldlm_res_id *new_resid)
 {
-        struct ldlm_resource *oldres = lock->l_resource;
-        struct ldlm_resource *newres;
-        int type;
-        ENTRY;
+       struct ldlm_resource *oldres;
+       struct ldlm_resource *newres;
+       int type;
 
-        LASSERT(ns_is_client(ns));
+       ENTRY;
+
+       LASSERT(ns_is_client(ns));
 
-        lock_res_and_lock(lock);
-        if (memcmp(new_resid, &lock->l_resource->lr_name,
-                   sizeof(lock->l_resource->lr_name)) == 0) {
-                /* Nothing to do */
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
+       oldres = lock_res_and_lock(lock);
+       if (memcmp(new_resid, &oldres->lr_name,
+                  sizeof(oldres->lr_name)) == 0) {
+               /* Nothing to do */
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
 
-        LASSERT(new_resid->name[0] != 0);
+       LASSERT(new_resid->name[0] != 0);
 
-        /* This function assumes that the lock isn't on any lists */
+       /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));
 
-        type = oldres->lr_type;
-        unlock_res_and_lock(lock);
+       type = oldres->lr_type;
+       unlock_res_and_lock(lock);
 
-       newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
+       newres = ldlm_resource_get(ns, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));
 
-        lu_ref_add(&newres->lr_reference, "lock", lock);
-        /*
-         * To flip the lock from the old to the new resource, lock, oldres and
-         * newres have to be locked. Resource spin-locks are nested within
-         * lock->l_lock, and are taken in the memory address order to avoid
-         * dead-locks.
-         */
-       spin_lock(&lock->l_lock);
-        oldres = lock->l_resource;
-        if (oldres < newres) {
-                lock_res(oldres);
-                lock_res_nested(newres, LRT_NEW);
-        } else {
-                lock_res(newres);
-                lock_res_nested(oldres, LRT_NEW);
-        }
-        LASSERT(memcmp(new_resid, &oldres->lr_name,
-                       sizeof oldres->lr_name) != 0);
-        lock->l_resource = newres;
-        unlock_res(oldres);
-        unlock_res_and_lock(lock);
-
-        /* ...and the flowers are still standing! */
-        lu_ref_del(&oldres->lr_reference, "lock", lock);
-        ldlm_resource_putref(oldres);
-
-        RETURN(0);
+       lu_ref_add(&newres->lr_reference, "lock", lock);
+       /*
+        * To flip the lock from the old to the new resource, oldres
+        * and newres have to be locked. Resource spin-locks are taken
+        * in the memory address order to avoid dead-locks.
+        * As this is the only circumstance where ->l_resource
+        * can change, and this cannot race with itself, it is safe
+        * to access lock->l_resource without being careful about locking.
+        */
+       oldres = lock->l_resource;
+       if (oldres < newres) {
+               lock_res(oldres);
+               lock_res_nested(newres, LRT_NEW);
+       } else {
+               lock_res(newres);
+               lock_res_nested(oldres, LRT_NEW);
+       }
+       LASSERT(memcmp(new_resid, &oldres->lr_name,
+                      sizeof(oldres->lr_name)) != 0);
+       rcu_assign_pointer(lock->l_resource, newres);
+       unlock_res(oldres);
+       unlock_res(newres);
+
+       /* ...and the flowers are still standing! */
+       lu_ref_del(&oldres->lr_reference, "lock", lock);
+       ldlm_resource_putref(oldres);
+
+       RETURN(0);
 }
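
Both resource spinlocks must be held to flip lock->l_resource; taking
them in memory-address order gives every thread the same acquisition
order, so an ABBA deadlock cannot form. The same idiom in isolation
(generic sketch, hypothetical spinlocks):

        static void lock_pair(spinlock_t *a, spinlock_t *b)
        {
                /* one global order (by address) prevents ABBA deadlock */
                if (a < b) {
                        spin_lock(a);
                        spin_lock_nested(b, SINGLE_DEPTH_NESTING);
                } else {
                        spin_lock(b);
                        spin_lock_nested(a, SINGLE_DEPTH_NESTING);
                }
        }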
 
 /** \defgroup ldlm_handles LDLM HANDLES
@@ -596,6 +619,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
 {
        struct ldlm_lock *lock;
+
        ENTRY;
 
        LASSERT(handle);
@@ -615,9 +639,10 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        }
 
        /* It's unlikely but possible that someone marked the lock as
-        * destroyed after we did handle2object on it */
+        * destroyed after we did handle2object on it
+        */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
-               lu_ref_add(&lock->l_reference, "handle", current);
+               lu_ref_add_atomic(&lock->l_reference, "handle", lock);
                RETURN(lock);
        }
 
@@ -625,7 +650,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
 
        LASSERT(lock->l_resource != NULL);
 
-       lu_ref_add_atomic(&lock->l_reference, "handle", current);
+       lu_ref_add_atomic(&lock->l_reference, "handle", lock);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
@@ -676,7 +701,8 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
-                * discard dirty data, rather than writing back. */
+                * discard dirty data, rather than writing back.
+                */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
 
@@ -694,19 +720,17 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
        }
 }
 
-/**
- * Add a lock to list of just granted locks to send completion AST to.
- */
+/* Add a lock to list of just granted locks to send completion AST to. */
 static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
                                  struct list_head *work_list)
 {
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
-                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
+               LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
-                LDLM_LOCK_GET(lock);
-        }
+               LDLM_LOCK_GET(lock);
+       }
 }
 
 /**
@@ -718,13 +742,13 @@ static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
 {
-        ENTRY;
-        check_res_locked(lock->l_resource);
-        if (new)
-                ldlm_add_bl_work_item(lock, new, work_list);
-        else
-                ldlm_add_cp_work_item(lock, work_list);
-        EXIT;
+       ENTRY;
+       check_res_locked(lock->l_resource);
+       if (new)
+               ldlm_add_bl_work_item(lock, new, work_list);
+       else
+               ldlm_add_cp_work_item(lock, work_list);
+       EXIT;
 }
 
 /**
@@ -753,18 +777,18 @@ EXPORT_SYMBOL(ldlm_lock_addref);
 void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock,
                                      enum ldlm_mode mode)
 {
-        ldlm_lock_remove_from_lru(lock);
-        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
-                lock->l_readers++;
-                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
-        }
-        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
-                lock->l_writers++;
-                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
-        }
-        LDLM_LOCK_GET(lock);
-        lu_ref_add_atomic(&lock->l_reference, "user", lock);
-        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
+       ldlm_lock_remove_from_lru(lock);
+       if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
+               lock->l_readers++;
+               lu_ref_add_atomic(&lock->l_reference, "reader", lock);
+       }
+       if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS | LCK_TXN)) {
+               lock->l_writers++;
+               lu_ref_add_atomic(&lock->l_reference, "writer", lock);
+       }
+       LDLM_LOCK_GET(lock);
+       lu_ref_add_atomic(&lock->l_reference, "user", lock);
+       LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
 
 /**
@@ -777,22 +801,22 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock,
  */
 int ldlm_lock_addref_try(const struct lustre_handle *lockh, enum ldlm_mode mode)
 {
-        struct ldlm_lock *lock;
-        int               result;
+       struct ldlm_lock *lock;
+       int               result;
 
-        result = -EAGAIN;
-        lock = ldlm_handle2lock(lockh);
-        if (lock != NULL) {
-                lock_res_and_lock(lock);
-                if (lock->l_readers != 0 || lock->l_writers != 0 ||
+       result = -EAGAIN;
+       lock = ldlm_handle2lock(lockh);
+       if (lock != NULL) {
+               lock_res_and_lock(lock);
+               if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
-                        ldlm_lock_addref_internal_nolock(lock, mode);
-                        result = 0;
-                }
-                unlock_res_and_lock(lock);
-                LDLM_LOCK_PUT(lock);
-        }
-        return result;
+                       ldlm_lock_addref_internal_nolock(lock, mode);
+                       result = 0;
+               }
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_PUT(lock);
+       }
+       return result;
 }
 EXPORT_SYMBOL(ldlm_lock_addref_try);
 
@@ -818,20 +842,20 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
                                      enum ldlm_mode mode)
 {
-        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
-                LASSERT(lock->l_readers > 0);
-                lu_ref_del(&lock->l_reference, "reader", lock);
-                lock->l_readers--;
-        }
-        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
-                LASSERT(lock->l_writers > 0);
-                lu_ref_del(&lock->l_reference, "writer", lock);
-                lock->l_writers--;
-        }
+       LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
+       if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
+               LASSERT(lock->l_readers > 0);
+               lu_ref_del(&lock->l_reference, "reader", lock);
+               lock->l_readers--;
+       }
+       if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS | LCK_TXN)) {
+               LASSERT(lock->l_writers > 0);
+               lu_ref_del(&lock->l_reference, "writer", lock);
+               lock->l_writers--;
+       }
 
-        lu_ref_del(&lock->l_reference, "user", lock);
-        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
+       lu_ref_del(&lock->l_reference, "user", lock);
+       LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
 }
 
 /**
@@ -844,14 +868,15 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
  */
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 {
-        struct ldlm_namespace *ns;
-        ENTRY;
+       struct ldlm_namespace *ns;
 
-        lock_res_and_lock(lock);
+       ENTRY;
 
-        ns = ldlm_lock_to_ns(lock);
+       lock_res_and_lock(lock);
 
-        ldlm_lock_decref_internal_nolock(lock, mode);
+       ns = ldlm_lock_to_ns(lock);
+
+       ldlm_lock_decref_internal_nolock(lock, mode);
 
        if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
            !lock->l_readers && !lock->l_writers) {
@@ -863,57 +888,56 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
                 * like non-group locks, instead they are manually released.
                 * They have an l_writers reference which they keep until
                 * they are manually released, so we remove them when they have
-                * no more reader or writer references. - LU-6368 */
+                * no more reader or writer references. - LU-6368
+                */
                ldlm_set_cbpending(lock);
        }
 
        if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+               unsigned int mask = D_DLMTRACE;
+
                /* If we received a blocked AST and this was the last reference,
-                * run the callback. */
+                * run the callback.
+                */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
-                        CERROR("FL_CBPENDING set on non-local lock--just a "
-                               "warning\n");
+                       mask |= D_WARNING;
+               LDLM_DEBUG_LIMIT(mask, lock,
+                                "final decref done on %sCBPENDING lock",
+                                mask & D_WARNING ? "non-local " : "");
 
-                LDLM_DEBUG(lock, "final decref done on cbpending lock");
-
-                LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                ldlm_lock_remove_from_lru(lock);
-                unlock_res_and_lock(lock);
+               LDLM_LOCK_GET(lock); /* dropped by bl thread */
+               ldlm_lock_remove_from_lru(lock);
+               unlock_res_and_lock(lock);
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+                       CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
-               if (ldlm_is_atomic_cb(lock) ||
-                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
-                        ldlm_handle_bl_callback(ns, NULL, lock);
-        } else if (ns_is_client(ns) &&
-                   !lock->l_readers && !lock->l_writers &&
+               if (ldlm_is_atomic_cb(lock) || ldlm_is_local(lock) ||
+                   ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
+                       ldlm_handle_bl_callback(ns, NULL, lock);
+       } else if (ns_is_client(ns) &&
+                  !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock) &&
                   !ldlm_is_converting(lock)) {
 
-                LDLM_DEBUG(lock, "add lock into lru list");
-
-                /* If this is a client-side namespace and this was the last
-                 * reference, put it on the LRU. */
-                ldlm_lock_add_to_lru(lock);
-                unlock_res_and_lock(lock);
+               /* If this is a client-side namespace and this was the last
+                * reference, put it on the LRU.
+                */
+               ldlm_lock_add_to_lru(lock);
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "add lock into lru list");
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+                       CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
-                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
-                 * are not supported by the server, otherwise, it is done on
-                 * enqueue. */
-                if (!exp_connect_cancelset(lock->l_conn_export) &&
-                    !ns_connect_lru_resize(ns))
-                       ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
-        } else {
-                LDLM_DEBUG(lock, "do not add lock into lru list");
-                unlock_res_and_lock(lock);
-        }
+               ldlm_pool_recalc(&ns->ns_pool, true);
+       } else {
+               LDLM_DEBUG(lock, "do not add lock into lru list");
+               unlock_res_and_lock(lock);
+       }
 
-        EXIT;
+       EXIT;
 }
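
In short: a final user-reference drop with CBPENDING set hands the lock
to the blocking-callback path, while on a client namespace an otherwise
unreferenced lock is parked on the LRU and the pool is recalculated. A
hedged sketch of how callers pair the reference calls (lockh assumed to
be a valid handle; ldlm_lock_addref_try() above returns 0 once the
reference is taken):

        if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
                /* ... use whatever the PR lock protects ... */
                ldlm_lock_decref(&lockh, LCK_PR);
        }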
 
 /**
@@ -921,10 +945,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
  */
 void ldlm_lock_decref(const struct lustre_handle *lockh, enum ldlm_mode mode)
 {
-        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
+       struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
+
        LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
-        ldlm_lock_decref_internal(lock, mode);
-        LDLM_LOCK_PUT(lock);
+       ldlm_lock_decref_internal(lock, mode);
+       LDLM_LOCK_PUT(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_decref);
 
@@ -937,17 +962,18 @@ EXPORT_SYMBOL(ldlm_lock_decref);
 void ldlm_lock_decref_and_cancel(const struct lustre_handle *lockh,
                                 enum ldlm_mode mode)
 {
-        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
-        ENTRY;
+       struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
 
-        LASSERT(lock != NULL);
+       ENTRY;
+
+       LASSERT(lock != NULL);
 
-        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        lock_res_and_lock(lock);
+       LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
+       lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
-        unlock_res_and_lock(lock);
-        ldlm_lock_decref_internal(lock, mode);
-        LDLM_LOCK_PUT(lock);
+       unlock_res_and_lock(lock);
+       ldlm_lock_decref_internal(lock, mode);
+       LDLM_LOCK_PUT(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
 
@@ -972,83 +998,78 @@ struct sl_insert_point {
  *  - ldlm_grant_lock_with_skiplist
  */
 static void search_granted_lock(struct list_head *queue,
-                                struct ldlm_lock *req,
-                                struct sl_insert_point *prev)
+                               struct ldlm_lock *req,
+                               struct sl_insert_point *prev)
 {
-       struct list_head *tmp;
-        struct ldlm_lock *lock, *mode_end, *policy_end;
-        ENTRY;
-
-       list_for_each(tmp, queue) {
-               lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+       struct ldlm_lock *lock, *mode_end, *policy_end;
 
+       ENTRY;
+       list_for_each_entry(lock, queue, l_res_link) {
                mode_end = list_entry(lock->l_sl_mode.prev,
-                                          struct ldlm_lock, l_sl_mode);
-
-                if (lock->l_req_mode != req->l_req_mode) {
-                        /* jump to last lock of mode group */
-                        tmp = &mode_end->l_res_link;
-                        continue;
-                }
-
-                /* suitable mode group is found */
-                if (lock->l_resource->lr_type == LDLM_PLAIN) {
-                        /* insert point is last lock of the mode group */
-                        prev->res_link = &mode_end->l_res_link;
-                        prev->mode_link = &mode_end->l_sl_mode;
-                        prev->policy_link = &req->l_sl_policy;
-                        EXIT;
-                        return;
-                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
-                        for (;;) {
-                                policy_end =
+                                     struct ldlm_lock, l_sl_mode);
+
+               if (lock->l_req_mode != req->l_req_mode) {
+                       /* jump to last lock of mode group */
+                       lock = mode_end;
+                       continue;
+               }
+
+               /* suitable mode group is found */
+               if (lock->l_resource->lr_type == LDLM_PLAIN) {
+                       /* insert point is last lock of the mode group */
+                       prev->res_link = &mode_end->l_res_link;
+                       prev->mode_link = &mode_end->l_sl_mode;
+                       prev->policy_link = &req->l_sl_policy;
+                       EXIT;
+                       return;
+               } else if (lock->l_resource->lr_type == LDLM_IBITS) {
+                       for (;;) {
+                               policy_end =
                                        list_entry(lock->l_sl_policy.prev,
-                                                       struct ldlm_lock,
-                                                       l_sl_policy);
-
-                                if (lock->l_policy_data.l_inodebits.bits ==
-                                    req->l_policy_data.l_inodebits.bits) {
-                                        /* insert point is last lock of
-                                         * the policy group */
-                                        prev->res_link =
-                                                &policy_end->l_res_link;
-                                        prev->mode_link =
-                                                &policy_end->l_sl_mode;
-                                        prev->policy_link =
-                                                &policy_end->l_sl_policy;
-                                        EXIT;
-                                        return;
-                                }
-
-                                if (policy_end == mode_end)
-                                        /* done with mode group */
-                                        break;
-
-                                /* go to next policy group within mode group */
-                                tmp = policy_end->l_res_link.next;
-                               lock = list_entry(tmp, struct ldlm_lock,
-                                                      l_res_link);
-                        }  /* loop over policy groups within the mode group */
-
-                        /* insert point is last lock of the mode group,
-                         * new policy group is started */
-                        prev->res_link = &mode_end->l_res_link;
-                        prev->mode_link = &mode_end->l_sl_mode;
-                        prev->policy_link = &req->l_sl_policy;
-                        EXIT;
-                        return;
-                } else {
-                        LDLM_ERROR(lock,"is not LDLM_PLAIN or LDLM_IBITS lock");
-                        LBUG();
-                }
-        }
-
-        /* insert point is last lock on the queue,
-         * new mode group and new policy group are started */
-        prev->res_link = queue->prev;
-        prev->mode_link = &req->l_sl_mode;
-        prev->policy_link = &req->l_sl_policy;
-        EXIT;
+                                                  struct ldlm_lock,
+                                                  l_sl_policy);
+
+                               if (lock->l_policy_data.l_inodebits.bits ==
+                                   req->l_policy_data.l_inodebits.bits) {
+                                       /* insert point: last of policy grp */
+                                       prev->res_link =
+                                               &policy_end->l_res_link;
+                                       prev->mode_link =
+                                               &policy_end->l_sl_mode;
+                                       prev->policy_link =
+                                               &policy_end->l_sl_policy;
+                                       EXIT;
+                                       return;
+                               }
+
+                               if (policy_end == mode_end)
+                                       /* done with mode group */
+                                       break;
+
+                               /* go to next policy group within mode group */
+                               lock = list_next_entry(policy_end, l_res_link);
+                       }  /* loop over policy groups within the mode group */
+
+                       /* insert point is last lock of the mode group,
+                        * new policy group is started
+                        */
+                       prev->res_link = &mode_end->l_res_link;
+                       prev->mode_link = &mode_end->l_sl_mode;
+                       prev->policy_link = &req->l_sl_policy;
+                       EXIT;
+                       return;
+               }
+               LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
+               LBUG();
+       }
+
+       /* insert point is last lock on the queue,
+        * new mode group and new policy group are started
+        */
+       prev->res_link = queue->prev;
+       prev->mode_link = &req->l_sl_mode;
+       prev->policy_link = &req->l_sl_policy;
+       EXIT;
 }
 
 /**
@@ -1056,20 +1077,21 @@ static void search_granted_lock(struct list_head *queue,
  * \a prev.
  */
 static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
-                                       struct sl_insert_point *prev)
+                                      struct sl_insert_point *prev)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        ENTRY;
+       struct ldlm_resource *res = lock->l_resource;
 
-        check_res_locked(res);
+       ENTRY;
 
-        ldlm_resource_dump(D_INFO, res);
-        LDLM_DEBUG(lock, "About to add lock:");
+       check_res_locked(res);
+
+       ldlm_resource_dump(D_INFO, res);
+       LDLM_DEBUG(lock, "About to add lock:");
 
        if (ldlm_is_destroyed(lock)) {
-                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
-                return;
-        }
+               CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
+               return;
+       }
 
        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
@@ -1086,7 +1108,7 @@ static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);
 
-        EXIT;
+       EXIT;
 }
 
 /**
@@ -1115,27 +1137,29 @@ void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
  */
 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        ENTRY;
+       struct ldlm_resource *res = lock->l_resource;
+
+       ENTRY;
 
-        check_res_locked(res);
+       check_res_locked(res);
 
-        lock->l_granted_mode = lock->l_req_mode;
+       lock->l_granted_mode = lock->l_req_mode;
 
        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);
 
-        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
-                ldlm_grant_lock_with_skiplist(lock);
-        else if (res->lr_type == LDLM_EXTENT)
-                ldlm_extent_add_lock(res, lock);
+       if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
+               ldlm_grant_lock_with_skiplist(lock);
+       else if (res->lr_type == LDLM_EXTENT)
+               ldlm_extent_add_lock(res, lock);
        else if (res->lr_type == LDLM_FLOCK) {
                /* We should not add locks to granted list in the following
                 * cases:
                 * - this is an UNLOCK but not a real lock;
                 * - this is a TEST lock;
                 * - this is a F_CANCELLK lock (async flock has req_mode == 0)
-                * - this is a deadlock (flock cannot be granted) */
+                * - this is a deadlock (flock cannot be granted)
+                */
                if (lock->l_req_mode == 0 ||
                    lock->l_req_mode == LCK_NL ||
                    ldlm_is_test_lock(lock) ||
@@ -1146,95 +1170,113 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
                LBUG();
        }
 
-        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
-        EXIT;
+       ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
+       EXIT;
 }
 
 /**
  * Check if the given @lock meets the criteria for a match.
  * A reference on the lock is taken if matched.
  *
- * \param lock     test-against this lock
- * \param data    parameters
+ * @lock       test against this lock
+ * @data       parameters
+ *
+ * Return:     true if @lock matches @data, false otherwise
  */
-static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
+static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 {
        union ldlm_policy_data *lpol = &lock->l_policy_data;
        enum ldlm_mode match = LCK_MINMODE;
 
        if (lock == data->lmd_old)
-               return INTERVAL_ITER_STOP;
+               return true;
 
        /* Check if this lock can be matched.
-        * Used by LU-2919(exclusive open) for open lease lock */
+        * Used by LU-2919(exclusive open) for open lease lock
+        */
        if (ldlm_is_excl(lock))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* llite sometimes wants to match locks that will be
         * canceled when their users drop, but we allow it to match
         * if it passes in CBPENDING and the lock still has users.
         * this is generally only going to be used by children
         * whose parents already hold a lock so forward progress
-        * can still happen. */
+        * can still happen.
+        */
        if (ldlm_is_cbpending(lock) &&
-           !(data->lmd_flags & LDLM_FL_CBPENDING))
-               return INTERVAL_ITER_CONT;
-       if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+           !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+           !(data->lmd_match & LDLM_MATCH_GROUP))
+               return false;
+
+       if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+           ldlm_is_cbpending(lock) &&
            lock->l_readers == 0 && lock->l_writers == 0)
-               return INTERVAL_ITER_CONT;
+               return false;
 
        if (!(lock->l_req_mode & *data->lmd_mode))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* When we search for ast_data, we are not doing a traditional match,
         * so we don't worry about IBITS or extent matching.
         */
-       if (data->lmd_has_ast_data) {
+       if (data->lmd_match & (LDLM_MATCH_AST | LDLM_MATCH_AST_ANY)) {
                if (!lock->l_ast_data)
-                       return INTERVAL_ITER_CONT;
+                       return false;
 
-               goto matched;
+               if (data->lmd_match & LDLM_MATCH_AST_ANY)
+                       goto matched;
        }
 
        match = lock->l_req_mode;
 
        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
-               if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
-                   lpol->l_extent.end < data->lmd_policy->l_extent.end)
-                       return INTERVAL_ITER_CONT;
+               if (!(data->lmd_match & LDLM_MATCH_RIGHT) &&
+                   (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+                    lpol->l_extent.end < data->lmd_policy->l_extent.end))
+                       return false;
 
                if (unlikely(match == LCK_GROUP) &&
                    data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
                    lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
-                       return INTERVAL_ITER_CONT;
+                       return false;
                break;
        case LDLM_IBITS:
-               /* We match if we have existing lock with same or wider set
-                  of bits. */
+               /* We match if existing lock has same or wider set of bits */
                if ((lpol->l_inodebits.bits &
                     data->lmd_policy->l_inodebits.bits) !=
                    data->lmd_policy->l_inodebits.bits)
-                       return INTERVAL_ITER_CONT;
+                       return false;
+
+               if (unlikely(match == LCK_GROUP) &&
+                   data->lmd_policy->l_inodebits.li_gid != LDLM_GID_ANY &&
+                   lpol->l_inodebits.li_gid !=
+                   data->lmd_policy->l_inodebits.li_gid)
+                       return false;
                break;
        default:
-               ;
+               break;
        }
 
-       /* We match if we have existing lock with same or wider set
-          of bits. */
-       if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
-               return INTERVAL_ITER_CONT;
+       /* We match if we have existing lock with same or wider set of bits. */
+       if (!(data->lmd_match & LDLM_MATCH_UNREF) && LDLM_HAVE_MASK(lock, GONE))
+               return false;
 
        if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* Filter locks by skipping flags */
        if (data->lmd_skip_flags & lock->l_flags)
-               return INTERVAL_ITER_CONT;
+               return false;
 
 matched:
-       if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+       /*
+        * In case the lock is a CBPENDING group lock, just pin it and
+        * return; we need to wait until it gets to DESTROYED.
+        */
+       if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+           (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
                LDLM_LOCK_GET(lock);
                ldlm_lock_touch_in_lru(lock);
        } else {
@@ -1244,7 +1286,7 @@ matched:
        *data->lmd_mode = match;
        data->lmd_lock = lock;
 
-       return INTERVAL_ITER_STOP;
+       return true;
 }
 
 static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
@@ -1252,11 +1294,9 @@ static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
        struct ldlm_interval *node = to_ldlm_interval(in);
        struct ldlm_match_data *data = args;
        struct ldlm_lock *lock;
-       int rc;
 
        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+               if (lock_matches(lock, data))
                        return INTERVAL_ITER_STOP;
        }
        return INTERVAL_ITER_CONT;
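
lock_matches() is now a plain bool predicate; itree_overlap_cb() adapts
it to the interval-tree iterator protocol, returning INTERVAL_ITER_STOP
once a match has been recorded in data->lmd_lock. The adapter shape,
sketched with hypothetical names (my_match stands in for the real
predicate):

        static unsigned int overlap_cb(struct interval_node *in, void *args)
        {
                return my_match(in, args) ? INTERVAL_ITER_STOP
                                          : INTERVAL_ITER_CONT;
        }
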
@@ -1281,6 +1321,9 @@ struct ldlm_lock *search_itree(struct ldlm_resource *res,
 
        data->lmd_lock = NULL;
 
+       if (data->lmd_match & LDLM_MATCH_RIGHT)
+               ext.end = OBD_OBJECT_EOF;
+
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct ldlm_interval_tree *tree = &res->lr_itree[idx];
 
@@ -1313,15 +1356,12 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                                      struct ldlm_match_data *data)
 {
        struct ldlm_lock *lock;
-       int rc;
 
        data->lmd_lock = NULL;
 
-       list_for_each_entry(lock, queue, l_res_link) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+       list_for_each_entry(lock, queue, l_res_link)
+               if (lock_matches(lock, data))
                        return data->lmd_lock;
-       }
 
        return NULL;
 }
@@ -1330,16 +1370,16 @@ void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
 {
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
 
 void ldlm_lock_fail_match(struct ldlm_lock *lock)
 {
-        lock_res_and_lock(lock);
-        ldlm_lock_fail_match_locked(lock);
-        unlock_res_and_lock(lock);
+       lock_res_and_lock(lock);
+       ldlm_lock_fail_match_locked(lock);
+       unlock_res_and_lock(lock);
 }
 
 /**
@@ -1352,7 +1392,7 @@ void ldlm_lock_fail_match(struct ldlm_lock *lock)
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
        ldlm_set_lvb_ready(lock);
-       wake_up_all(&lock->l_waitq);
+       wake_up(&lock->l_waitq);
 }
 EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
 
@@ -1362,9 +1402,9 @@ EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
  */
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 {
-        lock_res_and_lock(lock);
-        ldlm_lock_allow_match_locked(lock);
-        unlock_res_and_lock(lock);
+       lock_res_and_lock(lock);
+       ldlm_lock_allow_match_locked(lock);
+       unlock_res_and_lock(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_allow_match);
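
The wake_up_all() to wake_up() conversions here and in
ldlm_lock_fail_match_locked() are safe as long as no one waits on
l_waitq with an exclusive wait: wake_up() still wakes every
non-exclusive waiter, and the two calls differ only for exclusive ones.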
 
@@ -1404,7 +1444,8 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                         enum ldlm_type type,
                                         union ldlm_policy_data *policy,
                                         enum ldlm_mode mode,
-                                        struct lustre_handle *lockh, int unref)
+                                        struct lustre_handle *lockh,
+                                        enum ldlm_match_flags match_flags)
 {
        struct ldlm_match_data data = {
                .lmd_old = NULL,
@@ -1413,11 +1454,11 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                .lmd_policy = policy,
                .lmd_flags = flags,
                .lmd_skip_flags = skip_flags,
-               .lmd_unref = unref,
-               .lmd_has_ast_data = false,
+               .lmd_match = match_flags,
        };
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
+       struct ldlm_lock *group_lock;
        int matched;
 
        ENTRY;
@@ -1432,12 +1473,14 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                *data.lmd_mode = data.lmd_old->l_req_mode;
        }
 
-       res = ldlm_resource_get(ns, NULL, res_id, type, 0);
+       res = ldlm_resource_get(ns, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(data.lmd_old == NULL);
                RETURN(0);
        }
 
+repeat:
+       group_lock = NULL;
        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);
        if (res->lr_type == LDLM_EXTENT)
@@ -1447,8 +1490,19 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
        if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
                lock = search_queue(&res->lr_waiting, &data);
        matched = lock ? mode : 0;
+
+       if (lock && ldlm_is_cbpending(lock) &&
+           (data.lmd_match & LDLM_MATCH_GROUP))
+               group_lock = lock;
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
+
+       if (group_lock) {
+               l_wait_event_abortable(group_lock->l_waitq,
+                                      ldlm_is_destroyed(lock));
+               LDLM_LOCK_RELEASE(lock);
+               goto repeat;
+       }
        ldlm_resource_putref(res);
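
The repeat loop above pairs with the wake_up(&lock->l_waitq) added in
ldlm_lock_destroy_internal(): a matched group lock that is already
CBPENDING is only pinned (see the matched: path in lock_matches()), then
the thread waits until the lock is DESTROYED, drops its pin, and retries
the whole search so it can find a live lock instead.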
 
        if (lock) {
@@ -1516,6 +1570,7 @@ enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh,
 {
        struct ldlm_lock *lock;
        enum ldlm_mode mode = 0;
+
        ENTRY;
 
        lock = ldlm_handle2lock(lockh);
@@ -1525,23 +1580,23 @@ enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh,
                        GOTO(out, mode);
 
                if (ldlm_is_cbpending(lock) &&
-                    lock->l_readers == 0 && lock->l_writers == 0)
-                        GOTO(out, mode);
+                   lock->l_readers == 0 && lock->l_writers == 0)
+                       GOTO(out, mode);
 
-                if (bits)
-                        *bits = lock->l_policy_data.l_inodebits.bits;
-                mode = lock->l_granted_mode;
-                ldlm_lock_addref_internal_nolock(lock, mode);
-        }
+               if (bits)
+                       *bits = lock->l_policy_data.l_inodebits.bits;
+               mode = lock->l_granted_mode;
+               ldlm_lock_addref_internal_nolock(lock, mode);
+       }
 
-        EXIT;
+       EXIT;
 
 out:
-        if (lock != NULL) {
-                unlock_res_and_lock(lock);
-                LDLM_LOCK_PUT(lock);
-        }
-        return mode;
+       if (lock != NULL) {
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_PUT(lock);
+       }
+       return mode;
 }
 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
 
@@ -1550,6 +1605,7 @@ int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
                  enum req_location loc, void *data, int size)
 {
        void *lvb;
+
        ENTRY;
 
        LASSERT(data != NULL);
@@ -1615,7 +1671,8 @@ int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
 
                        memcpy(data, lvb, size);
                } else {
-                       LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
+                       LDLM_ERROR(lock,
+                                  "Replied unexpected lquota LVB size %d",
                                   size);
                        RETURN(-EINVAL);
                }
@@ -1637,16 +1694,15 @@ int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
                break;
        default:
                LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
-               libcfs_debug_dumpstack(NULL);
+               dump_stack();
                RETURN(-EINVAL);
        }
 
        RETURN(0);
 }
 
-/**
- * Create and fill in new LDLM lock with specified properties.
- * Returns a referenced lock
+/* Create and fill in new LDLM lock with specified properties.
+ * Returns: a referenced lock
  */
 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
@@ -1659,19 +1715,22 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
        struct ldlm_lock        *lock;
        struct ldlm_resource    *res;
        int                     rc;
+
        ENTRY;
 
-       res = ldlm_resource_get(ns, NULL, res_id, type, 1);
+       res = ldlm_resource_get(ns, res_id, type, 1);
        if (IS_ERR(res))
                RETURN(ERR_CAST(res));
 
        lock = ldlm_lock_new(res);
-       if (lock == NULL)
+       if (!lock) {
+               ldlm_resource_putref(res);
                RETURN(ERR_PTR(-ENOMEM));
+       }
 
        lock->l_req_mode = mode;
        lock->l_ast_data = data;
-       lock->l_pid = current_pid();
+       lock->l_pid = current->pid;
        if (ns_is_server(ns))
                ldlm_set_ns_srv(lock);
        if (cbs) {
@@ -1682,6 +1741,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
 
        switch (type) {
        case LDLM_EXTENT:
+       case LDLM_FLOCK:
                rc = ldlm_extent_alloc_lock(lock);
                break;
        case LDLM_IBITS:
@@ -1701,7 +1761,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
        }
 
        lock->l_lvb_type = lvb_type;
-       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, rc = -ENOENT);
 
        RETURN(lock);
@@ -1724,14 +1784,10 @@ static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
        ENTRY;
 
        policy = ldlm_get_processing_policy(res);
-restart:
        policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
        if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
-           res->lr_type != LDLM_FLOCK) {
+           res->lr_type != LDLM_FLOCK)
                rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
-               if (rc == -ERESTART)
-                       GOTO(restart, rc);
-       }
 
        if (!list_empty(&rpc_list))
                ldlm_discard_bl_list(&rpc_list);
@@ -1756,8 +1812,8 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                                  void *cookie, __u64 *flags)
 {
        struct ldlm_lock *lock = *lockp;
-       struct ldlm_resource *res = lock->l_resource;
-       int local = ns_is_client(ldlm_res_to_ns(res));
+       struct ldlm_resource *res;
+       int local = ns_is_client(ns);
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
 #ifdef HAVE_SERVER_SUPPORT
@@ -1765,22 +1821,23 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
 #endif
        ENTRY;
 
-        /* policies are not executed on the client or during replay */
-        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
-            && !local && ns->ns_policy) {
+       /* policies are not executed on the client or during replay */
+       if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
+           && !local && ns->ns_policy) {
                rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
                                   *flags, NULL);
-                if (rc == ELDLM_LOCK_REPLACED) {
-                        /* The lock that was returned has already been granted,
-                         * and placed into lockp.  If it's not the same as the
-                         * one we passed in, then destroy the old one and our
-                         * work here is done. */
-                        if (lock != *lockp) {
-                                ldlm_lock_destroy(lock);
-                                LDLM_LOCK_RELEASE(lock);
-                        }
-                        *flags |= LDLM_FL_LOCK_CHANGED;
-                        RETURN(0);
+               if (rc == ELDLM_LOCK_REPLACED) {
+                       /* The lock that was returned has already been granted,
+                        * and placed into lockp.  If it's not the same as the
+                        * one we passed in, then destroy the old one and our
+                        * work here is done.
+                        */
+                       if (lock != *lockp) {
+                               ldlm_lock_destroy(lock);
+                               LDLM_LOCK_RELEASE(lock);
+                       }
+                       *flags |= LDLM_FL_LOCK_CHANGED;
+                       RETURN(0);
                } else if (rc != ELDLM_OK &&
                           ldlm_is_granted(lock)) {
                        LASSERT(*flags & LDLM_FL_RESENT);
@@ -1789,21 +1846,23 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                         * error occurs. It is unclear if lock reached the
                         * client in the original reply, just leave the lock on
                         * server, not returning it again to client. Due to
-                        * LU-6529, the server will not OOM. */
+                        * LU-6529, the server will not OOM.
+                        */
                        RETURN(rc);
-                } else if (rc != ELDLM_OK ||
-                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
-                        ldlm_lock_destroy(lock);
-                        RETURN(rc);
-                }
-        }
+               } else if (rc != ELDLM_OK ||
+                          (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
+                       ldlm_lock_destroy(lock);
+                       RETURN(rc);
+               }
+       }
 
        if (*flags & LDLM_FL_RESENT) {
                /* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for reply.
                 * Set LOCK_CHANGED always.
                 * Check if the lock is granted for BLOCK_GRANTED.
                 * Take NO_TIMEOUT from the lock as it is inherited through
-                * LDLM_FL_INHERIT_MASK */
+                * LDLM_FL_INHERIT_MASK
+                */
                *flags |= LDLM_FL_LOCK_CHANGED;
                if (!ldlm_is_granted(lock))
                        *flags |= LDLM_FL_BLOCK_GRANTED;
@@ -1811,15 +1870,21 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                RETURN(ELDLM_OK);
        }
 
+#ifdef HAVE_SERVER_SUPPORT
        /* A replaying lock might already be in the granted list, so
         * unlinking it would cause the interval node to be freed; allocate
         * the interval node early, otherwise we cannot regrant
-        * this lock in the future. - jay */
-       if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
+        * this lock in the future. - jay
+        *
+        * The only time the ldlm_resource changes for the ldlm_lock is when
+        * ldlm_lock_change_resource() is called and that only happens for
+        * the Lustre client case.
+        */
+       if (!local && (*flags & LDLM_FL_REPLAY) &&
+           lock->l_resource->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
 
-#ifdef HAVE_SERVER_SUPPORT
-       reconstruct = !local && res->lr_type == LDLM_FLOCK &&
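+       /* A server-side flock request may be a resend; check whether the
+        * reply can be reconstructed from the reply cache before enqueueing.
+        */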
+       reconstruct = !local && lock->l_resource->lr_type == LDLM_FLOCK &&
                      !(*flags & LDLM_FL_TEST_LOCK);
        if (reconstruct) {
                rc = req_can_reconstruct(cookie, NULL);
@@ -1829,39 +1894,56 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                        RETURN(rc);
                }
        }
-#endif
 
-       lock_res_and_lock(lock);
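+       /* Flock unlocks get priority: an unlock request (LCK_NL) announces
+        * itself in lfn_unlock_pending before taking the resource lock, and
+        * other enqueues back off until pending unlocks have taken it, so
+        * unlocks are not starved behind a stream of lock requests.
+        */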
+       if (!local && lock->l_resource->lr_type == LDLM_FLOCK) {
+               struct ldlm_flock_node *fn = &lock->l_resource->lr_flock_node;
+
+               if (lock->l_req_mode == LCK_NL) {
+                       atomic_inc(&fn->lfn_unlock_pending);
+                       res = lock_res_and_lock(lock);
+                       atomic_dec(&fn->lfn_unlock_pending);
+               } else {
+                       res = lock_res_and_lock(lock);
+
+                       while (atomic_read(&fn->lfn_unlock_pending)) {
+                               unlock_res_and_lock(lock);
+                               cond_resched();
+                               lock_res_and_lock(lock);
+                       }
+               }
+       } else
+#endif
+       {
+               res = lock_res_and_lock(lock);
+       }
        if (local && ldlm_is_granted(lock)) {
-                /* The server returned a blocked lock, but it was granted
-                 * before we got a chance to actually enqueue it.  We don't
-                 * need to do anything else. */
-                *flags &= ~LDLM_FL_BLOCKED_MASK;
+               /* The server returned a blocked lock, but it was granted
+                * before we got a chance to actually enqueue it.  We don't
+                * need to do anything else.
+                */
+               *flags &= ~LDLM_FL_BLOCKED_MASK;
                GOTO(out, rc = ELDLM_OK);
-        }
+       }
 
-        ldlm_resource_unlink_lock(lock);
-        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
-                if (node == NULL) {
-                        ldlm_lock_destroy_nolock(lock);
-                        GOTO(out, rc = -ENOMEM);
-                }
+       ldlm_resource_unlink_lock(lock);
+       if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
+               if (node == NULL) {
+                       ldlm_lock_destroy_nolock(lock);
+                       GOTO(out, rc = -ENOMEM);
+               }
 
                INIT_LIST_HEAD(&node->li_group);
-                ldlm_interval_attach(node, lock);
-                node = NULL;
-        }
+               ldlm_interval_attach(node, lock);
+               node = NULL;
+       }
 
        /* Some flags from the enqueue want to make it into the AST, via the
-        * lock's l_flags. */
+        * lock's l_flags.
+        */
        if (*flags & LDLM_FL_AST_DISCARD_DATA)
                ldlm_set_ast_discard_data(lock);
        if (*flags & LDLM_FL_TEST_LOCK)
                ldlm_set_test_lock(lock);
-       if (*flags & LDLM_FL_COS_INCOMPAT)
-               ldlm_set_cos_incompat(lock);
-       if (*flags & LDLM_FL_COS_ENABLED)
-               ldlm_set_cos_enabled(lock);
 
        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
@@ -1873,8 +1955,9 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
-        * granted queue. */
-        if (local) {
+        * granted queue.
+        */
+       if (local) {
                if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
@@ -1895,15 +1978,14 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
        rc = ldlm_lock_enqueue_helper(lock, flags);
        GOTO(out, rc);
 #else
-        } else {
-                CERROR("This is client-side-only module, cannot handle "
-                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
-                LBUG();
-        }
+       } else {
+               CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
+               LBUG();
+       }
 #endif
 
 out:
-        unlock_res_and_lock(lock);
+       unlock_res_and_lock(lock);
 
 #ifdef HAVE_SERVER_SUPPORT
        if (reconstruct) {
@@ -1914,9 +1996,9 @@ out:
                                  req, 0, NULL, false, 0);
        }
 #endif
-        if (node)
-                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
-        return rc;
+       if (node)
+               OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
+       return rc;
 }
 
 #ifdef HAVE_SERVER_SUPPORT
@@ -1928,8 +2010,7 @@ out:
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
-                        enum ldlm_process_intention intention,
-                        struct ldlm_lock *hint)
+                        enum ldlm_process_intention intention, __u64 hint)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
@@ -1954,9 +2035,9 @@ restart:
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
+               CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
-                flags = 0;
+               flags = 0;
                rc = policy(pending, &flags, intention, &err, &rpc_list);
                if (pending->l_granted_mode == pending->l_req_mode ||
                    res->lr_type == LDLM_FLOCK) {
@@ -1971,7 +2052,7 @@ restart:
                if (rc != LDLM_ITER_CONTINUE &&
                    intention == LDLM_PROCESS_RESCAN)
                        break;
-        }
+       }
 
        if (!list_empty(&bl_ast_list)) {
                unlock_res(res);
@@ -1987,7 +2068,7 @@ restart:
        if (!list_empty(&bl_ast_list))
                ldlm_discard_bl_list(&bl_ast_list);
 
-        RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
+       RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
 
 /**
@@ -1999,8 +2080,8 @@ restart:
  * \param[in] rpc_list         Conflicting locks list.
  *
  * \retval -ERESTART:  Some lock was instantly canceled while sending
- *                     blocking ASTs, caller needs to re-check conflicting
- *                     locks.
+ *                     blocking ASTs, caller needs to re-check conflicting
+ *                     locks.
  * \retval -EAGAIN:    Lock was destroyed, caller should return error.
 * \retval 0:           Lock was successfully added to the waiting list.
  */
@@ -2009,6 +2090,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 {
        struct ldlm_resource *res = lock->l_resource;
        int rc;
+
        ENTRY;
 
        check_res_locked(res);
@@ -2018,7 +2100,8 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
         *
         * bug 2322: we used to unlink and re-add here, which was a
         * terrible folly -- if we goto restart, we could get
-        * re-ordered!  Causes deadlock, because ASTs aren't sent! */
+        * re-ordered!  Causes deadlock, because ASTs aren't sent!
+        */
        if (list_empty(&lock->l_res_link))
                ldlm_resource_add_lock(res, &res->lr_waiting, lock);
        unlock_res(res);
@@ -2026,17 +2109,21 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
                               LDLM_WORK_BL_AST);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+       if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
            !ns_is_client(ldlm_res_to_ns(res)))
                class_fail_export(lock->l_export);
 
+       if (rc == -ERESTART)
+               ldlm_reprocess_all(res, 0);
+
        lock_res(res);
        if (rc == -ERESTART) {
                /* 15715: The lock was granted and destroyed after
                 * resource lock was dropped. Interval node was freed
                 * in ldlm_lock_destroy. Anyway, this always happens
                 * when a client is being evicted. So it would be
-                * ok to return an error. -jay */
+                * ok to return an error. -jay
+                */
                if (ldlm_is_destroyed(lock))
                        RETURN(-EAGAIN);
 
@@ -2047,12 +2134,11 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                         * to restart and ldlm_resource_unlink will be
                         * called and it causes the interval node to be
                         * freed. Then we will fail at
-                        * ldlm_extent_add_lock() */
+                        * ldlm_extent_add_lock()
+                        */
                        *flags &= ~LDLM_FL_BLOCKED_MASK;
-                       RETURN(0);
                }
 
-               RETURN(rc);
        }
        *flags |= LDLM_FL_BLOCK_GRANTED;
 
@@ -2082,9 +2168,7 @@ void ldlm_discard_bl_list(struct list_head *bl_list)
        EXIT;
 }
 
-/**
- * Process a call to blocking AST callback for a lock in ast_work list
- */
+/* Process a call to blocking AST callback for a lock in ast_work list */
 static int
 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
@@ -2099,7 +2183,7 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        if (list_empty(arg->list))
                RETURN(-ENOENT);
 
-       lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
+       lock = list_first_entry(arg->list, struct ldlm_lock, l_bl_ast);
 
        /* nobody should touch l_bl_ast but some locks in the list may become
         * granted after lock convert or COS downgrade, these locks should be
@@ -2133,7 +2217,17 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
         */
        bld.bl_same_client = lock->l_client_cookie ==
                             lock->l_blocking_lock->l_client_cookie;
-       bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+       /* If the two locks were initiated from the same MDT, their
+        * transactions are independent; in that case, or if the blocking
+        * lock's requested mode isn't EX|PW, there is no need to trigger
+        * CoS: the current lock will soon be downgraded to TXN mode, and
+        * the blocking lock can then be granted.
+        */
+       if (lock->l_blocking_lock->l_policy_data.l_inodebits.li_initiator_id ==
+               lock->l_policy_data.l_inodebits.li_initiator_id ||
+           !(lock->l_blocking_lock->l_req_mode & (LCK_EX | LCK_PW)))
+               bld.bl_txn_dependent = false;
+       else
+               bld.bl_txn_dependent = true;
        arg->bl_desc = &bld;
 
        LASSERT(ldlm_is_ast_sent(lock));
@@ -2149,9 +2243,7 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        RETURN(rc);
 }
 
-/**
- * Process a call to revocation AST callback for a lock in ast_work list
- */
+/* Process a call to revocation AST callback for a lock in ast_work list */
 static int
 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
@@ -2159,12 +2251,13 @@ ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        struct ldlm_lock_desc   desc;
        int                     rc;
        struct ldlm_lock       *lock;
+
        ENTRY;
 
        if (list_empty(arg->list))
                RETURN(-ENOENT);
 
-       lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
+       lock = list_first_entry(arg->list, struct ldlm_lock, l_rk_ast);
        list_del_init(&lock->l_rk_ast);
 
        /* the desc just pretends to be exclusive */
@@ -2172,28 +2265,27 @@ ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        desc.l_req_mode = LCK_EX;
        desc.l_granted_mode = 0;
 
-       rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+       rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock);
 
        RETURN(rc);
 }
 
-/**
- * Process a call to glimpse AST callback for a lock in ast_work list
- */
+/* Process a call to glimpse AST callback for a lock in ast_work list */
 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
        struct ldlm_cb_set_arg          *arg = opaq;
        struct ldlm_glimpse_work        *gl_work;
        struct ldlm_lock                *lock;
        int                              rc = 0;
+
        ENTRY;
 
        if (list_empty(arg->list))
                RETURN(-ENOENT);
 
-       gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
-                                gl_list);
+       gl_work = list_first_entry(arg->list, struct ldlm_glimpse_work,
+                                  gl_list);
        list_del_init(&gl_work->gl_list);
 
        lock = gl_work->gl_lock;
@@ -2204,8 +2296,11 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        arg->gl_interpret_data = gl_work->gl_interpret_data;
 
        /* invoke the actual glimpse callback */
-       if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
-               rc = 1;
+       rc = lock->l_glimpse_ast(lock, (void *)arg);
+       if (rc == 0)
+               rc = 1; /* update LVB if this is a server lock */
+       else if (rc == -ELDLM_NO_LOCK_DATA)
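+               /* the glimpse callback found no lock data; refresh the LVB */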
+               ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
 
        LDLM_LOCK_RELEASE(lock);
        if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
@@ -2217,9 +2312,7 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 }
 #endif
 
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
+/* Process a call to completion AST callback for a lock in ast_work list */
 static int
 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
@@ -2233,7 +2326,7 @@ ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        if (list_empty(arg->list))
                RETURN(-ENOENT);
 
-       lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+       lock = list_first_entry(arg->list, struct ldlm_lock, l_cp_ast);
 
        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
@@ -2244,14 +2337,16 @@ ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
-        * reply and finish enqueueing. */
+        * reply and finish enqueueing.
+        */
 
        /* nobody should touch l_cp_ast */
        lock_res_and_lock(lock);
        list_del_init(&lock->l_cp_ast);
        LASSERT(ldlm_is_cp_reqd(lock));
        /* save l_completion_ast since it can be changed by
-        * mds_intent_policy(), see bug 14225 */
+        * mds_intent_policy(), see bug 14225
+        */
        completion_callback = lock->l_completion_ast;
        ldlm_clear_cp_reqd(lock);
        unlock_res_and_lock(lock);
@@ -2312,7 +2407,8 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
        /* We create a ptlrpc request set with flow control extension.
         * This request set will use the work_ast_lock function to produce new
         * requests and will send a new request each time one completes in order
-        * to keep the number of requests in flight to ns_max_parallel_ast */
+        * to keep the number of requests in flight to ns_max_parallel_ast
+        */
        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                     work_ast_lock, arg);
        if (arg->set == NULL)
@@ -2338,7 +2434,7 @@ out:
  */
 static void __ldlm_reprocess_all(struct ldlm_resource *res,
                                 enum ldlm_process_intention intention,
-                                struct ldlm_lock *hint)
+                                __u64 hint)
 {
        LIST_HEAD(rpc_list);
 #ifdef HAVE_SERVER_SUPPORT
@@ -2371,21 +2467,21 @@ restart:
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
+               hint = 0;
                goto restart;
        }
 #else
        ENTRY;
 
        if (!ns_is_client(ldlm_res_to_ns(res))) {
-               CERROR("This is client-side-only module, cannot handle "
-                      "LDLM_NAMESPACE_SERVER resource type lock.\n");
+               CERROR("This is client-side-only module, cannot handle LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
 #endif
        EXIT;
 }
 
-void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
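+/* @hint is an optional reprocessing hint: callers pass the inodebits of the
+ * lock that triggered reprocessing, or 0 when no hint is available.
+ */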
+void ldlm_reprocess_all(struct ldlm_resource *res, __u64 hint)
 {
        __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
 }
@@ -2397,14 +2493,11 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
 
        /* This is only called once, after recovery is done. LU-8306. */
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, 0);
        return 0;
 }
 
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
+/* Iterate over all namespace resources, trying to grant waiting locks. */
 void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
 {
        ENTRY;
@@ -2416,10 +2509,7 @@ void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
        EXIT;
 }
 
-/**
- * Helper function to call blocking AST for LDLM lock \a lock in a
- * "cancelling" mode.
- */
+/* Helper to call blocking AST for LDLM lock \a lock in a "cancelling" mode. */
 void ldlm_cancel_callback(struct ldlm_lock *lock)
 {
        check_res_locked(lock->l_resource);
@@ -2436,94 +2526,88 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
 
                /* only canceller can set bl_done bit */
                ldlm_set_bl_done(lock);
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        } else if (!ldlm_is_bl_done(lock)) {
-               /* The lock is guaranteed to have been canceled once
-                * returning from this function. */
+               /* the lock is guaranteed to be canceled once this returns */
                unlock_res_and_lock(lock);
                wait_event_idle(lock->l_waitq, is_bl_done(lock));
                lock_res_and_lock(lock);
        }
 }
 
-/**
- * Remove skiplist-enabled LDLM lock \a req from granted list
- */
+/* Remove skiplist-enabled LDLM lock \a req from granted list */
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
 {
-        if (req->l_resource->lr_type != LDLM_PLAIN &&
-            req->l_resource->lr_type != LDLM_IBITS)
-                return;
+       if (req->l_resource->lr_type != LDLM_PLAIN &&
+           req->l_resource->lr_type != LDLM_IBITS)
+               return;
 
        list_del_init(&req->l_sl_policy);
        list_del_init(&req->l_sl_mode);
 }
 
-/**
- * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
- */
+/* Attempts to cancel LDLM lock \a lock that has no reader/writer references. */
 void ldlm_lock_cancel(struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res;
-        struct ldlm_namespace *ns;
-        ENTRY;
+       struct ldlm_resource *res;
+       struct ldlm_namespace *ns;
 
-        lock_res_and_lock(lock);
+       ENTRY;
 
-        res = lock->l_resource;
-        ns  = ldlm_res_to_ns(res);
+       lock_res_and_lock(lock);
 
-        /* Please do not, no matter how tempting, remove this LBUG without
-         * talking to me first. -phik */
-        if (lock->l_readers || lock->l_writers) {
-                LDLM_ERROR(lock, "lock still has references");
+       res = lock->l_resource;
+       ns  = ldlm_res_to_ns(res);
+
+       /* Please do not remove this LBUG without talking to me first. -phik */
+       if (lock->l_readers || lock->l_writers) {
+               LDLM_ERROR(lock, "lock still has references");
                unlock_res_and_lock(lock);
-                LBUG();
-        }
+               LBUG();
+       }
 
        if (ldlm_is_waited(lock))
                ldlm_del_waiting_lock(lock);
 
-        /* Releases cancel callback. */
-        ldlm_cancel_callback(lock);
+       /* Releases cancel callback. */
+       ldlm_cancel_callback(lock);
 
        /* Yes, second time, just in case it was added again while we were
-        * running with no res lock in ldlm_cancel_callback */
+        * running with no res lock in ldlm_cancel_callback
+        */
        if (ldlm_is_waited(lock))
                ldlm_del_waiting_lock(lock);
 
-        ldlm_resource_unlink_lock(lock);
-        ldlm_lock_destroy_nolock(lock);
+       ldlm_resource_unlink_lock(lock);
+       ldlm_lock_destroy_nolock(lock);
 
        if (ldlm_is_granted(lock))
                ldlm_pool_del(&ns->ns_pool, lock);
 
-        /* Make sure we will not be called again for same lock what is possible
-         * if not to zero out lock->l_granted_mode */
-        lock->l_granted_mode = LCK_MINMODE;
-        unlock_res_and_lock(lock);
+       /* zero out l_granted_mode so we are not called again for this lock */
+       lock->l_granted_mode = LCK_MINMODE;
+       unlock_res_and_lock(lock);
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(ldlm_lock_cancel);
 
-/**
- * Set opaque data into the lock that only makes sense to upper layer.
- */
+/* Set opaque data into the lock that only makes sense to upper layer. */
 int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
 {
-        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
-        int rc = -EINVAL;
-        ENTRY;
+       struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+       int rc = -EINVAL;
 
-        if (lock) {
-                if (lock->l_ast_data == NULL)
-                        lock->l_ast_data = data;
-                if (lock->l_ast_data == data)
-                        rc = 0;
-                LDLM_LOCK_PUT(lock);
-        }
-        RETURN(rc);
+       ENTRY;
+
+       if (lock) {
+               if (lock->l_ast_data == NULL)
+                       lock->l_ast_data = data;
+               if (lock->l_ast_data == data)
+                       rc = 0;
+               LDLM_LOCK_PUT(lock);
+       }
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ldlm_lock_set_data);
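 /* Usage sketch (hypothetical caller): l_ast_data is set at most once, and a
  * repeated call succeeds only with the same pointer, so racing setters of
  * the same data both see rc == 0 while a conflicting setter gets -EINVAL:
  *
  *     if (ldlm_lock_set_data(lockh, inode) != 0)
  *             CDEBUG(D_DLMTRACE, "lock already has other ast_data\n");
  */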
 
@@ -2544,7 +2628,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
        ldlm_lvbo_update(res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
-               ldlm_reprocess_all(res, lock);
+               ldlm_reprocess_all(res, lock->l_policy_data.l_inodebits.bits);
        ldlm_resource_putref(res);
 
        ecl->ecl_loop++;
@@ -2598,8 +2682,8 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 
                spin_lock_bh(&exp->exp_bl_list_lock);
                if (!list_empty(&exp->exp_bl_list)) {
-                       lock = list_entry(exp->exp_bl_list.next,
-                                         struct ldlm_lock, l_exp_list);
+                       lock = list_first_entry(&exp->exp_bl_list,
+                                               struct ldlm_lock, l_exp_list);
                        LDLM_LOCK_GET(lock);
                        list_del_init(&lock->l_exp_list);
                } else {
@@ -2616,9 +2700,9 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 
        lu_env_fini(&env);
 
-       CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
-              "left on hash table %d.\n", exp, ecl.ecl_loop,
-              atomic_read(&exp->exp_lock_hash->hs_count));
+       CDEBUG(D_DLMTRACE,
+              "Export %p, canceled %d locks, left on hash table %d.\n", exp,
+              ecl.ecl_loop, atomic_read(&exp->exp_lock_hash->hs_count));
 
        return ecl.ecl_loop;
 }
@@ -2644,9 +2728,9 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
        cfs_hash_for_each_empty(exp->exp_lock_hash,
                                ldlm_cancel_locks_for_export_cb, &ecl);
 
-       CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
-              "left on hash table %d.\n", exp, ecl.ecl_loop,
-              atomic_read(&exp->exp_lock_hash->hs_count));
+       CDEBUG(D_DLMTRACE,
+              "Export %p, canceled %d locks, left on hash table %d.\n", exp,
+              ecl.ecl_loop, atomic_read(&exp->exp_lock_hash->hs_count));
 
        if (ecl.ecl_loop > 0 &&
            atomic_read(&exp->exp_lock_hash->hs_count) == 0 &&
@@ -2659,7 +2743,7 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 }
 
 /**
- * Downgrade an PW/EX lock to COS | CR mode.
+ * Downgrade a PW/EX lock to COS, TXN or CR mode.
  *
  * A lock mode conversion from PW/EX mode to a less conflicting mode. The
  * conversion may fail if lock was canceled before downgrade, but it doesn't
@@ -2671,6 +2755,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  * things are cleared, so any pending or new blocked lock on that lock will
  * cause new call to blocking_ast and force resource object commit.
  *
+ * Used by DNE to force a commit when an operation dependency exists.
+ *
  * Also used by layout_change to replace EX lock to CR lock.
  *
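  * Example (hypothetical caller): once an update is recorded, downgrade the
  * EX lock to TXN; a later conflicting request then triggers the blocking
  * AST and forces the resource commit:
  *
  *     ldlm_lock_mode_downgrade(lock, LCK_TXN);
  *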
  * \param lock A lock to convert
@@ -2681,7 +2767,8 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 #ifdef HAVE_SERVER_SUPPORT
        ENTRY;
 
-       LASSERT(new_mode == LCK_COS || new_mode == LCK_CR);
+       LASSERT(new_mode == LCK_COS || new_mode == LCK_TXN ||
+               new_mode == LCK_CR);
 
        lock_res_and_lock(lock);
 
@@ -2709,7 +2796,8 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
 
-       ldlm_reprocess_all(lock->l_resource, lock);
+       ldlm_reprocess_all(lock->l_resource,
+                          lock->l_policy_data.l_inodebits.bits);
 
        EXIT;
 #endif
@@ -2723,18 +2811,18 @@ EXPORT_SYMBOL(ldlm_lock_mode_downgrade);
  */
 void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh)
 {
-        struct ldlm_lock *lock;
+       struct ldlm_lock *lock;
 
-        if (!((libcfs_debug | D_ERROR) & level))
-                return;
+       if (!((libcfs_debug | D_ERROR) & level))
+               return;
 
-        lock = ldlm_handle2lock(lockh);
-        if (lock == NULL)
-                return;
+       lock = ldlm_handle2lock(lockh);
+       if (lock == NULL)
+               return;
 
-        LDLM_DEBUG_LIMIT(level, lock, "###");
+       LDLM_DEBUG_LIMIT(level, lock, "###");
 
-        LDLM_LOCK_PUT(lock);
+       LDLM_LOCK_PUT(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_dump_handle);
 
@@ -2743,37 +2831,34 @@ EXPORT_SYMBOL(ldlm_lock_dump_handle);
  * Helper function.
  */
 void _ldlm_lock_debug(struct ldlm_lock *lock,
-                      struct libcfs_debug_msg_data *msgdata,
-                      const char *fmt, ...)
+                     struct libcfs_debug_msg_data *msgdata,
+                     const char *fmt, ...)
 {
-        va_list args;
-        struct obd_export *exp = lock->l_export;
+       va_list args;
+       struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = NULL;
        struct va_format vaf;
-        char *nid = "local";
+       char *nid = "local";
 
-       /* on server-side resource of lock doesn't change */
-       if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-       } else if (spin_trylock(&lock->l_lock)) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-               spin_unlock(&lock->l_lock);
-       }
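+       /* The resource may change under us (only on the client, via
+        * ldlm_lock_change_resource()), so fetch it under RCU and keep it
+        * only if we can still get a reference.
+        */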
+       rcu_read_lock();
+       resource = rcu_dereference(lock->l_resource);
+       if (resource && !refcount_inc_not_zero(&resource->lr_refcount))
+               resource = NULL;
+       rcu_read_unlock();
 
-        va_start(args, fmt);
+       va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;
 
-        if (exp && exp->exp_connection) {
+       if (exp && exp->exp_connection) {
                nid = obd_export_nid2str(exp);
-        } else if (exp && exp->exp_obd != NULL) {
-                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+       } else if (exp && exp->exp_obd != NULL) {
+               struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+
                nid = obd_import_nid2str(imp);
-        }
+       }
 
-        if (resource == NULL) {
+       if (resource == NULL) {
                libcfs_debug_msg(msgdata,
                                 "%pV ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
                                 &vaf,
@@ -2786,16 +2871,16 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
-                va_end(args);
-                return;
-        }
+               va_end(args);
+               return;
+       }
 
        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_msg(msgdata,
-                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
                                 &vaf,
                                 ldlm_lock_to_ns_name(lock), lock,
                                 lock->l_handle.h_cookie,
@@ -2804,15 +2889,16 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 ldlm_lockname[lock->l_granted_mode],
                                 ldlm_lockname[lock->l_req_mode],
                                 PLDLMRES(resource),
-                                atomic_read(&resource->lr_refcount),
+                                refcount_read(&resource->lr_refcount),
                                 ldlm_typename[resource->lr_type],
                                 lock->l_policy_data.l_extent.start,
                                 lock->l_policy_data.l_extent.end,
                                 lock->l_req_extent.start, lock->l_req_extent.end,
+                                lock->l_req_extent.gid,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
 
@@ -2827,7 +2913,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 ldlm_lockname[lock->l_granted_mode],
                                 ldlm_lockname[lock->l_req_mode],
                                 PLDLMRES(resource),
-                                atomic_read(&resource->lr_refcount),
+                                refcount_read(&resource->lr_refcount),
                                 ldlm_typename[resource->lr_type],
                                 lock->l_policy_data.l_flock.pid,
                                 lock->l_policy_data.l_flock.start,
@@ -2835,12 +2921,30 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout);
+                                lock->l_pid, lock->l_callback_timestamp);
                break;
 
        case LDLM_IBITS:
-               libcfs_debug_msg(msgdata,
-                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+               if (!lock->l_remote_handle.cookie)
+                       libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx pid: %u initiator: MDT%d\n",
+                                &vaf,
+                                ldlm_lock_to_ns_name(lock),
+                                lock, lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                PLDLMRES(resource),
+                                lock->l_policy_data.l_inodebits.bits,
+                                lock->l_policy_data.l_inodebits.try_bits,
+                                refcount_read(&resource->lr_refcount),
+                                ldlm_typename[resource->lr_type],
+                                lock->l_flags, lock->l_pid,
+                                lock->l_policy_data.l_inodebits.li_initiator_id);
+               else
+                       libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
                                 &vaf,
                                 ldlm_lock_to_ns_name(lock),
                                 lock, lock->l_handle.h_cookie,
@@ -2851,12 +2955,13 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 PLDLMRES(resource),
                                 lock->l_policy_data.l_inodebits.bits,
                                 lock->l_policy_data.l_inodebits.try_bits,
-                                atomic_read(&resource->lr_refcount),
+                                refcount_read(&resource->lr_refcount),
                                 ldlm_typename[resource->lr_type],
+                                lock->l_policy_data.l_inodebits.li_gid,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
 
@@ -2871,12 +2976,12 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 ldlm_lockname[lock->l_granted_mode],
                                 ldlm_lockname[lock->l_req_mode],
                                 PLDLMRES(resource),
-                                atomic_read(&resource->lr_refcount),
+                                refcount_read(&resource->lr_refcount),
                                 ldlm_typename[resource->lr_type],
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
        }