Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index c06749e..394bb3e 100644 (file)
@@ -34,6 +34,7 @@
 
 #include <obd_class.h>
 #include <lustre_osc.h>
+#include <uapi/linux/lustre/lustre_param.h>
 
 #include "mdc_internal.h"
 
@@ -260,7 +261,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj,
                        result = 0;
        }
 
-       rc = mdc_lock_discard_pages(env, obj, start, end, discard);
+       /* Avoid lock matching with CLM_WRITE, there can be no other locks */
+       rc = mdc_lock_discard_pages(env, obj, start, end,
+                                   mode == CLM_WRITE || discard);
        if (result == 0 && rc < 0)
                result = rc;
 
@@ -291,7 +294,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env,
  */
 static int mdc_dlm_blocking_ast0(const struct lu_env *env,
                                 struct ldlm_lock *dlmlock,
-                                void *data, int flag)
+                                int flag)
 {
        struct cl_object *obj = NULL;
        int result = 0;
@@ -383,7 +386,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                        break;
                }
 
-               rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag);
+               rc = mdc_dlm_blocking_ast0(env, dlmlock, flag);
                cl_env_put(env, &refcheck);
                break;
        }
@@ -619,8 +622,9 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
 }
 
 int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                         struct osc_enqueue_args *aa, int rc)
+                         void *args, int rc)
 {
+       struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
@@ -684,7 +688,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        enum ldlm_mode mode;
        bool glimpse = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
-       int rc;
+       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       int rc, count;
 
        ENTRY;
 
@@ -692,10 +697,16 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
 
-       if (!glimpse)
+       if (glimpse)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
-       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh, 0);
+       /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid
+        * LVB information, e.g. canceled locks or locks of just pruned object,
+        * such locks should be skipped.
+        */
+       mode = ldlm_lock_match_with_skip(obd->obd_namespace, match_flags,
+                                        LDLM_FL_KMS_IGNORE, res_id,
+                                        einfo->ei_type, policy, mode,
+                                        &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -703,8 +714,16 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               if (!matched || ldlm_is_kms_ignore(matched))
+               /* this shouldn't happen but this check is kept to make
+                * related test fail if problem occurs
+                */
+               if (unlikely(ldlm_is_kms_ignore(matched))) {
+                       LDLM_ERROR(matched, "matched lock has KMS ignore flag");
                        goto no_match;
+               }
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS))
+                       ldlm_set_kms_ignore(matched);
 
                if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;
@@ -728,7 +747,15 @@ no_match:
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       /* For WRITE lock cancel other locks on resource early if any */
+       if (einfo->ei_mode & LCK_PW)
+               count = mdc_resource_get_unused_res(exp, res_id, &cancels,
+                                                   einfo->ei_mode,
+                                                   MDS_INODELOCK_DOM);
+       else
+               count = 0;
+
+       rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
@@ -764,8 +791,7 @@ no_match:
                        aa->oa_flags = flags;
                        aa->oa_lvb = lvb;
 
-                       req->rq_interpret_reply =
-                               (ptlrpc_interpterer_t)mdc_enqueue_interpret;
+                       req->rq_interpret_reply = mdc_enqueue_interpret;
                        ptlrpcd_add_req(req);
                } else {
                        ptlrpc_req_finished(req);
@@ -1147,9 +1173,9 @@ struct mdc_data_version_args {
 
 static int
 mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                          void *arg, int rc)
+                          void *args, int rc)
 {
-       struct mdc_data_version_args *dva = arg;
+       struct mdc_data_version_args *dva = args;
        struct osc_io *oio = dva->dva_oio;
        const struct mdt_body *body;
 
@@ -1262,13 +1288,13 @@ static void mdc_io_data_version_end(const struct lu_env *env,
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_iter_init,
-                       .cio_iter_fini = osc_io_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_read_start,
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_write_iter_init,
-                       .cio_iter_fini = osc_io_write_iter_fini,
+                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_write_start,
                        .cio_end       = osc_io_end,
                },
@@ -1369,11 +1395,9 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
        ENTRY;
 
-       if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) ||
-           (lock->l_ast_data == data)) {
+       if (lock->l_ast_data == data)
                lock->l_ast_data = NULL;
-               ldlm_set_kms_ignore(lock);
-       }
+       ldlm_set_kms_ignore(lock);
        RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -1390,6 +1414,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj)
        return 0;
 }
 
+static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
+}
+
 static const struct cl_object_operations mdc_ops = {
        .coo_page_init = osc_page_init,
        .coo_lock_init = mdc_lock_init,
@@ -1399,6 +1429,7 @@ static const struct cl_object_operations mdc_ops = {
        .coo_glimpse = osc_object_glimpse,
        .coo_req_attr_set = mdc_req_attr_set,
        .coo_prune = mdc_object_prune,
+       .coo_object_flush = mdc_object_flush
 };
 
 static const struct osc_object_operations mdc_object_ops = {
@@ -1454,15 +1485,17 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env,
        return obj;
 }
 
-static int mdc_cl_process_config(const struct lu_env *env,
-                                struct lu_device *d, struct lustre_cfg *cfg)
+static int mdc_process_config(const struct lu_env *env, struct lu_device *d,
+                             struct lustre_cfg *cfg)
 {
-       return mdc_process_config(d->ld_obd, 0, cfg);
+       size_t count  = class_modify_config(cfg, PARAM_MDC,
+                                           &d->ld_obd->obd_kset.kobj);
+       return count > 0 ? 0 : count;
 }
 
 const struct lu_device_operations mdc_lu_ops = {
        .ldo_object_alloc = mdc_object_alloc,
-       .ldo_process_config = mdc_cl_process_config,
+       .ldo_process_config = mdc_process_config,
        .ldo_recovery_complete = NULL,
 };