* super-class definitions.
*/
#include <libcfs/libcfs.h>
+#include <libcfs/libcfs_ptask.h>
#include <lu_object.h>
#include <linux/atomic.h>
#include <linux/mutex.h>
struct cl_req_attr;
+extern struct cfs_ptask_engine *cl_io_engine;
+
/**
* Device in the client stack.
*
CL_FSYNC_ALL = 3
};
-struct cl_io_rw_common {
- loff_t crw_pos;
- size_t crw_count;
- int crw_nonblock;
+/* Byte range of a read/write: start offset and length (replaces the
+ * crw_pos/crw_count members of the removed struct cl_io_rw_common). */
+struct cl_io_range {
+ loff_t cir_pos;
+ size_t cir_count;
+};
+
+/* State of one parallel-I/O task (cfs_ptask): a slice of a larger read
+ * or write.  Each task owns private copies of the caller's kiocb and
+ * iov_iter so slices can advance their file position independently. */
+struct cl_io_pt {
+ struct cl_io_pt *cip_next; /* next slice in the submission chain */
+ struct cfs_ptask cip_task; /* ptask engine handle for this slice */
+ struct kiocb cip_iocb; /* private kiocb copy */
+ struct iov_iter cip_iter; /* private iterator copy */
+ struct file *cip_file;
+ enum cl_io_type cip_iot; /* CIT_READ or CIT_WRITE */
+ loff_t cip_pos; /* starting file offset of this slice */
+ size_t cip_count; /* total bytes this slice should transfer */
+ ssize_t cip_result; /* bytes transferred so far (accumulated) */
+};
/**
/** lock requirements, this is just a help info for sublayers. */
enum cl_io_lock_dmd ci_lockreq;
union {
- struct cl_rd_io {
- struct cl_io_rw_common rd;
- } ci_rd;
- struct cl_wr_io {
- struct cl_io_rw_common wr;
- int wr_append;
- int wr_sync;
- } ci_wr;
- struct cl_io_rw_common ci_rw;
+ /* Merged read/write state, replacing the separate ci_rd/ci_wr
+ * union members: the iterator and kiocb to operate on, the byte
+ * range, latched open-file flag bits, and the callback run by the
+ * ptask engine when this I/O executes in parallel (ci_pio). */
+ struct cl_rw_io {
+ struct iov_iter rw_iter;
+ struct kiocb rw_iocb;
+ struct cl_io_range rw_range;
+ struct file *rw_file;
+ unsigned int rw_nonblock:1, /* O_NONBLOCK set on the file */
+ rw_append:1, /* O_APPEND write */
+ rw_sync:1; /* O_SYNC/O_DIRECT or IS_SYNC() write */
+ int (*rw_ptask)(struct cfs_ptask *ptask);
+ } ci_rw;
struct cl_setattr_io {
struct ost_lvb sa_attr;
unsigned int sa_attr_flags;
/**
* O_NOATIME
*/
- ci_noatime:1;
+ ci_noatime:1,
+ /** Set to 1 if parallel execution is allowed for current I/O? */
+ ci_pio:1;
/**
* Number of pages owned by this IO. For invariant checking.
*/
*/
static inline int cl_io_is_append(const struct cl_io *io)
{
+ /* the append flag moved from ci_wr.wr_append into the merged ci_rw */
- return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_append;
+ return io->ci_type == CIT_WRITE && io->u.ci_rw.rw_append;
}
static inline int cl_io_is_sync_write(const struct cl_io *io)
{
+ /* the sync flag moved from ci_wr.wr_sync into the merged ci_rw */
- return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_sync;
+ return io->ci_type == CIT_WRITE && io->u.ci_rw.rw_sync;
}
static inline int cl_io_is_mkwrite(const struct cl_io *io)
#include <libcfs/libcfs.h>
#include <lustre_patchless_compat.h>
+#include <obd_support.h>
#ifdef HAVE_FS_STRUCT_RWLOCK
# define LOCK_FS_STRUCT(fs) write_lock(&(fs)->lock)
# define GET_POSIX_ACL_XATTR_ENTRY(head) ((head)->a_entries)
#endif
+#ifndef HAVE_IOV_ITER_TRUNCATE
+/* Compat for kernels lacking iov_iter_truncate(): clamp the iterator's
+ * remaining byte count down to @count; never grows it (the inverse is
+ * iov_iter_reexpand()). */
+static inline void iov_iter_truncate(struct iov_iter *i, u64 count)
+{
+ if (i->count > count)
+ i->count = count;
+}
+#endif
+
+#ifndef HAVE_IS_SXID
+/* True if @mode carries setuid, or setgid together with group-execute. */
+static inline bool is_sxid(umode_t mode)
+{
+ return (mode & S_ISUID) || ((mode & S_ISGID) && (mode & S_IXGRP));
+}
+#endif
+
+#ifndef IS_NOSEC
+/* Inode has no suid/sgid bits that a write would need to strip. */
+#define IS_NOSEC(inode) (!is_sxid(inode->i_mode))
+#endif
+
+#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
+/* Undo a previous iov_iter_truncate(): restore the remaining count. */
+static inline void iov_iter_reexpand(struct iov_iter *i, size_t count)
+{
+ i->count = count;
+}
+
+/* Current segment of @iter as a bounded iovec: the segment's base is
+ * advanced by iov_offset and its length clipped to the iterator's
+ * remaining count. */
+static inline struct iovec iov_iter_iovec(const struct iov_iter *iter)
+{
+ return (struct iovec) {
+ .iov_base = iter->iov->iov_base + iter->iov_offset,
+ .iov_len = min(iter->count,
+ iter->iov->iov_len - iter->iov_offset),
+ };
+}
+
+/* Walk @start segment by segment; @iter is a private working copy so
+ * the caller's iterator is left untouched, @iov is the current segment. */
+#define iov_for_each(iov, iter, start) \
+ for (iter = (start); \
+ (iter).count && ((iov = iov_iter_iovec(&(iter))), 1); \
+ iov_iter_advance(&(iter), (iov).iov_len))
+
+/* Emulate generic_file_read_iter() on kernels that only provide
+ * generic_file_aio_read(): issue one aio read per iovec segment, stop
+ * on error or short read, then advance the caller's iterator by the
+ * bytes actually read. */
+static inline ssize_t
+generic_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct iovec iov;
+ struct iov_iter i;
+ ssize_t bytes = 0;
+
+ iov_for_each(iov, i, *iter) {
+ ssize_t res;
+
+ res = generic_file_aio_read(iocb, &iov, 1, iocb->ki_pos);
+ if (res <= 0) {
+ /* report the error only if nothing was read yet */
+ if (bytes == 0)
+ bytes = res;
+ break;
+ }
+
+ bytes += res;
+ if (res < iov.iov_len)
+ break;
+ }
+
+ if (bytes > 0)
+ iov_iter_advance(iter, bytes);
+ return bytes;
+}
+
+/* Emulate __generic_file_write_iter() on top of
+ * __generic_file_aio_write().  Multi-segment O_APPEND writes are
+ * issued as a single vectored call (see the comment below); otherwise
+ * one call per segment, stopping on error or short write.  On success
+ * the caller's iterator is advanced by the bytes written. */
+static inline ssize_t
+__generic_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct iovec iov;
+ struct iov_iter i;
+ ssize_t bytes = 0;
+
+ /* Since LLITE updates file size at the end of I/O in
+ * vvp_io_commit_write(), append write has to be done in atomic when
+ * there are multiple segments because otherwise each iteration to
+ * __generic_file_aio_write() will see original file size */
+ if (unlikely(iocb->ki_filp->f_flags & O_APPEND && iter->nr_segs > 1)) {
+ struct iovec *iov_copy;
+ int count = 0;
+
+ OBD_ALLOC(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
+ if (!iov_copy)
+ return -ENOMEM;
+
+ iov_for_each(iov, i, *iter)
+ iov_copy[count++] = iov;
+
+ bytes = __generic_file_aio_write(iocb, iov_copy, count,
+ &iocb->ki_pos);
+ OBD_FREE(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
+
+ if (bytes > 0)
+ iov_iter_advance(iter, bytes);
+ return bytes;
+ }
+
+ iov_for_each(iov, i, *iter) {
+ ssize_t res;
+
+ res = __generic_file_aio_write(iocb, &iov, 1, &iocb->ki_pos);
+ if (res <= 0) {
+ /* report the error only if nothing was written yet */
+ if (bytes == 0)
+ bytes = res;
+ break;
+ }
+
+ bytes += res;
+ if (res < iov.iov_len)
+ break;
+ }
+
+ if (bytes > 0)
+ iov_iter_advance(iter, bytes);
+ return bytes;
+}
+#endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
+
#endif /* _LUSTRE_COMPAT_H */
#define OBD_FAIL_LLITE_NEWNODE_PAUSE 0x140a
#define OBD_FAIL_LLITE_SETDIRSTRIPE_PAUSE 0x140b
#define OBD_FAIL_LLITE_CREATE_NODE_PAUSE 0x140c
+#define OBD_FAIL_LLITE_PTASK_IO_FAIL 0x140d
#define OBD_FAIL_FID_INDIR 0x1501
return false;
}
-static void ll_io_init(struct cl_io *io, const struct file *file, int write)
+static int ll_file_io_ptask(struct cfs_ptask *ptask);
+
+/* Prime @io from the open @file: zero the embedded iterator, set up a
+ * sync kiocb, latch the O_NONBLOCK/O_APPEND/O_SYNC state, pick the DLM
+ * lock requirement, and decide whether this I/O may run as parallel
+ * tasks (ci_pio). */
+static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
{
- struct inode *inode = file_inode((struct file *)file);
-
- io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
- if (write) {
- io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
- io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
- file->f_flags & O_DIRECT ||
- IS_SYNC(inode);
- }
- io->ci_obj = ll_i2info(inode)->lli_clob;
- io->ci_lockreq = CILR_MAYBE;
- if (ll_file_nolock(file)) {
- io->ci_lockreq = CILR_NEVER;
- io->ci_no_srvlock = 1;
- } else if (file->f_flags & O_APPEND) {
- io->ci_lockreq = CILR_MANDATORY;
- }
+ struct inode *inode = file_inode(file);
+ memset(&io->u.ci_rw.rw_iter, 0, sizeof(io->u.ci_rw.rw_iter));
+ init_sync_kiocb(&io->u.ci_rw.rw_iocb, file);
+ io->u.ci_rw.rw_file = file;
+ io->u.ci_rw.rw_ptask = ll_file_io_ptask;
+ io->u.ci_rw.rw_nonblock = !!(file->f_flags & O_NONBLOCK);
+ if (iot == CIT_WRITE) {
+ io->u.ci_rw.rw_append = !!(file->f_flags & O_APPEND);
+ io->u.ci_rw.rw_sync = !!(file->f_flags & O_SYNC ||
+ file->f_flags & O_DIRECT ||
+ IS_SYNC(inode));
+ }
+ io->ci_obj = ll_i2info(inode)->lli_clob;
+ io->ci_lockreq = CILR_MAYBE;
+ if (ll_file_nolock(file)) {
+ io->ci_lockreq = CILR_NEVER;
+ io->ci_no_srvlock = 1;
+ } else if (file->f_flags & O_APPEND) {
+ io->ci_lockreq = CILR_MANDATORY;
+ }
io->ci_noatime = file_is_noatime(file);
+ /* PIO is enabled per-mount (LL_SBI_PIO); append writes are never
+ * parallelized since each slice would need the current file size. */
+ if (ll_i2sbi(inode)->ll_flags & LL_SBI_PIO)
+ io->ci_pio = !io->u.ci_rw.rw_append;
+ else
+ io->ci_pio = 0;
+}
+
+/* Body of one parallel-I/O task, run by the ptask engine for the
+ * cl_io_pt slice in ptask->pt_cbdata.  Rebuilds a cl_io for the
+ * slice's remaining range, runs the client I/O loop, accumulates
+ * progress into cip_result and restarts when the lower layers request
+ * it (ci_need_restart).  Returns 0 if any bytes were transferred,
+ * otherwise the error code. */
+static int ll_file_io_ptask(struct cfs_ptask *ptask)
+{
+ struct cl_io_pt *pt = ptask->pt_cbdata;
+ struct file *file = pt->cip_file;
+ struct lu_env *env;
+ struct cl_io *io;
+ loff_t pos = pt->cip_pos;
+ int rc;
+ __u16 refcheck;
+ ENTRY;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
+ file_dentry(file)->d_name.name,
+ pt->cip_iot == CIT_READ ? "read" : "write",
+ pos, pos + pt->cip_count);
+
+restart:
+ io = vvp_env_thread_io(env);
+ ll_io_init(io, file, pt->cip_iot);
+ io->u.ci_rw.rw_iter = pt->cip_iter;
+ io->u.ci_rw.rw_iocb = pt->cip_iocb;
+ io->ci_pio = 0; /* It's already in parallel task */
+
+ rc = cl_io_rw_init(env, io, pt->cip_iot, pos,
+ pt->cip_count - pt->cip_result);
+ if (!rc) {
+ struct vvp_io *vio = vvp_env_io(env);
+
+ vio->vui_io_subtype = IO_NORMAL;
+ vio->vui_fd = LUSTRE_FPRIVATE(file);
+
+ ll_cl_add(file, env, io, LCC_RW);
+ rc = cl_io_loop(env, io);
+ ll_cl_remove(file, env);
+ } else {
+ /* cl_io_rw_init() handled IO */
+ rc = io->ci_result;
+ }
+
+ /* fault injection point: fail the slice after halving its progress */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_PTASK_IO_FAIL, 0)) {
+ if (io->ci_nob > 0)
+ io->ci_nob /= 2;
+ rc = -EIO;
+ }
+
+ /* fold this pass's progress into the slice state so a restart (or
+ * the final result) continues from where we left off */
+ if (io->ci_nob > 0) {
+ pt->cip_result += io->ci_nob;
+ iov_iter_advance(&pt->cip_iter, io->ci_nob);
+ pos += io->ci_nob;
+ pt->cip_iocb.ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+ pt->cip_iocb.ki_left = pt->cip_count - pt->cip_result;
+#elif defined(HAVE_KI_NBYTES)
+ pt->cip_iocb.ki_nbytes = pt->cip_count - pt->cip_result;
+#endif
+ }
+
+ cl_io_fini(env, io);
+
+ /* NOTE(review): ci_need_restart is read after cl_io_fini(); this
+ * relies on fini not clearing the flag — confirm against cl_io_fini */
+ if ((rc == 0 || rc == -ENODATA) &&
+ pt->cip_result < pt->cip_count &&
+ io->ci_need_restart) {
+ CDEBUG(D_VFSTRACE,
+ "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+ file_dentry(file)->d_name.name,
+ pt->cip_iot == CIT_READ ? "read" : "write",
+ pos, pos + pt->cip_count - pt->cip_result,
+ pt->cip_result, rc);
+ goto restart;
+ }
+
+ CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
+ file_dentry(file)->d_name.name,
+ pt->cip_iot == CIT_READ ? "read" : "write",
+ pt->cip_result, rc);
+
+ cl_env_put(env, &refcheck);
+ RETURN(pt->cip_result > 0 ? 0 : rc);
+}
static ssize_t
struct file *file, enum cl_io_type iot,
loff_t *ppos, size_t count)
{
+ struct range_lock range;
struct vvp_io *vio = vvp_env_io(env);
struct inode *inode = file_inode(file);
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct cl_io *io;
+ loff_t pos = *ppos;
ssize_t result = 0;
int rc = 0;
- struct range_lock range;
ENTRY;
- CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: %llu, count: %zu\n",
- file_dentry(file)->d_name.name, iot, *ppos, count);
+ CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
+ file_dentry(file)->d_name.name,
+ iot == CIT_READ ? "read" : "write", pos, pos + count);
restart:
io = vvp_env_thread_io(env);
- ll_io_init(io, file, iot == CIT_WRITE);
+ ll_io_init(io, file, iot);
+ if (args->via_io_subtype == IO_NORMAL) {
+ io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
+ io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
+ } else {
+ io->ci_pio = 0;
+ }
- if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
+ if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
bool range_locked = false;
if (file->f_flags & O_APPEND)
range_lock_init(&range, 0, LUSTRE_EOF);
else
- range_lock_init(&range, *ppos, *ppos + count - 1);
+ range_lock_init(&range, pos, pos + count - 1);
vio->vui_fd = LUSTRE_FPRIVATE(file);
vio->vui_io_subtype = args->via_io_subtype;
switch (vio->vui_io_subtype) {
case IO_NORMAL:
- vio->vui_iter = args->u.normal.via_iter;
- vio->vui_iocb = args->u.normal.via_iocb;
/* Direct IO reads must also take range lock,
* or multiple reads will try to work on the same pages
* See LU-6227 for details. */
}
ll_cl_add(file, env, io, LCC_RW);
+ if (io->ci_pio && iot == CIT_WRITE && !IS_NOSEC(inode) &&
+ !lli->lli_inode_locked) {
+ inode_lock(inode);
+ lli->lli_inode_locked = 1;
+ }
rc = cl_io_loop(env, io);
+ if (lli->lli_inode_locked) {
+ lli->lli_inode_locked = 0;
+ inode_unlock(inode);
+ }
ll_cl_remove(file, env);
if (range_locked) {
if (io->ci_nob > 0) {
result += io->ci_nob;
- count -= io->ci_nob;
- *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+ count -= io->ci_nob;
- /* prepare IO restart */
- if (count > 0 && args->via_io_subtype == IO_NORMAL)
- args->u.normal.via_iter = vio->vui_iter;
+ if (args->via_io_subtype == IO_NORMAL) {
+ iov_iter_advance(args->u.normal.via_iter, io->ci_nob);
+ pos += io->ci_nob;
+ args->u.normal.via_iocb->ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+ args->u.normal.via_iocb->ki_left = count;
+#elif defined(HAVE_KI_NBYTES)
+ args->u.normal.via_iocb->ki_nbytes = count;
+#endif
+ } else {
+ /* for splice */
+ pos = io->u.ci_rw.rw_range.cir_pos;
+ }
}
out:
cl_io_fini(env, io);
if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
CDEBUG(D_VFSTRACE,
- "%s: restart %s from %lld, count:%zu, result: %zd\n",
- file_dentry(file)->d_name.name,
- iot == CIT_READ ? "read" : "write",
- *ppos, count, result);
+ "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+ file_dentry(file)->d_name.name,
+ iot == CIT_READ ? "read" : "write",
+ pos, pos + count, result, rc);
goto restart;
}
}
}
- CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
+ CDEBUG(D_VFSTRACE, "%s: %s *ppos: %llu, pos: %llu, ret: %zd, rc: %d\n",
+ file_dentry(file)->d_name.name,
+ iot == CIT_READ ? "read" : "write", *ppos, pos, result, rc);
+
+ *ppos = pos;
RETURN(result > 0 ? result : rc);
}
int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
struct dentry *dentry = file_dentry(file);
+ bool lock_inode;
#elif defined(HAVE_FILE_FSYNC_2ARGS)
int ll_fsync(struct file *file, int datasync)
{
#ifdef HAVE_FILE_FSYNC_4ARGS
rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
- inode_lock(inode);
+ lock_inode = !lli->lli_inode_locked;
+ if (lock_inode)
+ inode_lock(inode);
#else
/* fsync's caller has already called _fdata{sync,write}, we want
* that IO to finish before calling the osc and mdc sync methods */
}
#ifdef HAVE_FILE_FSYNC_4ARGS
- inode_unlock(inode);
+ if (lock_inode)
+ inode_unlock(inode);
#endif
RETURN(rc);
}
#include <linux/compat.h>
#include <linux/aio.h>
+#include <lustre_compat.h>
#include "vvp_internal.h"
#include "range_lock.h"
/* update atime from MDS no matter if it's older than
* local inode atime. */
- unsigned int lli_update_atime:1;
+ unsigned int lli_update_atime:1,
+ lli_inode_locked:1;
/* Try to make the d::member and f::member are aligned. Before using
* these members, make clear whether it is directory or not. */
* suppress_pings */
#define LL_SBI_FAST_READ 0x400000 /* fast read support */
#define LL_SBI_FILE_SECCTX 0x800000 /* set file security context at create */
+#define LL_SBI_PIO 0x1000000 /* parallel IO support */
#define LL_SBI_FLAGS { \
"nolck", \
"always_ping", \
"fast_read", \
"file_secctx", \
+ "pio", \
}
/* This is embedded into llite super-blocks to keep track of connect
u64 cl_fid_build_ino(const struct lu_fid *fid, int api32);
u32 cl_fid_build_gen(const struct lu_fid *fid);
-#ifndef HAVE_IOV_ITER_TRUNCATE
-static inline void iov_iter_truncate(struct iov_iter *i, u64 count)
-{
- if (i->count > count)
- i->count = count;
-}
-#endif
-
-#ifndef HAVE_IS_SXID
-static inline bool is_sxid(umode_t mode)
-{
- return (mode & S_ISUID) || ((mode & S_ISGID) && (mode & S_IXGRP));
-}
-#endif
-
-#ifndef IS_NOSEC
-#define IS_NOSEC(inode) (!is_sxid(inode->i_mode))
-#endif
-
-#ifndef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
-static inline void iov_iter_reexpand(struct iov_iter *i, size_t count)
-{
- i->count = count;
-}
-
-static inline struct iovec iov_iter_iovec(const struct iov_iter *iter)
-{
- return (struct iovec) {
- .iov_base = iter->iov->iov_base + iter->iov_offset,
- .iov_len = min(iter->count,
- iter->iov->iov_len - iter->iov_offset),
- };
-}
-
-#define iov_for_each(iov, iter, start) \
- for (iter = (start); \
- (iter).count && ((iov = iov_iter_iovec(&(iter))), 1); \
- iov_iter_advance(&(iter), (iov).iov_len))
-
-static inline ssize_t
-generic_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
-{
- struct iovec iov;
- struct iov_iter i;
- ssize_t bytes = 0;
-
- iov_for_each(iov, i, *iter) {
- ssize_t res;
-
- res = generic_file_aio_read(iocb, &iov, 1, iocb->ki_pos);
- if (res <= 0) {
- if (bytes == 0)
- bytes = res;
- break;
- }
-
- bytes += res;
- if (res < iov.iov_len)
- break;
- }
-
- if (bytes > 0)
- iov_iter_advance(iter, bytes);
- return bytes;
-}
-
-static inline ssize_t
-__generic_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
-{
- struct iovec iov;
- struct iov_iter i;
- ssize_t bytes = 0;
-
- /* Since LLITE updates file size at the end of I/O in
- * vvp_io_commit_write(), append write has to be done in atomic when
- * there are multiple segments because otherwise each iteration to
- * __generic_file_aio_write() will see original file size */
- if (unlikely(iocb->ki_filp->f_flags & O_APPEND && iter->nr_segs > 1)) {
- struct iovec *iov_copy;
- int count = 0;
-
- OBD_ALLOC(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
- if (!iov_copy)
- return -ENOMEM;
-
- iov_for_each(iov, i, *iter)
- iov_copy[count++] = iov;
-
- bytes = __generic_file_aio_write(iocb, iov_copy, count,
- &iocb->ki_pos);
- OBD_FREE(iov_copy, sizeof(*iov_copy) * iter->nr_segs);
-
- if (bytes > 0)
- iov_iter_advance(iter, bytes);
- return bytes;
- }
-
- iov_for_each(iov, i, *iter) {
- ssize_t res;
-
- res = __generic_file_aio_write(iocb, &iov, 1, &iocb->ki_pos);
- if (res <= 0) {
- if (bytes == 0)
- bytes = res;
- break;
- }
-
- bytes += res;
- if (res < iov.iov_len)
- break;
- }
-
- if (bytes > 0)
- iov_iter_advance(iter, bytes);
- return bytes;
-}
-#endif /* HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
#endif /* LLITE_INTERNAL_H */
}
LPROC_SEQ_FOPS(ll_fast_read);
+/* procfs read handler: report whether parallel I/O (LL_SBI_PIO) is
+ * enabled for this mount, as 0 or 1. */
+static int ll_pio_seq_show(struct seq_file *m, void *v)
+{
+ struct super_block *sb = m->private;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ seq_printf(m, "%u\n", !!(sbi->ll_flags & LL_SBI_PIO));
+ return 0;
+}
+
+/* procfs write handler: toggle parallel I/O (LL_SBI_PIO) under
+ * ll_lock.  NOTE(review): only the exact value 1 enables PIO; any
+ * other value disables it — confirm this matches the convention of
+ * the other boolean tunables (e.g. fast_read). */
+static ssize_t ll_pio_seq_write(struct file *file, const char __user *buffer,
+ size_t count, loff_t *off)
+{
+ struct seq_file *m = file->private_data;
+ struct super_block *sb = m->private;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+ int rc;
+ __s64 val;
+
+ rc = lprocfs_str_to_s64(buffer, count, &val);
+ if (rc)
+ return rc;
+
+ spin_lock(&sbi->ll_lock);
+ if (val == 1)
+ sbi->ll_flags |= LL_SBI_PIO;
+ else
+ sbi->ll_flags &= ~LL_SBI_PIO;
+ spin_unlock(&sbi->ll_lock);
+
+ return count;
+}
+LPROC_SEQ_FOPS(ll_pio);
+
static int ll_unstable_stats_seq_show(struct seq_file *m, void *v)
{
struct super_block *sb = m->private;
.fops = &ll_root_squash_fops },
{ .name = "nosquash_nids",
.fops = &ll_nosquash_nids_fops },
- { .name = "fast_read",
- .fops = &ll_fast_read_fops, },
+ { .name = "fast_read",
+ .fops = &ll_fast_read_fops, },
+ { .name = "pio",
+ .fops = &ll_pio_fops, },
{ NULL }
};
int result = 0;
ENTRY;
- CDEBUG(D_VFSTRACE, "Writing %lu of %d to %d bytes\n", index, from, len);
+ CDEBUG(D_PAGE, "Writing %lu of %d to %d bytes\n", index, from, len);
lcc = ll_cl_find(file);
if (lcc == NULL) {
if (plist->pl_nr >= PTLRPC_MAX_BRW_PAGES)
unplug = true;
- CL_PAGE_DEBUG(D_VFSTRACE, env, page,
+ CL_PAGE_DEBUG(D_PAGE, env, page,
"queued page: %d.\n", plist->pl_nr);
} else {
cl_page_disown(env, io, page);
/** super class */
struct cl_io_slice vui_cl;
struct cl_io_lock_link vui_link;
- /**
- * I/O vector information to or from which read/write is going.
- */
- struct iov_iter *vui_iter;
- /**
- * Total size for the left IO.
- */
+ /** Total size for the left IO. */
size_t vui_tot_count;
union {
* File descriptor against which IO is done.
*/
struct ll_file_data *vui_fd;
- struct kiocb *vui_iocb;
/* Readahead state. */
pgoff_t vui_ra_start;
CLOBINVRNT(env, obj, vvp_object_invariant(obj));
CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d "
- "need write layout %d, restore needed %d\n",
+ "need write layout %d, restore needed %d\n",
PFID(lu_object_fid(&obj->co_lu)),
io->ci_ignore_layout, io->ci_verify_layout,
vio->vui_layout_gen, io->ci_need_write_intent,
if (io->ci_type == CIT_WRITE) {
if (!cl_io_is_append(io)) {
- start = io->u.ci_rw.crw_pos;
- end = start + io->u.ci_rw.crw_count;
+ start = io->u.ci_rw.rw_range.cir_pos;
+ end = start + io->u.ci_rw.rw_range.cir_count;
}
} else if (cl_io_is_trunc(io)) {
end = io->u.ci_setattr.sa_attr.lvb_size;
end = cl_offset(io->ci_obj, index + 1);
}
- CDEBUG(D_VFSTRACE, DFID" type %d [%llx, %llx)\n",
+ CDEBUG(D_VFSTRACE, DFID" write layout, type %u [%llu, %llu)\n",
PFID(lu_object_fid(&obj->co_lu)), io->ci_type,
start, end);
rc = ll_layout_write_intent(inode, start, end);
return CLM_READ;
}
-static int vvp_mmap_locks(const struct lu_env *env,
- struct vvp_io *vio, struct cl_io *io)
+static int vvp_mmap_locks(const struct lu_env *env, struct cl_io *io)
{
struct vvp_thread_info *vti = vvp_env_info(env);
struct mm_struct *mm = current->mm;
if (!cl_is_normalio(env, io))
RETURN(0);
- /* nfs or loop back device write */
- if (vio->vui_iter == NULL)
- RETURN(0);
-
/* No MM (e.g. NFS)? No vmas too. */
if (mm == NULL)
RETURN(0);
- iov_for_each(iov, i, *(vio->vui_iter)) {
+ iov_for_each(iov, i, io->u.ci_rw.rw_iter) {
unsigned long addr = (unsigned long)iov.iov_base;
size_t count = iov.iov_len;
return;
vio->vui_tot_count -= nob;
- iov_iter_reexpand(vio->vui_iter, vio->vui_tot_count);
-}
-
-static void vvp_io_update_iov(const struct lu_env *env,
- struct vvp_io *vio, struct cl_io *io)
-{
- size_t size = io->u.ci_rw.crw_count;
-
- if (!cl_is_normalio(env, io) || vio->vui_iter == NULL)
- return;
-
- iov_iter_truncate(vio->vui_iter, size);
+ if (io->ci_pio) {
+ iov_iter_advance(&io->u.ci_rw.rw_iter, nob);
+ io->u.ci_rw.rw_iocb.ki_pos = io->u.ci_rw.rw_range.cir_pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+ io->u.ci_rw.rw_iocb.ki_left = vio->vui_tot_count;
+#elif defined(HAVE_KI_NBYTES)
+ io->u.ci_rw.rw_iocb.ki_nbytes = vio->vui_tot_count;
+#endif
+ } else {
+ /* It was truncated to stripe size in vvp_io_rw_lock() */
+ iov_iter_reexpand(&io->u.ci_rw.rw_iter, vio->vui_tot_count);
+ }
}
static int vvp_io_rw_lock(const struct lu_env *env, struct cl_io *io,
enum cl_lock_mode mode, loff_t start, loff_t end)
{
- struct vvp_io *vio = vvp_env_io(env);
int result;
int ast_flags = 0;
LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
ENTRY;
+ /* Clip the iterator to the current range (replaces the removed
+ * vvp_io_update_iov()); vvp_io_advance() re-expands it afterwards. */
- vvp_io_update_iov(env, vio, io);
+ if (cl_is_normalio(env, io))
+ iov_iter_truncate(&io->u.ci_rw.rw_iter,
+ io->u.ci_rw.rw_range.cir_count);
- if (io->u.ci_rw.crw_nonblock)
+ if (io->u.ci_rw.rw_nonblock)
ast_flags |= CEF_NONBLOCK;
- result = vvp_mmap_locks(env, vio, io);
+ result = vvp_mmap_locks(env, io);
if (result == 0)
result = vvp_io_one_lock(env, io, ast_flags, mode, start, end);
- RETURN(result);
+ RETURN(result);
}
static int vvp_io_read_lock(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct cl_io *io = ios->cis_io;
- struct cl_io_rw_common *rd = &io->u.ci_rd.rd;
- int result;
+ struct cl_io *io = ios->cis_io;
+ struct cl_io_range *range = &io->u.ci_rw.rw_range;
+ int rc;
ENTRY;
+ /* take a CLM_READ extent lock covering the whole read range */
- result = vvp_io_rw_lock(env, io, CLM_READ, rd->crw_pos,
- rd->crw_pos + rd->crw_count - 1);
- RETURN(result);
+ rc = vvp_io_rw_lock(env, io, CLM_READ, range->cir_pos,
+ range->cir_pos + range->cir_count - 1);
+ RETURN(rc);
}
static int vvp_io_fault_lock(const struct lu_env *env,
static int vvp_io_write_lock(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct cl_io *io = ios->cis_io;
- loff_t start;
- loff_t end;
+ struct cl_io *io = ios->cis_io;
+ loff_t start;
+ loff_t end;
+ int rc;
- if (io->u.ci_wr.wr_append) {
- start = 0;
- end = OBD_OBJECT_EOF;
- } else {
- start = io->u.ci_wr.wr.crw_pos;
- end = start + io->u.ci_wr.wr.crw_count - 1;
- }
- return vvp_io_rw_lock(env, io, CLM_WRITE, start, end);
+ ENTRY;
+ /* an append write cannot know its final extent, so lock [0, EOF] */
+ if (io->u.ci_rw.rw_append) {
+ start = 0;
+ end = OBD_OBJECT_EOF;
+ } else {
+ start = io->u.ci_rw.rw_range.cir_pos;
+ end = start + io->u.ci_rw.rw_range.cir_count - 1;
+ }
+ rc = vvp_io_rw_lock(env, io, CLM_WRITE, start, end);
+ RETURN(rc);
}
static int vvp_io_setattr_iter_init(const struct lu_env *env,
struct inode *inode = vvp_object_inode(obj);
struct ll_inode_info *lli = ll_i2info(inode);
struct file *file = vio->vui_fd->fd_file;
-
- int result;
- loff_t pos = io->u.ci_rd.rd.crw_pos;
- long cnt = io->u.ci_rd.rd.crw_count;
- long tot = vio->vui_tot_count;
- int exceed = 0;
+ struct cl_io_range *range = &io->u.ci_rw.rw_range;
+ loff_t pos = range->cir_pos; /* for generic_file_splice_read() only */
+ size_t tot = vio->vui_tot_count;
+ int exceed = 0;
+ int result;
CLOBINVRNT(env, obj, vvp_object_invariant(obj));
- CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
+ CDEBUG(D_VFSTRACE, "%s: read [%llu, %llu)\n",
+ file_dentry(file)->d_name.name,
+ range->cir_pos, range->cir_pos + range->cir_count);
if (vio->vui_io_subtype == IO_NORMAL)
down_read(&lli->lli_trunc_sem);
if (!can_populate_pages(env, io, inode))
return 0;
- result = vvp_prep_size(env, obj, io, pos, tot, &exceed);
+ result = vvp_prep_size(env, obj, io, range->cir_pos, tot, &exceed);
if (result != 0)
return result;
else if (exceed != 0)
goto out;
LU_OBJECT_HEADER(D_INODE, env, &obj->co_lu,
- "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
- inode->i_ino, cnt, pos, i_size_read(inode));
+ "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
+ inode->i_ino, range->cir_count, range->cir_pos,
+ i_size_read(inode));
/* turn off the kernel's read-ahead */
vio->vui_fd->fd_file->f_ra.ra_pages = 0;
/* initialize read-ahead window once per syscall */
if (!vio->vui_ra_valid) {
vio->vui_ra_valid = true;
- vio->vui_ra_start = cl_index(obj, pos);
+ vio->vui_ra_start = cl_index(obj, range->cir_pos);
vio->vui_ra_count = cl_index(obj, tot + PAGE_SIZE - 1);
ll_ras_enter(file);
}
file_accessed(file);
switch (vio->vui_io_subtype) {
case IO_NORMAL:
- LASSERT(vio->vui_iocb->ki_pos == pos);
- result = generic_file_read_iter(vio->vui_iocb, vio->vui_iter);
+ LASSERTF(io->u.ci_rw.rw_iocb.ki_pos == range->cir_pos,
+ "ki_pos %lld [%lld, %lld)\n",
+ io->u.ci_rw.rw_iocb.ki_pos,
+ range->cir_pos, range->cir_pos + range->cir_count);
+ result = generic_file_read_iter(&io->u.ci_rw.rw_iocb,
+ &io->u.ci_rw.rw_iter);
break;
case IO_SPLICE:
result = generic_file_splice_read(file, &pos,
- vio->u.splice.vui_pipe, cnt,
+ vio->u.splice.vui_pipe,
+ range->cir_count,
vio->u.splice.vui_flags);
/* LU-1109: do splice read stripe by stripe otherwise if it
* may make nfsd stuck if this read occupied all internal pipe
out:
if (result >= 0) {
- if (result < cnt)
+ if (result < range->cir_count)
io->ci_continue = 0;
io->ci_nob += result;
ll_rw_stats_tally(ll_i2sbi(inode), current->pid, vio->vui_fd,
- pos, result, READ);
+ range->cir_pos, result, READ);
result = 0;
}
SetPageUptodate(cl_page_vmpage(page));
cl_page_disown(env, io, page);
- /* held in ll_cl_init() */
lu_ref_del(&page->cp_reference, "cl_io", io);
cl_page_put(env, page);
}
cl_page_disown(env, io, page);
- /* held in ll_cl_init() */
lu_ref_del(&page->cp_reference, "cl_io", cl_io_top(io));
cl_page_put(env, page);
}
cl_page_disown(env, io, page);
- /* held in ll_cl_init() */
lu_ref_del(&page->cp_reference, "cl_io", io);
cl_page_put(env, page);
}
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct ll_inode_info *lli = ll_i2info(inode);
+ struct file *file = vio->vui_fd->fd_file;
+ struct cl_io_range *range = &io->u.ci_rw.rw_range;
+ bool lock_inode = !lli->lli_inode_locked &&
+ !IS_NOSEC(inode);
ssize_t result = 0;
- loff_t pos = io->u.ci_wr.wr.crw_pos;
- size_t cnt = io->u.ci_wr.wr.crw_count;
-
ENTRY;
if (vio->vui_io_subtype == IO_NORMAL)
* out-of-order writes.
*/
ll_merge_attr(env, inode);
- pos = io->u.ci_wr.wr.crw_pos = i_size_read(inode);
- vio->vui_iocb->ki_pos = pos;
+ range->cir_pos = i_size_read(inode);
+ io->u.ci_rw.rw_iocb.ki_pos = range->cir_pos;
} else {
- LASSERT(vio->vui_iocb->ki_pos == pos);
+ LASSERTF(io->u.ci_rw.rw_iocb.ki_pos == range->cir_pos,
+ "ki_pos %lld [%lld, %lld)\n",
+ io->u.ci_rw.rw_iocb.ki_pos,
+ range->cir_pos, range->cir_pos + range->cir_count);
}
- CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt);
+ CDEBUG(D_VFSTRACE, "%s: write [%llu, %llu)\n",
+ file_dentry(file)->d_name.name,
+ range->cir_pos, range->cir_pos + range->cir_count);
/* The maximum Lustre file size is variable, based on the OST maximum
* object size and number of stripes. This needs another check in
* addition to the VFS checks earlier. */
- if (pos + cnt > ll_file_maxbytes(inode)) {
+ if (range->cir_pos + range->cir_count > ll_file_maxbytes(inode)) {
CDEBUG(D_INODE,
- "%s: file "DFID" offset %llu > maxbytes %llu\n",
+ "%s: file %s ("DFID") offset %llu > maxbytes %llu\n",
ll_get_fsname(inode->i_sb, NULL, 0),
- PFID(ll_inode2fid(inode)), pos + cnt,
+ file_dentry(file)->d_name.name,
+ PFID(ll_inode2fid(inode)),
+ range->cir_pos + range->cir_count,
ll_file_maxbytes(inode));
RETURN(-EFBIG);
}
- if (vio->vui_iter == NULL) {
- /* from a temp io in ll_cl_init(). */
- result = 0;
- } else {
- /*
- * When using the locked AIO function (generic_file_aio_write())
- * testing has shown the inode mutex to be a limiting factor
- * with multi-threaded single shared file performance. To get
- * around this, we now use the lockless version. To maintain
- * consistency, proper locking to protect against writes,
- * trucates, etc. is handled in the higher layers of lustre.
- */
- bool lock_node = !IS_NOSEC(inode);
-
- if (lock_node)
- inode_lock(inode);
- result = __generic_file_write_iter(vio->vui_iocb,
- vio->vui_iter);
- if (lock_node)
- inode_unlock(inode);
+ /*
+ * When using the locked AIO function (generic_file_aio_write())
+ * testing has shown the inode mutex to be a limiting factor
+ * with multi-threaded single shared file performance. To get
+ * around this, we now use the lockless version. To maintain
+ * consistency, proper locking to protect against writes,
+ * trucates, etc. is handled in the higher layers of lustre.
+ */
+ if (lock_inode)
+ inode_lock(inode);
+ result = __generic_file_write_iter(&io->u.ci_rw.rw_iocb,
+ &io->u.ci_rw.rw_iter);
+ if (lock_inode)
+ inode_unlock(inode);
- if (result > 0 || result == -EIOCBQUEUED)
+ if (result > 0 || result == -EIOCBQUEUED)
#ifdef HAVE_GENERIC_WRITE_SYNC_2ARGS
- result = generic_write_sync(vio->vui_iocb, result);
+ result = generic_write_sync(&io->u.ci_rw.rw_iocb, result);
#else
- {
- ssize_t err;
+ {
+ ssize_t err;
- err = generic_write_sync(vio->vui_iocb->ki_filp, pos,
- result);
- if (err < 0 && result > 0)
- result = err;
- }
+ err = generic_write_sync(io->u.ci_rw.rw_iocb.ki_filp,
+ range->cir_pos, result);
+ if (err < 0 && result > 0)
+ result = err;
+ }
#endif
- }
if (result > 0) {
result = vvp_io_write_commit(env, io);
if (vio->u.write.vui_written > 0) {
result = vio->u.write.vui_written;
- io->ci_nob += result;
-
- CDEBUG(D_VFSTRACE, "write: nob %zd, result: %zd\n",
+ CDEBUG(D_VFSTRACE, "%s: write nob %zd, result: %zd\n",
+ file_dentry(file)->d_name.name,
io->ci_nob, result);
+ io->ci_nob += result;
}
}
if (result > 0) {
ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);
- if (result < cnt)
+ if (result < range->cir_count)
io->ci_continue = 0;
ll_rw_stats_tally(ll_i2sbi(inode), current->pid,
- vio->vui_fd, pos, result, WRITE);
+ vio->vui_fd, range->cir_pos, result, WRITE);
result = 0;
}
vio->vui_ra_valid = false;
result = 0;
if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
- size_t count;
struct ll_inode_info *lli = ll_i2info(inode);
- count = io->u.ci_rw.crw_count;
- /* "If nbyte is 0, read() will return 0 and have no other
- * results." -- Single Unix Spec */
- if (count == 0)
- result = 1;
- else
- vio->vui_tot_count = count;
+ vio->vui_tot_count = io->u.ci_rw.rw_range.cir_count;
+ /* "If nbyte is 0, read() will return 0 and have no other
+ * results." -- Single Unix Spec */
+ if (vio->vui_tot_count == 0)
+ result = 1;
/* for read/write, we store the jobid in the inode, and
* it'll be fetched by osc when building RPC.
sub_io->ci_type = io->ci_type;
sub_io->ci_no_srvlock = io->ci_no_srvlock;
sub_io->ci_noatime = io->ci_noatime;
+ sub_io->ci_pio = io->ci_pio;
result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
LASSERT(obj->lo_lsm != NULL);
- switch (io->ci_type) {
- case CIT_READ:
- case CIT_WRITE:
- lio->lis_pos = io->u.ci_rw.crw_pos;
- lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
- lio->lis_io_endpos = lio->lis_endpos;
- if (cl_io_is_append(io)) {
- LASSERT(io->ci_type == CIT_WRITE);
+ switch (io->ci_type) {
+ case CIT_READ:
+ case CIT_WRITE:
+ lio->lis_pos = io->u.ci_rw.rw_range.cir_pos;
+ lio->lis_endpos = lio->lis_pos + io->u.ci_rw.rw_range.cir_count;
+ lio->lis_io_endpos = lio->lis_endpos;
+ if (cl_io_is_append(io)) {
+ LASSERT(io->ci_type == CIT_WRITE);
/* If there is LOV EA hole, then we may cannot locate
* the current file-tail exactly. */
LOV_PATTERN_F_HOLE))
RETURN(-EIO);
- lio->lis_pos = 0;
- lio->lis_endpos = OBD_OBJECT_EOF;
- }
- break;
+ lio->lis_pos = 0;
+ lio->lis_endpos = OBD_OBJECT_EOF;
+ }
+ break;
case CIT_SETATTR:
if (cl_io_is_trunc(io))
int index = lov_comp_entry(sub->sub_subio_index);
int stripe = lov_comp_stripe(sub->sub_subio_index);
+ io->ci_pio = parent->ci_pio;
switch (io->ci_type) {
case CIT_SETATTR: {
io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
}
case CIT_READ:
case CIT_WRITE: {
- io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
+ io->u.ci_rw.rw_ptask = parent->u.ci_rw.rw_ptask;
+ io->u.ci_rw.rw_iter = parent->u.ci_rw.rw_iter;
+ io->u.ci_rw.rw_iocb = parent->u.ci_rw.rw_iocb;
+ io->u.ci_rw.rw_file = parent->u.ci_rw.rw_file;
+ io->u.ci_rw.rw_sync = parent->u.ci_rw.rw_sync;
if (cl_io_is_append(parent)) {
- io->u.ci_wr.wr_append = 1;
+ io->u.ci_rw.rw_append = 1;
} else {
- io->u.ci_rw.crw_pos = start;
- io->u.ci_rw.crw_count = end - start;
+ io->u.ci_rw.rw_range.cir_pos = start;
+ io->u.ci_rw.rw_range.cir_count = end - start;
}
break;
}
* it's handled in lov_io_setattr_iter_init() */
if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) {
io->ci_need_write_intent = 1;
+ /* execute it in main thread */
+ io->ci_pio = 0;
rc = -ENODATA;
break;
}
if (rc != 0)
break;
- CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
- stripe, start, end);
+ CDEBUG(D_VFSTRACE,
+ "shrink stripe: {%d, %d} range: [%llu, %llu)\n",
+ index, stripe, start, end);
list_add_tail(&sub->sub_linkage, &lio->lis_active);
}
static int lov_io_rw_iter_init(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct cl_io *io = ios->cis_io;
+ struct cl_io *io = ios->cis_io;
+ struct lov_io *lio = cl2lov_io(env, ios);
struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct lov_stripe_md_entry *lse;
- loff_t start = io->u.ci_rw.crw_pos;
+ struct cl_io_range *range = &io->u.ci_rw.rw_range;
+ loff_t start = range->cir_pos;
loff_t next;
unsigned long ssize;
int index;
if (cl_io_is_append(io))
RETURN(lov_io_iter_init(env, ios));
- index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos);
+ index = lov_lsm_entry(lsm, range->cir_pos);
if (index < 0) { /* non-existing layout component */
if (io->ci_type == CIT_READ) {
/* TODO: it needs to detect the next component and
* then set the next pos */
io->ci_continue = 0;
+ /* execute it in main thread */
+ io->ci_pio = 0;
RETURN(lov_io_iter_init(env, ios));
}
if (next <= start * ssize)
next = ~0ull;
- LASSERTF(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start,
- "pos %lld, [%lld, %lld)\n", io->u.ci_rw.crw_pos,
+ LASSERTF(range->cir_pos >= lse->lsme_extent.e_start,
+ "pos %lld, [%lld, %lld)\n", range->cir_pos,
lse->lsme_extent.e_start, lse->lsme_extent.e_end);
next = min_t(__u64, next, lse->lsme_extent.e_end);
next = min_t(loff_t, next, lio->lis_io_endpos);
- io->ci_continue = next < lio->lis_io_endpos;
- io->u.ci_rw.crw_count = next - io->u.ci_rw.crw_pos;
- lio->lis_pos = io->u.ci_rw.crw_pos;
- lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+ io->ci_continue = next < lio->lis_io_endpos;
+ range->cir_count = next - range->cir_pos;
+ lio->lis_pos = range->cir_pos;
+ lio->lis_endpos = range->cir_pos + range->cir_count;
CDEBUG(D_VFSTRACE,
- "stripe: %llu chunk: [%llu, %llu) %llu, %zd\n",
- (__u64)start, lio->lis_pos, lio->lis_endpos,
- (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
+ "stripe: {%d, %llu} range: [%llu, %llu) end: %llu, count: %zd\n",
+ index, start, lio->lis_pos, lio->lis_endpos,
+ lio->lis_io_endpos, range->cir_count);
+
+ if (!io->ci_continue) {
+ /* the last piece of IO, execute it in main thread */
+ io->ci_pio = 0;
+ }
+
+ if (io->ci_pio) {
+ /* This only splits the IO range for parallel IO;
+ * no actual IO will occur here, so there is no need
+ * to invoke lov_io_iter_init() to initialize
+ * the sub IOs. */
+ if (!lsm_entry_inited(lsm, index)) {
+ io->ci_need_write_intent = 1;
+ RETURN(-ENODATA);
+ }
+ RETURN(0);
+ }
/*
* XXX The following call should be optimized: we know, that
#include <lustre_fid.h>
#include <cl_object.h>
#include "cl_internal.h"
+#include <lustre_compat.h>
/*****************************************************************************
*
int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
enum cl_io_type iot, loff_t pos, size_t count)
{
- LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
- LINVRNT(io->ci_obj != NULL);
- ENTRY;
+ LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
+ LINVRNT(io->ci_obj != NULL);
+ ENTRY;
+
+ if (cfs_ptengine_weight(cl_io_engine) < 2)
+ io->ci_pio = 0;
+
+ LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
+ "io %s range: [%llu, %llu) %s %s %s %s\n",
+ iot == CIT_READ ? "read" : "write",
+ pos, pos + count,
+ io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
+ io->u.ci_rw.rw_append ? "append" : "-",
+ io->u.ci_rw.rw_sync ? "sync" : "-",
+ io->ci_pio ? "pio" : "-");
+
+ io->u.ci_rw.rw_range.cir_pos = pos;
+ io->u.ci_rw.rw_range.cir_count = count;
- LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
- "io range: %u [%llu, %llu) %u %u\n",
- iot, (__u64)pos, (__u64)pos + count,
- io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
- io->u.ci_rw.crw_pos = pos;
- io->u.ci_rw.crw_count = count;
- RETURN(cl_io_init(env, io, iot, io->ci_obj));
+ RETURN(cl_io_init(env, io, iot, io->ci_obj));
}
EXPORT_SYMBOL(cl_io_rw_init);
ENTRY;
- io->u.ci_rw.crw_pos += nob;
- io->u.ci_rw.crw_count -= nob;
+ io->u.ci_rw.rw_range.cir_pos += nob;
+ io->u.ci_rw.rw_range.cir_count -= nob;
/* layers have to be notified. */
cl_io_for_each_reverse(scan, io) {
return result;
}
+/**
+ * Allocate and submit one parallel task covering the sub-range
+ * [pos, pos + count) of the current read/write IO.
+ *
+ * All state the task needs (kiocb, a truncated copy of the iov_iter,
+ * file, IO type and range) is snapshotted from \a io so the task can
+ * run independently of the submitting thread.
+ *
+ * \retval pointer to the new cl_io_pt on success,
+ * \retval ERR_PTR(-errno) on allocation/init/submit failure.
+ */
+static
+struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
+{
+ struct cl_io_pt *pt;
+ int rc;
+
+ OBD_ALLOC(pt, sizeof(*pt));
+ if (pt == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ pt->cip_next = NULL;
+ init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
+ pt->cip_iocb.ki_pos = pos;
+#ifdef HAVE_KIOCB_KI_LEFT
+ pt->cip_iocb.ki_left = count;
+#elif defined(HAVE_KI_NBYTES)
+ pt->cip_iocb.ki_nbytes = count;
+#endif
+ /* Private copy of the caller's iterator, clipped to this chunk so
+ * concurrent tasks never advance each other's position. */
+ pt->cip_iter = io->u.ci_rw.rw_iter;
+ iov_iter_truncate(&pt->cip_iter, count);
+ pt->cip_file = io->u.ci_rw.rw_file;
+ pt->cip_iot = io->ci_type;
+ pt->cip_pos = pos;
+ pt->cip_count = count;
+ pt->cip_result = 0;
+
+ /* NOTE(review): flags presumably request in-order completion
+ * (PTF_ORDERED), completion notification (PTF_COMPLETE), running
+ * with the submitter's mm (PTF_USER_MM) and retry on transient
+ * failure (PTF_RETRY) -- confirm against libcfs_ptask.h. */
+ rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
+ PTF_ORDERED | PTF_COMPLETE |
+ PTF_USER_MM | PTF_RETRY, smp_processor_id());
+ if (rc)
+ GOTO(out_error, rc);
+
+ CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
+ io->ci_type == CIT_READ ? "read" : "write",
+ pos, pos + count);
+
+ rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
+ if (rc)
+ GOTO(out_error, rc);
+
+ RETURN(pt);
+
+out_error:
+ OBD_FREE(pt, sizeof(*pt));
+ RETURN(ERR_PTR(rc));
+}
+
/**
* Main io loop.
*
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
- int result = 0;
+ /* Chain of submitted parallel tasks, linked through cip_next in
+ * submission order and reaped after the split loop below. */
+ struct cl_io_pt *pt = NULL, *head = NULL;
+ struct cl_io_pt **tail = &head;
+ loff_t pos;
+ size_t count;
+ /* Bytes done synchronously by the final (main-thread) chunk. */
+ size_t last_chunk_count = 0;
+ bool short_io = false;
+ int rc = 0;
+ ENTRY;
- LINVRNT(cl_io_is_loopable(io));
- ENTRY;
+ LINVRNT(cl_io_is_loopable(io));
- do {
- size_t nob;
-
- io->ci_continue = 0;
- result = cl_io_iter_init(env, io);
- if (result == 0) {
- nob = io->ci_nob;
- result = cl_io_lock(env, io);
- if (result == 0) {
- /*
- * Notify layers that locks has been taken,
- * and do actual i/o.
- *
- * - llite: kms, short read;
- * - llite: generic_file_read();
- */
- result = cl_io_start(env, io);
- /*
- * Send any remaining pending
- * io, etc.
- *
- * - llite: ll_rw_stats_tally.
- */
- cl_io_end(env, io);
- cl_io_unlock(env, io);
- cl_io_rw_advance(env, io, io->ci_nob - nob);
- }
- }
- cl_io_iter_fini(env, io);
- } while (result == 0 && io->ci_continue);
- if (result == 0)
- result = io->ci_result;
- RETURN(result < 0 ? result : 0);
+ do {
+ io->ci_continue = 0;
+
+ rc = cl_io_iter_init(env, io);
+ if (rc) {
+ cl_io_iter_fini(env, io);
+ break;
+ }
+
+ /* NOTE(review): the ci_rw union member is read here for every
+ * loopable IO type; for non-RW IOs "pos" is only used in the
+ * debug message and "count" is overwritten in the else branch
+ * below -- confirm this holds for all loopable types. */
+ pos = io->u.ci_rw.rw_range.cir_pos;
+ count = io->u.ci_rw.rw_range.cir_count;
+
+ if (io->ci_pio) {
+ /* submit this range for parallel execution */
+ pt = cl_io_submit_pt(io, pos, count);
+ if (IS_ERR(pt)) {
+ cl_io_iter_fini(env, io);
+ rc = PTR_ERR(pt);
+ break;
+ }
+
+ *tail = pt;
+ tail = &pt->cip_next;
+ } else {
+ size_t nob = io->ci_nob;
+
+ CDEBUG(D_VFSTRACE,
+ "execute type %u range: [%llu, %llu) nob: %zu %s\n",
+ io->ci_type, pos, pos + count, nob,
+ io->ci_continue ? "continue" : "stop");
+
+ rc = cl_io_lock(env, io);
+ if (rc) {
+ cl_io_iter_fini(env, io);
+ break;
+ }
+
+ /*
+ * Notify layers that locks has been taken,
+ * and do actual i/o.
+ *
+ * - llite: kms, short read;
+ * - llite: generic_file_read();
+ */
+ rc = cl_io_start(env, io);
+
+ /*
+ * Send any remaining pending
+ * io, etc.
+ *
+ * - llite: ll_rw_stats_tally.
+ */
+ cl_io_end(env, io);
+ cl_io_unlock(env, io);
+
+ count = io->ci_nob - nob;
+ last_chunk_count = count;
+ }
+
+ cl_io_rw_advance(env, io, count);
+ cl_io_iter_fini(env, io);
+ } while (!rc && io->ci_continue);
+
+ CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
+ io->ci_type, io->ci_nob, rc,
+ io->ci_continue ? "continue" : "stop");
+
+ /* Reap the parallel tasks in submission order and fold the bytes
+ * they transferred into io->ci_nob. After the first short IO no
+ * later chunk's bytes may be counted, since the transferred file
+ * range is no longer contiguous. */
+ while (head != NULL) {
+ int rc2;
+
+ pt = head;
+ head = head->cip_next;
+
+ rc2 = cfs_ptask_wait_for(&pt->cip_task);
+ LASSERTF(!rc2, "wait for task error: %d\n", rc2);
+
+ rc2 = cfs_ptask_result(&pt->cip_task);
+ CDEBUG(D_VFSTRACE,
+ "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
+ pt->cip_iot == CIT_READ ? "read" : "write",
+ pt->cip_pos, pt->cip_pos + pt->cip_count,
+ pt->cip_result, rc2);
+ if (rc2)
+ rc = rc ? rc : rc2;
+ if (!short_io) {
+ if (!rc2) /* IO is done by this task successfully */
+ io->ci_nob += pt->cip_result;
+ if (pt->cip_result < pt->cip_count) {
+ /* A short IO happened; this is not
+ * necessarily an error. */
+ CDEBUG(D_VFSTRACE,
+ "incomplete range: [%llu, %llu) "
+ "last_chunk_count: %zu\n",
+ pt->cip_pos,
+ pt->cip_pos + pt->cip_count,
+ last_chunk_count);
+ /* NOTE(review): the final chunk ran in the
+ * main thread and presumably was already
+ * counted into ci_nob by cl_io_start above;
+ * it lies beyond this short chunk, so its
+ * bytes are discarded here -- confirm. */
+ io->ci_nob -= last_chunk_count;
+ short_io = true;
+ }
+ }
+ OBD_FREE(pt, sizeof(*pt));
+ }
+
+ CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
+ io->ci_nob, short_io ? "short" : "full", rc);
+
+ /* A negative errno from the loop or any task takes precedence;
+ * otherwise return the IO's own result code. */
+ RETURN(rc < 0 ? rc : io->ci_result);
}
EXPORT_SYMBOL(cl_io_loop);
}
};
+struct cfs_ptask_engine *cl_io_engine;
+
/**
* Global initialization of cl-data. Create kmem caches, register
* lu_context_key's, etc.
if (result) /* no cl_env_percpu_fini on error */
GOTO(out_keys, result);
+ cl_io_engine = cfs_ptengine_init("clio", cpu_online_mask);
+ if (IS_ERR(cl_io_engine)) {
+ result = PTR_ERR(cl_io_engine);
+ cl_io_engine = NULL;
+ GOTO(out_percpu, result);
+ }
+
return 0;
+out_percpu:
+ cl_env_percpu_fini();
out_keys:
lu_context_key_degister(&cl_key);
out_kmem:
*/
void cl_global_fini(void)
{
+ cfs_ptengine_fini(cl_io_engine);
+ cl_io_engine = NULL;
cl_env_percpu_fini();
lu_context_key_degister(&cl_key);
lu_kmem_fini(cl_object_caches);
if (cl_io_is_append(io))
RETURN(osc_io_iter_init(env, ios));
- npages = io->u.ci_rw.crw_count >> PAGE_SHIFT;
- if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
+ npages = io->u.ci_rw.rw_range.cir_count >> PAGE_SHIFT;
+ if (io->u.ci_rw.rw_range.cir_pos & ~PAGE_MASK)
++npages;
oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
return;
if (likely(io->ci_type == CIT_WRITE)) {
- io_start = cl_index(obj, io->u.ci_rw.crw_pos);
- io_end = cl_index(obj, io->u.ci_rw.crw_pos +
- io->u.ci_rw.crw_count - 1);
+ io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos);
+ io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos +
+ io->u.ci_rw.rw_range.cir_count - 1);
} else {
LASSERT(cl_io_is_mkwrite(io));
io_start = io_end = io->u.ci_fault.ft_index;
}
run_test 82 "Basic grouplock test ==============================="
+test_83() {
+ # Copy a sizeable read-only file while the fail_loc forces failures
+ # in the ptask IO path, then verify the destination is still
+ # byte-identical to the source (short writes must be retried, not
+ # silently dropped).
+ local sfile="/boot/System.map-$(uname -r)"
+ # define OBD_FAIL_LLITE_PTASK_IO_FAIL 0x140d
+ $LCTL set_param fail_loc=0x140d
+ cp $sfile $DIR/$tfile || error "write failed"
+ diff -c $sfile $DIR/$tfile || error "files are different"
+ # Clear the fail_loc and clean up the test file.
+ $LCTL set_param fail_loc=0
+ rm -f $DIR/$tfile
+}
+run_test 83 "Short write in ptask ==============================="
+
test_99a() {
[ -z "$(which cvs 2>/dev/null)" ] && skip_env "could not find cvs" &&
return