LU-14687 llite: Return errors for aio
[fs/lustre-release.git] / lustre / llite / rw26.c
index 8ea28c1..6fdeba9 100644
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/lustre/llite/rw26.c
  *
 #include <linux/string.h>
 #include <linux/unistd.h>
 #include <linux/writeback.h>
-
-#ifdef HAVE_MIGRATE_H
 #include <linux/migrate.h>
-#elif defined(HAVE_MIGRATE_MODE_H)
-#include <linux/migrate_mode.h>
-#endif
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
@@ -190,7 +184,7 @@ static void ll_free_user_pages(struct page **pages, int npages)
 #if defined(HAVE_DIO_ITER)
        kvfree(pages);
 #else
-       OBD_FREE_LARGE(pages, npages * sizeof(*pages));
+       OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
 #endif
 }
 
@@ -229,14 +223,14 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
 
        size = min_t(size_t, maxsize, iter->iov->iov_len);
        page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
-       OBD_ALLOC_LARGE(*pages, page_count * sizeof(**pages));
+       OBD_ALLOC_PTR_ARRAY_LARGE(*pages, page_count);
        if (*pages == NULL)
                return -ENOMEM;
 
-       down_read(&current->mm->mmap_sem);
+       mmap_read_lock(current->mm);
        result = get_user_pages(current, current->mm, addr, page_count,
                                rw == READ, 0, *pages, NULL);
-       up_read(&current->mm->mmap_sem);
+       mmap_read_unlock(current->mm);
 
        if (unlikely(result != page_count)) {
                ll_free_user_pages(*pages, page_count);
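The locking change tracks the kernel 5.8 rename of mmap_sem to mmap_lock, where callers are expected to use the mmap_read_lock()/mmap_read_unlock() helpers instead of open-coding down_read()/up_read(). On older kernels a compat shim of roughly the following shape is needed; this is only an illustrative sketch, and HAVE_MMAP_READ_LOCK is a hypothetical configure symbol, not necessarily the one the Lustre build actually defines.

/* Illustrative compat sketch only; HAVE_MMAP_READ_LOCK is a hypothetical
 * autoconf symbol standing in for whatever the build system detects. */
#include <linux/mm_types.h>
#include <linux/rwsem.h>

#ifndef HAVE_MMAP_READ_LOCK
static inline void mmap_read_lock(struct mm_struct *mm)
{
	down_read(&mm->mmap_sem);	/* pre-5.8 field name */
}

static inline void mmap_read_unlock(struct mm_struct *mm)
{
	up_read(&mm->mmap_sem);
}
#endif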
@@ -255,12 +249,12 @@ static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
 
 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
 #if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        return iov_iter_alignment(i);
 }
 #else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
 {
        const struct iovec *iov = i->iov;
        unsigned long res;
@@ -287,13 +281,34 @@ static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
 }
 #endif
 
-#ifndef HAVE_AIO_COMPLETE
-static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
+/*
+ * Lustre can relax the alignment requirement a bit: the io count
+ * does not need to be page aligned.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
 {
-       if (iocb->ki_complete)
-               iocb->ki_complete(iocb, res, res2);
+       size_t orig_size = i->count;
+       size_t count = orig_size & ~PAGE_MASK;
+       unsigned long res;
+
+       if (!count)
+               return iov_iter_alignment_vfs(i);
+
+       if (orig_size > PAGE_SIZE) {
+               iov_iter_truncate(i, orig_size - count);
+               res = iov_iter_alignment_vfs(i);
+               iov_iter_reexpand(i, orig_size);
+
+               return res;
+       }
+
+       res = iov_iter_alignment_vfs(i);
+       /* start address is page aligned */
+       if ((res & ~PAGE_MASK) == orig_size)
+               return PAGE_SIZE;
+
+       return res;
 }
-#endif
 
 /** direct IO pages */
 struct ll_dio_pages {
@@ -309,47 +324,6 @@ struct ll_dio_pages {
        loff_t                  ldp_file_offset;
 };
 
-static void ll_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
-{
-       struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
-       ssize_t ret = anchor->csi_sync_rc;
-
-       ENTRY;
-
-       /* release pages */
-       while (aio->cda_pages.pl_nr > 0) {
-               struct cl_page *page = cl_page_list_first(&aio->cda_pages);
-
-               cl_page_get(page);
-               cl_page_list_del(env, &aio->cda_pages, page);
-               cl_page_delete(env, page);
-               cl_page_put(env, page);
-       }
-
-       if (!is_sync_kiocb(aio->cda_iocb))
-               aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
-
-       EXIT;
-}
-
-static struct cl_dio_aio *ll_aio_alloc(struct kiocb *iocb)
-{
-       struct cl_dio_aio *aio;
-
-       OBD_ALLOC_PTR(aio);
-       if (aio != NULL) {
-               /*
-                * Hold one ref so that it won't be released until
-                * every pages is added.
-                */
-               cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
-                                      NULL : aio, ll_aio_end);
-               cl_page_list_init(&aio->cda_pages);
-               aio->cda_iocb = iocb;
-       }
-       return aio;
-}
-
 static int
 ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                   int rw, struct inode *inode, struct ll_dio_pages *pv)
@@ -383,6 +357,16 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                }
 
                page->cp_sync_io = anchor;
+               if (inode && IS_ENCRYPTED(inode)) {
+                       /* In case of Direct IO on encrypted file, we need to
+                        * add a reference to the inode on the cl_page.
+                        * This info is required by llcrypt to proceed
+                        * with encryption/decryption.
+                        * This is safe because we know these pages are private
+                        * to the thread doing the Direct IO.
+                        */
+                       page->cp_inode = inode;
+               }
                cl_2queue_add(queue, page);
                /*
                 * Set page clip to tell transfer formation engine
@@ -400,6 +384,11 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                int iot = rw == READ ? CRT_READ : CRT_WRITE;
 
                atomic_add(io_pages, &anchor->csi_sync_nr);
+               /*
+                * Avoid reordering between bumping the in-flight
+                * modification count and submitting the io.
+                */
+               smp_mb();
                rc = cl_io_submit_rw(env, io, iot, queue);
                if (rc == 0) {
                        cl_page_list_splice(&queue->c2_qout,
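The added smp_mb() makes the increased csi_sync_nr visible to other CPUs before any page of this batch can complete there; without it the completion path could read a stale count, reach zero early, and signal the sync_io while pages are still in flight. A rough userspace C11 analogue of the same increment-then-publish pattern (names are illustrative, not Lustre's):

/* Userspace C11 analogue of "bump the in-flight count, publish it, then
 * submit"; in the kernel patch this is atomic_add() followed by smp_mb(). */
#include <stdatomic.h>

static atomic_int inflight;

void submit_batch(int nr_pages)
{
	atomic_fetch_add_explicit(&inflight, nr_pages, memory_order_relaxed);
	/* full fence, the analogue of smp_mb(): the completion side must
	 * observe the larger count before it can observe a completed page */
	atomic_thread_fence(memory_order_seq_cst);
	/* ... hand the pages to whatever will complete them ... */
}

void page_completed(void)
{
	/* runs on another thread/CPU; a stale count here could hit zero
	 * too early and report overall completion with pages still queued */
	atomic_fetch_sub_explicit(&inflight, 1, memory_order_acq_rel);
}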
@@ -438,7 +427,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
  * then truncate this to be a full-sized RPC.  For 4kB PAGE_SIZE this is
  * up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. */
 #define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
-                     ~(DT_MAX_BRW_SIZE - 1))
+                     ~((size_t)DT_MAX_BRW_SIZE - 1))
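The (size_t) cast widens the mask before it is applied. If DT_MAX_BRW_SIZE is a 32-bit constant, ~(DT_MAX_BRW_SIZE - 1) is computed in 32 bits, and for an unsigned constant it zero-extends when ANDed with a 64-bit size_t, silently clearing the upper half of the operand. The values in this particular macro may never exceed 32 bits in practice, but the cast removes the hazard. A small standalone illustration, assuming (purely for the example) a 16 MiB unsigned 32-bit constant:

/* Why the (size_t) cast in the mask matters; the value and type of the
 * constant here are assumptions for illustration only. */
#include <stdio.h>
#include <stddef.h>

#define BRW_SIZE (16U * 1024 * 1024)	/* assume a 32-bit unsigned constant */

int main(void)
{
	size_t len = (size_t)22 << 30;	/* a large 64-bit byte count */

	/* 32-bit mask zero-extends: bits above 31 are cleared from len */
	size_t bad  = len & ~(BRW_SIZE - 1);
	/* 64-bit mask keeps the high bits and only rounds down to 16 MiB */
	size_t good = len & ~((size_t)BRW_SIZE - 1);

	printf("bad  = %#zx\ngood = %#zx\n", bad, good);
	return 0;
}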
 
 static ssize_t
 ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
@@ -452,14 +441,15 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        size_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
+       struct vvp_io *vio;
 
        /* Check EOF by ourselves */
        if (rw == READ && file_offset >= i_size_read(inode))
                return 0;
 
        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
-       if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
-               return -EINVAL;
+       if (file_offset & ~PAGE_MASK)
+               RETURN(-EINVAL);
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
               "offset=%lld=%llx, pages %zd (max %lu)\n",
@@ -469,7 +459,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        /* Check that all user buffers are aligned as well */
        if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
-               return -EINVAL;
+               RETURN(-EINVAL);
 
        lcc = ll_cl_find(file);
        if (lcc == NULL)
@@ -477,19 +467,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        env = lcc->lcc_env;
        LASSERT(!IS_ERR(env));
+       vio = vvp_env_io(env);
        io = lcc->lcc_io;
        LASSERT(io != NULL);
 
-       aio = ll_aio_alloc(iocb);
-       if (!aio)
-               RETURN(-ENOMEM);
-
-       /* 0. Need locking between buffered and direct access. and race with
-        *    size changing by concurrent truncates and writes.
-        * 1. Need inode mutex to operate transient pages.
-        */
-       if (rw == READ)
-               inode_lock(inode);
+       aio = io->ci_aio;
+       LASSERT(aio);
+       LASSERT(aio->cda_iocb == iocb);
 
        while (iov_iter_count(iter)) {
                struct ll_dio_pages pvec = { .ldp_aio = aio };
@@ -526,31 +510,42 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        }
 
 out:
-       aio->cda_bytes = tot_bytes;
-       cl_sync_io_note(env, &aio->cda_sync, result);
+       aio->cda_bytes += tot_bytes;
 
        if (is_sync_kiocb(iocb)) {
+               struct cl_sync_io *anchor = &aio->cda_sync;
                ssize_t rc2;
 
-               rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+               /**
+                * @anchor was initialized to 1 to prevent end_io from
+                * being called before all pages are added for IO, so
+                * drop that extra reference here to allow the count to
+                * reach zero and be waited on.
+                */
+               cl_sync_io_note(env, anchor, result);
+
+               rc2 = cl_sync_io_wait(env, anchor, 0);
                if (result == 0 && rc2)
                        result = rc2;
-
+               /**
+        * Take the extra reference again: if @anchor is reused,
+        * it is assumed to start at 1.
+                */
+               atomic_add(1, &anchor->csi_sync_nr);
                if (result == 0) {
-                       struct vvp_io *vio = vvp_env_io(env);
                        /* no commit async for direct IO */
-                       vio->u.write.vui_written += tot_bytes;
+                       vio->u.readwrite.vui_written += tot_bytes;
                        result = tot_bytes;
                }
-               OBD_FREE_PTR(aio);
-
        } else {
-               result = -EIOCBQUEUED;
+               if (rw == WRITE)
+                       vio->u.readwrite.vui_written += tot_bytes;
+               else
+                       vio->u.readwrite.vui_read += tot_bytes;
+               if (result == 0)
+                       result = -EIOCBQUEUED;
        }
 
-       if (rw == READ)
-               inode_unlock(inode);
-
        return result;
 }
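The comments above spell out the convention this patch relies on: the aio anchor starts at 1 so completions cannot fire while pages are still being queued, the submitter drops that reference before waiting, and re-arms it afterwards because io->ci_aio now outlives a single call. A stripped-down, kernel-style sketch of that "start at one" counting pattern (generic names, not the cl_sync_io API):

/* Generic "init the in-flight counter to 1" pattern, kernel style;
 * illustrative only, not the cl_sync_io implementation. */
#include <linux/atomic.h>
#include <linux/wait.h>

struct sync_anchor {
	atomic_t		nr;	/* in-flight ios + one submitter ref */
	wait_queue_head_t	waitq;
};

static void anchor_init(struct sync_anchor *a)
{
	atomic_set(&a->nr, 1);		/* submitter holds one reference */
	init_waitqueue_head(&a->waitq);
}

static void anchor_note_one_done(struct sync_anchor *a)
{
	if (atomic_dec_and_test(&a->nr))
		wake_up(&a->waitq);	/* last reference: signal completion */
}

static void anchor_submit_and_wait(struct sync_anchor *a, int nr_ios)
{
	atomic_add(nr_ios, &a->nr);	/* account the batch before submit */
	/* ... submit nr_ios requests; each calls anchor_note_one_done() ... */

	anchor_note_one_done(a);	/* drop the submitter's own reference */
	wait_event(a->waitq, atomic_read(&a->nr) == 0);
	atomic_set(&a->nr, 1);		/* re-arm in case the anchor is reused */
}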
 
@@ -619,10 +614,10 @@ static int ll_prepare_partial_page(const struct lu_env *env, struct cl_io *io,
         * purposes here we can treat it like i_size.
         */
        if (attr->cat_kms <= offset) {
-               char *kaddr = ll_kmap_atomic(vpg->vpg_page, KM_USER0);
+               char *kaddr = kmap_atomic(vpg->vpg_page);
 
                memset(kaddr, 0, cl_page_size(obj));
-               ll_kunmap_atomic(kaddr, KM_USER0);
+               kunmap_atomic(kaddr);
                GOTO(out, result = 0);
        }
 
@@ -652,11 +647,11 @@ out:
        return result;
 }
 
-static int ll_tiny_write_begin(struct page *vmpage)
+static int ll_tiny_write_begin(struct page *vmpage, struct address_space *mapping)
 {
        /* Page must be present, up to date, dirty, and not in writeback. */
        if (!vmpage || !PageUptodate(vmpage) || !PageDirty(vmpage) ||
-           PageWriteback(vmpage))
+           PageWriteback(vmpage) || vmpage->mapping != mapping)
                return -ENODATA;
 
        return 0;
@@ -684,7 +679,7 @@ static int ll_write_begin(struct file *file, struct address_space *mapping,
        lcc = ll_cl_find(file);
        if (lcc == NULL) {
                vmpage = grab_cache_page_nowait(mapping, index);
-               result = ll_tiny_write_begin(vmpage);
+               result = ll_tiny_write_begin(vmpage, mapping);
                GOTO(out, result);
        }
 
@@ -700,12 +695,12 @@ static int ll_write_begin(struct file *file, struct address_space *mapping,
                        GOTO(out, result = -EBUSY);
 
                /**
-                * Direct read can fall back to buffered read, but DIO is done
+                * Direct write can fall back to buffered read, but DIO is done
                 * with lockless i/o, and buffered requires LDLM locking, so
                 * in this case we must restart without lockless.
                 */
-               if (!io->ci_ignore_lockless) {
-                       io->ci_ignore_lockless = 1;
+               if (!io->ci_dio_lock) {
+                       io->ci_dio_lock = 1;
                        io->ci_need_restart = 1;
                        GOTO(out, result = -ENOLCK);
                }
@@ -717,7 +712,7 @@ again:
        if (unlikely(vmpage == NULL ||
                     PageDirty(vmpage) || PageWriteback(vmpage))) {
                struct vvp_io *vio = vvp_env_io(env);
-               struct cl_page_list *plist = &vio->u.write.vui_queue;
+               struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
 
                 /* if the page is already in dirty cache, we have to commit
                 * the pages right now; otherwise, it may cause deadlock
@@ -743,6 +738,15 @@ again:
                }
        }
 
+       /* page was truncated */
+       if (mapping != vmpage->mapping) {
+               CDEBUG(D_VFSTRACE, "page: %lu was truncated\n", index);
+               unlock_page(vmpage);
+               put_page(vmpage);
+               vmpage = NULL;
+               goto again;
+       }
+
        page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
        if (IS_ERR(page))
                GOTO(out, result = PTR_ERR(page));
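Both the extra mapping check added to ll_tiny_write_begin() and the "page was truncated" retry above follow the usual pagecache rule: truncation detaches a page from its mapping, and that can only be observed reliably under the page lock, so a freshly locked page must have its mapping rechecked and the lookup retried if it changed. A generic sketch of that pattern (kernel-style, illustrative only, not the actual ll_write_begin() path):

/* Generic "grab, lock, recheck mapping" pattern, kernel style. */
#include <linux/mm.h>
#include <linux/pagemap.h>

static struct page *grab_stable_page(struct address_space *mapping,
				     pgoff_t index)
{
	struct page *page;

again:
	page = grab_cache_page_nowait(mapping, index);
	if (!page)
		return NULL;

	/* grab_cache_page_nowait() returns the page locked; a concurrent
	 * truncate may still have detached it before we took the lock */
	if (page->mapping != mapping) {
		unlock_page(page);
		put_page(page);
		goto again;
	}
	return page;
}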
@@ -869,17 +873,17 @@ static int ll_write_end(struct file *file, struct address_space *mapping,
 
        LASSERT(cl_page_is_owned(page, io));
        if (copied > 0) {
-               struct cl_page_list *plist = &vio->u.write.vui_queue;
+               struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
 
                lcc->lcc_page = NULL; /* page will be queued */
 
                /* Add it into write queue */
                cl_page_list_add(plist, page);
                if (plist->pl_nr == 1) /* first page */
-                       vio->u.write.vui_from = from;
+                       vio->u.readwrite.vui_from = from;
                else
                        LASSERT(from == 0);
-               vio->u.write.vui_to = from + copied;
+               vio->u.readwrite.vui_to = from + copied;
 
                /* To address the deadlock in balance_dirty_pages() where
                 * this dirty page may be written back in the same thread. */
@@ -915,11 +919,8 @@ out:
 
 #ifdef CONFIG_MIGRATION
 static int ll_migratepage(struct address_space *mapping,
-                         struct page *newpage, struct page *page
-#ifdef HAVE_MIGRATEPAGE_4ARGS
-                         , enum migrate_mode mode
-#endif
-       )
+                         struct page *newpage, struct page *page,
+                         enum migrate_mode mode)
 {
         /* Always fail page migration until we have a proper implementation */
         return -EIO;