Whamcloud - gitweb
LU-6401 headers: Create a header for obdo related functions
[fs/lustre-release.git] / lustre / osc / osc_io.c
index 4fb098c..69d4e06 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,6 +41,8 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
+#include <lustre_obdo.h>
+
 #include "osc_cl_internal.h"
 
 /** \addtogroup osc 
  *
  */
 
-static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
-{
-        LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
-        return container_of0(slice, struct osc_req, or_cl);
-}
-
 static struct osc_io *cl2osc_io(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
 {
@@ -67,21 +63,6 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
         return oio;
 }
 
-static struct osc_page *osc_cl_page_osc(struct cl_page *page,
-                                       struct osc_object *osc)
-{
-       const struct cl_page_slice *slice;
-
-       if (osc != NULL)
-               slice = cl_object_page_slice(&osc->oo_cl, page);
-       else
-               slice = cl_page_at(page, &osc_device_type);
-       LASSERT(slice != NULL);
-
-       return cl2osc_page(slice);
-}
-
-
 /*****************************************************************************
  *
  * io operations.
@@ -352,55 +333,75 @@ static int osc_io_commit_async(const struct lu_env *env,
        RETURN(result);
 }
 
-static int osc_io_rw_iter_init(const struct lu_env *env,
-                               const struct cl_io_slice *ios)
+static int osc_io_iter_init(const struct lu_env *env,
+                           const struct cl_io_slice *ios)
+{
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct obd_import *imp = osc_cli(osc)->cl_import;
+       int rc = -EIO;
+
+       spin_lock(&imp->imp_lock);
+       if (likely(!imp->imp_invalid)) {
+               struct osc_io *oio = osc_env_io(env);
+
+               atomic_inc(&osc->oo_nr_ios);
+               oio->oi_is_active = 1;
+               rc = 0;
+       }
+       spin_unlock(&imp->imp_lock);
+
+       return rc;
+}
+
+static int osc_io_write_iter_init(const struct lu_env *env,
+                                 const struct cl_io_slice *ios)
 {
        struct cl_io *io = ios->cis_io;
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
-       struct client_obd *cli = osc_cli(osc);
-       unsigned long c;
        unsigned long npages;
-       unsigned long max_pages;
        ENTRY;
 
        if (cl_io_is_append(io))
-               RETURN(0);
+               RETURN(osc_io_iter_init(env, ios));
 
        npages = io->u.ci_rw.crw_count >> PAGE_CACHE_SHIFT;
        if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
                ++npages;
 
-       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
-       if (npages > max_pages)
-               npages = max_pages;
+       oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
 
-       c = atomic_long_read(cli->cl_lru_left);
-       if (c < npages && osc_lru_reclaim(cli) > 0)
-               c = atomic_long_read(cli->cl_lru_left);
-       while (c >= npages) {
-               if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
-                       oio->oi_lru_reserved = npages;
-                       break;
-               }
-               c = atomic_long_read(cli->cl_lru_left);
-       }
+       RETURN(osc_io_iter_init(env, ios));
+}
 
-       RETURN(0);
+static void osc_io_iter_fini(const struct lu_env *env,
+                            const struct cl_io_slice *ios)
+{
+       struct osc_io *oio = osc_env_io(env);
+
+       if (oio->oi_is_active) {
+               struct osc_object *osc = cl2osc(ios->cis_obj);
+
+               oio->oi_is_active = 0;
+               LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
+               if (atomic_dec_and_test(&osc->oo_nr_ios))
+                       wake_up_all(&osc->oo_io_waitq);
+       }
 }
 
-static void osc_io_rw_iter_fini(const struct lu_env *env,
-                               const struct cl_io_slice *ios)
+static void osc_io_write_iter_fini(const struct lu_env *env,
+                                  const struct cl_io_slice *ios)
 {
        struct osc_io *oio = osc_env_io(env);
        struct osc_object *osc = cl2osc(ios->cis_obj);
-       struct client_obd *cli = osc_cli(osc);
 
        if (oio->oi_lru_reserved > 0) {
-               atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
+               osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
                oio->oi_lru_reserved = 0;
        }
        oio->oi_write_osclock = NULL;
+
+       osc_io_iter_fini(env, ios);
 }
 
 static int osc_io_fault_start(const struct lu_env *env,
@@ -492,7 +493,8 @@ static int osc_io_setattr_start(const struct lu_env *env,
 
        /* truncate cache dirty pages first */
        if (cl_io_is_trunc(io))
-               result = osc_cache_truncate_start(env, oio, cl2osc(obj), size);
+               result = osc_cache_truncate_start(env, cl2osc(obj), size,
+                                                 &oio->oi_trunc);
 
        if (result == 0 && oio->oi_lockless == 0) {
                cl_object_attr_lock(obj);
@@ -602,10 +604,8 @@ static void osc_io_setattr_end(const struct lu_env *env,
        if (cl_io_is_trunc(io)) {
                __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
                osc_trunc_check(env, io, oio, size);
-               if (oio->oi_trunc != NULL) {
-                       osc_cache_truncate_end(env, oio, cl2osc(obj));
-                       oio->oi_trunc = NULL;
-               }
+               osc_cache_truncate_end(env, oio->oi_trunc);
+               oio->oi_trunc = NULL;
        }
 }
 
@@ -687,7 +687,7 @@ static int osc_io_data_version_start(const struct lu_env *env,
        dva = ptlrpc_req_async_args(req);
        dva->dva_oio = oio;
 
-       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       ptlrpcd_add_req(req);
 
        RETURN(0);
 }
@@ -852,17 +852,21 @@ static void osc_io_end(const struct lu_env *env,
 static const struct cl_io_operations osc_io_ops = {
        .op = {
                [CIT_READ] = {
+                       .cio_iter_init = osc_io_iter_init,
+                       .cio_iter_fini = osc_io_iter_fini,
                        .cio_start  = osc_io_read_start,
                        .cio_fini   = osc_io_fini
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_rw_iter_init,
-                       .cio_iter_fini = osc_io_rw_iter_fini,
+                       .cio_iter_init = osc_io_write_iter_init,
+                       .cio_iter_fini = osc_io_write_iter_fini,
                        .cio_start  = osc_io_write_start,
                        .cio_end    = osc_io_end,
                        .cio_fini   = osc_io_fini
                },
                [CIT_SETATTR] = {
+                       .cio_iter_init = osc_io_iter_init,
+                       .cio_iter_fini = osc_io_iter_fini,
                        .cio_start  = osc_io_setattr_start,
                        .cio_end    = osc_io_setattr_end
                },
@@ -871,6 +875,8 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_end    = osc_io_data_version_end,
                },
                [CIT_FAULT] = {
+                       .cio_iter_init = osc_io_iter_init,
+                       .cio_iter_fini = osc_io_iter_fini,
                        .cio_start  = osc_io_fault_start,
                        .cio_end    = osc_io_end,
                        .cio_fini   = osc_io_fini
@@ -895,104 +901,6 @@ static const struct cl_io_operations osc_io_ops = {
  *
  */
 
-static int osc_req_prep(const struct lu_env *env,
-                        const struct cl_req_slice *slice)
-{
-        return 0;
-}
-
-static void osc_req_completion(const struct lu_env *env,
-                               const struct cl_req_slice *slice, int ioret)
-{
-        struct osc_req *or;
-
-        or = cl2osc_req(slice);
-        OBD_SLAB_FREE_PTR(or, osc_req_kmem);
-}
-
-/**
- * Implementation of struct cl_req_operations::cro_attr_set() for osc
- * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq
- * fields.
- */
-static void osc_req_attr_set(const struct lu_env *env,
-                            const struct cl_req_slice *slice,
-                            const struct cl_object *obj,
-                            struct cl_req_attr *attr, u64 flags)
-{
-       struct lov_oinfo *oinfo;
-       struct cl_req    *clerq;
-       struct cl_page   *apage; /* _some_ page in @clerq */
-       struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
-       struct osc_page  *opg;
-       struct obdo      *oa;
-       struct ost_lvb   *lvb;
-
-       oinfo   = cl2osc(obj)->oo_oinfo;
-       lvb     = &oinfo->loi_lvb;
-       oa      = attr->cra_oa;
-
-       if ((flags & OBD_MD_FLMTIME) != 0) {
-               oa->o_mtime = lvb->lvb_mtime;
-               oa->o_valid |= OBD_MD_FLMTIME;
-       }
-       if ((flags & OBD_MD_FLATIME) != 0) {
-               oa->o_atime = lvb->lvb_atime;
-               oa->o_valid |= OBD_MD_FLATIME;
-       }
-       if ((flags & OBD_MD_FLCTIME) != 0) {
-               oa->o_ctime = lvb->lvb_ctime;
-               oa->o_valid |= OBD_MD_FLCTIME;
-       }
-       if (flags & OBD_MD_FLGROUP) {
-               ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
-               oa->o_valid |= OBD_MD_FLGROUP;
-       }
-       if (flags & OBD_MD_FLID) {
-               ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
-               oa->o_valid |= OBD_MD_FLID;
-       }
-       if (flags & OBD_MD_FLHANDLE) {
-               clerq = slice->crs_req;
-               LASSERT(!list_empty(&clerq->crq_pages));
-               apage = container_of(clerq->crq_pages.next,
-                                    struct cl_page, cp_flight);
-               opg = osc_cl_page_osc(apage, NULL);
-               lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
-                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
-               if (lock == NULL && !opg->ops_srvlock) {
-                       struct ldlm_resource *res;
-                       struct ldlm_res_id *resname;
-
-                       CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
-
-                       resname = &osc_env_info(env)->oti_resname;
-                       ostid_build_res_name(&oinfo->loi_oi, resname);
-                       res = ldlm_resource_get(
-                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
-                               NULL, resname, LDLM_EXTENT, 0);
-                       ldlm_resource_dump(D_ERROR, res);
-
-                       libcfs_debug_dumpstack(NULL);
-                       LBUG();
-               }
-
-               /* check for lockless io. */
-               if (lock != NULL) {
-                       oa->o_handle = lock->l_remote_handle;
-                       oa->o_valid |= OBD_MD_FLHANDLE;
-                       LDLM_LOCK_PUT(lock);
-               }
-       }
-}
-
-static const struct cl_req_operations osc_req_ops = {
-        .cro_prep       = osc_req_prep,
-        .cro_attr_set   = osc_req_attr_set,
-        .cro_completion = osc_req_completion
-};
-
-
 int osc_io_init(const struct lu_env *env,
                 struct cl_object *obj, struct cl_io *io)
 {
@@ -1003,19 +911,4 @@ int osc_io_init(const struct lu_env *env,
         return 0;
 }
 
-int osc_req_init(const struct lu_env *env, struct cl_device *dev,
-                struct cl_req *req)
-{
-       struct osc_req *or;
-       int result;
-
-       OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
-       if (or != NULL) {
-               cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
-               result = 0;
-       } else
-               result = -ENOMEM;
-       return result;
-}
-
 /** @} osc */