Whamcloud - gitweb
LU-2139 osc: Use SOFT_SYNC to urge server commit
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index d4765f2..4d97b3e 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  *
  */
 /*
@@ -99,27 +99,28 @@ static inline char list_empty_marker(cfs_list_t *list)
 
 #define EXTSTR       "[%lu -> %lu/%lu]"
 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
+static const char *oes_strings[] = {
+       "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
 
 #define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                          \
        struct osc_extent *__ext = (extent);                                  \
-       const char *__str[] = OES_STRINGS;                                    \
        char __buf[16];                                                       \
                                                                              \
        CDEBUG(lvl,                                                           \
                "extent %p@{" EXTSTR ", "                                     \
                "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
-               /* ----- extent part 0 ----- */                               \
+               /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
                cfs_atomic_read(&__ext->oe_refc),                             \
                cfs_atomic_read(&__ext->oe_users),                            \
                list_empty_marker(&__ext->oe_link),                           \
-               __str[__ext->oe_state], ext_flags(__ext, __buf),              \
+               oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
                __ext->oe_obj,                                                \
                /* ----- part 2 ----- */                                      \
                __ext->oe_grants, __ext->oe_nr_pages,                         \
                list_empty_marker(&__ext->oe_pages),                          \
-               cfs_waitq_active(&__ext->oe_waitq) ? '+' : '-',               \
+               waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
                __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner,           \
                /* ----- part 4 ----- */                                      \
                ## __VA_ARGS__);                                              \
@@ -128,10 +129,10 @@ static inline char list_empty_marker(cfs_list_t *list)
 #undef EASSERTF
 #define EASSERTF(expr, ext, fmt, args...) do {                         \
        if (!(expr)) {                                                  \
-               OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);            \
-               osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);            \
+               OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
+               osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
                LASSERT(expr);                                          \
-       }                                                                    \
+       }                                                               \
 } while (0)
 
 #undef EASSERT
@@ -301,14 +302,14 @@ static void osc_extent_state_set(struct osc_extent *ext, int state)
 
        /* TODO: validate the state machine */
        ext->oe_state = state;
-       cfs_waitq_broadcast(&ext->oe_waitq);
+       wake_up_all(&ext->oe_waitq);
 }
 
 static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
 {
        struct osc_extent *ext;
 
-       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, CFS_ALLOC_STD);
+       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
        if (ext == NULL)
                return NULL;
 
@@ -319,7 +320,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
        CFS_INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
        CFS_INIT_LIST_HEAD(&ext->oe_pages);
-       cfs_waitq_init(&ext->oe_waitq);
+       init_waitqueue_head(&ext->oe_waitq);
        ext->oe_osclock = NULL;
 
        return ext;
@@ -504,7 +505,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                return -ERANGE;
 
        LASSERT(cur->oe_osclock == victim->oe_osclock);
-       ppc_bits = osc_cli(obj)->cl_chunkbits - CFS_PAGE_SHIFT;
+       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
@@ -611,8 +612,8 @@ struct osc_extent *osc_extent_find(const struct lu_env *env,
        LASSERT(lock != NULL);
        LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
 
-       LASSERT(cli->cl_chunkbits >= CFS_PAGE_SHIFT);
-       ppc_bits   = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+       LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
+       ppc_bits   = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize  = 1 << cli->cl_chunkbits;
        chunk      = index >> ppc_bits;
@@ -826,8 +827,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
        if (!sent) {
                lost_grant = ext->oe_grants;
-       } else if (blocksize < CFS_PAGE_SIZE &&
-                  last_count != CFS_PAGE_SIZE) {
+       } else if (blocksize < PAGE_CACHE_SIZE &&
+                  last_count != PAGE_CACHE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
@@ -837,7 +838,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                if (end)
                        count += blocksize - end;
 
-               lost_grant = CFS_PAGE_SIZE - count;
+               lost_grant = PAGE_CACHE_SIZE - count;
        }
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant);
@@ -895,7 +896,7 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                        "%s: wait ext to %d timedout, recovery in progress?\n",
                        osc_export(obj)->exp_obd->obd_name, state);
 
-               lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+               lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
                                  &lwi);
        }
@@ -919,7 +920,8 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int                    pages_in_chunk = 0;
-       int                    ppc_bits    = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+       int                    ppc_bits    = cli->cl_chunkbits -
+                                            PAGE_CACHE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
@@ -962,7 +964,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                cfs_list_del_init(&oap->oap_pending_item);
 
                cl_page_get(page);
-               lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
@@ -973,7 +975,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                        LASSERT(0);
                }
 
-               lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);
 
                --ext->oe_nr_pages;
@@ -1076,16 +1078,20 @@ static int osc_extent_make_ready(const struct lu_env *env,
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last->oap_count > 0);
-               LASSERT(last->oap_page_off + last->oap_count <= CFS_PAGE_SIZE);
+               LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
+               spin_lock(&last->oap_lock);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
+               spin_unlock(&last->oap_lock);
        }
 
        /* for the rest of pages, we don't need to call osf_refresh_count()
         * because it's known they are not the last page */
        cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                       oap->oap_count = CFS_PAGE_SIZE - oap->oap_page_off;
+                       oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+                       spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+                       spin_unlock(&oap->oap_lock);
                }
        }
 
@@ -1108,7 +1114,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
-       int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT;
+       int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
@@ -1240,9 +1246,9 @@ static int osc_refresh_count(const struct lu_env *env,
                return 0;
        else if (cl_offset(obj, page->cp_index + 1) > kms)
                /* catch sub-page write at end of file */
-               return kms % CFS_PAGE_SIZE;
+               return kms % PAGE_CACHE_SIZE;
        else
-               return CFS_PAGE_SIZE;
+               return PAGE_CACHE_SIZE;
 }
 
 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
@@ -1257,8 +1263,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
 
        cmd &= ~OBD_BRW_NOQUOTA;
-       LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
-       LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+       LASSERTF(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ),
+                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+       LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+               "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
        LASSERT(opg->ops_transfer_pinned);
 
        /*
@@ -1283,8 +1291,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
-       cl_page_completion(env, page, crt, rc);
-
        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
@@ -1303,37 +1309,40 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);
-       /*
-        * As page->cp_obj is pinned by a reference from page->cp_req, it is
-        * safe to call cl_page_put() without risking object destruction in a
-        * non-blocking context.
-        */
-       cl_page_put(env, page);
+
+       cl_page_completion(env, page, crt, rc);
+
        RETURN(0);
 }
 
-#define OSC_DUMP_GRANT(cli, fmt, args...) do {                               \
+#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                          \
        struct client_obd *__tmp = (cli);                                     \
-       CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
+       CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
+              "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
+              "reserved: %ld, flight: %d } lru {in list: %d, "               \
+              "left: %d, waiters: %d }" fmt,                                 \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
+              cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages,     \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
-              __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
+              __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
+              cfs_atomic_read(&__tmp->cl_lru_in_list),                       \
+              cfs_atomic_read(&__tmp->cl_lru_busy),                          \
+              cfs_atomic_read(&__tmp->cl_lru_shrinkers), ##args);            \
 } while (0)
 
 /* caller must hold loi_list_lock */
 static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
 {
-       LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+       LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
-       cli->cl_dirty += CFS_PAGE_SIZE;
+       cli->cl_dirty += PAGE_CACHE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-              CFS_PAGE_SIZE, pga, pga->pg);
+              PAGE_CACHE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
 }
 
@@ -1344,7 +1353,7 @@ static void osc_release_write_grant(struct client_obd *cli,
 {
        ENTRY;
 
-       LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+       LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
@@ -1352,11 +1361,11 @@ static void osc_release_write_grant(struct client_obd *cli,
 
        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
-       cli->cl_dirty -= CFS_PAGE_SIZE;
+       cli->cl_dirty -= PAGE_CACHE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
-               cli->cl_dirty_transit -= CFS_PAGE_SIZE;
+               cli->cl_dirty_transit -= PAGE_CACHE_SIZE;
        }
        EXIT;
 }
@@ -1393,8 +1402,6 @@ static void __osc_unreserve_grant(struct client_obd *cli,
        } else {
                cli->cl_avail_grant += unused;
        }
-       if (unused > 0)
-               osc_wake_cache_waiters(cli);
 }
 
 void osc_unreserve_grant(struct client_obd *cli,
@@ -1402,6 +1409,8 @@ void osc_unreserve_grant(struct client_obd *cli,
 {
        client_obd_list_lock(&cli->cl_loi_list_lock);
        __osc_unreserve_grant(cli, reserved, unused);
+       if (unused > 0)
+               osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
@@ -1412,7 +1421,7 @@ void osc_unreserve_grant(struct client_obd *cli,
  * used, we should return these grants to OST. There're two cases where grants
  * can be lost:
  * 1. truncate;
- * 2. blocksize at OST is less than CFS_PAGE_SIZE and a partial page was
+ * 2. blocksize at OST is less than PAGE_CACHE_SIZE and a partial page was
  *    written. In this case OST may use less chunks to serve this partial
  *    write. OSTs don't actually know the page size on the client side. so
  *    clients have to calculate lost grant by the blocksize on the OST.
@@ -1425,7 +1434,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
 
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cfs_atomic_sub(nr_pages, &obd_dirty_pages);
-       cli->cl_dirty -= nr_pages << CFS_PAGE_SHIFT;
+       cli->cl_dirty -= nr_pages << PAGE_CACHE_SHIFT;
        cli->cl_lost_grant += lost_grant;
        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
                /* borrow some grant from truncate to avoid the case that
@@ -1461,17 +1470,18 @@ static int osc_enter_cache_try(struct client_obd *cli,
 {
        int rc;
 
-       OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
 
        rc = osc_reserve_grant(cli, bytes);
        if (rc < 0)
                return 0;
 
-       if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+       if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
+           cfs_atomic_read(&obd_unstable_pages) + 1 +
+           cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
-                       cli->cl_dirty_transit += CFS_PAGE_SIZE;
+                       cli->cl_dirty_transit += PAGE_CACHE_SIZE;
                        cfs_atomic_inc(&obd_dirty_transit_pages);
                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
                }
@@ -1483,6 +1493,15 @@ static int osc_enter_cache_try(struct client_obd *cli,
        return rc;
 }
 
+static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+{
+       int rc;
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       rc = cfs_list_empty(&ocw->ocw_entry);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       return rc;
+}
+
 /**
  * The main entry to reserve dirty page accounting. Usually the grant reserved
  * in this function will be freed in bulk in osc_free_grant() unless it fails
@@ -1496,18 +1515,19 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
        struct osc_object *osc = oap->oap_obj;
        struct lov_oinfo  *loi = osc->oo_oinfo;
        struct osc_cache_waiter ocw;
-       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
+                                                 LWI_ON_SIGNAL_NOOP, NULL);
        int rc = -EDQUOT;
        ENTRY;
 
-       OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
+       OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
 
        client_obd_list_lock(&cli->cl_loi_list_lock);
 
        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
-           cli->cl_dirty_max < CFS_PAGE_SIZE     ||
+           cli->cl_dirty_max < PAGE_CACHE_SIZE     ||
            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
                GOTO(out, rc = -EDQUOT);
 
@@ -1521,7 +1541,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
         * RPC size will be.
         * The exiting condition is no avail grants and no dirty pages caching,
         * that really means there is no space on the OST. */
-       cfs_waitq_init(&ocw.ocw_waitq);
+       init_waitqueue_head(&ocw.ocw_waitq);
        ocw.ocw_oap   = oap;
        ocw.ocw_grant = bytes;
        while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
@@ -1529,32 +1549,51 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                ocw.ocw_rc = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-               osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+               osc_io_unplug_async(env, cli, NULL);
 
                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
 
-               rc = l_wait_event(ocw.ocw_waitq,
-                                 cfs_list_empty(&ocw.ocw_entry), &lwi);
+               rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
-               cfs_list_del_init(&ocw.ocw_entry);
-               if (rc < 0)
-                       break;
 
+               /* l_wait_event is interrupted by signal, or timed out */
+               if (rc < 0) {
+                       switch (rc) {
+                       case -ETIMEDOUT:
+                               OSC_DUMP_GRANT(D_ERROR, cli,
+                                               "try to reserve %d.\n", bytes);
+                               osc_extent_tree_dump(D_ERROR, osc);
+                               rc = -EDQUOT;
+                               break;
+                       case -EINTR:
+                               /* Ensures restartability - LU-3581 */
+                               rc = -ERESTARTSYS;
+                               break;
+                       default:
+                               CDEBUG(D_CACHE, "%s: event for cache space @"
+                                      " %p never arrived due to %d\n",
+                                      cli->cl_import->imp_obd->obd_name,
+                                      &ocw, rc);
+                               break;
+                       }
+                       cfs_list_del_init(&ocw.ocw_entry);
+                       GOTO(out, rc);
+               }
+
+               LASSERT(cfs_list_empty(&ocw.ocw_entry));
                rc = ocw.ocw_rc;
+
                if (rc != -EDQUOT)
-                       break;
-               if (osc_enter_cache_try(cli, oap, bytes, 0)) {
-                       rc = 0;
-                       break;
-               }
+                       GOTO(out, rc);
+               if (osc_enter_cache_try(cli, oap, bytes, 0))
+                       GOTO(out, rc = 0);
        }
        EXIT;
-
 out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
-       OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
+       OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
        RETURN(rc);
 }
 
@@ -1566,35 +1605,29 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 
        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-               /* if we can't dirty more, we must wait until some is written */
-               if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
+               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+               cfs_list_del_init(&ocw->ocw_entry);
+
+               ocw->ocw_rc = -EDQUOT;
+               /* we can't dirty more */
+               if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
+                   (cfs_atomic_read(&obd_unstable_pages) + 1 +
+                    cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
-                       return;
-               }
-
-               /* if still dirty cache but no grant wait for pending RPCs that
-                * may yet return us some grant before doing sync writes */
-               if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                       CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-                              cli->cl_w_in_flight);
-                       return;
+                       goto wakeup;
                }
 
-               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-               cfs_list_del_init(&ocw->ocw_entry);
-
                ocw->ocw_rc = 0;
                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
                        ocw->ocw_rc = -EDQUOT;
 
+wakeup:
                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
 
-               cfs_waitq_signal(&ocw->ocw_waitq);
+               wake_up(&ocw->ocw_waitq);
        }
 
        EXIT;
@@ -1747,6 +1780,91 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+       cfs_atomic_sub(page_count, &cli->cl_unstable_count);
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+
+       cfs_atomic_sub(page_count, &obd_unstable_pages);
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+       spin_lock(&req->rq_lock);
+       req->rq_committed = 1;
+       req->rq_unstable  = 0;
+       spin_unlock(&req->rq_lock);
+
+       wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_unstable_count);
+
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+       cfs_atomic_add(page_count, &obd_unstable_pages);
+
+       spin_lock(&req->rq_lock);
+
+       /* If the request has already been committed (i.e. brw_commit
+        * called via rq_commit_cb), we need to undo the unstable page
+        * increments we just performed because rq_commit_cb wont be
+        * called again. Otherwise, just set the commit callback so the
+        * unstable page accounting is properly updated when the request
+        * is committed */
+       if (req->rq_committed) {
+               /* Drop lock before calling osc_dec_unstable_pages */
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_unstable  = 1;
+               req->rq_commit_cb = osc_dec_unstable_pages;
+       }
+
+       spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1758,6 +1876,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 
        ENTRY;
        if (oap->oap_request != NULL) {
+               if (rc == 0)
+                       osc_inc_unstable_pages(oap->oap_request);
+
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
@@ -1804,7 +1925,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                RETURN(0);
 
        cfs_list_for_each_entry(tmp, rpclist, oe_link) {
-               EASSERT(tmp->oe_owner == cfs_current(), tmp);
+               EASSERT(tmp->oe_owner == current, tmp);
 #if 0
                if (overlapped(tmp, ext)) {
                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
@@ -1822,7 +1943,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        *pc += ext->oe_nr_pages;
        cfs_list_move_tail(&ext->oe_link, rpclist);
-       ext->oe_owner = cfs_current();
+       ext->oe_owner = current;
        RETURN(1);
 }
 
@@ -2063,7 +2184,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
        while ((osc = osc_next_obj(cli)) != NULL) {
                struct cl_object *obj = osc2cl(osc);
-               struct lu_ref_link *link;
+               struct lu_ref_link link;
 
                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
 
@@ -2074,7 +2195,8 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
-               link = lu_object_ref_add(&obj->co_lu, "check", cfs_current());
+               lu_object_ref_add_at(&obj->co_lu, &link, "check",
+                                    current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2115,7 +2237,8 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                osc_object_unlock(osc);
 
                osc_list_maint(cli, osc);
-               lu_object_ref_del_at(&obj->co_lu, link, "check", cfs_current());
+               lu_object_ref_del_at(&obj->co_lu, &link, "check",
+                                    current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -2125,23 +2248,24 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
                          struct osc_object *osc, pdl_policy_t pol, int async)
 {
-       int has_rpcs = 1;
        int rc = 0;
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
-       if (osc != NULL)
-               has_rpcs = __osc_list_maint(cli, osc);
-       if (has_rpcs) {
-               if (!async) {
-                       osc_check_rpcs(env, cli, pol);
-               } else {
-                       CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
-                              cli);
-                       LASSERT(cli->cl_writeback_work != NULL);
-                       rc = ptlrpcd_queue_work(cli->cl_writeback_work);
-               }
+       if (osc != NULL && osc_list_maint(cli, osc) == 0)
+               return 0;
+
+       if (!async) {
+               /* disable osc_lru_shrink() temporarily to avoid
+                * potential stack overrun problem. LU-2859 */
+               cfs_atomic_inc(&cli->cl_lru_shrinkers);
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               osc_check_rpcs(env, cli, pol);
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               cfs_atomic_dec(&cli->cl_lru_shrinkers);
+       } else {
+               CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
+               LASSERT(cli->cl_writeback_work != NULL);
+               rc = ptlrpcd_queue_work(cli->cl_writeback_work);
        }
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
        return rc;
 }
 
@@ -2159,7 +2283,7 @@ void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
 }
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-                       cfs_page_t *page, loff_t offset)
+                       struct page *page, loff_t offset)
 {
        struct obd_export     *exp = osc_export(osc);
        struct osc_async_page *oap = &ops->ops_oap;
@@ -2243,9 +2367,14 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                        RETURN(rc);
        }
 
+       if (osc_over_unstable_soft_limit(cli))
+               brw_flags |= OBD_BRW_SOFT_SYNC;
+
        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
        oap->oap_count = ops->ops_to - ops->ops_from;
+       /* No need to hold a lock here,
+        * since this page is not in any list yet. */
        oap->oap_async_flags = 0;
        oap->oap_brw_flags = brw_flags;
 
@@ -2440,14 +2569,15 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
        spin_unlock(&oap->oap_lock);
 
-       if (cfs_memory_pressure_get())
+       if (memory_pressure_get())
                ext->oe_memalloc = 1;
 
        ext->oe_urgent = 1;
-       if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) {
+       if (ext->oe_state == OES_CACHE) {
                OSC_EXTENT_DUMP(D_CACHE, ext,
                                "flush page %p make it urgent.\n", oap);
-               cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               if (cfs_list_empty(&ext->oe_link))
+                       cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                unplug = true;
        }
        rc = 0;
@@ -2624,6 +2754,8 @@ again:
                        break;
                }
 
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
                        /* though we grab inode mutex for write path, but we
@@ -2688,13 +2820,17 @@ again:
                osc_extent_put(env, ext);
        }
        if (waiting != NULL) {
-               if (result == 0)
-                       result = osc_extent_wait(env, waiting, OES_INV);
+               int rc;
+
+               /* ignore the result of osc_extent_wait the write initiator
+                * should take care of it. */
+               rc = osc_extent_wait(env, waiting, OES_INV);
+               if (rc < 0)
+                       OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
 
                osc_extent_put(env, waiting);
                waiting = NULL;
-               if (result == 0)
-                       goto again;
+               goto again;
        }
        RETURN(result);
 }
@@ -2830,17 +2966,16 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
-                               if (list != NULL) {
+                               if (list != NULL)
                                        cfs_list_move_tail(&ext->oe_link, list);
-                                       unplug = true;
-                               }
+                               unplug = true;
                        } else {
                                /* the only discarder is lock cancelling, so
                                 * [start, end] must contain this extent */
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
-                               ext->oe_owner = cfs_current();
+                               ext->oe_owner = current;
                                cfs_list_move_tail(&ext->oe_link,
                                                   &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,
@@ -2901,7 +3036,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                        result = rc;
        }
 
-       OSC_IO_DEBUG(obj, "cache page out.\n");
+       OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
        RETURN(result);
 }