* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*
*/
/*
#define OSC_DUMP_GRANT(cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d " \
- "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt, \
+ "unstable_pages: %d/%d dropped: %ld avail: %ld, " \
+ "reserved: %ld, flight: %d } " fmt, \
__tmp->cl_import->imp_obd->obd_name, \
__tmp->cl_dirty, __tmp->cl_dirty_max, \
cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
+ cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \
__tmp->cl_lost_grant, __tmp->cl_avail_grant, \
__tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args); \
} while (0)
return 0;
if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
cli->cl_dirty_transit += CFS_PAGE_SIZE;
{
int rc;
client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&ocw->ocw_entry) || cli->cl_w_in_flight == 0;
+ rc = cfs_list_empty(&ocw->ocw_entry);
client_obd_list_unlock(&cli->cl_loi_list_lock);
return rc;
}
cfs_waitq_init(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
ocw.ocw_grant = bytes;
- if (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
+ while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
ocw.ocw_rc = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
GOTO(out, rc);
}
- /* If ocw_entry isn't empty, which means it's not waked up
- * by osc_wake_cache_waiters(), then the page must not be
- * granted yet. */
- if (!cfs_list_empty(&ocw.ocw_entry)) {
- rc = -EDQUOT;
- cfs_list_del_init(&ocw.ocw_entry);
- } else {
- rc = ocw.ocw_rc;
- }
+ LASSERT(cfs_list_empty(&ocw.ocw_entry));
+ rc = ocw.ocw_rc;
if (rc != -EDQUOT)
GOTO(out, rc);
if (osc_enter_cache_try(cli, oap, bytes, 0))
- rc = 0;
+ GOTO(out, rc = 0);
}
EXIT;
out:
ENTRY;
cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
- if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
+
+ ocw->ocw_rc = -EDQUOT;
+ /* we can't dirty more */
+ if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max ||
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
"osc max %ld, sys max %d\n", cli->cl_dirty,
cli->cl_dirty_max, obd_max_dirty_pages);
- return;
+ goto wakeup;
}
- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
- }
-
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
-
ocw->ocw_rc = 0;
if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
ocw->ocw_rc = -EDQUOT;
+wakeup:
CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
ar->ar_force_sync = 0;
}
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+ cfs_atomic_sub(page_count, &obd_unstable_pages);
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+ spin_lock(&req->rq_lock);
+ req->rq_committed = 1;
+ req->rq_unstable = 0;
+ spin_unlock(&req->rq_lock);
+
+ cfs_waitq_broadcast(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+ cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+ cfs_atomic_add(page_count, &obd_unstable_pages);
+
+ spin_lock(&req->rq_lock);
+
+ /* If the request has already been committed (i.e. brw_commit
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed because rq_commit_cb wont be
+ * called again. Otherwise, just set the commit callback so the
+ * unstable page accounting is properly updated when the request
+ * is committed */
+ if (req->rq_committed) {
+ /* Drop lock before calling osc_dec_unstable_pages */
+ spin_unlock(&req->rq_lock);
+ osc_dec_unstable_pages(req);
+ spin_lock(&req->rq_lock);
+ } else {
+ req->rq_unstable = 1;
+ req->rq_commit_cb = osc_dec_unstable_pages;
+ }
+
+ spin_unlock(&req->rq_lock);
+}
+
/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
ENTRY;
if (oap->oap_request != NULL) {
+ if (rc == 0)
+ osc_inc_unstable_pages(oap->oap_request);
+
xid = ptlrpc_req_xid(oap->oap_request);
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = NULL;
has_rpcs = __osc_list_maint(cli, osc);
if (has_rpcs) {
if (!async) {
+ /* disable osc_lru_shrink() temporarily to avoid
+ * potential stack overrun problem. LU-2859 */
+ cfs_atomic_inc(&cli->cl_lru_shrinkers);
osc_check_rpcs(env, cli, pol);
+ cfs_atomic_dec(&cli->cl_lru_shrinkers);
} else {
CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
cli);
}
osc_object_unlock(obj);
- osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
+ osc_io_unplug_async(env, cli, obj);
RETURN(0);
}