memset(anchor, 0, sizeof(*anchor));
init_waitqueue_head(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nr);
+ atomic_set(&anchor->csi_complete, 0);
anchor->csi_sync_rc = 0;
anchor->csi_end_io = end;
anchor->csi_dio_aio = dio_aio;
if (timeout > 0 &&
wait_event_idle_timeout(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
+ atomic_read(&anchor->csi_complete) == 1,
cfs_time_seconds(timeout)) == 0) {
rc = -ETIMEDOUT;
CERROR("IO failed: %d, still wait for %d remaining entries\n",
- rc, atomic_read(&anchor->csi_sync_nr));
+ rc, atomic_read(&anchor->csi_complete));
}
wait_event_idle(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0);
+ atomic_read(&anchor->csi_complete) == 1);
if (!rc)
rc = anchor->csi_sync_rc;
/* We take the lock to ensure that cl_sync_io_note() has finished */
spin_lock(&anchor->csi_waitq.lock);
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
+ LASSERT(atomic_read(&anchor->csi_complete) == 1);
spin_unlock(&anchor->csi_waitq.lock);
RETURN(rc);
* IO.
*/
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
+ LASSERT(atomic_read(&anchor->csi_complete) == 0);
if (atomic_dec_and_lock(&anchor->csi_sync_nr,
&anchor->csi_waitq.lock)) {
struct cl_sub_dio *sub_dio_aio = NULL;
cl_sync_io_end_t *end_io = anchor->csi_end_io;
+ spin_unlock(&anchor->csi_waitq.lock);
+ /* we cannot do end_io while holding a spin lock, because
+ * end_io may sleep
+ */
+ if (end_io)
+ end_io(env, anchor);
+
+ spin_lock(&anchor->csi_waitq.lock);
+ /* this tells the waiters we've completed, and can only be set
+ * after end_io() has been called and while we're holding the
+ * spinlock
+ */
+ atomic_set(&anchor->csi_complete, 1);
/*
* Holding the lock across both the decrement and
* the wakeup ensures cl_sync_io_wait() doesn't complete
* completes.
*/
wake_up_locked(&anchor->csi_waitq);
- if (end_io)
- end_io(env, anchor);
csi_dio_aio = anchor->csi_dio_aio;
sub_dio_aio = csi_dio_aio;
* reused we assume it as 1 before using.
*/
atomic_add(1, &anchor->csi_sync_nr);
+ /* we must also set this anchor as incomplete */
+ atomic_set(&anchor->csi_complete, 0);
return rc;
}