Whamcloud - gitweb
LU-14687 llite: Return errors for aio
[fs/lustre-release.git] / lustre / llite / rw26.c
index 815ab13..6fdeba9 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/lustre/llite/rw26.c
  *
@@ -228,10 +227,10 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
        if (*pages == NULL)
                return -ENOMEM;
 
-       down_read(&current->mm->mmap_sem);
+       mmap_read_lock(current->mm);
        result = get_user_pages(current, current->mm, addr, page_count,
                                rw == READ, 0, *pages, NULL);
-       up_read(&current->mm->mmap_sem);
+       mmap_read_unlock(current->mm);
 
        if (unlikely(result != page_count)) {
                ll_free_user_pages(*pages, page_count);
@@ -250,12 +249,12 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
 
 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
 #if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        return iov_iter_alignment(i);
 }
 #else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        const struct iovec *iov = i->iov;
        unsigned long res;
@@ -282,6 +281,35 @@ static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
 }
 #endif
 
+/*
+ * Lustre can relax the VFS alignment requirement a bit: the I/O
+ * count does not need to be page-aligned.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
+{
+       size_t orig_size = i->count;
+       size_t count = orig_size & ~PAGE_MASK;
+       unsigned long res;
+
+       if (!count)
+               return iov_iter_alignment_vfs(i);
+
+       if (orig_size > PAGE_SIZE) {
+               iov_iter_truncate(i, orig_size - count);
+               res = iov_iter_alignment_vfs(i);
+               iov_iter_reexpand(i, orig_size);
+
+               return res;
+       }
+
+       res = iov_iter_alignment_vfs(i);
+       /* start address is page aligned */
+       if ((res & ~PAGE_MASK) == orig_size)
+               return PAGE_SIZE;
+
+       return res;
+}
+
 /** direct IO pages */
 struct ll_dio_pages {
        struct cl_dio_aio       *ldp_aio;
@@ -329,6 +357,16 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                }
 
                page->cp_sync_io = anchor;
+               if (inode && IS_ENCRYPTED(inode)) {
+                       /* In case of Direct IO on encrypted file, we need to
+                        * add a reference to the inode on the cl_page.
+                        * This info is required by llcrypt to proceed
+                        * to encryption/decryption.
+                        * This is safe because we know these pages are private
+                        * to the thread doing the Direct IO.
+                        */
+                       page->cp_inode = inode;
+               }
                cl_2queue_add(queue, page);
                /*
                 * Set page clip to tell transfer formation engine
@@ -346,6 +384,11 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                int iot = rw == READ ? CRT_READ : CRT_WRITE;
 
                atomic_add(io_pages, &anchor->csi_sync_nr);
+               /*
+                * Barrier to avoid reordering between bumping the
+                * in-flight modification count and the I/O submission.
+                */
+               smp_mb();
                rc = cl_io_submit_rw(env, io, iot, queue);
                if (rc == 0) {
                        cl_page_list_splice(&queue->c2_qout,
@@ -400,17 +443,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        loff_t file_offset = iocb->ki_pos;
        struct vvp_io *vio;
 
-       /* if file is encrypted, return 0 so that we fall back to buffered IO */
-       if (IS_ENCRYPTED(inode))
-               return 0;
-
        /* Check EOF by ourselves */
        if (rw == READ && file_offset >= i_size_read(inode))
                return 0;
 
        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
-       if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
-               return -EINVAL;
+       if (file_offset & ~PAGE_MASK)
+               RETURN(-EINVAL);
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
               "offset=%lld=%llx, pages %zd (max %lu)\n",
@@ -420,7 +459,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        /* Check that all user buffers are aligned as well */
        if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
-               return -EINVAL;
+               RETURN(-EINVAL);
 
        lcc = ll_cl_find(file);
        if (lcc == NULL)
@@ -436,13 +475,6 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        LASSERT(aio);
        LASSERT(aio->cda_iocb == iocb);
 
-       /* 0. Need locking between buffered and direct access. and race with
-        *    size changing by concurrent truncates and writes.
-        * 1. Need inode mutex to operate transient pages.
-        */
-       if (rw == READ)
-               inode_lock(inode);
-
        while (iov_iter_count(iter)) {
                struct ll_dio_pages pvec = { .ldp_aio = aio };
                struct page **pages;
@@ -510,12 +542,10 @@ out:
                        vio->u.readwrite.vui_written += tot_bytes;
                else
                        vio->u.readwrite.vui_read += tot_bytes;
-               result = -EIOCBQUEUED;
+               if (result == 0)
+                       result = -EIOCBQUEUED;
        }
 
-       if (rw == READ)
-               inode_unlock(inode);
-
        return result;
 }
 
@@ -665,12 +695,12 @@ static int ll_write_begin(struct file *file, struct address_space *mapping,
                        GOTO(out, result = -EBUSY);
 
                /**
-                * Direct read can fall back to buffered read, but DIO is done
+                * Direct write can fall back to buffered read, but DIO is done
                 * with lockless i/o, and buffered requires LDLM locking, so
                 * in this case we must restart without lockless.
                 */
-               if (!io->ci_ignore_lockless) {
-                       io->ci_ignore_lockless = 1;
+               if (!io->ci_dio_lock) {
+                       io->ci_dio_lock = 1;
                        io->ci_need_restart = 1;
                        GOTO(out, result = -ENOLCK);
                }