#define DEBUG_SUBSYSTEM S_OSC
#include <lustre_osc.h>
+#include <lustre_dlm.h>
#include "osc_internal.h"
static inline char *ext_flags(struct osc_extent *ext, char *flags)
{
char *buf = flags;
*buf++ = ext->oe_rw ? 'r' : 'w';
- if (ext->oe_intree)
+ if (!RB_EMPTY_NODE(&ext->oe_node))
*buf++ = 'i';
if (ext->oe_sync)
*buf++ = 'S';
/* ----- extent part 0 ----- */ \
__ext, EXTPARA(__ext), \
/* ----- part 1 ----- */ \
- atomic_read(&__ext->oe_refc), \
+ kref_read(&__ext->oe_refc), \
atomic_read(&__ext->oe_users), \
list_empty_marker(&__ext->oe_link), \
oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
static inline struct osc_extent *rb_extent(struct rb_node *n)
{
- if (n == NULL)
- return NULL;
-
- return container_of(n, struct osc_extent, oe_node);
+ return rb_entry_safe(n, struct osc_extent, oe_node);
}
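/* Editor's note: rb_entry_safe() from <linux/rbtree.h> is exactly the
 * hand-rolled NULL check being removed above:
 *
 *	#define rb_entry_safe(ptr, type, member) \
 *		({ typeof(ptr) ____ptr = (ptr); \
 *		   ____ptr ? rb_entry(____ptr, type, member) : NULL; \
 *		})
 */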
static inline struct osc_extent *next_extent(struct osc_extent *ext)
{
if (ext == NULL)
return NULL;
- LASSERT(ext->oe_intree);
+ LASSERT(!RB_EMPTY_NODE(&ext->oe_node));
return rb_extent(rb_next(&ext->oe_node));
}
static inline struct osc_extent *prev_extent(struct osc_extent *ext)
{
if (ext == NULL)
return NULL;
- LASSERT(ext->oe_intree);
+ LASSERT(!RB_EMPTY_NODE(&ext->oe_node));
return rb_extent(rb_prev(&ext->oe_node));
}
if (ext->oe_state >= OES_STATE_MAX)
GOTO(out, rc = 10);
- if (atomic_read(&ext->oe_refc) <= 0)
+ if (kref_read(&ext->oe_refc) <= 0)
GOTO(out, rc = 20);
- if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
+ if (kref_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
GOTO(out, rc = 30);
switch (ext->oe_state) {
/* LASSERT(sanity_check_nolock(ext) == 0); */
/* TODO: validate the state machine */
- ext->oe_state = state;
- wake_up_all(&ext->oe_waitq);
+ smp_store_release(&ext->oe_state, state);
+ wake_up(&ext->oe_waitq);
}
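/* Editor's note: the release store above pairs with the
 * smp_load_acquire() of oe_state added in osc_extent_wait() below, so
 * a waiter that observes the new state also observes every write made
 * before the transition.  Demoting wake_up_all() to wake_up() is safe
 * here: wait_event_idle_timeout() queues non-exclusive waiters, and
 * wake_up()'s nr_exclusive argument of 1 only limits exclusive
 * waiters, so all sleepers are still woken.
 */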
static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
RB_CLEAR_NODE(&ext->oe_node);
ext->oe_obj = obj;
cl_object_get(osc2cl(obj));
- atomic_set(&ext->oe_refc, 1);
+ kref_init(&ext->oe_refc);
atomic_set(&ext->oe_users, 0);
INIT_LIST_HEAD(&ext->oe_link);
ext->oe_state = OES_INV;
return ext;
}
-static void osc_extent_free(struct osc_extent *ext)
+static void osc_extent_free(struct kref *kref)
{
+ struct osc_extent *ext = container_of(kref, struct osc_extent,
+ oe_refc);
+
+ LASSERT(list_empty(&ext->oe_link));
+ LASSERT(atomic_read(&ext->oe_users) == 0);
+ LASSERT(ext->oe_state == OES_INV);
+ LASSERT(RB_EMPTY_NODE(&ext->oe_node));
+
+ if (ext->oe_dlmlock) {
+ lu_ref_del(&ext->oe_dlmlock->l_reference,
+ "osc_extent", ext);
+ LDLM_LOCK_PUT(ext->oe_dlmlock);
+ ext->oe_dlmlock = NULL;
+ }
+#if 0
+ /* If/When cl_object_put drops the need for 'env',
+ * this code can be enabled, and matching code in
+ * osc_extent_put removed.
+ */
+ cl_object_put(osc2cl(ext->oe_obj));
+
OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
+#endif
}
static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) >= 0);
- atomic_inc(&ext->oe_refc);
+ LASSERT(kref_read(&ext->oe_refc) >= 0);
+ kref_get(&ext->oe_refc);
return ext;
}
static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) > 0);
- if (atomic_dec_and_test(&ext->oe_refc)) {
- LASSERT(list_empty(&ext->oe_link));
- LASSERT(atomic_read(&ext->oe_users) == 0);
- LASSERT(ext->oe_state == OES_INV);
- LASSERT(!ext->oe_intree);
-
- if (ext->oe_dlmlock != NULL) {
- lu_ref_add(&ext->oe_dlmlock->l_reference,
- "osc_extent", ext);
- LDLM_LOCK_PUT(ext->oe_dlmlock);
- ext->oe_dlmlock = NULL;
- }
+ LASSERT(kref_read(&ext->oe_refc) > 0);
+ if (kref_put(&ext->oe_refc, osc_extent_free)) {
+ /* This should be in osc_extent_free(), but for as long
+ * as we need to pass 'env' it cannot be.
+ */
cl_object_put(env, osc2cl(ext->oe_obj));
- osc_extent_free(ext);
+
+ OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
}
}
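/* Editor's sketch (not part of the patch): the kref lifecycle this
 * conversion adopts, reduced to its essentials.  'foo' and its
 * helpers are illustrative names only.
 */
#if 0	/* illustration only, never compiled */
struct foo {
	struct kref refc;
};

static void foo_free(struct kref *kref)	/* runs when refcount hits 0 */
{
	struct foo *foo = container_of(kref, struct foo, refc);

	kfree(foo);
}

static void foo_example(void)
{
	struct foo *foo = kzalloc(sizeof(*foo), GFP_KERNEL);

	if (!foo)
		return;
	kref_init(&foo->refc);			/* refcount = 1 */
	kref_get(&foo->refc);			/* refcount = 2 */
	kref_put(&foo->refc, foo_free);		/* refcount = 1 */
	kref_put(&foo->refc, foo_free);		/* 0 -> foo_free() runs */
}
#endif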
*/
static void osc_extent_put_trust(struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) > 1);
+ LASSERT(kref_read(&ext->oe_refc) > 1);
assert_osc_object_is_locked(ext->oe_obj);
- atomic_dec(&ext->oe_refc);
+ osc_extent_put(NULL, ext);
}
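/* Editor's note: passing a NULL env above is safe because the LASSERT
 * guarantees oe_refc > 1, so this can never be the final put and the
 * env-using release branch in osc_extent_put() is unreachable.
 */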
/**
struct rb_node *parent = NULL;
struct osc_extent *tmp;
- LASSERT(ext->oe_intree == 0);
+ LASSERT(RB_EMPTY_NODE(&ext->oe_node));
LASSERT(ext->oe_obj == obj);
assert_osc_object_is_locked(obj);
while (*n != NULL) {
rb_link_node(&ext->oe_node, parent, n);
rb_insert_color(&ext->oe_node, &obj->oo_root);
osc_extent_get(ext);
- ext->oe_intree = 1;
}
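/* Editor's sketch (not part of the patch): the elided body of the
 * insertion walk above follows the standard rbtree idiom; assuming
 * extents are keyed by [oe_start, oe_end], it looks roughly like:
 */
#if 0	/* illustration only, never compiled */
	while (*n != NULL) {
		tmp = rb_extent(*n);
		parent = *n;

		if (ext->oe_end < tmp->oe_start)
			n = &(*n)->rb_left;
		else if (ext->oe_start > tmp->oe_end)
			n = &(*n)->rb_right;
		else	/* overlapping extents must never coexist */
			EASSERTF(0, tmp, EXTSTR"\n", EXTPARA(ext));
	}
#endif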
/* caller must have held object lock. */
{
struct osc_object *obj = ext->oe_obj;
assert_osc_object_is_locked(obj);
- if (ext->oe_intree) {
+ if (!RB_EMPTY_NODE(&ext->oe_node)) {
rb_erase(&ext->oe_node, &obj->oo_root);
- ext->oe_intree = 0;
+ RB_CLEAR_NODE(&ext->oe_node);
/* rbtree held a refcount */
osc_extent_put_trust(ext);
}
if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
grant += cli->cl_grant_extent_tax;
- if (ext->oe_urgent)
+ if (ext->oe_hp)
+ list_move_tail(&ext->oe_link,
+ &obj->oo_hp_exts);
+ else if (ext->oe_urgent)
list_move_tail(&ext->oe_link,
&obj->oo_urgent_exts);
else if (ext->oe_nr_pages == ext->oe_mppr) {
RETURN(0);
}
-static int extent_wait_cb(struct osc_extent *ext, enum osc_extent_state state)
-{
- int ret;
-
- osc_object_lock(ext->oe_obj);
- ret = ext->oe_state == state;
- osc_object_unlock(ext->oe_obj);
-
- return ret;
-}
-
/**
* Wait for the extent's state to become @state.
*/
osc_extent_release(env, ext);
/* wait for the extent until its state becomes @state */
- rc = wait_event_idle_timeout(ext->oe_waitq, extent_wait_cb(ext, state),
+ rc = wait_event_idle_timeout(ext->oe_waitq,
+ smp_load_acquire(&ext->oe_state) == state,
cfs_time_seconds(600));
if (rc == 0) {
OSC_EXTENT_DUMP(D_ERROR, ext,
"%s: wait ext to %u timedout, recovery in progress?\n",
cli_name(osc_cli(obj)), state);
- wait_event_idle(ext->oe_waitq, extent_wait_cb(ext, state));
+ wait_event_idle(ext->oe_waitq,
+ smp_load_acquire(&ext->oe_state) == state);
}
if (ext->oe_rc < 0)
rc = ext->oe_rc;
* the size of file. */
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
- LASSERT(last_oap_count > 0);
+ LASSERTF(last_oap_count > 0,
+ "last_oap_count %d\n", last_oap_count);
LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
last->oap_count = last_oap_count;
spin_lock(&last->oap_lock);
return rc;
}
-static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+/* The following two inlines exist to pass code fragments
+ * to wait_event_idle_exclusive_timeout_cmd().  Passing
+ * code fragments as macro args can look confusing, so
+ * we provide inlines to encapsulate them.
+ */
+static inline void cli_unlock_and_unplug(const struct lu_env *env,
+ struct client_obd *cli,
+ struct osc_async_page *oap)
{
- int rc;
- spin_lock(&cli->cl_loi_list_lock);
- rc = list_empty(&ocw->ocw_entry);
spin_unlock(&cli->cl_loi_list_lock);
- return rc;
+ osc_io_unplug_async(env, cli, NULL);
+ CDEBUG(D_CACHE,
+ "%s: sleeping for cache space for %p\n",
+ cli_name(cli), oap);
}
+static inline void cli_lock_after_unplug(struct client_obd *cli)
+{
+ spin_lock(&cli->cl_loi_list_lock);
+}
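+/* Editor's note: wait_event_idle_exclusive_timeout_cmd(wq, cond,
+ * timeout, cmd1, cmd2) from <linux/wait.h> runs cmd1 just before each
+ * sleep and cmd2 right after waking, re-evaluating cond in between.
+ * That lets osc_enter_cache() below keep cl_loi_list_lock held
+ * whenever osc_enter_cache_try() is evaluated, yet drop the lock (and
+ * kick write-out) for the sleep itself.
+ */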
/**
* The main entry to reserve dirty page accounting. Usually the grant reserved
* in this function will be freed in bulk in osc_free_grant() unless it fails
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *oap, int bytes)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- struct osc_cache_waiter ocw;
- int rc = -EDQUOT;
+ struct osc_object *osc = oap->oap_obj;
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ int rc = -EDQUOT;
+ int remain;
+ bool entered = false;
+ /* We cannot wait for a long time here since we are holding ldlm lock
+ * across the actual IO. If no requests complete fast (e.g. due to an
+ * overloaded OST that takes a long time to process everything), we'd
+ * get evicted if we wait for a normal obd_timeout or some such.
+ * So we try to wait half the time it would take the client to be
+ * evicted by the server, which is half obd_timeout when AT is off,
+ * or at least ldlm_enqueue_min with AT on.
+ * See LU-13131 */
+ unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
+ ldlm_enqueue_min / 2);
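+ /* Editor's example: with the stock obd_timeout of 100 seconds and
+ * adaptive timeouts disabled, this waits at most 50 seconds before
+ * giving up and falling back to synchronous I/O. */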
+
ENTRY;
OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes);
GOTO(out, rc = -EDQUOT);
}
- /* Hopefully normal case - cache space and write credits available */
- if (list_empty(&cli->cl_cache_waiters) &&
- osc_enter_cache_try(cli, oap, bytes)) {
- OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
- GOTO(out, rc = 0);
- }
-
- /* We can get here for two reasons: too many dirty pages in cache, or
+ /*
+ * We can wait here for two reasons: too many dirty pages in cache, or
+ * we have run out of grants. In both cases we should write dirty pages out.
* Adding a cache waiter will trigger urgent write-out no matter what
* RPC size will be.
- * The exiting condition is no avail grants and no dirty pages caching,
- * that really means there is no space on the OST. */
- init_waitqueue_head(&ocw.ocw_waitq);
- ocw.ocw_oap = oap;
- ocw.ocw_grant = bytes;
- while (cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0) {
- list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
- ocw.ocw_rc = 0;
- spin_unlock(&cli->cl_loi_list_lock);
-
- osc_io_unplug_async(env, cli, NULL);
-
- CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
- cli_name(cli), &ocw, oap);
-
- rc = wait_event_idle_timeout(ocw.ocw_waitq,
- ocw_granted(cli, &ocw),
- cfs_time_seconds(AT_OFF ?
- obd_timeout :
- at_max));
-
- spin_lock(&cli->cl_loi_list_lock);
-
- if (rc <= 0) {
- /* l_wait_event is interrupted by signal or timed out */
- list_del_init(&ocw.ocw_entry);
- if (rc == 0)
- rc = -ETIMEDOUT;
- break;
- }
- LASSERT(list_empty(&ocw.ocw_entry));
- rc = ocw.ocw_rc;
-
- if (rc != -EDQUOT)
- break;
- if (osc_enter_cache_try(cli, oap, bytes)) {
- rc = 0;
- break;
- }
- }
-
- switch (rc) {
- case 0:
- OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space\n");
- break;
- case -ETIMEDOUT:
+ * The exit condition (other than success) is no available grants
+ * and no dirty pages cached, which really means there is no space
+ * on the OST.
+ */
+ remain = wait_event_idle_exclusive_timeout_cmd(
+ cli->cl_cache_waiters,
+ (entered = osc_enter_cache_try(cli, oap, bytes)) ||
+ (cli->cl_dirty_pages == 0 && cli->cl_w_in_flight == 0),
+ timeout,
+ cli_unlock_and_unplug(env, cli, oap),
+ cli_lock_after_unplug(cli));
+
+ if (entered) {
+ if (remain == timeout)
+ OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
+ else
+ OSC_DUMP_GRANT(D_CACHE, cli,
+ "finally got grant space\n");
+ wake_up(&cli->cl_cache_waiters);
+ rc = 0;
+ } else if (remain == 0) {
OSC_DUMP_GRANT(D_CACHE, cli,
"timeout, fall back to sync i/o\n");
osc_extent_tree_dump(D_CACHE, osc);
/* fall back to synchronous I/O */
- rc = -EDQUOT;
- break;
- case -EINTR:
- /* Ensures restartability - LU-3581 */
- OSC_DUMP_GRANT(D_CACHE, cli, "interrupted\n");
- rc = -ERESTARTSYS;
- break;
- case -EDQUOT:
+ } else {
OSC_DUMP_GRANT(D_CACHE, cli,
"no grant space, fall back to sync i/o\n");
- break;
- default:
- CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived "
- "due to %d, fall back to sync i/o\n",
- cli_name(cli), &ocw, rc);
- break;
+ wake_up_all(&cli->cl_cache_waiters);
}
EXIT;
out:
RETURN(rc);
}
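/* Editor's note on the waitqueue conversion above: waiters queue
 * exclusively, so only one task at a time retries
 * osc_enter_cache_try().  A task that got its grant passes the baton
 * with wake_up(); a task that hit the "no dirty pages, nothing in
 * flight" exit uses wake_up_all() so every remaining waiter learns
 * that no grant is coming and can fall back to sync I/O instead of
 * sleeping out its own timeout.
 */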
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
- struct list_head *l, *tmp;
- struct osc_cache_waiter *ocw;
-
- ENTRY;
- list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
-
- ocw->ocw_rc = -EDQUOT;
-
- if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant))
- ocw->ocw_rc = 0;
-
- if (ocw->ocw_rc == 0 ||
- !(cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0)) {
- list_del_init(&ocw->ocw_entry);
- CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant "
- "%ld, %d\n", ocw, ocw->ocw_oap,
- cli->cl_avail_grant, ocw->ocw_rc);
-
- wake_up(&ocw->ocw_waitq);
- }
- }
-
- EXIT;
-}
-EXPORT_SYMBOL(osc_wake_cache_waiters);
-
static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
{
int hprpc = !!list_empty(&osc->oo_hp_exts);
}
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
- * create more pages to coalesce with what's waiting.. */
- if (!list_empty(&cli->cl_cache_waiters)) {
+ * create more pages to coalesce with what's waiting..
+ */
+ if (waitqueue_active(&cli->cl_cache_waiters)) {
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
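/* Editor's note: waitqueue_active() without a paired barrier is racy
 * in general (see the caveat in <linux/wait.h>); here it only gates an
 * optimization, and a missed waiter is still served once in-flight
 * RPCs complete or its wait times out, so no wakeup can be lost.
 */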
if (in_rpc->oe_dio && overlapped(ext, in_rpc))
return false;
+ if (ext->oe_is_rdma_only != in_rpc->oe_is_rdma_only)
+ return false;
+
return true;
}
while (!list_empty(&obj->oo_hp_exts)) {
ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
oe_link);
- LASSERT(ext->oe_state == OES_CACHE);
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
if (data.erd_page_count == data.erd_max_pages)
return data.erd_page_count;
- ext = first_extent(obj);
- while (ext != NULL) {
+ for (ext = first_extent(obj);
+ ext;
+ ext = next_extent(ext)) {
if ((ext->oe_state != OES_CACHE) ||
/* this extent may be already in current rpclist */
- (!list_empty(&ext->oe_link) && ext->oe_owner != NULL)) {
- ext = next_extent(ext);
+ (!list_empty(&ext->oe_link) && ext->oe_owner))
continue;
- }
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
-
- ext = next_extent(ext);
}
return data.erd_page_count;
}
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
* have filled up the cache and not been fired into rpcs because
- * they don't pass the nr_pending/object threshhold */
- if (!list_empty(&cli->cl_cache_waiters) &&
+ * they don't pass the nr_pending/object threshold
+ */
+ if (waitqueue_active(&cli->cl_cache_waiters) &&
!list_empty(&cli->cl_loi_write_list))
RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
- if (osc_max_rpc_in_flight(cli, osc)) {
+ /* even if we have reached our max in flight RPCs, we still
+ * allow all high-priority RPCs through to prevent their
+ * starvation, which could lead to the server evicting us for
+ * not writing out pages in a timely manner.  LU-13131 */
+ if (osc_max_rpc_in_flight(cli, osc) &&
+ list_empty(&osc->oo_hp_exts)) {
__osc_list_maint(cli, osc);
break;
}
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
- oap->oap_count = ops->ops_to - ops->ops_from;
+ oap->oap_count = ops->ops_to - ops->ops_from + 1;
/* No need to hold a lock here,
* since this page is not in any list yet. */
oap->oap_async_flags = 0;
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
spin_unlock(&oap->oap_lock);
- if (memory_pressure_get())
+ if (current->flags & PF_MEMALLOC)
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
++page_count;
mppr <<= (page_count > mppr);
- if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+ if (unlikely(opg->ops_from > 0 ||
+ opg->ops_to < PAGE_SIZE - 1))
can_merge = false;
}
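/* Editor's note: ops_to is an inclusive offset under the new
 * convention, so a fully-covered page has ops_from == 0 and
 * ops_to == PAGE_SIZE - 1 (count = ops_to - ops_from + 1, as set
 * above); only a partially-covered first or last page defeats
 * merging here.
 */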
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
+ int grants;
+ int ppc;
+
+ ppc = 1 << (cli->cl_chunkbits - PAGE_SHIFT);
+ grants = cli->cl_grant_extent_tax;
+ grants += (1 << cli->cl_chunkbits) *
+ ((page_count + ppc - 1) / ppc);
+
+ spin_lock(&cli->cl_loi_list_lock);
+ if (osc_reserve_grant(cli, grants) == 0) {
+ list_for_each_entry(oap, list, oap_pending_item) {
+ osc_consume_write_grant(cli,
+ &oap->oap_brw_page);
+ atomic_long_inc(&obd_dirty_pages);
+ }
+ osc_unreserve_grant_nolock(cli, grants, 0);
+ ext->oe_grants = grants;
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+ }
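+ /* Editor's example of the grant arithmetic above, assuming 4KiB
+ * pages (PAGE_SHIFT = 12) and 64KiB chunks (cl_chunkbits = 16):
+ * ppc = 16, so a 100-page DIO write rounds up to
+ * (100 + 15) / 16 = 7 chunks and reserves
+ * 7 * 65536 + cl_grant_extent_tax bytes of grant.
+ */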
+
+ ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
if (!ext->oe_rw) { /* write */
- list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ if (!ext->oe_srvlock && !ext->oe_dio) {
+ /* The most likely case here is a lack of grants,
+ * so we are either out of quota or out of space.
+ * Since this means we are holding locks across
+ * potentially multi-striped IO, we must send
+ * everything out instantly to avoid prolonged
+ * waits resulting in lock eviction (likely, since
+ * the extended wait in osc_enter_cache() did not
+ * yield any additional grant due to a timeout).
+ * LU-13131 */
+ ext->oe_hp = 1;
+ list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+ } else {
+ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ }
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
EASSERT(!ext->oe_hp, ext);
ext->oe_hp = 1;
list = &obj->oo_hp_exts;
- } else if (!ext->oe_urgent) {
+ } else if (!ext->oe_urgent && !ext->oe_hp) {
ext->oe_urgent = 1;
list = &obj->oo_urgent_exts;
}
/**
* Returns a list of pages by a given [start, end] of \a obj.
*
- * \param resched If not NULL, then we give up before hogging CPU for too
- * long and set *resched = 1, in that case caller should implement a retry
- * logic.
- *
* Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
* crucial in the face of [offset, EOF] locks.
*
* Return at least one page in @queue unless there is no covered page.
*/
-int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
- struct osc_object *osc, pgoff_t start, pgoff_t end,
- osc_page_gang_cbt cb, void *cbdata)
+bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+ struct osc_object *osc, pgoff_t start, pgoff_t end,
+ osc_page_gang_cbt cb, void *cbdata)
{
struct osc_page *ops;
struct pagevec *pagevec;
unsigned int nr;
unsigned int i;
unsigned int j;
- int res = CLP_GANG_OKAY;
+ bool res = true;
bool tree_lock = true;
ENTRY;
for (i = 0; i < j; ++i) {
ops = pvec[i];
- if (res == CLP_GANG_OKAY)
+ if (res)
res = (*cb)(env, io, ops, cbdata);
page = ops->ops_cl.cpl_page;
if (nr < OTI_PVEC_SIZE || end_of_region)
break;
- if (res == CLP_GANG_OKAY && need_resched())
- res = CLP_GANG_RESCHED;
- if (res != CLP_GANG_OKAY)
+ if (!res)
break;
+ if (need_resched())
+ cond_resched();
spin_lock(&osc->oo_tree_lock);
tree_lock = true;
/**
* Check if page @page is covered by an extra lock or discard it.
*/
-static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_object *osc = cbdata;
+ struct cl_page *page = ops->ops_cl.cpl_page;
pgoff_t index;
+ bool discard = false;
index = osc_index(ops);
- if (index >= info->oti_fn_index) {
- struct ldlm_lock *tmp;
- struct cl_page *page = ops->ops_cl.cpl_page;
+ /* negative lock caching */
+ if (index < info->oti_ng_index) {
+ discard = true;
+ } else if (index >= info->oti_fn_index) {
+ struct ldlm_lock *tmp;
/* refresh non-overlapped index */
tmp = osc_dlmlock_at_pgoff(env, osc, index,
- OSC_DAP_FL_TEST_LOCK);
+ OSC_DAP_FL_TEST_LOCK |
+ OSC_DAP_FL_AST | OSC_DAP_FL_RIGHT);
if (tmp != NULL) {
__u64 end = tmp->l_policy_data.l_extent.end;
- /* Cache the first-non-overlapped index so as to skip
- * all pages within [index, oti_fn_index). This is safe
- * because if tmp lock is canceled, it will discard
- * these pages. */
- info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
- if (end == OBD_OBJECT_EOF)
- info->oti_fn_index = CL_PAGE_EOF;
+ __u64 start = tmp->l_policy_data.l_extent.start;
+
+ /* no lock covering this page */
+ if (index < cl_index(osc2cl(osc), start)) {
+ /* no lock at @index, first lock at @start */
+ info->oti_ng_index = cl_index(osc2cl(osc),
+ start);
+ discard = true;
+ } else {
+ /* Cache the first-non-overlapped index so as to
+ * skip all pages within [index, oti_fn_index).
+ * This is safe because if tmp lock is canceled,
+ * it will discard these pages.
+ */
+ info->oti_fn_index = cl_index(osc2cl(osc),
+ end + 1);
+ if (end == OBD_OBJECT_EOF)
+ info->oti_fn_index = CL_PAGE_EOF;
+ }
LDLM_LOCK_PUT(tmp);
- } else if (cl_page_own(env, io, page) == 0) {
- /* discard the page */
+ } else {
+ info->oti_ng_index = CL_PAGE_EOF;
+ discard = true;
+ }
+ }
+
+ if (discard) {
+ if (cl_page_own(env, io, page) == 0) {
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
}
info->oti_next_index = index + 1;
- return CLP_GANG_OKAY;
+ return true;
}
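/* Editor's example of the negative caching above: if the callback
 * first runs at page index 3 and the nearest lock to the right covers
 * pages [8, 20], then oti_ng_index becomes 8 and pages 3-7 are
 * discarded with no further dlm lookups; at index 8 the lock match
 * sets oti_fn_index to 21, so pages 8-20 are left alone as covered.
 */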
-int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
LASSERT(page->cp_state == CPS_FREEING);
}
- return CLP_GANG_OKAY;
+ return true;
}
EXPORT_SYMBOL(osc_discard_cb);
struct osc_thread_info *info = osc_env_info(env);
struct cl_io *io = osc_env_thread_io(env);
osc_page_gang_cbt cb;
- int res;
int result;
ENTRY;
cb = discard ? osc_discard_cb : check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
- do {
- res = osc_page_gang_lookup(env, io, osc,
- info->oti_next_index, end, cb, osc);
- if (info->oti_next_index > end)
- break;
- if (res == CLP_GANG_RESCHED)
- cond_resched();
- } while (res != CLP_GANG_OKAY);
+ info->oti_ng_index = 0;
+
+ osc_page_gang_lookup(env, io, osc,
+ info->oti_next_index, end, cb, osc);
out:
cl_io_fini(env, io);
RETURN(result);