X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_cache.c;h=4d97b3e028be984345aa987486338db13b63f49f;hp=581beb6013314f01a28948bab94c70dec4d638e7;hb=27815a0611a2e315a9a7696a20c2f257d48aeb7e;hpb=66be2e6014a7bf20b2f749c877cb141b209f15d9

diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 581beb6..4d97b3e 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  *
  */
 /*
@@ -99,27 +99,28 @@ static inline char list_empty_marker(cfs_list_t *list)
 #define EXTSTR "[%lu -> %lu/%lu]"
 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
 
+static const char *oes_strings[] = {
+	"inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
 #define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {			      \
 	struct osc_extent *__ext = (extent);				      \
-	const char *__str[] = OES_STRINGS;				      \
 	char __buf[16];							      \
 									      \
 	CDEBUG(lvl,							      \
 	       "extent %p@{" EXTSTR ", "				      \
 	       "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,	      \
-	       /* ----- extent part 0 ----- */				      \
+	       /* ----- extent part 0 ----- */				      \
 	       __ext, EXTPARA(__ext),					      \
 	       /* ----- part 1 ----- */					      \
 	       cfs_atomic_read(&__ext->oe_refc),			      \
 	       cfs_atomic_read(&__ext->oe_users),			      \
 	       list_empty_marker(&__ext->oe_link),			      \
-	       __str[__ext->oe_state], ext_flags(__ext, __buf),		      \
+	       oes_strings[__ext->oe_state], ext_flags(__ext, __buf),	      \
 	       __ext->oe_obj,						      \
 	       /* ----- part 2 ----- */					      \
 	       __ext->oe_grants, __ext->oe_nr_pages,			      \
 	       list_empty_marker(&__ext->oe_pages),			      \
-	       cfs_waitq_active(&__ext->oe_waitq) ? '+' : '-',		      \
+	       waitqueue_active(&__ext->oe_waitq) ? '+' : '-',		      \
 	       __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner,	      \
 	       /* ----- part 4 ----- */					      \
 	       ## __VA_ARGS__);						      \
@@ -128,10 +129,10 @@ static inline char list_empty_marker(cfs_list_t *list)
 
 #undef EASSERTF
 #define EASSERTF(expr, ext, fmt, args...) do {				      \
 	if (!(expr)) {							      \
-		OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);		      \
-		osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);		      \
+		OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);		      \
+		osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);		      \
 		LASSERT(expr);						      \
-	}								      \
+	}								      \
 } while (0)
 
 #undef EASSERT
@@ -301,14 +302,14 @@ static void osc_extent_state_set(struct osc_extent *ext, int state)
 
 	/* TODO: validate the state machine */
 	ext->oe_state = state;
-	cfs_waitq_broadcast(&ext->oe_waitq);
+	wake_up_all(&ext->oe_waitq);
 }
 
 static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
 {
 	struct osc_extent *ext;
 
-	OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, CFS_ALLOC_STD);
+	OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
 	if (ext == NULL)
 		return NULL;
 
@@ -319,7 +320,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
 	CFS_INIT_LIST_HEAD(&ext->oe_link);
 	ext->oe_state = OES_INV;
 	CFS_INIT_LIST_HEAD(&ext->oe_pages);
-	cfs_waitq_init(&ext->oe_waitq);
+	init_waitqueue_head(&ext->oe_waitq);
 	ext->oe_osclock = NULL;
 
 	return ext;
@@ -504,7 +505,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
 		return -ERANGE;
 
 	LASSERT(cur->oe_osclock == victim->oe_osclock);
-	ppc_bits = osc_cli(obj)->cl_chunkbits - CFS_PAGE_SHIFT;
+	ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
 	chunk_start = cur->oe_start >> ppc_bits;
 	chunk_end = cur->oe_end >> ppc_bits;
 	if (chunk_start != (victim->oe_end >> ppc_bits) + 1 &&
@@ -611,8 +612,8 @@ struct osc_extent *osc_extent_find(const struct lu_env *env,
 	LASSERT(lock != NULL);
 	LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
 
-	LASSERT(cli->cl_chunkbits >= CFS_PAGE_SHIFT);
-	ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+	LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
+	ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
 	chunk_mask = ~((1 << ppc_bits) - 1);
 	chunksize = 1 << cli->cl_chunkbits;
 	chunk = index >> ppc_bits;
@@ -826,8 +827,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
 	if (!sent) {
 		lost_grant = ext->oe_grants;
-	} else if (blocksize < CFS_PAGE_SIZE &&
-		   last_count != CFS_PAGE_SIZE) {
+	} else if (blocksize < PAGE_CACHE_SIZE &&
+		   last_count != PAGE_CACHE_SIZE) {
		/* For short writes we shouldn't count parts of pages that
		 * span a whole chunk on the OST side, or our accounting goes
		 * wrong. Should match the code in filter_grant_check. */
@@ -837,7 +838,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 		if (end)
 			count += blocksize - end;
 
-		lost_grant = CFS_PAGE_SIZE - count;
+		lost_grant = PAGE_CACHE_SIZE - count;
 	}
 	if (ext->oe_grants > 0)
 		osc_free_grant(cli, nr_pages, lost_grant);
@@ -895,7 +896,7 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
 			"%s: wait ext to %d timedout, recovery in progress?\n",
 			osc_export(obj)->exp_obd->obd_name, state);
 
-		lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+		lwi = LWI_INTR(NULL, NULL);
 		rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
 				  &lwi);
 	}
@@ -919,7 +920,8 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
 	struct osc_async_page *oap;
 	struct osc_async_page *tmp;
 	int pages_in_chunk = 0;
-	int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+	int ppc_bits = cli->cl_chunkbits -
+		       PAGE_CACHE_SHIFT;
 	__u64 trunc_chunk = trunc_index >> ppc_bits;
 	int grants = 0;
 	int nr_pages = 0;
@@ -962,7 +964,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
 		cfs_list_del_init(&oap->oap_pending_item);
 
 		cl_page_get(page);
-		lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+		lu_ref_add(&page->cp_reference, "truncate", current);
 
 		if (cl_page_own(env, io, page) == 0) {
 			cl_page_unmap(env, io, page);
@@ -973,7 +975,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
 			LASSERT(0);
 		}
 
-		lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+		lu_ref_del(&page->cp_reference, "truncate", current);
 		cl_page_put(env, page);
 
 		--ext->oe_nr_pages;
@@ -1076,16 +1078,20 @@ static int osc_extent_make_ready(const struct lu_env *env,
 	if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
 		last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
 		LASSERT(last->oap_count > 0);
-		LASSERT(last->oap_page_off + last->oap_count <= CFS_PAGE_SIZE);
+		LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
+		spin_lock(&last->oap_lock);
 		last->oap_async_flags |= ASYNC_COUNT_STABLE;
+		spin_unlock(&last->oap_lock);
 	}
 
	/* for the rest of pages, we don't need to call osf_refresh_count()
	 * because it's known they are not the last page */
 	cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
 		if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-			oap->oap_count = CFS_PAGE_SIZE - oap->oap_page_off;
+			oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+			spin_lock(&oap->oap_lock);
 			oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+			spin_unlock(&oap->oap_lock);
 		}
 	}
 
@@ -1108,7 +1114,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
 	struct osc_object *obj = ext->oe_obj;
 	struct client_obd *cli = osc_cli(obj);
 	struct osc_extent *next;
-	int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+	int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
 	pgoff_t chunk = index >> ppc_bits;
 	pgoff_t end_chunk;
 	pgoff_t end_index;
@@ -1240,9 +1246,9 @@ static int osc_refresh_count(const struct lu_env *env,
 		return 0;
 	else if (cl_offset(obj, page->cp_index + 1) > kms)
 		/* catch sub-page write at end of file */
-		return kms % CFS_PAGE_SIZE;
+		return kms % PAGE_CACHE_SIZE;
 	else
-		return CFS_PAGE_SIZE;
+		return PAGE_CACHE_SIZE;
 }
 
 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
@@ -1257,8 +1263,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 	ENTRY;
 
 	cmd &= ~OBD_BRW_NOQUOTA;
-	LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ));
-	LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+	LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
+		 "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+	LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+		 "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
 	LASSERT(opg->ops_transfer_pinned);
 
 	/*
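[Editor's note, not part of the patch: the hunks at lines 826-838 and 1407 above concern the "lost grant" arithmetic for short writes when the OST blocksize is smaller than the client page size. The following is a minimal, standalone sketch of that calculation with illustrative values (4096-byte page, 1024-byte OST block); the variable names merely mirror osc_extent_finish() and are stand-ins, not the patched code itself.]

	#include <stdio.h>

	int main(void)
	{
		unsigned long page_size  = 4096;  /* stands in for PAGE_CACHE_SIZE */
		unsigned long blocksize  = 1024;  /* OST block size, < page size */
		unsigned long offset     = 0;     /* stands in for oap_page_off */
		unsigned long last_count = 1500;  /* bytes of the short write */

		/* Round the written range out to OST block boundaries, as
		 * osc_extent_finish() does before computing lost_grant. */
		unsigned long count = last_count + (offset & (blocksize - 1));
		unsigned long end   = (offset + last_count) & (blocksize - 1);

		if (end)
			count += blocksize - end;

		/* 1500 bytes occupy two 1024-byte blocks on the OST, so
		 * 4096 - 2048 = 2048 bytes of granted space are "lost"
		 * and must be returned to the server's accounting. */
		printf("lost_grant = %lu\n", page_size - count);
		return 0;
	}
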
@@ -1420,7 +1434,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
 
 	client_obd_list_lock(&cli->cl_loi_list_lock);
 	cfs_atomic_sub(nr_pages, &obd_dirty_pages);
-	cli->cl_dirty -= nr_pages << CFS_PAGE_SHIFT;
+	cli->cl_dirty -= nr_pages << PAGE_CACHE_SHIFT;
 	cli->cl_lost_grant += lost_grant;
 	if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
 		/* borrow some grant from truncate to avoid the case that
@@ -1456,17 +1470,18 @@ static int osc_enter_cache_try(struct client_obd *cli,
 {
 	int rc;
 
-	OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+	OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
 
 	rc = osc_reserve_grant(cli, bytes);
 	if (rc < 0)
 		return 0;
 
-	if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-	    cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+	if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
+	    cfs_atomic_read(&obd_unstable_pages) + 1 +
+	    cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
 		osc_consume_write_grant(cli, &oap->oap_brw_page);
 		if (transient) {
-			cli->cl_dirty_transit += CFS_PAGE_SIZE;
+			cli->cl_dirty_transit += PAGE_CACHE_SIZE;
 			cfs_atomic_inc(&obd_dirty_transit_pages);
 			oap->oap_brw_flags |= OBD_BRW_NOCACHE;
 		}
@@ -1482,7 +1497,7 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
 {
 	int rc;
 	client_obd_list_lock(&cli->cl_loi_list_lock);
-	rc = cfs_list_empty(&ocw->ocw_entry) || cli->cl_w_in_flight == 0;
+	rc = cfs_list_empty(&ocw->ocw_entry);
 	client_obd_list_unlock(&cli->cl_loi_list_lock);
 	return rc;
 }
@@ -1500,18 +1515,19 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 	struct osc_object *osc = oap->oap_obj;
 	struct lov_oinfo *loi = osc->oo_oinfo;
 	struct osc_cache_waiter ocw;
-	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+	struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
+						  LWI_ON_SIGNAL_NOOP, NULL);
 	int rc = -EDQUOT;
 	ENTRY;
 
-	OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+	OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
 
 	client_obd_list_lock(&cli->cl_loi_list_lock);
 
	/* force the caller to try sync io.  this can jump the list
	 * of queued writes and create a discontiguous rpc stream */
 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
-	    cli->cl_dirty_max < CFS_PAGE_SIZE ||
+	    cli->cl_dirty_max < PAGE_CACHE_SIZE ||
 	    cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
 		GOTO(out, rc = -EDQUOT);
 
@@ -1525,10 +1541,10 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
	 * RPC size will be.
	 * The exiting condition is no avail grants and no dirty pages caching,
	 * that really means there is no space on the OST. */
-	cfs_waitq_init(&ocw.ocw_waitq);
+	init_waitqueue_head(&ocw.ocw_waitq);
 	ocw.ocw_oap   = oap;
 	ocw.ocw_grant = bytes;
-	if (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
+	while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
 		cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
 		ocw.ocw_rc = 0;
 		client_obd_list_unlock(&cli->cl_loi_list_lock);
@@ -1542,31 +1558,42 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 
 		client_obd_list_lock(&cli->cl_loi_list_lock);
 
-		/* l_wait_event is interrupted by signal */
+		/* l_wait_event is interrupted by signal, or timed out */
 		if (rc < 0) {
+			switch (rc) {
+			case -ETIMEDOUT:
+				OSC_DUMP_GRANT(D_ERROR, cli,
+					       "try to reserve %d.\n", bytes);
+				osc_extent_tree_dump(D_ERROR, osc);
+				rc = -EDQUOT;
+				break;
+			case -EINTR:
+				/* Ensures restartability - LU-3581 */
+				rc = -ERESTARTSYS;
+				break;
+			default:
+				CDEBUG(D_CACHE, "%s: event for cache space @"
+				       " %p never arrived due to %d\n",
+				       cli->cl_import->imp_obd->obd_name,
+				       &ocw, rc);
+				break;
+			}
 			cfs_list_del_init(&ocw.ocw_entry);
 			GOTO(out, rc);
 		}
 
-		/* If ocw_entry isn't empty, which means it's not waked up
-		 * by osc_wake_cache_waiters(), then the page must not be
-		 * granted yet. */
-		if (!cfs_list_empty(&ocw.ocw_entry)) {
-			rc = -EDQUOT;
-			cfs_list_del_init(&ocw.ocw_entry);
-		} else {
-			rc = ocw.ocw_rc;
-		}
+		LASSERT(cfs_list_empty(&ocw.ocw_entry));
+		rc = ocw.ocw_rc;
+
 		if (rc != -EDQUOT)
 			GOTO(out, rc);
 		if (osc_enter_cache_try(cli, oap, bytes, 0))
-			rc = 0;
+			GOTO(out, rc = 0);
 	}
+
 	EXIT;
 out:
 	client_obd_list_unlock(&cli->cl_loi_list_lock);
-	OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
+	OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
 	RETURN(rc);
 }
 
@@ -1578,35 +1605,29 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 	ENTRY;
 
 	cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-		/* if we can't dirty more, we must wait until some is written */
-		if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-		    (cfs_atomic_read(&obd_dirty_pages) + 1 >
-		     obd_max_dirty_pages)) {
+		ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+		cfs_list_del_init(&ocw->ocw_entry);
+
+		ocw->ocw_rc = -EDQUOT;
+		/* we can't dirty more */
+		if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
+		    (cfs_atomic_read(&obd_unstable_pages) + 1 +
+		     cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
 			CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
 			       "osc max %ld, sys max %d\n", cli->cl_dirty,
 			       cli->cl_dirty_max, obd_max_dirty_pages);
-			return;
+			goto wakeup;
 		}
 
-		/* if still dirty cache but no grant wait for pending RPCs that
-		 * may yet return us some grant before doing sync writes */
-		if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-			CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-			       cli->cl_w_in_flight);
-			return;
-		}
-
-		ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-		cfs_list_del_init(&ocw->ocw_entry);
-
 		ocw->ocw_rc = 0;
 		if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
 			ocw->ocw_rc = -EDQUOT;
+wakeup:
 		CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
 		       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
 
-		cfs_waitq_signal(&ocw->ocw_waitq);
+		wake_up(&ocw->ocw_waitq);
 	}
 
 	EXIT;
@@ -1759,6 +1780,91 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
 	ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+	struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+	obd_count page_count = desc->bd_iov_count;
+	int i;
+
+	/* No unstable page tracking */
+	if (cli->cl_cache == NULL)
+		return;
+
+	LASSERT(page_count >= 0);
+
+	for (i = 0; i < page_count; i++)
+		dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+	cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+	LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+	cfs_atomic_sub(page_count, &cli->cl_unstable_count);
+	LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+
+	cfs_atomic_sub(page_count, &obd_unstable_pages);
+	LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+	spin_lock(&req->rq_lock);
+	req->rq_committed = 1;
+	req->rq_unstable = 0;
+	spin_unlock(&req->rq_lock);
+
+	wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+	struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+	obd_count page_count = desc->bd_iov_count;
+	int i;
+
+	/* No unstable page tracking */
+	if (cli->cl_cache == NULL)
+		return;
+
+	LASSERT(page_count >= 0);
+
+	for (i = 0; i < page_count; i++)
+		inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+	LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+	cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+	LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+	cfs_atomic_add(page_count, &cli->cl_unstable_count);
+
+	LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+	cfs_atomic_add(page_count, &obd_unstable_pages);
+
+	spin_lock(&req->rq_lock);
+
+	/* If the request has already been committed (i.e. brw_commit
+	 * called via rq_commit_cb), we need to undo the unstable page
+	 * increments we just performed because rq_commit_cb won't be
+	 * called again. Otherwise, just set the commit callback so the
+	 * unstable page accounting is properly updated when the request
+	 * is committed */
+	if (req->rq_committed) {
+		/* Drop lock before calling osc_dec_unstable_pages */
+		spin_unlock(&req->rq_lock);
+		osc_dec_unstable_pages(req);
+		spin_lock(&req->rq_lock);
+	} else {
+		req->rq_unstable = 1;
+		req->rq_commit_cb = osc_dec_unstable_pages;
+	}
+
+	spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1770,6 +1876,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 	ENTRY;
 
 	if (oap->oap_request != NULL) {
+		if (rc == 0)
+			osc_inc_unstable_pages(oap->oap_request);
+
 		xid = ptlrpc_req_xid(oap->oap_request);
 		ptlrpc_req_finished(oap->oap_request);
 		oap->oap_request = NULL;
@@ -1816,7 +1925,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 		RETURN(0);
 
 	cfs_list_for_each_entry(tmp, rpclist, oe_link) {
-		EASSERT(tmp->oe_owner == cfs_current(), tmp);
+		EASSERT(tmp->oe_owner == current, tmp);
 #if 0
 		if (overlapped(tmp, ext)) {
 			OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
@@ -1834,7 +1943,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
 	*pc += ext->oe_nr_pages;
 	cfs_list_move_tail(&ext->oe_link, rpclist);
-	ext->oe_owner = cfs_current();
+	ext->oe_owner = current;
 	RETURN(1);
 }
 
@@ -2075,7 +2184,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
 	while ((osc = osc_next_obj(cli)) != NULL) {
 		struct cl_object *obj = osc2cl(osc);
-		struct lu_ref_link *link;
+		struct lu_ref_link link;
 
 		OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
 
@@ -2086,7 +2195,8 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 		cl_object_get(obj);
 		client_obd_list_unlock(&cli->cl_loi_list_lock);
-		link = lu_object_ref_add(&obj->co_lu, "check", cfs_current());
+		lu_object_ref_add_at(&obj->co_lu, &link, "check",
+				     current);
 
		/* attempt some read/write balancing by alternating between
		 * reads and writes in an object.  The makes_rpc checks here
@@ -2127,7 +2237,8 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 		osc_object_unlock(osc);
 
 		osc_list_maint(cli, osc);
-		lu_object_ref_del_at(&obj->co_lu, link, "check", cfs_current());
+		lu_object_ref_del_at(&obj->co_lu, &link, "check",
+				     current);
 		cl_object_put(env, obj);
 
 		client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -2137,23 +2248,24 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 			  struct osc_object *osc, pdl_policy_t pol, int async)
 {
-	int has_rpcs = 1;
 	int rc = 0;
 
-	client_obd_list_lock(&cli->cl_loi_list_lock);
-	if (osc != NULL)
-		has_rpcs = __osc_list_maint(cli, osc);
-	if (has_rpcs) {
-		if (!async) {
-			osc_check_rpcs(env, cli, pol);
-		} else {
-			CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
-			       cli);
-			LASSERT(cli->cl_writeback_work != NULL);
-			rc = ptlrpcd_queue_work(cli->cl_writeback_work);
-		}
+	if (osc != NULL && osc_list_maint(cli, osc) == 0)
+		return 0;
+
+	if (!async) {
+		/* disable osc_lru_shrink() temporarily to avoid
+		 * potential stack overrun problem. LU-2859 */
+		cfs_atomic_inc(&cli->cl_lru_shrinkers);
+		client_obd_list_lock(&cli->cl_loi_list_lock);
+		osc_check_rpcs(env, cli, pol);
+		client_obd_list_unlock(&cli->cl_loi_list_lock);
+		cfs_atomic_dec(&cli->cl_lru_shrinkers);
+	} else {
+		CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
+		LASSERT(cli->cl_writeback_work != NULL);
+		rc = ptlrpcd_queue_work(cli->cl_writeback_work);
 	}
-	client_obd_list_unlock(&cli->cl_loi_list_lock);
 	return rc;
 }
 
@@ -2171,7 +2283,7 @@ void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
 }
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-			cfs_page_t *page, loff_t offset)
+			struct page *page, loff_t offset)
 {
 	struct obd_export *exp = osc_export(osc);
 	struct osc_async_page *oap = &ops->ops_oap;
@@ -2255,9 +2367,14 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 		RETURN(rc);
 	}
 
+	if (osc_over_unstable_soft_limit(cli))
+		brw_flags |= OBD_BRW_SOFT_SYNC;
+
 	oap->oap_cmd = cmd;
 	oap->oap_page_off = ops->ops_from;
 	oap->oap_count = ops->ops_to - ops->ops_from;
+	/* No need to hold a lock here,
+	 * since this page is not in any list yet. */
 	oap->oap_async_flags = 0;
 	oap->oap_brw_flags = brw_flags;
 
@@ -2452,7 +2569,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
 	oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
 	spin_unlock(&oap->oap_lock);
 
-	if (cfs_memory_pressure_get())
+	if (memory_pressure_get())
 		ext->oe_memalloc = 1;
 
 	ext->oe_urgent = 1;
@@ -2709,7 +2826,7 @@ again:
	 * should take care of it. */
 	rc = osc_extent_wait(env, waiting, OES_INV);
 	if (rc < 0)
-		OSC_EXTENT_DUMP(D_CACHE, ext, "wait error: %d.\n", rc);
+		OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
 
 	osc_extent_put(env, waiting);
 	waiting = NULL;
@@ -2858,7 +2975,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 			EASSERT(ext->oe_start >= start &&
 				ext->oe_max_end <= end, ext);
 			osc_extent_state_set(ext, OES_LOCKING);
-			ext->oe_owner = cfs_current();
+			ext->oe_owner = current;
 			cfs_list_move_tail(&ext->oe_link, &discard_list);
 			osc_update_pending(obj, OBD_BRW_WRITE,
@@ -2919,7 +3036,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 			result = rc;
 	}
 
-	OSC_IO_DEBUG(obj, "cache page out.\n");
+	OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
 	RETURN(result);
 }
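[Editor's note, not part of the patch: the core addition above is the unstable-page accounting pair osc_inc_unstable_pages()/osc_dec_unstable_pages(). The sketch below is a condensed userspace model of the commit race those functions handle under rq_lock: if the server has already committed the bulk RPC by the time the increment runs, rq_commit_cb will never fire, so the increment must be undone on the spot. Every name here (fake_req, inc_unstable, and so on) is an illustrative stand-in, not the Lustre or ptlrpc API.]

	#include <pthread.h>
	#include <stdio.h>

	struct fake_req {			/* stands in for ptlrpc_request */
		pthread_mutex_t lock;		/* rq_lock */
		int committed;			/* rq_committed */
		void (*commit_cb)(struct fake_req *); /* rq_commit_cb */
		int page_count;			/* bd_iov_count */
	};

	static int unstable_pages;		/* obd_unstable_pages stand-in */

	static void dec_unstable(struct fake_req *req)
	{
		/* The server committed the bulk; pages are stable again. */
		unstable_pages -= req->page_count;
	}

	static void inc_unstable(struct fake_req *req)
	{
		unstable_pages += req->page_count;

		pthread_mutex_lock(&req->lock);
		if (req->committed) {
			/* Commit beat us to it: the callback will never run,
			 * so undo the increment now, dropping the lock first
			 * just as the patch does around
			 * osc_dec_unstable_pages(). */
			pthread_mutex_unlock(&req->lock);
			dec_unstable(req);
			return;
		}
		req->commit_cb = dec_unstable;	/* run when commit arrives */
		pthread_mutex_unlock(&req->lock);
	}

	int main(void)
	{
		struct fake_req req = { PTHREAD_MUTEX_INITIALIZER, 0, NULL, 8 };

		inc_unstable(&req);		/* bulk write reply received */
		printf("unstable after reply:  %d\n", unstable_pages); /* 8 */

		pthread_mutex_lock(&req.lock);	/* commit notification */
		req.committed = 1;
		pthread_mutex_unlock(&req.lock);
		if (req.commit_cb)
			req.commit_cb(&req);
		printf("unstable after commit: %d\n", unstable_pages); /* 0 */
		return 0;
	}

Either ordering of reply and commit leaves the counter balanced, which is exactly the invariant the patch's rq_committed/rq_commit_cb dance preserves.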