Whamcloud - gitweb
land clio.
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 299b3c7..2097a34 100644 (file)
@@ -61,7 +61,6 @@
 #include <lustre_log.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
-#include <lustre_cache.h>
 #include "osc_internal.h"
 
 static quota_interface_t *quota_interface = NULL;
@@ -399,7 +398,7 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
         /* do mds to ost setattr asynchronously */
         if (!rqset) {
                 /* Do not wait for response. */
-                ptlrpcd_add_req(req);
+                ptlrpcd_add_req(req, PSCOPE_OTHER);
         } else {
                 req->rq_interpret_reply =
                         (ptlrpc_interpterer_t)osc_setattr_interpret;
@@ -501,7 +500,7 @@ out:
 
 static int osc_punch_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
-                               struct osc_async_args *aa, int rc)
+                               struct osc_punch_args *aa, int rc)
 {
         struct ost_body *body;
         ENTRY;
@@ -513,32 +512,28 @@ static int osc_punch_interpret(const struct lu_env *env,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        *aa->aa_oi->oi_oa = body->oa;
+        *aa->pa_oa = body->oa;
 out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+        rc = aa->pa_upcall(aa->pa_cookie, rc);
         RETURN(rc);
 }
 
-static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
-                     struct obd_trans_info *oti,
-                     struct ptlrpc_request_set *rqset)
+int osc_punch_base(struct obd_export *exp, struct obdo *oa,
+                   struct obd_capa *capa,
+                   obd_enqueue_update_f upcall, void *cookie,
+                   struct ptlrpc_request_set *rqset)
 {
         struct ptlrpc_request *req;
-        struct osc_async_args *aa;
+        struct osc_punch_args *aa;
         struct ost_body       *body;
         int                    rc;
         ENTRY;
 
-        if (!oinfo->oi_oa) {
-                CDEBUG(D_INFO, "oa NULL\n");
-                RETURN(-EINVAL);
-        }
-
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+        osc_set_capa_size(req, &RMF_CAPA1, capa);
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -546,26 +541,40 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
-        osc_pack_req_body(req, oinfo);
 
-        /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
-        body->oa.o_size = oinfo->oi_policy.l_extent.start;
-        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
-        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+        body->oa = *oa;
+        osc_pack_capa(req, body, capa);
+
         ptlrpc_request_set_replen(req);
 
 
         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
-        ptlrpc_set_add_req(rqset, req);
+        aa->pa_oa     = oa;
+        aa->pa_upcall = upcall;
+        aa->pa_cookie = cookie;
+        if (rqset == PTLRPCD_SET)
+                ptlrpcd_add_req(req, PSCOPE_OTHER);
+        else
+                ptlrpc_set_add_req(rqset, req);
 
         RETURN(0);
 }
 
+static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
+                     struct obd_trans_info *oti,
+                     struct ptlrpc_request_set *rqset)
+{
+        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
+        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
+        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
+                              oinfo->oi_cb_up, oinfo, rqset);
+}
+
 static int osc_sync(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start, obd_size end,
                     void *capa)
@@ -739,7 +748,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         }
 
         /* Do not wait for response */
-        ptlrpcd_add_req(req);
+        ptlrpcd_add_req(req, PSCOPE_OTHER);
         RETURN(0);
 }
 
@@ -753,13 +762,16 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
         oa->o_valid |= bits;
         client_obd_list_lock(&cli->cl_loi_list_lock);
         oa->o_dirty = cli->cl_dirty;
-        if (cli->cl_dirty > cli->cl_dirty_max) {
-                CERROR("dirty %lu > dirty_max %lu\n",
-                       cli->cl_dirty, cli->cl_dirty_max);
+        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
+                CERROR("dirty %lu - %lu > dirty_max %lu\n",
+                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                 oa->o_undirty = 0;
-        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
-                CERROR("dirty %d > system dirty_max %d\n",
-                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
+        } else if (atomic_read(&obd_dirty_pages) -
+                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
+                CERROR("dirty %d - %d > system dirty_max %d\n",
+                       atomic_read(&obd_dirty_pages),
+                       atomic_read(&obd_dirty_transit_pages),
+                       obd_max_dirty_pages);
                 oa->o_undirty = 0;
         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                 CERROR("dirty %lu - dirty_max %lu too big???\n",
@@ -782,6 +794,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 static void osc_consume_write_grant(struct client_obd *cli,
                                     struct brw_page *pga)
 {
+        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
         atomic_inc(&obd_dirty_pages);
         cli->cl_dirty += CFS_PAGE_SIZE;
         cli->cl_avail_grant -= CFS_PAGE_SIZE;
@@ -807,6 +820,11 @@ static void osc_release_write_grant(struct client_obd *cli,
         pga->flag &= ~OBD_BRW_FROM_GRANT;
         atomic_dec(&obd_dirty_pages);
         cli->cl_dirty -= CFS_PAGE_SIZE;
+        if (pga->flag & OBD_BRW_NOCACHE) {
+                pga->flag &= ~OBD_BRW_NOCACHE;
+                atomic_dec(&obd_dirty_transit_pages);
+                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
+        }
         if (!sent) {
                 cli->cl_lost_grant += CFS_PAGE_SIZE;
                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
@@ -977,7 +995,7 @@ static int check_write_rcs(struct ptlrpc_request *req,
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
-                unsigned mask = ~OBD_BRW_FROM_GRANT;
+                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -1538,63 +1556,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         RETURN(0);
 }
 
-static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
-                          struct lov_stripe_md *lsm, obd_count page_count,
-                          struct brw_page **pga, struct ptlrpc_request_set *set,
-                          struct obd_capa *ocapa)
-{
-        struct ptlrpc_request     *req;
-        struct client_obd         *cli = &exp->exp_obd->u.cli;
-        int                        rc, i;
-        struct osc_brw_async_args *aa;
-        ENTRY;
-
-        /* Consume write credits even if doing a sync write -
-         * otherwise we may run out of space on OST due to grant. */
-        if (cmd == OBD_BRW_WRITE) {
-                spin_lock(&cli->cl_loi_list_lock);
-                for (i = 0; i < page_count; i++) {
-                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
-                                osc_consume_write_grant(cli, pga[i]);
-                }
-                spin_unlock(&cli->cl_loi_list_lock);
-        }
-
-        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
-                                  &req, ocapa);
-
-        aa = ptlrpc_req_async_args(req);
-        if (cmd == OBD_BRW_READ) {
-                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
-        } else {
-                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
-                                 cli->cl_w_in_flight);
-        }
-        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
-
-        LASSERT(list_empty(&aa->aa_oaps));
-        if (rc == 0) {
-                req->rq_interpret_reply = brw_interpret;
-                ptlrpc_set_add_req(set, req);
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                if (cmd == OBD_BRW_READ)
-                        cli->cl_r_in_flight++;
-                else
-                        cli->cl_w_in_flight++;
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-                OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
-        } else if (cmd == OBD_BRW_WRITE) {
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                for (i = 0; i < page_count; i++)
-                        osc_release_write_grant(cli, pga[i], 0);
-                osc_wake_cache_waiters(cli);
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-        }
-        RETURN (rc);
-}
-
 /*
  * ugh, we want disk allocation on the target to happen in offset order.  we'll
  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
@@ -1743,76 +1704,6 @@ out:
         RETURN(rc);
 }
 
-static int osc_brw_async(int cmd, struct obd_export *exp,
-                         struct obd_info *oinfo, obd_count page_count,
-                         struct brw_page *pga, struct obd_trans_info *oti,
-                         struct ptlrpc_request_set *set)
-{
-        struct brw_page **ppga, **orig;
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int page_count_orig;
-        int rc = 0;
-        ENTRY;
-
-        if (cmd & OBD_BRW_CHECK) {
-                struct obd_import *imp = class_exp2cliimp(exp);
-                /* The caller just wants to know if there's a chance that this
-                 * I/O can succeed */
-
-                if (imp == NULL || imp->imp_invalid)
-                        RETURN(-EIO);
-                RETURN(0);
-        }
-
-        orig = ppga = osc_build_ppga(pga, page_count);
-        if (ppga == NULL)
-                RETURN(-ENOMEM);
-        page_count_orig = page_count;
-
-        sort_brw_pages(ppga, page_count);
-        while (page_count) {
-                struct brw_page **copy;
-                obd_count pages_per_brw;
-
-                pages_per_brw = min_t(obd_count, page_count,
-                                      cli->cl_max_pages_per_rpc);
-
-                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
-
-                /* use ppga only if single RPC is going to fly */
-                if (pages_per_brw != page_count_orig || ppga != orig) {
-                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
-                        if (copy == NULL)
-                                GOTO(out, rc = -ENOMEM);
-                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
-                } else
-                        copy = ppga;
-
-                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
-                                    pages_per_brw, copy, set, oinfo->oi_capa);
-
-                if (rc != 0) {
-                        if (copy != ppga)
-                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
-                        break;
-                }
-                if (copy == orig) {
-                        /* we passed it to async_internal() which is
-                         * now responsible for releasing memory */
-                        orig = NULL;
-                }
-
-                page_count -= pages_per_brw;
-                ppga += pages_per_brw;
-        }
-out:
-        if (orig)
-                osc_release_ppga(orig, page_count_orig);
-        RETURN(rc);
-}
-
-static void osc_check_rpcs(struct client_obd *cli);
-
 /* The companion to osc_enter_cache(), called when @oap is no longer part of
  * the dirty accounting.  Writeback completes or truncate happens before
  * writing starts.  Must be called with the loi lock held. */
@@ -1883,7 +1774,7 @@ static void on_list(struct list_head *item, struct list_head *list,
 
 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
  * can find pages to build into rpcs quickly */
-static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
+void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
@@ -1906,34 +1797,35 @@ static void lop_update_pending(struct client_obd *cli,
                 cli->cl_pending_r_pages += delta;
 }
 
-/* this is called when a sync waiter receives an interruption.  Its job is to
+/**
+ * this is called when a sync waiter receives an interruption.  Its job is to
  * get the caller woken as soon as possible.  If its page hasn't been put in an
  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
  * desiring interruption which will forcefully complete the rpc once the rpc
- * has timed out */
-static void osc_occ_interrupted(struct oig_callback_context *occ)
+ * has timed out.
+ */
+int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
 {
-        struct osc_async_page *oap;
         struct loi_oap_pages *lop;
         struct lov_oinfo *loi;
+        int rc = -EBUSY;
         ENTRY;
 
-        /* XXX member_of() */
-        oap = list_entry(occ, struct osc_async_page, oap_occ);
-
-        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
-
+        LASSERT(!oap->oap_interrupted);
         oap->oap_interrupted = 1;
 
         /* ok, it's been put in an rpc. only one oap gets a request reference */
         if (oap->oap_request != NULL) {
                 ptlrpc_mark_interrupted(oap->oap_request);
                 ptlrpcd_wake(oap->oap_request);
-                GOTO(unlock, 0);
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
         }
 
-        /* we don't get interruption callbacks until osc_trigger_group_io()
-         * has been called and put the sync oaps in the pending/urgent lists.*/
+        /*
+         * page completion may be called only if ->cpo_prep() method was
+         * executed by osc_io_submit(), that also adds page the to pending list
+         */
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
                 list_del_init(&oap->oap_urgent_item);
@@ -1943,13 +1835,12 @@ static void osc_occ_interrupted(struct oig_callback_context *occ)
                         &loi->loi_write_lop : &loi->loi_read_lop;
                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                 loi_list_maint(oap->oap_cli, oap->oap_loi);
-
-                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
-                oap->oap_oig = NULL;
+                rc = oap->oap_caller_ops->ap_completion(env,
+                                          oap->oap_caller_data,
+                                          oap->oap_cmd, NULL, -EINTR);
         }
 
-unlock:
-        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
+        RETURN(rc);
 }
 
 /* this is trying to propogate async writeback errors back up to the
@@ -1974,7 +1865,7 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                 ar->ar_force_sync = 0;
 }
 
-static void osc_oap_to_pending(struct osc_async_page *oap)
+void osc_oap_to_pending(struct osc_async_page *oap)
 {
         struct loi_oap_pages *lop;
 
@@ -1991,7 +1882,8 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
 
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
-static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
+static void osc_ap_completion(const struct lu_env *env,
+                              struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
         __u64 xid = 0;
@@ -2022,15 +1914,7 @@ static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
         }
 
-        if (oap->oap_oig) {
-                osc_exit_cache(cli, oap, sent);
-                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
-                oap->oap_oig = NULL;
-                EXIT;
-                return;
-        }
-
-        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
+        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                 oap->oap_cmd, oa, rc);
 
         /* ll_ap_completion (from llite) drops PG_locked. so, a new
@@ -2049,6 +1933,7 @@ static int brw_interpret(const struct lu_env *env,
 {
         struct osc_brw_async_args *aa = data;
         struct client_obd *cli;
+        int async;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -2071,13 +1956,14 @@ static int brw_interpret(const struct lu_env *env,
         else
                 cli->cl_r_in_flight--;
 
-        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
+        async = list_empty(&aa->aa_oaps);
+        if (!async) { /* from osc_send_oap_rpc() */
                 struct osc_async_page *oap, *tmp;
                 /* the caller may re-use the oap after the completion call so
                  * we need to clean it up a little */
                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                         list_del_init(&oap->oap_rpc_item);
-                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
+                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                 }
                 OBDO_FREE(aa->aa_oa);
         } else { /* from async_internal() */
@@ -2086,14 +1972,16 @@ static int brw_interpret(const struct lu_env *env,
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
         }
         osc_wake_cache_waiters(cli);
-        osc_check_rpcs(cli);
+        osc_check_rpcs(env, cli);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
-
+        if (!async)
+                cl_req_completion(env, aa->aa_clerq, rc);
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
 
-static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
+static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
+                                            struct client_obd *cli,
                                             struct list_head *rpc_list,
                                             int page_count, int cmd)
 {
@@ -2101,19 +1989,24 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         struct brw_page **pga = NULL;
         struct osc_brw_async_args *aa;
         struct obdo *oa = NULL;
-        struct obd_async_page_ops *ops = NULL;
+        const struct obd_async_page_ops *ops = NULL;
         void *caller_data = NULL;
-        struct obd_capa *ocapa;
         struct osc_async_page *oap;
+        struct osc_async_page *tmp;
+        struct ost_body *body;
+        struct cl_req *clerq = NULL;
+        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
         struct ldlm_lock *lock = NULL;
+        struct cl_req_attr crattr;
         int i, rc;
 
         ENTRY;
         LASSERT(!list_empty(rpc_list));
 
+        memset(&crattr, 0, sizeof crattr);
         OBD_ALLOC(pga, sizeof(*pga) * page_count);
         if (pga == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+                GOTO(out, req = ERR_PTR(-ENOMEM));
 
         OBDO_ALLOC(oa);
         if (oa == NULL)
@@ -2121,9 +2014,16 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
 
         i = 0;
         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
+                struct cl_page *page = osc_oap2cl_page(oap);
                 if (ops == NULL) {
                         ops = oap->oap_caller_ops;
                         caller_data = oap->oap_caller_data;
+
+                        clerq = cl_req_alloc(env, page, crt,
+                                             1 /* only 1-object rpcs for
+                                                * now */);
+                        if (IS_ERR(clerq))
+                                GOTO(out, req = (void *)clerq);
                         lock = oap->oap_ldlm_lock;
                 }
                 pga[i] = &oap->oap_brw_page;
@@ -2131,21 +2031,28 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                 i++;
+                cl_req_page_add(env, clerq, page);
         }
 
         /* always get the data for the obdo for the rpc */
         LASSERT(ops != NULL);
-        ops->ap_fill_obdo(caller_data, cmd, oa);
-        ocapa = ops->ap_lookup_capa(caller_data, cmd);
+        crattr.cra_oa = oa;
+        crattr.cra_capa = NULL;
+        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
         if (lock) {
                 oa->o_handle = lock->l_remote_handle;
                 oa->o_valid |= OBD_MD_FLHANDLE;
         }
 
+        rc = cl_req_prep(env, clerq);
+        if (rc != 0) {
+                CERROR("cl_req_prep failed: %d\n", rc);
+                GOTO(out, req = ERR_PTR(rc));
+        }
+
         sort_brw_pages(pga, page_count);
         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
-                                  pga, &req, ocapa);
-        capa_put(ocapa);
+                                  pga, &req, crattr.cra_capa);
         if (rc != 0) {
                 CERROR("prep_req failed: %d\n", rc);
                 GOTO(out, req = ERR_PTR(rc));
@@ -2156,27 +2063,45 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
          * later setattr before earlier BRW (as determined by the request xid),
          * the OST will not use BRW timestamps.  Sadly, there is no obvious
          * way to do this in a single call.  bug 10150 */
-        ops->ap_update_obdo(caller_data, cmd, oa,
-                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        cl_req_attr_set(env, clerq, &crattr,
+                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
         list_splice(rpc_list, &aa->aa_oaps);
         CFS_INIT_LIST_HEAD(rpc_list);
-
+        aa->aa_clerq = clerq;
 out:
+        capa_put(crattr.cra_capa);
         if (IS_ERR(req)) {
                 if (oa)
                         OBDO_FREE(oa);
                 if (pga)
                         OBD_FREE(pga, sizeof(*pga) * page_count);
+                /* this should happen rarely and is pretty bad, it makes the
+                 * pending list not follow the dirty order */
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
+                        list_del_init(&oap->oap_rpc_item);
+
+                        /* queued sync pages can be torn down while the pages
+                         * were between the pending list and the rpc */
+                        if (oap->oap_interrupted) {
+                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
+                                osc_ap_completion(env, cli, NULL, oap, 0,
+                                                  oap->oap_count);
+                                continue;
+                        }
+                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
+                }
+                if (clerq && !IS_ERR(clerq))
+                        cl_req_completion(env, clerq, PTR_ERR(req));
         }
         RETURN(req);
 }
 
-/* the loi lock is held across this function but it's allowed to release
- * and reacquire it during its work */
 /**
  * prepare pages for ASYNC io and put pages in send queue.
  *
@@ -2188,18 +2113,21 @@ out:
  * \return zero if pages successfully add to send queue.
  * \return not zere if error occurring.
  */
-static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
-                            int cmd, struct loi_oap_pages *lop)
+static int
+osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
+                 struct lov_oinfo *loi,
+                 int cmd, struct loi_oap_pages *lop)
 {
         struct ptlrpc_request *req;
         obd_count page_count = 0;
         struct osc_async_page *oap = NULL, *tmp;
         struct osc_brw_async_args *aa;
-        struct obd_async_page_ops *ops;
+        const struct obd_async_page_ops *ops;
         CFS_LIST_HEAD(rpc_list);
         unsigned int ending_offset;
         unsigned  starting_offset = 0;
         int srvlock = 0;
+        struct cl_object *clob = NULL;
         ENTRY;
 
         /* first we find the pages we're allowed to work with */
@@ -2209,6 +2137,13 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 LASSERT(oap->oap_magic == OAP_MAGIC);
 
+                if (clob == NULL) {
+                        /* pin object in memory, so that completion call-backs
+                         * can be safely called under client_obd_list lock. */
+                        clob = osc_oap2cl_page(oap)->cp_obj;
+                        cl_object_get(clob);
+                }
+
                 if (page_count != 0 &&
                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
@@ -2226,7 +2161,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                  * will still be on the dirty list).  we could call in
                  * at the end of ll_file_write to process the queue again. */
                 if (!(oap->oap_async_flags & ASYNC_READY)) {
-                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
+                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
+                                                    cmd);
                         if (rc < 0)
                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                 "instead of ready\n", oap,
@@ -2264,11 +2200,20 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                  * ->ap_make_ready() or by higher layers.
                  */
 #if defined(__KERNEL__) && defined(__linux__)
-                 if(!(PageLocked(oap->oap_page) &&
-                     (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
-                       CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
-                               oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
-                        LBUG();
+                {
+                        struct cl_page *page;
+
+                        page = osc_oap2cl_page(oap);
+
+                        if (page->cp_type == CPT_CACHEABLE &&
+                            !(PageLocked(oap->oap_page) &&
+                              (CheckWriteback(oap->oap_page, cmd)))) {
+                                CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
+                                       oap->oap_page,
+                                       (long)oap->oap_page->flags,
+                                       oap->oap_async_flags);
+                                LBUG();
+                        }
                 }
 #endif
                 /* If there is a gap at the start of this page, it can't merge
@@ -2287,13 +2232,17 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                                           (PTLRPC_MAX_BRW_SIZE - 1);
 
                 /* ask the caller for the size of the io as the rpc leaves. */
-                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
+                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                         oap->oap_count =
-                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
+                                ops->ap_refresh_count(env, oap->oap_caller_data,
+                                                      cmd);
+                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
+                }
                 if (oap->oap_count <= 0) {
                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                                oap->oap_count);
-                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
+                        osc_ap_completion(env, cli, NULL,
+                                          oap, 0, oap->oap_count);
                         continue;
                 }
 
@@ -2322,31 +2271,21 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
         osc_wake_cache_waiters(cli);
 
-        if (page_count == 0)
-                RETURN(0);
-
         loi_list_maint(cli, loi);
 
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        req = osc_build_req(cli, &rpc_list, page_count, cmd);
-        if (IS_ERR(req)) {
-                /* this should happen rarely and is pretty bad, it makes the
-                 * pending list not follow the dirty order */
+        if (clob != NULL)
+                cl_object_put(env, clob);
+
+        if (page_count == 0) {
                 client_obd_list_lock(&cli->cl_loi_list_lock);
-                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
-                        list_del_init(&oap->oap_rpc_item);
+                RETURN(0);
+        }
 
-                        /* queued sync pages can be torn down while the pages
-                         * were between the pending list and the rpc */
-                        if (oap->oap_interrupted) {
-                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
-                                osc_ap_completion(cli, NULL, oap, 0,
-                                                  oap->oap_count);
-                                continue;
-                        }
-                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
-                }
+        req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
+        if (IS_ERR(req)) {
+                LASSERT(list_empty(&rpc_list));
                 loi_list_maint(cli, loi);
                 RETURN(PTR_ERR(req));
         }
@@ -2394,7 +2333,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
 
         req->rq_interpret_reply = brw_interpret;
-        ptlrpcd_add_req(req);
+        ptlrpcd_add_req(req, PSCOPE_BRW);
         RETURN(1);
 }
 
@@ -2441,7 +2380,7 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 }
 
 /* called with the loi list lock held */
-static void osc_check_rpcs(struct client_obd *cli)
+void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
         struct lov_oinfo *loi;
         int rc = 0, race_counter = 0;
@@ -2460,7 +2399,7 @@ static void osc_check_rpcs(struct client_obd *cli)
                  * partial read pending queue when we're given this object to
                  * do io on writes while there are cache waiters */
                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
+                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                               &loi->loi_write_lop);
                         if (rc < 0)
                                 break;
@@ -2470,7 +2409,7 @@ static void osc_check_rpcs(struct client_obd *cli)
                                 race_counter++;
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
-                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
+                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                               &loi->loi_read_lop);
                         if (rc < 0)
                                 break;
@@ -2520,9 +2459,32 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
         RETURN(rc);
 };
 
+/**
+ * Non-blocking version of osc_enter_cache() that consumes grant only when it
+ * is available.
+ */
+int osc_enter_cache_try(const struct lu_env *env,
+                        struct client_obd *cli, struct lov_oinfo *loi,
+                        struct osc_async_page *oap, int transient)
+{
+        int has_grant;
+
+        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
+        if (has_grant) {
+                osc_consume_write_grant(cli, &oap->oap_brw_page);
+                if (transient) {
+                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
+                        atomic_inc(&obd_dirty_transit_pages);
+                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
+                }
+        }
+        return has_grant;
+}
+
 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
  * grant or cache space. */
-static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
+static int osc_enter_cache(const struct lu_env *env,
+                           struct client_obd *cli, struct lov_oinfo *loi,
                            struct osc_async_page *oap)
 {
         struct osc_cache_waiter ocw;
@@ -2542,13 +2504,10 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                 RETURN(-EDQUOT);
 
         /* Hopefully normal case - cache space and write credits available */
-        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
-            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
-            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
-                /* account for ourselves */
-                osc_consume_write_grant(cli, &oap->oap_brw_page);
+        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
+            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
+            osc_enter_cache_try(env, cli, loi, oap, 0))
                 RETURN(0);
-        }
 
         /* Make sure that there are write rpcs in flight to wait for.  This
          * is a little silly as this object may not have any pending but
@@ -2560,7 +2519,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                 ocw.ocw_rc = 0;
 
                 loi_list_maint(cli, loi);
-                osc_check_rpcs(cli);
+                osc_check_rpcs(env, cli);
                 client_obd_list_unlock(&cli->cl_loi_list_lock);
 
                 CDEBUG(D_CACHE, "sleeping for cache space\n");
@@ -2577,84 +2536,15 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         RETURN(-EDQUOT);
 }
 
-/**
- * Checks if requested extent lock is compatible with a lock under the page.
- *
- * Checks if the lock under \a page is compatible with a read or write lock
- * (specified by \a rw) for an extent [\a start , \a end].
- *
- * \param exp osc export
- * \param lsm striping information for the file
- * \param res osc_async_page placeholder
- * \param rw OBD_BRW_READ if requested for reading,
- *           OBD_BRW_WRITE if requested for writing
- * \param start start of the requested extent
- * \param end end of the requested extent
- * \param cookie transparent parameter for passing locking context
- *
- * \post result == 1, *cookie == context, appropriate lock is referenced or
- * \post result == 0
- *
- * \retval 1 owned lock is reused for the request
- * \retval 0 no lock reused for the request
- *
- * \see osc_release_short_lock
- */
-static int osc_reget_short_lock(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                void **res, int rw,
-                                obd_off start, obd_off end,
-                                void **cookie)
-{
-        struct osc_async_page *oap = *res;
-        int rc;
-
-        ENTRY;
-
-        spin_lock(&oap->oap_lock);
-        rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
-                                  start, end, cookie);
-        spin_unlock(&oap->oap_lock);
-
-        RETURN(rc);
-}
-
-/**
- * Releases a reference to a lock taken in a "fast" way.
- *
- * Releases a read or a write (specified by \a rw) lock
- * referenced by \a cookie.
- *
- * \param exp osc export
- * \param lsm striping information for the file
- * \param end end of the locked extent
- * \param rw OBD_BRW_READ if requested for reading,
- *           OBD_BRW_WRITE if requested for writing
- * \param cookie transparent parameter for passing locking context
- *
- * \post appropriate lock is dereferenced
- *
- * \see osc_reget_short_lock
- */
-static int osc_release_short_lock(struct obd_export *exp,
-                                  struct lov_stripe_md *lsm, obd_off end,
-                                  void *cookie, int rw)
-{
-        ENTRY;
-        ldlm_lock_fast_release(cookie, rw);
-        /* no error could have happened at this layer */
-        RETURN(0);
-}
 
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
-                        obd_off offset, struct obd_async_page_ops *ops,
+                        obd_off offset, const struct obd_async_page_ops *ops,
                         void *data, void **res, int nocache,
                         struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
-        struct ldlm_res_id oid;
-        int rc = 0;
+
         ENTRY;
 
         if (!page)
@@ -2671,27 +2561,14 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         oap->oap_page = page;
         oap->oap_obj_off = offset;
 
+        LASSERT(!(offset & ~CFS_PAGE_MASK));
+
         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
 
-        oap->oap_occ.occ_interrupted = osc_occ_interrupted;
-
         spin_lock_init(&oap->oap_lock);
-
-        /* If the page was marked as notcacheable - don't add to any locks */
-        if (!nocache) {
-                osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
-                /* This is the only place where we can call cache_add_extent
-                   without oap_lock, because this page is locked now, and
-                   the lock we are adding it to is referenced, so cannot lose
-                   any pages either. */
-                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
-                if (rc)
-                        RETURN(rc);
-        }
-
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         RETURN(0);
 }
@@ -2704,10 +2581,11 @@ struct osc_async_page *oap_from_cookie(void *cookie)
         return oap;
 };
 
-static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
-                              struct lov_oinfo *loi, void *cookie,
-                              int cmd, obd_off off, int count,
-                              obd_flag brw_flags, enum async_flags async_flags)
+int osc_queue_async_io(const struct lu_env *env,
+                       struct obd_export *exp, struct lov_stripe_md *lsm,
+                       struct lov_oinfo *loi, void *cookie,
+                       int cmd, obd_off off, int count,
+                       obd_flag brw_flags, enum async_flags async_flags)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
         struct osc_async_page *oap;
@@ -2728,21 +2606,19 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         /* check if the file's owner/group is over quota */
 #ifdef HAVE_QUOTA_SUPPORT
-        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
-                struct obd_async_page_ops *ops;
-                struct obdo *oa;
+        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
+                struct cl_object *obj;
+                struct cl_attr    attr; /* XXX put attr into thread info */
 
-                OBDO_ALLOC(oa);
-                if (oa == NULL)
-                        RETURN(-ENOMEM);
+                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
 
-                ops = oap->oap_caller_ops;
-                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
-                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
-                    NO_QUOTA)
-                        rc = -EDQUOT;
+                cl_object_attr_lock(obj);
+                rc = cl_object_attr_get(env, obj, &attr);
+                cl_object_attr_unlock(obj);
 
-                OBDO_FREE(oa);
+                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
+                                            attr.cat_gid) == NO_QUOTA)
+                        rc = -EDQUOT;
                 if (rc)
                         RETURN(rc);
         }
@@ -2753,6 +2629,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         client_obd_list_lock(&cli->cl_loi_list_lock);
 
+        LASSERT(off + count <= CFS_PAGE_SIZE);
         oap->oap_cmd = cmd;
         oap->oap_page_off = off;
         oap->oap_count = count;
@@ -2760,7 +2637,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
         oap->oap_async_flags = async_flags;
 
         if (cmd & OBD_BRW_WRITE) {
-                rc = osc_enter_cache(cli, loi, oap);
+                rc = osc_enter_cache(env, cli, loi, oap);
                 if (rc) {
                         client_obd_list_unlock(&cli->cl_loi_list_lock);
                         RETURN(rc);
@@ -2773,7 +2650,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                   cmd);
 
-        osc_check_rpcs(cli);
+        osc_check_rpcs(env, cli);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         RETURN(0);
@@ -2782,50 +2659,27 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
 /* aka (~was & now & flag), but this is more clear :) */
 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
 
-static int osc_set_async_flags(struct obd_export *exp,
-                               struct lov_stripe_md *lsm,
-                               struct lov_oinfo *loi, void *cookie,
-                               obd_flag async_flags)
+int osc_set_async_flags_base(struct client_obd *cli,
+                             struct lov_oinfo *loi, struct osc_async_page *oap,
+                             obd_flag async_flags)
 {
-        struct client_obd *cli = &exp->exp_obd->u.cli;
         struct loi_oap_pages *lop;
-        struct osc_async_page *oap;
-        int rc = 0;
         ENTRY;
 
-        oap = oap_from_cookie(cookie);
-        if (IS_ERR(oap))
-                RETURN(PTR_ERR(oap));
-
-        /*
-         * bug 7311: OST-side locking is only supported for liblustre for now
-         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
-         * implementation has to handle case where OST-locked page was picked
-         * up by, e.g., ->writepage().
-         */
-        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
-        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
-                                     * tread here. */
-
         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                 RETURN(-EIO);
 
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
         if (oap->oap_cmd & OBD_BRW_WRITE) {
                 lop = &loi->loi_write_lop;
         } else {
                 lop = &loi->loi_read_lop;
         }
 
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
         if (list_empty(&oap->oap_pending_item))
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
 
         if ((oap->oap_async_flags & async_flags) == async_flags)
-                GOTO(out, rc = 0);
+                RETURN(0);
 
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
@@ -2839,106 +2693,12 @@ static int osc_set_async_flags(struct obd_export *exp,
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                         oap->oap_async_flags);
-out:
-        osc_check_rpcs(cli);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
-}
-
-static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
-                             struct lov_oinfo *loi,
-                             struct obd_io_group *oig, void *cookie,
-                             int cmd, obd_off off, int count,
-                             obd_flag brw_flags,
-                             obd_flag async_flags)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct osc_async_page *oap;
-        struct loi_oap_pages *lop;
-        int rc = 0;
-        ENTRY;
-
-        oap = oap_from_cookie(cookie);
-        if (IS_ERR(oap))
-                RETURN(PTR_ERR(oap));
-
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(-EIO);
-
-        if (!list_empty(&oap->oap_pending_item) ||
-            !list_empty(&oap->oap_urgent_item) ||
-            !list_empty(&oap->oap_rpc_item))
-                RETURN(-EBUSY);
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        oap->oap_cmd = cmd;
-        oap->oap_page_off = off;
-        oap->oap_count = count;
-        oap->oap_brw_flags = brw_flags;
-        oap->oap_async_flags = async_flags;
-
-        if (cmd & OBD_BRW_WRITE)
-                lop = &loi->loi_write_lop;
-        else
-                lop = &loi->loi_read_lop;
-
-        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
-        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
-                oap->oap_oig = oig;
-                rc = oig_add_one(oig, &oap->oap_occ);
-        }
-
-        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
-                  oap, oap->oap_page, rc);
-
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        RETURN(rc);
-}
-
-static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
-                                 struct loi_oap_pages *lop, int cmd)
-{
-        struct list_head *pos, *tmp;
-        struct osc_async_page *oap;
-
-        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
-                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
-                list_del(&oap->oap_pending_item);
-                osc_oap_to_pending(oap);
-        }
-        loi_list_maint(cli, loi);
-}
-
-static int osc_trigger_group_io(struct obd_export *exp,
-                                struct lov_stripe_md *lsm,
-                                struct lov_oinfo *loi,
-                                struct obd_io_group *oig)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        ENTRY;
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
-        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
-
-        osc_check_rpcs(cli);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
         RETURN(0);
 }
 
-static int osc_teardown_async_page(struct obd_export *exp,
-                                   struct lov_stripe_md *lsm,
-                                   struct lov_oinfo *loi, void *cookie)
+int osc_teardown_async_page(struct obd_export *exp,
+                            struct lov_stripe_md *lsm,
+                            struct lov_oinfo *loi, void *cookie)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
         struct loi_oap_pages *lop;
@@ -2976,85 +2736,44 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
         loi_list_maint(cli, loi);
-        cache_remove_extent(cli->cl_cache, oap);
-
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         RETURN(rc);
 }
 
-int osc_extent_blocking_cb(struct ldlm_lock *lock,
-                           struct ldlm_lock_desc *new, void *data,
-                           int flag)
+static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
+                                         struct ldlm_enqueue_info *einfo,
+                                         int flags)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
-        ENTRY;
-
-        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
-                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
-                LBUG();
-        }
+        void *data = einfo->ei_cbdata;
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-
-                ldlm_lock2handle(lock, &lockh);
-                /* This lock wasn't granted, don't try to do anything */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
+        LASSERT(lock != NULL);
+        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
+        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
+        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
+        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
 
-                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
-                                  &lockh);
-
-                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
-                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
-                                                          lock, new, data,flag);
-                break;
-        }
-        default:
-                LBUG();
-        }
-
-        RETURN(0);
+        lock_res_and_lock(lock);
+        spin_lock(&osc_ast_guard);
+        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
+        lock->l_ast_data = data;
+        spin_unlock(&osc_ast_guard);
+        unlock_res_and_lock(lock);
 }
-EXPORT_SYMBOL(osc_extent_blocking_cb);
 
-static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
+static void osc_set_data_with_check(struct lustre_handle *lockh,
+                                    struct ldlm_enqueue_info *einfo,
                                     int flags)
 {
         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
-        if (lock == NULL) {
-                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
-                return;
-        }
-        lock_res_and_lock(lock);
-#if defined (__KERNEL__) && defined (__linux__)
-        /* Liang XXX: Darwin and Winnt checking should be added */
-        if (lock->l_ast_data && lock->l_ast_data != data) {
-                struct inode *new_inode = data;
-                struct inode *old_inode = lock->l_ast_data;
-                if (!(old_inode->i_state & I_FREEING))
-                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
-                LASSERTF(old_inode->i_state & I_FREEING,
-                         "Found existing inode %p/%lu/%u state %lu in lock: "
-                         "setting data to %p/%lu/%u\n", old_inode,
-                         old_inode->i_ino, old_inode->i_generation,
-                         old_inode->i_state,
-                         new_inode, new_inode->i_ino, new_inode->i_generation);
-        }
-#endif
-        lock->l_ast_data = data;
-        unlock_res_and_lock(lock);
-        LDLM_LOCK_PUT(lock);
+        if (lock != NULL) {
+                osc_set_lock_data_with_check(lock, einfo, flags);
+                LDLM_LOCK_PUT(lock);
+        } else
+                CERROR("lockh %p, data %p - client evicted?\n",
+                       lockh, einfo->ei_cbdata);
 }
 
 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
@@ -3068,9 +2787,11 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
-                            struct obd_info *oinfo, int intent, int rc)
+static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
+                            obd_enqueue_update_f upcall, void *cookie,
+                            int *flags, int rc)
 {
+        int intent = *flags & LDLM_FL_HAS_INTENT;
         ENTRY;
 
         if (intent) {
@@ -3087,17 +2808,13 @@ static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
         }
 
         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
+                *flags |= LDLM_FL_LVB_READY;
                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
-                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
-                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
-                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
+                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
         }
 
-        if (!rc)
-                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
-
         /* Call the update callback. */
-        rc = oinfo->oi_cb_up(oinfo, rc);
+        rc = (*upcall)(cookie, rc);
         RETURN(rc);
 }
 
@@ -3105,36 +2822,87 @@ static int osc_enqueue_interpret(const struct lu_env *env,
                                  struct ptlrpc_request *req,
                                  struct osc_enqueue_args *aa, int rc)
 {
-        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
-        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
         struct ldlm_lock *lock;
+        struct lustre_handle handle;
+        __u32 mode;
+
+        /* Make a local copy of a lock handle and a mode, because aa->oa_*
+         * might be freed anytime after lock upcall has been called. */
+        lustre_handle_copy(&handle, aa->oa_lockh);
+        mode = aa->oa_ei->ei_mode;
 
         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
          * be valid. */
-        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
+        lock = ldlm_handle2lock(&handle);
+
+        /* Take an additional reference so that a blocking AST that
+         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
+         * to arrive after an upcall has been executed by
+         * osc_enqueue_fini(). */
+        ldlm_lock_addref(&handle, mode);
 
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
-                                   aa->oa_ei->ei_mode,
-                                   &aa->oa_oi->oi_flags,
-                                   &lsm->lsm_oinfo[0]->loi_lvb,
-                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
-                                   lustre_swab_ost_lvb,
-                                   aa->oa_oi->oi_lockh, rc);
-
+                                   mode, aa->oa_flags, aa->oa_lvb,
+                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
+                                   &handle, rc);
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
-
+        rc = osc_enqueue_fini(req, aa->oa_lvb,
+                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
         /* Release the lock for async request. */
-        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
-                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
+        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
+                /*
+                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
+                 * not already released by
+                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
+                 */
+                ldlm_lock_decref(&handle, mode);
 
         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
-                 aa->oa_oi->oi_lockh, req, aa);
+                 aa->oa_lockh, req, aa);
+        ldlm_lock_decref(&handle, mode);
         LDLM_LOCK_PUT(lock);
         return rc;
 }
 
+void osc_update_enqueue(struct lustre_handle *lov_lockhp,
+                        struct lov_oinfo *loi, int flags,
+                        struct ost_lvb *lvb, __u32 mode, int rc)
+{
+        if (rc == ELDLM_OK) {
+                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+                __u64 tmp;
+
+                LASSERT(lock != NULL);
+                loi->loi_lvb = *lvb;
+                tmp = loi->loi_lvb.lvb_size;
+                /* Extend KMS up to the end of this lock and no further
+                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
+                if (tmp > lock->l_policy_data.l_extent.end)
+                        tmp = lock->l_policy_data.l_extent.end + 1;
+                if (tmp >= loi->loi_kms) {
+                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
+                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
+                        loi_kms_set(loi, tmp);
+                } else {
+                        LDLM_DEBUG(lock, "lock acquired, setting rss="
+                                   LPU64"; leaving kms="LPU64", end="LPU64,
+                                   loi->loi_lvb.lvb_size, loi->loi_kms,
+                                   lock->l_policy_data.l_extent.end);
+                }
+                ldlm_lock_allow_match(lock);
+                LDLM_LOCK_PUT(lock);
+        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+                loi->loi_lvb = *lvb;
+                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
+                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
+                rc = ELDLM_OK;
+        }
+}
+EXPORT_SYMBOL(osc_update_enqueue);
+
+struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
+
 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
  * other synchronous requests, however keeping some locks and trying to obtain
@@ -3142,28 +2910,33 @@ static int osc_enqueue_interpret(const struct lu_env *env,
  * when other sync requests do not get released lock from a client, the client
  * is excluded from the cluster -- such scenarious make the life difficult, so
  * release locks just after they are obtained. */
-static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct ldlm_enqueue_info *einfo,
-                       struct ptlrpc_request_set *rqset)
+int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
+                     int *flags, ldlm_policy_data_t *policy,
+                     struct ost_lvb *lvb, int kms_valid,
+                     obd_enqueue_update_f upcall, void *cookie,
+                     struct ldlm_enqueue_info *einfo,
+                     struct lustre_handle *lockh,
+                     struct ptlrpc_request_set *rqset, int async)
 {
-        struct ldlm_res_id res_id;
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req = NULL;
-        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        int intent = *flags & LDLM_FL_HAS_INTENT;
         ldlm_mode_t mode;
         int rc;
         ENTRY;
 
-
-        osc_build_res_name(oinfo->oi_md->lsm_object_id,
-                           oinfo->oi_md->lsm_object_gr, &res_id);
         /* Filesystem lock extents are extended to page boundaries so that
          * dealing with the page cache is a little smoother.  */
-        oinfo->oi_policy.l_extent.start -=
-                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
-        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
+        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
+        policy->l_extent.end |= ~CFS_PAGE_MASK;
 
-        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
+        /*
+         * kms is not valid when either object is completely fresh (so that no
+         * locks are cached), or object was evicted. In the latter case cached
+         * lock cannot be used, because it would prime inode state with
+         * potentially stale LVB.
+         */
+        if (!kms_valid)
                 goto no_match;
 
         /* Next, search for already existing extent locks that will cover us */
@@ -3182,32 +2955,37 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         if (einfo->ei_mode == LCK_PR)
                 mode |= LCK_PW;
         mode = ldlm_lock_match(obd->obd_namespace,
-                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
-                               einfo->ei_type, &oinfo->oi_policy, mode,
-                               oinfo->oi_lockh);
+                               *flags | LDLM_FL_LVB_READY, res_id,
+                               einfo->ei_type, policy, mode, lockh, 0);
         if (mode) {
-                /* addref the lock only if not async requests and PW lock is
-                 * matched whereas we asked for PR. */
-                if (!rqset && einfo->ei_mode != mode)
-                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
-                                        oinfo->oi_flags);
-                if (intent) {
-                        /* I would like to be able to ASSERT here that rss <=
-                         * kms, but I can't, for reasons which are explained in
-                         * lov_enqueue() */
-                }
-
-                /* We already have a lock, and it's referenced */
-                oinfo->oi_cb_up(oinfo, ELDLM_OK);
+                struct ldlm_lock *matched = ldlm_handle2lock(lockh);
+
+                if (matched->l_ast_data == NULL ||
+                    matched->l_ast_data == einfo->ei_cbdata) {
+                        /* addref the lock only if not async requests and PW
+                         * lock is matched whereas we asked for PR. */
+                        if (!rqset && einfo->ei_mode != mode)
+                                ldlm_lock_addref(lockh, LCK_PR);
+                        osc_set_lock_data_with_check(matched, einfo, *flags);
+                        if (intent) {
+                                /* I would like to be able to ASSERT here that
+                                 * rss <= kms, but I can't, for reasons which
+                                 * are explained in lov_enqueue() */
+                        }
 
-                /* For async requests, decref the lock. */
-                if (einfo->ei_mode != mode)
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                else if (rqset)
-                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
+                        /* We already have a lock, and it's referenced */
+                        (*upcall)(cookie, ELDLM_OK);
 
-                RETURN(ELDLM_OK);
+                        /* For async requests, decref the lock. */
+                        if (einfo->ei_mode != mode)
+                                ldlm_lock_decref(lockh, LCK_PW);
+                        else if (rqset)
+                                ldlm_lock_decref(lockh, einfo->ei_mode);
+                        LDLM_LOCK_PUT(matched);
+                        RETURN(ELDLM_OK);
+                } else
+                        ldlm_lock_decref(lockh, mode);
+                LDLM_LOCK_PUT(matched);
         }
 
  no_match:
@@ -3223,56 +3001,76 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         RETURN(rc);
 
                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
-                                     sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
+                                     sizeof *lvb);
                 ptlrpc_request_set_replen(req);
         }
 
         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
-        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
+        *flags &= ~LDLM_FL_BLOCK_GRANTED;
 
-        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
-                              &oinfo->oi_policy, &oinfo->oi_flags,
-                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
-                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
-                              lustre_swab_ost_lvb, oinfo->oi_lockh,
-                              rqset ? 1 : 0);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
+                              sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
         if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                         aa = ptlrpc_req_async_args(req);
-                        aa->oa_oi = oinfo;
                         aa->oa_ei = einfo;
                         aa->oa_exp = exp;
+                        aa->oa_flags  = flags;
+                        aa->oa_upcall = upcall;
+                        aa->oa_cookie = cookie;
+                        aa->oa_lvb    = lvb;
+                        aa->oa_lockh  = lockh;
 
                         req->rq_interpret_reply =
                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
-                        ptlrpc_set_add_req(rqset, req);
+                        if (rqset == PTLRPCD_SET)
+                                ptlrpcd_add_req(req, PSCOPE_OTHER);
+                        else
+                                ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         ptlrpc_req_finished(req);
                 }
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
+        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
         RETURN(rc);
 }
 
-static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
-                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
-                     int *flags, void *data, struct lustre_handle *lockh)
+static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
         struct ldlm_res_id res_id;
+        int rc;
+        ENTRY;
+
+        osc_build_res_name(oinfo->oi_md->lsm_object_id,
+                           oinfo->oi_md->lsm_object_gr, &res_id);
+
+        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
+                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
+                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
+                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
+                              rqset, rqset != NULL);
+        RETURN(rc);
+}
+
+int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
+                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+                   int *flags, void *data, struct lustre_handle *lockh,
+                   int unref)
+{
         struct obd_device *obd = exp->exp_obd;
         int lflags = *flags;
         ldlm_mode_t rc;
         ENTRY;
 
-        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
-
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                 RETURN(-EIO);
 
@@ -3289,9 +3087,10 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (mode == LCK_PR)
                 rc |= LCK_PW;
         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
-                             &res_id, type, policy, rc, lockh);
+                             res_id, type, policy, rc, lockh, unref);
         if (rc) {
-                osc_set_data_with_check(lockh, data, lflags);
+                if (data != NULL)
+                        osc_set_data_with_check(lockh, data, lflags);
                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
@@ -3301,8 +3100,7 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         RETURN(rc);
 }
 
-static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
-                      __u32 mode, struct lustre_handle *lockh)
+int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
 {
         ENTRY;
 
@@ -3314,6 +3112,13 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
         RETURN(0);
 }
 
+static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
+                      __u32 mode, struct lustre_handle *lockh)
+{
+        ENTRY;
+        RETURN(osc_cancel_base(lockh, mode));
+}
+
 static int osc_cancel_unused(struct obd_export *exp,
                              struct lov_stripe_md *lsm, int flags,
                              void *opaque)
@@ -3981,16 +3786,23 @@ static int osc_import_event(struct obd_device *obd,
         }
         case IMP_EVENT_INVALIDATE: {
                 struct ldlm_namespace *ns = obd->obd_namespace;
+                struct lu_env         *env;
+                int                    refcheck;
+
+                env = cl_env_get(&refcheck);
+                if (!IS_ERR(env)) {
+                        /* Reset grants */
+                        cli = &obd->u.cli;
+                        client_obd_list_lock(&cli->cl_loi_list_lock);
+                        /* all pages go to failing rpcs due to the invalid
+                         * import */
+                        osc_check_rpcs(env, cli);
+                        client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-                /* Reset grants */
-                cli = &obd->u.cli;
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                /* all pages go to failing rpcs due to the invalid import */
-                osc_check_rpcs(cli);
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
-
+                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
+                        cl_env_put(env, &refcheck);
+                } else
+                        rc = PTR_ERR(env);
                 break;
         }
         case IMP_EVENT_ACTIVE: {
@@ -4059,11 +3871,6 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
-                cli->cl_cache = cache_create(obd);
-                if (!cli->cl_cache) {
-                        osc_cleanup(obd);
-                        rc = -ENOMEM;
-                }
         }
 
         RETURN(rc);
@@ -4130,53 +3937,14 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
-        cache_destroy(obd->u.cli.cl_cache);
         rc = client_obd_cleanup(obd);
 
         ptlrpcd_decref();
         RETURN(rc);
 }
 
-static int osc_register_page_removal_cb(struct obd_export *exp,
-                                        obd_page_removal_cb_t func,
-                                        obd_pin_extent_cb pin_cb)
-{
-        return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
-                                           pin_cb);
-}
-
-static int osc_unregister_page_removal_cb(struct obd_export *exp,
-                                          obd_page_removal_cb_t func)
-{
-        return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
-}
-
-static int osc_register_lock_cancel_cb(struct obd_export *exp,
-                                       obd_lock_cancel_cb cb)
-{
-        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
-
-        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
-        return 0;
-}
-
-static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
-                                         obd_lock_cancel_cb cb)
-{
-        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
-                CERROR("Unregistering cancel cb %p, while only %p was "
-                       "registered\n", cb,
-                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
-                RETURN(-EINVAL);
-        }
-
-        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
-        return 0;
-}
-
-static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
+int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
-        struct lustre_cfg *lcfg = buf;
         struct lprocfs_static_vars lvars = { 0 };
         int rc = 0;
 
@@ -4195,6 +3963,11 @@ static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
         return(rc);
 }
 
+static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+        return osc_process_config_base(obd, buf);
+}
+
 struct obd_ops osc_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_setup                = osc_setup,
@@ -4217,19 +3990,9 @@ struct obd_ops osc_obd_ops = {
         .o_setattr              = osc_setattr,
         .o_setattr_async        = osc_setattr_async,
         .o_brw                  = osc_brw,
-        .o_brw_async            = osc_brw_async,
-        .o_prep_async_page      = osc_prep_async_page,
-        .o_reget_short_lock     = osc_reget_short_lock,
-        .o_release_short_lock   = osc_release_short_lock,
-        .o_queue_async_io       = osc_queue_async_io,
-        .o_set_async_flags      = osc_set_async_flags,
-        .o_queue_group_io       = osc_queue_group_io,
-        .o_trigger_group_io     = osc_trigger_group_io,
-        .o_teardown_async_page  = osc_teardown_async_page,
         .o_punch                = osc_punch,
         .o_sync                 = osc_sync,
         .o_enqueue              = osc_enqueue,
-        .o_match                = osc_match,
         .o_change_cbdata        = osc_change_cbdata,
         .o_cancel               = osc_cancel,
         .o_cancel_unused        = osc_cancel_unused,
@@ -4240,18 +4003,25 @@ struct obd_ops osc_obd_ops = {
         .o_llog_init            = osc_llog_init,
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
-        .o_register_page_removal_cb = osc_register_page_removal_cb,
-        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
-        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
-        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
 
+extern struct lu_kmem_descr  osc_caches[];
+extern spinlock_t            osc_ast_guard;
+extern struct lock_class_key osc_ast_guard_class;
+
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars = { 0 };
         int rc;
         ENTRY;
 
+        /* print an address of _any_ initialized kernel symbol from this
+         * module, to allow debugging with gdb that doesn't support data
+         * symbols from modules.*/
+        CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
+
+        rc = lu_kmem_init(osc_caches);
+
         lprocfs_osc_init_vars(&lvars);
 
         request_module("lquota");
@@ -4260,24 +4030,31 @@ int __init osc_init(void)
         init_obd_quota_ops(quota_interface, &osc_obd_ops);
 
         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
-                                 LUSTRE_OSC_NAME, NULL);
+                                 LUSTRE_OSC_NAME, &osc_device_type);
         if (rc) {
                 if (quota_interface)
                         PORTAL_SYMBOL_PUT(osc_quota_interface);
+                lu_kmem_fini(osc_caches);
                 RETURN(rc);
         }
 
+        spin_lock_init(&osc_ast_guard);
+        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
+
         RETURN(rc);
 }
 
 #ifdef __KERNEL__
 static void /*__exit*/ osc_exit(void)
 {
+        lu_device_type_fini(&osc_device_type);
+
         lquota_exit(quota_interface);
         if (quota_interface)
                 PORTAL_SYMBOL_PUT(osc_quota_interface);
 
         class_unregister_type(LUSTRE_OSC_NAME);
+        lu_kmem_fini(osc_caches);
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");