Whamcloud - gitweb
Landing b_bug974 onto HEAD (20040213_1538).
[fs/lustre-release.git] / lustre / osc / osc_request.c
index b817a14..e8dd043 100644 (file)
@@ -532,38 +532,80 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         return rc;
 }
 
-static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
+                                long writing_bytes)
 {
-        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
 
-        LASSERT(!(body->oa.o_valid & bits));
+        LASSERT(!(oa->o_valid & bits));
 
-        body->oa.o_valid |= bits;
-        down(&cli->cl_dirty_sem);
-        body->oa.o_blocks = cli->cl_dirty;
-        body->oa.o_rdev = cli->cl_dirty_granted;
-        up(&cli->cl_dirty_sem);
-        CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
-               cli->cl_dirty, cli->cl_dirty_granted);
+        oa->o_valid |= bits;
+        spin_lock(&cli->cl_loi_list_lock);
+        oa->o_dirty = cli->cl_dirty;
+        oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
+        oa->o_grant = cli->cl_avail_grant;
+        oa->o_dropped = cli->cl_lost_grant;
+        cli->cl_lost_grant = 0;
+        spin_unlock(&cli->cl_loi_list_lock);
+        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
+               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
 }
 
-static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+/* caller must hold loi_list_lock */
+static void osc_consume_write_grant(struct client_obd *cli,
+                                    struct osc_async_page *oap)
+{
+        cli->cl_dirty += PAGE_SIZE;
+        cli->cl_avail_grant -= PAGE_SIZE;
+        oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
+        CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
+        LASSERT(cli->cl_avail_grant >= 0);
+}
+
+/* caller must hold loi_list_lock */
+void osc_wake_cache_waiters(struct client_obd *cli)
 {
-        if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
-                if (cli->cl_ost_can_grant) {
-                        CDEBUG(D_INODE, "%s can't grant\n",
-                               cli->cl_import->imp_target_uuid.uuid);
+        struct list_head *l, *tmp;
+        struct osc_cache_waiter *ocw;
+
+        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+                /* if we can't dirty more, we must wait until some is written */
+                if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
+                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
+                               cli->cl_dirty, cli->cl_dirty_max);
+                        return;
                 }
-                cli->cl_ost_can_grant = 0;
-                return;
+
+                /* if still dirty cache but no grant wait for pending RPCs that
+                 * may yet return us some grant before doing sync writes */
+                if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
+                        CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n",
+                               cli->cl_brw_in_flight);
+                        return;
+                }
+
+                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
+                list_del_init(&ocw->ocw_entry);
+                if (cli->cl_avail_grant < PAGE_SIZE) {
+                        /* no more RPCs in flight to return grant, do sync IO */
+                        ocw->ocw_rc = -EDQUOT;
+                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
+                } else {
+                        osc_consume_write_grant(cli, ocw->ocw_oap);
+                }
+                wake_up(&ocw->ocw_waitq);
         }
 
-        CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev);
-        down(&cli->cl_dirty_sem);
-        cli->cl_dirty_granted = body->oa.o_rdev;
-        /* XXX check for over-run and wake up the io thread that
-         * doesn't exist yet */
-        up(&cli->cl_dirty_sem);
+        EXIT;
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+        spin_lock(&cli->cl_loi_list_lock);
+        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+        cli->cl_avail_grant += body->oa.o_grant;
+        /* waiters are woken in brw_interpret_oap */
+        spin_unlock(&cli->cl_loi_list_lock);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -637,7 +679,7 @@ static int check_write_rcs(struct ptlrpc_request *request, int niocount,
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
-                unsigned mask = ~(OBD_BRW_CREATE|OBD_BRW_FROM_GRANT);
+                unsigned mask = ~OBD_BRW_FROM_GRANT;
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -696,7 +738,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
 
         for (niocount = i = 1; i < page_count; i++)
-                if (!can_merge_pages (&pga[i - 1], &pga[i]))
+                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                         niocount++;
 
         size[0] = sizeof(*body);
@@ -760,7 +802,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
 
         LASSERT((void *)(niobuf - niocount) ==
                 lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
-        osc_announce_cached(cli, body);
+        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
         spin_lock_irqsave(&req->rq_lock, flags);
         req->rq_no_resend = 1;
         spin_unlock_irqrestore(&req->rq_lock, flags);
@@ -769,7 +811,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         if (opc == OST_WRITE) {
 #if CHECKSUM_BULK
                 body->oa.o_valid |= OBD_MD_FLCKSUM;
-                body->oa.o_nlink = cksum_pages(requested_nob, page_count, pga);
+                body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
 #endif
                 /* 1 RC per niobuf */
                 size[1] = sizeof(__u32) * niocount;
@@ -796,14 +838,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
 {
         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
         struct ost_body *body;
+        ENTRY;
 
         if (rc < 0)
-                return (rc);
+                RETURN(rc);
 
         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL) {
                 CERROR ("Can't unpack body\n");
-                return (-EPROTO);
+                RETURN(-EPROTO);
         }
 
         osc_update_grant(cli, body);
@@ -811,15 +854,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
         if (req->rq_reqmsg->opc == OST_WRITE) {
                 if (rc > 0) {
                         CERROR ("Unexpected +ve rc %d\n", rc);
-                        return (-EPROTO);
+                        RETURN(-EPROTO);
                 }
 
-                return(check_write_rcs(req, niocount, page_count, pga));
+                RETURN(check_write_rcs(req, niocount, page_count, pga));
         }
 
         if (rc > requested_nob) {
                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
-                return (-EPROTO);
+                RETURN(-EPROTO);
         }
 
         if (rc < requested_nob)
@@ -832,7 +875,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                 const struct ptlrpc_peer *peer =
                         &req->rq_import->imp_connection->c_peer;
                 static int cksum_counter;
-                obd_count server_cksum = oa->o_nlink;
+                obd_count server_cksum = oa->o_cksum;
                 obd_count cksum = cksum_pages(rc, page_count, pga);
                 char str[PTL_NALFMT_SIZE];
 
@@ -844,7 +887,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                LPX64" (%s)\n", server_cksum, cksum,
                                peer->peer_nid, str);
                         cksum_counter = 0;
-                        oa->o_nlink = cksum;
+                        oa->o_cksum = cksum;
                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
                         CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
                               cksum_counter, peer->peer_nid, str, cksum);
@@ -859,7 +902,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                req->rq_import->imp_connection->c_peer.peer_nid);
         }
 #endif
-        return (0);
+        RETURN(0);
 }
 
 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
@@ -1088,7 +1131,8 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
 }
 
 static void osc_check_rpcs(struct client_obd *cli);
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+                           int sent);
 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
 static void lop_update_pending(struct client_obd *cli,
                                struct loi_oap_pages *lop, int cmd, int delta);
@@ -1127,27 +1171,25 @@ static void osc_occ_interrupted(struct osic_callback_context *occ)
                         list_del_init(&oap->oap_urgent_item);
 
                 loi = oap->oap_loi;
-                lop = (oap->oap_cmd == OBD_BRW_WRITE) ? 
+                lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
                         &loi->loi_write_lop : &loi->loi_read_lop;
                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                 loi_list_maint(oap->oap_cli, oap->oap_loi);
 
                 osic_complete_one(oap->oap_osic, &oap->oap_occ, 0);
                 oap->oap_osic = NULL;
-
         }
 
 unlock:
         spin_unlock(&oap->oap_cli->cl_loi_list_lock);
 }
 
-/* this must be called holding the list lock to give coverage to exit_cache, 
+/* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_complete_oap(struct client_obd *cli,
-                             struct osc_async_page *oap, int rc)
+                             struct osc_async_page *oap, int sent, int rc)
 {
-        ENTRY;
-        osc_exit_cache(cli, oap);
+        osc_exit_cache(cli, oap, sent);
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
@@ -1165,7 +1207,6 @@ static void osc_complete_oap(struct client_obd *cli,
 
         oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                            rc);
-        EXIT;
 }
 
 static int brw_interpret_oap(struct ptlrpc_request *request,
@@ -1190,6 +1231,11 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
 
         spin_lock(&cli->cl_loi_list_lock);
 
+        /* We need to decrement before osc_complete_oap->osc_wake_cache_waiters
+         * is called so we know whether to go to sync BRWs or wait for more
+         * RPCs to complete */
+        cli->cl_brw_in_flight--;
+
         /* the caller may re-use the oap after the completion call so
          * we need to clean it up a little */
         list_for_each_safe(pos, n, &aa->aa_oaps) {
@@ -1199,10 +1245,10 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
                        //oap->oap_page, oap->oap_page->index, oap);
 
                 list_del_init(&oap->oap_rpc_item);
-                osc_complete_oap(cli, oap, rc);
+                osc_complete_oap(cli, oap, 1, rc);
         }
 
-        cli->cl_brw_in_flight--;
+        osc_wake_cache_waiters(cli);
         osc_check_rpcs(cli);
 
         spin_unlock(&cli->cl_loi_list_lock);
@@ -1250,8 +1296,8 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                 pga[i].pg = oap->oap_page;
                 pga[i].count = oap->oap_count;
                 pga[i].flag = oap->oap_brw_flags;
-                //CDEBUG(D_INODE, "putting page %p index %lu oap %p into pga\n",
-                       //pga[i].pg, oap->oap_page->index, oap);
+                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
+                       pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
                 i++;
         }
 
@@ -1328,15 +1374,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                         if (rc < 0)
                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
-                                                "instead of ready\n", oap, 
+                                                "instead of ready\n", oap,
                                                 oap->oap_page, rc);
                         switch (rc) {
                         case -EAGAIN:
                                 /* llite is telling us that the page is still
                                  * in commit_write and that we should try
-                                 * and put it in an rpc again later.  we 
+                                 * and put it in an rpc again later.  we
                                  * break out of the loop so we don't create
-                                 * a hole in the sequence of pages in the rpc 
+                                 * a hole in the sequence of pages in the rpc
                                  * stream.*/
                                 pos = NULL;
                                 break;
@@ -1351,7 +1397,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                                 break;
                         default:
                                 LASSERTF(0, "oap %p page %p returned %d "
-                                            "from make_ready\n", oap, 
+                                            "from make_ready\n", oap,
                                             oap->oap_page, rc);
                                 break;
                         }
@@ -1367,13 +1413,12 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 /* ask the caller for the size of the io as the rpc leaves. */
                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
-                        oap->oap_count = ops->ap_refresh_count(
-                                                        oap->oap_caller_data,
-                                                        cmd);
+                        oap->oap_count =
+                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                 if (oap->oap_count <= 0) {
-                        CDEBUG(D_INODE, "oap %p count %d, completing\n", oap,
+                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                                oap->oap_count);
-                        osc_complete_oap(cli, oap, oap->oap_count);
+                        osc_complete_oap(cli, oap, 0, oap->oap_count);
                         continue;
                 }
 
@@ -1383,6 +1428,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                         break;
         }
 
+        osc_wake_cache_waiters(cli);
+
         if (page_count == 0)
                 RETURN(0);
 
@@ -1403,7 +1450,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                          * were between the pending list and the rpc */
                         if (oap->oap_interrupted) {
                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
-                                osc_complete_oap(cli, oap, oap->oap_count);
+                                osc_complete_oap(cli, oap, 0, oap->oap_count);
                                 continue;
                         }
 
@@ -1430,7 +1477,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
         } else {
                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_write_rpc_hist, 
+                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                  cli->cl_brw_in_flight);
         }
 
@@ -1442,7 +1489,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         list_for_each(pos, &aa->aa_oaps) {
                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                 if (oap->oap_interrupted) {
-                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n", 
+                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                oap, request);
                         ptlrpc_mark_interrupted(request);
                         break;
@@ -1487,7 +1534,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                  * that are being queued but which can't be made ready until
                  * the queuer finishes with the page. this is a wart for
                  * llite::commit_write() */
-                optimal *= 2;
+                optimal += 16;
         }
         if (lop->lop_num_pending >= optimal)
                 RETURN(1);
@@ -1495,7 +1542,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
-static void on_list(struct list_head *item, struct list_head *list, 
+static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
         if (list_empty(item) && should_be_on)
@@ -1508,39 +1555,39 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, 
+        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
 
-        on_list(&loi->loi_write_item, &cli->cl_loi_write_list, 
+        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
 }
 
-#define LOI_DEBUG(LOI, STR, args...) \
-        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
-               !list_empty(&(LOI)->loi_cli_item),                  \
+#define LOI_DEBUG(LOI, STR, args...)                                     \
+        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
+               !list_empty(&(LOI)->loi_cli_item),                        \
                (LOI)->loi_write_lop.lop_num_pending,                     \
-               !list_empty(&(LOI)->loi_write_lop.lop_urgent),         \
+               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
-               !list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
-               args)                       \
+               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
+               args)                                                     \
 
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
         /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */ 
+         * pages ready to be stuffed into rpcs */
         if (!list_empty(&cli->cl_loi_ready_list))
-                RETURN(list_entry(cli->cl_loi_ready_list.next, 
+                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                   struct lov_oinfo, loi_cli_item));
-        
-        /* then if we have cache waiters, return all objects with queued 
+
+        /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
          * have filled up the cache and not been fired into rpcs because
          * they don't pass the nr_pending/object threshhold */
         if (!list_empty(&cli->cl_cache_waiters) &&
             !list_empty(&cli->cl_loi_write_list))
-                RETURN(list_entry(cli->cl_loi_write_list.next, 
+                RETURN(list_entry(cli->cl_loi_write_list.next,
                                   struct lov_oinfo, loi_write_item));
         RETURN(NULL);
 }
@@ -1608,73 +1655,78 @@ static void osc_check_rpcs(struct client_obd *cli)
 /* we're trying to queue a page in the osc so we're subject to the
  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
  * If the osc's queued pages are already at that limit, then we want to sleep
- * until there is space in the osc's queue for us.  we need this goofy
- * little struct to really tell that our allocation was fulfilled in
- * the presence of pending signals */
-struct osc_cache_waiter {
-        struct list_head        ocw_entry;
-        wait_queue_head_t       ocw_waitq;
-};
+ * until there is space in the osc's queue for us.  We also may be waiting for
+ * write credits from the OST if there are RPCs in flight that may return some
+ * before we fall back to sync writes.
+ *
+ * We need this know our allocation was granted in the presence of signals */
 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
 {
         int rc;
         ENTRY;
         spin_lock(&cli->cl_loi_list_lock);
-        rc = list_empty(&ocw->ocw_entry);
+        rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0;
         spin_unlock(&cli->cl_loi_list_lock);
         RETURN(rc);
 };
+
+/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
+ * grant or cache space. */
 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                            struct osc_async_page *oap)
 {
         struct osc_cache_waiter ocw;
-        struct l_wait_info lwi = {0};
-        int rc = 0;
-        ENTRY;
+        struct l_wait_info lwi = { 0 };
+
+        CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
+               cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
+               cli->cl_avail_grant);
 
-        /* XXX check for ost grants here as well.. for now we ignore them. */
         if (cli->cl_dirty_max < PAGE_SIZE)
-                RETURN(-EDQUOT);
+                return(-EDQUOT);
 
-        /* if we fail this test then cl_dirty contains at least one page
-         * that will have to be completed after we release the lock */
-        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) {
+
+        /* Hopefully normal case - cache space and write credits available */
+        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
+            cli->cl_avail_grant >= PAGE_SIZE) {
                 /* account for ourselves */
-                cli->cl_dirty += PAGE_SIZE;
-                GOTO(out, rc = 0);
+                osc_consume_write_grant(cli, oap);
+                return(0);
         }
 
-        init_waitqueue_head(&ocw.ocw_waitq);
-        list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+        /* Make sure that there are write rpcs in flight to wait for.  This
+         * is a little silly as this object may not have any pending but
+         * other objects sure might. */
+        if (cli->cl_brw_in_flight) {
+                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+                init_waitqueue_head(&ocw.ocw_waitq);
+                ocw.ocw_oap = oap;
+                ocw.ocw_rc = 0;
 
-        /* make sure that there are write rpcs in flight to wait for. this
-         * is a little silly as this object may not have any pending
-         * but other objects sure might. this should probably be cleaned. */
-        loi_list_maint(cli, loi);
-        osc_check_rpcs(cli);
-        spin_unlock(&cli->cl_loi_list_lock);
+                loi_list_maint(cli, loi);
+                osc_check_rpcs(cli);
+                spin_unlock(&cli->cl_loi_list_lock);
 
-        CDEBUG(D_INODE, "sleeping for cache space\n");
-        l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
+                CDEBUG(0, "sleeping for cache space\n");
+                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
-        spin_lock(&cli->cl_loi_list_lock);
-        if (!list_empty(&ocw.ocw_entry)) {
-                rc = -EINTR;
-                list_del(&ocw.ocw_entry);
+                spin_lock(&cli->cl_loi_list_lock);
+                if (!list_empty(&ocw.ocw_entry)) {
+                        list_del(&ocw.ocw_entry);
+                        RETURN(-EINTR);
+                }
+                RETURN(ocw.ocw_rc);
         }
-        GOTO(out, rc);
-out:
-        if (rc == 0)
-                oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
-        return rc;
+
+        RETURN(-EDQUOT);
 }
 
-/* the companion to enter_cache, called when an oap is now longer part of the
+/* the companion to enter_cache, called when an oap is no longer part of the
  * dirty accounting.. so writeback completes or truncate happens before writing
  * starts.  must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+                           int sent)
 {
-        struct osc_cache_waiter *ocw;
         ENTRY;
 
         if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
@@ -1682,16 +1734,14 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
                 return;
         }
 
-        if (list_empty(&cli->cl_cache_waiters)) {
-                cli->cl_dirty -= PAGE_SIZE;
-        } else {
-                ocw = list_entry(cli->cl_cache_waiters.next,
-                                 struct osc_cache_waiter, ocw_entry);
-                list_del_init(&ocw->ocw_entry);
-                wake_up(&ocw->ocw_waitq);
+        oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
+        cli->cl_dirty -= PAGE_SIZE;
+        if (!sent) {
+                cli->cl_lost_grant += PAGE_SIZE;
+                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
+                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
         }
 
-        oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
         EXIT;
 }
 
@@ -1973,7 +2023,8 @@ static int osc_teardown_async_page(struct obd_export *exp,
         if (!list_empty(&oap->oap_rpc_item))
                 GOTO(out, rc = -EBUSY);
 
-        osc_exit_cache(cli, oap);
+        osc_exit_cache(cli, oap, 0);
+        osc_wake_cache_waiters(cli);
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
@@ -2771,7 +2822,7 @@ static int osc_disconnect(struct obd_export *exp, int flags)
         if (obd->u.cli.cl_conn_count == 1) {
                 /* flush any remaining cancel messages out to the target */
                 llog_sync(ctxt, exp);
-                
+
                 /* balance the conn2export for oscc in osc_connect */
                 class_export_put(exp);
         }
@@ -2796,18 +2847,27 @@ static int osc_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm,
 static int osc_invalidate_import(struct obd_device *obd,
                                  struct obd_import *imp)
 {
+        struct client_obd *cli;
         LASSERT(imp->imp_obd == obd);
         /* this used to try and tear down queued pages, but it was
          * not correctly implemented.  We'll have to do it again once
          * we call obd_invalidate_import() agian */
-        LBUG();
+        /* XXX And we still need to do this */
+
+        /* Reset grants, too */
+        cli = &obd->u.cli;
+        spin_lock(&cli->cl_loi_list_lock);
+        cli->cl_avail_grant = 0;
+        cli->cl_lost_grant = 0;
+        spin_unlock(&cli->cl_loi_list_lock);
+
         RETURN(0);
 }
 
 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         int rc;
-        
+
         rc = ptlrpcd_addref();
         if (rc)
                 return rc;