/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

/**
 * This file contains the implementation of the EXTENT lock type.
 *
 * The EXTENT lock type is for locking a contiguous range of values,
 * represented by 64-bit starting and ending offsets (inclusive). There are
 * several extent lock modes, some of which may be mutually incompatible.
 * Extent locks are considered incompatible if their modes are incompatible
 * and their extents intersect. See the lock mode compatibility matrix in
 * lustre_dlm.h.
 */

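/*
 * Illustrative sketch (not from the original source): ldlm_extent_overlap()
 * used below is assumed to implement the intersection test for two inclusive
 * extents, equivalent to:
 *
 *      a->start <= b->end && b->start <= a->end
 *
 * For example, a PW lock on [0, 4095] and a PR lock on
 * [4096, OBD_OBJECT_EOF] are compatible despite the conflicting modes,
 * because their extents do not intersect.
 */
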
#define DEBUG_SUBSYSTEM S_LDLM

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/**
 * Fix up the ldlm_extent after expanding it.
 *
 * After expansion has been done, we might still want to do certain adjusting
 * based on overall contention of the resource and the like to avoid granting
 * overly wide locks.
 */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;
        ENTRY;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested. We also need to make sure it is server page
         * size aligned, otherwise a server page can be covered by two
         * locks. */
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

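/*
 * Worked example (illustrative only): with CFS_PAGE_SIZE = 4096 and a
 * request for [8192, 16383], req_align = 16384 | 8192 = 0x6000, so the
 * mask grows to 0x2000 - 1 = 8191. A candidate grant of [5000, 20000] is
 * then shrunk to start = ((5000 - 1) | 8191) + 1 = 8192 and
 * end = ((20000 + 1) & ~8191) - 1 = 16383, which still covers the
 * requested extent, as the LASSERTFs above verify.
 */
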
/**
 * Return the maximum extent that:
 * - contains the requested extent
 * - does not overlap existing conflicting extents outside the requested one
 *
 * This allows clients to request a small required extent range, but if there
 * is no contention on the lock the full lock can be granted to the client.
 * This avoids the need for many smaller lock requests to be granted in the
 * common (uncontended) case.
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree to handle the LDLM extent granted locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

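/*
 * Example (illustrative only): a client requests PW [0, 4095] on a resource
 * with no conflicting granted or waiting locks. new_ex starts out as
 * [0, OBD_OBJECT_EOF], nothing above or in the waiting-queue pass below
 * shrinks it, so the client ends up with a whole-file lock and needs no
 * further enqueues for later I/O on this object.
 */
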
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it. Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,
                                        &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1,
                                                    req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had over grown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1,
                                                  req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}

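/*
 * Example (illustrative; the tunable values are assumptions, not taken from
 * this file): with ns_contended_locks = 32 and ns_contention_time = 2, a
 * scan that finds more than 32 conflicting locks stamps lr_contention_time,
 * and for the next 2 seconds the resource is reported as contended.
 */
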
struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
              extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         __u64 *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                .lock = req,
                                                .locks = contended_locks,
                                                .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work_list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,
                                                            &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever. */
                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and met an incompatible one. The main
                                 * idea of this code is to insert the GROUP
                                 * lock past a compatible GROUP lock in the
                                 * waiting queue or, if there is none, in
                                 * front of the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req,
                                                                        lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing of the rest of
                                         * the list and safely place ourselves
                                         * at the end of the list, or grant
                                         * (depending on whether we met a
                                         * conflicting lock before in the
                                         * list).
                                         * In case of 1st enqueue only we
                                         * continue traversing if there is
                                         * something conflicting down the list
                                         * because we need to make sure that
                                         * something is marked as AST_SENT as
                                         * well; in case of an empty worklist
                                         * we would exit on the first conflict
                                         * met. */
                                        /* There IS a case where such a flag
                                         * is not set for a lock, yet it
                                         * blocks something. Luckily for us
                                         * this is only during destroy, so the
                                         * lock is exclusive. So here we are
                                         * safe. */
                                        if (!(lock->l_flags &
                                              LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If an existing lock with matching
                                         * gid is granted, we grant the new
                                         * one too. */
                                        if (lock->l_req_mode ==
                                            lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks, which means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible
                                         * with another group lock on the
                                         * waiting list, they must be together
                                         * in the list, so they can be granted
                                         * at the same time. Otherwise the
                                         * later lock can get stuck behind
                                         * another, incompatible, lock. */
                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        /* Because 'lock' is not granted, we
                                         * can stop processing this queue and
                                         * return immediately. There is no
                                         * need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode !=
                                      lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req,
                                                                        lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW and therefore not
                                 * compatible; the extent range does not
                                 * matter. */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);

destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = ELDLM_LOCK_ABORTED;
        RETURN(compat);
}

/**
 * Discard all AST work items from list.
 *
 * If for whatever reason we do not want to send ASTs to conflicting locks
 * anymore, disassemble the list with this function.
 */
static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Process a granting attempt for extent lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting locks
 *     would be collected and ASTs sent.
 */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             int first_enq, ldlm_error_t *err,
                             cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered! Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                                            LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
#endif /* HAVE_SERVER_SUPPORT */

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock". This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);

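/*
 * Worked example (illustrative only): suppose old_kms = 16384 and the
 * remaining granted locks not marked LDLM_FL_KMS_IGNORE cover [0, 4095]
 * and [4096, 8191]. Neither extent end reaches old_kms, so the loop
 * computes kms = 8191 + 1 = 8192 as the new known minimum size.
 */
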
cfs_mem_cache_t *ldlm_interval_slab;

struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

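/*
 * Example (illustrative; assumes the standard power-of-two ldlm_mode_t
 * values): LCK_EX (1 << 0) maps to index 0, LCK_PW (1 << 1) to index 1,
 * and LCK_GROUP (1 << 6) to index 6, i.e. the lr_itree[] slot whose
 * lit_mode matches the lock's granted mode.
 */
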
/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage extent locks, we
         * also add the locks to the granted list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

/** Remove cancelled lock from resource interval tree. */
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* make sure the tree is not empty */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}

void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}