The problem is that AIO currently cannot handle an i/o size larger than
the stripe size: we need the cl io loop to handle io across stripes, but
since -EIOCBQUEUED is returned for AIO, the io loop is stopped and a
short io happens.
This patch tries to fix the problem by making the IO engine aware of
this special error, so that it proceeds to finish all IO requests.
Fixes: d1dde ("LU-4198 clio: AIO support for direct IO")
Signed-off-by: Wang Shilong <wshilong@ddn.com>
Change-Id: I1885e0fc510571417d888249b381f9c2f130ca5a
Reviewed-on: https://review.whamcloud.com/39104
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
enum cl_io_state ci_state;
/** main object this io is against. Immutable after creation. */
struct cl_object *ci_obj;
enum cl_io_state ci_state;
/** main object this io is against. Immutable after creation. */
struct cl_object *ci_obj;
+ /** one AIO request might be split in cl_io_loop */
+ struct cl_dio_aio *ci_aio;
/**
* Upper layer io, of which this io is a part of. Immutable after
* creation.
/**
* Upper layer io, of which this io is a part of. Immutable after
* creation.
struct file *file, enum cl_io_type iot,
loff_t *ppos, size_t count)
{
struct file *file, enum cl_io_type iot,
loff_t *ppos, size_t count)
{
- struct vvp_io *vio = vvp_env_io(env);
- struct inode *inode = file_inode(file);
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_file_data *fd = file->private_data;
- struct range_lock range;
- struct cl_io *io;
- ssize_t result = 0;
- int rc = 0;
- unsigned retried = 0;
- unsigned ignore_lockless = 0;
+ struct vvp_io *vio = vvp_env_io(env);
+ struct inode *inode = file_inode(file);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_file_data *fd = file->private_data;
+ struct range_lock range;
+ struct cl_io *io;
+ ssize_t result = 0;
+ int rc = 0;
+ unsigned int retried = 0, ignore_lockless = 0;
+ bool is_aio = false;
case IO_NORMAL:
vio->vui_iter = args->u.normal.via_iter;
vio->vui_iocb = args->u.normal.via_iocb;
case IO_NORMAL:
vio->vui_iter = args->u.normal.via_iter;
vio->vui_iocb = args->u.normal.via_iocb;
+ if (file->f_flags & O_DIRECT) {
+ if (!is_sync_kiocb(vio->vui_iocb))
+ is_aio = true;
+ io->ci_aio = cl_aio_alloc(vio->vui_iocb);
+ if (!io->ci_aio)
+ GOTO(out, rc = -ENOMEM);
+ }
/* Direct IO reads must also take range lock,
* or multiple reads will try to work on the same pages
* See LU-6227 for details. */
/* Direct IO reads must also take range lock,
* or multiple reads will try to work on the same pages
* See LU-6227 for details. */
+ /*
+ * In order to move AIO forward, ci_nob was increased,
+ * but that doesn't mean the io has finished; it just
+ * means the io has been submitted. We will always return
+ * EIOCBQUEUED to the caller, so we can only return the
+ * number of bytes in the non-AIO case.
+ */
+ if (io->ci_nob > 0 && !is_aio) {
result += io->ci_nob;
count -= io->ci_nob;
*ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
result += io->ci_nob;
count -= io->ci_nob;
*ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
args->u.normal.via_iter = vio->vui_iter;
}
out:
args->u.normal.via_iter = vio->vui_iter;
}
out:
+ if (io->ci_aio) {
+ /**
+ * Drop one extra reference so that end_io() can be
+ * called for this IO context; we can call it after
+ * we make sure all AIO requests have been processed.
+ */
+ cl_sync_io_note(env, &io->ci_aio->cda_sync,
+ rc == -EIOCBQUEUED ? 0 : rc);
+ if (!is_aio) {
+ cl_aio_free(io->ci_aio);
+ io->ci_aio = NULL;
+ }
+ }
cl_io_fini(env, io);
CDEBUG(D_VFSTRACE,
cl_io_fini(env, io);
CDEBUG(D_VFSTRACE,
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
/* if file is encrypted, return 0 so that we fall back to buffered IO */
if (IS_ENCRYPTED(inode))
/* if file is encrypted, return 0 so that we fall back to buffered IO */
if (IS_ENCRYPTED(inode))
env = lcc->lcc_env;
LASSERT(!IS_ERR(env));
env = lcc->lcc_env;
LASSERT(!IS_ERR(env));
io = lcc->lcc_io;
LASSERT(io != NULL);
io = lcc->lcc_io;
LASSERT(io != NULL);
- aio = cl_aio_alloc(iocb);
- if (!aio)
- RETURN(-ENOMEM);
+ aio = io->ci_aio;
+ LASSERT(aio);
+ LASSERT(aio->cda_iocb == iocb);
/* 0. Need locking between buffered and direct access. and race with
* size changing by concurrent truncates and writes.
/* 0. Need locking between buffered and direct access. and race with
* size changing by concurrent truncates and writes.
- aio->cda_bytes = tot_bytes;
- cl_sync_io_note(env, &aio->cda_sync, result);
+ aio->cda_bytes += tot_bytes;
if (is_sync_kiocb(iocb)) {
if (is_sync_kiocb(iocb)) {
+ struct cl_sync_io *anchor = &aio->cda_sync;
- rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+ /**
+ * @anchor was initialized to 1 to prevent end_io from
+ * being called before we add all pages for IO, so drop
+ * one extra reference to make sure we can wait for the
+ * count to reach zero.
+ */
+ cl_sync_io_note(env, anchor, result);
+
+ rc2 = cl_sync_io_wait(env, anchor, 0);
if (result == 0 && rc2)
result = rc2;
if (result == 0 && rc2)
result = rc2;
+ /**
+ * Take one extra reference again, since if @anchor is
+ * reused we assume it starts at 1 before use.
+ */
+ atomic_add(1, &anchor->csi_sync_nr);
- struct vvp_io *vio = vvp_env_io(env);
/* no commit async for direct IO */
/* no commit async for direct IO */
- vio->u.write.vui_written += tot_bytes;
+ vio->u.readwrite.vui_written += tot_bytes;
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
if (unlikely(vmpage == NULL ||
PageDirty(vmpage) || PageWriteback(vmpage))) {
struct vvp_io *vio = vvp_env_io(env);
if (unlikely(vmpage == NULL ||
PageDirty(vmpage) || PageWriteback(vmpage))) {
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
/* if the page is already in dirty cache, we have to commit
* the pages right now; otherwise, it may cause deadlock
/* if the page is already in dirty cache, we have to commit
* the pages right now; otherwise, it may cause deadlock
LASSERT(cl_page_is_owned(page, io));
if (copied > 0) {
LASSERT(cl_page_is_owned(page, io));
if (copied > 0) {
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
cl_page_list_add(plist, page);
if (plist->pl_nr == 1) /* first page */
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
cl_page_list_add(plist, page);
if (plist->pl_nr == 1) /* first page */
- vio->u.write.vui_from = from;
+ vio->u.readwrite.vui_from = from;
- vio->u.write.vui_to = from + copied;
+ vio->u.readwrite.vui_to = from + copied;
/* To address the deadlock in balance_dirty_pages() where
* this dirty page may be written back in the same thread. */
/* To address the deadlock in balance_dirty_pages() where
* this dirty page may be written back in the same thread. */
struct {
struct cl_page_list vui_queue;
unsigned long vui_written;
struct {
struct cl_page_list vui_queue;
unsigned long vui_written;
+ unsigned long vui_read;
int vui_from;
int vui_to;
int vui_from;
int vui_to;
+ } readwrite; /* normal io */
} u;
enum vvp_io_subtype vui_io_subtype;
} u;
enum vvp_io_subtype vui_io_subtype;
{
struct vvp_io *vio = cl2vvp_io(env, ios);
{
struct vvp_io *vio = cl2vvp_io(env, ios);
- cl_page_list_init(&vio->u.write.vui_queue);
- vio->u.write.vui_written = 0;
- vio->u.write.vui_from = 0;
- vio->u.write.vui_to = PAGE_SIZE;
+ cl_page_list_init(&vio->u.readwrite.vui_queue);
+ vio->u.readwrite.vui_written = 0;
+ vio->u.readwrite.vui_from = 0;
+ vio->u.readwrite.vui_to = PAGE_SIZE;
+
+ return 0;
+}
+
+static int vvp_io_read_iter_init(const struct lu_env *env,
+ const struct cl_io_slice *ios)
+{
+ struct vvp_io *vio = cl2vvp_io(env, ios);
+
+ vio->u.readwrite.vui_read = 0;
{
struct vvp_io *vio = cl2vvp_io(env, ios);
{
struct vvp_io *vio = cl2vvp_io(env, ios);
- LASSERT(vio->u.write.vui_queue.pl_nr == 0);
+ LASSERT(vio->u.readwrite.vui_queue.pl_nr == 0);
}
static int vvp_io_fault_iter_init(const struct lu_env *env,
}
static int vvp_io_fault_iter_init(const struct lu_env *env,
io->ci_continue = 0;
io->ci_nob += result;
result = 0;
io->ci_continue = 0;
io->ci_nob += result;
result = 0;
+ } else if (result == -EIOCBQUEUED) {
+ io->ci_nob += vio->u.readwrite.vui_read;
+ if (vio->vui_iocb)
+ vio->vui_iocb->ki_pos = pos +
+ vio->u.readwrite.vui_read;
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct vvp_io *vio = vvp_env_io(env);
struct cl_object *obj = io->ci_obj;
struct inode *inode = vvp_object_inode(obj);
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *queue = &vio->u.write.vui_queue;
+ struct cl_page_list *queue = &vio->u.readwrite.vui_queue;
struct cl_page *page;
int rc = 0;
int bytes = 0;
struct cl_page *page;
int rc = 0;
int bytes = 0;
- unsigned int npages = vio->u.write.vui_queue.pl_nr;
+ unsigned int npages = vio->u.readwrite.vui_queue.pl_nr;
ENTRY;
if (npages == 0)
RETURN(0);
CDEBUG(D_VFSTRACE, "commit async pages: %d, from %d, to %d\n",
ENTRY;
if (npages == 0)
RETURN(0);
CDEBUG(D_VFSTRACE, "commit async pages: %d, from %d, to %d\n",
- npages, vio->u.write.vui_from, vio->u.write.vui_to);
+ npages, vio->u.readwrite.vui_from, vio->u.readwrite.vui_to);
LASSERT(page_list_sanity_check(obj, queue));
/* submit IO with async write */
rc = cl_io_commit_async(env, io, queue,
LASSERT(page_list_sanity_check(obj, queue));
/* submit IO with async write */
rc = cl_io_commit_async(env, io, queue,
- vio->u.write.vui_from, vio->u.write.vui_to,
+ vio->u.readwrite.vui_from,
+ vio->u.readwrite.vui_to,
write_commit_callback);
npages -= queue->pl_nr; /* already committed pages */
if (npages > 0) {
write_commit_callback);
npages -= queue->pl_nr; /* already committed pages */
if (npages > 0) {
bytes = npages << PAGE_SHIFT;
/* first page */
bytes = npages << PAGE_SHIFT;
/* first page */
- bytes -= vio->u.write.vui_from;
+ bytes -= vio->u.readwrite.vui_from;
if (queue->pl_nr == 0) /* last page */
if (queue->pl_nr == 0) /* last page */
- bytes -= PAGE_SIZE - vio->u.write.vui_to;
+ bytes -= PAGE_SIZE - vio->u.readwrite.vui_to;
LASSERTF(bytes > 0, "bytes = %d, pages = %d\n", bytes, npages);
LASSERTF(bytes > 0, "bytes = %d, pages = %d\n", bytes, npages);
- vio->u.write.vui_written += bytes;
+ vio->u.readwrite.vui_written += bytes;
CDEBUG(D_VFSTRACE, "Committed %d pages %d bytes, tot: %ld\n",
CDEBUG(D_VFSTRACE, "Committed %d pages %d bytes, tot: %ld\n",
- npages, bytes, vio->u.write.vui_written);
+ npages, bytes, vio->u.readwrite.vui_written);
/* the first page must have been written. */
/* the first page must have been written. */
- vio->u.write.vui_from = 0;
+ vio->u.readwrite.vui_from = 0;
}
LASSERT(page_list_sanity_check(obj, queue));
LASSERT(ergo(rc == 0, queue->pl_nr == 0));
}
LASSERT(page_list_sanity_check(obj, queue));
LASSERT(ergo(rc == 0, queue->pl_nr == 0));
/* out of quota, try sync write */
if (rc == -EDQUOT && !cl_io_is_mkwrite(io)) {
rc = vvp_io_commit_sync(env, io, queue,
/* out of quota, try sync write */
if (rc == -EDQUOT && !cl_io_is_mkwrite(io)) {
rc = vvp_io_commit_sync(env, io, queue,
- vio->u.write.vui_from,
- vio->u.write.vui_to);
+ vio->u.readwrite.vui_from,
+ vio->u.readwrite.vui_to);
- vio->u.write.vui_written += rc;
+ vio->u.readwrite.vui_written += rc;
result = vvp_io_write_commit(env, io);
/* Simulate short commit */
if (CFS_FAULT_CHECK(OBD_FAIL_LLITE_SHORT_COMMIT)) {
result = vvp_io_write_commit(env, io);
/* Simulate short commit */
if (CFS_FAULT_CHECK(OBD_FAIL_LLITE_SHORT_COMMIT)) {
- vio->u.write.vui_written >>= 1;
- if (vio->u.write.vui_written > 0)
+ vio->u.readwrite.vui_written >>= 1;
+ if (vio->u.readwrite.vui_written > 0)
io->ci_need_restart = 1;
}
io->ci_need_restart = 1;
}
- if (vio->u.write.vui_written > 0) {
- result = vio->u.write.vui_written;
+ if (vio->u.readwrite.vui_written > 0) {
+ result = vio->u.readwrite.vui_written;
CDEBUG(D_VFSTRACE, "%s: write nob %zd, result: %zd\n",
file_dentry(file)->d_name.name,
io->ci_nob, result);
CDEBUG(D_VFSTRACE, "%s: write nob %zd, result: %zd\n",
file_dentry(file)->d_name.name,
io->ci_nob, result);
if (result > 0 || result == -EIOCBQUEUED) {
ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);
if (result > 0 || result == -EIOCBQUEUED) {
ll_file_set_flag(ll_i2info(inode), LLIF_DATA_MODIFIED);
+ if (result != -EIOCBQUEUED && result < cnt)
io->ci_continue = 0;
if (result > 0)
result = 0;
io->ci_continue = 0;
if (result > 0)
result = 0;
+ /* move forward */
+ if (result == -EIOCBQUEUED) {
+ io->ci_nob += vio->u.readwrite.vui_written;
+ vio->vui_iocb->ki_pos = pos +
+ vio->u.readwrite.vui_written;
+ }
.op = {
[CIT_READ] = {
.cio_fini = vvp_io_fini,
.op = {
[CIT_READ] = {
.cio_fini = vvp_io_fini,
+ .cio_iter_init = vvp_io_read_iter_init,
.cio_lock = vvp_io_read_lock,
.cio_start = vvp_io_read_start,
.cio_end = vvp_io_rw_end,
.cio_lock = vvp_io_read_lock,
.cio_start = vvp_io_read_start,
.cio_end = vvp_io_rw_end,
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
+ int result = 0;
+ int rc = 0;
LINVRNT(cl_io_is_loopable(io));
ENTRY;
LINVRNT(cl_io_is_loopable(io));
ENTRY;
}
}
cl_io_iter_fini(env, io);
}
}
cl_io_iter_fini(env, io);
- } while (result == 0 && io->ci_continue);
+ if (result)
+ rc = result;
+ } while ((result == 0 || result == -EIOCBQUEUED) &&
+ io->ci_continue);
+
+ if (rc && !result)
+ result = rc;
if (result == -EWOULDBLOCK && io->ci_ndelay) {
io->ci_need_restart = 1;
if (result == -EWOULDBLOCK && io->ci_ndelay) {
io->ci_need_restart = 1;
--filename=$DIR/$tfile
[ $? -eq 0 ] || error "fio mixed read write error"
--filename=$DIR/$tfile
[ $? -eq 0 ] || error "fio mixed read write error"
+ echo "AIO with large block size ${size}M"
+ fio --name=rand-rw --rw=randrw --bs=${size}M --direct=1 \
+ --numjobs=1 --fallocate=none --ioengine=libaio \
+ --iodepth=16 --allow_file_create=0 --size=${size}M \
+ --filename=$DIR/$tfile
+ [ $? -eq 0 ] || error "fio large block size failed"
+
rm -rf $DIR/$tfile
$LCTL set_param debug="$saved_debug"
}
rm -rf $DIR/$tfile
$LCTL set_param debug="$saved_debug"
}