LU-8272 ldlm: Use interval tree to update kms
[fs/lustre-release.git] lustre/ldlm/ldlm_extent.c
index ab457c7..6975201 100644
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2012, Intel Corporation.
+ * Copyright (c) 2010, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
-#ifndef __KERNEL__
-# include <liblustre.h>
-#else
-# include <libcfs/libcfs.h>
-#endif
 
+#include <libcfs/libcfs.h>
 #include <lustre_dlm.h>
 #include <obd_support.h>
 #include <obd.h>
  * overly wide locks.
  */
 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
-                                              struct ldlm_extent *new_ex,
-                                              int conflicting)
+                                             struct ldlm_extent *new_ex,
+                                             int conflicting)
 {
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        __u64 req_align, mask;
+       enum ldlm_mode req_mode = req->l_req_mode;
+       __u64 req_start = req->l_req_extent.start;
+       __u64 req_end = req->l_req_extent.end;
+       __u64 req_align, mask;
 
         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
@@ -98,7 +90,7 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
          * the client requested. Also we need to make sure it's also server
          * page size aligned otherwise a server page can be covered by two
          * write locks. */
-       mask = PAGE_CACHE_SIZE;
+       mask = PAGE_SIZE;
         req_align = (req_end + 1) | req_start;
         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                 while ((req_align & mask) == 0)
@@ -111,10 +103,10 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
         new_ex->start = ((new_ex->start - 1) | mask) + 1;
         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
         LASSERTF(new_ex->start <= req_start,
-                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
+                "mask %#llx grant start %llu req start %llu\n",
                  mask, new_ex->start, req_start);
         LASSERTF(new_ex->end >= req_end,
-                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
+                "mask %#llx grant end %llu req end %llu\n",
                  mask, new_ex->end, req_end);
 }
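
As an aside, the clamping arithmetic above is easier to follow with concrete numbers. The sketch below is a stand-alone user-space model, not part of the patch: it assumes a 4096-byte server page and that mask has already been reduced to a low-bits mask (alignment - 1), which happens in the lines elided between the two hunks above.

    #include <stdint.h>
    #include <stdio.h>

    /* Shrink [start, end] inward so both boundaries sit on an
     * "alignment"-sized boundary, mirroring the expressions used in
     * ldlm_extent_internal_policy_fixup(). */
    static void clamp_extent(uint64_t *start, uint64_t *end, uint64_t alignment)
    {
        uint64_t mask = alignment - 1;          /* low-bits mask, e.g. 0xfff */

        *start = ((*start - 1) | mask) + 1;     /* round start up */
        *end   = ((*end + 1) & ~mask) - 1;      /* round end down */
    }

    int main(void)
    {
        uint64_t start = 5000, end = 20000;     /* illustrative values */

        clamp_extent(&start, &end, 4096);       /* assume 4K server pages */
        printf("clamped extent: [%llu, %llu]\n",
               (unsigned long long)start, (unsigned long long)end);
        /* prints [8192, 16383]: both boundaries are now page aligned */
        return 0;
    }
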
 
@@ -133,17 +125,17 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                 struct ldlm_extent *new_ex)
 {
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        struct ldlm_interval_tree *tree;
-        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
-        int conflicting = 0;
-        int idx;
-        ENTRY;
-
-        lockmode_verify(req_mode);
+       struct ldlm_resource *res = req->l_resource;
+       enum ldlm_mode req_mode = req->l_req_mode;
+       __u64 req_start = req->l_req_extent.start;
+       __u64 req_end = req->l_req_extent.end;
+       struct ldlm_interval_tree *tree;
+       struct interval_node_extent limiter = { new_ex->start, new_ex->end };
+       int conflicting = 0;
+       int idx;
+       ENTRY;
+
+       lockmode_verify(req_mode);
 
        /* Using interval tree to handle the LDLM extent granted locks. */
         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
@@ -187,29 +179,25 @@ static void
 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                     struct ldlm_extent *new_ex)
 {
-        cfs_list_t *tmp;
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        int conflicting = 0;
-        ENTRY;
-
-        lockmode_verify(req_mode);
-
-        /* for waiting locks */
-        cfs_list_for_each(tmp, &res->lr_waiting) {
-                struct ldlm_lock *lock;
-                struct ldlm_extent *l_extent;
-
-                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
-                l_extent = &lock->l_policy_data.l_extent;
-
-                /* We already hit the minimum requested size, search no more */
-                if (new_ex->start == req_start && new_ex->end == req_end) {
-                        EXIT;
-                        return;
-                }
+       struct ldlm_resource *res = req->l_resource;
+       enum ldlm_mode req_mode = req->l_req_mode;
+       __u64 req_start = req->l_req_extent.start;
+       __u64 req_end = req->l_req_extent.end;
+       struct ldlm_lock *lock;
+       int conflicting = 0;
+       ENTRY;
+
+       lockmode_verify(req_mode);
+
+       /* for waiting locks */
+       list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
+               struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;
+
+               /* We already hit the minimum requested size, search no more */
+               if (new_ex->start == req_start && new_ex->end == req_end) {
+                       EXIT;
+                       return;
+               }
 
                 /* Don't conflict with ourselves */
                 if (req == lock)
@@ -305,49 +293,50 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
 
 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        cfs_time_t now = cfs_time_current();
+       struct ldlm_resource *res = lock->l_resource;
+       cfs_time_t now = cfs_time_current();
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
-                return 1;
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
+               return 1;
 
-        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
-        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
-                res->lr_contention_time = now;
-        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
-                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
+       CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
+       if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
+               res->lr_contention_time = now;
+       return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
+               cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
 }
 
 struct ldlm_extent_compat_args {
-        cfs_list_t *work_list;
-        struct ldlm_lock *lock;
-        ldlm_mode_t mode;
-        int *locks;
-        int *compat;
+       struct list_head *work_list;
+       struct ldlm_lock *lock;
+       enum ldlm_mode mode;
+       int *locks;
+       int *compat;
 };
 
 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
-                                                void *data)
+                                               void *data)
 {
-        struct ldlm_extent_compat_args *priv = data;
-        struct ldlm_interval *node = to_ldlm_interval(n);
-        struct ldlm_extent *extent;
-        cfs_list_t *work_list = priv->work_list;
-        struct ldlm_lock *lock, *enq = priv->lock;
-        ldlm_mode_t mode = priv->mode;
-        int count = 0;
-        ENTRY;
-
-        LASSERT(!cfs_list_empty(&node->li_group));
-
-        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+       struct ldlm_extent_compat_args *priv = data;
+       struct ldlm_interval *node = to_ldlm_interval(n);
+       struct ldlm_extent *extent;
+       struct list_head *work_list = priv->work_list;
+       struct ldlm_lock *lock, *enq = priv->lock;
+       enum ldlm_mode mode = priv->mode;
+       int count = 0;
+       ENTRY;
+
+       LASSERT(!list_empty(&node->li_group));
+
+       list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                 /* interval tree is for granted lock */
                 LASSERTF(mode == lock->l_granted_mode,
                          "mode = %s, lock->l_granted_mode = %s\n",
                          ldlm_lockname[mode],
                          ldlm_lockname[lock->l_granted_mode]);
                 count++;
-                if (lock->l_blocking_ast)
+               if (lock->l_blocking_ast &&
+                   lock->l_granted_mode != LCK_GROUP)
                         ldlm_add_ast_work_item(lock, enq, work_list);
         }
 
@@ -376,20 +365,19 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
  * \retval negative error, such as EWOULDBLOCK for group locks
  */
 static int
-ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
-                        __u64 *flags, ldlm_error_t *err,
-                        cfs_list_t *work_list, int *contended_locks)
+ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
+                        __u64 *flags, enum ldlm_error *err,
+                        struct list_head *work_list, int *contended_locks)
 {
-        cfs_list_t *tmp;
-        struct ldlm_lock *lock;
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        int compat = 1;
-        int scan = 0;
-        int check_contention;
-        ENTRY;
+       struct ldlm_resource *res = req->l_resource;
+       enum ldlm_mode req_mode = req->l_req_mode;
+       __u64 req_start = req->l_req_extent.start;
+       __u64 req_end = req->l_req_extent.end;
+       struct ldlm_lock *lock;
+       int check_contention;
+       int compat = 1;
+       int scan = 0;
+       ENTRY;
 
         lockmode_verify(req_mode);
 
@@ -451,17 +439,14 @@ ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         } else {
                                 interval_search(tree->lit_root, &ex,
                                                 ldlm_extent_compat_cb, &data);
-                                if (!cfs_list_empty(work_list) && compat)
+                               if (!list_empty(work_list) && compat)
                                         compat = 0;
                         }
                 }
         } else { /* for waiting queue */
-                cfs_list_for_each(tmp, queue) {
+               list_for_each_entry(lock, queue, l_res_link) {
                         check_contention = 1;
 
-                        lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                              l_res_link);
-
                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever. */
@@ -480,7 +465,7 @@ ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                                          * front of first non-GROUP lock */
 
                                         ldlm_resource_insert_lock_after(lock, req);
-                                        cfs_list_del_init(&lock->l_res_link);
+                                       list_del_init(&lock->l_res_link);
                                         ldlm_resource_insert_lock_after(req, lock);
                                         compat = 0;
                                         break;
@@ -575,7 +560,7 @@ ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                                            first non-GROUP lock */
 
                                         ldlm_resource_insert_lock_after(lock, req);
-                                        cfs_list_del_init(&lock->l_res_link);
+                                       list_del_init(&lock->l_res_link);
                                         ldlm_resource_insert_lock_after(req, lock);
                                         break;
                                 }
@@ -620,7 +605,8 @@ ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         *contended_locks += check_contention;
 
                         compat = 0;
-                        if (lock->l_blocking_ast)
+                       if (lock->l_blocking_ast &&
+                           lock->l_req_mode != LCK_GROUP)
                                 ldlm_add_ast_work_item(lock, req, work_list);
                 }
         }
@@ -635,28 +621,133 @@ ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
 
         RETURN(compat);
 destroylock:
-        cfs_list_del_init(&req->l_res_link);
+       list_del_init(&req->l_res_link);
         ldlm_lock_destroy_nolock(req);
         *err = compat;
         RETURN(compat);
 }
 
 /**
+ * This function refreshes the eviction timer for a cancelled lock.
+ * \param[in] lock             ldlm lock to refresh
+ * \param[in] arg              ldlm prolong arguments, timeout, export, extent
+ *                             and counter are used
+ */
+void ldlm_lock_prolong_one(struct ldlm_lock *lock,
+                          struct ldlm_prolong_args *arg)
+{
+       int timeout;
+
+       if (arg->lpa_export != lock->l_export ||
+           lock->l_flags & LDLM_FL_DESTROYED)
+               /* ignore unrelated locks */
+               return;
+
+       arg->lpa_locks_cnt++;
+
+       if (!(lock->l_flags & LDLM_FL_AST_SENT))
+               /* ignore locks not being cancelled */
+               return;
+
+       /* We are in the middle of the process - BL AST is sent, CANCEL
+        * is ahead. Take half of BL AT + IO AT process time.
+        */
+       timeout = arg->lpa_timeout + (ldlm_bl_timeout(lock) >> 1);
+
+       LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
+
+       arg->lpa_blocks_cnt++;
+
+       /* OK, this is possibly a lock the user holds while doing I/O;
+        * let's refresh the eviction timer for it.
+        */
+       ldlm_refresh_waiting_lock(lock, timeout);
+}
+EXPORT_SYMBOL(ldlm_lock_prolong_one);
+
+static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
+                                                  void *data)
+{
+       struct ldlm_prolong_args *arg = data;
+       struct ldlm_interval *node = to_ldlm_interval(n);
+       struct ldlm_lock *lock;
+
+       ENTRY;
+
+       LASSERT(!list_empty(&node->li_group));
+
+       list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+               ldlm_lock_prolong_one(lock, arg);
+       }
+
+       RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Walk through the granted tree and prolong locks if they overlap the extent.
+ *
+ * \param[in] arg              prolong args
+ */
+void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
+{
+       struct ldlm_interval_tree *tree;
+       struct ldlm_resource *res;
+       struct interval_node_extent ex = { .start = arg->lpa_extent.start,
+                                          .end = arg->lpa_extent.end };
+       int idx;
+
+       ENTRY;
+
+       res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace, NULL,
+                               &arg->lpa_resid, LDLM_EXTENT, 0);
+       if (IS_ERR(res)) {
+               CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
+                      arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
+               RETURN_EXIT;
+       }
+
+       lock_res(res);
+       for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+               tree = &res->lr_itree[idx];
+               if (tree->lit_root == NULL) /* empty tree, skipped */
+                       continue;
+
+               /* There is no way to check for the groupID, so all the
+                * group locks are considered valid here, especially
+                * because the client is supposed to check that it holds
+                * such a lock before sending an RPC.
+                */
+               if (!(tree->lit_mode & arg->lpa_mode))
+                       continue;
+
+               interval_search(tree->lit_root, &ex,
+                               ldlm_resource_prolong_cb, arg);
+       }
+
+       unlock_res(res);
+       ldlm_resource_putref(res);
+
+       EXIT;
+}
+EXPORT_SYMBOL(ldlm_resource_prolong);
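
To show how these two new helpers are meant to be driven, here is a hypothetical server-side caller. It is only a sketch assuming the usual Lustre server headers (lustre_dlm.h): the function prolong_locks_for_rw and its parameters are invented, and the struct ldlm_prolong_args field types are inferred from how this patch uses them, not taken from the header.

    /* Hypothetical caller (not part of this patch): refresh the eviction
     * timers of every granted lock of one export that overlaps an
     * in-flight I/O range. */
    static void prolong_locks_for_rw(struct obd_export *exp,
                                     const struct ldlm_res_id *resid,
                                     __u64 start, __u64 end, int timeout)
    {
        struct ldlm_prolong_args arg = { 0 };

        arg.lpa_export = exp;               /* only this client's locks */
        arg.lpa_resid = *resid;             /* object the I/O is against */
        arg.lpa_extent.start = start;       /* byte range of the I/O */
        arg.lpa_extent.end = end;
        arg.lpa_mode = LCK_PW | LCK_GROUP;  /* mode mask matched against
                                             * tree->lit_mode above */
        arg.lpa_timeout = timeout;          /* base timeout to extend by */

        /* walks the granted interval trees of the resource and calls
         * ldlm_lock_prolong_one() for each overlapping lock */
        ldlm_resource_prolong(&arg);

        CDEBUG(D_DLMTRACE, "refreshed %d of %d matching locks\n",
               arg.lpa_blocks_cnt, arg.lpa_locks_cnt);
    }
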
+
+
+/**
  * Discard all AST work items from list.
  *
  * If for whatever reason we do not want to send ASTs to conflicting locks
  * anymore, disassemble the list with this function.
  */
-static void discard_bl_list(cfs_list_t *bl_list)
+static void discard_bl_list(struct list_head *bl_list)
 {
-        cfs_list_t *tmp, *pos;
+       struct list_head *tmp, *pos;
         ENTRY;
 
-        cfs_list_for_each_safe(pos, tmp, bl_list) {
+       list_for_each_safe(pos, tmp, bl_list) {
                 struct ldlm_lock *lock =
-                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);
+                       list_entry(pos, struct ldlm_lock, l_bl_ast);
 
-                cfs_list_del_init(&lock->l_bl_ast);
+               list_del_init(&lock->l_bl_ast);
                LASSERT(ldlm_is_ast_sent(lock));
                ldlm_clear_ast_sent(lock);
                 LASSERT(lock->l_bl_ast_run == 0);
@@ -684,20 +775,22 @@ static void discard_bl_list(cfs_list_t *bl_list)
  *     would be collected and ASTs sent.
  */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
-                            int first_enq, ldlm_error_t *err,
-                            cfs_list_t *work_list)
+                            int first_enq, enum ldlm_error *err,
+                            struct list_head *work_list)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        CFS_LIST_HEAD(rpc_list);
-        int rc, rc2;
-        int contended_locks = 0;
-        ENTRY;
-
-        LASSERT(cfs_list_empty(&res->lr_converting));
-        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
+       struct ldlm_resource *res = lock->l_resource;
+       struct list_head rpc_list;
+       int rc, rc2;
+       int contended_locks = 0;
+       ENTRY;
+
+       LASSERT(lock->l_granted_mode != lock->l_req_mode);
+       LASSERT(list_empty(&res->lr_converting));
+       LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
-        check_res_locked(res);
-        *err = ELDLM_OK;
+       INIT_LIST_HEAD(&rpc_list);
+       check_res_locked(res);
+       *err = ELDLM_OK;
 
         if (!first_enq) {
                 /* Careful observers will note that we don't handle -EWOULDBLOCK
@@ -750,7 +843,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                  * bug 2322: we used to unlink and re-add here, which was a
                  * terrible folly -- if we goto restart, we could get
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
-                if (cfs_list_empty(&lock->l_res_link))
+               if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                 unlock_res(res);
                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
@@ -784,7 +877,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                                GOTO(out, rc = 0);
                        }
 
-                       GOTO(restart, -ERESTART);
+                       GOTO(restart, rc);
                }
 
                /* this way we force client to wait for the lock
@@ -794,7 +887,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
        }
        RETURN(0);
 out:
-       if (!cfs_list_empty(&rpc_list)) {
+       if (!list_empty(&rpc_list)) {
                LASSERT(!ldlm_is_ast_discard_data(lock));
                discard_bl_list(&rpc_list);
        }
@@ -802,64 +895,135 @@ out:
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
+struct ldlm_kms_shift_args {
+       __u64   old_kms;
+       __u64   kms;
+       bool    complete;
+};
+
+/* Callback for interval_iterate functions, used by ldlm_extent_shift_kms */
+static enum interval_iter ldlm_kms_shift_cb(struct interval_node *n,
+                                           void *args)
+{
+       struct ldlm_kms_shift_args *arg = args;
+       struct ldlm_interval *node = to_ldlm_interval(n);
+       struct ldlm_lock *tmplock;
+       struct ldlm_lock *lock = NULL;
+
+       ENTRY;
+
+       /* Since all locks in an interval have the same extent, we can just
+        * use the first lock without kms_ignore set. */
+       list_for_each_entry(tmplock, &node->li_group, l_sl_policy) {
+               if (ldlm_is_kms_ignore(tmplock))
+                       continue;
+
+               lock = tmplock;
+
+               break;
+       }
+
+       /* No locks in this interval without kms_ignore set */
+       if (!lock)
+               RETURN(INTERVAL_ITER_CONT);
+
+       /* If we find a lock with a greater or equal kms, we are not the
+        * highest lock (or we share that distinction with another lock), and
+        * don't need to update KMS.  Return old_kms and stop looking. */
+       if (lock->l_policy_data.l_extent.end >= arg->old_kms) {
+               arg->kms = arg->old_kms;
+               arg->complete = true;
+               RETURN(INTERVAL_ITER_STOP);
+       }
+
+       if (lock->l_policy_data.l_extent.end + 1 > arg->kms)
+               arg->kms = lock->l_policy_data.l_extent.end + 1;
+
+       /* Since interval_iterate_reverse starts with the highest lock and
+        * works down, for PW locks, we only need to check if we should update
+        * the kms, then stop walking the tree.  PR locks are not exclusive, so
+        * the highest start does not imply the highest end and we must
+        * continue. (Only one group lock is allowed per resource, so this is
+        * irrelevant for group locks.) */
+       if (lock->l_granted_mode == LCK_PW)
+               RETURN(INTERVAL_ITER_STOP);
+       else
+               RETURN(INTERVAL_ITER_CONT);
+}
+
 /* When a lock is cancelled by a client, the KMS may undergo change if this
- * is the "highest lock".  This function returns the new KMS value.
+ * is the "highest lock".  This function returns the new KMS value, updating
+ * it only if we were the highest lock.
+ *
  * Caller must hold lr_lock already.
  *
  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        cfs_list_t *tmp;
-        struct ldlm_lock *lck;
-        __u64 kms = 0;
-        ENTRY;
+       struct ldlm_resource *res = lock->l_resource;
+       struct ldlm_interval_tree *tree;
+       struct ldlm_kms_shift_args args;
+       int idx = 0;
 
-        /* don't let another thread in ldlm_extent_shift_kms race in
-         * just after we finish and take our lock into account in its
-         * calculation of the kms */
-       ldlm_set_kms_ignore(lock);
+       ENTRY;
 
-        cfs_list_for_each(tmp, &res->lr_granted) {
-                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+       args.old_kms = old_kms;
+       args.kms = 0;
+       args.complete = false;
 
-               if (ldlm_is_kms_ignore(lck))
-                        continue;
+       /* don't let another thread in ldlm_extent_shift_kms race in
+        * just after we finish and take our lock into account in its
+        * calculation of the kms */
+       ldlm_set_kms_ignore(lock);
 
-                if (lck->l_policy_data.l_extent.end >= old_kms)
-                        RETURN(old_kms);
+       /* We iterate over the lock trees, looking for the largest kms smaller
+        * than the current one. */
+       for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+               tree = &res->lr_itree[idx];
+
+               /* If our already known kms is >= the highest 'end' in
+                * this tree, we don't need to check this tree, because
+                * the kms from a tree can be lower than in_max_high (due to
+                * kms_ignore), but it can never be higher. */
+               if (!tree->lit_root || args.kms >= tree->lit_root->in_max_high)
+                       continue;
+
+               interval_iterate_reverse(tree->lit_root, ldlm_kms_shift_cb,
+                                        &args);
+
+               /* this tells us we're not the highest lock, so we don't need
+                * to check the remaining trees */
+               if (args.complete)
+                       break;
+       }
 
-                /* This extent _has_ to be smaller than old_kms (checked above)
-                 * so kms can only ever be smaller or the same as old_kms. */
-                if (lck->l_policy_data.l_extent.end + 1 > kms)
-                        kms = lck->l_policy_data.l_extent.end + 1;
-        }
-        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
+       LASSERTF(args.kms <= args.old_kms, "kms %llu old_kms %llu\n", args.kms,
+                args.old_kms);
 
-        RETURN(kms);
+       RETURN(args.kms);
 }
 EXPORT_SYMBOL(ldlm_extent_shift_kms);
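
The effect of the new interval-tree walk is easier to see in isolation. Below is a toy user-space model of the decision logic in ldlm_kms_shift_cb() for a single tree, assuming locks are visited in order of decreasing extent end, as interval_iterate_reverse() does; the names toy_lock and shift_kms and the sample data are invented for illustration and are not part of the patch.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    enum toy_mode { TOY_PR, TOY_PW };    /* stand-ins for LCK_PR / LCK_PW */

    struct toy_lock {
        uint64_t end;                    /* l_policy_data.l_extent.end */
        enum toy_mode mode;              /* l_granted_mode */
        bool kms_ignore;                 /* lock is being cancelled */
    };

    /* Model of the walk over one tree: locks arrive in order of
     * decreasing extent end. */
    static uint64_t shift_kms(const struct toy_lock *locks, int n,
                              uint64_t old_kms)
    {
        uint64_t kms = 0;
        int i;

        for (i = 0; i < n; i++) {
            if (locks[i].kms_ignore)
                continue;                /* skip cancelled locks */
            if (locks[i].end >= old_kms)
                return old_kms;          /* we were not the highest lock */
            if (locks[i].end + 1 > kms)
                kms = locks[i].end + 1;  /* candidate new kms */
            if (locks[i].mode == TOY_PW)
                break;                   /* PW is exclusive: highest end found */
        }
        return kms;
    }

    int main(void)
    {
        /* The cancelled lock covered [0, 8191] and defined old_kms = 8192. */
        struct toy_lock granted[] = {
            { .end = 8191, .mode = TOY_PW, .kms_ignore = true },
            { .end = 4095, .mode = TOY_PW, .kms_ignore = false },
            { .end = 1023, .mode = TOY_PR, .kms_ignore = false },
        };

        printf("new kms = %llu\n",
               (unsigned long long)shift_kms(granted, 3, 8192)); /* 4096 */
        return 0;
    }
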
 
 struct kmem_cache *ldlm_interval_slab;
 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
 {
-        struct ldlm_interval *node;
-        ENTRY;
+       struct ldlm_interval *node;
+       ENTRY;
 
-        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-       OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, __GFP_IO);
-        if (node == NULL)
-                RETURN(NULL);
+       LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+       OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
+       if (node == NULL)
+               RETURN(NULL);
 
-        CFS_INIT_LIST_HEAD(&node->li_group);
-        ldlm_interval_attach(node, lock);
-        RETURN(node);
+       INIT_LIST_HEAD(&node->li_group);
+       ldlm_interval_attach(node, lock);
+       RETURN(node);
 }
 
 void ldlm_interval_free(struct ldlm_interval *node)
 {
         if (node) {
-                LASSERT(cfs_list_empty(&node->li_group));
+               LASSERT(list_empty(&node->li_group));
                 LASSERT(!interval_is_intree(&node->li_node));
                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
         }
@@ -872,7 +1036,7 @@ void ldlm_interval_attach(struct ldlm_interval *n,
         LASSERT(l->l_tree_node == NULL);
         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
 
-        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
+       list_add_tail(&l->l_sl_policy, &n->li_group);
         l->l_tree_node = n;
 }
 
@@ -883,22 +1047,23 @@ struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
         if (n == NULL)
                 return NULL;
 
-        LASSERT(!cfs_list_empty(&n->li_group));
+       LASSERT(!list_empty(&n->li_group));
         l->l_tree_node = NULL;
-        cfs_list_del_init(&l->l_sl_policy);
+       list_del_init(&l->l_sl_policy);
 
-        return (cfs_list_empty(&n->li_group) ? n : NULL);
+       return list_empty(&n->li_group) ? n : NULL;
 }
 
-static inline int lock_mode_to_index(ldlm_mode_t mode)
+static inline int ldlm_mode_to_index(enum ldlm_mode mode)
 {
-        int index;
-
-        LASSERT(mode != 0);
-        LASSERT(IS_PO2(mode));
-        for (index = -1; mode; index++, mode >>= 1) ;
-        LASSERT(index < LCK_MODE_NUM);
-        return index;
+       int index;
+
+       LASSERT(mode != 0);
+       LASSERT(IS_PO2(mode));
+       for (index = -1; mode != 0; index++, mode >>= 1)
+               /* do nothing */;
+       LASSERT(index < LCK_MODE_NUM);
+       return index;
 }
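
The renamed helper is simply log2 of a power-of-two mode bit. A stand-alone copy of the loop, for illustration only (it does not use the real LCK_* constants):

    #include <stdio.h>

    /* Same loop as ldlm_mode_to_index(): map a power-of-two bit to its
     * tree index, i.e. the bit position. */
    static int mode_to_index(unsigned int mode)
    {
        int index;

        for (index = -1; mode != 0; index++, mode >>= 1)
            /* do nothing */;
        return index;
    }

    int main(void)
    {
        unsigned int mode;

        for (mode = 1; mode <= 64; mode <<= 1)
            printf("mode 0x%02x -> index %d\n", mode, mode_to_index(mode));
        return 0;
    }
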
 
 /** Add newly granted lock into interval tree for the resource. */
@@ -908,7 +1073,7 @@ void ldlm_extent_add_lock(struct ldlm_resource *res,
         struct interval_node *found, **root;
         struct ldlm_interval *node;
         struct ldlm_extent *extent;
-        int idx;
+       int idx, rc;
 
         LASSERT(lock->l_granted_mode == lock->l_req_mode);
 
@@ -916,13 +1081,15 @@ void ldlm_extent_add_lock(struct ldlm_resource *res,
         LASSERT(node != NULL);
         LASSERT(!interval_is_intree(&node->li_node));
 
-        idx = lock_mode_to_index(lock->l_granted_mode);
-        LASSERT(lock->l_granted_mode == 1 << idx);
-        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
+       idx = ldlm_mode_to_index(lock->l_granted_mode);
+       LASSERT(lock->l_granted_mode == 1 << idx);
+       LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
 
         /* node extent initialize */
         extent = &lock->l_policy_data.l_extent;
-        interval_set(&node->li_node, extent->start, extent->end);
+
+       rc = interval_set(&node->li_node, extent->start, extent->end);
+       LASSERT(!rc);
 
         root = &res->lr_itree[idx].lit_root;
         found = interval_insert(&node->li_node, root);
@@ -937,48 +1104,67 @@ void ldlm_extent_add_lock(struct ldlm_resource *res,
         /* even though we use interval tree to manage the extent lock, we also
          * add the locks into grant list, for debug purpose, .. */
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
+               struct ldlm_lock *lck;
+
+               list_for_each_entry_reverse(lck, &res->lr_granted,
+                                           l_res_link) {
+                       if (lck == lock)
+                               continue;
+                       if (lockmode_compat(lck->l_granted_mode,
+                                           lock->l_granted_mode))
+                               continue;
+                       if (ldlm_extent_overlap(&lck->l_req_extent,
+                                               &lock->l_req_extent)) {
+                               CDEBUG(D_ERROR, "granting conflicting lock %p "
+                                               "%p\n", lck, lock);
+                               ldlm_resource_dump(D_ERROR, res);
+                               LBUG();
+                       }
+               }
+       }
 }
 
 /** Remove cancelled lock from resource interval tree. */
 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
 {
-        struct ldlm_resource *res = lock->l_resource;
-        struct ldlm_interval *node = lock->l_tree_node;
-        struct ldlm_interval_tree *tree;
-        int idx;
+       struct ldlm_resource *res = lock->l_resource;
+       struct ldlm_interval *node = lock->l_tree_node;
+       struct ldlm_interval_tree *tree;
+       int idx;
 
-        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
-                return;
+       if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
+               return;
 
-        idx = lock_mode_to_index(lock->l_granted_mode);
-        LASSERT(lock->l_granted_mode == 1 << idx);
-        tree = &res->lr_itree[idx];
+       idx = ldlm_mode_to_index(lock->l_granted_mode);
+       LASSERT(lock->l_granted_mode == 1 << idx);
+       tree = &res->lr_itree[idx];
 
-        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
+       LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
 
-        tree->lit_size--;
-        node = ldlm_interval_detach(lock);
-        if (node) {
-                interval_erase(&node->li_node, &tree->lit_root);
-                ldlm_interval_free(node);
-        }
+       tree->lit_size--;
+       node = ldlm_interval_detach(lock);
+       if (node) {
+               interval_erase(&node->li_node, &tree->lit_root);
+               ldlm_interval_free(node);
+       }
 }
 
-void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
-                                     ldlm_policy_data_t *lpolicy)
+void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
+                                     union ldlm_policy_data *lpolicy)
 {
-        memset(lpolicy, 0, sizeof(*lpolicy));
-        lpolicy->l_extent.start = wpolicy->l_extent.start;
-        lpolicy->l_extent.end = wpolicy->l_extent.end;
-        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
+       lpolicy->l_extent.start = wpolicy->l_extent.start;
+       lpolicy->l_extent.end = wpolicy->l_extent.end;
+       lpolicy->l_extent.gid = wpolicy->l_extent.gid;
 }
 
-void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
-                                     ldlm_wire_policy_data_t *wpolicy)
+void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
+                                     union ldlm_wire_policy_data *wpolicy)
 {
-        memset(wpolicy, 0, sizeof(*wpolicy));
-        wpolicy->l_extent.start = lpolicy->l_extent.start;
-        wpolicy->l_extent.end = lpolicy->l_extent.end;
-        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
+       memset(wpolicy, 0, sizeof(*wpolicy));
+       wpolicy->l_extent.start = lpolicy->l_extent.start;
+       wpolicy->l_extent.end = lpolicy->l_extent.end;
+       wpolicy->l_extent.gid = lpolicy->l_extent.gid;
 }