*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* Implementation of cl_io for OSC layer.
*
#include <linux/falloc.h>
#include "osc_internal.h"
+#include <lnet/lnet_rdma.h>
/** \addtogroup osc
* @{
oio->oi_is_readahead = true;
dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
if (dlmlock != NULL) {
+ struct lov_oinfo *oinfo = osc->oo_oinfo;
+
LASSERT(dlmlock->l_ast_data == osc);
if (dlmlock->l_req_mode != LCK_PR) {
struct lustre_handle lockh;
}
ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
- ra->cra_end_idx = cl_index(osc2cl(osc),
- dlmlock->l_policy_data.l_extent.end);
+ ra->cra_end_idx =
+ dlmlock->l_policy_data.l_extent.end >> PAGE_SHIFT;
ra->cra_release = osc_read_ahead_release;
ra->cra_dlmlock = dlmlock;
ra->cra_oio = oio;
if (ra->cra_end_idx != CL_PAGE_EOF)
ra->cra_contention = true;
+ ra->cra_end_idx = min_t(pgoff_t,
+ ra->cra_end_idx,
+ (oinfo->loi_kms - 1) >> PAGE_SHIFT);
result = 0;
}
page = cl_page_list_first(qin);
if (page->cp_type == CPT_TRANSIENT)
brw_flags |= OBD_BRW_NOCACHE;
+ if (lnet_is_rdma_only_page(page->cp_vmpage))
+ brw_flags |= OBD_BRW_RDMA_ONLY;
/*
* NOTE: here @page is a top-level page. This is done to avoid
opg = osc_cl_page_osc(page, osc);
oap = &opg->ops_oap;
- LASSERT(osc == oap->oap_obj);
if (!list_empty(&oap->oap_pending_item) ||
!list_empty(&oap->oap_rpc_item)) {
continue;
}
- spin_lock(&oap->oap_lock);
- oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
- oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&oap->oap_lock);
+ if (page->cp_type != CPT_TRANSIENT) {
+ oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY|ASYNC_COUNT_STABLE;
+ }
osc_page_submit(env, opg, crt, brw_flags);
list_add_tail(&oap->oap_pending_item, &list);
if (page->cp_sync_io != NULL)
cl_page_list_move(qout, qin, page);
else /* async IO */
- cl_page_list_del(env, qin, page);
+ cl_page_list_del(env, qin, page, true);
queued++;
if (queued == max_pages) {
ENTRY;
/* offset within stripe */
- kms = cl_offset(obj, idx) + to;
+ kms = (idx << PAGE_SHIFT) + to;
cl_object_attr_lock(obj);
CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n",
struct cl_page_list *qin, int from, int to,
cl_commit_cbt cb)
{
- struct cl_io *io = ios->cis_io;
- struct osc_io *oio = cl2osc_io(env, ios);
+ struct cl_io *io = ios->cis_io;
+ struct osc_io *oio = cl2osc_io(env, ios);
struct osc_object *osc = cl2osc(ios->cis_obj);
- struct cl_page *page;
- struct cl_page *last_page;
+ struct cl_page *page;
+ struct cl_page *last_page;
struct osc_page *opg;
- struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
+ struct folio_batch *fbatch = &osc_env_info(env)->oti_fbatch;
int result = 0;
ENTRY;
}
}
- ll_pagevec_init(pvec, 0);
+ ll_folio_batch_init(fbatch, 0);
while (qin->pl_nr > 0) {
struct osc_async_page *oap;
opg = osc_cl_page_osc(page, osc);
oap = &opg->ops_oap;
- LASSERTF(osc == oap->oap_obj,
- "obj mismatch: %p / %p\n", osc, oap->oap_obj);
-
if (!list_empty(&oap->oap_rpc_item)) {
CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
oap, opg);
/* The page may be already in dirty cache. */
if (list_empty(&oap->oap_pending_item)) {
- result = osc_page_cache_add(env, opg, io, cb);
+ result = osc_page_cache_add(env, osc, opg, io, cb);
if (result != 0)
break;
}
osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
page == last_page ? to : PAGE_SIZE);
- cl_page_list_del(env, qin, page);
+ cl_page_list_del(env, qin, page, true);
/* if there are no more slots, do the callback & reinit */
- if (pagevec_add(pvec, page->cp_vmpage) == 0) {
- (*cb)(env, io, pvec);
- pagevec_reinit(pvec);
+ if (!folio_batch_add_page(fbatch, page->cp_vmpage)) {
+ (*cb)(env, io, fbatch);
+ folio_batch_reinit(fbatch);
}
}
+ /* The shrink interval is in seconds, so we can update it once per
+ * write, rather than once per page.
+ */
+ osc_update_next_shrink(osc_cli(osc));
+
- /* Clean up any partially full pagevecs */
- if (pagevec_count(pvec) != 0)
- (*cb)(env, io, pvec);
+ /* Clean up any partially full folio_batches */
+ if (folio_batch_count(fbatch) != 0)
+ (*cb)(env, io, fbatch);
/* Can't access these pages any more. Page can be in transfer and
* complete at any time. */
oio->oi_lru_reserved = 0;
}
oio->oi_write_osclock = NULL;
+ oio->oi_read_osclock = NULL;
osc_io_iter_fini(env, ios);
}
io = ios->cis_io;
fio = &io->u.ci_fault;
CDEBUG(D_INFO, "%lu %d %zu\n",
- fio->ft_index, fio->ft_writable, fio->ft_nob);
+ fio->ft_index, fio->ft_writable, fio->ft_bytes);
/*
* If mapping is writeable, adjust kms to cover this page,
* but do not extend kms beyond actual file size.
*/
if (fio->ft_writable)
osc_page_touch_at(env, ios->cis_obj,
- fio->ft_index, fio->ft_nob);
+ fio->ft_index, fio->ft_bytes);
RETURN(0);
}
EXPORT_SYMBOL(osc_io_fault_start);
* Checks that there are no pages being written in the extent being truncated.
*/
static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops , void *cbdata)
+ void **pvec, int count, void *cbdata)
{
/*
 * Diagnostic-only callback: it changes no state and only emits debug
 * messages. The removed ('-') revision inspected a single osc_page per
 * call; the added ('+') revision receives an array of @count entries in
 * @pvec and inspects each one, treating every entry as a
 * struct osc_page pointer.
 *
 * @cbdata is dereferenced as a __u64 and used only as the "start" value
 * printed in the error message.
 *
 * Always returns true. NOTE(review): presumably this tells the caller
 * to keep iterating - confirm against the caller.
 */
- struct cl_page *page = ops->ops_cl.cpl_page;
- struct osc_async_page *oap;
- __u64 start = *(__u64 *)cbdata;
+ int i;
- oap = &ops->ops_oap;
- if (oap->oap_cmd & OBD_BRW_WRITE &&
- !list_empty(&oap->oap_pending_item))
- CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
- start, current->comm);
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
+ struct cl_page *page = ops->ops_cl.cpl_page;
+ struct osc_async_page *oap;
+ __u64 start = *(__u64 *)cbdata;
- if (PageLocked(page->cp_vmpage))
- CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
- ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
+ oap = &ops->ops_oap;
/* Report a page that is flagged for write and still on a pending list. */
+ if (oap->oap_cmd & OBD_BRW_WRITE &&
+ !list_empty(&oap->oap_pending_item))
+ CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
+ start, current->comm);
/*
 * A locked VM page is only traced at D_CACHE level; the new revision
 * prints the full oap_cmd instead of masking it with OBD_BRW_RWMASK.
 */
+ if (PageLocked(page->cp_vmpage))
+ CDEBUG(D_CACHE, "page %p index %lu locked for cmd=%d\n",
+ ops, osc_index(ops), oap->oap_cmd);
+ }
return true;
}
int partial;
pgoff_t start;
- clob = oio->oi_cl.cis_obj;
- start = cl_index(clob, size);
- partial = cl_offset(clob, start) < size;
+ clob = oio->oi_cl.cis_obj;
+ start = size >> PAGE_SHIFT;
+ partial = (start << PAGE_SHIFT) < size;
/*
* Complain if there are pages in the truncated region.
* if server doesn't support fallocate punch, we also need these data to be
* flushed first to prevent re-ordering with the punch
*/
-static int osc_punch_start(const struct lu_env *env, struct cl_io *io,
- struct cl_object *obj)
+int osc_punch_start(const struct lu_env *env, struct cl_io *io,
+ struct cl_object *obj)
{
struct osc_object *osc = cl2osc(obj);
- pgoff_t pg_start = cl_index(obj, io->u.ci_setattr.sa_falloc_offset);
- pgoff_t pg_end = cl_index(obj, io->u.ci_setattr.sa_falloc_end - 1);
+ pgoff_t pg_start = io->u.ci_setattr.sa_falloc_offset >> PAGE_SHIFT;
+ pgoff_t pg_end = (io->u.ci_setattr.sa_falloc_end - 1) >> PAGE_SHIFT;
int rc;
ENTRY;
osc);
RETURN(0);
}
+EXPORT_SYMBOL(osc_punch_start);
static int osc_io_setattr_start(const struct lu_env *env,
const struct cl_io_slice *slice)
oa->o_size = io->u.ci_setattr.sa_falloc_offset;
oa->o_blocks = io->u.ci_setattr.sa_falloc_end;
- oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ oa->o_uid = io->u.ci_setattr.sa_falloc_uid;
+ oa->o_gid = io->u.ci_setattr.sa_falloc_gid;
+ oa->o_projid = io->u.ci_setattr.sa_falloc_projid;
+ oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID;
+
+ CDEBUG(D_INODE,
+ "size %llu blocks %llu uid %u gid %u prjid %u\n",
+ oa->o_size, oa->o_blocks, oa->o_uid, oa->o_gid,
+ oa->o_projid);
result = osc_fallocate_base(osc_export(cl2osc(obj)),
oa, osc_async_upcall,
cbargs, falloc_mode);
result = io->ci_result = cbargs->opc_rc;
}
- if (result == 0) {
- if (oio->oi_lockless) {
- /* lockless truncate */
- struct osc_device *osc = lu2osc_dev(obj->co_lu.lo_dev);
-
- LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
- /* XXX: Need a lock. */
- osc->od_stats.os_lockless_truncates++;
- }
- }
-
if (cl_io_is_trunc(io)) {
__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
const struct cl_io_slice *slice)
{
struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
- struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct cl_object *obj = slice->cis_obj;
struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ struct obdo *oa = &oio->oi_oa;
+ unsigned int cl_valid = 0;
ENTRY;
wait_for_completion(&cbargs->opc_sync);
slice->cis_io->ci_result = cbargs->opc_rc;
} else {
slice->cis_io->ci_result = 0;
- if (!(oio->oi_oa.o_valid &
+ if (!(oa->o_valid &
(OBD_MD_LAYOUT_VERSION | OBD_MD_FLDATAVERSION)))
- slice->cis_io->ci_result = -ENOTSUPP;
+ slice->cis_io->ci_result = -EOPNOTSUPP;
- if (oio->oi_oa.o_valid & OBD_MD_LAYOUT_VERSION)
- dv->dv_layout_version = oio->oi_oa.o_layout_version;
- if (oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)
- dv->dv_data_version = oio->oi_oa.o_data_version;
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
+ dv->dv_layout_version = oa->o_layout_version;
+ if (oa->o_valid & OBD_MD_FLDATAVERSION)
+ dv->dv_data_version = oa->o_data_version;
+
+ if (dv->dv_flags & LL_DV_SZ_UPDATE) {
+ if (oa->o_valid & OBD_MD_FLSIZE) {
+ attr->cat_size = oa->o_size;
+ cl_valid |= CAT_SIZE;
+ }
+
+ if (oa->o_valid & OBD_MD_FLBLOCKS) {
+ attr->cat_blocks = oa->o_blocks;
+ cl_valid |= CAT_BLOCKS;
+ }
+
+ cl_object_attr_lock(obj);
+ cl_object_attr_update(env, obj, attr, cl_valid);
+ cl_object_attr_unlock(obj);
+ }
}
EXIT;
int rc = 0;
ENTRY;
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
cl_object_attr_lock(obj);
attr->cat_mtime = attr->cat_ctime = ktime_get_real_seconds();
rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
}
EXPORT_SYMBOL(osc_fsync_ost);
-int osc_io_fsync_start(const struct lu_env *env,
- const struct cl_io_slice *slice)
+static int osc_io_fsync_start(const struct lu_env *env,
+ const struct cl_io_slice *slice)
{
- struct cl_io *io = slice->cis_io;
+ struct cl_io *io = slice->cis_io;
struct cl_fsync_io *fio = &io->u.ci_fsync;
- struct cl_object *obj = slice->cis_obj;
- struct osc_object *osc = cl2osc(obj);
- pgoff_t start = cl_index(obj, fio->fi_start);
- pgoff_t end = cl_index(obj, fio->fi_end);
- int result = 0;
+ struct cl_object *obj = slice->cis_obj;
+ struct osc_object *osc = cl2osc(obj);
+ pgoff_t start = fio->fi_start >> PAGE_SHIFT;
+ pgoff_t end = fio->fi_end >> PAGE_SHIFT;
+ int result = 0;
+
ENTRY;
+ if (fio->fi_mode == CL_FSYNC_RECLAIM) {
+ struct client_obd *cli = osc_cli(osc);
+
+ if (!atomic_long_read(&cli->cl_unstable_count)) {
+ /* Stop flush when there are no unstable pages? */
+ CDEBUG(D_CACHE, "unstable count is zero\n");
+ RETURN(0);
+ }
+ }
+
if (fio->fi_end == OBD_OBJECT_EOF)
end = CL_PAGE_EOF;
result = osc_cache_writeback_range(env, osc, start, end, 0,
fio->fi_mode == CL_FSYNC_DISCARD);
+ if (result < 0 && fio->fi_mode == CL_FSYNC_DISCARD) {
+ CDEBUG(D_CACHE,
+ "%s: ignore error %d on discarding "DFID":[%lu-%lu]\n",
+ cli_name(osc_cli(osc)), result, PFID(fio->fi_fid),
+ start, end);
+ result = 0;
+ }
if (result > 0) {
fio->fi_nr_written += result;
result = 0;
}
- if (fio->fi_mode == CL_FSYNC_ALL) {
+ if (fio->fi_mode == CL_FSYNC_ALL || fio->fi_mode == CL_FSYNC_RECLAIM) {
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
int rc;
/* we have to wait for writeback to finish before we can
* send OST_SYNC RPC. This is bad because it causes extents
* to be written osc by osc. However, we usually start
* writeback before CL_FSYNC_ALL so this won't have any real
- * problem. */
- rc = osc_cache_wait_range(env, osc, start, end);
- if (result == 0)
- result = rc;
+ * problem.
+ * We do not have to wait for writeback to finish in the memory
+ * reclaim environment.
+ */
+ if (fio->fi_mode == CL_FSYNC_ALL) {
+ rc = osc_cache_wait_range(env, osc, start, end);
+ if (result == 0)
+ result = rc;
+ }
+
rc = osc_fsync_ost(env, osc, fio);
- if (result == 0)
+ if (result == 0) {
+ cbargs->opc_rpc_sent = 1;
result = rc;
+ }
}
RETURN(result);
const struct cl_io_slice *slice)
{
struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
- struct cl_object *obj = slice->cis_obj;
- pgoff_t start = cl_index(obj, fio->fi_start);
- pgoff_t end = cl_index(obj, fio->fi_end);
+ struct cl_object *obj = slice->cis_obj;
+ struct osc_io *oio = cl2osc_io(env, slice);
+ struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ pgoff_t start = fio->fi_start >> PAGE_SHIFT;
+ pgoff_t end = fio->fi_end >> PAGE_SHIFT;
int result = 0;
if (fio->fi_mode == CL_FSYNC_LOCAL) {
result = osc_cache_wait_range(env, cl2osc(obj), start, end);
- } else if (fio->fi_mode == CL_FSYNC_ALL) {
- struct osc_io *oio = cl2osc_io(env, slice);
- struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
+ } else if (cbargs->opc_rpc_sent && (fio->fi_mode == CL_FSYNC_ALL ||
+ fio->fi_mode == CL_FSYNC_RECLAIM)) {
wait_for_completion(&cbargs->opc_sync);
if (result == 0)
memset(ladvise_hdr, 0, buf_size);
ladvise_hdr->lah_magic = LADVISE_MAGIC;
ladvise_hdr->lah_count = num_advise;
- ladvise_hdr->lah_flags = lio->li_flags;
+ ladvise_hdr->lah_flags = lio->lio_flags;
memset(oa, 0, sizeof(*oa));
oa->o_oi = loi->loi_oi;
- oa->o_valid = OBD_MD_FLID;
- obdo_set_parent_fid(oa, lio->li_fid);
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+ obdo_set_parent_fid(oa, lio->lio_fid);
ladvise = ladvise_hdr->lah_advise;
- ladvise->lla_start = lio->li_start;
- ladvise->lla_end = lio->li_end;
- ladvise->lla_advice = lio->li_advice;
+ ladvise->lla_start = lio->lio_start;
+ ladvise->lla_end = lio->lio_end;
+ ladvise->lla_advice = lio->lio_advice;
- if (lio->li_flags & LF_ASYNC) {
+ if (lio->lio_flags & LF_ASYNC) {
result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
ladvise_hdr, NULL, NULL, NULL);
} else {
int result = 0;
struct cl_ladvise_io *lio = &io->u.ci_ladvise;
- if ((!(lio->li_flags & LF_ASYNC)) && cbargs->opc_rpc_sent) {
+ if ((!(lio->lio_flags & LF_ASYNC)) && cbargs->opc_rpc_sent) {
wait_for_completion(&cbargs->opc_sync);
result = cbargs->opc_rc;
}
/**
 * Set up the OSC-layer slice of a cl_io.
 *
 * Resets the per-environment osc_io slice, registers it on @io for @obj
 * with osc_io_ops, and (in the added lines) clears
 * ci_allow_unaligned_dio on the top-level io when
 * exp_connect_unaligned_dio() returns false for this object's export.
 *
 * \param env	execution environment; the osc_io comes from osc_env_io(env)
 * \param obj	cl_object the slice is attached to
 * \param io	cl_io being initialized
 *
 * \retval 0	always
 */
int osc_io_init(const struct lu_env *env,
struct cl_object *obj, struct cl_io *io)
{
+ struct obd_export *exp = osc_export(cl2osc(obj));
struct osc_io *oio = osc_env_io(env);
CL_IO_SLICE_CLEAN(oio, oi_cl);
cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
+
/*
 * NOTE(review): presumably exp_connect_unaligned_dio() reports whether
 * the connected server negotiated unaligned direct I/O support; verify
 * against its definition. The flag is only ever cleared here, never set.
 */
+ if (!exp_connect_unaligned_dio(exp))
+ cl_io_top(io)->ci_allow_unaligned_dio = false;
+
return 0;
}