Whamcloud - gitweb
LU-11696 utils: "lfs getsom" returns "24" to userspace
[fs/lustre-release.git] / lustre / mdc / mdc_dev.c
index 3b37a40..3565f2b 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2017 Intel Corporation.
+ * Copyright (c) 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -151,7 +151,8 @@ again:
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS,
-                                 policy, LCK_PR | LCK_PW, &flags, obj, &lockh,
+                                 policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                                 obj, &lockh,
                                  dap_flags & OSC_DAP_FL_CANCELING);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
@@ -618,8 +619,9 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
 }
 
 int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                         struct osc_enqueue_args *aa, int rc)
+                         void *args, int rc)
 {
+       struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
@@ -702,7 +704,7 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               if (ldlm_is_kms_ignore(matched))
+               if (!matched || ldlm_is_kms_ignore(matched))
                        goto no_match;
 
                if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) {
@@ -763,8 +765,7 @@ no_match:
                        aa->oa_flags = flags;
                        aa->oa_lvb = lvb;
 
-                       req->rq_interpret_reply =
-                               (ptlrpc_interpterer_t)mdc_enqueue_interpret;
+                       req->rq_interpret_reply = mdc_enqueue_interpret;
                        ptlrpcd_add_req(req);
                } else {
                        ptlrpc_req_finished(req);
@@ -959,6 +960,33 @@ static int mdc_async_upcall(void *a, int rc)
        return 0;
 }
 
+static int mdc_get_lock_handle(const struct lu_env *env, struct osc_object *osc,
+                              pgoff_t index, struct lustre_handle *lh)
+{
+       struct ldlm_lock *lock;
+
+       /* find DOM lock protecting object */
+       lock = mdc_dlmlock_at_pgoff(env, osc, index,
+                                   OSC_DAP_FL_TEST_LOCK |
+                                   OSC_DAP_FL_CANCELING);
+       if (lock == NULL) {
+               struct ldlm_resource *res;
+               struct ldlm_res_id *resname;
+
+               resname = &osc_env_info(env)->oti_resname;
+               fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
+               res = ldlm_resource_get(osc_export(osc)->exp_obd->obd_namespace,
+                                       NULL, resname, LDLM_IBITS, 0);
+               ldlm_resource_dump(D_ERROR, res);
+               libcfs_debug_dumpstack(NULL);
+               return -ENOENT;
+       } else {
+               *lh = lock->l_remote_handle;
+               LDLM_LOCK_PUT(lock);
+       }
+       return 0;
+}
+
 static int mdc_io_setattr_start(const struct lu_env *env,
                                const struct cl_io_slice *slice)
 {
@@ -970,7 +998,8 @@ static int mdc_io_setattr_start(const struct lu_env *env,
        struct obdo *oa = &oio->oi_oa;
        struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
        __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
-       unsigned int ia_valid = io->u.ci_setattr.sa_valid;
+       unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
+       enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
        int rc;
 
        /* silently ignore non-truncate setattr for Data-on-MDT object */
@@ -989,19 +1018,20 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
                        unsigned int cl_valid = 0;
 
-                       if (ia_valid & ATTR_SIZE) {
-                               attr->cat_size = attr->cat_kms = size;
+                       if (ia_avalid & ATTR_SIZE) {
+                               attr->cat_size = size;
+                               attr->cat_kms = size;
                                cl_valid = (CAT_SIZE | CAT_KMS);
                        }
-                       if (ia_valid & ATTR_MTIME_SET) {
+                       if (ia_avalid & ATTR_MTIME_SET) {
                                attr->cat_mtime = lvb->lvb_mtime;
                                cl_valid |= CAT_MTIME;
                        }
-                       if (ia_valid & ATTR_ATIME_SET) {
+                       if (ia_avalid & ATTR_ATIME_SET) {
                                attr->cat_atime = lvb->lvb_atime;
                                cl_valid |= CAT_ATIME;
                        }
-                       if (ia_valid & ATTR_CTIME_SET) {
+                       if (ia_xvalid & OP_XVALID_CTIME_SET) {
                                attr->cat_ctime = lvb->lvb_ctime;
                                cl_valid |= CAT_CTIME;
                        }
@@ -1012,7 +1042,7 @@ static int mdc_io_setattr_start(const struct lu_env *env,
                        return rc;
        }
 
-       if (!(ia_valid & ATTR_SIZE))
+       if (!(ia_avalid & ATTR_SIZE))
                return 0;
 
        memset(oa, 0, sizeof(*oa));
@@ -1029,6 +1059,11 @@ static int mdc_io_setattr_start(const struct lu_env *env,
        if (oio->oi_lockless) {
                oa->o_flags = OBD_FL_SRVLOCK;
                oa->o_valid |= OBD_MD_FLFLAGS;
+       } else {
+               rc = mdc_get_lock_handle(env, cl2osc(obj), CL_PAGE_EOF,
+                                        &oa->o_handle);
+               if (!rc)
+                       oa->o_valid |= OBD_MD_FLHANDLE;
        }
 
        init_completion(&cbargs->opc_sync);
@@ -1068,6 +1103,162 @@ static int mdc_io_read_ahead(const struct lu_env *env,
        RETURN(0);
 }
 
+int mdc_io_fsync_start(const struct lu_env *env,
+                      const struct cl_io_slice *slice)
+{
+       struct cl_io *io = slice->cis_io;
+       struct cl_fsync_io *fio = &io->u.ci_fsync;
+       struct cl_object *obj = slice->cis_obj;
+       struct osc_object *osc = cl2osc(obj);
+       int result = 0;
+
+       ENTRY;
+
+       /* a MDC lock always covers whole object, do sync for whole
+        * possible range despite of supplied start/end values.
+        */
+       result = osc_cache_writeback_range(env, osc, 0, CL_PAGE_EOF, 0,
+                                          fio->fi_mode == CL_FSYNC_DISCARD);
+       if (result > 0) {
+               fio->fi_nr_written += result;
+               result = 0;
+       }
+       if (fio->fi_mode == CL_FSYNC_ALL) {
+               int rc;
+
+               rc = osc_cache_wait_range(env, osc, 0, CL_PAGE_EOF);
+               if (result == 0)
+                       result = rc;
+               /* Use OSC sync code because it is asynchronous.
+                * It is to be added into MDC and avoid the using of
+                * OST_SYNC at both MDC and MDT.
+                */
+               rc = osc_fsync_ost(env, osc, fio);
+               if (result == 0)
+                       result = rc;
+       }
+
+       RETURN(result);
+}
+
+struct mdc_data_version_args {
+       struct osc_io *dva_oio;
+};
+
+static int
+mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                          void *args, int rc)
+{
+       struct mdc_data_version_args *dva = args;
+       struct osc_io *oio = dva->dva_oio;
+       const struct mdt_body *body;
+
+       ENTRY;
+       if (rc < 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       /* Prepare OBDO from mdt_body for CLIO */
+       oio->oi_oa.o_valid = body->mbo_valid;
+       oio->oi_oa.o_flags = body->mbo_flags;
+       oio->oi_oa.o_data_version = body->mbo_version;
+       oio->oi_oa.o_layout_version = body->mbo_layout_gen;
+       EXIT;
+out:
+       oio->oi_cbarg.opc_rc = rc;
+       complete(&oio->oi_cbarg.opc_sync);
+       return 0;
+}
+
+static int mdc_io_data_version_start(const struct lu_env *env,
+                                    const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct osc_object *obj = cl2osc(slice->cis_obj);
+       struct obd_export *exp = osc_export(obj);
+       struct ptlrpc_request *req;
+       struct mdt_body *body;
+       struct mdc_data_version_args *dva;
+       int rc;
+
+       ENTRY;
+
+       memset(&oio->oi_oa, 0, sizeof(oio->oi_oa));
+       oio->oi_oa.o_oi.oi_fid = *lu_object_fid(osc2lu(obj));
+       oio->oi_oa.o_valid = OBD_MD_FLID;
+
+       init_completion(&cbargs->opc_sync);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
+       body->mbo_fid1 = *lu_object_fid(osc2lu(obj));
+       body->mbo_valid = OBD_MD_FLID;
+       /* Indicate that data version is needed */
+       body->mbo_valid |= OBD_MD_FLDATAVERSION;
+       body->mbo_flags = 0;
+
+       if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
+               body->mbo_valid |= OBD_MD_FLFLAGS;
+               body->mbo_flags |= OBD_FL_SRVLOCK;
+               if (dv->dv_flags & LL_DV_WR_FLUSH)
+                       body->mbo_flags |= OBD_FL_FLUSH;
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0);
+       ptlrpc_request_set_replen(req);
+
+       req->rq_interpret_reply = mdc_data_version_interpret;
+       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
+       dva = ptlrpc_req_async_args(req);
+       dva->dva_oio = oio;
+
+       ptlrpcd_add_req(req);
+
+       RETURN(0);
+}
+
+static void mdc_io_data_version_end(const struct lu_env *env,
+                                   const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+       struct osc_io *oio = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+
+       ENTRY;
+       wait_for_completion(&cbargs->opc_sync);
+
+       if (cbargs->opc_rc != 0) {
+               slice->cis_io->ci_result = cbargs->opc_rc;
+       } else {
+               slice->cis_io->ci_result = 0;
+               if (!(oio->oi_oa.o_valid &
+                     (OBD_MD_LAYOUT_VERSION | OBD_MD_FLDATAVERSION)))
+                       slice->cis_io->ci_result = -ENOTSUPP;
+
+               if (oio->oi_oa.o_valid & OBD_MD_LAYOUT_VERSION)
+                       dv->dv_layout_version = oio->oi_oa.o_layout_version;
+               if (oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)
+                       dv->dv_data_version = oio->oi_oa.o_data_version;
+       }
+
+       EXIT;
+}
+
 static struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -1087,10 +1278,9 @@ static struct cl_io_operations mdc_io_ops = {
                        .cio_start     = mdc_io_setattr_start,
                        .cio_end       = osc_io_setattr_end,
                },
-               /* no support for data version so far */
                [CIT_DATA_VERSION] = {
-                       .cio_start = NULL,
-                       .cio_end   = NULL,
+                       .cio_start = mdc_io_data_version_start,
+                       .cio_end   = mdc_io_data_version_end,
                },
                [CIT_FAULT] = {
                        .cio_iter_init = osc_io_iter_init,
@@ -1099,7 +1289,7 @@ static struct cl_io_operations mdc_io_ops = {
                        .cio_end       = osc_io_end,
                },
                [CIT_FSYNC] = {
-                       .cio_start = osc_io_fsync_start,
+                       .cio_start = mdc_io_fsync_start,
                        .cio_end   = osc_io_fsync_end,
                },
        },
@@ -1144,35 +1334,22 @@ static void mdc_req_attr_set(const struct lu_env *env, struct cl_object *obj,
                attr->cra_oa->o_valid |= OBD_MD_FLID;
 
        if (flags & OBD_MD_FLHANDLE) {
-               struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
                struct osc_page *opg;
 
                opg = osc_cl_page_osc(attr->cra_page, cl2osc(obj));
-               lock = mdc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
-                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
-               if (lock == NULL && !opg->ops_srvlock) {
-                       struct ldlm_resource *res;
-                       struct ldlm_res_id *resname;
-
-                       CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page,
-                                     "uncovered page!\n");
-
-                       resname = &osc_env_info(env)->oti_resname;
-                       mdc_build_res_name(cl2osc(obj), resname);
-                       res = ldlm_resource_get(
-                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
-                               NULL, resname, LDLM_IBITS, 0);
-                       ldlm_resource_dump(D_ERROR, res);
-
-                       libcfs_debug_dumpstack(NULL);
-                       LBUG();
-               }
-
-               /* check for lockless io. */
-               if (lock != NULL) {
-                       attr->cra_oa->o_handle = lock->l_remote_handle;
-                       attr->cra_oa->o_valid |= OBD_MD_FLHANDLE;
-                       LDLM_LOCK_PUT(lock);
+               if (!opg->ops_srvlock) {
+                       int rc;
+
+                       rc = mdc_get_lock_handle(env, cl2osc(obj),
+                                                osc_index(opg),
+                                                &attr->cra_oa->o_handle);
+                       if (rc) {
+                               CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page,
+                                             "uncovered page!\n");
+                               LBUG();
+                       } else {
+                               attr->cra_oa->o_valid |= OBD_MD_FLHANDLE;
+                       }
                }
        }
 }
@@ -1192,10 +1369,6 @@ static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data)
 {
        ENTRY;
 
-       CDEBUG(D_DLMTRACE, "obj: %p/%p, lock %p\n",
-               data, lock->l_ast_data, lock);
-
-       LASSERT(lock->l_granted_mode == lock->l_req_mode);
        if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) ||
            (lock->l_ast_data == data)) {
                lock->l_ast_data = NULL;