Whamcloud - gitweb
LU-6389 llite: restart short read/write for normal IO 23/14123/21
authorBobi Jam <bobijam.xu@intel.com>
Fri, 20 Mar 2015 09:58:30 +0000 (17:58 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 17 May 2015 22:50:01 +0000 (22:50 +0000)
If normal IO got short read/write, we'd restart the IO from where
we've accomplished until we meet EOF or error happens.

Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I4582affe8d2030df669b6e22b598b0c321e81466
Reviewed-on: http://review.whamcloud.com/14123
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
libcfs/libcfs/fail.c
lustre/include/obd_support.h
lustre/llite/file.c
lustre/llite/vvp_io.c
lustre/tests/sanity.sh

index a5542c7..303ee55 100644 (file)
@@ -112,6 +112,7 @@ int __cfs_fail_check_set(__u32 id, __u32 value, int set)
                        break;
                case CFS_FAIL_LOC_RESET:
                        cfs_fail_loc = value;
                        break;
                case CFS_FAIL_LOC_RESET:
                        cfs_fail_loc = value;
+                       atomic_set(&cfs_fail_count, 0);
                        break;
                default:
                        LASSERTF(0, "called with bad set %u\n", set);
                        break;
                default:
                        LASSERTF(0, "called with bad set %u\n", set);
index 7063ab3..f4eee4b 100644 (file)
@@ -498,6 +498,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LOV_INIT                          0x1403
 #define OBD_FAIL_GLIMPSE_DELAY                     0x1404
 #define OBD_FAIL_LLITE_XATTR_ENOMEM                0x1405
 #define OBD_FAIL_LOV_INIT                          0x1403
 #define OBD_FAIL_GLIMPSE_DELAY                     0x1404
 #define OBD_FAIL_LLITE_XATTR_ENOMEM                0x1405
+#define OBD_FAIL_MAKE_LOVEA_HOLE                   0x1406
+#define OBD_FAIL_LLITE_LOST_LAYOUT                 0x1407
 
 #define OBD_FAIL_FID_INDIR     0x1501
 #define OBD_FAIL_FID_INLMA     0x1502
 
 #define OBD_FAIL_FID_INDIR     0x1501
 #define OBD_FAIL_FID_INLMA     0x1502
index 8f77e8c..3b76ada 100644 (file)
@@ -992,13 +992,15 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                   struct file *file, enum cl_io_type iot,
                   loff_t *ppos, size_t count)
 {
                   struct file *file, enum cl_io_type iot,
                   loff_t *ppos, size_t count)
 {
-       struct inode *inode = file->f_dentry->d_inode;
-       struct ll_inode_info *lli = ll_i2info(inode);
-       loff_t               end;
-       struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
-       struct cl_io         *io;
-       ssize_t               result;
-       struct range_lock     range;
+       struct vvp_io           *vio = vvp_env_io(env);
+       struct inode            *inode = file->f_dentry->d_inode;
+       struct ll_inode_info    *lli = ll_i2info(inode);
+       struct ll_file_data     *fd  = LUSTRE_FPRIVATE(file);
+       struct cl_io            *io;
+       ssize_t                 result = 0;
+       int                     rc = 0;
+       struct range_lock       range;
+
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zu\n",
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zu\n",
@@ -1006,23 +1008,9 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
 
 restart:
        io = vvp_env_thread_io(env);
 
 restart:
        io = vvp_env_thread_io(env);
-        ll_io_init(io, file, iot == CIT_WRITE);
-
-       /* The maximum Lustre file size is variable, based on the
-        * OST maximum object size and number of stripes.  This
-        * needs another check in addition to the VFS checks earlier. */
-       end = (io->u.ci_wr.wr_append ? i_size_read(inode) : *ppos) + count;
-       if (end > ll_file_maxbytes(inode)) {
-               result = -EFBIG;
-               CDEBUG(D_INODE, "%s: file "DFID" offset %llu > maxbytes "LPU64
-                      ": rc = %zd\n", ll_get_fsname(inode->i_sb, NULL, 0),
-                      PFID(&lli->lli_fid), end, ll_file_maxbytes(inode),
-                      result);
-               RETURN(result);
-       }
-
-        if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
-               struct vvp_io *vio = vvp_env_io(env);
+       ll_io_init(io, file, iot == CIT_WRITE);
+
+       if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
                bool range_locked = false;
 
                if (file->f_flags & O_APPEND)
                bool range_locked = false;
 
                if (file->f_flags & O_APPEND)
@@ -1047,10 +1035,9 @@ restart:
                            !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                                CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
                                       RL_PARA(&range));
                            !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
                                CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
                                       RL_PARA(&range));
-                               result = range_lock(&lli->lli_write_tree,
-                                                   &range);
-                               if (result < 0)
-                                       GOTO(out, result);
+                               rc = range_lock(&lli->lli_write_tree, &range);
+                               if (rc < 0)
+                                       GOTO(out, rc);
 
                                range_locked = true;
                        }
 
                                range_locked = true;
                        }
@@ -1066,7 +1053,7 @@ restart:
                }
 
                ll_cl_add(file, env, io);
                }
 
                ll_cl_add(file, env, io);
-                result = cl_io_loop(env, io);
+               rc = cl_io_loop(env, io);
                ll_cl_remove(file, env);
 
                if (args->via_io_subtype == IO_NORMAL)
                ll_cl_remove(file, env);
 
                if (args->via_io_subtype == IO_NORMAL)
@@ -1076,47 +1063,54 @@ restart:
                               RL_PARA(&range));
                        range_unlock(&lli->lli_write_tree, &range);
                }
                               RL_PARA(&range));
                        range_unlock(&lli->lli_write_tree, &range);
                }
-        } else {
-                /* cl_io_rw_init() handled IO */
-                result = io->ci_result;
-        }
+       } else {
+               /* cl_io_rw_init() handled IO */
+               rc = io->ci_result;
+       }
 
 
-        if (io->ci_nob > 0) {
-                result = io->ci_nob;
-                *ppos = io->u.ci_wr.wr.crw_pos;
-        }
-        GOTO(out, result);
+       if (io->ci_nob > 0) {
+               result += io->ci_nob;
+               count -= io->ci_nob;
+               *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+
+               /* prepare IO restart */
+               if (count > 0 && args->via_io_subtype == IO_NORMAL) {
+                       args->u.normal.via_iov = vio->vui_iov;
+                       args->u.normal.via_nrsegs = vio->vui_tot_nrsegs;
+               }
+       }
+       GOTO(out, rc);
 out:
 out:
-        cl_io_fini(env, io);
-       /* If any bit been read/written (result != 0), we just return
-        * short read/write instead of restart io. */
-       if ((result == 0 || result == -ENODATA) && io->ci_need_restart) {
-               CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zu\n",
+       cl_io_fini(env, io);
+
+       if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
+               CDEBUG(D_VFSTRACE,
+                      "%s: restart %s from %lld, count:%zu, result: %zd\n",
+                      file->f_dentry->d_name.name,
                       iot == CIT_READ ? "read" : "write",
                       iot == CIT_READ ? "read" : "write",
-                      file->f_dentry->d_name.name, *ppos, count);
-               LASSERTF(io->ci_nob == 0, "%zd\n", io->ci_nob);
+                      *ppos, count, result);
                goto restart;
        }
 
                goto restart;
        }
 
-        if (iot == CIT_READ) {
-                if (result >= 0)
-                        ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
-                                           LPROC_LL_READ_BYTES, result);
-        } else if (iot == CIT_WRITE) {
-                if (result >= 0) {
-                        ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
-                                           LPROC_LL_WRITE_BYTES, result);
+       if (iot == CIT_READ) {
+               if (result > 0)
+                       ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
+                                          LPROC_LL_READ_BYTES, result);
+       } else if (iot == CIT_WRITE) {
+               if (result > 0) {
+                       ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
+                                          LPROC_LL_WRITE_BYTES, result);
                        fd->fd_write_failed = false;
                        fd->fd_write_failed = false;
-               } else if (result != -ERESTARTSYS) {
+               } else if (rc != -ERESTARTSYS) {
                        fd->fd_write_failed = true;
                }
        }
                        fd->fd_write_failed = true;
                }
        }
+
        CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
 
        CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
 
-       return result;
+       return result > 0 ? result : rc;
 }
 
 }
 
-
 /*
  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
  */
 /*
  * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
  */
@@ -1149,10 +1143,11 @@ static int ll_file_get_iov_count(const struct iovec *iov,
 }
 
 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
 }
 
 static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
-                                unsigned long nr_segs, loff_t pos)
+                               unsigned long nr_segs, loff_t pos)
 {
 {
-        struct lu_env      *env;
-        struct vvp_io_args *args;
+       struct lu_env      *env;
+       struct vvp_io_args *args;
+       struct iovec       *local_iov;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -1166,22 +1161,40 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
+       if (nr_segs == 1) {
+               local_iov = &ll_env_info(env)->lti_local_iov;
+               *local_iov = *iov;
+       } else {
+               OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs);
+               if (local_iov == NULL) {
+                       cl_env_put(env, &refcheck);
+                       RETURN(-ENOMEM);
+               }
+
+               memcpy(local_iov, iov, sizeof(*iov) * nr_segs);
+       }
+
        args = ll_env_args(env, IO_NORMAL);
        args = ll_env_args(env, IO_NORMAL);
-        args->u.normal.via_iov = (struct iovec *)iov;
-        args->u.normal.via_nrsegs = nr_segs;
-        args->u.normal.via_iocb = iocb;
+       args->u.normal.via_iov = local_iov;
+       args->u.normal.via_nrsegs = nr_segs;
+       args->u.normal.via_iocb = iocb;
 
 
-        result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
-                                    &iocb->ki_pos, count);
-        cl_env_put(env, &refcheck);
-        RETURN(result);
+       result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
+                                   &iocb->ki_pos, count);
+
+       cl_env_put(env, &refcheck);
+
+       if (nr_segs > 1)
+               OBD_FREE(local_iov, sizeof(*iov) * nr_segs);
+
+       RETURN(result);
 }
 
 static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
 }
 
 static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
-                            loff_t *ppos)
+                           loff_t *ppos)
 {
 {
-        struct lu_env *env;
-        struct iovec  *local_iov;
+       struct lu_env *env;
+       struct iovec   iov = { .iov_base = buf, .iov_len = count };
         struct kiocb  *kiocb;
         ssize_t        result;
         int            refcheck;
         struct kiocb  *kiocb;
         ssize_t        result;
         int            refcheck;
@@ -1191,10 +1204,7 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-       local_iov = &ll_env_info(env)->lti_local_iov;
        kiocb = &ll_env_info(env)->lti_kiocb;
        kiocb = &ll_env_info(env)->lti_kiocb;
-        local_iov->iov_base = (void __user *)buf;
-        local_iov->iov_len = count;
         init_sync_kiocb(kiocb, file);
         kiocb->ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
         init_sync_kiocb(kiocb, file);
         kiocb->ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
@@ -1203,11 +1213,11 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
         kiocb->ki_nbytes = count;
 #endif
 
         kiocb->ki_nbytes = count;
 #endif
 
-        result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
-        *ppos = kiocb->ki_pos;
+       result = ll_file_aio_read(kiocb, &iov, 1, kiocb->ki_pos);
+       *ppos = kiocb->ki_pos;
 
 
-        cl_env_put(env, &refcheck);
-        RETURN(result);
+       cl_env_put(env, &refcheck);
+       RETURN(result);
 }
 
 /*
 }
 
 /*
@@ -1215,10 +1225,11 @@ static ssize_t ll_file_read(struct file *file, char __user *buf, size_t count,
  * AIO stuff
  */
 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
  * AIO stuff
  */
 static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
-                                 unsigned long nr_segs, loff_t pos)
+                                unsigned long nr_segs, loff_t pos)
 {
 {
-        struct lu_env      *env;
-        struct vvp_io_args *args;
+       struct lu_env      *env;
+       struct vvp_io_args *args;
+       struct iovec       *local_iov;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -1232,22 +1243,40 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
+       if (nr_segs == 1) {
+               local_iov = &ll_env_info(env)->lti_local_iov;
+               *local_iov = *iov;
+       } else {
+               OBD_ALLOC(local_iov, sizeof(*iov) * nr_segs);
+               if (local_iov == NULL) {
+                       cl_env_put(env, &refcheck);
+                       RETURN(-ENOMEM);
+               }
+
+               memcpy(local_iov, iov, sizeof(*iov) * nr_segs);
+       }
+
        args = ll_env_args(env, IO_NORMAL);
        args = ll_env_args(env, IO_NORMAL);
-        args->u.normal.via_iov = (struct iovec *)iov;
-        args->u.normal.via_nrsegs = nr_segs;
-        args->u.normal.via_iocb = iocb;
+       args->u.normal.via_iov = local_iov;
+       args->u.normal.via_nrsegs = nr_segs;
+       args->u.normal.via_iocb = iocb;
 
 
-        result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
-                                  &iocb->ki_pos, count);
-        cl_env_put(env, &refcheck);
-        RETURN(result);
+       result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
+                                 &iocb->ki_pos, count);
+       cl_env_put(env, &refcheck);
+
+       if (nr_segs > 1)
+               OBD_FREE(local_iov, sizeof(*iov) * nr_segs);
+
+       RETURN(result);
 }
 
 static ssize_t ll_file_write(struct file *file, const char __user *buf,
                             size_t count, loff_t *ppos)
 {
 }
 
 static ssize_t ll_file_write(struct file *file, const char __user *buf,
                             size_t count, loff_t *ppos)
 {
-        struct lu_env *env;
-        struct iovec  *local_iov;
+       struct lu_env *env;
+       struct iovec   iov = { .iov_base = (void __user *)buf,
+                              .iov_len = count };
         struct kiocb  *kiocb;
         ssize_t        result;
         int            refcheck;
         struct kiocb  *kiocb;
         ssize_t        result;
         int            refcheck;
@@ -1257,10 +1286,7 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-       local_iov = &ll_env_info(env)->lti_local_iov;
        kiocb = &ll_env_info(env)->lti_kiocb;
        kiocb = &ll_env_info(env)->lti_kiocb;
-        local_iov->iov_base = (void __user *)buf;
-        local_iov->iov_len = count;
         init_sync_kiocb(kiocb, file);
         kiocb->ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
         init_sync_kiocb(kiocb, file);
         kiocb->ki_pos = *ppos;
 #ifdef HAVE_KIOCB_KI_LEFT
@@ -1269,11 +1295,11 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf,
         kiocb->ki_nbytes = count;
 #endif
 
         kiocb->ki_nbytes = count;
 #endif
 
-        result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
-        *ppos = kiocb->ki_pos;
+       result = ll_file_aio_write(kiocb, &iov, 1, kiocb->ki_pos);
+       *ppos = kiocb->ki_pos;
 
 
-        cl_env_put(env, &refcheck);
-        RETURN(result);
+       cl_env_put(env, &refcheck);
+       RETURN(result);
 }
 
 /*
 }
 
 /*
index f7490c3..45c8f1b 100644 (file)
@@ -87,9 +87,10 @@ static bool can_populate_pages(const struct lu_env *env, struct cl_io *io,
        case CIT_WRITE:
                /* don't need lock here to check lli_layout_gen as we have held
                 * extent lock and GROUP lock has to hold to swap layout */
        case CIT_WRITE:
                /* don't need lock here to check lli_layout_gen as we have held
                 * extent lock and GROUP lock has to hold to swap layout */
-               if (ll_layout_version_get(lli) != vio->vui_layout_gen) {
+               if (ll_layout_version_get(lli) != vio->vui_layout_gen ||
+                   OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_LOST_LAYOUT, 0)) {
                        io->ci_need_restart = 1;
                        io->ci_need_restart = 1;
-                       /* this will return application a short read/write */
+                       /* this will cause a short read/write */
                        io->ci_continue = 0;
                        rc = false;
                }
                        io->ci_continue = 0;
                        rc = false;
                }
@@ -476,6 +477,7 @@ static void vvp_io_advance(const struct lu_env *env,
        struct vvp_io    *vio = cl2vvp_io(env, ios);
        struct cl_io     *io  = ios->cis_io;
        struct cl_object *obj = ios->cis_io->ci_obj;
        struct vvp_io    *vio = cl2vvp_io(env, ios);
        struct cl_io     *io  = ios->cis_io;
        struct cl_object *obj = ios->cis_io->ci_obj;
+       struct iovec     *iov;
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
@@ -485,27 +487,28 @@ static void vvp_io_advance(const struct lu_env *env,
        LASSERT(vio->vui_tot_nrsegs >= vio->vui_nrsegs);
        LASSERT(vio->vui_tot_count  >= nob);
 
        LASSERT(vio->vui_tot_nrsegs >= vio->vui_nrsegs);
        LASSERT(vio->vui_tot_count  >= nob);
 
-       vio->vui_iov        += vio->vui_nrsegs;
-       vio->vui_tot_nrsegs -= vio->vui_nrsegs;
-       vio->vui_tot_count  -= nob;
-
-       /* update the iov */
+       /* Restore the iov changed in vvp_io_update_iov() */
        if (vio->vui_iov_olen > 0) {
        if (vio->vui_iov_olen > 0) {
-               struct iovec *iv;
-
-               vio->vui_iov--;
-               vio->vui_tot_nrsegs++;
-               iv = &vio->vui_iov[0];
-               if (io->ci_continue) {
-                       iv->iov_base += iv->iov_len;
-                       LASSERT(vio->vui_iov_olen > iv->iov_len);
-                       iv->iov_len = vio->vui_iov_olen - iv->iov_len;
-               } else {
-                       /* restore the iov_len, in case of restart io. */
-                       iv->iov_len = vio->vui_iov_olen;
-               }
+               vio->vui_iov[vio->vui_nrsegs - 1].iov_len = vio->vui_iov_olen;
                vio->vui_iov_olen = 0;
        }
                vio->vui_iov_olen = 0;
        }
+
+       /* advance iov */
+       iov = vio->vui_iov;
+       while (nob > 0) {
+               if (iov->iov_len > nob) {
+                       iov->iov_len -= nob;
+                       iov->iov_base += nob;
+                       break;
+               }
+
+               nob -= iov->iov_len;
+               iov++;
+               vio->vui_tot_nrsegs--;
+       }
+
+       vio->vui_iov = iov;
+       vio->vui_tot_count -= nob;
 }
 
 static void vvp_io_update_iov(const struct lu_env *env,
 }
 
 static void vvp_io_update_iov(const struct lu_env *env,
@@ -1025,6 +1028,18 @@ static int vvp_io_write_start(const struct lu_env *env,
 
        CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt);
 
 
        CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt);
 
+       /* The maximum Lustre file size is variable, based on the OST maximum
+        * object size and number of stripes.  This needs another check in
+        * addition to the VFS checks earlier. */
+       if (pos + cnt > ll_file_maxbytes(inode)) {
+               CDEBUG(D_INODE,
+                      "%s: file "DFID" offset %llu > maxbytes "LPU64"\n",
+                      ll_get_fsname(inode->i_sb, NULL, 0),
+                      PFID(ll_inode2fid(inode)), pos + cnt,
+                      ll_file_maxbytes(inode));
+               RETURN(-EFBIG);
+       }
+
        if (vio->vui_iov == NULL) {
                /* from a temp io in ll_cl_init(). */
                result = 0;
        if (vio->vui_iov == NULL) {
                /* from a temp io in ll_cl_init(). */
                result = 0;
index 08f125d..b37ceb6 100644 (file)
@@ -12920,6 +12920,23 @@ test_250() {
 }
 run_test 250 "Write above 16T limit"
 
 }
 run_test 250 "Write above 16T limit"
 
+test_251() {
+       $SETSTRIPE -c -1 -S 1048576 $DIR/$tfile
+
+       #define OBD_FAIL_LLITE_LOST_LAYOUT 0x1407
+       #Skip once - writing the first stripe will succeed
+       $LCTL set_param fail_loc=0xa0001407 fail_val=1
+       $MULTIOP $DIR/$tfile o:O_RDWR:w2097152c 2>&1 | grep -q "short write" &&
+               error "short write happened"
+
+       $LCTL set_param fail_loc=0xa0001407 fail_val=1
+       $MULTIOP $DIR/$tfile or2097152c 2>&1 | grep -q "short read" &&
+               error "short read happened"
+
+       rm -f $DIR/$tfile
+}
+run_test 251 "Handling short read and write correctly"
+
 cleanup_test_300() {
        trap 0
        umask $SAVE_UMASK
 cleanup_test_300() {
        trap 0
        umask $SAVE_UMASK