{
struct ldlm_resource *res = lock->l_resource;
- if (!RB_EMPTY_NODE(&lock->l_fl_rb))
- flock_remove(lock, &res->lr_flock_node.lfn_root);
+ LASSERT(!RB_EMPTY_NODE(&lock->l_fl_rb));
+
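+ /* Changing [start, end] changes the node's position in the
+ * per-resource interval tree, so remove the lock first and
+ * re-insert it once the new range is set.
+ */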
+ flock_remove(lock, &res->lr_flock_node.lfn_root);
START(lock) = start;
LAST(lock) = end;
- if (!RB_EMPTY_NODE(&lock->l_fl_rb))
- flock_insert(lock, &res->lr_flock_node.lfn_root);
+ flock_insert(lock, &res->lr_flock_node.lfn_root);
}
/**
{
struct ldlm_resource *res = req->l_resource;
struct ldlm_namespace *ns = ldlm_res_to_ns(res);
- struct ldlm_lock *tmp;
- struct ldlm_lock *ownlocks = NULL;
- struct ldlm_lock *lock = NULL;
- struct ldlm_lock *new = req;
- struct ldlm_lock *new2 = NULL;
+ struct ldlm_lock *ownlocks, **ownlocks_end;
+ struct ldlm_lock *nextlock, *lock = NULL;
+ struct ldlm_lock *new, *new2 = NULL;
enum ldlm_mode mode = req->l_req_mode;
int local = ns_is_client(ns);
int added = (mode == LCK_NL);
int splitted = 0;
+ __u64 start, end;
const struct ldlm_callback_suite null_cbs = { NULL };
#ifdef HAVE_SERVER_SUPPORT
struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
NULL : work_list);
#endif
ENTRY;
CDEBUG(D_DLMTRACE,
"flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
- *flags, new->l_policy_data.l_flock.owner,
- new->l_policy_data.l_flock.pid, mode,
+ *flags, req->l_policy_data.l_flock.owner,
+ req->l_policy_data.l_flock.pid, mode,
req->l_policy_data.l_flock.start,
req->l_policy_data.l_flock.end);
}
reprocess:
-
+ ownlocks = NULL;
+ ownlocks_end = &ownlocks;
+ new = req;
+ start = START(req);
+ end = LAST(req);
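+ /* Re-derive the search window from the request on every pass so
+ * a reprocess does not widen it twice, then widen it by one unit
+ * on each side: the interval-tree lookup must also return locks
+ * that merely adjoin the request, for same-owner merging.
+ */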
+ if (!(*flags & LDLM_FL_TEST_LOCK) && (mode != LCK_NL)) {
+ if (start > 0)
+ start--;
+ if (end < OBD_OBJECT_EOF)
+ end++;
+ }
if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
- /* This loop determines where this processes locks start
- * in the resource lr_granted list.
+ /* This loop collects all overlapping locks with the
+ * same owner.
*/
- list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+ for (lock = flock_iter_first(&res->lr_flock_node.lfn_root,
+ start, end);
+ lock;
+ lock = flock_iter_next(lock, start, end))
if (ldlm_same_flock_owner(lock, req)) {
- ownlocks = lock;
- break;
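+ /* Chain same-owner locks through l_same_owner; the
+ * merge/split pass below walks this list instead of the
+ * interval tree, so it can update the tree safely.
+ */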
+ lock->l_same_owner = NULL;
+ *ownlocks_end = lock;
+ ownlocks_end = &lock->l_same_owner;
}
- }
}
#ifdef HAVE_SERVER_SUPPORT
else {
- int reprocess_failed = 0;
- int pr_matched = 0;
-
lockmode_verify(mode);
/* This loop determines if there are existing locks
- * that conflict with the new lock request.
+ * that conflict with the new lock request, and collects
+ * the overlapping locks of the same owner.
*/
- list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+ for (lock = flock_iter_first(&res->lr_flock_node.lfn_root,
+ start, end);
+ lock;
+ lock = flock_iter_next(lock, start, end)) {
if (ldlm_same_flock_owner(lock, req)) {
- if (!ownlocks) {
- ownlocks = lock;
- if (pr_matched)
- break;
- }
+ lock->l_same_owner = NULL;
+ *ownlocks_end = lock;
+ ownlocks_end = &lock->l_same_owner;
continue;
}
- if (req->l_req_mode == LCK_PR &&
- lock->l_granted_mode == LCK_PR &&
- lock->l_policy_data.l_flock.start <=
- req->l_policy_data.l_flock.start &&
- lock->l_policy_data.l_flock.end >=
- req->l_policy_data.l_flock.end) {
- /* there can't be granted WR lock */
- if (ownlocks)
- break;
- pr_matched = 1;
+ /* The widened window also returns locks that merely adjoin
+ * the request; adjacency only matters for same-owner merging,
+ * so require a true overlap from any other owner.
+ */
+ if (!ldlm_flocks_overlap(lock, req))
continue;
- }
+
/* locks are compatible, overlap doesn't matter */
if (lockmode_compat(lock->l_granted_mode, mode))
continue;
- if (!ldlm_flocks_overlap(lock, req))
- continue;
-
if (intention != LDLM_PROCESS_ENQUEUE) {
ldlm_flock_blocking_unlink(req);
ldlm_flock_blocking_link(req, lock);
req, grant_work);
RETURN(LDLM_ITER_CONTINUE);
}
- reprocess_failed = 1;
- break;
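+ /* A conflict while reprocessing just means this waiter stays
+ * blocked; let the caller move on to the next lock.
+ */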
+ RETURN(LDLM_ITER_CONTINUE);
}
if (*flags & LDLM_FL_BLOCK_NOWAIT) {
RETURN(LDLM_ITER_STOP);
}
- /* add lock to blocking list before deadlock
+ /* Add lock to blocking list before deadlock
* check to prevent race
*/
ldlm_flock_blocking_link(req, lock);
*flags |= LDLM_FL_BLOCK_GRANTED;
RETURN(LDLM_ITER_STOP);
}
- if (reprocess_failed)
- RETURN(LDLM_ITER_CONTINUE);
}
if (*flags & LDLM_FL_TEST_LOCK) {
ldlm_flock_blocking_unlink(req);
#endif /* HAVE_SERVER_SUPPORT */
- /* Scan the locks owned by this process to find the insertion point
- * (as locks are ordered), and to handle overlaps.
+ /* Scan the locks owned by this process to handle overlaps.
* We may have to merge or split existing locks.
*/
- if (ownlocks)
- lock = ownlocks;
- else
- lock = list_entry(&res->lr_granted,
- struct ldlm_lock, l_res_link);
- list_for_each_entry_safe_from(lock, tmp, &res->lr_granted, l_res_link) {
- if (!ldlm_same_flock_owner(lock, new))
- break;
+ for (lock = ownlocks; lock; lock = nextlock) {
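+ /* Fetch the next link first: lock may be destroyed or
+ * re-keyed below.
+ */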
+ nextlock = lock->l_same_owner;
if (lock->l_granted_mode == mode) {
- /* If the modes are the same then we need to process
- * locks that overlap OR adjoin the new lock. The extra
- * logic condition is necessary to deal with arithmetic
- * overflow and underflow.
+ /* If the modes are the same then we need to process locks
+ * that overlap OR adjoin the new lock. The widened search
+ * window already limits iteration to exactly those locks.
+ */
- if ((new->l_policy_data.l_flock.start >
- (lock->l_policy_data.l_flock.end + 1))
- && (lock->l_policy_data.l_flock.end !=
- OBD_OBJECT_EOF))
- continue;
-
- if ((new->l_policy_data.l_flock.end <
- (lock->l_policy_data.l_flock.start - 1))
- && (lock->l_policy_data.l_flock.start != 0))
- break;
-
- if (new->l_policy_data.l_flock.start <
- lock->l_policy_data.l_flock.start)
- ldlm_flock_range_update(lock, START(new), LAST(lock));
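+ /* Grow both ranges to the union so whichever lock survives
+ * already describes the merged extent.
+ */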
+ if (START(new) < START(lock))
+ START(lock) = START(new);
else
- ldlm_flock_range_update(new, START(lock), LAST(new));
+ START(new) = START(lock);
- if (new->l_policy_data.l_flock.end >
- lock->l_policy_data.l_flock.end)
- ldlm_flock_range_update(lock, START(lock), LAST(new));
+ if (LAST(new) > LAST(lock))
+ LAST(lock) = LAST(new);
else
- ldlm_flock_range_update(new, START(new), LAST(lock));
+ LAST(new) = LAST(lock);
if (added) {
ldlm_flock_destroy(lock, mode, *flags);
} else {
new = lock;
added = 1;
}
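+ /* Re-insert the surviving lock so the interval tree
+ * reflects the merged range.
+ */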
+ ldlm_flock_range_update(new, START(new), LAST(new));
continue;
}
- if (new->l_policy_data.l_flock.start >
- lock->l_policy_data.l_flock.end)
+ /* Only same-mode locks, merged above, may merely adjoin;
+ * anything left must truly overlap to need processing.
+ */
+ if (!ldlm_flocks_overlap(lock, new))
continue;
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.start)
- break;
-
res->lr_flock_node.lfn_needs_reprocess = true;
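+ /* Modes differ and the ranges overlap: trim the existing lock,
+ * drop it entirely, or split it around the request.
+ */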
- if (new->l_policy_data.l_flock.start <=
- lock->l_policy_data.l_flock.start) {
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.end) {
- ldlm_flock_range_update(lock, LAST(new)+1, LAST(lock));
- break;
- }
- ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+ if (START(new) <= START(lock)) {
+ if (LAST(new) < LAST(lock))
+ ldlm_flock_range_update(lock,
+ LAST(new) + 1,
+ LAST(lock));
+ else
+ ldlm_flock_destroy(lock, lock->l_req_mode,
+ *flags);
continue;
}
- if (new->l_policy_data.l_flock.end >=
- lock->l_policy_data.l_flock.end) {
+ if (LAST(new) >= LAST(lock)) {
ldlm_flock_range_update(lock, START(lock), START(new) - 1);
continue;
}
new2->l_policy_data.l_flock.pid =
new->l_policy_data.l_flock.pid;
new2->l_policy_data.l_flock.owner =
new->l_policy_data.l_flock.owner;
- new2->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- new2->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.start - 1;
+ START(new2) = START(lock);
+ LAST(new2) = START(new) - 1;
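+ /* new2 takes the part of the old range below the request;
+ * the original lock is re-keyed to the part above it.
+ */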
ldlm_flock_range_update(lock, LAST(new) + 1, LAST(lock));
new2->l_conn_export = lock->l_conn_export;
if (lock->l_export != NULL) {
ldlm_lock_addref_internal_nolock(new2,
lock->l_granted_mode);
- /* insert new2 at lock */
- ldlm_flock_add_lock(res, &lock->l_res_link, new2);
+ ldlm_flock_add_lock(res, &res->lr_granted, new2);
LDLM_LOCK_RELEASE(new2);
break;
}
/* Add req to the granted queue before calling ldlm_reprocess_all(). */
if (!added) {
list_del_init(&req->l_res_link);
- /* insert new lock before "lock", which might be the
- * next lock for this owner, or might be the first
- * lock for the next owner, or might not be a lock at
- * all, but instead points at the head of the list
- */
- ldlm_flock_add_lock(res, &lock->l_res_link, req);
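+ /* With range lookups served by the interval tree, the position
+ * of req within lr_granted no longer matters.
+ */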
+ ldlm_flock_add_lock(res, &res->lr_granted, req);
}
if (*flags != LDLM_FL_WAIT_NOREPROC) {