Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index 1adc457..b85481e 100644 (file)
@@ -261,7 +261,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj,
                        result = 0;
        }
 
-       rc = mdc_lock_discard_pages(env, obj, start, end, discard);
+       /* Avoid lock matching with CLM_WRITE, there can be no other locks */
+       rc = mdc_lock_discard_pages(env, obj, start, end,
+                                   mode == CLM_WRITE || discard);
        if (result == 0 && rc < 0)
                result = rc;
 
@@ -292,7 +294,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  */
 static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                                 struct ldlm_lock *dlmlock,
-                                void *data, int flag)
+                                int flag)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -320,7 +322,6 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                dlmlock->l_ast_data = NULL;
                cl_object_get(obj);
        }
-       ldlm_set_kms_ignore(dlmlock);
        unlock_res_and_lock(dlmlock);
 
        /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
@@ -384,7 +385,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -443,7 +444,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
 }
 
 static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
        struct ldlm_lock *dlmlock;
 
@@ -483,10 +484,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_end = CL_PAGE_EOF;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                        mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
                                            dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
        }
        unlock_res_and_lock(dlmlock);
@@ -527,7 +529,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 
        CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode);
        if (rc == 0)
-               mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               mdc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -686,7 +688,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        enum ldlm_mode mode;
        bool glimpse = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
-       int rc;
+       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       int rc, count;
 
        ENTRY;
 
@@ -700,10 +703,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
         * LVB information, e.g. canceled locks or locks of just pruned object,
         * such locks should be skipped.
         */
-       mode = ldlm_lock_match_with_skip(obd->obd_namespace, match_flags,
-                                        LDLM_FL_KMS_IGNORE, res_id,
-                                        einfo->ei_type, policy, mode,
-                                        &lockh, 0);
+       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
+                              einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -711,13 +712,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               /* this shouldn't happen but this check is kept to make
-                * related test fail if problem occurs
-                */
-               if (unlikely(ldlm_is_kms_ignore(matched))) {
-                       LDLM_ERROR(matched, "matched lock has KMS ignore flag");
-                       goto no_match;
-               }
 
                if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS))
                        ldlm_set_kms_ignore(matched);
@@ -732,7 +726,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                }
-no_match:
                ldlm_lock_decref(&lockh, mode);
                LDLM_LOCK_PUT(matched);
        }
@@ -744,7 +737,15 @@ no_match:
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       /* For WRITE lock cancel other locks on resource early if any */
+       if (einfo->ei_mode & LCK_PW)
+               count = mdc_resource_get_unused_res(exp, res_id, &cancels,
+                                                   einfo->ei_mode,
+                                                   MDS_INODELOCK_DOM);
+       else
+               count = 0;
+
+       rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
@@ -768,8 +769,7 @@ no_match:
                if (!rc) {
                        struct osc_enqueue_args *aa;
 
-                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                       aa = ptlrpc_req_async_args(req);
+                       aa = ptlrpc_req_async_args(aa, req);
                        aa->oa_exp = exp;
                        aa->oa_mode = einfo->ei_mode;
                        aa->oa_type = einfo->ei_type;
@@ -1238,8 +1238,7 @@ static int mdc_io_data_version_start(const struct lu_env *env,
        ptlrpc_request_set_replen(req);
 
        req->rq_interpret_reply = mdc_data_version_interpret;
-       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
-       dva = ptlrpc_req_async_args(req);
+       dva = ptlrpc_req_async_args(dva, req);
        dva->dva_oio = oio;
 
        ptlrpcd_add_req(req);
@@ -1277,13 +1276,13 @@ static void mdc_io_data_version_end(const struct lu_env *env,
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_iter_init,
-                       .cio_iter_fini = osc_io_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_read_start,
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_write_iter_init,
-                       .cio_iter_fini = osc_io_write_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_write_start,
                        .cio_end       = osc_io_end,
                },
@@ -1382,11 +1381,30 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj,
 
 static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
+       struct osc_object *osc = (struct osc_object *)data;
+       struct ost_lvb *lvb = &lock->l_ost_lvb;
+       struct lov_oinfo *oinfo;
        ENTRY;
 
-       if (lock->l_ast_data == data)
+       if (lock->l_ast_data == data) {
                lock->l_ast_data = NULL;
-       ldlm_set_kms_ignore(lock);
+
+               LASSERT(osc != NULL);
+               LASSERT(osc->oo_oinfo != NULL);
+               LASSERT(lvb != NULL);
+
+               /* Updates lvb in lock by the cached oinfo */
+               oinfo = osc->oo_oinfo;
+               cl_object_attr_lock(&osc->oo_cl);
+               memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
+               cl_object_attr_unlock(&osc->oo_cl);
+
+               LDLM_DEBUG(lock, "update lvb size %llu blocks %llu [cma]time: "
+                          "%llu %llu %llu", lvb->lvb_size, lvb->lvb_blocks,
+                          lvb->lvb_ctime, lvb->lvb_mtime, lvb->lvb_atime);
+
+               ldlm_clear_lvb_cached(lock);
+       }
        RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -1403,6 +1421,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
        return 0;
 }
 
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
@@ -1412,6 +1436,7 @@ static const struct cl_object_operations mdc_ops = {
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
        .coo_prune = mdc_object_prune,
+       .coo_object_flush = mdc_object_flush
 };
 
 static const struct osc_object_operations mdc_object_ops = {