From 727c49e570544e19f16d1f6dd9ea5d7b7feeff78 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Fri, 26 Sep 2014 14:45:17 -0700 Subject: [PATCH] LU-4198 clio: generalize cl_sync_io To make cl_sync_io interfaces not just wait for pages, but to be a generic synchronization mechanism. Signed-off-by: Jinshan Xiong Change-Id: Iea43cb56c463efa7a6c6d17d84dacf4bdad8b377 Reviewed-on: http://review.whamcloud.com/8656 Reviewed-by: Bobi Jam Tested-by: Jenkins Reviewed-by: Lai Siyao Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 15 +++-- lustre/obdclass/cl_io.c | 150 ++++++++++++++++++++++++--------------------- lustre/obdclass/cl_page.c | 6 +- 3 files changed, 92 insertions(+), 79 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 2e305f3..0b574da 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -3130,13 +3130,18 @@ struct cl_sync_io { atomic_t csi_barrier; /** completion to be signaled when transfer is complete. */ wait_queue_head_t csi_waitq; + /** callback to invoke when this IO is finished */ + void (*csi_end_io)(const struct lu_env *, + struct cl_sync_io *); }; -void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages); -int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io, - struct cl_page_list *queue, struct cl_sync_io *anchor, - long timeout); -void cl_sync_io_note(struct cl_sync_io *anchor, int ioret); +void cl_sync_io_init(struct cl_sync_io *anchor, int nr, + void (*end)(const struct lu_env *, struct cl_sync_io *)); +int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor, + long timeout); +void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, + int ioret); +void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor); /** @} cl_sync_io */ diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 3f572ee..a9945a9 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -836,41 +836,41 @@ EXPORT_SYMBOL(cl_io_submit_rw); * If \a timeout is zero, it means to wait for the IO unconditionally. */ int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io, - enum cl_req_type iot, struct cl_2queue *queue, + enum cl_req_type iot, struct cl_2queue *queue, long timeout) { - struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor; - struct cl_page *pg; - int rc; + struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor; + struct cl_page *pg; + int rc; - cl_page_list_for_each(pg, &queue->c2_qin) { - LASSERT(pg->cp_sync_io == NULL); - pg->cp_sync_io = anchor; - } + cl_page_list_for_each(pg, &queue->c2_qin) { + LASSERT(pg->cp_sync_io == NULL); + pg->cp_sync_io = anchor; + } - cl_sync_io_init(anchor, queue->c2_qin.pl_nr); + cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end); rc = cl_io_submit_rw(env, io, iot, queue); - if (rc == 0) { - /* - * If some pages weren't sent for any reason (e.g., - * read found up-to-date pages in the cache, or write found - * clean pages), count them as completed to avoid infinite - * wait. - */ - cl_page_list_for_each(pg, &queue->c2_qin) { - pg->cp_sync_io = NULL; - cl_sync_io_note(anchor, +1); - } - - /* wait for the IO to be finished. */ - rc = cl_sync_io_wait(env, io, &queue->c2_qout, - anchor, timeout); - } else { + if (rc == 0) { + /* + * If some pages weren't sent for any reason (e.g., + * read found up-to-date pages in the cache, or write found + * clean pages), count them as completed to avoid infinite + * wait. + */ + cl_page_list_for_each(pg, &queue->c2_qin) { + pg->cp_sync_io = NULL; + cl_sync_io_note(env, anchor, 1); + } + + /* wait for the IO to be finished. */ + rc = cl_sync_io_wait(env, anchor, timeout); + cl_page_list_assume(env, io, &queue->c2_qout); + } else { LASSERT(list_empty(&queue->c2_qout.pl_pages)); - cl_page_list_for_each(pg, &queue->c2_qin) - pg->cp_sync_io = NULL; - } - return rc; + cl_page_list_for_each(pg, &queue->c2_qin) + pg->cp_sync_io = NULL; + } + return rc; } EXPORT_SYMBOL(cl_io_submit_sync); @@ -1559,61 +1559,68 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_req *req, } EXPORT_SYMBOL(cl_req_attr_set); +/* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to + * wait for the IO to finish. */ +void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor) +{ + wake_up_all(&anchor->csi_waitq); + + /* it's safe to nuke or reuse anchor now */ + atomic_set(&anchor->csi_barrier, 0); +} +EXPORT_SYMBOL(cl_sync_io_end); + /** - * Initialize synchronous io wait anchor, for transfer of \a nrpages pages. + * Initialize synchronous io wait anchor */ -void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages) +void cl_sync_io_init(struct cl_sync_io *anchor, int nr, + void (*end)(const struct lu_env *, struct cl_sync_io *)) { ENTRY; init_waitqueue_head(&anchor->csi_waitq); - atomic_set(&anchor->csi_sync_nr, nrpages); - atomic_set(&anchor->csi_barrier, nrpages > 0); + atomic_set(&anchor->csi_sync_nr, nr); + atomic_set(&anchor->csi_barrier, nr > 0); anchor->csi_sync_rc = 0; + anchor->csi_end_io = end; + LASSERT(end != NULL); EXIT; } EXPORT_SYMBOL(cl_sync_io_init); /** - * Wait until all transfer completes. Transfer completion routine has to call - * cl_sync_io_note() for every page. + * Wait until all IO completes. Transfer completion routine has to call + * cl_sync_io_note() for every entity. */ -int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io, - struct cl_page_list *queue, struct cl_sync_io *anchor, - long timeout) +int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor, + long timeout) { - struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout), - NULL, NULL, NULL); - int rc; - ENTRY; + struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout), + NULL, NULL, NULL); + int rc; + ENTRY; - LASSERT(timeout >= 0); + LASSERT(timeout >= 0); - rc = l_wait_event(anchor->csi_waitq, + rc = l_wait_event(anchor->csi_waitq, atomic_read(&anchor->csi_sync_nr) == 0, - &lwi); - if (rc < 0) { - CERROR("SYNC IO failed with error: %d, try to cancel " - "%d remaining pages\n", + &lwi); + if (rc < 0) { + CERROR("IO failed: %d, still wait for %d remaining entries\n", rc, atomic_read(&anchor->csi_sync_nr)); - (void)cl_io_cancel(env, io, queue); - - lwi = (struct l_wait_info) { 0 }; - (void)l_wait_event(anchor->csi_waitq, + lwi = (struct l_wait_info) { 0 }; + (void)l_wait_event(anchor->csi_waitq, atomic_read(&anchor->csi_sync_nr) == 0, - &lwi); - } else { - rc = anchor->csi_sync_rc; - } + &lwi); + } else { + rc = anchor->csi_sync_rc; + } LASSERT(atomic_read(&anchor->csi_sync_nr) == 0); - cl_page_list_assume(env, io, queue); /* wait until cl_sync_io_note() has done wakeup */ while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) { cpu_relax(); } - - POISON(anchor, 0x5a, sizeof *anchor); RETURN(rc); } EXPORT_SYMBOL(cl_sync_io_wait); @@ -1621,21 +1628,22 @@ EXPORT_SYMBOL(cl_sync_io_wait); /** * Indicate that transfer of a single page completed. */ -void cl_sync_io_note(struct cl_sync_io *anchor, int ioret) +void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor, + int ioret) { - ENTRY; - if (anchor->csi_sync_rc == 0 && ioret < 0) - anchor->csi_sync_rc = ioret; - /* - * Synchronous IO done without releasing page lock (e.g., as a part of - * ->{prepare,commit}_write(). Completion is used to signal the end of - * IO. - */ + ENTRY; + if (anchor->csi_sync_rc == 0 && ioret < 0) + anchor->csi_sync_rc = ioret; + /* + * Synchronous IO done without releasing page lock (e.g., as a part of + * ->{prepare,commit}_write(). Completion is used to signal the end of + * IO. + */ LASSERT(atomic_read(&anchor->csi_sync_nr) > 0); if (atomic_dec_and_test(&anchor->csi_sync_nr)) { - wake_up_all(&anchor->csi_waitq); - /* it's safe to nuke or reuse anchor now */ - atomic_set(&anchor->csi_barrier, 0); + LASSERT(anchor->csi_end_io != NULL); + anchor->csi_end_io(env, anchor); + /* Can't access anchor any more */ } EXIT; } diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c index 7badb8b..d877717 100644 --- a/lustre/obdclass/cl_page.c +++ b/lustre/obdclass/cl_page.c @@ -963,10 +963,10 @@ void cl_page_completion(const struct lu_env *env, */ cl_page_put(env, pg); - if (anchor) - cl_sync_io_note(anchor, ioret); + if (anchor != NULL) + cl_sync_io_note(env, anchor, ioret); - EXIT; + EXIT; } EXPORT_SYMBOL(cl_page_completion); -- 1.8.3.1