Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 65c0be4..e6bbf9a 100644 (file)
@@ -185,7 +185,7 @@ static inline void osc_pack_req_body(struct ptlrpc_request *req,
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
 
-        body->oa = *oinfo->oi_oa;
+        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
         osc_pack_capa(req, body, oinfo->oi_capa);
 }
 
@@ -214,7 +214,7 @@ static int osc_getattr_interpret(const struct lu_env *env,
                                   lustre_swab_ost_body);
         if (body) {
                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
+                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
 
                 /* This should really be sent by the OST */
                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -292,7 +292,7 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
                 GOTO(out, rc = -EPROTO);
 
         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        *oinfo->oi_oa = body->oa;
+        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
 
         /* This should really be sent by the OST */
         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -340,7 +340,7 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *oinfo->oi_oa = body->oa;
+        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
 
         EXIT;
 out:
@@ -362,7 +362,7 @@ static int osc_setattr_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *aa->aa_oi->oi_oa = body->oa;
+        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
 out:
         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
         RETURN(rc);
@@ -446,7 +446,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         ptlrpc_request_set_replen(req);
 
@@ -466,7 +466,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
         if (body == NULL)
                 GOTO(out_req, rc = -EPROTO);
 
-        *oa = body->oa;
+        lustre_get_wire_obdo(oa, &body->oa);
 
         /* This should really be sent by the OST */
         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -514,7 +514,7 @@ static int osc_punch_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *aa->pa_oa = body->oa;
+        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
 out:
         rc = aa->pa_upcall(aa->pa_cookie, rc);
         RETURN(rc);
@@ -546,7 +546,7 @@ int osc_punch_base(struct obd_export *exp, struct obdo *oa,
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
         osc_pack_capa(req, body, capa);
 
         ptlrpc_request_set_replen(req);
@@ -605,7 +605,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
         body->oa.o_size = start;
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
@@ -621,7 +621,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *oa = body->oa;
+        lustre_get_wire_obdo(oa, &body->oa);
 
         EXIT;
  out:
@@ -728,27 +728,31 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         }
 
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
-        req->rq_interpret_reply = osc_destroy_interpret;
         ptlrpc_at_set_req_timeout(req);
 
         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                 oa->o_lcookie = *oti->oti_logcookies;
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         osc_pack_capa(req, body, (struct obd_capa *)capa);
         ptlrpc_request_set_replen(req);
 
-        if (!osc_can_send_destroy(cli)) {
-                struct l_wait_info lwi = { 0 };
-
-                /*
-                 * Wait until the number of on-going destroy RPCs drops
-                 * under max_rpc_in_flight
-                 */
-                l_wait_event_exclusive(cli->cl_destroy_waitq,
-                                       osc_can_send_destroy(cli), &lwi);
+        /* don't throttle destroy RPCs for the MDT */
+        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
+                req->rq_interpret_reply = osc_destroy_interpret;
+                if (!osc_can_send_destroy(cli)) {
+                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
+                                                          NULL);
+
+                        /*
+                         * Wait until the number of on-going destroy RPCs drops
+                         * under max_rpc_in_flight
+                         */
+                        l_wait_event_exclusive(cli->cl_destroy_waitq,
+                                               osc_can_send_destroy(cli), &lwi);
+                }
         }
 
         /* Do not wait for response */
@@ -792,12 +796,22 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+
+}
+
+static void osc_update_next_shrink(struct client_obd *cli)
+{
+        cli->cl_next_shrink_grant =
+                cfs_time_shift(cli->cl_grant_shrink_interval);
+        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
+               cli->cl_next_shrink_grant);
 }
 
 /* caller must hold loi_list_lock */
 static void osc_consume_write_grant(struct client_obd *cli,
                                     struct brw_page *pga)
 {
+        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
         atomic_inc(&obd_dirty_pages);
         cli->cl_dirty += CFS_PAGE_SIZE;
@@ -806,6 +820,7 @@ static void osc_consume_write_grant(struct client_obd *cli,
         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
                CFS_PAGE_SIZE, pga, pga->pg);
         LASSERT(cli->cl_avail_grant >= 0);
+        osc_update_next_shrink(cli);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -816,6 +831,7 @@ static void osc_release_write_grant(struct client_obd *cli,
         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
         ENTRY;
 
+        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                 EXIT;
                 return;
@@ -899,25 +915,178 @@ void osc_wake_cache_waiters(struct client_obd *cli)
         EXIT;
 }
 
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        cli->cl_avail_grant = ocd->ocd_grant;
+        cli->cl_avail_grant += grant;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
-               cli->cl_avail_grant, cli->cl_lost_grant);
-        LASSERT(cli->cl_avail_grant >= 0);
 }
 
 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
+        if (body->oa.o_valid & OBD_MD_FLGRANT) {
+                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+                __osc_update_grant(cli, body->oa.o_grant);
+        }
+}
+
+static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
+                              void *key, obd_count vallen, void *val,
+                              struct ptlrpc_request_set *set);
+
+static int osc_shrink_grant_interpret(const struct lu_env *env,
+                                      struct ptlrpc_request *req,
+                                      void *aa, int rc)
+{
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
+        struct ost_body *body;
+
+        if (rc != 0) {
+                __osc_update_grant(cli, oa->o_grant);
+                GOTO(out, rc);
+        }
+
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        LASSERT(body);
+        osc_update_grant(cli, body);
+out:
+        OBD_FREE_PTR(oa);
+        return rc;
+}
+
+static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
+{
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        oa->o_grant = cli->cl_avail_grant / 4;
+        cli->cl_avail_grant -= oa->o_grant;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        oa->o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+}
+
+/* Shrink the current grant, either from some large amount to enough for a
+ * full set of in-flight RPCs, or if we have already shrunk to that limit
+ * then to enough for a single RPC.  This avoids keeping more grant than
+ * needed, and avoids shrinking the grant piecemeal. */
+static int osc_shrink_grant(struct client_obd *cli)
+{
+        long target = (cli->cl_max_rpcs_in_flight + 1) *
+                      cli->cl_max_pages_per_rpc;
+
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        if (cli->cl_avail_grant <= target)
+                target = cli->cl_max_pages_per_rpc;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        return osc_shrink_grant_to_target(cli, target);
+}
+
+int osc_shrink_grant_to_target(struct client_obd *cli, long target)
+{
+        int    rc = 0;
+        struct ost_body     *body;
+        ENTRY;
+
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        /* Don't shrink if we are already above or below the desired limit
+         * We don't want to shrink below a single RPC, as that will negatively
+         * impact block allocation and long-term performance. */
+        if (target < cli->cl_max_pages_per_rpc)
+                target = cli->cl_max_pages_per_rpc;
+
+        if (target >= cli->cl_avail_grant) {
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                RETURN(0);
+        }
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        OBD_ALLOC_PTR(body);
+        if (!body)
+                RETURN(-ENOMEM);
+
+        osc_announce_cached(cli, &body->oa, 0);
+
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        if (body->oa.o_valid & OBD_MD_FLGRANT)
-                cli->cl_avail_grant += body->oa.o_grant;
-        /* waiters are woken in brw_interpret */
+        body->oa.o_grant = cli->cl_avail_grant - target;
+        cli->cl_avail_grant = target;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
+        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+
+        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
+                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+                                sizeof(*body), body, NULL);
+        if (rc != 0)
+                __osc_update_grant(cli, body->oa.o_grant);
+        OBD_FREE_PTR(body);
+        RETURN(rc);
+}
+
+#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
+static int osc_should_shrink_grant(struct client_obd *client)
+{
+        cfs_time_t time = cfs_time_current();
+        cfs_time_t next_shrink = client->cl_next_shrink_grant;
+        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
+                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
+                        return 1;
+                else
+                        osc_update_next_shrink(client);
+        }
+        return 0;
+}
+
+static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+{
+        struct client_obd *client;
+
+        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
+                if (osc_should_shrink_grant(client))
+                        osc_shrink_grant(client);
+        }
+        return 0;
+}
+
+static int osc_add_shrink_grant(struct client_obd *client)
+{
+        int rc;
+
+        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+                                       TIMEOUT_GRANT,
+                                       osc_grant_shrink_grant_cb, NULL,
+                                       &client->cl_grant_shrink_list);
+        if (rc) {
+                CERROR("add grant client %s error %d\n",
+                        client->cl_import->imp_obd->obd_name, rc);
+                return rc;
+        }
+        CDEBUG(D_CACHE, "add grant client %s \n",
+               client->cl_import->imp_obd->obd_name);
+        osc_update_next_shrink(client);
+        return 0;
+}
+
+static int osc_del_shrink_grant(struct client_obd *client)
+{
+        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
+                                         TIMEOUT_GRANT);
+}
+
+static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+{
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        cli->cl_avail_grant = ocd->ocd_grant;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+            list_empty(&cli->cl_grant_shrink_list))
+                osc_add_shrink_grant(cli);
+
+        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
+               cli->cl_avail_grant, cli->cl_lost_grant);
+        LASSERT(cli->cl_avail_grant >= 0);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -999,7 +1168,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
-                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
+                unsigned mask = ~(OBD_BRW_FROM_GRANT|
+                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -1116,7 +1286,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
         LASSERT(body && ioobj && niobuf);
 
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         obdo_to_ioobj(oa, ioobj);
         ioobj->ioo_bufcnt = niocount;
@@ -1168,17 +1338,21 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 (void *)(niobuf - niocount));
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+        if (osc_should_shrink_grant(cli))
+                osc_shrink_grant_local(cli, &body->oa);
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
                         cksum_type_t cksum_type = cli->cl_cksum_type;
 
-                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
-                                oa->o_flags = body->oa.o_flags = 0;
+                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                                oa->o_flags &= OBD_FL_LOCAL_MASK;
+                                body->oa.o_flags = 0;
+                        }
                         body->oa.o_flags |= cksum_type_pack(cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
@@ -1201,7 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                      sizeof(__u32) * niocount);
         } else {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
@@ -1308,10 +1482,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* set/clear over quota flag for a uid/gid */
         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
-            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
-                lquota_setdq(quota_interface, cli, body->oa.o_uid,
-                             body->oa.o_gid, body->oa.o_valid,
+            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
+                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
+
+                lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
                              body->oa.o_flags);
+        }
 
         if (rc < 0)
                 RETURN(rc);
@@ -1328,6 +1504,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 }
                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
+                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                        RETURN(-EAGAIN);
+
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                     check_write_checksum(&body->oa, peer, client_cksum,
                                          body->oa.o_cksum, aa->aa_requested_nob,
@@ -1335,15 +1514,18 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                          cksum_type_unpack(aa->aa_oa->o_flags)))
                         RETURN(-EAGAIN);
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
-                        RETURN(-EAGAIN);
-
                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                      aa->aa_page_count, aa->aa_ppga);
                 GOTO(out, rc);
         }
 
         /* The rest of this function executes only for OST_READs */
+
+        /* if unwrap_bulk failed, return -EAGAIN to retry */
+        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+        if (rc < 0)
+                GOTO(out, rc = -EAGAIN);
+
         if (rc > aa->aa_requested_nob) {
                 CERROR("Unexpected rc %d (%d requested)\n", rc,
                        aa->aa_requested_nob);
@@ -1359,10 +1541,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
-        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
-                                         aa->aa_ppga))
-                GOTO(out, rc = -EAGAIN);
-
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 static int cksum_counter;
                 __u32      server_cksum = body->oa.o_cksum;
@@ -1431,7 +1609,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         }
 out:
         if (rc >= 0)
-                *aa->aa_oa = body->oa;
+                lustre_get_wire_obdo(aa->aa_oa, &body->oa);
 
         RETURN(rc);
 }
@@ -1643,15 +1821,18 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *saved_oa = NULL;
         struct brw_page **ppga, **orig;
         struct obd_import *imp = class_exp2cliimp(exp);
-        struct client_obd *cli = &imp->imp_obd->u.cli;
+        struct client_obd *cli;
         int rc, page_count_orig;
         ENTRY;
 
+        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
+        cli = &imp->imp_obd->u.cli;
+
         if (cmd & OBD_BRW_CHECK) {
                 /* The caller just wants to know if there's a chance that this
                  * I/O can succeed */
 
-                if (imp == NULL || imp->imp_invalid)
+                if (imp->imp_invalid)
                         RETURN(-EIO);
                 RETURN(0);
         }
@@ -1766,6 +1947,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1779,9 +1979,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -1877,8 +2085,10 @@ void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -1978,6 +2188,9 @@ static int brw_interpret(const struct lu_env *env,
                 int i;
                 for (i = 0; i < aa->aa_page_count; i++)
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
+               
+                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
+                        OBDO_FREE(aa->aa_oa);
         }
         osc_wake_cache_waiters(cli);
         osc_check_rpcs(env, cli);
@@ -2138,12 +2351,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct cl_object *clob = NULL;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                  oap_pending_item) {
                 ops = oap->oap_caller_ops;
 
-                LASSERT(oap->oap_magic == OAP_MAGIC);
+                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
+                         "magic 0x%x\n", oap, oap->oap_magic);
 
                 if (clob == NULL) {
                         /* pin object in memory, so that completion call-backs
@@ -2347,7 +2570,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !list_empty(&(LOI)->loi_cli_item),                        \
+               !list_empty(&(LOI)->loi_ready_item) ||                    \
+               !list_empty(&(LOI)->loi_hp_ready_item),                   \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
@@ -2359,11 +2583,16 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2387,6 +2616,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_read_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
@@ -2397,7 +2646,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2429,8 +2678,10 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2619,6 +2870,7 @@ int osc_queue_async_io(const struct lu_env *env,
         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                 struct cl_object *obj;
                 struct cl_attr    attr; /* XXX put attr into thread info */
+                unsigned int qid[MAXQUOTAS];
 
                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
 
@@ -2626,8 +2878,10 @@ int osc_queue_async_io(const struct lu_env *env,
                 rc = cl_object_attr_get(env, obj, &attr);
                 cl_object_attr_unlock(obj);
 
-                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
-                                            attr.cat_gid) == NO_QUOTA)
+                qid[USRQUOTA] = attr.cat_uid;
+                qid[GRPQUOTA] = attr.cat_gid;
+                if (rc == 0 &&
+                    lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
                         rc = -EDQUOT;
                 if (rc)
                         RETURN(rc);
@@ -2643,6 +2897,9 @@ int osc_queue_async_io(const struct lu_env *env,
         oap->oap_page_off = off;
         oap->oap_count = count;
         oap->oap_brw_flags = brw_flags;
+        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+        if (libcfs_memory_pressure_get())
+                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
         oap->oap_async_flags = async_flags;
 
         if (cmd & OBD_BRW_WRITE) {
@@ -2675,8 +2932,7 @@ int osc_set_async_flags_base(struct client_obd *cli,
         struct loi_oap_pages *lop;
         ENTRY;
 
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(-EIO);
+        LASSERT(!list_empty(&oap->oap_pending_item));
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
                 lop = &loi->loi_write_lop;
@@ -2684,20 +2940,20 @@ int osc_set_async_flags_base(struct client_obd *cli,
                 lop = &loi->loi_read_lop;
         }
 
-        if (list_empty(&oap->oap_pending_item))
-                RETURN(-EINVAL);
-
         if ((oap->oap_async_flags & async_flags) == async_flags)
                 RETURN(0);
 
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2738,7 +2994,7 @@ int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
@@ -2858,6 +3114,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Complete osc stuff. */
         rc = osc_enqueue_fini(req, aa->oa_lvb,
                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
         /* Release the lock for async request. */
         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                 /*
@@ -3150,6 +3409,10 @@ static int osc_statfs_interpret(const struct lu_env *env,
         struct obd_statfs *msfs;
         ENTRY;
 
+        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
+            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
+                GOTO(out, rc = 0);
+
         if (rc != 0)
                 GOTO(out, rc);
 
@@ -3455,6 +3718,7 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                 memcpy(tmp, key, keylen);
 
+                req->rq_no_delay = req->rq_no_resend = 1;
                 ptlrpc_request_set_replen(req);
                 rc = ptlrpc_queue_wait(req);
                 if (rc)
@@ -3516,27 +3780,20 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
         RETURN(-EINVAL);
 }
 
-static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
-                                          struct ptlrpc_request *req,
-                                          void *aa, int rc)
+static int osc_setinfo_mds_connect_import(struct obd_import *imp)
 {
         struct llog_ctxt *ctxt;
-        struct obd_import *imp = req->rq_import;
+        int rc = 0;
         ENTRY;
 
-        if (rc != 0)
-                RETURN(rc);
-
         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
         if (ctxt) {
-                if (rc == 0)
-                        rc = llog_initiator_connect(ctxt);
-                else
-                        CERROR("cannot establish connection for "
-                               "ctxt %p: %d\n", ctxt, rc);
+                rc = llog_initiator_connect(ctxt);
+                llog_ctxt_put(ctxt);
+        } else {
+                /* XXX return an error? skip setting below flags? */
         }
 
-        llog_ctxt_put(ctxt);
         spin_lock(&imp->imp_lock);
         imp->imp_server_timeout = 1;
         imp->imp_pingable = 1;
@@ -3546,6 +3803,17 @@ static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
+                                          struct ptlrpc_request *req,
+                                          void *aa, int rc)
+{
+        ENTRY;
+        if (rc != 0)
+                RETURN(rc);
+
+        RETURN(osc_setinfo_mds_connect_import(req->rq_import));
+}
+
 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                               void *key, obd_count vallen, void *val,
                               struct ptlrpc_request_set *set)
@@ -3609,7 +3877,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
-        if (!set)
+        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                 RETURN(-EINVAL);
 
         /* We pass all other commands directly to OST. Since nobody calls osc
@@ -3619,8 +3887,11 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
            Even if something bad goes through, we'd get a -EINVAL from OST
            anyway. */
 
+        if (KEY_IS(KEY_GRANT_SHRINK))
+                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
+        else
+                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
 
-        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -3645,12 +3916,31 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
+                req->rq_no_delay = req->rq_no_resend = 1;
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
+        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+                struct osc_grant_args *aa;
+                struct obdo *oa;
+
+                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                aa = ptlrpc_req_async_args(req);
+                OBD_ALLOC_PTR(oa);
+                if (!oa) {
+                        ptlrpc_req_finished(req);
+                        RETURN(-ENOMEM);
+                }
+                *oa = ((struct ost_body *)val)->oa;
+                aa->aa_oa = oa;
+                req->rq_interpret_reply = osc_shrink_grant_interpret;
         }
 
         ptlrpc_request_set_replen(req);
-        ptlrpc_set_add_req(set, req);
-        ptlrpc_check_set(NULL, set);
+        if (!KEY_IS(KEY_GRANT_SHRINK)) {
+                LASSERT(set != NULL);
+                ptlrpc_set_add_req(set, req);
+                ptlrpc_check_set(NULL, set);
+        } else
+                ptlrpcd_add_req(req, PSCOPE_OTHER);
 
         RETURN(0);
 }
@@ -3763,17 +4053,36 @@ static int osc_disconnect(struct obd_export *exp)
         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
         if (ctxt) {
                 if (obd->u.cli.cl_conn_count == 1) {
-                        /* Flush any remaining cancel messages out to the 
+                        /* Flush any remaining cancel messages out to the
                          * target */
                         llog_sync(ctxt, exp);
                 }
                 llog_ctxt_put(ctxt);
         } else {
-                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 
+                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                        obd);
         }
 
         rc = client_disconnect_export(exp);
+        /**
+         * Initially we put del_shrink_grant before disconnect_export, but it
+         * causes the following problem if setup (connect) and cleanup
+         * (disconnect) are tangled together.
+         *      connect p1                     disconnect p2
+         *   ptlrpc_connect_import
+         *     ...............               class_manual_cleanup
+         *                                     osc_disconnect
+         *                                     del_shrink_grant
+         *   ptlrpc_connect_interrupt
+         *     init_grant_shrink
+         *   add this client to shrink list
+         *                                      cleanup_osc
+         * Bang! pinger trigger the shrink.
+         * So the osc should be disconnected from the shrink list, after we
+         * are sure the import has been destroyed. BUG18662
+         */
+        if (obd->u.cli.cl_import == NULL)
+                osc_del_shrink_grant(&obd->u.cli);
         return rc;
 }
 
@@ -3878,6 +4187,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 struct lprocfs_static_vars lvars = { 0 };
                 struct client_obd *cli = &obd->u.cli;
 
+                cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                 lprocfs_osc_init_vars(&lvars);
                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                         lproc_osc_attach_seqstat(obd);
@@ -3895,6 +4205,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+
+                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+                sema_init(&cli->cl_grant_sem, 1);
         }
 
         RETURN(rc);
@@ -3939,25 +4252,19 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 if (rc != 0)
                         CERROR("failed to cleanup llogging subsystems\n");
                 break;
-               }
+                }
         }
         RETURN(rc);
 }
 
 int osc_cleanup(struct obd_device *obd)
 {
-        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
         int rc;
 
         ENTRY;
         ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 
-        spin_lock(&oscc->oscc_lock);
-        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-        oscc->oscc_flags |= OSCC_FLAG_EXITING;
-        spin_unlock(&oscc->oscc_lock);
-
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
@@ -3978,8 +4285,8 @@ int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
         default:
                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                               lcfg, obd);
-               if (rc > 0)
-                       rc = 0;
+                if (rc > 0)
+                        rc = 0;
                 break;
         }
 
@@ -4007,6 +4314,7 @@ struct obd_ops osc_obd_ops = {
         .o_unpackmd             = osc_unpackmd,
         .o_precreate            = osc_precreate,
         .o_create               = osc_create,
+        .o_create_async         = osc_create_async,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
         .o_getattr_async        = osc_getattr_async,