From 1fb36699d2b71b41e78f15bcc5eb99976880af8d Mon Sep 17 00:00:00 2001 From: Patrick Farrell Date: Fri, 12 Jul 2024 11:58:13 -0400 Subject: [PATCH] LU-13814 clio: add cdp_cl_pages tracking This enables us start the lengthy process of converting the DIO path from using lists of cl_pages to using a single cl_dio_pages array. We add a temporary cdp_cl_pages array tracking the cl_pages and a function to convert it to a queue. Subsequent patches will push this through the DIO path, gradually converting each step to use cl_dio_pages until there are no steps left which use lists of cl_page. This moves the queue usage to the cl_dio_pages queue for ll_direct_rw_pages, the first step. Signed-off-by: Patrick Farrell Change-Id: If7cb5cbb01afd96939b05124358345a0d518c180 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52103 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Marc Vef Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 6 +++++- lustre/llite/rw26.c | 36 ++++++++++++++++++++++------------ lustre/obdclass/cl_io.c | 48 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index fc9407c..b8987bd 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2462,6 +2462,7 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj, struct cl_sync_io; struct cl_dio_aio; struct cl_sub_dio; +struct cl_dio_pages; typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *); @@ -2474,6 +2475,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, int ioret); int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor, long timeout, int ioret); +void cl_dio_pages_2queue(struct cl_dio_pages *ldp); struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj, bool is_aio); struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, @@ -2514,6 +2516,9 @@ struct cl_dio_pages { * pages, but for unaligned i/o, this is the internal buffer */ struct page **cdp_pages; + + struct cl_page **cdp_cl_pages; + struct cl_2queue cdp_queue; /** # of pages in the array. */ size_t cdp_count; /* the file offset of the first page. */ @@ -2547,7 +2552,6 @@ struct cl_iter_dup { */ struct cl_sub_dio { struct cl_sync_io csd_sync; - struct cl_page_list csd_pages; ssize_t csd_bytes; struct cl_dio_aio *csd_ll_aio; struct cl_dio_pages csd_dio_pages; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index b37e5a0..bfcaa24 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -363,8 +363,8 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, { struct cl_dio_pages *cdp = &sdio->csd_dio_pages; struct cl_sync_io *anchor = &sdio->csd_sync; - struct cl_2queue *queue = &io->ci_queue; struct cl_object *obj = io->ci_obj; + struct cl_2queue *queue = NULL; struct cl_page *page; int iot = rw == READ ? CRT_READ : CRT_WRITE; loff_t offset = cdp->cdp_file_offset; @@ -377,7 +377,15 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, cdp->cdp_from = offset & ~PAGE_MASK; cdp->cdp_to = (offset + size) & ~PAGE_MASK; - cl_2queue_init(queue); + /* this is a special temporary allocation which lets us track the + * cl_pages and convert them to a list + * + * this is used in 'pushing down' the conversion to a page queue + */ + OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count); + if (!cdp->cdp_cl_pages) + GOTO(out, rc = -ENOMEM); + while (size > 0) { size_t from = offset & ~PAGE_MASK; size_t to = min(from + size, PAGE_SIZE); @@ -400,10 +408,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, */ page->cp_inode = inode; } - /* We keep the refcount from cl_page_find, so we don't need - * another one here - */ - cl_page_list_add(&queue->c2_qin, page, false); + cdp->cdp_cl_pages[i] = page; /* * Call page clip for incomplete pages, to set range of bytes * in the page and to tell transfer formation engine to send @@ -423,6 +428,8 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, LASSERT(i == cdp->cdp_count); LASSERT(size == 0); + cl_dio_pages_2queue(cdp); + queue = &cdp->cdp_queue; atomic_add(io_pages, &anchor->csi_sync_nr); /* * Avoid out-of-order execution of adding inflight @@ -430,13 +437,20 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, */ smp_mb(); rc = cl_io_submit_rw(env, io, iot, queue); + /* pages must be off the queue when they're freed */ if (rc == 0) { - cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages); + while (queue->c2_qout.pl_nr > 0) { + page = cl_page_list_first(&queue->c2_qout); + cl_page_list_del(env, &queue->c2_qout, page, + false); + } } else { atomic_add(-queue->c2_qin.pl_nr, &anchor->csi_sync_nr); - cl_page_list_for_each(page, &queue->c2_qin) + for (i = 0; i < cdp->cdp_count; i++) { + page = cdp->cdp_cl_pages[i]; page->cp_sync_io = NULL; + } } /* handle partially submitted reqs */ if (queue->c2_qin.pl_nr > 0) { @@ -448,11 +462,9 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size, } out: - /* if pages were not submitted successfully above, this takes care of - * taking them off the list and removing the single reference they have - * from when they were created + /* cleanup of the page array is handled by cl_sub_dio_end, so there's + * no work to do on error here */ - cl_2queue_fini(env, queue); RETURN(rc); } diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index a42ede6..826d4a0 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -1235,6 +1235,20 @@ static inline void dio_aio_complete(struct kiocb *iocb, ssize_t res) #endif } +void cl_dio_pages_2queue(struct cl_dio_pages *cdp) +{ + int i = 0; + + cl_2queue_init(&cdp->cdp_queue); + + for (i = 0; i < cdp->cdp_count; i++) { + struct cl_page *page = cdp->cdp_cl_pages[i]; + + cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false); + } +} +EXPORT_SYMBOL(cl_dio_pages_2queue); + static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor) { struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync); @@ -1259,16 +1273,32 @@ static inline void csd_dup_free(struct cl_iter_dup *dup) static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor) { struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync); + struct cl_dio_pages *cdp = &sdio->csd_dio_pages; ssize_t ret = anchor->csi_sync_rc; + bool array_incomplete = false; + int i; ENTRY; - /* release pages */ - while (sdio->csd_pages.pl_nr > 0) { - struct cl_page *page = cl_page_list_first(&sdio->csd_pages); - - cl_page_list_del(env, &sdio->csd_pages, page, false); - cl_page_put(env, page); + if (cdp->cdp_cl_pages) { + for (i = 0; i < cdp->cdp_count; i++) { + struct cl_page *page = cdp->cdp_cl_pages[i]; + /* if we failed allocating pages, the page array may be + * incomplete, so check the pointers + * + * FIXME: This extra tracking of array completeness is + * just a debug check and will be removed later in the + * series. + */ + if (page) + cl_page_put(env, page); + else if (array_incomplete) + LASSERT(!page); + else + array_incomplete = true; + } + OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, + cdp->cdp_count); } if (sdio->csd_unaligned) { @@ -1281,7 +1311,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor) */ if (!sdio->csd_write && sdio->csd_bytes > 0) ret = ll_dio_user_copy(sdio); - ll_free_dio_buffer(&sdio->csd_dio_pages); + ll_free_dio_buffer(cdp); /* handle the freeing here rather than in cl_sub_dio_free * because we have the unmodified iovec pointer */ @@ -1290,8 +1320,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor) /* unaligned DIO does not get user pages, so it doesn't have to * release them, but aligned I/O must */ - ll_release_user_pages(sdio->csd_dio_pages.cdp_pages, - sdio->csd_dio_pages.cdp_count); + ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count); } cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret); @@ -1344,7 +1373,6 @@ struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, */ cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio, cl_sub_dio_end); - cl_page_list_init(&sdio->csd_pages); sdio->csd_ll_aio = ll_aio; sdio->csd_creator_free = sync; -- 1.8.3.1