Whamcloud - gitweb
LU-10239 osc: limit chunk number of write submit
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index 981f35c..4660212 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  *
  */
 /*
@@ -41,7 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include "osc_cl_internal.h"
+#include <lustre_osc.h>
+
 #include "osc_internal.h"
 
 static int extent_debug; /* set it to be true for more debug */
@@ -58,13 +55,16 @@ static int osc_refresh_count(const struct lu_env *env,
 static int osc_io_unplug_async(const struct lu_env *env,
                               struct client_obd *cli, struct osc_object *osc);
 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
-                          unsigned int lost_grant);
+                          unsigned int lost_grant, unsigned int dirty_grant);
 
 static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line);
 #define osc_extent_tree_dump(lvl, obj) \
        osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)
 
+static void osc_unreserve_grant(struct client_obd *cli, unsigned int reserved,
+                               unsigned int unused);
+
 /** \addtogroup osc
  *  @{
  */
@@ -127,9 +127,9 @@ static const char *oes_strings[] = {
                /* ----- part 4 ----- */                                      \
                ## __VA_ARGS__);                                              \
        if (lvl == D_ERROR && __ext->oe_dlmlock != NULL)                      \
-               LDLM_ERROR(__ext->oe_dlmlock, "extent: %p\n", __ext);         \
+               LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext);           \
        else                                                                  \
-               LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p\n", __ext);         \
+               LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext);           \
 } while (0)
 
 #undef EASSERTF
@@ -226,7 +226,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_sync && ext->oe_grants > 0)
                GOTO(out, rc = 90);
 
-       if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
+       if (ext->oe_dlmlock != NULL &&
+           ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+           !ldlm_is_failed(ext->oe_dlmlock)) {
                struct ldlm_extent *extent;
 
                extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -319,12 +321,13 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
 {
        struct osc_extent *ext;
 
-       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
+       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_NOFS);
        if (ext == NULL)
                return NULL;
 
        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
+       cl_object_get(osc2cl(obj));
        atomic_set(&ext->oe_refc, 1);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
@@ -363,6 +366,7 @@ static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
                        LDLM_LOCK_PUT(ext->oe_dlmlock);
                        ext->oe_dlmlock = NULL;
                }
+               cl_object_put(env, osc2cl(ext->oe_obj));
                osc_extent_free(ext);
        }
 }
@@ -495,15 +499,16 @@ static void osc_extent_remove(struct osc_extent *ext)
 
 /**
  * This function is used to merge extents to get better performance. It checks
- * if @cur and @victim are contiguous at chunk level.
+ * if @cur and @victim are contiguous at block level.
  */
 static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                            struct osc_extent *victim)
 {
-       struct osc_object *obj = cur->oe_obj;
-       pgoff_t chunk_start;
-       pgoff_t chunk_end;
-       int ppc_bits;
+       struct osc_object       *obj = cur->oe_obj;
+       struct client_obd       *cli = osc_cli(obj);
+       pgoff_t                  chunk_start;
+       pgoff_t                  chunk_end;
+       int                      ppc_bits;
 
        LASSERT(cur->oe_state == OES_CACHE);
        LASSERT(osc_object_is_locked(obj));
@@ -517,18 +522,25 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                return -ERANGE;
 
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
-       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
+       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
            chunk_end + 1 != victim->oe_start >> ppc_bits)
                return -ERANGE;
 
+       /* overall extent size should not exceed the max supported limit
+        * reported by the server */
+       if (cur->oe_end - cur->oe_start + 1 +
+           victim->oe_end - victim->oe_start + 1 > cli->cl_max_extent_pages)
+               return -ERANGE;
+
        OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);
 
        cur->oe_start     = min(cur->oe_start, victim->oe_start);
        cur->oe_end       = max(cur->oe_end,   victim->oe_end);
-       cur->oe_grants   += victim->oe_grants;
+       /* per-extent tax should be accounted only once for the whole extent */
+       cur->oe_grants   += victim->oe_grants - cli->cl_grant_extent_tax;
        cur->oe_nr_pages += victim->oe_nr_pages;
        /* only the following bits are needed to merge */
        cur->oe_urgent   |= victim->oe_urgent;
@@ -551,6 +563,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
 int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
 {
        struct osc_object *obj = ext->oe_obj;
+       struct client_obd *cli = osc_cli(obj);
        int rc = 0;
        ENTRY;
 
@@ -567,21 +580,31 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                        osc_extent_state_set(ext, OES_TRUNC);
                        ext->oe_trunc_pending = 0;
                } else {
+                       int grant = 0;
+
                        osc_extent_state_set(ext, OES_CACHE);
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           ext->oe_nr_pages);
 
                        /* try to merge the previous and next extent. */
-                       osc_extent_merge(env, ext, prev_extent(ext));
-                       osc_extent_merge(env, ext, next_extent(ext));
+                       if (osc_extent_merge(env, ext, prev_extent(ext)) == 0)
+                               grant += cli->cl_grant_extent_tax;
+                       if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
+                               grant += cli->cl_grant_extent_tax;
+                       if (grant > 0)
+                               osc_unreserve_grant(cli, 0, grant);
 
                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
+                       else if (ext->oe_nr_pages == ext->oe_mppr) {
+                               list_move_tail(&ext->oe_link,
+                                              &obj->oo_full_exts);
+                       }
                }
                osc_object_unlock(obj);
 
-               osc_io_unplug_async(env, osc_cli(obj), obj);
+               osc_io_unplug_async(env, cli, obj);
        }
        osc_extent_put(env, ext);
        RETURN(rc);
@@ -627,15 +650,20 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
        descr = &olck->ols_cl.cls_lock->cll_descr;
        LASSERT(descr->cld_mode >= CLM_WRITE);
 
-       LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
-       ppc_bits   = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
+       LASSERTF(cli->cl_chunkbits >= PAGE_SHIFT,
+                "chunkbits: %u\n", cli->cl_chunkbits);
+       ppc_bits   = cli->cl_chunkbits - PAGE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize  = 1 << cli->cl_chunkbits;
        chunk      = index >> ppc_bits;
 
-       /* align end to rpc edge, rpc size may not be a power 2 integer. */
+       /* align end to RPC edge. */
        max_pages = cli->cl_max_pages_per_rpc;
-       LASSERT((max_pages & ~chunk_mask) == 0);
+       if ((max_pages & ~chunk_mask) != 0) {
+               CERROR("max_pages: %#x chunkbits: %u chunk_mask: %#lx\n",
+                      max_pages, cli->cl_chunkbits, chunk_mask);
+               RETURN(ERR_PTR(-EINVAL));
+       }
        max_end = index - (index % max_pages) + max_pages - 1;
        max_end = min_t(pgoff_t, max_end, descr->cld_end);
 
@@ -656,8 +684,8 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
        }
 
        /* grants has been allocated by caller */
-       LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
-                "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
+       LASSERTF(*grants >= chunksize + cli->cl_grant_extent_tax,
+                "%u/%u/%u.\n", *grants, chunksize, cli->cl_grant_extent_tax);
        LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n",
                 EXTPARA(cur));
 
@@ -671,7 +699,7 @@ restart:
                pgoff_t ext_chk_end   = ext->oe_end   >> ppc_bits;
 
                LASSERT(sanity_check_nolock(ext) == 0);
-               if (chunk > ext_chk_end + 1)
+               if (chunk > ext_chk_end + 1 || chunk < ext_chk_start)
                        break;
 
                /* if covering by different locks, no chance to match */
@@ -730,6 +758,13 @@ restart:
                        continue;
                }
 
+               /* check whether maximum extent size will be hit */
+               if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
+                   cli->cl_max_extent_pages) {
+                       ext = next_extent(ext);
+                       continue;
+               }
+
                /* it's required that an extent must be contiguous at chunk
                 * level so that we know the whole extent is covered by grant
                 * (the pages in the extent are NOT required to be contiguous).
@@ -744,6 +779,7 @@ restart:
                        /* pull ext's start back to cover cur */
                        ext->oe_start   = cur->oe_start;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        found = osc_extent_hold(ext);
@@ -751,13 +787,14 @@ restart:
                        /* rear merge */
                        ext->oe_end     = cur->oe_end;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        /* try to merge with the next one because we just fill
                         * in a gap */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
-                               *grants += cli->cl_extent_tax;
+                               *grants += cli->cl_grant_extent_tax;
 
                        found = osc_extent_hold(ext);
                }
@@ -778,9 +815,9 @@ restart:
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
-               cur->oe_grants = chunksize + cli->cl_extent_tax;
+               cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
+               LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
-               LASSERT(*grants >= 0);
 
                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
@@ -807,7 +844,6 @@ restart:
 
 out:
        osc_extent_put(env, cur);
-       LASSERT(*grants >= 0);
        return found;
 }
 
@@ -849,8 +885,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
        if (!sent) {
                lost_grant = ext->oe_grants;
-       } else if (blocksize < PAGE_CACHE_SIZE &&
-                  last_count != PAGE_CACHE_SIZE) {
+       } else if (blocksize < PAGE_SIZE &&
+                  last_count != PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
@@ -860,10 +896,10 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                if (end)
                        count += blocksize - end;
 
-               lost_grant = PAGE_CACHE_SIZE - count;
+               lost_grant = PAGE_SIZE - count;
        }
        if (ext->oe_grants > 0)
-               osc_free_grant(cli, nr_pages, lost_grant);
+               osc_free_grant(cli, nr_pages, lost_grant, ext->oe_grants);
 
        osc_extent_remove(ext);
        /* put the refcount for RPC */
@@ -934,20 +970,21 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
 static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                                bool partial)
 {
-       struct cl_env_nest     nest;
        struct lu_env         *env;
        struct cl_io          *io;
        struct osc_object     *obj = ext->oe_obj;
        struct client_obd     *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
+       struct pagevec        *pvec;
        int                    pages_in_chunk = 0;
        int                    ppc_bits    = cli->cl_chunkbits -
-                                            PAGE_CACHE_SHIFT;
+                                            PAGE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
        int                    rc       = 0;
+       __u16                  refcheck;
        ENTRY;
 
        LASSERT(sanity_check(ext) == 0);
@@ -957,9 +994,15 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        /* Request new lu_env.
         * We can't use that env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized. */
-       env = cl_env_nested_get(&nest);
-       io  = &osc_env_info(env)->oti_io;
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       io  = osc_env_thread_io(env);
        io->ci_obj = cl_object_top(osc2cl(obj));
+       io->ci_ignore_layout = 1;
+       pvec = &osc_env_info(env)->oti_pagevec;
+       ll_pagevec_init(pvec, 0);
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                GOTO(out, rc);
@@ -997,11 +1040,13 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                }
 
                lu_ref_del(&page->cp_reference, "truncate", current);
-               cl_page_put(env, page);
+               cl_pagevec_put(env, page, pvec);
 
                --ext->oe_nr_pages;
                ++nr_pages;
        }
+       pagevec_release(pvec);
+
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
                ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
@@ -1038,11 +1083,11 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        osc_object_unlock(obj);
 
        if (grants > 0 || nr_pages > 0)
-               osc_free_grant(cli, nr_pages, grants);
+               osc_free_grant(cli, nr_pages, grants, grants);
 
 out:
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        RETURN(rc);
 }
 
@@ -1099,7 +1144,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last_oap_count > 0);
-               LASSERT(last->oap_page_off + last_oap_count <= PAGE_CACHE_SIZE);
+               LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
                last->oap_count = last_oap_count;
                spin_lock(&last->oap_lock);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
@@ -1110,7 +1155,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
         * because it's known they are not the last page */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                       oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+                       oap->oap_count = PAGE_SIZE - oap->oap_page_off;
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        spin_unlock(&oap->oap_lock);
@@ -1137,7 +1182,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
-       int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
+       int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
@@ -1156,9 +1201,14 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
                GOTO(out, rc = 0);
 
        LASSERT(end_chunk + 1 == chunk);
+
        /* try to expand this extent to cover @index */
        end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);
 
+       /* don't go over the maximum extent size reported by server */
+       if (end_index - ext->oe_start + 1 > cli->cl_max_extent_pages)
+               GOTO(out, rc = -ERANGE);
+
        next = next_extent(ext);
        if (next != NULL && next->oe_start <= end_index)
                /* complex mode - overlapped with the next extent,
@@ -1167,8 +1217,8 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
 
        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
+       LASSERT(*grants >= chunksize);
        *grants -= chunksize;
-       LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);
        EXIT;
@@ -1184,6 +1234,9 @@ static void osc_extent_tree_dump0(int level, struct osc_object *obj,
        struct osc_extent *ext;
        int cnt;
 
+       if (!cfs_cdebug_show(level, DEBUG_SUBSYSTEM))
+               return;
+
        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
 
@@ -1238,7 +1291,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
-               opg->ops_submit_time = cfs_time_current();
+               opg->ops_submit_time = ktime_get();
        RETURN(result);
 }
 
@@ -1249,7 +1302,6 @@ static int osc_refresh_count(const struct lu_env *env,
        pgoff_t index = osc_index(oap2osc(oap));
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
-
        int result;
        loff_t kms;
 
@@ -1269,9 +1321,9 @@ static int osc_refresh_count(const struct lu_env *env,
                return 0;
        else if (cl_offset(obj, index + 1) > kms)
                /* catch sub-page write at end of file */
-               return kms % PAGE_CACHE_SIZE;
+               return kms % PAGE_SIZE;
        else
-               return PAGE_CACHE_SIZE;
+               return PAGE_SIZE;
 }
 
 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
@@ -1295,7 +1347,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       opg->ops_submit_time = 0;
+       opg->ops_submit_time = ktime_set(0, 0);
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
        /* statistic */
@@ -1325,13 +1377,15 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                    \
        struct client_obd *__tmp = (cli);                               \
-       CDEBUG(lvl, "%s: grant { dirty: %lu/%lu dirty_pages: %ld/%lu "  \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d }"   \
-              "lru {in list: %ld, left: %ld, waiters: %d }"fmt"\n",    \
+       CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu "  \
+              "dropped: %ld avail: %ld, dirty_grant: %ld, "            \
+              "reserved: %ld, flight: %d } lru {in list: %ld, "        \
+              "left: %ld, waiters: %d }" fmt "\n",                     \
               cli_name(__tmp),                                         \
               __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages,        \
               atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,             \
+              __tmp->cl_dirty_grant,                                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,         \
               atomic_long_read(&__tmp->cl_lru_in_list),                \
               atomic_long_read(&__tmp->cl_lru_busy),                   \
@@ -1348,7 +1402,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
        cli->cl_dirty_pages++;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-              PAGE_CACHE_SIZE, pga, pga->pg);
+              PAGE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
 }
 
@@ -1405,8 +1459,10 @@ static void __osc_unreserve_grant(struct client_obd *cli,
        if (unused > reserved) {
                cli->cl_avail_grant += reserved;
                cli->cl_lost_grant  += unused - reserved;
+               cli->cl_dirty_grant -= unused - reserved;
        } else {
                cli->cl_avail_grant += unused;
+               cli->cl_dirty_grant += reserved - unused;
        }
 }
 
@@ -1427,21 +1483,24 @@ static void osc_unreserve_grant(struct client_obd *cli,
  * used, we should return these grants to OST. There're two cases where grants
  * can be lost:
  * 1. truncate;
- * 2. blocksize at OST is less than PAGE_CACHE_SIZE and a partial page was
+ * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
  *    written. In this case OST may use less chunks to serve this partial
  *    write. OSTs don't actually know the page size on the client side. so
  *    clients have to calculate lost grant by the blocksize on the OST.
  *    See filter_grant_check() for details.
  */
 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
-                          unsigned int lost_grant)
+                          unsigned int lost_grant, unsigned int dirty_grant)
 {
-       unsigned long grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
+       unsigned long grant;
+
+       grant = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax;
 
        spin_lock(&cli->cl_loi_list_lock);
        atomic_long_sub(nr_pages, &obd_dirty_pages);
        cli->cl_dirty_pages -= nr_pages;
        cli->cl_lost_grant += lost_grant;
+       cli->cl_dirty_grant -= dirty_grant;
        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
                /* borrow some grant from truncate to avoid the case that
                 * truncate uses up all avail grant */
@@ -1450,9 +1509,10 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
        }
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);
-       CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
+       CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu/%lu\n",
               lost_grant, cli->cl_lost_grant,
-              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
+              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT,
+              cli->cl_dirty_grant);
 }
 
 /**
@@ -1651,6 +1711,7 @@ wakeup:
 
        EXIT;
 }
+EXPORT_SYMBOL(osc_wake_cache_waiters);
 
 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
 {
@@ -1696,9 +1757,10 @@ static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
-               if (atomic_read(&osc->oo_nr_writes) >=
-                   cli->cl_max_pages_per_rpc)
+               if (!list_empty(&osc->oo_full_exts)) {
+                       CDEBUG(D_CACHE, "full extent ready, make an RPC\n");
                        RETURN(1);
+               }
        } else {
                if (atomic_read(&osc->oo_nr_reads) == 0)
                        RETURN(0);
@@ -1837,6 +1899,22 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
        EXIT;
 }
 
+struct extent_rpc_data {
+       struct list_head        *erd_rpc_list;
+       unsigned int            erd_page_count;
+       unsigned int            erd_max_pages;
+       unsigned int            erd_max_chunks;
+       unsigned int            erd_max_extents;
+};
+
+static inline unsigned osc_extent_chunks(const struct osc_extent *ext)
+{
+       struct client_obd *cli = osc_cli(ext->oe_obj);
+       unsigned ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
+
+       return (ext->oe_end >> ppc_bits) - (ext->oe_start >> ppc_bits) + 1;
+}
+
 /**
  * Try to add extent to one RPC. We need to think about the following things:
  * - # of pages must not be over max_pages_per_rpc
@@ -1844,10 +1922,10 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
  */
 static int try_to_add_extent_for_io(struct client_obd *cli,
                                    struct osc_extent *ext,
-                                   struct list_head *rpclist,
-                                   unsigned int *pc, unsigned int *max_pages)
+                                   struct extent_rpc_data *data)
 {
        struct osc_extent *tmp;
+       unsigned int chunk_count;
        struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
                                                      struct osc_async_page,
                                                      oap_pending_item);
@@ -1855,12 +1933,30 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
                ext);
+       OSC_EXTENT_DUMP(D_CACHE, ext, "trying to add this extent\n");
+
+       if (data->erd_max_extents == 0)
+               RETURN(0);
+
+       chunk_count = osc_extent_chunks(ext);
+       EASSERTF(data->erd_page_count != 0 ||
+                chunk_count <= data->erd_max_chunks, ext,
+                "The first extent to be fit in a RPC contains %u chunks, "
+                "which is over the limit %u.\n", chunk_count,
+                data->erd_max_chunks);
+       if (chunk_count > data->erd_max_chunks)
+               RETURN(0);
 
-       *max_pages = max(ext->oe_mppr, *max_pages);
-       if (*pc + ext->oe_nr_pages > *max_pages)
+       data->erd_max_pages = max(ext->oe_mppr, data->erd_max_pages);
+       EASSERTF(data->erd_page_count != 0 ||
+               ext->oe_nr_pages <= data->erd_max_pages, ext,
+               "The first extent to be fit in a RPC contains %u pages, "
+               "which is over the limit %u.\n", ext->oe_nr_pages,
+               data->erd_max_pages);
+       if (data->erd_page_count + ext->oe_nr_pages > data->erd_max_pages)
                RETURN(0);
 
-       list_for_each_entry(tmp, rpclist, oe_link) {
+       list_for_each_entry(tmp, data->erd_rpc_list, oe_link) {
                struct osc_async_page *oap2;
                oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
                                        oap_pending_item);
@@ -1872,21 +1968,25 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                }
 #endif
                if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
-                       CDEBUG(D_CACHE, "Do not permit different type of IO"
-                                       " for a same RPC\n");
+                       CDEBUG(D_CACHE, "Do not permit different types of IO "
+                              "in one RPC\n");
                        RETURN(0);
                }
 
                if (tmp->oe_srvlock != ext->oe_srvlock ||
-                   !tmp->oe_grants != !ext->oe_grants)
+                   !tmp->oe_grants != !ext->oe_grants ||
+                   tmp->oe_ndelay != ext->oe_ndelay ||
+                   tmp->oe_no_merge || ext->oe_no_merge)
                        RETURN(0);
 
                /* remove break for strict check */
                break;
        }
 
-       *pc += ext->oe_nr_pages;
-       list_move_tail(&ext->oe_link, rpclist);
+       data->erd_max_extents--;
+       data->erd_max_chunks -= chunk_count;
+       data->erd_page_count += ext->oe_nr_pages;
+       list_move_tail(&ext->oe_link, data->erd_rpc_list);
        ext->oe_owner = current;
        RETURN(1);
 }
@@ -1909,45 +2009,48 @@ static unsigned int get_write_extents(struct osc_object *obj,
 {
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *ext;
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = osc_max_write_chunks(cli),
+               .erd_max_extents = 256,
+       };
 
        LASSERT(osc_object_is_locked(obj));
        while (!list_empty(&obj->oo_hp_exts)) {
                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
                                 oe_link);
                LASSERT(ext->oe_state == OES_CACHE);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        while (!list_empty(&obj->oo_urgent_exts)) {
                ext = list_entry(obj->oo_urgent_exts.next,
                                 struct osc_extent, oe_link);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-
-               if (!ext->oe_intree)
-                       continue;
-
-               while ((ext = next_extent(ext)) != NULL) {
-                       if ((ext->oe_state != OES_CACHE) ||
-                           (!list_empty(&ext->oe_link) &&
-                            ext->oe_owner != NULL))
-                               continue;
-
-                       if (!try_to_add_extent_for_io(cli, ext, rpclist,
-                                                     &page_count, &max_pages))
-                               return page_count;
-               }
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+       }
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
+
+       /* One key difference between full extents and other extents: full
+        * extents can usually only be added if the rpclist was empty, so if we
+        * can't add one, we continue on to trying to add normal extents.  This
+        * is so we don't miss adding extra extents to an RPC containing high
+        * priority or urgent extents. */
+       while (!list_empty(&obj->oo_full_exts)) {
+               ext = list_entry(obj->oo_full_exts.next,
+                                struct osc_extent, oe_link);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       break;
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        ext = first_extent(obj);
        while (ext != NULL) {
@@ -1958,13 +2061,12 @@ static unsigned int get_write_extents(struct osc_object *obj,
                        continue;
                }
 
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
 
                ext = next_extent(ext);
        }
-       return page_count;
+       return data.erd_page_count;
 }
 
 static int
@@ -2049,29 +2151,31 @@ __must_hold(osc)
        struct osc_extent *ext;
        struct osc_extent *next;
        struct list_head rpclist = LIST_HEAD_INIT(rpclist);
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = &rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = UINT_MAX,
+               .erd_max_extents = UINT_MAX,
+       };
        int rc = 0;
        ENTRY;
 
        LASSERT(osc_object_is_locked(osc));
-       list_for_each_entry_safe(ext, next,
-                                    &osc->oo_reading_exts, oe_link) {
+       list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
-               if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
-                                             &max_pages))
+               if (!try_to_add_extent_for_io(cli, ext, &data))
                        break;
                osc_extent_state_set(ext, OES_RPC);
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       LASSERT(page_count <= max_pages);
+       LASSERT(data.erd_page_count <= data.erd_max_pages);
 
-       osc_update_pending(osc, OBD_BRW_READ, -page_count);
+       osc_update_pending(osc, OBD_BRW_READ, -data.erd_page_count);
 
        if (!list_empty(&rpclist)) {
                osc_object_unlock(osc);
 
-               LASSERT(page_count > 0);
                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
                LASSERT(list_empty(&rpclist));
 
@@ -2190,8 +2294,8 @@ __must_hold(&cli->cl_loi_list_lock)
        }
 }
 
-static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
-                         struct osc_object *osc, int async)
+int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, int async)
 {
        int rc = 0;
 
@@ -2209,18 +2313,7 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
        }
        return rc;
 }
-
-static int osc_io_unplug_async(const struct lu_env *env,
-                               struct client_obd *cli, struct osc_object *osc)
-{
-       return osc_io_unplug0(env, cli, osc, 1);
-}
-
-void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-                  struct osc_object *osc)
-{
-       (void)osc_io_unplug0(env, cli, osc, 0);
-}
+EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
                        struct page *page, loff_t offset)
@@ -2240,17 +2333,18 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~PAGE_MASK));
 
-       if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
+       if (cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
 
        INIT_LIST_HEAD(&oap->oap_pending_item);
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
        spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
+       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
               oap, page, oap->oap_obj_off);
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_prep_async_page);
 
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                       struct osc_page *ops)
@@ -2281,8 +2375,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
-       if (!client_is_remote(osc_export(osc)) &&
-           cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+       if (cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                brw_flags |= OBD_BRW_NOQUOTA;
                cmd |= OBD_BRW_NOQUOTA;
        }
@@ -2291,7 +2384,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        if (!(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr   *attr;
-               unsigned int qid[MAXQUOTAS];
+               unsigned int qid[LL_MAXQUOTAS];
 
                obj = cl_object_top(&osc->oo_cl);
                attr = &osc_env_info(env)->oti_attr;
@@ -2302,6 +2395,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
                qid[USRQUOTA] = attr->cat_uid;
                qid[GRPQUOTA] = attr->cat_gid;
+               qid[PRJQUOTA] = attr->cat_projid;
                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
@@ -2331,7 +2425,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        if (ext != NULL && ext->oe_start <= index && ext->oe_max_end >= index) {
                /* one chunk plus extent overhead must be enough to write this
                 * page */
-               grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
+               grants = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax;
                if (ext->oe_end >= index)
                        grants = 0;
 
@@ -2368,7 +2462,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        }
 
        if (ext == NULL) {
-               tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
+               tmp = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax;
 
                /* try to find new extent to cover this page */
                LASSERT(oio->oi_active == NULL);
@@ -2414,7 +2508,11 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                ++ext->oe_nr_pages;
                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
                osc_object_unlock(osc);
+
+               if (!ext->oe_layout_version)
+                       ext->oe_layout_version = io->ci_layout_version;
        }
+
        RETURN(rc);
 }
 
@@ -2600,26 +2698,33 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
        RETURN(rc);
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int cmd, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+                        struct osc_object *obj, struct list_head *list,
+                        int brw_flags)
 {
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
        struct osc_async_page *oap;
        int     page_count = 0;
        int     mppr       = cli->cl_max_pages_per_rpc;
+       bool    can_merge   = true;
        pgoff_t start      = CL_PAGE_EOF;
        pgoff_t end        = 0;
        ENTRY;
 
        list_for_each_entry(oap, list, oap_pending_item) {
-               pgoff_t index = osc_index(oap2osc(oap));
+               struct osc_page *opg = oap2osc_page(oap);
+               pgoff_t index = osc_index(opg);
+
                if (index > end)
                        end = index;
                if (index < start)
                        start = index;
                ++page_count;
                mppr <<= (page_count > mppr);
+
+               if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+                       can_merge = false;
        }
 
        ext = osc_extent_alloc(obj);
@@ -2633,21 +2738,24 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
                RETURN(-ENOMEM);
        }
 
-       ext->oe_rw = !!(cmd & OBD_BRW_READ);
+       ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
        ext->oe_sync = 1;
+       ext->oe_no_merge = !can_merge;
        ext->oe_urgent = 1;
        ext->oe_start = start;
        ext->oe_end = ext->oe_max_end = end;
        ext->oe_obj = obj;
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+       ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
+       ext->oe_layout_version = io->ci_layout_version;
 
        osc_object_lock(obj);
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
-       if (cmd & OBD_BRW_WRITE) {
+       if (!ext->oe_rw) { /* write */
                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
@@ -2700,7 +2808,7 @@ again:
                        break;
                }
 
-               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
 
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
@@ -2716,8 +2824,12 @@ again:
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           -ext->oe_nr_pages);
                }
-               EASSERT(list_empty(&ext->oe_link), ext);
-               list_add_tail(&ext->oe_link, &list);
+               /* This extent could be on the full extents list, that's OK */
+               EASSERT(!ext->oe_hp && !ext->oe_urgent, ext);
+               if (!list_empty(&ext->oe_link))
+                       list_move_tail(&ext->oe_link, &list);
+               else
+                       list_add_tail(&ext->oe_link, &list);
 
                ext = next_extent(ext);
        }
@@ -2763,7 +2875,7 @@ again:
                        LASSERT(*extp == NULL);
                        *extp = osc_extent_get(ext);
                        OSC_EXTENT_DUMP(D_CACHE, ext,
-                                       "trunc at "LPU64"\n", size);
+                                       "trunc at %llu\n", size);
                }
                osc_extent_put(env, ext);
        }
@@ -2782,6 +2894,7 @@ again:
        }
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_truncate_start);
 
 /**
  * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
@@ -2868,6 +2981,7 @@ again:
        OSC_IO_DEBUG(obj, "sync file range.\n");
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_wait_range);
 
 /**
  * Called to write out a range of osc object.
@@ -2984,6 +3098,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_writeback_range);
 
 /**
  * Returns a list of pages by a given [start, end] of \a obj.
@@ -3002,6 +3117,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                        osc_page_gang_cbt cb, void *cbdata)
 {
        struct osc_page *ops;
+       struct pagevec  *pagevec;
        void            **pvec;
        pgoff_t         idx;
        unsigned int    nr;
@@ -3013,6 +3129,8 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
        idx = start;
        pvec = osc_env_info(env)->oti_pvec;
+       pagevec = &osc_env_info(env)->oti_pagevec;
+       ll_pagevec_init(pagevec, 0);
        spin_lock(&osc->oo_tree_lock);
        while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
                                            idx, OTI_PVEC_SIZE)) > 0) {
@@ -3059,8 +3177,10 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
                        page = ops->ops_cl.cpl_page;
                        lu_ref_del(&page->cp_reference, "gang_lookup", current);
-                       cl_page_put(env, page);
+                       cl_pagevec_put(env, page, pagevec);
                }
+               pagevec_release(pagevec);
+
                if (nr < OTI_PVEC_SIZE || end_of_region)
                        break;
 
@@ -3076,6 +3196,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
        RETURN(res);
 }
+EXPORT_SYMBOL(osc_page_gang_lookup);
 
 /**
  * Check if page @page is covered by an extra lock or discard it.
@@ -3118,8 +3239,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
        return CLP_GANG_OKAY;
 }
 
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-                     struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+                  struct osc_page *ops, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -3141,6 +3262,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
 
        return CLP_GANG_OKAY;
 }
+EXPORT_SYMBOL(osc_discard_cb);
 
 /**
  * Discard pages protected by the given lock. This function traverses radix
@@ -3151,10 +3273,10 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
  * behind this being that lock cancellation cannot be delayed indefinitely).
  */
 int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
-                          pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
+                          pgoff_t start, pgoff_t end, bool discard)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_io *io = &info->oti_io;
+       struct cl_io *io = osc_env_thread_io(env);
        osc_page_gang_cbt cb;
        int res;
        int result;
@@ -3167,7 +3289,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
        if (result != 0)
                GOTO(out, result);
 
-       cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       cb = discard ? osc_discard_cb : check_and_discard_cb;
        info->oti_fn_index = info->oti_next_index = start;
        do {
                res = osc_page_gang_lookup(env, io, osc,