*/
static int
ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- struct list_head *work_list)
+ __u64 *ldlm_flags, struct list_head *work_list)
{
+ enum ldlm_mode req_mode = req->l_req_mode;
struct list_head *tmp;
struct ldlm_lock *lock;
__u64 req_bits = req->l_policy_data.l_inodebits.bits;
ENTRY;
+ lockmode_verify(req_mode);
+
/* There is no sense in lock with no bits set. Also such a lock
* would be compatible with any other bit lock.
* Meanwhile that can be true if there were just try_bits and all
continue;
}
- /* locks' mode are compatible, bits don't matter */
- if (lockmode_compat(lock->l_req_mode, req->l_req_mode)) {
- /* jump to last lock in mode group */
- tmp = mode_tail;
+ if (lockmode_compat(lock->l_req_mode, req_mode)) {
+ /* non group locks are compatible, bits don't matter */
+ if (likely(req_mode != LCK_GROUP)) {
+ /* jump to last lock in mode group */
+ tmp = mode_tail;
+ continue;
+ }
+
+ if (req->l_policy_data.l_inodebits.li_gid ==
+ lock->l_policy_data.l_inodebits.li_gid) {
+ if (ldlm_is_granted(lock))
+ RETURN(2);
+
+ if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+ RETURN(-EWOULDBLOCK);
+
+ /* Place the same group together */
+ ldlm_resource_insert_lock_after(lock, req);
+ RETURN(0);
+ }
+ }
+
+ /* GROUP locks are placed to a head of the waiting list, but
+ * grouped by gid. */
+ if (unlikely(req_mode == LCK_GROUP && !ldlm_is_granted(lock))) {
+ compat = 0;
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Already not a GROUP lock, insert before. */
+ ldlm_resource_insert_lock_before(lock, req);
+ break;
+ }
+ /* Still GROUP but a different gid(the same gid would
+ * be handled above). Keep searching for the same gid */
+ LASSERT(req->l_policy_data.l_inodebits.li_gid !=
+ lock->l_policy_data.l_inodebits.li_gid);
continue;
}
!ldlm_is_cos_incompat(req) &&
ldlm_is_cos_enabled(req) &&
lock->l_client_cookie == req->l_client_cookie)
- goto not_conflicting;
+ goto skip_work_list;
+
+ compat = 0;
+
+ if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+ LASSERT(ldlm_has_dom(lock));
+
+ if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+ RETURN(-EWOULDBLOCK);
+
+ goto skip_work_list;
+ }
/* Found a conflicting policy group. */
if (!work_list)
RETURN(0);
- compat = 0;
-
/* Add locks of the policy group to @work_list
* as blocking locks for @req */
if (lock->l_blocking_ast)
ldlm_add_ast_work_item(lock,
req, work_list);
}
-not_conflicting:
+skip_work_list:
if (tmp == mode_tail)
break;
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
*/
-int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *ldlm_flags,
enum ldlm_process_intention intention,
enum ldlm_error *err,
struct list_head *work_list)
struct ldlm_resource *res = lock->l_resource;
struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
NULL : work_list;
- int rc;
+ int rc, rc2 = 0;
ENTRY;
*err = ELDLM_LOCK_ABORTED;
if (intention == LDLM_PROCESS_RESCAN) {
struct list_head *bl_list =
- *flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
+ *ldlm_flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
LASSERT(lock->l_policy_data.l_inodebits.bits != 0);
* any blocked locks from granted queue during every reprocess
* and bl_ast will be sent if needed.
*/
+ *ldlm_flags = 0;
rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
- bl_list);
+ ldlm_flags, bl_list);
if (!rc)
RETURN(LDLM_ITER_STOP);
- rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
+ rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+ ldlm_flags, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
lock->l_policy_data.l_inodebits.bits |=
lock->l_policy_data.l_inodebits.try_bits;
lock->l_policy_data.l_inodebits.try_bits = 0;
- *flags |= LDLM_FL_LOCK_CHANGED;
+ *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
}
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, grant_work);
RETURN(LDLM_ITER_CONTINUE);
}
- rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, work_list);
- rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, work_list);
+ rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
+ ldlm_flags, work_list);
+ if (rc < 0)
+ GOTO(out, *err = rc);
if (rc != 2) {
+ rc2 = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+ ldlm_flags, work_list);
+ if (rc2 < 0)
+ GOTO(out, *err = rc = rc2);
+ }
+
+ if (rc + rc2 != 2) {
/* if there were only bits to try and all are conflicting */
if ((lock->l_policy_data.l_inodebits.bits |
lock->l_policy_data.l_inodebits.try_bits)) {
- /* There is no sense to set LDLM_FL_NO_TIMEOUT to @flags
- * for DOM lock while they are enqueued through intents,
- * i.e. @lock here is local which does not timeout. */
+ /* There is no sense to set LDLM_FL_NO_TIMEOUT to
+ * @ldlm_flags for DOM lock while they are enqueued
+ * through intents, i.e. @lock here is local which does
+ * not timeout. */
*err = ELDLM_OK;
}
} else {
lock->l_policy_data.l_inodebits.bits |=
lock->l_policy_data.l_inodebits.try_bits;
lock->l_policy_data.l_inodebits.try_bits = 0;
- *flags |= LDLM_FL_LOCK_CHANGED;
+ *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
}
LASSERT(lock->l_policy_data.l_inodebits.bits);
ldlm_resource_unlink_lock(lock);
}
RETURN(LDLM_ITER_CONTINUE);
+out:
+ return rc;
}
#endif /* HAVE_SERVER_SUPPORT */
union ldlm_policy_data *lpolicy)
{
lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
+ lpolicy->l_inodebits.li_gid = wpolicy->l_inodebits.li_gid;
/**
* try_bits are to be handled outside of generic write_to_local due
* to different behavior on a server and client.
memset(wpolicy, 0, sizeof(*wpolicy));
wpolicy->l_inodebits.bits = lpolicy->l_inodebits.bits;
wpolicy->l_inodebits.try_bits = lpolicy->l_inodebits.try_bits;
+ wpolicy->l_inodebits.li_gid = lpolicy->l_inodebits.li_gid;
}
/**
}
void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock, bool tail)
{
int i;
if (head == &res->lr_waiting) {
for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
- if (lock->l_policy_data.l_inodebits.bits & BIT(i))
+ if (!(lock->l_policy_data.l_inodebits.bits & BIT(i)))
+ continue;
+ if (tail)
list_add_tail(&lock->l_ibits_node->lin_link[i],
- &res->lr_ibits_queues->liq_waiting[i]);
+ &res->lr_ibits_queues->liq_waiting[i]);
+ else
+ list_add(&lock->l_ibits_node->lin_link[i],
+ &res->lr_ibits_queues->liq_waiting[i]);
}
} else if (head == &res->lr_granted && lock->l_ibits_node != NULL) {
for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
LASSERT(list_empty(&lock->l_ibits_node->lin_link[i]));
OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab);
lock->l_ibits_node = NULL;
+ } else if (head != &res->lr_granted) {
+ /* we are inserting in a middle of a list, after @head */
+ struct ldlm_lock *orig = list_entry(head, struct ldlm_lock,
+ l_res_link);
+ LASSERT(orig->l_policy_data.l_inodebits.bits ==
+ lock->l_policy_data.l_inodebits.bits);
+ /* The is no a use case to insert before with exactly matched
+ * set of bits */
+ LASSERT(tail == false);
+
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
+ if (!(lock->l_policy_data.l_inodebits.bits & (1 << i)))
+ continue;
+ list_add(&lock->l_ibits_node->lin_link[i],
+ &orig->l_ibits_node->lin_link[i]);
+ }
}
}