#include <lustre_obdo.h>
#include <lustre_osc.h>
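+/* struct pagevec and its helpers (pagevec_add(), pagevec_count(),
+ * pagevec_reinit()) are used below to batch page release in
+ * osc_io_commit_async() */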
+#include <linux/pagevec.h>
#include "osc_internal.h"
ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
}
- ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
- ra->cra_end = cl_index(osc2cl(osc),
- dlmlock->l_policy_data.l_extent.end);
+ ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
+ ra->cra_end_idx = cl_index(osc2cl(osc),
+ dlmlock->l_policy_data.l_extent.end);
ra->cra_release = osc_read_ahead_release;
ra->cra_cbdata = dlmlock;
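+ /* a lock extent that stops short of EOF was presumably trimmed by a
+ * conflicting lock, so flag contention to keep read-ahead inside it */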
+ if (ra->cra_end_idx != CL_PAGE_EOF)
+ ra->cra_contention = true;
result = 0;
}
struct osc_object *osc = NULL; /* to keep gcc happy */
struct osc_page *opg;
struct cl_io *io;
- struct list_head list = LIST_HEAD_INIT(list);
+ LIST_HEAD(list);
struct cl_page_list *qin = &queue->c2_qin;
struct cl_page_list *qout = &queue->c2_qout;
if (crt == CRT_READ && ios->cis_io->ci_ndelay)
brw_flags |= OBD_BRW_NDELAY;
+ page = cl_page_list_first(qin);
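+ /* the queue is assumed homogeneous: transient pages back unbuffered
+ * (direct) I/O, so mark the whole RPC as non-cached */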
+ if (page->cp_type == CPT_TRANSIENT)
+ brw_flags |= OBD_BRW_NOCACHE;
+
/*
* NOTE: here @page is a top-level page. This is done to avoid
* creating a sub-page list.
struct cl_page *page;
struct cl_page *last_page;
struct osc_page *opg;
+ struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
int result = 0;
ENTRY;
}
}
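+ /* ll_pagevec_init() is a Lustre compat wrapper for pagevec_init(),
+ * whose second ("cold") argument was dropped by newer kernels */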
+ ll_pagevec_init(pvec, 0);
+
while (qin->pl_nr > 0) {
struct osc_async_page *oap;
/* The page may already be in the dirty cache. */
if (list_empty(&oap->oap_pending_item)) {
- result = osc_page_cache_add(env, &opg->ops_cl, io);
+ result = osc_page_cache_add(env, opg, io, cb);
if (result != 0)
break;
}
cl_page_list_del(env, qin, page);
- (*cb)(env, io, page);
- /* Can't access page any more. Page can be in transfer and
- * complete at any time. */
+ /* pagevec_add() returns the free slots remaining; zero means the
+ * pagevec is now full, so run the callback and reinitialize it */
+ if (pagevec_add(pvec, page->cp_vmpage) == 0) {
+ (*cb)(env, io, pvec);
+ pagevec_reinit(pvec);
+ }
}
+ /* Flush any pages left in a partially filled pagevec */
+ if (pagevec_count(pvec) != 0)
+ (*cb)(env, io, pvec);
+
+ /* Can't access these pages any more: a page may be in transfer
+ * and complete at any time. */
+
/* For a sync write, the kernel will wait for this page to be flushed
 * before osc_io_end() is called, so release it earlier.
 * For mkwrite(), it is known that there are no further pages. */
}
EXPORT_SYMBOL(osc_io_commit_async);
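/*
 * Minimal sketch (not part of the patch) of the batching pattern used by
 * osc_io_commit_async() above: accumulate struct page pointers in a
 * pagevec and invoke the completion callback once per full batch rather
 * than once per page. "flush_cb" is a hypothetical stand-in for the
 * (*cb)() commit callback, and the modern one-argument pagevec_init()
 * is assumed (the patch itself goes through the ll_pagevec_init()
 * compat wrapper).
 */
#include <linux/pagevec.h>

static void flush_in_batches(struct page **pages, int npages,
			     void (*flush_cb)(struct pagevec *pvec))
{
	struct pagevec pvec;
	int i;

	pagevec_init(&pvec);
	for (i = 0; i < npages; i++) {
		/* pagevec_add() returns the free slots left after the
		 * add; zero means the pagevec is now full */
		if (pagevec_add(&pvec, pages[i]) == 0) {
			flush_cb(&pvec);
			pagevec_reinit(&pvec);
		}
	}
	/* flush whatever remains in a partially filled pagevec */
	if (pagevec_count(&pvec) != 0)
		flush_cb(&pvec);
}

+/* an import is unhealthy when it has been invalidated or deactivated,
+ * or is in any state other than FULL or IDLE */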
+static bool osc_import_not_healthy(struct obd_import *imp)
+{
+ return imp->imp_invalid || imp->imp_deactive ||
+ !(imp->imp_state == LUSTRE_IMP_FULL ||
+ imp->imp_state == LUSTRE_IMP_IDLE);
+}
+
int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
{
struct osc_object *osc = cl2osc(ios->cis_obj);
struct obd_import *imp = osc_cli(osc)->cl_import;
struct osc_io *oio = osc_env_io(env);
int rc = -EIO;
+ ENTRY;
spin_lock(&imp->imp_lock);
- if (likely(!imp->imp_invalid)) {
+ /*
+ * Check whether this OSC device is available for a non-delay read:
+ * if the import is unhealthy, fail fast so the read can be switched
+ * to another mirror, as long as not all mirrors have been tried.
+ */
+ if (ios->cis_io->ci_type == CIT_READ && ios->cis_io->ci_ndelay &&
+ !ios->cis_io->ci_tried_all_mirrors && osc_import_not_healthy(imp)) {
+ rc = -EWOULDBLOCK;
+ } else if (likely(!imp->imp_invalid)) {
atomic_inc(&osc->oo_nr_ios);
oio->oi_is_active = 1;
rc = 0;
if (cfs_capable(CFS_CAP_SYS_RESOURCE))
oio->oi_cap_sys_resource = 1;
- return rc;
+ RETURN(rc);
}
EXPORT_SYMBOL(osc_io_iter_init);
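/* A -EWOULDBLOCK result here is presumably non-fatal: the non-delay
 * read check above exists so the upper layer can retry the I/O against
 * another FLR mirror instead of blocking on an unhealthy import. */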
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_data_version_interpret;
- CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
- dva = ptlrpc_req_async_args(req);
+ dva = ptlrpc_req_async_args(dva, req);
dva->dva_oio = oio;
ptlrpcd_add_req(req);
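/*
 * Assumed shape of the two-argument ptlrpc_req_async_args() used above
 * (a reconstruction for illustration, not necessarily the verbatim
 * macro): the destination pointer now drives the cast, moving the type
 * and size policing away from the call site, which is why the explicit
 * CLASSERT() above was dropped.
 *
 *   #define ptlrpc_req_async_args(_var, req) \
 *           ((typeof(_var))&(req)->rq_async_args)
 */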