Whamcloud - gitweb
LU-15811 llite: Rework upper/lower DIO/AIO
[fs/lustre-release.git] / lustre / llite / rw26.c
index 89d7891..f558984 100644 (file)
@@ -293,13 +293,13 @@ static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
 
 static int
 ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
-                  int rw, struct inode *inode, struct cl_dio_aio *aio)
+                  int rw, struct inode *inode, struct cl_sub_dio *sdio)
 {
-       struct ll_dio_pages *pv = &aio->cda_dio_pages;
+       struct ll_dio_pages *pv = &sdio->csd_dio_pages;
        struct cl_page    *page;
        struct cl_2queue  *queue = &io->ci_queue;
        struct cl_object  *obj = io->ci_obj;
-       struct cl_sync_io *anchor = &aio->cda_sync;
+       struct cl_sync_io *anchor = &sdio->csd_sync;
        loff_t offset   = pv->ldp_file_offset;
        int io_pages    = 0;
        size_t page_size = cl_page_size(obj);
@@ -361,7 +361,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                smp_mb();
                rc = cl_io_submit_rw(env, io, iot, queue);
                if (rc == 0) {
-                       cl_page_list_splice(&queue->c2_qout, &aio->cda_pages);
+                       cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
                } else {
                        atomic_add(-queue->c2_qin.pl_nr,
                                   &anchor->csi_sync_nr);
@@ -406,12 +406,14 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        struct cl_io *io;
        struct file *file = iocb->ki_filp;
        struct inode *inode = file->f_mapping->host;
-       struct cl_dio_aio *ll_aio;
-       struct cl_dio_aio *ldp_aio;
+       struct cl_dio_aio *ll_dio_aio;
+       struct cl_sub_dio *ldp_aio;
        size_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
+       bool sync_submit = false;
        struct vvp_io *vio;
+       ssize_t rc2;
 
        /* Check EOF by ourselves */
        if (rw == READ && file_offset >= i_size_read(inode))
@@ -441,9 +443,22 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        io = lcc->lcc_io;
        LASSERT(io != NULL);
 
-       ll_aio = io->ci_aio;
-       LASSERT(ll_aio);
-       LASSERT(ll_aio->cda_iocb == iocb);
+       ll_dio_aio = io->ci_dio_aio;
+       LASSERT(ll_dio_aio);
+       LASSERT(ll_dio_aio->cda_iocb == iocb);
+
+       /* We cannot do parallel submission of sub-I/Os - for AIO or regular
+        * DIO - unless lockless because it causes us to release the lock
+        * early.
+        *
+        * There are also several circumstances in which we must disable
+        * parallel DIO, so we check if it is enabled.
+        *
+        * The check for "is_sync_kiocb" excludes AIO, which does not need to
+        * be disabled in these situations.
+        */
+       if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio))
+               sync_submit = true;
 
        while (iov_iter_count(iter)) {
                struct ll_dio_pages *pvec;
@@ -458,19 +473,22 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                                count = i_size_read(inode) - file_offset;
                }
 
-               /* this aio is freed on completion from cl_sync_io_note, so we
-                * do not need to directly free the memory here
+               /* if we are doing sync_submit, then we free this below,
+                * otherwise it is freed on the final call to cl_sync_io_note
+                * (either in this function or from a ptlrpcd daemon)
                 */
-               ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+               ldp_aio = cl_sub_dio_alloc(ll_dio_aio, sync_submit);
                if (!ldp_aio)
                        GOTO(out, result = -ENOMEM);
 
-               pvec = &ldp_aio->cda_dio_pages;
+               pvec = &ldp_aio->csd_dio_pages;
 
                result = ll_get_user_pages(rw, iter, &pages,
                                           &pvec->ldp_count, count);
                if (unlikely(result <= 0)) {
-                       cl_sync_io_note(env, &ldp_aio->cda_sync, result);
+                       cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+                       if (sync_submit)
+                               cl_sub_dio_free(ldp_aio, true);
                        GOTO(out, result);
                }
 
@@ -483,8 +501,15 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                /* We've submitted pages and can now remove the extra
                 * reference for that
                 */
-               cl_sync_io_note(env, &ldp_aio->cda_sync, result);
-
+               cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+
+               if (sync_submit) {
+                       rc2 = cl_sync_io_wait(env, &ldp_aio->csd_sync,
+                                            0);
+                       if (result == 0 && rc2)
+                               result = rc2;
+                       cl_sub_dio_free(ldp_aio, true);
+               }
                if (unlikely(result < 0))
                        GOTO(out, result);
 
@@ -494,35 +519,18 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        }
 
 out:
-       ll_aio->cda_bytes += tot_bytes;
+       ll_dio_aio->cda_bytes += tot_bytes;
 
        if (rw == WRITE)
                vio->u.readwrite.vui_written += tot_bytes;
        else
                vio->u.readwrite.vui_read += tot_bytes;
 
-       /* We cannot do async submission - for AIO or regular DIO - unless
-        * lockless because it causes us to release the lock early.
-        *
-        * There are also several circumstances in which we must disable
-        * parallel DIO, so we check if it is enabled.
-        *
-        * The check for "is_sync_kiocb" excludes AIO, which does not need to
-        * be disabled in these situations.
+       /* AIO is not supported on pipes, so we cannot return EIOCBQEUED like
+        * we normally would for both DIO and AIO here
         */
-       if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
-               ssize_t rc2;
-
-               /* Wait here rather than doing async submission */
-               rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
-               if (result == 0 && rc2)
-                       result = rc2;
-
-               if (result == 0)
-                       result = tot_bytes;
-       } else if (result == 0) {
+       if (result == 0 && !iov_iter_is_pipe(iter))
                result = -EIOCBQUEUED;
-       }
 
        return result;
 }