* If \a timeout is zero, it means to wait for the IO unconditionally.
*/
int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
- enum cl_req_type iot, struct cl_2queue *queue,
+ enum cl_req_type iot, struct cl_2queue *queue,
long timeout)
{
- struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
- struct cl_page *pg;
- int rc;
+ struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
+ struct cl_page *pg;
+ int rc;
- cl_page_list_for_each(pg, &queue->c2_qin) {
- LASSERT(pg->cp_sync_io == NULL);
- pg->cp_sync_io = anchor;
- }
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ LASSERT(pg->cp_sync_io == NULL);
+ pg->cp_sync_io = anchor;
+ }
- cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+ cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
rc = cl_io_submit_rw(env, io, iot, queue);
- if (rc == 0) {
- /*
- * If some pages weren't sent for any reason (e.g.,
- * read found up-to-date pages in the cache, or write found
- * clean pages), count them as completed to avoid infinite
- * wait.
- */
- cl_page_list_for_each(pg, &queue->c2_qin) {
- pg->cp_sync_io = NULL;
- cl_sync_io_note(anchor, +1);
- }
-
- /* wait for the IO to be finished. */
- rc = cl_sync_io_wait(env, io, &queue->c2_qout,
- anchor, timeout);
- } else {
+ if (rc == 0) {
+ /*
+ * If some pages weren't sent for any reason (e.g.,
+ * read found up-to-date pages in the cache, or write found
+ * clean pages), count them as completed to avoid infinite
+ * wait.
+ */
+ cl_page_list_for_each(pg, &queue->c2_qin) {
+ pg->cp_sync_io = NULL;
+ cl_sync_io_note(env, anchor, 1);
+ }
+
+ /* wait for the IO to be finished. */
+ rc = cl_sync_io_wait(env, anchor, timeout);
+ cl_page_list_assume(env, io, &queue->c2_qout);
+ } else {
LASSERT(list_empty(&queue->c2_qout.pl_pages));
- cl_page_list_for_each(pg, &queue->c2_qin)
- pg->cp_sync_io = NULL;
- }
- return rc;
+ cl_page_list_for_each(pg, &queue->c2_qin)
+ pg->cp_sync_io = NULL;
+ }
+ return rc;
}
EXPORT_SYMBOL(cl_io_submit_sync);
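A minimal sketch of a caller of the revised interface (illustrative only; the io/queue setup is elided and example_write_sync() is not part of the patch):

static int example_write_sync(const struct lu_env *env, struct cl_io *io,
			      struct cl_2queue *queue)
{
	/* timeout == 0: wait for the IO unconditionally, as documented above */
	int rc = cl_io_submit_sync(env, io, CRT_WRITE, queue, 0);

	/* on success, pages that went through the transfer now sit on
	 * queue->c2_qout and have been re-owned via cl_page_list_assume() */
	return rc;
}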
}
EXPORT_SYMBOL(cl_req_attr_set);
+/**
+ * Default end-io callback: it only wakes up the waiter, so the caller is
+ * expected to call cl_sync_io_wait() to wait for the IO to finish.
+ */
+void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+ wake_up_all(&anchor->csi_waitq);
+
+ /* it's safe to nuke or reuse anchor now */
+ atomic_set(&anchor->csi_barrier, 0);
+}
+EXPORT_SYMBOL(cl_sync_io_end);
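What the callback buys: an anchor no longer has to live on a waiter's stack. A hypothetical sketch of an asynchronous user that embeds the anchor and cleans up from the callback instead of ever calling cl_sync_io_wait() (my_aio and my_aio_end are illustrative, not part of this patch):

struct my_aio {
	struct cl_sync_io	 ma_anchor;	/* initialized with my_aio_end */
	void			*ma_cookie;	/* caller completion state */
};

static void my_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
{
	struct my_aio *aio = container_of(anchor, struct my_aio, ma_anchor);

	/* nobody sleeps on csi_waitq, so no wake-up/barrier handshake is
	 * needed; the final cl_sync_io_note() handed the anchor to us and
	 * the embedding object can be released outright */
	OBD_FREE_PTR(aio);
}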
+
/**
- * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
+ * Initialize synchronous io wait \a anchor for \a nr IO entities.
*/
-void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
+void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
+ void (*end)(const struct lu_env *, struct cl_sync_io *))
{
ENTRY;
init_waitqueue_head(&anchor->csi_waitq);
- atomic_set(&anchor->csi_sync_nr, nrpages);
- atomic_set(&anchor->csi_barrier, nrpages > 0);
+ atomic_set(&anchor->csi_sync_nr, nr);
+ atomic_set(&anchor->csi_barrier, nr > 0);
anchor->csi_sync_rc = 0;
+ LASSERT(end != NULL);
+ anchor->csi_end_io = end;
EXIT;
}
EXPORT_SYMBOL(cl_sync_io_init);
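The new argument implies a matching header change; roughly, struct cl_sync_io in cl_object.h grows a csi_end_io member (field comments here are descriptive, not taken from the patch):

struct cl_sync_io {
	/** number of IO entities yet to complete */
	atomic_t		csi_sync_nr;
	/** first error seen, if any */
	int			csi_sync_rc;
	/** nonzero until the final wake-up has finished; guards freeing */
	atomic_t		csi_barrier;
	/** waiters sleep here until csi_sync_nr drops to zero */
	wait_queue_head_t	csi_waitq;
	/** called exactly once, when csi_sync_nr drops to zero */
	void			(*csi_end_io)(const struct lu_env *,
					      struct cl_sync_io *);
};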
/**
- * Wait until all transfer completes. Transfer completion routine has to call
- * cl_sync_io_note() for every page.
+ * Wait until all IO completes. The transfer completion routine has to call
+ * cl_sync_io_note() for every entity.
*/
-int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_sync_io *anchor,
- long timeout)
+int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
+ long timeout)
{
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- NULL, NULL, NULL);
- int rc;
- ENTRY;
+ struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
+ NULL, NULL, NULL);
+ int rc;
+ ENTRY;
- LASSERT(timeout >= 0);
+ LASSERT(timeout >= 0);
- rc = l_wait_event(anchor->csi_waitq,
+ rc = l_wait_event(anchor->csi_waitq,
atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- if (rc < 0) {
- CERROR("SYNC IO failed with error: %d, try to cancel "
- "%d remaining pages\n",
+ &lwi);
+ if (rc < 0) {
+ CERROR("IO failed: %d, still wait for %d remaining entries\n",
rc, atomic_read(&anchor->csi_sync_nr));
- (void)cl_io_cancel(env, io, queue);
-
- lwi = (struct l_wait_info) { 0 };
- (void)l_wait_event(anchor->csi_waitq,
+ lwi = (struct l_wait_info) { 0 };
+ (void)l_wait_event(anchor->csi_waitq,
atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- } else {
- rc = anchor->csi_sync_rc;
- }
+ &lwi);
+ } else {
+ rc = anchor->csi_sync_rc;
+ }
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
- cl_page_list_assume(env, io, queue);
/* wait until cl_sync_io_note() has done wakeup */
while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
cpu_relax();
}
-
- POISON(anchor, 0x5a, sizeof *anchor);
RETURN(rc);
}
EXPORT_SYMBOL(cl_sync_io_wait);
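Dropping the POISON is consistent with the new contract: once cl_sync_io_wait() returns, the anchor may be reused or may sit in caller-owned storage, so it can no longer be scribbled on. The csi_barrier spin closes the remaining window; because l_wait_event() evaluates its condition directly, the waiter can observe csi_sync_nr == 0 while the completing thread is still between the decrement and the wake-up:

/*
 * Sketch of the ordering csi_barrier enforces:
 *
 *	completion side				waiter
 *	atomic_dec_and_test() -> 0
 *						sees csi_sync_nr == 0
 *						spins while csi_barrier != 0
 *	csi_end_io(): wake_up_all()
 *	atomic_set(&csi_barrier, 0)
 *						frees/reuses anchor safely
 */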
/**
- * Indicate that transfer of a single page completed.
+ * Indicate that transfer of a single entity completed.
*/
-void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
+void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
+ int ioret)
{
- ENTRY;
- if (anchor->csi_sync_rc == 0 && ioret < 0)
- anchor->csi_sync_rc = ioret;
- /*
- * Synchronous IO done without releasing page lock (e.g., as a part of
- * ->{prepare,commit}_write(). Completion is used to signal the end of
- * IO.
- */
+ ENTRY;
+ if (anchor->csi_sync_rc == 0 && ioret < 0)
+ anchor->csi_sync_rc = ioret;
+ /*
+ * Synchronous IO done without releasing page lock (e.g., as a part of
+ * ->{prepare,commit}_write(). Completion is used to signal the end of
+ * IO.
+ */
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
- wake_up_all(&anchor->csi_waitq);
- /* it's safe to nuke or reuse anchor now */
- atomic_set(&anchor->csi_barrier, 0);
+ LASSERT(anchor->csi_end_io != NULL);
+ anchor->csi_end_io(env, anchor);
+ /* Can't access anchor any more: csi_end_io may have freed or reused it */
}
EXIT;
}
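Taken together, the protocol after this patch looks like the following sketch (nr, timeout, ioret and start_one_transfer() are illustrative placeholders):

	/* submitter */
	struct cl_sync_io anchor;
	int i, rc;

	cl_sync_io_init(&anchor, nr, &cl_sync_io_end);
	for (i = 0; i < nr; i++)
		start_one_transfer(i, &anchor);	/* hypothetical */
	rc = cl_sync_io_wait(env, &anchor, timeout);
	/* anchor is safe to reuse or free from here on */

	/* every completion path, exactly once per entity: */
	cl_sync_io_note(env, &anchor, ioret);	/* last call fires csi_end_io */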