/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF)
                return;

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We also need to make sure it is server page
         * size aligned, otherwise a server page can be covered by two write
         * locks. */
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

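/*
 * Worked example for the fixup above (illustrative numbers only, assuming a
 * 4096-byte CFS_PAGE_SIZE): a request for [4096, 8191] that expansion grew to
 * [0, 1048575] gives req_align = 8192 | 4096 = 0x3000, so mask stays at the
 * page size and becomes 0xfff after the decrement.  Rounding then yields
 * new_ex->start = ((0 - 1) | 0xfff) + 1 = 0 and
 * new_ex->end = ((1048575 + 1) & ~0xfff) - 1 = 1048575, i.e. the granted
 * extent stays page aligned and still covers the requested range.  Had both
 * request boundaries been 8192-aligned, the while loop would have doubled the
 * mask once and the grant would have been trimmed to 8192-byte alignment
 * instead.
 */
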
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* using interval tree to handle the ldlm extent granted locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

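/*
 * Illustration of the expansion above (hypothetical numbers, not from the
 * original source): for a request of [100, 199] with new_ex initialized to
 * [0, OBD_OBJECT_EOF], an incompatible granted lock at [500, 599] causes
 * interval_expand() to report [0, 499] as the largest hole around the
 * request, so limiter (and therefore new_ex) shrinks to [0, 499] before
 * ldlm_extent_internal_policy_fixup() applies page alignment.
 */
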
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent, &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1,
                                                    req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

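/*
 * Example of the downward-growth rule above (hypothetical numbers): with a
 * request for [1000, 1999] and a waiting lock whose extent is [0, 499],
 * new_ex->start may only grow down to 500 (l_extent->end + 1), on the
 * assumption that the other client will keep writing upwards from its
 * current extent.
 */
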
/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as part of
                 * OST-side locking, or unlink handling).  Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}

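/*
 * In other words (summary, not in the original source): a resource is
 * treated as contended for ns_contention_time seconds after the moment more
 * than ns_contended_locks conflicting locks were observed on it.  For
 * example, with ns_contended_locks = 32 and ns_contention_time = 2 (both are
 * per-namespace tunables), a burst of 40 conflicting locks keeps the
 * resource marked contended for the next two seconds.
 */
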
struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
              extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *   0 if the lock is not compatible
 *   1 if the lock is compatible
 *   2 if this group lock is compatible and requires no further checking
 *   negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         __u64 *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                       .lock = req,
                                                       .locks = contended_locks,
                                                       .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root, &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and met some incompatible one.  The
                                 * main idea of this code is to insert the
                                 * GROUP lock past a compatible GROUP lock in
                                 * the waiting queue or, if there is not any,
                                 * then in front of the first non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing of the rest of
                                         * the list and safely place ourselves
                                         * at the end of the list, or grant
                                         * (depending on whether we met a
                                         * conflicting lock later or not).
                                         * In case of a first enqueue only, we
                                         * continue traversing if there is
                                         * something conflicting down the list
                                         * because we need to make sure that
                                         * something is marked as AST_SENT as
                                         * well; in case of an empty work list
                                         * we would exit on the first conflict
                                         * met. */
                                        /* There IS a case where such flag is
                                         * not set for a lock, yet it blocks
                                         * something.  Luckily for us this is
                                         * only during destroy, so the lock is
                                         * exclusive.  So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT))
                                                RETURN(compat);
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If existing lock with matched gid
                                         * is granted, we grant new one too. */
                                        if (lock->l_req_mode ==
                                            lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode -
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible
                                         * with another group lock on the
                                         * waiting list, they must be together
                                         * in the list, so they can be granted
                                         * at the same time.  Otherwise the
                                         * later lock can get stuck behind
                                         * another, incompatible, lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we
                                         * can stop processing this queue and
                                         * return immediately.  There is no
                                         * need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so this is not
                                 * compatible; extent range does not matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

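/*
 * Usage note (summary, not in the original source): ldlm_process_extent_lock()
 * below runs this check twice, once against lr_granted and once against
 * lr_waiting, and in the normal case only grants the request when both passes
 * report compatibility.  A return of 2 from the granted-queue pass (a GROUP
 * lock whose gid matches an already granted GROUP lock) skips the
 * waiting-queue scan entirely via the "goto grant" shortcut.
 */
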
static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             int first_enq, ldlm_error_t *err,
                             cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
 grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped.  Interval node was freed
                         * in ldlm_lock_destroy.  Anyway, this always happens
                         * when a client is being evicted.  So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed.  Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                                            LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

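/*
 * Note on the -ERESTART handling above (summary, not in the original source):
 * ldlm_run_ast_work() runs with the resource unlocked, so by the time it
 * reports -ERESTART the lock may already have been granted or destroyed by a
 * competing thread; that is why the code re-checks l_destroyed and
 * l_granted_mode before jumping back to the restart label.
 */
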
#endif /* HAVE_SERVER_SUPPORT */

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);

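/*
 * Worked example (hypothetical numbers): if the granted queue holds extents
 * [0, 4095] and [0, 1048575], old_kms is 1048576 and the second lock is the
 * one being cancelled (so it carries LDLM_FL_KMS_IGNORE), the remaining lock
 * caps the known minimum size at end + 1 = 4096, which is returned as the new
 * KMS.  If some other granted lock still covered up to old_kms - 1 or beyond,
 * the result would remain old_kms.
 */
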
cfs_mem_cache_t *ldlm_interval_slab;

struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);
        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

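/*
 * Example (assuming the usual power-of-two ldlm_mode_t values, e.g.
 * LCK_EX == 1 and LCK_PR == 4): lock_mode_to_index(LCK_EX) == 0 and
 * lock_mode_to_index(LCK_PR) == 2, i.e. the index is simply log2 of the mode
 * bit and selects the per-mode interval tree in res->lr_itree[].
 */
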
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);

                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks, we
         * also add the locks into the grant list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

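/*
 * Design note (summary, not in the original source): interval_insert()
 * returns an existing node when a node with exactly the same [start, end]
 * interval is already in the tree.  In that case the lock's own ldlm_interval
 * is freed and the lock simply joins the li_group list of the existing node,
 * so all granted locks sharing one extent and mode are represented by a
 * single tree node.
 */
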
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}

void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}