author bobijam <bobijam>
Thu, 22 Oct 2009 03:26:03 +0000 (03:26 +0000)
committer bobijam <bobijam>
Thu, 22 Oct 2009 03:26:03 +0000 (03:26 +0000)
b=19325
i=vitaly.fertman
i=oleg.drokin (green)

Description: Adjust locks' extents on their first enqueue, so that by the
             time they get granted, there is no need for another pass through
             the queues since they are already shaped into their proper form.
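As a stand-alone sketch of the intended policy (not code from this patch;
every name below is invented), the enqueue path widens one candidate extent
as far as it can without touching conflicting extents that lie outside the
requested range, so nothing needs reshaping at grant time:

#include <stdint.h>
#include <stdio.h>

struct ext { uint64_t start, end; };

/* Shrink *cand (initially [0, EOF]) so that it still contains req but
 * does not overlap any conflicting extent lying outside req; extents
 * overlapping req itself will conflict whatever we grant, so they do
 * not limit the expansion. */
static void shape_extent(struct ext req, struct ext *cand,
                         const struct ext *confl, int n)
{
        int i;

        for (i = 0; i < n; i++) {
                if (confl[i].end < req.start) {
                        /* conflict entirely below the request: raise start */
                        if (cand->start <= confl[i].end)
                                cand->start = confl[i].end + 1;
                } else if (confl[i].start > req.end) {
                        /* conflict entirely above the request: lower end */
                        if (cand->end >= confl[i].start)
                                cand->end = confl[i].start - 1;
                }
        }
}

int main(void)
{
        struct ext req     = { 100, 199 };
        struct ext cand    = { 0, UINT64_MAX };
        struct ext confl[] = { { 0, 49 }, { 400, 499 } };

        shape_extent(req, &cand, confl, 2);
        /* prints [50, 399]: as wide as possible without overlapping
         * the conflicting extents outside the request */
        printf("granted extent [%llu, %llu]\n",
               (unsigned long long)cand.start,
               (unsigned long long)cand.end);
        return 0;
}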

lustre/ChangeLog
lustre/include/interval_tree.h
lustre/include/lustre_dlm.h
lustre/ldlm/interval_tree.c
lustre/ldlm/ldlm_extent.c

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 3e583bd..f873fe3 100644
@@ -14,6 +14,12 @@ tbd  Sun Microsystems, Inc.
         removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
 
+Severity   : enhancement
+Bugzilla   : 19325
+Description: Adjust locks' extents on their first enqueue, so that by the
+             time they get granted, there is no need for another pass through
+             the queues since they are already shaped into their proper form.
+
 Severity   : normal
 Bugzilla   : 20302
 Description: Fix in ptlrpc_expire_one_request() to print the signed time
diff --git a/lustre/include/interval_tree.h b/lustre/include/interval_tree.h
index b50278b..5958790 100644
@@ -92,7 +92,7 @@ static inline void interval_set(struct interval_node *node,
  *  - the callback returns INTERVAL_ITER_STOP when it thinks the iteration
  *    should be stopped. It will then cause the iteration function to return
  *    immediately with return value INTERVAL_ITER_STOP.
- *  - callbacks for interval_iterate and interval_iterate_reverse: Every 
+ *  - callbacks for interval_iterate and interval_iterate_reverse: Every
 *    node in the tree will be set to @node before the callback is called
 *  - callback for interval_search: Only overlapping nodes will be set to
 *    @node before the callback is called.
@@ -109,17 +109,21 @@ void interval_erase(struct interval_node *node, struct interval_node **root);
 enum interval_iter interval_search(struct interval_node *root,
                                    struct interval_node_extent *ex,
                                    interval_callback_t func, void *data);
+enum interval_iter interval_search_expand_extent(struct interval_node *root,
+                                   struct interval_node_extent *ex,
+                                   struct interval_node_extent *result_ext,
+                                   interval_callback_t func, void *data);
 
 /* Iterate every node in the tree - by reverse order or regular order. */
-enum interval_iter interval_iterate(struct interval_node *root, 
+enum interval_iter interval_iterate(struct interval_node *root,
                                     interval_callback_t func, void *data);
 enum interval_iter interval_iterate_reverse(struct interval_node *root,
                                     interval_callback_t func,void *data);
 
-void interval_expand(struct interval_node *root, 
+void interval_expand(struct interval_node *root,
                      struct interval_node_extent *ext,
                      struct interval_node_extent *limiter);
-int interval_is_overlapped(struct interval_node *root, 
+int interval_is_overlapped(struct interval_node *root,
                            struct interval_node_extent *ex);
 struct interval_node *interval_find(struct interval_node *root,
                                     struct interval_node_extent *ex);
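A hypothetical caller of the new entry point (only
interval_search_expand_extent and the interval_* types above are real; the
helper names are invented, and this assumes the callback contract described
in the comment block earlier in this header):

static enum interval_iter note_conflict(struct interval_node *node, void *data)
{
        int *conflicts = data;

        (*conflicts)++;                 /* node overlaps the request */
        return INTERVAL_ITER_CONT;      /* keep walking the tree */
}

static void expand_for_mode(struct interval_node *root,
                            struct interval_node_extent req)
{
        /* start from the widest possible candidate extent ... */
        struct interval_node_extent result = { .start = 0, .end = ~0ULL };
        int conflicts = 0;

        /* ... and let the search clip it around every non-overlapping
         * extent while the callback fires for each overlapping one */
        interval_search_expand_extent(root, &req, &result,
                                      note_conflict, &conflicts);
        /* result still contains req but now stops short of the
         * neighbouring extents recorded in the tree */
}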
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 042458e..5c6ea4b 100644
@@ -189,7 +189,7 @@ typedef enum {
 /* Flags sent in AST lock_flags to be mapped into the receiving lock. */
 #define LDLM_AST_FLAGS         (LDLM_FL_DISCARD_DATA)
 
-/* 
+/*
  * --------------------------------------------------------------------------
  * NOTE! Starting from this point, that is, LDLM_FL_* flags with values above
  * 0x80000000 will not be sent over the wire.
@@ -617,6 +617,10 @@ struct ldlm_lock {
         struct lustre_handle     l_remote_handle;
 
         ldlm_policy_data_t       l_policy_data;
+        /* traffic index indicating how busy the resource is expected to be;
+         * if high, the granted region is kept small lest it conflict with
+         * other locks, causing frequent lock cancellation and re-enqueue */
+        int                   l_traffic;
 
         /*
          * Protected by lr_lock. Various counters: readers, writers, etc.
@@ -640,8 +644,8 @@ struct ldlm_lock {
          */
         cfs_waitq_t           l_waitq;
 
-        /** 
-         * Seconds. it will be updated if there is any activity related to 
+        /**
+         * Seconds. It will be updated if there is any activity related to
          * the lock, e.g. enqueue the lock or send block AST.
          */
         cfs_time_t            l_last_activity;
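The new l_traffic counter is consumed by ldlm_process_extent_fini(), added
to lustre/ldlm/ldlm_extent.c later in this patch; in rough outline
(traffic_clamp is an invented name for this simplified excerpt):

static inline void traffic_clamp(struct ldlm_lock *lock)
{
        /* a busy resource keeps the grant anchored at the requested
         * start (the same "> 4" threshold the old per-enqueue policy
         * used for conflicting locks), so the lock does not grow
         * downwards into contended territory */
        if (lock->l_traffic > 4)
                lock->l_policy_data.l_extent.start =
                        lock->l_req_extent.start;
}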
diff --git a/lustre/ldlm/interval_tree.c b/lustre/ldlm/interval_tree.c
index 60dcbeb..0b69afc 100644
@@ -101,7 +101,7 @@ static inline int extent_equal(struct interval_node_extent *e1,
         return (e1->start == e2->start) && (e1->end == e2->end);
 }
 
-static inline int extent_overlapped(struct interval_node_extent *e1, 
+static inline int extent_overlapped(struct interval_node_extent *e1,
                                     struct interval_node_extent *e2)
 {
         return (e1->start <= e2->end) && (e2->start <= e1->end);
@@ -195,7 +195,7 @@ enum interval_iter interval_iterate(struct interval_node *root,
         struct interval_node *node;
         enum interval_iter rc = INTERVAL_ITER_CONT;
         ENTRY;
-        
+
         interval_for_each(node, root) {
                 rc = func(node, data);
                 if (rc == INTERVAL_ITER_STOP)
@@ -213,7 +213,7 @@ enum interval_iter interval_iterate_reverse(struct interval_node *root,
         struct interval_node *node;
         enum interval_iter rc = INTERVAL_ITER_CONT;
         ENTRY;
-        
+
         interval_for_each_reverse(node, root) {
                 rc = func(node, data);
                 if (rc == INTERVAL_ITER_STOP)
@@ -322,10 +322,10 @@ static void __rotate_right(struct interval_node *node,
 } while (0)
 
 /*
- * Operations INSERT and DELETE, when run on a tree with n keys, 
- * take O(logN) time.Because they modify the tree, the result 
- * may violate the red-black properties.To restore these properties, 
- * we must change the colors of some of the nodes in the tree 
+ * Operations INSERT and DELETE, when run on a tree with n keys,
+ * take O(log n) time. Because they modify the tree, the result
+ * may violate the red-black properties. To restore these properties,
+ * we must change the colors of some of the nodes in the tree
  * and also change the pointer structure.
  */
 static void interval_insert_color(struct interval_node *node,
@@ -384,7 +384,7 @@ static void interval_insert_color(struct interval_node *node,
 
 struct interval_node *interval_insert(struct interval_node *node,
                                       struct interval_node **root)
-                     
+
 {
         struct interval_node **p, *parent = NULL;
         ENTRY;
@@ -402,7 +402,7 @@ struct interval_node *interval_insert(struct interval_node *node,
 
                 if (node_compare(node, parent) < 0)
                         p = &parent->in_left;
-                else 
+                else
                         p = &parent->in_right;
         }
 
@@ -499,8 +499,8 @@ static void interval_erase_color(struct interval_node *node,
         EXIT;
 }
 
-/* 
- * if the @max_high value of @node is changed, this function traverse  a path 
+/*
+ * If the @max_high value of @node is changed, this function traverses a path
 * from node up to the root to update max_high for the whole tree.
  */
 static void update_maxhigh(struct interval_node *node,
@@ -656,13 +656,13 @@ enum interval_iter interval_search(struct interval_node *node,
                                 node = node->in_right;
                                 continue;
                         }
-                } 
+                }
 
                 parent = node->in_parent;
                 while (parent) {
                         if (node_is_left_child(node) &&
                             parent->in_right) {
-                                /* If we ever got the left, it means that the 
+                                /* If we ever went left, it means that the
                                  * parent met ext->end<interval_low(parent), or
                                  * may_overlap(parent). If the former is true,
                                  * we needn't go back. So stop early and check
@@ -681,6 +681,60 @@ enum interval_iter interval_search(struct interval_node *node,
 }
 EXPORT_SYMBOL(interval_search);
 
+enum interval_iter interval_search_expand_extent(struct interval_node *node,
+                                       struct interval_node_extent *ext,
+                                       struct interval_node_extent *result_ext,
+                                       interval_callback_t func, void *data)
+{
+        struct interval_node *parent;
+        enum interval_iter rc = INTERVAL_ITER_CONT;
+
+        LASSERT(ext != NULL);
+        LASSERT(func != NULL);
+
+        while (node) {
+                if (ext->end < interval_low(node)) {
+                        if (result_ext->end > interval_low(node) - 1)
+                                result_ext->end = interval_low(node) - 1;
+                        if (node->in_left) {
+                                node = node->in_left;
+                                continue;
+                        }
+                } else if (ext->start > node->in_max_high) {
+                        if (result_ext->start < node->in_max_high + 1)
+                                result_ext->start = node->in_max_high + 1;
+                } else {
+                        if (extent_overlapped(ext, &node->in_extent)) {
+                                rc = func(node, data);
+                                if (rc == INTERVAL_ITER_STOP)
+                                        break;
+                        }
+
+                        if (node->in_left) {
+                                node = node->in_left;
+                                continue;
+                        }
+                        if (node->in_right) {
+                                node = node->in_right;
+                                continue;
+                        }
+                }
+
+                parent = node->in_parent;
+                while (parent) {
+                        if (node_is_left_child(node) && parent->in_right) {
+                                node = parent->in_right;
+                                break;
+                        }
+                        node = parent;
+                        parent = node->in_parent;
+                }
+                if (parent == NULL)
+                        break;
+        }
+        return rc;
+}
+
 static enum interval_iter interval_overlap_cb(struct interval_node *n,
                                               void *args)
 {
@@ -723,7 +777,7 @@ EXPORT_SYMBOL(interval_is_overlapped);
  *        return res;
  * }
  *
- * It's much easy to eliminate the recursion, see interval_search for 
+ * It's much easier to eliminate the recursion; see interval_search for
  * an example. -jay
  */
 static inline __u64 interval_expand_low(struct interval_node *root, __u64 low)
@@ -741,7 +795,7 @@ static inline __u64 interval_expand_high(struct interval_node *node, __u64 high)
         while (node != NULL) {
                 if (node->in_max_high < high)
                         break;
-                        
+
                 if (interval_low(node) > high) {
                         result = interval_low(node) - 1;
                         node = node->in_left;
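Both interval_search() and the new interval_search_expand_extent() rely on
the same stackless traversal: dive left, then right, then climb until a
right sibling exists. A stand-alone model of that walk on a plain binary
tree (all types and names here are illustrative, not Lustre code):

#include <stdio.h>

struct node {
        struct node *left, *right, *parent;
        int value;
};

/* Visit every node without recursion or an explicit stack, mirroring
 * the iteration pattern in interval_tree.c above. */
static void walk(struct node *node)
{
        struct node *parent;

        while (node) {
                printf("%d\n", node->value);

                if (node->left) {
                        node = node->left;
                        continue;
                }
                if (node->right) {
                        node = node->right;
                        continue;
                }

                /* climb until we can step into a right sibling */
                parent = node->parent;
                while (parent) {
                        if (parent->left == node && parent->right) {
                                node = parent->right;
                                break;
                        }
                        node = parent;
                        parent = node->parent;
                }
                if (parent == NULL)
                        break;
        }
}

int main(void)
{
        struct node l    = { NULL, NULL, NULL, 1 };
        struct node r    = { NULL, NULL, NULL, 3 };
        struct node root = { &l, &r, NULL, 2 };

        l.parent = r.parent = &root;
        walk(&root);            /* prints 2, 1, 3 */
        return 0;
}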
diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index 03172d6..44442f2 100644
@@ -100,186 +100,6 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                  mask, new_ex->end, req_end);
 }
 
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing conflicting extents outside the requested one
- *
- * Use interval tree to expand the lock extent for granted lock.
- */
-static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
-                                                struct ldlm_extent *new_ex)
-{
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        struct ldlm_interval_tree *tree;
-        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
-        int conflicting = 0;
-        int idx;
-        ENTRY;
-
-        lockmode_verify(req_mode);
-
-        /* using interval tree to handle the ldlm extent granted locks */
-        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
-                struct interval_node_extent ext = { req_start, req_end };
-
-                tree = &res->lr_itree[idx];
-                if (lockmode_compat(tree->lit_mode, req_mode))
-                        continue;
-
-                conflicting += tree->lit_size;
-                if (conflicting > 4)
-                        limiter.start = req_start;
-
-                if (interval_is_overlapped(tree->lit_root, &ext))
-                        CDEBUG(D_INFO, 
-                               "req_mode = %d, tree->lit_mode = %d, "
-                               "tree->lit_size = %d\n",
-                               req_mode, tree->lit_mode, tree->lit_size);
-                interval_expand(tree->lit_root, &ext, &limiter);
-                limiter.start = max(limiter.start, ext.start);
-                limiter.end = min(limiter.end, ext.end);
-                if (limiter.start == req_start && limiter.end == req_end)
-                        break;
-        }
-
-        new_ex->start = limiter.start;
-        new_ex->end = limiter.end;
-        LASSERT(new_ex->start <= req_start);
-        LASSERT(new_ex->end >= req_end);
-
-        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
-        EXIT;
-}
-
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing conflicting extents outside the requested one
- */
-static void
-ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
-                                    struct ldlm_extent *new_ex)
-{
-        struct list_head *tmp;
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        int conflicting = 0;
-        ENTRY;
-
-        lockmode_verify(req_mode);
-
-        /* for waiting locks */
-        list_for_each(tmp, &res->lr_waiting) {
-                struct ldlm_lock *lock;
-                struct ldlm_extent *l_extent;
-
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                l_extent = &lock->l_policy_data.l_extent;
-
-                /* We already hit the minimum requested size, search no more */
-                if (new_ex->start == req_start && new_ex->end == req_end) {
-                        EXIT;
-                        return;
-                }
-
-                /* Don't conflict with ourselves */
-                if (req == lock)
-                        continue;
-
-                /* Locks are compatible, overlap doesn't matter */
-                /* Until bug 20 is fixed, try to avoid granting overlapping
-                 * locks on one client (they take a long time to cancel) */
-                if (lockmode_compat(lock->l_req_mode, req_mode) &&
-                    lock->l_export != req->l_export)
-                        continue;
-
-                /* If this is a high-traffic lock, don't grow downwards at all
-                 * or grow upwards too much */
-                ++conflicting;
-                if (conflicting > 4)
-                        new_ex->start = req_start;
-
-                /* If lock doesn't overlap new_ex, skip it. */
-                if (!ldlm_extent_overlap(l_extent, new_ex))
-                        continue;
-
-                /* Locks conflicting in requested extents and we can't satisfy
-                 * both locks, so ignore it.  Either we will ping-pong this
-                 * extent (we would regardless of what extent we granted) or
-                 * lock is unused and it shouldn't limit our extent growth. */
-                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
-                        continue;
-
-                /* We grow extents downwards only as far as they don't overlap
-                 * with already-granted locks, on the assumption that clients
-                 * will be writing beyond the initial requested end and would
-                 * then need to enqueue a new lock beyond previous request.
-                 * l_req_extent->end strictly < req_start, checked above. */
-                if (l_extent->start < req_start && new_ex->start != req_start) {
-                        if (l_extent->end >= req_start)
-                                new_ex->start = req_start;
-                        else
-                                new_ex->start = min(l_extent->end+1, req_start);
-                }
-
-                /* If we need to cancel this lock anyways because our request
-                 * overlaps the granted lock, we grow up to its requested
-                 * extent start instead of limiting this extent, assuming that
-                 * clients are writing forwards and the lock had over grown
-                 * its extent downwards before we enqueued our request. */
-                if (l_extent->end > req_end) {
-                        if (l_extent->start <= req_end)
-                                new_ex->end = max(lock->l_req_extent.start - 1,
-                                                  req_end);
-                        else
-                                new_ex->end = max(l_extent->start - 1, req_end);
-                }
-        }
-
-        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
-        EXIT;
-}
-
-
-/* In order to determine the largest possible extent we can grant, we need
- * to scan all of the queues. */
-static void ldlm_extent_policy(struct ldlm_resource *res,
-                               struct ldlm_lock *lock, int *flags)
-{
-        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
-
-        if (lock->l_export == NULL)
-                /*
-                 * this is local lock taken by server (e.g., as a part of
-                 * OST-side locking, or unlink handling). Expansion doesn't
-                 * make a lot of sense for local locks, because they are
-                 * dropped immediately on operation completion and would only
-                 * conflict with other threads.
-                 */
-                return;
-
-        if (lock->l_policy_data.l_extent.start == 0 &&
-            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
-                /* fast-path whole file locks */
-                return;
-
-        ldlm_extent_internal_policy_granted(lock, &new_ex);
-        ldlm_extent_internal_policy_waiting(lock, &new_ex);
-
-        if (new_ex.start != lock->l_policy_data.l_extent.start ||
-            new_ex.end != lock->l_policy_data.l_extent.end) {
-                *flags |= LDLM_FL_LOCK_CHANGED;
-                lock->l_policy_data.l_extent.start = new_ex.start;
-                lock->l_policy_data.l_extent.end = new_ex.end;
-        }
-}
-
 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
 {
         struct ldlm_resource *res = lock->l_resource;
@@ -301,6 +121,7 @@ struct ldlm_extent_compat_args {
         ldlm_mode_t mode;
         int *locks;
         int *compat;
+        int *conflicts;
 };
 
 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
@@ -324,6 +145,11 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                          ldlm_lockname[mode],
                          ldlm_lockname[lock->l_granted_mode]);
                 count++;
+                /* only count locks whose _requested_ regions overlap as
+                 * contended locks */
+                if (lock->l_req_extent.end >= enq->l_req_extent.start &&
+                    lock->l_req_extent.start <= enq->l_req_extent.end)
+                        (*priv->conflicts)++;
                 if (lock->l_blocking_ast)
                         ldlm_add_ast_work_item(lock, enq, work_list);
         }
@@ -340,260 +166,379 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
         RETURN(INTERVAL_ITER_CONT);
 }
 
-/* Determine if the lock is compatible with all locks on the queue.
- * We stop walking the queue if we hit ourselves so we don't take
- * conflicting locks enqueued after us into accound, or we'd wait forever.
- *
- * 0 if the lock is not compatible
- * 1 if the lock is compatible
- * 2 if this group lock is compatible and requires no further checking
- * negative error, such as EWOULDBLOCK for group locks
- */
 static int
-ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                         int *flags, ldlm_error_t *err,
-                         struct list_head *work_list, int *contended_locks)
+ldlm_extent_compat_granted_queue(struct list_head *queue, struct ldlm_lock *req,
+                                 int *flags, ldlm_error_t *err,
+                                 struct list_head *work_list,
+                                 int *contended_locks)
 {
-        struct list_head *tmp;
-        struct ldlm_lock *lock;
         struct ldlm_resource *res = req->l_resource;
         ldlm_mode_t req_mode = req->l_req_mode;
         __u64 req_start = req->l_req_extent.start;
         __u64 req_end = req->l_req_extent.end;
-        int compat = 1;
-        int scan = 0;
-        int check_contention;
+        int compat = 1, conflicts;
+        /* Using interval tree for granted lock */
+        struct ldlm_interval_tree *tree;
+        struct ldlm_extent_compat_args data = {.work_list = work_list,
+                                       .lock = req,
+                                       .locks = contended_locks,
+                                       .compat = &compat,
+                                       .conflicts = &conflicts };
+        struct interval_node_extent ex = { .start = req_start,
+                                           .end = req_end };
+        int idx, rc;
         ENTRY;
 
-        lockmode_verify(req_mode);
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                conflicts = 0;
+                tree = &res->lr_itree[idx];
+                if (tree->lit_root == NULL) /* empty tree, skipped */
+                        continue;
 
-        /* Using interval tree for granted lock */
-        if (queue == &res->lr_granted) {
-                struct ldlm_interval_tree *tree;
-                struct ldlm_extent_compat_args data = {.work_list = work_list,
-                                               .lock = req,
-                                               .locks = contended_locks,
-                                               .compat = &compat };
-                struct interval_node_extent ex = { .start = req_start,
-                                                   .end = req_end };
-                int idx, rc;
-
-                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
-                        tree = &res->lr_itree[idx];
-                        if (tree->lit_root == NULL) /* empty tree, skipped */
+                data.mode = tree->lit_mode;
+                if (lockmode_compat(req_mode, tree->lit_mode)) {
+                        struct ldlm_interval *node;
+                        struct ldlm_extent *extent;
+
+                        if (req_mode != LCK_GROUP)
                                 continue;
 
-                        data.mode = tree->lit_mode;
-                        if (lockmode_compat(req_mode, tree->lit_mode)) {
-                                struct ldlm_interval *node;
-                                struct ldlm_extent *extent;
-
-                                if (req_mode != LCK_GROUP)
-                                        continue;
-
-                                /* group lock, grant it immediately if
-                                 * compatible */
-                                node = to_ldlm_interval(tree->lit_root);
-                                extent = ldlm_interval_extent(node);
-                                if (req->l_policy_data.l_extent.gid ==
-                                    extent->gid)
-                                        RETURN(2);
+                        /* group lock, grant it immediately if
+                         * compatible */
+                        node = to_ldlm_interval(tree->lit_root);
+                        extent = ldlm_interval_extent(node);
+                        if (req->l_policy_data.l_extent.gid ==
+                            extent->gid)
+                                RETURN(2);
+                }
+
+                if (tree->lit_mode == LCK_GROUP) {
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                compat = -EWOULDBLOCK;
+                                goto destroylock;
                         }
 
-                        if (tree->lit_mode == LCK_GROUP) {
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                        compat = -EWOULDBLOCK;
-                                        goto destroylock;
-                                }
+                        *flags |= LDLM_FL_NO_TIMEOUT;
+                        if (!work_list)
+                                RETURN(0);
 
-                                *flags |= LDLM_FL_NO_TIMEOUT;
-                                if (!work_list)
-                                        RETURN(0);
 
-                                /* if work list is not NULL,add all
-                                   locks in the tree to work list */
-                                compat = 0;
-                                interval_iterate(tree->lit_root,
-                                                 ldlm_extent_compat_cb, &data);
-                                continue;
-                        }
+                        /* if the work list is not NULL, add all
+                           locks in the tree to the work list */
+                        compat = 0;
+                        interval_iterate(tree->lit_root,
+                                         ldlm_extent_compat_cb, &data);
+                        continue;
+                }
 
-                        if (!work_list) {
-                                rc = interval_is_overlapped(tree->lit_root,&ex);
-                                if (rc)
-                                        RETURN(0);
-                        } else {
-                                interval_search(tree->lit_root, &ex,
-                                                ldlm_extent_compat_cb, &data);
-                                if (!list_empty(work_list) && compat)
-                                        compat = 0;
+                if (!work_list) {
+                        rc = interval_is_overlapped(tree->lit_root, &ex);
+                        if (rc)
+                                RETURN(0);
+                } else {
+                        struct interval_node_extent result_ext = {
+                                .start = req->l_policy_data.l_extent.start,
+                                .end = req->l_policy_data.l_extent.end };
+
+                        interval_search_expand_extent(tree->lit_root, &ex,
+                                                      &result_ext,
+                                                      ldlm_extent_compat_cb,
+                                                      &data);
+                        req->l_policy_data.l_extent.start = result_ext.start;
+                        req->l_policy_data.l_extent.end = result_ext.end;
+                        /* for granted locks, count non-compatible,
+                         * non-overlapping locks toward the traffic index */
+                        req->l_traffic += tree->lit_size - conflicts;
+
+                        if (!list_empty(work_list)) {
+                                compat = 0;
+                                /* if there is at least 1 conflicting lock,
+                                 * we do not expand to the left, since we
+                                 * often continue writing to the right.
+                                 */
+                                req->l_policy_data.l_extent.start = req_start;
                         }
                 }
-        } else { /* for waiting queue */
-                list_for_each(tmp, queue) {
-                        check_contention = 1;
+        }
 
-                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+        RETURN(compat);
+destroylock:
+        list_del_init(&req->l_res_link);
+        ldlm_lock_destroy_nolock(req);
+        *err = compat;
+        RETURN(compat);
+}
 
-                        if (req == lock)
-                                break;
+static int
+ldlm_extent_compat_waiting_queue(struct list_head *queue, struct ldlm_lock *req,
+                                 int *flags, ldlm_error_t *err,
+                                 struct list_head *work_list,
+                                 int *contended_locks)
+{
+        struct list_head *tmp;
+        struct ldlm_lock *lock;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        int compat = 1;
+        int scan = 0;
+        int check_contention;
+        ENTRY;
 
-                        if (unlikely(scan)) {
-                                /* We only get here if we are queuing GROUP lock
-                                   and met some incompatible one. The main idea of this
-                                   code is to insert GROUP lock past compatible GROUP
-                                   lock in the waiting queue or if there is not any,
-                                   then in front of first non-GROUP lock */
-                                if (lock->l_req_mode != LCK_GROUP) {
-                                        /* Ok, we hit non-GROUP lock, there should
-                                         * be no more GROUP locks later on, queue in
-                                         * front of first non-GROUP lock */
-
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
-                                        compat = 0;
-                                        break;
-                                }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        compat = 0;
-                                        break;
-                                }
-                                continue;
-                        }
+        list_for_each(tmp, queue) {
+                check_contention = 1;
 
-                        /* locks are compatible, overlap doesn't matter */
-                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                                if (req_mode == LCK_PR &&
-                                    ((lock->l_policy_data.l_extent.start <=
-                                      req->l_policy_data.l_extent.start) &&
-                                     (lock->l_policy_data.l_extent.end >=
-                                      req->l_policy_data.l_extent.end))) {
-                                        /* If we met a PR lock just like us or wider,
-                                           and nobody down the list conflicted with
-                                           it, that means we can skip processing of
-                                           the rest of the list and safely place
-                                           ourselves at the end of the list, or grant
-                                           (dependent if we met an conflicting locks
-                                           before in the list).
-                                           In case of 1st enqueue only we continue
-                                           traversing if there is something conflicting
-                                           down the list because we need to make sure
-                                           that something is marked as AST_SENT as well,
-                                           in cse of empy worklist we would exit on
-                                           first conflict met. */
-                                        /* There IS a case where such flag is
-                                           not set for a lock, yet it blocks
-                                           something. Luckily for us this is
-                                           only during destroy, so lock is
-                                           exclusive. So here we are safe */
-                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
-                                                RETURN(compat);
-                                        }
-                                }
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                                /* non-group locks are compatible, overlap doesn't
-                                   matter */
-                                if (likely(req_mode != LCK_GROUP))
-                                        continue;
-
-                                /* If we are trying to get a GROUP lock and there is
-                                   another one of this kind, we need to compare gid */
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* If existing lock with matched gid is granted,
-                                           we grant new one too. */
-                                        if (lock->l_req_mode == lock->l_granted_mode)
-                                                RETURN(2);
-
-                                        /* Otherwise we are scanning queue of waiting
-                                         * locks and it means current request would
-                                         * block along with existing lock (that is
-                                         * already blocked.
-                                         * If we are in nonblocking mode - return
-                                         * immediately */
-                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                                compat = -EWOULDBLOCK;
-                                                goto destroylock;
-                                        }
-                                        /* If this group lock is compatible with another
-                                         * group lock on the waiting list, they must be
-                                         * together in the list, so they can be granted
-                                         * at the same time.  Otherwise the later lock
-                                         * can get stuck behind another, incompatible,
-                                         * lock. */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        /* Because 'lock' is not granted, we can stop
-                                         * processing this queue and return immediately.
-                                         * There is no need to check the rest of the
-                                         * list. */
-                                        RETURN(0);
-                                }
-                        }
+                if (req == lock)
+                        break;
 
-                        if (unlikely(req_mode == LCK_GROUP &&
-                                     (lock->l_req_mode != lock->l_granted_mode))) {
-                                scan = 1;
+                if (unlikely(scan)) {
+                        /* We only get here if we are queuing a GROUP lock
+                           and met some incompatible one. The main idea of
+                           this code is to insert the GROUP lock past any
+                           compatible GROUP lock in the waiting queue or, if
+                           there is none, in front of the first non-GROUP lock */
+                        if (lock->l_req_mode != LCK_GROUP) {
+                                /* OK, we hit a non-GROUP lock; there should
+                                   be no more GROUP locks later on, so queue
+                                   in front of the first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
                                 compat = 0;
-                                if (lock->l_req_mode != LCK_GROUP) {
-                                        /* Ok, we hit non-GROUP lock, there should be no
-                                           more GROUP locks later on, queue in front of
-                                           first non-GROUP lock */
-
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
-                                        break;
-                                }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        break;
+                                break;
+                        }
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                compat = 0;
+                                break;
+                        }
+                        continue;
+                }
+
+                /* locks are compatible, overlap doesn't matter */
+                if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                        if (req_mode == LCK_PR &&
+                            ((lock->l_policy_data.l_extent.start <=
+                              req->l_policy_data.l_extent.start) &&
+                             (lock->l_policy_data.l_extent.end >=
+                              req->l_policy_data.l_extent.end))) {
+                                /* If we met a PR lock just like us or wider,
+                                   and nobody down the list conflicted with
+                                   it, that means we can skip processing of
+                                   the rest of the list and safely place
+                                   ourselves at the end of the list, or grant
+                                   (depending on whether we met conflicting
+                                   locks before in the list). In case of a
+                                   first enqueue only, we continue traversing
+                                   if there is something conflicting down the
+                                   list because we need to make sure that
+                                   something is marked as AST_SENT as well;
+                                   in case of an empty worklist we would exit
+                                   on the first conflict met. */
+                                /* There IS a case where such a flag is
+                                   not set for a lock, yet it blocks
+                                   something. Luckily for us this is
+                                   only during destroy, so the lock is
+                                   exclusive. So here we are safe. */
+                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+                                        RETURN(compat);
                                 }
-                                continue;
                         }
 
-                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
-                                /* If compared lock is GROUP, then requested is PR/PW/
-                                 * so this is not compatible; extent range does not
-                                 * matter */
+                        /* non-group locks are compatible, overlap doesn't
+                           matter */
+                        if (likely(req_mode != LCK_GROUP))
+                                continue;
+
+                        /* If we are trying to get a GROUP lock and there is
+                           another one of this kind, we need to compare gid */
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* We are scanning the queue of waiting
+                                 * locks, which means the current request
+                                 * would block along with the existing lock
+                                 * (that is already blocked).
+                                 * If we are in nonblocking mode, return
+                                 * immediately */
                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                         compat = -EWOULDBLOCK;
                                         goto destroylock;
-                                } else {
-                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                 }
-                        } else if (lock->l_policy_data.l_extent.end < req_start ||
-                                   lock->l_policy_data.l_extent.start > req_end) {
+                                /* If this group lock is compatible with another
+                                 * group lock on the waiting list, they must be
+                                 * together in the list, so they can be granted
+                                 * at the same time.  Otherwise the later lock
+                                 * can get stuck behind another, incompatible,
+                                 * lock. */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                /* Because 'lock' is not granted, we can stop
+                                 * processing this queue and return immediately.
+                                 * There is no need to check the rest of the
+                                 * list. */
+                                RETURN(0);
+                        }
+                }
+
+                if (unlikely(req_mode == LCK_GROUP &&
+                             (lock->l_req_mode != lock->l_granted_mode))) {
+                        scan = 1;
+                        compat = 0;
+                        if (lock->l_req_mode != LCK_GROUP) {
+                                /* OK, we hit a non-GROUP lock; there should
+                                 * be no more GROUP locks later on, so queue
+                                 * in front of the first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
+                                break;
+                        }
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                break;
+                        }
+                        continue;
+                }
+
+                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                        /* If the compared lock is GROUP, then the requested
+                         * one is PR/PW, so this is not compatible; extent
+                         * range does not matter */
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                compat = -EWOULDBLOCK;
+                                goto destroylock;
+                        } else {
+                                *flags |= LDLM_FL_NO_TIMEOUT;
+                        }
+                } else if (!work_list) {
+                        if (lock->l_policy_data.l_extent.end < req_start ||
+                            lock->l_policy_data.l_extent.start > req_end)
                                 /* if a non-group lock doesn't overlap, skip it */
                                 continue;
-                        } else if (lock->l_req_extent.end < req_start ||
-                                   lock->l_req_extent.start > req_end) {
-                                /* false contention, the requests doesn't really overlap */
-                                check_contention = 0;
+                        RETURN(0);
+                } else {
+                        /* for waiting locks, count all non-compatible locks
+                         * in the traffic index */
+                        ++req->l_traffic;
+                        ++lock->l_traffic;
+
+                        /* adjust policy */
+                        if (lock->l_policy_data.l_extent.end < req_start) {
+                                /*     lock            req
+                                 * ------------+
+                                 * ++++++      |   +++++++
+                                 *      +      |   +
+                                 * ++++++      |   +++++++
+                                 * ------------+
+                                 */
+                                if (lock->l_policy_data.l_extent.end >
+                                    req->l_policy_data.l_extent.start)
+                                        req->l_policy_data.l_extent.start =
+                                             lock->l_policy_data.l_extent.end+1;
+                                continue;
+                        } else if (lock->l_req_extent.end < req_start) {
+                                /*     lock            req
+                                 * ------------------+
+                                 * ++++++          +++++++
+                                 *      +          + |
+                                 * ++++++          +++++++
+                                 * ------------------+
+                                 */
+                                lock->l_policy_data.l_extent.end =
+                                                          req_start - 1;
+                                req->l_policy_data.l_extent.start =
+                                                              req_start;
+                                continue;
+                        } else if (lock->l_policy_data.l_extent.start >
+                                   req_end) {
+                                /*  req              lock
+                                 *              +--------------
+                                 *  +++++++     |    +++++++
+                                 *        +     |    +
+                                 *  +++++++     |    +++++++
+                                 *              +--------------
+                                 */
+                                if (lock->l_policy_data.l_extent.start <
+                                     req->l_policy_data.l_extent.end)
+                                        req->l_policy_data.l_extent.end =
+                                           lock->l_policy_data.l_extent.start-1;
+                                continue;
+                        } else if (lock->l_req_extent.start > req_end) {
+                                /*  req              lock
+                                 *      +----------------------
+                                 *  +++++++          +++++++
+                                 *      | +          +
+                                 *  +++++++          +++++++
+                                 *      +----------------------
+                                 */
+                                lock->l_policy_data.l_extent.start =
+                                                            req_end + 1;
+                                req->l_policy_data.l_extent.end = req_end;
+                                continue;
                         }
+                } /* policy_adj */
 
-                        if (!work_list)
-                                RETURN(0);
-
+                compat = 0;
+                if (work_list) {
                         /* don't count conflicting glimpse locks */
-                        if (lock->l_req_mode == LCK_PR &&
-                            lock->l_policy_data.l_extent.start == 0 &&
-                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+                        if (lock->l_flags & LDLM_FL_HAS_INTENT)
                                 check_contention = 0;
 
                         *contended_locks += check_contention;
 
-                        compat = 0;
                         if (lock->l_blocking_ast)
                                 ldlm_add_ast_work_item(lock, req, work_list);
                 }
         }
 
+        RETURN(compat);
+destroylock:
+        list_del_init(&req->l_res_link);
+        ldlm_lock_destroy_nolock(req);
+        *err = compat;
+        RETURN(compat);
+}
+
+/* Determine if the lock is compatible with all locks on the queue.
+ * We stop walking the queue if we hit ourselves so we don't take
+ * conflicting locks enqueued after us into account, or we'd wait forever.
+ *
+ * 0 if the lock is not compatible
+ * 1 if the lock is compatible
+ * 2 if this group lock is compatible and requires no further checking
+ * negative error, such as EWOULDBLOCK for group locks
+ *
+ * Note: policy adjustment only happens during the first lock enqueue procedure
+ */
+static int
+ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
+                         int *flags, ldlm_error_t *err,
+                         struct list_head *work_list, int *contended_locks)
+{
+        struct ldlm_resource *res = req->l_resource;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        int compat = 1;
+        ENTRY;
+
+        lockmode_verify(req_mode);
+
+        if (queue == &res->lr_granted)
+                compat = ldlm_extent_compat_granted_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
+        else
+                compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
         if (ldlm_check_contention(req, *contended_locks) &&
             compat == 0 &&
             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
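Restating the four diagrammed adjustment cases above compactly (a
stand-alone sketch with invented, shortened names; the real code operates on
struct ldlm_lock and its l_req_extent/l_policy_data fields):

#include <stdint.h>

struct ext { uint64_t start, end; };
struct lk  { struct ext req, policy; }; /* requested vs. grown extent */

/* "other" is a waiting lock whose requested extent does not overlap
 * ours; only the speculatively grown policy extents may touch.  Trim
 * whichever grown region crosses the hard boundary fixed by the two
 * requested extents. */
static void adjust(struct lk *other, struct lk *req)
{
        if (other->policy.end < req->req.start) {
                /* case 1: other's grown end stays below our request,
                 * so only our downward growth needs to back off */
                if (other->policy.end > req->policy.start)
                        req->policy.start = other->policy.end + 1;
        } else if (other->req.end < req->req.start) {
                /* case 2: other's grown end reached into our request:
                 * push it back and stop growing downwards ourselves */
                other->policy.end = req->req.start - 1;
                req->policy.start = req->req.start;
        } else if (other->policy.start > req->req.end) {
                /* case 3: mirror of case 1 on the high side */
                if (other->policy.start < req->policy.end)
                        req->policy.end = other->policy.start - 1;
        } else if (other->req.start > req->req.end) {
                /* case 4: mirror of case 2 on the high side */
                other->policy.start = req->req.end + 1;
                req->policy.end = req->req.end;
        }
}

int main(void)
{
        struct lk other = { .req = { 0, 49 },    .policy = { 0, 120 } };
        struct lk mine  = { .req = { 100, 199 }, .policy = { 0, ~0ULL } };

        adjust(&other, &mine);  /* case 2: other grew into our request */
        /* now other.policy.end == 99 and mine.policy.start == 100 */
        return 0;
}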
@@ -631,6 +576,24 @@ static void discard_bl_list(struct list_head *bl_list)
         EXIT;
 }
 
+static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
+{
+        lock->l_policy_data.l_extent.start = 0;
+        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+}
+
+static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
+{
+        if (lock->l_traffic > 4)
+                lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
+        ldlm_extent_internal_policy_fixup(lock,
+                                          &lock->l_policy_data.l_extent,
+                                          lock->l_traffic);
+        if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
+            lock->l_req_extent.end   != lock->l_policy_data.l_extent.end)
+                *flags |= LDLM_FL_LOCK_CHANGED;
+}
+
 /* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
   *   - blocking ASTs have already been sent
   *   - must call this function with the ns lock held
@@ -672,14 +635,24 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 
                 ldlm_resource_unlink_lock(lock);
 
-                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
-                        ldlm_extent_policy(res, lock, flags);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) {
+                        lock->l_policy_data.l_extent.start =
+                                lock->l_req_extent.start;
+                        lock->l_policy_data.l_extent.end =
+                                lock->l_req_extent.end;
+                } else {
+                        ldlm_process_extent_fini(lock, flags);
+                }
+
                 ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
         contended_locks = 0;
+
+        ldlm_process_extent_init(lock);
+
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                       &rpc_list, &contended_locks);
         if (rc < 0)
@@ -694,8 +667,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 
         if (rc + rc2 == 2) {
         grant:
-                ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
+                ldlm_process_extent_fini(lock, flags);
                 ldlm_grant_lock(lock, NULL);
         } else {
                 /* If either of the compat_queue()s returned failure, then we
@@ -712,7 +685,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                     !ns_is_client(res->lr_namespace))
                         class_fail_export(lock->l_export);
+
                 lock_res(res);
                 if (rc == -ERESTART) {