Whamcloud - gitweb
b=19325 adjust waiting extent locks during 1st enqueue
authorVitaly Fertman <Vitaly.Fertman@Sun.COM>
Tue, 28 Sep 2010 21:52:03 +0000 (01:52 +0400)
committerVitaly Fertman <vitaly.fertman@sun.com>
Thu, 7 Oct 2010 23:03:40 +0000 (03:03 +0400)
o=bobijam
i=vitaly
i=green

Re-landing. Adjust locks' extents on their first enqueue, so that at the time
they get granted, there is no need for another pass through the queues since
they are already shaped into the proper forms.

lustre/include/interval_tree.h
lustre/include/lustre_dlm.h
lustre/ldlm/interval_tree.c
lustre/ldlm/ldlm_extent.c

index 2b0d391..b77b68c 100644 (file)
@@ -109,6 +109,10 @@ void interval_erase(struct interval_node *node, struct interval_node **root);
 enum interval_iter interval_search(struct interval_node *root,
                                    struct interval_node_extent *ex,
                                    interval_callback_t func, void *data);
+enum interval_iter interval_search_expand_extent(struct interval_node *root,
+                                   struct interval_node_extent *ex,
+                                   struct interval_node_extent *result_ext,
+                                   interval_callback_t func, void *data);
 
 /* Iterate every node in the tree - by reverse order or regular order. */
 enum interval_iter interval_iterate(struct interval_node *root, 
index 4b3521d..970d222 100644 (file)
@@ -648,6 +648,13 @@ struct ldlm_lock {
 
         ldlm_policy_data_t       l_policy_data;
 
+        /**
+         * Traffic index indicating how busy the resource will be, if it is
+         * high, the lock's granted region will not be so big lest it conflicts
+         * other locks, causing frequent lock cancellation and re-enqueue
+         */
+        int                   l_traffic;
+
         /*
          * Protected by lr_lock. Various counters: readers, writers, etc.
          */
index 5d54f25..a43b996 100644 (file)
@@ -681,6 +681,60 @@ enum interval_iter interval_search(struct interval_node *node,
 }
 EXPORT_SYMBOL(interval_search);
 
+enum interval_iter interval_search_expand_extent( struct interval_node *node,
+                                       struct interval_node_extent *ext,
+                                       struct interval_node_extent *result_ext,
+                                       interval_callback_t func, void *data)
+{
+        struct interval_node *parent;
+        enum interval_iter rc = INTERVAL_ITER_CONT;
+
+        LASSERT(ext != NULL);
+        LASSERT(func != NULL);
+
+        while (node) {
+                if (ext->end < interval_low(node)) {
+                        if (result_ext->end > interval_low(node) - 1)
+                                result_ext->end = interval_low(node) - 1;
+                        if (node->in_left) {
+                                node = node->in_left;
+                                continue;
+                        }
+                } else if (ext->start > node->in_max_high) {
+                        if (result_ext->start < node->in_max_high + 1)
+                                result_ext->start = node->in_max_high + 1;
+                } else {
+                        if (extent_overlapped(ext, &node->in_extent)) {
+                                rc = func(node, data);
+                                if (rc == INTERVAL_ITER_STOP)
+                                        break;
+                        }
+
+                        if (node->in_left) {
+                                node = node->in_left;
+                                continue;
+                        }
+                        if (node->in_right) {
+                                node = node->in_right;
+                                continue;
+                        }
+                }
+
+                parent = node->in_parent;
+                while (parent) {
+                        if (node_is_left_child(node) && parent->in_right) {
+                                node = parent->in_right;
+                                break;
+                        }
+                        node = parent;
+                        parent = node->in_parent;
+                }
+                if (parent == NULL)
+                        break;
+        }
+        return rc;
+}
+
 static enum interval_iter interval_overlap_cb(struct interval_node *n,
                                               void *args)
 {
index a8aa5ec..4eba10f 100644 (file)
@@ -100,185 +100,6 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                  mask, new_ex->end, req_end);
 }
 
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing conflicting extents outside the requested one
- *
- * Use interval tree to expand the lock extent for granted lock.
- */
-static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
-                                                struct ldlm_extent *new_ex)
-{
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        struct ldlm_interval_tree *tree;
-        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
-        int conflicting = 0;
-        int idx;
-        ENTRY;
-
-        lockmode_verify(req_mode);
-
-        /* using interval tree to handle the ldlm extent granted locks */
-        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
-                struct interval_node_extent ext = { req_start, req_end };
-
-                tree = &res->lr_itree[idx];
-                if (lockmode_compat(tree->lit_mode, req_mode))
-                        continue;
-
-                conflicting += tree->lit_size;
-                if (conflicting > 4)
-                        limiter.start = req_start;
-
-                if (interval_is_overlapped(tree->lit_root, &ext))
-                        CDEBUG(D_INFO, 
-                               "req_mode = %d, tree->lit_mode = %d, "
-                               "tree->lit_size = %d\n",
-                               req_mode, tree->lit_mode, tree->lit_size);
-                interval_expand(tree->lit_root, &ext, &limiter);
-                limiter.start = max(limiter.start, ext.start);
-                limiter.end = min(limiter.end, ext.end);
-                if (limiter.start == req_start && limiter.end == req_end)
-                        break;
-        }
-
-        new_ex->start = limiter.start;
-        new_ex->end = limiter.end;
-        LASSERT(new_ex->start <= req_start);
-        LASSERT(new_ex->end >= req_end);
-
-        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
-        EXIT;
-}
-
-/* The purpose of this function is to return:
- * - the maximum extent
- * - containing the requested extent
- * - and not overlapping existing conflicting extents outside the requested one
- */
-static void
-ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
-                                    struct ldlm_extent *new_ex)
-{
-        cfs_list_t *tmp;
-        struct ldlm_resource *res = req->l_resource;
-        ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_req_extent.start;
-        __u64 req_end = req->l_req_extent.end;
-        int conflicting = 0;
-        ENTRY;
-
-        lockmode_verify(req_mode);
-
-        /* for waiting locks */
-        cfs_list_for_each(tmp, &res->lr_waiting) {
-                struct ldlm_lock *lock;
-                struct ldlm_extent *l_extent;
-
-                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
-                l_extent = &lock->l_policy_data.l_extent;
-
-                /* We already hit the minimum requested size, search no more */
-                if (new_ex->start == req_start && new_ex->end == req_end) {
-                        EXIT;
-                        return;
-                }
-
-                /* Don't conflict with ourselves */
-                if (req == lock)
-                        continue;
-
-                /* Locks are compatible, overlap doesn't matter */
-                /* Until bug 20 is fixed, try to avoid granting overlapping
-                 * locks on one client (they take a long time to cancel) */
-                if (lockmode_compat(lock->l_req_mode, req_mode) &&
-                    lock->l_export != req->l_export)
-                        continue;
-
-                /* If this is a high-traffic lock, don't grow downwards at all
-                 * or grow upwards too much */
-                ++conflicting;
-                if (conflicting > 4)
-                        new_ex->start = req_start;
-
-                /* If lock doesn't overlap new_ex, skip it. */
-                if (!ldlm_extent_overlap(l_extent, new_ex))
-                        continue;
-
-                /* Locks conflicting in requested extents and we can't satisfy
-                 * both locks, so ignore it.  Either we will ping-pong this
-                 * extent (we would regardless of what extent we granted) or
-                 * lock is unused and it shouldn't limit our extent growth. */
-                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
-                        continue;
-
-                /* We grow extents downwards only as far as they don't overlap
-                 * with already-granted locks, on the assumption that clients
-                 * will be writing beyond the initial requested end and would
-                 * then need to enqueue a new lock beyond previous request.
-                 * l_req_extent->end strictly < req_start, checked above. */
-                if (l_extent->start < req_start && new_ex->start != req_start) {
-                        if (l_extent->end >= req_start)
-                                new_ex->start = req_start;
-                        else
-                                new_ex->start = min(l_extent->end+1, req_start);
-                }
-
-                /* If we need to cancel this lock anyways because our request
-                 * overlaps the granted lock, we grow up to its requested
-                 * extent start instead of limiting this extent, assuming that
-                 * clients are writing forwards and the lock had over grown
-                 * its extent downwards before we enqueued our request. */
-                if (l_extent->end > req_end) {
-                        if (l_extent->start <= req_end)
-                                new_ex->end = max(lock->l_req_extent.start - 1,
-                                                  req_end);
-                        else
-                                new_ex->end = max(l_extent->start - 1, req_end);
-                }
-        }
-
-        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
-        EXIT;
-}
-
-
-/* In order to determine the largest possible extent we can grant, we need
- * to scan all of the queues. */
-static void ldlm_extent_policy(struct ldlm_resource *res,
-                               struct ldlm_lock *lock, int *flags)
-{
-        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
-
-        if (lock->l_export == NULL)
-                /*
-                 * this is local lock taken by server (e.g., as a part of
-                 * OST-side locking, or unlink handling). Expansion doesn't
-                 * make a lot of sense for local locks, because they are
-                 * dropped immediately on operation completion and would only
-                 * conflict with other threads.
-                 */
-                return;
-
-        if (lock->l_policy_data.l_extent.start == 0 &&
-            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
-                /* fast-path whole file locks */
-                return;
-
-        ldlm_extent_internal_policy_granted(lock, &new_ex);
-        ldlm_extent_internal_policy_waiting(lock, &new_ex);
-
-        if (new_ex.start != lock->l_policy_data.l_extent.start ||
-            new_ex.end != lock->l_policy_data.l_extent.end) {
-                *flags |= LDLM_FL_LOCK_CHANGED;
-                lock->l_policy_data.l_extent.start = new_ex.start;
-                lock->l_policy_data.l_extent.end = new_ex.end;
-        }
-}
 
 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
 {
@@ -301,6 +122,7 @@ struct ldlm_extent_compat_args {
         ldlm_mode_t mode;
         int *locks;
         int *compat;
+        int *conflicts;
 };
 
 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
@@ -323,7 +145,14 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                          "mode = %s, lock->l_granted_mode = %s\n",
                          ldlm_lockname[mode],
                          ldlm_lockname[lock->l_granted_mode]);
-                count++;
+
+                /* only count _requested_ region overlapped locks as contended
+                 * locks */
+                if (lock->l_req_extent.end >= enq->l_req_extent.start &&
+                    lock->l_req_extent.start <= enq->l_req_extent.end) {
+                        count++;
+                        (*priv->conflicts)++;
+                }
                 if (lock->l_blocking_ast)
                         ldlm_add_ast_work_item(lock, enq, work_list);
         }
@@ -340,261 +169,380 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
         RETURN(INTERVAL_ITER_CONT);
 }
 
-/* Determine if the lock is compatible with all locks on the queue.
- * We stop walking the queue if we hit ourselves so we don't take
- * conflicting locks enqueued after us into accound, or we'd wait forever.
- *
- * 0 if the lock is not compatible
- * 1 if the lock is compatible
- * 2 if this group lock is compatible and requires no further checking
- * negative error, such as EWOULDBLOCK for group locks
- */
 static int
-ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
-                         int *flags, ldlm_error_t *err,
-                         cfs_list_t *work_list, int *contended_locks)
+ldlm_extent_compat_granted_queue(cfs_list_t *queue, struct ldlm_lock *req,
+                                 int *flags, ldlm_error_t *err,
+                                 cfs_list_t *work_list, int *contended_locks)
 {
-        cfs_list_t *tmp;
-        struct ldlm_lock *lock;
         struct ldlm_resource *res = req->l_resource;
         ldlm_mode_t req_mode = req->l_req_mode;
         __u64 req_start = req->l_req_extent.start;
         __u64 req_end = req->l_req_extent.end;
-        int compat = 1;
-        int scan = 0;
-        int check_contention;
+        int compat = 1, conflicts;
+        /* Using interval tree for granted lock */
+        struct ldlm_interval_tree *tree;
+        struct ldlm_extent_compat_args data = {.work_list = work_list,
+                                       .lock = req,
+                                       .locks = contended_locks,
+                                       .compat = &compat,
+                                       .conflicts = &conflicts };
+        struct interval_node_extent ex = { .start = req_start,
+                                           .end = req_end };
+        int idx, rc;
         ENTRY;
 
-        lockmode_verify(req_mode);
 
-        /* Using interval tree for granted lock */
-        if (queue == &res->lr_granted) {
-                struct ldlm_interval_tree *tree;
-                struct ldlm_extent_compat_args data = {.work_list = work_list,
-                                               .lock = req,
-                                               .locks = contended_locks,
-                                               .compat = &compat };
-                struct interval_node_extent ex = { .start = req_start,
-                                                   .end = req_end };
-                int idx, rc;
-
-                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
-                        tree = &res->lr_itree[idx];
-                        if (tree->lit_root == NULL) /* empty tree, skipped */
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                tree = &res->lr_itree[idx];
+                if (tree->lit_root == NULL) /* empty tree, skipped */
+                        continue;
+
+                data.mode = tree->lit_mode;
+                if (lockmode_compat(req_mode, tree->lit_mode)) {
+                        struct ldlm_interval *node;
+                        struct ldlm_extent *extent;
+
+                        if (req_mode != LCK_GROUP)
                                 continue;
 
-                        data.mode = tree->lit_mode;
-                        if (lockmode_compat(req_mode, tree->lit_mode)) {
-                                struct ldlm_interval *node;
-                                struct ldlm_extent *extent;
-
-                                if (req_mode != LCK_GROUP)
-                                        continue;
-
-                                /* group lock, grant it immediately if
-                                 * compatible */
-                                node = to_ldlm_interval(tree->lit_root);
-                                extent = ldlm_interval_extent(node);
-                                if (req->l_policy_data.l_extent.gid ==
-                                    extent->gid)
-                                        RETURN(2);
+                        /* group lock, grant it immediately if
+                         * compatible */
+                        node = to_ldlm_interval(tree->lit_root);
+                        extent = ldlm_interval_extent(node);
+                        if (req->l_policy_data.l_extent.gid ==
+                            extent->gid)
+                                RETURN(2);
+                }
+
+                if (tree->lit_mode == LCK_GROUP) {
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                compat = -EWOULDBLOCK;
+                                goto destroylock;
                         }
 
-                        if (tree->lit_mode == LCK_GROUP) {
-                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                        compat = -EWOULDBLOCK;
-                                        goto destroylock;
-                                }
+                        *flags |= LDLM_FL_NO_TIMEOUT;
+                        if (!work_list)
+                                RETURN(0);
 
-                                *flags |= LDLM_FL_NO_TIMEOUT;
-                                if (!work_list)
-                                        RETURN(0);
+                        /* if work list is not NULL,add all
+                           locks in the tree to work list */
+                        compat = 0;
+                        interval_iterate(tree->lit_root,
+                                         ldlm_extent_compat_cb, &data);
+                        continue;
+                }
 
-                                /* if work list is not NULL,add all
-                                   locks in the tree to work list */
-                                compat = 0;
-                                interval_iterate(tree->lit_root,
-                                                 ldlm_extent_compat_cb, &data);
-                                continue;
-                        }
 
-                        if (!work_list) {
-                                rc = interval_is_overlapped(tree->lit_root,&ex);
-                                if (rc)
-                                        RETURN(0);
-                        } else {
-                                interval_search(tree->lit_root, &ex,
-                                                ldlm_extent_compat_cb, &data);
-                                if (!cfs_list_empty(work_list) && compat)
+                if (!work_list) {
+                        rc = interval_is_overlapped(tree->lit_root, &ex);
+                        if (rc)
+                                RETURN(0);
+                } else {
+                        struct interval_node_extent result_ext = {
+                                .start = req->l_policy_data.l_extent.start,
+                                .end = req->l_policy_data.l_extent.end };
+
+                        conflicts = 0;
+                        interval_search_expand_extent(tree->lit_root, &ex,
+                                                      &result_ext,
+                                                      ldlm_extent_compat_cb,
+                                                      &data);
+                        req->l_policy_data.l_extent.start = result_ext.start;
+                        req->l_policy_data.l_extent.end = result_ext.end;
+                        /* for granted locks, count non-compatible not overlapping
+                         * locks in traffic index */
+                        req->l_traffic += tree->lit_size - conflicts;
+
+                        if (!cfs_list_empty(work_list)) {
+                                if (compat)
                                         compat = 0;
+                                /* if there is at least 1 conflicting lock, we
+                                 * do not expand to the left, since we often
+                                 * continue writing to the right.
+                                 */
+                                req->l_policy_data.l_extent.start = req_start;
                         }
                 }
-        } else { /* for waiting queue */
-                cfs_list_for_each(tmp, queue) {
-                        check_contention = 1;
+        }
 
-                        lock = cfs_list_entry(tmp, struct ldlm_lock,
-                                              l_res_link);
+        RETURN(compat);
+destroylock:
+        cfs_list_del_init(&req->l_res_link);
+        ldlm_lock_destroy_nolock(req);
+        *err = compat;
+        RETURN(compat);
+}
 
-                        if (req == lock)
-                                break;
+static int
+ldlm_extent_compat_waiting_queue(cfs_list_t *queue, struct ldlm_lock *req,
+                                 int *flags, ldlm_error_t *err,
+                                 cfs_list_t *work_list, int *contended_locks)
+{
+        cfs_list_t *tmp;
+        struct ldlm_lock *lock;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        int compat = 1;
+        int scan = 0;
+        int check_contention;
+        ENTRY;
 
-                        if (unlikely(scan)) {
-                                /* We only get here if we are queuing GROUP lock
-                                   and met some incompatible one. The main idea of this
-                                   code is to insert GROUP lock past compatible GROUP
-                                   lock in the waiting queue or if there is not any,
-                                   then in front of first non-GROUP lock */
-                                if (lock->l_req_mode != LCK_GROUP) {
-                                        /* Ok, we hit non-GROUP lock, there should
-                                         * be no more GROUP locks later on, queue in
-                                         * front of first non-GROUP lock */
-
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        cfs_list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
-                                        compat = 0;
-                                        break;
-                                }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        compat = 0;
-                                        break;
-                                }
-                                continue;
-                        }
+        cfs_list_for_each(tmp, queue) {
+                check_contention = 1;
 
-                        /* locks are compatible, overlap doesn't matter */
-                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                                if (req_mode == LCK_PR &&
-                                    ((lock->l_policy_data.l_extent.start <=
-                                      req->l_policy_data.l_extent.start) &&
-                                     (lock->l_policy_data.l_extent.end >=
-                                      req->l_policy_data.l_extent.end))) {
-                                        /* If we met a PR lock just like us or wider,
-                                           and nobody down the list conflicted with
-                                           it, that means we can skip processing of
-                                           the rest of the list and safely place
-                                           ourselves at the end of the list, or grant
-                                           (dependent if we met an conflicting locks
-                                           before in the list).
-                                           In case of 1st enqueue only we continue
-                                           traversing if there is something conflicting
-                                           down the list because we need to make sure
-                                           that something is marked as AST_SENT as well,
-                                           in cse of empy worklist we would exit on
-                                           first conflict met. */
-                                        /* There IS a case where such flag is
-                                           not set for a lock, yet it blocks
-                                           something. Luckily for us this is
-                                           only during destroy, so lock is
-                                           exclusive. So here we are safe */
-                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
-                                                RETURN(compat);
-                                        }
-                                }
+                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                                /* non-group locks are compatible, overlap doesn't
-                                   matter */
-                                if (likely(req_mode != LCK_GROUP))
-                                        continue;
-
-                                /* If we are trying to get a GROUP lock and there is
-                                   another one of this kind, we need to compare gid */
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* If existing lock with matched gid is granted,
-                                           we grant new one too. */
-                                        if (lock->l_req_mode == lock->l_granted_mode)
-                                                RETURN(2);
-
-                                        /* Otherwise we are scanning queue of waiting
-                                         * locks and it means current request would
-                                         * block along with existing lock (that is
-                                         * already blocked.
-                                         * If we are in nonblocking mode - return
-                                         * immediately */
-                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                                compat = -EWOULDBLOCK;
-                                                goto destroylock;
-                                        }
-                                        /* If this group lock is compatible with another
-                                         * group lock on the waiting list, they must be
-                                         * together in the list, so they can be granted
-                                         * at the same time.  Otherwise the later lock
-                                         * can get stuck behind another, incompatible,
-                                         * lock. */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        /* Because 'lock' is not granted, we can stop
-                                         * processing this queue and return immediately.
-                                         * There is no need to check the rest of the
-                                         * list. */
-                                        RETURN(0);
-                                }
-                        }
+                if (req == lock)
+                        break;
 
-                        if (unlikely(req_mode == LCK_GROUP &&
-                                     (lock->l_req_mode != lock->l_granted_mode))) {
-                                scan = 1;
+                if (unlikely(scan)) {
+                        /* We only get here if we are queuing GROUP lock
+                           and met some incompatible one. The main idea of this
+                           code is to insert GROUP lock past compatible GROUP
+                           lock in the waiting queue or if there is not any,
+                           then in front of first non-GROUP lock */
+                        if (lock->l_req_mode != LCK_GROUP) {
+                                /* Ok, we hit non-GROUP lock, there should be no
+                                   more GROUP locks later on, queue in front of
+                                   first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                cfs_list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
                                 compat = 0;
-                                if (lock->l_req_mode != LCK_GROUP) {
-                                        /* Ok, we hit non-GROUP lock, there should be no
-                                           more GROUP locks later on, queue in front of
-                                           first non-GROUP lock */
-
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        cfs_list_del_init(&lock->l_res_link);
-                                        ldlm_resource_insert_lock_after(req, lock);
-                                        break;
-                                }
-                                if (req->l_policy_data.l_extent.gid ==
-                                    lock->l_policy_data.l_extent.gid) {
-                                        /* found it */
-                                        ldlm_resource_insert_lock_after(lock, req);
-                                        break;
+                                break;
+                        }
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                compat = 0;
+                                break;
+                        }
+                        continue;
+                }
+
+                /* locks are compatible, overlap doesn't matter */
+                if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                        if (req_mode == LCK_PR &&
+                            ((lock->l_policy_data.l_extent.start <=
+                              req->l_policy_data.l_extent.start) &&
+                             (lock->l_policy_data.l_extent.end >=
+                              req->l_policy_data.l_extent.end))) {
+                                /* If we met a PR lock just like us or wider,
+                                   and nobody down the list conflicted with
+                                   it, that means we can skip processing of
+                                   the rest of the list and safely place
+                                   ourselves at the end of the list, or grant
+                                   (dependent if we met an conflicting locks
+                                   before in the list).
+                                   In case of 1st enqueue only we continue
+                                   traversing if there is something conflicting
+                                   down the list because we need to make sure
+                                   that something is marked as AST_SENT as well,
+                                   in cse of empy worklist we would exit on
+                                   first conflict met. */
+                                /* There IS a case where such flag is
+                                   not set for a lock, yet it blocks
+                                   something. Luckily for us this is
+                                   only during destroy, so lock is
+                                   exclusive. So here we are safe */
+                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+                                        RETURN(compat);
                                 }
-                                continue;
                         }
 
-                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
-                                /* If compared lock is GROUP, then requested is PR/PW/
-                                 * so this is not compatible; extent range does not
-                                 * matter */
+                        /* non-group locks are compatible, overlap doesn't
+                           matter */
+                        if (likely(req_mode != LCK_GROUP))
+                                continue;
+
+                        /* If we are trying to get a GROUP lock and there is
+                           another one of this kind, we need to compare gid */
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* We are scanning queue of waiting
+                                 * locks and it means current request would
+                                 * block along with existing lock (that is
+                                 * already blocked.
+                                 * If we are in nonblocking mode - return
+                                 * immediately */
                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                         compat = -EWOULDBLOCK;
                                         goto destroylock;
-                                } else {
-                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                 }
-                        } else if (lock->l_policy_data.l_extent.end < req_start ||
-                                   lock->l_policy_data.l_extent.start > req_end) {
+                                /* If this group lock is compatible with another
+                                 * group lock on the waiting list, they must be
+                                 * together in the list, so they can be granted
+                                 * at the same time.  Otherwise the later lock
+                                 * can get stuck behind another, incompatible,
+                                 * lock. */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                /* Because 'lock' is not granted, we can stop
+                                 * processing this queue and return immediately.
+                                 * There is no need to check the rest of the
+                                 * list. */
+                                RETURN(0);
+                        }
+                }
+
+                if (unlikely(req_mode == LCK_GROUP &&
+                             (lock->l_req_mode != lock->l_granted_mode))) {
+                        scan = 1;
+                        compat = 0;
+                        if (lock->l_req_mode != LCK_GROUP) {
+                                /* Ok, we hit non-GROUP lock, there should
+                                 * be no more GROUP locks later on, queue in
+                                 * front of first non-GROUP lock */
+
+                                ldlm_resource_insert_lock_after(lock, req);
+                                cfs_list_del_init(&lock->l_res_link);
+                                ldlm_resource_insert_lock_after(req, lock);
+                                break;
+                        }
+                        if (req->l_policy_data.l_extent.gid ==
+                            lock->l_policy_data.l_extent.gid) {
+                                /* found it */
+                                ldlm_resource_insert_lock_after(lock, req);
+                                break;
+                        }
+                        continue;
+                }
+
+                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                        /* If compared lock is GROUP, then requested is PR/PW/
+                         * so this is not compatible; extent range does not
+                         * matter */
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                compat = -EWOULDBLOCK;
+                                goto destroylock;
+                        } else {
+                                *flags |= LDLM_FL_NO_TIMEOUT;
+                        }
+                } else if (!work_list) {
+                        if (lock->l_policy_data.l_extent.end < req_start ||
+                            lock->l_policy_data.l_extent.start > req_end)
                                 /* if a non group lock doesn't overlap skip it */
                                 continue;
-                        } else if (lock->l_req_extent.end < req_start ||
-                                   lock->l_req_extent.start > req_end) {
-                                /* false contention, the requests doesn't really overlap */
-                                check_contention = 0;
+                        RETURN(0);
+                } else {
+                        /* for waiting locks, count all non-compatible locks in
+                         * traffic index */
+                        ++req->l_traffic;
+                        ++lock->l_traffic;
+
+                        /* adjust policy */
+                        if (lock->l_policy_data.l_extent.end < req_start) {
+                                /*     lock            req
+                                 * ------------+
+                                 * ++++++      |   +++++++
+                                 *      +      |   +
+                                 * ++++++      |   +++++++
+                                 * ------------+
+                                 */
+                                if (lock->l_policy_data.l_extent.end >
+                                    req->l_policy_data.l_extent.start)
+                                        req->l_policy_data.l_extent.start =
+                                             lock->l_policy_data.l_extent.end+1;
+                                continue;
+                        } else if (lock->l_req_extent.end < req_start) {
+                                /*     lock            req
+                                 * ------------------+
+                                 * ++++++          +++++++
+                                 *      +          + |
+                                 * ++++++          +++++++
+                                 * ------------------+
+                                 */
+                                lock->l_policy_data.l_extent.end =
+                                                          req_start - 1;
+                                req->l_policy_data.l_extent.start =
+                                                              req_start;
+                                continue;
+                        } else if (lock->l_policy_data.l_extent.start >
+                                   req_end) {
+                                /*  req              lock
+                                 *              +--------------
+                                 *  +++++++     |    +++++++
+                                 *        +     |    +
+                                 *  +++++++     |    +++++++
+                                 *              +--------------
+                                 */
+                                if (lock->l_policy_data.l_extent.start <
+                                    req->l_policy_data.l_extent.end)
+                                        req->l_policy_data.l_extent.end =
+                                           lock->l_policy_data.l_extent.start-1;
+                                continue;
+                        } else if (lock->l_req_extent.start > req_end) {
+                                /*  req              lock
+                                 *      +----------------------
+                                 *  +++++++          +++++++
+                                 *      | +          +
+                                 *  +++++++          +++++++
+                                 *      +----------------------
+                                 */
+                                lock->l_policy_data.l_extent.start =
+                                                            req_end + 1;
+                                req->l_policy_data.l_extent.end=req_end;
+                                continue;
                         }
+                } /* policy_adj */
 
-                        if (!work_list)
-                                RETURN(0);
-
+                compat = 0;
+                if (work_list) {
                         /* don't count conflicting glimpse locks */
-                        if (lock->l_req_mode == LCK_PR &&
-                            lock->l_policy_data.l_extent.start == 0 &&
-                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+                        if (lock->l_flags & LDLM_FL_HAS_INTENT)
                                 check_contention = 0;
 
                         *contended_locks += check_contention;
 
-                        compat = 0;
                         if (lock->l_blocking_ast)
                                 ldlm_add_ast_work_item(lock, req, work_list);
                 }
         }
 
+        RETURN(compat);
+destroylock:
+        cfs_list_del_init(&req->l_res_link);
+        ldlm_lock_destroy_nolock(req);
+        *err = compat;
+        RETURN(compat);
+}
+/* Determine if the lock is compatible with all locks on the queue.
+ * We stop walking the queue if we hit ourselves so we don't take
+ * conflicting locks enqueued after us into accound, or we'd wait forever.
+ *
+ * 0 if the lock is not compatible
+ * 1 if the lock is compatible
+ * 2 if this group lock is compatible and requires no further checking
+ * negative error, such as EWOULDBLOCK for group locks
+ *
+ * Note: policy adjustment only happends during the 1st lock enqueue procedure
+ */
+static int
+ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
+                         int *flags, ldlm_error_t *err,
+                         cfs_list_t *work_list, int *contended_locks)
+{
+        struct ldlm_resource *res = req->l_resource;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        int compat = 1;
+        ENTRY;
+
+        lockmode_verify(req_mode);
+
+        if (queue == &res->lr_granted)
+                compat = ldlm_extent_compat_granted_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
+        else
+                compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
+
+
         if (ldlm_check_contention(req, *contended_locks) &&
             compat == 0 &&
             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
@@ -632,6 +580,24 @@ static void discard_bl_list(cfs_list_t *bl_list)
         EXIT;
 }
 
+static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
+{
+        lock->l_policy_data.l_extent.start = 0;
+        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+}
+
+static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
+{
+        if (lock->l_traffic > 4)
+                lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
+        ldlm_extent_internal_policy_fixup(lock,
+                                          &lock->l_policy_data.l_extent,
+                                          lock->l_traffic);
+        if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
+            lock->l_req_extent.end   != lock->l_policy_data.l_extent.end)
+                *flags |= LDLM_FL_LOCK_CHANGED;
+}
+
 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
   *   - blocking ASTs have already been sent
   *   - must call this function with the ns lock held
@@ -673,14 +639,24 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 
                 ldlm_resource_unlink_lock(lock);
 
-                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
-                        ldlm_extent_policy(res, lock, flags);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) {
+                        lock->l_policy_data.l_extent.start =
+                                lock->l_req_extent.start;
+                        lock->l_policy_data.l_extent.end =
+                                lock->l_req_extent.end;
+                } else {
+                        ldlm_process_extent_fini(lock, flags);
+                }
+
                 ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
         contended_locks = 0;
+
+        ldlm_process_extent_init(lock);
+
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                       &rpc_list, &contended_locks);
         if (rc < 0)
@@ -695,8 +671,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 
         if (rc + rc2 == 2) {
         grant:
-                ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
+                ldlm_process_extent_fini(lock, flags);
                 ldlm_grant_lock(lock, NULL);
         } else {
                 /* If either of the compat_queue()s returned failure, then we