Whamcloud - gitweb
LU-5683 clio: add CIT_DATA_VERSION
[fs/lustre-release.git] / lustre / osc / osc_io.c
index b2e54a9..34679c9 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -92,6 +92,45 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
 {
 }
 
+static void osc_read_ahead_release(const struct lu_env *env,
+                                  void *cbdata)
+{
+       struct ldlm_lock *dlmlock = cbdata;
+       struct lustre_handle lockh;
+
+       ldlm_lock2handle(dlmlock, &lockh);
+       ldlm_lock_decref(&lockh, LCK_PR);
+       LDLM_LOCK_PUT(dlmlock);
+}
+
+static int osc_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
+{
+       struct osc_object       *osc = cl2osc(ios->cis_obj);
+       struct ldlm_lock        *dlmlock;
+       int                     result = -ENODATA;
+       ENTRY;
+
+       dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
+       if (dlmlock != NULL) {
+               if (dlmlock->l_req_mode != LCK_PR) {
+                       struct lustre_handle lockh;
+                       ldlm_lock2handle(dlmlock, &lockh);
+                       ldlm_lock_addref(&lockh, LCK_PR);
+                       ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
+               }
+
+               ra->cra_end = cl_index(osc2cl(osc),
+                                      dlmlock->l_policy_data.l_extent.end);
+               ra->cra_release = osc_read_ahead_release;
+               ra->cra_cbdata = dlmlock;
+               result = 0;
+       }
+
+       RETURN(result);
+}
+
 /**
  * An implementation of cl_io_operations::cio_io_submit() method for osc
  * layer. Iterates over pages in the in-queue, prepares each for io by calling
@@ -113,11 +152,11 @@ static int osc_io_submit(const struct lu_env *env,
 
        struct cl_page_list *qin      = &queue->c2_qin;
        struct cl_page_list *qout     = &queue->c2_qout;
-       int queued = 0;
+       unsigned int queued = 0;
        int result = 0;
        int cmd;
        int brw_flags;
-       int max_pages;
+       unsigned int max_pages;
 
        LASSERT(qin->pl_nr > 0);
 
@@ -204,7 +243,7 @@ static int osc_io_submit(const struct lu_env *env,
  * Expand stripe KMS if necessary.
  */
 static void osc_page_touch_at(const struct lu_env *env,
-                              struct cl_object *obj, pgoff_t idx, unsigned to)
+                             struct cl_object *obj, pgoff_t idx, size_t to)
 {
         struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
         struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
@@ -228,16 +267,16 @@ static void osc_page_touch_at(const struct lu_env *env,
 
        attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
        valid = CAT_MTIME | CAT_CTIME;
-        if (kms > loi->loi_kms) {
-                attr->cat_kms = kms;
-                valid |= CAT_KMS;
-        }
-        if (kms > loi->loi_lvb.lvb_size) {
-                attr->cat_size = kms;
-                valid |= CAT_SIZE;
-        }
-        cl_object_attr_set(env, obj, attr, valid);
-        cl_object_attr_unlock(obj);
+       if (kms > loi->loi_kms) {
+               attr->cat_kms = kms;
+               valid |= CAT_KMS;
+       }
+       if (kms > loi->loi_lvb.lvb_size) {
+               attr->cat_size = kms;
+               valid |= CAT_SIZE;
+       }
+       cl_object_attr_update(env, obj, attr, valid);
+       cl_object_attr_unlock(obj);
 }
 
 static int osc_io_commit_async(const struct lu_env *env,
@@ -321,30 +360,30 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
        struct osc_object *osc = cl2osc(ios->cis_obj);
        struct client_obd *cli = osc_cli(osc);
        unsigned long c;
-       unsigned int npages;
-       unsigned int max_pages;
+       unsigned long npages;
+       unsigned long max_pages;
        ENTRY;
 
        if (cl_io_is_append(io))
                RETURN(0);
 
        npages = io->u.ci_rw.crw_count >> PAGE_CACHE_SHIFT;
-       if (io->u.ci_rw.crw_pos & ~CFS_PAGE_MASK)
+       if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
                ++npages;
 
        max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
        if (npages > max_pages)
                npages = max_pages;
 
-       c = atomic_read(cli->cl_lru_left);
+       c = atomic_long_read(cli->cl_lru_left);
        if (c < npages && osc_lru_reclaim(cli) > 0)
-               c = atomic_read(cli->cl_lru_left);
+               c = atomic_long_read(cli->cl_lru_left);
        while (c >= npages) {
-               if (c == atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+               if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
                        oio->oi_lru_reserved = npages;
                        break;
                }
-               c = atomic_read(cli->cl_lru_left);
+               c = atomic_long_read(cli->cl_lru_left);
        }
 
        RETURN(0);
@@ -358,32 +397,32 @@ static void osc_io_rw_iter_fini(const struct lu_env *env,
        struct client_obd *cli = osc_cli(osc);
 
        if (oio->oi_lru_reserved > 0) {
-               atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
+               atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
                oio->oi_lru_reserved = 0;
        }
+       oio->oi_write_osclock = NULL;
 }
 
 static int osc_io_fault_start(const struct lu_env *env,
-                              const struct cl_io_slice *ios)
+                             const struct cl_io_slice *ios)
 {
-        struct cl_io       *io;
-        struct cl_fault_io *fio;
-
-        ENTRY;
+       struct cl_io       *io;
+       struct cl_fault_io *fio;
+       ENTRY;
 
-        io  = ios->cis_io;
-        fio = &io->u.ci_fault;
-        CDEBUG(D_INFO, "%lu %d %d\n",
-               fio->ft_index, fio->ft_writable, fio->ft_nob);
-        /*
-         * If mapping is writeable, adjust kms to cover this page,
-         * but do not extend kms beyond actual file size.
-         * See bug 10919.
-         */
-        if (fio->ft_writable)
-                osc_page_touch_at(env, ios->cis_obj,
-                                  fio->ft_index, fio->ft_nob);
-        RETURN(0);
+       io  = ios->cis_io;
+       fio = &io->u.ci_fault;
+       CDEBUG(D_INFO, "%lu %d %zu\n",
+               fio->ft_index, fio->ft_writable, fio->ft_nob);
+       /*
+        * If mapping is writeable, adjust kms to cover this page,
+        * but do not extend kms beyond actual file size.
+        * See bug 10919.
+        */
+       if (fio->ft_writable)
+               osc_page_touch_at(env, ios->cis_obj,
+                                 fio->ft_index, fio->ft_nob);
+       RETURN(0);
 }
 
 static int osc_async_upcall(void *a, int rc)
@@ -395,7 +434,6 @@ static int osc_async_upcall(void *a, int rc)
         return 0;
 }
 
-#if defined(__KERNEL__)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
@@ -412,11 +450,9 @@ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
                CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
                                start, current->comm);
 
-#ifdef __linux__
        if (PageLocked(page->cp_vmpage))
                CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
                       ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
-#endif
 
        return CLP_GANG_OKAY;
 }
@@ -439,13 +475,6 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
                                start + partial, CL_PAGE_EOF,
                                trunc_check_cb, (void *)&size);
 }
-#else /* __KERNEL__ */
-static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
-                           struct osc_io *oio, __u64 size)
-{
-       return;
-}
-#endif
 
 static int osc_io_setattr_start(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
@@ -487,20 +516,31 @@ static int osc_io_setattr_start(const struct lu_env *env,
                        }
                        if (ia_valid & ATTR_CTIME_SET) {
                                attr->cat_ctime = lvb->lvb_ctime;
-                                cl_valid |= CAT_CTIME;
-                        }
-                        result = cl_object_attr_set(env, obj, attr, cl_valid);
-                }
-                cl_object_attr_unlock(obj);
-        }
-        memset(oa, 0, sizeof(*oa));
-        if (result == 0) {
+                               cl_valid |= CAT_CTIME;
+                       }
+                       result = cl_object_attr_update(env, obj, attr,
+                                                      cl_valid);
+               }
+               cl_object_attr_unlock(obj);
+       }
+       memset(oa, 0, sizeof(*oa));
+       if (result == 0) {
                oa->o_oi = loi->loi_oi;
-               oa->o_mtime = attr->cat_mtime;
-               oa->o_atime = attr->cat_atime;
-               oa->o_ctime = attr->cat_ctime;
-               oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
-                       OBD_MD_FLCTIME | OBD_MD_FLMTIME;
+               obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
+               oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
+               oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+               if (ia_valid & ATTR_CTIME) {
+                       oa->o_valid |= OBD_MD_FLCTIME;
+                       oa->o_ctime = attr->cat_ctime;
+               }
+               if (ia_valid & ATTR_ATIME) {
+                       oa->o_valid |= OBD_MD_FLATIME;
+                       oa->o_atime = attr->cat_atime;
+               }
+               if (ia_valid & ATTR_MTIME) {
+                       oa->o_valid |= OBD_MD_FLMTIME;
+                       oa->o_mtime = attr->cat_mtime;
+               }
                 if (ia_valid & ATTR_SIZE) {
                         oa->o_size = size;
                         oa->o_blocks = OBD_OBJECT_EOF;
@@ -514,6 +554,11 @@ static int osc_io_setattr_start(const struct lu_env *env,
                         LASSERT(oio->oi_lockless == 0);
                 }
 
+               if (ia_valid & ATTR_ATTR_FLAG) {
+                       oa->o_flags = io->u.ci_setattr.sa_attr_flags;
+                       oa->o_valid |= OBD_MD_FLFLAGS;
+               }
+
                 oinfo.oi_oa = oa;
                 oinfo.oi_capa = io->u.ci_setattr.sa_capa;
                init_completion(&cbargs->opc_sync);
@@ -523,10 +568,10 @@ static int osc_io_setattr_start(const struct lu_env *env,
                                                &oinfo, osc_async_upcall,
                                                 cbargs, PTLRPCD_SET);
                 else
-                        result = osc_setattr_async_base(osc_export(cl2osc(obj)),
-                                                        &oinfo, NULL,
-                                                       osc_async_upcall,
-                                                        cbargs, PTLRPCD_SET);
+                       result = osc_setattr_async(osc_export(cl2osc(obj)),
+                                                  &oinfo,
+                                                  osc_async_upcall,
+                                                  cbargs, PTLRPCD_SET);
                cbargs->opc_rpc_sent = result == 0;
         }
         return result;
@@ -566,6 +611,111 @@ static void osc_io_setattr_end(const struct lu_env *env,
        }
 }
 
+struct osc_data_version_args {
+       struct osc_io *dva_oio;
+};
+
+static int
+osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                          void *arg, int rc)
+{
+       struct osc_data_version_args *dva = arg;
+       struct osc_io *oio = dva->dva_oio;
+       const struct ost_body *body;
+
+       ENTRY;
+       if (rc < 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
+                            &body->oa);
+       EXIT;
+out:
+       oio->oi_cbarg.opc_rc = rc;
+       complete(&oio->oi_cbarg.opc_sync);
+
+       return 0;
+}
+
+static int osc_io_data_version_start(const struct lu_env *env,
+                                    const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv   = &slice->cis_io->u.ci_data_version;
+       struct osc_io           *oio    = cl2osc_io(env, slice);
+       struct obdo             *oa     = &oio->oi_oa;
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+       struct osc_object       *obj    = cl2osc(slice->cis_obj);
+       struct lov_oinfo        *loi    = obj->oo_oinfo;
+       struct obd_export       *exp    = osc_export(obj);
+       struct ptlrpc_request   *req;
+       struct ost_body         *body;
+       struct osc_data_version_args *dva;
+       int rc;
+
+       ENTRY;
+       memset(oa, 0, sizeof(*oa));
+       oa->o_oi = loi->loi_oi;
+       oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
+               oa->o_valid |= OBD_MD_FLFLAGS;
+               oa->o_flags |= OBD_FL_SRVLOCK;
+               if (dv->dv_flags & LL_DV_WR_FLUSH)
+                       oa->o_flags |= OBD_FL_FLUSH;
+       }
+
+       init_completion(&cbargs->opc_sync);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_interpret_reply = osc_data_version_interpret;
+       CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
+       dva = ptlrpc_req_async_args(req);
+       dva->dva_oio = oio;
+
+       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+
+       RETURN(0);
+}
+
+static void osc_io_data_version_end(const struct lu_env *env,
+                                   const struct cl_io_slice *slice)
+{
+       struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
+       struct osc_io           *oio    = cl2osc_io(env, slice);
+       struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+
+       ENTRY;
+       wait_for_completion(&cbargs->opc_sync);
+
+       if (cbargs->opc_rc != 0) {
+               slice->cis_io->ci_result = cbargs->opc_rc;
+       } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) {
+               slice->cis_io->ci_result = -EOPNOTSUPP;
+       } else {
+               dv->dv_data_version = oio->oi_oa.o_data_version;
+               slice->cis_io->ci_result = 0;
+       }
+
+       EXIT;
+}
+
 static int osc_io_read_start(const struct lu_env *env,
                              const struct cl_io_slice *slice)
 {
@@ -577,7 +727,7 @@ static int osc_io_read_start(const struct lu_env *env,
        if (!slice->cis_io->ci_noatime) {
                cl_object_attr_lock(obj);
                attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
-               rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
+               rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
                cl_object_attr_unlock(obj);
        }
 
@@ -595,7 +745,7 @@ static int osc_io_write_start(const struct lu_env *env,
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
        cl_object_attr_lock(obj);
        attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
-       rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
+       rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
        cl_object_attr_unlock(obj);
 
        RETURN(rc);
@@ -723,6 +873,10 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_start  = osc_io_setattr_start,
                        .cio_end    = osc_io_setattr_end
                },
+               [CIT_DATA_VERSION] = {
+                       .cio_start  = osc_io_data_version_start,
+                       .cio_end    = osc_io_data_version_end,
+               },
                [CIT_FAULT] = {
                        .cio_start  = osc_io_fault_start,
                        .cio_end    = osc_io_end,
@@ -737,6 +891,7 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_fini   = osc_io_fini
                }
        },
+       .cio_read_ahead             = osc_io_read_ahead,
        .cio_submit                 = osc_io_submit,
        .cio_commit_async           = osc_io_commit_async
 };
@@ -770,13 +925,12 @@ static void osc_req_completion(const struct lu_env *env,
 static void osc_req_attr_set(const struct lu_env *env,
                             const struct cl_req_slice *slice,
                             const struct cl_object *obj,
-                            struct cl_req_attr *attr, obd_valid flags)
+                            struct cl_req_attr *attr, u64 flags)
 {
        struct lov_oinfo *oinfo;
        struct cl_req    *clerq;
        struct cl_page   *apage; /* _some_ page in @clerq */
-       struct cl_lock   *lock;  /* _some_ lock protecting @apage */
-       struct osc_lock  *olck;
+       struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
        struct osc_page  *opg;
        struct obdo      *oa;
        struct ost_lvb   *lvb;
@@ -806,41 +960,37 @@ static void osc_req_attr_set(const struct lu_env *env,
                oa->o_valid |= OBD_MD_FLID;
        }
        if (flags & OBD_MD_FLHANDLE) {
-               struct cl_object *subobj;
-
                clerq = slice->crs_req;
                LASSERT(!list_empty(&clerq->crq_pages));
                apage = container_of(clerq->crq_pages.next,
                                     struct cl_page, cp_flight);
                opg = osc_cl_page_osc(apage, NULL);
-               subobj = opg->ops_cl.cpl_obj;
-               lock = cl_lock_at_pgoff(env, subobj, osc_index(opg),
-                                       NULL, 1, 1);
-               if (lock == NULL) {
-                       struct cl_object_header *head;
-                       struct cl_lock          *scan;
-
-                       head = cl_object_header(subobj);
-                       list_for_each_entry(scan, &head->coh_locks,
-                                           cll_linkage)
-                                CL_LOCK_DEBUG(D_ERROR, env, scan,
-                                              "no cover page!\n");
-                        CL_PAGE_DEBUG(D_ERROR, env, apage,
-                                      "dump uncover page!\n");
-                        libcfs_debug_dumpstack(NULL);
-                        LBUG();
-                }
+               lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
+                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
+               if (lock == NULL && !opg->ops_srvlock) {
+                       struct ldlm_resource *res;
+                       struct ldlm_res_id *resname;
+
+                       CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
+
+                       resname = &osc_env_info(env)->oti_resname;
+                       ostid_build_res_name(&oinfo->loi_oi, resname);
+                       res = ldlm_resource_get(
+                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
+                               NULL, resname, LDLM_EXTENT, 0);
+                       ldlm_resource_dump(D_ERROR, res);
+
+                       libcfs_debug_dumpstack(NULL);
+                       LBUG();
+               }
 
-                olck = osc_lock_at(lock);
-                LASSERT(olck != NULL);
-                LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
-                /* check for lockless io. */
-                if (olck->ols_lock != NULL) {
-                        oa->o_handle = olck->ols_lock->l_remote_handle;
-                        oa->o_valid |= OBD_MD_FLHANDLE;
-                }
-                cl_lock_put(env, lock);
-        }
+               /* check for lockless io. */
+               if (lock != NULL) {
+                       oa->o_handle = lock->l_remote_handle;
+                       oa->o_valid |= OBD_MD_FLHANDLE;
+                       LDLM_LOCK_PUT(lock);
+               }
+       }
 }
 
 static const struct cl_req_operations osc_req_ops = {