Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 9bf1afa..f44f44b 100644 (file)
@@ -728,7 +728,6 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         }
 
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
-        req->rq_interpret_reply = osc_destroy_interpret;
         ptlrpc_at_set_req_timeout(req);
 
         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
@@ -740,15 +739,19 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         osc_pack_capa(req, body, (struct obd_capa *)capa);
         ptlrpc_request_set_replen(req);
 
-        if (!osc_can_send_destroy(cli)) {
-                struct l_wait_info lwi = { 0 };
-
-                /*
-                 * Wait until the number of on-going destroy RPCs drops
-                 * under max_rpc_in_flight
-                 */
-                l_wait_event_exclusive(cli->cl_destroy_waitq,
-                                       osc_can_send_destroy(cli), &lwi);
+        /* don't throttle destroy RPCs for the MDT */
+        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
+                req->rq_interpret_reply = osc_destroy_interpret;
+                if (!osc_can_send_destroy(cli)) {
+                        struct l_wait_info lwi = { 0 };
+
+                        /*
+                         * Wait until the number of on-going destroy RPCs drops
+                         * under max_rpc_in_flight
+                         */
+                        l_wait_event_exclusive(cli->cl_destroy_waitq,
+                                               osc_can_send_destroy(cli), &lwi);
+                }
         }
 
         /* Do not wait for response */
@@ -792,6 +795,15 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+
+}
+
+static void osc_update_next_shrink(struct client_obd *cli)
+{
+        int time = GRANT_SHRINK_INTERVAL;
+        cli->cl_next_shrink_grant = cfs_time_shift(time);
+        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
+               cli->cl_next_shrink_grant);
 }
 
 /* caller must hold loi_list_lock */
@@ -806,6 +818,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
                CFS_PAGE_SIZE, pga, pga->pg);
         LASSERT(cli->cl_avail_grant >= 0);
+        osc_update_next_shrink(cli);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -899,25 +912,142 @@ void osc_wake_cache_waiters(struct client_obd *cli)
         EXIT;
 }
 
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        cli->cl_avail_grant = ocd->ocd_grant;
+        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+        if (body->oa.o_valid & OBD_MD_FLGRANT)
+                cli->cl_avail_grant += body->oa.o_grant;
+        /* waiters are woken in brw_interpret */
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
 
-        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
-               cli->cl_avail_grant, cli->cl_lost_grant);
-        LASSERT(cli->cl_avail_grant >= 0);
+static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
+                              void *key, obd_count vallen, void *val,
+                              struct ptlrpc_request_set *set);
+
+static int osc_shrink_grant_interpret(const struct lu_env *env,
+                                     struct ptlrpc_request *req,
+                                      void *aa, int rc)
+{
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
+        struct ost_body *body;
+        
+        if (rc != 0) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                cli->cl_avail_grant += oa->o_grant;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                GOTO(out, rc);
+        }
+
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        LASSERT(body);
+        osc_update_grant(cli, body);
+out:
+        OBD_FREE_PTR(oa);
+        return rc;        
 }
 
-static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        if (body->oa.o_valid & OBD_MD_FLGRANT)
+        oa->o_grant = cli->cl_avail_grant / 4;
+        cli->cl_avail_grant -= oa->o_grant; 
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        oa->o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+}
+
+static int osc_shrink_grant(struct client_obd *cli)
+{
+        int    rc = 0;
+        struct ost_body     *body;
+        ENTRY;
+
+        OBD_ALLOC_PTR(body);
+        if (!body)
+                RETURN(-ENOMEM);
+
+        osc_announce_cached(cli, &body->oa, 0);
+        osc_shrink_grant_local(cli, &body->oa);
+        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
+                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+                                sizeof(*body), body, NULL);
+        if (rc) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
                 cli->cl_avail_grant += body->oa.o_grant;
-        /* waiters are woken in brw_interpret */
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+        }
+        if (body)
+               OBD_FREE_PTR(body);
+        RETURN(rc);
+}
+
+#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
+static int osc_should_shrink_grant(struct client_obd *client)
+{
+        cfs_time_t time = cfs_time_current();
+        cfs_time_t next_shrink = client->cl_next_shrink_grant;
+        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
+                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
+                        return 1;
+                else
+                        osc_update_next_shrink(client);
+        }
+        return 0;
+}
+
+static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+{
+        struct client_obd *client;
+
+        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
+                if (osc_should_shrink_grant(client))
+                        osc_shrink_grant(client);
+        }
+        return 0;
+}
+
+static int osc_add_shrink_grant(struct client_obd *client)
+{
+        int rc;
+
+        rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
+                                         TIMEOUT_GRANT,
+                                         osc_grant_shrink_grant_cb, NULL,
+                                         &client->cl_grant_shrink_list);
+        if (rc) {
+                CERROR("add grant client %s error %d\n", 
+                        client->cl_import->imp_obd->obd_name, rc);
+                return rc;
+        }
+        CDEBUG(D_CACHE, "add grant client %s \n", 
+               client->cl_import->imp_obd->obd_name);
+        osc_update_next_shrink(client);
+        return 0; 
+}
+
+static int osc_del_shrink_grant(struct client_obd *client)
+{
+        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list, 
+                                         TIMEOUT_GRANT);
+}
+
+static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+{
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        cli->cl_avail_grant = ocd->ocd_grant;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+            list_empty(&cli->cl_grant_shrink_list))
+                osc_add_shrink_grant(cli);
+
+        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
+               cli->cl_avail_grant, cli->cl_lost_grant);
+        LASSERT(cli->cl_avail_grant >= 0);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -999,7 +1129,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
-                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
+                unsigned mask = ~(OBD_BRW_FROM_GRANT|
+                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -1168,11 +1299,13 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 (void *)(niobuf - niocount));
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+        if (osc_should_shrink_grant(cli))
+                osc_shrink_grant_local(cli, &body->oa); 
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
                         cksum_type_t cksum_type = cli->cl_cksum_type;
@@ -1201,7 +1334,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                      sizeof(__u32) * niocount);
         } else {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
@@ -1328,6 +1461,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 }
                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
+                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                        RETURN(-EAGAIN);
+
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                     check_write_checksum(&body->oa, peer, client_cksum,
                                          body->oa.o_cksum, aa->aa_requested_nob,
@@ -1335,15 +1471,17 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                          cksum_type_unpack(aa->aa_oa->o_flags)))
                         RETURN(-EAGAIN);
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
-                        RETURN(-EAGAIN);
-
                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                      aa->aa_page_count, aa->aa_ppga);
                 GOTO(out, rc);
         }
 
         /* The rest of this function executes only for OST_READs */
+
+        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+        if (rc < 0)
+                GOTO(out, rc);
+
         if (rc > aa->aa_requested_nob) {
                 CERROR("Unexpected rc %d (%d requested)\n", rc,
                        aa->aa_requested_nob);
@@ -1359,10 +1497,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
-        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
-                                         aa->aa_ppga))
-                GOTO(out, rc = -EAGAIN);
-
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 static int cksum_counter;
                 __u32      server_cksum = body->oa.o_cksum;
@@ -1643,15 +1777,18 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *saved_oa = NULL;
         struct brw_page **ppga, **orig;
         struct obd_import *imp = class_exp2cliimp(exp);
-        struct client_obd *cli = &imp->imp_obd->u.cli;
+        struct client_obd *cli;
         int rc, page_count_orig;
         ENTRY;
 
+        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
+        cli = &imp->imp_obd->u.cli;
+
         if (cmd & OBD_BRW_CHECK) {
                 /* The caller just wants to know if there's a chance that this
                  * I/O can succeed */
 
-                if (imp == NULL || imp->imp_invalid)
+                if (imp->imp_invalid)
                         RETURN(-EIO);
                 RETURN(0);
         }
@@ -1766,6 +1903,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1779,9 +1935,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -1877,8 +2041,10 @@ void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -2138,12 +2304,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct cl_object *clob = NULL;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                  oap_pending_item) {
                 ops = oap->oap_caller_ops;
 
-                LASSERT(oap->oap_magic == OAP_MAGIC);
+                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
+                         "magic 0x%x\n", oap, oap->oap_magic);
 
                 if (clob == NULL) {
                         /* pin object in memory, so that completion call-backs
@@ -2347,7 +2523,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !list_empty(&(LOI)->loi_cli_item),                        \
+               !list_empty(&(LOI)->loi_ready_item) ||                    \
+               !list_empty(&(LOI)->loi_hp_ready_item),                   \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
@@ -2359,11 +2536,16 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2387,6 +2569,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
@@ -2397,7 +2599,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2429,8 +2631,10 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2693,11 +2897,14 @@ int osc_set_async_flags_base(struct client_obd *cli,
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2738,7 +2945,7 @@ int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
@@ -2858,6 +3065,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Complete osc stuff. */
         rc = osc_enqueue_fini(req, aa->oa_lvb,
                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
         /* Release the lock for async request. */
         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                 /*
@@ -3407,6 +3617,9 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 err = lquota_poll_check(quota_interface, exp,
                                         (struct if_quotacheck *)karg);
                 GOTO(out, err);
+        case OBD_IOC_PING_TARGET:
+                err = ptlrpc_obd_ping(obd);
+                GOTO(out, err);
         default:
                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                        cmd, cfs_curproc_comm());
@@ -3606,7 +3819,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
-        if (!set)
+        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                 RETURN(-EINVAL);
 
         /* We pass all other commands directly to OST. Since nobody calls osc
@@ -3616,9 +3829,12 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
            Even if something bad goes through, we'd get a -EINVAL from OST
            anyway. */
 
-
-        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
-        if (req == NULL)
+       if (KEY_IS(KEY_GRANT_SHRINK))  
+                       req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); 
+       else 
+               req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
+        
+       if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
@@ -3643,13 +3859,31 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
-        }
+        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+                struct osc_grant_args *aa;
+                struct obdo *oa;
 
-        ptlrpc_request_set_replen(req);
-        ptlrpc_set_add_req(set, req);
-        ptlrpc_check_set(NULL, set);
-
-        RETURN(0);
+                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                aa = ptlrpc_req_async_args(req);
+                OBD_ALLOC_PTR(oa);
+                if (!oa) {
+                        ptlrpc_req_finished(req);
+                        RETURN(-ENOMEM);
+                }
+                *oa = ((struct ost_body *)val)->oa;
+                aa->aa_oa = oa;
+               req->rq_interpret_reply = osc_shrink_grant_interpret;
+       }
+               
+       ptlrpc_request_set_replen(req);
+       if (!KEY_IS(KEY_GRANT_SHRINK)) {
+               LASSERT(set != NULL);
+               ptlrpc_set_add_req(set, req);
+               ptlrpc_check_set(NULL, set);
+       } else 
+               ptlrpcd_add_req(req, PSCOPE_OTHER);
+        
+       RETURN(0);
 }
 
 
@@ -3680,7 +3914,7 @@ static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         &catid->lci_logid, &osc_mds_ost_orig_logops);
         if (rc) {
                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
-                GOTO (out, rc);
+                GOTO(out, rc);
         }
 
         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
@@ -3760,17 +3994,36 @@ static int osc_disconnect(struct obd_export *exp)
         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
         if (ctxt) {
                 if (obd->u.cli.cl_conn_count == 1) {
-                        /* Flush any remaining cancel messages out to the 
+                        /* Flush any remaining cancel messages out to the
                          * target */
                         llog_sync(ctxt, exp);
                 }
                 llog_ctxt_put(ctxt);
         } else {
-                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 
+                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                        obd);
         }
 
         rc = client_disconnect_export(exp);
+        /**
+         * Initially we put del_shrink_grant before disconnect_export, but it
+         * causes the following problem if setup (connect) and cleanup
+         * (disconnect) are tangled together.
+         *      connect p1                     disconnect p2
+         *   ptlrpc_connect_import 
+         *     ...............               class_manual_cleanup
+         *                                     osc_disconnect
+         *                                     del_shrink_grant
+         *   ptlrpc_connect_interrupt
+         *     init_grant_shrink
+         *   add this client to shrink list                 
+         *                                      cleanup_osc
+         * Bang! pinger trigger the shrink.
+         * So the osc should be disconnected from the shrink list, after we
+         * are sure the import has been destroyed. BUG18662 
+         */
+        if (obd->u.cli.cl_import == NULL)
+                osc_del_shrink_grant(&obd->u.cli);
         return rc;
 }
 
@@ -3892,6 +4145,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+               
+                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+                sema_init(&cli->cl_grant_sem, 1);
         }
 
         RETURN(rc);