4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2010, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/ldlm/ldlm_extent.c
33 * Author: Peter Braam <braam@clusterfs.com>
34 * Author: Phil Schwan <phil@clusterfs.com>
38 * This file contains implementation of EXTENT lock type
40 * EXTENT lock type is for locking a contiguous range of values, represented
41 * by 64-bit starting and ending offsets (inclusive). There are several extent
42 * lock modes, some of which may be mutually incompatible. Extent locks are
43 * considered incompatible if their modes are incompatible and their extents
44 * intersect. See the lock mode compatibility matrix in lustre_dlm.h.
47 #define DEBUG_SUBSYSTEM S_LDLM
49 #include <libcfs/libcfs.h>
50 #include <lustre_dlm.h>
51 #include <obd_support.h>
53 #include <obd_class.h>
54 #include <lustre_lib.h>
56 #include "ldlm_internal.h"
58 #ifdef HAVE_SERVER_SUPPORT
59 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
/*
 * NOTE(review): this listing is fragmentary -- the embedded original line
 * numbers jump (75 -> 77, 79 -> 83, ...), so braces, the 'conflicting'
 * parameter line, the mask initialisation and other statements are missing.
 * Do not treat this block as complete or compilable as shown.
 */
62 * Fix up the ldlm_extent after expanding it.
64 * After expansion has been done, we might still want to do certain adjusting
65 * based on overall contention of the resource and the like to avoid granting
68 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
69 struct ldlm_extent *new_ex,
/* Cache the requested mode and extent bounds of the lock being adjusted. */
72 enum ldlm_mode req_mode = req->l_req_mode;
73 __u64 req_start = req->l_req_extent.start;
74 __u64 req_end = req->l_req_extent.end;
75 __u64 req_align, mask;
/* Heavily contended write locks: cap growth at LDLM_MAX_GROWN_EXTENT. */
77 if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
78 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
79 new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
/* Whole-file extents need no alignment trimming. */
83 if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
88 /* we need to ensure that the lock extent is properly aligned to what
89 * the client requested. Also we need to make sure it's also server
90 * page size aligned otherwise a server page can be covered by two
/* Derive the largest power-of-two boundary shared by the request edges. */
94 req_align = (req_end + 1) | req_start;
95 if (req_align != 0 && (req_align & (mask - 1)) == 0) {
96 while ((req_align & mask) == 0)
100 /* We can only shrink the lock, not grow it.
101 * This should never cause lock to be smaller than requested,
102 * since requested lock was already aligned on these boundaries.
/* Round start up and end down to the chosen mask boundary. */
104 new_ex->start = ((new_ex->start - 1) | mask) + 1;
105 new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
/* The adjusted extent must still fully cover the requested range. */
106 LASSERTF(new_ex->start <= req_start,
107 "mask %#llx grant start %llu req start %llu\n",
108 mask, new_ex->start, req_start);
109 LASSERTF(new_ex->end >= req_end,
110 "mask %#llx grant end %llu req end %llu\n",
111 mask, new_ex->end, req_end);
/*
 * NOTE(review): fragmentary listing -- original line numbers jump, so the
 * opening brace, some declarations (idx, conflicting) and loop internals
 * are missing from what is shown here.
 */
115 * Return the maximum extent that:
116 * - contains the requested extent
117 * - does not overlap existing conflicting extents outside the requested one
119 * This allows clients to request a small required extent range, but if there
120 * is no contention on the lock the full lock can be granted to the client.
121 * This avoids the need for many smaller lock requests to be granted in the
122 * common (uncontended) case.
124 * Use interval tree to expand the lock extent for granted lock.
126 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
127 struct ldlm_extent *new_ex)
129 struct ldlm_resource *res = req->l_resource;
130 enum ldlm_mode req_mode = req->l_req_mode;
131 __u64 req_start = req->l_req_extent.start;
132 __u64 req_end = req->l_req_extent.end;
133 struct ldlm_interval_tree *tree;
/* 'limiter' starts from the current candidate extent and is narrowed below. */
134 struct interval_node_extent limiter = {
135 .start = new_ex->start,
143 lockmode_verify(req_mode);
145 /* Using interval tree to handle the LDLM extent granted locks. */
146 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
147 struct interval_node_extent ext = {
/* One granted-lock interval tree per lock mode; compatible modes are skipped. */
152 tree = &res->lr_itree[idx];
153 if (lockmode_compat(tree->lit_mode, req_mode))
156 conflicting += tree->lit_size;
158 limiter.start = req_start;
160 if (interval_is_overlapped(tree->lit_root, &ext))
162 "req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
163 req_mode, tree->lit_mode, tree->lit_size);
/* Expand around the request, then clamp the limiter to the expansion. */
164 interval_expand(tree->lit_root, &ext, &limiter);
165 limiter.start = max(limiter.start, ext.start);
166 limiter.end = min(limiter.end, ext.end);
/* Shrunk to exactly the requested range: no further narrowing possible. */
167 if (limiter.start == req_start && limiter.end == req_end)
171 new_ex->start = limiter.start;
172 new_ex->end = limiter.end;
173 LASSERT(new_ex->start <= req_start);
174 LASSERT(new_ex->end >= req_end);
176 ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
/*
 * NOTE(review): fragmentary listing -- the return type / 'static void' line,
 * opening brace, the 'conflicting' declaration and several statements are
 * missing (embedded original line numbers jump).
 */
180 /* The purpose of this function is to return:
181 * - the maximum extent
182 * - containing the requested extent
183 * - and not overlapping existing conflicting extents outside the requested one
186 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
187 struct ldlm_extent *new_ex)
189 struct ldlm_resource *res = req->l_resource;
190 enum ldlm_mode req_mode = req->l_req_mode;
191 __u64 req_start = req->l_req_extent.start;
192 __u64 req_end = req->l_req_extent.end;
193 struct ldlm_lock *lock;
198 lockmode_verify(req_mode);
200 /* for waiting locks */
201 list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
202 struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;
204 /* We already hit the minimum requested size, search no more */
205 if (new_ex->start == req_start && new_ex->end == req_end) {
210 /* Don't conflict with ourselves */
214 /* Locks are compatible, overlap doesn't matter
215 * Until bug 20 is fixed, try to avoid granting overlapping
216 * locks on one client (they take a long time to cancel)
218 if (lockmode_compat(lock->l_req_mode, req_mode) &&
219 lock->l_export != req->l_export)
222 /* If this is a high-traffic lock, don't grow downwards at all
223 * or grow upwards too much
227 new_ex->start = req_start;
229 /* If lock doesn't overlap new_ex, skip it. */
230 if (!ldlm_extent_overlap(l_extent, new_ex))
233 /* Locks conflicting in requested extents and we can't satisfy
234 * both locks, so ignore it. Either we will ping-pong this
235 * extent (we would regardless of what extent we granted) or
236 * lock is unused and it shouldn't limit our extent growth.
238 if (ldlm_extent_overlap(&lock->l_req_extent,
242 /* We grow extents downwards only as far as they don't overlap
243 * with already-granted locks, on the assumption that clients
244 * will be writing beyond the initial requested end and would
245 * then need to enqueue a new lock beyond previous request.
246 * l_req_extent->end strictly < req_start, checked above.
248 if (l_extent->start < req_start && new_ex->start != req_start) {
249 if (l_extent->end >= req_start)
250 new_ex->start = req_start;
252 new_ex->start = min(l_extent->end+1, req_start);
255 /* If we need to cancel this lock anyways because our request
256 * overlaps the granted lock, we grow up to its requested
257 * extent start instead of limiting this extent, assuming that
258 * clients are writing forwards and the lock had over grown
259 * its extent downwards before we enqueued our request.
261 if (l_extent->end > req_end) {
262 if (l_extent->start <= req_end)
263 new_ex->end = max(lock->l_req_extent.start - 1,
266 new_ex->end = max(l_extent->start - 1, req_end);
270 ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
/*
 * NOTE(review): fragmentary listing -- the signature's 'static void' line(?),
 * the opening brace and several 'return' statements for the early exits are
 * not shown (original line numbers jump).
 */
275 /* In order to determine the largest possible extent we can grant, we need
276 * to scan all of the queues.
278 static void ldlm_extent_policy(struct ldlm_resource *res,
279 struct ldlm_lock *lock, __u64 *flags)
/* Start from the widest possible extent and let the policies narrow it. */
281 struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
283 if (lock->l_export == NULL)
284 /* this is a local lock taken by server (e.g., as a part of
285 * OST-side locking, or unlink handling). Expansion doesn't
286 * make a lot of sense for local locks, because they are
287 * dropped immediately on operation completion and would only
288 * conflict with other threads.
292 if (lock->l_policy_data.l_extent.start == 0 &&
293 lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
294 /* fast-path whole file locks */
297 /* Because reprocess_queue zeroes flags and uses it to return
298 * LDLM_FL_LOCK_CHANGED, we must check for the NO_EXPANSION flag
299 * in the lock flags rather than the 'flags' argument
301 if (likely(!(lock->l_flags & LDLM_FL_NO_EXPANSION))) {
302 ldlm_extent_internal_policy_granted(lock, &new_ex);
303 ldlm_extent_internal_policy_waiting(lock, &new_ex);
305 LDLM_DEBUG(lock, "Not expanding manually requested lock.\n");
306 new_ex.start = lock->l_policy_data.l_extent.start;
307 new_ex.end = lock->l_policy_data.l_extent.end;
308 /* In case the request is not on correct boundaries, we call
309 * fixup. (normally called in ldlm_extent_internal_policy_*)
311 ldlm_extent_internal_policy_fixup(lock, &new_ex, 0);
/* Report a changed extent to the caller and store the new bounds. */
314 if (!ldlm_extent_equal(&new_ex, &lock->l_policy_data.l_extent)) {
315 *flags |= LDLM_FL_LOCK_CHANGED;
316 lock->l_policy_data.l_extent.start = new_ex.start;
317 lock->l_policy_data.l_extent.end = new_ex.end;
/*
 * Decide whether the resource currently counts as contended: refresh the
 * resource contention timestamp when 'contended_locks' exceeds the namespace
 * threshold, then report whether we are still inside the contention window.
 * NOTE(review): fragmentary listing -- braces and the early 'return' for the
 * fail-check path are missing (original line numbers jump: 326 -> 329).
 */
321 static bool ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
323 struct ldlm_resource *res = lock->l_resource;
324 time64_t now = ktime_get_seconds();
/* Fault-injection hook: force the contended path in testing. */
326 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
329 CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
330 if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
331 res->lr_contention_time = now;
/* Contended while within ns_contention_time seconds of the last refresh. */
333 return now < res->lr_contention_time +
334 ldlm_res_to_ns(res)->ns_contention_time;
/*
 * Argument bundle passed to ldlm_extent_compat_cb() via interval iteration.
 * NOTE(review): struct is truncated in this listing -- the callback below
 * also references priv->mode, priv->locks and (presumably) a compat field
 * that are not shown here.
 */
337 struct ldlm_extent_compat_args {
338 struct list_head *work_list;
339 struct ldlm_lock *lock;
/*
 * Interval-tree callback: for every granted lock in this interval node,
 * queue a blocking AST work item (non-GROUP locks with a blocking AST) and
 * count the interval toward the contention total unless it looks like a
 * whole-file PR glimpse lock.
 * NOTE(review): fragmentary listing -- the 'void *data' parameter line,
 * braces and the 'count' accounting are missing (line numbers jump).
 */
345 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
348 struct ldlm_extent_compat_args *priv = data;
349 struct ldlm_interval *node = to_ldlm_interval(n);
350 struct ldlm_extent *extent;
351 struct list_head *work_list = priv->work_list;
352 struct ldlm_lock *lock, *enq = priv->lock;
353 enum ldlm_mode mode = priv->mode;
358 LASSERT(!list_empty(&node->li_group));
360 list_for_each_entry(lock, &node->li_group, l_sl_policy) {
361 /* interval tree is for granted lock */
362 LASSERTF(mode == lock->l_granted_mode,
363 "mode = %s, lock->l_granted_mode = %s\n",
365 ldlm_lockname[lock->l_granted_mode]);
367 if (lock->l_blocking_ast && lock->l_granted_mode != LCK_GROUP)
368 ldlm_add_ast_work_item(lock, enq, work_list);
371 /* don't count conflicting glimpse locks */
372 extent = ldlm_interval_extent(node);
373 if (!(mode == LCK_PR && extent->start == 0 &&
374 extent->end == OBD_OBJECT_EOF))
375 *priv->locks += count;
380 RETURN(INTERVAL_ITER_CONT);
/*
 * NOTE(review): this is the core compatibility scan and the listing is
 * heavily fragmented -- the return type line, braces, 'compat'/'rc'/'idx'
 * declarations, GOTO targets and several control-flow lines are missing
 * (original line numbers jump throughout).  Treat as incomplete.
 */
384 * Determine if the lock is compatible with all locks on the queue.
386 * If \a work_list is provided, conflicting locks are linked there.
387 * If \a work_list is not provided, we exit this function on first conflict.
389 * \retval 0 if the lock is not compatible
390 * \retval 1 if the lock is compatible
391 * \retval 2 if \a req is a group lock and it is compatible and requires
392 * no further checking
393 * \retval negative error, such as EAGAIN for group locks
396 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
397 __u64 *flags, struct list_head *work_list,
398 int *contended_locks)
400 struct ldlm_resource *res = req->l_resource;
401 enum ldlm_mode req_mode = req->l_req_mode;
402 __u64 req_start = req->l_req_extent.start;
403 __u64 req_end = req->l_req_extent.end;
404 struct ldlm_lock *lock;
405 int check_contention;
410 lockmode_verify(req_mode);
412 /* Using interval tree for granted lock */
413 if (queue == &res->lr_granted) {
414 struct ldlm_interval_tree *tree;
415 struct ldlm_extent_compat_args data = {
416 .work_list = work_list,
418 .locks = contended_locks,
420 struct interval_node_extent ex = {
/* Walk the per-mode granted interval trees. */
425 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
426 tree = &res->lr_itree[idx];
427 if (tree->lit_root == NULL) /* empty tree, skipped */
430 data.mode = tree->lit_mode;
431 if (lockmode_compat(req_mode, tree->lit_mode)) {
432 struct ldlm_interval *node;
433 struct ldlm_extent *extent;
435 if (req_mode != LCK_GROUP)
438 /* group lock,grant it immediately if
440 node = to_ldlm_interval(tree->lit_root);
441 extent = ldlm_interval_extent(node);
442 if (req->l_policy_data.l_extent.gid ==
/* Incompatible GROUP tree: either fail fast or queue behind it. */
447 if (tree->lit_mode == LCK_GROUP) {
448 if (*flags & (LDLM_FL_BLOCK_NOWAIT |
449 LDLM_FL_SPECULATIVE)) {
457 /* if work list is not NULL,add all
458 * locks in the tree to work list
461 interval_iterate(tree->lit_root,
462 ldlm_extent_compat_cb, &data);
466 /* We've found a potentially blocking lock, check
467 * compatibility. This handles locks other than GROUP
468 * locks, which are handled separately above.
470 * Locks with FL_SPECULATIVE are asynchronous requests
471 * which must never wait behind another lock, so they
472 * fail if any conflicting lock is found.
474 if (!work_list || (*flags & LDLM_FL_SPECULATIVE)) {
475 rc = interval_is_overlapped(tree->lit_root,
486 interval_search(tree->lit_root, &ex,
487 ldlm_extent_compat_cb, &data);
488 if (!list_empty(work_list) && compat)
492 } else { /* for waiting queue */
493 list_for_each_entry(lock, queue, l_res_link) {
494 check_contention = 1;
496 /* We stop walking the queue if we hit ourselves so
497 * we don't take conflicting locks enqueued after us
498 * into account, or we'd wait forever.
503 /* locks are compatible, overlap doesn't matter */
504 if (lockmode_compat(lock->l_req_mode, req_mode)) {
505 if (req_mode == LCK_PR &&
506 ((lock->l_policy_data.l_extent.start <=
507 req->l_policy_data.l_extent.start) &&
508 (lock->l_policy_data.l_extent.end >=
509 req->l_policy_data.l_extent.end))) {
510 /* If we met a PR lock just like us or
511 * wider, and nobody down the list
512 * conflicted with it, that means we
513 * can skip processing of the rest of
514 * the list and safely place ourselves
515 * at the end of the list, or grant
516 * (dependent if we met an conflicting
517 * locks before in the list). In case
518 * of 1st enqueue only we continue
519 * traversing if there is something
520 * conflicting down the list because
521 * we need to make sure that something
522 * is marked as AST_SENT as well, in
523 * case of empy worklist we would exit
524 * on first conflict met.
527 /* There IS a case where such flag is
528 * not set for a lock, yet it blocks
529 * something. Luckily for us this is
530 * only during destroy, so lock is
531 * exclusive. So here we are safe
533 if (!ldlm_is_ast_sent(lock))
537 /* non-group locks are compatible,
538 * overlap doesn't matter
540 if (likely(req_mode != LCK_GROUP))
543 /* If we are trying to get a GROUP lock and
544 * there is another one of this kind, we need
547 if (req->l_policy_data.l_extent.gid ==
548 lock->l_policy_data.l_extent.gid) {
549 /* If existing lock with matched gid
550 * is granted, we grant new one too.
552 if (ldlm_is_granted(lock))
555 /* Otherwise we are scanning queue of
556 * waiting locks and it means current
557 * request would block along with
558 * existing lock (that is already
559 * blocked. If we are in nonblocking
560 * mode - return immediately
562 if (*flags & (LDLM_FL_BLOCK_NOWAIT |
563 LDLM_FL_SPECULATIVE)) {
567 /* If this group lock is compatible with
568 * another group lock on the waiting
569 * list, they must be together in the
570 * list, so they can be granted at the
571 * same time. Otherwise the later lock
572 * can get stuck behind another,
573 * incompatible, lock.
575 ldlm_resource_insert_lock_after(lock,
577 /* Because 'lock' is not granted, we can
578 * stop processing this queue and return
579 * immediately. There is no need to
580 * check the rest of the list.
586 if (unlikely(req_mode == LCK_GROUP &&
587 !ldlm_is_granted(lock))) {
589 if (lock->l_req_mode != LCK_GROUP) {
590 /* Ok, we hit non-GROUP lock, there
591 * should be no more GROUP locks later
592 * on, queue in front of first
596 ldlm_resource_insert_lock_before(lock,
600 LASSERT(req->l_policy_data.l_extent.gid !=
601 lock->l_policy_data.l_extent.gid);
605 if (unlikely(lock->l_req_mode == LCK_GROUP)) {
606 /* If compared lock is GROUP, then requested is
607 * PR/PW so this is not compatible; extent
608 * range does not matter
610 if (*flags & (LDLM_FL_BLOCK_NOWAIT |
611 LDLM_FL_SPECULATIVE)) {
615 } else if (lock->l_policy_data.l_extent.end < req_start ||
616 lock->l_policy_data.l_extent.start > req_end) {
617 /* if non group lock doesn't overlap skip it */
619 } else if (lock->l_req_extent.end < req_start ||
620 lock->l_req_extent.start > req_end) {
621 /* false contention, the requests doesn't
623 check_contention = 0;
629 if (*flags & LDLM_FL_SPECULATIVE) {
634 /* don't count conflicting glimpse locks */
635 if (lock->l_req_mode == LCK_PR &&
636 lock->l_policy_data.l_extent.start == 0 &&
637 lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
638 check_contention = 0;
640 *contended_locks += check_contention;
643 if (lock->l_blocking_ast &&
644 lock->l_req_mode != LCK_GROUP)
645 ldlm_add_ast_work_item(lock, req, work_list);
/* Deny small contended non-GROUP requests outright (-EUSERS). */
649 if (ldlm_check_contention(req, *contended_locks) &&
650 compat == 0 && (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
651 req->l_req_mode != LCK_GROUP && req_end - req_start <=
652 ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
653 GOTO(destroylock, compat = -EUSERS);
/* destroylock path: unlink and destroy the request lock. */
657 list_del_init(&req->l_res_link);
658 ldlm_lock_destroy_nolock(req);
/*
 * NOTE(review): fragmentary listing -- braces, the 'timeout' declaration
 * and the 'return' statements for the ignore paths are missing.
 */
663 * This function refresh eviction timer for cancelled lock.
664 * \param[in] lock ldlm lock for refresh
665 * \param[in] arg ldlm prolong arguments, timeout, export, extent
666 * and counter are used
668 void ldlm_lock_prolong_one(struct ldlm_lock *lock,
669 struct ldlm_prolong_args *arg)
673 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PROLONG_PAUSE, 3);
675 if (arg->lpa_export != lock->l_export ||
676 lock->l_flags & LDLM_FL_DESTROYED)
677 /* ignore unrelated locks */
680 arg->lpa_locks_cnt++;
682 if (!(lock->l_flags & LDLM_FL_AST_SENT))
683 /* ignore locks not being cancelled */
686 arg->lpa_blocks_cnt++;
688 /* OK. this is a possible lock the user holds doing I/O
689 * let's refresh eviction timer for it.
691 timeout = ldlm_bl_timeout_by_rpc(arg->lpa_req);
692 LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
693 ldlm_refresh_waiting_lock(lock, timeout);
695 EXPORT_SYMBOL(ldlm_lock_prolong_one);
/*
 * Interval-tree callback: prolong every lock hanging off this interval node
 * by delegating to ldlm_lock_prolong_one().
 * NOTE(review): fragmentary listing -- the 'void *data' parameter line and
 * braces are missing (original line numbers jump).
 */
697 static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
700 struct ldlm_prolong_args *arg = data;
701 struct ldlm_interval *node = to_ldlm_interval(n);
702 struct ldlm_lock *lock;
706 LASSERT(!list_empty(&node->li_group));
708 list_for_each_entry(lock, &node->li_group, l_sl_policy) {
709 ldlm_lock_prolong_one(lock, arg);
712 RETURN(INTERVAL_ITER_CONT);
/*
 * NOTE(review): fragmentary listing -- braces, the 'idx' declaration, the
 * IS_ERR()/NULL check on the resource lookup and the early 'return' after
 * the CDEBUG are missing (original line numbers jump: 731 -> 733).
 */
716 * Walk through granted tree and prolong locks if they overlaps extent.
718 * \param[in] arg prolong args
720 void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
722 struct ldlm_interval_tree *tree;
723 struct ldlm_resource *res;
724 struct interval_node_extent ex = { .start = arg->lpa_extent.start,
725 .end = arg->lpa_extent.end };
730 res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace,
731 &arg->lpa_resid, LDLM_EXTENT, 0);
733 CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
734 arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
739 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
740 tree = &res->lr_itree[idx];
741 if (tree->lit_root == NULL) /* empty tree, skipped */
744 /* There is no possibility to check for the groupID
745 * so all the group locks are considered as valid
746 * here, especially because the client is supposed
747 * to check it has such a lock before sending an RPC.
749 if (!(tree->lit_mode & arg->lpa_mode))
752 interval_search(tree->lit_root, &ex,
753 ldlm_resource_prolong_cb, arg);
756 ldlm_resource_putref(res);
760 EXPORT_SYMBOL(ldlm_resource_prolong);
/*
 * NOTE(review): fragmentary listing -- braces, 'rc'/'rc2' declarations, the
 * 'out:' label, and the rc checks between the compat_queue calls are missing
 * (original line numbers jump: 795 -> 797, 816 -> 819, ...).
 */
763 * Process a granting attempt for extent lock.
764 * Must be called with ns lock held.
766 * This function looks for any conflicts for \a lock in the granted or
767 * waiting queues. The lock is granted if no conflicts are found in
770 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
771 enum ldlm_process_intention intention,
772 enum ldlm_error *err, struct list_head *work_list)
774 struct ldlm_resource *res = lock->l_resource;
776 int contended_locks = 0;
/* Only ENQUEUE uses the caller's work list for grant work. */
777 struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
781 LASSERT(!ldlm_is_granted(lock));
782 LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
783 !ldlm_is_ast_discard_data(lock));
784 check_res_locked(res);
787 if (intention == LDLM_PROCESS_RESCAN) {
788 /* Careful observers will note that we don't handle -EAGAIN
789 * here, but it's ok for a non-obvious reason -- compat_queue
790 * can only return -EAGAIN if (flags & BLOCK_NOWAIT |
791 * SPECULATIVE). flags should always be zero here, and if that
792 * ever stops being true, we want to find out. */
793 LASSERT(*flags == 0);
794 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
795 NULL, &contended_locks);
797 rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
802 RETURN(LDLM_ITER_STOP);
804 ldlm_resource_unlink_lock(lock);
/* Fault injection: skip extent expansion to exercise the evict race. */
806 if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
807 ldlm_extent_policy(res, lock, flags);
808 ldlm_grant_lock(lock, grant_work);
809 RETURN(LDLM_ITER_CONTINUE);
/* Non-RESCAN path: check both queues, collecting blocking ASTs. */
813 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
814 work_list, &contended_locks);
816 GOTO(out, *err = rc);
819 rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
823 GOTO(out, *err = rc = rc2);
827 ldlm_extent_policy(res, lock, flags);
828 ldlm_resource_unlink_lock(lock);
829 ldlm_grant_lock(lock, grant_work);
831 /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to
832 * force client to wait for the lock endlessly once
833 * the lock is enqueued -bzzz */
834 *flags |= LDLM_FL_NO_TIMEOUT;
837 RETURN(LDLM_ITER_CONTINUE);
841 #endif /* HAVE_SERVER_SUPPORT */
/*
 * NOTE(review): fragmentary listing -- the ldlm_kms_shift_args members
 * (old_kms, kms, complete are referenced below) and parts of the callback
 * (the 'void *args' parameter line, braces, the break on a usable lock)
 * are missing (original line numbers jump).
 */
843 struct ldlm_kms_shift_args {
849 /* Callback for interval_iterate functions, used by ldlm_extent_shift_Kms */
850 static enum interval_iter ldlm_kms_shift_cb(struct interval_node *n,
853 struct ldlm_kms_shift_args *arg = args;
854 struct ldlm_interval *node = to_ldlm_interval(n);
855 struct ldlm_lock *tmplock;
856 struct ldlm_lock *lock = NULL;
860 /* Since all locks in an interval have the same extent, we can just
861 * use the first lock without kms_ignore set. */
862 list_for_each_entry(tmplock, &node->li_group, l_sl_policy) {
863 if (ldlm_is_kms_ignore(tmplock))
871 /* No locks in this interval without kms_ignore set */
873 RETURN(INTERVAL_ITER_CONT);
875 /* If we find a lock with a greater or equal kms, we are not the
876 * highest lock (or we share that distinction with another lock), and
877 * don't need to update KMS. Return old_kms and stop looking. */
878 if (lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF ||
879 lock->l_policy_data.l_extent.end + 1 >= arg->old_kms) {
880 arg->kms = arg->old_kms;
881 arg->complete = true;
882 RETURN(INTERVAL_ITER_STOP);
/* Track the highest end+1 seen so far as the candidate new KMS. */
885 if (lock->l_policy_data.l_extent.end + 1 > arg->kms)
886 arg->kms = lock->l_policy_data.l_extent.end + 1;
888 /* Since interval_iterate_reverse starts with the highest lock and
889 * works down, for PW locks, we only need to check if we should update
890 * the kms, then stop walking the tree. PR locks are not exclusive, so
891 * the highest start does not imply the highest end and we must
892 * continue. (Only one group lock is allowed per resource, so this is
893 * irrelevant for group locks.)*/
894 if (lock->l_granted_mode == LCK_PW)
895 RETURN(INTERVAL_ITER_STOP);
897 RETURN(INTERVAL_ITER_CONT);
/*
 * NOTE(review): fragmentary listing -- braces, the 'idx' declaration, the
 * args.kms initialisation, the 'break' on args.complete and the final
 * 'return args.kms;' are missing (original line numbers jump).
 */
900 /* When a lock is cancelled by a client, the KMS may undergo change if this
901 * is the "highest lock". This function returns the new KMS value, updating
902 * it only if we were the highest lock.
904 * Caller must hold lr_lock already.
906 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes!
908 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
910 struct ldlm_resource *res = lock->l_resource;
911 struct ldlm_interval_tree *tree;
912 struct ldlm_kms_shift_args args;
917 args.old_kms = old_kms;
919 args.complete = false;
921 /* don't let another thread in ldlm_extent_shift_kms race in
922 * just after we finish and take our lock into account in its
923 * calculation of the kms */
924 ldlm_set_kms_ignore(lock);
926 /* We iterate over the lock trees, looking for the largest kms smaller
927 * than the current one. */
928 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
929 tree = &res->lr_itree[idx];
931 /* If our already known kms is >= than the highest 'end' in
932 * this tree, we don't need to check this tree, because
933 * the kms from a tree can be lower than in_max_high (due to
934 * kms_ignore), but it can never be higher. */
935 if (!tree->lit_root || args.kms >= tree->lit_root->in_max_high)
938 interval_iterate_reverse(tree->lit_root, ldlm_kms_shift_cb,
941 /* this tells us we're not the highest lock, so we don't need
942 * to check the remaining trees */
/* Sanity: cancelling a lock can never raise the KMS. */
947 LASSERTF(args.kms <= args.old_kms, "kms %llu old_kms %llu\n", args.kms,
952 EXPORT_SYMBOL(ldlm_extent_shift_kms);
/* Slab cache for struct ldlm_interval nodes. */
954 struct kmem_cache *ldlm_interval_slab;
/*
 * Allocate an interval node from the slab and attach it to 'lock'.
 * NOTE(review): fragmentary listing -- braces, the NULL check after the
 * slab allocation and the 'return node;' are missing.
 */
955 static struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
957 struct ldlm_interval *node;
961 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
962 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
966 INIT_LIST_HEAD(&node->li_group);
967 ldlm_interval_attach(node, lock);
/*
 * Return a detached, empty interval node to the slab cache.
 * NOTE(review): fragmentary listing -- braces (and presumably a NULL guard;
 * TODO confirm against the full source) are missing.
 */
971 void ldlm_interval_free(struct ldlm_interval *node)
974 LASSERT(list_empty(&node->li_group));
975 LASSERT(!interval_is_intree(&node->li_node));
976 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
980 /* interval tree, for LDLM_EXTENT. */
/*
 * Link lock 'l' into interval node 'n' policy group.
 * NOTE(review): fragmentary listing -- the 'struct ldlm_lock *l' parameter
 * line, braces and the 'l->l_tree_node = n;' back-pointer assignment are
 * missing (original line numbers jump: 981 -> 984, 985 -> 987).
 */
981 void ldlm_interval_attach(struct ldlm_interval *n,
984 LASSERT(l->l_tree_node == NULL);
985 LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
987 list_add_tail(&l->l_sl_policy, &n->li_group);
/*
 * Unlink lock 'l' from its interval node.  Returns the node when this was
 * the last lock in the group (so the caller can free it), otherwise NULL.
 * NOTE(review): fragmentary listing -- braces and a NULL check on 'n' are
 * missing (original line numbers jump: 993 -> 998).
 */
991 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
993 struct ldlm_interval *n = l->l_tree_node;
998 LASSERT(!list_empty(&n->li_group));
999 l->l_tree_node = NULL;
1000 list_del_init(&l->l_sl_policy);
1002 return list_empty(&n->li_group) ? n : NULL;
/*
 * Map a single-bit lock mode to its interval-tree index (ilog2 of the mode).
 * NOTE(review): fragmentary listing -- braces, the 'index' declaration and
 * the 'return index;' are missing (original line numbers jump: 1005 -> 1010).
 */
1005 static inline int ldlm_mode_to_index(enum ldlm_mode mode)
1010 LASSERT(is_power_of_2(mode));
1011 index = ilog2(mode);
1012 LASSERT(index < LCK_MODE_NUM);
/*
 * Allocate the interval node backing an extent lock.
 * NOTE(review): fragmentary listing -- braces and the return statements
 * (error on allocation failure, success otherwise) are missing.
 */
1016 int ldlm_extent_alloc_lock(struct ldlm_lock *lock)
1018 lock->l_tree_node = NULL;
1020 if (ldlm_interval_alloc(lock) == NULL)
/*
 * NOTE(review): fragmentary listing -- braces, 'idx'/'rc' declarations,
 * the LASSERT on interval_set()'s rc, loop internals of the GRANT_CHECK
 * debug scan and the CERROR text are missing (original line numbers jump).
 */
1025 /** Add newly granted lock into interval tree for the resource. */
1026 void ldlm_extent_add_lock(struct ldlm_resource *res,
1027 struct ldlm_lock *lock)
1029 struct interval_node *found, **root;
1030 struct ldlm_interval *node;
1031 struct ldlm_extent *extent;
1034 LASSERT(ldlm_is_granted(lock));
1036 node = lock->l_tree_node;
1037 LASSERT(node != NULL);
1038 LASSERT(!interval_is_intree(&node->li_node));
/* Pick the per-mode tree matching the granted mode. */
1040 idx = ldlm_mode_to_index(lock->l_granted_mode);
1041 LASSERT(lock->l_granted_mode == BIT(idx));
1042 LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
1044 /* node extent initialize */
1045 extent = &lock->l_policy_data.l_extent;
1047 rc = interval_set(&node->li_node, extent->start, extent->end);
1050 root = &res->lr_itree[idx].lit_root;
1051 found = interval_insert(&node->li_node, root);
1052 if (found) { /* The policy group found. */
/* An interval with the same extent exists: free our node and join it. */
1053 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
1055 LASSERT(tmp != NULL);
1056 ldlm_interval_free(tmp);
1057 ldlm_interval_attach(to_ldlm_interval(found), lock);
1059 res->lr_itree[idx].lit_size++;
1061 /* even though we use interval tree to manage the extent lock, we also
1062 * add the locks into grant list, for debug purpose, .. */
1063 ldlm_resource_add_lock(res, &res->lr_granted, lock);
/* Debug-only consistency scan: detect conflicting grants. */
1065 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
1066 struct ldlm_lock *lck;
1068 list_for_each_entry_reverse(lck, &res->lr_granted,
1072 if (lockmode_compat(lck->l_granted_mode,
1073 lock->l_granted_mode))
1075 if (ldlm_extent_overlap(&lck->l_req_extent,
1076 &lock->l_req_extent)) {
1078 "granting conflicting lock %p %p\n",
1080 ldlm_resource_dump(D_ERROR, res);
/*
 * NOTE(review): fragmentary listing -- braces, the 'idx' declaration, the
 * early 'return' for the duplicate-unlink case and the lit_size decrement
 * are missing (original line numbers jump).
 */
1087 /** Remove cancelled lock from resource interval tree. */
1088 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
1090 struct ldlm_resource *res = lock->l_resource;
1091 struct ldlm_interval *node = lock->l_tree_node;
1092 struct ldlm_interval_tree *tree;
1095 if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
1098 idx = ldlm_mode_to_index(lock->l_granted_mode);
1099 LASSERT(lock->l_granted_mode == BIT(idx));
1100 tree = &res->lr_itree[idx];
1102 LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
/* Detach from the group; erase and free the node if it became empty. */
1105 node = ldlm_interval_detach(lock);
1107 interval_erase(&node->li_node, &tree->lit_root);
1108 ldlm_interval_free(node);
1112 void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
1113 union ldlm_policy_data *lpolicy)
1115 lpolicy->l_extent.start = wpolicy->l_extent.start;
1116 lpolicy->l_extent.end = wpolicy->l_extent.end;
1117 lpolicy->l_extent.gid = wpolicy->l_extent.gid;
1120 void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
1121 union ldlm_wire_policy_data *wpolicy)
1123 memset(wpolicy, 0, sizeof(*wpolicy));
1124 wpolicy->l_extent.start = lpolicy->l_extent.start;
1125 wpolicy->l_extent.end = lpolicy->l_extent.end;
1126 wpolicy->l_extent.gid = lpolicy->l_extent.gid;