Whamcloud - gitweb
LU-10239 osc: limit chunk number of write submit
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index 2b15cb8..4660212 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  *
  */
 /*
@@ -41,7 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include "osc_cl_internal.h"
+#include <lustre_osc.h>
+
 #include "osc_internal.h"
 
 static int extent_debug; /* set it to be true for more debug */
@@ -130,9 +127,9 @@ static const char *oes_strings[] = {
                /* ----- part 4 ----- */                                      \
                ## __VA_ARGS__);                                              \
        if (lvl == D_ERROR && __ext->oe_dlmlock != NULL)                      \
-               LDLM_ERROR(__ext->oe_dlmlock, "extent: %p\n", __ext);         \
+               LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext);           \
        else                                                                  \
-               LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p\n", __ext);         \
+               LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext);           \
 } while (0)
 
 #undef EASSERTF
@@ -229,7 +226,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_sync && ext->oe_grants > 0)
                GOTO(out, rc = 90);
 
-       if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
+       if (ext->oe_dlmlock != NULL &&
+           ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+           !ldlm_is_failed(ext->oe_dlmlock)) {
                struct ldlm_extent *extent;
 
                extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -322,7 +321,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
 {
        struct osc_extent *ext;
 
-       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
+       OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_NOFS);
        if (ext == NULL)
                return NULL;
 
@@ -523,7 +522,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                return -ERANGE;
 
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
-       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
+       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
@@ -598,6 +597,10 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
+                       else if (ext->oe_nr_pages == ext->oe_mppr) {
+                               list_move_tail(&ext->oe_link,
+                                              &obj->oo_full_exts);
+                       }
                }
                osc_object_unlock(obj);
 
@@ -647,15 +650,20 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
        descr = &olck->ols_cl.cls_lock->cll_descr;
        LASSERT(descr->cld_mode >= CLM_WRITE);
 
-       LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
-       ppc_bits   = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
+       LASSERTF(cli->cl_chunkbits >= PAGE_SHIFT,
+                "chunkbits: %u\n", cli->cl_chunkbits);
+       ppc_bits   = cli->cl_chunkbits - PAGE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize  = 1 << cli->cl_chunkbits;
        chunk      = index >> ppc_bits;
 
-       /* align end to rpc edge, rpc size may not be a power 2 integer. */
+       /* align end to RPC edge. */
        max_pages = cli->cl_max_pages_per_rpc;
-       LASSERT((max_pages & ~chunk_mask) == 0);
+       if ((max_pages & ~chunk_mask) != 0) {
+               CERROR("max_pages: %#x chunkbits: %u chunk_mask: %#lx\n",
+                      max_pages, cli->cl_chunkbits, chunk_mask);
+               RETURN(ERR_PTR(-EINVAL));
+       }
        max_end = index - (index % max_pages) + max_pages - 1;
        max_end = min_t(pgoff_t, max_end, descr->cld_end);
 
@@ -691,7 +699,7 @@ restart:
                pgoff_t ext_chk_end   = ext->oe_end   >> ppc_bits;
 
                LASSERT(sanity_check_nolock(ext) == 0);
-               if (chunk > ext_chk_end + 1)
+               if (chunk > ext_chk_end + 1 || chunk < ext_chk_start)
                        break;
 
                /* if covering by different locks, no chance to match */
@@ -771,6 +779,7 @@ restart:
                        /* pull ext's start back to cover cur */
                        ext->oe_start   = cur->oe_start;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        found = osc_extent_hold(ext);
@@ -778,6 +787,7 @@ restart:
                        /* rear merge */
                        ext->oe_end     = cur->oe_end;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        /* try to merge with the next one because we just fill
@@ -806,8 +816,8 @@ restart:
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
                cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
+               LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
-               LASSERT(*grants >= 0);
 
                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
@@ -834,7 +844,6 @@ restart:
 
 out:
        osc_extent_put(env, cur);
-       LASSERT(*grants >= 0);
        return found;
 }
 
@@ -876,8 +885,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
        if (!sent) {
                lost_grant = ext->oe_grants;
-       } else if (blocksize < PAGE_CACHE_SIZE &&
-                  last_count != PAGE_CACHE_SIZE) {
+       } else if (blocksize < PAGE_SIZE &&
+                  last_count != PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
@@ -887,7 +896,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                if (end)
                        count += blocksize - end;
 
-               lost_grant = PAGE_CACHE_SIZE - count;
+               lost_grant = PAGE_SIZE - count;
        }
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant, ext->oe_grants);
@@ -961,20 +970,21 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
 static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                                bool partial)
 {
-       struct cl_env_nest     nest;
        struct lu_env         *env;
        struct cl_io          *io;
        struct osc_object     *obj = ext->oe_obj;
        struct client_obd     *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
+       struct pagevec        *pvec;
        int                    pages_in_chunk = 0;
        int                    ppc_bits    = cli->cl_chunkbits -
-                                            PAGE_CACHE_SHIFT;
+                                            PAGE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
        int                    rc       = 0;
+       __u16                  refcheck;
        ENTRY;
 
        LASSERT(sanity_check(ext) == 0);
@@ -984,9 +994,15 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        /* Request new lu_env.
         * We can't use that env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized. */
-       env = cl_env_nested_get(&nest);
-       io  = &osc_env_info(env)->oti_io;
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       io  = osc_env_thread_io(env);
        io->ci_obj = cl_object_top(osc2cl(obj));
+       io->ci_ignore_layout = 1;
+       pvec = &osc_env_info(env)->oti_pagevec;
+       ll_pagevec_init(pvec, 0);
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                GOTO(out, rc);
@@ -1024,11 +1040,13 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                }
 
                lu_ref_del(&page->cp_reference, "truncate", current);
-               cl_page_put(env, page);
+               cl_pagevec_put(env, page, pvec);
 
                --ext->oe_nr_pages;
                ++nr_pages;
        }
+       pagevec_release(pvec);
+
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
                ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
@@ -1069,7 +1087,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
 
 out:
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        RETURN(rc);
 }
 
@@ -1126,7 +1144,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last_oap_count > 0);
-               LASSERT(last->oap_page_off + last_oap_count <= PAGE_CACHE_SIZE);
+               LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
                last->oap_count = last_oap_count;
                spin_lock(&last->oap_lock);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
@@ -1137,7 +1155,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
         * because it's known they are not the last page */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                       oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+                       oap->oap_count = PAGE_SIZE - oap->oap_page_off;
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        spin_unlock(&oap->oap_lock);
@@ -1164,7 +1182,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
-       int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
+       int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
@@ -1199,8 +1217,8 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
 
        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
+       LASSERT(*grants >= chunksize);
        *grants -= chunksize;
-       LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);
        EXIT;
@@ -1216,6 +1234,9 @@ static void osc_extent_tree_dump0(int level, struct osc_object *obj,
        struct osc_extent *ext;
        int cnt;
 
+       if (!cfs_cdebug_show(level, DEBUG_SUBSYSTEM))
+               return;
+
        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
 
@@ -1270,7 +1291,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
-               opg->ops_submit_time = cfs_time_current();
+               opg->ops_submit_time = ktime_get();
        RETURN(result);
 }
 
@@ -1281,7 +1302,6 @@ static int osc_refresh_count(const struct lu_env *env,
        pgoff_t index = osc_index(oap2osc(oap));
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
-
        int result;
        loff_t kms;
 
@@ -1301,9 +1321,9 @@ static int osc_refresh_count(const struct lu_env *env,
                return 0;
        else if (cl_offset(obj, index + 1) > kms)
                /* catch sub-page write at end of file */
-               return kms % PAGE_CACHE_SIZE;
+               return kms % PAGE_SIZE;
        else
-               return PAGE_CACHE_SIZE;
+               return PAGE_SIZE;
 }
 
 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
@@ -1327,7 +1347,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       opg->ops_submit_time = 0;
+       opg->ops_submit_time = ktime_set(0, 0);
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
        /* statistic */
@@ -1382,7 +1402,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
        cli->cl_dirty_pages++;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-              PAGE_CACHE_SIZE, pga, pga->pg);
+              PAGE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
 }
 
@@ -1463,7 +1483,7 @@ static void osc_unreserve_grant(struct client_obd *cli,
  * used, we should return these grants to OST. There're two cases where grants
  * can be lost:
  * 1. truncate;
- * 2. blocksize at OST is less than PAGE_CACHE_SIZE and a partial page was
+ * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
  *    written. In this case OST may use less chunks to serve this partial
  *    write. OSTs don't actually know the page size on the client side. so
  *    clients have to calculate lost grant by the blocksize on the OST.
@@ -1491,7 +1511,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu/%lu\n",
               lost_grant, cli->cl_lost_grant,
-              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT,
+              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT,
               cli->cl_dirty_grant);
 }
 
@@ -1691,6 +1711,7 @@ wakeup:
 
        EXIT;
 }
+EXPORT_SYMBOL(osc_wake_cache_waiters);
 
 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
 {
@@ -1736,9 +1757,10 @@ static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
-               if (atomic_read(&osc->oo_nr_writes) >=
-                   cli->cl_max_pages_per_rpc)
+               if (!list_empty(&osc->oo_full_exts)) {
+                       CDEBUG(D_CACHE, "full extent ready, make an RPC\n");
                        RETURN(1);
+               }
        } else {
                if (atomic_read(&osc->oo_nr_reads) == 0)
                        RETURN(0);
@@ -1877,6 +1899,22 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
        EXIT;
 }
 
+struct extent_rpc_data {
+       struct list_head        *erd_rpc_list;
+       unsigned int            erd_page_count;
+       unsigned int            erd_max_pages;
+       unsigned int            erd_max_chunks;
+       unsigned int            erd_max_extents;
+};
+
+static inline unsigned osc_extent_chunks(const struct osc_extent *ext)
+{
+       struct client_obd *cli = osc_cli(ext->oe_obj);
+       unsigned ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
+
+       return (ext->oe_end >> ppc_bits) - (ext->oe_start >> ppc_bits) + 1;
+}
+
 /**
  * Try to add extent to one RPC. We need to think about the following things:
  * - # of pages must not be over max_pages_per_rpc
@@ -1884,10 +1922,10 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
  */
 static int try_to_add_extent_for_io(struct client_obd *cli,
                                    struct osc_extent *ext,
-                                   struct list_head *rpclist,
-                                   unsigned int *pc, unsigned int *max_pages)
+                                   struct extent_rpc_data *data)
 {
        struct osc_extent *tmp;
+       unsigned int chunk_count;
        struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
                                                      struct osc_async_page,
                                                      oap_pending_item);
@@ -1895,12 +1933,30 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
                ext);
+       OSC_EXTENT_DUMP(D_CACHE, ext, "trying to add this extent\n");
 
-       *max_pages = max(ext->oe_mppr, *max_pages);
-       if (*pc + ext->oe_nr_pages > *max_pages)
+       if (data->erd_max_extents == 0)
                RETURN(0);
 
-       list_for_each_entry(tmp, rpclist, oe_link) {
+       chunk_count = osc_extent_chunks(ext);
+       EASSERTF(data->erd_page_count != 0 ||
+                chunk_count <= data->erd_max_chunks, ext,
+                "The first extent to be fit in a RPC contains %u chunks, "
+                "which is over the limit %u.\n", chunk_count,
+                data->erd_max_chunks);
+       if (chunk_count > data->erd_max_chunks)
+               RETURN(0);
+
+       data->erd_max_pages = max(ext->oe_mppr, data->erd_max_pages);
+       EASSERTF(data->erd_page_count != 0 ||
+               ext->oe_nr_pages <= data->erd_max_pages, ext,
+               "The first extent to be fit in a RPC contains %u pages, "
+               "which is over the limit %u.\n", ext->oe_nr_pages,
+               data->erd_max_pages);
+       if (data->erd_page_count + ext->oe_nr_pages > data->erd_max_pages)
+               RETURN(0);
+
+       list_for_each_entry(tmp, data->erd_rpc_list, oe_link) {
                struct osc_async_page *oap2;
                oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
                                        oap_pending_item);
@@ -1912,13 +1968,14 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                }
 #endif
                if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
-                       CDEBUG(D_CACHE, "Do not permit different type of IO"
-                                       " for a same RPC\n");
+                       CDEBUG(D_CACHE, "Do not permit different types of IO "
+                              "in one RPC\n");
                        RETURN(0);
                }
 
                if (tmp->oe_srvlock != ext->oe_srvlock ||
                    !tmp->oe_grants != !ext->oe_grants ||
+                   tmp->oe_ndelay != ext->oe_ndelay ||
                    tmp->oe_no_merge || ext->oe_no_merge)
                        RETURN(0);
 
@@ -1926,8 +1983,10 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                break;
        }
 
-       *pc += ext->oe_nr_pages;
-       list_move_tail(&ext->oe_link, rpclist);
+       data->erd_max_extents--;
+       data->erd_max_chunks -= chunk_count;
+       data->erd_page_count += ext->oe_nr_pages;
+       list_move_tail(&ext->oe_link, data->erd_rpc_list);
        ext->oe_owner = current;
        RETURN(1);
 }
@@ -1950,45 +2009,48 @@ static unsigned int get_write_extents(struct osc_object *obj,
 {
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *ext;
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = osc_max_write_chunks(cli),
+               .erd_max_extents = 256,
+       };
 
        LASSERT(osc_object_is_locked(obj));
        while (!list_empty(&obj->oo_hp_exts)) {
                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
                                 oe_link);
                LASSERT(ext->oe_state == OES_CACHE);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        while (!list_empty(&obj->oo_urgent_exts)) {
                ext = list_entry(obj->oo_urgent_exts.next,
                                 struct osc_extent, oe_link);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-
-               if (!ext->oe_intree)
-                       continue;
-
-               while ((ext = next_extent(ext)) != NULL) {
-                       if ((ext->oe_state != OES_CACHE) ||
-                           (!list_empty(&ext->oe_link) &&
-                            ext->oe_owner != NULL))
-                               continue;
-
-                       if (!try_to_add_extent_for_io(cli, ext, rpclist,
-                                                     &page_count, &max_pages))
-                               return page_count;
-               }
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+       }
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
+
+       /* One key difference between full extents and other extents: full
+        * extents can usually only be added if the rpclist was empty, so if we
+        * can't add one, we continue on to trying to add normal extents.  This
+        * is so we don't miss adding extra extents to an RPC containing high
+        * priority or urgent extents. */
+       while (!list_empty(&obj->oo_full_exts)) {
+               ext = list_entry(obj->oo_full_exts.next,
+                                struct osc_extent, oe_link);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       break;
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        ext = first_extent(obj);
        while (ext != NULL) {
@@ -1999,13 +2061,12 @@ static unsigned int get_write_extents(struct osc_object *obj,
                        continue;
                }
 
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
 
                ext = next_extent(ext);
        }
-       return page_count;
+       return data.erd_page_count;
 }
 
 static int
@@ -2090,29 +2151,31 @@ __must_hold(osc)
        struct osc_extent *ext;
        struct osc_extent *next;
        struct list_head rpclist = LIST_HEAD_INIT(rpclist);
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = &rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = UINT_MAX,
+               .erd_max_extents = UINT_MAX,
+       };
        int rc = 0;
        ENTRY;
 
        LASSERT(osc_object_is_locked(osc));
-       list_for_each_entry_safe(ext, next,
-                                    &osc->oo_reading_exts, oe_link) {
+       list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
-               if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
-                                             &max_pages))
+               if (!try_to_add_extent_for_io(cli, ext, &data))
                        break;
                osc_extent_state_set(ext, OES_RPC);
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       LASSERT(page_count <= max_pages);
+       LASSERT(data.erd_page_count <= data.erd_max_pages);
 
-       osc_update_pending(osc, OBD_BRW_READ, -page_count);
+       osc_update_pending(osc, OBD_BRW_READ, -data.erd_page_count);
 
        if (!list_empty(&rpclist)) {
                osc_object_unlock(osc);
 
-               LASSERT(page_count > 0);
                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
                LASSERT(list_empty(&rpclist));
 
@@ -2231,8 +2294,8 @@ __must_hold(&cli->cl_loi_list_lock)
        }
 }
 
-static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
-                         struct osc_object *osc, int async)
+int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, int async)
 {
        int rc = 0;
 
@@ -2250,18 +2313,7 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
        }
        return rc;
 }
-
-static int osc_io_unplug_async(const struct lu_env *env,
-                               struct client_obd *cli, struct osc_object *osc)
-{
-       return osc_io_unplug0(env, cli, osc, 1);
-}
-
-void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-                  struct osc_object *osc)
-{
-       (void)osc_io_unplug0(env, cli, osc, 0);
-}
+EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
                        struct page *page, loff_t offset)
@@ -2281,17 +2333,18 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~PAGE_MASK));
 
-       if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
+       if (cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
 
        INIT_LIST_HEAD(&oap->oap_pending_item);
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
        spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
+       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
               oap, page, oap->oap_obj_off);
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_prep_async_page);
 
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                       struct osc_page *ops)
@@ -2322,8 +2375,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
-       if (!client_is_remote(osc_export(osc)) &&
-           cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+       if (cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                brw_flags |= OBD_BRW_NOQUOTA;
                cmd |= OBD_BRW_NOQUOTA;
        }
@@ -2332,7 +2384,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        if (!(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr   *attr;
-               unsigned int qid[MAXQUOTAS];
+               unsigned int qid[LL_MAXQUOTAS];
 
                obj = cl_object_top(&osc->oo_cl);
                attr = &osc_env_info(env)->oti_attr;
@@ -2343,6 +2395,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
                qid[USRQUOTA] = attr->cat_uid;
                qid[GRPQUOTA] = attr->cat_gid;
+               qid[PRJQUOTA] = attr->cat_projid;
                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
@@ -2455,7 +2508,11 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                ++ext->oe_nr_pages;
                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
                osc_object_unlock(osc);
+
+               if (!ext->oe_layout_version)
+                       ext->oe_layout_version = io->ci_layout_version;
        }
+
        RETURN(rc);
 }
 
@@ -2641,8 +2698,9 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
        RETURN(rc);
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int cmd, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+                        struct osc_object *obj, struct list_head *list,
+                        int brw_flags)
 {
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
@@ -2680,7 +2738,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
                RETURN(-ENOMEM);
        }
 
-       ext->oe_rw = !!(cmd & OBD_BRW_READ);
+       ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
        ext->oe_sync = 1;
        ext->oe_no_merge = !can_merge;
        ext->oe_urgent = 1;
@@ -2688,14 +2746,16 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        ext->oe_end = ext->oe_max_end = end;
        ext->oe_obj = obj;
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+       ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
+       ext->oe_layout_version = io->ci_layout_version;
 
        osc_object_lock(obj);
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
-       if (cmd & OBD_BRW_WRITE) {
+       if (!ext->oe_rw) { /* write */
                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
@@ -2748,7 +2808,7 @@ again:
                        break;
                }
 
-               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
 
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
@@ -2764,8 +2824,12 @@ again:
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           -ext->oe_nr_pages);
                }
-               EASSERT(list_empty(&ext->oe_link), ext);
-               list_add_tail(&ext->oe_link, &list);
+               /* This extent could be on the full extents list, that's OK */
+               EASSERT(!ext->oe_hp && !ext->oe_urgent, ext);
+               if (!list_empty(&ext->oe_link))
+                       list_move_tail(&ext->oe_link, &list);
+               else
+                       list_add_tail(&ext->oe_link, &list);
 
                ext = next_extent(ext);
        }
@@ -2811,7 +2875,7 @@ again:
                        LASSERT(*extp == NULL);
                        *extp = osc_extent_get(ext);
                        OSC_EXTENT_DUMP(D_CACHE, ext,
-                                       "trunc at "LPU64"\n", size);
+                                       "trunc at %llu\n", size);
                }
                osc_extent_put(env, ext);
        }
@@ -2830,6 +2894,7 @@ again:
        }
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_truncate_start);
 
 /**
  * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
@@ -2916,6 +2981,7 @@ again:
        OSC_IO_DEBUG(obj, "sync file range.\n");
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_wait_range);
 
 /**
  * Called to write out a range of osc object.
@@ -3032,6 +3098,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_writeback_range);
 
 /**
  * Returns a list of pages by a given [start, end] of \a obj.
@@ -3050,6 +3117,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                        osc_page_gang_cbt cb, void *cbdata)
 {
        struct osc_page *ops;
+       struct pagevec  *pagevec;
        void            **pvec;
        pgoff_t         idx;
        unsigned int    nr;
@@ -3061,6 +3129,8 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
        idx = start;
        pvec = osc_env_info(env)->oti_pvec;
+       pagevec = &osc_env_info(env)->oti_pagevec;
+       ll_pagevec_init(pagevec, 0);
        spin_lock(&osc->oo_tree_lock);
        while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
                                            idx, OTI_PVEC_SIZE)) > 0) {
@@ -3107,8 +3177,10 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
                        page = ops->ops_cl.cpl_page;
                        lu_ref_del(&page->cp_reference, "gang_lookup", current);
-                       cl_page_put(env, page);
+                       cl_pagevec_put(env, page, pagevec);
                }
+               pagevec_release(pagevec);
+
                if (nr < OTI_PVEC_SIZE || end_of_region)
                        break;
 
@@ -3124,6 +3196,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
        RETURN(res);
 }
+EXPORT_SYMBOL(osc_page_gang_lookup);
 
 /**
  * Check if page @page is covered by an extra lock or discard it.
@@ -3166,8 +3239,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
        return CLP_GANG_OKAY;
 }
 
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-                     struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+                  struct osc_page *ops, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -3189,6 +3262,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
 
        return CLP_GANG_OKAY;
 }
+EXPORT_SYMBOL(osc_discard_cb);
 
 /**
  * Discard pages protected by the given lock. This function traverses radix
@@ -3199,10 +3273,10 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
  * behind this being that lock cancellation cannot be delayed indefinitely).
  */
 int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
-                          pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
+                          pgoff_t start, pgoff_t end, bool discard)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_io *io = &info->oti_io;
+       struct cl_io *io = osc_env_thread_io(env);
        osc_page_gang_cbt cb;
        int res;
        int result;
@@ -3215,7 +3289,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
        if (result != 0)
                GOTO(out, result);
 
-       cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       cb = discard ? osc_discard_cb : check_and_discard_cb;
        info->oti_fn_index = info->oti_next_index = start;
        do {
                res = osc_page_gang_lookup(env, io, osc,