struct cl_sync_io;
struct cl_dio_aio;
struct cl_sub_dio;
+struct cl_dio_pages;
typedef void (cl_sync_io_end_t)(const struct lu_env *, struct cl_sync_io *);
void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
		     int ioret);
int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout, int ioret);
+void cl_dio_pages_2queue(struct cl_dio_pages *cdp);
struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
bool is_aio);
struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
* pages, but for unaligned i/o, this is the internal buffer
*/
struct page **cdp_pages;
+
+	/* the cl_pages matching the pages above, one per entry */
+	struct cl_page **cdp_cl_pages;
+	/* the 2queue used to submit these pages for i/o */
+	struct cl_2queue cdp_queue;
/** # of pages in the array. */
size_t cdp_count;
/* the file offset of the first page. */
*/
struct cl_sub_dio {
struct cl_sync_io csd_sync;
- struct cl_page_list csd_pages;
ssize_t csd_bytes;
struct cl_dio_aio *csd_ll_aio;
struct cl_dio_pages csd_dio_pages;
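For orientation, the page tracking introduced here works roughly as follows (a simplified sketch assembled from the code below; page lookup, clipping and error handling are omitted, and "page" stands for the cl_page found for each dio page):

	/* one cl_page pointer per dio page, freed in cl_sub_dio_end() */
	OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);

	for (i = 0; i < cdp->cdp_count; i++)
		cdp->cdp_cl_pages[i] = page;	/* keeps the cl_page_find() ref */

	cl_dio_pages_2queue(cdp);		/* array -> cdp_queue.c2_qin */
	rc = cl_io_submit_rw(env, io, iot, &cdp->cdp_queue);

	/* on completion cl_sub_dio_end() puts each page and frees the array */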
{
struct cl_dio_pages *cdp = &sdio->csd_dio_pages;
struct cl_sync_io *anchor = &sdio->csd_sync;
- struct cl_2queue *queue = &io->ci_queue;
struct cl_object *obj = io->ci_obj;
+ struct cl_2queue *queue = NULL;
struct cl_page *page;
int iot = rw == READ ? CRT_READ : CRT_WRITE;
loff_t offset = cdp->cdp_file_offset;
cdp->cdp_from = offset & ~PAGE_MASK;
cdp->cdp_to = (offset + size) & ~PAGE_MASK;
- cl_2queue_init(queue);
+	/* this is a temporary allocation which lets us track the cl_pages so
+	 * they can be converted to a queue just before submission
+	 *
+	 * it is part of 'pushing down' the conversion to a page queue
+	 */
+ OBD_ALLOC_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
+ if (!cdp->cdp_cl_pages)
+ GOTO(out, rc = -ENOMEM);
+
while (size > 0) {
size_t from = offset & ~PAGE_MASK;
size_t to = min(from + size, PAGE_SIZE);
*/
page->cp_inode = inode;
}
-		/* We keep the refcount from cl_page_find, so we don't need
-		 * another one here
-		 */
-		cl_page_list_add(&queue->c2_qin, page, false);
+		/* the array keeps the reference from cl_page_find, which is
+		 * released in cl_sub_dio_end
+		 */
+		cdp->cdp_cl_pages[i] = page;
/*
* Call page clip for incomplete pages, to set range of bytes
* in the page and to tell transfer formation engine to send
LASSERT(i == cdp->cdp_count);
LASSERT(size == 0);
+ cl_dio_pages_2queue(cdp);
+ queue = &cdp->cdp_queue;
atomic_add(io_pages, &anchor->csi_sync_nr);
/*
* Avoid out-of-order execution of adding inflight
*/
smp_mb();
rc = cl_io_submit_rw(env, io, iot, queue);
+ /* pages must be off the queue when they're freed */
if (rc == 0) {
- cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
+ while (queue->c2_qout.pl_nr > 0) {
+ page = cl_page_list_first(&queue->c2_qout);
+ cl_page_list_del(env, &queue->c2_qout, page,
+ false);
+ }
} else {
atomic_add(-queue->c2_qin.pl_nr,
&anchor->csi_sync_nr);
- cl_page_list_for_each(page, &queue->c2_qin)
+ for (i = 0; i < cdp->cdp_count; i++) {
+ page = cdp->cdp_cl_pages[i];
page->cp_sync_io = NULL;
+ }
}
/* handle partially submitted reqs */
if (queue->c2_qin.pl_nr > 0) {
}
out:
- /* if pages were not submitted successfully above, this takes care of
- * taking them off the list and removing the single reference they have
- * from when they were created
+ /* cleanup of the page array is handled by cl_sub_dio_end, so there's
+ * no work to do on error here
*/
- cl_2queue_fini(env, queue);
RETURN(rc);
}
#endif
}
+/* move the cl_pages tracked in @cdp on to its submission queue */
+void cl_dio_pages_2queue(struct cl_dio_pages *cdp)
+{
+	int i;
+
+ cl_2queue_init(&cdp->cdp_queue);
+
+ for (i = 0; i < cdp->cdp_count; i++) {
+ struct cl_page *page = cdp->cdp_cl_pages[i];
+
+ cl_page_list_add(&cdp->cdp_queue.c2_qin, page, false);
+ }
+}
+EXPORT_SYMBOL(cl_dio_pages_2queue);
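Note that the pages are added to the queue with getref = false, so the queue does not take references of its own; the only reference is the one stored in cdp_cl_pages[] (taken at cl_page_find() time) and dropped in cl_sub_dio_end(). A hedged sketch of the resulting ownership, assuming a fully populated cdp:

	cl_dio_pages_2queue(cdp);

	/* every tracked page is now queued for submission ... */
	LASSERT(cdp->cdp_queue.c2_qin.pl_nr == cdp->cdp_count);
	/* ... but each page's reference count is unchanged: cdp_cl_pages[]
	 * still owns the single cl_page_find() reference, which
	 * cl_sub_dio_end() drops once the pages are off the queue
	 */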
+
static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
{
struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
{
struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
+ struct cl_dio_pages *cdp = &sdio->csd_dio_pages;
ssize_t ret = anchor->csi_sync_rc;
+ bool array_incomplete = false;
+ int i;
ENTRY;
- /* release pages */
- while (sdio->csd_pages.pl_nr > 0) {
- struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
-
- cl_page_list_del(env, &sdio->csd_pages, page, false);
- cl_page_put(env, page);
+ if (cdp->cdp_cl_pages) {
+ for (i = 0; i < cdp->cdp_count; i++) {
+ struct cl_page *page = cdp->cdp_cl_pages[i];
+			/* if we failed allocating pages, the page array may be
+			 * incomplete, so check the pointers
+			 *
+			 * FIXME: this tracking of array completeness is just a
+			 * debug check and will be removed later in the series
+			 */
+			if (page) {
+				/* a valid page must never follow a missing one */
+				LASSERT(!array_incomplete);
+				cl_page_put(env, page);
+			} else {
+				array_incomplete = true;
+			}
+ }
+		OBD_FREE_PTR_ARRAY_LARGE(cdp->cdp_cl_pages, cdp->cdp_count);
}
if (sdio->csd_unaligned) {
*/
if (!sdio->csd_write && sdio->csd_bytes > 0)
ret = ll_dio_user_copy(sdio);
- ll_free_dio_buffer(&sdio->csd_dio_pages);
+ ll_free_dio_buffer(cdp);
/* handle the freeing here rather than in cl_sub_dio_free
* because we have the unmodified iovec pointer
*/
/* unaligned DIO does not get user pages, so it doesn't have to
* release them, but aligned I/O must
*/
- ll_release_user_pages(sdio->csd_dio_pages.cdp_pages,
- sdio->csd_dio_pages.cdp_count);
+ ll_release_user_pages(cdp->cdp_pages, cdp->cdp_count);
}
cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
*/
cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
cl_sub_dio_end);
- cl_page_list_init(&sdio->csd_pages);
sdio->csd_ll_aio = ll_aio;
sdio->csd_creator_free = sync;