Whamcloud - gitweb
LU-12635 build: Support for gcc -Wimplicit-fallthrough
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index 3672ed1..08c868e 100644 (file)
@@ -34,6 +34,7 @@
 
 #include <obd_class.h>
 #include <lustre_osc.h>
+#include <uapi/linux/lustre/lustre_param.h>
 
 #include "mdc_internal.h"
 
@@ -151,7 +152,8 @@ again:
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS,
-                                 policy, LCK_PR | LCK_PW, &flags, obj, &lockh,
+                                 policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                                 obj, &lockh,
                                  dap_flags & OSC_DAP_FL_CANCELING);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
@@ -259,7 +261,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj,
                        result = 0;
        }
 
-       rc = mdc_lock_discard_pages(env, obj, start, end, discard);
+       /* Avoid lock matching with CLM_WRITE, there can be no other locks */
+       rc = mdc_lock_discard_pages(env, obj, start, end,
+                                   mode == CLM_WRITE || discard);
        if (result == 0 && rc < 0)
                result = rc;
 
@@ -290,7 +294,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  */
 static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                                 struct ldlm_lock *dlmlock,
-                                void *data, int flag)
+                                int flag)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -382,7 +386,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -618,8 +622,9 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
 }
 
 int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                         struct osc_enqueue_args *aa, int rc)
+                         void *args, int rc)
 {
+       struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
@@ -683,7 +688,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        enum ldlm_mode mode;
        bool glimpse = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
-       int rc;
+       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       int rc, count;
 
        ENTRY;
 
@@ -691,10 +697,16 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
 
-       if (!glimpse)
+       if (glimpse)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
-       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh, 0);
+       /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid
+        * LVB information, e.g. canceled locks or locks of just pruned object,
+        * such locks should be skipped.
+        */
+       mode = ldlm_lock_match_with_skip(obd->obd_namespace, match_flags,
+                                        LDLM_FL_KMS_IGNORE, res_id,
+                                        einfo->ei_type, policy, mode,
+                                        &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -702,8 +714,16 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               if (!matched || ldlm_is_kms_ignore(matched))
+               /* this shouldn't happen but this check is kept to make
+                * related test fail if problem occurs
+                */
+               if (unlikely(ldlm_is_kms_ignore(matched))) {
+                       LDLM_ERROR(matched, "matched lock has KMS ignore flag");
                        goto no_match;
+               }
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS))
+                       ldlm_set_kms_ignore(matched);
 
                if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;
@@ -727,7 +747,15 @@ no_match:
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       /* For WRITE lock cancel other locks on resource early if any */
+       if (einfo->ei_mode & LCK_PW)
+               count = mdc_resource_get_unused_res(exp, res_id, &cancels,
+                                                   einfo->ei_mode,
+                                                   MDS_INODELOCK_DOM);
+       else
+               count = 0;
+
+       rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
@@ -751,8 +779,7 @@ no_match:
                if (!rc) {
                        struct osc_enqueue_args *aa;
 
-                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                       aa = ptlrpc_req_async_args(req);
+                       aa = ptlrpc_req_async_args(aa, req);
                        aa->oa_exp = exp;
                        aa->oa_mode = einfo->ei_mode;
                        aa->oa_type = einfo->ei_type;
@@ -763,8 +790,7 @@ no_match:
                        aa->oa_flags = flags;
                        aa->oa_lvb = lvb;
 
-                       req->rq_interpret_reply =
-                               (ptlrpc_interpterer_t)mdc_enqueue_interpret;
+                       req->rq_interpret_reply = mdc_enqueue_interpret;
                        ptlrpcd_add_req(req);
                } else {
                        ptlrpc_req_finished(req);
@@ -997,7 +1023,8 @@ static int mdc_io_setattr_start(const struct lu_env *env,
        struct obdo *oa = &oio->oi_oa;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
        __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
-       unsigned int ia_valid = io->u.ci_setattr.sa_valid;
+       unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
+       enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
        int rc;
 
        /* silently ignore non-truncate setattr for Data-on-MDT object */
@@ -1016,19 +1043,20 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
                        unsigned int cl_valid = 0;
 
-                       if (ia_valid & ATTR_SIZE) {
-                               attr->cat_size = attr->cat_kms = size;
+                       if (ia_avalid & ATTR_SIZE) {
+                               attr->cat_size = size;
+                               attr->cat_kms = size;
                                cl_valid = (CAT_SIZE | CAT_KMS);
                        }
-                       if (ia_valid & ATTR_MTIME_SET) {
+                       if (ia_avalid & ATTR_MTIME_SET) {
                                attr->cat_mtime = lvb->lvb_mtime;
                                cl_valid |= CAT_MTIME;
                        }
-                       if (ia_valid & ATTR_ATIME_SET) {
+                       if (ia_avalid & ATTR_ATIME_SET) {
                                attr->cat_atime = lvb->lvb_atime;
                                cl_valid |= CAT_ATIME;
                        }
-                       if (ia_valid & ATTR_CTIME_SET) {
+                       if (ia_xvalid & OP_XVALID_CTIME_SET) {
                                attr->cat_ctime = lvb->lvb_ctime;
                                cl_valid |= CAT_CTIME;
                        }
@@ -1039,7 +1067,7 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        return rc;
        }
 
-       if (!(ia_valid & ATTR_SIZE))
+       if (!(ia_avalid & ATTR_SIZE))
                return 0;
 
        memset(oa, 0, sizeof(*oa));
@@ -1144,9 +1172,9 @@ struct mdc_data_version_args {
 
 static int
 mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                          void *arg, int rc)
+                          void *args, int rc)
 {
-       struct mdc_data_version_args *dva = arg;
+       struct mdc_data_version_args *dva = args;
        struct osc_io *oio = dva->dva_oio;
        const struct mdt_body *body;
 
@@ -1220,8 +1248,7 @@ static int mdc_io_data_version_start(const struct lu_env *env,
        ptlrpc_request_set_replen(req);
 
        req->rq_interpret_reply = mdc_data_version_interpret;
-       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
-       dva = ptlrpc_req_async_args(req);
+       dva = ptlrpc_req_async_args(dva, req);
        dva->dva_oio = oio;
 
        ptlrpcd_add_req(req);
@@ -1259,13 +1286,13 @@ static void mdc_io_data_version_end(const struct lu_env *env,
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_iter_init,
-                       .cio_iter_fini = osc_io_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_read_start,
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_write_iter_init,
-                       .cio_iter_fini = osc_io_write_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_write_start,
                        .cio_end       = osc_io_end,
                },
@@ -1366,11 +1393,9 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
        ENTRY;
 
-       if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) ||
-           (lock->l_ast_data == data)) {
+       if (lock->l_ast_data == data)
                lock->l_ast_data = NULL;
-               ldlm_set_kms_ignore(lock);
-       }
+       ldlm_set_kms_ignore(lock);
        RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -1387,6 +1412,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
        return 0;
 }
 
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
@@ -1396,6 +1427,7 @@ static const struct cl_object_operations mdc_ops = {
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
        .coo_prune = mdc_object_prune,
+       .coo_object_flush = mdc_object_flush
 };
 
 static const struct osc_object_operations mdc_object_ops = {
@@ -1451,15 +1483,17 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env,
        return obj;
 }
 
-static int mdc_cl_process_config(const struct lu_env *env,
-                                struct lu_device *d, struct lustre_cfg *cfg)
+static int mdc_process_config(const struct lu_env *env, struct lu_device *d,
+                             struct lustre_cfg *cfg)
 {
-       return mdc_process_config(d->ld_obd, 0, cfg);
+       size_t count  = class_modify_config(cfg, PARAM_MDC,
+                                           &d->ld_obd->obd_kset.kobj);
+       return count > 0 ? 0 : count;
 }
 
 const struct lu_device_operations mdc_lu_ops = {
        .ldo_object_alloc = mdc_object_alloc,
-       .ldo_process_config = mdc_cl_process_config,
+       .ldo_process_config = mdc_process_config,
        .ldo_recovery_complete = NULL,
 };