Whamcloud - gitweb
LU-11423 osc: Do not walk full extent list
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index 27a034e..7524899 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  *
  */
 /*
@@ -41,7 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include "osc_cl_internal.h"
+#include <lustre_osc.h>
+
 #include "osc_internal.h"
 
 static int extent_debug; /* set it to be true for more debug */
@@ -229,7 +226,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
        if (ext->oe_sync && ext->oe_grants > 0)
                GOTO(out, rc = 90);
 
-       if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
+       if (ext->oe_dlmlock != NULL &&
+           ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+           !ldlm_is_failed(ext->oe_dlmlock)) {
                struct ldlm_extent *extent;
 
                extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -523,7 +522,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                return -ERANGE;
 
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
-       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
+       ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
@@ -598,6 +597,10 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
+                       else if (ext->oe_nr_pages == ext->oe_mppr) {
+                               list_move_tail(&ext->oe_link,
+                                              &obj->oo_full_exts);
+                       }
                }
                osc_object_unlock(obj);
 
@@ -696,7 +699,7 @@ restart:
                pgoff_t ext_chk_end   = ext->oe_end   >> ppc_bits;
 
                LASSERT(sanity_check_nolock(ext) == 0);
-               if (chunk > ext_chk_end + 1)
+               if (chunk > ext_chk_end + 1 || chunk < ext_chk_start)
                        break;
 
                /* if covering by different locks, no chance to match */
@@ -776,6 +779,7 @@ restart:
                        /* pull ext's start back to cover cur */
                        ext->oe_start   = cur->oe_start;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        found = osc_extent_hold(ext);
@@ -783,6 +787,7 @@ restart:
                        /* rear merge */
                        ext->oe_end     = cur->oe_end;
                        ext->oe_grants += chunksize;
+                       LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
                        /* try to merge with the next one because we just fill
@@ -811,8 +816,8 @@ restart:
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
                cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
+               LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
-               LASSERT(*grants >= 0);
 
                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
@@ -839,7 +844,6 @@ restart:
 
 out:
        osc_extent_put(env, cur);
-       LASSERT(*grants >= 0);
        return found;
 }
 
@@ -881,8 +885,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
        if (!sent) {
                lost_grant = ext->oe_grants;
-       } else if (blocksize < PAGE_CACHE_SIZE &&
-                  last_count != PAGE_CACHE_SIZE) {
+       } else if (blocksize < PAGE_SIZE &&
+                  last_count != PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
@@ -892,7 +896,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                if (end)
                        count += blocksize - end;
 
-               lost_grant = PAGE_CACHE_SIZE - count;
+               lost_grant = PAGE_SIZE - count;
        }
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant, ext->oe_grants);
@@ -974,7 +978,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        struct osc_async_page *tmp;
        int                    pages_in_chunk = 0;
        int                    ppc_bits    = cli->cl_chunkbits -
-                                            PAGE_CACHE_SHIFT;
+                                            PAGE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
@@ -990,7 +994,10 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
         * We can't use that env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized. */
        env = cl_env_get(&refcheck);
-       io  = &osc_env_info(env)->oti_io;
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       io  = osc_env_thread_io(env);
        io->ci_obj = cl_object_top(osc2cl(obj));
        io->ci_ignore_layout = 1;
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
@@ -1132,7 +1139,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last_oap_count > 0);
-               LASSERT(last->oap_page_off + last_oap_count <= PAGE_CACHE_SIZE);
+               LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
                last->oap_count = last_oap_count;
                spin_lock(&last->oap_lock);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
@@ -1143,7 +1150,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
         * because it's known they are not the last page */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                       oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+                       oap->oap_count = PAGE_SIZE - oap->oap_page_off;
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                        spin_unlock(&oap->oap_lock);
@@ -1170,7 +1177,7 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
-       int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
+       int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
@@ -1205,8 +1212,8 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index,
 
        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
+       LASSERT(*grants >= chunksize);
        *grants -= chunksize;
-       LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);
        EXIT;
@@ -1222,6 +1229,9 @@ static void osc_extent_tree_dump0(int level, struct osc_object *obj,
        struct osc_extent *ext;
        int cnt;
 
+       if (!cfs_cdebug_show(level, DEBUG_SUBSYSTEM))
+               return;
+
        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
 
@@ -1276,7 +1286,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
-               opg->ops_submit_time = cfs_time_current();
+               opg->ops_submit_time = ktime_get();
        RETURN(result);
 }
 
@@ -1287,7 +1297,6 @@ static int osc_refresh_count(const struct lu_env *env,
        pgoff_t index = osc_index(oap2osc(oap));
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
-
        int result;
        loff_t kms;
 
@@ -1307,9 +1316,9 @@ static int osc_refresh_count(const struct lu_env *env,
                return 0;
        else if (cl_offset(obj, index + 1) > kms)
                /* catch sub-page write at end of file */
-               return kms % PAGE_CACHE_SIZE;
+               return kms % PAGE_SIZE;
        else
-               return PAGE_CACHE_SIZE;
+               return PAGE_SIZE;
 }
 
 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
@@ -1333,7 +1342,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       opg->ops_submit_time = 0;
+       opg->ops_submit_time = ktime_set(0, 0);
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
        /* statistic */
@@ -1388,7 +1397,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
        cli->cl_dirty_pages++;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-              PAGE_CACHE_SIZE, pga, pga->pg);
+              PAGE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
 }
 
@@ -1469,7 +1478,7 @@ static void osc_unreserve_grant(struct client_obd *cli,
  * used, we should return these grants to OST. There're two cases where grants
  * can be lost:
  * 1. truncate;
- * 2. blocksize at OST is less than PAGE_CACHE_SIZE and a partial page was
+ * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
  *    written. In this case OST may use less chunks to serve this partial
  *    write. OSTs don't actually know the page size on the client side. so
  *    clients have to calculate lost grant by the blocksize on the OST.
@@ -1497,7 +1506,7 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu/%lu\n",
               lost_grant, cli->cl_lost_grant,
-              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT,
+              cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT,
               cli->cl_dirty_grant);
 }
 
@@ -1697,6 +1706,7 @@ wakeup:
 
        EXIT;
 }
+EXPORT_SYMBOL(osc_wake_cache_waiters);
 
 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
 {
@@ -1742,9 +1752,10 @@ static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
-               if (atomic_read(&osc->oo_nr_writes) >=
-                   cli->cl_max_pages_per_rpc)
+               if (!list_empty(&osc->oo_full_exts)) {
+                       CDEBUG(D_CACHE, "full extent ready, make an RPC\n");
                        RETURN(1);
+               }
        } else {
                if (atomic_read(&osc->oo_nr_reads) == 0)
                        RETURN(0);
@@ -1883,6 +1894,22 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
        EXIT;
 }
 
+struct extent_rpc_data {
+       struct list_head        *erd_rpc_list;
+       unsigned int            erd_page_count;
+       unsigned int            erd_max_pages;
+       unsigned int            erd_max_chunks;
+       unsigned int            erd_max_extents;
+};
+
+static inline unsigned osc_extent_chunks(const struct osc_extent *ext)
+{
+       struct client_obd *cli = osc_cli(ext->oe_obj);
+       unsigned ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
+
+       return (ext->oe_end >> ppc_bits) - (ext->oe_start >> ppc_bits) + 1;
+}
+
 /**
  * Try to add extent to one RPC. We need to think about the following things:
  * - # of pages must not be over max_pages_per_rpc
@@ -1890,10 +1917,10 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
  */
 static int try_to_add_extent_for_io(struct client_obd *cli,
                                    struct osc_extent *ext,
-                                   struct list_head *rpclist,
-                                   unsigned int *pc, unsigned int *max_pages)
+                                   struct extent_rpc_data *data)
 {
        struct osc_extent *tmp;
+       unsigned int chunk_count;
        struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
                                                      struct osc_async_page,
                                                      oap_pending_item);
@@ -1901,12 +1928,30 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
                ext);
+       OSC_EXTENT_DUMP(D_CACHE, ext, "trying to add this extent\n");
+
+       if (data->erd_max_extents == 0)
+               RETURN(0);
+
+       chunk_count = osc_extent_chunks(ext);
+       EASSERTF(data->erd_page_count != 0 ||
+                chunk_count <= data->erd_max_chunks, ext,
+                "The first extent to be fit in a RPC contains %u chunks, "
+                "which is over the limit %u.\n", chunk_count,
+                data->erd_max_chunks);
+       if (chunk_count > data->erd_max_chunks)
+               RETURN(0);
 
-       *max_pages = max(ext->oe_mppr, *max_pages);
-       if (*pc + ext->oe_nr_pages > *max_pages)
+       data->erd_max_pages = max(ext->oe_mppr, data->erd_max_pages);
+       EASSERTF(data->erd_page_count != 0 ||
+               ext->oe_nr_pages <= data->erd_max_pages, ext,
+               "The first extent to be fit in a RPC contains %u pages, "
+               "which is over the limit %u.\n", ext->oe_nr_pages,
+               data->erd_max_pages);
+       if (data->erd_page_count + ext->oe_nr_pages > data->erd_max_pages)
                RETURN(0);
 
-       list_for_each_entry(tmp, rpclist, oe_link) {
+       list_for_each_entry(tmp, data->erd_rpc_list, oe_link) {
                struct osc_async_page *oap2;
                oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
                                        oap_pending_item);
@@ -1918,13 +1963,14 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                }
 #endif
                if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
-                       CDEBUG(D_CACHE, "Do not permit different type of IO"
-                                       " for a same RPC\n");
+                       CDEBUG(D_CACHE, "Do not permit different types of IO "
+                              "in one RPC\n");
                        RETURN(0);
                }
 
                if (tmp->oe_srvlock != ext->oe_srvlock ||
                    !tmp->oe_grants != !ext->oe_grants ||
+                   tmp->oe_ndelay != ext->oe_ndelay ||
                    tmp->oe_no_merge || ext->oe_no_merge)
                        RETURN(0);
 
@@ -1932,12 +1978,44 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                break;
        }
 
-       *pc += ext->oe_nr_pages;
-       list_move_tail(&ext->oe_link, rpclist);
+       data->erd_max_extents--;
+       data->erd_max_chunks -= chunk_count;
+       data->erd_page_count += ext->oe_nr_pages;
+       list_move_tail(&ext->oe_link, data->erd_rpc_list);
        ext->oe_owner = current;
        RETURN(1);
 }
 
+static inline unsigned osc_max_write_chunks(const struct client_obd *cli)
+{
+       /*
+        * LU-8135:
+        *
+        * The maximum size of a single transaction is about 64MB in ZFS.
+        * #define DMU_MAX_ACCESS (64 * 1024 * 1024)
+        *
+        * Since ZFS is a copy-on-write file system, a single dirty page in
+        * a chunk will result in the rewrite of the whole chunk, therefore
+        * an RPC shouldn't be allowed to contain too many chunks otherwise
+        * it will make transaction size much bigger than 64MB, especially
+        * with big block size for ZFS.
+        *
+        * This piece of code is to make sure that OSC won't send write RPCs
+        * with too many chunks. The maximum chunk size that an RPC can cover
+        * is set to PTLRPC_MAX_BRW_SIZE, which is defined to 16MB. Ideally
+        * OST should tell the client what the biggest transaction size is,
+        * but it's good enough for now.
+        *
+        * This limitation doesn't apply to ldiskfs, which allows as many
+        * chunks in one RPC as we want. However, it won't have any benefits
+        * to have too many discontiguous pages in one RPC.
+        *
+        * An osc_extent won't cover over a RPC size, so the chunks in an
+        * osc_extent won't bigger than PTLRPC_MAX_BRW_SIZE >> chunkbits.
+        */
+       return PTLRPC_MAX_BRW_SIZE >> cli->cl_chunkbits;
+}
+
 /**
  * In order to prevent multiple ptlrpcd from breaking contiguous extents,
  * get_write_extent() takes all appropriate extents in atomic.
@@ -1956,45 +2034,48 @@ static unsigned int get_write_extents(struct osc_object *obj,
 {
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *ext;
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = osc_max_write_chunks(cli),
+               .erd_max_extents = 256,
+       };
 
        LASSERT(osc_object_is_locked(obj));
        while (!list_empty(&obj->oo_hp_exts)) {
                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
                                 oe_link);
                LASSERT(ext->oe_state == OES_CACHE);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        while (!list_empty(&obj->oo_urgent_exts)) {
                ext = list_entry(obj->oo_urgent_exts.next,
                                 struct osc_extent, oe_link);
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
-
-               if (!ext->oe_intree)
-                       continue;
-
-               while ((ext = next_extent(ext)) != NULL) {
-                       if ((ext->oe_state != OES_CACHE) ||
-                           (!list_empty(&ext->oe_link) &&
-                            ext->oe_owner != NULL))
-                               continue;
-
-                       if (!try_to_add_extent_for_io(cli, ext, rpclist,
-                                                     &page_count, &max_pages))
-                               return page_count;
-               }
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
+       }
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
+
+       /* One key difference between full extents and other extents: full
+        * extents can usually only be added if the rpclist was empty, so if we
+        * can't add one, we continue on to trying to add normal extents.  This
+        * is so we don't miss adding extra extents to an RPC containing high
+        * priority or urgent extents. */
+       while (!list_empty(&obj->oo_full_exts)) {
+               ext = list_entry(obj->oo_full_exts.next,
+                                struct osc_extent, oe_link);
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       break;
        }
-       if (page_count == max_pages)
-               return page_count;
+       if (data.erd_page_count == data.erd_max_pages)
+               return data.erd_page_count;
 
        ext = first_extent(obj);
        while (ext != NULL) {
@@ -2005,13 +2086,12 @@ static unsigned int get_write_extents(struct osc_object *obj,
                        continue;
                }
 
-               if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
-                                             &max_pages))
-                       return page_count;
+               if (!try_to_add_extent_for_io(cli, ext, &data))
+                       return data.erd_page_count;
 
                ext = next_extent(ext);
        }
-       return page_count;
+       return data.erd_page_count;
 }
 
 static int
@@ -2096,29 +2176,31 @@ __must_hold(osc)
        struct osc_extent *ext;
        struct osc_extent *next;
        struct list_head rpclist = LIST_HEAD_INIT(rpclist);
-       unsigned int page_count = 0;
-       unsigned int max_pages = cli->cl_max_pages_per_rpc;
+       struct extent_rpc_data data = {
+               .erd_rpc_list   = &rpclist,
+               .erd_page_count = 0,
+               .erd_max_pages  = cli->cl_max_pages_per_rpc,
+               .erd_max_chunks = UINT_MAX,
+               .erd_max_extents = UINT_MAX,
+       };
        int rc = 0;
        ENTRY;
 
        LASSERT(osc_object_is_locked(osc));
-       list_for_each_entry_safe(ext, next,
-                                    &osc->oo_reading_exts, oe_link) {
+       list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
-               if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
-                                             &max_pages))
+               if (!try_to_add_extent_for_io(cli, ext, &data))
                        break;
                osc_extent_state_set(ext, OES_RPC);
-               EASSERT(ext->oe_nr_pages <= max_pages, ext);
+               EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
        }
-       LASSERT(page_count <= max_pages);
+       LASSERT(data.erd_page_count <= data.erd_max_pages);
 
-       osc_update_pending(osc, OBD_BRW_READ, -page_count);
+       osc_update_pending(osc, OBD_BRW_READ, -data.erd_page_count);
 
        if (!list_empty(&rpclist)) {
                osc_object_unlock(osc);
 
-               LASSERT(page_count > 0);
                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
                LASSERT(list_empty(&rpclist));
 
@@ -2237,8 +2319,8 @@ __must_hold(&cli->cl_loi_list_lock)
        }
 }
 
-static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
-                         struct osc_object *osc, int async)
+int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, int async)
 {
        int rc = 0;
 
@@ -2256,18 +2338,7 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
        }
        return rc;
 }
-
-static int osc_io_unplug_async(const struct lu_env *env,
-                               struct client_obd *cli, struct osc_object *osc)
-{
-       return osc_io_unplug0(env, cli, osc, 1);
-}
-
-void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-                  struct osc_object *osc)
-{
-       (void)osc_io_unplug0(env, cli, osc, 0);
-}
+EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
                        struct page *page, loff_t offset)
@@ -2294,10 +2365,11 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
        spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
+       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
               oap, page, oap->oap_obj_off);
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_prep_async_page);
 
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                       struct osc_page *ops)
@@ -2348,6 +2420,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
                qid[USRQUOTA] = attr->cat_uid;
                qid[GRPQUOTA] = attr->cat_gid;
+               qid[PRJQUOTA] = attr->cat_projid;
                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
@@ -2460,7 +2533,11 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                ++ext->oe_nr_pages;
                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
                osc_object_unlock(osc);
+
+               if (!ext->oe_layout_version)
+                       ext->oe_layout_version = io->ci_layout_version;
        }
+
        RETURN(rc);
 }
 
@@ -2646,8 +2723,9 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
        RETURN(rc);
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int cmd, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+                        struct osc_object *obj, struct list_head *list,
+                        int brw_flags)
 {
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
@@ -2685,7 +2763,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
                RETURN(-ENOMEM);
        }
 
-       ext->oe_rw = !!(cmd & OBD_BRW_READ);
+       ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
        ext->oe_sync = 1;
        ext->oe_no_merge = !can_merge;
        ext->oe_urgent = 1;
@@ -2693,14 +2771,16 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        ext->oe_end = ext->oe_max_end = end;
        ext->oe_obj = obj;
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+       ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
+       ext->oe_layout_version = io->ci_layout_version;
 
        osc_object_lock(obj);
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
-       if (cmd & OBD_BRW_WRITE) {
+       if (!ext->oe_rw) { /* write */
                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
@@ -2753,7 +2833,7 @@ again:
                        break;
                }
 
-               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
 
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
@@ -2769,8 +2849,12 @@ again:
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           -ext->oe_nr_pages);
                }
-               EASSERT(list_empty(&ext->oe_link), ext);
-               list_add_tail(&ext->oe_link, &list);
+               /* This extent could be on the full extents list, that's OK */
+               EASSERT(!ext->oe_hp && !ext->oe_urgent, ext);
+               if (!list_empty(&ext->oe_link))
+                       list_move_tail(&ext->oe_link, &list);
+               else
+                       list_add_tail(&ext->oe_link, &list);
 
                ext = next_extent(ext);
        }
@@ -2816,7 +2900,7 @@ again:
                        LASSERT(*extp == NULL);
                        *extp = osc_extent_get(ext);
                        OSC_EXTENT_DUMP(D_CACHE, ext,
-                                       "trunc at "LPU64"\n", size);
+                                       "trunc at %llu\n", size);
                }
                osc_extent_put(env, ext);
        }
@@ -2835,6 +2919,7 @@ again:
        }
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_truncate_start);
 
 /**
  * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
@@ -2921,6 +3006,7 @@ again:
        OSC_IO_DEBUG(obj, "sync file range.\n");
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_wait_range);
 
 /**
  * Called to write out a range of osc object.
@@ -3037,6 +3123,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
        RETURN(result);
 }
+EXPORT_SYMBOL(osc_cache_writeback_range);
 
 /**
  * Returns a list of pages by a given [start, end] of \a obj.
@@ -3129,6 +3216,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
        RETURN(res);
 }
+EXPORT_SYMBOL(osc_page_gang_lookup);
 
 /**
  * Check if page @page is covered by an extra lock or discard it.
@@ -3171,8 +3259,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
        return CLP_GANG_OKAY;
 }
 
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-                     struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+                  struct osc_page *ops, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct cl_page *page = ops->ops_cl.cpl_page;
@@ -3194,6 +3282,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
 
        return CLP_GANG_OKAY;
 }
+EXPORT_SYMBOL(osc_discard_cb);
 
 /**
  * Discard pages protected by the given lock. This function traverses radix
@@ -3204,10 +3293,10 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
  * behind this being that lock cancellation cannot be delayed indefinitely).
  */
 int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
-                          pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
+                          pgoff_t start, pgoff_t end, bool discard)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_io *io = &info->oti_io;
+       struct cl_io *io = osc_env_thread_io(env);
        osc_page_gang_cbt cb;
        int res;
        int result;
@@ -3220,7 +3309,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
        if (result != 0)
                GOTO(out, result);
 
-       cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       cb = discard ? osc_discard_cb : check_and_discard_cb;
        info->oti_fn_index = info->oti_next_index = start;
        do {
                res = osc_page_gang_lookup(env, io, osc,