From: Patrick Farrell
Date: Wed, 26 Mar 2025 21:09:14 +0000 (-0400)
Subject: LU-13814 clio: add cl_dio_pages_init
X-Git-Tag: 2.16.55~82
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=8c0d073c1746329b986afdf303dea7787a6fb42d;p=fs%2Flustre-release.git

LU-13814 clio: add cl_dio_pages_init

Just like the cl_page it's replacing, the cl_dio_pages struct needs
various pieces of information from the different layers of the
cl_object in order to do the IO.  This means we need a cl_dio_pages
init, analogous to cl_page_alloc and coo_page_init.

Note this does not implement coo_dio_pages_init for any layers, it
just moves parts of the existing init code.  coo_dio_pages_init will
be implemented in the next patch.

Signed-off-by: Patrick Farrell
Change-Id: I1fcf407b16d4077d94c7ba5afbc63bdd3fb3dfb4
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52109
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Oleg Drokin
Reviewed-by: Andreas Dilger
Reviewed-by: Shaun Tancheff
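The per-layer hook itself only arrives in the next patch; for
orientation, a minimal sketch of how it can be expected to sit beside
coo_page_init in cl_object_operations (the name is taken from the text
above; the exact signature is an assumption modeled on coo_page_init
and may differ from what eventually lands):

	/* hypothetical sketch, not part of this patch: initialize the
	 * cl_dio_pages state for this layer, called top-to-bottom
	 * through every object layer, mirroring coo_page_init
	 */
	int (*coo_dio_pages_init)(const struct lu_env *env,
				  struct cl_object *obj,
				  struct cl_dio_pages *cdp,
				  pgoff_t index);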
---

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index d05d302..108c3f7 100644
--- a/lustre/include/cl_object.h
+++ b/lustre/include/cl_object.h
@@ -313,6 +313,7 @@ struct cl_object_operations {
 	 */
 	int (*coo_page_init)(const struct lu_env *env, struct cl_object *obj,
			     struct cl_page *page, pgoff_t index);
+
 	/**
	 * Initialize lock slice for this layer. Called top-to-bottom through
	 * every object layer when a new cl_lock is instantiated. Layer
@@ -1415,7 +1416,6 @@ static inline void cl_read_ahead_release(const struct lu_env *env,
 	ra->cra_release(env, ra);
 }
 
-
 struct cl_dio_pages;
 
 /**
@@ -2197,6 +2197,12 @@ static inline int cl_object_refc(struct cl_object *clob)
 	return atomic_read(&header->loh_ref);
 }
 
+
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+			  struct cl_dio_pages *cdp, struct iov_iter *iter,
+			  int rw, size_t maxsize, loff_t offset,
+			  bool unaligned);
+
 /* cl_page */
 struct cl_page *cl_page_find(const struct lu_env *env,
			     struct cl_object *obj,
@@ -2539,10 +2545,10 @@ struct cl_dio_pages {
 	struct cl_page		**cdp_cl_pages;
 	struct cl_2queue	  cdp_queue;
 
-	/** # of pages in the array. */
-	size_t			  cdp_count;
 	/* the file offset of the first page. */
 	loff_t			  cdp_file_offset;
+	/** # of pages in the array. */
+	unsigned int		  cdp_page_count;
 	/* the first and last page can be incomplete, this records the
	 * offsets
	 */
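To make the cdp_from/cdp_to/cdp_page_count bookkeeping concrete, an
illustrative calculation (invented values, 4 KiB pages) matching what
cl_dio_pages_init computes in cl_page.c below:

	/* illustration only, not part of the patch: a 10000-byte DIO
	 * starting at file offset 6144, with PAGE_SIZE = 4096
	 */
	cdp->cdp_file_offset = 6144;
	cdp->cdp_from = 6144 & ~PAGE_MASK;		/* 2048: IO starts mid-page */
	cdp->cdp_to = (6144 + 10000 - 1) & ~PAGE_MASK;	/* 3855: last byte within the last page */
	cdp->cdp_page_count = DIV_ROUND_UP(10000 + 2048, PAGE_SIZE);	/* 3 pages spanned */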
diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index 2525d04..d5e2f43 100644
--- a/lustre/llite/rw26.c
+++ b/lustre/llite/rw26.c
@@ -230,64 +230,6 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask)
 }
 #endif /* HAVE_AOPS_RELEASE_FOLIO */
 
-static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
-				 struct cl_dio_pages *cdp,
-				 size_t maxsize)
-{
-#if defined(HAVE_DIO_ITER)
-	size_t start;
-	size_t result;
-
-	result = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
-					   &start);
-	if (result > 0) {
-		cdp->cdp_count = DIV_ROUND_UP(result + start, PAGE_SIZE);
-		if (user_backed_iter(iter))
-			iov_iter_revert(iter, result);
-	}
-	return result;
-#else
-	unsigned long addr;
-	size_t page_count;
-	size_t size;
-	long result;
-
-	if (!maxsize)
-		return 0;
-
-	if (!iter->nr_segs)
-		return 0;
-
-	addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
-	if (addr & ~PAGE_MASK)
-		return -EINVAL;
-
-	size = min_t(size_t, maxsize, iter->iov->iov_len);
-	page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
-	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
-	if (cdp->cdp_pages == NULL)
-		return -ENOMEM;
-
-	mmap_read_lock(current->mm);
-	result = get_user_pages(current, current->mm, addr, page_count,
-				rw == READ, 0, cdp->cdp_pages, NULL);
-	mmap_read_unlock(current->mm);
-
-	if (unlikely(result != page_count)) {
-		ll_release_user_pages(cdp->cdp_pages, page_count);
-		cdp->cdp_pages = NULL;
-
-		if (result >= 0)
-			return -EFAULT;
-
-		return result;
-	}
-	cdp->cdp_count = page_count;
-
-	return size;
-#endif
-}
-
 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
 #if defined(HAVE_DIO_ITER)
 static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
@@ -368,22 +310,10 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
 	int iot = rw == READ ? CRT_READ : CRT_WRITE;
 	loff_t offset = cdp->cdp_file_offset;
 	ssize_t rc = 0;
-	int i = 0;
+	unsigned int i = 0;
 
 	ENTRY;
 
-	cdp->cdp_from = offset & ~PAGE_MASK;
-	cdp->cdp_to = (offset + size) & ~PAGE_MASK;
-
-	/* this is a special temporary allocation which lets us track the
-	 * cl_pages and convert them to a list
-	 *
-	 * this is used in 'pushing down' the conversion to a page queue
-	 */
-	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
-	if (!cdp->cdp_cl_pages)
-		GOTO(out, rc = -ENOMEM);
-
 	while (size > 0) {
		size_t from = offset & ~PAGE_MASK;
		size_t to = min(from + size, PAGE_SIZE);
@@ -422,10 +352,10 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
 	/* on success, we should hit every page in the cdp and have no bytes
	 * left in 'size'
	 */
-	LASSERT(i == cdp->cdp_count);
+	LASSERT(i == cdp->cdp_page_count);
 	LASSERT(size == 0);
 
-	atomic_add(cdp->cdp_count, &anchor->csi_sync_nr);
+	atomic_add(cdp->cdp_page_count, &anchor->csi_sync_nr);
 	/*
	 * Avoid out-of-order execution of adding inflight
	 * modifications count and io submit.
@@ -433,9 +363,9 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
 	smp_mb();
 	rc = cl_dio_submit_rw(env, io, iot, cdp);
 	if (rc != 0) {
-		atomic_add(-cdp->cdp_count,
+		atomic_add(-cdp->cdp_page_count,
			   &anchor->csi_sync_nr);
-		for (i = 0; i < cdp->cdp_count; i++) {
+		for (i = 0; i < cdp->cdp_page_count; i++) {
			page = cdp->cdp_cl_pages[i];
			page->cp_sync_io = NULL;
		}
@@ -472,7 +402,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 	struct inode *inode = file->f_mapping->host;
 	struct cl_dio_aio *ll_dio_aio;
 	struct cl_sub_dio *sdio;
-	size_t count = iov_iter_count(iter);
+	size_t bytes = iov_iter_count(iter);
 	ssize_t tot_bytes = 0, result = 0;
 	loff_t file_offset = iocb->ki_pos;
 	bool sync_submit = false;
@@ -499,9 +429,9 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
 	CDEBUG(D_VFSTRACE,
	       "VFS Op:inode="DFID"(%p), size=%zd (max %lu), offset=%lld=%#llx, pages %zd (max %lu)%s%s%s%s\n",
-	       PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
+	       PFID(ll_inode2fid(inode)), inode, bytes, MAX_DIO_SIZE,
	       file_offset, file_offset,
-	       (count >> PAGE_SHIFT) + !!(count & ~PAGE_MASK),
+	       (bytes >> PAGE_SHIFT) + !!(bytes & ~PAGE_MASK),
	       MAX_DIO_SIZE >> PAGE_SHIFT,
	       io->ci_dio_lock ? ", locked" : ", lockless",
	       io->ci_parallel_dio ? ", parallel" : "",
@@ -566,13 +496,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 	while (iov_iter_count(iter)) {
		struct cl_dio_pages *cdp;
 
-		count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
+		bytes = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
 
		if (rw == READ) {
			if (file_offset >= i_size_read(inode))
				break;
-			if (file_offset + count > i_size_read(inode))
-				count = i_size_read(inode) - file_offset;
+			if (file_offset + bytes > i_size_read(inode))
+				bytes = i_size_read(inode) - file_offset;
		}
 
		/* if we are doing sync_submit, then we free this below,
@@ -586,27 +516,9 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
		cdp = &sdio->csd_dio_pages;
		cdp->cdp_file_offset = file_offset;
-
-		if (!unaligned) {
-			result = ll_get_user_pages(rw, iter, cdp, count);
-			/* ll_get_user_pages returns bytes in the IO or error*/
-			count = result;
-		} else {
-			/* explictly handle the ubuf() case for el9.4 */
-			size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
-					: iter_iov(iter)->iov_len;
-
-			/* same calculation used in ll_get_user_pages */
-			count = min_t(size_t, count, len);
-			result = ll_allocate_dio_buffer(cdp, count);
-			/* allocate_dio_buffer returns number of pages or
-			 * error, so do not set count = result
-			 */
-		}
-
-		/* now we have the actual count, so store it in the sdio */
-		sdio->csd_bytes = count;
-
+		result = cl_dio_pages_init(env, ll_dio_aio->cda_obj, cdp,
+					   iter, rw, bytes, file_offset,
+					   unaligned);
		if (unlikely(result <= 0)) {
			cl_sync_io_note(env, &sdio->csd_sync, result);
			if (sync_submit) {
@@ -615,8 +527,11 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
			}
			GOTO(out, result);
		}
+		/* now we have the actual bytes, so store it in the sdio */
+		bytes = result;
+		sdio->csd_bytes = bytes;
 
-		result = ll_direct_rw_pages(env, io, count, rw, inode, sdio);
+		result = ll_direct_rw_pages(env, io, bytes, rw, inode, sdio);
		/* if the i/o was unsuccessful, we zero the number of bytes to
		 * copy back. Note that partial I/O completion isn't possible
		 * here - I/O either completes or fails. So there's no need to
@@ -646,13 +561,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
		if (unlikely(result < 0))
			GOTO(out, result);
-		iov_iter_advance(iter, count);
+		iov_iter_advance(iter, bytes);
 
-		tot_bytes += count;
-		file_offset += count;
+		tot_bytes += bytes;
+		file_offset += bytes;
 
		CDEBUG(D_VFSTRACE,
		       "result %zd tot_bytes %zd count %zd file_offset %lld\n",
-		       result, tot_bytes, count, file_offset);
+		       result, tot_bytes, bytes, file_offset);
	}
 
 out:
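With the page setup pushed down into cl_dio_pages_init, the per-chunk
loop in ll_direct_IO_impl reduces to roughly the following shape
(condensed from the patch above; error handling and the sdio
allocation are omitted):

	while (iov_iter_count(iter)) {
		bytes = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
		result = cl_dio_pages_init(env, ll_dio_aio->cda_obj, cdp,
					   iter, rw, bytes, file_offset,
					   unaligned);
		if (result <= 0)
			break;		/* error path condensed */
		bytes = result;		/* bytes actually mapped */
		sdio->csd_bytes = bytes;
		result = ll_direct_rw_pages(env, io, bytes, rw, inode, sdio);
		iov_iter_advance(iter, bytes);
		tot_bytes += bytes;
		file_offset += bytes;
	}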
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index 11cb6f1..38513b1 100644
--- a/lustre/obdclass/cl_io.c
+++ b/lustre/obdclass/cl_io.c
@@ -1261,7 +1261,7 @@ void cl_dio_pages_2queue(struct cl_dio_pages *cdp)
 
 	cl_2queue_init(&cdp->cdp_queue);
 
-	for (i = 0; i < cdp->cdp_count; i++) {
+	for (i = 0; i < cdp->cdp_page_count; i++) {
		struct cl_page *page = cdp->cdp_cl_pages[i];
 
		cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false);
@@ -1301,7 +1301,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
 	ENTRY;
 
 	if (cdp->cdp_cl_pages) {
-		for (i = 0; i < cdp->cdp_count; i++) {
+		for (i = 0; i < cdp->cdp_page_count; i++) {
			struct cl_page *page = cdp->cdp_cl_pages[i];
 
			/* if we failed allocating pages, the page array may be
			 * incomplete, so check the pointers
@@ -1318,7 +1318,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
				array_incomplete = true;
		}
		OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages,
-					 cdp->cdp_count);
+					 cdp->cdp_page_count);
	}
 
	if (sdio->csd_unaligned) {
@@ -1340,7 +1340,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
		/* unaligned DIO does not get user pages, so it doesn't have to
		 * release them, but aligned I/O must
		 */
-		ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count);
+		ll_release_user_pages(cdp->cdp_pages, cdp->cdp_page_count);
	}
 
	cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
@@ -1486,23 +1486,23 @@ int ll_allocate_dio_buffer(struct cl_dio_pages *cdp, size_t io_size)
	 * io_size, making the rest of the calculation aligned
	 */
	if (pg_offset) {
-		cdp->cdp_count++;
+		cdp->cdp_page_count++;
		io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
	}
 
	/* calculate pages for the rest of the buffer */
-	cdp->cdp_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	cdp->cdp_page_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 
 #ifdef HAVE_DIO_ITER
-	cdp->cdp_pages = kvzalloc(cdp->cdp_count * sizeof(struct page *),
+	cdp->cdp_pages = kvzalloc(cdp->cdp_page_count * sizeof(struct page *),
				  GFP_NOFS);
 #else
-	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
 #endif
	if (cdp->cdp_pages == NULL)
		GOTO(out, result = -ENOMEM);
 
-	result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_count);
+	result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
	if (result)
		GOTO(out, result);
 
@@ -1513,7 +1513,7 @@ out:
	}
 
	if (result == 0)
-		result = cdp->cdp_count;
+		result = cdp->cdp_page_count;
 
	RETURN(result);
 }
@@ -1521,12 +1521,12 @@ EXPORT_SYMBOL(ll_allocate_dio_buffer);
 
 void ll_free_dio_buffer(struct cl_dio_pages *cdp)
 {
-	obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_count);
+	obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
 
 #ifdef HAVE_DIO_ITER
	kvfree(cdp->cdp_pages);
 #else
-	OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+	OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
 #endif
 }
 EXPORT_SYMBOL(ll_free_dio_buffer);
@@ -1583,8 +1583,8 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
 	int short_copies = 0;
 	bool mm_used = false;
 	bool locked = false;
+	unsigned int i = 0;
 	int status = 0;
-	int i = 0;
 	int rw;
 
 	ENTRY;
@@ -1644,14 +1644,14 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
		size_t copied; /* bytes successfully copied */
		size_t bytes; /* bytes to copy for this page */
 
-		LASSERT(i < cdp->cdp_count);
+		LASSERT(i < cdp->cdp_page_count);
 
		offset = pos & ~PAGE_MASK;
		bytes = min_t(unsigned long, PAGE_SIZE - offset, count);
 
		CDEBUG(D_VFSTRACE,
-		       "count %zd, offset %lu, pos %lld, cdp_count %lu\n",
-		       count, offset, pos, cdp->cdp_count);
+		       "count %zd, offset %lu, pos %lld, cdp_page_count %u\n",
+		       count, offset, pos, cdp->cdp_page_count);
 
		if (fatal_signal_pending(current)) {
			status = -EINTR;
@@ -1720,9 +1720,9 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
		sdio->csd_write_copied = true;
 
	/* if we complete successfully, we should reach all of the pages */
-	LASSERTF(ergo(status == 0, i == cdp->cdp_count - 1),
-		 "status: %d, i: %d, cdp->cdp_count %zu, count %zu\n",
-		 status, i, cdp->cdp_count, count);
+	LASSERTF(ergo(status == 0, i == cdp->cdp_page_count - 1),
+		 "status: %d, i: %d, cdp->cdp_page_count %u, count %zu\n",
+		 status, i, cdp->cdp_page_count, count);
 
 out:
	if (mm_used)
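The cdp_page_count arithmetic in ll_allocate_dio_buffer counts a
partial first page separately so the rest of the calculation stays
aligned; with the same invented geometry as the example above
(pg_offset = 2048, io_size = 10000, 4 KiB pages):

	/* illustration only, not part of the patch */
	if (pg_offset) {
		cdp->cdp_page_count++;	/* 1: the partial first page */
		io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
					/* 10000 - 2048 = 7952 */
	}
	/* (7952 + 4095) >> 12 = 2, so cdp_page_count ends up 3 */
	cdp->cdp_page_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;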
diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c
index ba371e3..652f985 100644
--- a/lustre/obdclass/cl_page.c
+++ b/lustre/obdclass/cl_page.c
@@ -127,6 +127,120 @@ cl_page_slice_get(const struct cl_page *cl_page, int index)
	     slice = cl_page_slice_get(cl_page, i); i >= 0;	\
	     slice = cl_page_slice_get(cl_page, --i))
 
+/* does the work required to access the pages in the iov, be they userspace
+ * or kernel
+ *
+ * returns number of bytes
+ */
+static ssize_t ll_get_iov_memory(int rw, struct iov_iter *iter,
+				 struct cl_dio_pages *cdp,
+				 size_t maxsize)
+{
+#if defined(HAVE_DIO_ITER)
+	size_t start;
+	size_t bytes;
+
+	bytes = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
+					  &start);
+	if (bytes > 0) {
+		cdp->cdp_page_count = DIV_ROUND_UP(bytes + start, PAGE_SIZE);
+		if (user_backed_iter(iter))
+			iov_iter_revert(iter, bytes);
+	}
+	return bytes;
+#else
+	unsigned int page_count;
+	unsigned long addr;
+	size_t size;
+	long result;
+
+	if (!maxsize)
+		return 0;
+
+	if (!iter->nr_segs)
+		return 0;
+
+	addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
+	if (addr & ~PAGE_MASK)
+		return -EINVAL;
+
+	size = min_t(size_t, maxsize, iter->iov->iov_len);
+	page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
+	if (cdp->cdp_pages == NULL)
+		return -ENOMEM;
+
+	mmap_read_lock(current->mm);
+	result = get_user_pages(current, current->mm, addr, page_count,
+				rw == READ, 0, cdp->cdp_pages, NULL);
+	mmap_read_unlock(current->mm);
+
+	if (unlikely(result != page_count)) {
+		ll_release_user_pages(cdp->cdp_pages, page_count);
+		cdp->cdp_pages = NULL;
+
+		if (result >= 0)
+			return -EFAULT;
+
+		/* if result < 0, return the error */
+		return result;
+	}
+	cdp->cdp_page_count = page_count;
+
+	return size;
+#endif
+}
+
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+			  struct cl_dio_pages *cdp, struct iov_iter *iter,
+			  int rw, size_t bytes, loff_t offset, bool unaligned)
+{
+	ssize_t result = 0;
+
+	ENTRY;
+
+	cdp->cdp_file_offset = offset;
+	cdp->cdp_from = offset & ~PAGE_MASK;
+	cdp->cdp_to = ((offset + bytes - 1) & ~PAGE_MASK);
+
+	/* these set cdp->cdp_page_count, which is used in coo_dio_pages_init */
+	if (!unaligned) {
+		result = ll_get_iov_memory(rw, iter, cdp, bytes);
+		/* ll_get_iov_memory returns bytes in the IO or error */
+		bytes = result;
+	} else {
+		/* explicitly handle the ubuf() case for el9.4 */
+		size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
+				: iter_iov(iter)->iov_len;
+
+		/* same calculation used in ll_get_iov_memory */
+		bytes = min_t(size_t, bytes, len);
+		result = ll_allocate_dio_buffer(cdp, bytes);
+		/* allocate_dio_buffer returns number of pages or
+		 * error, so do not set bytes = result
+		 */
+		if (result > 0)
+			result = 0;
+	}
+	if (result < 0)
+		GOTO(out, result);
+
+	LASSERT(cdp->cdp_page_count);
+
+	/* this is a special temporary allocation which lets us track the
+	 * cl_pages and convert them to a list
+	 *
+	 * this is used in 'pushing down' the conversion to a page queue
+	 */
+	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_page_count);
+	if (!cdp->cdp_cl_pages)
+		GOTO(out, result = -ENOMEM);
+
+out:
+	if (result >= 0)
+		result = bytes;
+
+	RETURN(result);
+}
+EXPORT_SYMBOL(cl_dio_pages_init);
+
 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
 {
	if (cl_page->cp_in_kmem_array) {