*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
 * lustre/llite/rw26.c
*
if (*pages == NULL)
return -ENOMEM;
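+ /* mmap_read_lock()/mmap_read_unlock() are the mmap locking API that
+ * replaced direct mmap_sem use in Linux 5.8 */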
- down_read(&current->mm->mmap_sem);
+ mmap_read_lock(current->mm);
result = get_user_pages(current, current->mm, addr, page_count,
rw == READ, 0, *pages, NULL);
- up_read(&current->mm->mmap_sem);
+ mmap_read_unlock(current->mm);
if (unlikely(result != page_count)) {
ll_free_user_pages(*pages, page_count);
/* iov_iter_alignment() was introduced in 3.16, similar to HAVE_DIO_ITER */
#if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
return iov_iter_alignment(i);
}
#else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
const struct iovec *iov = i->iov;
unsigned long res;
}
#endif
+/*
+ * Lustre can relax the alignment requirement a bit: the IO count
+ * does not need to be page aligned.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
+{
+ size_t orig_size = i->count;
+ size_t count = orig_size & ~PAGE_MASK;
+ unsigned long res;
+
+ if (!count)
+ return iov_iter_alignment_vfs(i);
+
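+ /*
+ * Multi-page IO: temporarily mask off the sub-page tail so that
+ * only the page-aligned portion contributes to the alignment
+ * check, then restore the original count for the caller.
+ */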
+ if (orig_size > PAGE_SIZE) {
+ iov_iter_truncate(i, orig_size - count);
+ res = iov_iter_alignment_vfs(i);
+ iov_iter_reexpand(i, orig_size);
+
+ return res;
+ }
+
+ res = iov_iter_alignment_vfs(i);
+ /* sub-page IO with a page-aligned start address: treat as aligned */
+ if ((res & ~PAGE_MASK) == orig_size)
+ return PAGE_SIZE;
+
+ return res;
+}
+
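For reference, here is a minimal userspace sketch of the relaxed alignment rule for a single contiguous buffer (check_alignment() and the EX_* macros are hypothetical, not Lustre code; the real helper also handles multi-segment iov_iters via iov_iter_truncate()/iov_iter_reexpand()):

#include <stdio.h>
#include <stdint.h>

#define EX_PAGE_SIZE 4096UL
#define EX_PAGE_MASK (~(EX_PAGE_SIZE - 1))

/* model of the relaxed rule for one contiguous buffer: the base
 * alignment value is addr|count, as in the VFS helper */
static unsigned long check_alignment(uintptr_t addr, size_t count)
{
	size_t tail = count & ~EX_PAGE_MASK;

	if (!tail)			/* count already page aligned */
		return addr | count;
	if (count > EX_PAGE_SIZE)	/* multi-page: ignore the tail */
		return addr | (count - tail);
	/* sub-page IO: a page-aligned start address is good enough */
	if (((addr | count) & ~EX_PAGE_MASK) == count)
		return EX_PAGE_SIZE;
	return addr | count;
}

int main(void)
{
	/* aligned buffer, 4096+512 bytes: tail ignored -> prints 0 */
	printf("%lx\n", check_alignment(0x10000, 4608) & ~EX_PAGE_MASK);
	/* aligned buffer, 512 bytes: treated as aligned -> prints 0 */
	printf("%lx\n", check_alignment(0x10000, 512) & ~EX_PAGE_MASK);
	/* misaligned buffer: nonzero, would be rejected by the caller */
	printf("%lx\n", check_alignment(0x10100, 4096) & ~EX_PAGE_MASK);
	return 0;
}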
/** direct IO pages */
struct ll_dio_pages {
struct cl_dio_aio *ldp_aio;
}
page->cp_sync_io = anchor;
+ if (inode && IS_ENCRYPTED(inode)) {
+ /* For Direct IO on an encrypted file, we need to add a
+ * reference to the inode on the cl_page: llcrypt requires
+ * it to perform encryption/decryption.
+ * This is safe because these pages are private to the
+ * thread doing the Direct IO.
+ */
+ page->cp_inode = inode;
+ }
cl_2queue_add(queue, page);
/*
* Set page clip to tell transfer formation engine
int iot = rw == READ ? CRT_READ : CRT_WRITE;
atomic_add(io_pages, &anchor->csi_sync_nr);
+ /*
+ * Avoid reordering between the update of the inflight
+ * modification count and the IO submission.
+ */
+ smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
cl_page_list_splice(&queue->c2_qout,
* then truncate this to be a full-sized RPC. For 4kB PAGE_SIZE this is
* up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. */
#define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
- ~(DT_MAX_BRW_SIZE - 1))
+ ~((size_t)DT_MAX_BRW_SIZE - 1))
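For scale (assuming sizeof(struct brw_page) is 24 bytes): 128kB / 24B gives roughly 5461 brw_page entries, and 5461 * 4kB is about 22MB, which the mask then rounds down to a DT_MAX_BRW_SIZE multiple.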
static ssize_t
ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
size_t count = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
+ struct vvp_io *vio;
/* Check EOF by ourselves */
if (rw == READ && file_offset >= i_size_read(inode))
return 0;
/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
- if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
- return -EINVAL;
+ if (file_offset & ~PAGE_MASK)
+ RETURN(-EINVAL);
CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
"offset=%lld=%llx, pages %zd (max %lu)\n",
/* Check that all user buffers are aligned as well */
if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
- return -EINVAL;
+ RETURN(-EINVAL);
lcc = ll_cl_find(file);
if (lcc == NULL)
env = lcc->lcc_env;
LASSERT(!IS_ERR(env));
+ vio = vvp_env_io(env);
io = lcc->lcc_io;
LASSERT(io != NULL);
- aio = cl_aio_alloc(iocb);
- if (!aio)
- RETURN(-ENOMEM);
-
- /* 0. Need locking between buffered and direct access. and race with
- * size changing by concurrent truncates and writes.
- * 1. Need inode mutex to operate transient pages.
- */
- if (rw == READ)
- inode_lock(inode);
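+ /* the AIO descriptor now lives on the cl_io, set up once by the
+ * caller (assumption), so cda_bytes can accumulate across calls */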
+ aio = io->ci_aio;
+ LASSERT(aio);
+ LASSERT(aio->cda_iocb == iocb);
while (iov_iter_count(iter)) {
struct ll_dio_pages pvec = { .ldp_aio = aio };
}
out:
- aio->cda_bytes = tot_bytes;
- cl_sync_io_note(env, &aio->cda_sync, result);
+ aio->cda_bytes += tot_bytes;
if (is_sync_kiocb(iocb)) {
+ struct cl_sync_io *anchor = &aio->cda_sync;
ssize_t rc2;
- rc2 = cl_sync_io_wait(env, &aio->cda_sync, 0);
+ /**
+ * @anchor was initialized with a count of 1 to prevent
+ * end_io from being called before all pages have been
+ * added for the IO, so drop that extra reference here to
+ * let the count reach zero once the IO completes.
+ */
+ cl_sync_io_note(env, anchor, result);
+
+ rc2 = cl_sync_io_wait(env, anchor, 0);
if (result == 0 && rc2)
result = rc2;
-
+ /**
+ * Take the extra reference again: if @anchor is
+ * reused, it is assumed to start at 1.
+ */
+ atomic_add(1, &anchor->csi_sync_nr);
if (result == 0) {
- struct vvp_io *vio = vvp_env_io(env);
/* no commit async for direct IO */
- vio->u.write.vui_written += tot_bytes;
+ vio->u.readwrite.vui_written += tot_bytes;
result = tot_bytes;
}
- OBD_FREE_PTR(aio);
-
} else {
- result = -EIOCBQUEUED;
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
+ if (result == 0)
+ result = -EIOCBQUEUED;
}
- if (rw == READ)
- inode_unlock(inode);
-
return result;
}
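For reference, the count manipulation above is the usual "completion biased with an extra reference" pattern. A minimal userspace sketch of the idea, using a hypothetical sync_anchor with C11 atomics in place of Lustre's cl_sync_io:

#include <stdatomic.h>
#include <stdio.h>

/* hypothetical stand-in for cl_sync_io: the count starts at 1 so that
 * completions arriving while pages are still being queued can never
 * drive it to zero prematurely */
struct sync_anchor {
	atomic_int nr;
};

static void anchor_init(struct sync_anchor *a) { atomic_init(&a->nr, 1); }

static void anchor_add(struct sync_anchor *a, int n)
{
	atomic_fetch_add(&a->nr, n);
}

/* called once per completed unit, and once more to drop the init bias;
 * returns the new count */
static int anchor_note(struct sync_anchor *a)
{
	return atomic_fetch_sub(&a->nr, 1) - 1;
}

int main(void)
{
	struct sync_anchor a;

	anchor_init(&a);	/* nr == 1: biased, cannot reach zero */
	anchor_add(&a, 2);	/* two IO units in flight */
	anchor_note(&a);	/* first unit completes */
	anchor_note(&a);	/* second unit completes, nr back to 1 */
	if (anchor_note(&a) == 0)	/* drop the bias: now zero */
		printf("all IO complete, safe to return\n");
	anchor_add(&a, 1);	/* re-bias in case the anchor is reused */
	return 0;
}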
GOTO(out, result = -EBUSY);
/**
- * Direct read can fall back to buffered read, but DIO is done
+ * Direct write can fall back to buffered write, but DIO is done
* with lockless i/o, and buffered requires LDLM locking, so
* in this case we must restart without lockless.
*/
- if (!io->ci_ignore_lockless) {
- io->ci_ignore_lockless = 1;
+ if (!io->ci_dio_lock) {
+ io->ci_dio_lock = 1;
io->ci_need_restart = 1;
GOTO(out, result = -ENOLCK);
}
if (unlikely(vmpage == NULL ||
PageDirty(vmpage) || PageWriteback(vmpage))) {
struct vvp_io *vio = vvp_env_io(env);
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
/* if the page is already in dirty cache, we have to commit
* the pages right now; otherwise, it may cause deadlock
LASSERT(cl_page_is_owned(page, io));
if (copied > 0) {
- struct cl_page_list *plist = &vio->u.write.vui_queue;
+ struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
cl_page_list_add(plist, page);
if (plist->pl_nr == 1) /* first page */
- vio->u.write.vui_from = from;
+ vio->u.readwrite.vui_from = from;
else
LASSERT(from == 0);
- vio->u.write.vui_to = from + copied;
+ vio->u.readwrite.vui_to = from + copied;
/* To address the deadlock in balance_dirty_pages() where
* this dirty page may be written back in the same thread. */