*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* osc cache management.
*
#define EXTSTR "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
-static const char *oes_strings[] = {
+static const char *const oes_strings[] = {
"inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
#define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\
if (victim == NULL)
return -EINVAL;
- if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
+ if (victim->oe_state != OES_INV &&
+ (victim->oe_state != OES_CACHE || victim->oe_fsync_wait))
return -EBUSY;
if (cur->oe_max_end != victim->oe_max_end)
return -ERANGE;
+ /*
+ * In the rare case max_pages_per_rpc (mppr) is changed, don't
+ * merge extents until after old ones have been sent, or the
+ * "extents are aligned to RPCs" checks are unhappy.
+ */
+ if (cur->oe_mppr != victim->oe_mppr)
+ return -ERANGE;
+
LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
cur->oe_urgent |= victim->oe_urgent;
cur->oe_memalloc |= victim->oe_memalloc;
list_splice_init(&victim->oe_pages, &cur->oe_pages);
- list_del_init(&victim->oe_link);
victim->oe_nr_pages = 0;
osc_extent_get(victim);
/**
* Drop user count of osc_extent, and unplug IO asynchronously.
*/
-int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
+void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
{
struct osc_object *obj = ext->oe_obj;
struct client_obd *cli = osc_cli(obj);
- int rc = 0;
ENTRY;
LASSERT(atomic_read(&ext->oe_users) > 0);
osc_io_unplug_async(env, cli, obj);
}
osc_extent_put(env, ext);
- RETURN(rc);
+
+ RETURN_EXIT;
}
/**
cur->oe_start = descr->cld_start;
if (cur->oe_end > max_end)
cur->oe_end = max_end;
- cur->oe_grants = 0;
+ cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
cur->oe_mppr = max_pages;
if (olck->ols_dlmlock != NULL) {
LASSERT(olck->ols_hold);
* flushed, try next one. */
continue;
- /* check if they belong to the same rpc slot before trying to
- * merge. the extents are not overlapped and contiguous at
- * chunk level to get here. */
- if (ext->oe_max_end != max_end)
- /* if they don't belong to the same RPC slot or
- * max_pages_per_rpc has ever changed, do not merge. */
- continue;
-
- /* check whether maximum extent size will be hit */
- if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
- cli->cl_max_extent_pages)
- continue;
-
- /* it's required that an extent must be contiguous at chunk
- * level so that we know the whole extent is covered by grant
- * (the pages in the extent are NOT required to be contiguous).
- * Otherwise, it will be too much difficult to know which
- * chunks have grants allocated. */
-
- /* try to do front merge - extend ext's start */
- if (chunk + 1 == ext_chk_start) {
- /* ext must be chunk size aligned */
- EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
- /* pull ext's start back to cover cur */
- ext->oe_start = cur->oe_start;
- ext->oe_grants += chunksize;
+ if (osc_extent_merge(env, ext, cur) == 0) {
LASSERT(*grants >= chunksize);
*grants -= chunksize;
- found = osc_extent_hold(ext);
- } else if (chunk == ext_chk_end + 1) {
- /* rear merge */
- ext->oe_end = cur->oe_end;
- ext->oe_grants += chunksize;
- LASSERT(*grants >= chunksize);
- *grants -= chunksize;
-
- /* try to merge with the next one because we just fill
- * in a gap */
+ /*
+ * Try to merge with the next one too because we
+ * might have just filled in a gap.
+ */
if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
/* we can save extent tax from next extent */
*grants += cli->cl_grant_extent_tax;
found = osc_extent_hold(ext);
- }
- if (found != NULL)
break;
+ }
}
osc_extent_tree_dump(D_CACHE, obj);
} else if (conflict == NULL) {
/* create a new extent */
EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
- cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
LASSERT(*grants >= cur->oe_grants);
*grants -= cur->oe_grants;
* the size of file. */
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
- LASSERT(last_oap_count > 0);
+ LASSERTF(last_oap_count > 0,
+ "last_oap_count %d\n", last_oap_count);
LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
last->oap_count = last_oap_count;
spin_lock(&last->oap_lock);
pga->flag |= OBD_BRW_FROM_GRANT;
CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
PAGE_SIZE, pga, pga->pg);
- osc_update_next_shrink(cli);
}
/* the companion to osc_consume_write_grant, called when a brw has completed.
if (in_rpc->oe_dio && overlapped(ext, in_rpc))
return false;
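+ /* don't mix RDMA-only and regular extents in the same RPC */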
+ if (ext->oe_is_rdma_only != in_rpc->oe_is_rdma_only)
+ return false;
+
return true;
}
};
assert_osc_object_is_locked(obj);
- while (!list_empty(&obj->oo_hp_exts)) {
- ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
- oe_link);
- LASSERT(ext->oe_state == OES_CACHE);
+ while ((ext = list_first_entry_or_null(&obj->oo_hp_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
if (data.erd_page_count == data.erd_max_pages)
return data.erd_page_count;
- while (!list_empty(&obj->oo_urgent_exts)) {
- ext = list_entry(obj->oo_urgent_exts.next,
- struct osc_extent, oe_link);
+ while ((ext = list_first_entry_or_null(&obj->oo_urgent_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
}
* extents can usually only be added if the rpclist was empty, so if we
* can't add one, we continue on to trying to add normal extents. This
* is so we don't miss adding extra extents to an RPC containing high
- * priority or urgent extents. */
- while (!list_empty(&obj->oo_full_exts)) {
- ext = list_entry(obj->oo_full_exts.next,
- struct osc_extent, oe_link);
+ * priority or urgent extents.
+ */
+ while ((ext = list_first_entry_or_null(&obj->oo_full_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
break;
}
EXPORT_SYMBOL(osc_io_unplug0);
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
- struct page *page, loff_t offset)
+ struct cl_page *page, loff_t offset)
{
struct obd_export *exp = osc_export(osc);
struct osc_async_page *oap = &ops->ops_oap;
+ struct page *vmpage = page->cp_vmpage;
ENTRY;
if (!page)
oap->oap_cli = &exp->exp_obd->u.cli;
oap->oap_obj = osc;
- oap->oap_page = page;
+ oap->oap_page = vmpage;
oap->oap_obj_off = offset;
LASSERT(!(offset & ~PAGE_MASK));
+ /* The count of transient (direct i/o) pages is always stable by the
+ * time they are submitted, so the flags can be set here instead of
+ * calling cl_page_clip() later just to set them.
+ */
+ if (page->cp_type == CPT_TRANSIENT)
+ oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT|
+ ASYNC_READY;
+
INIT_LIST_HEAD(&oap->oap_pending_item);
INIT_LIST_HEAD(&oap->oap_rpc_item);
spin_lock_init(&oap->oap_lock);
- CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
- oap, page, oap->oap_obj_off);
+ CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
+ oap, vmpage, oap->oap_obj_off);
RETURN(0);
}
EXPORT_SYMBOL(osc_prep_async_page);
}
/* check if the file's owner/group is over quota */
- if (!(cmd & OBD_BRW_NOQUOTA)) {
+ if (!io->ci_noquota) {
struct cl_object *obj;
struct cl_attr *attr;
unsigned int qid[LL_MAXQUOTAS];
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
- oap->oap_count = ops->ops_to - ops->ops_from;
+ oap->oap_count = ops->ops_to - ops->ops_from + 1;
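+ /* ops_to is the offset of the last byte covered (inclusive), hence the +1 */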
/* No need to hold a lock here,
* since this page is not in any list yet. */
oap->oap_async_flags = 0;
return rc;
}
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
++page_count;
mppr <<= (page_count > mppr);
- if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+ if (unlikely(opg->ops_from > 0 ||
+ opg->ops_to < PAGE_SIZE - 1))
can_merge = false;
}
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
+ int grants;
+ int ppc;
+
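+ /* one full chunk of grant for every chunk of pages, plus the per-extent tax */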
+ ppc = 1 << (cli->cl_chunkbits - PAGE_SHIFT);
+ grants = cli->cl_grant_extent_tax;
+ grants += (1 << cli->cl_chunkbits) *
+ ((page_count + ppc - 1) / ppc);
+
+ CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
+ spin_lock(&cli->cl_loi_list_lock);
+ if (osc_reserve_grant(cli, grants) == 0) {
+ list_for_each_entry(oap, list, oap_pending_item) {
+ osc_consume_write_grant(cli,
+ &oap->oap_brw_page);
+ }
+ atomic_long_add(page_count, &obd_dirty_pages);
+ osc_unreserve_grant_nolock(cli, grants, 0);
+ ext->oe_grants = grants;
+ } else {
+ /* We cannot report ENOSPC correctly if we do parallel
+ * DIO (async RPC submission), so turn off parallel dio
+ * if there is not sufficient grant available. This
+ * makes individual RPCs synchronous.
+ */
+ io->ci_parallel_dio = false;
+ CDEBUG(D_CACHE,
+ "not enough grant available, switching to sync for this i/o\n");
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+ osc_update_next_shrink(cli);
+ }
+
+ ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
if (!ext->oe_rw) { /* write */
- list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ if (!ext->oe_srvlock && !ext->oe_dio) {
+ /* The most likely case here is lack of grants, so we are either
+ * out of quota or out of space. Since this means we are holding
+ * locks across potentially multi-striped IO, we must send
+ * everything out immediately to avoid prolonged waits resulting
+ * in lock eviction (likely, since the extended wait in
+ * osc_cache_enter() did not yield any additional grant due to a
+ * timeout). LU-13131 */
+ ext->oe_hp = 1;
+ list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+ } else {
+ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ }
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
osc_list_maint(cli, obj);
- while (!list_empty(&list)) {
+ while ((ext = list_first_entry_or_null(&list,
+ struct osc_extent,
+ oe_link)) != NULL) {
int rc;
- ext = list_entry(list.next, struct osc_extent, oe_link);
list_del_init(&ext->oe_link);
/* extent may be in OES_ACTIVE state because inode mutex
spin_unlock(&osc->oo_tree_lock);
tree_lock = false;
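+ /* invoke the callback once for the whole batch of pages just collected */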
+ res = (*cb)(env, io, pvec, j, cbdata);
+
for (i = 0; i < j; ++i) {
ops = pvec[i];
- if (res)
- res = (*cb)(env, io, ops, cbdata);
-
page = ops->ops_cl.cpl_page;
lu_ref_del(&page->cp_reference, "gang_lookup", current);
cl_pagevec_put(env, page, pagevec);
if (!res)
break;
+
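+ /* fault-injection hook: sleep here to emulate slow page eviction */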
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SLOW_PAGE_EVICT,
+ cfs_fail_val ?: 20);
+
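+ /* a lengthy discard can exceed the lock callback timeout; send an
+ * empty RPC every 5*obd_timeout/16 seconds so the server sees the
+ * client making progress and does not evict it */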
+ if (io->ci_type == CIT_MISC &&
+ io->u.ci_misc.lm_next_rpc_time &&
+ ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
+ osc_send_empty_rpc(osc, idx << PAGE_SHIFT);
+ io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+ 5 * obd_timeout / 16;
+ }
+
if (need_resched())
cond_resched();
* Check if page @page is covered by an extra lock or discard it.
*/
static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+ void **pvec, int count, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_object *osc = cbdata;
- pgoff_t index;
+ int i;
- index = osc_index(ops);
- if (index >= info->oti_fn_index) {
- struct ldlm_lock *tmp;
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
struct cl_page *page = ops->ops_cl.cpl_page;
+ pgoff_t index = osc_index(ops);
+ bool discard = false;
+
+ /* negative lock caching */
+ if (index < info->oti_ng_index) {
+ discard = true;
+ } else if (index >= info->oti_fn_index) {
+ struct ldlm_lock *tmp;
+ /* refresh non-overlapped index */
+ tmp = osc_dlmlock_at_pgoff(env, osc, index,
+ OSC_DAP_FL_TEST_LOCK |
+ OSC_DAP_FL_AST |
+ OSC_DAP_FL_RIGHT);
+ if (tmp != NULL) {
+ __u64 end =
+ tmp->l_policy_data.l_extent.end;
+ __u64 start =
+ tmp->l_policy_data.l_extent.start;
+
+ /* no lock covering this page */
+ if (index < cl_index(osc2cl(osc), start)) {
+ /* no lock at @index,
+ * first lock at @start
+ */
+ info->oti_ng_index =
+ cl_index(osc2cl(osc), start);
+ discard = true;
+ } else {
+ /* Cache the first-non-overlapped
+ * index so as to skip all pages
+ * within [index, oti_fn_index).
+ * This is safe because if tmp lock
+ * is canceled, it will discard these
+ * pages.
+ */
+ info->oti_fn_index =
+ cl_index(osc2cl(osc), end + 1);
+ if (end == OBD_OBJECT_EOF)
+ info->oti_fn_index =
+ CL_PAGE_EOF;
+ }
+ LDLM_LOCK_PUT(tmp);
+ } else {
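+ /* no lock at @index or to the right: no remaining page is covered */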
+ info->oti_ng_index = CL_PAGE_EOF;
+ discard = true;
+ }
+ }
- /* refresh non-overlapped index */
- tmp = osc_dlmlock_at_pgoff(env, osc, index,
- OSC_DAP_FL_TEST_LOCK);
- if (tmp != NULL) {
- __u64 end = tmp->l_policy_data.l_extent.end;
- /* Cache the first-non-overlapped index so as to skip
- * all pages within [index, oti_fn_index). This is safe
- * because if tmp lock is canceled, it will discard
- * these pages. */
- info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
- if (end == OBD_OBJECT_EOF)
- info->oti_fn_index = CL_PAGE_EOF;
- LDLM_LOCK_PUT(tmp);
- } else if (cl_page_own(env, io, page) == 0) {
- /* discard the page */
- cl_page_discard(env, io, page);
- cl_page_disown(env, io, page);
- } else {
- LASSERT(page->cp_state == CPS_FREEING);
+ if (discard) {
+ if (cl_page_own(env, io, page) == 0) {
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
}
- }
- info->oti_next_index = index + 1;
+ info->oti_next_index = index + 1;
+ }
return true;
}
bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+ void **pvec, int count, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
- struct cl_page *page = ops->ops_cl.cpl_page;
-
- /* page is top page. */
- info->oti_next_index = osc_index(ops) + 1;
- if (cl_page_own(env, io, page) == 0) {
- if (!ergo(page->cp_type == CPT_CACHEABLE,
- !PageDirty(cl_page_vmpage(page))))
- CL_PAGE_DEBUG(D_ERROR, env, page,
- "discard dirty page?\n");
-
- /* discard the page */
- cl_page_discard(env, io, page);
- cl_page_disown(env, io, page);
- } else {
- LASSERT(page->cp_state == CPS_FREEING);
+ int i;
+
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ /* page is top page. */
+ info->oti_next_index = osc_index(ops) + 1;
+ if (cl_page_own(env, io, page) == 0) {
+ if (!ergo(page->cp_type == CPT_CACHEABLE,
+ !PageDirty(cl_page_vmpage(page))))
+ CL_PAGE_DEBUG(D_ERROR, env, page,
+ "discard dirty page?\n");
+
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
}
return true;
io->ci_obj = cl_object_top(osc2cl(osc));
io->ci_ignore_layout = 1;
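+ /* schedule the first keep-alive empty RPC (see osc_page_gang_lookup()) */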
+ io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+ 5 * obd_timeout / 16;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
GOTO(out, result);
cb = discard ? osc_discard_cb : check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
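+ /* reset the negative lock cache used by check_and_discard_cb() */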
+ info->oti_ng_index = 0;
osc_page_gang_lookup(env, io, osc,
info->oti_next_index, end, cb, osc);