Whamcloud - gitweb
LU-9679 osc: simplify osc_extent_find()
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index bf183d4..e0c9901 100644 (file)
@@ -38,6 +38,7 @@
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include <lustre_osc.h>
+#include <lustre_dlm.h>
 
 #include "osc_internal.h"
 
@@ -110,7 +111,7 @@ static const char *oes_strings[] = {
                /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
-               atomic_read(&__ext->oe_refc),                                 \
+               kref_read(&__ext->oe_refc),                                   \
                atomic_read(&__ext->oe_users),                                \
                list_empty_marker(&__ext->oe_link),                           \
                oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
@@ -186,10 +187,10 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_state >= OES_STATE_MAX)
                GOTO(out, rc = 10);
 
-       if (atomic_read(&ext->oe_refc) <= 0)
+       if (kref_read(&ext->oe_refc) <= 0)
                GOTO(out, rc = 20);
 
-       if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
+       if (kref_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
                GOTO(out, rc = 30);
 
        switch (ext->oe_state) {
@@ -329,7 +330,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
        cl_object_get(osc2cl(obj));
-       atomic_set(&ext->oe_refc, 1);
+       kref_init(&ext->oe_refc);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
@@ -340,35 +341,50 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
        return ext;
 }
 
-static void osc_extent_free(struct osc_extent *ext)
+static void osc_extent_free(struct kref *kref)
 {
+       struct osc_extent *ext = container_of(kref, struct osc_extent,
+                                             oe_refc);
+
+       LASSERT(list_empty(&ext->oe_link));
+       LASSERT(atomic_read(&ext->oe_users) == 0);
+       LASSERT(ext->oe_state == OES_INV);
+       LASSERT(RB_EMPTY_NODE(&ext->oe_node));
+
+       if (ext->oe_dlmlock) {
+               lu_ref_del(&ext->oe_dlmlock->l_reference,
+                          "osc_extent", ext);
+               LDLM_LOCK_PUT(ext->oe_dlmlock);
+               ext->oe_dlmlock = NULL;
+       }
+#if 0
+       /* If/When cl_object_put drops the need for 'env',
+        * this code can be enabled, and matching code in
+        * osc_extent_put removed.
+        */
+       cl_object_put(osc2cl(ext->oe_obj));
+
        OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
+#endif
 }
 
 static struct osc_extent *osc_extent_get(struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) >= 0);
-       atomic_inc(&ext->oe_refc);
+       LASSERT(kref_read(&ext->oe_refc) >= 0);
+       kref_get(&ext->oe_refc);
        return ext;
 }
 
 static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) > 0);
-       if (atomic_dec_and_test(&ext->oe_refc)) {
-               LASSERT(list_empty(&ext->oe_link));
-               LASSERT(atomic_read(&ext->oe_users) == 0);
-               LASSERT(ext->oe_state == OES_INV);
-               LASSERT(RB_EMPTY_NODE(&ext->oe_node));
-
-               if (ext->oe_dlmlock != NULL) {
-                       lu_ref_del(&ext->oe_dlmlock->l_reference,
-                                  "osc_extent", ext);
-                       LDLM_LOCK_RELEASE(ext->oe_dlmlock);
-                       ext->oe_dlmlock = NULL;
-               }
+       LASSERT(kref_read(&ext->oe_refc) > 0);
+       if (kref_put(&ext->oe_refc, osc_extent_free)) {
+               /* This should be in osc_extent_free(), but
+                * while we need to pass 'env' it cannot be.
+                */
                cl_object_put(env, osc2cl(ext->oe_obj));
-               osc_extent_free(ext);
+
+               OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
        }
 }
 
@@ -379,9 +395,9 @@ static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
  */
 static void osc_extent_put_trust(struct osc_extent *ext)
 {
-       LASSERT(atomic_read(&ext->oe_refc) > 1);
+       LASSERT(kref_read(&ext->oe_refc) > 1);
        assert_osc_object_is_locked(ext->oe_obj);
-       atomic_dec(&ext->oe_refc);
+       osc_extent_put(NULL, ext);
 }
 
 /**
@@ -521,6 +537,14 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;
 
+       /*
+        * In the rare case max_pages_per_rpc (mppr) is changed, don't
+        * merge extents until after old ones have been sent, or the
+        * "extents are aligned to RPCs" checks are unhappy.
+        */
+       if (cur->oe_mppr != victim->oe_mppr)
+               return -ERANGE;
+
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
@@ -546,7 +570,6 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        cur->oe_urgent   |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
-       list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;
 
        osc_extent_get(victim);
@@ -674,7 +697,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
                cur->oe_start = descr->cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
-       cur->oe_grants  = 0;
+       cur->oe_grants  = chunksize + cli->cl_grant_extent_tax;
        cur->oe_mppr    = max_pages;
        if (olck->ols_dlmlock != NULL) {
                LASSERT(olck->ols_hold);
@@ -742,54 +765,21 @@ restart:
                         * flushed, try next one. */
                        continue;
 
-               /* check if they belong to the same rpc slot before trying to
-                * merge. the extents are not overlapped and contiguous at
-                * chunk level to get here. */
-               if (ext->oe_max_end != max_end)
-                       /* if they don't belong to the same RPC slot or
-                        * max_pages_per_rpc has ever changed, do not merge. */
-                       continue;
-
-               /* check whether maximum extent size will be hit */
-               if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
-                   cli->cl_max_extent_pages)
-                       continue;
-
-               /* it's required that an extent must be contiguous at chunk
-                * level so that we know the whole extent is covered by grant
-                * (the pages in the extent are NOT required to be contiguous).
-                * Otherwise, it will be too much difficult to know which
-                * chunks have grants allocated. */
-
-               /* try to do front merge - extend ext's start */
-               if (chunk + 1 == ext_chk_start) {
-                       /* ext must be chunk size aligned */
-                       EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
-                       /* pull ext's start back to cover cur */
-                       ext->oe_start   = cur->oe_start;
-                       ext->oe_grants += chunksize;
+               if (osc_extent_merge(env, ext, cur) == 0) {
                        LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
-
                        found = osc_extent_hold(ext);
-               } else if (chunk == ext_chk_end + 1) {
-                       /* rear merge */
-                       ext->oe_end     = cur->oe_end;
-                       ext->oe_grants += chunksize;
-                       LASSERT(*grants >= chunksize);
-                       *grants -= chunksize;
 
-                       /* try to merge with the next one because we just fill
-                        * in a gap */
+                       /*
+                        * Try to merge with the next one too because we
+                        * might have just filled in a gap.
+                        */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_grant_extent_tax;
 
-                       found = osc_extent_hold(ext);
-               }
-               if (found != NULL)
                        break;
+               }
        }
 
        osc_extent_tree_dump(D_CACHE, obj);
@@ -803,7 +793,6 @@ restart:
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
-               cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
                LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
 
@@ -1566,12 +1555,21 @@ static inline void cli_lock_after_unplug(struct client_obd *cli)
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap, int bytes)
 {
-       struct osc_object       *osc = oap->oap_obj;
-       struct lov_oinfo        *loi = osc->oo_oinfo;
-       int                      rc = -EDQUOT;
-       unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout : at_max);
+       struct osc_object *osc = oap->oap_obj;
+       struct lov_oinfo *loi = osc->oo_oinfo;
+       int rc = -EDQUOT;
        int remain;
        bool entered = false;
+       /* We cannot wait for a long time here since we are holding ldlm lock
+        * across the actual IO. If no requests complete fast (e.g. due to
+        * overloaded OST that takes a long time to process everything, we'd
+        * get evicted if we wait for a normal obd_timeout or some such.
+        * So we try to wait half the time it would take the client to be
+        * evicted by server which is half obd_timeout when AT is off
+        * or at least ldlm_enqueue_min with AT on.
+        * See LU-13131 */
+       unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
+                                                         ldlm_enqueue_min / 2);
 
        ENTRY;
 
@@ -1939,7 +1937,6 @@ static unsigned int get_write_extents(struct osc_object *obj,
        while (!list_empty(&obj->oo_hp_exts)) {
                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
                                 oe_link);
-               LASSERT(ext->oe_state == OES_CACHE);
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        return data.erd_page_count;
                EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
@@ -2155,7 +2152,12 @@ __must_hold(&cli->cl_loi_list_lock)
 
                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
 
-               if (osc_max_rpc_in_flight(cli, osc)) {
+               /* even if we have reached our max in flight RPCs, we still
+                * allow all high-priority RPCs through to prevent their
+                * starvation and leading to server evicting us for not
+                * writing out pages in a timely manner LU-13131 */
+               if (osc_max_rpc_in_flight(cli, osc) &&
+                   list_empty(&osc->oo_hp_exts)) {
                        __osc_list_maint(cli, osc);
                        break;
                }
@@ -2532,7 +2534,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
        spin_unlock(&oap->oap_lock);
 
-       if (memory_pressure_get())
+       if (current->flags & PF_MEMALLOC)
                ext->oe_memalloc = 1;
 
        ext->oe_urgent = 1;
@@ -2613,7 +2615,21 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
        if (!ext->oe_rw) { /* write */
-               list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               if (!ext->oe_srvlock && !ext->oe_dio) {
+                       /* The most likely case here is from lack of grants
+                        * so we are either out of quota or out of space.
+                        * Since this means we are holding locks across
+                        * potentially multi-striped IO, we must send out
+                        * everything out instantly to avoid prolonged
+                        * waits resulting in lock eviction (likely since
+                        * the extended wait in osc_cache_enter() did not
+                        * yield any additional grant due to a timeout.
+                        * LU-13131 */
+                       ext->oe_hp = 1;
+                       list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+               } else {
+                       list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               }
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
                list_add_tail(&ext->oe_link, &obj->oo_reading_exts);