Whamcloud - gitweb
LU-16046 ldlm: group lock unlock fix 08/49008/3
authorVitaly Fertman <c17818@cray.com>
Thu, 27 Oct 2022 19:54:18 +0000 (22:54 +0300)
committerOleg Drokin <green@whamcloud.com>
Mon, 14 Nov 2022 08:26:36 +0000 (08:26 +0000)
The original LU-9964 fix had a problem because with many pages in
memory grouplock unlock takes 10+ seconds just to discard them.

The current patch makes grouplock unlock thread to be not atomic, but
makes a new grouplock enqueue to wait until previous CBPENDING lock
gets destroyed.

HPE-bug-id: LUS-10644
Signed-off-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Change-Id: I7798138b953320c477ce60c4e34eac40ada95a69
Reviewed-on: https://es-gerrit.dev.cray.com/161411
Reviewed-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Tested-by: Alexander Lezhoev <alexander.lezhoev@hpe.com>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/49008
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexander <alexander.boyko@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_dev.c
lustre/osc/osc_lock.c
lustre/osc/osc_request.c

index ff211a5..e40a79b 100644 (file)
@@ -1015,6 +1015,7 @@ enum ldlm_match_flags {
        LDLM_MATCH_AST     = BIT(1),
        LDLM_MATCH_AST_ANY = BIT(2),
        LDLM_MATCH_RIGHT   = BIT(3),
+       LDLM_MATCH_GROUP   = BIT(4),
 };
 
 /**
index b1b00d3..58ae27f 100644 (file)
@@ -393,6 +393,7 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
                return 0;
        }
        ldlm_set_destroyed(lock);
+       wake_up(&lock->l_waitq);
 
        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even lock isn't
@@ -1176,10 +1177,12 @@ static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
         * whose parents already hold a lock so forward progress
         * can still happen. */
        if (ldlm_is_cbpending(lock) &&
-           !(data->lmd_flags & LDLM_FL_CBPENDING))
+           !(data->lmd_flags & LDLM_FL_CBPENDING) &&
+           !(data->lmd_match & LDLM_MATCH_GROUP))
                return false;
 
-       if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
+       if (!(data->lmd_match & (LDLM_MATCH_UNREF | LDLM_MATCH_GROUP)) &&
+           ldlm_is_cbpending(lock) &&
            lock->l_readers == 0 && lock->l_writers == 0)
                return false;
 
@@ -1242,7 +1245,12 @@ static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
                return false;
 
 matched:
-       if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
+       /**
+        * In case the lock is a CBPENDING grouplock, just pin it and return,
+        * we need to wait until it gets to DESTROYED.
+        */
+       if ((data->lmd_flags & LDLM_FL_TEST_LOCK) ||
+           (ldlm_is_cbpending(lock) && (data->lmd_match & LDLM_MATCH_GROUP))) {
                LDLM_LOCK_GET(lock);
                ldlm_lock_touch_in_lru(lock);
        } else {
@@ -1424,6 +1432,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
        };
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
+       struct ldlm_lock *group_lock;
        int matched;
 
        ENTRY;
@@ -1444,6 +1453,8 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                RETURN(0);
        }
 
+repeat:
+       group_lock = NULL;
        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);
        if (res->lr_type == LDLM_EXTENT)
@@ -1453,8 +1464,19 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
        if (!lock && !(flags & LDLM_FL_BLOCK_GRANTED))
                lock = search_queue(&res->lr_waiting, &data);
        matched = lock ? mode : 0;
+
+       if (lock && ldlm_is_cbpending(lock) &&
+           (data.lmd_match & LDLM_MATCH_GROUP))
+               group_lock = lock;
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
+
+       if (group_lock) {
+               l_wait_event_abortable(group_lock->l_waitq,
+                                      ldlm_is_destroyed(lock));
+               LDLM_LOCK_RELEASE(lock);
+               goto repeat;
+       }
        ldlm_resource_putref(res);
 
        if (lock) {
index 62595ac..8d609d1 100644 (file)
@@ -1032,8 +1032,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        lock->l_conn_export = exp;
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;
-       lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
-                                   LDLM_FL_ATOMIC_CB));
+       lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
        lock->l_activity = ktime_get_real_seconds();
 
        /* lock not sent to server yet */
index 4f9e3cc..339d18a 100644 (file)
@@ -714,7 +714,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        struct ldlm_intent *lit;
        enum ldlm_mode mode;
        bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-       __u64 match_flags = *flags;
+       __u64 search_flags = *flags;
+       __u64 match_flags = 0;
        LIST_HEAD(cancels);
        int rc, count;
        int lvb_size;
@@ -726,11 +727,14 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
 
-       match_flags |= LDLM_FL_LVB_READY;
+       search_flags |= LDLM_FL_LVB_READY;
        if (glimpse)
-               match_flags |= LDLM_FL_BLOCK_GRANTED;
-       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh);
+               search_flags |= LDLM_FL_BLOCK_GRANTED;
+       if (mode == LCK_GROUP)
+               match_flags = LDLM_MATCH_GROUP;
+       mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+                                        res_id, einfo->ei_type, policy, mode,
+                                        &lockh, match_flags);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -973,8 +977,6 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj,
 
        ols->ols_flags = flags;
        ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
-       if (lock->cll_descr.cld_mode == CLM_GROUP)
-               ols->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (ols->ols_flags & LDLM_FL_HAS_INTENT) {
                ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
index 1ebd516..6453b31 100644 (file)
@@ -1233,8 +1233,6 @@ int osc_lock_init(const struct lu_env *env,
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
-       if (lock->cll_descr.cld_mode == CLM_GROUP)
-               oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
index 57bc02a..f7323c8 100644 (file)
@@ -2994,7 +2994,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
-       __u64 match_flags = *flags;
+       __u64 search_flags = *flags;
+       __u64 match_flags = 0;
        enum ldlm_mode mode;
        int rc;
        ENTRY;
@@ -3023,11 +3024,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         * matching a lock; speculative lock requests do not need to,
         * because they will not actually use the lock. */
        if (!speculative)
-               match_flags |= LDLM_FL_LVB_READY;
+               search_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
-               match_flags |= LDLM_FL_BLOCK_GRANTED;
-       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh);
+               search_flags |= LDLM_FL_BLOCK_GRANTED;
+       if (mode == LCK_GROUP)
+               match_flags = LDLM_MATCH_GROUP;
+       mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+                                        res_id, einfo->ei_type, policy, mode,
+                                        &lockh, match_flags);
        if (mode) {
                struct ldlm_lock *matched;