LU-4198 clio: generalize cl_sync_io
author Jinshan Xiong <jinshan.xiong@intel.com>
Fri, 26 Sep 2014 21:45:17 +0000 (14:45 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Fri, 31 Oct 2014 20:36:44 +0000 (20:36 +0000)
Make the cl_sync_io interfaces a generic synchronization mechanism
rather than one that can only wait for pages: the anchor now counts
arbitrary entities and invokes a caller-supplied end-io callback once
they have all completed.

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: Iea43cb56c463efa7a6c6d17d84dacf4bdad8b377
Reviewed-on: http://review.whamcloud.com/8656
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Tested-by: Jenkins
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/cl_object.h
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c

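At its core, the change turns struct cl_sync_io into a counted
completion with a pluggable end-io hook. Below is a minimal userspace
sketch of that pattern, not Lustre code: pthread primitives stand in
for the kernel wait queue and atomics, and the names merely mirror the
csi_* fields.

#include <pthread.h>

/* Userspace model of struct cl_sync_io: a count of outstanding
 * entities, the first error seen, and an end-io callback fired when
 * the count reaches zero. */
struct sync_anchor {
	pthread_mutex_t  lock;
	pthread_cond_t   waitq;                         /* csi_waitq   */
	int              sync_nr;                       /* csi_sync_nr */
	int              sync_rc;                       /* csi_sync_rc */
	void           (*end_io)(struct sync_anchor *); /* csi_end_io  */
};

static void anchor_init(struct sync_anchor *a, int nr,
			void (*end)(struct sync_anchor *))
{
	pthread_mutex_init(&a->lock, NULL);
	pthread_cond_init(&a->waitq, NULL);
	a->sync_nr = nr;
	a->sync_rc = 0;
	a->end_io  = end;
}

/* Default end-io, like cl_sync_io_end(): wake whoever is waiting. */
static void anchor_end(struct sync_anchor *a)
{
	pthread_cond_broadcast(&a->waitq);
}

/* Per-entity completion, like cl_sync_io_note(): record the first
 * error, and on the last completion invoke the end-io callback. */
static void anchor_note(struct sync_anchor *a, int ioret)
{
	pthread_mutex_lock(&a->lock);
	if (a->sync_rc == 0 && ioret < 0)
		a->sync_rc = ioret;
	if (--a->sync_nr == 0)
		a->end_io(a);
	pthread_mutex_unlock(&a->lock);
}

/* Block until every entity has completed, like cl_sync_io_wait()
 * minus the timeout handling. */
static int anchor_wait(struct sync_anchor *a)
{
	int rc;

	pthread_mutex_lock(&a->lock);
	while (a->sync_nr > 0)
		pthread_cond_wait(&a->waitq, &a->lock);
	rc = a->sync_rc;
	pthread_mutex_unlock(&a->lock);
	return rc;
}

Because the completer holds the mutex across the end-io call, a waiter
cannot return (and free the anchor) before the completer is finished;
the lock-free kernel version has to close that same race with
csi_barrier, as the cl_io.c hunks below show.
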
lustre/include/cl_object.h
index 2e305f3..0b574da 100644
@@ -3130,13 +3130,18 @@ struct cl_sync_io {
        atomic_t                csi_barrier;
        /** completion to be signaled when transfer is complete. */
        wait_queue_head_t       csi_waitq;
+       /** callback to invoke when this IO is finished */
+       void                    (*csi_end_io)(const struct lu_env *,
+                                             struct cl_sync_io *);
 };
 
-void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages);
-int  cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
-                     struct cl_page_list *queue, struct cl_sync_io *anchor,
-                     long timeout);
-void cl_sync_io_note(struct cl_sync_io *anchor, int ioret);
+void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
+                    void (*end)(const struct lu_env *, struct cl_sync_io *));
+int  cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
+                    long timeout);
+void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
+                    int ioret);
+void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor);
 
 /** @} cl_sync_io */
 
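What the new csi_end_io argument buys is that completion no longer has
to mean "wake a waiter". Continuing the userspace sketch above, a
purely hypothetical chained end-io forwards a child batch's completion
to an enclosing batch, so nothing has to block in between; neither
type exists in Lustre itself.

/* Hypothetical extension of the sketch above: a child anchor whose
 * end-io notes one entity of a parent anchor in addition to waking
 * any local waiters. */
struct chained_anchor {
	struct sync_anchor  base;     /* must remain the first member */
	struct sync_anchor *parent;   /* the enclosing batch */
};

static void anchor_end_chained(struct sync_anchor *a)
{
	struct chained_anchor *c = (struct chained_anchor *)a;

	pthread_cond_broadcast(&a->waitq);    /* local waiters, if any */
	anchor_note(c->parent, a->sync_rc);   /* count against parent  */
}

A caller would initialize the child with
anchor_init(&c->base, nr, anchor_end_chained) and wait only on the
parent anchor.
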
lustre/obdclass/cl_io.c
index 3f572ee..a9945a9 100644
@@ -836,41 +836,41 @@ EXPORT_SYMBOL(cl_io_submit_rw);
  * If \a timeout is zero, it means to wait for the IO unconditionally.
  */
 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
-                      enum cl_req_type iot, struct cl_2queue *queue,
+                     enum cl_req_type iot, struct cl_2queue *queue,
                      long timeout)
 {
-        struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
-        struct cl_page *pg;
-        int rc;
+       struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
+       struct cl_page *pg;
+       int rc;
 
-        cl_page_list_for_each(pg, &queue->c2_qin) {
-                LASSERT(pg->cp_sync_io == NULL);
-                pg->cp_sync_io = anchor;
-        }
+       cl_page_list_for_each(pg, &queue->c2_qin) {
+               LASSERT(pg->cp_sync_io == NULL);
+               pg->cp_sync_io = anchor;
+       }
 
-        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+       cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
        rc = cl_io_submit_rw(env, io, iot, queue);
-        if (rc == 0) {
-                /*
-                 * If some pages weren't sent for any reason (e.g.,
-                 * read found up-to-date pages in the cache, or write found
-                 * clean pages), count them as completed to avoid infinite
-                 * wait.
-                 */
-                 cl_page_list_for_each(pg, &queue->c2_qin) {
-                        pg->cp_sync_io = NULL;
-                        cl_sync_io_note(anchor, +1);
-                 }
-
-                 /* wait for the IO to be finished. */
-                 rc = cl_sync_io_wait(env, io, &queue->c2_qout,
-                                      anchor, timeout);
-        } else {
+       if (rc == 0) {
+               /*
+                * If some pages weren't sent for any reason (e.g.,
+                * read found up-to-date pages in the cache, or write found
+                * clean pages), count them as completed to avoid infinite
+                * wait.
+                */
+               cl_page_list_for_each(pg, &queue->c2_qin) {
+                       pg->cp_sync_io = NULL;
+                       cl_sync_io_note(env, anchor, 1);
+               }
+
+               /* wait for the IO to be finished. */
+               rc = cl_sync_io_wait(env, anchor, timeout);
+               cl_page_list_assume(env, io, &queue->c2_qout);
+       } else {
                LASSERT(list_empty(&queue->c2_qout.pl_pages));
-                cl_page_list_for_each(pg, &queue->c2_qin)
-                        pg->cp_sync_io = NULL;
-        }
-        return rc;
+               cl_page_list_for_each(pg, &queue->c2_qin)
+                       pg->cp_sync_io = NULL;
+       }
+       return rc;
 }
 EXPORT_SYMBOL(cl_io_submit_sync);
 
@@ -1559,61 +1559,68 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
 }
 EXPORT_SYMBOL(cl_req_attr_set);
 
+/* cl_sync_io_end() assumes the caller will call cl_sync_io_wait() to
+ * wait for the IO to finish. */
+void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+       wake_up_all(&anchor->csi_waitq);
+
+       /* it's safe to nuke or reuse anchor now */
+       atomic_set(&anchor->csi_barrier, 0);
+}
+EXPORT_SYMBOL(cl_sync_io_end);
+
 /**
- * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
+ * Initialize synchronous io wait anchor
  */
-void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
+void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
+                    void (*end)(const struct lu_env *, struct cl_sync_io *))
 {
        ENTRY;
        init_waitqueue_head(&anchor->csi_waitq);
-       atomic_set(&anchor->csi_sync_nr, nrpages);
-       atomic_set(&anchor->csi_barrier, nrpages > 0);
+       atomic_set(&anchor->csi_sync_nr, nr);
+       atomic_set(&anchor->csi_barrier, nr > 0);
        anchor->csi_sync_rc = 0;
+       anchor->csi_end_io = end;
+       LASSERT(end != NULL);
        EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_init);
 
 /**
- * Wait until all transfer completes. Transfer completion routine has to call
- * cl_sync_io_note() for every page.
+ * Wait until all IO completes. Transfer completion routine has to call
+ * cl_sync_io_note() for every entity.
  */
-int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
-                    struct cl_page_list *queue, struct cl_sync_io *anchor,
-                    long timeout)
+int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
+                   long timeout)
 {
-        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-                                                  NULL, NULL, NULL);
-        int rc;
-        ENTRY;
+       struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
+                                                 NULL, NULL, NULL);
+       int rc;
+       ENTRY;
 
-        LASSERT(timeout >= 0);
+       LASSERT(timeout >= 0);
 
-        rc = l_wait_event(anchor->csi_waitq,
+       rc = l_wait_event(anchor->csi_waitq,
                          atomic_read(&anchor->csi_sync_nr) == 0,
-                          &lwi);
-        if (rc < 0) {
-                CERROR("SYNC IO failed with error: %d, try to cancel "
-                       "%d remaining pages\n",
+                         &lwi);
+       if (rc < 0) {
+               CERROR("IO failed: %d, still wait for %d remaining entries\n",
                       rc, atomic_read(&anchor->csi_sync_nr));
 
-                (void)cl_io_cancel(env, io, queue);
-
-                lwi = (struct l_wait_info) { 0 };
-                (void)l_wait_event(anchor->csi_waitq,
+               lwi = (struct l_wait_info) { 0 };
+               (void)l_wait_event(anchor->csi_waitq,
                                   atomic_read(&anchor->csi_sync_nr) == 0,
-                                   &lwi);
-        } else {
-                rc = anchor->csi_sync_rc;
-        }
+                                  &lwi);
+       } else {
+               rc = anchor->csi_sync_rc;
+       }
        LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
-        cl_page_list_assume(env, io, queue);
 
        /* wait until cl_sync_io_note() has done wakeup */
        while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
                cpu_relax();
        }
-
-       POISON(anchor, 0x5a, sizeof *anchor);
        RETURN(rc);
 }
 EXPORT_SYMBOL(cl_sync_io_wait);
@@ -1621,21 +1628,22 @@ EXPORT_SYMBOL(cl_sync_io_wait);
 /**
  * Indicate that transfer of a single page completed.
  */
-void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
+void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
+                    int ioret)
 {
-        ENTRY;
-        if (anchor->csi_sync_rc == 0 && ioret < 0)
-                anchor->csi_sync_rc = ioret;
-        /*
-         * Synchronous IO done without releasing page lock (e.g., as a part of
-         * ->{prepare,commit}_write(). Completion is used to signal the end of
-         * IO.
-         */
+       ENTRY;
+       if (anchor->csi_sync_rc == 0 && ioret < 0)
+               anchor->csi_sync_rc = ioret;
+       /*
+        * Synchronous IO done without releasing page lock (e.g., as a part of
+        * ->{prepare,commit}_write(). Completion is used to signal the end of
+        * IO.
+        */
        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
        if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
-               wake_up_all(&anchor->csi_waitq);
-               /* it's safe to nuke or reuse anchor now */
-               atomic_set(&anchor->csi_barrier, 0);
+               LASSERT(anchor->csi_end_io != NULL);
+               anchor->csi_end_io(env, anchor);
+               /* Can't access anchor any more */
        }
        EXIT;
 }
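
One subtlety the hunks above preserve: as soon as csi_sync_nr reaches
zero, the waiter in cl_sync_io_wait() may be about to reuse or free
the anchor while cl_sync_io_note() is still inside the end-io
callback. That is why the wait path spins until csi_barrier drops. A
compressed C11-atomics model of just that hand-off, spinning where the
kernel sleeps, with names again only echoing the csi_* fields:

#include <stdatomic.h>

struct barrier_anchor {
	atomic_int sync_nr;   /* outstanding completions, csi_sync_nr   */
	atomic_int barrier;   /* completer-still-running, csi_barrier   */
};

static void banchor_init(struct barrier_anchor *a, int nr)
{
	atomic_init(&a->sync_nr, nr);
	atomic_init(&a->barrier, nr > 0);
}

/* Completion side, cl_sync_io_note() in miniature. */
static void banchor_note(struct barrier_anchor *a)
{
	if (atomic_fetch_sub(&a->sync_nr, 1) == 1) {
		/* Last completer. A waiter watching sync_nr may already
		 * see zero here, so it must not free the anchor yet: we
		 * are still using it. Dropping the barrier is the very
		 * last touch, just as cl_sync_io_end() does wake_up_all()
		 * first and clears csi_barrier after. */
		atomic_store(&a->barrier, 0);
	}
}

/* Waiting side, the tail of cl_sync_io_wait(). */
static void banchor_wait(struct barrier_anchor *a)
{
	while (atomic_load(&a->sync_nr) != 0)
		;                      /* kernel: l_wait_event()   */
	while (atomic_load(&a->barrier) != 0)
		;                      /* kernel: cpu_relax() loop */
	/* Only now is it safe to reuse or free the anchor. */
}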
lustre/obdclass/cl_page.c
index 7badb8b..d877717 100644
@@ -963,10 +963,10 @@ void cl_page_completion(const struct lu_env *env,
         */
        cl_page_put(env, pg);
 
-       if (anchor)
-                cl_sync_io_note(anchor, ioret);
+       if (anchor != NULL)
+               cl_sync_io_note(env, anchor, ioret);
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(cl_page_completion);