Whamcloud - gitweb
LU-13805 clio: Add csi_complete 13/49913/35
authorPatrick Farrell <pfarrell@whamcloud.com>
Mon, 6 Feb 2023 18:10:10 +0000 (13:10 -0500)
committerOleg Drokin <green@whamcloud.com>
Wed, 6 Sep 2023 06:14:39 +0000 (06:14 +0000)
The next patch will make end_io potentially sleep, so we
need to modify how completion works to avoid holding a
spinlock over the end_io() call.

This patch is strictly supporting work for the next patch
and has been pulled out so it can be tested by itself.

Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: Iba3388a0e09fdd0ab2f4a95f1cde96908a485cfa
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/49913
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Sebastien Buisson <sbuisson@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/obdclass/cl_io.c

index 845944b..c21aea8 100644 (file)
@@ -2532,6 +2532,8 @@ static inline void cl_sync_io_init(struct cl_sync_io *anchor, int nr)
 struct cl_sync_io {
        /** number of pages yet to be transferred. */
        atomic_t                csi_sync_nr;
+       /** has this i/o completed? */
+       atomic_t                csi_complete;
        /** error code. */
        int                     csi_sync_rc;
        /** completion to be signaled when transfer is complete. */
index 8d7b6f2..bae22ca 100644 (file)
@@ -1120,6 +1120,7 @@ void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
        memset(anchor, 0, sizeof(*anchor));
        init_waitqueue_head(&anchor->csi_waitq);
        atomic_set(&anchor->csi_sync_nr, nr);
+       atomic_set(&anchor->csi_complete, 0);
        anchor->csi_sync_rc = 0;
        anchor->csi_end_io = end;
        anchor->csi_dio_aio = dio_aio;
@@ -1141,21 +1142,22 @@ int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
 
        if (timeout > 0 &&
            wait_event_idle_timeout(anchor->csi_waitq,
-                                   atomic_read(&anchor->csi_sync_nr) == 0,
+                                   atomic_read(&anchor->csi_complete) == 1,
                                    cfs_time_seconds(timeout)) == 0) {
                rc = -ETIMEDOUT;
                CERROR("IO failed: %d, still wait for %d remaining entries\n",
-                      rc, atomic_read(&anchor->csi_sync_nr));
+                      rc, atomic_read(&anchor->csi_complete));
        }
 
        wait_event_idle(anchor->csi_waitq,
-                       atomic_read(&anchor->csi_sync_nr) == 0);
+                       atomic_read(&anchor->csi_complete) == 1);
        if (!rc)
                rc = anchor->csi_sync_rc;
 
        /* We take the lock to ensure that cl_sync_io_note() has finished */
        spin_lock(&anchor->csi_waitq.lock);
        LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
+       LASSERT(atomic_read(&anchor->csi_complete) == 1);
        spin_unlock(&anchor->csi_waitq.lock);
 
        RETURN(rc);
@@ -1349,6 +1351,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
         * IO.
         */
        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
+       LASSERT(atomic_read(&anchor->csi_complete) == 0);
        if (atomic_dec_and_lock(&anchor->csi_sync_nr,
                                &anchor->csi_waitq.lock)) {
                struct cl_sub_dio *sub_dio_aio = NULL;
@@ -1358,6 +1361,19 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
 
                cl_sync_io_end_t *end_io = anchor->csi_end_io;
 
+               spin_unlock(&anchor->csi_waitq.lock);
+               /* we cannot do end_io while holding a spin lock, because
+                * end_io may sleep
+                */
+               if (end_io)
+                       end_io(env, anchor);
+
+               spin_lock(&anchor->csi_waitq.lock);
+               /* this tells the waiters we've completed, and can only be set
+                * after end_io() has been called and while we're holding the
+                * spinlock
+                */
+               atomic_set(&anchor->csi_complete, 1);
                /*
                 * Holding the lock across both the decrement and
                 * the wakeup ensures cl_sync_io_wait() doesn't complete
@@ -1367,8 +1383,6 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                 * completes.
                 */
                wake_up_locked(&anchor->csi_waitq);
-               if (end_io)
-                       end_io(env, anchor);
 
                csi_dio_aio = anchor->csi_dio_aio;
                sub_dio_aio = csi_dio_aio;
@@ -1417,6 +1431,8 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
         * reused we assume it as 1 before using.
         */
        atomic_add(1, &anchor->csi_sync_nr);
+       /* we must also set this anchor as incomplete */
+       atomic_set(&anchor->csi_complete, 0);
 
        return rc;
 }