*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/lustre/llite/rw26.c
*
#include <linux/string.h>
#include <linux/unistd.h>
#include <linux/writeback.h>
-
-#ifdef HAVE_MIGRATE_H
#include <linux/migrate.h>
-#elif defined(HAVE_MIGRATE_MODE_H)
-#include <linux/migrate_mode.h>
-#endif
#define DEBUG_SUBSYSTEM S_LLITE
#if defined(HAVE_DIO_ITER)
kvfree(pages);
#else
- OBD_FREE_LARGE(pages, npages * sizeof(*pages));
+ OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
#endif
}
size = min_t(size_t, maxsize, iter->iov->iov_len);
page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
- OBD_ALLOC_LARGE(*pages, page_count * sizeof(**pages));
+ OBD_ALLOC_PTR_ARRAY_LARGE(*pages, page_count);
if (*pages == NULL)
return -ENOMEM;
- down_read(&current->mm->mmap_sem);
+ mmap_read_lock(current->mm);
result = get_user_pages(current, current->mm, addr, page_count,
rw == READ, 0, *pages, NULL);
- up_read(&current->mm->mmap_sem);
+ mmap_read_unlock(current->mm);
if (unlikely(result != page_count)) {
ll_free_user_pages(*pages, page_count);
/* iov_iter_alignment() was introduced in 3.16, similar to HAVE_DIO_ITER */
#if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
return iov_iter_alignment(i);
}
#else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
const struct iovec *iov = i->iov;
unsigned long res;
}
#endif
-#ifndef HAVE_AIO_COMPLETE
-static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
+/*
+ * Lustre can relax the alignment requirement a bit: the I/O count does
+ * not need to be page aligned.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
{
- if (iocb->ki_complete)
- iocb->ki_complete(iocb, res, res2);
+ size_t orig_size = i->count;
+ size_t count = orig_size & ~PAGE_MASK;
+ unsigned long res;
+
+ if (!count)
+ return iov_iter_alignment_vfs(i);
+
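+ /* The count is not a multiple of PAGE_SIZE: check the alignment of
+ * the page-aligned prefix only and restore the original count after.
+ */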
+ if (orig_size > PAGE_SIZE) {
+ iov_iter_truncate(i, orig_size - count);
+ res = iov_iter_alignment_vfs(i);
+ iov_iter_reexpand(i, orig_size);
+
+ return res;
+ }
+
+ res = iov_iter_alignment_vfs(i);
+ /* start address is page aligned */
+ if ((res & ~PAGE_MASK) == orig_size)
+ return PAGE_SIZE;
+
+ return res;
}
-#endif
/** direct IO pages */
struct ll_dio_pages {
loff_t ldp_file_offset;
};
-static void ll_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
-{
- struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
- ssize_t ret = anchor->csi_sync_rc;
-
- ENTRY;
-
- /* release pages */
- while (aio->cda_pages.pl_nr > 0) {
- struct cl_page *page = cl_page_list_first(&aio->cda_pages);
-
- cl_page_get(page);
- cl_page_list_del(env, &aio->cda_pages, page);
- cl_page_delete(env, page);
- cl_page_put(env, page);
- }
-
- if (!is_sync_kiocb(aio->cda_iocb))
- aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
-
- EXIT;
-}
-
-static struct cl_dio_aio *ll_aio_alloc(struct kiocb *iocb)
-{
- struct cl_dio_aio *aio;
-
- OBD_ALLOC_PTR(aio);
- if (aio != NULL) {
- /*
- * Hold one ref so that it won't be released until
- * every pages is added.
- */
- cl_sync_io_init_notify(&aio->cda_sync, 1, is_sync_kiocb(iocb) ?
- NULL : aio, ll_aio_end);
- cl_page_list_init(&aio->cda_pages);
- aio->cda_iocb = iocb;
- }
- return aio;
-}
-
static int
ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
int rw, struct inode *inode, struct ll_dio_pages *pv)
}
page->cp_sync_io = anchor;
- cl_2queue_add(queue, page);
+ if (inode && IS_ENCRYPTED(inode)) {
+ /* In case of Direct IO on an encrypted file, we need to
+ * add a reference to the inode on the cl_page.
+ * This info is required by llcrypt to proceed
+ * with encryption/decryption.
+ * This is safe because we know these pages are private
+ * to the thread doing the Direct IO.
+ */
+ page->cp_inode = inode;
+ }
+ /* We keep the refcount from cl_page_find, so we don't need
+ * another one here
+ */
+ cl_2queue_add(queue, page, false);
/*
* Set page clip to tell transfer formation engine
* that page has to be sent even if it is beyond KMS.
*/
- cl_page_clip(env, page, 0, min(size, page_size));
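+ /* Only the final, partial page of the I/O needs clipping;
+ * full pages are sent whole.
+ */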
+ if (size < page_size)
+ cl_page_clip(env, page, 0, size);
++io_pages;
- /* drop the reference count for cl_page_find */
- cl_page_put(env, page);
offset += page_size;
size -= page_size;
}
int iot = rw == READ ? CRT_READ : CRT_WRITE;
atomic_add(io_pages, &anchor->csi_sync_nr);
+ /*
+ * Avoid out-of-order execution between updating the in-flight
+ * modification count and submitting the I/O.
+ */
+ smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
cl_page_list_splice(&queue->c2_qout,
* then truncate this to be a full-sized RPC. For 4kB PAGE_SIZE this is
* up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. */
#define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
- ~(DT_MAX_BRW_SIZE - 1))
+ ~((size_t)DT_MAX_BRW_SIZE - 1))
static ssize_t
ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
+ struct vvp_io *vio;
/* Check EOF by ourselves */
if (rw == READ && file_offset >= i_size_read(inode))
return 0;
/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
- if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
- return -EINVAL;
+ if (file_offset & ~PAGE_MASK)
+ RETURN(-EINVAL);
CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
"offset=%lld=%llx, pages %zd (max %lu)\n",
/* Check that all user buffers are aligned as well */
if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
- return -EINVAL;
+ RETURN(-EINVAL);
lcc = ll_cl_find(file);
if (lcc == NULL)
env = lcc->lcc_env;
LASSERT(!IS_ERR(env));
+ vio = vvp_env_io(env);
io = lcc->lcc_io;
LASSERT(io != NULL);
- aio = ll_aio_alloc(iocb);
- if (!aio)
- RETURN(-ENOMEM);
-
- /* 0. Need locking between buffered and direct access. and race with
- * size changing by concurrent truncates and writes.
- * 1. Need inode mutex to operate transient pages.
- */
- if (rw == READ)
- inode_lock(inode);
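+ /* The cl_dio_aio descriptor is attached to the cl_io before this
+ * function is called, so it must already exist and match this iocb.
+ */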
+ aio = io->ci_aio;
+ LASSERT(aio);
+ LASSERT(aio->cda_iocb == iocb);
while (iov_iter_count(iter)) {
struct ll_dio_pages pvec = { .ldp_aio = aio };
}
out:
- aio->cda_bytes = tot_bytes;
- cl_sync_io_note(env, &aio->cda_sync, result);
+ aio->cda_bytes += tot_bytes;
- if (is_sync_kiocb(iocb)) {
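+ /* Record the bytes transferred so the caller's read/write
+ * accounting sees what this DIO moved.
+ */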
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
+
+ /* We cannot do async submission (for AIO or regular DIO) unless the
+ * I/O is lockless, because async submission causes us to release the
+ * lock early.
+ *
+ * There are also several circumstances in which we must disable
+ * parallel DIO, so we check whether it is enabled.
+ *
+ * The check for "is_sync_kiocb" excludes AIO, which does not need to
+ * be disabled in these situations.
+ */
+ if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
ssize_t rc2;
- rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+ /* Wait here rather than doing async submission */
+ rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
- if (result == 0) {
- struct vvp_io *vio = vvp_env_io(env);
- /* no commit async for direct IO */
- vio->u.write.vui_written += tot_bytes;
+ if (result == 0)
result = tot_bytes;
- }
- OBD_FREE_PTR(aio);
-
- } else {
+ } else if (result == 0) {
result = -EIOCBQUEUED;
}
- if (rw == READ)
- inode_unlock(inode);
-
return result;
}
* purposes here we can treat it like i_size.
*/
if (attr->cat_kms <= offset) {
- char *kaddr = ll_kmap_atomic(vpg->vpg_page, KM_USER0);
+ char *kaddr = kmap_atomic(vpg->vpg_page);
memset(kaddr, 0, cl_page_size(obj));
- ll_kunmap_atomic(kaddr, KM_USER0);
+ kunmap_atomic(kaddr);
GOTO(out, result = 0);
}
return result;
}
-static int ll_tiny_write_begin(struct page *vmpage)
+static int ll_tiny_write_begin(struct page *vmpage, struct address_space *mapping)
{
/* Page must be present, up to date, dirty, and not in writeback. */
if (!vmpage || !PageUptodate(vmpage) || !PageDirty(vmpage) ||
- PageWriteback(vmpage))
+ PageWriteback(vmpage) || vmpage->mapping != mapping)
return -ENODATA;
return 0;
lcc = ll_cl_find(file);
if (lcc == NULL) {
vmpage = grab_cache_page_nowait(mapping, index);
- result = ll_tiny_write_begin(vmpage);
+ result = ll_tiny_write_begin(vmpage, mapping);
GOTO(out, result);
}
GOTO(out, result = -EBUSY);
/**
- * Direct read can fall back to buffered read, but DIO is done
+ * Direct write can fall back to buffered I/O, but DIO is done
* with lockless i/o, and buffered requires LDLM locking, so
* in this case we must restart without lockless.
*/
- if (!io->ci_ignore_lockless) {
- io->ci_ignore_lockless = 1;
+ if (!io->ci_dio_lock) {
+ io->ci_dio_lock = 1;
io->ci_need_restart = 1;
GOTO(out, result = -ENOLCK);
}
if (unlikely(vmpage == NULL ||
PageDirty(vmpage) || PageWriteback(vmpage))) {
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
/* if the page is already in dirty cache, we have to commit
* the pages right now; otherwise, it may cause deadlock
}
}
+ /* page was truncated */
+ if (mapping != vmpage->mapping) {
+ CDEBUG(D_VFSTRACE, "page: %lu was truncated\n", index);
+ unlock_page(vmpage);
+ put_page(vmpage);
+ vmpage = NULL;
+ goto again;
+ }
+
page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
if (IS_ERR(page))
GOTO(out, result = PTR_ERR(page));
LASSERT(cl_page_is_owned(page, io));
if (copied > 0) {
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
- cl_page_list_add(plist, page);
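+ /* "true": the write queue takes its own reference on the page */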
+ cl_page_list_add(plist, page, true);
if (plist->pl_nr == 1) /* first page */
- vio->u.write.vui_from = from;
+ vio->u.readwrite.vui_from = from;
else
LASSERT(from == 0);
- vio->u.write.vui_to = from + copied;
+ vio->u.readwrite.vui_to = from + copied;
/* To address the deadlock in balance_dirty_pages() where
* this dirty page may be written back in the same thread. */
#ifdef CONFIG_MIGRATION
static int ll_migratepage(struct address_space *mapping,
- struct page *newpage, struct page *page
-#ifdef HAVE_MIGRATEPAGE_4ARGS
- , enum migrate_mode mode
-#endif
- )
+ struct page *newpage, struct page *page,
+ enum migrate_mode mode)
{
/* Always fail page migration until we have a proper implementation */
return -EIO;