// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

/**
 * This file contains the implementation of the EXTENT lock type.
 *
 * The EXTENT lock type is for locking a contiguous range of values,
 * represented by 64-bit starting and ending offsets (inclusive). There are
 * several extent lock modes, some of which may be mutually incompatible.
 * Extent locks are considered incompatible if their modes are incompatible
 * and their extents intersect. See the lock mode compatibility matrix in
 * lustre_dlm.h.
 */
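/*
 * For example, a PW lock on [0, 4095] and a PR lock on [4000, 8191] are
 * incompatible because PW/PR conflict and the ranges intersect, while two
 * PR locks on those same ranges can both be granted, as can a PW lock on
 * [0, 4095] together with a PW lock on [8192, 12287], since those extents
 * do not overlap.
 */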
#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"
#include <linux/interval_tree_generic.h>
#define START(node) ((node)->l_policy_data.l_extent.start)
#define LAST(node) ((node)->l_policy_data.l_extent.end)

INTERVAL_TREE_DEFINE(struct ldlm_lock, l_rb, u64, l_subtree_last,
		     START, LAST, static inline, extent);
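/*
 * INTERVAL_TREE_DEFINE() generates the static inline helpers
 * extent_insert(), extent_remove(), extent_iter_first() and
 * extent_iter_next(), along with the extent_augment callbacks used below,
 * all keyed on the [start, end] range of the lock's l_extent policy data
 * (see linux/interval_tree_generic.h).
 */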
static inline struct ldlm_lock *extent_next_lock(struct ldlm_lock *lock)
{
	lock = list_next_entry(lock, l_same_extent);
	if (RB_EMPTY_NODE(&lock->l_rb))
		return lock;
	return extent_next(lock);
}
static inline struct ldlm_lock *extent_prev_lock(struct ldlm_lock *lock)
{
	lock = list_prev_entry(lock, l_same_extent);
	if (RB_EMPTY_NODE(&lock->l_rb))
		return lock;
	return extent_prev(lock);
}
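/*
 * Locks with identical [start, end] extents share a single node in the
 * interval tree: the first lock inserted sits in the rbtree and any
 * duplicates hang off its l_same_extent list (see ldlm_extent_add_lock()
 * below). extent_next_lock()/extent_prev_lock() therefore walk the sibling
 * list first and only step to the neighbouring tree node once the list
 * wraps back to the tree-resident lock. extent_insert_unique() below is
 * the insert half of that scheme: it inserts a lock only if no lock with
 * the same interval exists, otherwise it returns the existing one.
 */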
static inline struct ldlm_lock *extent_insert_unique(
	struct ldlm_lock *node, struct interval_tree_root *root)
{
	struct rb_node **link, *rb_parent = NULL;
	u64 start = START(node), last = LAST(node);
	struct ldlm_lock *parent;
	bool leftmost = true;

#ifdef HAVE_INTERVAL_TREE_CACHED
	link = &root->rb_root.rb_node;
#else
	link = &root->rb_node;
#endif
	while (*link) {
		rb_parent = *link;
		parent = rb_entry(rb_parent, struct ldlm_lock, l_rb);
		if (parent->l_subtree_last < last)
			parent->l_subtree_last = last;
		if (start == START(parent)) {
			if (last == LAST(parent))
				return parent;
			if (last < LAST(parent)) {
				link = &parent->l_rb.rb_left;
			} else {
				link = &parent->l_rb.rb_right;
				leftmost = false;
			}
		} else if (start < START(parent))
			link = &parent->l_rb.rb_left;
		else {
			link = &parent->l_rb.rb_right;
			leftmost = false;
		}
	}

	node->l_subtree_last = last;
	rb_link_node(&node->l_rb, rb_parent, link);
#ifdef HAVE_INTERVAL_TREE_CACHED
	rb_insert_augmented_cached(&node->l_rb, root,
				   leftmost, &extent_augment);
#else
	rb_insert_augmented(&node->l_rb, root,
			    &extent_augment);
#endif
	return NULL;
}
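/*
 * Note that, like the generated extent_insert(), the open-coded insert
 * above keeps the augmented l_subtree_last field (the maximum interval end
 * in each subtree) up to date while walking down, so interval queries via
 * extent_iter_first()/extent_iter_next() keep working on trees built
 * through extent_insert_unique().
 */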
static inline void extent_replace(struct ldlm_lock *in_tree,
				  struct ldlm_lock *new,
				  struct interval_tree_root *tree)
{
	struct rb_node *p = rb_parent(&in_tree->l_rb);

	/* place 'new' in the rbtree replacing in_tree */
	new->l_rb.rb_left = in_tree->l_rb.rb_left;
	new->l_rb.rb_right = in_tree->l_rb.rb_right;
	new->l_subtree_last = in_tree->l_subtree_last;

	if (new->l_rb.rb_left)
		rb_set_parent_color(new->l_rb.rb_left, &new->l_rb,
				    rb_color(new->l_rb.rb_left));
	if (new->l_rb.rb_right)
		rb_set_parent_color(new->l_rb.rb_right, &new->l_rb,
				    rb_color(new->l_rb.rb_right));
	rb_set_parent_color(&new->l_rb, p, rb_color(&in_tree->l_rb));

	if (!p) {
#ifdef HAVE_INTERVAL_TREE_CACHED
		tree->rb_root.rb_node = &new->l_rb;
#else
		tree->rb_node = &new->l_rb;
#endif
	} else if (p->rb_left == &in_tree->l_rb) {
		p->rb_left = &new->l_rb;
	} else {
		p->rb_right = &new->l_rb;
	}
#ifdef HAVE_INTERVAL_TREE_CACHED
	if (tree->rb_leftmost == &in_tree->l_rb)
		tree->rb_leftmost = &new->l_rb;
#endif
}
#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
/**
 * Fix up the ldlm_extent after expanding it.
 *
 * After expansion has been done, we might still want to do certain adjusting
 * based on overall contention of the resource and the like to avoid granting
 * overly wide locks.
 */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
					      struct ldlm_extent *new_ex,
					      int conflicting)
{
	enum ldlm_mode req_mode = req->l_req_mode;
	__u64 req_start = req->l_req_extent.start;
	__u64 req_end = req->l_req_extent.end;
	__u64 req_align, mask;

	if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
		if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
			new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
					  new_ex->end);
	}

	if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF)
		return;

	/* we need to ensure that the lock extent is properly aligned to what
	 * the client requested. Also we need to make sure it's also server
	 * page size aligned otherwise a server page can be covered by two
	 * write locks.
	 */
	mask = PAGE_SIZE;
	req_align = (req_end + 1) | req_start;
	if (req_align != 0 && (req_align & (mask - 1)) == 0) {
		while ((req_align & mask) == 0)
			mask <<= 1;
	}
	mask -= 1;
	/* We can only shrink the lock, not grow it.
	 * This should never cause the lock to be smaller than requested,
	 * since the requested lock was already aligned on these boundaries.
	 */
	new_ex->start = ((new_ex->start - 1) | mask) + 1;
	new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
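	/* Worked example (assuming a 4096-byte server page): a request for
	 * [8192, 16383] gives req_align = 16384 | 8192 = 24576, which is
	 * aligned to 8192, so mask grows to 8192 and then becomes 8191. A
	 * tentative grant of [1000, 100000] is then shrunk to [8192, 98303]:
	 * start is rounded up to the next multiple of 8192 and end + 1 is
	 * rounded down to one, which still contains the aligned request.
	 */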
	LASSERTF(new_ex->start <= req_start,
		 "mask %#llx grant start %llu req start %llu\n",
		 mask, new_ex->start, req_start);
	LASSERTF(new_ex->end >= req_end,
		 "mask %#llx grant end %llu req end %llu\n",
		 mask, new_ex->end, req_end);
}
/**
 * Return the maximum extent that:
 * - contains the requested extent
 * - does not overlap existing conflicting extents outside the requested one
 *
 * This allows clients to request a small required extent range, but if there
 * is no contention on the lock the full lock can be granted to the client.
 * This avoids the need for many smaller lock requests to be granted in the
 * common (uncontended) case.
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
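/*
 * For example, if a client asks for [0, 4095] on an uncontended resource,
 * new_ex is left at [0, OBD_OBJECT_EOF] and the client effectively gets a
 * whole-file lock; later I/O to other offsets then needs no further
 * enqueues until another client creates a conflict.
 */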
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
						struct ldlm_extent *new_ex)
{
	struct ldlm_resource *res = req->l_resource;
	enum ldlm_mode req_mode = req->l_req_mode;
	__u64 req_start = req->l_req_extent.start;
	__u64 req_end = req->l_req_extent.end;
	struct ldlm_interval_tree *tree;
	int conflicting = 0;
	int idx;

	lockmode_verify(req_mode);

	/* Using interval tree to handle the LDLM extent granted locks. */
	for (idx = 0; idx < LCK_MODE_NUM; idx++) {
		tree = &res->lr_itree[idx];
		if (lockmode_compat(tree->lit_mode, req_mode))
			continue;

		conflicting += tree->lit_size;

		if (!INTERVAL_TREE_EMPTY(&tree->lit_root)) {
			struct ldlm_lock *lck = NULL;

			LASSERTF(!extent_iter_first(&tree->lit_root,
						    req_start, req_end),
				 "req_mode=%d, start=%llu, end=%llu\n",
				 req_mode, req_start, req_end);
			/* If any tree is non-empty we don't bother
			 * expanding backwards, it won't be worth the cost.
			 */
			new_ex->start = req_start;

			if (req_end < (U64_MAX - 1))
				/* lck is the lock with the lowest endpoint
				 * which covers anything after 'req'
				 */
				lck = extent_iter_first(&tree->lit_root,
							req_end + 1, U64_MAX);
			if (lck)
				new_ex->end = min(new_ex->end,
						  START(lck) - 1);
		}
		if (new_ex->start == req_start && new_ex->end == req_end)
			break;
	}

	LASSERT(new_ex->start <= req_start);
	LASSERT(new_ex->end >= req_end);

	ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
}
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
				    struct ldlm_extent *new_ex)
{
	struct ldlm_resource *res = req->l_resource;
	enum ldlm_mode req_mode = req->l_req_mode;
	__u64 req_start = req->l_req_extent.start;
	__u64 req_end = req->l_req_extent.end;
	struct ldlm_lock *lock;
	int conflicting = 0;

	lockmode_verify(req_mode);

	/* for waiting locks */
	list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
		struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;

		/* We already hit the minimum requested size, search no more */
		if (new_ex->start == req_start && new_ex->end == req_end)
			break;

		/* Don't conflict with ourselves */
		if (req == lock)
			continue;

		/* Locks are compatible, overlap doesn't matter.
		 * Until bug 20 is fixed, try to avoid granting overlapping
		 * locks on one client (they take a long time to cancel)
		 */
		if (lockmode_compat(lock->l_req_mode, req_mode) &&
		    lock->l_export != req->l_export)
			continue;

		/* If this is a high-traffic lock, don't grow downwards at all
		 * or grow upwards too much
		 */
		++conflicting;
		if (conflicting > 4)
			new_ex->start = req_start;

		/* If lock doesn't overlap new_ex, skip it. */
		if (!ldlm_extent_overlap(l_extent, new_ex))
			continue;

		/* Locks conflicting in requested extents and we can't satisfy
		 * both locks, so ignore it. Either we will ping-pong this
		 * extent (we would regardless of what extent we granted) or
		 * lock is unused and it shouldn't limit our extent growth.
		 */
		if (ldlm_extent_overlap(&lock->l_req_extent,
					&req->l_req_extent))
			continue;

		/* We grow extents downwards only as far as they don't overlap
		 * with already-granted locks, on the assumption that clients
		 * will be writing beyond the initial requested end and would
		 * then need to enqueue a new lock beyond previous request.
		 * l_req_extent->end strictly < req_start, checked above.
		 */
		if (l_extent->start < req_start && new_ex->start != req_start) {
			if (l_extent->end >= req_start)
				new_ex->start = req_start;
			else
				new_ex->start = min(l_extent->end + 1,
						    req_start);
		}

		/* If we need to cancel this lock anyways because our request
		 * overlaps the granted lock, we grow up to its requested
		 * extent start instead of limiting this extent, assuming that
		 * clients are writing forwards and the lock had over grown
		 * its extent downwards before we enqueued our request.
		 */
		if (l_extent->end > req_end) {
			if (l_extent->start <= req_end)
				new_ex->end = max(lock->l_req_extent.start - 1,
						  req_end);
			else
				new_ex->end = max(l_extent->start - 1,
						  req_end);
		}
	}

	ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
}
/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues.
 */
static void ldlm_extent_policy(struct ldlm_resource *res,
			       struct ldlm_lock *lock, __u64 *flags)
{
	struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

	if (lock->l_export == NULL)
		/* this is a local lock taken by server (e.g., as a part of
		 * OST-side locking, or unlink handling). Expansion doesn't
		 * make a lot of sense for local locks, because they are
		 * dropped immediately on operation completion and would only
		 * conflict with other threads.
		 */
		return;

	if (lock->l_policy_data.l_extent.start == 0 &&
	    lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
		/* fast-path whole file locks */
		return;

	/* Because reprocess_queue zeroes flags and uses it to return
	 * LDLM_FL_LOCK_CHANGED, we must check for the NO_EXPANSION flag
	 * in the lock flags rather than the 'flags' argument.
	 */
	if (likely(!(lock->l_flags & LDLM_FL_NO_EXPANSION))) {
		ldlm_extent_internal_policy_granted(lock, &new_ex);
		ldlm_extent_internal_policy_waiting(lock, &new_ex);
	} else {
		LDLM_DEBUG(lock, "Not expanding manually requested lock");
		new_ex.start = lock->l_policy_data.l_extent.start;
		new_ex.end = lock->l_policy_data.l_extent.end;
		/* In case the request is not on correct boundaries, we call
		 * fixup. (normally called in ldlm_extent_internal_policy_*)
		 */
		ldlm_extent_internal_policy_fixup(lock, &new_ex, 0);
	}

	if (!ldlm_extent_equal(&new_ex, &lock->l_policy_data.l_extent)) {
		*flags |= LDLM_FL_LOCK_CHANGED;
		lock->l_policy_data.l_extent.start = new_ex.start;
		lock->l_policy_data.l_extent.end = new_ex.end;
	}
}
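/*
 * A resource is treated as contended if more conflicting locks than the
 * namespace's ns_contended_locks threshold were seen at once; the resource
 * then stays "contended" for ns_contention_time seconds from that point,
 * which ldlm_check_contention() reports to the caller.
 */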
static bool ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
	struct ldlm_resource *res = lock->l_resource;
	time64_t now = ktime_get_seconds();

	if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
		return true;

	CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
	if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
		res->lr_contention_time = now;

	return now < res->lr_contention_time +
		     ldlm_res_to_ns(res)->ns_contention_time;
}
struct ldlm_extent_compat_args {
	struct list_head *work_list;
	struct ldlm_lock *lock;
	enum ldlm_mode mode;
	int *locks;
	int *compat;
};

static bool ldlm_extent_compat_cb(struct ldlm_lock *lock, void *data)
{
	struct ldlm_extent_compat_args *priv = data;
	struct list_head *work_list = priv->work_list;
	struct ldlm_lock *enq = priv->lock;
	enum ldlm_mode mode = priv->mode;
	int *counter = priv->locks;

	/* interval tree is for granted lock */
	LASSERTF(mode == lock->l_granted_mode,
		 "mode = %s, lock->l_granted_mode = %s\n",
		 ldlm_lockname[mode],
		 ldlm_lockname[lock->l_granted_mode]);
	if (lock->l_blocking_ast && lock->l_granted_mode != LCK_GROUP)
		ldlm_add_ast_work_item(lock, enq, work_list);

	/* don't count conflicting glimpse locks */
	if (!(mode == LCK_PR && lock->l_policy_data.l_extent.start == 0 &&
	      lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF))
		(*counter)++;

	return false;
}
/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as -EAGAIN for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
			 __u64 *flags, struct list_head *work_list,
			 int *contended_locks)
{
	struct ldlm_resource *res = req->l_resource;
	enum ldlm_mode req_mode = req->l_req_mode;
	__u64 req_start = req->l_req_extent.start;
	__u64 req_end = req->l_req_extent.end;
	struct ldlm_lock *lock;
	int check_contention;
	int compat = 1;

	lockmode_verify(req_mode);

	/* Using interval tree for granted lock */
	if (queue == &res->lr_granted) {
		struct ldlm_interval_tree *tree;
		struct ldlm_extent_compat_args data = {.work_list = work_list,
						       .lock = req,
						       .locks = contended_locks,
						       .compat = &compat };
		int idx;

		for (idx = 0; idx < LCK_MODE_NUM; idx++) {
			tree = &res->lr_itree[idx];
			if (INTERVAL_TREE_EMPTY(&tree->lit_root))
				continue;

			data.mode = tree->lit_mode;
			if (lockmode_compat(req_mode, tree->lit_mode)) {
				struct ldlm_lock *lock;

				if (req_mode != LCK_GROUP)
					continue;
				/* group lock, grant it immediately if
				 * compatible
				 */
				lock = extent_top(tree);
				if (req->l_policy_data.l_extent.gid ==
				    lock->l_policy_data.l_extent.gid)
					RETURN(2);
			}

			if (tree->lit_mode == LCK_GROUP) {
				if (*flags & (LDLM_FL_BLOCK_NOWAIT |
					      LDLM_FL_SPECULATIVE)) {
					compat = -EWOULDBLOCK;
					goto destroylock;
				}

				if (!work_list)
					RETURN(0);

				/* if work list is not NULL, add all
				 * locks in the tree to work list.
				 */
				compat = 0;
				for (lock = extent_first(tree);
				     lock;
				     lock = extent_next_lock(lock))
					ldlm_extent_compat_cb(lock, &data);
				continue;
			}

			/* We've found a potentially blocking lock, check
			 * compatibility. This handles locks other than GROUP
			 * locks, which are handled separately above.
			 *
			 * Locks with FL_SPECULATIVE are asynchronous requests
			 * which must never wait behind another lock, so they
			 * fail if any conflicting lock is found.
			 */
			if (!work_list || (*flags & LDLM_FL_SPECULATIVE)) {
				if (extent_iter_first(&tree->lit_root,
						      req_start, req_end)) {
					if (!work_list)
						RETURN(0);
					compat = -EWOULDBLOCK;
					goto destroylock;
				}
			} else {
				ldlm_extent_search(&tree->lit_root,
						   req_start, req_end,
						   ldlm_extent_compat_cb,
						   &data);
				if (!list_empty(work_list) && compat)
					compat = 0;
			}
		}
	} else { /* for waiting queue */
		list_for_each_entry(lock, queue, l_res_link) {
			check_contention = 1;

			/* We stop walking the queue if we hit ourselves so
			 * we don't take conflicting locks enqueued after us
			 * into account, or we'd wait forever.
			 */
			if (req == lock)
				break;

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_req_mode, req_mode)) {
				if (req_mode == LCK_PR &&
				    ((lock->l_policy_data.l_extent.start <=
				      req->l_policy_data.l_extent.start) &&
				     (lock->l_policy_data.l_extent.end >=
				      req->l_policy_data.l_extent.end))) {
					/* If we met a PR lock just like us or
					 * wider, and nobody down the list
					 * conflicted with it, that means we
					 * can skip processing of the rest of
					 * the list and safely place ourselves
					 * at the end of the list, or grant
					 * (depending on whether we met a
					 * conflicting lock before in the
					 * list). In case of 1st enqueue only
					 * we continue traversing if there is
					 * something conflicting down the list
					 * because we need to make sure that
					 * something is marked as AST_SENT as
					 * well; in case of an empty worklist
					 * we would exit on first conflict met.
					 */
					/* There IS a case where such flag is
					 * not set for a lock, yet it blocks
					 * something. Luckily for us this is
					 * only during destroy, so lock is
					 * exclusive. So here we are safe.
					 */
					if (!ldlm_is_ast_sent(lock))
						RETURN(compat);
				}

				/* non-group locks are compatible,
				 * overlap doesn't matter
				 */
				if (likely(req_mode != LCK_GROUP))
					continue;

				/* If we are trying to get a GROUP lock and
				 * there is another one of this kind, we need
				 * to compare gid
				 */
				if (req->l_policy_data.l_extent.gid ==
				    lock->l_policy_data.l_extent.gid) {
					/* If existing lock with matched gid
					 * is granted, we grant new one too.
					 */
					if (ldlm_is_granted(lock))
						RETURN(2);

					/* Otherwise we are scanning queue of
					 * waiting locks and it means current
					 * request would block along with
					 * existing lock (that is already
					 * blocked). If we are in nonblocking
					 * mode - return immediately.
					 */
					if (*flags & (LDLM_FL_BLOCK_NOWAIT |
						      LDLM_FL_SPECULATIVE)) {
						compat = -EWOULDBLOCK;
						goto destroylock;
					}
					/* If this group lock is compatible with
					 * another group lock on the waiting
					 * list, they must be together in the
					 * list, so they can be granted at the
					 * same time. Otherwise the later lock
					 * can get stuck behind another,
					 * incompatible, lock.
					 */
					ldlm_resource_insert_lock_after(lock,
									req);
					/* Because 'lock' is not granted, we can
					 * stop processing this queue and return
					 * immediately. There is no need to
					 * check the rest of the list.
					 */
					RETURN(0);
				}
			}
			if (unlikely(req_mode == LCK_GROUP &&
				     !ldlm_is_granted(lock))) {
				compat = 0;
				if (lock->l_req_mode != LCK_GROUP) {
					/* Ok, we hit non-GROUP lock, there
					 * should be no more GROUP locks
					 * later on, queue in front of first
					 * non-GROUP lock.
					 */
					ldlm_resource_insert_lock_before(lock,
									 req);
					break;
				}
				LASSERT(req->l_policy_data.l_extent.gid !=
					lock->l_policy_data.l_extent.gid);
				continue;
			}

			if (unlikely(lock->l_req_mode == LCK_GROUP)) {
				/* If compared lock is GROUP, then requested is
				 * PR/PW so this is not compatible; extent
				 * range does not matter.
				 */
				if (*flags & (LDLM_FL_BLOCK_NOWAIT |
					      LDLM_FL_SPECULATIVE)) {
					compat = -EWOULDBLOCK;
					goto destroylock;
				}
			} else if (lock->l_policy_data.l_extent.end < req_start ||
				   lock->l_policy_data.l_extent.start > req_end) {
				/* if a non-group lock doesn't overlap, skip it */
				continue;
			} else if (lock->l_req_extent.end < req_start ||
				   lock->l_req_extent.start > req_end) {
				/* false contention, the requests don't
				 * really overlap.
				 */
				check_contention = 0;
			}
			if (!work_list)
				RETURN(0);

			if (*flags & LDLM_FL_SPECULATIVE) {
				compat = -EWOULDBLOCK;
				goto destroylock;
			}

			/* don't count conflicting glimpse locks */
			if (lock->l_req_mode == LCK_PR &&
			    lock->l_policy_data.l_extent.start == 0 &&
			    lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
				check_contention = 0;

			*contended_locks += check_contention;

			compat = 0;
			if (lock->l_blocking_ast &&
			    lock->l_req_mode != LCK_GROUP)
				ldlm_add_ast_work_item(lock, req, work_list);
		}
	}

	if (ldlm_check_contention(req, *contended_locks) &&
	    compat == 0 &&
	    (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
	    req->l_req_mode != LCK_GROUP &&
	    req_end - req_start <=
	    ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
		GOTO(destroylock, compat = -EUSERS);

	RETURN(compat);
destroylock:
	list_del_init(&req->l_res_link);
	if (ldlm_is_local(req))
		ldlm_lock_decref_internal_nolock(req, req_mode);
	ldlm_lock_destroy_nolock(req);
	RETURN(compat);
}
/**
 * This function refreshes the eviction timer for a cancelled lock.
 *
 * \param[in] lock	ldlm lock for refresh
 * \param[in] arg	ldlm prolong arguments, timeout, export, extent
 *			and counter are used
 */
void ldlm_lock_prolong_one(struct ldlm_lock *lock,
			   struct ldlm_prolong_args *arg)
{
	timeout_t timeout;

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PROLONG_PAUSE, 3);

	if (arg->lpa_export != lock->l_export ||
	    lock->l_flags & LDLM_FL_DESTROYED)
		/* ignore unrelated locks */
		return;

	arg->lpa_locks_cnt++;

	if (!(lock->l_flags & LDLM_FL_AST_SENT))
		/* ignore locks not being cancelled */
		return;

	arg->lpa_blocks_cnt++;

	/* OK. this is a possible lock the user holds doing I/O,
	 * let's refresh eviction timer for it.
	 */
	timeout = ptlrpc_export_prolong_timeout(arg->lpa_req, false);
	LDLM_DEBUG(lock, "refreshed to %ds.", timeout);
	ldlm_refresh_waiting_lock(lock, timeout);
}
EXPORT_SYMBOL(ldlm_lock_prolong_one);
static bool ldlm_resource_prolong_cb(struct ldlm_lock *lock, void *data)
{
	struct ldlm_prolong_args *arg = data;

	ldlm_lock_prolong_one(lock, arg);
	return false;
}
/**
 * Walk through the granted trees and prolong locks if they overlap the
 * given extent.
 *
 * \param[in] arg	prolong args
 */
void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
{
	struct ldlm_interval_tree *tree;
	struct ldlm_resource *res;
	int idx;

	res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace,
				&arg->lpa_resid, LDLM_EXTENT, 0);
	if (IS_ERR(res)) {
		CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
		       arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
		return;
	}

	lock_res(res);
	for (idx = 0; idx < LCK_MODE_NUM; idx++) {
		tree = &res->lr_itree[idx];
		if (INTERVAL_TREE_EMPTY(&tree->lit_root))
			continue;

		/* There is no possibility to check for the groupID
		 * so all the group locks are considered as valid
		 * here, especially because the client is supposed
		 * to check it has such a lock before sending an RPC.
		 */
		if (!(tree->lit_mode & arg->lpa_mode))
			continue;

		ldlm_extent_search(&tree->lit_root, arg->lpa_extent.start,
				   arg->lpa_extent.end,
				   ldlm_resource_prolong_cb, arg);
	}
	unlock_res(res);
	ldlm_resource_putref(res);
}
EXPORT_SYMBOL(ldlm_resource_prolong);
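/*
 * This is used on the server side when an I/O request arrives for a resource
 * on which the client already holds extent locks that are being called back:
 * the request handler fills a struct ldlm_prolong_args with the export, the
 * resource id, the lock mode(s) and the extent covered by the I/O, and calls
 * ldlm_resource_prolong() so that in-flight I/O keeps refreshing the
 * eviction timer instead of letting the lock callback time out.
 */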
/**
 * Process a granting attempt for extent lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
			     enum ldlm_process_intention intention,
			     enum ldlm_error *err, struct list_head *work_list)
{
	struct ldlm_resource *res = lock->l_resource;
	int rc, rc2 = 0;
	int contended_locks = 0;
	struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
							NULL : work_list;

	LASSERT(!ldlm_is_granted(lock));
	LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
		!ldlm_is_ast_discard_data(lock));
	check_res_locked(res);
	*err = ELDLM_OK;

	if (intention == LDLM_PROCESS_RESCAN) {
		/* Careful observers will note that we don't handle -EAGAIN
		 * here, but it's ok for a non-obvious reason -- compat_queue
		 * can only return -EAGAIN if (flags & BLOCK_NOWAIT |
		 * SPECULATIVE). flags should always be zero here, and if that
		 * ever stops being true, we want to find out.
		 */
		LASSERT(*flags == 0);
		rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
					      NULL, &contended_locks);
		if (rc == 1) {
			rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
						      flags, NULL,
						      &contended_locks);
		}
		if (rc == 0)
			RETURN(LDLM_ITER_STOP);

		ldlm_resource_unlink_lock(lock);

		if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
			ldlm_extent_policy(res, lock, flags);
		ldlm_grant_lock(lock, grant_work);
		RETURN(LDLM_ITER_CONTINUE);
	}

	contended_locks = 0;
	rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
				      work_list, &contended_locks);
	if (rc < 0)
		GOTO(out, *err = rc);

	if (rc != 2) {
		rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
					       flags, work_list,
					       &contended_locks);
		if (rc2 < 0)
			GOTO(out, *err = rc = rc2);
	}

	if (rc + rc2 == 2) {
		ldlm_extent_policy(res, lock, flags);
		ldlm_resource_unlink_lock(lock);
		ldlm_grant_lock(lock, grant_work);
	} else {
		/* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to
		 * force client to wait for the lock endlessly once
		 * the lock is enqueued -bzzz
		 */
		*flags |= LDLM_FL_NO_TIMEOUT;
	}

	RETURN(LDLM_ITER_CONTINUE);
out:
	return rc;
}
#endif /* HAVE_SERVER_SUPPORT */
/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock". This function returns the new KMS value, updating
 * it only if we were the highest lock.
 *
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes!
 */
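/*
 * For example, if the granted locks cover [0, 4095] and [4096, 8191], the
 * KMS is 8192; cancelling the [4096, 8191] lock while it is the highest one
 * shifts the KMS down to 4096, whereas cancelling [0, 4095] leaves it at
 * 8192 because the remaining lock still protects that range.
 */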
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
	struct ldlm_resource *res = lock->l_resource;
	struct ldlm_interval_tree *tree;
	struct ldlm_lock *lck;
	__u64 kms = 0;
	int idx;
	bool complete = false;

	/* don't let another thread in ldlm_extent_shift_kms race in
	 * just after we finish and take our lock into account in its
	 * calculation of the kms
	 */
	ldlm_set_kms_ignore(lock);

	/* We iterate over the lock trees, looking for the largest kms
	 * smaller than the current one.
	 */
	for (idx = 0; idx < LCK_MODE_NUM; idx++) {
		tree = &res->lr_itree[idx];

		/* If our already known kms is >= than the highest 'end' in
		 * this tree, we don't need to check this tree, because
		 * the kms from a tree can be lower than in_max_high (due to
		 * kms_ignore), but it can never be higher.
		 */
		lck = extent_top(tree);
		if (!lck || kms >= lck->l_subtree_last)
			continue;

		for (lck = extent_last(tree);
		     lck;
		     lck = extent_prev(lck)) {
			if (ldlm_is_kms_ignore(lck)) {
				struct ldlm_lock *lk;

				list_for_each_entry(lk, &lck->l_same_extent,
						    l_same_extent)
					if (!ldlm_is_kms_ignore(lk)) {
						lck = lk;
						break;
					}
				if (ldlm_is_kms_ignore(lck))
					continue;
			}

			/* If this lock has a greater or equal kms, we are not
			 * the highest lock (or we share that distinction
			 * with another lock), and don't need to update KMS.
			 * Record old_kms and stop looking.
			 */
			if (lck->l_policy_data.l_extent.end == OBD_OBJECT_EOF ||
			    lck->l_policy_data.l_extent.end + 1 >= old_kms) {
				kms = old_kms;
				complete = true;
				break;
			}

			if (lck->l_policy_data.l_extent.end + 1 > kms)
				kms = lck->l_policy_data.l_extent.end + 1;

			/* Since we start with the highest lock and work
			 * down, for PW locks, we only need to check if we
			 * should update the kms, then stop walking the tree.
			 * PR locks are not exclusive, so the highest start
			 * does not imply the highest end and we must
			 * continue. (Only one group lock is allowed per
			 * resource, so this is irrelevant for group locks.)
			 */
			if (lck->l_granted_mode == LCK_PW)
				break;
		}

		/* This tells us we're not the highest lock, so we don't need
		 * to check the remaining trees.
		 */
		if (complete)
			break;
	}

	LASSERTF(kms <= old_kms, "kms %llu old_kms %llu\n", kms, old_kms);
	return kms;
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);
static inline int ldlm_mode_to_index(enum ldlm_mode mode)
{
	int index;

	LASSERT(is_power_of_2(mode));
	index = ilog2(mode);
	LASSERT(index < LCK_MODE_NUM);
	return index;
}
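/*
 * Lock modes are single bits (e.g. LCK_EX is 0x1, LCK_PW is 0x2, LCK_PR is
 * 0x4), so ilog2() maps each mode to a small array index: LCK_PR, for
 * instance, selects lr_itree[2].
 */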
/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
			  struct ldlm_lock *lock)
{
	struct ldlm_interval_tree *tree;
	struct ldlm_lock *orig;
	int idx;

	LASSERT(ldlm_is_granted(lock));

	LASSERT(RB_EMPTY_NODE(&lock->l_rb));
	LASSERT(list_empty(&lock->l_same_extent));

	idx = ldlm_mode_to_index(lock->l_granted_mode);
	LASSERT(lock->l_granted_mode == BIT(idx));
	LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

	tree = &res->lr_itree[idx];
	orig = extent_insert_unique(lock, &tree->lit_root);
	if (orig)
		list_add(&lock->l_same_extent, &orig->l_same_extent);
	tree->lit_size++;

	/* even though we use the interval tree to manage the extent locks,
	 * we also add the locks into the grant list, for debugging purposes.
	 */
	ldlm_resource_add_lock(res, &res->lr_granted, lock);

	if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
		struct ldlm_lock *lck;

		list_for_each_entry_reverse(lck, &res->lr_granted, l_res_link) {
			if (lck == lock)
				continue;
			if (lockmode_compat(lck->l_granted_mode,
					    lock->l_granted_mode))
				continue;
			if (ldlm_extent_overlap(&lck->l_req_extent,
						&lock->l_req_extent)) {
				CDEBUG(D_ERROR,
				       "granting conflicting lock %p %p\n",
				       lck, lock);
				ldlm_resource_dump(D_ERROR, res);
				LBUG();
			}
		}
	}
}
/** Remove cancelled lock from resource interval tree. */
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
	struct ldlm_resource *res = lock->l_resource;
	struct ldlm_interval_tree *tree;
	int idx;

	if (RB_EMPTY_NODE(&lock->l_rb) &&
	    list_empty(&lock->l_same_extent)) /* duplicate unlink */
		return;

	idx = ldlm_mode_to_index(lock->l_granted_mode);
	LASSERT(lock->l_granted_mode == BIT(idx));
	tree = &res->lr_itree[idx];

	LASSERT(!INTERVAL_TREE_EMPTY(&tree->lit_root));
	tree->lit_size--;
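	/* Three cases follow: a lock that was only on a l_same_extent list
	 * is just unhooked from that list; a tree-resident lock with no
	 * same-extent siblings is removed from the tree; and a tree-resident
	 * lock that still has siblings is removed with one of the siblings
	 * re-inserted so the extent stays represented in the tree.
	 */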
	if (RB_EMPTY_NODE(&lock->l_rb)) {
		list_del_init(&lock->l_same_extent);
	} else if (list_empty(&lock->l_same_extent)) {
		extent_remove(lock, &tree->lit_root);
		RB_CLEAR_NODE(&lock->l_rb);
	} else {
		struct ldlm_lock *next = list_next_entry(lock, l_same_extent);

		list_del_init(&lock->l_same_extent);
		extent_remove(lock, &tree->lit_root);
		RB_CLEAR_NODE(&lock->l_rb);
		extent_insert(next, &tree->lit_root);
	}
}
void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
				      union ldlm_policy_data *lpolicy)
{
	lpolicy->l_extent.start = wpolicy->l_extent.start;
	lpolicy->l_extent.end = wpolicy->l_extent.end;
	lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}
void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
				      union ldlm_wire_policy_data *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_extent.start = lpolicy->l_extent.start;
	wpolicy->l_extent.end = lpolicy->l_extent.end;
	wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}
void ldlm_extent_search(struct interval_tree_root *root,
			u64 start, u64 end,
			bool (*matches)(struct ldlm_lock *lock, void *data),
			void *data)
{
	struct ldlm_lock *lock, *lock2;

	for (lock = extent_iter_first(root, start, end);
	     lock;
	     lock = extent_iter_next(lock, start, end)) {
		if (matches(lock, data))
			return;
		list_for_each_entry(lock2, &lock->l_same_extent, l_same_extent)
			if (matches(lock2, data))
				return;
	}
}