#define DEBUG_SUBSYSTEM S_OSC
#include <lustre_osc.h>
+#include <lustre_dlm.h>
#include "osc_internal.h"
/* ----- extent part 0 ----- */ \
__ext, EXTPARA(__ext), \
/* ----- part 1 ----- */ \
- atomic_read(&__ext->oe_refc), \
+ kref_read(&__ext->oe_refc), \
atomic_read(&__ext->oe_users), \
list_empty_marker(&__ext->oe_link), \
oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
if (ext->oe_state >= OES_STATE_MAX)
GOTO(out, rc = 10);
- if (atomic_read(&ext->oe_refc) <= 0)
+ if (kref_read(&ext->oe_refc) <= 0)
GOTO(out, rc = 20);
- if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
+ if (kref_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
GOTO(out, rc = 30);
switch (ext->oe_state) {
RB_CLEAR_NODE(&ext->oe_node);
ext->oe_obj = obj;
cl_object_get(osc2cl(obj));
- atomic_set(&ext->oe_refc, 1);
+ kref_init(&ext->oe_refc);
atomic_set(&ext->oe_users, 0);
INIT_LIST_HEAD(&ext->oe_link);
ext->oe_state = OES_INV;
return ext;
}
/*
 * kref release callback for an osc_extent; osc_extent_put() passes it to
 * kref_put().
 *
 * Recovers the extent from its embedded oe_refc, asserts that the extent
 * is unlinked (oe_link empty), has no users, is in state OES_INV and that
 * oe_node is an empty rb node, then drops the lu_ref and the reference on
 * oe_dlmlock when one is set.
 *
 * The cl_object reference and the slab memory are NOT released here: that
 * code sits inside the "#if 0" section below and is performed by
 * osc_extent_put() instead.
 *
 * NOTE(review): the code this replaces (removed from osc_extent_put())
 * paired lu_ref_del() with LDLM_LOCK_RELEASE(), while this version calls
 * LDLM_LOCK_PUT() -- confirm the two macros are interchangeable here.
 */
-static void osc_extent_free(struct osc_extent *ext)
+static void osc_extent_free(struct kref *kref)
{
+ struct osc_extent *ext = container_of(kref, struct osc_extent,
+ oe_refc);
+
+ LASSERT(list_empty(&ext->oe_link));
+ LASSERT(atomic_read(&ext->oe_users) == 0);
+ LASSERT(ext->oe_state == OES_INV);
+ LASSERT(RB_EMPTY_NODE(&ext->oe_node));
+
+ if (ext->oe_dlmlock) {
+ lu_ref_del(&ext->oe_dlmlock->l_reference,
+ "osc_extent", ext);
+ LDLM_LOCK_PUT(ext->oe_dlmlock);
+ ext->oe_dlmlock = NULL;
+ }
+#if 0
+ /* If/When cl_object_put drops the need for 'env',
+ * this code can be enabled, and matching code in
+ * osc_extent_put removed.
+ */
+ cl_object_put(osc2cl(ext->oe_obj));
+
 OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
+#endif
}
/*
 * Take one more reference on @ext and return @ext, so the call can be
 * used inline where an extent pointer is expected.
 *
 * NOTE(review): kref_read() presumably returns an unsigned value, in
 * which case the ">= 0" assertion below can never fire -- confirm.
 */
static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) >= 0);
- atomic_inc(&ext->oe_refc);
+ LASSERT(kref_read(&ext->oe_refc) >= 0);
+ kref_get(&ext->oe_refc);
return ext;
}
/*
 * Drop one reference on @ext.
 *
 * The reference count must be positive on entry. When kref_put() reports
 * that this was the last reference (osc_extent_free() is its release
 * callback), the reference on the owning cl_object is dropped and @ext is
 * returned to the osc_extent_kmem slab.
 *
 * @env is only used for that final cl_object_put() call.
 */
static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) > 0);
- if (atomic_dec_and_test(&ext->oe_refc)) {
- LASSERT(list_empty(&ext->oe_link));
- LASSERT(atomic_read(&ext->oe_users) == 0);
- LASSERT(ext->oe_state == OES_INV);
- LASSERT(RB_EMPTY_NODE(&ext->oe_node));
-
- if (ext->oe_dlmlock != NULL) {
- lu_ref_del(&ext->oe_dlmlock->l_reference,
- "osc_extent", ext);
- LDLM_LOCK_RELEASE(ext->oe_dlmlock);
- ext->oe_dlmlock = NULL;
- }
+ LASSERT(kref_read(&ext->oe_refc) > 0);
+ if (kref_put(&ext->oe_refc, osc_extent_free)) {
+ /* This should be in osc_extent_free(), but
+ * while we need to pass 'env' it cannot be.
+ */
cl_object_put(env, osc2cl(ext->oe_obj));
- osc_extent_free(ext);
+
+ OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
}
}
*/
static void osc_extent_put_trust(struct osc_extent *ext)
{
/* The caller must hold more than one reference: this put is asserted
 * not to be the last one.
 */
- LASSERT(atomic_read(&ext->oe_refc) > 1);
+ LASSERT(kref_read(&ext->oe_refc) > 1);
assert_osc_object_is_locked(ext->oe_obj);
/* A NULL env is passed: with more than one reference asserted above,
 * this put is not expected to reach the branch of osc_extent_put()
 * that uses env.
 */
- atomic_dec(&ext->oe_refc);
+ osc_extent_put(NULL, ext);
}
/**
if (cur->oe_max_end != victim->oe_max_end)
return -ERANGE;
+ /*
+ * In the rare case max_pages_per_rpc (mppr) is changed, don't
+ * merge extents until after old ones have been sent, or the
+ * "extents are aligned to RPCs" checks are unhappy.
+ */
+ if (cur->oe_mppr != victim->oe_mppr)
+ return -ERANGE;
+
LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
cur->oe_urgent |= victim->oe_urgent;
cur->oe_memalloc |= victim->oe_memalloc;
list_splice_init(&victim->oe_pages, &cur->oe_pages);
- list_del_init(&victim->oe_link);
victim->oe_nr_pages = 0;
osc_extent_get(victim);
if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
grant += cli->cl_grant_extent_tax;
- if (ext->oe_urgent)
+ if (ext->oe_hp)
+ list_move_tail(&ext->oe_link,
+ &obj->oo_hp_exts);
+ else if (ext->oe_urgent)
list_move_tail(&ext->oe_link,
&obj->oo_urgent_exts);
else if (ext->oe_nr_pages == ext->oe_mppr) {
cur->oe_start = descr->cld_start;
if (cur->oe_end > max_end)
cur->oe_end = max_end;
- cur->oe_grants = 0;
+ cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
cur->oe_mppr = max_pages;
if (olck->ols_dlmlock != NULL) {
LASSERT(olck->ols_hold);
* flushed, try next one. */
continue;
- /* check if they belong to the same rpc slot before trying to
- * merge. the extents are not overlapped and contiguous at
- * chunk level to get here. */
- if (ext->oe_max_end != max_end)
- /* if they don't belong to the same RPC slot or
- * max_pages_per_rpc has ever changed, do not merge. */
- continue;
-
- /* check whether maximum extent size will be hit */
- if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
- cli->cl_max_extent_pages)
- continue;
-
- /* it's required that an extent must be contiguous at chunk
- * level so that we know the whole extent is covered by grant
- * (the pages in the extent are NOT required to be contiguous).
- * Otherwise, it will be too much difficult to know which
- * chunks have grants allocated. */
-
- /* try to do front merge - extend ext's start */
- if (chunk + 1 == ext_chk_start) {
- /* ext must be chunk size aligned */
- EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
- /* pull ext's start back to cover cur */
- ext->oe_start = cur->oe_start;
- ext->oe_grants += chunksize;
+ if (osc_extent_merge(env, ext, cur) == 0) {
LASSERT(*grants >= chunksize);
*grants -= chunksize;
-
found = osc_extent_hold(ext);
- } else if (chunk == ext_chk_end + 1) {
- /* rear merge */
- ext->oe_end = cur->oe_end;
- ext->oe_grants += chunksize;
- LASSERT(*grants >= chunksize);
- *grants -= chunksize;
- /* try to merge with the next one because we just fill
- * in a gap */
+ /*
+ * Try to merge with the next one too because we
+ * might have just filled in a gap.
+ */
if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
/* we can save extent tax from next extent */
*grants += cli->cl_grant_extent_tax;
- found = osc_extent_hold(ext);
- }
- if (found != NULL)
break;
+ }
}
osc_extent_tree_dump(D_CACHE, obj);
} else if (conflict == NULL) {
/* create a new extent */
EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
- cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
LASSERT(*grants >= cur->oe_grants);
*grants -= cur->oe_grants;
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *oap, int bytes)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- int rc = -EDQUOT;
- unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout : at_max);
+ struct osc_object *osc = oap->oap_obj;
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ int rc = -EDQUOT;
int remain;
bool entered = false;
+ /* We cannot wait for a long time here since we are holding an ldlm
+ * lock across the actual IO. If no requests complete quickly (e.g.
+ * due to an overloaded OST that takes a long time to process
+ * everything), we would get evicted if we waited for a normal
+ * obd_timeout or some such.
+ * So we wait for half the time it would take the server to evict the
+ * client: that time is obd_timeout when AT is off, or at least
+ * ldlm_enqueue_min with AT on.
+ * See LU-13131 */
+ unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
+ ldlm_enqueue_min / 2);
ENTRY;
while (!list_empty(&obj->oo_hp_exts)) {
ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
oe_link);
- LASSERT(ext->oe_state == OES_CACHE);
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
- if (osc_max_rpc_in_flight(cli, osc)) {
+ /* Even if we have reached our max in-flight RPCs, we still
+ * allow all high-priority RPCs through to prevent their
+ * starvation, which would lead to the server evicting us for
+ * not writing out pages in a timely manner. See LU-13131 */
+ if (osc_max_rpc_in_flight(cli, osc) &&
+ list_empty(&osc->oo_hp_exts)) {
__osc_list_maint(cli, osc);
break;
}
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
spin_unlock(&oap->oap_lock);
- if (memory_pressure_get())
+ if (current->flags & PF_MEMALLOC)
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
if (!ext->oe_rw) { /* write */
- list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ if (!ext->oe_srvlock && !ext->oe_dio) {
+ /* The most likely cause here is a lack of grants,
+ * so we are either out of quota or out of space.
+ * Since this means we are holding locks across
+ * potentially multi-striped IO, we must send
+ * everything out instantly to avoid prolonged
+ * waits resulting in lock eviction (likely, since
+ * the extended wait in osc_enter_cache() did not
+ * yield any additional grant due to a timeout).
+ * See LU-13131 */
+ ext->oe_hp = 1;
+ list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+ } else {
+ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ }
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
EASSERT(!ext->oe_hp, ext);
ext->oe_hp = 1;
list = &obj->oo_hp_exts;
- } else if (!ext->oe_urgent) {
+ } else if (!ext->oe_urgent && !ext->oe_hp) {
ext->oe_urgent = 1;
list = &obj->oo_urgent_exts;
}