* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*
*/
/*
struct client_obd *cli = osc_cli(ext->oe_obj);
struct osc_async_page *oap;
struct osc_async_page *tmp;
- struct osc_async_page *last = NULL;
int nr_pages = ext->oe_nr_pages;
int lost_grant = 0;
int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
+ __u64 last_off = 0;
+ int last_count = -1;
ENTRY;
OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");
oap_pending_item) {
cfs_list_del_init(&oap->oap_rpc_item);
cfs_list_del_init(&oap->oap_pending_item);
- if (last == NULL || last->oap_obj_off < oap->oap_obj_off)
- last = oap;
+ if (last_off <= oap->oap_obj_off) {
+ last_off = oap->oap_obj_off;
+ last_count = oap->oap_count;
+ }
--ext->oe_nr_pages;
osc_ap_completion(env, cli, oap, sent, rc);
if (!sent) {
lost_grant = ext->oe_grants;
} else if (blocksize < CFS_PAGE_SIZE &&
- last->oap_count != CFS_PAGE_SIZE) {
+ last_count != CFS_PAGE_SIZE) {
/* For short writes we shouldn't count parts of pages that
* span a whole chunk on the OST side, or our accounting goes
* wrong. Should match the code in filter_grant_check. */
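+ /* E.g. with a 4 KiB OST block size and a 64 KiB client page, a write
+ * covering only the first 5 KiB of the last page is charged on the OST in
+ * whole 4 KiB blocks (8 KiB here), not as a full page; the figures are
+ * illustrative only. */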
RETURN(0);
}
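+/* Check the extent state under the object lock, for use as an l_wait_event()
+ * condition; oe_state is presumably only updated with osc_object_lock() held,
+ * so an unlocked read in the wait condition could observe a stale value. */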
+static int extent_wait_cb(struct osc_extent *ext, int state)
+{
+ int ret;
+
+ osc_object_lock(ext->oe_obj);
+ ret = ext->oe_state == state;
+ osc_object_unlock(ext->oe_obj);
+
+ return ret;
+}
+
/**
* Wait for the extent's state to become @state.
*/
int state)
{
struct osc_object *obj = ext->oe_obj;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
+ LWI_ON_SIGNAL_NOOP, NULL);
int rc = 0;
ENTRY;
osc_extent_release(env, ext);
/* wait for the extent until its state becomes @state */
- rc = l_wait_event(ext->oe_waitq, ext->oe_state == state, &lwi);
+ rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
+ if (rc == -ETIMEDOUT) {
+ OSC_EXTENT_DUMP(D_ERROR, ext,
+ "%s: wait ext to %d timedout, recovery in progress?\n",
+ osc_export(obj)->exp_obd->obd_name, state);
+
+ lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
+ &lwi);
+ }
if (rc == 0 && ext->oe_rc < 0)
rc = ext->oe_rc;
RETURN(rc);
* Discard pages with index greater than @size. If @ext is overlapped with
* @size, then partial truncate happens.
*/
-static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index)
+static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
+ bool partial)
{
struct cl_env_nest nest;
struct lu_env *env;
/* only discard the pages with their index greater than
* trunc_index, and ... */
- if (sub->cp_index < trunc_index) {
+ if (sub->cp_index < trunc_index ||
+ (sub->cp_index == trunc_index && partial)) {
/* accounting how many pages remaining in the chunk
* so that we can calculate grants correctly. */
if (sub->cp_index >> ppc_bits == trunc_chunk)
--ext->oe_nr_pages;
++nr_pages;
}
- EASSERTF(ergo(ext->oe_start >= trunc_index, ext->oe_nr_pages == 0),
- ext, "trunc_index %lu\n", trunc_index);
+ EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
+ ext->oe_nr_pages == 0),
+ ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
osc_object_lock(obj);
if (ext->oe_nr_pages == 0) {
rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
switch (rc) {
case 0:
- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);
break;
case -EALREADY:
LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- cfs_spin_lock(&obj->oo_seatbelt);
+ spin_lock(&obj->oo_seatbelt);
LASSERT(opg->ops_submitter != NULL);
LASSERT(!cfs_list_empty(&opg->ops_inflight));
cfs_list_del_init(&opg->ops_inflight);
opg->ops_submitter = NULL;
- cfs_spin_unlock(&obj->oo_seatbelt);
+ spin_unlock(&obj->oo_seatbelt);
opg->ops_submit_time = 0;
srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
- cl_page_completion(env, page, crt, rc);
-
/* statistic */
if (rc == 0 && srvlock) {
struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
* reference counter protects page from concurrent reclaim.
*/
lu_ref_del(&page->cp_reference, "transfer", page);
- /*
- * As page->cp_obj is pinned by a reference from page->cp_req, it is
- * safe to call cl_page_put() without risking object destruction in a
- * non-blocking context.
- */
- cl_page_put(env, page);
+
+ cl_page_completion(env, page, crt, rc);
+
RETURN(0);
}
#define OSC_DUMP_GRANT(cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d " \
- "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt, \
+ "unstable_pages: %d/%d dropped: %ld avail: %ld, " \
+ "reserved: %ld, flight: %d } " fmt, \
__tmp->cl_import->imp_obd->obd_name, \
__tmp->cl_dirty, __tmp->cl_dirty_max, \
cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
+ cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \
__tmp->cl_lost_grant, __tmp->cl_avail_grant, \
__tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args); \
} while (0)
} else {
cli->cl_avail_grant += unused;
}
- if (unused > 0)
- osc_wake_cache_waiters(cli);
}
void osc_unreserve_grant(struct client_obd *cli,
{
client_obd_list_lock(&cli->cl_loi_list_lock);
__osc_unreserve_grant(cli, reserved, unused);
+ if (unused > 0)
+ osc_wake_cache_waiters(cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
}
cli->cl_avail_grant, cli->cl_dirty);
}
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting. Writeback completes or truncate happens before
- * writing starts. Must be called with the loi lock held. */
+/**
+ * The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting due to an error.
+ */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
{
+ client_obd_list_lock(&cli->cl_loi_list_lock);
osc_release_write_grant(cli, &oap->oap_brw_page);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
}
/**
return 0;
if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
cli->cl_dirty_transit += CFS_PAGE_SIZE;
return rc;
}
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
+static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+{
+ int rc;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ rc = cfs_list_empty(&ocw->ocw_entry);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ return rc;
+}
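+
+/* Note: the cfs_list_empty() check is made under cl_loi_list_lock because
+ * osc_wake_cache_waiters() (see below) removes the waiter from
+ * cl_cache_waiters while holding that lock; checking the entry without it
+ * could race with the wakeup path. */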
+
+/**
+ * The main entry point for reserving dirty page accounting. Usually the grant
+ * reserved in this function will be freed in bulk by osc_free_grant(), unless
+ * adding the page to the osc cache fails, in which case it will be freed in
+ * osc_exit_cache().
+ *
+ * The process is put to sleep if it has already run out of grant.
+ */
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *oap, int bytes)
{
ocw.ocw_rc = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
- osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+ osc_io_unplug_async(env, cli, NULL);
CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
cli->cl_import->imp_obd->obd_name, &ocw, oap);
- rc = l_wait_event(ocw.ocw_waitq,
- cfs_list_empty(&ocw.ocw_entry), &lwi);
+ rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_del_init(&ocw.ocw_entry);
- if (rc < 0)
- break;
+ /* l_wait_event was interrupted by a signal */
+ if (rc < 0) {
+ cfs_list_del_init(&ocw.ocw_entry);
+ GOTO(out, rc);
+ }
+
+ LASSERT(cfs_list_empty(&ocw.ocw_entry));
rc = ocw.ocw_rc;
+
if (rc != -EDQUOT)
- break;
- if (osc_enter_cache_try(cli, oap, bytes, 0)) {
- rc = 0;
- break;
- }
+ GOTO(out, rc);
+ if (osc_enter_cache_try(cli, oap, bytes, 0))
+ GOTO(out, rc = 0);
}
EXIT;
-
out:
client_obd_list_unlock(&cli->cl_loi_list_lock);
OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
ENTRY;
cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
- if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
+
+ ocw->ocw_rc = -EDQUOT;
+ /* we can't dirty more */
+ if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max ||
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
"osc max %ld, sys max %d\n", cli->cl_dirty,
cli->cl_dirty_max, obd_max_dirty_pages);
- return;
- }
-
- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
+ goto wakeup;
}
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
-
ocw->ocw_rc = 0;
if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
ocw->ocw_rc = -EDQUOT;
+wakeup:
CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
ar->ar_force_sync = 0;
}
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+ cfs_atomic_sub(page_count, &obd_unstable_pages);
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+ spin_lock(&req->rq_lock);
+ req->rq_committed = 1;
+ req->rq_unstable = 0;
+ spin_unlock(&req->rq_lock);
+
+ cfs_waitq_broadcast(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+ cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+ cfs_atomic_add(page_count, &obd_unstable_pages);
+
+ spin_lock(&req->rq_lock);
+
+ /* If the request has already been committed (i.e. brw_commit was
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed, because rq_commit_cb won't be
+ * called again. Otherwise, just set the commit callback so the
+ * unstable page accounting is properly updated when the request
+ * is committed. */
+ if (req->rq_committed) {
+ /* Drop lock before calling osc_dec_unstable_pages */
+ spin_unlock(&req->rq_lock);
+ osc_dec_unstable_pages(req);
+ spin_lock(&req->rq_lock);
+ } else {
+ req->rq_unstable = 1;
+ req->rq_commit_cb = osc_dec_unstable_pages;
+ }
+
+ spin_unlock(&req->rq_lock);
+}
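+
+/* To summarize the pairing: osc_inc_unstable_pages() runs once a bulk write
+ * RPC completes successfully (see osc_ap_completion() below), and
+ * osc_dec_unstable_pages() runs from rq_commit_cb once the server commits the
+ * transaction, dropping NR_UNSTABLE_NFS and obd_unstable_pages back down. */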
+
/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
ENTRY;
if (oap->oap_request != NULL) {
+ if (rc == 0)
+ osc_inc_unstable_pages(oap->oap_request);
+
xid = ptlrpc_req_xid(oap->oap_request);
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = NULL;
}
/* As the transfer for this page is being done, clear the flags */
- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);
oap->oap_interrupted = 0;
if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
has_rpcs = __osc_list_maint(cli, osc);
if (has_rpcs) {
if (!async) {
+ /* disable osc_lru_shrink() temporarily to avoid
+ * a potential stack overrun problem. LU-2859 */
+ cfs_atomic_inc(&cli->cl_lru_shrinkers);
osc_check_rpcs(env, cli, pol);
+ cfs_atomic_dec(&cli->cl_lru_shrinkers);
} else {
CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
cli);
CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
- cfs_spin_lock_init(&oap->oap_lock);
+ spin_lock_init(&oap->oap_lock);
CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
oap, page, oap->oap_obj_off);
RETURN(0);
struct cl_page *cp = ops->ops_cl.cpl_page;
pgoff_t index = cp->cp_index;
struct osc_async_page *oap = &ops->ops_oap;
- int unplug = 0;
+ bool unplug = false;
int rc = 0;
ENTRY;
if (rc)
GOTO(out, rc);
- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);
if (cfs_memory_pressure_get())
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
- if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) {
+ if (ext->oe_state == OES_CACHE) {
OSC_EXTENT_DUMP(D_CACHE, ext,
"flush page %p make it urgent.\n", oap);
- cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
- unplug = 1;
+ if (cfs_list_empty(&ext->oe_link))
+ cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ unplug = true;
}
rc = 0;
EXIT;
}
osc_object_unlock(obj);
- osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
+ osc_io_unplug_async(env, cli, obj);
RETURN(0);
}
pgoff_t index;
CFS_LIST_HEAD(list);
int result = 0;
+ bool partial;
ENTRY;
/* pages with index greater or equal to index will be truncated. */
- index = cl_index(osc2cl(obj), size + CFS_PAGE_SIZE - 1);
+ index = cl_index(osc2cl(obj), size);
+ partial = size > cl_offset(osc2cl(obj), index);
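+ /* e.g. with a 4 KiB page size, size == 6144 gives index == 1 and
+ * partial == true: pages past index 1 are discarded outright while page 1
+ * is truncated within its extent (illustrative, assuming cl_index() and
+ * cl_offset() convert between bytes and page indices). */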
again:
osc_object_lock(obj);
break;
}
+ OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+
osc_extent_get(ext);
if (ext->oe_state == OES_ACTIVE) {
/* though we grab inode mutex for write path, but we
if (ext->oe_state != OES_TRUNC)
osc_extent_wait(env, ext, OES_TRUNC);
- rc = osc_extent_truncate(ext, index);
+ rc = osc_extent_truncate(ext, index, partial);
if (rc < 0) {
if (result == 0)
result = rc;
/* this must be an overlapped extent which means only
* part of pages in this extent have been truncated.
*/
- EASSERTF(ext->oe_start < index, ext,
- "trunc index = %lu.\n", index);
+ EASSERTF(ext->oe_start <= index, ext,
+ "trunc index = %lu/%d.\n", index, partial);
/* fix index to skip this partially truncated extent */
index = ext->oe_end + 1;
+ partial = false;
/* we need to hold this extent in OES_TRUNC state so
* that no writeback will happen. This is to avoid
osc_extent_put(env, ext);
}
if (waiting != NULL) {
- if (result == 0)
- result = osc_extent_wait(env, waiting, OES_INV);
+ int rc;
+
+ /* ignore the result of osc_extent_wait; the write initiator
+ * should take care of it. */
+ rc = osc_extent_wait(env, waiting, OES_INV);
+ if (rc < 0)
+ OSC_EXTENT_DUMP(D_CACHE, ext, "wait error: %d.\n", rc);
osc_extent_put(env, waiting);
waiting = NULL;
- if (result == 0)
- goto again;
+ goto again;
}
RETURN(result);
}
oio->oi_trunc = NULL;
if (ext != NULL) {
+ bool unplug = false;
+
EASSERT(ext->oe_nr_pages > 0, ext);
EASSERT(ext->oe_state == OES_TRUNC, ext);
EASSERT(!ext->oe_urgent, ext);
if (ext->oe_fsync_wait && !ext->oe_urgent) {
ext->oe_urgent = 1;
cfs_list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ unplug = true;
}
osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
osc_object_unlock(obj);
osc_extent_put(env, ext);
- osc_list_maint(osc_cli(obj), obj);
+ if (unplug)
+ osc_io_unplug_async(env, osc_cli(obj), obj);
}
}
* The caller must have called osc_cache_writeback_range() to issue IO
* otherwise it will take a long time for this function to finish.
*
- * Caller must hold inode_mutex and i_alloc_sem, or cancel exclusive
- * dlm lock so that nobody else can dirty this range of file while we're
- * waiting for extents to be written.
+ * Caller must hold inode_mutex, or cancel the exclusive dlm lock so that
+ * nobody else can dirty this range of the file while we're waiting for
+ * extents to be written.
*/
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
pgoff_t start, pgoff_t end)
{
struct osc_extent *ext;
CFS_LIST_HEAD(discard_list);
- int unplug = 0;
+ bool unplug = false;
int result = 0;
ENTRY;
ext->oe_urgent = 1;
list = &obj->oo_urgent_exts;
}
- if (list != NULL) {
+ if (list != NULL)
cfs_list_move_tail(&ext->oe_link, list);
- unplug = 1;
- }
+ unplug = true;
} else {
/* the only discarder is lock cancelling, so
* [start, end] must contain this extent */