Whamcloud - gitweb
LU-13799 llite: Implement lower/upper aio 09/44209/15
author Patrick Farrell <pfarrell@whamcloud.com>
Fri, 30 Jul 2021 16:12:05 +0000 (12:12 -0400)
committer Oleg Drokin <green@whamcloud.com>
Tue, 11 Jan 2022 06:34:42 +0000 (06:34 +0000)
This patch creates a lower level aio struct for each set of
pages submitted, and attaches that to the llite level aio.

That means the completion of i/o (in the sense of
successful RPC/page completion) is associated with the
lower level aio struct, and the higher level aio waits for
the completion of these lower level structs.  Previously,
all pages were associated with the upper level (and only)
aio struct.

This patch is a reorganization/cleanup, which is necessary
for the next patch, which moves the release of pages to
aio_end.  The justification for this (correctness and
performance) will be provided in that patch.

Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: I02d6a33a0d9f9bbc1a182bcd539bd836c240bcc5
Reviewed-on: https://review.whamcloud.com/44209
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Yingjin Qian <qian@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/llite/file.c
lustre/llite/rw26.c
lustre/obdclass/cl_io.c

index d6b3c70..5da48ce 100644 (file)
@@ -2580,7 +2580,8 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                     int ioret);
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret);
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj);
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+                               struct cl_dio_aio *ll_aio);
 void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
 static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
 {
@@ -2613,7 +2614,9 @@ struct cl_dio_aio {
        struct cl_object        *cda_obj;
        struct kiocb            *cda_iocb;
        ssize_t                 cda_bytes;
-       unsigned                cda_no_aio_complete:1;
+       struct cl_dio_aio       *cda_ll_aio;
+       unsigned                cda_no_aio_complete:1,
+                               cda_no_aio_free:1;
 };
 
 /** @} cl_sync_io */
index 1fe29db..28d5226 100644 (file)
@@ -1674,7 +1674,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                        is_parallel_dio = false;
 
                ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
-                                     ll_i2info(inode)->lli_clob);
+                                     ll_i2info(inode)->lli_clob, NULL);
                if (!ci_aio)
                        GOTO(out, rc = -ENOMEM);
        }
index e959e6b..1dbc820 100644 (file)
@@ -439,7 +439,8 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        struct cl_io *io;
        struct file *file = iocb->ki_filp;
        struct inode *inode = file->f_mapping->host;
-       struct cl_dio_aio *aio;
+       struct cl_dio_aio *ll_aio;
+       struct cl_dio_aio *ldp_aio;
        size_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
@@ -473,12 +474,12 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        io = lcc->lcc_io;
        LASSERT(io != NULL);
 
-       aio = io->ci_aio;
-       LASSERT(aio);
-       LASSERT(aio->cda_iocb == iocb);
+       ll_aio = io->ci_aio;
+       LASSERT(ll_aio);
+       LASSERT(ll_aio->cda_iocb == iocb);
 
        while (iov_iter_count(iter)) {
-               struct ll_dio_pages pvec = { .ldp_aio = aio };
+               struct ll_dio_pages pvec = {};
                struct page **pages;
 
                count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
@@ -490,10 +491,20 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                                count = i_size_read(inode) - file_offset;
                }
 
+               /* this aio is freed on completion from cl_sync_io_note, so we
+                * do not need to directly free the memory here
+                */
+               ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+               if (!ldp_aio)
+                       GOTO(out, result = -ENOMEM);
+               pvec.ldp_aio = ldp_aio;
+
                result = ll_get_user_pages(rw, iter, &pages,
                                           &pvec.ldp_count, count);
-               if (unlikely(result <= 0))
+               if (unlikely(result <= 0)) {
+                       cl_sync_io_note(env, &ldp_aio->cda_sync, result);
                        GOTO(out, result);
+               }
 
                count = result;
                pvec.ldp_file_offset = file_offset;
@@ -501,6 +512,10 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
 
                result = ll_direct_rw_pages(env, io, count,
                                            rw, inode, &pvec);
+               /* We've submitted pages and can now remove the extra
+                * reference for that
+                */
+               cl_sync_io_note(env, &ldp_aio->cda_sync, result);
                ll_free_user_pages(pages, pvec.ldp_count);
 
                if (unlikely(result < 0))
@@ -512,7 +527,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        }
 
 out:
-       aio->cda_bytes += tot_bytes;
+       ll_aio->cda_bytes += tot_bytes;
 
        if (rw == WRITE)
                vio->u.readwrite.vui_written += tot_bytes;
@@ -532,7 +547,7 @@ out:
                ssize_t rc2;
 
                /* Wait here rather than doing async submission */
-               rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0);
+               rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
                if (result == 0 && rc2)
                        result = rc2;
 
index 29db2c6..c5aca1a 100644 (file)
@@ -1234,10 +1234,14 @@ static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
        if (!aio->cda_no_aio_complete)
                aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
 
+       if (aio->cda_ll_aio)
+               cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
+
        EXIT;
 }
 
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
+struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+                               struct cl_dio_aio *ll_aio)
 {
        struct cl_dio_aio *aio;
 
@@ -1250,12 +1254,30 @@ struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj)
                cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
                cl_page_list_init(&aio->cda_pages);
                aio->cda_iocb = iocb;
-               if (is_sync_kiocb(iocb))
+               if (is_sync_kiocb(iocb) || ll_aio)
                        aio->cda_no_aio_complete = 1;
                else
                        aio->cda_no_aio_complete = 0;
+               /* in the case of a lower level aio struct (ll_aio is set), or
+                * true AIO (!is_sync_kiocb()), the memory is freed by
+                * the daemons calling cl_sync_io_note, because they are the
+                * last users of the aio struct
+                *
+                * in other cases, the last user is cl_sync_io_wait, and in
+                * that case, the caller frees the aio struct after that call
+                * completes
+                */
+               if (ll_aio || !is_sync_kiocb(iocb))
+                       aio->cda_no_aio_free = 0;
+               else
+                       aio->cda_no_aio_free = 1;
+
                cl_object_get(obj);
                aio->cda_obj = obj;
+               aio->cda_ll_aio = ll_aio;
+
+               if (ll_aio)
+                       atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
        }
        return aio;
 }
@@ -1278,6 +1300,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                     int ioret)
 {
        ENTRY;
+
        if (anchor->csi_sync_rc == 0 && ioret < 0)
                anchor->csi_sync_rc = ioret;
        /*
@@ -1308,14 +1331,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
 
                spin_unlock(&anchor->csi_waitq.lock);
 
-               /**
-                * For AIO (!is_sync_kiocb), we are responsible for freeing
-                * memory here.  This is because we are the last user of this
-                * aio struct, whereas in other cases, we will call
-                * cl_sync_io_wait to wait after this, and so the memory is
-                * freed after that call.
-                */
-               if (aio && !is_sync_kiocb(aio->cda_iocb))
+               if (aio && !aio->cda_no_aio_free)
                        cl_aio_free(env, aio);
        }
        EXIT;
@@ -1326,8 +1342,15 @@ EXPORT_SYMBOL(cl_sync_io_note);
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret)
 {
+       bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
        int rc = 0;
 
+       /* for true AIO, the daemons running cl_sync_io_note would normally
+        * free the aio struct, but if we're waiting on it, we need them to not
+        * do that.  This ensures the aio is not freed when we drop the
+        * reference count to zero in cl_sync_io_note below
+        */
+       anchor->csi_aio->cda_no_aio_free = 1;
        /*
         * @anchor was inited as 1 to prevent end_io to be
         * called before we add all pages for IO, so drop
@@ -1347,6 +1370,8 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
         */
        atomic_add(1, &anchor->csi_sync_nr);
 
+       anchor->csi_aio->cda_no_aio_free = no_aio_free;
+
        return rc;
 }
 EXPORT_SYMBOL(cl_sync_io_wait_recycle);