Whamcloud - gitweb
LU-15811 llite: Rework upper/lower DIO/AIO 87/47187/17
authorPatrick Farrell <pfarrell@whamcloud.com>
Mon, 2 May 2022 20:17:02 +0000 (16:17 -0400)
committerOleg Drokin <green@whamcloud.com>
Thu, 1 Sep 2022 05:52:36 +0000 (05:52 +0000)
One of the patches for LU-13799,
"Implement lower/upper aio"
(https://review.whamcloud.com/44209/) created a
complicated setup where the cl_dio_aio struct was used
both for the top level DIO or AIO and for the lower level
sub I/Os (corresponding to stripes).

This is quite complicated and hard to follow, so this
rewrites these two uses to be separate structs.  This
incidentally fixes at least one possible memory leak, but
is mostly a cleanup.

Fixes: 46ff761371 "LU-13799 Implement lower/upper aio"
Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: Ide4a2b84f48624ee97dfb57fe80d201fbb7fe8d0
Reviewed-on: https://review.whamcloud.com/47187
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Yingjin Qian <qian@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/llite/file.c
lustre/llite/rw26.c
lustre/obdclass/cl_internal.h
lustre/obdclass/cl_io.c
lustre/obdclass/cl_object.c
lustre/tests/sanity.sh

index cc62e05..1a76b18 100644 (file)
@@ -1780,8 +1780,8 @@ struct cl_io {
         enum cl_io_state               ci_state;
         /** main object this io is against. Immutable after creation. */
         struct cl_object              *ci_obj;
-       /** one AIO request might be split in cl_io_loop */
-       struct cl_dio_aio             *ci_aio;
+       /** top level dio_aio */
+       struct cl_dio_aio             *ci_dio_aio;
         /**
          * Upper layer io, of which this io is a part of. Immutable after
          * creation.
@@ -2516,11 +2516,12 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
 
 struct cl_sync_io;
 struct cl_dio_aio;
+struct cl_sub_dio;
 
 typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *);
 
-void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
-                           struct cl_dio_aio *aio, cl_sync_io_end_t *end);
+void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr, void *dio_aio,
+                           cl_sync_io_end_t *end);
 
 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
                    long timeout);
@@ -2528,9 +2529,12 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                     int ioret);
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret);
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
-                               struct cl_dio_aio *ll_aio);
-void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio);
+struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+                                   bool is_aio);
+struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, bool nofree);
+void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio,
+                    bool always_free);
+void cl_sub_dio_free(struct cl_sub_dio *sdio, bool nofree);
 static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
 {
        cl_sync_io_init_notify(anchor, nr, NULL, NULL);
@@ -2551,8 +2555,8 @@ struct cl_sync_io {
        wait_queue_head_t       csi_waitq;
        /** callback to invoke when this IO is finished */
        cl_sync_io_end_t       *csi_end_io;
-       /** aio private data */
-       struct cl_dio_aio      *csi_aio;
+       /* private pointer for an associated DIO/AIO */
+       void                   *csi_dio_aio;
 };
 
 /** direct IO pages */
@@ -2568,19 +2572,27 @@ struct ll_dio_pages {
        loff_t                  ldp_file_offset;
 };
 
-/** To support Direct AIO */
+/* Top level struct used for AIO and DIO */
 struct cl_dio_aio {
        struct cl_sync_io       cda_sync;
-       struct cl_page_list     cda_pages;
        struct cl_object        *cda_obj;
        struct kiocb            *cda_iocb;
        ssize_t                 cda_bytes;
-       struct cl_dio_aio       *cda_ll_aio;
-       struct ll_dio_pages     cda_dio_pages;
        unsigned                cda_no_aio_complete:1,
-                               cda_no_aio_free:1;
+                               cda_no_sub_free:1;
 };
 
+/* Sub-dio used for splitting DIO (and AIO, because AIO is DIO) according to
+ * the layout/striping, so we can do parallel submit of DIO RPCs
+ */
+struct cl_sub_dio {
+       struct cl_sync_io       csd_sync;
+       struct cl_page_list     csd_pages;
+       ssize_t                 csd_bytes;
+       struct cl_dio_aio       *csd_ll_aio;
+       struct ll_dio_pages     csd_dio_pages;
+       unsigned                csd_no_free:1;
+};
 #if defined(HAVE_DIRECTIO_ITER) || defined(HAVE_IOV_ITER_RW) || \
        defined(HAVE_DIRECTIO_2ARGS)
 #define HAVE_DIO_ITER 1
index 73c256c..ba5e959 100644 (file)
@@ -1651,7 +1651,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        unsigned int retried = 0, dio_lock = 0;
        bool is_aio = false;
        bool is_parallel_dio = false;
-       struct cl_dio_aio *ci_aio = NULL;
+       struct cl_dio_aio *ci_dio_aio = NULL;
        size_t per_bytes;
        bool partial_io = false;
        size_t max_io_pages, max_cached_pages;
@@ -1684,9 +1684,9 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                if (!ll_sbi_has_parallel_dio(sbi))
                        is_parallel_dio = false;
 
-               ci_aio = cl_aio_alloc(args->u.normal.via_iocb,
-                                     ll_i2info(inode)->lli_clob, NULL);
-               if (!ci_aio)
+               ci_dio_aio = cl_dio_aio_alloc(args->u.normal.via_iocb,
+                                         ll_i2info(inode)->lli_clob, is_aio);
+               if (!ci_dio_aio)
                        GOTO(out, rc = -ENOMEM);
        }
 
@@ -1703,7 +1703,7 @@ restart:
        partial_io = per_bytes < count;
        io = vvp_env_thread_io(env);
        ll_io_init(io, file, iot, args);
-       io->ci_aio = ci_aio;
+       io->ci_dio_aio = ci_dio_aio;
        io->ci_dio_lock = dio_lock;
        io->ci_ndelay_tried = retried;
        io->ci_parallel_dio = is_parallel_dio;
@@ -1748,12 +1748,8 @@ restart:
                rc = io->ci_result;
        }
 
-       /* N/B: parallel DIO may be disabled during i/o submission;
-        * if that occurs, async RPCs are resolved before we get here, and this
-        * wait call completes immediately.
-        */
        if (is_parallel_dio) {
-               struct cl_sync_io *anchor = &io->ci_aio->cda_sync;
+               struct cl_sync_io *anchor = &io->ci_dio_aio->cda_sync;
 
                /* for dio, EIOCBQUEUED is an implementation detail,
                 * and we don't return it to userspace
@@ -1761,6 +1757,11 @@ restart:
                if (rc == -EIOCBQUEUED)
                        rc = 0;
 
+               /* N/B: parallel DIO may be disabled during i/o submission;
+                * if that occurs, I/O shifts to sync, so it's all resolved
+                * before we get here, and this wait call completes
+                * immediately.
+                */
                rc2 = cl_sync_io_wait_recycle(env, anchor, 0, 0);
                if (rc2 < 0)
                        rc = rc2;
@@ -1824,24 +1825,29 @@ out:
                goto restart;
        }
 
-       if (io->ci_aio) {
+       if (io->ci_dio_aio) {
                /*
                 * VFS will call aio_complete() if no -EIOCBQUEUED
                 * is returned for AIO, so we can not call aio_complete()
                 * in our end_io().
+                *
+                * NB: This is safe because the atomic_dec_and_lock  in
+                * cl_sync_io_init has implicit memory barriers, so this will
+                * be seen by whichever thread completes the DIO/AIO, even if
+                * it's not this one
                 */
                if (rc != -EIOCBQUEUED)
-                       io->ci_aio->cda_no_aio_complete = 1;
+                       io->ci_dio_aio->cda_no_aio_complete = 1;
                /**
                 * Drop one extra reference so that end_io() could be
                 * called for this IO context, we could call it after
                 * we make sure all AIO requests have been proceed.
                 */
-               cl_sync_io_note(env, &io->ci_aio->cda_sync,
+               cl_sync_io_note(env, &io->ci_dio_aio->cda_sync,
                                rc == -EIOCBQUEUED ? 0 : rc);
                if (!is_aio) {
-                       cl_aio_free(env, io->ci_aio);
-                       io->ci_aio = NULL;
+                       cl_dio_aio_free(env, io->ci_dio_aio, true);
+                       io->ci_dio_aio = NULL;
                }
        }
 
index 89d7891..f558984 100644 (file)
@@ -293,13 +293,13 @@ static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
 
 static int
 ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
-                  int rw, struct inode *inode, struct cl_dio_aio *aio)
+                  int rw, struct inode *inode, struct cl_sub_dio *sdio)
 {
-       struct ll_dio_pages *pv = &aio->cda_dio_pages;
+       struct ll_dio_pages *pv = &sdio->csd_dio_pages;
        struct cl_page    *page;
        struct cl_2queue  *queue = &io->ci_queue;
        struct cl_object  *obj = io->ci_obj;
-       struct cl_sync_io *anchor = &aio->cda_sync;
+       struct cl_sync_io *anchor = &sdio->csd_sync;
        loff_t offset   = pv->ldp_file_offset;
        int io_pages    = 0;
        size_t page_size = cl_page_size(obj);
@@ -361,7 +361,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
                smp_mb();
                rc = cl_io_submit_rw(env, io, iot, queue);
                if (rc == 0) {
-                       cl_page_list_splice(&queue->c2_qout, &aio->cda_pages);
+                       cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
                } else {
                        atomic_add(-queue->c2_qin.pl_nr,
                                   &anchor->csi_sync_nr);
@@ -406,12 +406,14 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        struct cl_io *io;
        struct file *file = iocb->ki_filp;
        struct inode *inode = file->f_mapping->host;
-       struct cl_dio_aio *ll_aio;
-       struct cl_dio_aio *ldp_aio;
+       struct cl_dio_aio *ll_dio_aio;
+       struct cl_sub_dio *ldp_aio;
        size_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        loff_t file_offset = iocb->ki_pos;
+       bool sync_submit = false;
        struct vvp_io *vio;
+       ssize_t rc2;
 
        /* Check EOF by ourselves */
        if (rw == READ && file_offset >= i_size_read(inode))
@@ -441,9 +443,22 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        io = lcc->lcc_io;
        LASSERT(io != NULL);
 
-       ll_aio = io->ci_aio;
-       LASSERT(ll_aio);
-       LASSERT(ll_aio->cda_iocb == iocb);
+       ll_dio_aio = io->ci_dio_aio;
+       LASSERT(ll_dio_aio);
+       LASSERT(ll_dio_aio->cda_iocb == iocb);
+
+       /* We cannot do parallel submission of sub-I/Os - for AIO or regular
+        * DIO - unless lockless because it causes us to release the lock
+        * early.
+        *
+        * There are also several circumstances in which we must disable
+        * parallel DIO, so we check if it is enabled.
+        *
+        * The check for "is_sync_kiocb" excludes AIO, which does not need to
+        * be disabled in these situations.
+        */
+       if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio))
+               sync_submit = true;
 
        while (iov_iter_count(iter)) {
                struct ll_dio_pages *pvec;
@@ -458,19 +473,22 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                                count = i_size_read(inode) - file_offset;
                }
 
-               /* this aio is freed on completion from cl_sync_io_note, so we
-                * do not need to directly free the memory here
+               /* if we are doing sync_submit, then we free this below,
+                * otherwise it is freed on the final call to cl_sync_io_note
+                * (either in this function or from a ptlrpcd daemon)
                 */
-               ldp_aio = cl_aio_alloc(iocb, ll_i2info(inode)->lli_clob, ll_aio);
+               ldp_aio = cl_sub_dio_alloc(ll_dio_aio, sync_submit);
                if (!ldp_aio)
                        GOTO(out, result = -ENOMEM);
 
-               pvec = &ldp_aio->cda_dio_pages;
+               pvec = &ldp_aio->csd_dio_pages;
 
                result = ll_get_user_pages(rw, iter, &pages,
                                           &pvec->ldp_count, count);
                if (unlikely(result <= 0)) {
-                       cl_sync_io_note(env, &ldp_aio->cda_sync, result);
+                       cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+                       if (sync_submit)
+                               cl_sub_dio_free(ldp_aio, true);
                        GOTO(out, result);
                }
 
@@ -483,8 +501,15 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
                /* We've submitted pages and can now remove the extra
                 * reference for that
                 */
-               cl_sync_io_note(env, &ldp_aio->cda_sync, result);
-
+               cl_sync_io_note(env, &ldp_aio->csd_sync, result);
+
+               if (sync_submit) {
+                       rc2 = cl_sync_io_wait(env, &ldp_aio->csd_sync,
+                                            0);
+                       if (result == 0 && rc2)
+                               result = rc2;
+                       cl_sub_dio_free(ldp_aio, true);
+               }
                if (unlikely(result < 0))
                        GOTO(out, result);
 
@@ -494,35 +519,18 @@ ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
        }
 
 out:
-       ll_aio->cda_bytes += tot_bytes;
+       ll_dio_aio->cda_bytes += tot_bytes;
 
        if (rw == WRITE)
                vio->u.readwrite.vui_written += tot_bytes;
        else
                vio->u.readwrite.vui_read += tot_bytes;
 
-       /* We cannot do async submission - for AIO or regular DIO - unless
-        * lockless because it causes us to release the lock early.
-        *
-        * There are also several circumstances in which we must disable
-        * parallel DIO, so we check if it is enabled.
-        *
-        * The check for "is_sync_kiocb" excludes AIO, which does not need to
-        * be disabled in these situations.
+       /* AIO is not supported on pipes, so we cannot return EIOCBQEUED like
+        * we normally would for both DIO and AIO here
         */
-       if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio)) {
-               ssize_t rc2;
-
-               /* Wait here rather than doing async submission */
-               rc2 = cl_sync_io_wait_recycle(env, &ll_aio->cda_sync, 0, 0);
-               if (result == 0 && rc2)
-                       result = rc2;
-
-               if (result == 0)
-                       result = tot_bytes;
-       } else if (result == 0) {
+       if (result == 0 && !iov_iter_is_pipe(iter))
                result = -EIOCBQUEUED;
-       }
 
        return result;
 }
index 63b1dab..3c8f22b 100644 (file)
@@ -46,6 +46,7 @@ struct cl_thread_info {
 };
 
 extern struct kmem_cache *cl_dio_aio_kmem;
+extern struct kmem_cache *cl_sub_dio_kmem;
 extern struct kmem_cache *cl_page_kmem_array[16];
 extern unsigned short cl_page_kmem_size_array[16];
 
index dd6b66c..60aabf2 100644 (file)
@@ -1147,7 +1147,7 @@ EXPORT_SYMBOL(cl_req_attr_set);
  */
 
 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
-                           struct cl_dio_aio *aio, cl_sync_io_end_t *end)
+                           void *dio_aio, cl_sync_io_end_t *end)
 {
        ENTRY;
        memset(anchor, 0, sizeof(*anchor));
@@ -1155,7 +1155,7 @@ void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
        atomic_set(&anchor->csi_sync_nr, nr);
        anchor->csi_sync_rc = 0;
        anchor->csi_end_io = end;
-       anchor->csi_aio = aio;
+       anchor->csi_dio_aio = dio_aio;
        EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_init_notify);
@@ -1203,35 +1203,43 @@ static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
 }
 #endif
 
-static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
 {
        struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
        ssize_t ret = anchor->csi_sync_rc;
 
        ENTRY;
 
+       if (!aio->cda_no_aio_complete)
+               aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+       EXIT;
+}
+
+static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+       struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
+       ssize_t ret = anchor->csi_sync_rc;
+
+       ENTRY;
+
        /* release pages */
-       while (aio->cda_pages.pl_nr > 0) {
-               struct cl_page *page = cl_page_list_first(&aio->cda_pages);
+       while (sdio->csd_pages.pl_nr > 0) {
+               struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
 
                cl_page_delete(env, page);
-               cl_page_list_del(env, &aio->cda_pages, page);
+               cl_page_list_del(env, &sdio->csd_pages, page);
        }
 
-       if (!aio->cda_no_aio_complete)
-               aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
-
-       if (aio->cda_ll_aio) {
-               ll_release_user_pages(aio->cda_dio_pages.ldp_pages,
-                                     aio->cda_dio_pages.ldp_count);
-               cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
-       }
+       ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
+                             sdio->csd_dio_pages.ldp_count);
+       cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
 
        EXIT;
 }
 
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
-                               struct cl_dio_aio *ll_aio)
+struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+                                   bool is_aio)
 {
        struct cl_dio_aio *aio;
 
@@ -1241,47 +1249,63 @@ struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
                 * Hold one ref so that it won't be released until
                 * every pages is added.
                 */
-               cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
-               cl_page_list_init(&aio->cda_pages);
+               cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
                aio->cda_iocb = iocb;
-               if (is_sync_kiocb(iocb) || ll_aio)
-                       aio->cda_no_aio_complete = 1;
-               else
-                       aio->cda_no_aio_complete = 0;
-               /* in the case of a lower level aio struct (ll_aio is set), or
-                * true AIO (!is_sync_kiocb()), the memory is freed by
-                * the daemons calling cl_sync_io_note, because they are the
-                * last users of the aio struct
+               aio->cda_no_aio_complete = !is_aio;
+               /* if this is true AIO, the memory is freed by the last call
+                * to cl_sync_io_note (when all the I/O is complete), because
+                * no one is waiting (in the kernel) for this to complete
                 *
                 * in other cases, the last user is cl_sync_io_wait, and in
-                * that case, the caller frees the aio struct after that call
-                * completes
+                * that case, the caller frees the struct after that call
                 */
-               if (ll_aio || !is_sync_kiocb(iocb))
-                       aio->cda_no_aio_free = 0;
-               else
-                       aio->cda_no_aio_free = 1;
+               aio->cda_no_sub_free = !is_aio;
 
                cl_object_get(obj);
                aio->cda_obj = obj;
-               aio->cda_ll_aio = ll_aio;
-
-               if (ll_aio)
-                       atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
        }
        return aio;
 }
-EXPORT_SYMBOL(cl_aio_alloc);
+EXPORT_SYMBOL(cl_dio_aio_alloc);
 
-void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
+struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, bool nofree)
 {
-       if (aio) {
+       struct cl_sub_dio *sdio;
+
+       OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
+       if (sdio != NULL) {
+               /*
+                * Hold one ref so that it won't be released until
+                * every pages is added.
+                */
+               cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
+                                      cl_sub_dio_end);
+               cl_page_list_init(&sdio->csd_pages);
+
+               sdio->csd_ll_aio = ll_aio;
+               atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
+               sdio->csd_no_free = nofree;
+       }
+       return sdio;
+}
+EXPORT_SYMBOL(cl_sub_dio_alloc);
+
+void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio,
+                    bool always_free)
+{
+       if (aio && (!aio->cda_no_sub_free || always_free)) {
                cl_object_put(env, aio->cda_obj);
                OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
        }
 }
-EXPORT_SYMBOL(cl_aio_free);
+EXPORT_SYMBOL(cl_dio_aio_free);
 
+void cl_sub_dio_free(struct cl_sub_dio *sdio, bool always_free)
+{
+       if (sdio && (!sdio->csd_no_free || always_free))
+               OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
+}
+EXPORT_SYMBOL(cl_sub_dio_free);
 /*
  * ll_release_user_pages - tear down page struct array
  * @pages: array of page struct pointers underlying target buffer
@@ -1327,7 +1351,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
        if (atomic_dec_and_lock(&anchor->csi_sync_nr,
                                &anchor->csi_waitq.lock)) {
-               struct cl_dio_aio *aio = NULL;
+               void *dio_aio = NULL;
 
                cl_sync_io_end_t *end_io = anchor->csi_end_io;
 
@@ -1343,30 +1367,29 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                if (end_io)
                        end_io(env, anchor);
 
-               aio = anchor->csi_aio;
+               dio_aio = anchor->csi_dio_aio;
 
                spin_unlock(&anchor->csi_waitq.lock);
 
-               if (aio && !aio->cda_no_aio_free)
-                       cl_aio_free(env, aio);
+               if (dio_aio) {
+                       if (end_io == cl_dio_aio_end)
+                               cl_dio_aio_free(env,
+                                               (struct cl_dio_aio *) dio_aio,
+                                               false);
+                       else if (end_io == cl_sub_dio_end)
+                               cl_sub_dio_free((struct cl_sub_dio *) dio_aio,
+                                               false);
+               }
        }
        EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_note);
 
-
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret)
 {
-       bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
        int rc = 0;
 
-       /* for true AIO, the daemons running cl_sync_io_note would normally
-        * free the aio struct, but if we're waiting on it, we need them to not
-        * do that.  This ensures the aio is not freed when we drop the
-        * reference count to zero in cl_sync_io_note below
-        */
-       anchor->csi_aio->cda_no_aio_free = 1;
        /*
         * @anchor was inited as 1 to prevent end_io to be
         * called before we add all pages for IO, so drop
@@ -1386,8 +1409,6 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
         */
        atomic_add(1, &anchor->csi_sync_nr);
 
-       anchor->csi_aio->cda_no_aio_free = no_aio_free;
-
        return rc;
 }
 EXPORT_SYMBOL(cl_sync_io_wait_recycle);
index 9054668..8effbec 100644 (file)
@@ -56,6 +56,7 @@
 
 static struct kmem_cache *cl_env_kmem;
 struct kmem_cache *cl_dio_aio_kmem;
+struct kmem_cache *cl_sub_dio_kmem;
 struct kmem_cache *cl_page_kmem_array[16];
 unsigned short cl_page_kmem_size_array[16];
 
@@ -1040,6 +1041,11 @@ static struct lu_kmem_descr cl_object_caches[] = {
                .ckd_size  = sizeof(struct cl_dio_aio)
        },
        {
+               .ckd_cache = &cl_sub_dio_kmem,
+               .ckd_name  = "cl_sub_dio_kmem",
+               .ckd_size  = sizeof(struct cl_sub_dio)
+       },
+       {
                .ckd_cache = NULL
        }
 };
index 4b0b3bb..6166cfb 100755 (executable)
@@ -5358,6 +5358,10 @@ run_test 42c "test partial truncate of file with cached dirty data"
 test_42d() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
 
+       local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+       stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+       $LCTL set_param debug=+cache
+
        trunc_test 42d 0
        [ $BEFOREWRITES -eq $AFTERWRITES ] ||
                error "beforewrites $BEFOREWRITES != afterwrites $AFTERWRITES on truncate"
@@ -8983,6 +8987,10 @@ test_64e() {
        [ $OST1_VERSION -ge $(version_code 2.11.56) ] ||
                skip "Need OSS version at least 2.11.56"
 
+       local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+       stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+       $LCTL set_param debug=+cache
+
        # Remount client to reset grant
        remount_client $MOUNT || error "failed to remount client"
        local osc_tgt="$FSNAME-OST0000-osc-$($LFS getname -i $DIR)"
@@ -9036,6 +9044,10 @@ run_test 64e "check grant consumption (no grant allocation)"
 test_64f() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
 
+       local olddebug="$($LCTL get_param -n debug 2> /dev/null)"
+       stack_trap "$LCTL set_param -n debug='$olddebug'" EXIT
+       $LCTL set_param debug=+cache
+
        # Remount client to reset grant
        remount_client $MOUNT || error "failed to remount client"
        local osc_tgt="$FSNAME-OST0000-osc-$($LFS getname -i $DIR)"