Whamcloud - gitweb
LU-10810 clio: SEEK_HOLE/SEEK_DATA on client side
[fs/lustre-release.git] / lustre / lov / lov_io.c
index 095fdb6..152984e 100644 (file)
@@ -115,7 +115,9 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                     !lov_r0(lov, index)->lo_sub[stripe]))
                RETURN(-EIO);
 
-       LASSERTF(is_index_within_mirror(lov, index, lio->lis_mirror_index),
+       LASSERTF(ergo(lov_is_flr(lov),
+                     is_index_within_mirror(lov, index,
+                                            lio->lis_mirror_index)),
                 DFID "iot = %d, index = %d, mirror = %d\n",
                 PFID(lu_object_fid(lov2lu(lov))), io->ci_type, index,
                 lio->lis_mirror_index);
@@ -138,9 +140,11 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
        sub_io->ci_type    = io->ci_type;
        sub_io->ci_no_srvlock = io->ci_no_srvlock;
        sub_io->ci_noatime = io->ci_noatime;
+       sub_io->ci_async_readahead = io->ci_async_readahead;
        sub_io->ci_lock_no_expand = io->ci_lock_no_expand;
        sub_io->ci_ndelay = io->ci_ndelay;
        sub_io->ci_layout_version = io->ci_layout_version;
+       sub_io->ci_tried_all_mirrors = io->ci_tried_all_mirrors;
 
        result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
 
@@ -182,6 +186,8 @@ struct lov_io_sub *lov_sub_get(const struct lu_env *env,
 out:
        if (rc < 0)
                sub = ERR_PTR(rc);
+       else
+               sub->sub_io.ci_noquota = lio->lis_cl.cis_io->ci_noquota;
        RETURN(sub);
 }
 
@@ -190,19 +196,6 @@ out:
  * Lov io operations.
  *
  */
-
-int lov_page_index(const struct cl_page *page)
-{
-       const struct cl_page_slice *slice;
-       ENTRY;
-
-       slice = cl_page_at(page, &lov_device_type);
-       LASSERT(slice != NULL);
-       LASSERT(slice->cpl_obj != NULL);
-
-       RETURN(cl2lov_page(slice)->lps_index);
-}
-
 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
                              struct cl_io *io)
 {
@@ -268,8 +261,8 @@ static int lov_io_mirror_write_intent(struct lov_io *lio,
                if (!lu_extent_is_overlapped(ext, lle->lle_extent))
                        continue;
 
-               ext->e_start = MIN(ext->e_start, lle->lle_extent->e_start);
-               ext->e_end = MAX(ext->e_end, lle->lle_extent->e_end);
+               ext->e_start = min(ext->e_start, lle->lle_extent->e_start);
+               ext->e_end = max(ext->e_end, lle->lle_extent->e_end);
                ++count;
        }
        if (count == 0) {
@@ -315,8 +308,13 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
        ENTRY;
 
        if (!lov_is_flr(obj)) {
-               LASSERT(comp->lo_preferred_mirror == 0);
-               lio->lis_mirror_index = comp->lo_preferred_mirror;
+               /* only locks/pages are manipulated for CIT_MISC op, no
+                * cl_io_loop() will be called, don't check/set mirror info.
+                */
+               if (io->ci_type != CIT_MISC) {
+                       LASSERT(comp->lo_preferred_mirror == 0);
+                       lio->lis_mirror_index = comp->lo_preferred_mirror;
+               }
                io->ci_ndelay = 0;
                RETURN(0);
        }
@@ -375,11 +373,12 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
                        /**
                         * for truncate, we uses [size, EOF) to judge whether
                         * a write intent needs to be send, but we need to
-                        * restore the write extent to [0, size).
+                        * restore the write extent to [0, size]; for truncate,
+                        * the byte at the size position is accessed.
                         */
                        io->ci_write_intent.e_start = 0;
                        io->ci_write_intent.e_end =
-                                       io->u.ci_setattr.sa_attr.lvb_size;
+                                       io->u.ci_setattr.sa_attr.lvb_size + 1;
                }
                /* stop cl_io_init() loop */
                RETURN(1);
@@ -417,13 +416,13 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
                                found = true;
                                break;
                        }
-               }
-
+               } /* each component of the mirror */
                if (found) {
                        index = (index + i) % comp->lo_mirror_count;
                        break;
                }
-       }
+       } /* each mirror */
+
        if (i == comp->lo_mirror_count) {
                CERROR(DFID": failed to find a component covering "
                       "I/O region at %llu\n",
@@ -447,16 +446,21 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
         * of this client has been partitioned. We should relinquish CPU for
         * a while before trying again.
         */
-       ++io->ci_ndelay_tried;
-       if (io->ci_ndelay && io->ci_ndelay_tried >= comp->lo_mirror_count) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC)); /* 10ms */
+       if (io->ci_ndelay && io->ci_ndelay_tried > 0 &&
+           (io->ci_ndelay_tried % comp->lo_mirror_count == 0)) {
+               schedule_timeout_interruptible(cfs_time_seconds(1) / 100);
                if (signal_pending(current))
                        RETURN(-EINTR);
 
-               /* reset retry counter */
-               io->ci_ndelay_tried = 1;
+               /**
+                * set ci_tried_all_mirrors to turn off fast mirror
+                * switching for read once every mirror has been tried
+                * for several rounds.
+                */
+               io->ci_tried_all_mirrors = io->ci_ndelay_tried %
+                                          (comp->lo_mirror_count * 4) == 0;
        }
+       ++io->ci_ndelay_tried;
 
        CDEBUG(D_VFSTRACE, "use %sdelayed RPC state for this IO\n",
               io->ci_ndelay ? "non-" : "");
@@ -499,11 +503,16 @@ static int lov_io_slice_init(struct lov_io *lio,
                break;
 
        case CIT_SETATTR:
-               if (cl_io_is_trunc(io))
+               if (cl_io_is_fallocate(io)) {
+                       lio->lis_pos = io->u.ci_setattr.sa_falloc_offset;
+                       lio->lis_endpos = io->u.ci_setattr.sa_falloc_end;
+               } else if (cl_io_is_trunc(io)) {
                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
-               else
+                       lio->lis_endpos = OBD_OBJECT_EOF;
+               } else {
                        lio->lis_pos = 0;
-               lio->lis_endpos = OBD_OBJECT_EOF;
+                       lio->lis_endpos = OBD_OBJECT_EOF;
+               }
                break;
 
        case CIT_DATA_VERSION:
@@ -531,6 +540,12 @@ static int lov_io_slice_init(struct lov_io *lio,
                break;
        }
 
+       case CIT_LSEEK: {
+               lio->lis_pos = io->u.ci_lseek.ls_start;
+               lio->lis_endpos = OBD_OBJECT_EOF;
+               break;
+       }
+
        case CIT_GLIMPSE:
                lio->lis_pos = 0;
                lio->lis_endpos = OBD_OBJECT_EOF;
@@ -565,7 +580,15 @@ static int lov_io_slice_init(struct lov_io *lio,
         */
        if (cl_io_is_trunc(io)) {
                io->ci_write_intent.e_start = 0;
-               io->ci_write_intent.e_end = io->u.ci_setattr.sa_attr.lvb_size;
+               /* for writes, e_end is endpos, the location of the file
+                * pointer after the write is completed, so it is not accessed.
+                * For truncate, 'end' is the size, and *is* accessed.
+                * In other words, writes are [start, end), but truncate is
+                * [start, size], where both are included.  So add 1 to the
+                * size when creating the write intent to account for this.
+                */
+               io->ci_write_intent.e_end =
+                       io->u.ci_setattr.sa_attr.lvb_size + 1;
        } else {
                io->ci_write_intent.e_start = lio->lis_pos;
                io->ci_write_intent.e_end = lio->lis_endpos;
@@ -647,15 +670,24 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
                        parent->u.ci_setattr.sa_attr_flags;
                io->u.ci_setattr.sa_avalid = parent->u.ci_setattr.sa_avalid;
                io->u.ci_setattr.sa_xvalid = parent->u.ci_setattr.sa_xvalid;
+               io->u.ci_setattr.sa_falloc_mode =
+                       parent->u.ci_setattr.sa_falloc_mode;
                io->u.ci_setattr.sa_stripe_index = stripe;
                io->u.ci_setattr.sa_parent_fid =
                                        parent->u.ci_setattr.sa_parent_fid;
+               /* For SETATTR(fallocate) pass the subtype to lower IO */
+               io->u.ci_setattr.sa_subtype = parent->u.ci_setattr.sa_subtype;
                if (cl_io_is_trunc(io)) {
                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
 
                        new_size = lov_size_to_stripe(lsm, index, new_size,
                                                      stripe);
                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
+               } else if (cl_io_is_fallocate(io)) {
+                       io->u.ci_setattr.sa_falloc_offset = start;
+                       io->u.ci_setattr.sa_falloc_end = end;
+                       io->u.ci_setattr.sa_attr.lvb_size =
+                               parent->u.ci_setattr.sa_attr.lvb_size;
                }
                lov_lsm2layout(lsm, lsm->lsm_entries[index],
                               &io->u.ci_setattr.sa_layout);
@@ -686,6 +718,7 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
        case CIT_READ:
        case CIT_WRITE: {
                io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
+               io->ci_tried_all_mirrors = parent->ci_tried_all_mirrors;
                if (cl_io_is_append(parent)) {
                        io->u.ci_wr.wr_append = 1;
                } else {
@@ -702,6 +735,12 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
                io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
                break;
        }
+       case CIT_LSEEK: {
+               io->u.ci_lseek.ls_start = start;
+               io->u.ci_lseek.ls_whence = parent->u.ci_lseek.ls_whence;
+               io->u.ci_lseek.ls_result = parent->u.ci_lseek.ls_result;
+               break;
+       }
        case CIT_GLIMPSE:
        case CIT_MISC:
        default:
@@ -1040,7 +1079,8 @@ static int lov_io_read_ahead(const struct lu_env *env,
                              ra);
 
        CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
-              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
+              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end_idx,
+                   r0->lo_nr, rc);
        if (rc != 0)
                RETURN(rc);
 
@@ -1052,29 +1092,29 @@ static int lov_io_read_ahead(const struct lu_env *env,
         */
 
        /* cra_end is stripe level, convert it into file level */
-       ra_end = ra->cra_end;
+       ra_end = ra->cra_end_idx;
        if (ra_end != CL_PAGE_EOF)
-               ra->cra_end = lov_stripe_pgoff(loo->lo_lsm, index,
-                                              ra_end, stripe);
+               ra->cra_end_idx = lov_stripe_pgoff(loo->lo_lsm, index,
+                                                  ra_end, stripe);
 
        /* boundary of current component */
        ra_end = cl_index(obj, (loff_t)lov_io_extent(lio, index)->e_end);
-       if (ra_end != CL_PAGE_EOF && ra->cra_end >= ra_end)
-               ra->cra_end = ra_end - 1;
+       if (ra_end != CL_PAGE_EOF && ra->cra_end_idx >= ra_end)
+               ra->cra_end_idx = ra_end - 1;
 
        if (r0->lo_nr == 1) /* single stripe file */
                RETURN(0);
 
        pps = lov_lse(loo, index)->lsme_stripe_size >> PAGE_SHIFT;
 
-       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, index = %u, "
+       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, index = %d, "
               "stripe_size = %u, stripe no = %u, start index = %lu\n",
-              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, pps, index,
+              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end_idx, pps, index,
               lov_lse(loo, index)->lsme_stripe_size, stripe, start);
 
        /* never exceed the end of the stripe */
-       ra->cra_end = min_t(pgoff_t,
-                           ra->cra_end, start + pps - start % pps - 1);
+       ra->cra_end_idx = min_t(pgoff_t, ra->cra_end_idx,
+                               start + pps - start % pps - 1);
        RETURN(0);
 }
 
@@ -1102,6 +1142,7 @@ static int lov_io_submit(const struct lu_env *env,
        struct lov_io_sub       *sub;
        struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
        struct cl_page          *page;
+       struct cl_page          *tmp;
        int index;
        int rc = 0;
        ENTRY;
@@ -1127,11 +1168,11 @@ static int lov_io_submit(const struct lu_env *env,
                cl_2queue_init(cl2q);
                cl_page_list_move(&cl2q->c2_qin, qin, page);
 
-               index = lov_page_index(page);
-               while (qin->pl_nr > 0) {
-                       page = cl_page_list_first(qin);
-                       if (index != lov_page_index(page))
-                               break;
+               index = page->cp_lov_index;
+               cl_page_list_for_each_safe(page, tmp, qin) {
+                       /* this page is not on this stripe */
+                       if (index != page->cp_lov_index)
+                               continue;
 
                        cl_page_list_move(&cl2q->c2_qin, qin, page);
                }
@@ -1194,10 +1235,10 @@ static int lov_io_commit_async(const struct lu_env *env,
 
                cl_page_list_move(plist, queue, page);
 
-               index = lov_page_index(page);
+               index = page->cp_lov_index;
                while (queue->pl_nr > 0) {
                        page = cl_page_list_first(queue);
-                       if (index != lov_page_index(page))
+                       if (index != page->cp_lov_index)
                                break;
 
                        cl_page_list_move(plist, queue, page);
@@ -1243,7 +1284,7 @@ static int lov_io_fault_start(const struct lu_env *env,
 
        fio = &ios->cis_io->u.ci_fault;
        lio = cl2lov_io(env, ios);
-       sub = lov_sub_get(env, lio, lov_page_index(fio->ft_page));
+       sub = lov_sub_get(env, lio, fio->ft_page->cp_lov_index);
        sub->sub_io.u.ci_fault.ft_nob = fio->ft_nob;
 
        RETURN(lov_io_start(env, ios));
@@ -1269,6 +1310,83 @@ static void lov_io_fsync_end(const struct lu_env *env,
        RETURN_EXIT;
 }
 
+static void lov_io_lseek_end(const struct lu_env *env,
+                            const struct cl_io_slice *ios)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct cl_io *io = lio->lis_cl.cis_io;
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       struct lov_io_sub *sub;
+       loff_t offset = -ENXIO; /* stays -ENXIO if no sub-io qualifies */
+       bool seek_hole = io->u.ci_lseek.ls_whence == SEEK_HOLE;
+
+       ENTRY;
+       /* run the end wrapper on each active sub-io, merge its lseek result */
+       list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
+               struct cl_io *subio = &sub->sub_io;
+               int index = lov_comp_entry(sub->sub_subio_index);
+               int stripe = lov_comp_stripe(sub->sub_subio_index);
+               loff_t sub_off, lov_off;
+
+               lov_io_end_wrapper(sub->sub_env, subio);
+
+               if (io->ci_result == 0)
+                       io->ci_result = sub->sub_io.ci_result;
+
+               if (io->ci_result)
+                       continue;
+
+               CDEBUG(D_INFO, DFID": entry %x stripe %u: SEEK_%s from %lld\n",
+                      PFID(lu_object_fid(lov2lu(lio->lis_object))),
+                      index, stripe, seek_hole ? "HOLE" : "DATA",
+                      subio->u.ci_lseek.ls_start);
+
+               /* any non-negative sub result is a candidate offset */
+               sub_off = subio->u.ci_lseek.ls_result;
+               /* Expected error, offset is out of stripe file size */
+               if (sub_off == -ENXIO)
+                       continue;
+               /* Any other errors are not expected with ci_result == 0 */
+               if (sub_off < 0) {
+                       CDEBUG(D_INFO, "unexpected error: rc = %lld\n",
+                              sub_off);
+                       io->ci_result = sub_off;
+                       continue;
+               }
+               lov_off = lov_stripe_size(lsm, index, sub_off + 1, stripe) - 1;
+               if (lov_off < 0) {
+                       /* the only way to get a negative lov_off here is
+                        * a result that is too big. Return -EOVERFLOW then.
+                        */
+                       io->ci_result = -EOVERFLOW;
+                       CDEBUG(D_INFO, "offset %llu is too big: rc = %d\n",
+                              (u64)lov_off, io->ci_result);
+                       continue;
+               }
+               if (lov_off < io->u.ci_lseek.ls_start) {
+                       io->ci_result = -EINVAL;
+                       CDEBUG(D_INFO, "offset %lld < start %lld: rc = %d\n",
+                              sub_off, io->u.ci_lseek.ls_start, io->ci_result);
+                       continue;
+               }
+               /* resulting offset can be out of component range if stripe
+                * object is full and its file size was returned as virtual
+                * hole start. Skip this result, the next component will give
+                * us correct lseek result.
+                */
+               if (lov_off >= lsm->lsm_entries[index]->lsme_extent.e_end)
+                       continue;
+               /* compared as __u64, the initial -ENXIO exceeds any offset */
+               CDEBUG(D_INFO, "SEEK_%s: %lld->%lld/%lld: rc = %d\n",
+                      seek_hole ? "HOLE" : "DATA",
+                      subio->u.ci_lseek.ls_start, sub_off, lov_off,
+                      sub->sub_io.ci_result);
+               offset = min_t(__u64, offset, lov_off);
+       }
+       io->u.ci_lseek.ls_result = offset;
+       RETURN_EXIT;
+}
+
 static const struct cl_io_operations lov_io_ops = {
        .op = {
                [CIT_READ] = {
@@ -1334,6 +1452,15 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_end
                },
+               [CIT_LSEEK] = {
+                       .cio_fini      = lov_io_fini,
+                       .cio_iter_init = lov_io_iter_init,
+                       .cio_iter_fini = lov_io_iter_fini,
+                       .cio_lock      = lov_io_lock,
+                       .cio_unlock    = lov_io_unlock,
+                       .cio_start     = lov_io_start,
+                       .cio_end       = lov_io_lseek_end
+               },
                [CIT_GLIMPSE] = {
                        .cio_fini      = lov_io_fini,
                },
@@ -1474,6 +1601,7 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                break;
        case CIT_FSYNC:
        case CIT_LADVISE:
+       case CIT_LSEEK:
        case CIT_SETATTR:
        case CIT_DATA_VERSION:
                result = +1;
@@ -1525,8 +1653,11 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                 * - in open, for open O_TRUNC
                 * - in setattr, for truncate
                 */
-               /* the truncate is for size > 0 so triggers a restore */
-               if (cl_io_is_trunc(io)) {
+               /*
+                * the truncate is for size > 0 so triggers a restore,
+                * also trigger a restore for prealloc/punch
+                */
+               if (cl_io_is_trunc(io) || cl_io_is_fallocate(io)) {
                        io->ci_restore_needed = 1;
                        result = -ENODATA;
                } else
@@ -1535,6 +1666,7 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
        case CIT_READ:
        case CIT_WRITE:
        case CIT_FAULT:
+       case CIT_LSEEK:
                io->ci_restore_needed = 1;
                result = -ENODATA;
                break;