*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* Implementation of cl_page for OSC layer.
*
static void osc_page_transfer_add(const struct lu_env *env,
struct osc_page *opg, enum cl_req_type crt)
{
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
+ struct osc_object *obj = osc_page_object(opg);
osc_lru_use(osc_cli(obj), opg);
}
-int osc_page_cache_add(const struct lu_env *env, struct osc_page *opg,
- struct cl_io *io, cl_commit_cbt cb)
+int osc_page_cache_add(const struct lu_env *env, struct osc_object *osc,
+ struct osc_page *opg, struct cl_io *io,
+ cl_commit_cbt cb)
{
int result;
ENTRY;
osc_page_transfer_get(opg, "transfer\0cache");
- result = osc_queue_async_io(env, io, opg, cb);
+ result = osc_queue_async_io(env, io, osc, opg, cb);
if (result != 0)
osc_page_transfer_put(env, opg);
else
const struct cl_object *obj, pgoff_t start, pgoff_t end)
{
memset(policy, 0, sizeof *policy);
- policy->l_extent.start = cl_offset(obj, start);
- policy->l_extent.end = cl_offset(obj, end + 1) - 1;
-}
-
-static inline s64 osc_submit_duration(struct osc_page *opg)
-{
- if (ktime_to_ns(opg->ops_submit_time) == 0)
- return 0;
-
- return ktime_ms_delta(ktime_get(), opg->ops_submit_time);
+ policy->l_extent.start = start << PAGE_SHIFT;
+ policy->l_extent.end = ((end + 1) << PAGE_SHIFT) - 1;
}
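A worked example of the open-coded conversion (illustrative numbers, assuming PAGE_SHIFT = 12, i.e. 4 KiB pages):

/* start = 2, end = 3 covers pages 2..3, so:
 *   l_extent.start = 2 << 12       = 8192
 *   l_extent.end   = (4 << 12) - 1 = 16383
 * i.e. the inclusive byte range [8192, 16383] -- the same values the
 * removed cl_offset() calls would have produced.
 */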
static int osc_page_print(const struct lu_env *env,
{
struct osc_page *opg = cl2osc_page(slice);
struct osc_async_page *oap = &opg->ops_oap;
- struct osc_object *obj = cl2osc(slice->cpl_obj);
+ struct osc_object *obj = osc_page_object(opg);
struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p %lu: "
- "1< %#x %d %c %c > "
- "2< %lld %u %u %#x %#x | %p %p %p > "
- "3< %d %lld %d > "
+ "1< %d %c %c > "
+ "2< %lld %u %u %#x %#x | %p %p > "
+ "3< %d %d > "
"4< %d %d %d %lu %c | %c %c %c %c > "
"5< %c %c %c %c | %d %c | %d %c %c>\n",
opg, osc_index(opg),
/* 1 */
- oap->oap_magic, oap->oap_cmd,
+ oap->oap_cmd,
list_empty_marker(&oap->oap_pending_item),
list_empty_marker(&oap->oap_rpc_item),
/* 2 */
oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
oap->oap_async_flags, oap->oap_brw_flags,
- oap->oap_request, oap->oap_cli, obj,
+ cli, obj,
/* 3 */
opg->ops_transfer_pinned,
- osc_submit_duration(opg), opg->ops_srvlock,
+ opg->ops_srvlock,
/* 4 */
cli->cl_r_in_flight, cli->cl_w_in_flight,
cli->cl_max_rpcs_in_flight,
cli->cl_avail_grant,
- list_empty_marker(&cli->cl_cache_waiters),
+ waitqueue_active(&cli->cl_cache_waiters) ? '+' : '-',
list_empty_marker(&cli->cl_loi_ready_list),
list_empty_marker(&cli->cl_loi_hp_ready_list),
list_empty_marker(&cli->cl_loi_write_list),
const struct cl_page_slice *slice)
{
struct osc_page *opg = cl2osc_page(slice);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
+ struct osc_object *obj = osc_page_object(opg);
int rc;
ENTRY;
struct osc_page *opg = cl2osc_page(slice);
struct osc_async_page *oap = &opg->ops_oap;
+ CDEBUG(D_CACHE, "from %d, to %d\n", from, to);
+
opg->ops_from = from;
- opg->ops_to = to;
- spin_lock(&oap->oap_lock);
+ /* argument @to is exclusive, but @ops_to is inclusive */
+ opg->ops_to = to - 1;
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&oap->oap_lock);
}
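A minimal sketch of the new inclusive convention (hypothetical call and values):

/* hypothetical: clip the page to its first 100 bytes */
osc_page_clip(env, slice, 0, 100);
/* stores ops_from = 0, ops_to = 99; osc_page_submit() below then
 * recovers the length as ops_to - ops_from + 1 = 100 bytes. */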
static int osc_page_flush(const struct lu_env *env,
const struct cl_page_slice *slice, size_t to)
{
struct osc_page *opg = cl2osc_page(slice);
- struct cl_object *obj = opg->ops_cl.cpl_obj;
+ struct cl_object *obj = osc2cl(osc_page_object(opg));
osc_page_touch_at(env, obj, osc_index(opg), to);
}
+static const struct cl_page_operations osc_transient_page_ops = {
+ .cpo_print = osc_page_print,
+ .cpo_delete = osc_page_delete,
+ .cpo_clip = osc_page_clip,
+};
+
static const struct cl_page_operations osc_page_ops = {
.cpo_print = osc_page_print,
.cpo_delete = osc_page_delete,
};
int osc_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, pgoff_t index)
+ struct cl_page *cl_page, pgoff_t index)
{
struct osc_object *osc = cl2osc(obj);
- struct osc_page *opg = cl_object_page_slice(obj, page);
+ struct osc_page *opg = cl_object_page_slice(obj, cl_page);
struct osc_io *oio = osc_env_io(env);
int result;
opg->ops_from = 0;
- opg->ops_to = PAGE_SIZE;
+ opg->ops_to = PAGE_SIZE - 1;
INIT_LIST_HEAD(&opg->ops_lru);
- result = osc_prep_async_page(osc, opg, page->cp_vmpage,
- cl_offset(obj, index));
+ result = osc_prep_async_page(osc, opg, cl_page, index << PAGE_SHIFT);
if (result != 0)
return result;
opg->ops_srvlock = osc_io_srvlock(oio);
- cl_page_slice_add(page, &opg->ops_cl, obj, index,
- &osc_page_ops);
-
- /* reserve an LRU space for this page */
- if (page->cp_type == CPT_CACHEABLE) {
+ if (cl_page->cp_type == CPT_TRANSIENT) {
+ cl_page_slice_add(cl_page, &opg->ops_cl, obj,
+ &osc_transient_page_ops);
+ } else if (cl_page->cp_type == CPT_CACHEABLE) {
+ cl_page_slice_add(cl_page, &opg->ops_cl, obj, &osc_page_ops);
+ /* reserve an LRU slot for this page */
result = osc_lru_alloc(env, osc_cli(osc), opg);
if (result == 0) {
result = radix_tree_preload(GFP_NOFS);
struct osc_io *oio = osc_env_io(env);
struct osc_async_page *oap = &opg->ops_oap;
- LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
- "magic 0x%x\n", oap, oap->oap_magic);
LASSERT(oap->oap_async_flags & ASYNC_READY);
LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE);
- oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
- oap->oap_page_off = opg->ops_from;
- oap->oap_count = opg->ops_to - opg->ops_from;
+ oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
+ oap->oap_page_off = opg->ops_from;
+ oap->oap_count = opg->ops_to - opg->ops_from + 1;
oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
- if (oio->oi_cap_sys_resource) {
- oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
- oap->oap_cmd |= OBD_BRW_NOQUOTA;
- }
+ if (oio->oi_cap_sys_resource)
+ oap->oap_brw_flags |= OBD_BRW_SYS_RESOURCE;
- opg->ops_submit_time = ktime_get();
osc_page_transfer_get(opg, "transfer\0imm");
osc_page_transfer_add(env, opg, crt);
}
unsigned long budget;
LASSERT(cache != NULL);
- budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2);
+ budget = cache->ccc_lru_max / (refcount_read(&cache->ccc_users) - 2);
/* if it's going to run out of LRU slots, we should free some, but not
* too much, to maintain fairness among OSCs. */
}
}
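For concreteness, a made-up configuration:

/* e.g. ccc_lru_max = 8192 pages, refcount_read(&ccc_users) = 6:
 *   budget = 8192 / (6 - 2) = 2048 pages per OSC
 */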
-static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
- struct cl_page **pvec, int max_index)
+static void discard_cl_pages(const struct lu_env *env, struct cl_io *io,
+ struct cl_page **pvec, int max_index)
{
- struct pagevec *pagevec = &osc_env_info(env)->oti_pagevec;
+ struct folio_batch *fbatch = &osc_env_info(env)->oti_fbatch;
int i;
- ll_pagevec_init(pagevec, 0);
+ ll_folio_batch_init(fbatch, 0);
for (i = 0; i < max_index; i++) {
struct cl_page *page = pvec[i];
+ LASSERT(page->cp_type != CPT_TRANSIENT);
LASSERT(cl_page_is_owned(page, io));
- cl_page_delete(env, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
- cl_pagevec_put(env, page, pagevec);
+ cl_batch_put(env, page, fbatch);
pvec[i] = NULL;
}
- pagevec_release(pagevec);
+ folio_batch_release(fbatch);
}
/**
if (--maxscan < 0)
break;
- opg = list_entry(cli->cl_lru_list.next, struct osc_page,
- ops_lru);
+ opg = list_first_entry(&cli->cl_lru_list, struct osc_page,
+ ops_lru);
page = opg->ops_cl.cpl_page;
if (lru_page_busy(cli, page)) {
list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
spin_unlock(&cli->cl_lru_list_lock);
if (clobj != NULL) {
- discard_pagevec(env, io, pvec, index);
+ discard_cl_pages(env, io, pvec, index);
index = 0;
cl_io_fini(env, io);
cl_object_put(env, clobj);
clobj = NULL;
+ cond_resched();
}
clobj = tmp;
pvec[index++] = page;
if (unlikely(index == OTI_PVEC_SIZE)) {
spin_unlock(&cli->cl_lru_list_lock);
- discard_pagevec(env, io, pvec, index);
+ discard_cl_pages(env, io, pvec, index);
index = 0;
spin_lock(&cli->cl_lru_list_lock);
spin_unlock(&cli->cl_lru_list_lock);
if (clobj != NULL) {
- discard_pagevec(env, io, pvec, index);
+ discard_cl_pages(env, io, pvec, index);
cl_io_fini(env, io);
cl_object_put(env, clobj);
+ cond_resched();
}
atomic_dec(&cli->cl_lru_shrinkers);
if (count > 0) {
atomic_long_add(count, cli->cl_lru_left);
- wake_up_all(&osc_lru_waitq);
+ wake_up(&osc_lru_waitq);
}
RETURN(count > 0 ? count : rc);
}
{
struct lu_env *env;
struct cl_client_cache *cache = cli->cl_cache;
+ struct client_obd *scan;
int max_scans;
__u16 refcheck;
long rc = 0;
cache->ccc_lru_shrinkers++;
list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
- max_scans = atomic_read(&cache->ccc_users) - 2;
- while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
- cli = list_entry(cache->ccc_lru.next, struct client_obd,
- cl_lru_osc);
-
+ max_scans = refcount_read(&cache->ccc_users) - 2;
+ while (--max_scans > 0 &&
+ (scan = list_first_entry_or_null(&cache->ccc_lru,
+ struct client_obd,
+ cl_lru_osc)) != NULL) {
CDEBUG(D_CACHE, "%s: cli %p LRU pages: %ld, busy: %ld.\n",
- cli_name(cli), cli,
- atomic_long_read(&cli->cl_lru_in_list),
- atomic_long_read(&cli->cl_lru_busy));
+ cli_name(scan), scan,
+ atomic_long_read(&scan->cl_lru_in_list),
+ atomic_long_read(&scan->cl_lru_busy));
- list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
- if (osc_cache_too_much(cli) > 0) {
+ list_move_tail(&scan->cl_lru_osc, &cache->ccc_lru);
+ if (osc_cache_too_much(scan) > 0) {
spin_unlock(&cache->ccc_lru_lock);
- rc = osc_lru_shrink(env, cli, npages, true);
+ rc = osc_lru_shrink(env, scan, npages, true);
spin_lock(&cache->ccc_lru_lock);
if (rc >= npages)
break;
out:
cl_env_put(env, &refcheck);
CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n",
- cli_name(cli), cli, rc);
+ cli_name(cli), cli, rc);
return rc;
}
break;
if (rc > 0)
continue;
+ /* IO issued by readahead; don't try hard */
+ if (oio->oi_is_readahead) {
+ if (atomic_long_read(cli->cl_lru_left) > 0)
+ continue;
+ rc = -EBUSY;
+ break;
+ }
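A note on the intended effect (the caller's reaction is an assumption, not shown in this hunk):

/* sketch: under LRU pressure a readahead I/O fails fast with -EBUSY
 * instead of sleeping on osc_lru_waitq; presumably the submitter just
 * stops extending the readahead window rather than treating this as
 * an I/O error.
 */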
cond_resched();
rc = l_wait_event_abortable(
unsigned long reserved = 0;
unsigned long max_pages;
unsigned long c;
+ int rc;
- /* reserve a full RPC window at most to avoid that a thread accidentally
- * consumes too many LRU slots */
- max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
- if (npages > max_pages)
- npages = max_pages;
-
+again:
c = atomic_long_read(cli->cl_lru_left);
if (c < npages && osc_lru_reclaim(cli, npages) > 0)
c = atomic_long_read(cli->cl_lru_left);
+
+ if (c < npages) {
+ /*
+ * Trigger writeback in the hope that some LRU slots can
+ * be freed.
+ */
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+ if (rc)
+ return 0;
+ }
+
while (c >= npages) {
if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
reserved = npages;
}
c = atomic_long_read(cli->cl_lru_left);
}
+
+ if (reserved != npages) {
+ cond_resched();
+ rc = l_wait_event_abortable(
+ osc_lru_waitq,
+ atomic_long_read(cli->cl_lru_left) > 0);
+ goto again;
+ }
+
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
if (atomic_long_read(cli->cl_lru_left) < max_pages) {
/* If there aren't enough pages in the per-OSC LRU then
* wake up the LRU thread to try and clear out space, so
void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
{
atomic_long_add(npages, cli->cl_lru_left);
- wake_up_all(&osc_lru_waitq);
+ wake_up(&osc_lru_waitq);
}
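A hypothetical caller pairing the two helpers (assuming osc_lru_reserve() returns the number of slots actually reserved, as the hunks above suggest):

/* hypothetical caller (sketch):
 *   got = osc_lru_reserve(cli, npages);       // may sleep for free slots
 *   ... add the pages to the page cache ...
 *   on failure: osc_lru_unreserve(cli, got);  // wakes one waiter, not all
 */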
/**
* In practice this can work pretty well because the pages in the same RPC
* are likely from the same page zone.
*/
+#ifdef HAVE_NR_UNSTABLE_NFS
+/* Old kernels use a separate counter for unstable pages;
+ * newer kernels treat them like any other writeback.
+ * (see Linux commit: v5.7-467-g8d92890bd6b8)
+ */
+#define NR_ZONE_WRITE_PENDING ((enum zone_stat_item)NR_UNSTABLE_NFS)
+#elif !defined(HAVE_NR_ZONE_WRITE_PENDING)
+#define NR_ZONE_WRITE_PENDING ((enum zone_stat_item)NR_WRITEBACK)
+#endif
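Summarizing the compat block's net effect:

/* counter actually touched by the accounting below:
 *   kernels with NR_UNSTABLE_NFS (pre v5.8):  NR_UNSTABLE_NFS
 *   kernels providing NR_ZONE_WRITE_PENDING:  the native counter
 *   neither symbol available:                 NR_WRITEBACK
 */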
+
static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
- struct osc_brw_async_args *aa,
int factor)
{
int page_count;
int count = 0;
int i;
- if (desc != NULL) {
- page_count = desc->bd_iov_count;
- } else {
- page_count = aa->aa_page_count;
- }
+ ENTRY;
+
+ page_count = desc->bd_iov_count;
+
+ CDEBUG(D_PAGE, "%s %d unstable pages\n",
+ factor == 1 ? "adding" : "removing", page_count);
for (i = 0; i < page_count; i++) {
- void *pz;
- if (desc)
- pz = page_zone(desc->bd_vec[i].kiov_page);
- else
- pz = page_zone(aa->aa_ppga[i]->pg);
+ void *pz = page_zone(desc->bd_vec[i].bv_page);
if (likely(pz == zone)) {
++count;
}
if (count > 0) {
- mod_zone_page_state(zone, NR_UNSTABLE_NFS,
+ mod_zone_page_state(zone, NR_ZONE_WRITE_PENDING,
factor * count);
count = 0;
}
++count;
}
if (count > 0)
- mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
+ mod_zone_page_state(zone, NR_ZONE_WRITE_PENDING,
+ factor * count);
+
+ EXIT;
}
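A trace of the zone-batching loop (illustrative: four pages spread over two zones A and B, factor = +1):

/* page zones: A A B A
 *   i = 0, 1: zone unchanged           -> count = 2
 *   i = 2:    zone changes, flush      -> mod_zone_page_state(A, +2)
 *   i = 3:    zone changes back, flush -> mod_zone_page_state(B, +1)
 *   loop end: final flush              -> mod_zone_page_state(A, +1)
 */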
-static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
- struct osc_brw_async_args *aa)
+static inline void add_unstable_pages(struct ptlrpc_bulk_desc *desc)
{
- unstable_page_accounting(desc, aa, 1);
+ unstable_page_accounting(desc, 1);
}
-static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
- struct osc_brw_async_args *aa)
+static inline void dec_unstable_pages(struct ptlrpc_bulk_desc *desc)
{
- unstable_page_accounting(desc, aa, -1);
+ unstable_page_accounting(desc, -1);
}
/**
void osc_dec_unstable_pages(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
int page_count;
long unstable_count;
- if (desc)
- page_count = desc->bd_iov_count;
- else
- page_count = aa->aa_page_count;
+ /* no desc means short io, which doesn't have separate unstable pages;
+ * it just uses space inside the RPC itself
+ */
+ if (!desc)
+ return;
+
+ page_count = desc->bd_iov_count;
LASSERT(page_count >= 0);
- dec_unstable_page_accounting(desc, aa);
+ dec_unstable_pages(desc);
unstable_count = atomic_long_sub_return(page_count,
&cli->cl_unstable_count);
unstable_count = atomic_long_sub_return(page_count,
&cli->cl_cache->ccc_unstable_nr);
LASSERT(unstable_count >= 0);
- if (unstable_count == 0)
- wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
if (waitqueue_active(&osc_lru_waitq))
(void)ptlrpcd_queue_work(cli->cl_lru_work);
void osc_inc_unstable_pages(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
long page_count;
if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
return;
- if (desc)
- page_count = desc->bd_iov_count;
- else
- page_count = aa->aa_page_count;
+ /* no desc means short io, which doesn't have separate unstable pages;
+ * it just uses space inside the RPC itself
+ */
+ if (!desc)
+ return;
+
+ page_count = desc->bd_iov_count;
- add_unstable_page_accounting(desc, aa);
+ add_unstable_pages(desc);
atomic_long_add(page_count, &cli->cl_unstable_count);
atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
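The two helpers are expected to pair up over a write's lifetime (a rough sketch of the callers, which sit outside this hunk):

/* sketch: pages become "unstable" once the BRW write reply arrives
 * before the server transaction commits, and stop being counted when
 * the commit callback fires:
 *   reply handler (uncommitted) -> osc_inc_unstable_pages(req);
 *   commit callback             -> osc_dec_unstable_pages(req);
 */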
struct client_obd *cli;
unsigned long cached = 0;
+ if (!osc_page_cache_shrink_enabled)
+ return 0;
+
spin_lock(&osc_shrink_lock);
list_for_each_entry(cli, &osc_shrink_list, cl_shrink_list)
cached += atomic_long_read(&cli->cl_lru_in_list);
return SHRINK_STOP;
spin_lock(&osc_shrink_lock);
- while (!list_empty(&osc_shrink_list)) {
- cli = list_entry(osc_shrink_list.next, struct client_obd,
- cl_shrink_list);
-
+ while ((cli = list_first_entry_or_null(&osc_shrink_list,
+ struct client_obd,
+ cl_shrink_list)) != NULL) {
if (stop_anchor == NULL)
stop_anchor = cli;
else if (cli == stop_anchor)