Whamcloud - gitweb
LU-10810 clio: SEEK_HOLE/SEEK_DATA on client side
[fs/lustre-release.git] / lustre / lov / lov_io.c
index e699603..152984e 100644 (file)
@@ -115,7 +115,9 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                     !lov_r0(lov, index)->lo_sub[stripe]))
                RETURN(-EIO);
 
-       LASSERTF(is_index_within_mirror(lov, index, lio->lis_mirror_index),
+       LASSERTF(ergo(lov_is_flr(lov),
+                     is_index_within_mirror(lov, index,
+                                            lio->lis_mirror_index)),
                 DFID "iot = %d, index = %d, mirror = %d\n",
                 PFID(lu_object_fid(lov2lu(lov))), io->ci_type, index,
                 lio->lis_mirror_index);
@@ -184,6 +186,8 @@ struct lov_io_sub *lov_sub_get(const struct lu_env *env,
 out:
        if (rc < 0)
                sub = ERR_PTR(rc);
+       else
+               sub->sub_io.ci_noquota = lio->lis_cl.cis_io->ci_noquota;
        RETURN(sub);
 }
 
@@ -304,8 +308,13 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
        ENTRY;
 
        if (!lov_is_flr(obj)) {
-               LASSERT(comp->lo_preferred_mirror == 0);
-               lio->lis_mirror_index = comp->lo_preferred_mirror;
+               /* only locks/pages are manipulated for CIT_MISC op, no
+                * cl_io_loop() will be called, don't check/set mirror info.
+                */
+               if (io->ci_type != CIT_MISC) {
+                       LASSERT(comp->lo_preferred_mirror == 0);
+                       lio->lis_mirror_index = comp->lo_preferred_mirror;
+               }
                io->ci_ndelay = 0;
                RETURN(0);
        }
@@ -364,11 +373,12 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
                        /**
                         * for truncate, we uses [size, EOF) to judge whether
                         * a write intent needs to be send, but we need to
-                        * restore the write extent to [0, size).
+                        * restore the write extent to [0, size]: in truncate,
+                        * the byte at the size position is accessed.
                         */
                        io->ci_write_intent.e_start = 0;
                        io->ci_write_intent.e_end =
-                                       io->u.ci_setattr.sa_attr.lvb_size;
+                                       io->u.ci_setattr.sa_attr.lvb_size + 1;
                }
                /* stop cl_io_init() loop */
                RETURN(1);
@@ -438,8 +448,7 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
         */
        if (io->ci_ndelay && io->ci_ndelay_tried > 0 &&
            (io->ci_ndelay_tried % comp->lo_mirror_count == 0)) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               schedule_timeout(cfs_time_seconds(1) / 100); /* 10ms */
+               schedule_timeout_interruptible(cfs_time_seconds(1) / 100);
                if (signal_pending(current))
                        RETURN(-EINTR);
 
@@ -494,11 +503,16 @@ static int lov_io_slice_init(struct lov_io *lio,
                break;
 
        case CIT_SETATTR:
-               if (cl_io_is_trunc(io))
+               if (cl_io_is_fallocate(io)) {
+                       lio->lis_pos = io->u.ci_setattr.sa_falloc_offset;
+                       lio->lis_endpos = io->u.ci_setattr.sa_falloc_end;
+               } else if (cl_io_is_trunc(io)) {
                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
-               else
+                       lio->lis_endpos = OBD_OBJECT_EOF;
+               } else {
                        lio->lis_pos = 0;
-               lio->lis_endpos = OBD_OBJECT_EOF;
+                       lio->lis_endpos = OBD_OBJECT_EOF;
+               }
                break;
 
        case CIT_DATA_VERSION:
@@ -526,6 +540,12 @@ static int lov_io_slice_init(struct lov_io *lio,
                break;
        }
 
+       case CIT_LSEEK: {
+               lio->lis_pos = io->u.ci_lseek.ls_start;
+               lio->lis_endpos = OBD_OBJECT_EOF;
+               break;
+       }
+
        case CIT_GLIMPSE:
                lio->lis_pos = 0;
                lio->lis_endpos = OBD_OBJECT_EOF;
@@ -650,15 +670,24 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
                        parent->u.ci_setattr.sa_attr_flags;
                io->u.ci_setattr.sa_avalid = parent->u.ci_setattr.sa_avalid;
                io->u.ci_setattr.sa_xvalid = parent->u.ci_setattr.sa_xvalid;
+               io->u.ci_setattr.sa_falloc_mode =
+                       parent->u.ci_setattr.sa_falloc_mode;
                io->u.ci_setattr.sa_stripe_index = stripe;
                io->u.ci_setattr.sa_parent_fid =
                                        parent->u.ci_setattr.sa_parent_fid;
+               /* For SETATTR(fallocate) pass the subtype to lower IO */
+               io->u.ci_setattr.sa_subtype = parent->u.ci_setattr.sa_subtype;
                if (cl_io_is_trunc(io)) {
                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
 
                        new_size = lov_size_to_stripe(lsm, index, new_size,
                                                      stripe);
                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
+               } else if (cl_io_is_fallocate(io)) {
+                       io->u.ci_setattr.sa_falloc_offset = start;
+                       io->u.ci_setattr.sa_falloc_end = end;
+                       io->u.ci_setattr.sa_attr.lvb_size =
+                               parent->u.ci_setattr.sa_attr.lvb_size;
                }
                lov_lsm2layout(lsm, lsm->lsm_entries[index],
                               &io->u.ci_setattr.sa_layout);
@@ -706,6 +735,12 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
                io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
                break;
        }
+       case CIT_LSEEK: {
+               io->u.ci_lseek.ls_start = start;
+               io->u.ci_lseek.ls_whence = parent->u.ci_lseek.ls_whence;
+               io->u.ci_lseek.ls_result = parent->u.ci_lseek.ls_result;
+               break;
+       }
        case CIT_GLIMPSE:
        case CIT_MISC:
        default:
@@ -1275,6 +1310,83 @@ static void lov_io_fsync_end(const struct lu_env *env,
        RETURN_EXIT;
 }
 
+static void lov_io_lseek_end(const struct lu_env *env,
+                            const struct cl_io_slice *ios)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct cl_io *io = lio->lis_cl.cis_io;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       struct lov_io_sub *sub;
+       loff_t offset = -ENXIO;
+       bool seek_hole = io->u.ci_lseek.ls_whence == SEEK_HOLE;
+
+       ENTRY;
+
+       list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
+               struct cl_io *subio = &sub->sub_io;
+               int index = lov_comp_entry(sub->sub_subio_index);
+               int stripe = lov_comp_stripe(sub->sub_subio_index);
+               loff_t sub_off, lov_off;
+
+               lov_io_end_wrapper(sub->sub_env, subio);
+
+               if (io->ci_result == 0)
+                       io->ci_result = sub->sub_io.ci_result;
+
+               if (io->ci_result)
+                       continue;
+
+               CDEBUG(D_INFO, DFID": entry %x stripe %u: SEEK_%s from %lld\n",
+                      PFID(lu_object_fid(lov2lu(lio->lis_object))),
+                      index, stripe, seek_hole ? "HOLE" : "DATA",
+                      subio->u.ci_lseek.ls_start);
+
+               /* sub-io result; smallest valid file offset is kept below */
+               sub_off = subio->u.ci_lseek.ls_result;
+               /* Expected error, offset is out of stripe file size */
+               if (sub_off == -ENXIO)
+                       continue;
+               /* Any other errors are not expected with ci_result == 0 */
+               if (sub_off < 0) {
+                       CDEBUG(D_INFO, "unexpected error: rc = %lld\n",
+                              sub_off);
+                       io->ci_result = sub_off;
+                       continue;
+               }
+               lov_off = lov_stripe_size(lsm, index, sub_off + 1, stripe) - 1;
+               if (lov_off < 0) {
+                       /* the only way to get a negative lov_off here is a
+                        * result that is too big. Return -EOVERFLOW then.
+                        */
+                       io->ci_result = -EOVERFLOW;
+                       CDEBUG(D_INFO, "offset %llu is too big: rc = %d\n",
+                              (u64)lov_off, io->ci_result);
+                       continue;
+               }
+               if (lov_off < io->u.ci_lseek.ls_start) {
+                       io->ci_result = -EINVAL;
+                       CDEBUG(D_INFO, "offset %lld < start %lld: rc = %d\n",
+                              sub_off, io->u.ci_lseek.ls_start, io->ci_result);
+                       continue;
+               }
+               /* resulting offset can be out of component range if stripe
+                * object is full and its file size was returned as virtual
+                * hole start. Skip this result, the next component will give
+                * us correct lseek result.
+                */
+               if (lov_off >= lsm->lsm_entries[index]->lsme_extent.e_end)
+                       continue;
+
+               CDEBUG(D_INFO, "SEEK_%s: %lld->%lld/%lld: rc = %d\n",
+                      seek_hole ? "HOLE" : "DATA",
+                      subio->u.ci_lseek.ls_start, sub_off, lov_off,
+                      sub->sub_io.ci_result);
+               offset = min_t(__u64, offset, lov_off);
+       }
+       io->u.ci_lseek.ls_result = offset;
+       RETURN_EXIT;
+}
+
 static const struct cl_io_operations lov_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -1340,6 +1452,15 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_end
                },
+               [CIT_LSEEK] = {
+                       .cio_fini      = lov_io_fini,
+                       .cio_iter_init = lov_io_iter_init,
+                       .cio_iter_fini = lov_io_iter_fini,
+                       .cio_lock      = lov_io_lock,
+                       .cio_unlock    = lov_io_unlock,
+                       .cio_start     = lov_io_start,
+                       .cio_end       = lov_io_lseek_end
+               },
                [CIT_GLIMPSE] = {
                        .cio_fini      = lov_io_fini,
                },
@@ -1480,6 +1601,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                break;
        case CIT_FSYNC:
        case CIT_LADVISE:
+       case CIT_LSEEK:
        case CIT_SETATTR:
        case CIT_DATA_VERSION:
                result = +1;
@@ -1531,8 +1653,11 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                 * - in open, for open O_TRUNC
                 * - in setattr, for truncate
                 */
-               /* the truncate is for size > 0 so triggers a restore */
-               if (cl_io_is_trunc(io)) {
+               /*
+                * the truncate is for size > 0 so triggers a restore,
+                * also trigger a restore for prealloc/punch
+                */
+               if (cl_io_is_trunc(io) || cl_io_is_fallocate(io)) {
                        io->ci_restore_needed = 1;
                        result = -ENODATA;
                } else
@@ -1541,6 +1666,7 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
        case CIT_READ:
        case CIT_WRITE:
        case CIT_FAULT:
+       case CIT_LSEEK:
                io->ci_restore_needed = 1;
                result = -ENODATA;
                break;