}
EXPORT_SYMBOL(osc_io_submit);
+static int __osc_dio_submit(const struct lu_env *env, struct cl_io *io,
+ const struct cl_io_slice *ios, enum cl_req_type crt,
+ struct cl_dio_pages *cdp, struct cl_2queue *queue)
+{
+ struct cl_page_list *qout = &queue->c2_qout;
+ struct cl_page_list *qin = &queue->c2_qin;
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct cl_io *top_io = cl_io_top(io);
+ struct client_obd *cli = osc_cli(osc);
+ struct page *vmpage;
+ struct cl_page *page;
+ struct cl_page *tmp;
+ LIST_HEAD(list);
+ /* pages per chunk bits */
+ unsigned int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
+ unsigned int max_pages = cli->cl_max_pages_per_rpc;
+ unsigned int ppc = 1 << ppc_bits;
+ unsigned int queued = 0;
+ bool sync_queue = false;
+ int result = 0;
+ int brw_flags;
+
+ LASSERT(qin->pl_nr > 0);
+
+ CDEBUG(D_CACHE|D_READA, "%d %d\n", qin->pl_nr, crt);
+
+ brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
+ brw_flags |= crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
+ if (crt == CRT_READ && ios->cis_io->ci_ndelay)
+ brw_flags |= OBD_BRW_NDELAY;
+
+ vmpage = cdp->cdp_pages[0];
+ brw_flags |= OBD_BRW_NOCACHE;
+ if (lnet_is_rdma_only_page(vmpage))
+ brw_flags |= OBD_BRW_RDMA_ONLY;
+
+ /*
+ * NOTE: here @page is a top-level page. This is done to avoid
+ * creation of sub-page-list.
+ */
+ cl_page_list_for_each_safe(page, tmp, qin) {
+ struct osc_async_page *oap;
+ struct osc_page *opg;
+
+ LASSERT(top_io != NULL);
+
+ opg = osc_cl_page_osc(page, osc);
+ oap = &opg->ops_oap;
+
+ osc_page_submit(env, opg, crt, brw_flags);
+ list_add_tail(&oap->oap_pending_item, &list);
+
+ cl_page_list_move(qout, qin, page);
+
+ queued++;
+ if (queued == max_pages) {
+ sync_queue = true;
+ } else if (crt == CRT_WRITE) {
+ unsigned int next_chunks;
+ unsigned int chunks;
+
+ chunks = (queued + ppc - 1) >> ppc_bits;
+ /* chunk number if add another page */
+ next_chunks = (queued + ppc) >> ppc_bits;
+
+ /* next page will excceed write chunk limit */
+ if (chunks == osc_max_write_chunks(cli) &&
+ next_chunks > chunks)
+ sync_queue = true;
+ }
+
+ if (sync_queue) {
+ result = osc_queue_sync_pages(env, top_io, osc, &list,
+ brw_flags);
+ if (result < 0)
+ break;
+ queued = 0;
+ sync_queue = false;
+ }
+ }
+
+ if (queued > 0)
+ result = osc_queue_sync_pages(env, top_io, osc, &list,
+ brw_flags);
+
+ /* Update c/mtime for sync write. LU-7310 */
+ if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) {
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ struct cl_object *obj = ios->cis_obj;
+
+ cl_object_attr_lock(obj);
+ attr->cat_mtime = attr->cat_ctime = ktime_get_real_seconds();
+ cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
+ cl_object_attr_unlock(obj);
+ }
+
+ CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
+ return qout->pl_nr > 0 ? 0 : result;
+}
+
int osc_dio_submit(const struct lu_env *env, struct cl_io *io,
const struct cl_io_slice *ios, enum cl_req_type crt,
struct cl_dio_pages *cdp)
cl_dio_pages_2queue(cdp);
queue = &cdp->cdp_queue;
- rc = osc_io_submit(env, io, ios, crt, queue);
+ rc = __osc_dio_submit(env, io, ios, crt, cdp, queue);
/* if submit failed, no pages were sent */
LASSERT(ergo(rc != 0, list_empty(&queue->c2_qout.pl_pages)));