static int
ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
- int rw, struct inode *inode, struct cl_dio_aio *aio)
+ int rw, struct inode *inode, struct cl_sub_dio *sdio)
{
- struct ll_dio_pages *pv = &aio->cda_dio_pages;
+ struct ll_dio_pages *pv = &sdio->csd_dio_pages;
struct cl_page *page;
struct cl_2queue *queue = &io->ci_queue;
struct cl_object *obj = io->ci_obj;
- struct cl_sync_io *anchor = &aio->cda_sync;
+ struct cl_sync_io *anchor = &sdio->csd_sync;
loff_t offset = pv->ldp_file_offset;
int io_pages = 0;
size_t page_size = cl_page_size(obj);
smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
- cl_page_list_splice(&queue->c2_qout, &aio->cda_pages);
+ cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
} else {
atomic_add(-queue->c2_qin.pl_nr,
&anchor->csi_sync_nr);
struct cl_io *io;
struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
- struct cl_dio_aio *ll_aio;
- struct cl_dio_aio *ldp_aio;
+ struct cl_dio_aio *ll_dio_aio;
+ struct cl_sub_dio *ldp_aio;
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
+ bool sync_submit = false;
struct vvp_io *vio;
+ ssize_t rc2;
/* Check EOF by ourselves */
if (rw == READ && file_offset >= i_size_read(inode))
io = lcc->lcc_io;
LASSERT(io != NULL);
- ll_aio = io->ci_aio;
- LASSERT(ll_aio);
- LASSERT(ll_aio->cda_iocb == iocb);
+ ll_dio_aio = io->ci_dio_aio;
+ LASSERT(ll_dio_aio);
+ LASSERT(ll_dio_aio->cda_iocb == iocb);
+
+ /* We cannot do parallel submission of sub-I/Os - for AIO or regular
+ * DIO - unless lockless because it causes us to release the lock
+ * early.
+ *
+ * There are also several circumstances in which we must disable
+ * parallel DIO, so we check if it is enabled.
+ *
+ * The check for "is_sync_kiocb" excludes AIO, which does not need to
+ * be disabled in these situations.
+ */
+ if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio))
+ sync_submit = true;
while (iov_iter_count(iter)) {
struct ll_dio_pages *pvec;
count = i_size_read(inode) - file_offset;
}
- /* this aio is freed on completion from cl_sync_io_note, so we
- * do not need to directly free the memory here
+ /* if we are doing sync_submit, then we free this below,
+ * otherwise it is freed on the final call to cl_sync_io_note
+ * (either in this function or from a ptlrpcd daemon)
*/
- ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+ ldp_aio = cl_sub_dio_alloc(ll_dio_aio, sync_submit);
if (!ldp_aio)
GOTO(out, result = -ENOMEM);
- pvec = &ldp_aio->cda_dio_pages;
+ pvec = &ldp_aio->csd_dio_pages;
result = ll_get_user_pages(rw, iter, &pages,
&pvec->ldp_count, count);
if (unlikely(result <= 0)) {
- cl_sync_io_note(env, &ldp_aio->cda_sync, result);
+ cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+ if (sync_submit)
+ cl_sub_dio_free(ldp_aio, true);
GOTO(out, result);
}
/* We've submitted pages and can now remove the extra
* reference for that
*/
- cl_sync_io_note(env, &ldp_aio->cda_sync, result);
-
+ cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+
+ if (sync_submit) {
+ rc2 = cl_sync_io_wait(env, &ldp_aio->csd_sync,
+ 0);
+ if (result == 0 && rc2)
+ result = rc2;
+ cl_sub_dio_free(ldp_aio, true);
+ }
if (unlikely(result < 0))
GOTO(out, result);
}
out:
- ll_aio->cda_bytes += tot_bytes;
+ ll_dio_aio->cda_bytes += tot_bytes;
if (rw == WRITE)
vio->u.readwrite.vui_written += tot_bytes;
else
vio->u.readwrite.vui_read += tot_bytes;
- /* We cannot do async submission - for AIO or regular DIO - unless
- * lockless because it causes us to release the lock early.
- *
- * There are also several circumstances in which we must disable
- * parallel DIO, so we check if it is enabled.
- *
- * The check for "is_sync_kiocb" excludes AIO, which does not need to
- * be disabled in these situations.
+ /* AIO is not supported on pipes, so we cannot return EIOCBQEUED like
+ * we normally would for both DIO and AIO here
*/
- if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
- ssize_t rc2;
-
- /* Wait here rather than doing async submission */
- rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
- if (result == 0 && rc2)
- result = rc2;
-
- if (result == 0)
- result = tot_bytes;
- } else if (result == 0) {
+ if (result == 0 && !iov_iter_is_pipe(iter))
result = -EIOCBQUEUED;
- }
return result;
}