Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index ea7648b..688cc7b 100644 (file)
@@ -37,7 +37,6 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include <libcfs/libcfs.h>
 /* fid_build_reg_res_name() */
 #include <lustre_fid.h>
 #include <lustre_osc.h>
@@ -107,7 +106,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
 
        if (! ergo(ols->ols_state == OLS_GRANTED,
                   olock != NULL &&
-                  olock->l_req_mode == olock->l_granted_mode &&
+                  ldlm_is_granted(olock) &&
                   ols->ols_hold))
                return 0;
        return 1;
@@ -145,9 +144,6 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
  * Called under lock and resource spin-locks.
  */
 static void osc_lock_lvb_update(const struct lu_env *env,
@@ -203,7 +199,7 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
        struct ldlm_lock *dlmlock;
 
@@ -231,7 +227,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
        /* Lock must have been granted. */
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+       if (ldlm_is_granted(dlmlock)) {
                struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
                struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
 
@@ -243,10 +239,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                        osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
                                            dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
                LINVRNT(osc_lock_invariant(oscl));
        }
@@ -286,7 +283,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
        }
 
        if (rc == 0)
-               osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               osc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -340,9 +337,10 @@ static int osc_lock_upcall_speculative(void *cookie,
        LASSERT(dlmlock != NULL);
 
        lock_res_and_lock(dlmlock);
-       LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
+       LASSERT(ldlm_is_granted(dlmlock));
 
-       /* there is no osc_lock associated with speculative locks */
+       /* there is no osc_lock associated with speculative locks
+        * thus no need to set LDLM_FL_LVB_CACHED */
        osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
        unlock_res_and_lock(dlmlock);
@@ -378,7 +376,12 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
                        rc = 0;
        }
 
-       rc2 = osc_lock_discard_pages(env, obj, start, end, discard);
+       /*
+        * Do not try to match other locks with CLM_WRITE since we already
+        * know there're none
+        */
+       rc2 = osc_lock_discard_pages(env, obj, start, end,
+                                    mode == CLM_WRITE || discard);
        if (rc == 0 && rc2 < 0)
                rc = rc2;
 
@@ -403,7 +406,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
        LASSERT(flag == LDLM_CB_CANCELING);
 
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+       if (!ldlm_is_granted(dlmlock)) {
                dlmlock->l_ast_data = NULL;
                unlock_res_and_lock(dlmlock);
                RETURN(0);
@@ -550,6 +553,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        struct ost_lvb          *lvb;
        struct req_capsule      *cap;
        struct cl_object        *obj = NULL;
+       struct ldlm_resource    *res = dlmlock->l_resource;
+       struct ldlm_match_data  matchdata = { 0 };
+       union ldlm_policy_data  policy;
+       enum ldlm_mode          mode = LCK_PW | LCK_GROUP | LCK_PR;
        int                     result;
        __u16                   refcheck;
 
@@ -561,13 +568,40 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        if (IS_ERR(env))
                GOTO(out, result = PTR_ERR(env));
 
+       policy.l_extent.start = 0;
+       policy.l_extent.end = LUSTRE_EOF;
 
-       lock_res_and_lock(dlmlock);
-       if (dlmlock->l_ast_data != NULL) {
-               obj = osc2cl(dlmlock->l_ast_data);
-               cl_object_get(obj);
+       matchdata.lmd_mode = &mode;
+       matchdata.lmd_policy = &policy;
+       matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+       matchdata.lmd_unref = 1;
+       matchdata.lmd_has_ast_data = true;
+
+       LDLM_LOCK_GET(dlmlock);
+
+       /* If any dlmlock has l_ast_data set, we must find it or we risk
+        * missing a size update done under a different lock.
+        */
+       while (dlmlock) {
+               lock_res_and_lock(dlmlock);
+               if (dlmlock->l_ast_data) {
+                       obj = osc2cl(dlmlock->l_ast_data);
+                       cl_object_get(obj);
+               }
+               unlock_res_and_lock(dlmlock);
+               LDLM_LOCK_PUT(dlmlock);
+
+               dlmlock = NULL;
+
+               if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                               break;
+
+                       lock_res(res);
+                       dlmlock = search_itree(res, &matchdata);
+                       unlock_res(res);
+               }
        }
-       unlock_res_and_lock(dlmlock);
 
        if (obj != NULL) {
                /* Do not grab the mutex of cl_lock for glimpse.
@@ -612,9 +646,8 @@ static int weigh_cb(const struct lu_env *env, struct cl_io *io,
 {
        struct cl_page *page = ops->ops_cl.cpl_page;
 
-       if (cl_page_is_vmlocked(env, page)
-           || PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)
-          )
+       if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
+           PageWriteback(page->cp_vmpage))
                return CLP_GANG_ABORT;
 
        *(pgoff_t *)cbdata = osc_index(ops) + 1;
@@ -623,12 +656,13 @@ static int weigh_cb(const struct lu_env *env, struct cl_io *io,
 
 static unsigned long osc_lock_weight(const struct lu_env *env,
                                     struct osc_object *oscobj,
-                                    struct ldlm_extent *extent)
+                                    loff_t start, loff_t end)
 {
-       struct cl_io     *io = osc_env_thread_io(env);
+       struct cl_io *io = osc_env_thread_io(env);
        struct cl_object *obj = cl_object_top(&oscobj->oo_cl);
-       pgoff_t          page_index;
-       int              result;
+       pgoff_t page_index;
+       int result;
+
        ENTRY;
 
        io->ci_obj = obj;
@@ -637,11 +671,10 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
        if (result != 0)
                RETURN(result);
 
-       page_index = cl_index(obj, extent->start);
+       page_index = cl_index(obj, start);
        do {
                result = osc_page_gang_lookup(env, io, oscobj,
-                                             page_index,
-                                             cl_index(obj, extent->end),
+                                             page_index, cl_index(obj, end),
                                              weigh_cb, (void *)&page_index);
                if (result == CLP_GANG_ABORT)
                        break;
@@ -658,12 +691,13 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
  */
 unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 {
-       struct lu_env           *env;
-       struct osc_object       *obj;
-       struct osc_lock         *oscl;
-       unsigned long            weight;
-       bool                    found = false;
-       __u16                   refcheck;
+       struct lu_env *env;
+       struct osc_object *obj;
+       struct osc_lock *oscl;
+       unsigned long weight;
+       bool found = false;
+       __u16 refcheck;
+
        ENTRY;
 
        might_sleep();
@@ -679,7 +713,9 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                /* Mostly because lack of memory, do not eliminate this lock */
                RETURN(1);
 
-       LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
+       LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
+               dlmlock->l_resource->lr_type == LDLM_IBITS);
+
        lock_res_and_lock(dlmlock);
        obj = dlmlock->l_ast_data;
        if (obj)
@@ -691,9 +727,10 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 
        spin_lock(&obj->oo_ol_spin);
        list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
-               if (oscl->ols_dlmlock != NULL && oscl->ols_dlmlock != dlmlock)
-                       continue;
-               found = true;
+               if (oscl->ols_dlmlock == dlmlock) {
+                       found = true;
+                       break;
+               }
        }
        spin_unlock(&obj->oo_ol_spin);
        if (found) {
@@ -703,7 +740,18 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                GOTO(out, weight = 1);
        }
 
-       weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);
+       if (dlmlock->l_resource->lr_type == LDLM_EXTENT)
+               weight = osc_lock_weight(env, obj,
+                                        dlmlock->l_policy_data.l_extent.start,
+                                        dlmlock->l_policy_data.l_extent.end);
+       else if (ldlm_has_dom(dlmlock))
+               weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
+       /* The DOM bit can be cancelled at any time; in that case, we know
+        * there are no pages, so just return weight of 0
+        */
+       else
+               weight = 0;
+
        EXIT;
 
 out:
@@ -713,6 +761,7 @@ out:
        cl_env_put(env, &refcheck);
        return weight;
 }
+EXPORT_SYMBOL(osc_ldlm_weigh_ast);
 
 static void osc_lock_build_einfo(const struct lu_env *env,
                                 const struct cl_lock *lock,
@@ -859,7 +908,7 @@ restart:
                        continue;
 
                /* wait for conflicting lock to be canceled */
-               cl_sync_io_init(waiter, 1, cl_sync_io_end);
+               cl_sync_io_init(waiter, 1);
                oscl->ols_owner = waiter;
 
                spin_lock(&tmp_oscl->ols_lock);
@@ -985,7 +1034,6 @@ enqueue_base:
        }
        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                  policy, &oscl->ols_lvb,
-                                 osc->oo_oinfo->loi_kms_valid,
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_speculative);
@@ -1135,9 +1183,9 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
                return;
 
        if (likely(io->ci_type == CIT_WRITE)) {
-               io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos);
-               io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos +
-                                 io->u.ci_rw.rw_range.cir_count - 1);
+               io_start = cl_index(obj, io->u.ci_rw.crw_pos);
+               io_end = cl_index(obj, io->u.ci_rw.crw_pos +
+                                               io->u.ci_rw.crw_count - 1);
        } else {
                LASSERT(cl_io_is_mkwrite(io));
                io_start = io_end = io->u.ci_fault.ft_index;
@@ -1181,6 +1229,8 @@ int osc_lock_init(const struct lu_env *env,
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+       if (lock->cll_descr.cld_mode == CLM_GROUP)
+               oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;