LU-14687 llite: Return errors for aio
diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index d6b6196..6fdeba9 100644
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/lustre/llite/rw26.c
  *
@@ -228,10 +227,10 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
        if (*pages == NULL)
                return -ENOMEM;
 
-       down_read(&current->mm->mmap_sem);
+       mmap_read_lock(current->mm);
        result = get_user_pages(current, current->mm, addr, page_count,
                                rw == READ, 0, *pages, NULL);
-       up_read(&current->mm->mmap_sem);
+       mmap_read_unlock(current->mm);
 
        if (unlikely(result != page_count)) {
                ll_free_user_pages(*pages, page_count);
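
The mmap_sem -> mmap_read_lock()/mmap_read_unlock() change follows the kernel
5.8 rename of the mmap semaphore API. On kernels that predate the new helpers,
a compatibility shim along the following lines is the usual approach; this is
only a sketch, and the HAVE_MMAP_LOCK guard is an assumed configure macro, not
necessarily the one lustre defines.

#ifndef HAVE_MMAP_LOCK
/* Pre-5.8 kernels: emulate the new API on top of mm->mmap_sem. */
static inline void mmap_read_lock(struct mm_struct *mm)
{
        down_read(&mm->mmap_sem);
}

static inline void mmap_read_unlock(struct mm_struct *mm)
{
        up_read(&mm->mmap_sem);
}
#endif /* !HAVE_MMAP_LOCK */
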
@@ -250,12 +249,12 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
 
 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
 #if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        return iov_iter_alignment(i);
 }
 #else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        const struct iovec *iov = i->iov;
        unsigned long res;
@@ -282,6 +281,35 @@ static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
 }
 #endif
 
+/*
+ * Lustre can relax the alignment requirement a bit: the total
+ * i/o count does not have to be page aligned.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
+{
+       size_t orig_size = i->count;
+       size_t count = orig_size & ~PAGE_MASK;
+       unsigned long res;
+
+       if (!count)
+               return iov_iter_alignment_vfs(i);
+
+       if (orig_size > PAGE_SIZE) {
+               iov_iter_truncate(i, orig_size - count);
+               res = iov_iter_alignment_vfs(i);
+               iov_iter_reexpand(i, orig_size);
+
+               return res;
+       }
+
+       res = iov_iter_alignment_vfs(i);
+       /* start address is page aligned */
+       if ((res & ~PAGE_MASK) == orig_size)
+               return PAGE_SIZE;
+
+       return res;
+}
+
 /** direct IO pages */
 struct ll_dio_pages {
        struct cl_dio_aio       *ldp_aio;
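
The new ll_iov_iter_alignment() relaxes the old rule that the total i/o count
must be a multiple of PAGE_SIZE: for transfers larger than one page the
sub-page tail is truncated away before the alignment check and re-expanded
afterwards, and a single sub-page segment that starts on a page boundary is
reported as PAGE_SIZE so the caller's "& ~PAGE_MASK" test still passes. The
following standalone user-space sketch mirrors that mask arithmetic for a
single iovec segment; the helper name and the single-segment model are ours,
for illustration only.

#include <stdio.h>

#define DEMO_PAGE_SIZE  4096UL
#define DEMO_PAGE_MASK  (~(DEMO_PAGE_SIZE - 1))

/* One segment of @count bytes at user address @addr; the real VFS helper ORs
 * together the address and length of every segment in the iov_iter. */
static unsigned long demo_alignment(unsigned long addr, unsigned long count)
{
        unsigned long tail = count & ~DEMO_PAGE_MASK;

        if (!tail)
                return addr | count;            /* count is already page sized */
        if (count > DEMO_PAGE_SIZE)
                return addr | (count - tail);   /* ignore the sub-page tail */
        /* single sub-page segment: fine if it starts on a page boundary */
        return (addr & ~DEMO_PAGE_MASK) ? (addr | count) : DEMO_PAGE_SIZE;
}

int main(void)
{
        /* two full pages plus a 512-byte tail from a page-aligned buffer:
         * no sub-page bits are set, so direct i/o is not rejected */
        printf("%#lx\n", demo_alignment(0x10000, 2 * DEMO_PAGE_SIZE + 512));
        return 0;
}
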
@@ -329,6 +357,16 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                }
 
                page->cp_sync_io = anchor;
+               if (inode && IS_ENCRYPTED(inode)) {
+                       /* In the case of Direct IO on an encrypted file, we
+                        * need to add a reference to the inode on the cl_page.
+                        * This info is required by llcrypt to proceed with
+                        * encryption/decryption.
+                        * This is safe because we know these pages are private
+                        * to the thread doing the Direct IO.
+                        */
+                       page->cp_inode = inode;
+               }
                cl_2queue_add(queue, page);
                /*
                 * Set page clip to tell transfer formation engine
@@ -346,6 +384,11 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                int iot = rw == READ ? CRT_READ : CRT_WRITE;
 
                atomic_add(io_pages, &anchor->csi_sync_nr);
+               /*
+                * Prevent reordering between adding to the in-flight
+                * count and submitting the i/o.
+                */
+               smp_mb();
                rc = cl_io_submit_rw(env, io, iot, queue);
                if (rc == 0) {
                        cl_page_list_splice(&queue->c2_qout,
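
The smp_mb() makes the csi_sync_nr update visible before any effect of the
submit, so a completion running on another CPU cannot observe a stale
in-flight count. The pattern, in a kernel-context sketch with hypothetical
names (this is not the actual cl_sync_io implementation):

#include <linux/atomic.h>
#include <linux/wait.h>

struct demo_anchor {
        atomic_t                nr_inflight;
        wait_queue_head_t       waitq;
};

static void demo_submit(struct demo_anchor *a, int npages)
{
        atomic_add(npages, &a->nr_inflight);
        /* publish the new count before the pages can complete elsewhere */
        smp_mb();
        /* ... hand the pages to the RPC engine here ... */
}

static void demo_page_done(struct demo_anchor *a)
{
        /* atomic_dec_and_test() implies full barriers, pairing with the above */
        if (atomic_dec_and_test(&a->nr_inflight))
                wake_up(&a->waitq);
}
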
@@ -398,18 +441,15 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        size_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
-
-       /* if file is encrypted, return 0 so that we fall back to buffered IO */
-       if (IS_ENCRYPTED(inode))
-               return 0;
+       struct vvp_io *vio;
 
        /* Check EOF by ourselves */
        if (rw == READ && file_offset >= i_size_read(inode))
                return 0;
 
        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
-       if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
-               return -EINVAL;
+       if (file_offset & ~PAGE_MASK)
+               RETURN(-EINVAL);
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
               "offset=%lld=%llx, pages %zd (max %lu)\n",
@@ -419,7 +459,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        /* Check that all user buffers are aligned as well */
        if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
-               return -EINVAL;
+               RETURN(-EINVAL);
 
        lcc = ll_cl_find(file);
        if (lcc == NULL)
@@ -427,19 +467,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        env = lcc->lcc_env;
        LASSERT(!IS_ERR(env));
+       vio = vvp_env_io(env);
        io = lcc->lcc_io;
        LASSERT(io != NULL);
 
-       aio = cl_aio_alloc(iocb);
-       if (!aio)
-               RETURN(-ENOMEM);
-
-       /* 0. Need locking between buffered and direct access. and race with
-        *    size changing by concurrent truncates and writes.
-        * 1. Need inode mutex to operate transient pages.
-        */
-       if (rw == READ)
-               inode_lock(inode);
+       aio = io->ci_aio;
+       LASSERT(aio);
+       LASSERT(aio->cda_iocb == iocb);
 
        while (iov_iter_count(iter)) {
                struct ll_dio_pages pvec = { .ldp_aio = aio };
@@ -476,30 +510,42 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        }
 
 out:
-       aio->cda_bytes = tot_bytes;
-       cl_sync_io_note(env, &aio->cda_sync, result);
+       aio->cda_bytes += tot_bytes;
 
        if (is_sync_kiocb(iocb)) {
+               struct cl_sync_io *anchor = &aio->cda_sync;
                ssize_t rc2;
 
-               rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+               /**
+                * @anchor was initialized as 1 to prevent end_io from
+                * being called before we add all pages for the IO, so
+                * drop the extra reference to make sure we can wait for
+                * the count to reach zero.
+                */
+               cl_sync_io_note(env, anchor, result);
+
+               rc2 = cl_sync_io_wait(env, anchor, 0);
                if (result == 0 && rc2)
                        result = rc2;
-
+               /**
+                * Take the extra reference again, since if @anchor is
+                * reused it is assumed to start at 1.
+                */
+               atomic_add(1, &anchor->csi_sync_nr);
                if (result == 0) {
-                       struct vvp_io *vio = vvp_env_io(env);
                        /* no commit async for direct IO */
-                       vio->u.write.vui_written += tot_bytes;
+                       vio->u.readwrite.vui_written += tot_bytes;
                        result = tot_bytes;
                }
-               cl_aio_free(aio);
        } else {
-               result = -EIOCBQUEUED;
+               if (rw == WRITE)
+                       vio->u.readwrite.vui_written += tot_bytes;
+               else
+                       vio->u.readwrite.vui_read += tot_bytes;
+               if (result == 0)
+                       result = -EIOCBQUEUED;
        }
 
-       if (rw == READ)
-               inode_unlock(inode);
-
        return result;
 }
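
Because the aio now lives in io->ci_aio and the byte counts are accumulated in
cda_bytes and vui_read/vui_written, the async branch can return -EIOCBQUEUED
and let the completion side deliver the final status to userspace, which is
the point of the "Return errors for aio" subject. Roughly, an end_io path
forwards either the error or the accumulated byte count to the kiocb; the
sketch below is illustrative only (cda_iocb and cda_bytes come from this file,
and it assumes the pre-5.16 three-argument ki_complete()):

static void demo_aio_end(struct cl_dio_aio *aio, int ioret)
{
        struct kiocb *iocb = aio->cda_iocb;

        if (!is_sync_kiocb(iocb))
                /* success: report bytes transferred; failure: report the error */
                iocb->ki_complete(iocb, ioret < 0 ? ioret : aio->cda_bytes, 0);
}
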
 
@@ -649,12 +695,12 @@ static int ll_write_begin(struct file *file, struct address_space *mapping,
                        GOTO(out, result = -EBUSY);
 
                /**
-                * Direct read can fall back to buffered read, but DIO is done
+                * Direct write can fall back to buffered i/o, but DIO is done
                 * with lockless i/o, and buffered requires LDLM locking, so
                 * in this case we must restart without lockless.
                 */
-               if (!io->ci_ignore_lockless) {
-                       io->ci_ignore_lockless = 1;
+               if (!io->ci_dio_lock) {
+                       io->ci_dio_lock = 1;
                        io->ci_need_restart = 1;
                        GOTO(out, result = -ENOLCK);
                }
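
Setting ci_dio_lock together with ci_need_restart tells the top-level i/o loop
to redo the whole request, this time taking LDLM locks for the DIO instead of
going lockless. A simplified sketch of that restart pattern follows; the real
caller also re-initializes the cl_io before retrying, which is omitted here.

static int demo_io_loop(const struct lu_env *env, struct cl_io *io)
{
        int rc;

restart:
        rc = cl_io_loop(env, io);
        if (io->ci_need_restart) {
                io->ci_need_restart = 0;
                /* ci_dio_lock stays set, so the retry takes LDLM locks */
                goto restart;
        }
        return rc;
}
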
@@ -666,7 +712,7 @@ again:
        if (unlikely(vmpage == NULL ||
                     PageDirty(vmpage) || PageWriteback(vmpage))) {
                struct vvp_io *vio = vvp_env_io(env);
-               struct cl_page_list *plist = &vio->u.write.vui_queue;
+               struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
 
                 /* if the page is already in dirty cache, we have to commit
                 * the pages right now; otherwise, it may cause deadlock
@@ -827,17 +873,17 @@ static int ll_write_end(struct file *file, struct address_space *mapping,
 
        LASSERT(cl_page_is_owned(page, io));
        if (copied > 0) {
-               struct cl_page_list *plist = &vio->u.write.vui_queue;
+               struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
 
                lcc->lcc_page = NULL; /* page will be queued */
 
                /* Add it into write queue */
                cl_page_list_add(plist, page);
                if (plist->pl_nr == 1) /* first page */
-                       vio->u.write.vui_from = from;
+                       vio->u.readwrite.vui_from = from;
                else
                        LASSERT(from == 0);
-               vio->u.write.vui_to = from + copied;
+               vio->u.readwrite.vui_to = from + copied;
 
                /* To address the deadlock in balance_dirty_pages() where
                 * this dirty page may be written back in the same thread. */