#include <lustre_obdo.h>
#include <lustre_osc.h>
#include <linux/pagevec.h>
+#include <linux/falloc.h>
#include "osc_internal.h"
ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
}
- ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
- ra->cra_end = cl_index(osc2cl(osc),
- dlmlock->l_policy_data.l_extent.end);
+ ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
+ ra->cra_end_idx = cl_index(osc2cl(osc),
+ dlmlock->l_policy_data.l_extent.end);
ra->cra_release = osc_read_ahead_release;
ra->cra_cbdata = dlmlock;
- if (ra->cra_end != CL_PAGE_EOF)
+ if (ra->cra_end_idx != CL_PAGE_EOF)
ra->cra_contention = true;
result = 0;
}
struct osc_object *osc = NULL; /* to keep gcc happy */
struct osc_page *opg;
struct cl_io *io;
- struct list_head list = LIST_HEAD_INIT(list);
+ LIST_HEAD(list);
struct cl_page_list *qin = &queue->c2_qin;
struct cl_page_list *qout = &queue->c2_qout;
if (crt == CRT_READ && ios->cis_io->ci_ndelay)
brw_flags |= OBD_BRW_NDELAY;
+ page = cl_page_list_first(qin);
+ if (page->cp_type == CPT_TRANSIENT)
+ brw_flags |= OBD_BRW_NOCACHE;
+
/*
* NOTE: here @page is a top-level page. This is done to avoid
* creation of sub-page-list.
}
EXPORT_SYMBOL(osc_io_commit_async);
+/*
+ * Return true when @imp cannot currently service I/O: the import has been
+ * invalidated or administratively deactivated, or it sits in a transient
+ * (recovery/disconnect) state — anything other than FULL or IDLE.
+ */
+static bool osc_import_not_healthy(struct obd_import *imp)
+{
+ return imp->imp_invalid || imp->imp_deactive ||
+ !(imp->imp_state == LUSTRE_IMP_FULL ||
+ imp->imp_state == LUSTRE_IMP_IDLE);
+}
+
+/*
+ * Fail fast with -EWOULDBLOCK for non-delay (ci_ndelay) reads when the
+ * import is unhealthy and other mirrors remain untried, so the caller can
+ * switch mirrors instead of blocking on recovery.
+ */
int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
{
struct osc_object *osc = cl2osc(ios->cis_obj);
struct obd_import *imp = osc_cli(osc)->cl_import;
struct osc_io *oio = osc_env_io(env);
int rc = -EIO;
+ ENTRY;
spin_lock(&imp->imp_lock);
- if (likely(!imp->imp_invalid)) {
+ /**
+ * check whether this OSC device is available for non-delay read,
+ * fast switching mirror if we haven't tried all mirrors.
+ */
+ if (ios->cis_io->ci_type == CIT_READ && ios->cis_io->ci_ndelay &&
+ !ios->cis_io->ci_tried_all_mirrors && osc_import_not_healthy(imp)) {
+ rc = -EWOULDBLOCK;
+ } else if (likely(!imp->imp_invalid)) {
+ /* import usable: account this active I/O on the object */
atomic_inc(&osc->oo_nr_ios);
oio->oi_is_active = 1;
rc = 0;
if (cfs_capable(CFS_CAP_SYS_RESOURCE))
oio->oi_cap_sys_resource = 1;
- return rc;
+ RETURN(rc);
}
EXPORT_SYMBOL(osc_io_iter_init);
/**
* Checks that there are no pages being written in the extent being truncated.
*/
-static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
+static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops , void *cbdata)
{
struct cl_page *page = ops->ops_cl.cpl_page;
CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
- return CLP_GANG_OKAY;
+ /* NOTE(review): callback now returns bool instead of CLP_GANG_* codes;
+ * true presumably means "continue the page gang walk" -- confirm against
+ * the updated osc_page_gang_lookup() callback convention.
+ */
+ return true;
}
static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
+/*
+ * Extended for fallocate: when the io is a fallocate, @size/@end carry the
+ * byte range (sa_falloc_offset .. sa_falloc_end) in o_size/o_blocks and the
+ * request is sent via osc_fallocate_base() instead of punch/setattr.
+ */
static int osc_io_setattr_start(const struct lu_env *env,
const struct cl_io_slice *slice)
{
- struct cl_io *io = slice->cis_io;
- struct osc_io *oio = cl2osc_io(env, slice);
- struct cl_object *obj = slice->cis_obj;
- struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
- struct cl_attr *attr = &osc_env_info(env)->oti_attr;
- struct obdo *oa = &oio->oi_oa;
+ struct cl_io *io = slice->cis_io;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct cl_object *obj = slice->cis_obj;
+ struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ struct obdo *oa = &oio->oi_oa;
struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
- __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
int result = 0;
+ __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+ __u64 end = OBD_OBJECT_EOF;
+ bool io_is_falloc = false;
ENTRY;
/* truncate cache dirty pages first */
- if (cl_io_is_trunc(io))
+ if (cl_io_is_trunc(io)) {
result = osc_cache_truncate_start(env, cl2osc(obj), size,
&oio->oi_trunc);
+ } else if (cl_io_is_fallocate(io)) {
+ /* reuse size/end to describe the fallocate byte range */
+ io_is_falloc = true;
+ size = io->u.ci_setattr.sa_falloc_offset;
+ end = io->u.ci_setattr.sa_falloc_end;
+ }
if (result == 0 && oio->oi_lockless == 0) {
cl_object_attr_lock(obj);
oa->o_mtime = attr->cat_mtime;
}
if (ia_avalid & ATTR_SIZE) {
- oa->o_size = size;
- oa->o_blocks = OBD_OBJECT_EOF;
- oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ if (io_is_falloc) {
+ /* fallocate encodes [start, end) in o_size/o_blocks */
+ oa->o_size = size;
+ oa->o_blocks = end;
+ oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ } else {
+ oa->o_size = size;
+ oa->o_blocks = OBD_OBJECT_EOF;
+ oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ }
if (oio->oi_lockless) {
oa->o_flags = OBD_FL_SRVLOCK;
init_completion(&cbargs->opc_sync);
- if (ia_avalid & ATTR_SIZE)
+ if (io_is_falloc) {
+ int falloc_mode = io->u.ci_setattr.sa_falloc_mode;
+
+ result = osc_fallocate_base(osc_export(cl2osc(obj)),
+ oa, osc_async_upcall,
+ cbargs, falloc_mode);
+ } else if (ia_avalid & ATTR_SIZE) {
result = osc_punch_send(osc_export(cl2osc(obj)),
oa, osc_async_upcall, cbargs);
- else
+ } else {
result = osc_setattr_async(osc_export(cl2osc(obj)),
oa, osc_async_upcall,
cbargs, PTLRPCD_SET);
-
+ }
cbargs->opc_rpc_sent = result == 0;
}
struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+ /* lockless setattr reaching here is either truncate or fallocate */
- LASSERT(cl_io_is_trunc(io));
+ LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
/* XXX: Need a lock. */
osd->od_stats.os_lockless_truncates++;
}
if (cl_io_is_trunc(io)) {
__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
+
+ /* on successful truncate publish the server-returned block count */
+ if (result == 0) {
+ cl_object_attr_lock(obj);
+ if (oa->o_valid & OBD_MD_FLBLOCKS) {
+ attr->cat_blocks = oa->o_blocks;
+ cl_valid |= CAT_BLOCKS;
+ }
+
+ cl_object_attr_update(env, obj, attr, cl_valid);
+ cl_object_attr_unlock(obj);
+ }
+ osc_trunc_check(env, io, oio, size);
+ osc_cache_truncate_end(env, oio->oi_trunc);
+ oio->oi_trunc = NULL;
+ }
+
+ /* fallocate: refresh cached size/blocks from the reply obdo */
+ if (cl_io_is_fallocate(io)) {
cl_object_attr_lock(obj);
+
+ /* update blocks */
if (oa->o_valid & OBD_MD_FLBLOCKS) {
attr->cat_blocks = oa->o_blocks;
cl_valid |= CAT_BLOCKS;
}
+ /* update size */
+ if (oa->o_valid & OBD_MD_FLSIZE) {
+ attr->cat_size = oa->o_size;
+ cl_valid |= CAT_SIZE;
+ }
+
cl_object_attr_update(env, obj, attr, cl_valid);
cl_object_attr_unlock(obj);
- osc_trunc_check(env, io, oio, size);
- osc_cache_truncate_end(env, oio->oi_trunc);
- oio->oi_trunc = NULL;
}
}
EXPORT_SYMBOL(osc_io_setattr_end);
}
EXPORT_SYMBOL(osc_io_end);
+/* Async args attached to an OST_SEEK request: carries the issuing osc_io
+ * so the interpret callback can deliver the result and wake the waiter. */
+struct osc_lseek_args {
+ struct osc_io *lsa_oio;
+};
+
+/*
+ * Reply handler for OST_SEEK: on success store the offset found by the
+ * server (reply oa.o_size) into the lseek io slice; in all cases signal
+ * completion through osc_async_upcall() so osc_io_lseek_end() can proceed.
+ */
+static int osc_lseek_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct ost_body *reply;
+ struct osc_lseek_args *lsa = arg;
+ struct osc_io *oio = lsa->lsa_oio;
+ struct cl_io *io = oio->oi_cl.cis_io;
+ struct cl_lseek_io *lsio = &io->u.ci_lseek;
+
+ ENTRY;
+
+ if (rc != 0)
+ GOTO(out, rc);
+
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ /* server returns the resolved offset in oa.o_size */
+ lsio->ls_result = reply->oa.o_size;
+out:
+ osc_async_upcall(&oio->oi_cbarg, rc);
+ RETURN(rc);
+}
+
+/*
+ * Start SEEK_HOLE/SEEK_DATA processing on one stripe object.
+ * Answers locally when possible (start beyond the LVB-known object size,
+ * or the server lacks OST_SEEK support); otherwise packs an OST_SEEK RPC
+ * and sends it asynchronously via ptlrpcd — osc_lseek_interpret() then
+ * fills in ls_result and osc_io_lseek_end() waits for it.
+ * Returns 0 on success (including local answers), negative errno on error.
+ */
+int osc_io_lseek_start(const struct lu_env *env,
+ const struct cl_io_slice *slice)
+{
+ struct cl_io *io = slice->cis_io;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct cl_object *obj = slice->cis_obj;
+ struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
+ struct cl_lseek_io *lsio = &io->u.ci_lseek;
+ struct obdo *oa = &oio->oi_oa;
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ struct obd_export *exp = osc_export(cl2osc(obj));
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ struct osc_lseek_args *lsa;
+ int rc = 0;
+
+ ENTRY;
+
+ /* No negative values at this point */
+ LASSERT(lsio->ls_start >= 0);
+ LASSERT(lsio->ls_whence == SEEK_HOLE || lsio->ls_whence == SEEK_DATA);
+
+ /* with IO lock taken we have object size in LVB and can check
+ * boundaries prior sending LSEEK RPC
+ */
+ if (lsio->ls_start >= loi->loi_lvb.lvb_size) {
+ /* consider area beyond end of object as hole */
+ if (lsio->ls_whence == SEEK_HOLE)
+ lsio->ls_result = lsio->ls_start;
+ else
+ lsio->ls_result = -ENXIO;
+ RETURN(0);
+ }
+
+ /* if LSEEK RPC is not supported by server, consider whole stripe
+ * object is data with hole after end of object
+ */
+ if (!exp_connect_lseek(exp)) {
+ if (lsio->ls_whence == SEEK_HOLE)
+ lsio->ls_result = loi->loi_lvb.lvb_size;
+ else
+ lsio->ls_result = lsio->ls_start;
+ RETURN(0);
+ }
+
+ /* pack the request obdo: start offset in o_size, whence in o_mode */
+ memset(oa, 0, sizeof(*oa));
+ oa->o_oi = loi->loi_oi;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+ oa->o_size = lsio->ls_start;
+ oa->o_mode = lsio->ls_whence;
+ if (oio->oi_lockless) {
+ oa->o_flags = OBD_FL_SRVLOCK;
+ oa->o_valid |= OBD_MD_FLFLAGS;
+ }
+
+ init_completion(&cbargs->opc_sync);
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SEEK);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SEEK);
+ if (rc < 0) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+ ptlrpc_request_set_replen(req);
+ req->rq_interpret_reply = osc_lseek_interpret;
+ lsa = ptlrpc_req_async_args(lsa, req);
+ lsa->lsa_oio = oio;
+
+ /* hand off to ptlrpcd; completion is signalled by the interpret cb */
+ ptlrpcd_add_req(req);
+ cbargs->opc_rpc_sent = 1;
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(osc_io_lseek_start);
+
+/*
+ * Complete lseek: if an OST_SEEK RPC was sent, wait for the interpret
+ * callback (which already stored ls_result) and propagate its status
+ * into ci_result; local answers from osc_io_lseek_start() need no wait.
+ */
+void osc_io_lseek_end(const struct lu_env *env,
+ const struct cl_io_slice *slice)
+{
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ int rc = 0;
+
+ if (cbargs->opc_rpc_sent) {
+ wait_for_completion(&cbargs->opc_sync);
+ rc = cbargs->opc_rc;
+ }
+ slice->cis_io->ci_result = rc;
+}
+EXPORT_SYMBOL(osc_io_lseek_end);
+
static const struct cl_io_operations osc_io_ops = {
.op = {
[CIT_READ] = {
.cio_end = osc_io_ladvise_end,
.cio_fini = osc_io_fini
},
+ [CIT_LSEEK] = {
+ .cio_start = osc_io_lseek_start,
+ .cio_end = osc_io_lseek_end,
+ .cio_fini = osc_io_fini
+ },
[CIT_MISC] = {
.cio_fini = osc_io_fini
}