LU-14711 tests: Ensure there's no eviction with long cache discard
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 56fd9e4..3255696 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -28,7 +28,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * osc cache management.
  *
@@ -38,6 +37,7 @@
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include <lustre_osc.h>
+#include <lustre_dlm.h>
 
 #include "osc_internal.h"
 
@@ -96,7 +96,7 @@ static inline char *ext_flags(struct osc_extent *ext, char *flags)
 
 #define EXTSTR       "[%lu -> %lu/%lu]"
 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
-static const char *oes_strings[] = {
+static const char *const oes_strings[] = {
        "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
 
 #define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\
@@ -110,7 +110,7 @@ static const char *oes_strings[] = {
                /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
-               atomic_read(&__ext->oe_refc),                                 \
+               kref_read(&__ext->oe_refc),                                   \
                atomic_read(&__ext->oe_users),                                \
                list_empty_marker(&__ext->oe_link),                           \
                oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
@@ -186,10 +186,10 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_state >= OES_STATE_MAX)
                GOTO(out, rc = 10);
 
-       if (atomic_read(&ext->oe_refc) <= 0)
+       if (kref_read(&ext->oe_refc) <= 0)
                GOTO(out, rc = 20);
 
-       if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
+       if (kref_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
                GOTO(out, rc = 30);
 
        switch (ext->oe_state) {
@@ -329,7 +329,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
        cl_object_get(osc2cl(obj));
-       atomic_set(&ext->oe_refc, 1);
+       kref_init(&ext->oe_refc);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
@@ -340,35 +340,50 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
        return ext;
 }
 
-static void osc_extent_free(struct osc_extent *ext)
+static void osc_extent_free(struct kref *kref)
 {
+       struct osc_extent *ext = container_of(kref, struct osc_extent,
+                                             oe_refc);
+
+       LASSERT(list_empty(&ext->oe_link));
+       LASSERT(atomic_read(&ext->oe_users) == 0);
+       LASSERT(ext->oe_state == OES_INV);
+       LASSERT(RB_EMPTY_NODE(&ext->oe_node));
+
+       if (ext->oe_dlmlock) {
+               lu_ref_del(&ext->oe_dlmlock->l_reference,
+                          "osc_extent", ext);
+               LDLM_LOCK_PUT(ext->oe_dlmlock);
+               ext->oe_dlmlock = NULL;
+       }
+#if 0
+       /* If/When cl_object_put drops the need for 'env',
+        * this code can be enabled, and matching code in
+        * osc_extent_put removed.
+        */
+       cl_object_put(osc2cl(ext->oe_obj));
+
        OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
+#endif
 }
 
 static struct osc_extent *osc_extent_get(struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) >= 0);
-       atomic_inc(&ext->oe_refc);
+       LASSERT(kref_read(&ext->oe_refc) >= 0);
+       kref_get(&ext->oe_refc);
        return ext;
 }
 
 static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) > 0);
-       if (atomic_dec_and_test(&ext->oe_refc)) {
-               LASSERT(list_empty(&ext->oe_link));
-               LASSERT(atomic_read(&ext->oe_users) == 0);
-               LASSERT(ext->oe_state == OES_INV);
-               LASSERT(RB_EMPTY_NODE(&ext->oe_node));
-
-               if (ext->oe_dlmlock != NULL) {
-                       lu_ref_del(&ext->oe_dlmlock->l_reference,
-                                  "osc_extent", ext);
-                       LDLM_LOCK_RELEASE(ext->oe_dlmlock);
-                       ext->oe_dlmlock = NULL;
-               }
+       LASSERT(kref_read(&ext->oe_refc) > 0);
+       if (kref_put(&ext->oe_refc, osc_extent_free)) {
+               /* This should be in osc_extent_free(), but as long
+                * as we need to pass 'env' it cannot be.
+                */
                cl_object_put(env, osc2cl(ext->oe_obj));
-               osc_extent_free(ext);
+
+               OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
        }
 }
 
@@ -379,9 +394,9 @@ static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
  */
 static void osc_extent_put_trust(struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) > 1);
+       LASSERT(kref_read(&ext->oe_refc) > 1);
        assert_osc_object_is_locked(ext->oe_obj);
-       atomic_dec(&ext->oe_refc);
+       osc_extent_put(NULL, ext);
 }
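
For reference, the kref idiom this conversion relies on, as a minimal sketch
(struct foo, foo_release() and foo_put() are illustrative names, not part of
the patch): kref_put() invokes the release callback and returns nonzero
exactly once, on the final put.  That return value is what lets
osc_extent_put() keep the 'env'-dependent teardown in the caller while
osc_extent_free() handles only the env-independent parts.

        struct foo {
                struct kref f_ref;
                struct list_head f_link;
        };

        static void foo_release(struct kref *kref)
        {
                /* runs exactly once, when the last reference drops */
                struct foo *f = container_of(kref, struct foo, f_ref);

                list_del(&f->f_link);   /* env-independent teardown only */
        }

        static void foo_put(struct foo *f)
        {
                if (kref_put(&f->f_ref, foo_release))
                        kfree(f);       /* final-put work needing caller context */
        }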
 
 /**
@@ -515,12 +530,21 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        if (victim == NULL)
                return -EINVAL;
 
-       if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
+       if (victim->oe_state != OES_INV &&
+           (victim->oe_state != OES_CACHE || victim->oe_fsync_wait))
                return -EBUSY;
 
        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;
 
+       /*
+        * In the rare case max_pages_per_rpc (mppr) is changed, don't
+        * merge extents until after old ones have been sent, or the
+        * "extents are aligned to RPCs" checks are unhappy.
+        */
+       if (cur->oe_mppr != victim->oe_mppr)
+               return -ERANGE;
+
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
@@ -546,7 +570,6 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        cur->oe_urgent   |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
-       list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;
 
        osc_extent_get(victim);
@@ -560,11 +583,10 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
 /**
  * Drop user count of osc_extent, and unplug IO asynchronously.
  */
-int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
+void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
 {
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
-       int rc = 0;
        ENTRY;
 
        LASSERT(atomic_read(&ext->oe_users) > 0);
@@ -593,7 +615,10 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                grant += cli->cl_grant_extent_tax;
 
-                       if (ext->oe_urgent)
+                       if (ext->oe_hp)
+                               list_move_tail(&ext->oe_link,
+                                              &obj->oo_hp_exts);
+                       else if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
                        else if (ext->oe_nr_pages == ext->oe_mppr) {
@@ -608,7 +633,8 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                osc_io_unplug_async(env, cli, obj);
        }
        osc_extent_put(env, ext);
-       RETURN(rc);
+
+       RETURN_EXIT;
 }
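
With the oe_hp case added above, the release path feeds three queues, and
get_write_extents() further down drains them strictly in this order (sketch):

        /*
         * oe_hp     -> obj->oo_hp_exts      sent even at the RPC-in-flight cap
         * oe_urgent -> obj->oo_urgent_exts
         * full      -> obj->oo_full_exts    (oe_nr_pages == oe_mppr)
         */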
 
 /**
@@ -671,7 +697,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
                cur->oe_start = descr->cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
-       cur->oe_grants  = 0;
+       cur->oe_grants  = chunksize + cli->cl_grant_extent_tax;
        cur->oe_mppr    = max_pages;
        if (olck->ols_dlmlock != NULL) {
                LASSERT(olck->ols_hold);
@@ -739,54 +765,21 @@ restart:
                         * flushed, try next one. */
                        continue;
 
-               /* check if they belong to the same rpc slot before trying to
-                * merge. the extents are not overlapped and contiguous at
-                * chunk level to get here. */
-               if (ext->oe_max_end != max_end)
-                       /* if they don't belong to the same RPC slot or
-                        * max_pages_per_rpc has ever changed, do not merge. */
-                       continue;
-
-               /* check whether maximum extent size will be hit */
-               if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
-                   cli->cl_max_extent_pages)
-                       continue;
-
-               /* it's required that an extent must be contiguous at chunk
-                * level so that we know the whole extent is covered by grant
-                * (the pages in the extent are NOT required to be contiguous).
-                * Otherwise, it will be too much difficult to know which
-                * chunks have grants allocated. */
-
-               /* try to do front merge - extend ext's start */
-               if (chunk + 1 == ext_chk_start) {
-                       /* ext must be chunk size aligned */
-                       EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
-                       /* pull ext's start back to cover cur */
-                       ext->oe_start   = cur->oe_start;
-                       ext->oe_grants += chunksize;
+               if (osc_extent_merge(env, ext, cur) == 0) {
                        LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
-                       found = osc_extent_hold(ext);
-               } else if (chunk == ext_chk_end + 1) {
-                       /* rear merge */
-                       ext->oe_end     = cur->oe_end;
-                       ext->oe_grants += chunksize;
-                       LASSERT(*grants >= chunksize);
-                       *grants -= chunksize;
-
-                       /* try to merge with the next one because we just fill
-                        * in a gap */
+                       /*
+                        * Try to merge with the next one too because we
+                        * might have just filled in a gap.
+                        */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_grant_extent_tax;
 
                        found = osc_extent_hold(ext);
-               }
-               if (found != NULL)
                        break;
+               }
        }
 
        osc_extent_tree_dump(D_CACHE, obj);
@@ -800,7 +793,6 @@ restart:
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
-               cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
                LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
 
@@ -1118,7 +1110,8 @@ static int osc_extent_make_ready(const struct lu_env *env,
         * the size of file. */
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
-               LASSERT(last_oap_count > 0);
+               LASSERTF(last_oap_count > 0,
+                        "last_oap_count %d\n", last_oap_count);
                LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
                last->oap_count = last_oap_count;
                spin_lock(&last->oap_lock);
@@ -1377,7 +1370,6 @@ static void osc_consume_write_grant(struct client_obd *cli,
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               PAGE_SIZE, pga, pga->pg);
-       osc_update_next_shrink(cli);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -1533,15 +1525,26 @@ out:
        return rc;
 }
 
-static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+/* The following two inlines exist to pass code fragments
+ * to wait_event_idle_exclusive_timeout_cmd().  Passing
+ * code fragments as macro args can look confusing, so
+ * we provide inlines to encapsulate them.
+ */
+static inline void cli_unlock_and_unplug(const struct lu_env *env,
+                                        struct client_obd *cli,
+                                        struct osc_async_page *oap)
 {
-       int rc;
-       spin_lock(&cli->cl_loi_list_lock);
-       rc = list_empty(&ocw->ocw_entry);
        spin_unlock(&cli->cl_loi_list_lock);
-       return rc;
+       osc_io_unplug_async(env, cli, NULL);
+       CDEBUG(D_CACHE,
+              "%s: sleeping for cache space for %p\n",
+              cli_name(cli), oap);
 }
 
+static inline void cli_lock_after_unplug(struct client_obd *cli)
+{
+       spin_lock(&cli->cl_loi_list_lock);
+}
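
Roughly, the _cmd variant of the waiter runs those two fragments around each
sleep; a conceptual sketch of the shape (not the literal libcfs macro
expansion):

        /*
         * caller holds cl_loi_list_lock on entry and on exit:
         *
         *      while (!condition && remain > 0) {
         *              cli_unlock_and_unplug(env, cli, oap);   <- cmd1
         *              remain = idle, exclusive sleep up to 'remain';
         *              cli_lock_after_unplug(cli);             <- cmd2
         *      }
         */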
 /**
  * The main entry to reserve dirty page accounting. Usually the grant reserved
  * in this function will be freed in bulk in osc_free_grant() unless it fails
@@ -1552,10 +1555,22 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap, int bytes)
 {
-       struct osc_object       *osc = oap->oap_obj;
-       struct lov_oinfo        *loi = osc->oo_oinfo;
-       struct osc_cache_waiter  ocw;
-       int                      rc = -EDQUOT;
+       struct osc_object *osc = oap->oap_obj;
+       struct lov_oinfo *loi = osc->oo_oinfo;
+       int rc = -EDQUOT;
+       int remain;
+       bool entered = false;
+       /* We cannot wait for a long time here since we are holding the
+        * ldlm lock across the actual IO. If no requests complete quickly
+        * (e.g. due to an overloaded OST that takes a long time to process
+        * everything), we would get evicted if we waited for a normal
+        * obd_timeout or some such.  So we try to wait half the time it
+        * would take the client to be evicted by the server, which is half
+        * obd_timeout when AT is off, or at least ldlm_enqueue_min with AT
+        * on.  See LU-13131 */
+       unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
+                                                         ldlm_enqueue_min / 2);
+
        ENTRY;
 
        OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes);
@@ -1571,83 +1586,40 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                GOTO(out, rc = -EDQUOT);
        }
 
-       /* Hopefully normal case - cache space and write credits available */
-       if (list_empty(&cli->cl_cache_waiters) &&
-           osc_enter_cache_try(cli, oap, bytes)) {
-               OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
-               GOTO(out, rc = 0);
-       }
-
-       /* We can get here for two reasons: too many dirty pages in cache, or
+       /*
+        * We can wait here for two reasons: too many dirty pages in cache, or
         * run out of grants. In both cases we should write dirty pages out.
         * Adding a cache waiter will trigger urgent write-out no matter what
         * RPC size will be.
-        * The exiting condition is no avail grants and no dirty pages caching,
-        * that really means there is no space on the OST. */
-       init_waitqueue_head(&ocw.ocw_waitq);
-       ocw.ocw_oap   = oap;
-       ocw.ocw_grant = bytes;
-       while (cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0) {
-               list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
-               ocw.ocw_rc = 0;
-               spin_unlock(&cli->cl_loi_list_lock);
-
-               osc_io_unplug_async(env, cli, NULL);
-
-               CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
-                      cli_name(cli), &ocw, oap);
-
-               rc = wait_event_idle_timeout(ocw.ocw_waitq,
-                                            ocw_granted(cli, &ocw),
-                                            cfs_time_seconds(AT_OFF ?
-                                                             obd_timeout :
-                                                             at_max));
-
-               spin_lock(&cli->cl_loi_list_lock);
-
-               if (rc <= 0) {
-                       /* wait_event_idle_timeout timed out */
-                       list_del_init(&ocw.ocw_entry);
-                       if (rc == 0)
-                               rc = -ETIMEDOUT;
-                       break;
-               }
-               LASSERT(list_empty(&ocw.ocw_entry));
-               rc = ocw.ocw_rc;
-
-               if (rc != -EDQUOT)
-                       break;
-               if (osc_enter_cache_try(cli, oap, bytes)) {
-                       rc = 0;
-                       break;
-               }
-       }
-
-       switch (rc) {
-       case 0:
-               OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space\n");
-               break;
-       case -ETIMEDOUT:
+        * The exit condition (other than success) is no available grants
+        * and no dirty pages cached; that really means there is no space
+        * on the OST.
+        */
+       remain = wait_event_idle_exclusive_timeout_cmd(
+               cli->cl_cache_waiters,
+               (entered = osc_enter_cache_try(cli, oap, bytes)) ||
+               (cli->cl_dirty_pages == 0 && cli->cl_w_in_flight == 0),
+               timeout,
+               cli_unlock_and_unplug(env, cli, oap),
+               cli_lock_after_unplug(cli));
+
+       if (entered) {
+               if (remain == timeout)
+                       OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
+               else
+                       OSC_DUMP_GRANT(D_CACHE, cli,
+                                      "finally got grant space\n");
+               wake_up(&cli->cl_cache_waiters);
+               rc = 0;
+       } else if (remain == 0) {
                OSC_DUMP_GRANT(D_CACHE, cli,
                               "timeout, fall back to sync i/o\n");
                osc_extent_tree_dump(D_CACHE, osc);
                /* fall back to synchronous I/O */
-               rc = -EDQUOT;
-               break;
-       case -EINTR:
-               /* Ensures restartability - LU-3581 */
-               OSC_DUMP_GRANT(D_CACHE, cli, "interrupted\n");
-               rc = -ERESTARTSYS;
-               break;
-       case -EDQUOT:
+       } else {
                OSC_DUMP_GRANT(D_CACHE, cli,
                               "no grant space, fall back to sync i/o\n");
-               break;
-       default:
-               CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived "
-                      "due to %d, fall back to sync i/o\n",
-                      cli_name(cli), &ocw, rc);
-               break;
+               wake_up_all(&cli->cl_cache_waiters);
        }
        EXIT;
 out:
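
Summarizing the branches above, the wait outcome is decoded from the
(entered, remain) pair:

        /*
         *  entered  && remain == timeout -> grant available at once
         *                                   ("granted from cache"), rc = 0
         *  entered  && remain <  timeout -> had to sleep first ("finally got
         *                                   grant space"), rc = 0; wake the
         *                                   next exclusive waiter
         *  !entered && remain == 0       -> timed out; fall back to sync i/o
         *                                   (rc stays -EDQUOT)
         *  !entered && remain >  0       -> no dirty pages and no writes in
         *                                   flight: the OST is out of space;
         *                                   wake everybody and fail
         */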
@@ -1655,36 +1627,6 @@ out:
        RETURN(rc);
 }
 
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
-       struct list_head *l, *tmp;
-       struct osc_cache_waiter *ocw;
-
-       ENTRY;
-       list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-               ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
-
-               ocw->ocw_rc = -EDQUOT;
-
-               if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant))
-                       ocw->ocw_rc = 0;
-
-               if (ocw->ocw_rc == 0 ||
-                   !(cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0)) {
-                       list_del_init(&ocw->ocw_entry);
-                       CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant "
-                              "%ld, %d\n", ocw, ocw->ocw_oap,
-                              cli->cl_avail_grant, ocw->ocw_rc);
-
-                       wake_up(&ocw->ocw_waitq);
-               }
-       }
-
-       EXIT;
-}
-EXPORT_SYMBOL(osc_wake_cache_waiters);
-
 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
 {
        int hprpc = !!list_empty(&osc->oo_hp_exts);
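
With osc_wake_cache_waiters() removed, the wait queue itself is the registry
of waiters, so the explicit-list test below turns into a waitqueue test:

        /*
         * before: if (!list_empty(&cli->cl_cache_waiters))
         * after:  if (waitqueue_active(&cli->cl_cache_waiters))
         *
         * i.e. the tasks sleeping on the waitqueue are the waiter list.
         */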
@@ -1724,8 +1666,9 @@ static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
                }
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
-                * create more pages to coalesce with what's waiting.. */
-               if (!list_empty(&cli->cl_cache_waiters)) {
+                * create more pages to coalesce with what's waiting.
+                */
+               if (waitqueue_active(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
@@ -1908,6 +1851,9 @@ can_merge(const struct osc_extent *ext, const struct osc_extent *in_rpc)
        if (in_rpc->oe_dio && overlapped(ext, in_rpc))
                return false;
 
+       if (ext->oe_is_rdma_only != in_rpc->oe_is_rdma_only)
+               return false;
+
        return true;
 }
 
@@ -1991,10 +1937,9 @@ static unsigned int get_write_extents(struct osc_object *obj,
        };
 
        assert_osc_object_is_locked(obj);
-       while (!list_empty(&obj->oo_hp_exts)) {
-               ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
-                                oe_link);
-               LASSERT(ext->oe_state == OES_CACHE);
+       while ((ext = list_first_entry_or_null(&obj->oo_hp_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        return data.erd_page_count;
                EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
@@ -2002,9 +1947,9 @@ static unsigned int get_write_extents(struct osc_object *obj,
        if (data.erd_page_count == data.erd_max_pages)
                return data.erd_page_count;
 
-       while (!list_empty(&obj->oo_urgent_exts)) {
-               ext = list_entry(obj->oo_urgent_exts.next,
-                                struct osc_extent, oe_link);
+       while ((ext = list_first_entry_or_null(&obj->oo_urgent_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        return data.erd_page_count;
        }
@@ -2015,10 +1960,11 @@ static unsigned int get_write_extents(struct osc_object *obj,
         * extents can usually only be added if the rpclist was empty, so if we
         * can't add one, we continue on to trying to add normal extents.  This
         * is so we don't miss adding extra extents to an RPC containing high
-        * priority or urgent extents. */
-       while (!list_empty(&obj->oo_full_exts)) {
-               ext = list_entry(obj->oo_full_exts.next,
-                                struct osc_extent, oe_link);
+        * priority or urgent extents.
+        */
+       while ((ext = list_first_entry_or_null(&obj->oo_full_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        break;
        }
@@ -2177,8 +2123,9 @@ static struct osc_object *osc_next_obj(struct client_obd *cli)
        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
-        * they don't pass the nr_pending/object threshhold */
-       if (!list_empty(&cli->cl_cache_waiters) &&
+                * they don't pass the nr_pending/object threshold
+        */
+       if (waitqueue_active(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
 
@@ -2209,7 +2156,12 @@ __must_hold(&cli->cl_loi_list_lock)
 
                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
 
-               if (osc_max_rpc_in_flight(cli, osc)) {
+               /* Even if we have reached our max in-flight RPCs, we still
+                * allow all high-priority RPCs through to prevent their
+                * starvation, which could lead to the server evicting us
+                * for not writing out pages in a timely manner.  LU-13131 */
+               if (osc_max_rpc_in_flight(cli, osc) &&
+                   list_empty(&osc->oo_hp_exts)) {
                        __osc_list_maint(cli, osc);
                        break;
                }
@@ -2286,10 +2238,11 @@ int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-                       struct page *page, loff_t offset)
+                       struct cl_page *page, loff_t offset)
 {
        struct obd_export     *exp = osc_export(osc);
        struct osc_async_page *oap = &ops->ops_oap;
+       struct page           *vmpage = page->cp_vmpage;
        ENTRY;
 
        if (!page)
@@ -2299,16 +2252,24 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_obj = osc;
 
-       oap->oap_page = page;
+       oap->oap_page = vmpage;
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~PAGE_MASK));
 
+       /* Count of transient (direct i/o) pages is always stable by the time
+        * they're submitted.  Setting this here lets us avoid calling
+        * cl_page_clip later to set this.
+        */
+       if (page->cp_type == CPT_TRANSIENT)
+               oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT|
+                                       ASYNC_READY;
+
        INIT_LIST_HEAD(&oap->oap_pending_item);
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
        spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
-              oap, page, oap->oap_obj_off);
+       CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
+              oap, vmpage, oap->oap_obj_off);
        RETURN(0);
 }
 EXPORT_SYMBOL(osc_prep_async_page);
@@ -2349,7 +2310,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        }
 
        /* check if the file's owner/group is over quota */
-       if (!(cmd & OBD_BRW_NOQUOTA)) {
+       if (!io->ci_noquota) {
                struct cl_object *obj;
                struct cl_attr   *attr;
                unsigned int qid[LL_MAXQUOTAS];
@@ -2372,7 +2333,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
-       oap->oap_count = ops->ops_to - ops->ops_from;
+       oap->oap_count = ops->ops_to - ops->ops_from + 1;
        /* No need to hold a lock here,
         * since this page is not in any list yet. */
        oap->oap_async_flags = 0;
@@ -2586,7 +2547,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
        spin_unlock(&oap->oap_lock);
 
-       if (memory_pressure_get())
+       if (current->flags & PF_MEMALLOC)
                ext->oe_memalloc = 1;
 
        ext->oe_urgent = 1;
@@ -2608,7 +2569,7 @@ out:
        return rc;
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
                         struct osc_object *obj, struct list_head *list,
                         int brw_flags)
 {
@@ -2633,7 +2594,8 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
                ++page_count;
                mppr <<= (page_count > mppr);
 
-               if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+               if (unlikely(opg->ops_from > 0 ||
+                            opg->ops_to < PAGE_SIZE - 1))
                        can_merge = false;
        }
 
@@ -2658,6 +2620,40 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
        ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+       if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
+               int grants;
+               int ppc;
+
+               ppc = 1 << (cli->cl_chunkbits - PAGE_SHIFT);
+               grants = cli->cl_grant_extent_tax;
+               grants += (1 << cli->cl_chunkbits) *
+                       ((page_count + ppc - 1) / ppc);
+
+               CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
+               spin_lock(&cli->cl_loi_list_lock);
+               if (osc_reserve_grant(cli, grants) == 0) {
+                       list_for_each_entry(oap, list, oap_pending_item) {
+                               osc_consume_write_grant(cli,
+                                                       &oap->oap_brw_page);
+                       }
+                       atomic_long_add(page_count, &obd_dirty_pages);
+                       osc_unreserve_grant_nolock(cli, grants, 0);
+                       ext->oe_grants = grants;
+               } else {
+                       /* We cannot report ENOSPC correctly if we do parallel
+                        * DIO (async RPC submission), so turn off parallel DIO
+                        * if there is insufficient grant available.  This
+                        * makes individual RPCs synchronous.
+                        */
+                       io->ci_parallel_dio = false;
+                       CDEBUG(D_CACHE,
+                       "not enough grant available, switching to sync for this i/o\n");
+               }
+               spin_unlock(&cli->cl_loi_list_lock);
+               osc_update_next_shrink(cli);
+       }
+
+       ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
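
A worked example of the grant request above, with assumed values
(cl_chunkbits = 16, i.e. 64KiB chunks, PAGE_SHIFT = 12, page_count = 100):

        /*
         * ppc    = 1 << (16 - 12)      = 16 pages per chunk
         * chunks = (100 + 16 - 1) / 16 = 7
         * grants = cl_grant_extent_tax + 7 * 65536 bytes
         */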
@@ -2667,7 +2663,21 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
        if (!ext->oe_rw) { /* write */
-               list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               if (!ext->oe_srvlock && !ext->oe_dio) {
+                       /* The most likely case here is lack of grants,
+                        * so we are either out of quota or out of space.
+                        * Since this means we are holding locks across
+                        * potentially multi-striped IO, we must send
+                        * everything out instantly to avoid prolonged
+                        * waits resulting in lock eviction (likely, since
+                        * the extended wait in osc_enter_cache() did not
+                        * yield any additional grant due to a timeout).
+                        * LU-13131 */
+                       ext->oe_hp = 1;
+                       list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+               } else {
+                       list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               }
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
                list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
@@ -2748,10 +2758,11 @@ again:
 
        osc_list_maint(cli, obj);
 
-       while (!list_empty(&list)) {
+       while ((ext = list_first_entry_or_null(&list,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                int rc;
 
-               ext = list_entry(list.next, struct osc_extent, oe_link);
                list_del_init(&ext->oe_link);
 
                /* extent may be in OES_ACTIVE state because inode mutex
@@ -2932,7 +2943,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                        EASSERT(!ext->oe_hp, ext);
                                        ext->oe_hp = 1;
                                        list = &obj->oo_hp_exts;
-                               } else if (!ext->oe_urgent) {
+                               } else if (!ext->oe_urgent && !ext->oe_hp) {
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
@@ -3092,11 +3103,10 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
                tree_lock = false;
 
+               res = (*cb)(env, io, pvec, j, cbdata);
+
                for (i = 0; i < j; ++i) {
                        ops = pvec[i];
-                       if (res)
-                               res = (*cb)(env, io, ops, cbdata);
-
                        page = ops->ops_cl.cpl_page;
                        lu_ref_del(&page->cp_reference, "gang_lookup", current);
                        cl_pagevec_put(env, page, pagevec);
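
The gang-lookup callback now runs once per batch, before the per-page
cleanup loop, instead of once per page inside it; the corresponding change
to the callback type (a sketch; the real typedef lives in the osc headers):

        /* before: one call per page */
        bool (*cb)(const struct lu_env *env, struct cl_io *io,
                   struct osc_page *ops, void *cbdata);
        /* after: one call per batch of 'count' pages */
        bool (*cb)(const struct lu_env *env, struct cl_io *io,
                   void **pvec, int count, void *cbdata);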
@@ -3108,6 +3118,18 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
                if (!res)
                        break;
+
+               OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SLOW_PAGE_EVICT,
+                                cfs_fail_val ?: 20);
+
+               if (io->ci_type == CIT_MISC &&
+                   io->u.ci_misc.lm_next_rpc_time &&
+                   ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
+                       osc_send_empty_rpc(osc, idx << PAGE_SHIFT);
+                       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                                        5 * obd_timeout / 16;
+               }
+
                if (need_resched())
                        cond_resched();
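
The empty-RPC heartbeat above fires every 5 * obd_timeout / 16 seconds;
assuming the default obd_timeout of 100s:

        /* 5 * 100 / 16 = 31 seconds between pings (integer division) */

so a long-running page discard keeps touching the OST well inside the
eviction window, which is what the LU-14711 test exercises.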
 
@@ -3124,62 +3146,101 @@ EXPORT_SYMBOL(osc_page_gang_lookup);
  * Check if page @page is covered by an extra lock or discard it.
  */
 static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
-                               struct osc_page *ops, void *cbdata)
+                                void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct osc_object *osc = cbdata;
-       pgoff_t index;
+       int i;
 
-       index = osc_index(ops);
-       if (index >= info->oti_fn_index) {
-               struct ldlm_lock *tmp;
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
                struct cl_page *page = ops->ops_cl.cpl_page;
+               pgoff_t index = osc_index(ops);
+               bool discard = false;
+
+               /* negative lock caching */
+               if (index < info->oti_ng_index) {
+                       discard = true;
+               } else if (index >= info->oti_fn_index) {
+                       struct ldlm_lock *tmp;
+                       /* refresh non-overlapped index */
+                       tmp = osc_dlmlock_at_pgoff(env, osc, index,
+                                       OSC_DAP_FL_TEST_LOCK |
+                                       OSC_DAP_FL_AST |
+                                       OSC_DAP_FL_RIGHT);
+                       if (tmp != NULL) {
+                               __u64 end =
+                                       tmp->l_policy_data.l_extent.end;
+                               __u64 start =
+                                       tmp->l_policy_data.l_extent.start;
+
+                               /* no lock covering this page */
+                               if (index < cl_index(osc2cl(osc), start)) {
+                                       /* no lock at @index,
+                                        * first lock at @start
+                                        */
+                                       info->oti_ng_index =
+                                               cl_index(osc2cl(osc), start);
+                                       discard = true;
+                               } else {
+                                       /* Cache the first-non-overlapped
+                                        * index so as to skip all pages
+                                        * within [index, oti_fn_index).
+                                        * This is safe because if tmp lock
+                                        * is canceled, it will discard these
+                                        * pages.
+                                        */
+                                       info->oti_fn_index =
+                                               cl_index(osc2cl(osc), end + 1);
+                                       if (end == OBD_OBJECT_EOF)
+                                               info->oti_fn_index =
+                                                       CL_PAGE_EOF;
+                               }
+                               LDLM_LOCK_PUT(tmp);
+                       } else {
+                               info->oti_ng_index = CL_PAGE_EOF;
+                               discard = true;
+                       }
+               }
 
-               /* refresh non-overlapped index */
-               tmp = osc_dlmlock_at_pgoff(env, osc, index,
-                                          OSC_DAP_FL_TEST_LOCK);
-               if (tmp != NULL) {
-                       __u64 end = tmp->l_policy_data.l_extent.end;
-                       /* Cache the first-non-overlapped index so as to skip
-                        * all pages within [index, oti_fn_index). This is safe
-                        * because if tmp lock is canceled, it will discard
-                        * these pages. */
-                       info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
-                       if (end == OBD_OBJECT_EOF)
-                               info->oti_fn_index = CL_PAGE_EOF;
-                       LDLM_LOCK_PUT(tmp);
-               } else if (cl_page_own(env, io, page) == 0) {
-                       /* discard the page */
-                       cl_page_discard(env, io, page);
-                       cl_page_disown(env, io, page);
-               } else {
-                       LASSERT(page->cp_state == CPS_FREEING);
+               if (discard) {
+                       if (cl_page_own(env, io, page) == 0) {
+                               cl_page_discard(env, io, page);
+                               cl_page_disown(env, io, page);
+                       } else {
+                               LASSERT(page->cp_state == CPS_FREEING);
+                       }
                }
-       }
 
-       info->oti_next_index = index + 1;
+               info->oti_next_index = index + 1;
+       }
        return true;
 }
 
 bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
-                  struct osc_page *ops, void *cbdata)
+                   void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_page *page = ops->ops_cl.cpl_page;
-
-       /* page is top page. */
-       info->oti_next_index = osc_index(ops) + 1;
-       if (cl_page_own(env, io, page) == 0) {
-               if (!ergo(page->cp_type == CPT_CACHEABLE,
-                         !PageDirty(cl_page_vmpage(page))))
-                       CL_PAGE_DEBUG(D_ERROR, env, page,
-                                       "discard dirty page?\n");
-
-               /* discard the page */
-               cl_page_discard(env, io, page);
-               cl_page_disown(env, io, page);
-       } else {
-               LASSERT(page->cp_state == CPS_FREEING);
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
+
+               /* page is top page. */
+               info->oti_next_index = osc_index(ops) + 1;
+               if (cl_page_own(env, io, page) == 0) {
+                       if (!ergo(page->cp_type == CPT_CACHEABLE,
+                                 !PageDirty(cl_page_vmpage(page))))
+                               CL_PAGE_DEBUG(D_ERROR, env, page,
+                                             "discard dirty page?\n");
+
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
        }
 
        return true;
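
The new oti_ng_index gives check_and_discard_cb() negative lock caching; a
worked example with hypothetical page indices, where a lookup at page 10
finds the nearest lock to the right covering pages [100, 199]:

        /*
         * oti_ng_index = 100 -> pages 10..99 are discarded without further
         *                       dlm lookups (no lock can cover them)
         * oti_fn_index = 200 -> pages 100..199 are treated as covered and
         *                       skipped
         *
         * if no lock exists at or to the right of the index, oti_ng_index
         * becomes CL_PAGE_EOF and everything remaining is discarded
         */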
@@ -3206,12 +3267,15 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
 
        io->ci_obj = cl_object_top(osc2cl(osc));
        io->ci_ignore_layout = 1;
+       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                        5 * obd_timeout / 16;
        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (result != 0)
                GOTO(out, result);
 
        cb = discard ? osc_discard_cb : check_and_discard_cb;
        info->oti_fn_index = info->oti_next_index = start;
+       info->oti_ng_index = 0;
 
        osc_page_gang_lookup(env, io, osc,
                             info->oti_next_index, end, cb, osc);