+ struct cl_page *page;
+ struct cl_2queue *queue = &io->ci_queue;
+ struct cl_object *obj = io->ci_obj;
+ struct cl_sync_io *anchor = &pv->ldp_aio->cda_sync;
+ loff_t offset = pv->ldp_file_offset;
+ int io_pages = 0;
+ size_t page_size = cl_page_size(obj);
+ int i;
+ ssize_t rc = 0;
+
+ ENTRY;
+
+ cl_2queue_init(queue);
+ for (i = 0; i < pv->ldp_count; i++) {
+ LASSERT(!(offset & (PAGE_SIZE - 1)));
+ page = cl_page_find(env, obj, cl_index(obj, offset),
+ pv->ldp_pages[i], CPT_TRANSIENT);
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ break;
+ }
+ LASSERT(page->cp_type == CPT_TRANSIENT);
+ rc = cl_page_own(env, io, page);
+ if (rc) {
+ cl_page_put(env, page);
+ break;
+ }
+
+ page->cp_sync_io = anchor;
+ if (inode && IS_ENCRYPTED(inode)) {
+ /* In case of Direct IO on encrypted file, we need to
+ * add a reference to the inode on the cl_page.
+ * This info is required by llcrypt to proceed
+ * to encryption/decryption.
+ * This is safe because we know these pages are private
+ * to the thread doing the Direct IO.
+ */
+ page->cp_inode = inode;
+ }
+ cl_2queue_add(queue, page);
+ /*
+ * Set page clip to tell transfer formation engine
+ * that page has to be sent even if it is beyond KMS.
+ */
+ cl_page_clip(env, page, 0, min(size, page_size));
+ ++io_pages;
+
+ /* drop the reference count for cl_page_find */
+ cl_page_put(env, page);
+ offset += page_size;
+ size -= page_size;
+ }
+ if (rc == 0 && io_pages > 0) {
+ int iot = rw == READ ? CRT_READ : CRT_WRITE;
+
+ atomic_add(io_pages, &anchor->csi_sync_nr);
+ /*
+ * Avoid out-of-order execution of adding inflight
+ * modifications count and io submit.
+ */
+ smp_mb();
+ rc = cl_io_submit_rw(env, io, iot, queue);
+ if (rc == 0) {
+ cl_page_list_splice(&queue->c2_qout,
+ &pv->ldp_aio->cda_pages);
+ } else {
+ atomic_add(-queue->c2_qin.pl_nr,
+ &anchor->csi_sync_nr);
+ cl_page_list_for_each(page, &queue->c2_qin)
+ page->cp_sync_io = NULL;
+ }
+ /* handle partially submitted reqs */
+ if (queue->c2_qin.pl_nr > 0) {
+ CERROR(DFID " failed to submit %d dio pages: %zd\n",
+ PFID(lu_object_fid(&obj->co_lu)),
+ queue->c2_qin.pl_nr, rc);
+ if (rc == 0)
+ rc = -EIO;
+ }
+ }
+
+ cl_2queue_discard(env, io, queue);
+ cl_2queue_disown(env, io, queue);
+ cl_2queue_fini(env, queue);
+ RETURN(rc);
+}
+
+/* Largest single kmalloc() allocation we will attempt for the brw_page
+ * array of one O_DIRECT submission.  Use the kernel's advertised limit
+ * when available; otherwise fall back to a conservative 128kB.
+ */
+#ifdef KMALLOC_MAX_SIZE
+#define MAX_MALLOC KMALLOC_MAX_SIZE
+#else
+#define MAX_MALLOC (128 * 1024)
+#endif
+
+/* This is the maximum size of a single O_DIRECT request, based on the
+ * kmalloc limit. We need to fit all of the brw_page structs, each one
+ * representing PAGE_SIZE worth of user data, into a single buffer, and
+ * then truncate this to be a full-sized RPC. For 4kB PAGE_SIZE this is
+ * up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. */
+#define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
+ ~((size_t)DT_MAX_BRW_SIZE - 1))
+
+static ssize_t
+ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
+{
+ struct ll_cl_context *lcc;
+ const struct lu_env *env;
+ struct cl_io *io;
+ struct file *file = iocb->ki_filp;
+ struct inode *inode = file->f_mapping->host;
+ struct cl_dio_aio *aio;
+ size_t count = iov_iter_count(iter);
+ ssize_t tot_bytes = 0, result = 0;
+ loff_t file_offset = iocb->ki_pos;
+ struct vvp_io *vio;
+
+ /* Check EOF by ourselves */
+ if (rw == READ && file_offset >= i_size_read(inode))
+ return 0;
+
+ /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
+ if (file_offset & ~PAGE_MASK)
+ RETURN(-EINVAL);
+
+ CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
+ "offset=%lld=%llx, pages %zd (max %lu)\n",
+ PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
+ file_offset, file_offset, count >> PAGE_SHIFT,
+ MAX_DIO_SIZE >> PAGE_SHIFT);
+
+ /* Check that all user buffers are aligned as well */
+ if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
+ RETURN(-EINVAL);
+
+ lcc = ll_cl_find(file);
+ if (lcc == NULL)
+ RETURN(-EIO);
+
+ env = lcc->lcc_env;
+ LASSERT(!IS_ERR(env));
+ vio = vvp_env_io(env);
+ io = lcc->lcc_io;
+ LASSERT(io != NULL);
+
+ aio = io->ci_aio;
+ LASSERT(aio);
+ LASSERT(aio->cda_iocb == iocb);
+
+ while (iov_iter_count(iter)) {
+ struct ll_dio_pages pvec = { .ldp_aio = aio };
+ struct page **pages;
+
+ count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
+ if (rw == READ) {
+ if (file_offset >= i_size_read(inode))
+ break;
+
+ if (file_offset + count > i_size_read(inode))
+ count = i_size_read(inode) - file_offset;
+ }
+
+ result = ll_get_user_pages(rw, iter, &pages,
+ &pvec.ldp_count, count);
+ if (unlikely(result <= 0))
+ GOTO(out, result);
+
+ count = result;
+ pvec.ldp_file_offset = file_offset;
+ pvec.ldp_pages = pages;
+
+ result = ll_direct_rw_pages(env, io, count,
+ rw, inode, &pvec);
+ ll_free_user_pages(pages, pvec.ldp_count);
+
+ if (unlikely(result < 0))
+ GOTO(out, result);
+
+ iov_iter_advance(iter, count);
+ tot_bytes += count;
+ file_offset += count;
+ }