Whamcloud - gitweb
LU-6124 test: skip tests require remote server with nodsh set
[fs/lustre-release.git] / lustre / lov / lov_io.c
index bdb8140..cf1ccfe 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -88,11 +88,15 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
        struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
        struct cl_io         *parent = lio->lis_cl.cis_io;
 
-        switch(io->ci_type) {
-        case CIT_SETATTR: {
-                io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
-                io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
-                io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
+       switch (io->ci_type) {
+       case CIT_SETATTR: {
+               io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
+               io->u.ci_setattr.sa_attr_flags =
+                       parent->u.ci_setattr.sa_attr_flags;
+               io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
+               io->u.ci_setattr.sa_stripe_index = stripe;
+               io->u.ci_setattr.sa_parent_fid =
+                                       parent->u.ci_setattr.sa_parent_fid;
                 if (cl_io_is_trunc(io)) {
                         loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
 
@@ -101,6 +105,12 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                 }
                 break;
         }
+       case CIT_DATA_VERSION: {
+               io->u.ci_data_version.dv_data_version = 0;
+               io->u.ci_data_version.dv_flags =
+                       parent->u.ci_data_version.dv_flags;
+               break;
+       }
         case CIT_FAULT: {
                 struct cl_object *obj = parent->ci_obj;
                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
@@ -113,7 +123,6 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
        case CIT_FSYNC: {
                io->u.ci_fsync.fi_start = start;
                io->u.ci_fsync.fi_end = end;
-               io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
                io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
                io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
                break;
@@ -344,6 +353,11 @@ static int lov_io_slice_init(struct lov_io *lio,
                 lio->lis_endpos = OBD_OBJECT_EOF;
                 break;
 
+       case CIT_DATA_VERSION:
+               lio->lis_pos = 0;
+               lio->lis_endpos = OBD_OBJECT_EOF;
+               break;
+
         case CIT_FAULT: {
                 pgoff_t index = io->u.ci_fault.ft_index;
                 lio->lis_pos = cl_offset(io->ci_obj, index);
@@ -390,7 +404,7 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        EXIT;
 }
 
-static obd_off lov_offset_mod(obd_off val, int delta)
+static loff_t lov_offset_mod(loff_t val, int delta)
 {
         if (val != OBD_OBJECT_EOF)
                 val += delta;
@@ -402,10 +416,10 @@ static int lov_io_iter_init(const struct lu_env *env,
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
-        struct lov_io_sub    *sub;
-        obd_off endpos;
-        obd_off start;
-        obd_off end;
+       struct lov_io_sub    *sub;
+       loff_t endpos;
+       loff_t start;
+       loff_t end;
         int stripe;
         int rc = 0;
 
@@ -550,6 +564,27 @@ static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
         LASSERT(rc == 0);
 }
 
+static void
+lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct cl_io *parent = lio->lis_cl.cis_io;
+       struct lov_io_sub *sub;
+
+       ENTRY;
+       list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
+               lov_io_end_wrapper(env, sub->sub_io);
+
+               parent->u.ci_data_version.dv_data_version +=
+                       sub->sub_io->u.ci_data_version.dv_data_version;
+
+               if (parent->ci_result == 0)
+                       parent->ci_result = sub->sub_io->ci_result;
+       }
+
+       EXIT;
+}
+
 static void lov_io_iter_fini(const struct lu_env *env,
                              const struct cl_io_slice *ios)
 {
@@ -575,6 +610,65 @@ static void lov_io_unlock(const struct lu_env *env,
         EXIT;
 }
 
+static int lov_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
+{
+       struct lov_io           *lio = cl2lov_io(env, ios);
+       struct lov_object       *loo = lio->lis_object;
+       struct cl_object        *obj = lov2cl(loo);
+       struct lov_layout_raid0 *r0 = lov_r0(loo);
+       struct lov_io_sub       *sub;
+       loff_t                   suboff;
+       pgoff_t                  ra_end;
+       unsigned int             pps; /* pages per stripe */
+       int                      stripe;
+       int                      rc;
+       ENTRY;
+
+       stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
+       if (unlikely(r0->lo_sub[stripe] == NULL))
+               RETURN(-EIO);
+
+       sub = lov_sub_get(env, lio, stripe);
+
+       lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
+       rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
+                             cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
+                             ra);
+       lov_sub_put(sub);
+
+       CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
+              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
+       if (rc != 0)
+               RETURN(rc);
+
+       /**
+        * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum
+        * page index covered by an underlying DLM lock.
+        * This function converts cra_end from stripe level to file level, and
+        * make sure it's not beyond stripe boundary.
+        */
+       if (r0->lo_nr == 1) /* single stripe file */
+               RETURN(0);
+
+       /* cra_end is stripe level, convert it into file level */
+       ra_end = ra->cra_end;
+       if (ra_end != CL_PAGE_EOF)
+               ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
+
+       pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+
+       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
+              "stripe_size = %u, stripe no = %u, start index = %lu\n",
+              PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
+              loo->lo_lsm->lsm_stripe_size, stripe, start);
+
+       /* never exceed the end of the stripe */
+       ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
+       RETURN(0);
+}
+
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -801,6 +895,15 @@ static const struct cl_io_operations lov_io_ops = {
                         .cio_start     = lov_io_start,
                         .cio_end       = lov_io_end
                 },
+               [CIT_DATA_VERSION] = {
+                       .cio_fini       = lov_io_fini,
+                       .cio_iter_init  = lov_io_iter_init,
+                       .cio_iter_fini  = lov_io_iter_fini,
+                       .cio_lock       = lov_io_lock,
+                       .cio_unlock     = lov_io_unlock,
+                       .cio_start      = lov_io_start,
+                       .cio_end        = lov_io_data_version_end,
+               },
                 [CIT_FAULT] = {
                         .cio_fini      = lov_io_fini,
                         .cio_iter_init = lov_io_iter_init,
@@ -823,6 +926,7 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_fini      = lov_io_fini
                }
        },
+       .cio_read_ahead                = lov_io_read_ahead,
        .cio_submit                    = lov_io_submit,
        .cio_commit_async              = lov_io_commit_async,
 };
@@ -844,6 +948,13 @@ static void lov_empty_io_fini(const struct lu_env *env,
        EXIT;
 }
 
+static int lov_empty_io_submit(const struct lu_env *env,
+                              const struct cl_io_slice *ios,
+                              enum cl_req_type crt, struct cl_2queue *queue)
+{
+       return -EBADF;
+}
+
 static void lov_empty_impossible(const struct lu_env *env,
                                  struct cl_io_slice *ios)
 {
@@ -894,7 +1005,7 @@ static const struct cl_io_operations lov_empty_io_ops = {
                        .cio_fini      = lov_empty_io_fini
                }
        },
-       .cio_submit                    = LOV_EMPTY_IMPOSSIBLE,
+       .cio_submit                    = lov_empty_io_submit,
        .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
 };
 
@@ -938,6 +1049,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                break;
        case CIT_FSYNC:
        case CIT_SETATTR:
+       case CIT_DATA_VERSION:
                result = +1;
                break;
        case CIT_WRITE:
@@ -974,6 +1086,7 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                LASSERTF(0, "invalid type %d\n", io->ci_type);
        case CIT_MISC:
        case CIT_FSYNC:
+       case CIT_DATA_VERSION:
                result = 1;
                break;
        case CIT_SETATTR:
@@ -982,9 +1095,11 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                 * - in setattr, for truncate
                 */
                /* the truncate is for size > 0 so triggers a restore */
-               if (cl_io_is_trunc(io))
+               if (cl_io_is_trunc(io)) {
                        io->ci_restore_needed = 1;
-               result = -ENODATA;
+                       result = -ENODATA;
+               } else
+                       result = 1;
                break;
        case CIT_READ:
        case CIT_WRITE: