#include <lustre_obdo.h>
#include <lustre_osc.h>
+#include <linux/pagevec.h>
+#include <linux/falloc.h>
#include "osc_internal.h"
ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
}
- ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
- ra->cra_end = cl_index(osc2cl(osc),
- dlmlock->l_policy_data.l_extent.end);
+ ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
+ ra->cra_end_idx = cl_index(osc2cl(osc),
+ dlmlock->l_policy_data.l_extent.end);
ra->cra_release = osc_read_ahead_release;
ra->cra_cbdata = dlmlock;
+ if (ra->cra_end_idx != CL_PAGE_EOF)
+ ra->cra_contention = true;
result = 0;
}
struct osc_object *osc = NULL; /* to keep gcc happy */
struct osc_page *opg;
struct cl_io *io;
- struct list_head list = LIST_HEAD_INIT(list);
+ LIST_HEAD(list);
struct cl_page_list *qin = &queue->c2_qin;
struct cl_page_list *qout = &queue->c2_qout;
if (crt == CRT_READ && ios->cis_io->ci_ndelay)
brw_flags |= OBD_BRW_NDELAY;
+ page = cl_page_list_first(qin);
+ if (page->cp_type == CPT_TRANSIENT)
+ brw_flags |= OBD_BRW_NOCACHE;
+
/*
* NOTE: here @page is a top-level page. This is done to avoid
* creation of sub-page-list.
struct cl_page *page;
struct cl_page *last_page;
struct osc_page *opg;
+ struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
int result = 0;
ENTRY;
}
}
+ ll_pagevec_init(pvec, 0);
+
while (qin->pl_nr > 0) {
struct osc_async_page *oap;
/* The page may be already in dirty cache. */
if (list_empty(&oap->oap_pending_item)) {
- result = osc_page_cache_add(env, &opg->ops_cl, io);
+ result = osc_page_cache_add(env, opg, io, cb);
if (result != 0)
break;
}
cl_page_list_del(env, qin, page);
- (*cb)(env, io, page);
- /* Can't access page any more. Page can be in transfer and
- * complete at any time. */
+ /* if there are no more slots, do the callback & reinit */
+ if (pagevec_add(pvec, page->cp_vmpage) == 0) {
+ (*cb)(env, io, pvec);
+ pagevec_reinit(pvec);
+ }
}
+ /* Clean up any partially full pagevecs */
+ if (pagevec_count(pvec) != 0)
+ (*cb)(env, io, pvec);
+
+ /* Can't access these pages any more. Page can be in transfer and
+ * complete at any time. */
+
/* for sync write, kernel will wait for this page to be flushed before
* osc_io_end() is called, so release it earlier.
* for mkwrite(), it's known there is no further pages. */
}
EXPORT_SYMBOL(osc_io_commit_async);
+/*
+ * Return true when import @imp cannot currently service I/O: it is
+ * invalid, administratively deactivated, or its connection state is
+ * neither FULL nor IDLE.  Used by osc_io_iter_init() to fail non-delay
+ * reads fast (-EWOULDBLOCK) so the caller can switch to another mirror.
+ */
+static bool osc_import_not_healthy(struct obd_import *imp)
+{
+ return imp->imp_invalid || imp->imp_deactive ||
+ !(imp->imp_state == LUSTRE_IMP_FULL ||
+ imp->imp_state == LUSTRE_IMP_IDLE);
+}
+
int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
{
struct osc_object *osc = cl2osc(ios->cis_obj);
struct obd_import *imp = osc_cli(osc)->cl_import;
+ struct osc_io *oio = osc_env_io(env);
int rc = -EIO;
+ ENTRY;
spin_lock(&imp->imp_lock);
- if (likely(!imp->imp_invalid)) {
- struct osc_io *oio = osc_env_io(env);
-
+ /**
+ * check whether this OSC device is available for non-delay read,
+ * fast switching mirror if we haven't tried all mirrors.
+ */
+ if (ios->cis_io->ci_type == CIT_READ && ios->cis_io->ci_ndelay &&
+ !ios->cis_io->ci_tried_all_mirrors && osc_import_not_healthy(imp)) {
+ rc = -EWOULDBLOCK;
+ } else if (likely(!imp->imp_invalid)) {
atomic_inc(&osc->oo_nr_ios);
oio->oi_is_active = 1;
rc = 0;
}
spin_unlock(&imp->imp_lock);
- return rc;
+ if (cfs_capable(CFS_CAP_SYS_RESOURCE))
+ oio->oi_cap_sys_resource = 1;
+
+ RETURN(rc);
}
EXPORT_SYMBOL(osc_io_iter_init);
-int osc_io_write_iter_init(const struct lu_env *env,
- const struct cl_io_slice *ios)
+int osc_io_rw_iter_init(const struct lu_env *env,
+ const struct cl_io_slice *ios)
{
struct cl_io *io = ios->cis_io;
struct osc_io *oio = osc_env_io(env);
RETURN(osc_io_iter_init(env, ios));
}
-EXPORT_SYMBOL(osc_io_write_iter_init);
+EXPORT_SYMBOL(osc_io_rw_iter_init);
void osc_io_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios)
}
EXPORT_SYMBOL(osc_io_iter_fini);
-void osc_io_write_iter_fini(const struct lu_env *env,
- const struct cl_io_slice *ios)
+void osc_io_rw_iter_fini(const struct lu_env *env,
+ const struct cl_io_slice *ios)
{
struct osc_io *oio = osc_env_io(env);
struct osc_object *osc = cl2osc(ios->cis_obj);
osc_io_iter_fini(env, ios);
}
-EXPORT_SYMBOL(osc_io_write_iter_fini);
+EXPORT_SYMBOL(osc_io_rw_iter_fini);
int osc_io_fault_start(const struct lu_env *env, const struct cl_io_slice *ios)
{
/**
* Checks that there are no pages being written in the extent being truncated.
*/
-static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
+static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops , void *cbdata)
{
struct cl_page *page = ops->ops_cl.cpl_page;
CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
- return CLP_GANG_OKAY;
+ return true;
}
static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
static int osc_io_setattr_start(const struct lu_env *env,
const struct cl_io_slice *slice)
{
- struct cl_io *io = slice->cis_io;
- struct osc_io *oio = cl2osc_io(env, slice);
- struct cl_object *obj = slice->cis_obj;
- struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
- struct cl_attr *attr = &osc_env_info(env)->oti_attr;
- struct obdo *oa = &oio->oi_oa;
+ struct cl_io *io = slice->cis_io;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct cl_object *obj = slice->cis_obj;
+ struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ struct obdo *oa = &oio->oi_oa;
struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
- __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
int result = 0;
+ __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+ __u64 end = OBD_OBJECT_EOF;
+ bool io_is_falloc = false;
ENTRY;
/* truncate cache dirty pages first */
- if (cl_io_is_trunc(io))
+ if (cl_io_is_trunc(io)) {
result = osc_cache_truncate_start(env, cl2osc(obj), size,
&oio->oi_trunc);
+ } else if (cl_io_is_fallocate(io)) {
+ io_is_falloc = true;
+ size = io->u.ci_setattr.sa_falloc_offset;
+ end = io->u.ci_setattr.sa_falloc_end;
+ }
if (result == 0 && oio->oi_lockless == 0) {
cl_object_attr_lock(obj);
oa->o_mtime = attr->cat_mtime;
}
if (ia_avalid & ATTR_SIZE) {
- oa->o_size = size;
- oa->o_blocks = OBD_OBJECT_EOF;
- oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ if (io_is_falloc) {
+ oa->o_size = size;
+ oa->o_blocks = end;
+ oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ } else {
+ oa->o_size = size;
+ oa->o_blocks = OBD_OBJECT_EOF;
+ oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ }
if (oio->oi_lockless) {
oa->o_flags = OBD_FL_SRVLOCK;
init_completion(&cbargs->opc_sync);
- if (ia_avalid & ATTR_SIZE)
+ if (io_is_falloc) {
+ int falloc_mode = io->u.ci_setattr.sa_falloc_mode;
+
+ result = osc_fallocate_base(osc_export(cl2osc(obj)),
+ oa, osc_async_upcall,
+ cbargs, falloc_mode);
+ } else if (ia_avalid & ATTR_SIZE) {
result = osc_punch_send(osc_export(cl2osc(obj)),
oa, osc_async_upcall, cbargs);
- else
+ } else {
result = osc_setattr_async(osc_export(cl2osc(obj)),
oa, osc_async_upcall,
cbargs, PTLRPCD_SET);
-
+ }
cbargs->opc_rpc_sent = result == 0;
}
struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
- LASSERT(cl_io_is_trunc(io));
+ LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
/* XXX: Need a lock. */
osd->od_stats.os_lockless_truncates++;
}
if (cl_io_is_trunc(io)) {
__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+
+ if (result == 0) {
+ cl_object_attr_lock(obj);
+ if (oa->o_valid & OBD_MD_FLBLOCKS) {
+ attr->cat_blocks = oa->o_blocks;
+ cl_valid |= CAT_BLOCKS;
+ }
+
+ cl_object_attr_update(env, obj, attr, cl_valid);
+ cl_object_attr_unlock(obj);
+ }
+ osc_trunc_check(env, io, oio, size);
+ osc_cache_truncate_end(env, oio->oi_trunc);
+ oio->oi_trunc = NULL;
+ }
+
+ if (cl_io_is_fallocate(io)) {
cl_object_attr_lock(obj);
+
+ /* update blocks */
if (oa->o_valid & OBD_MD_FLBLOCKS) {
attr->cat_blocks = oa->o_blocks;
cl_valid |= CAT_BLOCKS;
}
+ /* update size */
+ if (oa->o_valid & OBD_MD_FLSIZE) {
+ attr->cat_size = oa->o_size;
+ cl_valid |= CAT_SIZE;
+ }
+
cl_object_attr_update(env, obj, attr, cl_valid);
cl_object_attr_unlock(obj);
- osc_trunc_check(env, io, oio, size);
- osc_cache_truncate_end(env, oio->oi_trunc);
- oio->oi_trunc = NULL;
}
}
EXPORT_SYMBOL(osc_io_setattr_end);
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_data_version_interpret;
- CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
- dva = ptlrpc_req_async_args(req);
+ dva = ptlrpc_req_async_args(dva, req);
dva->dva_oio = oio;
ptlrpcd_add_req(req);
static const struct cl_io_operations osc_io_ops = {
.op = {
[CIT_READ] = {
- .cio_iter_init = osc_io_iter_init,
- .cio_iter_fini = osc_io_iter_fini,
+ .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_read_start,
.cio_fini = osc_io_fini
},
[CIT_WRITE] = {
- .cio_iter_init = osc_io_write_iter_init,
- .cio_iter_fini = osc_io_write_iter_fini,
+ .cio_iter_init = osc_io_rw_iter_init,
+ .cio_iter_fini = osc_io_rw_iter_fini,
.cio_start = osc_io_write_start,
.cio_end = osc_io_end,
.cio_fini = osc_io_fini