#define DEBUG_SUBSYSTEM S_OSC
#include <lustre_osc.h>
+#include <lustre_dlm.h>
#include "osc_internal.h"
* the size of file. */
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
- LASSERT(last_oap_count > 0);
+ LASSERTF(last_oap_count > 0,
+ "last_oap_count %d\n", last_oap_count);
LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
last->oap_count = last_oap_count;
spin_lock(&last->oap_lock);
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
struct osc_async_page *oap, int bytes)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- int rc = -EDQUOT;
- unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout : at_max);
+ struct osc_object *osc = oap->oap_obj;
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ int rc = -EDQUOT;
int remain;
bool entered = false;
+ /* We cannot wait for a long time here since we are holding ldlm lock
+ * across the actual IO. If no requests complete fast (e.g. due to an
+ * overloaded OST that takes a long time to process everything), we'd
+ * get evicted if we wait for a normal obd_timeout or some such.
+ * So we try to wait half the time it would take the client to be
+ * evicted by server which is half obd_timeout when AT is off
+ * or at least ldlm_enqueue_min with AT on.
+ * See LU-13131 */
+ unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
+ ldlm_enqueue_min / 2);
ENTRY;
if (in_rpc->oe_dio && overlapped(ext, in_rpc))
return false;
+ if (ext->oe_is_rdma_only != in_rpc->oe_is_rdma_only)
+ return false;
+
return true;
}
while (!list_empty(&obj->oo_hp_exts)) {
ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
oe_link);
- LASSERT(ext->oe_state == OES_CACHE);
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
- oap->oap_count = ops->ops_to - ops->ops_from;
+ oap->oap_count = ops->ops_to - ops->ops_from + 1;
/* No need to hold a lock here,
* since this page is not in any list yet. */
oap->oap_async_flags = 0;
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
spin_unlock(&oap->oap_lock);
- if (memory_pressure_get())
+ if (current->flags & PF_MEMALLOC)
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
++page_count;
mppr <<= (page_count > mppr);
- if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+ if (unlikely(opg->ops_from > 0 ||
+ opg->ops_to < PAGE_SIZE - 1))
can_merge = false;
}
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
+ int grants;
+ int ppc;
+
+ ppc = 1 << (cli->cl_chunkbits - PAGE_SHIFT);
+ grants = cli->cl_grant_extent_tax;
+ grants += (1 << cli->cl_chunkbits) *
+ ((page_count + ppc - 1) / ppc);
+
+ spin_lock(&cli->cl_loi_list_lock);
+ if (osc_reserve_grant(cli, grants) == 0) {
+ list_for_each_entry(oap, list, oap_pending_item) {
+ osc_consume_write_grant(cli,
+ &oap->oap_brw_page);
+ atomic_long_inc(&obd_dirty_pages);
+ }
+ osc_unreserve_grant_nolock(cli, grants, 0);
+ ext->oe_grants = grants;
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+ }
+
+ ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
if (!ext->oe_rw) { /* write */
- list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ if (!ext->oe_srvlock && !ext->oe_dio) {
+ /* The most likely case here is from lack of grants
+ * so we are either out of quota or out of space.
+ * Since this means we are holding locks across
+ * potentially multi-striped IO, we must send
+ * everything out instantly to avoid prolonged
+ * waits resulting in lock eviction (likely since
+ * the extended wait in osc_enter_cache() did not
+ * yield any additional grant due to a timeout).
+ * LU-13131 */
+ ext->oe_hp = 1;
+ list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+ } else {
+ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ }
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_object *osc = cbdata;
+ struct cl_page *page = ops->ops_cl.cpl_page;
pgoff_t index;
+ bool discard = false;
index = osc_index(ops);
- if (index >= info->oti_fn_index) {
- struct ldlm_lock *tmp;
- struct cl_page *page = ops->ops_cl.cpl_page;
+ /* negative lock caching */
+ if (index < info->oti_ng_index) {
+ discard = true;
+ } else if (index >= info->oti_fn_index) {
+ struct ldlm_lock *tmp;
/* refresh non-overlapped index */
tmp = osc_dlmlock_at_pgoff(env, osc, index,
- OSC_DAP_FL_TEST_LOCK);
+ OSC_DAP_FL_TEST_LOCK |
+ OSC_DAP_FL_AST | OSC_DAP_FL_RIGHT);
if (tmp != NULL) {
__u64 end = tmp->l_policy_data.l_extent.end;
- /* Cache the first-non-overlapped index so as to skip
- * all pages within [index, oti_fn_index). This is safe
- * because if tmp lock is canceled, it will discard
- * these pages. */
- info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
- if (end == OBD_OBJECT_EOF)
- info->oti_fn_index = CL_PAGE_EOF;
+ __u64 start = tmp->l_policy_data.l_extent.start;
+
+ /* no lock covering this page */
+ if (index < cl_index(osc2cl(osc), start)) {
+ /* no lock at @index, first lock at @start */
+ info->oti_ng_index = cl_index(osc2cl(osc),
+ start);
+ discard = true;
+ } else {
+ /* Cache the first-non-overlapped index so as to
+ * skip all pages within [index, oti_fn_index).
+ * This is safe because if tmp lock is canceled,
+ * it will discard these pages.
+ */
+ info->oti_fn_index = cl_index(osc2cl(osc),
+ end + 1);
+ if (end == OBD_OBJECT_EOF)
+ info->oti_fn_index = CL_PAGE_EOF;
+ }
LDLM_LOCK_PUT(tmp);
- } else if (cl_page_own(env, io, page) == 0) {
- /* discard the page */
+ } else {
+ info->oti_ng_index = CL_PAGE_EOF;
+ discard = true;
+ }
+ }
+
+ if (discard) {
+ if (cl_page_own(env, io, page) == 0) {
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
cb = discard ? osc_discard_cb : check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
+ info->oti_ng_index = 0;
osc_page_gang_lookup(env, io, osc,
info->oti_next_index, end, cb, osc);