* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*
*/
/*
#define EXTSTR "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
+static const char *oes_strings[] = {
+ "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \
struct osc_extent *__ext = (extent); \
- const char *__str[] = OES_STRINGS; \
char __buf[16]; \
\
CDEBUG(lvl, \
"extent %p@{" EXTSTR ", " \
"[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \
- /* ----- extent part 0 ----- */ \
+ /* ----- extent part 0 ----- */ \
__ext, EXTPARA(__ext), \
/* ----- part 1 ----- */ \
cfs_atomic_read(&__ext->oe_refc), \
cfs_atomic_read(&__ext->oe_users), \
list_empty_marker(&__ext->oe_link), \
- __str[__ext->oe_state], ext_flags(__ext, __buf), \
+ oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
__ext->oe_obj, \
/* ----- part 2 ----- */ \
__ext->oe_grants, __ext->oe_nr_pages, \
list_empty_marker(&__ext->oe_pages), \
- cfs_waitq_active(&__ext->oe_waitq) ? '+' : '-', \
+ waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
__ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner, \
/* ----- part 4 ----- */ \
## __VA_ARGS__); \
#undef EASSERTF
#define EASSERTF(expr, ext, fmt, args...) do { \
if (!(expr)) { \
- OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args); \
- osc_extent_tree_dump(D_ERROR, (ext)->oe_obj); \
+ OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args); \
+ osc_extent_tree_dump(D_ERROR, (ext)->oe_obj); \
LASSERT(expr); \
- } \
+ } \
} while (0)
#undef EASSERT
page_count = 0;
cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
- pgoff_t index = oap2cl_page(oap)->cp_index;
+ pgoff_t index = osc_index(oap2osc(oap));
++page_count;
if (index > ext->oe_end || index < ext->oe_start)
GOTO(out, rc = 110);
/* TODO: validate the state machine */
ext->oe_state = state;
- cfs_waitq_broadcast(&ext->oe_waitq);
+ wake_up_all(&ext->oe_waitq);
}
static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
{
struct osc_extent *ext;
- OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, CFS_ALLOC_STD);
+ OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
if (ext == NULL)
return NULL;
CFS_INIT_LIST_HEAD(&ext->oe_link);
ext->oe_state = OES_INV;
CFS_INIT_LIST_HEAD(&ext->oe_pages);
- cfs_waitq_init(&ext->oe_waitq);
+ init_waitqueue_head(&ext->oe_waitq);
ext->oe_osclock = NULL;
return ext;
return -ERANGE;
LASSERT(cur->oe_osclock == victim->oe_osclock);
- ppc_bits = osc_cli(obj)->cl_chunkbits - CFS_PAGE_SHIFT;
+ ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
chunk_end = cur->oe_end >> ppc_bits;
if (chunk_start != (victim->oe_end >> ppc_bits) + 1 &&
LASSERT(lock != NULL);
LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
- LASSERT(cli->cl_chunkbits >= CFS_PAGE_SHIFT);
- ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+ LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
+ ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
chunk_mask = ~((1 << ppc_bits) - 1);
chunksize = 1 << cli->cl_chunkbits;
chunk = index >> ppc_bits;
ext->oe_rc = rc ?: ext->oe_nr_pages;
EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
+
+ osc_lru_add_batch(cli, &ext->oe_pages);
cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
oap_pending_item) {
cfs_list_del_init(&oap->oap_rpc_item);
if (!sent) {
lost_grant = ext->oe_grants;
- } else if (blocksize < CFS_PAGE_SIZE &&
- last_count != CFS_PAGE_SIZE) {
+ } else if (blocksize < PAGE_CACHE_SIZE &&
+ last_count != PAGE_CACHE_SIZE) {
/* For short writes we shouldn't count parts of pages that
* span a whole chunk on the OST side, or our accounting goes
* wrong. Should match the code in filter_grant_check. */
- int offset = oap->oap_page_off & ~CFS_PAGE_MASK;
- int count = oap->oap_count + (offset & (blocksize - 1));
- int end = (offset + oap->oap_count) & (blocksize - 1);
+ int offset = last_off & ~CFS_PAGE_MASK;
+ int count = last_count + (offset & (blocksize - 1));
+ int end = (offset + last_count) & (blocksize - 1);
if (end)
count += blocksize - end;
- lost_grant = CFS_PAGE_SIZE - count;
+ lost_grant = PAGE_CACHE_SIZE - count;
}
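/* Worked example, for illustration only (the 4096-byte page size and
 * 1024-byte OST blocksize below are assumed, not taken from this code):
 * if the last page was written from page offset 0 for last_count = 1500
 * bytes, then offset = 0, count = 1500, end = 1500 & 1023 = 476, count is
 * rounded up to 2048 (two OST blocks), and lost_grant = 4096 - 2048 = 2048,
 * i.e. half of the page-sized grant consumed on the client is reported
 * back as lost. */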
if (ext->oe_grants > 0)
osc_free_grant(cli, nr_pages, lost_grant);
"%s: wait ext to %d timedout, recovery in progress?\n",
osc_export(obj)->exp_obd->obd_name, state);
- lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
&lwi);
}
struct osc_async_page *oap;
struct osc_async_page *tmp;
int pages_in_chunk = 0;
- int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+ int ppc_bits = cli->cl_chunkbits -
+ PAGE_CACHE_SHIFT;
__u64 trunc_chunk = trunc_index >> ppc_bits;
int grants = 0;
int nr_pages = 0;
/* discard all pages with index greater than trunc_index */
cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
oap_pending_item) {
- struct cl_page *sub = oap2cl_page(oap);
- struct cl_page *page = cl_page_top(sub);
+ pgoff_t index = osc_index(oap2osc(oap));
+ struct cl_page *page = oap2cl_page(oap);
LASSERT(cfs_list_empty(&oap->oap_rpc_item));
/* only discard the pages with their index greater than
* trunc_index, and ... */
- if (sub->cp_index < trunc_index ||
- (sub->cp_index == trunc_index && partial)) {
+ if (index < trunc_index ||
+ (index == trunc_index && partial)) {
/* account for how many pages remain in the chunk
* so that we can calculate grants correctly. */
- if (sub->cp_index >> ppc_bits == trunc_chunk)
+ if (index >> ppc_bits == trunc_chunk)
++pages_in_chunk;
continue;
}
cfs_list_del_init(&oap->oap_pending_item);
cl_page_get(page);
- lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+ lu_ref_add(&page->cp_reference, "truncate", current);
if (cl_page_own(env, io, page) == 0) {
- cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
LASSERT(0);
}
- lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+ lu_ref_del(&page->cp_reference, "truncate", current);
cl_page_put(env, page);
--ext->oe_nr_pages;
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
LASSERT(last->oap_count > 0);
- LASSERT(last->oap_page_off + last->oap_count <= CFS_PAGE_SIZE);
+ LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
+ spin_lock(&last->oap_lock);
last->oap_async_flags |= ASYNC_COUNT_STABLE;
+ spin_unlock(&last->oap_lock);
}
/* for the rest of pages, we don't need to call osc_refresh_count()
* because it's known they are not the last page */
cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
- oap->oap_count = CFS_PAGE_SIZE - oap->oap_page_off;
+ oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ spin_unlock(&oap->oap_lock);
}
}
struct osc_object *obj = ext->oe_obj;
struct client_obd *cli = osc_cli(obj);
struct osc_extent *next;
- int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+ int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
pgoff_t chunk = index >> ppc_bits;
pgoff_t end_chunk;
pgoff_t end_index;
int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = cl_page_top(oap2cl_page(oap));
+ struct cl_page *page = oap2cl_page(oap);
int result;
LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
struct osc_async_page *oap, int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = oap2cl_page(oap);
+ pgoff_t index = osc_index(oap2osc(oap));
struct cl_object *obj;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
if (result < 0)
return result;
kms = attr->cat_kms;
- if (cl_offset(obj, page->cp_index) >= kms)
+ if (cl_offset(obj, index) >= kms)
/* catch race with truncate */
return 0;
- else if (cl_offset(obj, page->cp_index + 1) > kms)
+ else if (cl_offset(obj, index + 1) > kms)
/* catch sub-page write at end of file */
- return kms % CFS_PAGE_SIZE;
+ return kms % PAGE_CACHE_SIZE;
else
- return CFS_PAGE_SIZE;
+ return PAGE_CACHE_SIZE;
}
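/* Worked example, for illustration only (a 4096-byte page size is assumed):
 * with kms = 10000 and index = 2, the page covers bytes [8192, 12288);
 * cl_offset(obj, 2) = 8192 < kms but cl_offset(obj, 3) = 12288 > kms, so
 * this is a sub-page write at end of file and the returned count is
 * kms % 4096 = 1808 bytes. */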
static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
int cmd, int rc)
{
struct osc_page *opg = oap2osc_page(oap);
- struct cl_page *page = cl_page_top(oap2cl_page(oap));
+ struct cl_page *page = oap2cl_page(oap);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
enum cl_req_type crt;
int srvlock;
ENTRY;
cmd &= ~OBD_BRW_NOQUOTA;
- LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ));
- LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+ LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+ LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
LASSERT(opg->ops_transfer_pinned);
/*
RETURN(0);
}
-#define OSC_DUMP_GRANT(cli, fmt, args...) do { \
+#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
- CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d " \
- "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt, \
+ CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d " \
+ "unstable_pages: %d/%d dropped: %ld avail: %ld, " \
+ "reserved: %ld, flight: %d } lru {in list: %d, " \
+ "left: %d, waiters: %d }" fmt, \
__tmp->cl_import->imp_obd->obd_name, \
__tmp->cl_dirty, __tmp->cl_dirty_max, \
cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
+ cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \
__tmp->cl_lost_grant, __tmp->cl_avail_grant, \
- __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args); \
+ __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \
+ cfs_atomic_read(&__tmp->cl_lru_in_list), \
+ cfs_atomic_read(&__tmp->cl_lru_busy), \
+ cfs_atomic_read(&__tmp->cl_lru_shrinkers), ##args); \
} while (0)
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
struct brw_page *pga)
{
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+ LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
cfs_atomic_inc(&obd_dirty_pages);
- cli->cl_dirty += CFS_PAGE_SIZE;
+ cli->cl_dirty += PAGE_CACHE_SIZE;
pga->flag |= OBD_BRW_FROM_GRANT;
CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
- CFS_PAGE_SIZE, pga, pga->pg);
+ PAGE_CACHE_SIZE, pga, pga->pg);
osc_update_next_shrink(cli);
}
{
ENTRY;
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+ LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
EXIT;
return;
pga->flag &= ~OBD_BRW_FROM_GRANT;
cfs_atomic_dec(&obd_dirty_pages);
- cli->cl_dirty -= CFS_PAGE_SIZE;
+ cli->cl_dirty -= PAGE_CACHE_SIZE;
if (pga->flag & OBD_BRW_NOCACHE) {
pga->flag &= ~OBD_BRW_NOCACHE;
cfs_atomic_dec(&obd_dirty_transit_pages);
- cli->cl_dirty_transit -= CFS_PAGE_SIZE;
+ cli->cl_dirty_transit -= PAGE_CACHE_SIZE;
}
EXIT;
}
* used, we should return these grants to OST. There are two cases where grants
* can be lost:
* 1. truncate;
- * 2. blocksize at OST is less than CFS_PAGE_SIZE and a partial page was
+ * 2. blocksize at OST is less than PAGE_CACHE_SIZE and a partial page was
* written. In this case OST may use fewer chunks to serve this partial
* write. OSTs don't actually know the page size on the client side, so
* clients have to calculate lost grant by the blocksize on the OST.
client_obd_list_lock(&cli->cl_loi_list_lock);
cfs_atomic_sub(nr_pages, &obd_dirty_pages);
- cli->cl_dirty -= nr_pages << CFS_PAGE_SHIFT;
+ cli->cl_dirty -= nr_pages << PAGE_CACHE_SHIFT;
cli->cl_lost_grant += lost_grant;
if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
/* borrow some grant from truncate to avoid the case that
{
int rc;
- OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+ OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
rc = osc_reserve_grant(cli, bytes);
if (rc < 0)
return 0;
- if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+ if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
- cli->cl_dirty_transit += CFS_PAGE_SIZE;
+ cli->cl_dirty_transit += PAGE_CACHE_SIZE;
cfs_atomic_inc(&obd_dirty_transit_pages);
oap->oap_brw_flags |= OBD_BRW_NOCACHE;
}
{
int rc;
client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&ocw->ocw_entry) || cli->cl_w_in_flight == 0;
+ rc = cfs_list_empty(&ocw->ocw_entry);
client_obd_list_unlock(&cli->cl_loi_list_lock);
return rc;
}
struct osc_object *osc = oap->oap_obj;
struct lov_oinfo *loi = osc->oo_oinfo;
struct osc_cache_waiter ocw;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
+ LWI_ON_SIGNAL_NOOP, NULL);
int rc = -EDQUOT;
ENTRY;
- OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+ OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
client_obd_list_lock(&cli->cl_loi_list_lock);
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
- cli->cl_dirty_max < CFS_PAGE_SIZE ||
+ cli->cl_dirty_max < PAGE_CACHE_SIZE ||
cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
GOTO(out, rc = -EDQUOT);
* RPC size will be.
* The exit condition is no available grant and no dirty pages cached,
* which really means there is no space on the OST. */
- cfs_waitq_init(&ocw.ocw_waitq);
+ init_waitqueue_head(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
ocw.ocw_grant = bytes;
- if (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
+ while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
ocw.ocw_rc = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
client_obd_list_lock(&cli->cl_loi_list_lock);
- /* l_wait_event is interrupted by signal */
+ /* l_wait_event is interrupted by signal, or timed out */
if (rc < 0) {
+ switch (rc) {
+ case -ETIMEDOUT:
+ OSC_DUMP_GRANT(D_ERROR, cli,
+ "try to reserve %d.\n", bytes);
+ osc_extent_tree_dump(D_ERROR, osc);
+ rc = -EDQUOT;
+ break;
+ case -EINTR:
+ /* Ensures restartability - LU-3581 */
+ rc = -ERESTARTSYS;
+ break;
+ default:
+ CDEBUG(D_CACHE, "%s: event for cache space @"
+ " %p never arrived due to %d\n",
+ cli->cl_import->imp_obd->obd_name,
+ &ocw, rc);
+ break;
+ }
cfs_list_del_init(&ocw.ocw_entry);
GOTO(out, rc);
}
- /* If ocw_entry isn't empty, which means it's not waked up
- * by osc_wake_cache_waiters(), then the page must not be
- * granted yet. */
- if (!cfs_list_empty(&ocw.ocw_entry)) {
- rc = -EDQUOT;
- cfs_list_del_init(&ocw.ocw_entry);
- } else {
- rc = ocw.ocw_rc;
- }
+ LASSERT(cfs_list_empty(&ocw.ocw_entry));
+ rc = ocw.ocw_rc;
if (rc != -EDQUOT)
GOTO(out, rc);
if (osc_enter_cache_try(cli, oap, bytes, 0))
- rc = 0;
+ GOTO(out, rc = 0);
}
EXIT;
out:
client_obd_list_unlock(&cli->cl_loi_list_lock);
- OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
+ OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
RETURN(rc);
}
ENTRY;
cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
- if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
+
+ ocw->ocw_rc = -EDQUOT;
+ /* we can't dirty more */
+ if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
+ (cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
"osc max %ld, sys max %d\n", cli->cl_dirty,
cli->cl_dirty_max, obd_max_dirty_pages);
- return;
- }
-
- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
+ goto wakeup;
}
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
-
ocw->ocw_rc = 0;
if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
ocw->ocw_rc = -EDQUOT;
+wakeup:
CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
- cfs_waitq_signal(&ocw->ocw_waitq);
+ wake_up(&ocw->ocw_waitq);
}
EXIT;
ar->ar_force_sync = 0;
}
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+ cfs_atomic_sub(page_count, &cli->cl_unstable_count);
+ LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+
+ cfs_atomic_sub(page_count, &obd_unstable_pages);
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+ spin_lock(&req->rq_lock);
+ req->rq_committed = 1;
+ req->rq_unstable = 0;
+ spin_unlock(&req->rq_lock);
+
+ wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+ cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+ cfs_atomic_add(page_count, &cli->cl_unstable_count);
+
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+ cfs_atomic_add(page_count, &obd_unstable_pages);
+
+ spin_lock(&req->rq_lock);
+
+ /* If the request has already been committed (i.e. brw_commit
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed because rq_commit_cb won't be
+ * called again. Otherwise, just set the commit callback so the
+ * unstable page accounting is properly updated when the request
+ * is committed */
+ if (req->rq_committed) {
+ /* Drop lock before calling osc_dec_unstable_pages */
+ spin_unlock(&req->rq_lock);
+ osc_dec_unstable_pages(req);
+ spin_lock(&req->rq_lock);
+ } else {
+ req->rq_unstable = 1;
+ req->rq_commit_cb = osc_dec_unstable_pages;
+ }
+
+ spin_unlock(&req->rq_lock);
+}
+
/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
ENTRY;
if (oap->oap_request != NULL) {
+ if (rc == 0)
+ osc_inc_unstable_pages(oap->oap_request);
+
xid = ptlrpc_req_xid(oap->oap_request);
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = NULL;
RETURN(0);
cfs_list_for_each_entry(tmp, rpclist, oe_link) {
- EASSERT(tmp->oe_owner == cfs_current(), tmp);
+ EASSERT(tmp->oe_owner == current, tmp);
#if 0
if (overlapped(tmp, ext)) {
OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
*pc += ext->oe_nr_pages;
cfs_list_move_tail(&ext->oe_link, rpclist);
- ext->oe_owner = cfs_current();
+ ext->oe_owner = current;
RETURN(1);
}
while ((osc = osc_next_obj(cli)) != NULL) {
struct cl_object *obj = osc2cl(osc);
- struct lu_ref_link *link;
+ struct lu_ref_link link;
OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
cl_object_get(obj);
client_obd_list_unlock(&cli->cl_loi_list_lock);
- link = lu_object_ref_add(&obj->co_lu, "check", cfs_current());
+ lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
/* attempt some read/write balancing by alternating between
* reads and writes in an object. The makes_rpc checks here
osc_object_unlock(osc);
osc_list_maint(cli, osc);
- lu_object_ref_del_at(&obj->co_lu, link, "check", cfs_current());
+ lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
cl_object_put(env, obj);
client_obd_list_lock(&cli->cl_loi_list_lock);
static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
struct osc_object *osc, pdl_policy_t pol, int async)
{
- int has_rpcs = 1;
int rc = 0;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- if (osc != NULL)
- has_rpcs = __osc_list_maint(cli, osc);
- if (has_rpcs) {
- if (!async) {
- osc_check_rpcs(env, cli, pol);
- } else {
- CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
- cli);
- LASSERT(cli->cl_writeback_work != NULL);
- rc = ptlrpcd_queue_work(cli->cl_writeback_work);
- }
+ if (osc != NULL && osc_list_maint(cli, osc) == 0)
+ return 0;
+
+ if (!async) {
+ /* disable osc_lru_shrink() temporarily to avoid
+ * a potential stack overrun problem. LU-2859 */
+ cfs_atomic_inc(&cli->cl_lru_shrinkers);
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ osc_check_rpcs(env, cli, pol);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ cfs_atomic_dec(&cli->cl_lru_shrinkers);
+ } else {
+ CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
+ LASSERT(cli->cl_writeback_work != NULL);
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
}
- client_obd_list_unlock(&cli->cl_loi_list_lock);
return rc;
}
}
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
- cfs_page_t *page, loff_t offset)
+ struct page *page, loff_t offset)
{
struct obd_export *exp = osc_export(osc);
struct osc_async_page *oap = &ops->ops_oap;
RETURN(rc);
}
+ if (osc_over_unstable_soft_limit(cli))
+ brw_flags |= OBD_BRW_SOFT_SYNC;
+
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
oap->oap_count = ops->ops_to - ops->ops_from;
+ /* No need to hold a lock here,
+ * since this page is not in any list yet. */
oap->oap_async_flags = 0;
oap->oap_brw_flags = brw_flags;
OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
- index = oap2cl_page(oap)->cp_index;
+ index = osc_index(oap2osc(oap));
/* Add this page into extent by the following steps:
* 1. if there exists an active extent for this IO, mostly this page
LASSERT(oap->oap_magic == OAP_MAGIC);
CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
- oap, ops, oap2cl_page(oap)->cp_index);
+ oap, ops, osc_index(oap2osc(oap)));
osc_object_lock(obj);
if (!cfs_list_empty(&oap->oap_rpc_item)) {
CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
rc = -EBUSY;
} else if (!cfs_list_empty(&oap->oap_pending_item)) {
- ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
+ ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
/* only truncated pages are allowed to be taken out.
* See osc_extent_truncate() and osc_cache_truncate_start()
* for details. */
if (ext != NULL && ext->oe_state != OES_TRUNC) {
OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
- oap2cl_page(oap)->cp_index);
+ osc_index(oap2osc(oap)));
rc = -EBUSY;
}
}
struct osc_extent *ext = NULL;
struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
struct cl_page *cp = ops->ops_cl.cpl_page;
- pgoff_t index = cp->cp_index;
+ pgoff_t index = osc_index(ops);
struct osc_async_page *oap = &ops->ops_oap;
bool unplug = false;
int rc = 0;
switch (ext->oe_state) {
case OES_RPC:
case OES_LOCK_DONE:
- CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
- "flush an in-rpc page?\n");
+ CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
LASSERT(0);
break;
case OES_LOCKING:
* really sending the RPC. */
case OES_TRUNC:
/* race with truncate, page will be redirtied */
+ case OES_ACTIVE:
+ /* The extent is active so we need to abort and let the caller
+ * re-dirty the page. If we continued on here, and we were the
+ * one making the extent active, we could deadlock waiting for
+ * the page writeback to clear, but it won't because the extent
+ * is active and won't be written out. */
GOTO(out, rc = -EAGAIN);
default:
break;
}
- rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
+ rc = cl_page_prep(env, io, cp, CRT_WRITE);
if (rc)
GOTO(out, rc);
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
spin_unlock(&oap->oap_lock);
- if (cfs_memory_pressure_get())
+ if (memory_pressure_get())
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
struct osc_extent *ext;
struct osc_extent *found = NULL;
cfs_list_t *plist;
- pgoff_t index = oap2cl_page(oap)->cp_index;
+ pgoff_t index = osc_index(ops);
int rc = -EBUSY;
int cmd;
ENTRY;
ENTRY;
cfs_list_for_each_entry(oap, list, oap_pending_item) {
- struct cl_page *cp = oap2cl_page(oap);
- if (cp->cp_index > end)
- end = cp->cp_index;
- if (cp->cp_index < start)
- start = cp->cp_index;
+ pgoff_t index = osc_index(oap2osc(oap));
+ if (index > end)
+ end = index;
+ if (index < start)
+ start = index;
++page_count;
mppr <<= (page_count > mppr);
}
* should take care of it. */
rc = osc_extent_wait(env, waiting, OES_INV);
if (rc < 0)
- OSC_EXTENT_DUMP(D_CACHE, ext, "wait error: %d.\n", rc);
+ OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
osc_extent_put(env, waiting);
waiting = NULL;
EASSERT(ext->oe_start >= start &&
ext->oe_max_end <= end, ext);
osc_extent_state_set(ext, OES_LOCKING);
- ext->oe_owner = cfs_current();
+ ext->oe_owner = current;
cfs_list_move_tail(&ext->oe_link,
&discard_list);
osc_update_pending(obj, OBD_BRW_WRITE,
result = rc;
}
- OSC_IO_DEBUG(obj, "cache page out.\n");
+ OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
RETURN(result);
}
+/**
+ * Invoke \a cb on each page of \a obj cached within [start, end].
+ *
+ * If the traversal has to give up the CPU before reaching \a end, it
+ * returns CLP_GANG_RESCHED; in that case the caller should implement a
+ * retry logic and resume from the next index.
+ *
+ * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
+ * crucial in the face of [offset, EOF] locks.
+ *
+ * At least one covered page is passed to \a cb per call, unless there is
+ * no covered page at all.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+ struct osc_object *osc, pgoff_t start, pgoff_t end,
+ osc_page_gang_cbt cb, void *cbdata)
+{
+ struct osc_page *ops;
+ void **pvec;
+ pgoff_t idx;
+ unsigned int nr;
+ unsigned int i;
+ unsigned int j;
+ int res = CLP_GANG_OKAY;
+ bool tree_lock = true;
+ ENTRY;
+
+ idx = start;
+ pvec = osc_env_info(env)->oti_pvec;
+ spin_lock(&osc->oo_tree_lock);
+ while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+ idx, OTI_PVEC_SIZE)) > 0) {
+ struct cl_page *page;
+ bool end_of_region = false;
+
+ for (i = 0, j = 0; i < nr; ++i) {
+ ops = pvec[i];
+ pvec[i] = NULL;
+
+ idx = osc_index(ops);
+ if (idx > end) {
+ end_of_region = true;
+ break;
+ }
+
+ page = ops->ops_cl.cpl_page;
+ LASSERT(page->cp_type == CPT_CACHEABLE);
+ if (page->cp_state == CPS_FREEING)
+ continue;
+
+ cl_page_get(page);
+ lu_ref_add_atomic(&page->cp_reference,
+ "gang_lookup", current);
+ pvec[j++] = ops;
+ }
+ ++idx;
+
+ /*
+ * Here a delicate locking dance is performed. Current thread
+ * holds a reference to a page, but has to own it before it
+ * can be placed into queue. Owning implies waiting, so
+ * radix-tree lock is to be released. After a wait one has to
+ * check that pages weren't truncated (cl_page_own() returns
+ * error in the latter case).
+ */
+ spin_unlock(&osc->oo_tree_lock);
+ tree_lock = false;
+
+ for (i = 0; i < j; ++i) {
+ ops = pvec[i];
+ if (res == CLP_GANG_OKAY)
+ res = (*cb)(env, io, ops, cbdata);
+
+ page = ops->ops_cl.cpl_page;
+ lu_ref_del(&page->cp_reference, "gang_lookup", current);
+ cl_page_put(env, page);
+ }
+ if (nr < OTI_PVEC_SIZE || end_of_region)
+ break;
+
+ if (res == CLP_GANG_OKAY && need_resched())
+ res = CLP_GANG_RESCHED;
+ if (res != CLP_GANG_OKAY)
+ break;
+
+ spin_lock(&osc->oo_tree_lock);
+ tree_lock = true;
+ }
+ if (tree_lock)
+ spin_unlock(&osc->oo_tree_lock);
+ RETURN(res);
+}
+
+/**
+ * Check if page @page is covered by an extra lock or discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_lock *lock = cbdata;
+ pgoff_t index;
+
+ index = osc_index(ops);
+ if (index >= info->oti_fn_index) {
+ struct cl_lock *tmp;
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ /* refresh non-overlapped index */
+ tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+ lock, 1, 0);
+ if (tmp != NULL) {
+ /* Cache the first-non-overlapped index so as to skip
+ * all pages within [index, oti_fn_index). This
+ * is safe because if tmp lock is canceled, it will
+ * discard these pages. */
+ info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+ if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+ info->oti_fn_index = CL_PAGE_EOF;
+ cl_lock_put(env, tmp);
+ } else if (cl_page_own(env, io, page) == 0) {
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
+ }
+
+ info->oti_next_index = index + 1;
+ return CLP_GANG_OKAY;
+}
+
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_lock *lock = cbdata;
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+
+ /* page is top page. */
+ info->oti_next_index = osc_index(ops) + 1;
+ if (cl_page_own(env, io, page) == 0) {
+ KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+ !PageDirty(cl_page_vmpage(page))));
+
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
+
+ return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses the
+ * radix tree to find all covering pages and discards them. If a page is
+ * covered by another lock, it should remain in cache.
+ *
+ * If an error happens on any step, the process continues anyway (the reasoning
+ * behind this being that lock cancellation cannot be delayed indefinitely).
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+ struct osc_thread_info *info = osc_env_info(env);
+ struct cl_io *io = &info->oti_io;
+ struct cl_object *osc = ols->ols_cl.cls_obj;
+ struct cl_lock *lock = ols->ols_cl.cls_lock;
+ struct cl_lock_descr *descr = &lock->cll_descr;
+ osc_page_gang_cbt cb;
+ int res;
+ int result;
+
+ ENTRY;
+
+ io->ci_obj = cl_object_top(osc);
+ io->ci_ignore_layout = 1;
+ result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+ if (result != 0)
+ GOTO(out, result);
+
+ cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+ info->oti_fn_index = info->oti_next_index = descr->cld_start;
+ do {
+ res = osc_page_gang_lookup(env, io, cl2osc(osc),
+ info->oti_next_index, descr->cld_end,
+ cb, (void *)lock);
+ if (info->oti_next_index > descr->cld_end)
+ break;
+
+ if (res == CLP_GANG_RESCHED)
+ cond_resched();
+ } while (res != CLP_GANG_OKAY);
+out:
+ cl_io_fini(env, io);
+ RETURN(result);
+}
+
+
/** @} osc */