* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_OSC
+#include <lustre_osc.h>
-#include "osc_cl_internal.h"
+#include "osc_internal.h"
static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
{
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- /* ops_lru and ops_inflight share the same field, so take it from LRU
- * first and then use it as inflight. */
osc_lru_use(osc_cli(obj), opg);
}
-int osc_page_cache_add(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *io)
+int osc_page_cache_add(const struct lu_env *env, struct osc_page *opg,
+ struct cl_io *io, cl_commit_cbt cb)
{
- struct osc_page *opg = cl2osc_page(slice);
int result;
ENTRY;
osc_page_transfer_get(opg, "transfer\0cache");
- result = osc_queue_async_io(env, io, opg);
+ result = osc_queue_async_io(env, io, opg, cb);
if (result != 0)
osc_page_transfer_put(env, opg);
else
policy->l_extent.end = cl_offset(obj, end + 1) - 1;
}
/*
 * Patch hunk: removes the osc_list() helper, which returned "-" for an empty
 * list and "+" otherwise, and moves osc_submit_duration() from
 * cfs_time_t/cfs_time_current() to ktime. The new version returns 0 while
 * ops_submit_time is still zero, and otherwise returns the ktime_ms_delta()
 * between ktime_get() and ops_submit_time as an s64.
 */
-static const char *osc_list(struct list_head *head)
+static inline s64 osc_submit_duration(struct osc_page *opg)
{
- return list_empty(head) ? "-" : "+";
-}
-
-static inline cfs_time_t osc_submit_duration(struct osc_page *opg)
-{
- if (opg->ops_submit_time == 0)
- return 0;
+ if (ktime_to_ns(opg->ops_submit_time) == 0)
+ return 0;
- return (cfs_time_current() - opg->ops_submit_time);
+ return ktime_ms_delta(ktime_get(), opg->ops_submit_time);
}
/*
 * cpo_print method (installed in osc_page_ops): formats the OSC page state
 * through @printer and returns whatever @printer returns.
 * The output has five numbered groups:
 *   1 - async-page magic and command, pending/rpc list markers;
 *   2 - object offset, page offset, count, async/brw flags and pointers;
 *   3 - transfer pin, submit duration and srvlock;
 *   4 - client_obd in-flight/grant counters and list markers;
 *   5 - per-object list markers and read/write counters.
 * This hunk replaces the "%s" + osc_list() pairs with "%c" +
 * list_empty_marker(), stops printing oap_interrupted, prints the submit
 * duration with "%lld", and reports cl_cache_waiters through
 * waitqueue_active() ('+' when it returns true, '-' otherwise).
 */
static int osc_page_print(const struct lu_env *env,
- const struct cl_page_slice *slice,
- void *cookie, lu_printer_t printer)
+ const struct cl_page_slice *slice,
+ void *cookie, lu_printer_t printer)
{
- struct osc_page *opg = cl2osc_page(slice);
- struct osc_async_page *oap = &opg->ops_oap;
- struct osc_object *obj = cl2osc(slice->cpl_obj);
- struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
+ struct osc_page *opg = cl2osc_page(slice);
+ struct osc_async_page *oap = &opg->ops_oap;
+ struct osc_object *obj = cl2osc(slice->cpl_obj);
+ struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p %lu: "
- "1< %#x %d %u %s %s > "
+ "1< %#x %d %c %c > "
"2< %lld %u %u %#x %#x | %p %p %p > "
- "3< %d %lu %d > "
- "4< %d %d %d %lu %s | %s %s %s %s > "
- "5< %s %s %s %s | %d %s | %d %s %s>\n",
+ "3< %d %lld %d > "
+ "4< %d %d %d %lu %c | %c %c %c %c > "
+ "5< %c %c %c %c | %d %c | %d %c %c>\n",
opg, osc_index(opg),
- /* 1 */
- oap->oap_magic, oap->oap_cmd,
- oap->oap_interrupted,
- osc_list(&oap->oap_pending_item),
- osc_list(&oap->oap_rpc_item),
- /* 2 */
- oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
- oap->oap_async_flags, oap->oap_brw_flags,
+ /* 1 */
+ oap->oap_magic, oap->oap_cmd,
+ list_empty_marker(&oap->oap_pending_item),
+ list_empty_marker(&oap->oap_rpc_item),
+ /* 2 */
+ oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
+ oap->oap_async_flags, oap->oap_brw_flags,
oap->oap_request, oap->oap_cli, obj,
/* 3 */
opg->ops_transfer_pinned,
osc_submit_duration(opg), opg->ops_srvlock,
- /* 4 */
- cli->cl_r_in_flight, cli->cl_w_in_flight,
- cli->cl_max_rpcs_in_flight,
- cli->cl_avail_grant,
- osc_list(&cli->cl_cache_waiters),
- osc_list(&cli->cl_loi_ready_list),
- osc_list(&cli->cl_loi_hp_ready_list),
- osc_list(&cli->cl_loi_write_list),
- osc_list(&cli->cl_loi_read_list),
- /* 5 */
- osc_list(&obj->oo_ready_item),
- osc_list(&obj->oo_hp_ready_item),
- osc_list(&obj->oo_write_item),
- osc_list(&obj->oo_read_item),
+ /* 4 */
+ cli->cl_r_in_flight, cli->cl_w_in_flight,
+ cli->cl_max_rpcs_in_flight,
+ cli->cl_avail_grant,
+ waitqueue_active(&cli->cl_cache_waiters) ? '+' : '-',
+ list_empty_marker(&cli->cl_loi_ready_list),
+ list_empty_marker(&cli->cl_loi_hp_ready_list),
+ list_empty_marker(&cli->cl_loi_write_list),
+ list_empty_marker(&cli->cl_loi_read_list),
+ /* 5 */
+ list_empty_marker(&obj->oo_ready_item),
+ list_empty_marker(&obj->oo_hp_ready_item),
+ list_empty_marker(&obj->oo_write_item),
+ list_empty_marker(&obj->oo_read_item),
atomic_read(&obj->oo_nr_reads),
- osc_list(&obj->oo_reading_exts),
+ list_empty_marker(&obj->oo_reading_exts),
atomic_read(&obj->oo_nr_writes),
- osc_list(&obj->oo_hp_exts),
- osc_list(&obj->oo_urgent_exts));
+ list_empty_marker(&obj->oo_hp_exts),
+ list_empty_marker(&obj->oo_urgent_exts));
}
static void osc_page_delete(const struct lu_env *env,
osc_lru_del(osc_cli(obj), opg);
if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
- void *value;
+ void *value = NULL;
spin_lock(&obj->oo_tree_lock);
- value = radix_tree_delete(&obj->oo_tree, osc_index(opg));
- if (value != NULL)
- --obj->oo_npages;
+ if (opg->ops_intree) {
+ value = radix_tree_delete(&obj->oo_tree,
+ osc_index(opg));
+ if (value != NULL) {
+ --obj->oo_npages;
+ opg->ops_intree = 0;
+ }
+ }
spin_unlock(&obj->oo_tree_lock);
LASSERT(ergo(value != NULL, value == opg));
struct osc_async_page *oap = &opg->ops_oap;
opg->ops_from = from;
- opg->ops_to = to;
+ /* argument @to is exclusive, but @ops_to is inclusive */
+ opg->ops_to = to - 1;
spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
spin_unlock(&oap->oap_lock);
}
-static int osc_page_cancel(const struct lu_env *env,
- const struct cl_page_slice *slice)
-{
- struct osc_page *opg = cl2osc_page(slice);
- int rc = 0;
-
- /* Check if the transferring against this page
- * is completed, or not even queued. */
- if (opg->ops_transfer_pinned)
- /* FIXME: may not be interrupted.. */
- rc = osc_cancel_async_page(env, opg);
- LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
- return rc;
-}
-
static int osc_page_flush(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *io)
RETURN(rc);
}
/*
 * New cpo_page_touch handler added by this patch: resolves the osc_page
 * behind @slice and passes its cl_object, its page index (osc_index()) and
 * @to on to osc_page_touch_at().
 */
+static void osc_page_touch(const struct lu_env *env,
+ const struct cl_page_slice *slice, size_t to)
+{
+ struct osc_page *opg = cl2osc_page(slice);
+ struct cl_object *obj = opg->ops_cl.cpl_obj;
+
+ osc_page_touch_at(env, obj, osc_index(opg), to);
+}
+
/*
 * OSC page method table; osc_page_init() hands it to cl_page_slice_add().
 * This hunk removes the .cpo_cancel entry and registers osc_page_touch as
 * .cpo_page_touch.
 */
static const struct cl_page_operations osc_page_ops = {
.cpo_print = osc_page_print,
.cpo_delete = osc_page_delete,
.cpo_clip = osc_page_clip,
- .cpo_cancel = osc_page_cancel,
- .cpo_flush = osc_page_flush
+ .cpo_flush = osc_page_flush,
+ .cpo_page_touch = osc_page_touch,
};
int osc_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, pgoff_t index)
+ struct cl_page *cl_page, pgoff_t index)
{
struct osc_object *osc = cl2osc(obj);
- struct osc_page *opg = cl_object_page_slice(obj, page);
+ struct osc_page *opg = cl_object_page_slice(obj, cl_page);
struct osc_io *oio = osc_env_io(env);
int result;
opg->ops_from = 0;
- opg->ops_to = PAGE_SIZE;
+ opg->ops_to = PAGE_SIZE - 1;
INIT_LIST_HEAD(&opg->ops_lru);
- result = osc_prep_async_page(osc, opg, page->cp_vmpage,
+ result = osc_prep_async_page(osc, opg, cl_page->cp_vmpage,
cl_offset(obj, index));
if (result != 0)
return result;
opg->ops_srvlock = osc_io_srvlock(oio);
- cl_page_slice_add(page, &opg->ops_cl, obj, index,
- &osc_page_ops);
-
+ cl_page_slice_add(cl_page, &opg->ops_cl, obj, &osc_page_ops);
+ cl_page->cp_osc_index = index;
/* reserve an LRU space for this page */
- if (page->cp_type == CPT_CACHEABLE) {
+ if (cl_page->cp_type == CPT_CACHEABLE) {
result = osc_lru_alloc(env, osc_cli(osc), opg);
if (result == 0) {
result = radix_tree_preload(GFP_NOFS);
spin_lock(&osc->oo_tree_lock);
result = radix_tree_insert(&osc->oo_tree,
index, opg);
- if (result == 0)
+ if (result == 0) {
++osc->oo_npages;
+ opg->ops_intree = 1;
+ }
spin_unlock(&osc->oo_tree_lock);
radix_tree_preload_end();
return result;
}
+EXPORT_SYMBOL(osc_page_init);
/**
* Helper function called by osc_io_submit() for every page in an immediate
void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
enum cl_req_type crt, int brw_flags)
{
+ struct osc_io *oio = osc_env_io(env);
struct osc_async_page *oap = &opg->ops_oap;
LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
LASSERT(oap->oap_async_flags & ASYNC_READY);
LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE);
- oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
- oap->oap_page_off = opg->ops_from;
- oap->oap_count = opg->ops_to - opg->ops_from;
+ oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
+ oap->oap_page_off = opg->ops_from;
+ oap->oap_count = opg->ops_to - opg->ops_from + 1;
oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
- if (cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+ if (oio->oi_cap_sys_resource) {
oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
oap->oap_cmd |= OBD_BRW_NOQUOTA;
}
- opg->ops_submit_time = cfs_time_current();
+ opg->ops_submit_time = ktime_get();
osc_page_transfer_get(opg, "transfer\0imm");
osc_page_transfer_add(env, opg, crt);
}
void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
{
- struct list_head lru = LIST_HEAD_INIT(lru);
+ LIST_HEAD(lru);
struct osc_async_page *oap;
long npages = 0;
/* If page is being transferred for the first time,
* ops_lru should be empty */
if (opg->ops_in_lru) {
+ if (list_empty(&opg->ops_lru))
+ return;
spin_lock(&cli->cl_lru_list_lock);
if (!list_empty(&opg->ops_lru)) {
__osc_lru_del(cli, opg);
/*
 * Disposes of the first @max_index pages in @pvec: each one is asserted to be
 * owned by @io, then deleted, discarded, disowned and put, and its @pvec slot
 * is set to NULL. After this hunk the put goes through cl_pagevec_put() with
 * the per-env oti_pagevec, which is set up with ll_pagevec_init() before the
 * loop and handed to pagevec_release() after it.
 */
static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
struct cl_page **pvec, int max_index)
{
- int i;
+ struct pagevec *pagevec = &osc_env_info(env)->oti_pagevec;
+ int i;
- for (i = 0; i < max_index; i++) {
- struct cl_page *page = pvec[i];
+ ll_pagevec_init(pagevec, 0);
+ for (i = 0; i < max_index; i++) {
+ struct cl_page *page = pvec[i];
LASSERT(cl_page_is_owned(page, io));
cl_page_delete(env, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
- cl_page_put(env, page);
+ cl_pagevec_put(env, page, pagevec);
- pvec[i] = NULL;
- }
+ pvec[i] = NULL;
+ }
+ pagevec_release(pagevec);
}
/**
}
pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
- io = &osc_env_info(env)->oti_io;
+ io = osc_env_thread_io(env);
spin_lock(&cli->cl_lru_list_lock);
if (force)
}
RETURN(count > 0 ? count : rc);
}
+EXPORT_SYMBOL(osc_lru_shrink);
/**
* Reclaim LRU pages by an IO thread. The caller wants to reclaim at least
static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
struct osc_page *opg)
{
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
struct osc_io *oio = osc_env_io(env);
int rc = 0;
+
ENTRY;
if (cli->cl_cache == NULL) /* shall not be in LRU */
continue;
cond_resched();
- rc = l_wait_event(osc_lru_waitq,
- atomic_long_read(cli->cl_lru_left) > 0,
- &lwi);
- if (rc < 0)
+ rc = l_wait_event_abortable(
+ osc_lru_waitq,
+ atomic_long_read(cli->cl_lru_left) > 0);
+ if (rc < 0) {
+ rc = -EINTR;
break;
+ }
}
out:
* In practice this can work pretty good because the pages in the same RPC
* are likely from the same page zone.
*/
+#ifdef HAVE_NR_UNSTABLE_NFS
+/* Old kernels use a separate counter for unstable pages,
+ * newer kernels treat them like any other writeback.
+ */
+#define NR_WRITEBACK NR_UNSTABLE_NFS
+#endif
+
static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa,
int factor)
{
- int page_count = desc->bd_iov_count;
+ int page_count;
void *zone = NULL;
int count = 0;
int i;
- LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+ if (desc != NULL) {
+ page_count = desc->bd_iov_count;
+ } else {
+ page_count = aa->aa_page_count;
+ }
for (i = 0; i < page_count; i++) {
- void *pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
+ void *pz;
+ if (desc)
+ pz = page_zone(desc->bd_vec[i].bv_page);
+ else
+ pz = page_zone(aa->aa_ppga[i]->pg);
if (likely(pz == zone)) {
++count;
}
if (count > 0) {
- mod_zone_page_state(zone, NR_UNSTABLE_NFS,
+ mod_zone_page_state(zone, NR_WRITEBACK,
factor * count);
count = 0;
}
++count;
}
if (count > 0)
- mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
+ mod_zone_page_state(zone, NR_WRITEBACK, factor * count);
}
/*
 * Wrapper that calls unstable_page_accounting() with factor 1. This hunk adds
 * the @aa argument and forwards it together with @desc.
 */
-static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
{
- unstable_page_accounting(desc, 1);
+ unstable_page_accounting(desc, aa, 1);
}
/*
 * Wrapper that calls unstable_page_accounting() with factor -1. This hunk
 * adds the @aa argument and forwards it together with @desc.
 */
-static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
{
- unstable_page_accounting(desc, -1);
+ unstable_page_accounting(desc, aa, -1);
}
/**
void osc_dec_unstable_pages(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- int page_count = desc->bd_iov_count;
+ int page_count;
long unstable_count;
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
LASSERT(page_count >= 0);
- dec_unstable_page_accounting(desc);
+
+ dec_unstable_page_accounting(desc, aa);
unstable_count = atomic_long_sub_return(page_count,
&cli->cl_unstable_count);
void osc_inc_unstable_pages(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- long page_count = desc->bd_iov_count;
+ long page_count;
/* No unstable page tracking */
if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
return;
- add_unstable_page_accounting(desc);
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
+ add_unstable_page_accounting(desc, aa);
atomic_long_add(page_count, &cli->cl_unstable_count);
atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);