/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */
#define DEBUG_SUBSYSTEM S_LDLM

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"
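/* Upper bound on how far a heavily contended PW/CW lock extent is grown past
 * its requested start; see ldlm_extent_internal_policy_fixup() below. */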
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF)
                return;

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  It also needs to be server-page-size aligned,
         * otherwise a server page can be covered by two write locks. */
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}
/* The purpose of this function is to return:
 *  - the maximum extent
 *  - containing the requested extent
 *  - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;

        lockmode_verify(req_mode);

        /* Use the interval trees to handle the granted extent locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
}
/* The purpose of this function is to return:
 *  - the maximum extent
 *  - containing the requested extent
 *  - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end)
                        return;

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter. */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel). */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much. */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If the lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflict in their requested extents and we can't
                 * satisfy both locks, so ignore it.  Either we will ping-pong
                 * this extent (we would regardless of what extent we granted)
                 * or the lock is unused and it shouldn't limit our extent
                 * growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent, &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
}
/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * This is a local lock taken by the server (e.g., as a part of
                 * OST-side locking, or unlink handling).  Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}
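/* Decide whether this resource is contended: remember the time whenever the
 * number of conflicting locks exceeds the per-namespace ns_contended_locks
 * threshold, and report contention for ns_contention_time seconds after the
 * last such event. */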
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}
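/* Context passed to ldlm_extent_compat_cb() while iterating over an interval
 * tree of granted locks. */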
struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
};
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree holds granted locks only */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
              extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        RETURN(INTERVAL_ITER_CONT);
}
/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *   0 if the lock is not compatible
 *   1 if the lock is compatible
 *   2 if this group lock is compatible and requires no further checking
 *   negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval trees for the granted queue. */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                       .lock = req,
                                                       .locks = contended_locks};
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root, &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        /* Stop when we reach our own request in the queue. */
                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and have met an incompatible one.  The
                                 * main idea of this code is to insert the
                                 * GROUP lock past a compatible GROUP lock in
                                 * the waiting queue or, if there is none, in
                                 * front of the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }

                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }
                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing the rest of the
                                         * list and safely place ourselves at
                                         * the end of the list, or grant
                                         * (depending on whether we met
                                         * conflicting locks later).
                                         *
                                         * On a first enqueue we only keep
                                         * traversing if there is something
                                         * conflicting down the list, because
                                         * we need to make sure that something
                                         * is marked as AST_SENT as well; with
                                         * an empty work list we would exit on
                                         * the first conflict met. */
                                        /* There IS a case where such a flag is
                                         * not set for a lock, yet it blocks
                                         * something.  Luckily for us this is
                                         * only during destroy, so the lock is
                                         * exclusive.  So here we are safe. */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT))
                                                RETURN(compat);
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gids. */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with a matching
                                         * gid is granted, we grant the new one
                                         * too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and the current
                                         * request would block along with the
                                         * existing lock (which is already
                                         * blocked).  If we are in nonblocking
                                         * mode, return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }
                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }
                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so this is not
                                 * compatible; the extent range does not
                                 * matter. */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }
                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;
                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }
        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = ELDLM_LOCK_ABORTED;
        RETURN(compat);
}
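/* Discard blocking-AST work items that were queued on bl_list but will not be
 * sent: clear LDLM_FL_AST_SENT and drop the references taken when the items
 * were added. */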
static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
}
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1)
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }
restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;
        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */
        if (rc + rc2 == 2) {
grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped.  Interval node was freed
                         * in ldlm_lock_destroy.  Anyway, this always happens
                         * when a client is being evicted.  So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed.  Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
cfs_mem_cache_t *ldlm_interval_slab;
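/* Allocate an interval node for an extent lock and attach the lock to it. */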
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                return NULL;
        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        return node;
}
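/* Free an interval node; it must already be detached from its lock and
 * removed from the interval tree. */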
void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}
/* Attach a lock to an interval tree node (for LDLM_EXTENT resources). */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}
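/* Detach a lock from its interval node.  Returns the node if this was the
 * last lock in its policy group (so the caller may free or reuse it), or
 * NULL otherwise. */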
struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);
        return (cfs_list_empty(&n->li_group) ? n : NULL);
}
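/* Map a lock mode (a power-of-two bit) to its index in res->lr_itree[]. */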
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}
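/* Insert a granted extent lock into the interval tree that matches its mode,
 * merging it into an existing policy group when one already covers the same
 * extent. */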
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The same policy group was found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* Even though we use the interval tree to manage the extent locks, we
         * also add the locks into the granted list, for debugging purposes. */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}
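/* Remove an extent lock from its interval tree, freeing the interval node if
 * this was the last lock in its policy group. */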
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}
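/* Convert extent lock policy data between its on-the-wire and in-memory
 * (local) representations. */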
void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}