Whamcloud - gitweb
LU-2909 osc: flush until no dirty in osc_enter_cache()
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index b67911b..8ec5be4 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  *
  */
 /*
@@ -799,10 +799,11 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
        struct client_obd *cli = osc_cli(ext->oe_obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
-       struct osc_async_page *last = NULL;
        int nr_pages = ext->oe_nr_pages;
        int lost_grant = 0;
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
+       __u64 last_off = 0;
+       int last_count = -1;
        ENTRY;
 
        OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");
@@ -813,8 +814,10 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                                     oap_pending_item) {
                cfs_list_del_init(&oap->oap_rpc_item);
                cfs_list_del_init(&oap->oap_pending_item);
-               if (last == NULL || last->oap_obj_off < oap->oap_obj_off)
-                       last = oap;
+               if (last_off <= oap->oap_obj_off) {
+                       last_off = oap->oap_obj_off;
+                       last_count = oap->oap_count;
+               }
 
                --ext->oe_nr_pages;
                osc_ap_completion(env, cli, oap, sent, rc);
@@ -824,7 +827,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
        if (!sent) {
                lost_grant = ext->oe_grants;
        } else if (blocksize < CFS_PAGE_SIZE &&
-                  last->oap_count != CFS_PAGE_SIZE) {
+                  last_count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
@@ -845,6 +848,17 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
        RETURN(0);
 }
 
+static int extent_wait_cb(struct osc_extent *ext, int state)
+{
+       int ret;
+
+       osc_object_lock(ext->oe_obj);
+       ret = ext->oe_state == state;
+       osc_object_unlock(ext->oe_obj);
+
+       return ret;
+}
+
 /**
  * Wait for the extent's state to become @state.
  */
@@ -852,7 +866,8 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state)
 {
        struct osc_object *obj = ext->oe_obj;
-       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
+                                                 LWI_ON_SIGNAL_NOOP, NULL);
        int rc = 0;
        ENTRY;
 
@@ -874,7 +889,16 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                osc_extent_release(env, ext);
 
        /* wait for the extent until its state becomes @state */
-       rc = l_wait_event(ext->oe_waitq, ext->oe_state == state, &lwi);
+       rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
+       if (rc == -ETIMEDOUT) {
+               OSC_EXTENT_DUMP(D_ERROR, ext,
+                       "%s: wait ext to %d timedout, recovery in progress?\n",
+                       osc_export(obj)->exp_obd->obd_name, state);
+
+               lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+               rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
+                                 &lwi);
+       }
        if (rc == 0 && ext->oe_rc < 0)
                rc = ext->oe_rc;
        RETURN(rc);
@@ -884,7 +908,8 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
  * Discard pages with index greater than @size. If @ext is overlapped with
  * @size, then partial truncate happens.
  */
-static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index)
+static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
+                               bool partial)
 {
        struct cl_env_nest     nest;
        struct lu_env         *env;
@@ -925,7 +950,8 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index)
 
                /* only discard the pages with their index greater than
                 * trunc_index, and ... */
-               if (sub->cp_index < trunc_index) {
+               if (sub->cp_index < trunc_index ||
+                   (sub->cp_index == trunc_index && partial)) {
                        /* accounting how many pages remaining in the chunk
                         * so that we can calculate grants correctly. */
                        if (sub->cp_index >> ppc_bits == trunc_chunk)
@@ -953,8 +979,9 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index)
                --ext->oe_nr_pages;
                ++nr_pages;
        }
-       EASSERTF(ergo(ext->oe_start >= trunc_index, ext->oe_nr_pages == 0),
-                ext, "trunc_index %lu\n", trunc_index);
+       EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
+                     ext->oe_nr_pages == 0),
+               ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
 
        osc_object_lock(obj);
        if (ext->oe_nr_pages == 0) {
@@ -1030,9 +1057,9 @@ static int osc_extent_make_ready(const struct lu_env *env,
                rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
                switch (rc) {
                case 0:
-                       cfs_spin_lock(&oap->oap_lock);
+                       spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_READY;
-                       cfs_spin_unlock(&oap->oap_lock);
+                       spin_unlock(&oap->oap_lock);
                        break;
                case -EALREADY:
                        LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
@@ -1246,18 +1273,16 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       cfs_spin_lock(&obj->oo_seatbelt);
+       spin_lock(&obj->oo_seatbelt);
        LASSERT(opg->ops_submitter != NULL);
        LASSERT(!cfs_list_empty(&opg->ops_inflight));
        cfs_list_del_init(&opg->ops_inflight);
        opg->ops_submitter = NULL;
-       cfs_spin_unlock(&obj->oo_seatbelt);
+       spin_unlock(&obj->oo_seatbelt);
 
        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
-       cl_page_completion(env, page, crt, rc);
-
        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
@@ -1276,12 +1301,9 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);
-       /*
-        * As page->cp_obj is pinned by a reference from page->cp_req, it is
-        * safe to call cl_page_put() without risking object destruction in a
-        * non-blocking context.
-        */
-       cl_page_put(env, page);
+
+       cl_page_completion(env, page, crt, rc);
+
        RETURN(0);
 }
 
@@ -1366,8 +1388,6 @@ static void __osc_unreserve_grant(struct client_obd *cli,
        } else {
                cli->cl_avail_grant += unused;
        }
-       if (unused > 0)
-               osc_wake_cache_waiters(cli);
 }
 
 void osc_unreserve_grant(struct client_obd *cli,
@@ -1375,6 +1395,8 @@ void osc_unreserve_grant(struct client_obd *cli,
 {
        client_obd_list_lock(&cli->cl_loi_list_lock);
        __osc_unreserve_grant(cli, reserved, unused);
+       if (unused > 0)
+               osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
@@ -1413,12 +1435,15 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
               cli->cl_avail_grant, cli->cl_dirty);
 }
 
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting.  Writeback completes or truncate happens before
- * writing starts.  Must be called with the loi lock held. */
+/**
+ * The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting due to error.
+ */
 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
 {
+       client_obd_list_lock(&cli->cl_loi_list_lock);
        osc_release_write_grant(cli, &oap->oap_brw_page);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
 /**
@@ -1453,8 +1478,22 @@ static int osc_enter_cache_try(struct client_obd *cli,
        return rc;
 }
 
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
+static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+{
+       int rc;
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       rc = cfs_list_empty(&ocw->ocw_entry);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       return rc;
+}
+
+/**
+ * The main entry to reserve dirty page accounting. Usually the grant reserved
+ * in this function will be freed in bulk in osc_free_grant() unless it fails
+ * to add osc cache, in that case, it will be freed in osc_exit_cache().
+ *
+ * The process will be put into sleep if it's already run out of grant.
+ */
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap, int bytes)
 {
@@ -1494,29 +1533,30 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                ocw.ocw_rc = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-               osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+               osc_io_unplug_async(env, cli, NULL);
 
                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
 
-               rc = l_wait_event(ocw.ocw_waitq,
-                                 cfs_list_empty(&ocw.ocw_entry), &lwi);
+               rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
-               cfs_list_del_init(&ocw.ocw_entry);
-               if (rc < 0)
-                       break;
 
+               /* l_wait_event is interrupted by signal */
+               if (rc < 0) {
+                       cfs_list_del_init(&ocw.ocw_entry);
+                       GOTO(out, rc);
+               }
+
+               LASSERT(cfs_list_empty(&ocw.ocw_entry));
                rc = ocw.ocw_rc;
+
                if (rc != -EDQUOT)
-                       break;
-               if (osc_enter_cache_try(cli, oap, bytes, 0)) {
-                       rc = 0;
-                       break;
-               }
+                       GOTO(out, rc);
+               if (osc_enter_cache_try(cli, oap, bytes, 0))
+                       GOTO(out, rc = 0);
        }
        EXIT;
-
 out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
@@ -1531,31 +1571,25 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 
        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-               /* if we can't dirty more, we must wait until some is written */
+               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+               cfs_list_del_init(&ocw->ocw_entry);
+
+               ocw->ocw_rc = -EDQUOT;
+               /* we can't dirty more */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
-                       return;
-               }
-
-               /* if still dirty cache but no grant wait for pending RPCs that
-                * may yet return us some grant before doing sync writes */
-               if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                       CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-                              cli->cl_w_in_flight);
-                       return;
+                       goto wakeup;
                }
 
-               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-               cfs_list_del_init(&ocw->ocw_entry);
-
                ocw->ocw_rc = 0;
                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
                        ocw->ocw_rc = -EDQUOT;
 
+wakeup:
                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
 
@@ -1729,9 +1763,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
        }
 
        /* As the transfer for this page is being done, clear the flags */
-       cfs_spin_lock(&oap->oap_lock);
+       spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
-       cfs_spin_unlock(&oap->oap_lock);
+       spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;
 
        if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
@@ -2098,7 +2132,11 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
                has_rpcs = __osc_list_maint(cli, osc);
        if (has_rpcs) {
                if (!async) {
+                       /* disable osc_lru_shrink() temporarily to avoid
+                        * potential stack overrun problem. LU-2859 */
+                       cfs_atomic_inc(&cli->cl_lru_shrinkers);
                        osc_check_rpcs(env, cli, pol);
+                       cfs_atomic_dec(&cli->cl_lru_shrinkers);
                } else {
                        CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
                               cli);
@@ -2147,7 +2185,7 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
 
-       cfs_spin_lock_init(&oap->oap_lock);
+       spin_lock_init(&oap->oap_lock);
        CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
               oap, page, oap->oap_obj_off);
        RETURN(0);
@@ -2365,7 +2403,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        struct cl_page    *cp    = ops->ops_cl.cpl_page;
        pgoff_t            index = cp->cp_index;
        struct osc_async_page *oap = &ops->ops_oap;
-       int unplug = 0;
+       bool unplug = false;
        int rc = 0;
        ENTRY;
 
@@ -2401,19 +2439,20 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        if (rc)
                GOTO(out, rc);
 
-       cfs_spin_lock(&oap->oap_lock);
+       spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
-       cfs_spin_unlock(&oap->oap_lock);
+       spin_unlock(&oap->oap_lock);
 
        if (cfs_memory_pressure_get())
                ext->oe_memalloc = 1;
 
        ext->oe_urgent = 1;
-       if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) {
+       if (ext->oe_state == OES_CACHE) {
                OSC_EXTENT_DUMP(D_CACHE, ext,
                                "flush page %p make it urgent.\n", oap);
-               cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
-               unplug = 1;
+               if (cfs_list_empty(&ext->oe_link))
+                       cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               unplug = true;
        }
        rc = 0;
        EXIT;
@@ -2560,10 +2599,12 @@ int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
        pgoff_t index;
        CFS_LIST_HEAD(list);
        int result = 0;
+       bool partial;
        ENTRY;
 
        /* pages with index greater or equal to index will be truncated. */
-       index = cl_index(osc2cl(obj), size + CFS_PAGE_SIZE - 1);
+       index = cl_index(osc2cl(obj), size);
+       partial = size > cl_offset(osc2cl(obj), index);
 
 again:
        osc_object_lock(obj);
@@ -2587,6 +2628,8 @@ again:
                        break;
                }
 
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
                        /* though we grab inode mutex for write path, but we
@@ -2621,7 +2664,7 @@ again:
                if (ext->oe_state != OES_TRUNC)
                        osc_extent_wait(env, ext, OES_TRUNC);
 
-               rc = osc_extent_truncate(ext, index);
+               rc = osc_extent_truncate(ext, index, partial);
                if (rc < 0) {
                        if (result == 0)
                                result = rc;
@@ -2634,10 +2677,11 @@ again:
                        /* this must be an overlapped extent which means only
                         * part of pages in this extent have been truncated.
                         */
-                       EASSERTF(ext->oe_start < index, ext,
-                                "trunc index = %lu.\n", index);
+                       EASSERTF(ext->oe_start <= index, ext,
+                                "trunc index = %lu/%d.\n", index, partial);
                        /* fix index to skip this partially truncated extent */
                        index = ext->oe_end + 1;
+                       partial = false;
 
                        /* we need to hold this extent in OES_TRUNC state so
                         * that no writeback will happen. This is to avoid
@@ -2650,13 +2694,17 @@ again:
                osc_extent_put(env, ext);
        }
        if (waiting != NULL) {
-               if (result == 0)
-                       result = osc_extent_wait(env, waiting, OES_INV);
+               int rc;
+
+               /* ignore the result of osc_extent_wait the write initiator
+                * should take care of it. */
+               rc = osc_extent_wait(env, waiting, OES_INV);
+               if (rc < 0)
+                       OSC_EXTENT_DUMP(D_CACHE, ext, "wait error: %d.\n", rc);
 
                osc_extent_put(env, waiting);
                waiting = NULL;
-               if (result == 0)
-                       goto again;
+               goto again;
        }
        RETURN(result);
 }
@@ -2671,6 +2719,8 @@ void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
 
        oio->oi_trunc = NULL;
        if (ext != NULL) {
+               bool unplug = false;
+
                EASSERT(ext->oe_nr_pages > 0, ext);
                EASSERT(ext->oe_state == OES_TRUNC, ext);
                EASSERT(!ext->oe_urgent, ext);
@@ -2678,11 +2728,17 @@ void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
                OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
                osc_object_lock(obj);
                osc_extent_state_set(ext, OES_CACHE);
+               if (ext->oe_fsync_wait && !ext->oe_urgent) {
+                       ext->oe_urgent = 1;
+                       cfs_list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
+                       unplug = true;
+               }
                osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
                osc_object_unlock(obj);
                osc_extent_put(env, ext);
 
-               osc_list_maint(osc_cli(obj), obj);
+               if (unplug)
+                       osc_io_unplug_async(env, osc_cli(obj), obj);
        }
 }
 
@@ -2691,9 +2747,9 @@ void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
  * The caller must have called osc_cache_writeback_range() to issue IO
  * otherwise it will take a long time for this function to finish.
  *
- * Caller must hold inode_mutex and i_alloc_sem, or cancel exclusive
- * dlm lock so that nobody else can dirty this range of file while we're
- * waiting for extents to be written.
+ * Caller must hold inode_mutex , or cancel exclusive dlm lock so that
+ * nobody else can dirty this range of file while we're waiting for
+ * extents to be written.
  */
 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
                         pgoff_t start, pgoff_t end)
@@ -2756,7 +2812,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 {
        struct osc_extent *ext;
        CFS_LIST_HEAD(discard_list);
-       int unplug = 0;
+       bool unplug = false;
        int result = 0;
        ENTRY;
 
@@ -2784,10 +2840,9 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
-                               if (list != NULL) {
+                               if (list != NULL)
                                        cfs_list_move_tail(&ext->oe_link, list);
-                                       unplug = 1;
-                               }
+                               unplug = true;
                        } else {
                                /* the only discarder is lock cancelling, so
                                 * [start, end] must contain this extent */
@@ -2808,6 +2863,11 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                         * grants. We do this for the correctness of fsync. */
                        LASSERT(hp == 0 && discard == 0);
                        ext->oe_urgent = 1;
+                       break;
+               case OES_TRUNC:
+                       /* this extent is being truncated, can't do anything
+                        * for it now. it will be set to urgent after truncate
+                        * is finished in osc_cache_truncate_end(). */
                default:
                        break;
                }