From 8eba9a2ef31238e0ddc2d227a6f4921c2f818123 Mon Sep 17 00:00:00 2001 From: Patrick Farrell Date: Wed, 19 Jan 2022 10:46:24 -0500 Subject: [PATCH] LU-13799 llite: Implement lower/upper aio This patch creates a lower level aio struct for each set of pages submitted, and attaches that to the llite level aio. That means the completion of i/o (in the sense of successful RPC/page completion) is associated with the lower level aio struct, and the higher level aio waits for the completion of these lower level structs. Previously, all pages were associated with the upper level (and only) aio struct. This patch is a reorganization/cleanup, which is necessary for the next patch, which moves release pages to aio_end. The justification for this (correctness and performance) will be provided in that patch. Lustre-change: https://review.whamcloud.com/44209 Lustre-commit: 46ff76137160b66f1d4437b3443859027faae9c4 Signed-off-by: Patrick Farrell Change-Id: I2d1c01875c5478c54d7cdd93af7ff12bb4928d94 Reviewed-on: https://review.whamcloud.com/44684 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger --- lustre/include/cl_object.h | 7 +++++-- lustre/llite/file.c | 2 +- lustre/llite/rw26.c | 31 +++++++++++++++++++++++-------- lustre/obdclass/cl_io.c | 45 +++++++++++++++++++++++++++++++++++---------- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 903e992..0e35e32 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2580,7 +2580,8 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, int ioret); int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor, long timeout, int ioret); -struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj); +struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj, + struct cl_dio_aio *ll_aio); void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio); static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr) { @@ -2613,7 +2614,9 @@ struct cl_dio_aio { struct cl_object *cda_obj; struct kiocb *cda_iocb; ssize_t cda_bytes; - unsigned cda_no_aio_complete:1; + struct cl_dio_aio *cda_ll_aio; + unsigned cda_no_aio_complete:1, + cda_no_aio_free:1; }; /** @} cl_sync_io */ diff --git a/lustre/llite/file.c b/lustre/llite/file.c index c8e2fc8..6c68b8a 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1656,7 +1656,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args, is_parallel_dio = false; ci_aio = cl_aio_alloc(args->u.normal.via_iocb, - ll_i2info(inode)->lli_clob); + ll_i2info(inode)->lli_clob, NULL); if (!ci_aio) GOTO(out, rc = -ENOMEM); } diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index aec3383..86e0a8f 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -439,7 +439,8 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) struct cl_io *io; struct file *file = iocb->ki_filp; struct inode *inode = file->f_mapping->host; - struct cl_dio_aio *aio; + struct cl_dio_aio *ll_aio; + struct cl_dio_aio *ldp_aio; size_t count = iov_iter_count(iter); ssize_t tot_bytes = 0, result = 0; loff_t file_offset = iocb->ki_pos; @@ -473,12 +474,12 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) io = lcc->lcc_io; LASSERT(io != NULL); - aio = io->ci_aio; - LASSERT(aio); - LASSERT(aio->cda_iocb == iocb); + ll_aio = io->ci_aio; + LASSERT(ll_aio); + LASSERT(ll_aio->cda_iocb == iocb); while (iov_iter_count(iter)) { - struct ll_dio_pages pvec = { .ldp_aio = aio }; + struct ll_dio_pages pvec = {}; struct page **pages; count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE); @@ -490,10 +491,20 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) count = i_size_read(inode) - file_offset; } + /* this aio is freed on completion from cl_sync_io_note, so we + * do not need to directly free the memory here + */ + ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio); + if (!ldp_aio) + GOTO(out, result = -ENOMEM); + pvec.ldp_aio = ldp_aio; + result = ll_get_user_pages(rw, iter, &pages, &pvec.ldp_count, count); - if (unlikely(result <= 0)) + if (unlikely(result <= 0)) { + cl_sync_io_note(env, &ldp_aio->cda_sync, result); GOTO(out, result); + } count = result; pvec.ldp_file_offset = file_offset; @@ -501,6 +512,10 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) result = ll_direct_rw_pages(env, io, count, rw, inode, &pvec); + /* We've submitted pages and can now remove the extra + * reference for that + */ + cl_sync_io_note(env, &ldp_aio->cda_sync, result); ll_free_user_pages(pages, pvec.ldp_count); if (unlikely(result < 0)) @@ -512,7 +527,7 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw) } out: - aio->cda_bytes += tot_bytes; + ll_aio->cda_bytes += tot_bytes; if (rw == WRITE) vio->u.readwrite.vui_written += tot_bytes; @@ -532,7 +547,7 @@ out: ssize_t rc2; /* Wait here rather than doing async submission */ - rc2 = cl_sync_io_wait_recycle(env, &aio->cda_sync, 0, 0); + rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0); if (result == 0 && rc2) result = rc2; diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 2fea1e3..40d3251 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -1222,10 +1222,14 @@ static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor) if (!aio->cda_no_aio_complete) aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0); + if (aio->cda_ll_aio) + cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret); + EXIT; } -struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj) +struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj, + struct cl_dio_aio *ll_aio) { struct cl_dio_aio *aio; @@ -1238,12 +1242,30 @@ struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj) cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end); cl_page_list_init(&aio->cda_pages); aio->cda_iocb = iocb; - if (is_sync_kiocb(iocb)) + if (is_sync_kiocb(iocb) || ll_aio) aio->cda_no_aio_complete = 1; else aio->cda_no_aio_complete = 0; + /* in the case of a lower level aio struct (ll_aio is set), or + * true AIO (!is_sync_kiocb()), the memory is freed by + * the daemons calling cl_sync_io_note, because they are the + * last users of the aio struct + * + * in other cases, the last user is cl_sync_io_wait, and in + * that case, the caller frees the aio struct after that call + * completes + */ + if (ll_aio || !is_sync_kiocb(iocb)) + aio->cda_no_aio_free = 0; + else + aio->cda_no_aio_free = 1; + cl_object_get(obj); aio->cda_obj = obj; + aio->cda_ll_aio = ll_aio; + + if (ll_aio) + atomic_add(1, &ll_aio->cda_sync.csi_sync_nr); } return aio; } @@ -1266,6 +1288,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, int ioret) { ENTRY; + if (anchor->csi_sync_rc == 0 && ioret < 0) anchor->csi_sync_rc = ioret; /* @@ -1296,14 +1319,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, spin_unlock(&anchor->csi_waitq.lock); - /** - * For AIO (!is_sync_kiocb), we are responsible for freeing - * memory here. This is because we are the last user of this - * aio struct, whereas in other cases, we will call - * cl_sync_io_wait to wait after this, and so the memory is - * freed after that call. - */ - if (aio && !is_sync_kiocb(aio->cda_iocb)) + if (aio && !aio->cda_no_aio_free) cl_aio_free(env, aio); } EXIT; @@ -1314,8 +1330,15 @@ EXPORT_SYMBOL(cl_sync_io_note); int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor, long timeout, int ioret) { + bool no_aio_free = anchor->csi_aio->cda_no_aio_free; int rc = 0; + /* for true AIO, the daemons running cl_sync_io_note would normally + * free the aio struct, but if we're waiting on it, we need them to not + * do that. This ensures the aio is not freed when we drop the + * reference count to zero in cl_sync_io_note below + */ + anchor->csi_aio->cda_no_aio_free = 1; /* * @anchor was inited as 1 to prevent end_io to be * called before we add all pages for IO, so drop @@ -1335,6 +1358,8 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor, */ atomic_add(1, &anchor->csi_sync_nr); + anchor->csi_aio->cda_no_aio_free = no_aio_free; + return rc; } EXPORT_SYMBOL(cl_sync_io_wait_recycle); -- 1.8.3.1