*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/lustre/llite/rw26.c
*
if (*pages == NULL)
return -ENOMEM;
- down_read(&current->mm->mmap_sem);
+ mmap_read_lock(current->mm);
result = get_user_pages(current, current->mm, addr, page_count,
rw == READ, 0, *pages, NULL);
- up_read(&current->mm->mmap_sem);
+ mmap_read_unlock(current->mm);
if (unlikely(result != page_count)) {
ll_free_user_pages(*pages, page_count);
/* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
#if defined(HAVE_DIO_ITER)
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
return iov_iter_alignment(i);
}
#else /* copied from alignment_iovec() */
-static unsigned long ll_iov_iter_alignment(const struct iov_iter *i)
+static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
{
const struct iovec *iov = i->iov;
unsigned long res;
}
#endif
+/*
+ * Lustre can relax the alignment check a bit: the I/O count does not
+ * necessarily have to be a multiple of the page size.
+ *
+ * Returns the result of iov_iter_alignment_vfs() for @i, except that:
+ *  - when the count is larger than PAGE_SIZE and has a non-zero
+ *    (count & ~PAGE_MASK) remainder, @i is temporarily truncated by that
+ *    remainder while the alignment is computed, and is then re-expanded
+ *    to its original count before returning;
+ *  - when the count is not larger than PAGE_SIZE and the
+ *    (res & ~PAGE_MASK) bits of the computed alignment equal the count,
+ *    PAGE_SIZE is returned instead.
+ *
+ * @i is not const because it is modified (and restored) in the
+ * truncate/re-expand case.
+ */
+static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
+{
+ size_t orig_size = i->count;
+ size_t count = orig_size & ~PAGE_MASK;
+ unsigned long res;
+
+ if (!count)
+ return iov_iter_alignment_vfs(i);
+
+ if (orig_size > PAGE_SIZE) {
+ iov_iter_truncate(i, orig_size - count);
+ res = iov_iter_alignment_vfs(i);
+ iov_iter_reexpand(i, orig_size);
+
+ return res;
+ }
+
+ res = iov_iter_alignment_vfs(i);
+ /* start address is page aligned */
+ if ((res & ~PAGE_MASK) == orig_size)
+ return PAGE_SIZE;
+
+ return res;
+}
+
/** direct IO pages */
struct ll_dio_pages {
struct cl_dio_aio *ldp_aio;
}
page->cp_sync_io = anchor;
- cl_2queue_add(queue, page);
+ if (inode && IS_ENCRYPTED(inode)) {
+ /* In case of Direct IO on encrypted file, we need to
+ * add a reference to the inode on the cl_page.
+ * This info is required by llcrypt to proceed
+ * to encryption/decryption.
+ * This is safe because we know these pages are private
+ * to the thread doing the Direct IO.
+ */
+ page->cp_inode = inode;
+ }
+ /* We keep the refcount from cl_page_find, so we don't need
+ * another one here
+ */
+ cl_2queue_add(queue, page, false);
/*
* Set page clip to tell transfer formation engine
* that page has to be sent even if it is beyond KMS.
*/
- cl_page_clip(env, page, 0, min(size, page_size));
+ if (size < page_size)
+ cl_page_clip(env, page, 0, size);
++io_pages;
- /* drop the reference count for cl_page_find */
- cl_page_put(env, page);
offset += page_size;
size -= page_size;
}
int iot = rw == READ ? CRT_READ : CRT_WRITE;
atomic_add(io_pages, &anchor->csi_sync_nr);
+ /*
+ * Avoid out-of-order execution of adding inflight
+ * modifications count and io submit.
+ */
+ smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
cl_page_list_splice(&queue->c2_qout,
loff_t file_offset = iocb->ki_pos;
struct vvp_io *vio;
- /* if file is encrypted, return 0 so that we fall back to buffered IO */
- if (IS_ENCRYPTED(inode))
- return 0;
-
/* Check EOF by ourselves */
if (rw == READ && file_offset >= i_size_read(inode))
return 0;
/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
- if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
- return -EINVAL;
+ if (file_offset & ~PAGE_MASK)
+ RETURN(-EINVAL);
CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
"offset=%lld=%llx, pages %zd (max %lu)\n",
/* Check that all user buffers are aligned as well */
if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
- return -EINVAL;
+ RETURN(-EINVAL);
lcc = ll_cl_find(file);
if (lcc == NULL)
out:
aio->cda_bytes += tot_bytes;
- if (is_sync_kiocb(iocb)) {
- struct cl_sync_io *anchor = &aio->cda_sync;
- ssize_t rc2;
+ if (rw == WRITE)
+ vio->u.readwrite.vui_written += tot_bytes;
+ else
+ vio->u.readwrite.vui_read += tot_bytes;
- /**
- * @anchor was inited as 1 to prevent end_io to be
- * called before we add all pages for IO, so drop
- * one extra reference to make sure we could wait
- * count to be zero.
- */
- cl_sync_io_note(env, anchor, result);
+ /* We cannot do async submission - for AIO or regular DIO - unless
+ * lockless because it causes us to release the lock early.
+ *
+ * There are also several circumstances in which we must disable
+ * parallel DIO, so we check if it is enabled.
+ *
+ * The check for "is_sync_kiocb" excludes AIO, which does not need to
+ * be disabled in these situations.
+ */
+ if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
+ ssize_t rc2;
- rc2 = cl_sync_io_wait(env, anchor, 0);
+ /* Wait here rather than doing async submission */
+ rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
if (result == 0 && rc2)
result = rc2;
- /**
- * One extra reference again, as if @anchor is
- * reused we assume it as 1 before using.
- */
- atomic_add(1, &anchor->csi_sync_nr);
- if (result == 0) {
- /* no commit async for direct IO */
- vio->u.readwrite.vui_written += tot_bytes;
+
+ if (result == 0)
result = tot_bytes;
- }
- } else {
- if (rw == WRITE)
- vio->u.readwrite.vui_written += tot_bytes;
- else
- vio->u.readwrite.vui_read += tot_bytes;
+ } else if (result == 0) {
result = -EIOCBQUEUED;
}
GOTO(out, result = -EBUSY);
/**
- * Direct read can fall back to buffered read, but DIO is done
+ * Direct write can fall back to buffered read, but DIO is done
* with lockless i/o, and buffered requires LDLM locking, so
* in this case we must restart without lockless.
*/
- if (!io->ci_ignore_lockless) {
- io->ci_ignore_lockless = 1;
+ if (!io->ci_dio_lock) {
+ io->ci_dio_lock = 1;
io->ci_need_restart = 1;
GOTO(out, result = -ENOLCK);
}
lcc->lcc_page = NULL; /* page will be queued */
/* Add it into write queue */
- cl_page_list_add(plist, page);
+ cl_page_list_add(plist, page, true);
if (plist->pl_nr == 1) /* first page */
vio->u.readwrite.vui_from = from;
else