Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs, part2 40/40740/3
authorVitaly Fertman <c17818@cray.com>
Wed, 11 Sep 2019 15:22:23 +0000 (18:22 +0300)
committerOleg Drokin <green@whamcloud.com>
Wed, 17 Mar 2021 23:21:13 +0000 (23:21 +0000)
It may happen that osc oinfo lvb cache has size < kms.

It occurs if a reply re-ordering happens and an older size is applied
to oinfo unconditionally.

Another possibility is RA, when osc_match_base() attaches the dlm lock
to osc object but does not cache the lvb. The next layout change will
overwrites the lock lvb by the oinfo cache (previous LUS-7731 fix),
presumably smaller values. Therefore, the next lock re-use may run
into a problem with partial page write which thinks the preliminary
read is not needed.

Do not let the cached oinfo lvb size to become less than kms.
Also, cache the lock's lvb in the oinfo on osc_match_base().

Lustre-change: https://review.whamcloud.com/36200
Lustre-commit: 40319db5bc649adaf3dad066e2c1bb49f7f1c04a

Change-Id: I50136f57491364146ce7b6a81b814e474e3edb86
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/40740
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/mdc/mdc_dev.c
lustre/osc/osc_internal.h
lustre/osc/osc_lock.c
lustre/osc/osc_object.c
lustre/osc/osc_request.c

index 4eabf06..3606778 100644 (file)
@@ -67,21 +67,17 @@ static void mdc_lock_lvb_update(const struct lu_env *env,
                                struct ldlm_lock *dlmlock,
                                struct ost_lvb *lvb);
 
-static int mdc_set_dom_lock_data(const struct lu_env *env,
-                                struct ldlm_lock *lock, void *data)
+static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data)
 {
-       struct osc_object *obj = data;
        int set = 0;
 
        LASSERT(lock != NULL);
        LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
 
        lock_res_and_lock(lock);
-       if (lock->l_ast_data == NULL) {
-               lock->l_ast_data = data;
-               mdc_lock_lvb_update(env, obj, lock, NULL);
-       }
 
+       if (lock->l_ast_data == NULL)
+               lock->l_ast_data = data;
        if (lock->l_ast_data == data)
                set = 1;
 
@@ -91,9 +87,9 @@ static int mdc_set_dom_lock_data(const struct lu_env *env,
 }
 
 int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp,
-                      struct ldlm_res_id *res_id,
-                      enum ldlm_type type, union ldlm_policy_data *policy,
-                      enum ldlm_mode mode, __u64 *flags, void *data,
+                      struct ldlm_res_id *res_id, enum ldlm_type type,
+                      union ldlm_policy_data *policy, enum ldlm_mode mode,
+                      __u64 *flags, struct osc_object *obj,
                       struct lustre_handle *lockh, int unref)
 {
        struct obd_device *obd = exp->exp_obd;
@@ -107,11 +103,19 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp,
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
-       if (data != NULL) {
+       if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
                LASSERT(lock != NULL);
-               if (!mdc_set_dom_lock_data(env, lock, data)) {
+               if (mdc_set_dom_lock_data(lock, obj)) {
+                       lock_res_and_lock(lock);
+                       if (!ldlm_is_lvb_cached(lock)) {
+                               LASSERT(lock->l_ast_data == obj);
+                               mdc_lock_lvb_update(env, obj, lock, NULL);
+                               ldlm_set_lvb_cached(lock);
+                       }
+                       unlock_res_and_lock(lock);
+               } else {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
@@ -408,6 +412,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME |
                         CAT_SIZE;
+       unsigned int setkms = 0;
 
        ENTRY;
 
@@ -425,16 +430,22 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
                size = lvb->lvb_size;
 
                if (size >= oinfo->loi_kms) {
-                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu,"
-                                  " kms=%llu", lvb->lvb_size, size);
                        valid |= CAT_KMS;
                        attr->cat_kms = size;
-               } else {
-                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu,"
-                                  " leaving kms=%llu",
-                                  lvb->lvb_size, oinfo->loi_kms);
+                       setkms = 1;
                }
        }
+
+       /* The size should not be less than the kms */
+       if (attr->cat_size < oinfo->loi_kms)
+               attr->cat_size = oinfo->loi_kms;
+
+       LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+                  "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+                  setkms ? "" : " leaving",
+                  setkms ? attr->cat_kms : oinfo->loi_kms,
+                  dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
        cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);
        EXIT;
@@ -443,6 +454,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
 static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                             struct lustre_handle *lockh)
 {
+       struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
        struct ldlm_lock *dlmlock;
 
        ENTRY;
@@ -483,8 +495,8 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                /* no lvb update for matched lock */
                if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-                       mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-                                           dlmlock, NULL);
+                       LASSERT(osc == dlmlock->l_ast_data);
+                       mdc_lock_lvb_update(env, osc, dlmlock, NULL);
                        ldlm_set_lvb_cached(dlmlock);
                }
        }
@@ -711,7 +723,7 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS))
                        ldlm_set_kms_ignore(matched);
 
-               if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
+               if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;
 
                        /* We already have a lock, and it's referenced. */
@@ -1385,14 +1397,19 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 
                /* Updates lvb in lock by the cached oinfo */
                oinfo = osc->oo_oinfo;
+
+               LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: "
+                          "%llu %llu %llu by oinfo size %llu blocks %llu "
+                          "[cma]time %llu %llu %llu", lvb->lvb_size,
+                          lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime,
+                          lvb->lvb_atime, oinfo->loi_lvb.lvb_size,
+                          oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime,
+                          oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime);
+               LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms);
+
                cl_object_attr_lock(&osc->oo_cl);
                memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
                cl_object_attr_unlock(&osc->oo_cl);
-
-               LDLM_DEBUG(lock, "update lvb size %llu blocks %llu [cma]time: "
-                          "%llu %llu %llu", lvb->lvb_size, lvb->lvb_blocks,
-                          lvb->lvb_ctime, lvb->lvb_mtime, lvb->lvb_atime);
-
                ldlm_clear_lvb_cached(lock);
        }
        RETURN(LDLM_ITER_CONTINUE);
@@ -1418,7 +1435,7 @@ static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
         * lock with DOM bit and it may have no l_ast_data initialized yet,
         * so init it here with given osc_object.
         */
-       mdc_set_dom_lock_data(env, lock, cl2osc(obj));
+       mdc_set_dom_lock_data(lock, cl2osc(obj));
        RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
 }
 
index 9dc800e..519a4d1 100644 (file)
@@ -53,6 +53,11 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
 
 extern struct ptlrpc_request_set *PTLRPCD_SET;
 
+void osc_lock_lvb_update(const struct lu_env *env,
+                        struct osc_object *osc,
+                        struct ldlm_lock *dlmlock,
+                        struct ost_lvb *lvb);
+
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
@@ -60,9 +65,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     struct ptlrpc_request_set *rqset, int async,
                     bool speculative);
 
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
-                  enum ldlm_type type, union ldlm_policy_data *policy,
-                  enum ldlm_mode mode, __u64 *flags, void *data,
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+                  struct ldlm_res_id *res_id, enum ldlm_type type,
+                  union ldlm_policy_data *policy, enum ldlm_mode mode,
+                  __u64 *flags, struct osc_object *obj,
                   struct lustre_handle *lockh, int unref);
 
 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
index 72893d0..4c85e86 100644 (file)
@@ -146,15 +146,15 @@ static void osc_lock_build_policy(const struct lu_env *env,
  *
  * Called under lock and resource spin-locks.
  */
-static void osc_lock_lvb_update(const struct lu_env *env,
-                               struct osc_object *osc,
-                               struct ldlm_lock *dlmlock,
-                               struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+                        struct osc_object *osc,
+                        struct ldlm_lock *dlmlock,
+                        struct ost_lvb *lvb)
 {
-       struct cl_object  *obj = osc2cl(osc);
-       struct lov_oinfo  *oinfo = osc->oo_oinfo;
-       struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
-       unsigned           valid;
+       struct cl_object *obj = osc2cl(osc);
+       struct lov_oinfo *oinfo = osc->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       unsigned valid, setkms = 0;
 
        ENTRY;
 
@@ -179,19 +179,23 @@ static void osc_lock_lvb_update(const struct lu_env *env,
                 if (size > dlmlock->l_policy_data.l_extent.end)
                         size = dlmlock->l_policy_data.l_extent.end + 1;
                 if (size >= oinfo->loi_kms) {
-                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu"
-                                  ", kms=%llu", lvb->lvb_size, size);
                         valid |= CAT_KMS;
                         attr->cat_kms = size;
-                } else {
-                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-                                  "%llu; leaving kms=%llu, end=%llu",
-                                   lvb->lvb_size, oinfo->loi_kms,
-                                   dlmlock->l_policy_data.l_extent.end);
+                       setkms = 1;
                 }
                ldlm_lock_allow_match_locked(dlmlock);
        }
 
+       /* The size should not be less than the kms */
+       if (attr->cat_size < oinfo->loi_kms)
+               attr->cat_size = oinfo->loi_kms;
+
+       LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+                  "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+                  setkms ? "" : " leaving",
+                  setkms ? attr->cat_kms : oinfo->loi_kms,
+                  dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
        cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);
 
@@ -201,6 +205,7 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                             struct lustre_handle *lockh)
 {
+       struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
        struct ldlm_lock *dlmlock;
 
        dlmlock = ldlm_handle2lock_long(lockh, 0);
@@ -241,8 +246,8 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                /* no lvb update for matched lock */
                if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-                       osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-                                           dlmlock, NULL);
+                       LASSERT(osc == dlmlock->l_ast_data);
+                       osc_lock_lvb_update(env, osc, dlmlock, NULL);
                        ldlm_set_lvb_cached(dlmlock);
                }
                LINVRNT(osc_lock_invariant(oscl));
@@ -1285,9 +1290,9 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
         * with a uniq gid and it conflicts with all other lock modes too
         */
 again:
-       mode = osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
-                              LCK_PR | LCK_PW | LCK_GROUP, &flags, obj, &lockh,
-                              dap_flags & OSC_DAP_FL_CANCELING);
+       mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+                             policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                             obj, &lockh, dap_flags & OSC_DAP_FL_CANCELING);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
                /* RACE: the lock is cancelled so let's try again */
index b6b01a3..a99747c 100644 (file)
@@ -214,14 +214,19 @@ static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
 
                /* Updates lvb in lock by the cached oinfo */
                oinfo = osc->oo_oinfo;
+
+               LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: "
+                          "%llu %llu %llu by oinfo size %llu blocks %llu "
+                          "[cma]time %llu %llu %llu", lvb->lvb_size,
+                          lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime,
+                          lvb->lvb_atime, oinfo->loi_lvb.lvb_size,
+                          oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime,
+                          oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime);
+               LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms);
+
                cl_object_attr_lock(&osc->oo_cl);
                memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
                cl_object_attr_unlock(&osc->oo_cl);
-
-               LDLM_DEBUG(lock, "update lvb size %llu blocks %llu [cma]time: "
-                          "%llu %llu %llu", lvb->lvb_size, lvb->lvb_blocks,
-                          lvb->lvb_ctime, lvb->lvb_mtime, lvb->lvb_atime);
-
                ldlm_clear_lvb_cached(lock);
        }
        RETURN(LDLM_ITER_CONTINUE);
index 4c0f5af..731ead8 100644 (file)
@@ -2631,9 +2631,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        RETURN(rc);
 }
 
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
-                  enum ldlm_type type, union ldlm_policy_data *policy,
-                  enum ldlm_mode mode, __u64 *flags, void *data,
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+                  struct ldlm_res_id *res_id, enum ldlm_type type,
+                  union ldlm_policy_data *policy, enum ldlm_mode mode,
+                  __u64 *flags, struct osc_object *obj,
                   struct lustre_handle *lockh, int unref)
 {
        struct obd_device *obd = exp->exp_obd;
@@ -2661,11 +2662,19 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
-       if (data != NULL) {
+       if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
                LASSERT(lock != NULL);
-               if (!osc_set_lock_data(lock, data)) {
+               if (osc_set_lock_data(lock, obj)) {
+                       lock_res_and_lock(lock);
+                       if (!ldlm_is_lvb_cached(lock)) {
+                               LASSERT(lock->l_ast_data == obj);
+                               osc_lock_lvb_update(env, obj, lock, NULL);
+                               ldlm_set_lvb_cached(lock);
+                       }
+                       unlock_res_and_lock(lock);
+               } else {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }