/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

/**
 * This file contains the implementation of the EXTENT lock type.
 *
 * The EXTENT lock type is for locking a contiguous range of values,
 * represented by 64-bit starting and ending offsets (inclusive). There are
 * several extent lock modes, some of which may be mutually incompatible.
 * Extent locks are considered incompatible if their modes are incompatible
 * and their extents intersect. See the lock mode compatibility matrix in
 * lustre_dlm.h.
 */
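/*
 * For example (illustrative only): a PW lock on [0, 4095] and a PR lock on
 * [4096, 8191] do not conflict because their extents are disjoint, while a
 * PW lock on [0, 4095] and a PR lock on [2048, 8191] do conflict because
 * the modes are incompatible and the extents intersect at [2048, 4095].
 */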
#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
/**
 * Fix up the ldlm_extent after expanding it.
 *
 * After expansion has been done, we might still want to do certain adjusting
 * based on overall contention of the resource and the like to avoid granting
 * overly wide locks.
 */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested. We also need to make sure it is server page
         * size aligned, otherwise a server page can be covered by two
         * write locks. */
        mask = PAGE_CACHE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}
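/*
 * Worked example (illustrative only, assuming PAGE_CACHE_SIZE is 4096):
 * the client requests [4096, 12287], which is page aligned, so
 * req_align = 12288 | 4096 = 12288. Its low 12 bits are zero, the while
 * loop leaves mask at 4096 (12288 & 4096 != 0), and mask -= 1 gives 0xfff.
 * A tentatively expanded extent of [1000, 20000] is then shrunk to
 *      new_ex->start = ((1000 - 1) | 0xfff) + 1      = 4096
 *      new_ex->end   = ((20000 + 1) & ~0xfff) - 1    = 16383
 * which is page aligned and still contains the requested [4096, 12287].
 */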
/**
 * Return the maximum extent that:
 * - contains the requested extent
 * - does not overlap existing conflicting extents outside the requested one
 *
 * This allows clients to request a small required extent range, but if there
 * is no contention on the lock the full lock can be granted to the client.
 * This avoids the need for many smaller lock requests to be granted in the
 * common (uncontended) case.
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree to handle the LDLM extent granted locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}
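/*
 * For illustration (not from the original source): if the client requests
 * [4096, 8191] and the only conflicting granted locks cover [0, 1023] and
 * [16384, OBD_OBJECT_EOF], interval_expand() can widen the grant to
 * [1024, 16383] without overlapping either conflict, so a later sequential
 * I/O by the same client may need no additional lock request.
 */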
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
                struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it. Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent, &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}
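/*
 * Illustrative example (not from the original source): with a request for
 * [1000, 1999] and a non-overlapping waiting lock on [100, 199], downward
 * growth is clipped to new_ex->start = min(199 + 1, 1000) = 200, so the
 * grant may extend down to offset 200 but never into the waiter's range.
 */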
/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}
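/*
 * Illustrative sketch (the tunable values are an assumption here, not taken
 * from this file): with ns_contended_locks = 32 and ns_contention_time = 2,
 * observing more than 32 conflicting locks stamps lr_contention_time, and
 * for the following 2 seconds every caller sees the resource as contended.
 */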
struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
              extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}
/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         __u64 *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        int check_contention;
        int compat = 1;
        int scan = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted locks */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                       .lock = req,
                                                       .locks = contended_locks,
                                                       .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root, &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each_entry(lock, queue, l_res_link) {
                        check_contention = 1;

                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever. */
                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and met some incompatible one. The main
                                 * idea of this code is to insert a GROUP lock
                                 * past a compatible GROUP lock in the waiting
                                 * queue or, if there is not any, then in front
                                 * of the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing of the rest of
                                         * the list and safely place ourselves
                                         * at the end of the list, or grant
                                         * (depending on whether we met a
                                         * conflicting lock earlier in the
                                         * list). In case of a first enqueue
                                         * only, we continue traversing if
                                         * there is something conflicting down
                                         * the list because we need to make
                                         * sure that something is marked as
                                         * AST_SENT as well; with an empty work
                                         * list we would exit on the first
                                         * conflict met. */
                                        /* There IS a case where such a flag is
                                         * not set for a lock, yet it blocks
                                         * something. Luckily for us this is
                                         * only during destroy, so the lock is
                                         * exclusive. So here we are safe. */
                                        if (!ldlm_is_ast_sent(lock))
                                                RETURN(compat);
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If an existing lock with a matched
                                         * gid is granted, we grant the new
                                         * one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time. Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW so this is not
                                 * compatible; extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}
/**
 * Discard all AST work items from list.
 *
 * If for whatever reason we do not want to send ASTs to conflicting locks
 * anymore, disassemble the list with this function.
 */
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(ldlm_is_ast_sent(lock));
                ldlm_clear_ast_sent(lock);
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}
/**
 * Process a granting attempt for extent lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting locks
 *     would be collected and ASTs sent.
 */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             int first_enq, ldlm_error_t *err,
                             struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list;
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(lock->l_granted_mode != lock->l_req_mode);
        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
        INIT_LIST_HEAD(&rpc_list);
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
 grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered! Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (ldlm_is_destroyed(lock)) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~LDLM_FL_BLOCKED_MASK;
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, rc);
                }

                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!ldlm_is_ast_discard_data(lock));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
#endif /* HAVE_SERVER_SUPPORT */
/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock". This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        ldlm_set_kms_ignore(lock);

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (ldlm_is_kms_ignore(lck))
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);
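/*
 * Illustrative example (not from the original source): if locks covering
 * [0, 4095] and [4096, 8191] are granted and the lock on [4096, 8191] is
 * cancelled, the surviving lock's extent end of 4095 yields a new KMS of
 * 4095 + 1 = 4096, down from the old KMS of 8192.
 */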
struct kmem_cache *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
        if (node == NULL)
                RETURN(NULL);

        INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}
/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return list_empty(&n->li_group) ? n : NULL;
}
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}
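/*
 * For example (illustrative; assuming the usual lustre_dlm.h mode values,
 * where each ldlm_mode_t is a distinct power of two): LCK_EX (1) maps to
 * index 0, LCK_PW (2) to index 1, and LCK_PR (4) to index 2, i.e. the
 * index is log2 of the mode bit.
 */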
/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks, we
         * also add the locks into the grant list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}
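/*
 * Note (illustrative): if two granted locks have an identical extent, say
 * [0, 4095], the second interval_insert() returns the node already in the
 * tree; the second lock's own node is detached and freed, and the lock
 * joins the existing node's li_group list instead.
 */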
/** Remove cancelled lock from resource interval tree. */
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}
void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}