*/
int (*coo_page_init)(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, pgoff_t index);
+
/**
* Initialize lock slice for this layer. Called top-to-bottom through
* every object layer when a new cl_lock is instantiated. Layer
ra->cra_release(env, ra);
}
-
struct cl_dio_pages;
/**
return atomic_read(&header->loh_ref);
}
+
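+/* set up a cl_dio_pages struct for a DIO: pins the user pages (aligned
+ * DIO) or allocates kernel pages (unaligned DIO), records the page count
+ * and first/last page offsets, and allocates the cl_page tracking array
+ * (summary of the cl_dio_pages_init implementation added by this patch)
+ *
+ * returns the number of bytes in the IO or a negative error code
+ */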
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+ struct cl_dio_pages *cdp, struct iov_iter *iter,
+ int rw, size_t maxsize, loff_t offset,
+ bool unaligned);
+
/* cl_page */
struct cl_page *cl_page_find(const struct lu_env *env,
struct cl_object *obj,
struct cl_page **cdp_cl_pages;
struct cl_2queue cdp_queue;
- /** # of pages in the array. */
- size_t cdp_count;
/* the file offset of the first page. */
loff_t cdp_file_offset;
+ /** # of pages in the array. */
+ unsigned int cdp_page_count;
/* the first and last page can be incomplete; this records the
 * offsets
*/
}
#endif /* HAVE_AOPS_RELEASE_FOLIO */
-static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
- struct cl_dio_pages *cdp,
- size_t maxsize)
-{
-#if defined(HAVE_DIO_ITER)
- size_t start;
- size_t result;
-
- result = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
- &start);
- if (result > 0) {
- cdp->cdp_count = DIV_ROUND_UP(result + start, PAGE_SIZE);
- if (user_backed_iter(iter))
- iov_iter_revert(iter, result);
- }
- return result;
-#else
- unsigned long addr;
- size_t page_count;
- size_t size;
- long result;
-
- if (!maxsize)
- return 0;
-
- if (!iter->nr_segs)
- return 0;
-
- addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
- if (addr & ~PAGE_MASK)
- return -EINVAL;
-
- size = min_t(size_t, maxsize, iter->iov->iov_len);
- page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
- OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
- if (cdp->cdp_pages == NULL)
- return -ENOMEM;
-
- mmap_read_lock(current->mm);
- result = get_user_pages(current, current->mm, addr, page_count,
- rw == READ, 0, cdp->cdp_pages, NULL);
- mmap_read_unlock(current->mm);
-
- if (unlikely(result != page_count)) {
- ll_release_user_pages(cdp->cdp_pages, page_count);
- cdp->cdp_pages = NULL;
-
- if (result >= 0)
- return -EFAULT;
-
- return result;
- }
- cdp->cdp_count = page_count;
-
- return size;
-#endif
-}
-
/* iov_iter_alignment() was introduced in 3.16, similar to HAVE_DIO_ITER */
#if defined(HAVE_DIO_ITER)
static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
int iot = rw == READ ? CRT_READ : CRT_WRITE;
loff_t offset = cdp->cdp_file_offset;
ssize_t rc = 0;
- int i = 0;
+ unsigned int i = 0;
ENTRY;
- cdp->cdp_from = offset & ~PAGE_MASK;
- cdp->cdp_to = (offset + size) & ~PAGE_MASK;
-
- /* this is a special temporary allocation which lets us track the
- * cl_pages and convert them to a list
- *
- * this is used in 'pushing down' the conversion to a page queue
- */
- OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
- if (!cdp->cdp_cl_pages)
- GOTO(out, rc = -ENOMEM);
-
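+	/* walk the IO a page at a time, setting up a cl_page for each */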
while (size > 0) {
size_t from = offset & ~PAGE_MASK;
size_t to = min(from + size, PAGE_SIZE);
/* on success, we should hit every page in the cdp and have no bytes
* left in 'size'
*/
- LASSERT(i == cdp->cdp_count);
+ LASSERT(i == cdp->cdp_page_count);
LASSERT(size == 0);
- atomic_add(cdp->cdp_count, &anchor->csi_sync_nr);
+ atomic_add(cdp->cdp_page_count, &anchor->csi_sync_nr);
/*
* Avoid out-of-order execution of adding inflight
* modifications count and io submit.
smp_mb();
rc = cl_dio_submit_rw(env, io, iot, cdp);
if (rc != 0) {
- atomic_add(-cdp->cdp_count,
+ atomic_add(-cdp->cdp_page_count,
&anchor->csi_sync_nr);
- for (i = 0; i < cdp->cdp_count; i++) {
+ for (i = 0; i < cdp->cdp_page_count; i++) {
page = cdp->cdp_cl_pages[i];
page->cp_sync_io = NULL;
}
struct inode *inode = file->f_mapping->host;
struct cl_dio_aio *ll_dio_aio;
struct cl_sub_dio *sdio;
- size_t count = iov_iter_count(iter);
+ size_t bytes = iov_iter_count(iter);
ssize_t tot_bytes = 0, result = 0;
loff_t file_offset = iocb->ki_pos;
bool sync_submit = false;
CDEBUG(D_VFSTRACE,
"VFS Op:inode="DFID"(%p), size=%zd (max %lu), offset=%lld=%#llx, pages %zd (max %lu)%s%s%s%s\n",
- PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
+ PFID(ll_inode2fid(inode)), inode, bytes, MAX_DIO_SIZE,
file_offset, file_offset,
- (count >> PAGE_SHIFT) + !!(count & ~PAGE_MASK),
+ (bytes >> PAGE_SHIFT) + !!(bytes & ~PAGE_MASK),
MAX_DIO_SIZE >> PAGE_SHIFT,
io->ci_dio_lock ? ", locked" : ", lockless",
io->ci_parallel_dio ? ", parallel" : "",
while (iov_iter_count(iter)) {
struct cl_dio_pages *cdp;
- count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
+ bytes = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
if (rw == READ) {
if (file_offset >= i_size_read(inode))
break;
- if (file_offset + count > i_size_read(inode))
- count = i_size_read(inode) - file_offset;
+ if (file_offset + bytes > i_size_read(inode))
+ bytes = i_size_read(inode) - file_offset;
}
/* if we are doing sync_submit, then we free this below,
cdp = &sdio->csd_dio_pages;
cdp->cdp_file_offset = file_offset;
-
- if (!unaligned) {
- result = ll_get_user_pages(rw, iter, cdp, count);
- /* ll_get_user_pages returns bytes in the IO or error*/
- count = result;
- } else {
- /* explictly handle the ubuf() case for el9.4 */
- size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
- : iter_iov(iter)->iov_len;
-
- /* same calculation used in ll_get_user_pages */
- count = min_t(size_t, count, len);
- result = ll_allocate_dio_buffer(cdp, count);
- /* allocate_dio_buffer returns number of pages or
- * error, so do not set count = result
- */
- }
-
- /* now we have the actual count, so store it in the sdio */
- sdio->csd_bytes = count;
-
+ result = cl_dio_pages_init(env, ll_dio_aio->cda_obj, cdp,
+ iter, rw, bytes, file_offset,
+ unaligned);
if (unlikely(result <= 0)) {
cl_sync_io_note(env, &sdio->csd_sync, result);
if (sync_submit) {
}
GOTO(out, result);
}
+	/* now we have the actual byte count, so store it in the sdio */
+ bytes = result;
+ sdio->csd_bytes = bytes;
- result = ll_direct_rw_pages(env, io, count, rw, inode, sdio);
+ result = ll_direct_rw_pages(env, io, bytes, rw, inode, sdio);
/* if the i/o was unsuccessful, we zero the number of bytes to
* copy back. Note that partial I/O completion isn't possible
* here - I/O either completes or fails. So there's no need to
if (unlikely(result < 0))
GOTO(out, result);
- iov_iter_advance(iter, count);
+ iov_iter_advance(iter, bytes);
- tot_bytes += count;
- file_offset += count;
+ tot_bytes += bytes;
+ file_offset += bytes;
CDEBUG(D_VFSTRACE,
"result %zd tot_bytes %zd count %zd file_offset %lld\n",
- result, tot_bytes, count, file_offset);
+ result, tot_bytes, bytes, file_offset);
}
out:
cl_2queue_init(&cdp->cdp_queue);
- for (i = 0; i < cdp->cdp_count; i++) {
+ for (i = 0; i < cdp->cdp_page_count; i++) {
struct cl_page *page = cdp->cdp_cl_pages[i];
cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false);
ENTRY;
if (cdp->cdp_cl_pages) {
- for (i = 0; i < cdp->cdp_count; i++) {
+ for (i = 0; i < cdp->cdp_page_count; i++) {
struct cl_page *page = cdp->cdp_cl_pages[i];
/* if we failed allocating pages, the page array may be
* incomplete, so check the pointers
array_incomplete = true;
}
OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages,
- cdp->cdp_count);
+ cdp->cdp_page_count);
}
if (sdio->csd_unaligned) {
/* unaligned DIO does not get user pages, so it doesn't have to
* release them, but aligned I/O must
*/
- ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count);
+ ll_release_user_pages(cdp->cdp_pages, cdp->cdp_page_count);
}
cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
* io_size, making the rest of the calculation aligned
*/
if (pg_offset) {
- cdp->cdp_count++;
+ cdp->cdp_page_count++;
io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
}
/* calculate pages for the rest of the buffer */
- cdp->cdp_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ cdp->cdp_page_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
#ifdef HAVE_DIO_ITER
- cdp->cdp_pages = kvzalloc(cdp->cdp_count * sizeof(struct page *),
+ cdp->cdp_pages = kvzalloc(cdp->cdp_page_count * sizeof(struct page *),
GFP_NOFS);
#else
- OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+ OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
#endif
if (cdp->cdp_pages == NULL)
GOTO(out, result = -ENOMEM);
- result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_count);
+ result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
if (result)
GOTO(out, result);
}
if (result == 0)
- result = cdp->cdp_count;
+ result = cdp->cdp_page_count;
RETURN(result);
}
void ll_free_dio_buffer(struct cl_dio_pages *cdp)
{
- obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_count);
+ obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
#ifdef HAVE_DIO_ITER
kvfree(cdp->cdp_pages);
#else
- OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+ OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
#endif
}
EXPORT_SYMBOL(ll_free_dio_buffer);
int short_copies = 0;
bool mm_used = false;
bool locked = false;
+ unsigned int i = 0;
int status = 0;
- int i = 0;
int rw;
ENTRY;
size_t copied; /* bytes successfully copied */
size_t bytes; /* bytes to copy for this page */
- LASSERT(i < cdp->cdp_count);
+ LASSERT(i < cdp->cdp_page_count);
offset = pos & ~PAGE_MASK;
bytes = min_t(unsigned long, PAGE_SIZE - offset, count);
CDEBUG(D_VFSTRACE,
- "count %zd, offset %lu, pos %lld, cdp_count %lu\n",
- count, offset, pos, cdp->cdp_count);
+ "count %zd, offset %lu, pos %lld, cdp_page_count %u\n",
+ count, offset, pos, cdp->cdp_page_count);
if (fatal_signal_pending(current)) {
status = -EINTR;
sdio->csd_write_copied = true;
/* if we complete successfully, we should reach all of the pages */
- LASSERTF(ergo(status == 0, i == cdp->cdp_count - 1),
- "status: %d, i: %d, cdp->cdp_count %zu, count %zu\n",
- status, i, cdp->cdp_count, count);
+ LASSERTF(ergo(status == 0, i == cdp->cdp_page_count - 1),
+		 "status: %d, i: %u, cdp->cdp_page_count %u, count %zu\n",
+ status, i, cdp->cdp_page_count, count);
out:
if (mm_used)
slice = cl_page_slice_get(cl_page, i); i >= 0; \
slice = cl_page_slice_get(cl_page, --i))
+/* does the work required to access the pages in the iov, be they userspace
+ * or kernel
+ *
+ * returns the number of bytes in the IO or a negative error code
+ */
+static ssize_t ll_get_iov_memory(int rw, struct iov_iter *iter,
+ struct cl_dio_pages *cdp,
+ size_t maxsize)
+{
+#if defined(HAVE_DIO_ITER)
+ size_t start;
+	ssize_t bytes;
+
+ bytes = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
+ &start);
+ if (bytes > 0) {
+ cdp->cdp_page_count = DIV_ROUND_UP(bytes + start, PAGE_SIZE);
+ if (user_backed_iter(iter))
+ iov_iter_revert(iter, bytes);
+ }
+ return bytes;
+#else
+ unsigned int page_count;
+ unsigned long addr;
+ size_t size;
+ long result;
+
+ if (!maxsize)
+ return 0;
+
+ if (!iter->nr_segs)
+ return 0;
+
+ addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
+ if (addr & ~PAGE_MASK)
+ return -EINVAL;
+
+ size = min_t(size_t, maxsize, iter->iov->iov_len);
+ page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
+ if (cdp->cdp_pages == NULL)
+ return -ENOMEM;
+
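+	/* pin the user pages; a READ from the file writes into the user
+	 * buffer, so ask for writable pages in that case
+	 */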
+ mmap_read_lock(current->mm);
+ result = get_user_pages(current, current->mm, addr, page_count,
+ rw == READ, 0, cdp->cdp_pages, NULL);
+ mmap_read_unlock(current->mm);
+
+ if (unlikely(result != page_count)) {
+ ll_release_user_pages(cdp->cdp_pages, page_count);
+ cdp->cdp_pages = NULL;
+
+ if (result >= 0)
+ return -EFAULT;
+
+ /* if result < 0, return the error */
+ return result;
+ }
+ cdp->cdp_page_count = page_count;
+
+ return size;
+#endif
+}
+
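+/* gathers the per-IO page setup which was previously open coded in
+ * ll_direct_IO_impl: get or allocate the pages backing the iov and
+ * allocate the cl_page tracking array.  Sketch of the expected caller
+ * (mirroring the ll_direct_IO_impl hunk in this patch):
+ *
+ *	result = cl_dio_pages_init(env, obj, cdp, iter, rw, bytes,
+ *				   file_offset, unaligned);
+ *	if (result <= 0)
+ *		bail out;
+ *	sdio->csd_bytes = result;
+ */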
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+ struct cl_dio_pages *cdp, struct iov_iter *iter,
+ int rw, size_t bytes, loff_t offset, bool unaligned)
+{
+ ssize_t result = 0;
+
+ ENTRY;
+
+ cdp->cdp_file_offset = offset;
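+	/* byte offsets of the first and last bytes of the IO within the
+	 * first and last pages
+	 */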
+ cdp->cdp_from = offset & ~PAGE_MASK;
+ cdp->cdp_to = ((offset + bytes - 1) & ~PAGE_MASK);
+
+	/* these set cdp->cdp_page_count, which is used in coo_dio_pages_init */
+ if (!unaligned) {
+ result = ll_get_iov_memory(rw, iter, cdp, bytes);
+		/* ll_get_iov_memory returns bytes in the IO or a negative error */
+ bytes = result;
+ } else {
+		/* explicitly handle the ubuf() case for el9.4 */
+ size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
+ : iter_iov(iter)->iov_len;
+
+		/* same calculation used in ll_get_iov_memory */
+ bytes = min_t(size_t, bytes, len);
+ result = ll_allocate_dio_buffer(cdp, bytes);
+		/* ll_allocate_dio_buffer returns the number of pages or an
+		 * error, so do not set bytes = result
+		 */
+ if (result > 0)
+ result = 0;
+ }
+ if (result < 0)
+ GOTO(out, result);
+ LASSERT(cdp->cdp_page_count);
+	/* this is a special temporary allocation which lets us track the
+ * cl_pages and convert them to a list
+ *
+ * this is used in 'pushing down' the conversion to a page queue
+ */
+ OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_page_count);
+ if (!cdp->cdp_cl_pages)
+ GOTO(out, result = -ENOMEM);
+
+out:
+ if (result >= 0)
+ result = bytes;
+ RETURN(result);
+}
+EXPORT_SYMBOL(cl_dio_pages_init);
+
static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
{
if (cl_page->cp_in_kmem_array) {