Whamcloud - gitweb
LU-8964 clio: Parallelize generic I/O 68/26468/17
authorDmitry Eremin <dmitry.eremin@intel.com>
Thu, 30 Mar 2017 19:38:56 +0000 (22:38 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 30 May 2017 13:28:12 +0000 (13:28 +0000)
Add a parallel version of the cl_io_loop() function which uses stripe
information from the LOV layer and processes the stripes in parallel.
This feature is disabled by default. To enable it, run the
"lctl set_param llite.*.pio=1" command.

IOR results on KNL for:
access             = file-per-process
ordering in a file = sequential offsets
ordering inter file= no tasks offsets
clients            = 1 (1 per node)
repetitions        = 1
blocksize          = 128 GiB
aggregate filesize = 128 GiB

xfersize pio Write Read
16 none 170.46 372.12
16 off 370.46 926.53
16 on 668.49 899.55
32 off 368.75 908.95
32 on 469.54 987.64

IOR results on Broadwell Xeon for:
access             = file-per-process
ordering in a file = sequential offsets
ordering inter file= no tasks offsets
clients            = 1 (1 per node)
repetitions        = 1
xfersize           = 16 MiB
blocksize          = 128 GiB
aggregate filesize = 128 GiB

pio Write Read
none 1419.80 1277.88
off 1348.98 2245.84
on  990.76 2320.08

IOR scalability results on another Broadwell Xeon system for:
access             = file-per-process
ordering in a file = sequential offsets
ordering inter file= no tasks offsets
repetitions        = 1
xfersize           = 4 MiB
blocksize          = 8 GiB

Threads pio Write Read
 32 off 9358.38 2649.28
 32 on 9147.14 2677.44
 64 off 8538.65 2811.05
 64 on 8944.19 2908.44
128 off 7978.61 2937.03
128 on 8613.91 2928.44

All numbers are in MB/s.

Signed-off-by: Dmitry Eremin <dmitry.eremin@intel.com>
Change-Id: Id028faba1726fb377d0e903e8b8095d5ea9d1ee2
Reviewed-on: https://review.whamcloud.com/26468
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/include/cl_object.h
lustre/include/lustre_compat.h
lustre/include/obd_support.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/lproc_llite.c
lustre/llite/rw26.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_io.c
lustre/lov/lov_io.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_object.c
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/tests/sanity.sh

index 976c695..ca9b520 100644 (file)
@@ -89,6 +89,7 @@
  * super-class definitions.
  */
 #include <libcfs/libcfs.h>
+#include <libcfs/libcfs_ptask.h>
 #include <lu_object.h>
 #include <linux/atomic.h>
 #include <linux/mutex.h>
@@ -117,6 +118,8 @@ struct cl_io_slice;
 
 struct cl_req_attr;
 
+extern struct cfs_ptask_engine *cl_io_engine;
+
 /**
  * Device in the client stack.
  *
@@ -1728,10 +1731,21 @@ enum cl_fsync_mode {
        CL_FSYNC_ALL   = 3
 };
 
-struct cl_io_rw_common {
-        loff_t      crw_pos;
-        size_t      crw_count;
-        int         crw_nonblock;
+struct cl_io_range {
+       loff_t cir_pos;
+       size_t cir_count;
+};
+
+struct cl_io_pt {
+       struct cl_io_pt         *cip_next;
+       struct cfs_ptask         cip_task;
+       struct kiocb             cip_iocb;
+       struct iov_iter          cip_iter;
+       struct file             *cip_file;
+       enum cl_io_type          cip_iot;
+       loff_t                   cip_pos;
+       size_t                   cip_count;
+       ssize_t                  cip_result;
 };
 
 /**
@@ -1762,15 +1776,16 @@ struct cl_io {
         /** lock requirements, this is just a help info for sublayers. */
         enum cl_io_lock_dmd            ci_lockreq;
         union {
-                struct cl_rd_io {
-                        struct cl_io_rw_common rd;
-                } ci_rd;
-               struct cl_wr_io {
-                       struct cl_io_rw_common wr;
-                       int                    wr_append;
-                       int                    wr_sync;
-               } ci_wr;
-               struct cl_io_rw_common ci_rw;
+               struct cl_rw_io {
+                       struct iov_iter          rw_iter;
+                       struct kiocb             rw_iocb;
+                       struct cl_io_range       rw_range;
+                       struct file             *rw_file;
+                       unsigned int             rw_nonblock:1,
+                                                rw_append:1,
+                                                rw_sync:1;
+                       int (*rw_ptask)(struct cfs_ptask *ptask);
+               } ci_rw;
                struct cl_setattr_io {
                        struct ost_lvb           sa_attr;
                        unsigned int             sa_attr_flags;
@@ -1854,7 +1869,9 @@ struct cl_io {
        /**
         * O_NOATIME
         */
-                            ci_noatime:1;
+                            ci_noatime:1,
+       /** Set to 1 if parallel execution is allowed for current I/O? */
+                            ci_pio:1;
        /**
         * Number of pages owned by this IO. For invariant checking.
         */
@@ -2289,12 +2306,12 @@ int   cl_io_cancel       (const struct lu_env *env, struct cl_io *io,
  */
 static inline int cl_io_is_append(const struct cl_io *io)
 {
-        return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_append;
+       return io->ci_type == CIT_WRITE && io->u.ci_rw.rw_append;
 }
 
 static inline int cl_io_is_sync_write(const struct cl_io *io)
 {
-       return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_sync;
+       return io->ci_type == CIT_WRITE && io->u.ci_rw.rw_sync;
 }
 
 static inline int cl_io_is_mkwrite(const struct cl_io *io)
index 8eff04d..498d27b 100644 (file)
@@ -41,6 +41,7 @@
 
 #include <libcfs/libcfs.h>
 #include <lustre_patchless_compat.h>
+#include <obd_support.h>
 
 #ifdef HAVE_FS_STRUCT_RWLOCK
 # define LOCK_FS_STRUCT(fs)    write_lock(&(fs)->lock)
@@ -442,4 +443,122 @@ static inline void truncate_inode_pages_final(struct address_space *map)
 # define GET_POSIX_ACL_XATTR_ENTRY(head) ((head)->a_entries)
 #endif
 
+#ifndef HAVE_IOV_ITER_TRUNCATE
+static inline void iov_iter_truncate(struct iov_iter *i, u64 count)
+{
+       if (i->count > count)
+               i->count = count;
+}
+#endif
+
+#ifndef HAVE_IS_SXID
+static inline bool is_sxid(umode_t mode)
+{
+       return (mode & S_ISUID) || ((mode & S_ISGID) && (mode & S_IXGRP));
+}
+#endif
+
+#ifndef IS_NOSEC
+#define IS_NOSEC(inode)        (!is_sxid(inode->i_mode))
+#endif
+
+#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
+static inline void iov_iter_reexpand(struct iov_iter *i, size_t count)
+{
+       i->count = count;
+}
+
+static inline struct iovec iov_iter_iovec(const struct iov_iter *iter)
+{
+       return (struct iovec) {
+               .iov_base = iter->iov->iov_base + iter->iov_offset,
+               .iov_len = min(iter->count,
+                              iter->iov->iov_len - iter->iov_offset),
+       };
+}
+
+#define iov_for_each(iov, iter, start)                                 \
+       for (iter = (start);                                            \
+            (iter).count && ((iov = iov_iter_iovec(&(iter))), 1);      \
+            iov_iter_advance(&(iter), (iov).iov_len))
+
+static inline ssize_t
+generic_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+       struct iovec iov;
+       struct iov_iter i;
+       ssize_t bytes = 0;
+
+       iov_for_each(iov, i, *iter) {
+               ssize_t res;
+
+               res = generic_file_aio_read(iocb, &iov, 1, iocb->ki_pos);
+               if (res <= 0) {
+                       if (bytes == 0)
+                               bytes = res;
+                       break;
+               }
+
+               bytes += res;
+               if (res < iov.iov_len)
+                       break;
+       }
+
+       if (bytes > 0)
+               iov_iter_advance(iter, bytes);
+       return bytes;
+}
+
+static inline ssize_t
+__generic_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+       struct iovec iov;
+       struct iov_iter i;
+       ssize_t bytes = 0;
+
+       /* Since LLITE updates file size at the end of I/O in
+        * vvp_io_commit_write(), append write has to be done in atomic when
+        * there are multiple segments because otherwise each iteration to
+        * __generic_file_aio_write() will see original file size */
+       if (unlikely(iocb->ki_filp->f_flags & O_APPEND && iter->nr_segs > 1)) {
+               struct iovec *iov_copy;
+               int count = 0;
+
+               OBD_ALLOC(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
+               if (!iov_copy)
+                       return -ENOMEM;
+
+               iov_for_each(iov, i, *iter)
+                       iov_copy[count++] = iov;
+
+               bytes = __generic_file_aio_write(iocb, iov_copy, count,
+                                                &iocb->ki_pos);
+               OBD_FREE(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
+
+               if (bytes > 0)
+                       iov_iter_advance(iter, bytes);
+               return bytes;
+       }
+
+       iov_for_each(iov, i, *iter) {
+               ssize_t res;
+
+               res = __generic_file_aio_write(iocb, &iov, 1, &iocb->ki_pos);
+               if (res <= 0) {
+                       if (bytes == 0)
+                               bytes = res;
+                       break;
+               }
+
+               bytes += res;
+               if (res < iov.iov_len)
+                       break;
+       }
+
+       if (bytes > 0)
+               iov_iter_advance(iter, bytes);
+       return bytes;
+}
+#endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
+
 #endif /* _LUSTRE_COMPAT_H */
index 9f10001..b130242 100644 (file)
@@ -531,6 +531,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_LLITE_NEWNODE_PAUSE               0x140a
 #define OBD_FAIL_LLITE_SETDIRSTRIPE_PAUSE          0x140b
 #define OBD_FAIL_LLITE_CREATE_NODE_PAUSE           0x140c
+#define OBD_FAIL_LLITE_PTASK_IO_FAIL               0x140d
 
 
 #define OBD_FAIL_FID_INDIR     0x1501
index edd438c..09bc0bf 100644 (file)
@@ -1083,27 +1083,120 @@ static bool file_is_noatime(const struct file *file)
        return false;
 }
 
-static void ll_io_init(struct cl_io *io, const struct file *file, int write)
+static int ll_file_io_ptask(struct cfs_ptask *ptask);
+
+static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
 {
-       struct inode *inode = file_inode((struct file *)file);
-
-        io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
-       if (write) {
-               io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
-               io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
-                                     file->f_flags & O_DIRECT ||
-                                     IS_SYNC(inode);
-       }
-        io->ci_obj     = ll_i2info(inode)->lli_clob;
-        io->ci_lockreq = CILR_MAYBE;
-        if (ll_file_nolock(file)) {
-                io->ci_lockreq = CILR_NEVER;
-                io->ci_no_srvlock = 1;
-        } else if (file->f_flags & O_APPEND) {
-                io->ci_lockreq = CILR_MANDATORY;
-        }
+       struct inode *inode = file_inode(file);
 
+       memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter));
+       init_sync_kiocb(&io->u.ci_rw.rw_iocb, file);
+       io->u.ci_rw.rw_file = file;
+       io->u.ci_rw.rw_ptask = ll_file_io_ptask;
+       io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK);
+       if (iot == CIT_WRITE) {
+               io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND);
+               io->u.ci_rw.rw_sync   = !!(file->f_flags & O_SYNC ||
+                                          file->f_flags & O_DIRECT ||
+                                          IS_SYNC(inode));
+       }
+       io->ci_obj = ll_i2info(inode)->lli_clob;
+       io->ci_lockreq = CILR_MAYBE;
+       if (ll_file_nolock(file)) {
+               io->ci_lockreq = CILR_NEVER;
+               io->ci_no_srvlock = 1;
+       } else if (file->f_flags & O_APPEND) {
+               io->ci_lockreq = CILR_MANDATORY;
+       }
        io->ci_noatime = file_is_noatime(file);
+       if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO)
+               io->ci_pio = !io->u.ci_rw.rw_append;
+       else
+               io->ci_pio = 0;
+}
+
+static int ll_file_io_ptask(struct cfs_ptask *ptask)
+{
+       struct cl_io_pt *pt = ptask->pt_cbdata;
+       struct file *file = pt->cip_file;
+       struct lu_env *env;
+       struct cl_io *io;
+       loff_t pos = pt->cip_pos;
+       int rc;
+       __u16 refcheck;
+       ENTRY;
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
+               file_dentry(file)->d_name.name,
+               pt->cip_iot == CIT_READ ? "read" : "write",
+               pos, pos + pt->cip_count);
+
+restart:
+       io = vvp_env_thread_io(env);
+       ll_io_init(io, file, pt->cip_iot);
+       io->u.ci_rw.rw_iter = pt->cip_iter;
+       io->u.ci_rw.rw_iocb = pt->cip_iocb;
+       io->ci_pio = 0; /* It's already in parallel task */
+
+       rc = cl_io_rw_init(env, io, pt->cip_iot, pos,
+                          pt->cip_count - pt->cip_result);
+       if (!rc) {
+               struct vvp_io *vio = vvp_env_io(env);
+
+               vio->vui_io_subtype = IO_NORMAL;
+               vio->vui_fd = LUSTRE_FPRIVATE(file);
+
+               ll_cl_add(file, env, io, LCC_RW);
+               rc = cl_io_loop(env, io);
+               ll_cl_remove(file, env);
+       } else {
+               /* cl_io_rw_init() handled IO */
+               rc = io->ci_result;
+       }
+
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) {
+               if (io->ci_nob > 0)
+                       io->ci_nob /= 2;
+               rc = -EIO;
+       }
+
+       if (io->ci_nob > 0) {
+               pt->cip_result += io->ci_nob;
+               iov_iter_advance(&pt->cip_iter, io->ci_nob);
+               pos += io->ci_nob;
+               pt->cip_iocb.ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+               pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result;
+#elif defined(HAVE_KI_NBYTES)
+               pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result;
+#endif
+       }
+
+       cl_io_fini(env, io);
+
+       if ((rc == 0 || rc == -ENODATA) &&
+           pt->cip_result < pt->cip_count &&
+           io->ci_need_restart) {
+               CDEBUG(D_VFSTRACE,
+                       "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+                       file_dentry(file)->d_name.name,
+                       pt->cip_iot == CIT_READ ? "read" : "write",
+                       pos, pos + pt->cip_count - pt->cip_result,
+                       pt->cip_result, rc);
+               goto restart;
+       }
+
+       CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
+               file_dentry(file)->d_name.name,
+               pt->cip_iot == CIT_READ ? "read" : "write",
+               pt->cip_result, rc);
+
+       cl_env_put(env, &refcheck);
+       RETURN(pt->cip_result > 0 ? 0 : rc);
 }
 
 static ssize_t
@@ -1111,39 +1204,45 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                   struct file *file, enum cl_io_type iot,
                   loff_t *ppos, size_t count)
 {
+       struct range_lock       range;
        struct vvp_io           *vio = vvp_env_io(env);
        struct inode            *inode = file_inode(file);
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct ll_file_data     *fd  = LUSTRE_FPRIVATE(file);
        struct cl_io            *io;
+       loff_t                  pos = *ppos;
        ssize_t                 result = 0;
        int                     rc = 0;
-       struct range_lock       range;
 
        ENTRY;
 
-       CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: %llu, count: %zu\n",
-               file_dentry(file)->d_name.name, iot, *ppos, count);
+       CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
+               file_dentry(file)->d_name.name,
+               iot == CIT_READ ? "read" : "write", pos, pos + count);
 
 restart:
        io = vvp_env_thread_io(env);
-       ll_io_init(io, file, iot == CIT_WRITE);
+       ll_io_init(io, file, iot);
+       if (args->via_io_subtype == IO_NORMAL) {
+               io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
+               io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
+       } else {
+               io->ci_pio = 0;
+       }
 
-       if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
+       if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
                bool range_locked = false;
 
                if (file->f_flags & O_APPEND)
                        range_lock_init(&range, 0, LUSTRE_EOF);
                else
-                       range_lock_init(&range, *ppos, *ppos + count - 1);
+                       range_lock_init(&range, pos, pos + count - 1);
 
                vio->vui_fd  = LUSTRE_FPRIVATE(file);
                vio->vui_io_subtype = args->via_io_subtype;
 
                switch (vio->vui_io_subtype) {
                case IO_NORMAL:
-                       vio->vui_iter = args->u.normal.via_iter;
-                       vio->vui_iocb = args->u.normal.via_iocb;
                        /* Direct IO reads must also take range lock,
                         * or multiple reads will try to work on the same pages
                         * See LU-6227 for details. */
@@ -1169,7 +1268,16 @@ restart:
                }
 
                ll_cl_add(file, env, io, LCC_RW);
+               if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) &&
+                   !lli->lli_inode_locked) {
+                       inode_lock(inode);
+                       lli->lli_inode_locked = 1;
+               }
                rc = cl_io_loop(env, io);
+               if (lli->lli_inode_locked) {
+                       lli->lli_inode_locked = 0;
+                       inode_unlock(inode);
+               }
                ll_cl_remove(file, env);
 
                if (range_locked) {
@@ -1184,22 +1292,31 @@ restart:
 
        if (io->ci_nob > 0) {
                result += io->ci_nob;
-               count -= io->ci_nob;
-               *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+               count  -= io->ci_nob;
 
-               /* prepare IO restart */
-               if (count > 0 && args->via_io_subtype == IO_NORMAL)
-                       args->u.normal.via_iter = vio->vui_iter;
+               if (args->via_io_subtype == IO_NORMAL) {
+                       iov_iter_advance(args->u.normal.via_iter, io->ci_nob);
+                       pos += io->ci_nob;
+                       args->u.normal.via_iocb->ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+                       args->u.normal.via_iocb->ki_left = count;
+#elif defined(HAVE_KI_NBYTES)
+                       args->u.normal.via_iocb->ki_nbytes = count;
+#endif
+               } else {
+                       /* for splice */
+                       pos = io->u.ci_rw.rw_range.cir_pos;
+               }
        }
 out:
        cl_io_fini(env, io);
 
        if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
                CDEBUG(D_VFSTRACE,
-                      "%s: restart %s from %lld, count:%zu, result: %zd\n",
-                      file_dentry(file)->d_name.name,
-                      iot == CIT_READ ? "read" : "write",
-                      *ppos, count, result);
+                       "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+                       file_dentry(file)->d_name.name,
+                       iot == CIT_READ ? "read" : "write",
+                       pos, pos + count, result, rc);
                goto restart;
        }
 
@@ -1223,7 +1340,11 @@ out:
                }
        }
 
-       CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
+       CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n",
+               file_dentry(file)->d_name.name,
+               iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc);
+
+       *ppos = pos;
 
        RETURN(result > 0 ? result : rc);
 }
@@ -3007,6 +3128,7 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
 int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
        struct dentry *dentry = file_dentry(file);
+       bool lock_inode;
 #elif defined(HAVE_FILE_FSYNC_2ARGS)
 int ll_fsync(struct file *file, int datasync)
 {
@@ -3031,7 +3153,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
 
 #ifdef HAVE_FILE_FSYNC_4ARGS
        rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
-       inode_lock(inode);
+       lock_inode = !lli->lli_inode_locked;
+       if (lock_inode)
+               inode_lock(inode);
 #else
        /* fsync's caller has already called _fdata{sync,write}, we want
         * that IO to finish before calling the osc and mdc sync methods */
@@ -3071,7 +3195,8 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
        }
 
 #ifdef HAVE_FILE_FSYNC_4ARGS
-       inode_unlock(inode);
+       if (lock_inode)
+               inode_unlock(inode);
 #endif
        RETURN(rc);
 }
index 28f42d2..eb9298d 100644 (file)
@@ -46,6 +46,7 @@
 #include <linux/compat.h>
 #include <linux/aio.h>
 
+#include <lustre_compat.h>
 #include "vvp_internal.h"
 #include "range_lock.h"
 
@@ -135,7 +136,8 @@ struct ll_inode_info {
 
        /* update atime from MDS no matter if it's older than
         * local inode atime. */
-       unsigned int    lli_update_atime:1;
+       unsigned int    lli_update_atime:1,
+                       lli_inode_locked:1;
 
        /* Try to make the d::member and f::member are aligned. Before using
         * these members, make clear whether it is directory or not. */
@@ -429,6 +431,7 @@ enum stats_track_type {
                                       * suppress_pings */
 #define LL_SBI_FAST_READ     0x400000 /* fast read support */
 #define LL_SBI_FILE_SECCTX   0x800000 /* set file security context at create */
+#define LL_SBI_PIO          0x1000000 /* parallel IO support */
 
 #define LL_SBI_FLAGS {         \
        "nolck",        \
@@ -455,6 +458,7 @@ enum stats_track_type {
        "always_ping",  \
        "fast_read",    \
        "file_secctx",  \
+       "pio",          \
 }
 
 /* This is embedded into llite super-blocks to keep track of connect
@@ -1451,121 +1455,4 @@ void cl_inode_fini(struct inode *inode);
 u64 cl_fid_build_ino(const struct lu_fid *fid, int api32);
 u32 cl_fid_build_gen(const struct lu_fid *fid);
 
-#ifndef HAVE_IOV_ITER_TRUNCATE
-static inline void iov_iter_truncate(struct iov_iter *i, u64 count)
-{
-       if (i->count > count)
-               i->count = count;
-}
-#endif
-
-#ifndef HAVE_IS_SXID
-static inline bool is_sxid(umode_t mode)
-{
-       return (mode & S_ISUID) || ((mode & S_ISGID) && (mode & S_IXGRP));
-}
-#endif
-
-#ifndef IS_NOSEC
-#define IS_NOSEC(inode)        (!is_sxid(inode->i_mode))
-#endif
-
-#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
-static inline void iov_iter_reexpand(struct iov_iter *i, size_t count)
-{
-       i->count = count;
-}
-
-static inline struct iovec iov_iter_iovec(const struct iov_iter *iter)
-{
-       return (struct iovec) {
-               .iov_base = iter->iov->iov_base + iter->iov_offset,
-               .iov_len = min(iter->count,
-                              iter->iov->iov_len - iter->iov_offset),
-       };
-}
-
-#define iov_for_each(iov, iter, start)                                 \
-       for (iter = (start);                                            \
-            (iter).count && ((iov = iov_iter_iovec(&(iter))), 1);      \
-            iov_iter_advance(&(iter), (iov).iov_len))
-
-static inline ssize_t
-generic_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
-{
-       struct iovec iov;
-       struct iov_iter i;
-       ssize_t bytes = 0;
-
-       iov_for_each(iov, i, *iter) {
-               ssize_t res;
-
-               res = generic_file_aio_read(iocb, &iov, 1, iocb->ki_pos);
-               if (res <= 0) {
-                       if (bytes == 0)
-                               bytes = res;
-                       break;
-               }
-
-               bytes += res;
-               if (res < iov.iov_len)
-                       break;
-       }
-
-       if (bytes > 0)
-               iov_iter_advance(iter, bytes);
-       return bytes;
-}
-
-static inline ssize_t
-__generic_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
-{
-       struct iovec iov;
-       struct iov_iter i;
-       ssize_t bytes = 0;
-
-       /* Since LLITE updates file size at the end of I/O in
-        * vvp_io_commit_write(), append write has to be done in atomic when
-        * there are multiple segments because otherwise each iteration to
-        * __generic_file_aio_write() will see original file size */
-       if (unlikely(iocb->ki_filp->f_flags & O_APPEND && iter->nr_segs > 1)) {
-               struct iovec *iov_copy;
-               int count = 0;
-
-               OBD_ALLOC(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
-               if (!iov_copy)
-                       return -ENOMEM;
-
-               iov_for_each(iov, i, *iter)
-                       iov_copy[count++] = iov;
-
-               bytes = __generic_file_aio_write(iocb, iov_copy, count,
-                                                &iocb->ki_pos);
-               OBD_FREE(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
-
-               if (bytes > 0)
-                       iov_iter_advance(iter, bytes);
-               return bytes;
-       }
-
-       iov_for_each(iov, i, *iter) {
-               ssize_t res;
-
-               res = __generic_file_aio_write(iocb, &iov, 1, &iocb->ki_pos);
-               if (res <= 0) {
-                       if (bytes == 0)
-                               bytes = res;
-                       break;
-               }
-
-               bytes += res;
-               if (res < iov.iov_len)
-                       break;
-       }
-
-       if (bytes > 0)
-               iov_iter_advance(iter, bytes);
-       return bytes;
-}
-#endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
 #endif /* LLITE_INTERNAL_H */
index 90ca5e4..9166966 100644 (file)
@@ -926,6 +926,39 @@ ll_fast_read_seq_write(struct file *file, const char __user *buffer,
 }
 LPROC_SEQ_FOPS(ll_fast_read);
 
+static int ll_pio_seq_show(struct seq_file *m, void *v)
+{
+       struct super_block *sb = m->private;
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+       seq_printf(m, "%u\n", !!(sbi->ll_flags & LL_SBI_PIO));
+       return 0;
+}
+
+static ssize_t ll_pio_seq_write(struct file *file, const char __user *buffer,
+                               size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct super_block *sb = m->private;
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+       int rc;
+       __s64 val;
+
+       rc = lprocfs_str_to_s64(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       spin_lock(&sbi->ll_lock);
+       if (val == 1)
+               sbi->ll_flags |= LL_SBI_PIO;
+       else
+               sbi->ll_flags &= ~LL_SBI_PIO;
+       spin_unlock(&sbi->ll_lock);
+
+       return count;
+}
+LPROC_SEQ_FOPS(ll_pio);
+
 static int ll_unstable_stats_seq_show(struct seq_file *m, void *v)
 {
        struct super_block      *sb    = m->private;
@@ -1105,8 +1138,10 @@ struct lprocfs_vars lprocfs_llite_obd_vars[] = {
          .fops =       &ll_root_squash_fops                    },
        { .name =       "nosquash_nids",
          .fops =       &ll_nosquash_nids_fops                  },
-       { .name =       "fast_read",
-         .fops =       &ll_fast_read_fops,                     },
+       { .name =       "fast_read",
+         .fops =       &ll_fast_read_fops,                     },
+       { .name =       "pio",
+         .fops =       &ll_pio_fops,                           },
        { NULL }
 };
 
index 7dd9510..79a252a 100644 (file)
@@ -649,7 +649,7 @@ static int ll_write_begin(struct file *file, struct address_space *mapping,
        int result = 0;
        ENTRY;
 
-       CDEBUG(D_VFSTRACE, "Writing %lu of %d to %d bytes\n", index, from, len);
+       CDEBUG(D_PAGE, "Writing %lu of %d to %d bytes\n", index, from, len);
 
        lcc = ll_cl_find(file);
        if (lcc == NULL) {
@@ -784,7 +784,7 @@ static int ll_write_end(struct file *file, struct address_space *mapping,
                if (plist->pl_nr >= PTLRPC_MAX_BRW_PAGES)
                        unplug = true;
 
-               CL_PAGE_DEBUG(D_VFSTRACE, env, page,
+               CL_PAGE_DEBUG(D_PAGE, env, page,
                              "queued page: %d.\n", plist->pl_nr);
        } else {
                cl_page_disown(env, io, page);
index c9240df..645b4b5 100644 (file)
@@ -61,13 +61,7 @@ struct vvp_io {
        /** super class */
        struct cl_io_slice     vui_cl;
        struct cl_io_lock_link vui_link;
-       /**
-        * I/O vector information to or from which read/write is going.
-        */
-       struct iov_iter *vui_iter;
-       /**
-        * Total size for the left IO.
-        */
+       /** Total size for the left IO. */
        size_t vui_tot_count;
 
        union {
@@ -117,7 +111,6 @@ struct vvp_io {
        * File descriptor against which IO is done.
        */
        struct ll_file_data     *vui_fd;
-       struct kiocb            *vui_iocb;
 
        /* Readahead state. */
        pgoff_t vui_ra_start;
index 7aa3cfd..35a5c3d 100644 (file)
@@ -303,7 +303,7 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
        CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d "
-                          "need write layout %d, restore needed %d\n",
+              "need write layout %d, restore needed %d\n",
               PFID(lu_object_fid(&obj->co_lu)),
               io->ci_ignore_layout, io->ci_verify_layout,
               vio->vui_layout_gen, io->ci_need_write_intent,
@@ -349,8 +349,8 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 
                if (io->ci_type == CIT_WRITE) {
                        if (!cl_io_is_append(io)) {
-                               start = io->u.ci_rw.crw_pos;
-                               end = start + io->u.ci_rw.crw_count;
+                               start = io->u.ci_rw.rw_range.cir_pos;
+                               end = start + io->u.ci_rw.rw_range.cir_count;
                        }
                } else if (cl_io_is_trunc(io)) {
                        end = io->u.ci_setattr.sa_attr.lvb_size;
@@ -361,7 +361,7 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                        end = cl_offset(io->ci_obj, index + 1);
                }
 
-               CDEBUG(D_VFSTRACE, DFID" type %d [%llx, %llx)\n",
+               CDEBUG(D_VFSTRACE, DFID" write layout, type %u [%llu, %llu)\n",
                       PFID(lu_object_fid(&obj->co_lu)), io->ci_type,
                       start, end);
                rc = ll_layout_write_intent(inode, start, end);
@@ -418,8 +418,7 @@ static enum cl_lock_mode vvp_mode_from_vma(struct vm_area_struct *vma)
         return CLM_READ;
 }
 
-static int vvp_mmap_locks(const struct lu_env *env,
-                         struct vvp_io *vio, struct cl_io *io)
+static int vvp_mmap_locks(const struct lu_env *env, struct cl_io *io)
 {
        struct vvp_thread_info *vti = vvp_env_info(env);
        struct mm_struct *mm = current->mm;
@@ -436,15 +435,11 @@ static int vvp_mmap_locks(const struct lu_env *env,
        if (!cl_is_normalio(env, io))
                RETURN(0);
 
-       /* nfs or loop back device write */
-       if (vio->vui_iter == NULL)
-               RETURN(0);
-
        /* No MM (e.g. NFS)? No vmas too. */
        if (mm == NULL)
                RETURN(0);
 
-       iov_for_each(iov, i, *(vio->vui_iter)) {
+       iov_for_each(iov, i, io->u.ci_rw.rw_iter) {
                unsigned long addr = (unsigned long)iov.iov_base;
                size_t count = iov.iov_len;
 
@@ -517,53 +512,54 @@ static void vvp_io_advance(const struct lu_env *env,
                return;
 
        vio->vui_tot_count -= nob;
-       iov_iter_reexpand(vio->vui_iter, vio->vui_tot_count);
-}
-
-static void vvp_io_update_iov(const struct lu_env *env,
-                             struct vvp_io *vio, struct cl_io *io)
-{
-       size_t size = io->u.ci_rw.crw_count;
-
-       if (!cl_is_normalio(env, io) || vio->vui_iter == NULL)
-               return;
-
-       iov_iter_truncate(vio->vui_iter, size);
+       if (io->ci_pio) {
+               iov_iter_advance(&io->u.ci_rw.rw_iter, nob);
+               io->u.ci_rw.rw_iocb.ki_pos = io->u.ci_rw.rw_range.cir_pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+               io->u.ci_rw.rw_iocb.ki_left = vio->vui_tot_count;
+#elif defined(HAVE_KI_NBYTES)
+               io->u.ci_rw.rw_iocb.ki_nbytes = vio->vui_tot_count;
+#endif
+       } else {
+               /* It was truncated to stripe size in vvp_io_rw_lock() */
+               iov_iter_reexpand(&io->u.ci_rw.rw_iter, vio->vui_tot_count);
+       }
 }
 
 static int vvp_io_rw_lock(const struct lu_env *env, struct cl_io *io,
                           enum cl_lock_mode mode, loff_t start, loff_t end)
 {
-       struct vvp_io *vio = vvp_env_io(env);
        int result;
        int ast_flags = 0;
 
        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
        ENTRY;
 
-       vvp_io_update_iov(env, vio, io);
+       if (cl_is_normalio(env, io))
+               iov_iter_truncate(&io->u.ci_rw.rw_iter,
+                                 io->u.ci_rw.rw_range.cir_count);
 
-       if (io->u.ci_rw.crw_nonblock)
+       if (io->u.ci_rw.rw_nonblock)
                ast_flags |= CEF_NONBLOCK;
 
-       result = vvp_mmap_locks(env, vio, io);
+       result = vvp_mmap_locks(env, io);
        if (result == 0)
                result = vvp_io_one_lock(env, io, ast_flags, mode, start, end);
 
-        RETURN(result);
+       RETURN(result);
 }
 
 static int vvp_io_read_lock(const struct lu_env *env,
                             const struct cl_io_slice *ios)
 {
-       struct cl_io            *io  = ios->cis_io;
-       struct cl_io_rw_common  *rd = &io->u.ci_rd.rd;
-       int result;
+       struct cl_io *io = ios->cis_io;
+       struct cl_io_range *range = &io->u.ci_rw.rw_range;
+       int rc;
 
        ENTRY;
-       result = vvp_io_rw_lock(env, io, CLM_READ, rd->crw_pos,
-                               rd->crw_pos + rd->crw_count - 1);
-       RETURN(result);
+       rc = vvp_io_rw_lock(env, io, CLM_READ, range->cir_pos,
+                           range->cir_pos + range->cir_count - 1);
+       RETURN(rc);
 }
 
 static int vvp_io_fault_lock(const struct lu_env *env,
@@ -584,18 +580,21 @@ static int vvp_io_fault_lock(const struct lu_env *env,
 static int vvp_io_write_lock(const struct lu_env *env,
                              const struct cl_io_slice *ios)
 {
-        struct cl_io *io = ios->cis_io;
-        loff_t start;
-        loff_t end;
+       struct cl_io *io = ios->cis_io;
+       loff_t start;
+       loff_t end;
+       int rc;
 
-        if (io->u.ci_wr.wr_append) {
-                start = 0;
-                end   = OBD_OBJECT_EOF;
-        } else {
-                start = io->u.ci_wr.wr.crw_pos;
-                end   = start + io->u.ci_wr.wr.crw_count - 1;
-        }
-        return vvp_io_rw_lock(env, io, CLM_WRITE, start, end);
+       ENTRY;
+       if (io->u.ci_rw.rw_append) {
+               start = 0;
+               end   = OBD_OBJECT_EOF;
+       } else {
+               start = io->u.ci_rw.rw_range.cir_pos;
+               end   = start + io->u.ci_rw.rw_range.cir_count - 1;
+       }
+       rc = vvp_io_rw_lock(env, io, CLM_WRITE, start, end);
+       RETURN(rc);
 }
 
 static int vvp_io_setattr_iter_init(const struct lu_env *env,
@@ -749,16 +748,17 @@ static int vvp_io_read_start(const struct lu_env *env,
        struct inode            *inode = vvp_object_inode(obj);
        struct ll_inode_info    *lli   = ll_i2info(inode);
        struct file             *file  = vio->vui_fd->fd_file;
-
-       int     result;
-       loff_t  pos = io->u.ci_rd.rd.crw_pos;
-       long    cnt = io->u.ci_rd.rd.crw_count;
-       long    tot = vio->vui_tot_count;
-        int     exceed = 0;
+       struct cl_io_range      *range = &io->u.ci_rw.rw_range;
+       loff_t pos = range->cir_pos; /* for generic_file_splice_read() only */
+       size_t tot = vio->vui_tot_count;
+       int exceed = 0;
+       int result;
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
-       CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
+       CDEBUG(D_VFSTRACE, "%s: read [%llu, %llu)\n",
+               file_dentry(file)->d_name.name,
+               range->cir_pos, range->cir_pos + range->cir_count);
 
        if (vio->vui_io_subtype == IO_NORMAL)
                down_read(&lli->lli_trunc_sem);
@@ -766,15 +766,16 @@ static int vvp_io_read_start(const struct lu_env *env,
        if (!can_populate_pages(env, io, inode))
                return 0;
 
-       result = vvp_prep_size(env, obj, io, pos, tot, &exceed);
+       result = vvp_prep_size(env, obj, io, range->cir_pos, tot, &exceed);
        if (result != 0)
                return result;
        else if (exceed != 0)
                goto out;
 
        LU_OBJECT_HEADER(D_INODE, env, &obj->co_lu,
-                       "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
-                       inode->i_ino, cnt, pos, i_size_read(inode));
+                        "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
+                        inode->i_ino, range->cir_count, range->cir_pos,
+                        i_size_read(inode));
 
        /* turn off the kernel's read-ahead */
        vio->vui_fd->fd_file->f_ra.ra_pages = 0;
@@ -782,7 +783,7 @@ static int vvp_io_read_start(const struct lu_env *env,
        /* initialize read-ahead window once per syscall */
        if (!vio->vui_ra_valid) {
                vio->vui_ra_valid = true;
-               vio->vui_ra_start = cl_index(obj, pos);
+               vio->vui_ra_start = cl_index(obj, range->cir_pos);
                vio->vui_ra_count = cl_index(obj, tot + PAGE_SIZE - 1);
                ll_ras_enter(file);
        }
@@ -791,12 +792,17 @@ static int vvp_io_read_start(const struct lu_env *env,
        file_accessed(file);
        switch (vio->vui_io_subtype) {
        case IO_NORMAL:
-               LASSERT(vio->vui_iocb->ki_pos == pos);
-               result = generic_file_read_iter(vio->vui_iocb, vio->vui_iter);
+               LASSERTF(io->u.ci_rw.rw_iocb.ki_pos == range->cir_pos,
+                        "ki_pos %lld [%lld, %lld)\n",
+                        io->u.ci_rw.rw_iocb.ki_pos,
+                        range->cir_pos, range->cir_pos + range->cir_count);
+               result = generic_file_read_iter(&io->u.ci_rw.rw_iocb,
+                                               &io->u.ci_rw.rw_iter);
                break;
        case IO_SPLICE:
                result = generic_file_splice_read(file, &pos,
-                                                 vio->u.splice.vui_pipe, cnt,
+                                                 vio->u.splice.vui_pipe,
+                                                 range->cir_count,
                                                  vio->u.splice.vui_flags);
                /* LU-1109: do splice read stripe by stripe otherwise if it
                 * may make nfsd stuck if this read occupied all internal pipe
@@ -810,11 +816,11 @@ static int vvp_io_read_start(const struct lu_env *env,
 
 out:
        if (result >= 0) {
-               if (result < cnt)
+               if (result < range->cir_count)
                        io->ci_continue = 0;
                io->ci_nob += result;
                ll_rw_stats_tally(ll_i2sbi(inode), current->pid, vio->vui_fd,
-                                 pos, result, READ);
+                                 range->cir_pos, result, READ);
                result = 0;
        }
 
@@ -870,7 +876,6 @@ static int vvp_io_commit_sync(const struct lu_env *env, struct cl_io *io,
                        SetPageUptodate(cl_page_vmpage(page));
                        cl_page_disown(env, io, page);
 
-                       /* held in ll_cl_init() */
                        lu_ref_del(&page->cp_reference, "cl_io", io);
                        cl_page_put(env, page);
                }
@@ -889,7 +894,6 @@ static void write_commit_callback(const struct lu_env *env, struct cl_io *io,
 
        cl_page_disown(env, io, page);
 
-       /* held in ll_cl_init() */
        lu_ref_del(&page->cp_reference, "cl_io", cl_io_top(io));
        cl_page_put(env, page);
 }
@@ -990,7 +994,6 @@ int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io)
 
                cl_page_disown(env, io, page);
 
-               /* held in ll_cl_init() */
                lu_ref_del(&page->cp_reference, "cl_io", io);
                cl_page_put(env, page);
        }
@@ -1007,10 +1010,11 @@ static int vvp_io_write_start(const struct lu_env *env,
        struct cl_object        *obj   = io->ci_obj;
        struct inode            *inode = vvp_object_inode(obj);
        struct ll_inode_info    *lli   = ll_i2info(inode);
+       struct file             *file  = vio->vui_fd->fd_file;
+       struct cl_io_range      *range = &io->u.ci_rw.rw_range;
+       bool                     lock_inode = !lli->lli_inode_locked &&
+                                             !IS_NOSEC(inode);
        ssize_t                  result = 0;
-       loff_t                   pos = io->u.ci_wr.wr.crw_pos;
-       size_t                   cnt = io->u.ci_wr.wr.crw_count;
-
        ENTRY;
 
        if (vio->vui_io_subtype == IO_NORMAL)
@@ -1025,79 +1029,79 @@ static int vvp_io_write_start(const struct lu_env *env,
                 * out-of-order writes.
                 */
                ll_merge_attr(env, inode);
-               pos = io->u.ci_wr.wr.crw_pos = i_size_read(inode);
-               vio->vui_iocb->ki_pos = pos;
+               range->cir_pos = i_size_read(inode);
+               io->u.ci_rw.rw_iocb.ki_pos = range->cir_pos;
        } else {
-               LASSERT(vio->vui_iocb->ki_pos == pos);
+               LASSERTF(io->u.ci_rw.rw_iocb.ki_pos == range->cir_pos,
+                        "ki_pos %lld [%lld, %lld)\n",
+                        io->u.ci_rw.rw_iocb.ki_pos,
+                        range->cir_pos, range->cir_pos + range->cir_count);
        }
 
-       CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt);
+       CDEBUG(D_VFSTRACE, "%s: write [%llu, %llu)\n",
+               file_dentry(file)->d_name.name,
+               range->cir_pos, range->cir_pos + range->cir_count);
 
        /* The maximum Lustre file size is variable, based on the OST maximum
         * object size and number of stripes.  This needs another check in
         * addition to the VFS checks earlier. */
-       if (pos + cnt > ll_file_maxbytes(inode)) {
+       if (range->cir_pos + range->cir_count > ll_file_maxbytes(inode)) {
                CDEBUG(D_INODE,
-                      "%s: file "DFID" offset %llu > maxbytes %llu\n",
+                      "%s: file %s ("DFID") offset %llu > maxbytes %llu\n",
                       ll_get_fsname(inode->i_sb, NULL, 0),
-                      PFID(ll_inode2fid(inode)), pos + cnt,
+                      file_dentry(file)->d_name.name,
+                      PFID(ll_inode2fid(inode)),
+                      range->cir_pos + range->cir_count,
                       ll_file_maxbytes(inode));
                RETURN(-EFBIG);
        }
 
-       if (vio->vui_iter == NULL) {
-               /* from a temp io in ll_cl_init(). */
-               result = 0;
-       } else {
-               /*
-                * When using the locked AIO function (generic_file_aio_write())
-                * testing has shown the inode mutex to be a limiting factor
-                * with multi-threaded single shared file performance. To get
-                * around this, we now use the lockless version. To maintain
-                * consistency, proper locking to protect against writes,
-                * trucates, etc. is handled in the higher layers of lustre.
-                */
-               bool lock_node = !IS_NOSEC(inode);
-
-               if (lock_node)
-                       inode_lock(inode);
-               result = __generic_file_write_iter(vio->vui_iocb,
-                                                  vio->vui_iter);
-               if (lock_node)
-                       inode_unlock(inode);
+       /*
+        * When using the locked AIO function (generic_file_aio_write())
+        * testing has shown the inode mutex to be a limiting factor
+        * with multi-threaded single shared file performance. To get
+        * around this, we now use the lockless version. To maintain
+        * consistency, proper locking to protect against writes,
+        * truncates, etc. is handled in the higher layers of lustre.
+        */
+       if (lock_inode)
+               inode_lock(inode);
+       result = __generic_file_write_iter(&io->u.ci_rw.rw_iocb,
+                                          &io->u.ci_rw.rw_iter);
+       if (lock_inode)
+               inode_unlock(inode);
 
-               if (result > 0 || result == -EIOCBQUEUED)
+       if (result > 0 || result == -EIOCBQUEUED)
 #ifdef HAVE_GENERIC_WRITE_SYNC_2ARGS
-                       result = generic_write_sync(vio->vui_iocb, result);
+               result = generic_write_sync(&io->u.ci_rw.rw_iocb, result);
 #else
-               {
-                       ssize_t err;
+       {
+               ssize_t err;
 
-                       err = generic_write_sync(vio->vui_iocb->ki_filp, pos,
-                                                result);
-                       if (err < 0 && result > 0)
-                               result = err;
-               }
+               err = generic_write_sync(io->u.ci_rw.rw_iocb.ki_filp,
+                                        range->cir_pos, result);
+               if (err < 0 && result > 0)
+                       result = err;
+       }
 #endif
 
-       }
        if (result > 0) {
                result = vvp_io_write_commit(env, io);
                if (vio->u.write.vui_written > 0) {
                        result = vio->u.write.vui_written;
-                       io->ci_nob += result;
-
-                       CDEBUG(D_VFSTRACE, "write: nob %zd, result: %zd\n",
+                       CDEBUG(D_VFSTRACE, "%s: write nob %zd, result: %zd\n",
+                               file_dentry(file)->d_name.name,
                                io->ci_nob, result);
+                       io->ci_nob += result;
                }
        }
        if (result > 0) {
                ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);
 
-               if (result < cnt)
+               if (result < range->cir_count)
                        io->ci_continue = 0;
                ll_rw_stats_tally(ll_i2sbi(inode), current->pid,
-                                 vio->vui_fd, pos, result, WRITE);
+                                 vio->vui_fd, range->cir_pos, result, WRITE);
                result = 0;
        }
 
@@ -1426,16 +1430,13 @@ int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
        vio->vui_ra_valid = false;
        result = 0;
        if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
-               size_t count;
                struct ll_inode_info *lli = ll_i2info(inode);
 
-                count = io->u.ci_rw.crw_count;
-                /* "If nbyte is 0, read() will return 0 and have no other
-                 *  results."  -- Single Unix Spec */
-                if (count == 0)
-                        result = 1;
-               else
-                       vio->vui_tot_count = count;
+               vio->vui_tot_count = io->u.ci_rw.rw_range.cir_count;
+               /* "If nbyte is 0, read() will return 0 and have no other
+                *  results."  -- Single Unix Spec */
+               if (vio->vui_tot_count == 0)
+                       result = 1;
 
                /* for read/write, we store the jobid in the inode, and
                 * it'll be fetched by osc when building RPC.
index 73c16e2..2339ea5 100644 (file)
@@ -122,6 +122,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
        sub_io->ci_type    = io->ci_type;
        sub_io->ci_no_srvlock = io->ci_no_srvlock;
        sub_io->ci_noatime = io->ci_noatime;
+       sub_io->ci_pio = io->ci_pio;
 
        result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
 
@@ -208,14 +209,14 @@ static int lov_io_slice_init(struct lov_io *lio,
 
        LASSERT(obj->lo_lsm != NULL);
 
-        switch (io->ci_type) {
-        case CIT_READ:
-        case CIT_WRITE:
-                lio->lis_pos = io->u.ci_rw.crw_pos;
-                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
-                lio->lis_io_endpos = lio->lis_endpos;
-                if (cl_io_is_append(io)) {
-                        LASSERT(io->ci_type == CIT_WRITE);
+       switch (io->ci_type) {
+       case CIT_READ:
+       case CIT_WRITE:
+               lio->lis_pos = io->u.ci_rw.rw_range.cir_pos;
+               lio->lis_endpos = lio->lis_pos + io->u.ci_rw.rw_range.cir_count;
+               lio->lis_io_endpos = lio->lis_endpos;
+               if (cl_io_is_append(io)) {
+                       LASSERT(io->ci_type == CIT_WRITE);
 
                        /* If there is LOV EA hole, then we may cannot locate
                         * the current file-tail exactly. */
@@ -223,10 +224,10 @@ static int lov_io_slice_init(struct lov_io *lio,
                                     LOV_PATTERN_F_HOLE))
                                RETURN(-EIO);
 
-                        lio->lis_pos = 0;
-                        lio->lis_endpos = OBD_OBJECT_EOF;
-                }
-                break;
+                       lio->lis_pos = 0;
+                       lio->lis_endpos = OBD_OBJECT_EOF;
+               }
+               break;
 
         case CIT_SETATTR:
                 if (cl_io_is_trunc(io))
@@ -309,6 +310,7 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
        int index = lov_comp_entry(sub->sub_subio_index);
        int stripe = lov_comp_stripe(sub->sub_subio_index);
 
+       io->ci_pio = parent->ci_pio;
        switch (io->ci_type) {
        case CIT_SETATTR: {
                io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
@@ -353,12 +355,16 @@ static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
        }
        case CIT_READ:
        case CIT_WRITE: {
-               io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
+               io->u.ci_rw.rw_ptask = parent->u.ci_rw.rw_ptask;
+               io->u.ci_rw.rw_iter = parent->u.ci_rw.rw_iter;
+               io->u.ci_rw.rw_iocb = parent->u.ci_rw.rw_iocb;
+               io->u.ci_rw.rw_file = parent->u.ci_rw.rw_file;
+               io->u.ci_rw.rw_sync = parent->u.ci_rw.rw_sync;
                if (cl_io_is_append(parent)) {
-                       io->u.ci_wr.wr_append = 1;
+                       io->u.ci_rw.rw_append = 1;
                } else {
-                       io->u.ci_rw.crw_pos = start;
-                       io->u.ci_rw.crw_count = end - start;
+                       io->u.ci_rw.rw_range.cir_pos = start;
+                       io->u.ci_rw.rw_range.cir_count = end - start;
                }
                break;
        }
@@ -417,6 +423,8 @@ static int lov_io_iter_init(const struct lu_env *env,
                         * it's handled in lov_io_setattr_iter_init() */
                        if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) {
                                io->ci_need_write_intent = 1;
+                               /* execute it in main thread */
+                               io->ci_pio = 0;
                                rc = -ENODATA;
                                break;
                        }
@@ -455,8 +463,9 @@ static int lov_io_iter_init(const struct lu_env *env,
                        if (rc != 0)
                                break;
 
-                       CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
-                              stripe, start, end);
+                       CDEBUG(D_VFSTRACE,
+                               "shrink stripe: {%d, %d} range: [%llu, %llu)\n",
+                               index, stripe, start, end);
 
                        list_add_tail(&sub->sub_linkage, &lio->lis_active);
                }
@@ -469,11 +478,12 @@ static int lov_io_iter_init(const struct lu_env *env,
 static int lov_io_rw_iter_init(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
-       struct lov_io        *lio = cl2lov_io(env, ios);
-       struct cl_io         *io  = ios->cis_io;
+       struct cl_io *io = ios->cis_io;
+       struct lov_io *lio = cl2lov_io(env, ios);
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        struct lov_stripe_md_entry *lse;
-       loff_t start = io->u.ci_rw.crw_pos;
+       struct cl_io_range *range = &io->u.ci_rw.rw_range;
+       loff_t start = range->cir_pos;
        loff_t next;
        unsigned long ssize;
        int index;
@@ -484,12 +494,14 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        if (cl_io_is_append(io))
                RETURN(lov_io_iter_init(env, ios));
 
-       index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos);
+       index = lov_lsm_entry(lsm, range->cir_pos);
        if (index < 0) { /* non-existing layout component */
                if (io->ci_type == CIT_READ) {
                        /* TODO: it needs to detect the next component and
                         * then set the next pos */
                        io->ci_continue = 0;
+                       /* execute it in main thread */
+                       io->ci_pio = 0;
 
                        RETURN(lov_io_iter_init(env, ios));
                }
@@ -505,20 +517,37 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        if (next <= start * ssize)
                next = ~0ull;
 
-       LASSERTF(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start,
-                "pos %lld, [%lld, %lld)\n", io->u.ci_rw.crw_pos,
+       LASSERTF(range->cir_pos >= lse->lsme_extent.e_start,
+                "pos %lld, [%lld, %lld)\n", range->cir_pos,
                 lse->lsme_extent.e_start, lse->lsme_extent.e_end);
        next = min_t(__u64, next, lse->lsme_extent.e_end);
        next = min_t(loff_t, next, lio->lis_io_endpos);
 
-       io->ci_continue = next < lio->lis_io_endpos;
-       io->u.ci_rw.crw_count = next - io->u.ci_rw.crw_pos;
-       lio->lis_pos    = io->u.ci_rw.crw_pos;
-       lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+       io->ci_continue  = next < lio->lis_io_endpos;
+       range->cir_count = next - range->cir_pos;
+       lio->lis_pos     = range->cir_pos;
+       lio->lis_endpos  = range->cir_pos + range->cir_count;
        CDEBUG(D_VFSTRACE,
-              "stripe: %llu chunk: [%llu, %llu) %llu, %zd\n",
-              (__u64)start, lio->lis_pos, lio->lis_endpos,
-              (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
+              "stripe: {%d, %llu} range: [%llu, %llu) end: %llu, count: %zd\n",
+              index, start, lio->lis_pos, lio->lis_endpos,
+              lio->lis_io_endpos, range->cir_count);
+
+       if (!io->ci_continue) {
+               /* the last piece of IO, execute it in main thread */
+               io->ci_pio = 0;
+       }
+
+       if (io->ci_pio) {
+               /* it only splits IO here for parallel IO,
+                * there will be no actual IO going to occur,
+                * so it doesn't need to invoke lov_io_iter_init()
+                * to initialize sub IOs. */
+               if (!lsm_entry_inited(lsm, index)) {
+                       io->ci_need_write_intent = 1;
+                       RETURN(-ENODATA);
+               }
+               RETURN(0);
+       }
 
        /*
         * XXX The following call should be optimized: we know, that
index 94c3fc5..12535f4 100644 (file)
@@ -44,6 +44,7 @@
 #include <lustre_fid.h>
 #include <cl_object.h>
 #include "cl_internal.h"
+#include <lustre_compat.h>
 
 /*****************************************************************************
  *
@@ -206,17 +207,26 @@ EXPORT_SYMBOL(cl_io_init);
 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
                   enum cl_io_type iot, loff_t pos, size_t count)
 {
-        LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
-        LINVRNT(io->ci_obj != NULL);
-        ENTRY;
+       LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
+       LINVRNT(io->ci_obj != NULL);
+       ENTRY;
+
+       if (cfs_ptengine_weight(cl_io_engine) < 2)
+               io->ci_pio = 0;
+
+       LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
+                        "io %s range: [%llu, %llu) %s %s %s %s\n",
+                        iot == CIT_READ ? "read" : "write",
+                        pos, pos + count,
+                        io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
+                        io->u.ci_rw.rw_append ? "append" : "-",
+                        io->u.ci_rw.rw_sync ? "sync" : "-",
+                        io->ci_pio ? "pio" : "-");
+
+       io->u.ci_rw.rw_range.cir_pos   = pos;
+       io->u.ci_rw.rw_range.cir_count = count;
 
-        LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
-                        "io range: %u [%llu, %llu) %u %u\n",
-                         iot, (__u64)pos, (__u64)pos + count,
-                         io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
-        io->u.ci_rw.crw_pos    = pos;
-        io->u.ci_rw.crw_count  = count;
-        RETURN(cl_io_init(env, io, iot, io->ci_obj));
+       RETURN(cl_io_init(env, io, iot, io->ci_obj));
 }
 EXPORT_SYMBOL(cl_io_rw_init);
 
@@ -474,8 +484,8 @@ void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
 
         ENTRY;
 
-        io->u.ci_rw.crw_pos   += nob;
-        io->u.ci_rw.crw_count -= nob;
+       io->u.ci_rw.rw_range.cir_pos   += nob;
+       io->u.ci_rw.rw_range.cir_count -= nob;
 
         /* layers have to be notified. */
         cl_io_for_each_reverse(scan, io) {
@@ -733,6 +743,71 @@ int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
         return result;
 }
 
+/**
+ * Queue one slice of a read/write \a io for parallel execution.
+ *
+ * Allocates a cl_io_pt descriptor for the range [\a pos, \a pos + \a count),
+ * gives it its own sync kiocb positioned at \a pos and a copy of the I/O
+ * iterator truncated to \a count bytes, then submits it to cl_io_engine.
+ *
+ * \retval descriptor of the submitted task; cl_io_loop() waits for it,
+ *         collects cip_result and frees it
+ * \retval ERR_PTR(-ENOMEM) if the descriptor cannot be allocated
+ * \retval ERR_PTR(rc) if cfs_ptask_init() or cfs_ptask_submit() fails
+ */
+static
+struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
+{
+       struct cl_io_pt *pt;
+       int rc;
+       ENTRY;
+
+       OBD_ALLOC(pt, sizeof(*pt));
+       if (pt == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       pt->cip_next = NULL;
+       init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
+       pt->cip_iocb.ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+       pt->cip_iocb.ki_left = count;
+#elif defined(HAVE_KI_NBYTES)
+       pt->cip_iocb.ki_nbytes = count;
+#endif
+       /* private iterator copy, so that the caller can keep advancing
+        * the iterator of \a io while this slice is in flight */
+       pt->cip_iter = io->u.ci_rw.rw_iter;
+       iov_iter_truncate(&pt->cip_iter, count);
+       pt->cip_file   = io->u.ci_rw.rw_file;
+       pt->cip_iot    = io->ci_type;
+       pt->cip_pos    = pos;
+       pt->cip_count  = count;
+       pt->cip_result = 0;
+
+       /* Nothing here disables preemption, and smp_processor_id() in
+        * preemptible code trips the CONFIG_DEBUG_PREEMPT check; the CPU
+        * number is merely handed over to cfs_ptask_init(), so use raw. */
+       rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
+                           PTF_ORDERED | PTF_COMPLETE |
+                           PTF_USER_MM | PTF_RETRY, raw_smp_processor_id());
+       if (rc)
+               GOTO(out_error, rc);
+
+       CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
+               io->ci_type == CIT_READ ? "read" : "write",
+               pos, pos + count);
+
+       rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
+       if (rc)
+               GOTO(out_error, rc);
+
+       RETURN(pt);
+
+out_error:
+       OBD_FREE(pt, sizeof(*pt));
+       RETURN(ERR_PTR(rc));
+}
+
 /**
  * Main io loop.
  *
@@ -754,44 +811,124 @@ int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
  */
 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
 {
-        int result   = 0;
+       struct cl_io_pt *pt = NULL, *head = NULL;
+       struct cl_io_pt **tail = &head;
+       loff_t pos;
+       size_t count;
+       size_t last_chunk_count = 0;
+       bool short_io = false;
+       int rc = 0;
+       ENTRY;
 
-        LINVRNT(cl_io_is_loopable(io));
-        ENTRY;
+       LINVRNT(cl_io_is_loopable(io));
 
-        do {
-                size_t nob;
-
-                io->ci_continue = 0;
-                result = cl_io_iter_init(env, io);
-                if (result == 0) {
-                        nob    = io->ci_nob;
-                        result = cl_io_lock(env, io);
-                        if (result == 0) {
-                                /*
-                                 * Notify layers that locks has been taken,
-                                 * and do actual i/o.
-                                 *
-                                 *   - llite: kms, short read;
-                                 *   - llite: generic_file_read();
-                                 */
-                                result = cl_io_start(env, io);
-                                /*
-                                 * Send any remaining pending
-                                 * io, etc.
-                                 *
-                                 *   - llite: ll_rw_stats_tally.
-                                 */
-                                cl_io_end(env, io);
-                                cl_io_unlock(env, io);
-                                cl_io_rw_advance(env, io, io->ci_nob - nob);
-                        }
-                }
-                cl_io_iter_fini(env, io);
-        } while (result == 0 && io->ci_continue);
-       if (result == 0)
-               result = io->ci_result;
-       RETURN(result < 0 ? result : 0);
+       do {
+               io->ci_continue = 0;
+
+               rc = cl_io_iter_init(env, io);
+               if (rc) {
+                       cl_io_iter_fini(env, io);
+                       break;
+               }
+
+               pos   = io->u.ci_rw.rw_range.cir_pos;
+               count = io->u.ci_rw.rw_range.cir_count;
+
+               if (io->ci_pio) {
+                       /* submit this range for parallel execution */
+                       pt = cl_io_submit_pt(io, pos, count);
+                       if (IS_ERR(pt)) {
+                               cl_io_iter_fini(env, io);
+                               rc = PTR_ERR(pt);
+                               break;
+                       }
+
+                       *tail = pt;
+                       tail = &pt->cip_next;
+               } else {
+                       size_t nob = io->ci_nob;
+
+                       CDEBUG(D_VFSTRACE,
+                               "execute type %u range: [%llu, %llu) nob: %zu %s\n",
+                               io->ci_type, pos, pos + count, nob,
+                               io->ci_continue ? "continue" : "stop");
+
+                       rc = cl_io_lock(env, io);
+                       if (rc) {
+                               cl_io_iter_fini(env, io);
+                               break;
+                       }
+
+                       /*
+                        * Notify layers that locks have been taken,
+                        * and do actual i/o.
+                        *
+                        *   - llite: kms, short read;
+                        *   - llite: generic_file_read();
+                        */
+                       rc = cl_io_start(env, io);
+
+                       /*
+                        * Send any remaining pending
+                        * io, etc.
+                        *
+                        *   - llite: ll_rw_stats_tally.
+                        */
+                       cl_io_end(env, io);
+                       cl_io_unlock(env, io);
+
+                       count = io->ci_nob - nob;
+                       last_chunk_count = count;
+               }
+
+               cl_io_rw_advance(env, io, count);
+               cl_io_iter_fini(env, io);
+       } while (!rc && io->ci_continue);
+
+       CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
+               io->ci_type, io->ci_nob, rc,
+               io->ci_continue ? "continue" : "stop");
+
+       while (head != NULL) {
+               int rc2;
+
+               pt = head;
+               head = head->cip_next;
+
+               rc2 = cfs_ptask_wait_for(&pt->cip_task);
+               LASSERTF(!rc2, "wait for task error: %d\n", rc2);
+
+               rc2 = cfs_ptask_result(&pt->cip_task);
+               CDEBUG(D_VFSTRACE,
+                       "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+                       pt->cip_iot == CIT_READ ? "read" : "write",
+                       pt->cip_pos, pt->cip_pos + pt->cip_count,
+                       pt->cip_result, rc2);
+               if (rc2)
+                       rc = rc ? rc : rc2;
+               if (!short_io) {
+                       if (!rc2) /* IO is done by this task successfully */
+                               io->ci_nob += pt->cip_result;
+                       if (pt->cip_result < pt->cip_count) {
+                               /* A short IO happened; this is
+                                * not necessarily an error. */
+                               CDEBUG(D_VFSTRACE,
+                                       "incomplete range: [%llu, %llu) "
+                                       "last_chunk_count: %zu\n",
+                                       pt->cip_pos,
+                                       pt->cip_pos + pt->cip_count,
+                                       last_chunk_count);
+                               io->ci_nob -= last_chunk_count;
+                               short_io = true;
+                       }
+               }
+               OBD_FREE(pt, sizeof(*pt));
+       }
+
+       CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
+               io->ci_nob, short_io ? "short" : "full", rc);
+
+       RETURN(rc < 0 ? rc : io->ci_result);
 }
 EXPORT_SYMBOL(cl_io_loop);
 
index 4087020..7c7c41f 100644 (file)
@@ -1042,6 +1042,8 @@ static struct lu_kmem_descr cl_object_caches[] = {
         }
 };
 
+struct cfs_ptask_engine *cl_io_engine; /* created in cl_global_init() */
+
 /**
  * Global initialization of cl-data. Create kmem caches, register
  * lu_context_key's, etc.
@@ -1069,8 +1071,17 @@ int cl_global_init(void)
        if (result) /* no cl_env_percpu_fini on error */
                GOTO(out_keys, result);
 
+       cl_io_engine = cfs_ptengine_init("clio", cpu_online_mask);
+       if (IS_ERR(cl_io_engine)) {
+               result = PTR_ERR(cl_io_engine);
+               cl_io_engine = NULL;
+               GOTO(out_percpu, result);
+       }
+
        return 0;
 
+out_percpu:
+       cl_env_percpu_fini();
 out_keys:
        lu_context_key_degister(&cl_key);
 out_kmem:
@@ -1086,6 +1097,8 @@ out:
  */
 void cl_global_fini(void)
 {
+       cfs_ptengine_fini(cl_io_engine);
+       cl_io_engine = NULL;
        cl_env_percpu_fini();
        lu_context_key_degister(&cl_key);
        lu_kmem_fini(cl_object_caches);
index 9c3c8f1..1ab41d8 100644 (file)
@@ -374,8 +374,8 @@ static int osc_io_write_iter_init(const struct lu_env *env,
        if (cl_io_is_append(io))
                RETURN(osc_io_iter_init(env, ios));
 
-       npages = io->u.ci_rw.crw_count >> PAGE_SHIFT;
-       if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
+       npages = io->u.ci_rw.rw_range.cir_count >> PAGE_SHIFT;
+       if (io->u.ci_rw.rw_range.cir_pos & ~PAGE_MASK)
                ++npages;
 
        oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
index 474a5e5..fda349b 100644 (file)
@@ -1141,9 +1141,9 @@ static void osc_lock_set_writer(const struct lu_env *env,
                return;
 
        if (likely(io->ci_type == CIT_WRITE)) {
-               io_start = cl_index(obj, io->u.ci_rw.crw_pos);
-               io_end = cl_index(obj, io->u.ci_rw.crw_pos +
-                                               io->u.ci_rw.crw_count - 1);
+               io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos);
+               io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos +
+                                 io->u.ci_rw.rw_range.cir_count - 1);
        } else {
                LASSERT(cl_io_is_mkwrite(io));
                io_start = io_end = io->u.ci_fault.ft_index;
index a37944f..3bd6019 100755 (executable)
@@ -6484,6 +6484,24 @@ test_82() { # LU-1031
 }
 run_test 82 "Basic grouplock test ==============================="
 
+# Copy a local file to $DIR/$tfile with the OBD_FAIL_LLITE_PTASK_IO_FAIL
+# fail_loc set, then check that the copy is identical to the source.
+# Skipped when the source file is not available on this host.
+test_83() {
+       local sfile="/boot/System.map-$(uname -r)"
+
+       # System.map may be missing or not readable; that is not a test failure
+       [ ! -r "$sfile" ] && skip_env "cannot read $sfile" && return
+
+       # define OBD_FAIL_LLITE_PTASK_IO_FAIL 0x140d
+       $LCTL set_param fail_loc=0x140d
+       cp "$sfile" "$DIR/$tfile" || error "write failed"
+       diff -c "$sfile" "$DIR/$tfile" || error "files are different"
+       $LCTL set_param fail_loc=0
+       rm -f "$DIR/$tfile"
+}
+run_test 83 "Short write in ptask ==============================="
+
 test_99a() {
        [ -z "$(which cvs 2>/dev/null)" ] && skip_env "could not find cvs" &&
                return