Whamcloud - gitweb
LU-9679 modules: convert MIN/MAX to kernel style
[fs/lustre-release.git] / lustre / osc / osc_request.c
index d2086a1..cf3136e 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include <libcfs/libcfs.h>
-
-#include <lustre/lustre_user.h>
-
+#include <linux/workqueue.h>
 #include <lprocfs_status.h>
 #include <lustre_debug.h>
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
 #include <lustre_ha.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_net.h>
 #include <lustre_obdo.h>
-#include <lustre_param.h>
 #include <obd.h>
 #include <obd_cksum.h>
 #include <obd_class.h>
+#include <lustre_osc.h>
 
-#include "osc_cl_internal.h"
 #include "osc_internal.h"
 
 atomic_t osc_pool_req_count;
@@ -60,17 +56,8 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
-struct osc_brw_async_args {
-       struct obdo              *aa_oa;
-       int                       aa_requested_nob;
-       int                       aa_nio_count;
-       u32                       aa_page_count;
-       int                       aa_resends;
-       struct brw_page **aa_ppga;
-       struct client_obd        *aa_cli;
-       struct list_head          aa_oaps;
-       struct list_head          aa_exts;
-};
+static int osc_idle_timeout = 20;
+module_param(osc_idle_timeout, uint, 0644);
 
 #define osc_grant_args osc_brw_async_args
 
@@ -93,18 +80,6 @@ struct osc_ladvise_args {
        void                    *la_cookie;
 };
 
-struct osc_enqueue_args {
-       struct obd_export       *oa_exp;
-       enum ldlm_type          oa_type;
-       enum ldlm_mode          oa_mode;
-       __u64                   *oa_flags;
-       osc_enqueue_upcall_f    oa_upcall;
-       void                    *oa_cookie;
-       struct ost_lvb          *oa_lvb;
-       struct lustre_handle    oa_lockh;
-       unsigned int            oa_agl:1;
-};
-
 static void osc_release_ppga(struct brw_page **ppga, size_t count);
 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
@@ -204,24 +179,25 @@ out:
 }
 
 static int osc_setattr_interpret(const struct lu_env *env,
-                                 struct ptlrpc_request *req,
-                                 struct osc_setattr_args *sa, int rc)
+                                struct ptlrpc_request *req, void *args, int rc)
 {
-        struct ost_body *body;
-        ENTRY;
+       struct osc_setattr_args *sa = args;
+       struct ost_body *body;
 
-        if (rc != 0)
-                GOTO(out, rc);
+       ENTRY;
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EPROTO);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EPROTO);
 
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
 out:
-        rc = sa->sa_upcall(sa->sa_cookie, rc);
-        RETURN(rc);
+       rc = sa->sa_upcall(sa->sa_cookie, rc);
+       RETURN(rc);
 }
 
 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
@@ -253,11 +229,9 @@ int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
-               req->rq_interpret_reply =
-                       (ptlrpc_interpterer_t)osc_setattr_interpret;
+               req->rq_interpret_reply = osc_setattr_interpret;
 
-               CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
-               sa = ptlrpc_req_async_args(req);
+               sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;
@@ -345,8 +319,7 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
        }
 
        req->rq_interpret_reply = osc_ladvise_interpret;
-       CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
-       la = ptlrpc_req_async_args(req);
+       la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;
@@ -410,57 +383,57 @@ out:
        RETURN(rc);
 }
 
-int osc_punch_base(struct obd_export *exp, struct obdo *oa,
-                   obd_enqueue_update_f upcall, void *cookie,
-                   struct ptlrpc_request_set *rqset)
+int osc_punch_send(struct obd_export *exp, struct obdo *oa,
+                  obd_enqueue_update_f upcall, void *cookie)
 {
-        struct ptlrpc_request   *req;
-        struct osc_setattr_args *sa;
-        struct ost_body         *body;
-        int                      rc;
-        ENTRY;
+       struct ptlrpc_request *req;
+       struct osc_setattr_args *sa;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       struct ost_body *body;
+       int rc;
 
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       ENTRY;
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
-        ptlrpc_at_set_req_timeout(req);
+       req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
+       if (rc < 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       osc_set_io_portal(req);
+
+       ptlrpc_at_set_req_timeout(req);
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-       LASSERT(body);
-       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
 
        ptlrpc_request_set_replen(req);
 
-       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
-       CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
-       sa = ptlrpc_req_async_args(req);
+       req->rq_interpret_reply = osc_setattr_interpret;
+       sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
-       if (rqset == PTLRPCD_SET)
-               ptlrpcd_add_req(req);
-       else
-               ptlrpc_set_add_req(rqset, req);
+
+       ptlrpcd_add_req(req);
 
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_punch_send);
 
 static int osc_sync_interpret(const struct lu_env *env,
-                              struct ptlrpc_request *req,
-                              void *arg, int rc)
+                             struct ptlrpc_request *req, void *args, int rc)
 {
-       struct osc_fsync_args   *fa = arg;
-       struct ost_body         *body;
-       struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
-       unsigned long           valid = 0;
-       struct cl_object        *obj;
+       struct osc_fsync_args *fa = args;
+       struct ost_body *body;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       unsigned long valid = 0;
+       struct cl_object *obj;
        ENTRY;
 
        if (rc != 0)
@@ -520,8 +493,7 @@ int osc_sync_base(struct osc_object *obj, struct obdo *oa,
        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;
 
-       CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
-       fa = ptlrpc_req_async_args(req);
+       fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
@@ -571,13 +543,13 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
 }
 
 static int osc_destroy_interpret(const struct lu_env *env,
-                                struct ptlrpc_request *req, void *data,
-                                int rc)
+                                struct ptlrpc_request *req, void *args, int rc)
 {
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
 
        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
+
        return 0;
 }
 
@@ -605,7 +577,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
         struct client_obd     *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *req;
         struct ost_body       *body;
-       struct list_head       cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
         int rc, count;
         ENTRY;
 
@@ -650,7 +622,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
-                       ptlrpc_request_free(req);
+                       ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }
@@ -673,21 +645,18 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
-       if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
-                    cli->cl_dirty_max_pages)) {
-               CERROR("dirty %lu - %lu > dirty_max %lu\n",
-                      cli->cl_dirty_pages, cli->cl_dirty_transit,
+       if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
+               CERROR("dirty %lu > dirty_max %lu\n",
+                      cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
-       } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
-                           atomic_long_read(&obd_dirty_transit_pages) >
+       } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
-               CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+               CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
-                      atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
@@ -697,11 +666,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
+               unsigned long undirty;
 
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
-               oa->o_undirty = nrpages << PAGE_SHIFT;
+               undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;
@@ -710,8 +680,13 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
-                       oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
+                       undirty += nrextents * cli->cl_grant_extent_tax;
                }
+               /* Do not ask for more than OBD_MAX_GRANT - a margin for server
+                * to add extent tax, etc.
+                */
+               oa->o_undirty = min(undirty, OBD_MAX_GRANT &
+                                   ~(PTLRPC_MAX_BRW_SIZE * 4UL));
         }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
         oa->o_dropped = cli->cl_lost_grant;
@@ -723,10 +698,11 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 void osc_update_next_shrink(struct client_obd *cli)
 {
-        cli->cl_next_shrink_grant =
-                cfs_time_shift(cli->cl_grant_shrink_interval);
-        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
-               cli->cl_next_shrink_grant);
+       cli->cl_next_shrink_grant = ktime_get_seconds() +
+                                   cli->cl_grant_shrink_interval;
+
+       CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
+              cli->cl_next_shrink_grant);
 }
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
@@ -744,30 +720,37 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
         }
 }
 
-static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
-                             u32 keylen, void *key,
-                             u32 vallen, void *val,
-                             struct ptlrpc_request_set *set);
+/**
+ * grant thread data for shrinking space.
+ */
+struct grant_thread_data {
+       struct list_head        gtd_clients;
+       struct mutex            gtd_mutex;
+       unsigned long           gtd_stopped:1;
+};
+static struct grant_thread_data client_gtd;
 
 static int osc_shrink_grant_interpret(const struct lu_env *env,
-                                      struct ptlrpc_request *req,
-                                      void *aa, int rc)
+                                     struct ptlrpc_request *req,
+                                     void *args, int rc)
 {
-        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
-        struct ost_body *body;
+       struct osc_grant_args *aa = args;
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       struct ost_body *body;
 
-        if (rc != 0) {
-                __osc_update_grant(cli, oa->o_grant);
-                GOTO(out, rc);
-        }
+       if (rc != 0) {
+               __osc_update_grant(cli, aa->aa_oa->o_grant);
+               GOTO(out, rc);
+       }
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
-        osc_update_grant(cli, body);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       osc_update_grant(cli, body);
 out:
-        OBDO_FREE(oa);
-        return rc;
+       OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+       aa->aa_oa = NULL;
+
+       return rc;
 }
 
 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
@@ -827,6 +810,11 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
        osc_announce_cached(cli, &body->oa, 0);
 
        spin_lock(&cli->cl_loi_list_lock);
+       if (target_bytes >= cli->cl_avail_grant) {
+               /* available grant has changed since target calculation */
+               spin_unlock(&cli->cl_loi_list_lock);
+               GOTO(out_free, rc = 0);
+       }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
@@ -842,20 +830,25 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
                                 sizeof(*body), body, NULL);
         if (rc != 0)
                 __osc_update_grant(cli, body->oa.o_grant);
+out_free:
         OBD_FREE_PTR(body);
         RETURN(rc);
 }
 
 static int osc_should_shrink_grant(struct client_obd *client)
 {
-        cfs_time_t time = cfs_time_current();
-        cfs_time_t next_shrink = client->cl_next_shrink_grant;
+       time64_t next_shrink = client->cl_next_shrink_grant;
 
-        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
-             OBD_CONNECT_GRANT_SHRINK) == 0)
-                return 0;
+       if (client->cl_import == NULL)
+               return 0;
+
+       if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
+           client->cl_import->imp_grant_shrink_disabled) {
+               osc_update_next_shrink(client);
+               return 0;
+       }
 
-       if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+       if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
@@ -870,41 +863,95 @@ static int osc_should_shrink_grant(struct client_obd *client)
         return 0;
 }
 
-static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+#define GRANT_SHRINK_RPC_BATCH 100
+
+static struct delayed_work work;
+
+static void osc_grant_work_handler(struct work_struct *data)
 {
-       struct client_obd *client;
+       struct client_obd *cli;
+       int rpc_sent;
+       bool init_next_shrink = true;
+       time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
+
+       rpc_sent = 0;
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_for_each_entry(cli, &client_gtd.gtd_clients,
+                           cl_grant_chain) {
+               if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
+                   osc_should_shrink_grant(cli)) {
+                       osc_shrink_grant(cli);
+                       rpc_sent++;
+               }
 
-       list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
-               if (osc_should_shrink_grant(client))
-                       osc_shrink_grant(client);
+               if (!init_next_shrink) {
+                       if (cli->cl_next_shrink_grant < next_shrink &&
+                           cli->cl_next_shrink_grant > ktime_get_seconds())
+                               next_shrink = cli->cl_next_shrink_grant;
+               } else {
+                       init_next_shrink = false;
+                       next_shrink = cli->cl_next_shrink_grant;
+               }
+       }
+       mutex_unlock(&client_gtd.gtd_mutex);
+
+       if (client_gtd.gtd_stopped == 1)
+               return;
+
+       if (next_shrink > ktime_get_seconds()) {
+               time64_t delay = next_shrink - ktime_get_seconds();
+
+               schedule_delayed_work(&work, cfs_time_seconds(delay));
+       } else {
+               schedule_work(&work.work);
        }
-       return 0;
 }
 
-static int osc_add_shrink_grant(struct client_obd *client)
+void osc_schedule_grant_work(void)
 {
-       int rc;
+       cancel_delayed_work_sync(&work);
+       schedule_work(&work.work);
+}
+
+/**
+ * Start grant thread for returing grant to server for idle clients.
+ */
+static int osc_start_grant_work(void)
+{
+       client_gtd.gtd_stopped = 0;
+       mutex_init(&client_gtd.gtd_mutex);
+       INIT_LIST_HEAD(&client_gtd.gtd_clients);
+
+       INIT_DELAYED_WORK(&work, osc_grant_work_handler);
+       schedule_work(&work.work);
 
-       rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
-                                      TIMEOUT_GRANT,
-                                      osc_grant_shrink_grant_cb, NULL,
-                                      &client->cl_grant_shrink_list);
-       if (rc) {
-               CERROR("add grant client %s error %d\n", cli_name(client), rc);
-               return rc;
-       }
-       CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
-       osc_update_next_shrink(client);
        return 0;
 }
 
-static int osc_del_shrink_grant(struct client_obd *client)
+static void osc_stop_grant_work(void)
 {
-        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
-                                         TIMEOUT_GRANT);
+       client_gtd.gtd_stopped = 1;
+       cancel_delayed_work_sync(&work);
 }
 
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+static void osc_add_grant_list(struct client_obd *client)
+{
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
+       mutex_unlock(&client_gtd.gtd_mutex);
+}
+
+static void osc_del_grant_list(struct client_obd *client)
+{
+       if (list_empty(&client->cl_grant_chain))
+               return;
+
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_del_init(&client->cl_grant_chain);
+       mutex_unlock(&client_gtd.gtd_mutex);
+}
+
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 {
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
@@ -957,10 +1004,10 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);
 
-       if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
-           list_empty(&cli->cl_grant_shrink_list))
-               osc_add_shrink_grant(cli);
+       if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
+               osc_add_grant_list(cli);
 }
+EXPORT_SYMBOL(osc_init_grant);
 
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
@@ -1018,8 +1065,11 @@ static int check_write_rcs(struct ptlrpc_request *req,
 
         /* return error if any niobuf was in error */
         for (i = 0; i < niocount; i++) {
-                if ((int)remote_rcs[i] < 0)
-                        return(remote_rcs[i]);
+               if ((int)remote_rcs[i] < 0) {
+                       CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
+                              i, remote_rcs[i], req);
+                       return remote_rcs[i];
+               }
 
                 if (remote_rcs[i] != 0) {
                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
@@ -1027,8 +1077,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
                         return(-EPROTO);
                 }
         }
-
-        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+       if (req->rq_bulk != NULL &&
+           req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                        req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
@@ -1048,7 +1098,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
                  * safe to combine */
                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
-                              "report this at https://jira.hpdd.intel.com/\n",
+                              "report this at https://jira.whamcloud.com/\n",
                               p1->flag, p2->flag);
                 }
                 return 0;
@@ -1057,23 +1107,128 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
         return (p1->off + p1->count == p2->off);
 }
 
-static u32 osc_checksum_bulk(int nob, size_t pg_count,
+#if IS_ENABLED(CONFIG_CRC_T10DIF)
+static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
+                                  size_t pg_count, struct brw_page **pga,
+                                  int opc, obd_dif_csum_fn *fn,
+                                  int sector_size,
+                                  u32 *check_sum)
+{
+       struct ahash_request *req;
+       /* Used Adler as the default checksum type on top of DIF tags */
+       unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
+       struct page *__page;
+       unsigned char *buffer;
+       __u16 *guard_start;
+       unsigned int bufsize;
+       int guard_number;
+       int used_number = 0;
+       int used;
+       u32 cksum;
+       int rc = 0;
+       int i = 0;
+
+       LASSERT(pg_count > 0);
+
+       __page = alloc_page(GFP_KERNEL);
+       if (__page == NULL)
+               return -ENOMEM;
+
+       req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(req)) {
+               rc = PTR_ERR(req);
+               CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
+                      obd_name, cfs_crypto_hash_name(cfs_alg), rc);
+               GOTO(out, rc);
+       }
+
+       buffer = kmap(__page);
+       guard_start = (__u16 *)buffer;
+       guard_number = PAGE_SIZE / sizeof(*guard_start);
+       while (nob > 0 && pg_count > 0) {
+               unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+               /* corrupt the data before we compute the checksum, to
+                * simulate an OST->client data error */
+               if (unlikely(i == 0 && opc == OST_READ &&
+                            OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+                       unsigned char *ptr = kmap(pga[i]->pg);
+                       int off = pga[i]->off & ~PAGE_MASK;
+
+                       memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
+                       kunmap(pga[i]->pg);
+               }
+
+               /*
+                * The left guard number should be able to hold checksums of a
+                * whole page
+                */
+               rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
+                                                 pga[i]->off & ~PAGE_MASK,
+                                                 count,
+                                                 guard_start + used_number,
+                                                 guard_number - used_number,
+                                                 &used, sector_size,
+                                                 fn);
+               if (rc)
+                       break;
+
+               used_number += used;
+               if (used_number == guard_number) {
+                       cfs_crypto_hash_update_page(req, __page, 0,
+                               used_number * sizeof(*guard_start));
+                       used_number = 0;
+               }
+
+               nob -= pga[i]->count;
+               pg_count--;
+               i++;
+       }
+       kunmap(__page);
+       if (rc)
+               GOTO(out, rc);
+
+       if (used_number != 0)
+               cfs_crypto_hash_update_page(req, __page, 0,
+                       used_number * sizeof(*guard_start));
+
+       bufsize = sizeof(cksum);
+       cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
+
+       /* For sending we only compute the wrong checksum instead
+        * of corrupting the data so it is still correct on a redo */
+       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+               cksum++;
+
+       *check_sum = cksum;
+out:
+       __free_page(__page);
+       return rc;
+}
+#else /* !CONFIG_CRC_T10DIF */
+#define obd_dif_ip_fn NULL
+#define obd_dif_crc_fn NULL
+#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
+       -EOPNOTSUPP
+#endif /* CONFIG_CRC_T10DIF */
+
+static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
-                            cksum_type_t cksum_type)
+                            enum cksum_types cksum_type,
+                            u32 *cksum)
 {
-       u32                             cksum;
        int                             i = 0;
-       struct cfs_crypto_hash_desc     *hdesc;
+       struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
 
        LASSERT(pg_count > 0);
 
-       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
-       if (IS_ERR(hdesc)) {
+       req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
-               return PTR_ERR(hdesc);
+               return PTR_ERR(req);
        }
 
        while (nob > 0 && pg_count > 0) {
@@ -1089,7 +1244,7 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count,
                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
-               cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+               cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
@@ -1100,15 +1255,38 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count,
                i++;
        }
 
-       bufsize = sizeof(cksum);
-       cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+       bufsize = sizeof(*cksum);
+       cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
 
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
-               cksum++;
+               (*cksum)++;
 
-       return cksum;
+       return 0;
+}
+
+static int osc_checksum_bulk_rw(const char *obd_name,
+                               enum cksum_types cksum_type,
+                               int nob, size_t pg_count,
+                               struct brw_page **pga, int opc,
+                               u32 *check_sum)
+{
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       int rc;
+
+       ENTRY;
+       obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
+
+       if (fn)
+               rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
+                                            opc, fn, sector_size, check_sum);
+       else
+               rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
+                                      check_sum);
+
+       RETURN(rc);
 }
 
 static int
@@ -1121,10 +1299,12 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int niocount, i, requested_nob, opc, rc;
+       int niocount, i, requested_nob, opc, rc, short_io_size = 0;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
+       void *short_io_buf;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1155,17 +1335,38 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
+       for (i = 0; i < page_count; i++)
+               short_io_size += pga[i]->count;
+
+       /* Check if read/write is small enough to be a short io. */
+       if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
+           !imp_connect_shortio(cli->cl_import))
+               short_io_size = 0;
+
+       req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+                            opc == OST_READ ? 0 : short_io_size);
+       if (opc == OST_READ)
+               req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+                                    short_io_size);
+
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
-        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
-        ptlrpc_at_set_req_timeout(req);
+       osc_set_io_portal(req);
+
+       ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
+       if (short_io_size != 0) {
+               desc = NULL;
+               short_io_buf = NULL;
+               goto no_bulk;
+       }
+
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
@@ -1177,7 +1378,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1185,6 +1386,15 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
+       /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
+        * and from_kgid(), because they are asynchronous. Fortunately, variable
+        * oa contains valid o_uid and o_gid in these two operations.
+        * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
+        * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
+        * other process logic */
+       body->oa.o_uid = oa->o_uid;
+       body->oa.o_gid = oa->o_gid;
+
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
@@ -1192,7 +1402,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
-       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       if (desc != NULL)
+               ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       else /* short io */
+               ioobj_max_brw_set(ioobj, 0);
+
+       if (short_io_size != 0) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_SHORT_IO;
+               CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+                      short_io_size);
+               if (opc == OST_WRITE) {
+                       short_io_buf = req_capsule_client_get(pill,
+                                                             &RMF_SHORT_IO);
+                       LASSERT(short_io_buf != NULL);
+               }
+       }
+
        LASSERT(page_count > 0);
        pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1217,9 +1446,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                          pg_prev->pg->index, pg_prev->off);
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
-
-               desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
-                requested_nob += pg->count;
+               if (short_io_size != 0 && opc == OST_WRITE) {
+                       unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+
+                       LASSERT(short_io_size >= requested_nob + pg->count);
+                       memcpy(short_io_buf + requested_nob,
+                              ptr + poff,
+                              pg->count);
+                       ll_kunmap_atomic(ptr, KM_USER0);
+               } else if (short_io_size == 0) {
+                       desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+                                                        pg->count);
+               }
+               requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
@@ -1255,23 +1494,31 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
-                        cksum_type_t cksum_type = cli->cl_cksum_type;
+                       enum cksum_types cksum_type = cli->cl_cksum_type;
 
-                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
-                                oa->o_flags &= OBD_FL_LOCAL_MASK;
+                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
-                        }
-                        body->oa.o_flags |= cksum_type_pack(cksum_type);
+
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                                                               cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
-                                                             page_count, pga,
-                                                             OST_WRITE,
-                                                             cksum_type);
+
+                       rc = osc_checksum_bulk_rw(obd_name, cksum_type,
+                                                 requested_nob, page_count,
+                                                 pga, OST_WRITE,
+                                                 &body->oa.o_cksum);
+                       if (rc < 0) {
+                               CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+                                      rc);
+                               GOTO(out, rc);
+                       }
                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                                body->oa.o_cksum);
+
                         /* save this in 'oa', too, for later checking */
                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                        oa->o_flags |= cksum_type_pack(cksum_type);
+                       oa->o_flags |= obd_cksum_type_pack(obd_name,
+                                                          cksum_type);
                 } else {
                         /* clear out the checksum flag, in case this is a
                          * resend but cl_checksum is no longer set. b=11238 */
@@ -1286,21 +1533,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
-                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                               cli->cl_cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                }
-        }
-        ptlrpc_request_set_replen(req);
+               }
+
+               /* Client cksum has been already copied to wire obdo in previous
+                * lustre_set_wire_obdo(), and in the case a bulk-read is being
+                * resent due to cksum error, this will allow Server to
+                * check+dump pages on its side */
+       }
+       ptlrpc_request_set_replen(req);
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oa = oa;
-        aa->aa_requested_nob = requested_nob;
-        aa->aa_nio_count = niocount;
-        aa->aa_page_count = page_count;
-        aa->aa_resends = 0;
-        aa->aa_ppga = pga;
-        aa->aa_cli = cli;
+       aa = ptlrpc_req_async_args(aa, req);
+       aa->aa_oa = oa;
+       aa->aa_requested_nob = requested_nob;
+       aa->aa_nio_count = niocount;
+       aa->aa_page_count = page_count;
+       aa->aa_resends = 0;
+       aa->aa_ppga = pga;
+       aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
 
        *reqp = req;
@@ -1315,27 +1567,127 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         RETURN(rc);
 }
 
+char dbgcksum_file_name[PATH_MAX];
+
+static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
+                               struct brw_page **pga, __u32 server_cksum,
+                               __u32 client_cksum)
+{
+       struct file *filp;
+       int rc, i;
+       unsigned int len;
+       char *buf;
+
+       /* will only keep dump of pages on first error for the same range in
+        * file/fid, not during the resends/retries. */
+       snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
+                "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
+                (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
+                 libcfs_debug_file_path_arr :
+                 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
+                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
+                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
+                pga[0]->off,
+                pga[page_count-1]->off + pga[page_count-1]->count - 1,
+                client_cksum, server_cksum);
+       filp = filp_open(dbgcksum_file_name,
+                        O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
+       if (IS_ERR(filp)) {
+               rc = PTR_ERR(filp);
+               if (rc == -EEXIST)
+                       CDEBUG(D_INFO, "%s: can't open to dump pages with "
+                              "checksum error: rc = %d\n", dbgcksum_file_name,
+                              rc);
+               else
+                       CERROR("%s: can't open to dump pages with checksum "
+                              "error: rc = %d\n", dbgcksum_file_name, rc);
+               return;
+       }
+
+       for (i = 0; i < page_count; i++) {
+               len = pga[i]->count;
+               buf = kmap(pga[i]->pg);
+               while (len != 0) {
+                       rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
+                       if (rc < 0) {
+                               CERROR("%s: wanted to write %u but got %d "
+                                      "error\n", dbgcksum_file_name, len, rc);
+                               break;
+                       }
+                       len -= rc;
+                       buf += rc;
+                       CDEBUG(D_INFO, "%s: wrote %d bytes\n",
+                              dbgcksum_file_name, rc);
+               }
+               kunmap(pga[i]->pg);
+       }
+
+       rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
+       if (rc)
+               CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
+       filp_close(filp, NULL);
+}
+
 static int
 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
-                    __u32 client_cksum, __u32 server_cksum, int nob,
-                    size_t page_count, struct brw_page **pga,
-                    cksum_type_t client_cksum_type)
-{
-        __u32 new_cksum;
-        char *msg;
-        cksum_type_t cksum_type;
+                    __u32 client_cksum, __u32 server_cksum,
+                    struct osc_brw_async_args *aa)
+{
+       const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
+       enum cksum_types cksum_type;
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       __u32 new_cksum;
+       char *msg;
+       int rc;
 
         if (server_cksum == client_cksum) {
                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                 return 0;
         }
 
-        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
-                                       oa->o_flags : 0);
-        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
-                                      cksum_type);
+       if (aa->aa_cli->cl_checksum_dump)
+               dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
+                                   server_cksum, client_cksum);
+
+       cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+                                          oa->o_flags : 0);
+
+       switch (cksum_type) {
+       case OBD_CKSUM_T10IP512:
+               fn = obd_dif_ip_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10IP4K:
+               fn = obd_dif_ip_fn;
+               sector_size = 4096;
+               break;
+       case OBD_CKSUM_T10CRC512:
+               fn = obd_dif_crc_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10CRC4K:
+               fn = obd_dif_crc_fn;
+               sector_size = 4096;
+               break;
+       default:
+               break;
+       }
+
+       if (fn)
+               rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
+                                            aa->aa_page_count, aa->aa_ppga,
+                                            OST_WRITE, fn, sector_size,
+                                            &new_cksum);
+       else
+               rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
+                                      aa->aa_ppga, OST_WRITE, cksum_type,
+                                      &new_cksum);
 
-        if (cksum_type != client_cksum_type)
+       if (rc < 0)
+               msg = "failed to calculate the client write checksum";
+       else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
                 msg = "the server did not use the checksum type specified in "
                       "the original request - likely a protocol problem";
         else if (new_cksum == server_cksum)
@@ -1347,104 +1699,145 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                 msg = "changed in transit AND doesn't match the original - "
                       "likely false positive due to mmap IO (bug 11742)";
 
-       LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
-                          " object "DOSTID" extent [%llu-%llu]\n",
-                          msg, libcfs_nid2str(peer->nid),
+       LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
+                          DFID " object "DOSTID" extent [%llu-%llu], original "
+                          "client csum %x (type %x), server csum %x (type %x),"
+                          " client csum now %x\n",
+                          obd_name, msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
-                          POSTID(&oa->o_oi), pga[0]->off,
-                          pga[page_count-1]->off + pga[page_count-1]->count - 1);
-       CERROR("original client csum %x (type %x), server csum %x (type %x), "
-              "client csum now %x\n", client_cksum, client_cksum_type,
-              server_cksum, cksum_type, new_cksum);
+                          POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
+                          aa->aa_ppga[aa->aa_page_count - 1]->off +
+                               aa->aa_ppga[aa->aa_page_count-1]->count - 1,
+                          client_cksum,
+                          obd_cksum_type_unpack(aa->aa_oa->o_flags),
+                          server_cksum, cksum_type, new_cksum);
        return 1;
 }
 
 /* Note rc enters this function as number of bytes transferred */
 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 {
-        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct client_obd *cli = aa->aa_cli;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
-                        &req->rq_import->imp_connection->c_peer;
-        struct client_obd *cli = aa->aa_cli;
-        struct ost_body *body;
+               &req->rq_import->imp_connection->c_peer;
+       struct ost_body *body;
        u32 client_cksum = 0;
-        ENTRY;
 
-        if (rc < 0 && rc != -EDQUOT) {
-                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
-                RETURN(rc);
-        }
+       ENTRY;
 
-        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL) {
-                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
-                RETURN(-EPROTO);
-        }
+       if (rc < 0 && rc != -EDQUOT) {
+               DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
+               RETURN(rc);
+       }
 
-        /* set/clear over quota flag for a uid/gid/projid */
-        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
-            body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
-               unsigned qid[LL_MAXQUOTAS] =
-                                       { body->oa.o_uid, body->oa.o_gid,
-                                        body->oa.o_projid };
+       LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               DEBUG_REQ(D_INFO, req, "cannot unpack body");
+               RETURN(-EPROTO);
+       }
 
-               CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
-                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
+       /* set/clear over quota flag for a uid/gid/projid */
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
+           body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
+               unsigned qid[LL_MAXQUOTAS] = {
+                                        body->oa.o_uid, body->oa.o_gid,
+                                        body->oa.o_projid };
+               CDEBUG(D_QUOTA,
+                      "setdq for [%u %u %u] with valid %#llx, flags %x\n",
+                      body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
-                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
-        }
+                      osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
+                                      body->oa.o_flags);
+       }
 
-        osc_update_grant(cli, body);
+       osc_update_grant(cli, body);
 
-        if (rc < 0)
-                RETURN(rc);
+       if (rc < 0)
+               RETURN(rc);
 
-        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
-                client_cksum = aa->aa_oa->o_cksum; /* save for later */
+       if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
+               client_cksum = aa->aa_oa->o_cksum; /* save for later */
 
-        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
-                if (rc > 0) {
-                        CERROR("Unexpected +ve rc %d\n", rc);
-                        RETURN(-EPROTO);
-                }
-                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+               if (rc > 0) {
+                       CERROR("%s: unexpected positive size %d\n",
+                              obd_name, rc);
+                       RETURN(-EPROTO);
+               }
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
-                        RETURN(-EAGAIN);
+               if (req->rq_bulk != NULL &&
+                   sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                       RETURN(-EAGAIN);
 
-                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
-                    check_write_checksum(&body->oa, peer, client_cksum,
-                                         body->oa.o_cksum, aa->aa_requested_nob,
-                                         aa->aa_page_count, aa->aa_ppga,
-                                         cksum_type_unpack(aa->aa_oa->o_flags)))
-                        RETURN(-EAGAIN);
+               if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
+                   check_write_checksum(&body->oa, peer, client_cksum,
+                                        body->oa.o_cksum, aa))
+                       RETURN(-EAGAIN);
 
-                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
-                                     aa->aa_page_count, aa->aa_ppga);
-                GOTO(out, rc);
-        }
+               rc = check_write_rcs(req, aa->aa_requested_nob,
+                                    aa->aa_nio_count, aa->aa_page_count,
+                                    aa->aa_ppga);
+               GOTO(out, rc);
+       }
 
-        /* The rest of this function executes only for OST_READs */
+       /* The rest of this function executes only for OST_READs */
 
-        /* if unwrap_bulk failed, return -EAGAIN to retry */
-        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
-        if (rc < 0)
-                GOTO(out, rc = -EAGAIN);
+       if (req->rq_bulk == NULL) {
+               rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+                                         RCL_SERVER);
+               LASSERT(rc == req->rq_status);
+       } else {
+               /* if unwrap_bulk failed, return -EAGAIN to retry */
+               rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       }
+       if (rc < 0)
+               GOTO(out, rc = -EAGAIN);
 
-        if (rc > aa->aa_requested_nob) {
-                CERROR("Unexpected rc %d (%d requested)\n", rc,
-                       aa->aa_requested_nob);
-                RETURN(-EPROTO);
-        }
+       if (rc > aa->aa_requested_nob) {
+               CERROR("%s: unexpected size %d, requested %d\n", obd_name,
+                      rc, aa->aa_requested_nob);
+               RETURN(-EPROTO);
+       }
 
-        if (rc != req->rq_bulk->bd_nob_transferred) {
-                CERROR ("Unexpected rc %d (%d transferred)\n",
-                        rc, req->rq_bulk->bd_nob_transferred);
-                return (-EPROTO);
-        }
+       if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
+               CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
+                      rc, req->rq_bulk->bd_nob_transferred);
+               RETURN(-EPROTO);
+       }
+
+       if (req->rq_bulk == NULL) {
+               /* short io */
+               int nob, pg_count, i = 0;
+               unsigned char *buf;
+
+               CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+               pg_count = aa->aa_page_count;
+               buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+                                                  rc);
+               nob = rc;
+               while (nob > 0 && pg_count > 0) {
+                       unsigned char *ptr;
+                       int count = aa->aa_ppga[i]->count > nob ?
+                                   nob : aa->aa_ppga[i]->count;
+
+                       CDEBUG(D_CACHE, "page %p count %d\n",
+                              aa->aa_ppga[i]->pg, count);
+                       ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+                       memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+                              count);
+                       ll_kunmap_atomic((void *) ptr, KM_USER0);
+
+                       buf += count;
+                       nob -= count;
+                       i++;
+                       pg_count--;
+               }
+       }
 
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
@@ -1454,39 +1847,53 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
-                cksum_type_t cksum_type;
-
-                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
-                                               body->oa.o_flags : 0);
-                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
-                                                 aa->aa_ppga, OST_READ,
-                                                 cksum_type);
-
-               if (peer->nid != req->rq_bulk->bd_sender) {
+               enum cksum_types cksum_type;
+               u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
+                       body->oa.o_flags : 0;
+
+               cksum_type = obd_cksum_type_unpack(o_flags);
+               rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+                                         aa->aa_page_count, aa->aa_ppga,
+                                         OST_READ, &client_cksum);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (req->rq_bulk != NULL &&
+                   peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }
 
                if (server_cksum != client_cksum) {
+                       struct ost_body *clbody;
+                       u32 page_count = aa->aa_page_count;
+
+                       clbody = req_capsule_client_get(&req->rq_pill,
+                                                       &RMF_OST_BODY);
+                       if (cli->cl_checksum_dump)
+                               dump_all_bulk_pages(&clbody->oa, page_count,
+                                                   aa->aa_ppga, server_cksum,
+                                                   client_cksum);
+
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
-                                          " extent [%llu-%llu]\n",
-                                          req->rq_import->imp_obd->obd_name,
+                                          " extent [%llu-%llu], client %x, "
+                                          "server %x, cksum_type %x\n",
+                                          obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
-                                          body->oa.o_valid & OBD_MD_FLFID ?
-                                               body->oa.o_parent_seq : (__u64)0,
-                                          body->oa.o_valid & OBD_MD_FLFID ?
-                                               body->oa.o_parent_oid : 0,
-                                          body->oa.o_valid & OBD_MD_FLFID ?
-                                               body->oa.o_parent_ver : 0,
+                                          clbody->oa.o_valid & OBD_MD_FLFID ?
+                                               clbody->oa.o_parent_seq : 0ULL,
+                                          clbody->oa.o_valid & OBD_MD_FLFID ?
+                                               clbody->oa.o_parent_oid : 0,
+                                          clbody->oa.o_valid & OBD_MD_FLFID ?
+                                               clbody->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
-                                          aa->aa_ppga[aa->aa_page_count-1]->off +
-                                          aa->aa_ppga[aa->aa_page_count-1]->count -
-                                                                       1);
-                       CERROR("client %x, server %x, cksum_type %x\n",
-                              client_cksum, server_cksum, cksum_type);
+                                          aa->aa_ppga[page_count-1]->off +
+                                          aa->aa_ppga[page_count-1]->count - 1,
+                                          client_cksum, server_cksum,
+                                          cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
@@ -1495,32 +1902,34 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
-        } else if (unlikely(client_cksum)) {
-                static int cksum_missed;
-
-                cksum_missed++;
-                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
-                        CERROR("Checksum %u requested from %s but not sent\n",
-                               cksum_missed, libcfs_nid2str(peer->nid));
-        } else {
-                rc = 0;
-        }
+       } else if (unlikely(client_cksum)) {
+               static int cksum_missed;
+
+               cksum_missed++;
+               if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+                       CERROR("%s: checksum %u requested from %s but not sent\n",
+                              obd_name, cksum_missed,
+                              libcfs_nid2str(peer->nid));
+       } else {
+               rc = 0;
+       }
 out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
 {
-        struct ptlrpc_request *new_req;
-        struct osc_brw_async_args *new_aa;
-        struct osc_async_page *oap;
-        ENTRY;
+       struct ptlrpc_request *new_req;
+       struct osc_brw_async_args *new_aa;
+       struct osc_async_page *oap;
+       ENTRY;
 
+       /* The below message is checked in replay-ost-single.sh test_8ae*/
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);
 
@@ -1542,22 +1951,24 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
                         }
                 }
         }
-        /* New request takes over pga and oaps from old request.
-         * Note that copying a list_head doesn't work, need to move it... */
-        aa->aa_resends++;
-        new_req->rq_interpret_reply = request->rq_interpret_reply;
-        new_req->rq_async_args = request->rq_async_args;
+       /*
+        * New request takes over pga and oaps from old request.
+        * Note that copying a list_head doesn't work, need to move it...
+        */
+       aa->aa_resends++;
+       new_req->rq_interpret_reply = request->rq_interpret_reply;
+       new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
-               new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
+               new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
        else
-               new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+               new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
         new_req->rq_generation_set = 1;
         new_req->rq_import_generation = request->rq_import_generation;
 
-        new_aa = ptlrpc_req_async_args(new_req);
+       new_aa = ptlrpc_req_async_args(new_aa, new_req);
 
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
@@ -1620,19 +2031,23 @@ static void osc_release_ppga(struct brw_page **ppga, size_t count)
 }
 
 static int brw_interpret(const struct lu_env *env,
-                         struct ptlrpc_request *req, void *data, int rc)
+                        struct ptlrpc_request *req, void *args, int rc)
 {
-       struct osc_brw_async_args *aa = data;
+       struct osc_brw_async_args *aa = args;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
-        ENTRY;
+       unsigned long transferred = 0;
 
-        rc = osc_brw_fini_request(req, rc);
-        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
-        /* When server return -EINPROGRESS, client should always retry
-         * regardless of the number of times the bulk was resent already. */
-       if (osc_recoverable_error(rc)) {
+       ENTRY;
+
+       rc = osc_brw_fini_request(req, rc);
+       CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+       /*
+        * When server returns -EINPROGRESS, client should always retry
+        * regardless of the number of times the bulk was resent already.
+        */
+       if (osc_recoverable_error(rc) && !req->rq_no_delay) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
@@ -1706,20 +2121,26 @@ static int brw_interpret(const struct lu_env *env,
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
-       OBDO_FREE(aa->aa_oa);
+       OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+       aa->aa_oa = NULL;
 
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);
 
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
-               osc_extent_finish(env, ext, 1, rc);
+               osc_extent_finish(env, ext, 1,
+                                 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
+       transferred = (req->rq_bulk == NULL ? /* short io */
+                      aa->aa_requested_nob :
+                      req->rq_bulk->bd_nob_transferred);
+
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       ptlrpc_lprocfs_brw(req, transferred);
 
        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
@@ -1777,10 +2198,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        int                             page_count = 0;
        bool                            soft_sync = false;
        bool                            interrupted = false;
+       bool                            ndelay = false;
        int                             i;
        int                             grant = 0;
        int                             rc;
-       struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
+       __u32                           layout_version = 0;
+       LIST_HEAD(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));
@@ -1791,6 +2214,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                mem_tight |= ext->oe_memalloc;
                grant += ext->oe_grants;
                page_count += ext->oe_nr_pages;
+               layout_version = max(layout_version, ext->oe_layout_version);
                if (obj == NULL)
                        obj = ext->oe_obj;
        }
@@ -1803,7 +2227,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       OBDO_ALLOC(oa);
+       OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);
 
@@ -1833,6 +2257,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                        if (oap->oap_interrupted)
                                interrupted = true;
                }
+               if (ext->oe_ndelay)
+                       ndelay = true;
        }
 
        /* first page in the list */
@@ -1846,8 +2272,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        crattr->cra_oa = oa;
        cl_req_attr_set(env, osc2cl(obj), crattr);
 
-       if (cmd == OBD_BRW_WRITE)
+       if (cmd == OBD_BRW_WRITE) {
                oa->o_grant_used = grant;
+               if (layout_version > 0) {
+                       CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
+                              PFID(&oa->o_oi.oi_fid), layout_version);
+
+                       oa->o_layout_version = layout_version;
+                       oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+               }
+       }
 
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
@@ -1862,6 +2296,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        oap->oap_request = ptlrpc_request_addref(req);
        if (interrupted && !req->rq_intr)
                ptlrpc_mark_interrupted(req);
+       if (ndelay) {
+               req->rq_no_resend = req->rq_no_delay = 1;
+               /* probably set a shorter timeout value.
+                * to handle ETIMEDOUT in brw_interpret() correctly. */
+               /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+       }
 
        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
@@ -1870,12 +2310,11 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
-       crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
+       crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        cl_req_attr_set(env, osc2cl(obj), crattr);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
@@ -1898,7 +2337,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        }
        spin_unlock(&cli->cl_loi_list_lock);
 
-       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
+       DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
@@ -1915,7 +2354,7 @@ out:
                LASSERT(req == NULL);
 
                if (oa)
-                       OBDO_FREE(oa);
+                       OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
@@ -1948,10 +2387,10 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
        return set;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req,
-                           osc_enqueue_upcall_f upcall, void *cookie,
-                           struct lustre_handle *lockh, enum ldlm_mode mode,
-                           __u64 *flags, int agl, int errcode)
+int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
+                    void *cookie, struct lustre_handle *lockh,
+                    enum ldlm_mode mode, __u64 *flags, bool speculative,
+                    int errcode)
 {
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
@@ -1968,7 +2407,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req,
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
-               if (!agl)
+               if (!speculative)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
@@ -1983,13 +2422,13 @@ static int osc_enqueue_fini(struct ptlrpc_request *req,
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
-static int osc_enqueue_interpret(const struct lu_env *env,
-                                struct ptlrpc_request *req,
-                                struct osc_enqueue_args *aa, int rc)
+int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                         void *args, int rc)
 {
+       struct osc_enqueue_args *aa = args;
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
@@ -2018,7 +2457,7 @@ static int osc_enqueue_interpret(const struct lu_env *env,
        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
-       if (aa->oa_agl) {
+       if (aa->oa_speculative) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
@@ -2030,9 +2469,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
-                             aa->oa_flags, aa->oa_agl, rc);
+                             aa->oa_flags, aa->oa_speculative, rc);
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
 
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
@@ -2050,10 +2489,10 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
  * release locks just after they are obtained. */
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
-                    struct ost_lvb *lvb, int kms_valid,
-                    osc_enqueue_upcall_f upcall, void *cookie,
-                    struct ldlm_enqueue_info *einfo,
-                    struct ptlrpc_request_set *rqset, int async, int agl)
+                    struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
+                    void *cookie, struct ldlm_enqueue_info *einfo,
+                    struct ptlrpc_request_set *rqset, int async,
+                    bool speculative)
 {
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
@@ -2069,15 +2508,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;
 
-        /*
-         * kms is not valid when either object is completely fresh (so that no
-         * locks are cached), or object was evicted. In the latter case cached
-         * lock cannot be used, because it would prime inode state with
-         * potentially stale LVB.
-         */
-        if (!kms_valid)
-                goto no_match;
-
         /* Next, search for already existing extent locks that will cover us */
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
@@ -2093,7 +2523,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         mode = einfo->ei_mode;
         if (einfo->ei_mode == LCK_PR)
                 mode |= LCK_PW;
-       if (agl == 0)
+       /* Normal lock requests must wait for the LVB to be ready before
+        * matching a lock; speculative lock requests do not need to,
+        * because they will not actually use the lock. */
+       if (!speculative)
                match_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
@@ -2106,13 +2539,22 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                        RETURN(ELDLM_OK);
 
                matched = ldlm_handle2lock(&lockh);
-               if (agl) {
-                       /* AGL enqueues DLM locks speculatively. Therefore if
-                        * it already exists a DLM lock, it wll just inform the
-                        * caller to cancel the AGL process for this stripe. */
+               if (speculative) {
+                       /* This DLM lock request is speculative, and does not
+                        * have an associated IO request. Therefore if there
+                        * is already a DLM lock, it wll just inform the
+                        * caller to cancel the request for this stripe.*/
+                       lock_res_and_lock(matched);
+                       if (ldlm_extent_equal(&policy->l_extent,
+                           &matched->l_policy_data.l_extent))
+                               rc = -EEXIST;
+                       else
+                               rc = -ECANCELED;
+                       unlock_res_and_lock(matched);
+
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
-                       RETURN(-ECANCELED);
+                       RETURN(rc);
                } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;
 
@@ -2128,7 +2570,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                }
        }
 
-no_match:
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);
 
@@ -2157,28 +2598,26 @@ no_match:
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
-                       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                       aa = ptlrpc_req_async_args(req);
-                       aa->oa_exp    = exp;
-                       aa->oa_mode   = einfo->ei_mode;
-                       aa->oa_type   = einfo->ei_type;
+                       aa = ptlrpc_req_async_args(aa, req);
+                       aa->oa_exp         = exp;
+                       aa->oa_mode        = einfo->ei_mode;
+                       aa->oa_type        = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
-                       aa->oa_upcall = upcall;
-                       aa->oa_cookie = cookie;
-                       aa->oa_agl    = !!agl;
-                       if (!agl) {
+                       aa->oa_upcall      = upcall;
+                       aa->oa_cookie      = cookie;
+                       aa->oa_speculative = speculative;
+                       if (!speculative) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
-                               /* AGL is essentially to enqueue an DLM lock
-                                * in advance, so we don't care about the
-                                * result of AGL enqueue. */
+                               /* speculative locks are essentially to enqueue
+                                * a DLM lock  in advance, so we don't care
+                                * about the result of the enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }
 
-                       req->rq_interpret_reply =
-                               (ptlrpc_interpterer_t)osc_enqueue_interpret;
+                       req->rq_interpret_reply = osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
@@ -2190,16 +2629,17 @@ no_match:
        }
 
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
-                             flags, agl, rc);
+                             flags, speculative, rc);
        if (intent)
                ptlrpc_req_finished(req);
 
        RETURN(rc);
 }
 
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
-                  enum ldlm_type type, union ldlm_policy_data *policy,
-                  enum ldlm_mode mode, __u64 *flags, void *data,
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+                  struct ldlm_res_id *res_id, enum ldlm_type type,
+                  union ldlm_policy_data *policy, enum ldlm_mode mode,
+                  __u64 *flags, struct osc_object *obj,
                   struct lustre_handle *lockh, int unref)
 {
        struct obd_device *obd = exp->exp_obd;
@@ -2227,11 +2667,19 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
-       if (data != NULL) {
+       if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
                LASSERT(lock != NULL);
-               if (!osc_set_lock_data(lock, data)) {
+               if (osc_set_lock_data(lock, obj)) {
+                       lock_res_and_lock(lock);
+                       if (!ldlm_is_lvb_cached(lock)) {
+                               LASSERT(lock->l_ast_data == obj);
+                               osc_lock_lvb_update(env, obj, lock, NULL);
+                               ldlm_set_lvb_cached(lock);
+                       }
+                       unlock_res_and_lock(lock);
+               } else {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
@@ -2241,48 +2689,64 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 }
 
 static int osc_statfs_interpret(const struct lu_env *env,
-                                struct ptlrpc_request *req,
-                                struct osc_async_args *aa, int rc)
+                               struct ptlrpc_request *req, void *args, int rc)
 {
-        struct obd_statfs *msfs;
-        ENTRY;
+       struct osc_async_args *aa = args;
+       struct obd_statfs *msfs;
 
-        if (rc == -EBADR)
-                /* The request has in fact never been sent
-                 * due to issues at a higher level (LOV).
-                 * Exit immediately since the caller is
-                 * aware of the problem and takes care
-                 * of the clean up */
-                 RETURN(rc);
+       ENTRY;
+       if (rc == -EBADR)
+               /*
+                * The request has in fact never been sent due to issues at
+                * a higher level (LOV).  Exit immediately since the caller
+                * is aware of the problem and takes care of the clean up.
+                */
+               RETURN(rc);
 
-        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
-            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
-                GOTO(out, rc = 0);
+       if ((rc == -ENOTCONN || rc == -EAGAIN) &&
+           (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
+               GOTO(out, rc = 0);
 
-        if (rc != 0)
-                GOTO(out, rc);
+       if (rc != 0)
+               GOTO(out, rc);
 
-        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-        if (msfs == NULL) {
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL)
                 GOTO(out, rc = -EPROTO);
-        }
 
-        *aa->aa_oi->oi_osfs = *msfs;
+       *aa->aa_oi->oi_osfs = *msfs;
 out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
-        RETURN(rc);
+       rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+
+       RETURN(rc);
 }
 
 static int osc_statfs_async(struct obd_export *exp,
-                            struct obd_info *oinfo, __u64 max_age,
+                           struct obd_info *oinfo, time64_t max_age,
                             struct ptlrpc_request_set *rqset)
 {
         struct obd_device     *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
-        int                    rc;
+       int rc;
         ENTRY;
 
+       if (obd->obd_osfs_age >= max_age) {
+               CDEBUG(D_SUPER,
+                      "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
+                      obd->obd_name, &obd->obd_osfs,
+                      obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
+                      obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
+               spin_lock(&obd->obd_osfs_lock);
+               memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
+               spin_unlock(&obd->obd_osfs_lock);
+               oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
+               if (oinfo->oi_cb_up)
+                       oinfo->oi_cb_up(oinfo, 0);
+
+               RETURN(0);
+       }
+
         /* We could possibly pass max_age in the request (as an absolute
          * timestamp or a "seconds.usec ago") so the target can avoid doing
          * extra calls into the filesystem if that isn't necessary (e.g.
@@ -2298,34 +2762,34 @@ static int osc_statfs_async(struct obd_export *exp,
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
-        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
-        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
+       req->rq_interpret_reply = osc_statfs_interpret;
+       aa = ptlrpc_req_async_args(aa, req);
+       aa->aa_oi = oinfo;
 
-        ptlrpc_set_add_req(rqset, req);
-        RETURN(0);
+       ptlrpc_set_add_req(rqset, req);
+       RETURN(0);
 }
 
 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
-                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+                     struct obd_statfs *osfs, time64_t max_age, __u32 flags)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct obd_statfs     *msfs;
-        struct ptlrpc_request *req;
-        struct obd_import     *imp = NULL;
-        int rc;
-        ENTRY;
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct obd_statfs     *msfs;
+       struct ptlrpc_request *req;
+       struct obd_import     *imp = NULL;
+       int rc;
+       ENTRY;
+
 
         /*Since the request might also come from lprocfs, so we need
          *sync this with client_disconnect_export Bug15684*/
@@ -2336,92 +2800,88 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
         if (!imp)
                 RETURN(-ENODEV);
 
-        /* We could possibly pass max_age in the request (as an absolute
-         * timestamp or a "seconds.usec ago") so the target can avoid doing
-         * extra calls into the filesystem if that isn't necessary (e.g.
-         * during mount that would help a bit).  Having relative timestamps
-         * is not so great if request processing is slow, while absolute
-         * timestamps are not ideal because they need time synchronization. */
-        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
+       /* We could possibly pass max_age in the request (as an absolute
+        * timestamp or a "seconds.usec ago") so the target can avoid doing
+        * extra calls into the filesystem if that isn't necessary (e.g.
+        * during mount that would help a bit).  Having relative timestamps
+        * is not so great if request processing is slow, while absolute
+        * timestamps are not ideal because they need time synchronization. */
+       req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
 
-        class_import_put(imp);
+       class_import_put(imp);
 
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
 
-        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-        if (msfs == NULL) {
-                GOTO(out, rc = -EPROTO);
-        }
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL)
+               GOTO(out, rc = -EPROTO);
 
-        *osfs = *msfs;
+       *osfs = *msfs;
 
-        EXIT;
- out:
-        ptlrpc_req_finished(req);
-        return rc;
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
 }
 
 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
 {
-        struct obd_device *obd = exp->exp_obd;
-        struct obd_ioctl_data *data = karg;
-        int err = 0;
-        ENTRY;
+       struct obd_device *obd = exp->exp_obd;
+       struct obd_ioctl_data *data = karg;
+       int rc = 0;
 
+       ENTRY;
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
-        switch (cmd) {
-        case OBD_IOC_CLIENT_RECOVER:
-                err = ptlrpc_recover_import(obd->u.cli.cl_import,
-                                            data->ioc_inlbuf1, 0);
-                if (err > 0)
-                        err = 0;
-                GOTO(out, err);
-        case IOC_OSC_SET_ACTIVE:
-                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
-                                               data->ioc_offset);
-                GOTO(out, err);
-        case OBD_IOC_PING_TARGET:
-                err = ptlrpc_obd_ping(obd);
-                GOTO(out, err);
+       switch (cmd) {
+       case OBD_IOC_CLIENT_RECOVER:
+               rc = ptlrpc_recover_import(obd->u.cli.cl_import,
+                                          data->ioc_inlbuf1, 0);
+               if (rc > 0)
+                       rc = 0;
+               break;
+       case IOC_OSC_SET_ACTIVE:
+               rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
+                                             data->ioc_offset);
+               break;
        default:
-               CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
-                      cmd, current_comm());
-               GOTO(out, err = -ENOTTY);
+               rc = -ENOTTY;
+               CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
+                      obd->obd_name, cmd, current_comm(), rc);
+               break;
        }
-out:
+
        module_put(THIS_MODULE);
-       return err;
+       return rc;
 }
 
-static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
-                             u32 keylen, void *key,
-                             u32 vallen, void *val,
-                             struct ptlrpc_request_set *set)
+int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                      u32 keylen, void *key, u32 vallen, void *val,
+                      struct ptlrpc_request_set *set)
 {
         struct ptlrpc_request *req;
         struct obd_device     *obd = exp->exp_obd;
@@ -2449,23 +2909,6 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                 RETURN(0);
         }
 
-       if (KEY_IS(KEY_CACHE_SET)) {
-               struct client_obd *cli = &obd->u.cli;
-
-               LASSERT(cli->cl_cache == NULL); /* only once */
-               cli->cl_cache = (struct cl_client_cache *)val;
-               cl_cache_incref(cli->cl_cache);
-               cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
-
-               /* add this osc into entity list */
-               LASSERT(list_empty(&cli->cl_lru_osc));
-               spin_lock(&cli->cl_cache->ccc_lru_lock);
-               list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
-               spin_unlock(&cli->cl_cache->ccc_lru_lock);
-
-               RETURN(0);
-       }
-
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
@@ -2508,23 +2951,22 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
-        memcpy(tmp, val, vallen);
+       memcpy(tmp, val, vallen);
 
        if (KEY_IS(KEY_GRANT_SHRINK)) {
-                struct osc_grant_args *aa;
-                struct obdo *oa;
-
-                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                aa = ptlrpc_req_async_args(req);
-                OBDO_ALLOC(oa);
-                if (!oa) {
-                        ptlrpc_req_finished(req);
-                        RETURN(-ENOMEM);
-                }
-                *oa = ((struct ost_body *)val)->oa;
-                aa->aa_oa = oa;
-                req->rq_interpret_reply = osc_shrink_grant_interpret;
-        }
+               struct osc_grant_args *aa;
+               struct obdo *oa;
+
+               aa = ptlrpc_req_async_args(aa, req);
+               OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
+               if (!oa) {
+                       ptlrpc_req_finished(req);
+                       RETURN(-ENOMEM);
+               }
+               *oa = ((struct ost_body *)val)->oa;
+               aa->aa_oa = oa;
+               req->rq_interpret_reply = osc_shrink_grant_interpret;
+       }
 
        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
@@ -2537,25 +2979,27 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_set_info_async);
 
-static int osc_reconnect(const struct lu_env *env,
-                         struct obd_export *exp, struct obd_device *obd,
-                         struct obd_uuid *cluuid,
-                         struct obd_connect_data *data,
-                         void *localdata)
+int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
+                 struct obd_device *obd, struct obd_uuid *cluuid,
+                 struct obd_connect_data *data, void *localdata)
 {
-        struct client_obd *cli = &obd->u.cli;
+       struct client_obd *cli = &obd->u.cli;
 
-        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
-                long lost_grant;
+       if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
+               long lost_grant;
                long grant;
 
                spin_lock(&cli->cl_loi_list_lock);
                grant = cli->cl_avail_grant + cli->cl_reserved_grant;
-               if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
+               if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
+                       /* restore ocd_grant_blkbits as client page bits */
+                       data->ocd_grant_blkbits = PAGE_SHIFT;
                        grant += cli->cl_dirty_grant;
-               else
+               } else {
                        grant += cli->cl_dirty_pages << PAGE_SHIFT;
+               }
                data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
@@ -2568,37 +3012,36 @@ static int osc_reconnect(const struct lu_env *env,
 
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_reconnect);
 
-static int osc_disconnect(struct obd_export *exp)
+int osc_disconnect(struct obd_export *exp)
 {
        struct obd_device *obd = class_exp2obd(exp);
        int rc;
 
-        rc = client_disconnect_export(exp);
-        /**
-         * Initially we put del_shrink_grant before disconnect_export, but it
-         * causes the following problem if setup (connect) and cleanup
-         * (disconnect) are tangled together.
-         *      connect p1                     disconnect p2
-         *   ptlrpc_connect_import
-         *     ...............               class_manual_cleanup
-         *                                     osc_disconnect
-         *                                     del_shrink_grant
-         *   ptlrpc_connect_interrupt
-         *     init_grant_shrink
-         *   add this client to shrink list
-         *                                      cleanup_osc
-         * Bang! pinger trigger the shrink.
-         * So the osc should be disconnected from the shrink list, after we
-         * are sure the import has been destroyed. BUG18662
-         */
-        if (obd->u.cli.cl_import == NULL)
-                osc_del_shrink_grant(&obd->u.cli);
-        return rc;
-}
-
-static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
-       struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
+       rc = client_disconnect_export(exp);
+       /**
+        * Initially we put del_shrink_grant before disconnect_export, but it
+        * causes the following problem if setup (connect) and cleanup
+        * (disconnect) are tangled together.
+        *      connect p1                     disconnect p2
+        *   ptlrpc_connect_import
+        *     ...............               class_manual_cleanup
+        *                                     osc_disconnect
+        *                                     del_shrink_grant
+        *   ptlrpc_connect_interrupt
+        *     osc_init_grant
+        *   add this client to shrink list
+        *                                      cleanup_osc
+        * Bang! grant shrink thread trigger the shrink. BUG18662
+        */
+       osc_del_grant_list(&obd->u.cli);
+       return rc;
+}
+EXPORT_SYMBOL(osc_disconnect);
+
+int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                                struct hlist_node *hnode, void *arg)
 {
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
@@ -2627,6 +3070,7 @@ static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
 
        RETURN(0);
 }
+EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
 
 static int osc_import_event(struct obd_device *obd,
                             struct obd_import *imp,
@@ -2717,7 +3161,7 @@ static int osc_cancel_weight(struct ldlm_lock *lock)
         * Cancel all unused and granted extent lock.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
-           lock->l_granted_mode == lock->l_req_mode &&
+           ldlm_is_granted(lock) &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);
 
@@ -2734,15 +3178,12 @@ static int brw_queue_work(const struct lu_env *env, void *data)
        RETURN(0);
 }
 
-int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
        struct client_obd *cli = &obd->u.cli;
-       struct obd_type   *type;
-       void              *handler;
-       int                rc;
-       int                adding;
-       int                added;
-       int                req_count;
+       void *handler;
+       int rc;
+
        ENTRY;
 
        rc = ptlrpcd_addref();
@@ -2753,9 +3194,10 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        if (rc)
                GOTO(out_ptlrpcd, rc);
 
+
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
-               GOTO(out_client_setup, rc = PTR_ERR(handler));
+               GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;
 
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
@@ -2768,36 +3210,43 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                GOTO(out_ptlrpcd_work, rc);
 
        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+       osc_update_next_shrink(cli);
 
-#ifdef CONFIG_PROC_FS
-       obd->obd_vars = lprocfs_osc_obd_vars;
-#endif
-       /* If this is true then both client (osc) and server (osp) are on the
-        * same node. The osp layer if loaded first will register the osc proc
-        * directory. In that case this obd_device will be attached its proc
-        * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
-       type = class_search_type(LUSTRE_OSP_NAME);
-       if (type && type->typ_procsym) {
-               obd->obd_proc_entry = lprocfs_register(obd->obd_name,
-                                                      type->typ_procsym,
-                                                      obd->obd_vars, obd);
-               if (IS_ERR(obd->obd_proc_entry)) {
-                       rc = PTR_ERR(obd->obd_proc_entry);
-                       CERROR("error %d setting up lprocfs for %s\n", rc,
-                              obd->obd_name);
-                       obd->obd_proc_entry = NULL;
-               }
-       } else {
-               rc = lprocfs_obd_setup(obd);
-       }
+       RETURN(rc);
 
-       /* If the basic OSC proc tree construction succeeded then
-        * lets do the rest. */
-       if (rc == 0) {
-               lproc_osc_attach_seqstat(obd);
-               sptlrpc_lprocfs_cliobd_attach(obd);
-               ptlrpc_lprocfs_register_obd(obd);
+out_ptlrpcd_work:
+       if (cli->cl_writeback_work != NULL) {
+               ptlrpcd_destroy_work(cli->cl_writeback_work);
+               cli->cl_writeback_work = NULL;
        }
+       if (cli->cl_lru_work != NULL) {
+               ptlrpcd_destroy_work(cli->cl_lru_work);
+               cli->cl_lru_work = NULL;
+       }
+       client_obd_cleanup(obd);
+out_ptlrpcd:
+       ptlrpcd_decref();
+       RETURN(rc);
+}
+EXPORT_SYMBOL(osc_setup_common);
+
+int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+       struct client_obd *cli = &obd->u.cli;
+       int                adding;
+       int                added;
+       int                req_count;
+       int                rc;
+
+       ENTRY;
+
+       rc = osc_setup_common(obd, lcfg);
+       if (rc < 0)
+               RETURN(rc);
+
+       rc = osc_tunables_init(obd);
+       if (rc)
+               RETURN(rc);
 
        /*
         * We try to control the total number of requests with a upper limit
@@ -2814,32 +3263,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                atomic_add(added, &osc_pool_req_count);
        }
 
-       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 
        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);
+       cli->cl_import->imp_idle_timeout = osc_idle_timeout;
+       cli->cl_import->imp_idle_debug = D_HA;
 
        RETURN(0);
-
-out_ptlrpcd_work:
-       if (cli->cl_writeback_work != NULL) {
-               ptlrpcd_destroy_work(cli->cl_writeback_work);
-               cli->cl_writeback_work = NULL;
-       }
-       if (cli->cl_lru_work != NULL) {
-               ptlrpcd_destroy_work(cli->cl_lru_work);
-               cli->cl_lru_work = NULL;
-       }
-out_client_setup:
-       client_obd_cleanup(obd);
-out_ptlrpcd:
-       ptlrpcd_decref();
-       RETURN(rc);
 }
 
-static int osc_precleanup(struct obd_device *obd)
+int osc_precleanup_common(struct obd_device *obd)
 {
        struct client_obd *cli = &obd->u.cli;
        ENTRY;
@@ -2865,12 +3300,21 @@ static int osc_precleanup(struct obd_device *obd)
        }
 
        obd_cleanup_client_import(obd);
+       RETURN(0);
+}
+EXPORT_SYMBOL(osc_precleanup_common);
+
+static int osc_precleanup(struct obd_device *obd)
+{
+       ENTRY;
+
+       osc_precleanup_common(obd);
+
        ptlrpc_lprocfs_unregister_obd(obd);
-       lprocfs_obd_cleanup(obd);
        RETURN(0);
 }
 
-int osc_cleanup(struct obd_device *obd)
+int osc_cleanup_common(struct obd_device *obd)
 {
        struct client_obd *cli = &obd->u.cli;
        int rc;
@@ -2900,26 +3344,16 @@ int osc_cleanup(struct obd_device *obd)
        ptlrpcd_decref();
        RETURN(rc);
 }
+EXPORT_SYMBOL(osc_cleanup_common);
 
-int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
-{
-       int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
-       return rc > 0 ? 0: rc;
-}
-
-static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
-{
-        return osc_process_config_base(obd, buf);
-}
-
-static struct obd_ops osc_obd_ops = {
+static const struct obd_ops osc_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_setup                = osc_setup,
         .o_precleanup           = osc_precleanup,
-        .o_cleanup              = osc_cleanup,
+       .o_cleanup              = osc_cleanup_common,
         .o_add_conn             = client_import_add_conn,
         .o_del_conn             = client_import_del_conn,
-        .o_connect              = client_connect_import,
+       .o_connect              = client_connect_import,
         .o_reconnect            = osc_reconnect,
         .o_disconnect           = osc_disconnect,
         .o_statfs               = osc_statfs,
@@ -2931,12 +3365,11 @@ static struct obd_ops osc_obd_ops = {
         .o_iocontrol            = osc_iocontrol,
         .o_set_info_async       = osc_set_info_async,
         .o_import_event         = osc_import_event,
-        .o_process_config       = osc_process_config,
         .o_quotactl             = osc_quotactl,
 };
 
 static struct shrinker *osc_cache_shrinker;
-struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
+LIST_HEAD(osc_shrink_list);
 DEFINE_SPINLOCK(osc_shrink_lock);
 
 #ifndef HAVE_SHRINKER_COUNT
@@ -2946,10 +3379,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
-       struct shrinker *shrinker = NULL;
-#endif
-
        (void)osc_cache_shrink_scan(shrinker, &scv);
 
        return osc_cache_shrink_count(shrinker, &scv);
@@ -2958,8 +3387,6 @@ static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 
 static int __init osc_init(void)
 {
-       bool enable_proc = true;
-       struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
@@ -2976,11 +3403,7 @@ static int __init osc_init(void)
        if (rc)
                RETURN(rc);
 
-       type = class_search_type(LUSTRE_OSP_NAME);
-       if (type != NULL && type->typ_procsym != NULL)
-               enable_proc = false;
-
-       rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
+       rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);
@@ -3009,19 +3432,28 @@ static int __init osc_init(void)
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);
 
-       if (osc_rq_pool != NULL)
-               GOTO(out, rc);
-       rc = -ENOMEM;
+       if (osc_rq_pool == NULL)
+               GOTO(out_type, rc = -ENOMEM);
+
+       rc = osc_start_grant_work();
+       if (rc != 0)
+               GOTO(out_req_pool, rc);
+
+       RETURN(rc);
+
+out_req_pool:
+       ptlrpc_free_rq_pool(osc_rq_pool);
 out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
 out_kmem:
        lu_kmem_fini(osc_caches);
-out:
+
        RETURN(rc);
 }
 
 static void __exit osc_exit(void)
 {
+       osc_stop_grant_work();
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);