Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index b8ee00a..e6bbf9a 100644 (file)
@@ -185,7 +185,7 @@ static inline void osc_pack_req_body(struct ptlrpc_request *req,
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
 
-        body->oa = *oinfo->oi_oa;
+        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
         osc_pack_capa(req, body, oinfo->oi_capa);
 }
 
@@ -214,7 +214,7 @@ static int osc_getattr_interpret(const struct lu_env *env,
                                   lustre_swab_ost_body);
         if (body) {
                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
+                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
 
                 /* This should really be sent by the OST */
                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -292,7 +292,7 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
                 GOTO(out, rc = -EPROTO);
 
         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        *oinfo->oi_oa = body->oa;
+        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
 
         /* This should really be sent by the OST */
         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -340,7 +340,7 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *oinfo->oi_oa = body->oa;
+        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
 
         EXIT;
 out:
@@ -362,7 +362,7 @@ static int osc_setattr_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *aa->aa_oi->oi_oa = body->oa;
+        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
 out:
         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
         RETURN(rc);
@@ -446,7 +446,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         ptlrpc_request_set_replen(req);
 
@@ -466,7 +466,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
         if (body == NULL)
                 GOTO(out_req, rc = -EPROTO);
 
-        *oa = body->oa;
+        lustre_get_wire_obdo(oa, &body->oa);
 
         /* This should really be sent by the OST */
         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -514,7 +514,7 @@ static int osc_punch_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *aa->pa_oa = body->oa;
+        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
 out:
         rc = aa->pa_upcall(aa->pa_cookie, rc);
         RETURN(rc);
@@ -546,7 +546,7 @@ int osc_punch_base(struct obd_export *exp, struct obdo *oa,
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
         osc_pack_capa(req, body, capa);
 
         ptlrpc_request_set_replen(req);
@@ -605,7 +605,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
         body->oa.o_size = start;
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
@@ -621,7 +621,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *oa = body->oa;
+        lustre_get_wire_obdo(oa, &body->oa);
 
         EXIT;
  out:
@@ -734,7 +734,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                 oa->o_lcookie = *oti->oti_logcookies;
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         osc_pack_capa(req, body, (struct obd_capa *)capa);
         ptlrpc_request_set_replen(req);
@@ -743,7 +743,8 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                 req->rq_interpret_reply = osc_destroy_interpret;
                 if (!osc_can_send_destroy(cli)) {
-                        struct l_wait_info lwi = { 0 };
+                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
+                                                          NULL);
 
                         /*
                          * Wait until the number of on-going destroy RPCs drops
@@ -800,8 +801,8 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 static void osc_update_next_shrink(struct client_obd *cli)
 {
-        int time = GRANT_SHRINK_INTERVAL;
-        cli->cl_next_shrink_grant = cfs_time_shift(time);
+        cli->cl_next_shrink_grant =
+                cfs_time_shift(cli->cl_grant_shrink_interval);
         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
                cli->cl_next_shrink_grant);
 }
@@ -810,6 +811,7 @@ static void osc_update_next_shrink(struct client_obd *cli)
 static void osc_consume_write_grant(struct client_obd *cli,
                                     struct brw_page *pga)
 {
+        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
         atomic_inc(&obd_dirty_pages);
         cli->cl_dirty += CFS_PAGE_SIZE;
@@ -829,6 +831,7 @@ static void osc_release_write_grant(struct client_obd *cli,
         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
         ENTRY;
 
+        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                 EXIT;
                 return;
@@ -912,32 +915,35 @@ void osc_wake_cache_waiters(struct client_obd *cli)
         EXIT;
 }
 
-static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
-        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        if (body->oa.o_valid & OBD_MD_FLGRANT)
-                cli->cl_avail_grant += body->oa.o_grant;
-        /* waiters are woken in brw_interpret */
+        cli->cl_avail_grant += grant;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+        if (body->oa.o_valid & OBD_MD_FLGRANT) {
+                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+                __osc_update_grant(cli, body->oa.o_grant);
+        }
+}
+
 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                               void *key, obd_count vallen, void *val,
                               struct ptlrpc_request_set *set);
 
 static int osc_shrink_grant_interpret(const struct lu_env *env,
-                                     struct ptlrpc_request *req,
+                                      struct ptlrpc_request *req,
                                       void *aa, int rc)
 {
         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
         struct ost_body *body;
-        
+
         if (rc != 0) {
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                cli->cl_avail_grant += oa->o_grant;
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                __osc_update_grant(cli, oa->o_grant);
                 GOTO(out, rc);
         }
 
@@ -946,41 +952,74 @@ static int osc_shrink_grant_interpret(const struct lu_env *env,
         osc_update_grant(cli, body);
 out:
         OBD_FREE_PTR(oa);
-        return rc;        
+        return rc;
 }
 
 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
         oa->o_grant = cli->cl_avail_grant / 4;
-        cli->cl_avail_grant -= oa->o_grant; 
+        cli->cl_avail_grant -= oa->o_grant;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         oa->o_flags |= OBD_FL_SHRINK_GRANT;
         osc_update_next_shrink(cli);
 }
 
+/* Shrink the current grant, either from some large amount to enough for a
+ * full set of in-flight RPCs, or if we have already shrunk to that limit
+ * then to enough for a single RPC.  This avoids keeping more grant than
+ * needed, and avoids shrinking the grant piecemeal. */
 static int osc_shrink_grant(struct client_obd *cli)
 {
+        long target = (cli->cl_max_rpcs_in_flight + 1) *
+                      cli->cl_max_pages_per_rpc;
+
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        if (cli->cl_avail_grant <= target)
+                target = cli->cl_max_pages_per_rpc;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+        return osc_shrink_grant_to_target(cli, target);
+}
+
+int osc_shrink_grant_to_target(struct client_obd *cli, long target)
+{
         int    rc = 0;
         struct ost_body     *body;
         ENTRY;
 
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        /* Don't shrink if we are already at or below the desired limit.
+         * We don't want to shrink below a single RPC, as that will negatively
+         * impact block allocation and long-term performance. */
+        if (target < cli->cl_max_pages_per_rpc)
+                target = cli->cl_max_pages_per_rpc;
+
+        if (target >= cli->cl_avail_grant) {
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+                RETURN(0);
+        }
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+
         OBD_ALLOC_PTR(body);
         if (!body)
                 RETURN(-ENOMEM);
 
         osc_announce_cached(cli, &body->oa, 0);
-        osc_shrink_grant_local(cli, &body->oa);
+
+        client_obd_list_lock(&cli->cl_loi_list_lock);
+        body->oa.o_grant = cli->cl_avail_grant - target;
+        cli->cl_avail_grant = target;
+        client_obd_list_unlock(&cli->cl_loi_list_lock);
+        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+
         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                 sizeof(*body), body, NULL);
-        if (rc) {
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                cli->cl_avail_grant += body->oa.o_grant;
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-        }
-        if (body)
-               OBD_FREE_PTR(body);
+        if (rc != 0)
+                __osc_update_grant(cli, body->oa.o_grant);
+        OBD_FREE_PTR(body);
         RETURN(rc);
 }
 
@@ -1014,26 +1053,25 @@ static int osc_add_shrink_grant(struct client_obd *client)
 {
         int rc;
 
-        rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
-                                         TIMEOUT_GRANT,
-                                         osc_grant_shrink_grant_cb, NULL,
-                                         &client->cl_grant_shrink_list);
+        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+                                       TIMEOUT_GRANT,
+                                       osc_grant_shrink_grant_cb, NULL,
+                                       &client->cl_grant_shrink_list);
         if (rc) {
-                CERROR("add grant client %s error %d\n", 
+                CERROR("add grant client %s error %d\n",
                         client->cl_import->imp_obd->obd_name, rc);
                 return rc;
         }
-        CDEBUG(D_CACHE, "add grant client %s \n", 
+        CDEBUG(D_CACHE, "add grant client %s \n",
                client->cl_import->imp_obd->obd_name);
         osc_update_next_shrink(client);
-        return 0; 
+        return 0;
 }
 
 static int osc_del_shrink_grant(struct client_obd *client)
 {
-        CDEBUG(D_CACHE, "del grant client %s \n", 
-               client->cl_import->imp_obd->obd_name);
-        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
+        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
+                                         TIMEOUT_GRANT);
 }
 
 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
@@ -1248,7 +1286,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
         LASSERT(body && ioobj && niobuf);
 
-        body->oa = *oa;
+        lustre_set_wire_obdo(&body->oa, oa);
 
         obdo_to_ioobj(oa, ioobj);
         ioobj->ioo_bufcnt = niocount;
@@ -1301,7 +1339,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
         if (osc_should_shrink_grant(cli))
-                osc_shrink_grant_local(cli, &body->oa); 
+                osc_shrink_grant_local(cli, &body->oa);
 
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
@@ -1311,8 +1349,10 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                          * it can be changed via lprocfs */
                         cksum_type_t cksum_type = cli->cl_cksum_type;
 
-                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
-                                oa->o_flags = body->oa.o_flags = 0;
+                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                                oa->o_flags &= OBD_FL_LOCAL_MASK;
+                                body->oa.o_flags = 0;
+                        }
                         body->oa.o_flags |= cksum_type_pack(cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
@@ -1442,10 +1482,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* set/clear over quota flag for a uid/gid */
         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
-            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
-                lquota_setdq(quota_interface, cli, body->oa.o_uid,
-                             body->oa.o_gid, body->oa.o_valid,
+            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
+                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
+
+                lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
                              body->oa.o_flags);
+        }
 
         if (rc < 0)
                 RETURN(rc);
@@ -1479,9 +1521,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* The rest of this function executes only for OST_READs */
 
+        /* if unwrap_bulk failed, return -EAGAIN to retry */
         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
         if (rc < 0)
-                GOTO(out, rc);
+                GOTO(out, rc = -EAGAIN);
 
         if (rc > aa->aa_requested_nob) {
                 CERROR("Unexpected rc %d (%d requested)\n", rc,
@@ -1566,7 +1609,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         }
 out:
         if (rc >= 0)
-                *aa->aa_oa = body->oa;
+                lustre_get_wire_obdo(aa->aa_oa, &body->oa);
 
         RETURN(rc);
 }
@@ -1778,15 +1821,18 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *saved_oa = NULL;
         struct brw_page **ppga, **orig;
         struct obd_import *imp = class_exp2cliimp(exp);
-        struct client_obd *cli = &imp->imp_obd->u.cli;
+        struct client_obd *cli;
         int rc, page_count_orig;
         ENTRY;
 
+        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
+        cli = &imp->imp_obd->u.cli;
+
         if (cmd & OBD_BRW_CHECK) {
                 /* The caller just wants to know if there's a chance that this
                  * I/O can succeed */
 
-                if (imp == NULL || imp->imp_invalid)
+                if (imp->imp_invalid)
                         RETURN(-EIO);
                 RETURN(0);
         }
@@ -1901,6 +1947,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1914,9 +1979,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -2012,8 +2085,10 @@ void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -2113,6 +2188,9 @@ static int brw_interpret(const struct lu_env *env,
                 int i;
                 for (i = 0; i < aa->aa_page_count; i++)
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
+               
+                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
+                        OBDO_FREE(aa->aa_oa);
         }
         osc_wake_cache_waiters(cli);
         osc_check_rpcs(env, cli);
@@ -2273,6 +2351,15 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct cl_object *clob = NULL;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it to the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                  oap_pending_item) {
@@ -2483,7 +2570,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !list_empty(&(LOI)->loi_cli_item),                        \
+               !list_empty(&(LOI)->loi_ready_item) ||                    \
+               !list_empty(&(LOI)->loi_hp_ready_item),                   \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
@@ -2495,11 +2583,16 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2523,6 +2616,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_read_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
@@ -2533,7 +2646,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2565,8 +2678,10 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2755,6 +2870,7 @@ int osc_queue_async_io(const struct lu_env *env,
         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                 struct cl_object *obj;
                 struct cl_attr    attr; /* XXX put attr into thread info */
+                unsigned int qid[MAXQUOTAS];
 
                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
 
@@ -2762,8 +2878,10 @@ int osc_queue_async_io(const struct lu_env *env,
                 rc = cl_object_attr_get(env, obj, &attr);
                 cl_object_attr_unlock(obj);
 
-                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
-                                            attr.cat_gid) == NO_QUOTA)
+                qid[USRQUOTA] = attr.cat_uid;
+                qid[GRPQUOTA] = attr.cat_gid;
+                if (rc == 0 &&
+                    lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
                         rc = -EDQUOT;
                 if (rc)
                         RETURN(rc);
@@ -2779,6 +2897,9 @@ int osc_queue_async_io(const struct lu_env *env,
         oap->oap_page_off = off;
         oap->oap_count = count;
         oap->oap_brw_flags = brw_flags;
+        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+        if (libcfs_memory_pressure_get())
+                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
         oap->oap_async_flags = async_flags;
 
         if (cmd & OBD_BRW_WRITE) {
@@ -2811,8 +2932,7 @@ int osc_set_async_flags_base(struct client_obd *cli,
         struct loi_oap_pages *lop;
         ENTRY;
 
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(-EIO);
+        LASSERT(!list_empty(&oap->oap_pending_item));
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
                 lop = &loi->loi_write_lop;
@@ -2820,20 +2940,20 @@ int osc_set_async_flags_base(struct client_obd *cli,
                 lop = &loi->loi_read_lop;
         }
 
-        if (list_empty(&oap->oap_pending_item))
-                RETURN(-EINVAL);
-
         if ((oap->oap_async_flags & async_flags) == async_flags)
                 RETURN(0);
 
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2874,7 +2994,7 @@ int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
@@ -2994,6 +3114,9 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Complete osc stuff. */
         rc = osc_enqueue_fini(req, aa->oa_lvb,
                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
         /* Release the lock for async request. */
         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                 /*
@@ -3286,6 +3409,10 @@ static int osc_statfs_interpret(const struct lu_env *env,
         struct obd_statfs *msfs;
         ENTRY;
 
+        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
+            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
+                GOTO(out, rc = 0);
+
         if (rc != 0)
                 GOTO(out, rc);
 
@@ -3591,6 +3718,7 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                 memcpy(tmp, key, keylen);
 
+                req->rq_no_delay = req->rq_no_resend = 1;
                 ptlrpc_request_set_replen(req);
                 rc = ptlrpc_queue_wait(req);
                 if (rc)
@@ -3652,27 +3780,20 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
         RETURN(-EINVAL);
 }
 
-static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
-                                          struct ptlrpc_request *req,
-                                          void *aa, int rc)
+static int osc_setinfo_mds_connect_import(struct obd_import *imp)
 {
         struct llog_ctxt *ctxt;
-        struct obd_import *imp = req->rq_import;
+        int rc = 0;
         ENTRY;
 
-        if (rc != 0)
-                RETURN(rc);
-
         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
         if (ctxt) {
-                if (rc == 0)
-                        rc = llog_initiator_connect(ctxt);
-                else
-                        CERROR("cannot establish connection for "
-                               "ctxt %p: %d\n", ctxt, rc);
+                rc = llog_initiator_connect(ctxt);
+                llog_ctxt_put(ctxt);
+        } else {
+                /* XXX return an error? skip setting the flags below? */
         }
 
-        llog_ctxt_put(ctxt);
         spin_lock(&imp->imp_lock);
         imp->imp_server_timeout = 1;
         imp->imp_pingable = 1;
@@ -3682,6 +3803,17 @@ static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
         RETURN(rc);
 }
 
+static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
+                                          struct ptlrpc_request *req,
+                                          void *aa, int rc)
+{
+        ENTRY;
+        if (rc != 0)
+                RETURN(rc);
+
+        RETURN(osc_setinfo_mds_connect_import(req->rq_import));
+}
+
 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                               void *key, obd_count vallen, void *val,
                               struct ptlrpc_request_set *set)
@@ -3755,12 +3887,12 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
            Even if something bad goes through, we'd get a -EINVAL from OST
            anyway. */
 
-       if (KEY_IS(KEY_GRANT_SHRINK))  
-                       req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); 
-       else 
-               req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
-        
-       if (req == NULL)
+        if (KEY_IS(KEY_GRANT_SHRINK))
+                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
+        else
+                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
+
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
@@ -3784,6 +3916,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
+                req->rq_no_delay = req->rq_no_resend = 1;
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
                 struct osc_grant_args *aa;
@@ -3798,18 +3931,18 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 }
                 *oa = ((struct ost_body *)val)->oa;
                 aa->aa_oa = oa;
-               req->rq_interpret_reply = osc_shrink_grant_interpret;
-       }
-               
-       ptlrpc_request_set_replen(req);
-       if (!KEY_IS(KEY_GRANT_SHRINK)) {
-               LASSERT(set != NULL);
-               ptlrpc_set_add_req(set, req);
-               ptlrpc_check_set(NULL, set);
-       } else 
-               ptlrpcd_add_req(req, PSCOPE_OTHER);
-        
-       RETURN(0);
+                req->rq_interpret_reply = osc_shrink_grant_interpret;
+        }
+
+        ptlrpc_request_set_replen(req);
+        if (!KEY_IS(KEY_GRANT_SHRINK)) {
+                LASSERT(set != NULL);
+                ptlrpc_set_add_req(set, req);
+                ptlrpc_check_set(NULL, set);
+        } else
+                ptlrpcd_add_req(req, PSCOPE_OTHER);
+
+        RETURN(0);
 }
 
 
@@ -3930,8 +4063,26 @@ static int osc_disconnect(struct obd_export *exp)
                        obd);
         }
 
-        osc_del_shrink_grant(&obd->u.cli);
         rc = client_disconnect_export(exp);
+        /**
+         * Initially we put del_shrink_grant before disconnect_export, but
+         * that causes the following problem if setup (connect) and cleanup
+         * (disconnect) are interleaved.
+         *      connect p1                     disconnect p2
+         *   ptlrpc_connect_import
+         *     ...............               class_manual_cleanup
+         *                                     osc_disconnect
+         *                                     del_shrink_grant
+         *   ptlrpc_connect_interpret
+         *     init_grant_shrink
+         *   add this client to shrink list
+         *                                      cleanup_osc
+         * Bang! The pinger triggers the shrink.
+         * So the osc should be removed from the shrink list only after we
+         * are sure the import has been destroyed. BUG18662
+         */
+        if (obd->u.cli.cl_import == NULL)
+                osc_del_shrink_grant(&obd->u.cli);
         return rc;
 }
 
@@ -4036,6 +4187,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 struct lprocfs_static_vars lvars = { 0 };
                 struct client_obd *cli = &obd->u.cli;
 
+                cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                 lprocfs_osc_init_vars(&lvars);
                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                         lproc_osc_attach_seqstat(obd);
@@ -4053,7 +4205,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
-               
+
                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                 sema_init(&cli->cl_grant_sem, 1);
         }
@@ -4100,25 +4252,19 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 if (rc != 0)
                         CERROR("failed to cleanup llogging subsystems\n");
                 break;
-               }
+                }
         }
         RETURN(rc);
 }
 
 int osc_cleanup(struct obd_device *obd)
 {
-        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
         int rc;
 
         ENTRY;
         ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 
-        spin_lock(&oscc->oscc_lock);
-        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-        oscc->oscc_flags |= OSCC_FLAG_EXITING;
-        spin_unlock(&oscc->oscc_lock);
-
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
@@ -4139,8 +4285,8 @@ int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
         default:
                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                               lcfg, obd);
-               if (rc > 0)
-                       rc = 0;
+                if (rc > 0)
+                        rc = 0;
                 break;
         }
 
@@ -4168,6 +4314,7 @@ struct obd_ops osc_obd_ops = {
         .o_unpackmd             = osc_unpackmd,
         .o_precreate            = osc_precreate,
         .o_create               = osc_create,
+        .o_create_async         = osc_create_async,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
         .o_getattr_async        = osc_getattr_async,