Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index a5b6aa3..688cc7b 100644 (file)
@@ -144,9 +144,6 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
  * Called under lock and resource spin-locks.
  */
 static void osc_lock_lvb_update(const struct lu_env *env,
  * Called under lock and resource spin-locks.
  */
 static void osc_lock_lvb_update(const struct lu_env *env,
@@ -202,7 +199,7 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
        struct ldlm_lock *dlmlock;
 
 {
        struct ldlm_lock *dlmlock;
 
@@ -242,10 +239,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                        osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
                                            dlmlock, NULL);
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                        osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
                                            dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
                LINVRNT(osc_lock_invariant(oscl));
        }
                }
                LINVRNT(osc_lock_invariant(oscl));
        }
@@ -285,7 +283,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
        }
 
        if (rc == 0)
        }
 
        if (rc == 0)
-               osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               osc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -341,7 +339,8 @@ static int osc_lock_upcall_speculative(void *cookie,
        lock_res_and_lock(dlmlock);
        LASSERT(ldlm_is_granted(dlmlock));
 
        lock_res_and_lock(dlmlock);
        LASSERT(ldlm_is_granted(dlmlock));
 
-       /* there is no osc_lock associated with speculative locks */
+       /* there is no osc_lock associated with speculative locks
+        * thus no need to set LDLM_FL_LVB_CACHED */
        osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
        unlock_res_and_lock(dlmlock);
        osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
        unlock_res_and_lock(dlmlock);
@@ -554,6 +553,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        struct ost_lvb          *lvb;
        struct req_capsule      *cap;
        struct cl_object        *obj = NULL;
        struct ost_lvb          *lvb;
        struct req_capsule      *cap;
        struct cl_object        *obj = NULL;
+       struct ldlm_resource    *res = dlmlock->l_resource;
+       struct ldlm_match_data  matchdata = { 0 };
+       union ldlm_policy_data  policy;
+       enum ldlm_mode          mode = LCK_PW | LCK_GROUP | LCK_PR;
        int                     result;
        __u16                   refcheck;
 
        int                     result;
        __u16                   refcheck;
 
@@ -565,13 +568,40 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        if (IS_ERR(env))
                GOTO(out, result = PTR_ERR(env));
 
        if (IS_ERR(env))
                GOTO(out, result = PTR_ERR(env));
 
+       policy.l_extent.start = 0;
+       policy.l_extent.end = LUSTRE_EOF;
 
 
-       lock_res_and_lock(dlmlock);
-       if (dlmlock->l_ast_data != NULL) {
-               obj = osc2cl(dlmlock->l_ast_data);
-               cl_object_get(obj);
+       matchdata.lmd_mode = &mode;
+       matchdata.lmd_policy = &policy;
+       matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+       matchdata.lmd_unref = 1;
+       matchdata.lmd_has_ast_data = true;
+
+       LDLM_LOCK_GET(dlmlock);
+
+       /* If any dlmlock has l_ast_data set, we must find it or we risk
+        * missing a size update done under a different lock.
+        */
+       while (dlmlock) {
+               lock_res_and_lock(dlmlock);
+               if (dlmlock->l_ast_data) {
+                       obj = osc2cl(dlmlock->l_ast_data);
+                       cl_object_get(obj);
+               }
+               unlock_res_and_lock(dlmlock);
+               LDLM_LOCK_PUT(dlmlock);
+
+               dlmlock = NULL;
+
+               if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                               break;
+
+                       lock_res(res);
+                       dlmlock = search_itree(res, &matchdata);
+                       unlock_res(res);
+               }
        }
        }
-       unlock_res_and_lock(dlmlock);
 
        if (obj != NULL) {
                /* Do not grab the mutex of cl_lock for glimpse.
 
        if (obj != NULL) {
                /* Do not grab the mutex of cl_lock for glimpse.
@@ -684,7 +714,8 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                RETURN(1);
 
        LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
                RETURN(1);
 
        LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
-               ldlm_has_dom(dlmlock));
+               dlmlock->l_resource->lr_type == LDLM_IBITS);
+
        lock_res_and_lock(dlmlock);
        obj = dlmlock->l_ast_data;
        if (obj)
        lock_res_and_lock(dlmlock);
        obj = dlmlock->l_ast_data;
        if (obj)
@@ -709,12 +740,17 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                GOTO(out, weight = 1);
        }
 
                GOTO(out, weight = 1);
        }
 
-       if (ldlm_has_dom(dlmlock))
-               weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
-       else
+       if (dlmlock->l_resource->lr_type == LDLM_EXTENT)
                weight = osc_lock_weight(env, obj,
                                         dlmlock->l_policy_data.l_extent.start,
                                         dlmlock->l_policy_data.l_extent.end);
                weight = osc_lock_weight(env, obj,
                                         dlmlock->l_policy_data.l_extent.start,
                                         dlmlock->l_policy_data.l_extent.end);
+       else if (ldlm_has_dom(dlmlock))
+               weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
+       /* The DOM bit can be cancelled at any time; in that case, we know
+        * there are no pages, so just return weight of 0
+        */
+       else
+               weight = 0;
 
        EXIT;
 
 
        EXIT;
 
@@ -998,7 +1034,6 @@ enqueue_base:
        }
        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                  policy, &oscl->ols_lvb,
        }
        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                  policy, &oscl->ols_lvb,
-                                 osc->oo_oinfo->loi_kms_valid,
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_speculative);
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_speculative);
@@ -1194,6 +1229,8 @@ int osc_lock_init(const struct lu_env *env,
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+       if (lock->cll_descr.cld_mode == CLM_GROUP)
+               oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;