Whamcloud - gitweb
LU-15811 llite: Rework upper/lower DIO/AIO
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
index dd6b66c..60aabf2 100644 (file)
@@ -1147,7 +1147,7 @@ EXPORT_SYMBOL(cl_req_attr_set);
  */
 
 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
-                           struct cl_dio_aio *aio, cl_sync_io_end_t *end)
+                           void *dio_aio, cl_sync_io_end_t *end)
 {
        ENTRY;
        memset(anchor, 0, sizeof(*anchor));
@@ -1155,7 +1155,7 @@ void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
        atomic_set(&anchor->csi_sync_nr, nr);
        anchor->csi_sync_rc = 0;
        anchor->csi_end_io = end;
-       anchor->csi_aio = aio;
+       anchor->csi_dio_aio = dio_aio;
        EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_init_notify);
@@ -1203,35 +1203,43 @@ static inline void aio_complete(struct kiocb *iocb, ssize_t res, ssize_t res2)
 }
 #endif
 
-static void cl_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
 {
        struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
        ssize_t ret = anchor->csi_sync_rc;
 
        ENTRY;
 
+       if (!aio->cda_no_aio_complete)
+               aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
+
+       EXIT;
+}
+
+static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+       struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
+       ssize_t ret = anchor->csi_sync_rc;
+
+       ENTRY;
+
        /* release pages */
-       while (aio->cda_pages.pl_nr > 0) {
-               struct cl_page *page = cl_page_list_first(&aio->cda_pages);
+       while (sdio->csd_pages.pl_nr > 0) {
+               struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
 
                cl_page_delete(env, page);
-               cl_page_list_del(env, &aio->cda_pages, page);
+               cl_page_list_del(env, &sdio->csd_pages, page);
        }
 
-       if (!aio->cda_no_aio_complete)
-               aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes, 0);
-
-       if (aio->cda_ll_aio) {
-               ll_release_user_pages(aio->cda_dio_pages.ldp_pages,
-                                     aio->cda_dio_pages.ldp_count);
-               cl_sync_io_note(env, &aio->cda_ll_aio->cda_sync, ret);
-       }
+       ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
+                             sdio->csd_dio_pages.ldp_count);
+       cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
 
        EXIT;
 }
 
-struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
-                               struct cl_dio_aio *ll_aio)
+struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
+                                   bool is_aio)
 {
        struct cl_dio_aio *aio;
 
@@ -1241,47 +1249,63 @@ struct cl_dio_aio *cl_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
                 * Hold one ref so that it won't be released until
                 * every pages is added.
                 */
-               cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_aio_end);
-               cl_page_list_init(&aio->cda_pages);
+               cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
                aio->cda_iocb = iocb;
-               if (is_sync_kiocb(iocb) || ll_aio)
-                       aio->cda_no_aio_complete = 1;
-               else
-                       aio->cda_no_aio_complete = 0;
-               /* in the case of a lower level aio struct (ll_aio is set), or
-                * true AIO (!is_sync_kiocb()), the memory is freed by
-                * the daemons calling cl_sync_io_note, because they are the
-                * last users of the aio struct
+               aio->cda_no_aio_complete = !is_aio;
+               /* if this is true AIO, the memory is freed by the last call
+                * to cl_sync_io_note (when all the I/O is complete), because
+                * no one is waiting (in the kernel) for this to complete
                 *
                 * in other cases, the last user is cl_sync_io_wait, and in
-                * that case, the caller frees the aio struct after that call
-                * completes
+                * that case, the caller frees the struct after that call
                 */
-               if (ll_aio || !is_sync_kiocb(iocb))
-                       aio->cda_no_aio_free = 0;
-               else
-                       aio->cda_no_aio_free = 1;
+               aio->cda_no_sub_free = !is_aio;
 
                cl_object_get(obj);
                aio->cda_obj = obj;
-               aio->cda_ll_aio = ll_aio;
-
-               if (ll_aio)
-                       atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
        }
        return aio;
 }
-EXPORT_SYMBOL(cl_aio_alloc);
+EXPORT_SYMBOL(cl_dio_aio_alloc);
 
-void cl_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
+struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio, bool nofree)
 {
-       if (aio) {
+       struct cl_sub_dio *sdio;
+
+       OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
+       if (sdio != NULL) {
+               /*
+                * Hold one ref so that it won't be released until
+                * every pages is added.
+                */
+               cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
+                                      cl_sub_dio_end);
+               cl_page_list_init(&sdio->csd_pages);
+
+               sdio->csd_ll_aio = ll_aio;
+               atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
+               sdio->csd_no_free = nofree;
+       }
+       return sdio;
+}
+EXPORT_SYMBOL(cl_sub_dio_alloc);
+
+void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio,
+                    bool always_free)
+{
+       if (aio && (!aio->cda_no_sub_free || always_free)) {
                cl_object_put(env, aio->cda_obj);
                OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
        }
 }
-EXPORT_SYMBOL(cl_aio_free);
+EXPORT_SYMBOL(cl_dio_aio_free);
 
+void cl_sub_dio_free(struct cl_sub_dio *sdio, bool always_free)
+{
+       if (sdio && (!sdio->csd_no_free || always_free))
+               OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
+}
+EXPORT_SYMBOL(cl_sub_dio_free);
 /*
  * ll_release_user_pages - tear down page struct array
  * @pages: array of page struct pointers underlying target buffer
@@ -1327,7 +1351,7 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
        if (atomic_dec_and_lock(&anchor->csi_sync_nr,
                                &anchor->csi_waitq.lock)) {
-               struct cl_dio_aio *aio = NULL;
+               void *dio_aio = NULL;
 
                cl_sync_io_end_t *end_io = anchor->csi_end_io;
 
@@ -1343,30 +1367,29 @@ void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
                if (end_io)
                        end_io(env, anchor);
 
-               aio = anchor->csi_aio;
+               dio_aio = anchor->csi_dio_aio;
 
                spin_unlock(&anchor->csi_waitq.lock);
 
-               if (aio && !aio->cda_no_aio_free)
-                       cl_aio_free(env, aio);
+               if (dio_aio) {
+                       if (end_io == cl_dio_aio_end)
+                               cl_dio_aio_free(env,
+                                               (struct cl_dio_aio *) dio_aio,
+                                               false);
+                       else if (end_io == cl_sub_dio_end)
+                               cl_sub_dio_free((struct cl_sub_dio *) dio_aio,
+                                               false);
+               }
        }
        EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_note);
 
-
 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
                            long timeout, int ioret)
 {
-       bool no_aio_free = anchor->csi_aio->cda_no_aio_free;
        int rc = 0;
 
-       /* for true AIO, the daemons running cl_sync_io_note would normally
-        * free the aio struct, but if we're waiting on it, we need them to not
-        * do that.  This ensures the aio is not freed when we drop the
-        * reference count to zero in cl_sync_io_note below
-        */
-       anchor->csi_aio->cda_no_aio_free = 1;
        /*
         * @anchor was inited as 1 to prevent end_io to be
         * called before we add all pages for IO, so drop
@@ -1386,8 +1409,6 @@ int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
         */
        atomic_add(1, &anchor->csi_sync_nr);
 
-       anchor->csi_aio->cda_no_aio_free = no_aio_free;
-
        return rc;
 }
 EXPORT_SYMBOL(cl_sync_io_wait_recycle);