Whamcloud - gitweb
LU-11304 misc: update all url links to whamcloud
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 6d7929d..5c707de 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -32,8 +32,7 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include <libcfs/libcfs.h>
-
+#include <linux/workqueue.h>
 #include <lprocfs_status.h>
 #include <lustre_debug.h>
 #include <lustre_dlm.h>
@@ -58,6 +57,9 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
+static int osc_idle_timeout = 20;
+module_param(osc_idle_timeout, uint, 0644);
+
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -673,11 +675,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
+               unsigned long undirty;
 
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
-               oa->o_undirty = nrpages << PAGE_SHIFT;
+               undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;
@@ -686,8 +689,13 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
-                       oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
+                       undirty += nrextents * cli->cl_grant_extent_tax;
                }
+               /* Do not ask for more than OBD_MAX_GRANT - a margin for server
+                * to add extent tax, etc.
+                */
+               oa->o_undirty = min(undirty, OBD_MAX_GRANT -
+                                   (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
         }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
         oa->o_dropped = cli->cl_lost_grant;
@@ -721,6 +729,16 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
         }
 }
 
+/**
+ * grant thread data for shrinking space.
+ */
+struct grant_thread_data {
+       struct list_head        gtd_clients;
+       struct mutex            gtd_mutex;
+       unsigned long           gtd_stopped:1;
+};
+static struct grant_thread_data client_gtd;
+
 static int osc_shrink_grant_interpret(const struct lu_env *env,
                                       struct ptlrpc_request *req,
                                       void *aa, int rc)
@@ -822,6 +840,9 @@ static int osc_should_shrink_grant(struct client_obd *client)
 {
        time64_t next_shrink = client->cl_next_shrink_grant;
 
+       if (client->cl_import == NULL)
+               return 0;
+
         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
              OBD_CONNECT_GRANT_SHRINK) == 0)
                 return 0;
@@ -841,38 +862,83 @@ static int osc_should_shrink_grant(struct client_obd *client)
         return 0;
 }
 
-static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+#define GRANT_SHRINK_RPC_BATCH 100
+
+static struct delayed_work work;
+
+static void osc_grant_work_handler(struct work_struct *data)
 {
-       struct client_obd *client;
+       struct client_obd *cli;
+       int rpc_sent;
+       bool init_next_shrink = true;
+       time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
+
+       rpc_sent = 0;
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_for_each_entry(cli, &client_gtd.gtd_clients,
+                           cl_grant_chain) {
+               if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
+                   osc_should_shrink_grant(cli))
+                       osc_shrink_grant(cli);
 
-       list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
-               if (osc_should_shrink_grant(client))
-                       osc_shrink_grant(client);
+               if (!init_next_shrink) {
+                       if (cli->cl_next_shrink_grant < next_shrink &&
+                           cli->cl_next_shrink_grant > ktime_get_seconds())
+                               next_shrink = cli->cl_next_shrink_grant;
+               } else {
+                       init_next_shrink = false;
+                       next_shrink = cli->cl_next_shrink_grant;
+               }
        }
-       return 0;
+       mutex_unlock(&client_gtd.gtd_mutex);
+
+       if (client_gtd.gtd_stopped == 1)
+               return;
+
+       if (next_shrink > ktime_get_seconds())
+               schedule_delayed_work(&work, msecs_to_jiffies(
+                                       (next_shrink - ktime_get_seconds()) *
+                                       MSEC_PER_SEC));
+       else
+               schedule_work(&work.work);
 }
 
-static int osc_add_shrink_grant(struct client_obd *client)
+/**
+ * Start grant thread for returing grant to server for idle clients.
+ */
+static int osc_start_grant_work(void)
 {
-       int rc;
+       client_gtd.gtd_stopped = 0;
+       mutex_init(&client_gtd.gtd_mutex);
+       INIT_LIST_HEAD(&client_gtd.gtd_clients);
+
+       INIT_DELAYED_WORK(&work, osc_grant_work_handler);
+       schedule_work(&work.work);
 
-       rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
-                                      TIMEOUT_GRANT,
-                                      osc_grant_shrink_grant_cb, NULL,
-                                      &client->cl_grant_shrink_list);
-       if (rc) {
-               CERROR("add grant client %s error %d\n", cli_name(client), rc);
-               return rc;
-       }
-       CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
-       osc_update_next_shrink(client);
        return 0;
 }
 
-static int osc_del_shrink_grant(struct client_obd *client)
+static void osc_stop_grant_work(void)
 {
-        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
-                                         TIMEOUT_GRANT);
+       client_gtd.gtd_stopped = 1;
+       cancel_delayed_work_sync(&work);
+}
+
+static void osc_add_grant_list(struct client_obd *client)
+{
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
+       mutex_unlock(&client_gtd.gtd_mutex);
+}
+
+static void osc_del_grant_list(struct client_obd *client)
+{
+       if (list_empty(&client->cl_grant_chain))
+               return;
+
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_del_init(&client->cl_grant_chain);
+       mutex_unlock(&client_gtd.gtd_mutex);
 }
 
 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
@@ -928,9 +994,8 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);
 
-       if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
-           list_empty(&cli->cl_grant_shrink_list))
-               osc_add_shrink_grant(cli);
+       if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
+               osc_add_grant_list(cli);
 }
 EXPORT_SYMBOL(osc_init_grant);
 
@@ -1020,7 +1085,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
                  * safe to combine */
                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
-                              "report this at https://jira.hpdd.intel.com/\n",
+                              "report this at https://jira.whamcloud.com/\n",
                               p1->flag, p2->flag);
                 }
                 return 0;
@@ -1029,11 +1094,108 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
         return (p1->off + p1->count == p2->off);
 }
 
-static u32 osc_checksum_bulk(int nob, size_t pg_count,
+static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
+                                  size_t pg_count, struct brw_page **pga,
+                                  int opc, obd_dif_csum_fn *fn,
+                                  int sector_size,
+                                  u32 *check_sum)
+{
+       struct cfs_crypto_hash_desc *hdesc;
+       /* Used Adler as the default checksum type on top of DIF tags */
+       unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
+       struct page *__page;
+       unsigned char *buffer;
+       __u16 *guard_start;
+       unsigned int bufsize;
+       int guard_number;
+       int used_number = 0;
+       int used;
+       u32 cksum;
+       int rc = 0;
+       int i = 0;
+
+       LASSERT(pg_count > 0);
+
+       __page = alloc_page(GFP_KERNEL);
+       if (__page == NULL)
+               return -ENOMEM;
+
+       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(hdesc)) {
+               rc = PTR_ERR(hdesc);
+               CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
+                      obd_name, cfs_crypto_hash_name(cfs_alg), rc);
+               GOTO(out, rc);
+       }
+
+       buffer = kmap(__page);
+       guard_start = (__u16 *)buffer;
+       guard_number = PAGE_SIZE / sizeof(*guard_start);
+       while (nob > 0 && pg_count > 0) {
+               unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+               /* corrupt the data before we compute the checksum, to
+                * simulate an OST->client data error */
+               if (unlikely(i == 0 && opc == OST_READ &&
+                            OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+                       unsigned char *ptr = kmap(pga[i]->pg);
+                       int off = pga[i]->off & ~PAGE_MASK;
+
+                       memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
+                       kunmap(pga[i]->pg);
+               }
+
+               /*
+                * The left guard number should be able to hold checksums of a
+                * whole page
+                */
+               rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, 0,
+                                                 count,
+                                                 guard_start + used_number,
+                                                 guard_number - used_number,
+                                                 &used, sector_size,
+                                                 fn);
+               if (rc)
+                       break;
+
+               used_number += used;
+               if (used_number == guard_number) {
+                       cfs_crypto_hash_update_page(hdesc, __page, 0,
+                               used_number * sizeof(*guard_start));
+                       used_number = 0;
+               }
+
+               nob -= pga[i]->count;
+               pg_count--;
+               i++;
+       }
+       kunmap(__page);
+       if (rc)
+               GOTO(out, rc);
+
+       if (used_number != 0)
+               cfs_crypto_hash_update_page(hdesc, __page, 0,
+                       used_number * sizeof(*guard_start));
+
+       bufsize = sizeof(cksum);
+       cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
+       /* For sending we only compute the wrong checksum instead
+        * of corrupting the data so it is still correct on a redo */
+       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+               cksum++;
+
+       *check_sum = cksum;
+out:
+       __free_page(__page);
+       return rc;
+}
+
+static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
-                            enum cksum_types cksum_type)
+                            enum cksum_types cksum_type,
+                            u32 *cksum)
 {
-       u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
@@ -1072,15 +1234,38 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count,
                i++;
        }
 
-       bufsize = sizeof(cksum);
-       cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+       bufsize = sizeof(*cksum);
+       cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);
 
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
-               cksum++;
+               (*cksum)++;
 
-       return cksum;
+       return 0;
+}
+
+static int osc_checksum_bulk_rw(const char *obd_name,
+                               enum cksum_types cksum_type,
+                               int nob, size_t pg_count,
+                               struct brw_page **pga, int opc,
+                               u32 *check_sum)
+{
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       int rc;
+
+       ENTRY;
+       obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
+
+       if (fn)
+               rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
+                                            opc, fn, sector_size, check_sum);
+       else
+               rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
+                                      check_sum);
+
+       RETURN(rc);
 }
 
 static int
@@ -1093,11 +1278,12 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-       int niocount, i, requested_nob, opc, rc, short_io_size;
+       int niocount, i, requested_nob, opc, rc, short_io_size = 0;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
        void *short_io_buf;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1179,6 +1365,15 @@ no_bulk:
 
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
+       /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
+        * and from_kgid(), because they are asynchronous. Fortunately, variable
+        * oa contains valid o_uid and o_gid in these two operations.
+        * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
+        * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
+        * other process logic */
+       body->oa.o_uid = oa->o_uid;
+       body->oa.o_gid = oa->o_gid;
+
        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
@@ -1283,17 +1478,26 @@ no_bulk:
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
 
-                        body->oa.o_flags |= cksum_type_pack(cksum_type);
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                                                               cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
-                                                             page_count, pga,
-                                                             OST_WRITE,
-                                                             cksum_type);
+
+                       rc = osc_checksum_bulk_rw(obd_name, cksum_type,
+                                                 requested_nob, page_count,
+                                                 pga, OST_WRITE,
+                                                 &body->oa.o_cksum);
+                       if (rc < 0) {
+                               CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+                                      rc);
+                               GOTO(out, rc);
+                       }
                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                                body->oa.o_cksum);
+
                         /* save this in 'oa', too, for later checking */
                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                        oa->o_flags |= cksum_type_pack(cksum_type);
+                       oa->o_flags |= obd_cksum_type_pack(obd_name,
+                                                          cksum_type);
                 } else {
                         /* clear out the checksum flag, in case this is a
                          * resend but cl_checksum is no longer set. b=11238 */
@@ -1308,9 +1512,10 @@ no_bulk:
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
-                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                               cli->cl_cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                }
+               }
 
                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
@@ -1352,7 +1557,6 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
        int rc, i;
        unsigned int len;
        char *buf;
-       mm_segment_t oldfs;
 
        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
@@ -1381,14 +1585,11 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                return;
        }
 
-       oldfs = get_fs();
-       set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
-                       rc = vfs_write(filp, (__force const char __user *)buf,
-                                      len, &filp->f_pos);
+                       rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
@@ -1401,7 +1602,6 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                }
                kunmap(pga[i]->pg);
        }
-       set_fs(oldfs);
 
        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
@@ -1411,13 +1611,18 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
 }
 
 static int
-check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
-                               __u32 client_cksum, __u32 server_cksum,
-                               struct osc_brw_async_args *aa)
+check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
+                    __u32 client_cksum, __u32 server_cksum,
+                    struct osc_brw_async_args *aa)
 {
-        __u32 new_cksum;
-        char *msg;
+       const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
        enum cksum_types cksum_type;
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       bool t10pi = false;
+       __u32 new_cksum;
+       char *msg;
+       int rc;
 
         if (server_cksum == client_cksum) {
                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
@@ -1428,12 +1633,50 @@ check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);
 
-       cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
-                                      oa->o_flags : 0);
-       new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
-                                     aa->aa_ppga, OST_WRITE, cksum_type);
+       cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+                                          oa->o_flags : 0);
+
+       switch (cksum_type) {
+       case OBD_CKSUM_T10IP512:
+               t10pi = true;
+               fn = obd_dif_ip_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10IP4K:
+               t10pi = true;
+               fn = obd_dif_ip_fn;
+               sector_size = 4096;
+               break;
+       case OBD_CKSUM_T10CRC512:
+               t10pi = true;
+               fn = obd_dif_crc_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10CRC4K:
+               t10pi = true;
+               fn = obd_dif_crc_fn;
+               sector_size = 4096;
+               break;
+       default:
+               break;
+       }
+
+       if (t10pi)
+               rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
+                                            aa->aa_page_count,
+                                            aa->aa_ppga,
+                                            OST_WRITE,
+                                            fn,
+                                            sector_size,
+                                            &new_cksum);
+       else
+               rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
+                                      aa->aa_ppga, OST_WRITE, cksum_type,
+                                      &new_cksum);
 
-       if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
+       if (rc < 0)
+               msg = "failed to calculate the client write checksum";
+       else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
                 msg = "the server did not use the checksum type specified in "
                       "the original request - likely a protocol problem";
         else if (new_cksum == server_cksum)
@@ -1449,15 +1692,15 @@ check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
-                          aa->aa_cli->cl_import->imp_obd->obd_name,
-                          msg, libcfs_nid2str(peer->nid),
+                          obd_name, msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                                aa->aa_ppga[aa->aa_page_count-1]->count - 1,
-                          client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
+                          client_cksum,
+                          obd_cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
 }
@@ -1465,11 +1708,12 @@ check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
 /* Note rc enters this function as number of bytes transferred */
 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 {
-        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct client_obd *cli = aa->aa_cli;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
-                        &req->rq_import->imp_connection->c_peer;
-        struct client_obd *cli = aa->aa_cli;
-        struct ost_body *body;
+               &req->rq_import->imp_connection->c_peer;
+       struct ost_body *body;
        u32 client_cksum = 0;
         ENTRY;
 
@@ -1589,12 +1833,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                char      *via = "";
                char      *router = "";
                enum cksum_types cksum_type;
+               u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
+                       body->oa.o_flags : 0;
 
-                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
-                                               body->oa.o_flags : 0);
-                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
-                                                 aa->aa_ppga, OST_READ,
-                                                 cksum_type);
+               cksum_type = obd_cksum_type_unpack(o_flags);
+               rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+                                         aa->aa_page_count, aa->aa_ppga,
+                                         OST_READ, &client_cksum);
+               if (rc < 0)
+                       GOTO(out, rc);
 
                if (req->rq_bulk != NULL &&
                    peer->nid != req->rq_bulk->bd_sender) {
@@ -1617,7 +1864,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent [%llu-%llu], client %x, "
                                           "server %x, cksum_type %x\n",
-                                          req->rq_import->imp_obd->obd_name,
+                                          obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
@@ -2040,7 +2287,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
-       crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
+       crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        cl_req_attr_set(env, osc2cl(obj), crattr);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
 
@@ -2456,13 +2703,13 @@ out:
 }
 
 static int osc_statfs_async(struct obd_export *exp,
-                            struct obd_info *oinfo, __u64 max_age,
+                           struct obd_info *oinfo, time64_t max_age,
                             struct ptlrpc_request_set *rqset)
 {
         struct obd_device     *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
-        int                    rc;
+       int rc;
         ENTRY;
 
         /* We could possibly pass max_age in the request (as an absolute
@@ -2480,15 +2727,15 @@ static int osc_statfs_async(struct obd_export *exp,
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -2500,14 +2747,15 @@ static int osc_statfs_async(struct obd_export *exp,
 }
 
 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
-                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+                     struct obd_statfs *osfs, time64_t max_age, __u32 flags)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct obd_statfs     *msfs;
-        struct ptlrpc_request *req;
-        struct obd_import     *imp = NULL;
-        int rc;
-        ENTRY;
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct obd_statfs     *msfs;
+       struct ptlrpc_request *req;
+       struct obd_import     *imp = NULL;
+       int rc;
+       ENTRY;
+
 
         /*Since the request might also come from lprocfs, so we need
          *sync this with client_disconnect_export Bug15684*/
@@ -2518,49 +2766,48 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
         if (!imp)
                 RETURN(-ENODEV);
 
-        /* We could possibly pass max_age in the request (as an absolute
-         * timestamp or a "seconds.usec ago") so the target can avoid doing
-         * extra calls into the filesystem if that isn't necessary (e.g.
-         * during mount that would help a bit).  Having relative timestamps
-         * is not so great if request processing is slow, while absolute
-         * timestamps are not ideal because they need time synchronization. */
-        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
+       /* We could possibly pass max_age in the request (as an absolute
+        * timestamp or a "seconds.usec ago") so the target can avoid doing
+        * extra calls into the filesystem if that isn't necessary (e.g.
+        * during mount that would help a bit).  Having relative timestamps
+        * is not so great if request processing is slow, while absolute
+        * timestamps are not ideal because they need time synchronization. */
+       req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
 
-        class_import_put(imp);
+       class_import_put(imp);
 
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
 
-        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-        if (msfs == NULL) {
-                GOTO(out, rc = -EPROTO);
-        }
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL)
+               GOTO(out, rc = -EPROTO);
 
-        *osfs = *msfs;
+       *osfs = *msfs;
 
-        EXIT;
- out:
-        ptlrpc_req_finished(req);
-        return rc;
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
 }
 
 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
@@ -2755,27 +3002,24 @@ int osc_disconnect(struct obd_export *exp)
        struct obd_device *obd = class_exp2obd(exp);
        int rc;
 
-        rc = client_disconnect_export(exp);
-        /**
-         * Initially we put del_shrink_grant before disconnect_export, but it
-         * causes the following problem if setup (connect) and cleanup
-         * (disconnect) are tangled together.
-         *      connect p1                     disconnect p2
-         *   ptlrpc_connect_import
-         *     ...............               class_manual_cleanup
-         *                                     osc_disconnect
-         *                                     del_shrink_grant
-         *   ptlrpc_connect_interrupt
-         *     init_grant_shrink
-         *   add this client to shrink list
-         *                                      cleanup_osc
-         * Bang! pinger trigger the shrink.
-         * So the osc should be disconnected from the shrink list, after we
-         * are sure the import has been destroyed. BUG18662
-         */
-        if (obd->u.cli.cl_import == NULL)
-                osc_del_shrink_grant(&obd->u.cli);
-        return rc;
+       rc = client_disconnect_export(exp);
+       /**
+        * Initially we put del_shrink_grant before disconnect_export, but it
+        * causes the following problem if setup (connect) and cleanup
+        * (disconnect) are tangled together.
+        *      connect p1                     disconnect p2
+        *   ptlrpc_connect_import
+        *     ...............               class_manual_cleanup
+        *                                     osc_disconnect
+        *                                     del_shrink_grant
+        *   ptlrpc_connect_interrupt
+        *     osc_init_grant
+        *   add this client to shrink list
+        *                                      cleanup_osc
+        * Bang! grant shrink thread trigger the shrink. BUG18662
+        */
+       osc_del_grant_list(&obd->u.cli);
+       return rc;
 }
 EXPORT_SYMBOL(osc_disconnect);
 
@@ -2949,8 +3193,8 @@ int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
                GOTO(out_ptlrpcd_work, rc);
 
        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+       osc_update_next_shrink(cli);
 
-       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        RETURN(rc);
 
 out_ptlrpcd_work:
@@ -2972,7 +3216,6 @@ EXPORT_SYMBOL(osc_setup_common);
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
        struct client_obd *cli = &obd->u.cli;
-       struct obd_type   *type;
        int                adding;
        int                added;
        int                req_count;
@@ -2984,36 +3227,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        if (rc < 0)
                RETURN(rc);
 
-#ifdef CONFIG_PROC_FS
-       obd->obd_vars = lprocfs_osc_obd_vars;
-#endif
-       /* If this is true then both client (osc) and server (osp) are on the
-        * same node. The osp layer if loaded first will register the osc proc
-        * directory. In that case this obd_device will be attached its proc
-        * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
-        */
-       type = class_search_type(LUSTRE_OSP_NAME);
-       if (type && type->typ_procsym) {
-               obd->obd_proc_entry = lprocfs_register(obd->obd_name,
-                                                      type->typ_procsym,
-                                                      obd->obd_vars, obd);
-               if (IS_ERR(obd->obd_proc_entry)) {
-                       rc = PTR_ERR(obd->obd_proc_entry);
-                       CERROR("error %d setting up lprocfs for %s\n", rc,
-                              obd->obd_name);
-                       obd->obd_proc_entry = NULL;
-               }
-       }
-
-       rc = lprocfs_obd_setup(obd, false);
-       if (!rc) {
-               /* If the basic OSC proc tree construction succeeded then
-                * lets do the rest.
-                */
-               lproc_osc_attach_seqstat(obd);
-               sptlrpc_lprocfs_cliobd_attach(obd);
-               ptlrpc_lprocfs_register_obd(obd);
-       }
+       rc = osc_tunables_init(obd);
+       if (rc)
+               RETURN(rc);
 
        /*
         * We try to control the total number of requests with a upper limit
@@ -3030,12 +3246,12 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                atomic_add(added, &osc_pool_req_count);
        }
 
-       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 
        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);
+       cli->cl_import->imp_idle_timeout = osc_idle_timeout;
 
        RETURN(0);
 }
@@ -3077,7 +3293,6 @@ static int osc_precleanup(struct obd_device *obd)
        osc_precleanup_common(obd);
 
        ptlrpc_lprocfs_unregister_obd(obd);
-       lprocfs_obd_cleanup(obd);
        RETURN(0);
 }
 
@@ -3115,8 +3330,9 @@ EXPORT_SYMBOL(osc_cleanup_common);
 
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
-       int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
-       return rc > 0 ? 0: rc;
+       ssize_t count  = class_modify_config(lcfg, PARAM_OSC,
+                                            &obd->obd_kset.kobj);
+       return count > 0 ? 0 : count;
 }
 
 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
@@ -3221,19 +3437,28 @@ static int __init osc_init(void)
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);
 
-       if (osc_rq_pool != NULL)
-               GOTO(out, rc);
-       rc = -ENOMEM;
+       if (osc_rq_pool == NULL)
+               GOTO(out_type, rc = -ENOMEM);
+
+       rc = osc_start_grant_work();
+       if (rc != 0)
+               GOTO(out_req_pool, rc);
+
+       RETURN(rc);
+
+out_req_pool:
+       ptlrpc_free_rq_pool(osc_rq_pool);
 out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
 out_kmem:
        lu_kmem_fini(osc_caches);
-out:
+
        RETURN(rc);
 }
 
 static void __exit osc_exit(void)
 {
+       osc_stop_grant_work();
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);