Whamcloud - gitweb
LU-13814 clio: add cio_dio_submit 05/52105/18
authorPatrick Farrell <pfarrell@whamcloud.com>
Sun, 24 Sep 2023 20:10:41 +0000 (16:10 -0400)
committerPatrick Farrell <pfarrell@whamcloud.com>
Tue, 26 Sep 2023 18:58:04 +0000 (14:58 -0400)
Add cio_dio_submit method and push it down in to the lov
layer.

Note this part of the series relies on the slightly clever
fact that there is no vvp method for this and the LOV
method calls the OSC method.  This lets us do setup and
teardown of the queue in the lov layer.

The next step is to push the queue handling through this
function to the cl_io_submit_rw call here, which may be
non-trivial.

Test-Parameters: forjanitoronly
Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: I3a2a7b7921391be3cc9a157d4351228226016a51

lustre/include/cl_object.h
lustre/lov/lov_io.c
lustre/obdclass/cl_io.c

index 344444f..ba42527 100644 (file)
@@ -1436,6 +1436,8 @@ static inline void cl_read_ahead_release(const struct lu_env *env,
 }
 
 
+struct cl_dio_pages;
+
 /**
  * Per-layer io operations.
  * \see vvp_io_ops, lov_io_ops, lovsub_io_ops, osc_io_ops
@@ -1533,6 +1535,14 @@ struct cl_io_operations {
                        const struct cl_io_slice *slice,
                        enum cl_req_type crt,
                        struct cl_2queue *queue);
+       /* the dio version of cio_submit, this either submits all pages
+        * successfully or fails.  Uses an array, rather than a queue.
+        */
+       int  (*cio_dio_submit)(const struct lu_env *env,
+                              struct cl_io *io,
+                              const struct cl_io_slice *slice,
+                              enum cl_req_type crt,
+                              struct cl_dio_pages *cdp);
        /**
         * Queue async page for write.
         * The difference between cio_submit and cio_queue is that
@@ -2369,8 +2379,6 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock);
 /** \defgroup cl_io cl_io
  * @{ */
 
-struct cl_dio_pages;
-
 int   cl_io_init         (const struct lu_env *env, struct cl_io *io,
                           enum cl_io_type iot, struct cl_object *obj);
 int   cl_io_sub_init     (const struct lu_env *env, struct cl_io *io,
index dd6c43f..f9c3419 100644 (file)
@@ -1284,6 +1284,98 @@ int lov_io_lru_reserve(const struct lu_env *env,
        RETURN(0);
 }
 
+static int lov_dio_submit(const struct lu_env *env,
+                         struct cl_io *io,
+                         const struct cl_io_slice *ios,
+                         enum cl_req_type crt, struct cl_dio_pages *cdp)
+{
+       struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
+       struct lov_io           *lio = cl2lov_io(env, ios);
+       struct cl_2queue        *queue;
+       struct cl_page          *page;
+       struct cl_page_list     *qin;
+       struct lov_io_sub       *sub;
+       struct cl_page          *tmp;
+       bool dio = false;
+       int index;
+       int rc = 0;
+       ENTRY;
+
+       cl_dio_pages_2queue(cdp);
+       queue = &cdp->cdp_queue;
+
+       qin = &queue->c2_qin;
+       page = cl_page_list_first(qin);
+
+       if (page->cp_type == CPT_TRANSIENT)
+               dio = true;
+
+       cl_page_list_init(plist);
+       while (qin->pl_nr > 0) {
+               struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
+
+               page = cl_page_list_first(qin);
+               if (lov_page_is_empty(page)) {
+                       cl_page_list_move(&queue->c2_qout, qin, page);
+
+                       /*
+                        * it could only be mirror read to get here therefore
+                        * the pages will be transient. We don't care about
+                        * the return code of cl_page_prep() at all.
+                        */
+                       LASSERT(page->cp_type == CPT_TRANSIENT);
+                       cl_page_completion(env, page, crt, 0);
+                       continue;
+               }
+
+               cl_2queue_init(cl2q);
+               cl_page_list_move(&cl2q->c2_qin, qin, page);
+
+               index = page->cp_lov_index;
+               /* DIO is already split by stripe */
+               if (!dio) {
+                       cl_page_list_for_each_safe(page, tmp, qin) {
+                               /* this page is not on this stripe */
+                               if (index != page->cp_lov_index)
+                                       continue;
+
+                               cl_page_list_move(&cl2q->c2_qin, qin, page);
+                       }
+               } else {
+                       cl_page_list_splice(qin, &cl2q->c2_qin);
+               }
+
+               sub = lov_sub_get(env, lio, index);
+               if (!IS_ERR(sub)) {
+                       rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
+                                            crt, cl2q);
+               } else {
+                       rc = PTR_ERR(sub);
+               }
+
+               cl_page_list_splice(&cl2q->c2_qin, plist);
+               cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
+               cl_2queue_fini(env, cl2q);
+
+               if (rc != 0)
+                       break;
+       }
+
+       cl_page_list_splice(plist, qin);
+       cl_page_list_fini(env, plist);
+
+       /* if submit failed, no pages were sent */
+       LASSERT(ergo(rc != 0, list_empty(&queue->c2_qout.pl_pages)));
+       while (queue->c2_qout.pl_nr > 0) {
+               struct cl_page *page;
+
+               page = cl_page_list_first(&queue->c2_qout);
+               cl_page_list_del(env, &queue->c2_qout, page, false);
+       }
+
+       RETURN(rc);
+}
+
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -1734,6 +1826,7 @@ static const struct cl_io_operations lov_io_ops = {
        .cio_read_ahead                = lov_io_read_ahead,
        .cio_lru_reserve               = lov_io_lru_reserve,
        .cio_submit                    = lov_io_submit,
+       .cio_dio_submit                = lov_dio_submit,
        .cio_commit_async              = lov_io_commit_async,
 };
 
@@ -1754,6 +1847,14 @@ static void lov_empty_io_fini(const struct lu_env *env,
        EXIT;
 }
 
+static int lov_empty_dio_submit(const struct lu_env *env,
+                               struct cl_io *io,
+                               const struct cl_io_slice *ios,
+                               enum cl_req_type crt, struct cl_dio_pages *cdp)
+{
+       return -EBADF;
+}
+
 static int lov_empty_io_submit(const struct lu_env *env,
                               struct cl_io *io,
                               const struct cl_io_slice *ios,
@@ -1819,6 +1920,7 @@ static const struct cl_io_operations lov_empty_io_ops = {
                }
        },
        .cio_submit                    = lov_empty_io_submit,
+       .cio_dio_submit                = lov_empty_dio_submit,
        .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
 };
 
index d075048..8344166 100644 (file)
@@ -639,31 +639,18 @@ int cl_dio_submit_rw(const struct lu_env *env, struct cl_io *io,
                     enum cl_req_type crt, struct cl_dio_pages *cdp)
 {
        const struct cl_io_slice *scan;
-       struct cl_2queue *queue;
        int result = 0;
 
        ENTRY;
 
-       cl_dio_pages_2queue(cdp);
-       queue = &cdp->cdp_queue;
-
        list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
-               if (scan->cis_iop->cio_submit == NULL)
+               if (scan->cis_iop->cio_dio_submit == NULL)
                        continue;
-               result = scan->cis_iop->cio_submit(env, io, scan, crt, queue);
+               result = scan->cis_iop->cio_dio_submit(env, io, scan, crt,
+                                                      cdp);
                if (result != 0)
                        break;
        }
-       /*
-        * If ->cio_submit() failed, no pages were sent.
-        */
-       LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
-       while (queue->c2_qout.pl_nr > 0) {
-               struct cl_page *page;
-
-               page = cl_page_list_first(&queue->c2_qout);
-               cl_page_list_del(env, &queue->c2_qout, page, false);
-       }
        RETURN(result);
 }
 EXPORT_SYMBOL(cl_dio_submit_rw);