Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osc / osc_request.c
index a41abd8..e76f8ea 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
 #include <libcfs/libcfs.h>
 #include <linux/falloc.h>
 #include <lprocfs_status.h>
-#include <lustre_debug.h>
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
 #include <lustre_ha.h>
 #include <uapi/linux/lustre/lustre_ioctl.h>
+#include <lustre_ioctl_old.h>
 #include <lustre_net.h>
 #include <lustre_obdo.h>
+#include <lustre_osc.h>
 #include <obd.h>
 #include <obd_cksum.h>
 #include <obd_class.h>
-#include <lustre_osc.h>
-#include <linux/falloc.h>
 
 #include "osc_internal.h"
+#include <lnet/lnet_rdma.h>
 
 atomic_t osc_pool_req_count;
 unsigned int osc_reqpool_maxreqcount;
@@ -87,7 +86,7 @@ static void osc_release_ppga(struct brw_page **ppga, size_t count);
 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
 
-void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
+static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
 {
        struct ost_body *body;
 
@@ -450,14 +449,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
        int rc;
        ENTRY;
 
-       /*
-        * Only mode == 0 (which is standard prealloc) is supported now.
-        * Punch is not supported yet.
-        */
-       if (mode & ~FALLOC_FL_KEEP_SIZE)
-               RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;
-
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
@@ -476,7 +468,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
 
        ptlrpc_request_set_replen(req);
 
-       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
+       req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
@@ -487,6 +479,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
 
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_fallocate_base);
 
 static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
@@ -589,7 +582,7 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                RETURN(0);
 
        ostid_build_res_name(&oa->o_oi, &res_id);
-       res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+       res = ldlm_resource_get(ns, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);
 
@@ -699,7 +692,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
-       if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
+       if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
@@ -730,13 +723,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
-               if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
-                                GRANT_PARAM)) {
+               if (cli->cl_ocd_grant_param) {
                        int nrextents;
 
                        /* take extent tax into account when asking for more
                         * grant space */
-                       nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
+                       nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
@@ -771,6 +763,7 @@ void osc_update_next_shrink(struct client_obd *cli)
        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
 }
+EXPORT_SYMBOL(osc_update_next_shrink);
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
 {
@@ -979,6 +972,7 @@ void osc_schedule_grant_work(void)
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
 }
+EXPORT_SYMBOL(osc_schedule_grant_work);
 
 /**
  * Start grant thread for returing grant to server for idle clients.
@@ -1062,10 +1056,10 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
-               cli->cl_max_extent_pages = size >> PAGE_SHIFT;
-               if (cli->cl_max_extent_pages == 0)
-                       cli->cl_max_extent_pages = 1;
+               cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
+               cli->cl_ocd_grant_param = 1;
        } else {
+               cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
@@ -1097,27 +1091,27 @@ static void handle_short_read(int nob_read, size_t page_count,
         while (nob_read > 0) {
                 LASSERT (page_count > 0);
 
-               if (pga[i]->count > nob_read) {
+               if (pga[i]->bp_count > nob_read) {
                        /* EOF inside this page */
-                       ptr = kmap(pga[i]->pg) +
-                               (pga[i]->off & ~PAGE_MASK);
-                       memset(ptr + nob_read, 0, pga[i]->count - nob_read);
-                       kunmap(pga[i]->pg);
+                       ptr = kmap(pga[i]->bp_page) +
+                               (pga[i]->bp_off & ~PAGE_MASK);
+                       memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
+                       kunmap(pga[i]->bp_page);
                        page_count--;
                        i++;
                        break;
                }
 
-                nob_read -= pga[i]->count;
+                nob_read -= pga[i]->bp_count;
                 page_count--;
                 i++;
         }
 
        /* zero remaining pages */
        while (page_count-- > 0) {
-               ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
-               memset(ptr, 0, pga[i]->count);
-               kunmap(pga[i]->pg);
+               ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
+               memset(ptr, 0, pga[i]->bp_count);
+               kunmap(pga[i]->bp_page);
                i++;
        }
 }
@@ -1163,22 +1157,23 @@ static int check_write_rcs(struct ptlrpc_request *req,
 
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
-        if (p1->flag != p2->flag) {
+        if (p1->bp_flag != p2->bp_flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
-                                 OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
+                                 OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC |
+                                 OBD_BRW_SYS_RESOURCE);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
-                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
+                if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                               "report this at https://jira.whamcloud.com/\n",
-                              p1->flag, p2->flag);
+                              p1->bp_flag, p2->bp_flag);
                 }
                 return 0;
         }
 
-        return (p1->off + p1->count == p2->off);
+        return (p1->bp_off + p1->bp_count == p2->bp_off);
 }
 
 #if IS_ENABLED(CONFIG_CRC_T10DIF)
@@ -1186,20 +1181,20 @@ static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
-                                  u32 *check_sum)
+                                  u32 *check_sum, bool resend)
 {
        struct ahash_request *req;
        /* Used Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
-       __u16 *guard_start;
-       unsigned int bufsize;
+       __be16 *guard_start;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
-       int rc = 0;
+       unsigned int bufsize = sizeof(cksum);
+       int rc = 0, rc2;
        int i = 0;
 
        LASSERT(pg_count > 0);
@@ -1217,64 +1212,81 @@ static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
        }
 
        buffer = kmap(__page);
-       guard_start = (__u16 *)buffer;
+       guard_start = (__be16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
+       CDEBUG(D_PAGE | (resend ? D_HA : 0),
+              "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
+              guard_number, resend, nob, pg_count);
+
        while (nob > 0 && pg_count > 0) {
-               unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+               int off = pga[i]->bp_off & ~PAGE_MASK;
+               unsigned int count =
+                       pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
+               int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
+                                       (off / sector_size);
+
+               if (guards_needed > guard_number - used_number) {
+                       cfs_crypto_hash_update_page(req, __page, 0,
+                               used_number * sizeof(*guard_start));
+                       used_number = 0;
+               }
 
                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
-                            OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
-                       unsigned char *ptr = kmap(pga[i]->pg);
-                       int off = pga[i]->off & ~PAGE_MASK;
+                            CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+                       unsigned char *ptr = kmap(pga[i]->bp_page);
 
                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
-                       kunmap(pga[i]->pg);
+                       kunmap(pga[i]->bp_page);
                }
 
                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page
                 */
-               rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
-                                                 pga[i]->off & ~PAGE_MASK,
+               rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
+                                                 pga[i]->bp_off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
+               if (unlikely(resend))
+                       CDEBUG(D_PAGE | D_HA,
+                              "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
+                              i, used, pga[i]->bp_off & ~PAGE_MASK, count,
+                              (int)(used * sizeof(*guard_start)),
+                              guard_start + used_number);
                if (rc)
                        break;
 
                used_number += used;
-               if (used_number == guard_number) {
-                       cfs_crypto_hash_update_page(req, __page, 0,
-                               used_number * sizeof(*guard_start));
-                       used_number = 0;
-               }
-
-               nob -= pga[i]->count;
+               nob -= pga[i]->bp_count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
-               GOTO(out, rc);
+               GOTO(out_hash, rc);
 
        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));
 
-       bufsize = sizeof(cksum);
-       cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
-
-       /* For sending we only compute the wrong checksum instead
-        * of corrupting the data so it is still correct on a redo */
-       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
-               cksum++;
+out_hash:
+       rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
+       if (!rc)
+               rc = rc2;
+       if (rc == 0) {
+               /* For sending we only compute the wrong checksum instead
+                * of corrupting the data so it is still correct on a redo */
+               if (opc == OST_WRITE &&
+                               CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+                       cksum++;
 
-       *check_sum = cksum;
+               *check_sum = cksum;
+       }
 out:
        __free_page(__page);
        return rc;
@@ -1282,7 +1294,7 @@ out:
 #else /* !CONFIG_CRC_T10DIF */
 #define obd_dif_ip_fn NULL
 #define obd_dif_crc_fn NULL
-#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum \
+#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
        -EOPNOTSUPP
 #endif /* CONFIG_CRC_T10DIF */
 
@@ -1306,25 +1318,26 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
        }
 
        while (nob > 0 && pg_count > 0) {
-               unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+               unsigned int count =
+                       pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
 
                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
-                   OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
-                       unsigned char *ptr = kmap(pga[i]->pg);
-                       int off = pga[i]->off & ~PAGE_MASK;
+                   CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+                       unsigned char *ptr = kmap(pga[i]->bp_page);
+                       int off = pga[i]->bp_off & ~PAGE_MASK;
 
                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
-                       kunmap(pga[i]->pg);
+                       kunmap(pga[i]->bp_page);
                }
-               cfs_crypto_hash_update_page(req, pga[i]->pg,
-                                           pga[i]->off & ~PAGE_MASK,
+               cfs_crypto_hash_update_page(req, pga[i]->bp_page,
+                                           pga[i]->bp_off & ~PAGE_MASK,
                                            count);
-               LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
-                              (int)(pga[i]->off & ~PAGE_MASK));
+               LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
+                              (int)(pga[i]->bp_off & ~PAGE_MASK));
 
-               nob -= pga[i]->count;
+               nob -= pga[i]->bp_count;
                pg_count--;
                i++;
        }
@@ -1334,7 +1347,7 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
 
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
-       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+       if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;
 
        return 0;
@@ -1344,7 +1357,7 @@ static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
-                               u32 *check_sum)
+                               u32 *check_sum, bool resend)
 {
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
@@ -1355,7 +1368,8 @@ static int osc_checksum_bulk_rw(const char *obd_name,
 
        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
-                                            opc, fn, sector_size, check_sum);
+                                            opc, fn, sector_size, check_sum,
+                                            resend);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);
@@ -1363,21 +1377,126 @@ static int osc_checksum_bulk_rw(const char *obd_name,
        RETURN(rc);
 }
 
+#ifdef CONFIG_LL_ENCRYPTION
+/**
+ * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
+ * @srcpage:      The locked pagecache page containing the block(s) to encrypt
+ * @dstpage:      The page to put encryption result
+ * @len:       Total size of the block(s) to encrypt.  Must be a nonzero
+ *             multiple of the filesystem's block size.
+ * @offs:      Byte offset within @page of the first block to encrypt.  Must be
+ *             a multiple of the filesystem's block size.
+ * @gfp_flags: Memory allocation flags
+ *
+ * This overlay function is necessary to be able to provide our own bounce page.
+ */
+static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
+                                                struct page *dstpage,
+                                                unsigned int len,
+                                                unsigned int offs,
+                                                gfp_t gfp_flags)
+
+{
+       const struct inode *inode = srcpage->mapping->host;
+       const unsigned int blockbits = inode->i_blkbits;
+       const unsigned int blocksize = 1 << blockbits;
+       u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
+               (offs >> blockbits);
+       unsigned int i;
+       int err;
+
+       if (unlikely(!dstpage))
+               return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
+                                                       gfp_flags);
+
+       if (WARN_ON_ONCE(!PageLocked(srcpage)))
+               return ERR_PTR(-EINVAL);
+
+       if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
+               return ERR_PTR(-EINVAL);
+
+       /* Set PagePrivate2 for disambiguation in
+        * osc_finalize_bounce_page().
+        * It means cipher page was not allocated by llcrypt.
+        */
+       SetPagePrivate2(dstpage);
+
+       for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
+               err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
+                                           i, lblk_num, gfp_flags);
+               if (err)
+                       return ERR_PTR(err);
+       }
+       SetPagePrivate(dstpage);
+       set_page_private(dstpage, (unsigned long)srcpage);
+       return dstpage;
+}
+
+/**
+ * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
+ *
+ * This overlay function is necessary to handle bounce pages
+ * allocated by ourselves.
+ */
+static inline void osc_finalize_bounce_page(struct page **pagep)
+{
+       struct page *page = *pagep;
+
+       /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
+        * to indicate the cipher page was allocated by ourselves.
+        * So we must not free it via llcrypt.
+        */
+       if (unlikely(!page || !PagePrivate2(page)))
+               return llcrypt_finalize_bounce_page(pagep);
+
+       if (llcrypt_is_bounce_page(page)) {
+               *pagep = llcrypt_pagecache_page(page);
+               ClearPagePrivate2(page);
+               set_page_private(page, (unsigned long)NULL);
+               ClearPagePrivate(page);
+       }
+}
+#else /* !CONFIG_LL_ENCRYPTION */
+#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
+       llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
+#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
+#endif
+
 static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
 {
 #ifdef HAVE_LUSTRE_CRYPTO
-       int i;
+       struct page **pa = NULL;
+       int i, j = 0;
+
+       if (!pga[0])
+               return;
+
+#ifdef CONFIG_LL_ENCRYPTION
+       if (PageChecked(pga[0]->bp_page)) {
+               OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+               if (!pa)
+                       return;
+       }
+#endif
 
        for (i = 0; i < page_count; i++) {
-               /* Bounce pages allocated by a call to
-                * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
+               /* Bounce pages used by osc_encrypt_pagecache_blocks()
+                * called from osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
-               if (PageChecked(pga[i]->pg))
-                       llcrypt_finalize_bounce_page(&pga[i]->pg);
-               pga[i]->count -= pga[i]->bp_count_diff;
-               pga[i]->off += pga[i]->bp_off_diff;
+               if (PageChecked(pga[i]->bp_page)) {
+                       if (pa)
+                               pa[j++] = pga[i]->bp_page;
+                       osc_finalize_bounce_page(&pga[i]->bp_page);
+               }
+               pga[i]->bp_count -= pga[i]->bp_count_diff;
+               pga[i]->bp_off += pga[i]->bp_off_diff;
+       }
+
+       if (pa) {
+               sptlrpc_pool_put_pages_array(pa, j);
+               OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
        }
 #endif
 }
@@ -1398,13 +1517,22 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
-       struct inode *inode;
+       struct inode *inode = NULL;
+       bool directio = false;
+       bool gpu = 0;
+       bool enable_checksum = true;
+       struct cl_page *clpage;
 
        ENTRY;
-       inode = page2inode(pga[0]->pg);
-       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
+       if (pga[0]->bp_page) {
+               clpage = oap2cl_page(brw_page2oap(pga[0]));
+               inode = clpage->cp_inode;
+               if (clpage->cp_type == CPT_TRANSIENT)
+                       directio = true;
+       }
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
-       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */
 
        if ((cmd & OBD_BRW_WRITE) != 0) {
@@ -1419,18 +1547,38 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-       if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
+       if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
+           llcrypt_has_encryption_key(inode)) {
+               struct page **pa = NULL;
+
+#ifdef CONFIG_LL_ENCRYPTION
+               OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+               if (pa == NULL) {
+                       ptlrpc_request_free(req);
+                       RETURN(-ENOMEM);
+               }
+
+               rc = sptlrpc_pool_get_pages_array(pa, page_count);
+               if (rc) {
+                       CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
+                              rc);
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+#endif
+
                for (i = 0; i < page_count; i++) {
-                       struct brw_page *pg = pga[i];
+                       struct brw_page *brwpg = pga[i];
                        struct page *data_page = NULL;
                        bool retried = false;
                        bool lockedbymyself;
-                       u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+                       u32 nunits =
+                               (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
+                       struct address_space *map_orig = NULL;
+                       pgoff_t index_orig;
 
 retry_encrypt:
-                       if (nunits & ~LUSTRE_ENCRYPTION_MASK)
-                               nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
-                                       LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
                        /* The page can already be locked when we arrive here.
                         * This is possible when cl_page_assume/vvp_page_assume
                         * is stuck on wait_on_page_writeback with page lock
@@ -1440,13 +1588,25 @@ retry_encrypt:
                         * end in vvp_page_completion_write/cl_page_completion,
                         * which means only once the page is fully processed.
                         */
-                       lockedbymyself = trylock_page(pg->pg);
+                       lockedbymyself = trylock_page(brwpg->bp_page);
+                       if (directio) {
+                               map_orig = brwpg->bp_page->mapping;
+                               brwpg->bp_page->mapping = inode->i_mapping;
+                               index_orig = brwpg->bp_page->index;
+                               clpage = oap2cl_page(brw_page2oap(brwpg));
+                               brwpg->bp_page->index = clpage->cp_page_index;
+                       }
                        data_page =
-                               llcrypt_encrypt_pagecache_blocks(pg->pg,
-                                                                nunits, 0,
-                                                                GFP_NOFS);
+                               osc_encrypt_pagecache_blocks(brwpg->bp_page,
+                                                           pa ? pa[i] : NULL,
+                                                           nunits, 0,
+                                                           GFP_NOFS);
+                       if (directio) {
+                               brwpg->bp_page->mapping = map_orig;
+                               brwpg->bp_page->index = index_orig;
+                       }
                        if (lockedbymyself)
-                               unlock_page(pg->pg);
+                               unlock_page(brwpg->bp_page);
                        if (IS_ERR(data_page)) {
                                rc = PTR_ERR(data_page);
                                if (rc == -ENOMEM && !retried) {
@@ -1454,6 +1614,12 @@ retry_encrypt:
                                        rc = 0;
                                        goto retry_encrypt;
                                }
+                               if (pa) {
+                                       sptlrpc_pool_put_pages_array(pa + i,
+                                                               page_count - i);
+                                       OBD_FREE_PTR_ARRAY_LARGE(pa,
+                                                                page_count);
+                               }
                                ptlrpc_request_free(req);
                                RETURN(rc);
                        }
@@ -1461,10 +1627,11 @@ retry_encrypt:
                         * disambiguation in osc_release_bounce_pages().
                         */
                        SetPageChecked(data_page);
-                       pg->pg = data_page;
+                       brwpg->bp_page = data_page;
                        /* there should be no gap in the middle of page array */
                        if (i == page_count - 1) {
-                               struct osc_async_page *oap = brw_page2oap(pg);
+                               struct osc_async_page *oap =
+                                       brw_page2oap(brwpg);
 
                                oa->o_size = oap->oap_count +
                                        oap->oap_obj_off + oap->oap_page_off;
@@ -1472,28 +1639,55 @@ retry_encrypt:
                        /* len is forced to nunits, and relative offset to 0
                         * so store the old, clear text info
                         */
-                       pg->bp_count_diff = nunits - pg->count;
-                       pg->count = nunits;
-                       pg->bp_off_diff = pg->off & ~PAGE_MASK;
-                       pg->off = pg->off & PAGE_MASK;
+                       brwpg->bp_count_diff = nunits - brwpg->bp_count;
+                       brwpg->bp_count = nunits;
+                       brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
+                       brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
                }
-       } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
+
+               if (pa)
+                       OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
+       } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
+               struct osc_async_page *oap = brw_page2oap(pga[0]);
+               struct cl_page *clpage = oap2cl_page(oap);
+               struct cl_object *clobj = clpage->cp_obj;
+               struct cl_attr attr = { 0 };
+               struct lu_env *env;
+               __u16 refcheck;
+
+               env = cl_env_get(&refcheck);
+               if (IS_ERR(env)) {
+                       rc = PTR_ERR(env);
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+
+               cl_object_attr_lock(clobj);
+               rc = cl_object_attr_get(env, clobj, &attr);
+               cl_object_attr_unlock(clobj);
+               cl_env_put(env, &refcheck);
+               if (rc != 0) {
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+               if (attr.cat_size)
+                       oa->o_size = attr.cat_size;
+       } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
+                  llcrypt_has_encryption_key(inode)) {
                for (i = 0; i < page_count; i++) {
                        struct brw_page *pg = pga[i];
-                       u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+                       u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
 
-                       if (nunits & ~LUSTRE_ENCRYPTION_MASK)
-                               nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
-                                       LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
                        /* count/off are forced to cover the whole encryption
                         * unit size so that all encrypted data is stored on the
                         * OST, so adjust bp_{count,off}_diff for the size of
                         * the clear text.
                         */
-                       pg->bp_count_diff = nunits - pg->count;
-                       pg->count = nunits;
-                       pg->bp_off_diff = pg->off & ~PAGE_MASK;
-                       pg->off = pg->off & PAGE_MASK;
+                       pg->bp_count_diff = nunits - pg->bp_count;
+                       pg->bp_count = nunits;
+                       pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
+                       pg->bp_off = pg->bp_off & PAGE_MASK;
                }
        }
 
@@ -1509,18 +1703,31 @@ retry_encrypt:
                              niocount * sizeof(*niobuf));
 
        for (i = 0; i < page_count; i++) {
-               short_io_size += pga[i]->count;
-               if (!inode || !IS_ENCRYPTED(inode)) {
+               short_io_size += pga[i]->bp_count;
+               if (!inode || !IS_ENCRYPTED(inode) ||
+                   !llcrypt_has_encryption_key(inode)) {
                        pga[i]->bp_count_diff = 0;
                        pga[i]->bp_off_diff = 0;
                }
        }
 
+       if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
+               enable_checksum = false;
+               short_io_size = 0;
+               gpu = 1;
+       }
+
        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;
 
+       /* If this is an empty RPC to old server, just ignore it */
+       if (!short_io_size && !pga[0]->bp_page) {
+               ptlrpc_request_free(req);
+               RETURN(-ENODATA);
+       }
+
        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
@@ -1555,6 +1762,7 @@ retry_encrypt:
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
+       desc->bd_is_rdma = gpu;
 no_bulk:
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
@@ -1584,6 +1792,16 @@ no_bulk:
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);
 
+       if (inode && IS_ENCRYPTED(inode) &&
+           llcrypt_has_encryption_key(inode) &&
+           !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
+       }
+
        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
@@ -1601,74 +1819,79 @@ no_bulk:
 
        LASSERT(page_count > 0);
        pg_prev = pga[0];
-        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
-                struct brw_page *pg = pga[i];
-               int poff = pg->off & ~PAGE_MASK;
+       for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
+               struct brw_page *pg = pga[i];
+               int poff = pg->bp_off & ~PAGE_MASK;
 
-                LASSERT(pg->count > 0);
-                /* make sure there is no gap in the middle of page array */
+               LASSERT(pg->bp_count > 0);
+               /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
-                        (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
+                        (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
-                              poff == 0 && pg->count == PAGE_SIZE)   &&
+                              poff == 0 && pg->bp_count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
-                        "i: %d/%d pg: %p off: %llu, count: %u\n",
-                        i, page_count, pg, pg->off, pg->count);
-                LASSERTF(i == 0 || pg->off > pg_prev->off,
-                        "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
-                        " prev_pg %p [pri %lu ind %lu] off %llu\n",
-                         i, page_count,
-                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
-                         pg_prev->pg, page_private(pg_prev->pg),
-                         pg_prev->pg->index, pg_prev->off);
-                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
-                        (pg->flag & OBD_BRW_SRVLOCK));
+                        "i: %d/%d pg: %px off: %llu, count: %u\n",
+                        i, page_count, pg, pg->bp_off, pg->bp_count);
+               LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
+                        "i %d p_c %u pg %px [pri %lu ind %lu] off %llu prev_pg %px [pri %lu ind %lu] off %llu\n",
+                        i, page_count,
+                        pg->bp_page, page_private(pg->bp_page),
+                        pg->bp_page->index, pg->bp_off,
+                        pg_prev->bp_page, page_private(pg_prev->bp_page),
+                        pg_prev->bp_page->index, pg_prev->bp_off);
+               LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
+                       (pg->bp_flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
-                       unsigned char *ptr = kmap_atomic(pg->pg);
+                       unsigned char *ptr = kmap_atomic(pg->bp_page);
 
-                       LASSERT(short_io_size >= requested_nob + pg->count);
+                       LASSERT(short_io_size >= requested_nob + pg->bp_count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
-                              pg->count);
+                              pg->bp_count);
                        kunmap_atomic(ptr);
                } else if (short_io_size == 0) {
-                       desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
-                                                        pg->count);
+                       desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page, poff,
+                                                        pg->bp_count);
                }
-               requested_nob += pg->count;
+               requested_nob += pg->bp_count;
 
-                if (i > 0 && can_merge_pages(pg_prev, pg)) {
-                        niobuf--;
-                       niobuf->rnb_len += pg->count;
+               if (i > 0 && can_merge_pages(pg_prev, pg)) {
+                       niobuf--;
+                       niobuf->rnb_len += pg->bp_count;
                } else {
-                       niobuf->rnb_offset = pg->off;
-                       niobuf->rnb_len    = pg->count;
-                       niobuf->rnb_flags  = pg->flag;
-                }
-                pg_prev = pg;
-        }
+                       niobuf->rnb_offset = pg->bp_off;
+                       niobuf->rnb_len    = pg->bp_count;
+                       niobuf->rnb_flags  = pg->bp_flag;
+               }
+               pg_prev = pg;
+               if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MARK_COMPRESSED))
+                       niobuf->rnb_flags |= OBD_BRW_COMPRESSED;
+       }
 
-        LASSERTF((void *)(niobuf - niocount) ==
-                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
-                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
-                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
+       LASSERTF((void *)(niobuf - niocount) ==
+                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+                "want %px - real %px\n",
+                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+                (void *)(niobuf - niocount));
 
-        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
-        if (resend) {
-                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
-                        body->oa.o_valid |= OBD_MD_FLFLAGS;
-                        body->oa.o_flags = 0;
-                }
-                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
-        }
+       osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+       if (resend) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+       }
+
+       if (osc_should_shrink_grant(cli))
+               osc_shrink_grant_local(cli, &body->oa);
 
-        if (osc_should_shrink_grant(cli))
-                osc_shrink_grant_local(cli, &body->oa);
+       if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
+               enable_checksum = false;
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
-                if (cli->cl_checksum &&
-                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
+                if (enable_checksum) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;
@@ -1683,17 +1906,18 @@ no_bulk:
                        rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                  requested_nob, page_count,
                                                  pga, OST_WRITE,
-                                                 &body->oa.o_cksum);
+                                                 &body->oa.o_cksum, resend);
                        if (rc < 0) {
-                               CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+                               CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
-                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
-                               body->oa.o_cksum);
+                       CDEBUG(D_PAGE | (resend ? D_HA : 0),
+                              "checksum at write origin: %x (%x)\n",
+                              body->oa.o_cksum, cksum_type);
 
-                        /* save this in 'oa', too, for later checking */
-                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                       /* save this in 'oa', too, for later checking */
+                       oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                           cksum_type);
                 } else {
@@ -1706,8 +1930,7 @@ no_bulk:
                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                      sizeof(__u32) * niocount);
         } else {
-                if (cli->cl_checksum &&
-                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
+                if (enable_checksum) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
@@ -1759,15 +1982,15 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
-                (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
-                 libcfs_debug_file_path_arr :
-                 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+                (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
+                 libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
-                pga[0]->off,
-                pga[page_count-1]->off + pga[page_count-1]->count - 1,
+                pga[0]->bp_off,
+                pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
                 client_cksum, server_cksum);
+       CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
@@ -1783,8 +2006,8 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
        }
 
        for (i = 0; i < page_count; i++) {
-               len = pga[i]->count;
-               buf = kmap(pga[i]->pg);
+               len = pga[i]->bp_count;
+               buf = kmap(pga[i]->bp_page);
                while (len != 0) {
                        rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                        if (rc < 0) {
@@ -1794,20 +2017,20 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                        }
                        len -= rc;
                        buf += rc;
-                       CDEBUG(D_INFO, "%s: wrote %d bytes\n",
-                              dbgcksum_file_name, rc);
                }
-               kunmap(pga[i]->pg);
+               kunmap(pga[i]->bp_page);
        }
 
        rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
+
+       libcfs_debug_dumplog();
 }
 
 static int
-check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
+check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
                     __u32 client_cksum, __u32 server_cksum,
                     struct osc_brw_async_args *aa)
 {
@@ -1856,7 +2079,7 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
                                             aa->aa_page_count, aa->aa_ppga,
                                             OST_WRITE, fn, sector_size,
-                                            &new_cksum);
+                                            &new_cksum, true);
        else
                rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                       aa->aa_ppga, OST_WRITE, cksum_type,
@@ -1880,13 +2103,13 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
-                          obd_name, msg, libcfs_nid2str(peer->nid),
+                          obd_name, msg, libcfs_nidstr(&peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
-                          POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
-                          aa->aa_ppga[aa->aa_page_count - 1]->off +
-                               aa->aa_ppga[aa->aa_page_count-1]->count - 1,
+                          POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off,
+                          aa->aa_ppga[aa->aa_page_count - 1]->bp_off +
+                               aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1,
                           client_cksum,
                           obd_cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
@@ -1899,11 +2122,13 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd *cli = aa->aa_cli;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
-       const struct lnet_process_id *peer =
+       const struct lnet_processid *peer =
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;
-       struct inode *inode;
+       struct inode *inode = NULL;
+       unsigned int blockbits = 0, blocksize = 0;
+       struct cl_page *clpage;
 
        ENTRY;
 
@@ -1929,8 +2154,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
-                      osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
-                                      body->oa.o_flags);
+               osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
+                               body->oa.o_flags);
        }
 
        osc_update_grant(cli, body);
@@ -2000,13 +2225,13 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                nob = rc;
                while (nob > 0 && pg_count > 0) {
                        unsigned char *ptr;
-                       int count = aa->aa_ppga[i]->count > nob ?
-                                   nob : aa->aa_ppga[i]->count;
+                       int count = aa->aa_ppga[i]->bp_count > nob ?
+                                   nob : aa->aa_ppga[i]->bp_count;
 
                        CDEBUG(D_CACHE, "page %p count %d\n",
-                              aa->aa_ppga[i]->pg, count);
-                       ptr = kmap_atomic(aa->aa_ppga[i]->pg);
-                       memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+                              aa->aa_ppga[i]->bp_page, count);
+                       ptr = kmap_atomic(aa->aa_ppga[i]->bp_page);
+                       memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf,
                               count);
                        kunmap_atomic((void *) ptr);
 
@@ -2017,35 +2242,40 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                }
        }
 
-        if (rc < aa->aa_requested_nob)
-                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
+       if (rc < aa->aa_requested_nob)
+               handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
-        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
-                static int cksum_counter;
-               u32        server_cksum = body->oa.o_cksum;
-               char      *via = "";
-               char      *router = "";
+       if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+               static int cksum_counter;
+               u32 server_cksum = body->oa.o_cksum;
+               int nob = rc;
+               char *via = "";
+               char *router = "";
                enum cksum_types cksum_type;
                u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
                        body->oa.o_flags : 0;
 
                cksum_type = obd_cksum_type_unpack(o_flags);
-               rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+               rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
                                          aa->aa_page_count, aa->aa_ppga,
-                                         OST_READ, &client_cksum);
+                                         OST_READ, &client_cksum, false);
                if (rc < 0)
                        GOTO(out, rc);
 
                if (req->rq_bulk != NULL &&
-                   peer->nid != req->rq_bulk->bd_sender) {
+                   !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
                        via = " via ";
-                       router = libcfs_nid2str(req->rq_bulk->bd_sender);
+                       router = libcfs_nidstr(&req->rq_bulk->bd_sender);
                }
 
                if (server_cksum != client_cksum) {
                        struct ost_body *clbody;
+                       __u32 client_cksum2;
                        u32 page_count = aa->aa_page_count;
 
+                       osc_checksum_bulk_rw(obd_name, cksum_type, nob,
+                                            page_count, aa->aa_ppga,
+                                            OST_READ, &client_cksum2, true);
                        clbody = req_capsule_client_get(&req->rq_pill,
                                                        &RMF_OST_BODY);
                        if (cli->cl_checksum_dump)
@@ -2055,10 +2285,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
-                                          " extent [%llu-%llu], client %x, "
+                                          " extent [%llu-%llu], client %x/%x, "
                                           "server %x, cksum_type %x\n",
                                           obd_name,
-                                          libcfs_nid2str(peer->nid),
+                                          libcfs_nidstr(&peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_seq : 0ULL,
@@ -2067,11 +2297,11 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
-                                          aa->aa_ppga[0]->off,
-                                          aa->aa_ppga[page_count-1]->off +
-                                          aa->aa_ppga[page_count-1]->count - 1,
-                                          client_cksum, server_cksum,
-                                          cksum_type);
+                                          aa->aa_ppga[0]->bp_off,
+                                          aa->aa_ppga[page_count-1]->bp_off +
+                                          aa->aa_ppga[page_count-1]->bp_count - 1,
+                                          client_cksum, client_cksum2,
+                                          server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
@@ -2087,12 +2317,18 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("%s: checksum %u requested from %s but not sent\n",
                               obd_name, cksum_missed,
-                              libcfs_nid2str(peer->nid));
+                              libcfs_nidstr(&peer->nid));
        } else {
                rc = 0;
        }
 
-       inode = page2inode(aa->aa_ppga[0]->pg);
+       /* get the inode from the first cl_page */
+       clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
+       inode = clpage->cp_inode;
+       if (clpage->cp_type == CPT_TRANSIENT && inode) {
+               blockbits = inode->i_blkbits;
+               blocksize = 1 << blockbits;
+       }
        if (inode && IS_ENCRYPTED(inode)) {
                int idx;
 
@@ -2101,34 +2337,55 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                        GOTO(out, rc);
                }
                for (idx = 0; idx < aa->aa_page_count; idx++) {
-                       struct brw_page *pg = aa->aa_ppga[idx];
+                       struct brw_page *brwpg = aa->aa_ppga[idx];
                        unsigned int offs = 0;
 
                        while (offs < PAGE_SIZE) {
                                /* do not decrypt if page is all 0s */
-                               if (memchr_inv(page_address(pg->pg) + offs, 0,
-                                        LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
+                               if (memchr_inv(page_address(brwpg->bp_page) + offs,
+                                     0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
                                        /* if page is empty forward info to
                                         * upper layers (ll_io_zero_page) by
                                         * clearing PagePrivate2
                                         */
                                        if (!offs)
-                                               ClearPagePrivate2(pg->pg);
+                                               ClearPagePrivate2(brwpg->bp_page);
                                        break;
                                }
 
-                               /* The page is already locked when we arrive here,
-                                * except when we deal with a twisted page for
-                                * specific Direct IO support, in which case
-                                * PageChecked flag is set on page.
-                                */
-                               if (PageChecked(pg->pg))
-                                       lock_page(pg->pg);
-                               rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
-                                                   LUSTRE_ENCRYPTION_UNIT_SIZE,
-                                                                     offs);
-                               if (PageChecked(pg->pg))
-                                       unlock_page(pg->pg);
+                               if (blockbits) {
+                                       /* This is direct IO case. Directly call
+                                        * decrypt function that takes inode as
+                                        * input parameter. Page does not need
+                                        * to be locked.
+                                        */
+                                       u64 lblk_num;
+                                       unsigned int i;
+
+                                       clpage =
+                                              oap2cl_page(brw_page2oap(brwpg));
+                                       lblk_num =
+                                               ((u64)(clpage->cp_page_index) <<
+                                               (PAGE_SHIFT - blockbits)) +
+                                               (offs >> blockbits);
+                                       for (i = offs;
+                                            i < offs +
+                                                   LUSTRE_ENCRYPTION_UNIT_SIZE;
+                                            i += blocksize, lblk_num++) {
+                                               rc =
+                                                 llcrypt_decrypt_block_inplace(
+                                                         inode, brwpg->bp_page,
+                                                         blocksize, i,
+                                                         lblk_num);
+                                               if (rc)
+                                                       break;
+                                       }
+                               } else {
+                                       rc = llcrypt_decrypt_pagecache_blocks(
+                                               brwpg->bp_page,
+                                               LUSTRE_ENCRYPTION_UNIT_SIZE,
+                                               offs);
+                               }
                                if (rc)
                                        GOTO(out, rc);
 
@@ -2150,7 +2407,6 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
 {
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
-       struct osc_async_page *oap;
        ENTRY;
 
        /* The below message is checked in replay-ost-single.sh test_8ae*/
@@ -2164,13 +2420,10 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
         if (rc)
                 RETURN(rc);
 
-       list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-               if (oap->oap_request != NULL) {
-                       LASSERTF(request == oap->oap_request,
-                                "request %p != oap_request %p\n",
-                                request, oap->oap_request);
-               }
-       }
+
+       LASSERTF(request == aa->aa_request,
+                "request %p != aa_request %p\n",
+                request, aa->aa_request);
        /*
         * New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it...
@@ -2196,12 +2449,10 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;
 
-       list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
-                if (oap->oap_request) {
-                        ptlrpc_req_finished(oap->oap_request);
-                        oap->oap_request = ptlrpc_request_addref(new_req);
-                }
-        }
+       if (aa->aa_request) {
+               ptlrpc_req_finished(aa->aa_request);
+               new_aa->aa_request = ptlrpc_request_addref(new_req);
+       }
 
        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
@@ -2235,7 +2486,7 @@ static void sort_brw_pages(struct brw_page **array, int num)
                 for (i = stride ; i < num ; i++) {
                         tmp = array[i];
                         j = i;
-                        while (j >= stride && array[j - stride]->off > tmp->off) {
+                        while (j >= stride && array[j - stride]->bp_off > tmp->bp_off) {
                                 array[j] = array[j - stride];
                                 j -= stride;
                         }
@@ -2247,20 +2498,48 @@ static void sort_brw_pages(struct brw_page **array, int num)
 static void osc_release_ppga(struct brw_page **ppga, size_t count)
 {
        LASSERT(ppga != NULL);
-       OBD_FREE_PTR_ARRAY(ppga, count);
+       OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
+}
+
+/* this is trying to propogate async writeback errors back up to the
+ * application.  As an async write fails we record the error code for later if
+ * the app does an fsync.  As long as errors persist we force future rpcs to be
+ * sync so that the app can get a sync error and break the cycle of queueing
+ * pages for which writeback will fail.
+ */
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
+                          int rc)
+{
+       if (rc) {
+               if (!ar->ar_rc)
+                       ar->ar_rc = rc;
+
+               ar->ar_force_sync = 1;
+               ar->ar_min_xid = ptlrpc_sample_next_xid();
+               return;
+
+       }
+
+       if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
+               ar->ar_force_sync = 0;
 }
 
 static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *args, int rc)
 {
        struct osc_brw_async_args *aa = args;
-       struct osc_extent *ext;
-       struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        unsigned long transferred = 0;
+       struct cl_object *obj = NULL;
+       struct osc_async_page *last;
+       struct osc_extent *ext;
+       struct osc_extent *tmp;
+       struct lov_oinfo *loi;
 
        ENTRY;
 
+       ext = list_first_entry(&aa->aa_exts, struct osc_extent, oe_link);
+
        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
 
@@ -2279,7 +2558,7 @@ static int brw_interpret(const struct lu_env *env,
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
-                   client_should_resend(aa->aa_resends, aa->aa_cli)) {
+                          client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
@@ -2294,15 +2573,14 @@ static int brw_interpret(const struct lu_env *env,
                        rc = -EIO;
        }
 
+       last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
+       obj = osc2cl(ext->oe_obj);
+       loi = cl2osc(obj)->oo_oinfo;
+
        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
-               struct cl_object *obj;
-               struct osc_async_page *last;
-
-               last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
-               obj = osc2cl(last->oap_obj);
 
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
@@ -2323,7 +2601,6 @@ static int brw_interpret(const struct lu_env *env,
                }
 
                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
-                       struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;
 
@@ -2348,13 +2625,32 @@ static int brw_interpret(const struct lu_env *env,
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;
 
-       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) {
                osc_inc_unstable_pages(req);
+               /*
+                * If req->rq_committed is set, it means that the dirty pages
+                * have already committed into the stable storage on OSTs
+                * (i.e. Direct I/O).
+                */
+               if (!req->rq_committed)
+                       cl_object_dirty_for_sync(env, cl_object_top(obj));
+       }
 
+       if (aa->aa_request) {
+               __u64 xid = ptlrpc_req_xid(req);
+
+               ptlrpc_req_finished(req);
+               if (xid && lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+                       spin_lock(&cli->cl_loi_list_lock);
+                       osc_process_ar(&cli->cl_ar, xid, rc);
+                       osc_process_ar(&loi->loi_ar, xid, rc);
+                       spin_unlock(&cli->cl_loi_list_lock);
+               }
+       }
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1,
-                                 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
+                                 rc && req->rq_no_delay ? -EAGAIN : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
@@ -2447,7 +2743,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        if (mem_tight)
                mpflag = memalloc_noreclaim_save();
 
-       OBD_ALLOC_PTR_ARRAY(pga, page_count);
+       OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);
 
@@ -2463,28 +2759,32 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                        if (soft_sync)
                                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                        pga[i] = &oap->oap_brw_page;
-                       pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
+                       pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off;
                        i++;
 
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        if (starting_offset == OBD_OBJECT_EOF ||
-                           starting_offset > oap->oap_obj_off)
+                           starting_offset > oap->oap_obj_off) {
                                starting_offset = oap->oap_obj_off;
-                       else
+                       } else {
+                               CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n",
+                                      i, oap->oap_obj_off, oap->oap_page_off);
                                LASSERT(oap->oap_page_off == 0);
-                       if (ending_offset < oap->oap_obj_off + oap->oap_count)
+                       }
+                       if (ending_offset < oap->oap_obj_off + oap->oap_count) {
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
-                       else
+                       } else {
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_SIZE);
+                       }
                }
                if (ext->oe_ndelay)
                        ndelay = true;
        }
 
        /* first page in the list */
-       oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
+       oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
 
        crattr = &osc_env_info(env)->oti_req_attr;
        memset(crattr, 0, sizeof(*crattr));
@@ -2515,7 +2815,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
        req->rq_memalloc = mem_tight != 0;
-       oap->oap_request = ptlrpc_request_addref(req);
        if (ndelay) {
                req->rq_no_resend = req->rq_no_delay = 1;
                /* probably set a shorter timeout value.
@@ -2532,6 +2831,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        crattr->cra_oa = &body->oa;
        crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        cl_req_attr_set(env, osc2cl(obj), crattr);
+       lustre_msg_set_uid_gid(req->rq_reqmsg, &crattr->cra_uid,
+                              &crattr->cra_gid);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
 
        aa = ptlrpc_req_async_args(aa, req);
@@ -2539,9 +2840,11 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
+       aa->aa_request = ptlrpc_request_addref(req);
 
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_SHIFT;
+       ending_offset >>= PAGE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
@@ -2558,9 +2861,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        spin_unlock(&cli->cl_loi_list_lock);
 
        DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
-                 page_count, aa, cli->cl_r_in_flight,
-                 cli->cl_w_in_flight);
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
+                 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
+       if (libcfs_debug & D_IOTRACE) {
+               struct lu_fid fid;
+
+               fid.f_seq = crattr->cra_oa->o_parent_seq;
+               fid.f_oid = crattr->cra_oa->o_parent_oid;
+               fid.f_ver = crattr->cra_oa->o_parent_ver;
+               CDEBUG(D_IOTRACE,
+                      DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
+                      PFID(&fid), page_count,
+                      cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
+                      ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
+       }
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
 
        ptlrpcd_add_req(req);
        rc = 0;
@@ -2580,10 +2894,11 @@ out:
                        osc_release_ppga(pga, page_count);
                }
                /* this should happen rarely and is pretty bad, it makes the
-                * pending list not follow the dirty order */
-               while (!list_empty(ext_list)) {
-                       ext = list_entry(ext_list->next, struct osc_extent,
-                                        oe_link);
+                * pending list not follow the dirty order
+                */
+               while ((ext = list_first_entry_or_null(ext_list,
+                                                      struct osc_extent,
+                                                      oe_link)) != NULL) {
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
@@ -2591,6 +2906,34 @@ out:
        RETURN(rc);
 }
 
+/* This is to refresh our lock in face of no RPCs. */
+void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
+{
+       struct ptlrpc_request *req;
+       struct obdo oa;
+       struct brw_page bpg = { .bp_off = start, .bp_count = 1};
+       struct brw_page *pga = &bpg;
+       int rc;
+
+       memset(&oa, 0, sizeof(oa));
+       oa.o_oi = osc->oo_oinfo->loi_oi;
+       oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
+       /* For updated servers - don't do a read */
+       oa.o_flags = OBD_FL_NORPC;
+
+       rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
+                                 &req, 0);
+
+       /* If we succeeded we ship it off, if not there's no point in doing
+        * anything. Also no resends.
+        * No interpret callback, no commit callback.
+        */
+       if (!rc) {
+               req->rq_no_resend = 1;
+               ptlrpcd_add_req(req);
+       }
+}
+
 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
 {
         int set = 0;
@@ -2609,10 +2952,11 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
        return set;
 }
 
-int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
-                    void *cookie, struct lustre_handle *lockh,
-                    enum ldlm_mode mode, __u64 *flags, bool speculative,
-                    int errcode)
+static int osc_enqueue_fini(struct ptlrpc_request *req,
+                           osc_enqueue_upcall_f upcall,
+                           void *cookie, struct lustre_handle *lockh,
+                           enum ldlm_mode mode, __u64 *flags,
+                           bool speculative, int errcode)
 {
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
@@ -2647,8 +2991,9 @@ int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
        RETURN(rc);
 }
 
-int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
-                         void *args, int rc)
+static int osc_enqueue_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                void *args, int rc)
 {
        struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
@@ -2668,7 +3013,7 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
-                "lockh %#llx, req %p, aa %p - client evicted?\n",
+                "lockh %#llx, req %px, aa %px - client evicted?\n",
                 lockh->cookie, req, aa);
 
        /* Take an additional reference so that a blocking AST that
@@ -2678,10 +3023,10 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        ldlm_lock_addref(lockh, mode);
 
        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
 
        /* Let CP AST to grant the lock first. */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
        if (aa->oa_speculative) {
                LASSERT(aa->oa_lvb == NULL);
@@ -2690,13 +3035,14 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        }
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
-                                  lvb, lvb_len, lockh, rc);
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
+                                  aa->oa_flags, lvb, lvb_len, lockh, rc,
+                                  false);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
@@ -2721,7 +3067,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
-       __u64 match_flags = *flags;
+       __u64 search_flags = *flags;
+       __u64 match_flags = 0;
        enum ldlm_mode mode;
        int rc;
        ENTRY;
@@ -2750,11 +3097,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         * matching a lock; speculative lock requests do not need to,
         * because they will not actually use the lock. */
        if (!speculative)
-               match_flags |= LDLM_FL_LVB_READY;
+               search_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
-               match_flags |= LDLM_FL_BLOCK_GRANTED;
-       mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh);
+               search_flags |= LDLM_FL_BLOCK_GRANTED;
+       if (mode == LCK_GROUP)
+               match_flags = LDLM_MATCH_GROUP;
+       mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
+                                        res_id, einfo->ei_type, policy, mode,
+                                        &lockh, match_flags);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -2846,7 +3196,7 @@ int osc_match_base(const struct lu_env *env, struct obd_export *exp,
        enum ldlm_mode rc;
        ENTRY;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);
 
        /* Filesystem lock extents are extended to page boundaries so that
@@ -2980,19 +3330,17 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
-       struct obd_import     *imp = NULL;
+       struct obd_import     *imp, *imp0;
        int rc;
        ENTRY;
 
-
-        /*Since the request might also come from lprocfs, so we need
-         *sync this with client_disconnect_export Bug15684*/
-       down_read(&obd->u.cli.cl_sem);
-        if (obd->u.cli.cl_import)
-                imp = class_import_get(obd->u.cli.cl_import);
-       up_read(&obd->u.cli.cl_sem);
-        if (!imp)
-                RETURN(-ENODEV);
+       /*Since the request might also come from lprocfs, so we need
+        *sync this with client_disconnect_export Bug15684
+        */
+       with_imp_locked(obd, imp0, rc)
+               imp = class_import_get(imp0);
+       if (rc)
+               RETURN(rc);
 
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
@@ -3042,30 +3390,57 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
 {
        struct obd_device *obd = exp->exp_obd;
-       struct obd_ioctl_data *data = karg;
-       int rc = 0;
+       struct obd_ioctl_data *data;
+       int rc;
 
        ENTRY;
+       CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
+              obd->obd_name, cmd, len, karg, uarg);
+
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
-               return -EINVAL;
+               RETURN(-EINVAL);
        }
+
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
+               if (unlikely(karg == NULL)) {
+                       OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+                                     rc = -EINVAL);
+                       break;
+               }
+               data = karg;
                rc = ptlrpc_recover_import(obd->u.cli.cl_import,
                                           data->ioc_inlbuf1, 0);
                if (rc > 0)
                        rc = 0;
                break;
-       case IOC_OSC_SET_ACTIVE:
+       case OBD_IOC_GETATTR:
+               if (unlikely(karg == NULL)) {
+                       OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+                                     rc = -EINVAL);
+                       break;
+               }
+               data = karg;
+               rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
+               break;
+#ifdef IOC_OSC_SET_ACTIVE
+       case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
+#endif
+       case OBD_IOC_SET_ACTIVE:
+               if (unlikely(karg == NULL)) {
+                       OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
+                                     rc = -EINVAL);
+                       break;
+               }
+               data = karg;
                rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                              data->ioc_offset);
                break;
        default:
-               rc = -ENOTTY;
-               CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
-                      obd->obd_name, cmd, current->comm, rc);
+               rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized",
+                                  -ENOTTY);
                break;
        }
 
@@ -3077,31 +3452,31 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                       u32 keylen, void *key, u32 vallen, void *val,
                       struct ptlrpc_request_set *set)
 {
-        struct ptlrpc_request *req;
-        struct obd_device     *obd = exp->exp_obd;
-        struct obd_import     *imp = class_exp2cliimp(exp);
-        char                  *tmp;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request *req;
+       struct obd_device *obd = exp->exp_obd;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       char *tmp;
+       int rc;
+       ENTRY;
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
 
-        if (KEY_IS(KEY_CHECKSUM)) {
-                if (vallen != sizeof(int))
-                        RETURN(-EINVAL);
-                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
-                RETURN(0);
-        }
+       if (KEY_IS(KEY_CHECKSUM)) {
+               if (vallen != sizeof(int))
+                       RETURN(-EINVAL);
+               exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
+               RETURN(0);
+       }
 
-        if (KEY_IS(KEY_SPTLRPC_CONF)) {
-                sptlrpc_conf_client_adapt(obd);
-                RETURN(0);
-        }
+       if (KEY_IS(KEY_SPTLRPC_CONF)) {
+               sptlrpc_conf_client_adapt(obd);
+               RETURN(0);
+       }
 
-        if (KEY_IS(KEY_FLUSH_CTX)) {
-                sptlrpc_import_flush_my_ctx(imp);
-                RETURN(0);
-        }
+       if (KEY_IS(KEY_FLUSH_CTX)) {
+               sptlrpc_import_flush_my_ctx(imp);
+               RETURN(0);
+       }
 
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
@@ -3113,15 +3488,17 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                RETURN(0);
        }
 
-        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
-                RETURN(-EINVAL);
+       if (!set && !KEY_IS(KEY_GRANT_SHRINK))
+               RETURN(-EINVAL);
 
-        /* We pass all other commands directly to OST. Since nobody calls osc
-           methods directly and everybody is supposed to go through LOV, we
-           assume lov checked invalid values for us.
-           The only recognised values so far are evict_by_nid and mds_conn.
-           Even if something bad goes through, we'd get a -EINVAL from OST
-           anyway. */
+       /*
+        * We pass all other commands directly to OST. Since nobody calls osc
+        * methods directly and everybody is supposed to go through LOV, we
+        * assume lov checked invalid values for us.
+        * The only recognised values so far are evict_by_nid and mds_conn.
+        * Even if something bad goes through, we'd get a -EINVAL from OST
+        * anyway.
+        */
 
        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
@@ -3266,38 +3643,40 @@ int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 }
 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
 
-static int osc_import_event(struct obd_device *obd,
-                            struct obd_import *imp,
-                            enum obd_import_event event)
+static int osc_import_event(struct obd_device *obd, struct obd_import *imp,
+                           enum obd_import_event event)
 {
-        struct client_obd *cli;
-        int rc = 0;
-
-        ENTRY;
-        LASSERT(imp->imp_obd == obd);
+       struct client_obd *cli;
+       int rc = 0;
 
-        switch (event) {
-        case IMP_EVENT_DISCON: {
-                cli = &obd->u.cli;
+       ENTRY;
+       if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
+               RETURN(-ENODEV);
+
+       switch (event) {
+       case IMP_EVENT_DISCON: {
+               cli = &obd->u.cli;
+               if (!cli)
+                       RETURN(-ENODEV);
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
-                break;
-        }
-        case IMP_EVENT_INACTIVE: {
+               break;
+       }
+       case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
-                break;
-        }
-        case IMP_EVENT_INVALIDATE: {
-                struct ldlm_namespace *ns = obd->obd_namespace;
-                struct lu_env         *env;
-               __u16                  refcheck;
+               break;
+       }
+       case IMP_EVENT_INVALIDATE: {
+               struct ldlm_namespace *ns = obd->obd_namespace;
+               struct lu_env *env;
+               __u16 refcheck;
 
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
 
-                env = cl_env_get(&refcheck);
-                if (!IS_ERR(env)) {
+               env = cl_env_get(&refcheck);
+               if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);
 
                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
@@ -3306,40 +3685,43 @@ static int osc_import_event(struct obd_device *obd,
                        cl_env_put(env, &refcheck);
 
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
-                } else
-                        rc = PTR_ERR(env);
-                break;
-        }
-        case IMP_EVENT_ACTIVE: {
+               } else {
+                       rc = PTR_ERR(env);
+               }
+               break;
+       }
+       case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
-                break;
-        }
-        case IMP_EVENT_OCD: {
-                struct obd_connect_data *ocd = &imp->imp_connect_data;
+               break;
+       }
+       case IMP_EVENT_OCD: {
+               struct obd_connect_data *ocd = &imp->imp_connect_data;
 
-                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
-                        osc_init_grant(&obd->u.cli, ocd);
+               if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
+                       osc_init_grant(&obd->u.cli, ocd);
 
-                /* See bug 7198 */
-                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
-                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
+               /* See bug 7198 */
+               if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
+                       imp->imp_client->cli_request_portal =
+                               OST_REQUEST_PORTAL;
 
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
-                break;
-        }
-        case IMP_EVENT_DEACTIVATE: {
+               break;
+       }
+       case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
-                break;
-        }
-        case IMP_EVENT_ACTIVATE: {
+               break;
+       }
+       case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
-                break;
-        }
-        default:
-                CERROR("Unknown import event %d\n", event);
-                LBUG();
-        }
-        RETURN(rc);
+               break;
+       }
+       default:
+               CERROR("%s: Unknown import event %d: rc = %d\n",
+                      obd->obd_name, event, -EINVAL);
+               LBUG();
+       }
+       RETURN(rc);
 }
 
 /**
@@ -3404,6 +3786,7 @@ int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
                GOTO(out_ptlrpcd_work, rc);
 
        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+       cli->cl_root_squash = 0;
        osc_update_next_shrink(cli);
 
        RETURN(rc);
@@ -3521,7 +3904,7 @@ int osc_cleanup_common(struct obd_device *obd)
 
        /* lru cleanup */
        if (cli->cl_cache != NULL) {
-               LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
+               LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
@@ -3562,30 +3945,38 @@ static const struct obd_ops osc_obd_ops = {
         .o_quotactl             = osc_quotactl,
 };
 
-static struct shrinker *osc_cache_shrinker;
 LIST_HEAD(osc_shrink_list);
 DEFINE_SPINLOCK(osc_shrink_lock);
+bool osc_page_cache_shrink_enabled = true;
 
-#ifndef HAVE_SHRINKER_COUNT
-static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
+#ifdef HAVE_SHRINKER_COUNT
+static struct ll_shrinker_ops osc_cache_sh_ops = {
+       .count_objects  = osc_cache_shrink_count,
+       .scan_objects   = osc_cache_shrink_scan,
+       .seeks          = DEFAULT_SEEKS,
+};
+#else
+static int osc_cache_shrink(struct shrinker *shrinker,
+                           struct shrink_control *sc)
 {
-       struct shrink_control scv = {
-               .nr_to_scan = shrink_param(sc, nr_to_scan),
-               .gfp_mask   = shrink_param(sc, gfp_mask)
-       };
-       (void)osc_cache_shrink_scan(shrinker, &scv);
+       (void)osc_cache_shrink_scan(shrinker, sc);
 
-       return osc_cache_shrink_count(shrinker, &scv);
+       return osc_cache_shrink_count(shrinker, sc);
 }
+
+static struct ll_shrinker_ops osc_cache_sh_ops = {
+       .shrink   = osc_cache_shrink,
+       .seeks    = DEFAULT_SEEKS,
+};
 #endif
 
+static struct shrinker *osc_cache_shrinker;
+
 static int __init osc_init(void)
 {
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
-       DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
-                        osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;
 
        /* print an address of _any_ initialized kernel symbol from this
@@ -3593,20 +3984,22 @@ static int __init osc_init(void)
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
 
-       rc = lu_kmem_init(osc_caches);
+       rc = libcfs_setup();
        if (rc)
-               RETURN(rc);
+               return rc;
 
-       rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
-                                LUSTRE_OSC_NAME, &osc_device_type);
+       rc = lu_kmem_init(osc_caches);
        if (rc)
-               GOTO(out_kmem, rc);
+               RETURN(rc);
 
-       osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
+       osc_cache_shrinker = ll_shrinker_create(&osc_cache_sh_ops, 0,
+                                               "osc_cache");
+       if (IS_ERR(osc_cache_shrinker))
+               GOTO(out_kmem, rc = PTR_ERR(osc_cache_shrinker));
 
        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
-               GOTO(out_type, rc = -EINVAL);
+               GOTO(out_shrinker, rc = -EINVAL);
 
        reqpool_size = osc_reqpool_mem_max << 20;
 
@@ -3627,18 +4020,25 @@ static int __init osc_init(void)
                                          ptlrpc_add_rqs_to_pool);
 
        if (osc_rq_pool == NULL)
-               GOTO(out_type, rc = -ENOMEM);
+               GOTO(out_shrinker, rc = -ENOMEM);
 
        rc = osc_start_grant_work();
        if (rc != 0)
                GOTO(out_req_pool, rc);
 
+       rc = class_register_type(&osc_obd_ops, NULL, true,
+                                LUSTRE_OSC_NAME, &osc_device_type);
+       if (rc < 0)
+               GOTO(out_stop_grant, rc);
+
        RETURN(rc);
 
+out_stop_grant:
+       osc_stop_grant_work();
 out_req_pool:
        ptlrpc_free_rq_pool(osc_rq_pool);
-out_type:
-       class_unregister_type(LUSTRE_OSC_NAME);
+out_shrinker:
+       shrinker_free(osc_cache_shrinker);
 out_kmem:
        lu_kmem_fini(osc_caches);
 
@@ -3647,11 +4047,11 @@ out_kmem:
 
 static void __exit osc_exit(void)
 {
-       osc_stop_grant_work();
-       remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
-       lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
+       osc_stop_grant_work();
+       shrinker_free(osc_cache_shrinker);
+       lu_kmem_fini(osc_caches);
 }
 
 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");