Whamcloud - gitweb
LU-12681 osc: wrong cache of LVB attrs
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index 14b0e65..b85481e 100644 (file)
@@ -34,6 +34,7 @@
 
 #include <obd_class.h>
 #include <lustre_osc.h>
+#include <uapi/linux/lustre/lustre_param.h>
 
 #include "mdc_internal.h"
 
@@ -260,7 +261,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj,
                        result = 0;
        }
 
-       rc = mdc_lock_discard_pages(env, obj, start, end, discard);
+       /* Avoid lock matching with CLM_WRITE, there can be no other locks */
+       rc = mdc_lock_discard_pages(env, obj, start, end,
+                                   mode == CLM_WRITE || discard);
        if (result == 0 && rc < 0)
                result = rc;
 
@@ -291,7 +294,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  */
 static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                                 struct ldlm_lock *dlmlock,
-                                void *data, int flag)
+                                int flag)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -319,7 +322,6 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                dlmlock->l_ast_data = NULL;
                cl_object_get(obj);
        }
-       ldlm_set_kms_ignore(dlmlock);
        unlock_res_and_lock(dlmlock);
 
        /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
@@ -383,7 +385,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -442,7 +444,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc,
 }
 
 static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
        struct ldlm_lock *dlmlock;
 
@@ -482,10 +484,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_end = CL_PAGE_EOF;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                        mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
                                            dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
        }
        unlock_res_and_lock(dlmlock);
@@ -526,7 +529,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 
        CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode);
        if (rc == 0)
-               mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               mdc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -619,8 +622,9 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
 }
 
 int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                         struct osc_enqueue_args *aa, int rc)
+                         void *args, int rc)
 {
+       struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
@@ -684,7 +688,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        enum ldlm_mode mode;
        bool glimpse = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
-       int rc;
+       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       int rc, count;
 
        ENTRY;
 
@@ -692,8 +697,12 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
 
-       if (!glimpse)
+       if (glimpse)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
+       /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid
+        * LVB information, e.g. canceled locks or locks of just pruned object,
+        * such locks should be skipped.
+        */
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
@@ -703,8 +712,9 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               if (!matched || ldlm_is_kms_ignore(matched))
-                       goto no_match;
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS))
+                       ldlm_set_kms_ignore(matched);
 
                if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;
@@ -716,7 +726,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                }
-no_match:
                ldlm_lock_decref(&lockh, mode);
                LDLM_LOCK_PUT(matched);
        }
@@ -728,7 +737,15 @@ no_match:
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       /* For WRITE lock cancel other locks on resource early if any */
+       if (einfo->ei_mode & LCK_PW)
+               count = mdc_resource_get_unused_res(exp, res_id, &cancels,
+                                                   einfo->ei_mode,
+                                                   MDS_INODELOCK_DOM);
+       else
+               count = 0;
+
+       rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
@@ -752,8 +769,7 @@ no_match:
                if (!rc) {
                        struct osc_enqueue_args *aa;
 
-                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                       aa = ptlrpc_req_async_args(req);
+                       aa = ptlrpc_req_async_args(aa, req);
                        aa->oa_exp = exp;
                        aa->oa_mode = einfo->ei_mode;
                        aa->oa_type = einfo->ei_type;
@@ -764,8 +780,7 @@ no_match:
                        aa->oa_flags = flags;
                        aa->oa_lvb = lvb;
 
-                       req->rq_interpret_reply =
-                               (ptlrpc_interpterer_t)mdc_enqueue_interpret;
+                       req->rq_interpret_reply = mdc_enqueue_interpret;
                        ptlrpcd_add_req(req);
                } else {
                        ptlrpc_req_finished(req);
@@ -998,7 +1013,8 @@ static int mdc_io_setattr_start(const struct lu_env *env,
        struct obdo *oa = &oio->oi_oa;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
        __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
-       unsigned int ia_valid = io->u.ci_setattr.sa_valid;
+       unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
+       enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
        int rc;
 
        /* silently ignore non-truncate setattr for Data-on-MDT object */
@@ -1017,19 +1033,20 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
                        unsigned int cl_valid = 0;
 
-                       if (ia_valid & ATTR_SIZE) {
-                               attr->cat_size = attr->cat_kms = size;
+                       if (ia_avalid & ATTR_SIZE) {
+                               attr->cat_size = size;
+                               attr->cat_kms = size;
                                cl_valid = (CAT_SIZE | CAT_KMS);
                        }
-                       if (ia_valid & ATTR_MTIME_SET) {
+                       if (ia_avalid & ATTR_MTIME_SET) {
                                attr->cat_mtime = lvb->lvb_mtime;
                                cl_valid |= CAT_MTIME;
                        }
-                       if (ia_valid & ATTR_ATIME_SET) {
+                       if (ia_avalid & ATTR_ATIME_SET) {
                                attr->cat_atime = lvb->lvb_atime;
                                cl_valid |= CAT_ATIME;
                        }
-                       if (ia_valid & ATTR_CTIME_SET) {
+                       if (ia_xvalid & OP_XVALID_CTIME_SET) {
                                attr->cat_ctime = lvb->lvb_ctime;
                                cl_valid |= CAT_CTIME;
                        }
@@ -1040,7 +1057,7 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        return rc;
        }
 
-       if (!(ia_valid & ATTR_SIZE))
+       if (!(ia_avalid & ATTR_SIZE))
                return 0;
 
        memset(oa, 0, sizeof(*oa));
@@ -1145,9 +1162,9 @@ struct mdc_data_version_args {
 
 static int
 mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                          void *arg, int rc)
+                          void *args, int rc)
 {
-       struct mdc_data_version_args *dva = arg;
+       struct mdc_data_version_args *dva = args;
        struct osc_io *oio = dva->dva_oio;
        const struct mdt_body *body;
 
@@ -1221,8 +1238,7 @@ static int mdc_io_data_version_start(const struct lu_env *env,
        ptlrpc_request_set_replen(req);
 
        req->rq_interpret_reply = mdc_data_version_interpret;
-       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
-       dva = ptlrpc_req_async_args(req);
+       dva = ptlrpc_req_async_args(dva, req);
        dva->dva_oio = oio;
 
        ptlrpcd_add_req(req);
@@ -1260,13 +1276,13 @@ static void mdc_io_data_version_end(const struct lu_env *env,
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_iter_init,
-                       .cio_iter_fini = osc_io_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_read_start,
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_write_iter_init,
-                       .cio_iter_fini = osc_io_write_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_write_start,
                        .cio_end       = osc_io_end,
                },
@@ -1365,12 +1381,29 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj,
 
 static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
+       struct osc_object *osc = (struct osc_object *)data;
+       struct ost_lvb *lvb = &lock->l_ost_lvb;
+       struct lov_oinfo *oinfo;
        ENTRY;
 
-       if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) ||
-           (lock->l_ast_data == data)) {
+       if (lock->l_ast_data == data) {
                lock->l_ast_data = NULL;
-               ldlm_set_kms_ignore(lock);
+
+               LASSERT(osc != NULL);
+               LASSERT(osc->oo_oinfo != NULL);
+               LASSERT(lvb != NULL);
+
+               /* Updates lvb in lock by the cached oinfo */
+               oinfo = osc->oo_oinfo;
+               cl_object_attr_lock(&osc->oo_cl);
+               memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb));
+               cl_object_attr_unlock(&osc->oo_cl);
+
+               LDLM_DEBUG(lock, "update lvb size %llu blocks %llu [cma]time: "
+                          "%llu %llu %llu", lvb->lvb_size, lvb->lvb_blocks,
+                          lvb->lvb_ctime, lvb->lvb_mtime, lvb->lvb_atime);
+
+               ldlm_clear_lvb_cached(lock);
        }
        RETURN(LDLM_ITER_CONTINUE);
 }
@@ -1388,6 +1421,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
        return 0;
 }
 
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
@@ -1397,6 +1436,7 @@ static const struct cl_object_operations mdc_ops = {
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
        .coo_prune = mdc_object_prune,
+       .coo_object_flush = mdc_object_flush
 };
 
 static const struct osc_object_operations mdc_object_ops = {
@@ -1452,15 +1492,17 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env,
        return obj;
 }
 
-static int mdc_cl_process_config(const struct lu_env *env,
-                                struct lu_device *d, struct lustre_cfg *cfg)
+static int mdc_process_config(const struct lu_env *env, struct lu_device *d,
+                             struct lustre_cfg *cfg)
 {
-       return mdc_process_config(d->ld_obd, 0, cfg);
+       size_t count  = class_modify_config(cfg, PARAM_MDC,
+                                           &d->ld_obd->obd_kset.kobj);
+       return count > 0 ? 0 : count;
 }
 
 const struct lu_device_operations mdc_lu_ops = {
        .ldo_object_alloc = mdc_object_alloc,
-       .ldo_process_config = mdc_cl_process_config,
+       .ldo_process_config = mdc_process_config,
        .ldo_recovery_complete = NULL,
 };