}
+struct cl_dio_pages;
+
/**
* Per-layer io operations.
* \see vvp_io_ops, lov_io_ops, lovsub_io_ops, osc_io_ops
struct cl_io *io,
const struct cl_io_slice *slice,
enum cl_req_type crt, struct cl_2queue *queue);
+ /* the dio version of cio_submit, this either submits all pages
+ * successfully or fails. Uses an array, rather than a queue.
+ */
+ int (*cio_dio_submit)(const struct lu_env *env,
+ struct cl_io *io,
+ const struct cl_io_slice *slice,
+ enum cl_req_type crt,
+ struct cl_dio_pages *cdp);
/**
* Queue async page for write.
* The difference between cio_submit and cio_queue is that
RETURN(0);
}
+static int lov_dio_submit(const struct lu_env *env,
+ struct cl_io *io,
+ const struct cl_io_slice *ios,
+ enum cl_req_type crt, struct cl_dio_pages *cdp)
+{
+ struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct cl_2queue *queue;
+ struct cl_page *page;
+ struct cl_page_list *qin;
+ struct lov_io_sub *sub;
+ struct cl_page *tmp;
+ bool dio = false;
+ int index;
+ int rc = 0;
+ ENTRY;
+
+ cl_dio_pages_2queue(cdp);
+ queue = &cdp->cdp_queue;
+
+ qin = &queue->c2_qin;
+ page = cl_page_list_first(qin);
+
+ if (page->cp_type == CPT_TRANSIENT)
+ dio = true;
+
+ cl_page_list_init(plist);
+ while (qin->pl_nr > 0) {
+ struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
+
+ page = cl_page_list_first(qin);
+ if (lov_page_is_empty(page)) {
+ cl_page_list_move(&queue->c2_qout, qin, page);
+
+ /*
+ * it could only be mirror read to get here therefore
+ * the pages will be transient. We don't care about
+ * the return code of cl_page_prep() at all.
+ */
+ LASSERT(page->cp_type == CPT_TRANSIENT);
+ cl_page_completion(env, page, crt, 0);
+ continue;
+ }
+
+ cl_2queue_init(cl2q);
+ cl_page_list_move(&cl2q->c2_qin, qin, page);
+
+ index = page->cp_lov_index;
+ /* DIO is already split by stripe */
+ if (!dio) {
+ cl_page_list_for_each_safe(page, tmp, qin) {
+ /* this page is not on this stripe */
+ if (index != page->cp_lov_index)
+ continue;
+
+ cl_page_list_move(&cl2q->c2_qin, qin, page);
+ }
+ } else {
+ cl_page_list_splice(qin, &cl2q->c2_qin);
+ }
+
+ sub = lov_sub_get(env, lio, index);
+ if (!IS_ERR(sub)) {
+ rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
+ crt, cl2q);
+ } else {
+ rc = PTR_ERR(sub);
+ }
+
+ cl_page_list_splice(&cl2q->c2_qin, plist);
+ cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
+ cl_2queue_fini(env, cl2q);
+
+ if (rc != 0)
+ break;
+ }
+
+ cl_page_list_splice(plist, qin);
+ cl_page_list_fini(env, plist);
+
+ /* if submit failed, no pages were sent */
+ LASSERT(ergo(rc != 0, list_empty(&queue->c2_qout.pl_pages)));
+ while (queue->c2_qout.pl_nr > 0) {
+ struct cl_page *page;
+
+ page = cl_page_list_first(&queue->c2_qout);
+ cl_page_list_del(env, &queue->c2_qout, page, false);
+ }
+
+ RETURN(rc);
+}
+
/**
* lov implementation of cl_operations::cio_submit() method. It takes a list
* of pages in \a queue, splits it into per-stripe sub-lists, invokes
.cio_read_ahead = lov_io_read_ahead,
.cio_lru_reserve = lov_io_lru_reserve,
.cio_submit = lov_io_submit,
+ .cio_dio_submit = lov_dio_submit,
.cio_commit_async = lov_io_commit_async,
};
EXIT;
}
+static int lov_empty_dio_submit(const struct lu_env *env,
+ struct cl_io *io,
+ const struct cl_io_slice *ios,
+ enum cl_req_type crt, struct cl_dio_pages *cdp)
+{
+ return -EBADF;
+}
+
static int lov_empty_io_submit(const struct lu_env *env,
struct cl_io *io,
const struct cl_io_slice *ios,
}
},
.cio_submit = lov_empty_io_submit,
+ .cio_dio_submit = lov_empty_dio_submit,
.cio_commit_async = LOV_EMPTY_IMPOSSIBLE
};
enum cl_req_type crt, struct cl_dio_pages *cdp)
{
const struct cl_io_slice *scan;
- struct cl_2queue *queue;
int result = 0;
ENTRY;
- cl_dio_pages_2queue(cdp);
- queue = &cdp->cdp_queue;
-
list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
- if (scan->cis_iop->cio_submit == NULL)
+ if (scan->cis_iop->cio_dio_submit == NULL)
continue;
- result = scan->cis_iop->cio_submit(env, io, scan, crt, queue);
+ result = scan->cis_iop->cio_dio_submit(env, io, scan, crt,
+ cdp);
if (result != 0)
break;
}
- /*
- * If ->cio_submit() failed, no pages were sent.
- */
- LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
- while (queue->c2_qout.pl_nr > 0) {
- struct cl_page *page;
-
- page = cl_page_list_first(&queue->c2_qout);
- cl_page_list_del(env, &queue->c2_qout, page, false);
- }
RETURN(result);
}
EXPORT_SYMBOL(cl_dio_submit_rw);