LU-14550 libcfs: fix setting of debug_path
lustre/osc/osc_request.c
index 96a077b..6cd0a2c 100644
@@ -33,8 +33,9 @@
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include <linux/workqueue.h>
+#include <libcfs/libcfs.h>
+#include <linux/falloc.h>
 #include <lprocfs_status.h>
-#include <lustre_debug.h>
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
 #include <lustre_ha.h>
@@ -236,10 +238,7 @@ int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;
 
-               if (rqset == PTLRPCD_SET)
-                       ptlrpcd_add_req(req);
-               else
-                       ptlrpc_set_add_req(rqset, req);
+               ptlrpc_set_add_req(rqset, req);
        }
 
        RETURN(0);
@@ -324,10 +323,7 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
        la->la_upcall = upcall;
        la->la_cookie = cookie;
 
-       if (rqset == PTLRPCD_SET)
-               ptlrpcd_add_req(req);
-       else
-               ptlrpc_set_add_req(rqset, req);
+       ptlrpc_set_add_req(rqset, req);
 
        RETURN(0);
 }
@@ -426,6 +422,71 @@ int osc_punch_send(struct obd_export *exp, struct obdo *oa,
 }
 EXPORT_SYMBOL(osc_punch_send);
 
+/**
+ * osc_fallocate_base() - Handles fallocate requests.
+ *
+ * @exp:       Export structure
+ * @oa:                Attributes passed to OSS from client (obdo structure)
+ * @upcall:    Primary & supplementary group information
+ * @cookie:    Exclusive identifier
+ * @mode:      Operation done on given range.
+ *
+ * Only block allocation (the standard preallocate operation) is
+ * supported currently; other mode flags are not supported yet.
+ * ftruncate(2) and truncate(2) are supported via a SETATTR request.
+ *
+ * Return: Non-zero on failure and 0 on success.
+ */
+int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
+                      obd_enqueue_update_f upcall, void *cookie, int mode)
+{
+       struct ptlrpc_request *req;
+       struct osc_setattr_args *sa;
+       struct ost_body *body;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       int rc;
+       ENTRY;
+
+       /*
+        * Only mode == 0 (which is standard prealloc) is supported now.
+        * Punch is not supported yet.
+        */
+       if (mode & ~FALLOC_FL_KEEP_SIZE)
+               RETURN(-EOPNOTSUPP);
+       oa->o_falloc_mode = mode;
+
+       req = ptlrpc_request_alloc(imp, &RQF_OST_FALLOCATE);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
+       if (rc != 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+
+       lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
+
+       ptlrpc_request_set_replen(req);
+
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
+       BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
+       sa = ptlrpc_req_async_args(sa, req);
+       sa->sa_oa = oa;
+       sa->sa_upcall = upcall;
+       sa->sa_cookie = cookie;
+
+       ptlrpcd_add_req(req);
+
+       RETURN(0);
+}
+
 static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
 {
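Note on the osc_fallocate_base() addition above: the mode gate at the top of the function rejects every flag except FALLOC_FL_KEEP_SIZE before any RPC is built. A minimal userspace sketch of that gate (fallocate_mode_supported() is an illustrative helper, not a Lustre function):

/* Illustrative gate mirroring the check in osc_fallocate_base():
 * plain preallocation (mode == 0) and FALLOC_FL_KEEP_SIZE pass,
 * anything else is -EOPNOTSUPP. */
#include <linux/falloc.h>
#include <errno.h>
#include <stdio.h>

static int fallocate_mode_supported(int mode)
{
	if (mode & ~FALLOC_FL_KEEP_SIZE)
		return -EOPNOTSUPP;
	return 0;
}

int main(void)
{
	printf("mode 0:     %d\n", fallocate_mode_supported(0));
	printf("KEEP_SIZE:  %d\n", fallocate_mode_supported(FALLOC_FL_KEEP_SIZE));
	printf("PUNCH_HOLE: %d\n", fallocate_mode_supported(FALLOC_FL_PUNCH_HOLE));
	return 0;
}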
@@ -499,10 +560,7 @@ int osc_sync_base(struct osc_object *obj, struct obdo *oa,
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;
 
-       if (rqset == PTLRPCD_SET)
-               ptlrpcd_add_req(req);
-       else
-               ptlrpc_set_add_req(rqset, req);
+       ptlrpc_set_add_req(rqset, req);
 
        RETURN (0);
 }
@@ -577,7 +635,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
         struct client_obd     *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *req;
         struct ost_body       *body;
-       struct list_head       cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
         int rc, count;
         ENTRY;
 
@@ -613,17 +671,16 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
 
        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
-               struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
-               rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
-                                           osc_can_send_destroy(cli), &lwi);
+               rc = l_wait_event_abortable_exclusive(
+                       cli->cl_destroy_waitq,
+                       osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
-                       RETURN(rc);
+                       RETURN(-EINTR);
                }
        }
 
@@ -641,25 +698,22 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
-       if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
+       if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
-       if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
-                    cli->cl_dirty_max_pages)) {
-               CERROR("dirty %lu - %lu > dirty_max %lu\n",
-                      cli->cl_dirty_pages, cli->cl_dirty_transit,
+       if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
+               CERROR("dirty %lu > dirty_max %lu\n",
+                      cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
-       } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
-                           atomic_long_read(&obd_dirty_transit_pages) >
+       } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() that allows the atomic_inc() is not
                 * covered by a lock, so they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
-               CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+               CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
-                      atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
@@ -675,13 +729,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
-               if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
-                                GRANT_PARAM)) {
+               if (cli->cl_ocd_grant_param) {
                        int nrextents;
 
                        /* take extent tax into account when asking for more
                         * grant space */
-                       nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
+                       nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
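The extent-tax arithmetic above is a ceiling division: the pages being asked for are rounded up to whole extents, and each extent costs one cl_grant_extent_tax of additional grant. A small userspace check of that math (all values illustrative):

#include <stdio.h>

int main(void)
{
	unsigned long nrpages = 100, max_extent_pages = 32;
	unsigned long grant_extent_tax = 4096;
	/* ceiling division: ceil(100 / 32) = 4 extents */
	unsigned long nrextents = (nrpages + max_extent_pages - 1) /
				  max_extent_pages;
	unsigned long undirty_extra = nrextents * grant_extent_tax;

	printf("%lu extents -> %lu extra grant bytes\n",
	       nrextents, undirty_extra);
	return 0;
}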
@@ -692,11 +745,20 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
         }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
-        oa->o_dropped = cli->cl_lost_grant;
-        cli->cl_lost_grant = 0;
+       /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
+       if (cli->cl_lost_grant > INT_MAX) {
+               CDEBUG(D_CACHE,
+                     "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
+                     cli_name(cli), cli->cl_lost_grant);
+               oa->o_dropped = INT_MAX;
+       } else {
+               oa->o_dropped = cli->cl_lost_grant;
+       }
+       cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
-       CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
-               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+       CDEBUG(D_CACHE,
+              "%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
+              cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped,
+              oa->o_grant, cli->cl_lost_grant);
 }
 
 void osc_update_next_shrink(struct client_obd *cli)
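The o_dropped handoff above clamps the 64-bit cl_lost_grant counter into the 32-bit wire field and subtracts only what was actually reported, so any remainder is carried into the next RPC instead of being lost. A userspace sketch of that pattern:

#include <limits.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint64_t lost_grant = 5000000000ULL;	/* larger than INT_MAX */
	uint32_t o_dropped;

	/* report at most INT_MAX in the 32-bit field ... */
	o_dropped = lost_grant > INT_MAX ? INT_MAX : (uint32_t)lost_grant;
	/* ... and keep the unreported remainder for the next RPC */
	lost_grant -= o_dropped;

	printf("reported %u, still pending %llu\n", (unsigned)o_dropped,
	       (unsigned long long)lost_grant);
	return 0;
}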
@@ -845,9 +907,11 @@ static int osc_should_shrink_grant(struct client_obd *client)
        if (client->cl_import == NULL)
                return 0;
 
-        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
-             OBD_CONNECT_GRANT_SHRINK) == 0)
-                return 0;
+       if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
+           client->cl_import->imp_grant_shrink_disabled) {
+               osc_update_next_shrink(client);
+               return 0;
+       }
 
        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
@@ -966,12 +1030,19 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
-               cli->cl_avail_grant -= cli->cl_reserved_grant;
+               unsigned long consumed = cli->cl_reserved_grant;
+
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
-                       cli->cl_avail_grant -= cli->cl_dirty_grant;
+                       consumed += cli->cl_dirty_grant;
                else
-                       cli->cl_avail_grant -=
-                                       cli->cl_dirty_pages << PAGE_SHIFT;
+                       consumed += cli->cl_dirty_pages << PAGE_SHIFT;
+               if (cli->cl_avail_grant < consumed) {
+                       CERROR("%s: granted %ld but already consumed %ld\n",
+                              cli_name(cli), cli->cl_avail_grant, consumed);
+                       cli->cl_avail_grant = 0;
+               } else {
+                       cli->cl_avail_grant -= consumed;
+               }
        }
 
        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
@@ -989,21 +1060,21 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
-               cli->cl_max_extent_pages = size >> PAGE_SHIFT;
-               if (cli->cl_max_extent_pages == 0)
-                       cli->cl_max_extent_pages = 1;
+               cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
+               cli->cl_ocd_grant_param = 1;
        } else {
+               cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);
 
-       CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
-               "chunk bits: %d cl_max_extent_pages: %d\n",
-               cli_name(cli),
-               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
-               cli->cl_max_extent_pages);
+       CDEBUG(D_CACHE,
+              "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
+              cli_name(cli),
+              cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
+              cli->cl_max_extent_pages);
 
        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
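The cl_max_extent_pages assignment above uses the GNU "?:" (Elvis) operator: a ?: b evaluates to a when a is non-zero, else b, without evaluating a twice. A userspace illustration (requires GCC/Clang extensions):

#include <stdio.h>

int main(void)
{
	unsigned long size = 0, pages;

	pages = (size >> 12) ?: 1;	/* too small: clamp to 1 page */
	printf("%lu\n", pages);		/* prints 1 */

	size = 8192;
	pages = (size >> 12) ?: 1;	/* 8192 bytes = 2 pages */
	printf("%lu\n", pages);		/* prints 2 */
	return 0;
}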
@@ -1290,28 +1361,62 @@ static int osc_checksum_bulk_rw(const char *obd_name,
        RETURN(rc);
 }
 
+static inline void osc_release_bounce_pages(struct brw_page **pga,
+                                           u32 page_count)
+{
+#ifdef HAVE_LUSTRE_CRYPTO
+       int i;
+
+       for (i = 0; i < page_count; i++) {
+               /* Bounce pages allocated by a call to
+                * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
+                * are identified thanks to the PageChecked flag.
+                */
+               if (PageChecked(pga[i]->pg))
+                       llcrypt_finalize_bounce_page(&pga[i]->pg);
+               pga[i]->count -= pga[i]->bp_count_diff;
+               pga[i]->off += pga[i]->bp_off_diff;
+       }
+#endif
+}
+
 static int
 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
 {
-        struct ptlrpc_request   *req;
-        struct ptlrpc_bulk_desc *desc;
-        struct ost_body         *body;
-        struct obd_ioobj        *ioobj;
-        struct niobuf_remote    *niobuf;
+       struct ptlrpc_request *req;
+       struct ptlrpc_bulk_desc *desc;
+       struct ost_body *body;
+       struct obd_ioobj *ioobj;
+       struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
-        struct osc_brw_async_args *aa;
-        struct req_capsule      *pill;
-        struct brw_page *pg_prev;
+       struct osc_brw_async_args *aa;
+       struct req_capsule *pill;
+       struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
+       struct inode *inode;
+       bool directio = false;
 
-        ENTRY;
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
-                RETURN(-ENOMEM); /* Recoverable */
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
-                RETURN(-EINVAL); /* Fatal */
+       ENTRY;
+       inode = page2inode(pga[0]->pg);
+       if (inode == NULL) {
+               /* Try to get a reference to the inode from the cl_page if
+                * we are dealing with direct IO, as the pages handled here
+                * are not actual page cache pages.
+                */
+               struct osc_async_page *oap = brw_page2oap(pga[0]);
+               struct cl_page *clpage = oap2cl_page(oap);
+
+               inode = clpage->cp_inode;
+               if (inode)
+                       directio = true;
+       }
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
+               RETURN(-ENOMEM); /* Recoverable */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
+               RETURN(-EINVAL); /* Fatal */
 
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
@@ -1325,6 +1430,96 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
+       if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
+               for (i = 0; i < page_count; i++) {
+                       struct brw_page *pg = pga[i];
+                       struct page *data_page = NULL;
+                       bool retried = false;
+                       bool lockedbymyself;
+                       u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+                       struct address_space *map_orig = NULL;
+                       pgoff_t index_orig;
+
+retry_encrypt:
+                       if (nunits & ~LUSTRE_ENCRYPTION_MASK)
+                               nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
+                                       LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       /* The page can already be locked when we arrive here.
+                        * This is possible when cl_page_assume/vvp_page_assume
+                        * is stuck on wait_on_page_writeback with the page
+                        * lock held. In this case there is no risk of the lock
+                        * being released while we are doing our encryption
+                        * processing, because writeback against that page only
+                        * ends in vvp_page_completion_write/cl_page_completion,
+                        * i.e. once the page is fully processed.
+                        */
+                       lockedbymyself = trylock_page(pg->pg);
+                       if (directio) {
+                               map_orig = pg->pg->mapping;
+                               pg->pg->mapping = inode->i_mapping;
+                               index_orig = pg->pg->index;
+                               pg->pg->index = pg->off >> PAGE_SHIFT;
+                       }
+                       data_page =
+                               llcrypt_encrypt_pagecache_blocks(pg->pg,
+                                                                nunits, 0,
+                                                                GFP_NOFS);
+                       if (directio) {
+                               pg->pg->mapping = map_orig;
+                               pg->pg->index = index_orig;
+                       }
+                       if (lockedbymyself)
+                               unlock_page(pg->pg);
+                       if (IS_ERR(data_page)) {
+                               rc = PTR_ERR(data_page);
+                               if (rc == -ENOMEM && !retried) {
+                                       retried = true;
+                                       rc = 0;
+                                       goto retry_encrypt;
+                               }
+                               ptlrpc_request_free(req);
+                               RETURN(rc);
+                       }
+                       /* Set PageChecked flag on bounce page for
+                        * disambiguation in osc_release_bounce_pages().
+                        */
+                       SetPageChecked(data_page);
+                       pg->pg = data_page;
+                       /* there should be no gap in the middle of page array */
+                       if (i == page_count - 1) {
+                               struct osc_async_page *oap = brw_page2oap(pg);
+
+                               oa->o_size = oap->oap_count +
+                                       oap->oap_obj_off + oap->oap_page_off;
+                       }
+                       /* len is forced to nunits, and relative offset to 0
+                        * so store the old, clear text info
+                        */
+                       pg->bp_count_diff = nunits - pg->count;
+                       pg->count = nunits;
+                       pg->bp_off_diff = pg->off & ~PAGE_MASK;
+                       pg->off = pg->off & PAGE_MASK;
+               }
+       } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
+               for (i = 0; i < page_count; i++) {
+                       struct brw_page *pg = pga[i];
+                       u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+
+                       if (nunits & ~LUSTRE_ENCRYPTION_MASK)
+                               nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
+                                       LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       /* count/off are forced to cover the whole encryption
+                        * unit size so that all encrypted data is stored on
+                        * the OST; adjust bp_{count,off}_diff for the size
+                        * of the clear text.
+                        */
+                       pg->bp_count_diff = nunits - pg->count;
+                       pg->count = nunits;
+                       pg->bp_off_diff = pg->off & ~PAGE_MASK;
+                       pg->off = pg->off & PAGE_MASK;
+               }
+       }
+
         for (niocount = i = 1; i < page_count; i++) {
                 if (!can_merge_pages(pga[i - 1], pga[i]))
                         niocount++;
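Both the write and read encryption paths above round the transfer length up to a whole encryption unit so that full ciphertext blocks reach the OST. A standalone sketch of the rounding, assuming a 4096-byte unit (the real values come from LUSTRE_ENCRYPTION_UNIT_SIZE/LUSTRE_ENCRYPTION_MASK):

#include <stdio.h>

#define ENC_UNIT_SIZE 4096u
#define ENC_MASK      (~(ENC_UNIT_SIZE - 1))

static unsigned int round_to_enc_unit(unsigned int nunits)
{
	if (nunits & ~ENC_MASK)			/* not already unit-aligned */
		nunits = (nunits & ENC_MASK) + ENC_UNIT_SIZE;
	return nunits;
}

int main(void)
{
	printf("%u -> %u\n", 1000u, round_to_enc_unit(1000u));	/* 4096 */
	printf("%u -> %u\n", 4096u, round_to_enc_unit(4096u));	/* 4096 */
	printf("%u -> %u\n", 5000u, round_to_enc_unit(5000u));	/* 8192 */
	return 0;
}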
@@ -1336,8 +1531,13 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
-       for (i = 0; i < page_count; i++)
+       for (i = 0; i < page_count; i++) {
                short_io_size += pga[i]->count;
+               if (!inode || !IS_ENCRYPTED(inode)) {
+                       pga[i]->bp_count_diff = 0;
+                       pga[i]->bp_off_diff = 0;
+               }
+       }
 
        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
@@ -1371,8 +1571,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
-                       PTLRPC_BULK_PUT_SINK) |
-                       PTLRPC_BULK_BUF_KIOV,
+                       PTLRPC_BULK_PUT_SINK),
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);
 
@@ -1448,13 +1647,13 @@ no_bulk:
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
-                       unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+                       unsigned char *ptr = kmap_atomic(pg->pg);
 
                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
-                       ll_kunmap_atomic(ptr, KM_USER0);
+                       kunmap_atomic(ptr);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
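The short-io copy above moves from the ll_kmap_atomic() compatibility wrapper (which took a KM_USER0 slot argument) to the plain kernel kmap_atomic()/kunmap_atomic() pair. A kernel-context sketch of the pairing; copy_into_page() is an illustrative helper, not a Lustre function:

#include <linux/highmem.h>
#include <linux/string.h>

static void copy_into_page(struct page *pg, unsigned int poff,
			   const void *src, size_t len)
{
	char *ptr = kmap_atomic(pg);	/* maps the page, disables preemption */

	memcpy(ptr + poff, src, len);
	kunmap_atomic(ptr);		/* must unmap before we may sleep */
}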
@@ -1583,9 +1782,8 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
-                (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
-                 libcfs_debug_file_path_arr :
-                 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+                (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
+                 libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
@@ -1628,7 +1826,6 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
-       return;
 }
 
 static int
@@ -1728,6 +1925,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;
+       struct inode *inode;
+       unsigned int blockbits = 0, blocksize = 0;
 
        ENTRY;
 
@@ -1829,10 +2028,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
                        CDEBUG(D_CACHE, "page %p count %d\n",
                               aa->aa_ppga[i]->pg, count);
-                       ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+                       ptr = kmap_atomic(aa->aa_ppga[i]->pg);
                        memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
                               count);
-                       ll_kunmap_atomic((void *) ptr, KM_USER0);
+                       kunmap_atomic((void *) ptr);
 
                        buf += count;
                        nob -= count;
@@ -1915,6 +2114,83 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
        } else {
                rc = 0;
        }
+
+       inode = page2inode(aa->aa_ppga[0]->pg);
+       if (inode == NULL) {
+               /* Try to get a reference to the inode from the cl_page if
+                * we are dealing with direct IO, as the pages handled here
+                * are not actual page cache pages.
+                */
+               struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);
+
+               inode = oap2cl_page(oap)->cp_inode;
+               if (inode) {
+                       blockbits = inode->i_blkbits;
+                       blocksize = 1 << blockbits;
+               }
+       }
+       if (inode && IS_ENCRYPTED(inode)) {
+               int idx;
+
+               if (!llcrypt_has_encryption_key(inode)) {
+                       CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
+                       GOTO(out, rc);
+               }
+               for (idx = 0; idx < aa->aa_page_count; idx++) {
+                       struct brw_page *pg = aa->aa_ppga[idx];
+                       unsigned int offs = 0;
+
+                       while (offs < PAGE_SIZE) {
+                               /* do not decrypt if page is all 0s */
+                               if (memchr_inv(page_address(pg->pg) + offs, 0,
+                                        LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
+                                       /* if page is empty forward info to
+                                        * upper layers (ll_io_zero_page) by
+                                        * clearing PagePrivate2
+                                        */
+                                       if (!offs)
+                                               ClearPagePrivate2(pg->pg);
+                                       break;
+                               }
+
+                               if (blockbits) {
+                                       /* This is the direct IO case: call the
+                                        * decrypt function that takes the inode
+                                        * as input parameter directly. The page
+                                        * does not need to be locked.
+                                        */
+                                       u64 lblk_num =
+                                               ((u64)(pg->off >> PAGE_SHIFT) <<
+                                                    (PAGE_SHIFT - blockbits)) +
+                                                      (offs >> blockbits);
+                                       unsigned int i;
+
+                                       for (i = offs;
+                                            i < offs +
+                                                   LUSTRE_ENCRYPTION_UNIT_SIZE;
+                                            i += blocksize, lblk_num++) {
+                                               rc =
+                                                 llcrypt_decrypt_block_inplace(
+                                                         inode, pg->pg,
+                                                         blocksize, i,
+                                                         lblk_num);
+                                               if (rc)
+                                                       break;
+                                       }
+                               } else {
+                                       rc = llcrypt_decrypt_pagecache_blocks(
+                                               pg->pg,
+                                               LUSTRE_ENCRYPTION_UNIT_SIZE,
+                                               offs);
+                               }
+                               if (rc)
+                                       GOTO(out, rc);
+
+                               offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       }
+               }
+       }
+
 out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
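For the direct IO decrypt branch above, the logical block number passed to llcrypt_decrypt_block_inplace() is derived from the page offset and the block size. A userspace check of that arithmetic, assuming 4 KiB pages and 1 KiB blocks:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	unsigned int page_shift = 12, blockbits = 10; /* 4 KiB pages, 1 KiB blocks */
	uint64_t off = 8192;		/* brw_page offset: third page of the object */
	unsigned int offs = 2048;	/* offset of the unit inside the page */
	uint64_t lblk_num = ((off >> page_shift) << (page_shift - blockbits)) +
			    (offs >> blockbits);

	printf("lblk_num = %llu\n", (unsigned long long)lblk_num); /* 10 */
	return 0;
}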
@@ -1943,16 +2219,12 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
                 RETURN(rc);
 
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-                if (oap->oap_request != NULL) {
-                        LASSERTF(request == oap->oap_request,
-                                 "request %p != oap_request %p\n",
-                                 request, oap->oap_request);
-                        if (oap->oap_interrupted) {
-                                ptlrpc_req_finished(new_req);
-                                RETURN(-EINTR);
-                        }
-                }
-        }
+               if (oap->oap_request != NULL) {
+                       LASSERTF(request == oap->oap_request,
+                                "request %p != oap_request %p\n",
+                                request, oap->oap_request);
+               }
+       }
        /*
         * New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it...
@@ -2028,8 +2300,8 @@ static void sort_brw_pages(struct brw_page **array, int num)
 
 static void osc_release_ppga(struct brw_page **ppga, size_t count)
 {
-        LASSERT(ppga != NULL);
-        OBD_FREE(ppga, sizeof(*ppga) * count);
+       LASSERT(ppga != NULL);
+       OBD_FREE_PTR_ARRAY(ppga, count);
 }
 
 static int brw_interpret(const struct lu_env *env,
@@ -2045,6 +2317,10 @@ static int brw_interpret(const struct lu_env *env,
 
        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+
+       /* restore clear text pages */
+       osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
+
        /*
         * When server returns -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.
@@ -2132,7 +2408,7 @@ static int brw_interpret(const struct lu_env *env,
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1,
-                                 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
+                                 rc && req->rq_no_delay ? -EAGAIN : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
@@ -2195,17 +2471,17 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
-       int                             mpflag = 0;
+       /* '1' for consistency with code that checks !mpflag to restore */
+       int mpflag = 1;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
-       bool                            interrupted = false;
        bool                            ndelay = false;
        int                             i;
        int                             grant = 0;
        int                             rc;
        __u32                           layout_version = 0;
-       struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
+       LIST_HEAD(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));
@@ -2216,16 +2492,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                mem_tight |= ext->oe_memalloc;
                grant += ext->oe_grants;
                page_count += ext->oe_nr_pages;
-               layout_version = MAX(layout_version, ext->oe_layout_version);
+               layout_version = max(layout_version, ext->oe_layout_version);
                if (obj == NULL)
                        obj = ext->oe_obj;
        }
 
        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
-               mpflag = cfs_memory_pressure_get_and_set();
+               mpflag = memalloc_noreclaim_save();
 
-       OBD_ALLOC(pga, sizeof(*pga) * page_count);
+       OBD_ALLOC_PTR_ARRAY(pga, page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);
 
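The mem_tight path above adopts the kernel's scoped memory-pressure API: memalloc_noreclaim_save() sets PF_MEMALLOC for the current task and returns the previous state for the matching restore. A kernel-context sketch (do_mem_tight_work() is illustrative):

#include <linux/sched/mm.h>

static void do_mem_tight_work(void)
{
	unsigned int flags = memalloc_noreclaim_save();

	/* allocation-heavy section that must not recurse into reclaim */

	memalloc_noreclaim_restore(flags);
}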
@@ -2256,8 +2532,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_SIZE);
-                       if (oap->oap_interrupted)
-                               interrupted = true;
                }
                if (ext->oe_ndelay)
                        ndelay = true;
@@ -2296,8 +2570,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        req->rq_interpret_reply = brw_interpret;
        req->rq_memalloc = mem_tight != 0;
        oap->oap_request = ptlrpc_request_addref(req);
-       if (interrupted && !req->rq_intr)
-               ptlrpc_mark_interrupted(req);
        if (ndelay) {
                req->rq_no_resend = req->rq_no_delay = 1;
                /* probably set a shorter timeout value.
@@ -2349,16 +2621,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        EXIT;
 
 out:
-       if (mem_tight != 0)
-               cfs_memory_pressure_restore(mpflag);
+       if (mem_tight)
+               memalloc_noreclaim_restore(mpflag);
 
        if (rc != 0) {
                LASSERT(req == NULL);
 
                if (oa)
                        OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
-               if (pga)
-                       OBD_FREE(pga, sizeof(*pga) * page_count);
+               if (pga) {
+                       osc_release_bounce_pages(pga, page_count);
+                       osc_release_ppga(pga, page_count);
+               }
                /* this should happen rarely and is pretty bad: it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
@@ -2437,6 +2711,10 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;
+       struct ldlm_enqueue_info einfo = {
+               .ei_type = aa->oa_type,
+               .ei_mode = mode,
+       };
 
        ENTRY;
 
@@ -2466,9 +2744,8 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        }
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
-                                  aa->oa_mode, aa->oa_flags, lvb, lvb_len,
-                                  lockh, rc);
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
+                                  lvb, lvb_len, lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);
@@ -2480,8 +2757,6 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        RETURN(rc);
 }
 
-struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
-
 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
  * other synchronous requests, however keeping some locks and trying to obtain
@@ -2533,7 +2808,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
-                              einfo->ei_type, policy, mode, &lockh, 0);
+                              einfo->ei_type, policy, mode, &lockh);
        if (mode) {
                struct ldlm_lock *matched;
 
@@ -2575,23 +2850,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);
 
-       if (intent) {
-               req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                          &RQF_LDLM_ENQUEUE_LVB);
-               if (req == NULL)
-                       RETURN(-ENOMEM);
-
-               rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
-               if (rc) {
-                        ptlrpc_request_free(req);
-                        RETURN(rc);
-                }
-
-                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
-                                     sizeof *lvb);
-                ptlrpc_request_set_replen(req);
-        }
-
         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
         *flags &= ~LDLM_FL_BLOCK_GRANTED;
 
@@ -2620,28 +2878,22 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                        }
 
                        req->rq_interpret_reply = osc_enqueue_interpret;
-                       if (rqset == PTLRPCD_SET)
-                               ptlrpcd_add_req(req);
-                       else
-                               ptlrpc_set_add_req(rqset, req);
-               } else if (intent) {
-                       ptlrpc_req_finished(req);
+                       ptlrpc_set_add_req(rqset, req);
                }
                RETURN(rc);
        }
 
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, speculative, rc);
-       if (intent)
-               ptlrpc_req_finished(req);
 
        RETURN(rc);
 }
 
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
-                  enum ldlm_type type, union ldlm_policy_data *policy,
-                  enum ldlm_mode mode, __u64 *flags, void *data,
-                  struct lustre_handle *lockh, int unref)
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+                  struct ldlm_res_id *res_id, enum ldlm_type type,
+                  union ldlm_policy_data *policy, enum ldlm_mode mode,
+                  __u64 *flags, struct osc_object *obj,
+                  struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
 {
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
@@ -2656,23 +2908,26 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;
 
-        /* Next, search for already existing extent locks that will cover us */
-        /* If we're trying to read, we also search for an existing PW lock.  The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock. */
-        rc = mode;
-        if (mode == LCK_PR)
-                rc |= LCK_PW;
-        rc = ldlm_lock_match(obd->obd_namespace, lflags,
-                             res_id, type, policy, rc, lockh, unref);
+       /* Next, search for already existing extent locks that will cover us */
+       rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
+                                       res_id, type, policy, mode, lockh,
+                                       match_flags);
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
-       if (data != NULL) {
+       if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
                LASSERT(lock != NULL);
-               if (!osc_set_lock_data(lock, data)) {
+               if (osc_set_lock_data(lock, obj)) {
+                       lock_res_and_lock(lock);
+                       if (!ldlm_is_lvb_cached(lock)) {
+                               LASSERT(lock->l_ast_data == obj);
+                               osc_lock_lvb_update(env, obj, lock, NULL);
+                               ldlm_set_lvb_cached(lock);
+                       }
+                       unlock_res_and_lock(lock);
+               } else {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
@@ -2779,19 +3034,17 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
-       struct obd_import     *imp = NULL;
+       struct obd_import     *imp, *imp0;
        int rc;
        ENTRY;
 
-
-        /*Since the request might also come from lprocfs, so we need
-         *sync this with client_disconnect_export Bug15684*/
-       down_read(&obd->u.cli.cl_sem);
-        if (obd->u.cli.cl_import)
-                imp = class_import_get(obd->u.cli.cl_import);
-       up_read(&obd->u.cli.cl_sem);
-        if (!imp)
-                RETURN(-ENODEV);
+       /* Since the request might also come from lprocfs, we need to
+        * sync this with client_disconnect_export. Bug15684
+        */
+       with_imp_locked(obd, imp0, rc)
+               imp = class_import_get(imp0);
+       if (rc)
+               RETURN(rc);
 
        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
@@ -2864,7 +3117,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        default:
                rc = -ENOTTY;
                CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
-                      obd->obd_name, cmd, current_comm(), rc);
+                      obd->obd_name, cmd, current->comm, rc);
                break;
        }
 
@@ -3339,7 +3592,7 @@ int osc_cleanup_common(struct obd_device *obd)
 }
 EXPORT_SYMBOL(osc_cleanup_common);
 
-static struct obd_ops osc_obd_ops = {
+static const struct obd_ops osc_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_setup                = osc_setup,
         .o_precleanup           = osc_precleanup,
@@ -3361,25 +3614,28 @@ static struct obd_ops osc_obd_ops = {
         .o_quotactl             = osc_quotactl,
 };
 
-static struct shrinker *osc_cache_shrinker;
-struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
+LIST_HEAD(osc_shrink_list);
 DEFINE_SPINLOCK(osc_shrink_lock);
 
-#ifndef HAVE_SHRINKER_COUNT
-static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
+#ifdef HAVE_SHRINKER_COUNT
+static struct shrinker osc_cache_shrinker = {
+       .count_objects  = osc_cache_shrink_count,
+       .scan_objects   = osc_cache_shrink_scan,
+       .seeks          = DEFAULT_SEEKS,
+};
+#else
+static int osc_cache_shrink(struct shrinker *shrinker,
+                           struct shrink_control *sc)
 {
-       struct shrink_control scv = {
-               .nr_to_scan = shrink_param(sc, nr_to_scan),
-               .gfp_mask   = shrink_param(sc, gfp_mask)
-       };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
-       struct shrinker *shrinker = NULL;
-#endif
+       (void)osc_cache_shrink_scan(shrinker, sc);
 
-       (void)osc_cache_shrink_scan(shrinker, &scv);
-
-       return osc_cache_shrink_count(shrinker, &scv);
+       return osc_cache_shrink_count(shrinker, sc);
 }
+
+static struct shrinker osc_cache_shrinker = {
+       .shrink   = osc_cache_shrink,
+       .seeks    = DEFAULT_SEEKS,
+};
 #endif
 
 static int __init osc_init(void)
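The shrinker above is now a static structure: on kernels with the count_objects/scan_objects split it is used directly, while the legacy single .shrink callback is emulated from the same two helpers. A kernel-context sketch of the modern shape (the demo_* names are illustrative, not the osc ones):

#include <linux/shrinker.h>

static unsigned long demo_count(struct shrinker *s,
				struct shrink_control *sc)
{
	return 0;		/* how many objects could be reclaimed */
}

static unsigned long demo_scan(struct shrinker *s,
			       struct shrink_control *sc)
{
	return SHRINK_STOP;	/* number actually freed, or SHRINK_STOP */
}

static struct shrinker demo_shrinker = {
	.count_objects	= demo_count,
	.scan_objects	= demo_scan,
	.seeks		= DEFAULT_SEEKS,
};
/* paired register_shrinker(&demo_shrinker) /
 * unregister_shrinker(&demo_shrinker) calls, as in osc_init/osc_exit */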
@@ -3387,8 +3643,6 @@ static int __init osc_init(void)
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
-       DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
-                        osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;
 
        /* print an address of _any_ initialized kernel symbol from this
@@ -3400,16 +3654,18 @@ static int __init osc_init(void)
        if (rc)
                RETURN(rc);
 
-       rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
+       rc = class_register_type(&osc_obd_ops, NULL, true,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);
 
-       osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
+       rc = register_shrinker(&osc_cache_shrinker);
+       if (rc)
+               GOTO(out_type, rc);
 
        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
-               GOTO(out_type, rc = -EINVAL);
+               GOTO(out_shrinker, rc = -EINVAL);
 
        reqpool_size = osc_reqpool_mem_max << 20;
 
@@ -3430,7 +3686,7 @@ static int __init osc_init(void)
                                          ptlrpc_add_rqs_to_pool);
 
        if (osc_rq_pool == NULL)
-               GOTO(out_type, rc = -ENOMEM);
+               GOTO(out_shrinker, rc = -ENOMEM);
 
        rc = osc_start_grant_work();
        if (rc != 0)
@@ -3440,6 +3696,8 @@ static int __init osc_init(void)
 
 out_req_pool:
        ptlrpc_free_rq_pool(osc_rq_pool);
+out_shrinker:
+       unregister_shrinker(&osc_cache_shrinker);
 out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
 out_kmem:
@@ -3451,7 +3709,7 @@ out_kmem:
 static void __exit osc_exit(void)
 {
        osc_stop_grant_work();
-       remove_shrinker(osc_cache_shrinker);
+       unregister_shrinker(&osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);