LU-13814 clio: add cl_dio_pages_init 09/52109/44
author     Patrick Farrell <pfarrell@whamcloud.com>
           Wed, 26 Mar 2025 21:09:14 +0000 (17:09 -0400)
committer  Oleg Drokin <green@whamcloud.com>
           Fri, 2 May 2025 02:16:46 +0000 (02:16 +0000)
Just like the cl_page it's replacing, the cl_dio_pages
struct needs various pieces of information from the
different layers of the cl_object in order to do the IO.

This means we need a cl_dio_pages init function, analogous to
cl_page_alloc and coo_page_init.

Note this does not implement coo_dio_pages_init for any layer;
it just moves parts of the existing init code.

coo_dio_pages_init will be implemented in the next
patch.
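
For reference, a minimal sketch (not part of this patch) of how that
per-layer dispatch could look once coo_dio_pages_init lands, by analogy
with cl_page_alloc() walking coo_page_init; the helper name and the
method's argument list below are assumptions:

static int cl_dio_pages_layers_init(const struct lu_env *env,
                                    struct cl_object *obj,
                                    struct cl_dio_pages *cdp)
{
        struct lu_object_header *head = obj->co_lu.lo_header;
        struct lu_object *lo;
        int result = 0;

        /* visit every layer of the cl_object stack, top to bottom */
        list_for_each_entry(lo, &head->loh_layers, lo_linkage) {
                struct cl_object *o2 = lu2cl(lo);

                /* a layer without the (assumed) method is skipped */
                if (o2->co_ops->coo_dio_pages_init == NULL)
                        continue;
                result = o2->co_ops->coo_dio_pages_init(env, o2, cdp);
                if (result != 0)
                        break;
        }
        return result;
}

cl_dio_pages_init() would call such a helper only after the page array
is set up, since cdp_page_count must be known before the layers can
initialize their state.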

Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: I1fcf407b16d4077d94c7ba5afbc63bdd3fb3dfb4
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52109
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
lustre/include/cl_object.h
lustre/llite/rw26.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c

lustre/include/cl_object.h
index d05d302..108c3f7 100644 (file)
@@ -313,6 +313,7 @@ struct cl_object_operations {
         */
        int  (*coo_page_init)(const struct lu_env *env, struct cl_object *obj,
                              struct cl_page *page, pgoff_t index);
+
        /**
         * Initialize lock slice for this layer. Called top-to-bottom through
         * every object layer when a new cl_lock is instantiated. Layer
@@ -1415,7 +1416,6 @@ static inline void cl_read_ahead_release(const struct lu_env *env,
                ra->cra_release(env, ra);
 }
 
-
 struct cl_dio_pages;
 
 /**
@@ -2197,6 +2197,12 @@ static inline int cl_object_refc(struct cl_object *clob)
        return atomic_read(&header->loh_ref);
 }
 
+
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+                         struct cl_dio_pages *cdp, struct iov_iter *iter,
+                         int rw, size_t maxsize, loff_t offset,
+                         bool unaligned);
+
 /* cl_page */
 struct cl_page *cl_page_find(const struct lu_env *env,
                             struct cl_object *obj,
@@ -2539,10 +2545,10 @@ struct cl_dio_pages {
 
        struct cl_page          **cdp_cl_pages;
        struct cl_2queue        cdp_queue;
-       /** # of pages in the array. */
-       size_t                  cdp_count;
        /* the file offset of the first page. */
        loff_t                  cdp_file_offset;
+       /** # of pages in the array. */
+       unsigned int            cdp_page_count;
        /* the first and last page can be incomplete, this records the
         * offsets
         */
lustre/llite/rw26.c
index 2525d04..d5e2f43 100644 (file)
@@ -230,64 +230,6 @@ static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask)
 }
 #endif /* HAVE_AOPS_RELEASE_FOLIO */
 
-static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
-                               struct cl_dio_pages *cdp,
-                               size_t maxsize)
-{
-#if defined(HAVE_DIO_ITER)
-       size_t start;
-       size_t result;
-
-       result = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
-                                         &start);
-       if (result > 0) {
-               cdp->cdp_count = DIV_ROUND_UP(result + start, PAGE_SIZE);
-               if (user_backed_iter(iter))
-                       iov_iter_revert(iter, result);
-       }
-       return result;
-#else
-       unsigned long addr;
-       size_t page_count;
-       size_t size;
-       long result;
-
-       if (!maxsize)
-               return 0;
-
-       if (!iter->nr_segs)
-               return 0;
-
-       addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
-       if (addr & ~PAGE_MASK)
-               return -EINVAL;
-
-       size = min_t(size_t, maxsize, iter->iov->iov_len);
-       page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
-       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
-       if (cdp->cdp_pages == NULL)
-               return -ENOMEM;
-
-       mmap_read_lock(current->mm);
-       result = get_user_pages(current, current->mm, addr, page_count,
-                               rw == READ, 0, cdp->cdp_pages, NULL);
-       mmap_read_unlock(current->mm);
-
-       if (unlikely(result != page_count)) {
-               ll_release_user_pages(cdp->cdp_pages, page_count);
-               cdp->cdp_pages = NULL;
-
-               if (result >= 0)
-                       return -EFAULT;
-
-               return result;
-       }
-       cdp->cdp_count = page_count;
-
-       return size;
-#endif
-}
-
 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
 #if defined(HAVE_DIO_ITER)
 static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
@@ -368,22 +310,10 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        int iot = rw == READ ? CRT_READ : CRT_WRITE;
        loff_t offset = cdp->cdp_file_offset;
        ssize_t rc = 0;
-       int i = 0;
+       unsigned int i = 0;
 
        ENTRY;
 
-       cdp->cdp_from = offset & ~PAGE_MASK;
-       cdp->cdp_to = (offset + size) & ~PAGE_MASK;
-
-       /* this is a special temporary allocation which lets us track the
-        * cl_pages and convert them to a list
-        *
-        * this is used in 'pushing down' the conversion to a page queue
-        */
-       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
-       if (!cdp->cdp_cl_pages)
-               GOTO(out, rc = -ENOMEM);
-
        while (size > 0) {
                size_t from = offset & ~PAGE_MASK;
                size_t to = min(from + size, PAGE_SIZE);
@@ -422,10 +352,10 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        /* on success, we should hit every page in the cdp and have no bytes
         * left in 'size'
         */
-       LASSERT(i == cdp->cdp_count);
+       LASSERT(i == cdp->cdp_page_count);
        LASSERT(size == 0);
 
-       atomic_add(cdp->cdp_count, &anchor->csi_sync_nr);
+       atomic_add(cdp->cdp_page_count, &anchor->csi_sync_nr);
        /*
         * Avoid out-of-order execution of adding inflight
         * modifications count and io submit.
@@ -433,9 +363,9 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        smp_mb();
        rc = cl_dio_submit_rw(env, io, iot, cdp);
        if (rc != 0) {
-               atomic_add(-cdp->cdp_count,
+               atomic_add(-cdp->cdp_page_count,
                           &anchor->csi_sync_nr);
-               for (i = 0; i < cdp->cdp_count; i++) {
+               for (i = 0; i < cdp->cdp_page_count; i++) {
                        page = cdp->cdp_cl_pages[i];
                        page->cp_sync_io = NULL;
                }
@@ -472,7 +402,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        struct inode *inode = file->f_mapping->host;
        struct cl_dio_aio *ll_dio_aio;
        struct cl_sub_dio *sdio;
-       size_t count = iov_iter_count(iter);
+       size_t bytes = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
        bool sync_submit = false;
@@ -499,9 +429,9 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
        CDEBUG(D_VFSTRACE,
               "VFS Op:inode="DFID"(%p), size=%zd (max %lu), offset=%lld=%#llx, pages %zd (max %lu)%s%s%s%s\n",
-              PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
+              PFID(ll_inode2fid(inode)), inode, bytes, MAX_DIO_SIZE,
               file_offset, file_offset,
-              (count >> PAGE_SHIFT) + !!(count & ~PAGE_MASK),
+              (bytes >> PAGE_SHIFT) + !!(bytes & ~PAGE_MASK),
               MAX_DIO_SIZE >> PAGE_SHIFT,
               io->ci_dio_lock ? ", locked" : ", lockless",
               io->ci_parallel_dio ? ", parallel" : "",
@@ -566,13 +496,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        while (iov_iter_count(iter)) {
                struct cl_dio_pages *cdp;
 
-               count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
+               bytes = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
                if (rw == READ) {
                        if (file_offset >= i_size_read(inode))
                                break;
 
-                       if (file_offset + count > i_size_read(inode))
-                               count = i_size_read(inode) - file_offset;
+                       if (file_offset + bytes > i_size_read(inode))
+                               bytes = i_size_read(inode) - file_offset;
                }
 
                /* if we are doing sync_submit, then we free this below,
@@ -586,27 +516,9 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
                cdp = &sdio->csd_dio_pages;
                cdp->cdp_file_offset = file_offset;
-
-               if (!unaligned) {
-                       result = ll_get_user_pages(rw, iter, cdp, count);
-                       /* ll_get_user_pages returns bytes in the IO or error*/
-                       count = result;
-               } else {
-                       /* explictly handle the ubuf() case for el9.4 */
-                       size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
-                                  : iter_iov(iter)->iov_len;
-
-                       /* same calculation used in ll_get_user_pages */
-                       count = min_t(size_t, count, len);
-                       result = ll_allocate_dio_buffer(cdp, count);
-                       /* allocate_dio_buffer returns number of pages or
-                        * error, so do not set count = result
-                        */
-               }
-
-               /* now we have the actual count, so store it in the sdio */
-               sdio->csd_bytes = count;
-
+               result = cl_dio_pages_init(env, ll_dio_aio->cda_obj, cdp,
+                                          iter, rw, bytes, file_offset,
+                                          unaligned);
                if (unlikely(result <= 0)) {
                        cl_sync_io_note(env, &sdio->csd_sync, result);
                        if (sync_submit) {
@@ -615,8 +527,11 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                        }
                        GOTO(out, result);
                }
+               /* now we have the actual byte count, so store it in the sdio */
+               bytes = result;
+               sdio->csd_bytes = bytes;
 
-               result = ll_direct_rw_pages(env, io, count, rw, inode, sdio);
+               result = ll_direct_rw_pages(env, io, bytes, rw, inode, sdio);
                /* if the i/o was unsuccessful, we zero the number of bytes to
                 * copy back.  Note that partial I/O completion isn't possible
                 * here - I/O either completes or fails.  So there's no need to
@@ -646,13 +561,13 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                if (unlikely(result < 0))
                        GOTO(out, result);
 
-               iov_iter_advance(iter, count);
+               iov_iter_advance(iter, bytes);
 
-               tot_bytes += count;
-               file_offset += count;
+               tot_bytes += bytes;
+               file_offset += bytes;
                CDEBUG(D_VFSTRACE,
                       "result %zd tot_bytes %zd count %zd file_offset %lld\n",
-                      result, tot_bytes, count, file_offset);
+                      result, tot_bytes, bytes, file_offset);
        }
 
 out:
lustre/obdclass/cl_io.c
index 11cb6f1..38513b1 100644 (file)
@@ -1261,7 +1261,7 @@ void cl_dio_pages_2queue(struct cl_dio_pages *cdp)
 
        cl_2queue_init(&cdp->cdp_queue);
 
-       for (i = 0; i < cdp->cdp_count; i++) {
+       for (i = 0; i < cdp->cdp_page_count; i++) {
                struct cl_page *page = cdp->cdp_cl_pages[i];
 
                cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false);
@@ -1301,7 +1301,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
        ENTRY;
 
        if (cdp->cdp_cl_pages) {
-               for (i = 0; i < cdp->cdp_count; i++) {
+               for (i = 0; i < cdp->cdp_page_count; i++) {
                        struct cl_page *page = cdp->cdp_cl_pages[i];
                        /* if we failed allocating pages, the page array may be
                         * incomplete, so check the pointers
@@ -1318,7 +1318,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
                                array_incomplete = true;
                }
                OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages,
-                                        cdp->cdp_count);
+                                        cdp->cdp_page_count);
        }
 
        if (sdio->csd_unaligned) {
@@ -1340,7 +1340,7 @@ static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
                /* unaligned DIO does not get user pages, so it doesn't have to
                 * release them, but aligned I/O must
                 */
-               ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count);
+               ll_release_user_pages(cdp->cdp_pages, cdp->cdp_page_count);
        }
        cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
 
@@ -1486,23 +1486,23 @@ int ll_allocate_dio_buffer(struct cl_dio_pages *cdp, size_t io_size)
         * io_size, making the rest of the calculation aligned
         */
        if (pg_offset) {
-               cdp->cdp_count++;
+               cdp->cdp_page_count++;
                io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
        }
 
        /* calculate pages for the rest of the buffer */
-       cdp->cdp_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       cdp->cdp_page_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 
 #ifdef HAVE_DIO_ITER
-       cdp->cdp_pages = kvzalloc(cdp->cdp_count * sizeof(struct page *),
+       cdp->cdp_pages = kvzalloc(cdp->cdp_page_count * sizeof(struct page *),
                                  GFP_NOFS);
 #else
-       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
 #endif
        if (cdp->cdp_pages == NULL)
                GOTO(out, result = -ENOMEM);
 
-       result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_count);
+       result = obd_pool_get_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
        if (result)
                GOTO(out, result);
 
@@ -1513,7 +1513,7 @@ out:
        }
 
        if (result == 0)
-               result = cdp->cdp_count;
+               result = cdp->cdp_page_count;
 
        RETURN(result);
 }
@@ -1521,12 +1521,12 @@ EXPORT_SYMBOL(ll_allocate_dio_buffer);
 
 void ll_free_dio_buffer(struct cl_dio_pages *cdp)
 {
-       obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_count);
+       obd_pool_put_pages_array(cdp->cdp_pages, cdp->cdp_page_count);
 
 #ifdef HAVE_DIO_ITER
        kvfree(cdp->cdp_pages);
 #else
-       OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_count);
+       OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_pages, cdp->cdp_page_count);
 #endif
 }
 EXPORT_SYMBOL(ll_free_dio_buffer);
@@ -1583,8 +1583,8 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
        int short_copies = 0;
        bool mm_used = false;
        bool locked = false;
+       unsigned int i = 0;
        int status = 0;
-       int i = 0;
        int rw;
 
        ENTRY;
@@ -1644,14 +1644,14 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
                size_t copied; /* bytes successfully copied */
                size_t bytes; /* bytes to copy for this page */
 
-               LASSERT(i < cdp->cdp_count);
+               LASSERT(i < cdp->cdp_page_count);
 
                offset = pos & ~PAGE_MASK;
                bytes = min_t(unsigned long, PAGE_SIZE - offset, count);
 
                CDEBUG(D_VFSTRACE,
-                      "count %zd, offset %lu, pos %lld, cdp_count %lu\n",
-                      count, offset, pos, cdp->cdp_count);
+                      "count %zd, offset %lu, pos %lld, cdp_page_count %u\n",
+                      count, offset, pos, cdp->cdp_page_count);
 
                if (fatal_signal_pending(current)) {
                        status = -EINTR;
@@ -1720,9 +1720,9 @@ static ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
                sdio->csd_write_copied = true;
 
        /* if we complete successfully, we should reach all of the pages */
-       LASSERTF(ergo(status == 0, i == cdp->cdp_count - 1),
-                "status: %d, i: %d, cdp->cdp_count %zu, count %zu\n",
-                 status, i, cdp->cdp_count, count);
+       LASSERTF(ergo(status == 0, i == cdp->cdp_page_count - 1),
+                "status: %d, i: %d, cdp->cdp_page_count %u, count %zu\n",
+                 status, i, cdp->cdp_page_count, count);
 
 out:
        if (mm_used)
lustre/obdclass/cl_page.c
index ba371e3..652f985 100644 (file)
@@ -127,6 +127,120 @@ cl_page_slice_get(const struct cl_page *cl_page, int index)
             slice = cl_page_slice_get(cl_page, i); i >= 0;     \
             slice = cl_page_slice_get(cl_page, --i))
 
+/* does the work required to access the pages in the iov, be they userspace
+ * or kernel
+ *
+ * returns the number of bytes in the IO, or a negative error code
+ */
+static ssize_t ll_get_iov_memory(int rw, struct iov_iter *iter,
+                               struct cl_dio_pages *cdp,
+                               size_t maxsize)
+{
+#if defined(HAVE_DIO_ITER)
+       size_t start;
+       size_t bytes;
+
+       bytes = iov_iter_get_pages_alloc2(iter, &cdp->cdp_pages, maxsize,
+                                         &start);
+       if (bytes > 0) {
+               cdp->cdp_page_count = DIV_ROUND_UP(bytes + start, PAGE_SIZE);
+               if (user_backed_iter(iter))
+                       iov_iter_revert(iter, bytes);
+       }
+       return bytes;
+#else
+       unsigned int page_count;
+       unsigned long addr;
+       size_t size;
+       long result;
+
+       if (!maxsize)
+               return 0;
+
+       if (!iter->nr_segs)
+               return 0;
+
+       addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
+       if (addr & ~PAGE_MASK)
+               return -EINVAL;
+
+       size = min_t(size_t, maxsize, iter->iov->iov_len);
+       page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_pages, page_count);
+       if (cdp->cdp_pages == NULL)
+               return -ENOMEM;
+
+       mmap_read_lock(current->mm);
+       result = get_user_pages(current, current->mm, addr, page_count,
+                               rw == READ, 0, cdp->cdp_pages, NULL);
+       mmap_read_unlock(current->mm);
+
+       if (unlikely(result != page_count)) {
+               ll_release_user_pages(cdp->cdp_pages, page_count);
+               cdp->cdp_pages = NULL;
+
+               if (result >= 0)
+                       return -EFAULT;
+
+               /* if result < 0, return the error */
+               return result;
+       }
+       cdp->cdp_page_count = page_count;
+
+       return size;
+#endif
+}
+
+ssize_t cl_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
+                         struct cl_dio_pages *cdp, struct iov_iter *iter,
+                         int rw, size_t bytes, loff_t offset, bool unaligned)
+{
+       ssize_t result = 0;
+
+       ENTRY;
+
+       cdp->cdp_file_offset = offset;
+       cdp->cdp_from = offset & ~PAGE_MASK;
+       cdp->cdp_to = ((offset + bytes - 1) & ~PAGE_MASK);
+
+       /* these set cdp->cdp_page_count, which is used in coo_dio_pages_init */
+       if (!unaligned) {
+               result = ll_get_iov_memory(rw, iter, cdp, bytes);
+               /* ll_get_iov_memory returns bytes in the IO or error */
+               bytes = result;
+       } else {
+               /* explicitly handle the ubuf() case for el9.4 */
+               size_t len = iter_is_ubuf(iter) ? iov_iter_count(iter)
+                          : iter_iov(iter)->iov_len;
+
+               /* same calculation used in ll_get_iov_memory */
+               bytes = min_t(size_t, bytes, len);
+               result = ll_allocate_dio_buffer(cdp, bytes);
+               /* allocate_dio_buffer returns number of pages or
+                * error, so do not set bytes = result
+                */
+               if (result > 0)
+                       result = 0;
+       }
+       if (result < 0)
+               GOTO(out, result);
+       LASSERT(cdp->cdp_page_count);
+       /* this is a special temporary allocation which lets us track the
+        * cl_pages and convert them to a list
+        *
+        * this is used in 'pushing down' the conversion to a page queue
+        */
+       OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_page_count);
+       if (!cdp->cdp_cl_pages)
+               GOTO(out, result = -ENOMEM);
+
+out:
+       if (result >= 0)
+               result = bytes;
+       RETURN(result);
+}
+EXPORT_SYMBOL(cl_dio_pages_init);
+
 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
 {
        if (cl_page->cp_in_kmem_array) {