Whamcloud - gitweb
LU-11304 misc: update all url links to whamcloud
[fs/lustre-release.git] / lustre / osc / osc_request.c
index d196a83..5c707de 100644 (file)
@@ -32,6 +32,7 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
+#include <linux/workqueue.h>
 #include <lprocfs_status.h>
 #include <lustre_debug.h>
 #include <lustre_dlm.h>
@@ -56,6 +57,9 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
+static int osc_idle_timeout = 20;
+module_param(osc_idle_timeout, uint, 0644);
+
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -725,6 +729,16 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
         }
 }
 
+/**
+ * grant thread data for shrinking space.
+ */
+struct grant_thread_data {
+       struct list_head        gtd_clients;
+       struct mutex            gtd_mutex;
+       unsigned long           gtd_stopped:1;
+};
+static struct grant_thread_data client_gtd;
+
 static int osc_shrink_grant_interpret(const struct lu_env *env,
                                       struct ptlrpc_request *req,
                                       void *aa, int rc)
@@ -826,6 +840,9 @@ static int osc_should_shrink_grant(struct client_obd *client)
 {
        time64_t next_shrink = client->cl_next_shrink_grant;
 
+       if (client->cl_import == NULL)
+               return 0;
+
         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
              OBD_CONNECT_GRANT_SHRINK) == 0)
                 return 0;
@@ -845,38 +862,83 @@ static int osc_should_shrink_grant(struct client_obd *client)
         return 0;
 }
 
-static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+#define GRANT_SHRINK_RPC_BATCH 100
+
+static struct delayed_work work;
+
+static void osc_grant_work_handler(struct work_struct *data)
 {
-       struct client_obd *client;
+       struct client_obd *cli;
+       int rpc_sent;
+       bool init_next_shrink = true;
+       time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
+
+       rpc_sent = 0;
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_for_each_entry(cli, &client_gtd.gtd_clients,
+                           cl_grant_chain) {
+               if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
+                   osc_should_shrink_grant(cli))
+                       osc_shrink_grant(cli);
 
-       list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
-               if (osc_should_shrink_grant(client))
-                       osc_shrink_grant(client);
+               if (!init_next_shrink) {
+                       if (cli->cl_next_shrink_grant < next_shrink &&
+                           cli->cl_next_shrink_grant > ktime_get_seconds())
+                               next_shrink = cli->cl_next_shrink_grant;
+               } else {
+                       init_next_shrink = false;
+                       next_shrink = cli->cl_next_shrink_grant;
+               }
        }
-       return 0;
+       mutex_unlock(&client_gtd.gtd_mutex);
+
+       if (client_gtd.gtd_stopped == 1)
+               return;
+
+       if (next_shrink > ktime_get_seconds())
+               schedule_delayed_work(&work, msecs_to_jiffies(
+                                       (next_shrink - ktime_get_seconds()) *
+                                       MSEC_PER_SEC));
+       else
+               schedule_work(&work.work);
 }
 
-static int osc_add_shrink_grant(struct client_obd *client)
+/**
+ * Start grant thread for returing grant to server for idle clients.
+ */
+static int osc_start_grant_work(void)
 {
-       int rc;
+       client_gtd.gtd_stopped = 0;
+       mutex_init(&client_gtd.gtd_mutex);
+       INIT_LIST_HEAD(&client_gtd.gtd_clients);
+
+       INIT_DELAYED_WORK(&work, osc_grant_work_handler);
+       schedule_work(&work.work);
 
-       rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
-                                      TIMEOUT_GRANT,
-                                      osc_grant_shrink_grant_cb, NULL,
-                                      &client->cl_grant_shrink_list);
-       if (rc) {
-               CERROR("add grant client %s error %d\n", cli_name(client), rc);
-               return rc;
-       }
-       CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
-       osc_update_next_shrink(client);
        return 0;
 }
 
-static int osc_del_shrink_grant(struct client_obd *client)
+static void osc_stop_grant_work(void)
+{
+       client_gtd.gtd_stopped = 1;
+       cancel_delayed_work_sync(&work);
+}
+
+static void osc_add_grant_list(struct client_obd *client)
 {
-        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
-                                         TIMEOUT_GRANT);
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
+       mutex_unlock(&client_gtd.gtd_mutex);
+}
+
+static void osc_del_grant_list(struct client_obd *client)
+{
+       if (list_empty(&client->cl_grant_chain))
+               return;
+
+       mutex_lock(&client_gtd.gtd_mutex);
+       list_del_init(&client->cl_grant_chain);
+       mutex_unlock(&client_gtd.gtd_mutex);
 }
 
 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
@@ -932,9 +994,8 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);
 
-       if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
-           list_empty(&cli->cl_grant_shrink_list))
-               osc_add_shrink_grant(cli);
+       if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
+               osc_add_grant_list(cli);
 }
 EXPORT_SYMBOL(osc_init_grant);
 
@@ -1024,7 +1085,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
                  * safe to combine */
                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
-                              "report this at https://jira.hpdd.intel.com/\n",
+                              "report this at https://jira.whamcloud.com/\n",
                               p1->flag, p2->flag);
                 }
                 return 0;
@@ -1033,6 +1094,103 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
         return (p1->off + p1->count == p2->off);
 }
 
+static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
+                                  size_t pg_count, struct brw_page **pga,
+                                  int opc, obd_dif_csum_fn *fn,
+                                  int sector_size,
+                                  u32 *check_sum)
+{
+       struct cfs_crypto_hash_desc *hdesc;
+       /* Used Adler as the default checksum type on top of DIF tags */
+       unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
+       struct page *__page;
+       unsigned char *buffer;
+       __u16 *guard_start;
+       unsigned int bufsize;
+       int guard_number;
+       int used_number = 0;
+       int used;
+       u32 cksum;
+       int rc = 0;
+       int i = 0;
+
+       LASSERT(pg_count > 0);
+
+       __page = alloc_page(GFP_KERNEL);
+       if (__page == NULL)
+               return -ENOMEM;
+
+       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(hdesc)) {
+               rc = PTR_ERR(hdesc);
+               CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
+                      obd_name, cfs_crypto_hash_name(cfs_alg), rc);
+               GOTO(out, rc);
+       }
+
+       buffer = kmap(__page);
+       guard_start = (__u16 *)buffer;
+       guard_number = PAGE_SIZE / sizeof(*guard_start);
+       while (nob > 0 && pg_count > 0) {
+               unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+               /* corrupt the data before we compute the checksum, to
+                * simulate an OST->client data error */
+               if (unlikely(i == 0 && opc == OST_READ &&
+                            OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+                       unsigned char *ptr = kmap(pga[i]->pg);
+                       int off = pga[i]->off & ~PAGE_MASK;
+
+                       memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
+                       kunmap(pga[i]->pg);
+               }
+
+               /*
+                * The left guard number should be able to hold checksums of a
+                * whole page
+                */
+               rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, 0,
+                                                 count,
+                                                 guard_start + used_number,
+                                                 guard_number - used_number,
+                                                 &used, sector_size,
+                                                 fn);
+               if (rc)
+                       break;
+
+               used_number += used;
+               if (used_number == guard_number) {
+                       cfs_crypto_hash_update_page(hdesc, __page, 0,
+                               used_number * sizeof(*guard_start));
+                       used_number = 0;
+               }
+
+               nob -= pga[i]->count;
+               pg_count--;
+               i++;
+       }
+       kunmap(__page);
+       if (rc)
+               GOTO(out, rc);
+
+       if (used_number != 0)
+               cfs_crypto_hash_update_page(hdesc, __page, 0,
+                       used_number * sizeof(*guard_start));
+
+       bufsize = sizeof(cksum);
+       cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
+       /* For sending we only compute the wrong checksum instead
+        * of corrupting the data so it is still correct on a redo */
+       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+               cksum++;
+
+       *check_sum = cksum;
+out:
+       __free_page(__page);
+       return rc;
+}
+
 static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
@@ -1087,6 +1245,29 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
        return 0;
 }
 
+static int osc_checksum_bulk_rw(const char *obd_name,
+                               enum cksum_types cksum_type,
+                               int nob, size_t pg_count,
+                               struct brw_page **pga, int opc,
+                               u32 *check_sum)
+{
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       int rc;
+
+       ENTRY;
+       obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
+
+       if (fn)
+               rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
+                                            opc, fn, sector_size, check_sum);
+       else
+               rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
+                                      check_sum);
+
+       RETURN(rc);
+}
+
 static int
 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
@@ -1102,6 +1283,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
        void *short_io_buf;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1296,12 +1478,14 @@ no_bulk:
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
 
-                        body->oa.o_flags |= cksum_type_pack(cksum_type);
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                                                               cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
 
-                       rc = osc_checksum_bulk(requested_nob, page_count,
-                                              pga, OST_WRITE, cksum_type,
-                                              &body->oa.o_cksum);
+                       rc = osc_checksum_bulk_rw(obd_name, cksum_type,
+                                                 requested_nob, page_count,
+                                                 pga, OST_WRITE,
+                                                 &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
@@ -1312,7 +1496,8 @@ no_bulk:
 
                         /* save this in 'oa', too, for later checking */
                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                        oa->o_flags |= cksum_type_pack(cksum_type);
+                       oa->o_flags |= obd_cksum_type_pack(obd_name,
+                                                          cksum_type);
                 } else {
                         /* clear out the checksum flag, in case this is a
                          * resend but cl_checksum is no longer set. b=11238 */
@@ -1327,9 +1512,10 @@ no_bulk:
                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
-                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
+                       body->oa.o_flags |= obd_cksum_type_pack(obd_name,
+                               cli->cl_cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                }
+               }
 
                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
@@ -1426,12 +1612,16 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
 
 static int
 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
-                               __u32 client_cksum, __u32 server_cksum,
-                               struct osc_brw_async_args *aa)
+                    __u32 client_cksum, __u32 server_cksum,
+                    struct osc_brw_async_args *aa)
 {
-        __u32 new_cksum;
-        char *msg;
+       const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
        enum cksum_types cksum_type;
+       obd_dif_csum_fn *fn = NULL;
+       int sector_size = 0;
+       bool t10pi = false;
+       __u32 new_cksum;
+       char *msg;
        int rc;
 
         if (server_cksum == client_cksum) {
@@ -1443,15 +1633,50 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);
 
-       cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
-                                      oa->o_flags : 0);
-       rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
-                              aa->aa_ppga, OST_WRITE, cksum_type,
-                              &new_cksum);
+       cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+                                          oa->o_flags : 0);
+
+       switch (cksum_type) {
+       case OBD_CKSUM_T10IP512:
+               t10pi = true;
+               fn = obd_dif_ip_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10IP4K:
+               t10pi = true;
+               fn = obd_dif_ip_fn;
+               sector_size = 4096;
+               break;
+       case OBD_CKSUM_T10CRC512:
+               t10pi = true;
+               fn = obd_dif_crc_fn;
+               sector_size = 512;
+               break;
+       case OBD_CKSUM_T10CRC4K:
+               t10pi = true;
+               fn = obd_dif_crc_fn;
+               sector_size = 4096;
+               break;
+       default:
+               break;
+       }
+
+       if (t10pi)
+               rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
+                                            aa->aa_page_count,
+                                            aa->aa_ppga,
+                                            OST_WRITE,
+                                            fn,
+                                            sector_size,
+                                            &new_cksum);
+       else
+               rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
+                                      aa->aa_ppga, OST_WRITE, cksum_type,
+                                      &new_cksum);
 
        if (rc < 0)
                msg = "failed to calculate the client write checksum";
-       else if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
+       else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
                 msg = "the server did not use the checksum type specified in "
                       "the original request - likely a protocol problem";
         else if (new_cksum == server_cksum)
@@ -1467,15 +1692,15 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
-                          aa->aa_cli->cl_import->imp_obd->obd_name,
-                          msg, libcfs_nid2str(peer->nid),
+                          obd_name, msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                                aa->aa_ppga[aa->aa_page_count-1]->count - 1,
-                          client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
+                          client_cksum,
+                          obd_cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
 }
@@ -1483,11 +1708,12 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
 /* Note rc enters this function as number of bytes transferred */
 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 {
-        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       struct client_obd *cli = aa->aa_cli;
+       const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
-                        &req->rq_import->imp_connection->c_peer;
-        struct client_obd *cli = aa->aa_cli;
-        struct ost_body *body;
+               &req->rq_import->imp_connection->c_peer;
+       struct ost_body *body;
        u32 client_cksum = 0;
         ENTRY;
 
@@ -1607,16 +1833,16 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                char      *via = "";
                char      *router = "";
                enum cksum_types cksum_type;
-
-                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
-                                               body->oa.o_flags : 0);
-               rc = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga,
-                                      OST_READ, cksum_type, &client_cksum);
-               if (rc < 0) {
-                       CDEBUG(D_PAGE,
-                              "failed to calculate checksum, rc = %d\n", rc);
+               u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
+                       body->oa.o_flags : 0;
+
+               cksum_type = obd_cksum_type_unpack(o_flags);
+               rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
+                                         aa->aa_page_count, aa->aa_ppga,
+                                         OST_READ, &client_cksum);
+               if (rc < 0)
                        GOTO(out, rc);
-               }
+
                if (req->rq_bulk != NULL &&
                    peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
@@ -1638,7 +1864,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent [%llu-%llu], client %x, "
                                           "server %x, cksum_type %x\n",
-                                          req->rq_import->imp_obd->obd_name,
+                                          obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
@@ -2483,7 +2709,7 @@ static int osc_statfs_async(struct obd_export *exp,
         struct obd_device     *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
-        int                    rc;
+       int rc;
         ENTRY;
 
         /* We could possibly pass max_age in the request (as an absolute
@@ -2501,15 +2727,15 @@ static int osc_statfs_async(struct obd_export *exp,
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -2523,12 +2749,13 @@ static int osc_statfs_async(struct obd_export *exp,
 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
 {
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct obd_statfs     *msfs;
-        struct ptlrpc_request *req;
-        struct obd_import     *imp = NULL;
-        int rc;
-        ENTRY;
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct obd_statfs     *msfs;
+       struct ptlrpc_request *req;
+       struct obd_import     *imp = NULL;
+       int rc;
+       ENTRY;
+
 
         /*Since the request might also come from lprocfs, so we need
          *sync this with client_disconnect_export Bug15684*/
@@ -2539,49 +2766,48 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
         if (!imp)
                 RETURN(-ENODEV);
 
-        /* We could possibly pass max_age in the request (as an absolute
-         * timestamp or a "seconds.usec ago") so the target can avoid doing
-         * extra calls into the filesystem if that isn't necessary (e.g.
-         * during mount that would help a bit).  Having relative timestamps
-         * is not so great if request processing is slow, while absolute
-         * timestamps are not ideal because they need time synchronization. */
-        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
+       /* We could possibly pass max_age in the request (as an absolute
+        * timestamp or a "seconds.usec ago") so the target can avoid doing
+        * extra calls into the filesystem if that isn't necessary (e.g.
+        * during mount that would help a bit).  Having relative timestamps
+        * is not so great if request processing is slow, while absolute
+        * timestamps are not ideal because they need time synchronization. */
+       req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
 
-        class_import_put(imp);
+       class_import_put(imp);
 
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-        ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
 
-        if (flags & OBD_STATFS_NODELAY) {
-                /* procfs requests not want stat in wait for avoid deadlock */
-                req->rq_no_resend = 1;
-                req->rq_no_delay = 1;
-        }
+       if (flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
 
-        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-        if (msfs == NULL) {
-                GOTO(out, rc = -EPROTO);
-        }
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL)
+               GOTO(out, rc = -EPROTO);
 
-        *osfs = *msfs;
+       *osfs = *msfs;
 
-        EXIT;
- out:
-        ptlrpc_req_finished(req);
-        return rc;
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
 }
 
 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
@@ -2776,27 +3002,24 @@ int osc_disconnect(struct obd_export *exp)
        struct obd_device *obd = class_exp2obd(exp);
        int rc;
 
-        rc = client_disconnect_export(exp);
-        /**
-         * Initially we put del_shrink_grant before disconnect_export, but it
-         * causes the following problem if setup (connect) and cleanup
-         * (disconnect) are tangled together.
-         *      connect p1                     disconnect p2
-         *   ptlrpc_connect_import
-         *     ...............               class_manual_cleanup
-         *                                     osc_disconnect
-         *                                     del_shrink_grant
-         *   ptlrpc_connect_interrupt
-         *     init_grant_shrink
-         *   add this client to shrink list
-         *                                      cleanup_osc
-         * Bang! pinger trigger the shrink.
-         * So the osc should be disconnected from the shrink list, after we
-         * are sure the import has been destroyed. BUG18662
-         */
-        if (obd->u.cli.cl_import == NULL)
-                osc_del_shrink_grant(&obd->u.cli);
-        return rc;
+       rc = client_disconnect_export(exp);
+       /**
+        * Initially we put del_shrink_grant before disconnect_export, but it
+        * causes the following problem if setup (connect) and cleanup
+        * (disconnect) are tangled together.
+        *      connect p1                     disconnect p2
+        *   ptlrpc_connect_import
+        *     ...............               class_manual_cleanup
+        *                                     osc_disconnect
+        *                                     del_shrink_grant
+        *   ptlrpc_connect_interrupt
+        *     osc_init_grant
+        *   add this client to shrink list
+        *                                      cleanup_osc
+        * Bang! grant shrink thread trigger the shrink. BUG18662
+        */
+       osc_del_grant_list(&obd->u.cli);
+       return rc;
 }
 EXPORT_SYMBOL(osc_disconnect);
 
@@ -2970,8 +3193,8 @@ int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
                GOTO(out_ptlrpcd_work, rc);
 
        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+       osc_update_next_shrink(cli);
 
-       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        RETURN(rc);
 
 out_ptlrpcd_work:
@@ -3023,12 +3246,12 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                atomic_add(added, &osc_pool_req_count);
        }
 
-       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 
        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);
+       cli->cl_import->imp_idle_timeout = osc_idle_timeout;
 
        RETURN(0);
 }
@@ -3214,19 +3437,28 @@ static int __init osc_init(void)
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);
 
-       if (osc_rq_pool != NULL)
-               GOTO(out, rc);
-       rc = -ENOMEM;
+       if (osc_rq_pool == NULL)
+               GOTO(out_type, rc = -ENOMEM);
+
+       rc = osc_start_grant_work();
+       if (rc != 0)
+               GOTO(out_req_pool, rc);
+
+       RETURN(rc);
+
+out_req_pool:
+       ptlrpc_free_rq_pool(osc_rq_pool);
 out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
 out_kmem:
        lu_kmem_fini(osc_caches);
-out:
+
        RETURN(rc);
 }
 
 static void __exit osc_exit(void)
 {
+       osc_stop_grant_work();
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);