Whamcloud - gitweb
LU-13645 ldlm: group locks for DOM IBIT lock
[fs/lustre-release.git] / lustre / ldlm / ldlm_inodebits.c
index 7dd78d0..68914d9 100644 (file)
@@ -157,8 +157,9 @@ restart:
  */
 static int
 ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                           struct list_head *work_list)
+                           __u64 *ldlm_flags, struct list_head *work_list)
 {
+       enum ldlm_mode req_mode = req->l_req_mode;
        struct list_head *tmp;
        struct ldlm_lock *lock;
        __u64 req_bits = req->l_policy_data.l_inodebits.bits;
@@ -167,6 +168,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 
        ENTRY;
 
+       lockmode_verify(req_mode);
+
        /* There is no sense in lock with no bits set. Also such a lock
         * would be compatible with any other bit lock.
         * Meanwhile that can be true if there were just try_bits and all
@@ -201,10 +204,41 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                        continue;
                }
 
-               /* locks' mode are compatible, bits don't matter */
-               if (lockmode_compat(lock->l_req_mode, req->l_req_mode)) {
-                       /* jump to last lock in mode group */
-                       tmp = mode_tail;
+               if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                       /* non group locks are compatible, bits don't matter */
+                       if (likely(req_mode != LCK_GROUP)) {
+                               /* jump to last lock in mode group */
+                               tmp = mode_tail;
+                               continue;
+                       }
+
+                       if (req->l_policy_data.l_inodebits.li_gid ==
+                           lock->l_policy_data.l_inodebits.li_gid) {
+                               if (ldlm_is_granted(lock))
+                                       RETURN(2);
+
+                               if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+                                       RETURN(-EWOULDBLOCK);
+
+                               /* Place the same group together */
+                               ldlm_resource_insert_lock_after(lock, req);
+                               RETURN(0);
+                       }
+               }
+
+               /* GROUP locks are placed to a head of the waiting list, but
+                * grouped by gid. */
+               if (unlikely(req_mode == LCK_GROUP && !ldlm_is_granted(lock))) {
+                       compat = 0;
+                       if (lock->l_req_mode != LCK_GROUP) {
+                               /* Already not a GROUP lock, insert before. */
+                               ldlm_resource_insert_lock_before(lock, req);
+                               break;
+                       }
+                       /* Still GROUP but a different gid(the same gid would
+                        * be handled above). Keep searching for the same gid */
+                       LASSERT(req->l_policy_data.l_inodebits.li_gid !=
+                               lock->l_policy_data.l_inodebits.li_gid);
                        continue;
                }
 
@@ -241,14 +275,23 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                    !ldlm_is_cos_incompat(req) &&
                                    ldlm_is_cos_enabled(req) &&
                                    lock->l_client_cookie == req->l_client_cookie)
-                                       goto not_conflicting;
+                                       goto skip_work_list;
+
+                               compat = 0;
+
+                               if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                                       LASSERT(ldlm_has_dom(lock));
+
+                                       if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+                                               RETURN(-EWOULDBLOCK);
+
+                                       goto skip_work_list;
+                               }
 
                                /* Found a conflicting policy group. */
                                if (!work_list)
                                        RETURN(0);
 
-                               compat = 0;
-
                                /* Add locks of the policy group to @work_list
                                 * as blocking locks for @req */
                                if (lock->l_blocking_ast)
@@ -260,7 +303,7 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                                ldlm_add_ast_work_item(lock,
                                                                req, work_list);
                        }
-not_conflicting:
+skip_work_list:
                        if (tmp == mode_tail)
                                break;
 
@@ -280,7 +323,7 @@ not_conflicting:
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
  */
-int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *ldlm_flags,
                                enum ldlm_process_intention intention,
                                enum ldlm_error *err,
                                struct list_head *work_list)
@@ -288,7 +331,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
                                                        NULL : work_list;
-       int rc;
+       int rc, rc2 = 0;
        ENTRY;
 
        *err = ELDLM_LOCK_ABORTED;
@@ -297,7 +340,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
 
        if (intention == LDLM_PROCESS_RESCAN) {
                struct list_head *bl_list =
-                       *flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
+                       *ldlm_flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
 
                LASSERT(lock->l_policy_data.l_inodebits.bits != 0);
 
@@ -320,11 +363,13 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                 * any blocked locks from granted queue during every reprocess
                 * and bl_ast will be sent if needed.
                 */
+               *ldlm_flags = 0;
                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
-                                                bl_list);
+                                                ldlm_flags, bl_list);
                if (!rc)
                        RETURN(LDLM_ITER_STOP);
-               rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
+               rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+                                                ldlm_flags, NULL);
                if (!rc)
                        RETURN(LDLM_ITER_STOP);
 
@@ -333,7 +378,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                        lock->l_policy_data.l_inodebits.bits |=
                                lock->l_policy_data.l_inodebits.try_bits;
                        lock->l_policy_data.l_inodebits.try_bits = 0;
-                       *flags |= LDLM_FL_LOCK_CHANGED;
+                       *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
                }
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, grant_work);
@@ -342,16 +387,26 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                RETURN(LDLM_ITER_CONTINUE);
        }
 
-       rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, work_list);
-       rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, work_list);
+       rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
+                                        ldlm_flags, work_list);
+       if (rc < 0)
+               GOTO(out, *err = rc);
 
        if (rc != 2) {
+               rc2 = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+                                                 ldlm_flags, work_list);
+               if (rc2 < 0)
+                       GOTO(out, *err = rc = rc2);
+       }
+
+       if (rc + rc2 != 2) {
                /* if there were only bits to try and all are conflicting */
                if ((lock->l_policy_data.l_inodebits.bits |
                     lock->l_policy_data.l_inodebits.try_bits)) {
-                       /* There is no sense to set LDLM_FL_NO_TIMEOUT to @flags
-                        * for DOM lock while they are enqueued through intents,
-                        * i.e. @lock here is local which does not timeout. */
+                       /* There is no sense to set LDLM_FL_NO_TIMEOUT to
+                        * @ldlm_flags for DOM lock while they are enqueued
+                        * through intents, i.e. @lock here is local which does
+                        * not timeout. */
                        *err = ELDLM_OK;
                }
        } else {
@@ -360,7 +415,7 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                        lock->l_policy_data.l_inodebits.bits |=
                                lock->l_policy_data.l_inodebits.try_bits;
                        lock->l_policy_data.l_inodebits.try_bits = 0;
-                       *flags |= LDLM_FL_LOCK_CHANGED;
+                       *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
                }
                LASSERT(lock->l_policy_data.l_inodebits.bits);
                ldlm_resource_unlink_lock(lock);
@@ -369,6 +424,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        }
 
        RETURN(LDLM_ITER_CONTINUE);
+out:
+       return rc;
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
@@ -376,6 +433,7 @@ void ldlm_ibits_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
                                     union ldlm_policy_data *lpolicy)
 {
        lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
+       lpolicy->l_inodebits.li_gid = wpolicy->l_inodebits.li_gid;
        /**
         * try_bits are to be handled outside of generic write_to_local due
         * to different behavior on a server and client.
@@ -388,6 +446,7 @@ void ldlm_ibits_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_inodebits.bits = lpolicy->l_inodebits.bits;
        wpolicy->l_inodebits.try_bits = lpolicy->l_inodebits.try_bits;
+       wpolicy->l_inodebits.li_gid = lpolicy->l_inodebits.li_gid;
 }
 
 /**
@@ -535,7 +594,7 @@ int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock)
 }
 
 void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
-                            struct ldlm_lock *lock)
+                            struct ldlm_lock *lock, bool tail)
 {
        int i;
 
@@ -544,15 +603,36 @@ void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
 
        if (head == &res->lr_waiting) {
                for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
-                       if (lock->l_policy_data.l_inodebits.bits & BIT(i))
+                       if (!(lock->l_policy_data.l_inodebits.bits & BIT(i)))
+                               continue;
+                       if (tail)
                                list_add_tail(&lock->l_ibits_node->lin_link[i],
-                                       &res->lr_ibits_queues->liq_waiting[i]);
+                                        &res->lr_ibits_queues->liq_waiting[i]);
+                       else
+                               list_add(&lock->l_ibits_node->lin_link[i],
+                                        &res->lr_ibits_queues->liq_waiting[i]);
                }
        } else if (head == &res->lr_granted && lock->l_ibits_node != NULL) {
                for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
                        LASSERT(list_empty(&lock->l_ibits_node->lin_link[i]));
                OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab);
                lock->l_ibits_node = NULL;
+       } else if (head != &res->lr_granted) {
+               /* we are inserting in a middle of a list, after @head */
+               struct ldlm_lock *orig = list_entry(head, struct ldlm_lock,
+                                                   l_res_link);
+               LASSERT(orig->l_policy_data.l_inodebits.bits ==
+                       lock->l_policy_data.l_inodebits.bits);
+               /* The is no a use case to insert before with exactly matched
+                * set of bits */
+               LASSERT(tail == false);
+
+               for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
+                       if (!(lock->l_policy_data.l_inodebits.bits & (1 << i)))
+                               continue;
+                       list_add(&lock->l_ibits_node->lin_link[i],
+                                &orig->l_ibits_node->lin_link[i]);
+               }
        }
 }