* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
__u64 req_start = req->l_req_extent.start;
__u64 req_end = req->l_req_extent.end;
struct ldlm_interval_tree *tree;
- struct interval_node_extent limiter = { new_ex->start, new_ex->end };
+ struct interval_node_extent limiter = {
+ .start = new_ex->start,
+ .end = new_ex->end,
+ };
int conflicting = 0;
int idx;
ENTRY;
/* Using interval tree to handle the LDLM extent granted locks. */
for (idx = 0; idx < LCK_MODE_NUM; idx++) {
- struct interval_node_extent ext = { req_start, req_end };
+ struct interval_node_extent ext = {
+ .start = req_start,
+ .end = req_end,
+ };
tree = &res->lr_itree[idx];
if (lockmode_compat(tree->lit_mode, req_mode))
static void ldlm_extent_policy(struct ldlm_resource *res,
struct ldlm_lock *lock, __u64 *flags)
{
- struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
-
- if (lock->l_export == NULL)
- /*
- * this is local lock taken by server (e.g., as a part of
- * OST-side locking, or unlink handling). Expansion doesn't
- * make a lot of sense for local locks, because they are
- * dropped immediately on operation completion and would only
- * conflict with other threads.
- */
- return;
+ struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
+
+ if (lock->l_export == NULL)
+ /*
+ * this is a local lock taken by server (e.g., as a part of
+ * OST-side locking, or unlink handling). Expansion doesn't
+ * make a lot of sense for local locks, because they are
+ * dropped immediately on operation completion and would only
+ * conflict with other threads.
+ */
+ return;
- if (lock->l_policy_data.l_extent.start == 0 &&
- lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
- /* fast-path whole file locks */
- return;
+ if (lock->l_policy_data.l_extent.start == 0 &&
+ lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+ /* fast-path whole file locks */
+ return;
- ldlm_extent_internal_policy_granted(lock, &new_ex);
- ldlm_extent_internal_policy_waiting(lock, &new_ex);
+ /* Because reprocess_queue zeroes flags and uses it to return
+ * LDLM_FL_LOCK_CHANGED, we must check for the NO_EXPANSION flag
+ * in the lock flags rather than the 'flags' argument */
+ if (likely(!(lock->l_flags & LDLM_FL_NO_EXPANSION))) {
+ ldlm_extent_internal_policy_granted(lock, &new_ex);
+ ldlm_extent_internal_policy_waiting(lock, &new_ex);
+ } else {
+ LDLM_DEBUG(lock, "Not expanding manually requested lock.\n");
+ new_ex.start = lock->l_policy_data.l_extent.start;
+ new_ex.end = lock->l_policy_data.l_extent.end;
+ /* In case the request is not on correct boundaries, we call
+ * fixup. (normally called in ldlm_extent_internal_policy_*) */
+ ldlm_extent_internal_policy_fixup(lock, &new_ex, 0);
+ }
- if (new_ex.start != lock->l_policy_data.l_extent.start ||
- new_ex.end != lock->l_policy_data.l_extent.end) {
- *flags |= LDLM_FL_LOCK_CHANGED;
- lock->l_policy_data.l_extent.start = new_ex.start;
- lock->l_policy_data.l_extent.end = new_ex.end;
- }
+ if (!ldlm_extent_equal(&new_ex, &lock->l_policy_data.l_extent)) {
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ lock->l_policy_data.l_extent.start = new_ex.start;
+ lock->l_policy_data.l_extent.end = new_ex.end;
+ }
}
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
struct ldlm_resource *res = lock->l_resource;
- cfs_time_t now = cfs_time_current();
+ time64_t now = ktime_get_seconds();
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
return 1;
CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
res->lr_contention_time = now;
- return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
- cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
+
+ return now < res->lr_contention_time +
+ ldlm_res_to_ns(res)->ns_contention_time;
}
struct ldlm_extent_compat_args {
}
if (tree->lit_mode == LCK_GROUP) {
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ if (*flags & (LDLM_FL_BLOCK_NOWAIT |
+ LDLM_FL_SPECULATIVE)) {
compat = -EWOULDBLOCK;
goto destroylock;
}
continue;
}
- if (!work_list) {
- rc = interval_is_overlapped(tree->lit_root,&ex);
- if (rc)
- RETURN(0);
+ /* We've found a potentially blocking lock, check
+ * compatibility. This handles locks other than GROUP
+ * locks, which are handled separately above.
+ *
+ * Locks with FL_SPECULATIVE are asynchronous requests
+ * which must never wait behind another lock, so they
+ * fail if any conflicting lock is found. */
+ if (!work_list || (*flags & LDLM_FL_SPECULATIVE)) {
+ rc = interval_is_overlapped(tree->lit_root,
+ &ex);
+ if (rc) {
+ if (!work_list) {
+ RETURN(0);
+ } else {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+ }
} else {
interval_search(tree->lit_root, &ex,
ldlm_extent_compat_cb, &data);
lock->l_policy_data.l_extent.gid) {
/* If existing lock with matched gid is granted,
we grant new one too. */
- if (lock->l_req_mode == lock->l_granted_mode)
- RETURN(2);
+ if (ldlm_is_granted(lock))
+ RETURN(2);
/* Otherwise we are scanning queue of waiting
* locks and it means current request would
* already blocked.
* If we are in nonblocking mode - return
* immediately */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ if (*flags & (LDLM_FL_BLOCK_NOWAIT
+ | LDLM_FL_SPECULATIVE)) {
compat = -EWOULDBLOCK;
goto destroylock;
}
}
}
- if (unlikely(req_mode == LCK_GROUP &&
- (lock->l_req_mode != lock->l_granted_mode))) {
+ if (unlikely(req_mode == LCK_GROUP &&
+ !ldlm_is_granted(lock))) {
scan = 1;
compat = 0;
if (lock->l_req_mode != LCK_GROUP) {
}
if (unlikely(lock->l_req_mode == LCK_GROUP)) {
- /* If compared lock is GROUP, then requested is PR/PW/
- * so this is not compatible; extent range does not
- * matter */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ /* If compared lock is GROUP, then requested is
+ * PR/PW so this is not compatible; extent
+ * range does not matter */
+ if (*flags & (LDLM_FL_BLOCK_NOWAIT
+ | LDLM_FL_SPECULATIVE)) {
compat = -EWOULDBLOCK;
goto destroylock;
} else {
if (!work_list)
RETURN(0);
+ if (*flags & LDLM_FL_SPECULATIVE) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+
/* don't count conflicting glimpse locks */
if (lock->l_req_mode == LCK_PR &&
lock->l_policy_data.l_extent.start == 0 &&
void ldlm_lock_prolong_one(struct ldlm_lock *lock,
struct ldlm_prolong_args *arg)
{
- int timeout;
+ time64_t timeout;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PROLONG_PAUSE, 3);
if (arg->lpa_export != lock->l_export ||
lock->l_flags & LDLM_FL_DESTROYED)
*/
timeout = arg->lpa_timeout + (ldlm_bl_timeout(lock) >> 1);
- LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
+ LDLM_DEBUG(lock, "refreshed to %llds.\n", timeout);
arg->lpa_blocks_cnt++;
}
EXPORT_SYMBOL(ldlm_resource_prolong);
-
-/**
- * Discard all AST work items from list.
- *
- * If for whatever reason we do not want to send ASTs to conflicting locks
- * anymore, disassemble the list with this function.
- */
-static void discard_bl_list(struct list_head *bl_list)
-{
- struct list_head *tmp, *pos;
- ENTRY;
-
- list_for_each_safe(pos, tmp, bl_list) {
- struct ldlm_lock *lock =
- list_entry(pos, struct ldlm_lock, l_bl_ast);
-
- list_del_init(&lock->l_bl_ast);
- LASSERT(ldlm_is_ast_sent(lock));
- ldlm_clear_ast_sent(lock);
- LASSERT(lock->l_bl_ast_run == 0);
- LASSERT(lock->l_blocking_lock);
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- LDLM_LOCK_RELEASE(lock);
- }
- EXIT;
-}
-
/**
* Process a granting attempt for extent lock.
* Must be called with ns lock held.
* This function looks for any conflicts for \a lock in the granted or
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- * - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- * - blocking ASTs have not been sent yet, so list of conflicting locks
- * would be collected and ASTs sent.
*/
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
- int first_enq, enum ldlm_error *err,
- struct list_head *work_list)
+ enum ldlm_process_intention intention,
+ enum ldlm_error *err, struct list_head *work_list)
{
struct ldlm_resource *res = lock->l_resource;
- struct list_head rpc_list;
int rc, rc2;
int contended_locks = 0;
+ struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
+ NULL : work_list;
ENTRY;
- LASSERT(lock->l_granted_mode != lock->l_req_mode);
- LASSERT(list_empty(&res->lr_converting));
+ LASSERT(!ldlm_is_granted(lock));
LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
!ldlm_is_ast_discard_data(lock));
- INIT_LIST_HEAD(&rpc_list);
check_res_locked(res);
*err = ELDLM_OK;
- if (!first_enq) {
- /* Careful observers will note that we don't handle -EWOULDBLOCK
- * here, but it's ok for a non-obvious reason -- compat_queue
- * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
- * flags should always be zero here, and if that ever stops
- * being true, we want to find out. */
+ if (intention == LDLM_PROCESS_RESCAN) {
+ /* Careful observers will note that we don't handle -EWOULDBLOCK
+ * here, but it's ok for a non-obvious reason -- compat_queue
+ * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT |
+ * SPECULATIVE). flags should always be zero here, and if that
+ * ever stops being true, we want to find out. */
LASSERT(*flags == 0);
rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
err, NULL, &contended_locks);
if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
ldlm_extent_policy(res, lock, flags);
- ldlm_grant_lock(lock, work_list);
+ ldlm_grant_lock(lock, grant_work);
RETURN(LDLM_ITER_CONTINUE);
}
- restart:
contended_locks = 0;
rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
- &rpc_list, &contended_locks);
- if (rc < 0)
- GOTO(out, rc); /* lock was destroyed */
- if (rc == 2)
- goto grant;
-
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
- &rpc_list, &contended_locks);
- if (rc2 < 0)
- GOTO(out, rc = rc2); /* lock was destroyed */
-
- if (rc + rc2 == 2) {
- grant:
- ldlm_extent_policy(res, lock, flags);
- ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL);
- } else {
- /* If either of the compat_queue()s returned failure, then we
- * have ASTs to send and must go onto the waiting list.
- *
- * bug 2322: we used to unlink and re-add here, which was a
- * terrible folly -- if we goto restart, we could get
- * re-ordered! Causes deadlock, because ASTs aren't sent! */
- if (list_empty(&lock->l_res_link))
- ldlm_resource_add_lock(res, &res->lr_waiting, lock);
- unlock_res(res);
- rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
- LDLM_WORK_BL_AST);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
- !ns_is_client(ldlm_res_to_ns(res)))
- class_fail_export(lock->l_export);
-
- lock_res(res);
- if (rc == -ERESTART) {
- /* 15715: The lock was granted and destroyed after
- * resource lock was dropped. Interval node was freed
- * in ldlm_lock_destroy. Anyway, this always happens
- * when a client is being evicted. So it would be
- * ok to return an error. -jay */
- if (ldlm_is_destroyed(lock)) {
- *err = -EAGAIN;
- GOTO(out, rc = -EAGAIN);
- }
-
- /* lock was granted while resource was unlocked. */
- if (lock->l_granted_mode == lock->l_req_mode) {
- /* bug 11300: if the lock has been granted,
- * break earlier because otherwise, we will go
- * to restart and ldlm_resource_unlink will be
- * called and it causes the interval node to be
- * freed. Then we will fail at
- * ldlm_extent_add_lock() */
- *flags &= ~LDLM_FL_BLOCKED_MASK;
- GOTO(out, rc = 0);
- }
-
- GOTO(restart, rc);
- }
-
- /* this way we force client to wait for the lock
- * endlessly once the lock is enqueued -bzzz */
- *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
-
+ work_list, &contended_locks);
+ if (rc < 0)
+ GOTO(out_rpc_list, rc);
+
+ rc2 = 0;
+ if (rc != 2) {
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
+ flags, err, work_list,
+ &contended_locks);
+ if (rc2 < 0)
+ GOTO(out_rpc_list, rc = rc2);
}
- RETURN(0);
-out:
- if (!list_empty(&rpc_list)) {
- LASSERT(!ldlm_is_ast_discard_data(lock));
- discard_bl_list(&rpc_list);
+
+ if (rc + rc2 == 2) {
+ ldlm_extent_policy(res, lock, flags);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock, grant_work);
+ } else {
+ /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to
+ * force client to wait for the lock endlessly once
+ * the lock is enqueued -bzzz */
+ *flags |= LDLM_FL_NO_TIMEOUT;
}
+ rc = LDLM_ITER_CONTINUE;
+
+out_rpc_list:
RETURN(rc);
}
#endif /* HAVE_SERVER_SUPPORT */
EXPORT_SYMBOL(ldlm_extent_shift_kms);
struct kmem_cache *ldlm_interval_slab;
-struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
+static struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
struct ldlm_interval *node;
ENTRY;
int index;
LASSERT(mode != 0);
- LASSERT(IS_PO2(mode));
- for (index = -1; mode != 0; index++, mode >>= 1)
- /* do nothing */;
+ LASSERT(is_power_of_2(mode));
+ index = ilog2(mode);
LASSERT(index < LCK_MODE_NUM);
return index;
}
+int ldlm_extent_alloc_lock(struct ldlm_lock *lock)
+{
+ lock->l_tree_node = NULL;
+ if (ldlm_interval_alloc(lock) == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
struct ldlm_lock *lock)
struct ldlm_extent *extent;
int idx, rc;
- LASSERT(lock->l_granted_mode == lock->l_req_mode);
+ LASSERT(ldlm_is_granted(lock));
node = lock->l_tree_node;
LASSERT(node != NULL);