From 8badb39913b5e1c614d2fe410ef7200391099855 Mon Sep 17 00:00:00 2001 From: Bobi Jam Date: Fri, 20 Mar 2015 17:58:30 +0800 Subject: [PATCH] LU-6389 llite: restart short read/write for normal IO If normal IO got short read/write, we'd restart the IO from where we've accomplished until we meet EOF or error happens. Signed-off-by: Bobi Jam Signed-off-by: Jinshan Xiong Change-Id: I4582affe8d2030df669b6e22b598b0c321e81466 Reviewed-on: http://review.whamcloud.com/14123 Tested-by: Jenkins Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Oleg Drokin --- libcfs/libcfs/fail.c | 1 + lustre/include/obd_support.h | 2 + lustre/llite/file.c | 216 ++++++++++++++++++++++++------------------- lustre/llite/vvp_io.c | 55 +++++++---- lustre/tests/sanity.sh | 17 ++++ 5 files changed, 176 insertions(+), 115 deletions(-) diff --git a/libcfs/libcfs/fail.c b/libcfs/libcfs/fail.c index a5542c7..303ee55 100644 --- a/libcfs/libcfs/fail.c +++ b/libcfs/libcfs/fail.c @@ -112,6 +112,7 @@ int __cfs_fail_check_set(__u32 id, __u32 value, int set) break; case CFS_FAIL_LOC_RESET: cfs_fail_loc = value; + atomic_set(&cfs_fail_count, 0); break; default: LASSERTF(0, "called with bad set %u\n", set); diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 7063ab3..f4eee4b 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -498,6 +498,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LOV_INIT 0x1403 #define OBD_FAIL_GLIMPSE_DELAY 0x1404 #define OBD_FAIL_LLITE_XATTR_ENOMEM 0x1405 +#define OBD_FAIL_MAKE_LOVEA_HOLE 0x1406 +#define OBD_FAIL_LLITE_LOST_LAYOUT 0x1407 #define OBD_FAIL_FID_INDIR 0x1501 #define OBD_FAIL_FID_INLMA 0x1502 diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 8f77e8c..3b76ada 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -992,13 +992,15 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, struct file *file, enum cl_io_type iot, loff_t *ppos, size_t count) { - struct inode *inode = file->f_dentry->d_inode; - struct ll_inode_info *lli = ll_i2info(inode); - loff_t end; - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - struct cl_io *io; - ssize_t result; - struct range_lock range; + struct vvp_io *vio = vvp_env_io(env); + struct inode *inode = file->f_dentry->d_inode; + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct cl_io *io; + ssize_t result = 0; + int rc = 0; + struct range_lock range; + ENTRY; CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zu\n", @@ -1006,23 +1008,9 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, restart: io = vvp_env_thread_io(env); - ll_io_init(io, file, iot == CIT_WRITE); - - /* The maximum Lustre file size is variable, based on the - * OST maximum object size and number of stripes. This - * needs another check in addition to the VFS checks earlier. */ - end = (io->u.ci_wr.wr_append ? i_size_read(inode) : *ppos) + count; - if (end > ll_file_maxbytes(inode)) { - result = -EFBIG; - CDEBUG(D_INODE, "%s: file "DFID" offset %llu > maxbytes "LPU64 - ": rc = %zd\n", ll_get_fsname(inode->i_sb, NULL, 0), - PFID(&lli->lli_fid), end, ll_file_maxbytes(inode), - result); - RETURN(result); - } - - if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) { - struct vvp_io *vio = vvp_env_io(env); + ll_io_init(io, file, iot == CIT_WRITE); + + if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) { bool range_locked = false; if (file->f_flags & O_APPEND) @@ -1047,10 +1035,9 @@ restart: !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) { CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n", RL_PARA(&range)); - result = range_lock(&lli->lli_write_tree, - &range); - if (result < 0) - GOTO(out, result); + rc = range_lock(&lli->lli_write_tree, &range); + if (rc < 0) + GOTO(out, rc); range_locked = true; } @@ -1066,7 +1053,7 @@ restart: } ll_cl_add(file, env, io); - result = cl_io_loop(env, io); + rc = cl_io_loop(env, io); ll_cl_remove(file, env); if (args->via_io_subtype == IO_NORMAL) @@ -1076,47 +1063,54 @@ restart: RL_PARA(&range)); range_unlock(&lli->lli_write_tree, &range); } - } else { - /* cl_io_rw_init() handled IO */ - result = io->ci_result; - } + } else { + /* cl_io_rw_init() handled IO */ + rc = io->ci_result; + } - if (io->ci_nob > 0) { - result = io->ci_nob; - *ppos = io->u.ci_wr.wr.crw_pos; - } - GOTO(out, result); + if (io->ci_nob > 0) { + result += io->ci_nob; + count -= io->ci_nob; + *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */ + + /* prepare IO restart */ + if (count > 0 && args->via_io_subtype == IO_NORMAL) { + args->u.normal.via_iov = vio->vui_iov; + args->u.normal.via_nrsegs = vio->vui_tot_nrsegs; + } + } + GOTO(out, rc); out: - cl_io_fini(env, io); - /* If any bit been read/written (result != 0), we just return - * short read/write instead of restart io. */ - if ((result == 0 || result == -ENODATA) && io->ci_need_restart) { - CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zu\n", + cl_io_fini(env, io); + + if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) { + CDEBUG(D_VFSTRACE, + "%s: restart %s from %lld, count:%zu, result: %zd\n", + file->f_dentry->d_name.name, iot == CIT_READ ? "read" : "write", - file->f_dentry->d_name.name, *ppos, count); - LASSERTF(io->ci_nob == 0, "%zd\n", io->ci_nob); + *ppos, count, result); goto restart; } - if (iot == CIT_READ) { - if (result >= 0) - ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode), - LPROC_LL_READ_BYTES, result); - } else if (iot == CIT_WRITE) { - if (result >= 0) { - ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode), - LPROC_LL_WRITE_BYTES, result); + if (iot == CIT_READ) { + if (result > 0) + ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode), + LPROC_LL_READ_BYTES, result); + } else if (iot == CIT_WRITE) { + if (result > 0) { + ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode), + LPROC_LL_WRITE_BYTES, result); fd->fd_write_failed = false; - } else if (result != -ERESTARTSYS) { + } else if (rc != -ERESTARTSYS) { fd->fd_write_failed = true; } } + CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result); - return result; + return result > 0 ? result : rc; } - /* * XXX: exact copy from kernel code (__generic_file_aio_write_nolock) */ @@ -1149,10 +1143,11 @@ static int ll_file_get_iov_count(const struct iovec *iov, } static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, - unsigned long nr_segs, loff_t pos) + unsigned long nr_segs, loff_t pos) { - struct lu_env *env; - struct vvp_io_args *args; + struct lu_env *env; + struct vvp_io_args *args; + struct iovec *local_iov; size_t count; ssize_t result; int refcheck; @@ -1166,22 +1161,40 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); + if (nr_segs == 1) { + local_iov = &ll_env_info(env)->lti_local_iov; + *local_iov = *iov; + } else { + OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); + if (local_iov == NULL) { + cl_env_put(env, &refcheck); + RETURN(-ENOMEM); + } + + memcpy(local_iov, iov, sizeof(*iov) * nr_segs); + } + args = ll_env_args(env, IO_NORMAL); - args->u.normal.via_iov = (struct iovec *)iov; - args->u.normal.via_nrsegs = nr_segs; - args->u.normal.via_iocb = iocb; + args->u.normal.via_iov = local_iov; + args->u.normal.via_nrsegs = nr_segs; + args->u.normal.via_iocb = iocb; - result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, - &iocb->ki_pos, count); - cl_env_put(env, &refcheck); - RETURN(result); + result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ, + &iocb->ki_pos, count); + + cl_env_put(env, &refcheck); + + if (nr_segs > 1) + OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + + RETURN(result); } static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, - loff_t *ppos) + loff_t *ppos) { - struct lu_env *env; - struct iovec *local_iov; + struct lu_env *env; + struct iovec iov = { .iov_base = buf, .iov_len = count }; struct kiocb *kiocb; ssize_t result; int refcheck; @@ -1191,10 +1204,7 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - local_iov = &ll_env_info(env)->lti_local_iov; kiocb = &ll_env_info(env)->lti_kiocb; - local_iov->iov_base = (void __user *)buf; - local_iov->iov_len = count; init_sync_kiocb(kiocb, file); kiocb->ki_pos = *ppos; #ifdef HAVE_KIOCB_KI_LEFT @@ -1203,11 +1213,11 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, kiocb->ki_nbytes = count; #endif - result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos); - *ppos = kiocb->ki_pos; + result = ll_file_aio_read(kiocb, &iov, 1, kiocb->ki_pos); + *ppos = kiocb->ki_pos; - cl_env_put(env, &refcheck); - RETURN(result); + cl_env_put(env, &refcheck); + RETURN(result); } /* @@ -1215,10 +1225,11 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count, * AIO stuff */ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, - unsigned long nr_segs, loff_t pos) + unsigned long nr_segs, loff_t pos) { - struct lu_env *env; - struct vvp_io_args *args; + struct lu_env *env; + struct vvp_io_args *args; + struct iovec *local_iov; size_t count; ssize_t result; int refcheck; @@ -1232,22 +1243,40 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov, if (IS_ERR(env)) RETURN(PTR_ERR(env)); + if (nr_segs == 1) { + local_iov = &ll_env_info(env)->lti_local_iov; + *local_iov = *iov; + } else { + OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs); + if (local_iov == NULL) { + cl_env_put(env, &refcheck); + RETURN(-ENOMEM); + } + + memcpy(local_iov, iov, sizeof(*iov) * nr_segs); + } + args = ll_env_args(env, IO_NORMAL); - args->u.normal.via_iov = (struct iovec *)iov; - args->u.normal.via_nrsegs = nr_segs; - args->u.normal.via_iocb = iocb; + args->u.normal.via_iov = local_iov; + args->u.normal.via_nrsegs = nr_segs; + args->u.normal.via_iocb = iocb; - result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE, - &iocb->ki_pos, count); - cl_env_put(env, &refcheck); - RETURN(result); + result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE, + &iocb->ki_pos, count); + cl_env_put(env, &refcheck); + + if (nr_segs > 1) + OBD_FREE(local_iov, sizeof(*iov) * nr_segs); + + RETURN(result); } static ssize_t ll_file_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos) { - struct lu_env *env; - struct iovec *local_iov; + struct lu_env *env; + struct iovec iov = { .iov_base = (void __user *)buf, + .iov_len = count }; struct kiocb *kiocb; ssize_t result; int refcheck; @@ -1257,10 +1286,7 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf, if (IS_ERR(env)) RETURN(PTR_ERR(env)); - local_iov = &ll_env_info(env)->lti_local_iov; kiocb = &ll_env_info(env)->lti_kiocb; - local_iov->iov_base = (void __user *)buf; - local_iov->iov_len = count; init_sync_kiocb(kiocb, file); kiocb->ki_pos = *ppos; #ifdef HAVE_KIOCB_KI_LEFT @@ -1269,11 +1295,11 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf, kiocb->ki_nbytes = count; #endif - result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos); - *ppos = kiocb->ki_pos; + result = ll_file_aio_write(kiocb, &iov, 1, kiocb->ki_pos); + *ppos = kiocb->ki_pos; - cl_env_put(env, &refcheck); - RETURN(result); + cl_env_put(env, &refcheck); + RETURN(result); } /* diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index f7490c3..45c8f1b 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -87,9 +87,10 @@ static bool can_populate_pages(const struct lu_env *env, struct cl_io *io, case CIT_WRITE: /* don't need lock here to check lli_layout_gen as we have held * extent lock and GROUP lock has to hold to swap layout */ - if (ll_layout_version_get(lli) != vio->vui_layout_gen) { + if (ll_layout_version_get(lli) != vio->vui_layout_gen || + OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_LOST_LAYOUT, 0)) { io->ci_need_restart = 1; - /* this will return application a short read/write */ + /* this will cause a short read/write */ io->ci_continue = 0; rc = false; } @@ -476,6 +477,7 @@ static void vvp_io_advance(const struct lu_env *env, struct vvp_io *vio = cl2vvp_io(env, ios); struct cl_io *io = ios->cis_io; struct cl_object *obj = ios->cis_io->ci_obj; + struct iovec *iov; CLOBINVRNT(env, obj, vvp_object_invariant(obj)); @@ -485,27 +487,28 @@ static void vvp_io_advance(const struct lu_env *env, LASSERT(vio->vui_tot_nrsegs >= vio->vui_nrsegs); LASSERT(vio->vui_tot_count >= nob); - vio->vui_iov += vio->vui_nrsegs; - vio->vui_tot_nrsegs -= vio->vui_nrsegs; - vio->vui_tot_count -= nob; - - /* update the iov */ + /* Restore the iov changed in vvp_io_update_iov() */ if (vio->vui_iov_olen > 0) { - struct iovec *iv; - - vio->vui_iov--; - vio->vui_tot_nrsegs++; - iv = &vio->vui_iov[0]; - if (io->ci_continue) { - iv->iov_base += iv->iov_len; - LASSERT(vio->vui_iov_olen > iv->iov_len); - iv->iov_len = vio->vui_iov_olen - iv->iov_len; - } else { - /* restore the iov_len, in case of restart io. */ - iv->iov_len = vio->vui_iov_olen; - } + vio->vui_iov[vio->vui_nrsegs - 1].iov_len = vio->vui_iov_olen; vio->vui_iov_olen = 0; } + + /* advance iov */ + iov = vio->vui_iov; + while (nob > 0) { + if (iov->iov_len > nob) { + iov->iov_len -= nob; + iov->iov_base += nob; + break; + } + + nob -= iov->iov_len; + iov++; + vio->vui_tot_nrsegs--; + } + + vio->vui_iov = iov; + vio->vui_tot_count -= nob; } static void vvp_io_update_iov(const struct lu_env *env, @@ -1025,6 +1028,18 @@ static int vvp_io_write_start(const struct lu_env *env, CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt); + /* The maximum Lustre file size is variable, based on the OST maximum + * object size and number of stripes. This needs another check in + * addition to the VFS checks earlier. */ + if (pos + cnt > ll_file_maxbytes(inode)) { + CDEBUG(D_INODE, + "%s: file "DFID" offset %llu > maxbytes "LPU64"\n", + ll_get_fsname(inode->i_sb, NULL, 0), + PFID(ll_inode2fid(inode)), pos + cnt, + ll_file_maxbytes(inode)); + RETURN(-EFBIG); + } + if (vio->vui_iov == NULL) { /* from a temp io in ll_cl_init(). */ result = 0; diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 08f125d..b37ceb6 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -12920,6 +12920,23 @@ test_250() { } run_test 250 "Write above 16T limit" +test_251() { + $SETSTRIPE -c -1 -S 1048576 $DIR/$tfile + + #define OBD_FAIL_LLITE_LOST_LAYOUT 0x1407 + #Skip once - writing the first stripe will succeed + $LCTL set_param fail_loc=0xa0001407 fail_val=1 + $MULTIOP $DIR/$tfile o:O_RDWR:w2097152c 2>&1 | grep -q "short write" && + error "short write happened" + + $LCTL set_param fail_loc=0xa0001407 fail_val=1 + $MULTIOP $DIR/$tfile or2097152c 2>&1 | grep -q "short read" && + error "short read happened" + + rm -f $DIR/$tfile +} +run_test 251 "Handling short read and write correctly" + cleanup_test_300() { trap 0 umask $SAVE_UMASK -- 1.8.3.1