enum cl_io_state ci_state;
/** main object this io is against. Immutable after creation. */
struct cl_object *ci_obj;
+ /** one AIO request might be split in cl_io_loop */
+ struct cl_dio_aio *ci_aio;
/**
* Upper layer io, of which this io is a part of. Immutable after
* creation.
struct file *file, enum cl_io_type iot,
loff_t *ppos, size_t count)
{
- struct vvp_io *vio = vvp_env_io(env);
- struct inode *inode = file_inode(file);
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_file_data *fd = file->private_data;
- struct range_lock range;
- struct cl_io *io;
- ssize_t result = 0;
- int rc = 0;
- unsigned retried = 0;
- unsigned ignore_lockless = 0;
+ struct vvp_io *vio = vvp_env_io(env);
+ struct inode *inode = file_inode(file);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_file_data *fd = file->private_data;
+ struct range_lock range;
+ struct cl_io *io;
+ ssize_t result = 0;
+ int rc = 0;
+ unsigned int retried = 0, ignore_lockless = 0;
+ bool is_aio = false;
ENTRY;
case IO_NORMAL:
vio->vui_iter = args->u.normal.via_iter;
vio->vui_iocb = args->u.normal.via_iocb;
+ if (file->f_flags & O_DIRECT) {
+ if (!is_sync_kiocb(vio->vui_iocb))
+ is_aio = true;
+ io->ci_aio = cl_aio_alloc(vio->vui_iocb);
+ if (!io->ci_aio)
+ GOTO(out, rc = -ENOMEM);
+ }
/* Direct IO reads must also take range lock,
* or multiple reads will try to work on the same pages.
* See LU-6227 for details. */
rc = io->ci_result;
}
- if (io->ci_nob > 0) {
+ /*
+ * In order to move the AIO forward, ci_nob was increased,
+ * but that doesn't mean the io has finished; it just means
+ * the io has been submitted.  We will always return
+ * EIOCBQUEUED to the caller, so we can only return the
+ * number of bytes in the non-AIO case.
+ */
+ if (io->ci_nob > 0 && !is_aio) {
result += io->ci_nob;
count -= io->ci_nob;
*ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
args->u.normal.via_iter = vio->vui_iter;
}
out:
+ if (io->ci_aio) {
+ /**
+ * Drop one extra reference so that end_io() can be
+ * called for this IO context; we can call it after
+ * we make sure all AIO requests have been processed.
+ */
+ cl_sync_io_note(env, &io->ci_aio->cda_sync,
+ rc == -EIOCBQUEUED ? 0 : rc);
+ if (!is_aio) {
+ cl_aio_free(io->ci_aio);
+ io->ci_aio = NULL;
+ }
+ }
cl_io_fini(env, io);
CDEBUG(D_VFSTRACE,
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
+ struct vvp_io *vio;
/* if file is encrypted, return 0 so that we fall back to buffered IO */
if (IS_ENCRYPTED(inode))
env = lcc->lcc_env;
LASSERT(!IS_ERR(env));
+ vio = vvp_env_io(env);
io = lcc->lcc_io;
LASSERT(io != NULL);
- aio = cl_aio_alloc(iocb);
- if (!aio)
- RETURN(-ENOMEM);
+ aio = io->ci_aio;
+ LASSERT(aio);
+ LASSERT(aio->cda_iocb == iocb);
/* 0. Need locking between buffered and direct access. and race with
* size changing by concurrent truncates and writes.
}
out:
- aio->cda_bytes = tot_bytes;
- cl_sync_io_note(env, &aio->cda_sync, result);
+ aio->cda_bytes += tot_bytes;
if (is_sync_kiocb(iocb)) {
+ struct cl_sync_io *anchor = &aio->cda_sync;
ssize_t rc2;
- rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+ /**
+ * @anchor was initialized to 1 to prevent end_io from
+ * being called before we add all pages for IO, so drop
+ * one extra reference to make sure we can wait for the
+ * count to reach zero.
+ */
+ cl_sync_io_note(env, anchor, result);
+
+ rc2 = cl_sync_io_wait(env, anchor, 0);
if (result == 0 && rc2)
result = rc2;
-
+ /**
+ * Take one extra reference again, since if @anchor is
+ * reused we assume it is 1 before use.
+ */
+ atomic_add(1, &anchor->csi_sync_nr);
if (result == 0) {
- struct vvp_io *vio = vvp_env_io(env);
/* no commit async for direct IO */
- vio->u.write.vui_written += tot_bytes;
+ vio->u.readwrite.vui_written += tot_bytes;
result = tot_bytes;
}
- cl_aio_free(aio);
} else {
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
result = -EIOCBQUEUED;
}
if (unlikely(vmpage == NULL ||
PageDirty(vmpage) || PageWriteback(vmpage))) {
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
/* if the page is already in dirty cache, we have to commit
* the pages right now; otherwise, it may cause deadlock
LASSERT(cl_page_is_owned(page, io));
if (copied > 0) {
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
cl_page_list_add(plist, page);
if (plist->pl_nr == 1) /* first page */
- vio->u.write.vui_from = from;
+ vio->u.readwrite.vui_from = from;
else
LASSERT(from == 0);
- vio->u.write.vui_to = from + copied;
+ vio->u.readwrite.vui_to = from + copied;
/* To address the deadlock in balance_dirty_pages() where
* this dirty page may be written back in the same thread. */
struct {
struct cl_page_list vui_queue;
unsigned long vui_written;
+ unsigned long vui_read;
int vui_from;
int vui_to;
- } write;
+ } readwrite; /* normal io */
} u;
enum vvp_io_subtype vui_io_subtype;
{
struct vvp_io *vio = cl2vvp_io(env, ios);
- cl_page_list_init(&vio->u.write.vui_queue);
- vio->u.write.vui_written = 0;
- vio->u.write.vui_from = 0;
- vio->u.write.vui_to = PAGE_SIZE;
+ cl_page_list_init(&vio->u.readwrite.vui_queue);
+ vio->u.readwrite.vui_written = 0;
+ vio->u.readwrite.vui_from = 0;
+ vio->u.readwrite.vui_to = PAGE_SIZE;
+
+ return 0;
+}
+
+static int vvp_io_read_iter_init(const struct lu_env *env,
+ const struct cl_io_slice *ios)
+{
+ struct vvp_io *vio = cl2vvp_io(env, ios);
+
+ vio->u.readwrite.vui_read = 0;
return 0;
}
{
struct vvp_io *vio = cl2vvp_io(env, ios);
- LASSERT(vio->u.write.vui_queue.pl_nr == 0);
+ LASSERT(vio->u.readwrite.vui_queue.pl_nr == 0);
}
static int vvp_io_fault_iter_init(const struct lu_env *env,
io->ci_continue = 0;
io->ci_nob += result;
result = 0;
+ } else if (result == -EIOCBQUEUED) {
+ io->ci_nob += vio->u.readwrite.vui_read;
+ if (vio->vui_iocb)
+ vio->vui_iocb->ki_pos = pos +
+ vio->u.readwrite.vui_read;
}
return result;
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *queue = &vio->u.write.vui_queue;
+ struct cl_page_list *queue = &vio->u.readwrite.vui_queue;
struct cl_page *page;
int rc = 0;
int bytes = 0;
- unsigned int npages = vio->u.write.vui_queue.pl_nr;
+ unsigned int npages = vio->u.readwrite.vui_queue.pl_nr;
ENTRY;
if (npages == 0)
RETURN(0);
CDEBUG(D_VFSTRACE, "commit async pages: %d, from %d, to %d\n",
- npages, vio->u.write.vui_from, vio->u.write.vui_to);
+ npages, vio->u.readwrite.vui_from, vio->u.readwrite.vui_to);
LASSERT(page_list_sanity_check(obj, queue));
/* submit IO with async write */
rc = cl_io_commit_async(env, io, queue,
- vio->u.write.vui_from, vio->u.write.vui_to,
+ vio->u.readwrite.vui_from,
+ vio->u.readwrite.vui_to,
write_commit_callback);
npages -= queue->pl_nr; /* already committed pages */
if (npages > 0) {
bytes = npages << PAGE_SHIFT;
/* first page */
- bytes -= vio->u.write.vui_from;
+ bytes -= vio->u.readwrite.vui_from;
if (queue->pl_nr == 0) /* last page */
- bytes -= PAGE_SIZE - vio->u.write.vui_to;
+ bytes -= PAGE_SIZE - vio->u.readwrite.vui_to;
LASSERTF(bytes > 0, "bytes = %d, pages = %d\n", bytes, npages);
- vio->u.write.vui_written += bytes;
+ vio->u.readwrite.vui_written += bytes;
CDEBUG(D_VFSTRACE, "Committed %d pages %d bytes, tot: %ld\n",
- npages, bytes, vio->u.write.vui_written);
+ npages, bytes, vio->u.readwrite.vui_written);
/* the first page must have been written. */
- vio->u.write.vui_from = 0;
+ vio->u.readwrite.vui_from = 0;
}
LASSERT(page_list_sanity_check(obj, queue));
LASSERT(ergo(rc == 0, queue->pl_nr == 0));
/* out of quota, try sync write */
if (rc == -EDQUOT && !cl_io_is_mkwrite(io)) {
rc = vvp_io_commit_sync(env, io, queue,
- vio->u.write.vui_from,
- vio->u.write.vui_to);
+ vio->u.readwrite.vui_from,
+ vio->u.readwrite.vui_to);
if (rc > 0) {
- vio->u.write.vui_written += rc;
+ vio->u.readwrite.vui_written += rc;
rc = 0;
}
}
result = vvp_io_write_commit(env, io);
/* Simulate short commit */
if (CFS_FAULT_CHECK(OBD_FAIL_LLITE_SHORT_COMMIT)) {
- vio->u.write.vui_written >>= 1;
- if (vio->u.write.vui_written > 0)
+ vio->u.readwrite.vui_written >>= 1;
+ if (vio->u.readwrite.vui_written > 0)
io->ci_need_restart = 1;
}
- if (vio->u.write.vui_written > 0) {
- result = vio->u.write.vui_written;
+ if (vio->u.readwrite.vui_written > 0) {
+ result = vio->u.readwrite.vui_written;
CDEBUG(D_VFSTRACE, "%s: write nob %zd, result: %zd\n",
file_dentry(file)->d_name.name,
io->ci_nob, result);
if (result > 0 || result == -EIOCBQUEUED) {
ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);
- if (result < cnt)
+ if (result != -EIOCBQUEUED && result < cnt)
io->ci_continue = 0;
if (result > 0)
result = 0;
+ /* move forward */
+ if (result == -EIOCBQUEUED) {
+ io->ci_nob += vio->u.readwrite.vui_written;
+ vio->vui_iocb->ki_pos = pos +
+ vio->u.readwrite.vui_written;
+ }
}
RETURN(result);
.op = {
[CIT_READ] = {
.cio_fini = vvp_io_fini,
+ .cio_iter_init = vvp_io_read_iter_init,
.cio_lock = vvp_io_read_lock,
.cio_start = vvp_io_read_start,
.cio_end = vvp_io_rw_end,
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
- int result = 0;
+ int result = 0;
+ int rc = 0;
LINVRNT(cl_io_is_loopable(io));
ENTRY;
}
}
cl_io_iter_fini(env, io);
- } while (result == 0 && io->ci_continue);
+ if (result)
+ rc = result;
+ } while ((result == 0 || result == -EIOCBQUEUED) &&
+ io->ci_continue);
+
+ if (rc && !result)
+ result = rc;
if (result == -EWOULDBLOCK && io->ci_ndelay) {
io->ci_need_restart = 1;
--filename=$DIR/$tfile
[ $? -eq 0 ] || error "fio mixed read write error"
+ echo "AIO with large block size ${size}M"
+ fio --name=rand-rw --rw=randrw --bs=${size}M --direct=1 \
+ --numjobs=1 --fallocate=none --ioengine=libaio \
+ --iodepth=16 --allow_file_create=0 --size=${size}M \
+ --filename=$DIR/$tfile
+ [ $? -eq 0 ] || error "fio large block size failed"
+
rm -rf $DIR/$tfile
$LCTL set_param debug="$saved_debug"
}