page_count = 0;
cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
- pgoff_t index = oap2cl_page(oap)->cp_index;
+ pgoff_t index = osc_index(oap2osc(oap));
++page_count;
if (index > ext->oe_end || index < ext->oe_start)
GOTO(out, rc = 110);
/* For short writes we shouldn't count parts of pages that
* span a whole chunk on the OST side, or our accounting goes
* wrong. Should match the code in filter_grant_check. */
- int offset = oap->oap_page_off & ~CFS_PAGE_MASK;
- int count = oap->oap_count + (offset & (blocksize - 1));
- int end = (offset + oap->oap_count) & (blocksize - 1);
+ int offset = last_off & ~CFS_PAGE_MASK;
+ int count = last_count + (offset & (blocksize - 1));
+ int end = (offset + last_count) & (blocksize - 1);
if (end)
count += blocksize - end;
/* discard all pages with index greater then trunc_index */
cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
oap_pending_item) {
- struct cl_page *sub = oap2cl_page(oap);
- struct cl_page *page = cl_page_top(sub);
+ pgoff_t index = osc_index(oap2osc(oap));
+ struct cl_page *page = oap2cl_page(oap);
LASSERT(cfs_list_empty(&oap->oap_rpc_item));
/* only discard the pages with their index greater than
* trunc_index, and ... */
- if (sub->cp_index < trunc_index ||
- (sub->cp_index == trunc_index && partial)) {
+ if (index < trunc_index ||
+ (index == trunc_index && partial)) {
/* accounting how many pages remaining in the chunk
* so that we can calculate grants correctly. */
- if (sub->cp_index >> ppc_bits == trunc_chunk)
+ if (index >> ppc_bits == trunc_chunk)
++pages_in_chunk;
continue;
}
lu_ref_add(&page->cp_reference, "truncate", current);
if (cl_page_own(env, io, page) == 0) {
- cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = cl_page_top(oap2cl_page(oap));
+ struct cl_page *page = oap2cl_page(oap);
int result;
LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
struct osc_async_page *oap, int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = oap2cl_page(oap);
+ pgoff_t index = osc_index(oap2osc(oap));
struct cl_object *obj;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
if (result < 0)
return result;
kms = attr->cat_kms;
- if (cl_offset(obj, page->cp_index) >= kms)
+ if (cl_offset(obj, index) >= kms)
/* catch race with truncate */
return 0;
- else if (cl_offset(obj, page->cp_index + 1) > kms)
+ else if (cl_offset(obj, index + 1) > kms)
/* catch sub-page write at end of file */
return kms % PAGE_CACHE_SIZE;
else
int cmd, int rc)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = cl_page_top(oap2cl_page(oap));
+ struct cl_page *page = oap2cl_page(oap);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
enum cl_req_type crt;
int srvlock;
cl_object_get(obj);
client_obd_list_unlock(&cli->cl_loi_list_lock);
- lu_object_ref_add_at(&obj->co_lu, &link, "check",
- current);
+ lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
/* attempt some read/write balancing by alternating between
* reads and writes in an object. The makes_rpc checks here
osc_object_unlock(osc);
osc_list_maint(cli, osc);
- lu_object_ref_del_at(&obj->co_lu, &link, "check",
- current);
+ lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
cl_object_put(env, obj);
client_obd_list_lock(&cli->cl_loi_list_lock);
OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
- index = oap2cl_page(oap)->cp_index;
+ index = osc_index(oap2osc(oap));
/* Add this page into extent by the following steps:
* 1. if there exists an active extent for this IO, mostly this page
LASSERT(oap->oap_magic == OAP_MAGIC);
CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
- oap, ops, oap2cl_page(oap)->cp_index);
+ oap, ops, osc_index(oap2osc(oap)));
osc_object_lock(obj);
if (!cfs_list_empty(&oap->oap_rpc_item)) {
CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
rc = -EBUSY;
} else if (!cfs_list_empty(&oap->oap_pending_item)) {
- ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
+ ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
/* only truncated pages are allowed to be taken out.
* See osc_extent_truncate() and osc_cache_truncate_start()
* for details. */
if (ext != NULL && ext->oe_state != OES_TRUNC) {
OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
- oap2cl_page(oap)->cp_index);
+ osc_index(oap2osc(oap)));
rc = -EBUSY;
}
}
struct osc_extent *ext = NULL;
struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
struct cl_page *cp = ops->ops_cl.cpl_page;
- pgoff_t index = cp->cp_index;
+ pgoff_t index = osc_index(ops);
struct osc_async_page *oap = &ops->ops_oap;
bool unplug = false;
int rc = 0;
switch (ext->oe_state) {
case OES_RPC:
case OES_LOCK_DONE:
- CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
- "flush an in-rpc page?\n");
+ CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
LASSERT(0);
break;
case OES_LOCKING:
* really sending the RPC. */
case OES_TRUNC:
/* race with truncate, page will be redirtied */
+ case OES_ACTIVE:
+ /* The extent is active so we need to abort and let the caller
+ * re-dirty the page. If we continued on here, and we were the
+ * one making the extent active, we could deadlock waiting for
+ * the page writeback to clear but it won't because the extent
+ * is active and won't be written out. */
GOTO(out, rc = -EAGAIN);
default:
break;
}
- rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
+ rc = cl_page_prep(env, io, cp, CRT_WRITE);
if (rc)
GOTO(out, rc);
struct osc_extent *ext;
struct osc_extent *found = NULL;
cfs_list_t *plist;
- pgoff_t index = oap2cl_page(oap)->cp_index;
+ pgoff_t index = osc_index(ops);
int rc = -EBUSY;
int cmd;
ENTRY;
ENTRY;
cfs_list_for_each_entry(oap, list, oap_pending_item) {
- struct cl_page *cp = oap2cl_page(oap);
- if (cp->cp_index > end)
- end = cp->cp_index;
- if (cp->cp_index < start)
- start = cp->cp_index;
+ pgoff_t index = osc_index(oap2osc(oap));
+ if (index > end)
+ end = index;
+ if (index < start)
+ start = index;
++page_count;
mppr <<= (page_count > mppr);
}
RETURN(result);
}
+/**
+ * Returns a list of pages by a given [start, end] of \a obj.
+ *
+ * \param resched If not NULL, then we give up before hogging CPU for too
+ * long and set *resched = 1, in that case caller should implement a retry
+ * logic.
+ *
+ * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
+ * crucial in the face of [offset, EOF] locks.
+ *
+ * Return at least one page in @queue unless there is no covered page.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+ struct osc_object *osc, pgoff_t start, pgoff_t end,
+ osc_page_gang_cbt cb, void *cbdata)
+{
+ struct osc_page *ops;
+ void **pvec;
+ pgoff_t idx;
+ unsigned int nr;
+ unsigned int i;
+ unsigned int j;
+ int res = CLP_GANG_OKAY;
+ bool tree_lock = true;
+ ENTRY;
+
+ idx = start;
+ pvec = osc_env_info(env)->oti_pvec;
+ spin_lock(&osc->oo_tree_lock);
+ while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+ idx, OTI_PVEC_SIZE)) > 0) {
+ struct cl_page *page;
+ bool end_of_region = false;
+
+ for (i = 0, j = 0; i < nr; ++i) {
+ ops = pvec[i];
+ pvec[i] = NULL;
+
+ idx = osc_index(ops);
+ if (idx > end) {
+ end_of_region = true;
+ break;
+ }
+
+ page = ops->ops_cl.cpl_page;
+ LASSERT(page->cp_type == CPT_CACHEABLE);
+ if (page->cp_state == CPS_FREEING)
+ continue;
+
+ cl_page_get(page);
+ lu_ref_add_atomic(&page->cp_reference,
+ "gang_lookup", current);
+ pvec[j++] = ops;
+ }
+ ++idx;
+
+ /*
+ * Here a delicate locking dance is performed. Current thread
+ * holds a reference to a page, but has to own it before it
+ * can be placed into queue. Owning implies waiting, so
+ * radix-tree lock is to be released. After a wait one has to
+ * check that pages weren't truncated (cl_page_own() returns
+ * error in the latter case).
+ */
+ spin_unlock(&osc->oo_tree_lock);
+ tree_lock = false;
+
+ for (i = 0; i < j; ++i) {
+ ops = pvec[i];
+ if (res == CLP_GANG_OKAY)
+ res = (*cb)(env, io, ops, cbdata);
+
+ page = ops->ops_cl.cpl_page;
+ lu_ref_del(&page->cp_reference, "gang_lookup", current);
+ cl_page_put(env, page);
+ }
+ if (nr < OTI_PVEC_SIZE || end_of_region)
+ break;
+
+ if (res == CLP_GANG_OKAY && need_resched())
+ res = CLP_GANG_RESCHED;
+ if (res != CLP_GANG_OKAY)
+ break;
+
+ spin_lock(&osc->oo_tree_lock);
+ tree_lock = true;
+ }
+ if (tree_lock)
+ spin_unlock(&osc->oo_tree_lock);
+ RETURN(res);
+}
+
+/**
+ * Check if page @page is covered by an extra lock or discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_lock *lock = cbdata;
+ pgoff_t index;
+
+ index = osc_index(ops);
+ if (index >= info->oti_fn_index) {
+ struct cl_lock *tmp;
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ /* refresh non-overlapped index */
+ tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+ lock, 1, 0);
+ if (tmp != NULL) {
+ /* Cache the first-non-overlapped index so as to skip
+ * all pages within [index, oti_fn_index). This
+ * is safe because if tmp lock is canceled, it will
+ * discard these pages. */
+ info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+ if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+ info->oti_fn_index = CL_PAGE_EOF;
+ cl_lock_put(env, tmp);
+ } else if (cl_page_own(env, io, page) == 0) {
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
+ }
+
+ info->oti_next_index = index + 1;
+ return CLP_GANG_OKAY;
+}
+
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_lock *lock = cbdata;
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+
+ /* page is top page. */
+ info->oti_next_index = osc_index(ops) + 1;
+ if (cl_page_own(env, io, page) == 0) {
+ KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+ !PageDirty(cl_page_vmpage(page))));
+
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
+
+ return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses radix
+ * tree to find all covering pages and discard them. If a page is being covered
+ * by other locks, it should remain in cache.
+ *
+ * If error happens on any step, the process continues anyway (the reasoning
+ * behind this being that lock cancellation cannot be delayed indefinitely).
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_io *io = &info->oti_io;
+ struct cl_object *osc = ols->ols_cl.cls_obj;
+ struct cl_lock *lock = ols->ols_cl.cls_lock;
+ struct cl_lock_descr *descr = &lock->cll_descr;
+ osc_page_gang_cbt cb;
+ int res;
+ int result;
+
+ ENTRY;
+
+ io->ci_obj = cl_object_top(osc);
+ io->ci_ignore_layout = 1;
+ result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+ if (result != 0)
+ GOTO(out, result);
+
+ cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+ info->oti_fn_index = info->oti_next_index = descr->cld_start;
+ do {
+ res = osc_page_gang_lookup(env, io, cl2osc(osc),
+ info->oti_next_index, descr->cld_end,
+ cb, (void *)lock);
+ if (info->oti_next_index > descr->cld_end)
+ break;
+
+ if (res == CLP_GANG_RESCHED)
+ cond_resched();
+ } while (res != CLP_GANG_OKAY);
+out:
+ cl_io_fini(env, io);
+ RETURN(result);
+}
+
+
/** @} osc */