/* Determine if the lock is compatible with all locks on the queue.
* We stop walking the queue if we hit ourselves so we don't take
- * conflicting locks enqueued after us into accound, or we'd wait forever.
- *
- * 0 if the lock is not compatible
- * 1 if the lock is compatible
- * 2 if this group lock is compatible and requires no further checking
- * negative error, such as EWOULDBLOCK for group locks
- */
+ * conflicting locks enqueued after us into account, or we'd wait forever. */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- int send_cbs, int *flags, ldlm_error_t *err)
+ int send_cbs)
{
struct list_head *tmp;
struct ldlm_lock *lock;
__u64 req_start = req->l_req_extent.start;
__u64 req_end = req->l_req_extent.end;
int compat = 1;
- int scan = 0;
ENTRY;
lockmode_verify(req_mode);
if (req == lock)
RETURN(compat);
- if (scan) {
- /* We only get here if we are queuing GROUP lock
- and met some incompatible one. The main idea of this
- code is to insert GROUP lock past compatible GROUP
- lock in the waiting queue or if there is not any,
- then in front of first non-GROUP lock */
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should be no
- more GROUP locks later on, queue in front of
- first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
- RETURN(0);
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock,
- req);
- RETURN(0);
- }
- continue;
- }
-
/* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_req_mode, req_mode)) {
- /* non-group locks are compatible, overlap doesn't
- matter */
- if (req_mode != LCK_GROUP)
- continue;
-
- /* If we are trying to get a GROUP lock and there is
- another one of this kind, we need to compare gid */
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- if (lock->l_req_mode == lock->l_granted_mode)
- RETURN(2);
-
- /* If we are in nonblocking mode - return
- immediately */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- compat = -EWOULDBLOCK;
- goto destroylock;
- }
- /* If this group lock is compatible with another
- * group lock on the waiting list, they must be
- * together in the list, so they can be granted
- * at the same time. Otherwise the later lock
- * can get stuck behind another, incompatible,
- * lock. */
- ldlm_resource_insert_lock_after(lock, req);
- /* Because 'lock' is not granted, we can stop
- * processing this queue and return immediately.
- * There is no need to check the rest of the
- * list. */
- RETURN(0);
- }
- }
-
- if (req_mode == LCK_GROUP &&
- (lock->l_req_mode != lock->l_granted_mode)) {
- scan = 1;
- compat = 0;
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should be no
- more GROUP locks later on, queue in front of
- first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
- RETURN(0);
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock, req);
- RETURN(0);
- }
+ if (lockmode_compat(lock->l_req_mode, req_mode))
continue;
- }
- if (lock->l_req_mode == LCK_GROUP) {
- /* If compared lock is GROUP, then requested is PR/PW/=>
- * this is not compatible; extent range does not
- * matter */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- compat = -EWOULDBLOCK;
- goto destroylock;
- } else {
- *flags |= LDLM_FL_NO_TIMEOUT;
- }
- } else if (lock->l_policy_data.l_extent.end < req_start ||
- lock->l_policy_data.l_extent.start > req_end) {
- /* if a non grouplock doesn't overlap skip it */
+ /* if the lock doesn't overlap, skip it */
+ if (lock->l_policy_data.l_extent.end < req_start ||
+ lock->l_policy_data.l_extent.start > req_end)
continue;
- }
if (!send_cbs)
RETURN(0);
ldlm_add_ast_work_item(lock, req, NULL, 0);
}
- return(compat);
-destroylock:
- list_del_init(&req->l_res_link);
- ldlm_lock_destroy(req);
- *err = compat;
RETURN(compat);
}
{
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
- int rc, rc2;
+ int rc;
ENTRY;
LASSERT(list_empty(&res->lr_converting));
- *err = ELDLM_OK;
if (!first_enq) {
- /* Careful observers will note that we don't handle -EWOULDBLOCK
- * here, but it's ok for a non-obvious reason -- compat_queue
- * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
- * flags should always be zero here, and if that ever stops
- * being true, we want to find out. */
- LASSERT(*flags == 0);
LASSERT(res->lr_tmp != NULL);
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags,
- err);
- if (rc == 1) {
- rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0,
- flags, err);
- }
- if (rc == 0)
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0);
+ if (!rc)
+ RETURN(LDLM_ITER_STOP);
+ rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0);
+ if (!rc)
RETURN(LDLM_ITER_STOP);
ldlm_resource_unlink_lock(lock);
restart:
LASSERT(res->lr_tmp == NULL);
res->lr_tmp = &rpc_list;
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err);
- if (rc < 0)
- GOTO(out, rc); /* lock was destroyed */
- if (rc == 2) {
- res->lr_tmp = NULL;
- goto grant;
- }
-
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err);
- if (rc2 < 0)
- GOTO(out, rc = rc2); /* lock was destroyed */
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1);
+ rc += ldlm_extent_compat_queue(&res->lr_waiting, lock, 1);
res->lr_tmp = NULL;
- if (rc + rc2 == 2) {
- grant:
- ldlm_extent_policy(res, lock, flags);
- ldlm_resource_unlink_lock(lock);
- ldlm_grant_lock(lock, NULL, 0, 0);
- } else {
- /* If either of the compat_queue()s returned failure, then we
+ if (rc != 2) {
+ /* If either of the compat_queue()s returned 0, then we
* have ASTs to send and must go onto the waiting list.
*
* bug 2322: we used to unlink and re-add here, which was a
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
*flags |= LDLM_FL_BLOCK_GRANTED;
+ } else {
+ ldlm_extent_policy(res, lock, flags);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock, NULL, 0, 0);
}
- rc = 0;
-out:
- res->lr_tmp = NULL;
- RETURN(rc);
+ RETURN(0);
}
/* When a lock is cancelled by a client, the KMS may undergo change if this
* is the "highest lock". This function returns the new KMS value.
+ * Caller must hold ns_lock already.
*
* NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
__u64 kms = 0;
ENTRY;
- l_lock(&res->lr_namespace->ns_lock);
-
- /* don't let another thread in ldlm_extent_shift_kms race in just after
- * we finish and take our lock into account in its calculation of the
- * kms */
+ /* don't let another thread in ldlm_extent_shift_kms race in
+ * just after we finish and take our lock into account in its
+ * calculation of the kms */
lock->l_flags |= LDLM_FL_KMS_IGNORE;
list_for_each(tmp, &res->lr_granted) {
continue;
if (lck->l_policy_data.l_extent.end >= old_kms)
- GOTO(out, kms = old_kms);
+ RETURN(old_kms);
/* This extent _has_ to be smaller than old_kms (checked above)
* so kms can only ever be smaller or the same as old_kms. */
}
LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
- GOTO(out, kms);
- out:
- l_unlock(&res->lr_namespace->ns_lock);
- return kms;
+ RETURN(kms);
}