LU-13814 clio: add cdp_cl_pages tracking
author Patrick Farrell <paf0187@gmail.com>
Fri, 12 Jul 2024 15:58:13 +0000 (11:58 -0400)
committer Oleg Drokin <green@whamcloud.com>
Thu, 10 Apr 2025 06:49:02 +0000 (06:49 +0000)
This enables us to start the lengthy process of converting
the DIO path from using lists of cl_pages to using a single
cl_dio_pages array.

We add a temporary cdp_cl_pages array to track the cl_pages,
along with a function to convert that array to a queue.
Subsequent patches will push this through the DIO path,
gradually converting each step to use cl_dio_pages until no
steps remain that use lists of cl_page.

As a first step, this patch moves ll_direct_rw_pages from
the cl_io queue to the cl_dio_pages queue.

Signed-off-by: Patrick Farrell <patrick.farrell@oracle.com>
Change-Id: If7cb5cbb01afd96939b05124358345a0d518c180
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52103
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Marc Vef <mvef@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
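
To illustrate the pattern this patch introduces, here is a minimal,
self-contained model of tracking pages in an array and then converting
the array to a queue for submission. The m_* types below are simplified
stand-ins invented for this sketch, not the real Lustre definitions
(those live in lustre/include/cl_object.h):

#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-ins for cl_page/cl_2queue/cl_dio_pages; the real
 * structures differ in detail.
 */
struct m_page {
	int id;
	struct m_page *next;	/* the real cl_page links via a list head */
};

struct m_queue {		/* stand-in for cl_2queue's c2_qin list */
	struct m_page *head;
	struct m_page **tail;
};

struct m_dio_pages {
	struct m_page **cdp_cl_pages;	/* temporary tracking array */
	size_t cdp_count;
	struct m_queue cdp_queue;
};

/* models cl_dio_pages_2queue(): push every tracked page onto the queue */
static void m_dio_pages_2queue(struct m_dio_pages *cdp)
{
	size_t i;

	cdp->cdp_queue.head = NULL;
	cdp->cdp_queue.tail = &cdp->cdp_queue.head;
	for (i = 0; i < cdp->cdp_count; i++) {
		struct m_page *page = cdp->cdp_cl_pages[i];

		page->next = NULL;
		*cdp->cdp_queue.tail = page;
		cdp->cdp_queue.tail = &page->next;
	}
}

int main(void)
{
	struct m_dio_pages cdp = { .cdp_count = 4 };
	struct m_page *page;
	size_t i;

	/* build the array first, as ll_direct_rw_pages now does */
	cdp.cdp_cl_pages = calloc(cdp.cdp_count, sizeof(*cdp.cdp_cl_pages));
	if (!cdp.cdp_cl_pages)
		return 1;
	for (i = 0; i < cdp.cdp_count; i++) {
		cdp.cdp_cl_pages[i] = calloc(1, sizeof(struct m_page));
		if (!cdp.cdp_cl_pages[i])
			return 1;
		cdp.cdp_cl_pages[i]->id = (int)i;
	}

	/* array -> queue, then "submit" */
	m_dio_pages_2queue(&cdp);
	for (page = cdp.cdp_queue.head; page; page = page->next)
		printf("submitting page %d\n", page->id);

	/* cleanup is keyed off the array, as cl_sub_dio_end now does */
	for (i = 0; i < cdp.cdp_count; i++)
		free(cdp.cdp_cl_pages[i]);
	free(cdp.cdp_cl_pages);
	return 0;
}

The point of the pattern is that the page references taken at creation
stay in the array, so completion code can key all cleanup off the array
no matter how far submission got.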
lustre/include/cl_object.h
lustre/llite/rw26.c
lustre/obdclass/cl_io.c

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index fc9407c..b8987bd 100644
@@ -2462,6 +2462,7 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
 struct cl_sync_io;
 struct cl_dio_aio;
 struct cl_sub_dio;
+struct cl_dio_pages;
 
 typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *);
 
@@ -2474,6 +2475,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                     int ioret);
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret);
+void cl_dio_pages_2queue(struct cl_dio_pages *cdp);
 struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
                                    bool is_aio);
 struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
@@ -2514,6 +2516,9 @@ struct cl_dio_pages {
         * pages, but for unaligned i/o, this is the internal buffer
         */
        struct page             **cdp_pages;
+
+       struct cl_page          **cdp_cl_pages;
+       struct cl_2queue        cdp_queue;
        /** # of pages in the array. */
        size_t                  cdp_count;
        /* the file offset of the first page. */
@@ -2547,7 +2552,6 @@ struct cl_iter_dup {
  */
 struct cl_sub_dio {
        struct cl_sync_io       csd_sync;
-       struct cl_page_list     csd_pages;
        ssize_t                 csd_bytes;
        struct cl_dio_aio       *csd_ll_aio;
        struct cl_dio_pages     csd_dio_pages;
diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index b37e5a0..bfcaa24 100644
@@ -363,8 +363,8 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
 {
        struct cl_dio_pages *cdp = &sdio->csd_dio_pages;
        struct cl_sync_io *anchor = &sdio->csd_sync;
-       struct cl_2queue *queue = &io->ci_queue;
        struct cl_object *obj = io->ci_obj;
+       struct cl_2queue *queue = NULL;
        struct cl_page *page;
        int iot = rw == READ ? CRT_READ : CRT_WRITE;
        loff_t offset = cdp->cdp_file_offset;
@@ -377,7 +377,15 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        cdp->cdp_from = offset & ~PAGE_MASK;
        cdp->cdp_to = (offset + size) & ~PAGE_MASK;
 
-       cl_2queue_init(queue);
+       /* this is a special temporary allocation which lets us track the
+        * cl_pages and convert them to a queue
+        *
+        * this is used in 'pushing down' the conversion to a page queue
+        */
+       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
+       if (!cdp->cdp_cl_pages)
+               GOTO(out, rc = -ENOMEM);
+
        while (size > 0) {
                size_t from = offset & ~PAGE_MASK;
                size_t to = min(from + size, PAGE_SIZE);
@@ -400,10 +408,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                         */
                        page->cp_inode = inode;
                }
-               /* We keep the refcount from cl_page_find, so we don't need
-                * another one here
-                */
-               cl_page_list_add(&queue->c2_qin, page, false);
+               cdp->cdp_cl_pages[i] = page;
                /*
                 * Call page clip for incomplete pages, to set range of bytes
                 * in the page and to tell transfer formation engine to send
@@ -423,6 +428,8 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        LASSERT(i == cdp->cdp_count);
        LASSERT(size == 0);
 
+       cl_dio_pages_2queue(cdp);
+       queue = &cdp->cdp_queue;
        atomic_add(io_pages, &anchor->csi_sync_nr);
        /*
         * Avoid out-of-order execution of adding inflight
@@ -430,13 +437,20 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
         */
        smp_mb();
        rc = cl_io_submit_rw(env, io, iot, queue);
+       /* pages must be off the queue when they're freed */
        if (rc == 0) {
-               cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
+               while (queue->c2_qout.pl_nr > 0) {
+                       page = cl_page_list_first(&queue->c2_qout);
+                       cl_page_list_del(env, &queue->c2_qout, page,
+                                        false);
+               }
        } else {
                atomic_add(-queue->c2_qin.pl_nr,
                           &anchor->csi_sync_nr);
-               cl_page_list_for_each(page, &queue->c2_qin)
+               for (i = 0; i < cdp->cdp_count; i++) {
+                       page = cdp->cdp_cl_pages[i];
                        page->cp_sync_io = NULL;
+               }
        }
        /* handle partially submitted reqs */
        if (queue->c2_qin.pl_nr > 0) {
@@ -448,11 +462,9 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        }
 
 out:
-       /* if pages were not submitted successfully above, this takes care of
-        * taking them off the list and removing the single reference they have
-        * from when they were created
+       /* cleanup of the page array is handled by cl_sub_dio_end, so there's
+        * no work to do on error here
         */
-       cl_2queue_fini(env, queue);
        RETURN(rc);
 }
 
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index a42ede6..826d4a0 100644
@@ -1235,6 +1235,20 @@ static inline void dio_aio_complete(struct kiocb *iocb, ssize_t res)
 #endif
 }
 
+void cl_dio_pages_2queue(struct cl_dio_pages *cdp)
+{
+       int i = 0;
+
+       cl_2queue_init(&cdp->cdp_queue);
+
+       for (i = 0; i < cdp->cdp_count; i++) {
+               struct cl_page *page = cdp->cdp_cl_pages[i];
+
+               cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false);
+       }
+}
+EXPORT_SYMBOL(cl_dio_pages_2queue);
+
 static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
 {
        struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
@@ -1259,16 +1273,32 @@ static inline void csd_dup_free(struct cl_iter_dup *dup)
 static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
 {
        struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
+       struct cl_dio_pages *cdp = &sdio->csd_dio_pages;
        ssize_t ret = anchor->csi_sync_rc;
+       bool array_incomplete = false;
+       int i;
 
        ENTRY;
 
-       /* release pages */
-       while (sdio->csd_pages.pl_nr > 0) {
-               struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
-
-               cl_page_list_del(env, &sdio->csd_pages, page, false);
-               cl_page_put(env, page);
+       if (cdp->cdp_cl_pages) {
+               for (i = 0; i < cdp->cdp_count; i++) {
+                       struct cl_page *page = cdp->cdp_cl_pages[i];
+                       /* if we failed allocating pages, the page array may be
+                        * incomplete, so check the pointers
+                        *
+                        * FIXME: This extra tracking of array completeness is
+                        * just a debug check and will be removed later in the
+                        * series.
+                        */
+                       if (page)
+                               cl_page_put(env, page);
+                       else if (array_incomplete)
+                               LASSERT(!page);
+                       else
+                               array_incomplete = true;
+               }
+               OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages,
+                                        cdp->cdp_count);
        }
 
        if (sdio->csd_unaligned) {
@@ -1281,7 +1311,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
                 */
                if (!sdio->csd_write && sdio->csd_bytes > 0)
                        ret = ll_dio_user_copy(sdio);
-               ll_free_dio_buffer(&sdio->csd_dio_pages);
+               ll_free_dio_buffer(cdp);
                /* handle the freeing here rather than in cl_sub_dio_free
                 * because we have the unmodified iovec pointer
                 */
@@ -1290,8 +1320,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
                /* unaligned DIO does not get user pages, so it doesn't have to
                 * release them, but aligned I/O must
                 */
-               ll_release_user_pages(sdio->csd_dio_pages.cdp_pages,
-                                     sdio->csd_dio_pages.cdp_count);
+               ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count);
        }
        cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
 
@@ -1344,7 +1373,6 @@ struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
                 */
                cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
                                       cl_sub_dio_end);
-               cl_page_list_init(&sdio->csd_pages);
 
                sdio->csd_ll_aio = ll_aio;
                sdio->csd_creator_free = sync;
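
To close, a standalone model of the new cleanup walk in cl_sub_dio_end():
each page reference kept from creation is dropped by walking the tracking
array, tolerating NULL slots left behind when setup failed partway. Again
the m_* names are simplified stand-ins invented for this sketch, and the
assert expresses the invariant the FIXME above describes (no valid entry
may follow a gap) rather than copying the diff's branch structure:

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

struct m_page { int refcount; };	/* stand-in for struct cl_page */

static void m_page_put(struct m_page *page)
{
	if (--page->refcount == 0)
		free(page);
}

/* models the cl_sub_dio_end() array walk: drop the reference each page
 * kept from creation, tolerating NULL slots left by a failed setup
 */
static void m_release_tracked_pages(struct m_page **cl_pages, size_t count)
{
	bool array_incomplete = false;
	size_t i;

	if (!cl_pages)
		return;
	for (i = 0; i < count; i++) {
		struct m_page *page = cl_pages[i];

		if (page) {
			/* a valid entry must never follow a gap */
			assert(!array_incomplete);
			m_page_put(page);
		} else {
			array_incomplete = true;
		}
	}
	free(cl_pages);
}

int main(void)
{
	struct m_page **pages = calloc(3, sizeof(*pages));

	if (!pages)
		return 1;
	pages[0] = calloc(1, sizeof(**pages));
	if (!pages[0])
		return 1;
	pages[0]->refcount = 1;
	/* pages[1] and pages[2] left NULL: a partially built array */
	m_release_tracked_pages(pages, 3);
	return 0;
}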