X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_extent.c;h=4eba10fb2f67142b23d2eea82cf25df9e99ec37e;hb=5cced5eb96914a5a51a551ed12e92ebfc7834029;hp=86e2ae3ca8a4ce672c5aaba278f19b5b8ccc1024;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 86e2ae3..4eba10f 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -26,7 +26,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -49,6 +49,7 @@ #include #include #include +#include #include #include "ldlm_internal.h" @@ -64,7 +65,7 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req, __u64 req_start = req->l_req_extent.start; __u64 req_end = req->l_req_extent.end; __u64 req_align, mask; - + if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) { if (req_end < req_start + LDLM_MAX_GROWN_EXTENT) new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT, @@ -99,202 +100,29 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req, mask, new_ex->end, req_end); } -/* The purpose of this function is to return: - * - the maximum extent - * - containing the requested extent - * - and not overlapping existing conflicting extents outside the requested one - * - * Use interval tree to expand the lock extent for granted lock. - */ -static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req, - struct ldlm_extent *new_ex) -{ - struct ldlm_resource *res = req->l_resource; - ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_req_extent.start; - __u64 req_end = req->l_req_extent.end; - struct ldlm_interval_tree *tree; - struct interval_node_extent limiter = { new_ex->start, new_ex->end }; - int conflicting = 0; - int idx; - ENTRY; - - lockmode_verify(req_mode); - - /* using interval tree to handle the ldlm extent granted locks */ - for (idx = 0; idx < LCK_MODE_NUM; idx++) { - struct interval_node_extent ext = { req_start, req_end }; - - tree = &res->lr_itree[idx]; - if (lockmode_compat(tree->lit_mode, req_mode)) - continue; - - conflicting += tree->lit_size; - if (conflicting > 4) - limiter.start = req_start; - - if (interval_is_overlapped(tree->lit_root, &ext)) - printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n", - req_mode, tree->lit_mode, tree->lit_size); - interval_expand(tree->lit_root, &ext, &limiter); - limiter.start = max(limiter.start, ext.start); - limiter.end = min(limiter.end, ext.end); - if (limiter.start == req_start && limiter.end == req_end) - break; - } - - new_ex->start = limiter.start; - new_ex->end = limiter.end; - LASSERT(new_ex->start <= req_start); - LASSERT(new_ex->end >= req_end); - - ldlm_extent_internal_policy_fixup(req, new_ex, conflicting); - EXIT; -} - -/* The purpose of this function is to return: - * - the maximum extent - * - containing the requested extent - * - and not overlapping existing conflicting extents outside the requested one - */ -static void -ldlm_extent_internal_policy_waiting(struct ldlm_lock *req, - struct ldlm_extent *new_ex) -{ - struct list_head *tmp; - struct ldlm_resource *res = req->l_resource; - ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_req_extent.start; - __u64 req_end = req->l_req_extent.end; - int conflicting = 0; - ENTRY; - - lockmode_verify(req_mode); - - /* for waiting locks */ - list_for_each(tmp, 
&res->lr_waiting) { - struct ldlm_lock *lock; - struct ldlm_extent *l_extent; - - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - l_extent = &lock->l_policy_data.l_extent; - - /* We already hit the minimum requested size, search no more */ - if (new_ex->start == req_start && new_ex->end == req_end) { - EXIT; - return; - } - - /* Don't conflict with ourselves */ - if (req == lock) - continue; - - /* Locks are compatible, overlap doesn't matter */ - /* Until bug 20 is fixed, try to avoid granting overlapping - * locks on one client (they take a long time to cancel) */ - if (lockmode_compat(lock->l_req_mode, req_mode) && - lock->l_export != req->l_export) - continue; - - /* If this is a high-traffic lock, don't grow downwards at all - * or grow upwards too much */ - ++conflicting; - if (conflicting > 4) - new_ex->start = req_start; - - /* If lock doesn't overlap new_ex, skip it. */ - if (!ldlm_extent_overlap(l_extent, new_ex)) - continue; - - /* Locks conflicting in requested extents and we can't satisfy - * both locks, so ignore it. Either we will ping-pong this - * extent (we would regardless of what extent we granted) or - * lock is unused and it shouldn't limit our extent growth. */ - if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent)) - continue; - - /* We grow extents downwards only as far as they don't overlap - * with already-granted locks, on the assumtion that clients - * will be writing beyond the initial requested end and would - * then need to enqueue a new lock beyond previous request. - * l_req_extent->end strictly < req_start, checked above. */ - if (l_extent->start < req_start && new_ex->start != req_start) { - if (l_extent->end >= req_start) - new_ex->start = req_start; - else - new_ex->start = min(l_extent->end+1, req_start); - } - - /* If we need to cancel this lock anyways because our request - * overlaps the granted lock, we grow up to its requested - * extent start instead of limiting this extent, assuming that - * clients are writing forwards and the lock had over grown - * its extent downwards before we enqueued our request. */ - if (l_extent->end > req_end) { - if (l_extent->start <= req_end) - new_ex->end = max(lock->l_req_extent.start - 1, - req_end); - else - new_ex->end = max(l_extent->start - 1, req_end); - } - } - - ldlm_extent_internal_policy_fixup(req, new_ex, conflicting); - EXIT; -} - - -/* In order to determine the largest possible extent we can grant, we need - * to scan all of the queues. */ -static void ldlm_extent_policy(struct ldlm_resource *res, - struct ldlm_lock *lock, int *flags) -{ - struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF }; - - if (lock->l_export == NULL) - /* - * this is local lock taken by server (e.g., as a part of - * OST-side locking, or unlink handling). Expansion doesn't - * make a lot of sense for local locks, because they are - * dropped immediately on operation completion and would only - * conflict with other threads. 
- */ - return; - - if (lock->l_policy_data.l_extent.start == 0 && - lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) - /* fast-path whole file locks */ - return; - - ldlm_extent_internal_policy_granted(lock, &new_ex); - ldlm_extent_internal_policy_waiting(lock, &new_ex); - - if (new_ex.start != lock->l_policy_data.l_extent.start || - new_ex.end != lock->l_policy_data.l_extent.end) { - *flags |= LDLM_FL_LOCK_CHANGED; - lock->l_policy_data.l_extent.start = new_ex.start; - lock->l_policy_data.l_extent.end = new_ex.end; - } -} static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks) { struct ldlm_resource *res = lock->l_resource; cfs_time_t now = cfs_time_current(); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION)) + return 1; + CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks); - if (contended_locks > res->lr_namespace->ns_contended_locks) + if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks) res->lr_contention_time = now; return cfs_time_before(now, cfs_time_add(res->lr_contention_time, - cfs_time_seconds(res->lr_namespace->ns_contention_time))); + cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time))); } struct ldlm_extent_compat_args { - struct list_head *work_list; + cfs_list_t *work_list; struct ldlm_lock *lock; ldlm_mode_t mode; int *locks; int *compat; + int *conflicts; }; static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, @@ -303,21 +131,28 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, struct ldlm_extent_compat_args *priv = data; struct ldlm_interval *node = to_ldlm_interval(n); struct ldlm_extent *extent; - struct list_head *work_list = priv->work_list; + cfs_list_t *work_list = priv->work_list; struct ldlm_lock *lock, *enq = priv->lock; ldlm_mode_t mode = priv->mode; int count = 0; ENTRY; - LASSERT(!list_empty(&node->li_group)); + LASSERT(!cfs_list_empty(&node->li_group)); - list_for_each_entry(lock, &node->li_group, l_sl_policy) { + cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) { /* interval tree is for granted lock */ LASSERTF(mode == lock->l_granted_mode, "mode = %s, lock->l_granted_mode = %s\n", ldlm_lockname[mode], ldlm_lockname[lock->l_granted_mode]); - count++; + + /* only count _requested_ region overlapped locks as contended + * locks */ + if (lock->l_req_extent.end >= enq->l_req_extent.start && + lock->l_req_extent.start <= enq->l_req_extent.end) { + count++; + (*priv->conflicts)++; + } if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, enq, work_list); } @@ -334,297 +169,435 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, RETURN(INTERVAL_ITER_CONT); } -/* Determine if the lock is compatible with all locks on the queue. - * We stop walking the queue if we hit ourselves so we don't take - * conflicting locks enqueued after us into accound, or we'd wait forever. 
- * - * 0 if the lock is not compatible - * 1 if the lock is compatible - * 2 if this group lock is compatible and requires no further checking - * negative error, such as EWOULDBLOCK for group locks - */ static int -ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int *flags, ldlm_error_t *err, - struct list_head *work_list, int *contended_locks) +ldlm_extent_compat_granted_queue(cfs_list_t *queue, struct ldlm_lock *req, + int *flags, ldlm_error_t *err, + cfs_list_t *work_list, int *contended_locks) { - struct list_head *tmp; - struct ldlm_lock *lock; struct ldlm_resource *res = req->l_resource; ldlm_mode_t req_mode = req->l_req_mode; __u64 req_start = req->l_req_extent.start; __u64 req_end = req->l_req_extent.end; - int compat = 1; - int scan = 0; - int check_contention; + int compat = 1, conflicts; + /* Using interval tree for granted lock */ + struct ldlm_interval_tree *tree; + struct ldlm_extent_compat_args data = {.work_list = work_list, + .lock = req, + .locks = contended_locks, + .compat = &compat, + .conflicts = &conflicts }; + struct interval_node_extent ex = { .start = req_start, + .end = req_end }; + int idx, rc; ENTRY; - lockmode_verify(req_mode); - /* Using interval tree for granted lock */ - if (queue == &res->lr_granted) { - struct ldlm_interval_tree *tree; - struct ldlm_extent_compat_args data = {.work_list = work_list, - .lock = req, - .locks = contended_locks, - .compat = &compat }; - struct interval_node_extent ex = { .start = req_start, - .end = req_end }; - int idx, rc; - - for (idx = 0; idx < LCK_MODE_NUM; idx++) { - tree = &res->lr_itree[idx]; - if (tree->lit_root == NULL) /* empty tree, skipped */ + for (idx = 0; idx < LCK_MODE_NUM; idx++) { + tree = &res->lr_itree[idx]; + if (tree->lit_root == NULL) /* empty tree, skipped */ + continue; + + data.mode = tree->lit_mode; + if (lockmode_compat(req_mode, tree->lit_mode)) { + struct ldlm_interval *node; + struct ldlm_extent *extent; + + if (req_mode != LCK_GROUP) continue; - data.mode = tree->lit_mode; - if (lockmode_compat(req_mode, tree->lit_mode)) { - struct ldlm_interval *node; - struct ldlm_extent *extent; - - if (req_mode != LCK_GROUP) - continue; - - /* group lock, grant it immediately if - * compatible */ - node = to_ldlm_interval(tree->lit_root); - extent = ldlm_interval_extent(node); - if (req->l_policy_data.l_extent.gid == - extent->gid) - RETURN(2); + /* group lock, grant it immediately if + * compatible */ + node = to_ldlm_interval(tree->lit_root); + extent = ldlm_interval_extent(node); + if (req->l_policy_data.l_extent.gid == + extent->gid) + RETURN(2); + } + + if (tree->lit_mode == LCK_GROUP) { + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; } - if (tree->lit_mode == LCK_GROUP) { - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } + *flags |= LDLM_FL_NO_TIMEOUT; + if (!work_list) + RETURN(0); - *flags |= LDLM_FL_NO_TIMEOUT; - if (!work_list) - RETURN(0); + /* if work list is not NULL,add all + locks in the tree to work list */ + compat = 0; + interval_iterate(tree->lit_root, + ldlm_extent_compat_cb, &data); + continue; + } - /* if work list is not NULL,add all - locks in the tree to work list */ - compat = 0; - interval_iterate(tree->lit_root, - ldlm_extent_compat_cb, &data); - continue; - } - if (!work_list) { - rc = interval_is_overlapped(tree->lit_root,&ex); - if (rc) - RETURN(0); - } else { - interval_search(tree->lit_root, &ex, - ldlm_extent_compat_cb, &data); - if (!list_empty(work_list) && compat) + if 
(!work_list) { + rc = interval_is_overlapped(tree->lit_root, &ex); + if (rc) + RETURN(0); + } else { + struct interval_node_extent result_ext = { + .start = req->l_policy_data.l_extent.start, + .end = req->l_policy_data.l_extent.end }; + + conflicts = 0; + interval_search_expand_extent(tree->lit_root, &ex, + &result_ext, + ldlm_extent_compat_cb, + &data); + req->l_policy_data.l_extent.start = result_ext.start; + req->l_policy_data.l_extent.end = result_ext.end; + /* for granted locks, count non-compatible not overlapping + * locks in traffic index */ + req->l_traffic += tree->lit_size - conflicts; + + if (!cfs_list_empty(work_list)) { + if (compat) compat = 0; + /* if there is at least 1 conflicting lock, we + * do not expand to the left, since we often + * continue writing to the right. + */ + req->l_policy_data.l_extent.start = req_start; } } - } else { /* for waiting queue */ - list_for_each(tmp, queue) { - check_contention = 1; + } - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + RETURN(compat); +destroylock: + cfs_list_del_init(&req->l_res_link); + ldlm_lock_destroy_nolock(req); + *err = compat; + RETURN(compat); +} - if (req == lock) - break; +static int +ldlm_extent_compat_waiting_queue(cfs_list_t *queue, struct ldlm_lock *req, + int *flags, ldlm_error_t *err, + cfs_list_t *work_list, int *contended_locks) +{ + cfs_list_t *tmp; + struct ldlm_lock *lock; + ldlm_mode_t req_mode = req->l_req_mode; + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; + int compat = 1; + int scan = 0; + int check_contention; + ENTRY; - if (unlikely(scan)) { - /* We only get here if we are queuing GROUP lock - and met some incompatible one. The main idea of this - code is to insert GROUP lock past compatible GROUP - lock in the waiting queue or if there is not any, - then in front of first non-GROUP lock */ - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should - * be no more GROUP locks later on, queue in - * front of first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - compat = 0; - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - compat = 0; - break; - } - continue; - } + cfs_list_for_each(tmp, queue) { + check_contention = 1; - /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - if (req_mode == LCK_PR && - ((lock->l_policy_data.l_extent.start <= - req->l_policy_data.l_extent.start) && - (lock->l_policy_data.l_extent.end >= - req->l_policy_data.l_extent.end))) { - /* If we met a PR lock just like us or wider, - and nobody down the list conflicted with - it, that means we can skip processing of - the rest of the list and safely place - ourselves at the end of the list, or grant - (dependent if we met an conflicting locks - before in the list). - In case of 1st enqueue only we continue - traversing if there is something conflicting - down the list because we need to make sure - that something is marked as AST_SENT as well, - in cse of empy worklist we would exit on - first conflict met. */ - /* There IS a case where such flag is - not set for a lock, yet it blocks - something. Luckily for us this is - only during destroy, so lock is - exclusive. 
So here we are safe */ - if (!(lock->l_flags & LDLM_FL_AST_SENT)) { - RETURN(compat); - } - } + lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link); - /* non-group locks are compatible, overlap doesn't - matter */ - if (likely(req_mode != LCK_GROUP)) - continue; - - /* If we are trying to get a GROUP lock and there is - another one of this kind, we need to compare gid */ - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* If existing lock with matched gid is granted, - we grant new one too. */ - if (lock->l_req_mode == lock->l_granted_mode) - RETURN(2); - - /* Otherwise we are scanning queue of waiting - * locks and it means current request would - * block along with existing lock (that is - * already blocked. - * If we are in nonblocking mode - return - * immediately */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } - /* If this group lock is compatible with another - * group lock on the waiting list, they must be - * together in the list, so they can be granted - * at the same time. Otherwise the later lock - * can get stuck behind another, incompatible, - * lock. */ - ldlm_resource_insert_lock_after(lock, req); - /* Because 'lock' is not granted, we can stop - * processing this queue and return immediately. - * There is no need to check the rest of the - * list. */ - RETURN(0); - } - } + if (req == lock) + break; - if (unlikely(req_mode == LCK_GROUP && - (lock->l_req_mode != lock->l_granted_mode))) { - scan = 1; + if (unlikely(scan)) { + /* We only get here if we are queuing GROUP lock + and met some incompatible one. The main idea of this + code is to insert GROUP lock past compatible GROUP + lock in the waiting queue or if there is not any, + then in front of first non-GROUP lock */ + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + cfs_list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); compat = 0; - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - break; + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + compat = 0; + break; + } + continue; + } + + /* locks are compatible, overlap doesn't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) { + if (req_mode == LCK_PR && + ((lock->l_policy_data.l_extent.start <= + req->l_policy_data.l_extent.start) && + (lock->l_policy_data.l_extent.end >= + req->l_policy_data.l_extent.end))) { + /* If we met a PR lock just like us or wider, + and nobody down the list conflicted with + it, that means we can skip processing of + the rest of the list and safely place + ourselves at the end of the list, or grant + (dependent if we met an conflicting locks + before in the list). 
+ Only in case of a first enqueue do we continue
+ traversing if there is something conflicting
+ down the list, because we need to make sure
+ that something is marked as AST_SENT as well;
+ in case of an empty worklist we would exit on
+ the first conflict met. */
+ /* There IS a case where such a flag is
+ not set for a lock, yet it blocks
+ something. Luckily for us this is
+ only during destroy, so the lock is
+ exclusive. So here we are safe */
+ if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+ RETURN(compat);
+ }
+ }

+ /* non-group locks are compatible, overlap doesn't
+ matter */
+ if (likely(req_mode != LCK_GROUP))
+ continue;
+
+ /* If we are trying to get a GROUP lock and there is
+ another one of this kind, we need to compare gids */
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* We are scanning the queue of waiting
+ * locks, which means the current request would
+ * block along with the existing lock (which is
+ * itself already blocked).
+ * If we are in nonblocking mode - return
+ * immediately */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+ /* If this group lock is compatible with another
+ * group lock on the waiting list, they must be
+ * together in the list, so they can be granted
+ * at the same time. Otherwise the later lock
+ * can get stuck behind another, incompatible,
+ * lock. */
+ ldlm_resource_insert_lock_after(lock, req);
+ /* Because 'lock' is not granted, we can stop
+ * processing this queue and return immediately.
+ * There is no need to check the rest of the
+ * list. 
*/ + RETURN(0); + } + } + + if (unlikely(req_mode == LCK_GROUP && + (lock->l_req_mode != lock->l_granted_mode))) { + scan = 1; + compat = 0; + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should + * be no more GROUP locks later on, queue in + * front of first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + cfs_list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + break; + } + continue; + } + + if (unlikely(lock->l_req_mode == LCK_GROUP)) { + /* If compared lock is GROUP, then requested is PR/PW/ + * so this is not compatible; extent range does not + * matter */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } else { + *flags |= LDLM_FL_NO_TIMEOUT; + } + } else if (!work_list) { + if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) /* if a non group lock doesn't overlap skip it */ continue; - } else if (lock->l_req_extent.end < req_start || - lock->l_req_extent.start > req_end) { - /* false contention, the requests doesn't really overlap */ - check_contention = 0; + RETURN(0); + } else { + /* for waiting locks, count all non-compatible locks in + * traffic index */ + ++req->l_traffic; + ++lock->l_traffic; + + /* adjust policy */ + if (lock->l_policy_data.l_extent.end < req_start) { + /* lock req + * ------------+ + * ++++++ | +++++++ + * + | + + * ++++++ | +++++++ + * ------------+ + */ + if (lock->l_policy_data.l_extent.end > + req->l_policy_data.l_extent.start) + req->l_policy_data.l_extent.start = + lock->l_policy_data.l_extent.end+1; + continue; + } else if (lock->l_req_extent.end < req_start) { + /* lock req + * ------------------+ + * ++++++ +++++++ + * + + | + * ++++++ +++++++ + * ------------------+ + */ + lock->l_policy_data.l_extent.end = + req_start - 1; + req->l_policy_data.l_extent.start = + req_start; + continue; + } else if (lock->l_policy_data.l_extent.start > + req_end) { + /* req lock + * +-------------- + * +++++++ | +++++++ + * + | + + * +++++++ | +++++++ + * +-------------- + */ + if (lock->l_policy_data.l_extent.start < + req->l_policy_data.l_extent.end) + req->l_policy_data.l_extent.end = + lock->l_policy_data.l_extent.start-1; + continue; + } else if (lock->l_req_extent.start > req_end) { + /* req lock + * +---------------------- + * +++++++ +++++++ + * | + + + * +++++++ +++++++ + * +---------------------- + */ + lock->l_policy_data.l_extent.start = + req_end + 1; + req->l_policy_data.l_extent.end=req_end; + continue; } + } /* policy_adj */ - if (!work_list) - RETURN(0); - + compat = 0; + if (work_list) { /* don't count conflicting glimpse locks */ - if (lock->l_req_mode == LCK_PR && - lock->l_policy_data.l_extent.start == 0 && - lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) + if (lock->l_flags & LDLM_FL_HAS_INTENT) check_contention = 0; *contended_locks += check_contention; - compat = 0; if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, req, work_list); } } + RETURN(compat); +destroylock: + cfs_list_del_init(&req->l_res_link); + ldlm_lock_destroy_nolock(req); + *err = compat; + RETURN(compat); +} +/* Determine if the lock is compatible with all locks on the queue. + * We stop walking the queue if we hit ourselves so we don't take + * conflicting locks enqueued after us into accound, or we'd wait forever. 
+ *
+ * 0 if the lock is not compatible
+ * 1 if the lock is compatible
+ * 2 if this group lock is compatible and requires no further checking
+ * negative error, such as EWOULDBLOCK for group locks
+ *
+ * Note: policy adjustment only happens during the first lock enqueue
+ */
+static int
+ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
+                         int *flags, ldlm_error_t *err,
+                         cfs_list_t *work_list, int *contended_locks)
+{
+        struct ldlm_resource *res = req->l_resource;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        int compat = 1;
+        ENTRY;
+
+        lockmode_verify(req_mode);
+
+        if (queue == &res->lr_granted)
+                compat = ldlm_extent_compat_granted_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
+        else
+                compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
+                                                          err, work_list,
+                                                          contended_locks);
+
 if (ldlm_check_contention(req, *contended_locks) &&
 compat == 0 &&
 (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
 req->l_req_mode != LCK_GROUP &&
 req_end - req_start <=
- req->l_resource->lr_namespace->ns_max_nolock_size)
+ ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
 GOTO(destroylock, compat = -EUSERS);

 RETURN(compat);
destroylock:
- list_del_init(&req->l_res_link);
+ cfs_list_del_init(&req->l_res_link);
 ldlm_lock_destroy_nolock(req);
 *err = compat;
 RETURN(compat);
}

-static void discard_bl_list(struct list_head *bl_list)
+static void discard_bl_list(cfs_list_t *bl_list)
{
- struct list_head *tmp, *pos;
+ cfs_list_t *tmp, *pos;
 ENTRY;

- list_for_each_safe(pos, tmp, bl_list) {
+ cfs_list_for_each_safe(pos, tmp, bl_list) {
 struct ldlm_lock *lock =
- list_entry(pos, struct ldlm_lock, l_bl_ast);
+ cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

- list_del_init(&lock->l_bl_ast);
+ cfs_list_del_init(&lock->l_bl_ast);
 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
 lock->l_flags &= ~LDLM_FL_AST_SENT;
 LASSERT(lock->l_bl_ast_run == 0);
 LASSERT(lock->l_blocking_lock);
- LDLM_LOCK_PUT(lock->l_blocking_lock);
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
 lock->l_blocking_lock = NULL;
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
 }
 EXIT;
}

+static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
+{
+        lock->l_policy_data.l_extent.start = 0;
+        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
+}
+
+static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
+{
+        if (lock->l_traffic > 4)
+                lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
+        ldlm_extent_internal_policy_fixup(lock,
+                                          &lock->l_policy_data.l_extent,
+                                          lock->l_traffic);
+        if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
+            lock->l_req_extent.end != lock->l_policy_data.l_extent.end)
+                *flags |= LDLM_FL_LOCK_CHANGED;
+}
+
 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 * - blocking ASTs have already been sent
 * - must call this function with the ns lock held
@@ -633,7 +606,7 @@ static void discard_bl_list(struct list_head *bl_list)
 * - blocking ASTs have not been sent
 * - must call this function with the ns lock held once */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
- ldlm_error_t *err, struct list_head *work_list)
+ ldlm_error_t *err, cfs_list_t *work_list)
{
 struct ldlm_resource *res = lock->l_resource;
 CFS_LIST_HEAD(rpc_list);
@@ -641,7 +614,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 int contended_locks = 0;
 ENTRY;

- LASSERT(list_empty(&res->lr_converting));
+ 
LASSERT(cfs_list_empty(&res->lr_converting)); LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) || !(lock->l_flags & LDLM_AST_DISCARD_DATA)); check_res_locked(res); @@ -666,14 +639,24 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); - if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) - ldlm_extent_policy(res, lock, flags); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) { + lock->l_policy_data.l_extent.start = + lock->l_req_extent.start; + lock->l_policy_data.l_extent.end = + lock->l_req_extent.end; + } else { + ldlm_process_extent_fini(lock, flags); + } + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: contended_locks = 0; + + ldlm_process_extent_init(lock); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list, &contended_locks); if (rc < 0) @@ -688,8 +671,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (rc + rc2 == 2) { grant: - ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); + ldlm_process_extent_fini(lock, flags); ldlm_grant_lock(lock, NULL); } else { /* If either of the compat_queue()s returned failure, then we @@ -698,20 +681,35 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, * bug 2322: we used to unlink and re-add here, which was a * terrible folly -- if we goto restart, we could get * re-ordered! Causes deadlock, because ASTs aren't sent! */ - if (list_empty(&lock->l_res_link)) + if (cfs_list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); unlock_res(res); rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); - lock_res(res); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) && + !ns_is_client(ldlm_res_to_ns(res))) + class_fail_export(lock->l_export); + + lock_res(res); if (rc == -ERESTART) { + + /* 15715: The lock was granted and destroyed after + * resource lock was dropped. Interval node was freed + * in ldlm_lock_destroy. Anyway, this always happens + * when a client is being evicted. So it would be + * ok to return an error. -jay */ + if (lock->l_destroyed) { + *err = -EAGAIN; + GOTO(out, rc = -EAGAIN); + } + /* lock was granted while resource was unlocked. */ if (lock->l_granted_mode == lock->l_req_mode) { /* bug 11300: if the lock has been granted, * break earlier because otherwise, we will go * to restart and ldlm_resource_unlink will be * called and it causes the interval node to be - * freed. Then we will fail at + * freed. 
Then we will fail at * ldlm_extent_add_lock() */ *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT); @@ -729,7 +727,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, } RETURN(0); out: - if (!list_empty(&rpc_list)) { + if (!cfs_list_empty(&rpc_list)) { LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA)); discard_bl_list(&rpc_list); } @@ -744,7 +742,7 @@ out: __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms) { struct ldlm_resource *res = lock->l_resource; - struct list_head *tmp; + cfs_list_t *tmp; struct ldlm_lock *lck; __u64 kms = 0; ENTRY; @@ -754,8 +752,8 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms) * calculation of the kms */ lock->l_flags |= LDLM_FL_KMS_IGNORE; - list_for_each(tmp, &res->lr_granted) { - lck = list_entry(tmp, struct ldlm_lock, l_res_link); + cfs_list_for_each(tmp, &res->lr_granted) { + lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link); if (lck->l_flags & LDLM_FL_KMS_IGNORE) continue; @@ -780,7 +778,7 @@ struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock) ENTRY; LASSERT(lock->l_resource->lr_type == LDLM_EXTENT); - OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node)); + OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO); if (node == NULL) RETURN(NULL); @@ -792,7 +790,8 @@ struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock) void ldlm_interval_free(struct ldlm_interval *node) { if (node) { - LASSERT(list_empty(&node->li_group)); + LASSERT(cfs_list_empty(&node->li_group)); + LASSERT(!interval_is_intree(&node->li_node)); OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); } } @@ -804,7 +803,7 @@ void ldlm_interval_attach(struct ldlm_interval *n, LASSERT(l->l_tree_node == NULL); LASSERT(l->l_resource->lr_type == LDLM_EXTENT); - list_add_tail(&l->l_sl_policy, &n->li_group); + cfs_list_add_tail(&l->l_sl_policy, &n->li_group); l->l_tree_node = n; } @@ -815,11 +814,11 @@ struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l) if (n == NULL) return NULL; - LASSERT(!list_empty(&n->li_group)); + LASSERT(!cfs_list_empty(&n->li_group)); l->l_tree_node = NULL; - list_del_init(&l->l_sl_policy); + cfs_list_del_init(&l->l_sl_policy); - return (list_empty(&n->li_group) ? n : NULL); + return (cfs_list_empty(&n->li_group) ? n : NULL); } static inline int lock_mode_to_index(ldlm_mode_t mode) @@ -845,6 +844,7 @@ void ldlm_extent_add_lock(struct ldlm_resource *res, node = lock->l_tree_node; LASSERT(node != NULL); + LASSERT(!interval_is_intree(&node->li_node)); idx = lock_mode_to_index(lock->l_granted_mode); LASSERT(lock->l_granted_mode == 1 << idx); @@ -872,14 +872,13 @@ void ldlm_extent_add_lock(struct ldlm_resource *res, void ldlm_extent_unlink_lock(struct ldlm_lock *lock) { struct ldlm_resource *res = lock->l_resource; - struct ldlm_interval *node; + struct ldlm_interval *node = lock->l_tree_node; struct ldlm_interval_tree *tree; int idx; - if (lock->l_granted_mode != lock->l_req_mode) + if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */ return; - LASSERT(lock->l_tree_node != NULL); idx = lock_mode_to_index(lock->l_granted_mode); LASSERT(lock->l_granted_mode == 1 << idx); tree = &res->lr_itree[idx];
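
The four ASCII diagrams in ldlm_extent_compat_waiting_queue() above describe how a waiting request and a conflicting lock trim each other's grown (policy) extents so that neither grown range invades the other's *requested* range. A minimal standalone sketch of that trimming follows; struct ext_lock and policy_adjust() are illustrative stand-ins, not the Lustre types or functions:

#include <stdio.h>
#include <stdint.h>

struct ext { uint64_t start, end; };          /* inclusive interval       */
struct ext_lock { struct ext req, policy; };  /* requested vs. grown ext. */

/* Mirror of the four diagrammed cases: shrink the grown extents so a
 * lock's grown range never overlaps the other lock's requested range. */
static void policy_adjust(struct ext_lock *lock, struct ext_lock *req)
{
        if (lock->policy.end < req->req.start) {
                /* lock's grown extent already ends left of req's request;
                 * only pull req's grown start out of it */
                if (lock->policy.end > req->policy.start)
                        req->policy.start = lock->policy.end + 1;
        } else if (lock->req.end < req->req.start) {
                /* lock grew rightwards over req's requested range: trim
                 * lock back and clamp req's grown start to its request */
                lock->policy.end = req->req.start - 1;
                req->policy.start = req->req.start;
        } else if (lock->policy.start > req->req.end) {
                /* mirror image of the first case, on the right side */
                if (lock->policy.start < req->policy.end)
                        req->policy.end = lock->policy.start - 1;
        } else if (lock->req.start > req->req.end) {
                /* mirror image of the second case */
                lock->policy.start = req->req.end + 1;
                req->policy.end = req->req.end;
        }
}

int main(void)
{
        struct ext_lock waiting = { .req = { 0, 99 },
                                    .policy = { 0, 499 } };
        struct ext_lock enq     = { .req = { 200, 299 },
                                    .policy = { 0, UINT64_MAX } };

        /* waiting grew over [200,299]; it gets trimmed back to end at 199
         * and enq's grown start is clamped to its requested start */
        policy_adjust(&waiting, &enq);
        printf("waiting policy [%llu, %llu], enq policy [%llu, %llu]\n",
               (unsigned long long)waiting.policy.start,
               (unsigned long long)waiting.policy.end,
               (unsigned long long)enq.policy.start,
               (unsigned long long)enq.policy.end);
        return 0;
}

The design point the diagrams encode: clients usually keep writing to the right of their request, so when two grown extents collide it is the leftward growth that gets sacrificed first.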
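
ldlm_extent_add_lock() above asserts lock->l_granted_mode == 1 << idx, i.e. each lock mode is a single bit that selects one granted-lock interval tree in res->lr_itree[]. A minimal sketch of that mode-to-slot mapping, assuming power-of-two mode values like the LCK_* flags; the enum below is a local stand-in, and lock_mode_to_index() itself is not shown in this diff, so this is an assumed equivalent:

#include <assert.h>
#include <stdio.h>

/* stand-in mode bits, one bit per mode, in the spirit of LCK_* */
enum {
        MODE_EX = 1 << 0, MODE_PW = 1 << 1, MODE_PR = 1 << 2,
        MODE_CW = 1 << 3, MODE_CR = 1 << 4, MODE_NL = 1 << 5,
        MODE_GROUP = 1 << 6,
};
#define MODE_NUM 7   /* analogue of LCK_MODE_NUM */

/* map a power-of-two mode to its tree slot, so mode == 1 << index */
static int mode_to_index(int mode)
{
        int index = 0;

        assert(mode != 0 && (mode & (mode - 1)) == 0); /* power of two */
        while ((1 << index) != mode)
                index++;
        assert(index < MODE_NUM);
        return index;
}

int main(void)
{
        printf("PW -> slot %d, GROUP -> slot %d\n",
               mode_to_index(MODE_PW), mode_to_index(MODE_GROUP));
        return 0;
}

Keeping one interval tree per mode lets ldlm_extent_compat_granted_queue() skip a whole tree with a single lockmode_compat() check instead of walking every granted lock.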
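
ldlm_extent_shift_kms() above recomputes the "known minimum size" (kms) when a lock goes away: the departing lock is flagged LDLM_FL_KMS_IGNORE and the remaining granted extents bound the new value. A rough standalone model under those assumptions; the struct and flag constant are simplified stand-ins, and the real function (only partly visible in this hunk) also runs under the resource lock:

#include <stdint.h>
#include <stdio.h>

#define KMS_IGNORE 0x1   /* stand-in for LDLM_FL_KMS_IGNORE */

struct glock {
        uint64_t start, end;   /* granted extent, inclusive */
        unsigned flags;
};

static uint64_t shift_kms(const struct glock *locks, int n, uint64_t old_kms)
{
        uint64_t kms = 0;
        int i;

        for (i = 0; i < n; i++) {
                if (locks[i].flags & KMS_IGNORE)
                        continue;       /* e.g. the lock being cancelled */
                /* a surviving lock already reaches old_kms: no shrink */
                if (locks[i].end + 1 >= old_kms)
                        return old_kms;
                if (locks[i].end + 1 > kms)
                        kms = locks[i].end + 1;
        }
        return kms;
}

int main(void)
{
        struct glock granted[] = {
                { 0,    4095, KMS_IGNORE }, /* being cancelled */
                { 4096, 8191, 0 },
        };

        /* size was known to be at least 16384 before the cancel;
         * the surviving lock only covers up to 8191, so kms drops */
        printf("new kms = %llu\n",
               (unsigned long long)shift_kms(granted, 2, 16384));
        return 0;
}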