Whamcloud - gitweb
LU-2139 osc: Track and limit "unstable" pages
authorPrakash Surya <surya1@llnl.gov>
Tue, 2 Apr 2013 20:36:39 +0000 (13:36 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 30 Apr 2013 16:24:40 +0000 (12:24 -0400)
This change adds a global counter to track the number of "unstable"
pages held by a given client, along with per file system counters. An
"unstable" page is defined as a page which has been sent to the server
as part of a bulk request, but is uncommitted to stable storage.

In addition to simply tracking the unstable pages, they now also count
towards the maximum number of "pinned" pages on the system at any given
time. Thus, a client will now be bound on the number of dirty and
unstable pages it can pin in memory. Previously only dirty pages were
accounted for in this limit.

In addition to tracking the number of unstable pages in Lustre, the
NR_UNSTABLE_NFS memory zone is also incremented and decremented for
easy monitoring using the "NFS_Unstable:" field in /proc/meminfo.
This field is also used internally by the kernel to limit the total
amount of unstable pages on the system.

The motivation for this change is twofold. First, the client must not
allow itself to disconnect from an OST while still holding unstable
pages. Otherwise, these unstable pages can get lost due to an OST
failure, and replay is not possible due to the disconnect via unmount.

Secondly, the client needs a mechanism to prevent it from allocating too
much of its available RAM to unreclaimable pages pinned by the ptlrpc
layer. If this case occurs, out of memory events can trigger as a side
effect, which we need to avoid.

The current number of unstable pages accounted for on a per file system
granularity is exported by the unstable_stats proc file, contained under
each file system's llite namespace. An example of retrieving this
information is below:

$ lctl get_param llite.*.unstable_stats

Signed-off-by: Prakash Surya <surya1@llnl.gov>
Change-Id: Ic43d34bfa39da9ac4fe159000c7db5908467fd7b
Reviewed-on: http://review.whamcloud.com/4245
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
13 files changed:
libcfs/include/libcfs/user-mem.h
lustre/include/lclient.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/lov/lov_obd.c
lustre/obdclass/class_obd.c
lustre/osc/osc_cache.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c

index e48b124..94f8911 100644 (file)
@@ -70,6 +70,9 @@ void cfs_kunmap(cfs_page_t *pg);
 #define cfs_page_pin(page) do {} while (0)
 #define cfs_page_unpin(page) do {} while (0)
 
 #define cfs_page_pin(page) do {} while (0)
 #define cfs_page_unpin(page) do {} while (0)
 
+#define inc_zone_page_state(page, state) do {} while (0)
+#define dec_zone_page_state(page, state) do {} while (0)
+
 /*
  * Memory allocator
  * Inline function, so utils can use them without linking of libcfs
 /*
  * Memory allocator
  * Inline function, so utils can use them without linking of libcfs
index 93169c7..3d404f6 100644 (file)
@@ -434,8 +434,10 @@ struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode);
 void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
 
 /**
 void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
 
 /**
- * Data structure managing a client's cached clean pages. An LRU of
- * pages is maintained, along with other statistics.
+ * Data structure managing a client's cached pages. A count of
+ * "unstable" pages is maintained, and an LRU of clean pages is
+ * maintained. "unstable" pages are pages pinned by the ptlrpc
+ * layer for recovery purposes.
  */
 struct cl_client_cache {
        cfs_atomic_t    ccc_users;    /* # of users (OSCs) of this data */
  */
 struct cl_client_cache {
        cfs_atomic_t    ccc_users;    /* # of users (OSCs) of this data */
@@ -444,6 +446,8 @@ struct cl_client_cache {
        cfs_atomic_t    ccc_lru_left; /* # of LRU entries available */
        unsigned long   ccc_lru_max;  /* Max # of LRU entries possible */
        unsigned int    ccc_lru_shrinkers; /* # of threads reclaiming */
        cfs_atomic_t    ccc_lru_left; /* # of LRU entries available */
        unsigned long   ccc_lru_max;  /* Max # of LRU entries possible */
        unsigned int    ccc_lru_shrinkers; /* # of threads reclaiming */
+       cfs_atomic_t    ccc_unstable_nr;    /* # of unstable pages pinned */
+       cfs_waitq_t     ccc_unstable_waitq; /* Signaled on BRW commit */
 };
 
 #endif /*LCLIENT_H */
 };
 
 #endif /*LCLIENT_H */
index 3ec5a24..a644c06 100644 (file)
@@ -1848,7 +1848,9 @@ struct ptlrpc_request {
                rq_no_retry_einprogress:1,
                /* allow the req to be sent if the import is in recovery
                 * status */
                rq_no_retry_einprogress:1,
                /* allow the req to be sent if the import is in recovery
                 * status */
-               rq_allow_replay:1;
+               rq_allow_replay:1,
+               /* bulk request, sent to server, but uncommitted */
+               rq_unstable:1;
 
        unsigned int rq_nr_resend;
 
 
        unsigned int rq_nr_resend;
 
index 681e057..0e0c0f9 100644 (file)
@@ -692,7 +692,7 @@ struct lov_obd {
         cfs_proc_dir_entry_t   *lov_pool_proc_entry;
         enum lustre_sec_part    lov_sp_me;
 
         cfs_proc_dir_entry_t   *lov_pool_proc_entry;
         enum lustre_sec_part    lov_sp_me;
 
-       /* Cached LRU pages from upper layer */
+       /* Cached LRU and unstable data from upper layer */
        void                   *lov_cache;
 
        struct rw_semaphore     lov_notify_lock;
        void                   *lov_cache;
 
        struct rw_semaphore     lov_notify_lock;
index 8c7e4ac..f6f6d9f 100644 (file)
@@ -75,6 +75,7 @@ extern int at_early_margin;
 extern int at_extra;
 extern unsigned int obd_sync_filter;
 extern unsigned int obd_max_dirty_pages;
 extern int at_extra;
 extern unsigned int obd_sync_filter;
 extern unsigned int obd_max_dirty_pages;
+extern cfs_atomic_t obd_unstable_pages;
 extern cfs_atomic_t obd_dirty_pages;
 extern cfs_atomic_t obd_dirty_transit_pages;
 extern unsigned int obd_alloc_fail_rate;
 extern cfs_atomic_t obd_dirty_pages;
 extern cfs_atomic_t obd_dirty_transit_pages;
 extern unsigned int obd_alloc_fail_rate;
index 957ca24..cc7fc04 100644 (file)
@@ -473,6 +473,10 @@ struct ll_sb_info {
 
         struct lprocfs_stats     *ll_stats; /* lprocfs stats counter */
 
 
         struct lprocfs_stats     *ll_stats; /* lprocfs stats counter */
 
+       /* Used to track "unstable" pages on a client, and maintain a
+        * LRU list of clean pages. An "unstable" page is defined as
+        * any page which is sent to a server as part of a bulk request,
+        * but is uncommitted to stable storage. */
        struct cl_client_cache    ll_cache;
 
         struct lprocfs_stats     *ll_ra_stats;
        struct cl_client_cache    ll_cache;
 
         struct lprocfs_stats     *ll_ra_stats;
index 81916ce..744ed25 100644 (file)
@@ -99,13 +99,16 @@ static struct ll_sb_info *ll_init_sbi(void)
                lru_page_max = (pages / 4) * 3;
        }
 
                lru_page_max = (pages / 4) * 3;
        }
 
-       /* initialize lru data */
+       /* initialize ll_cache data */
        cfs_atomic_set(&sbi->ll_cache.ccc_users, 0);
        sbi->ll_cache.ccc_lru_max = lru_page_max;
        cfs_atomic_set(&sbi->ll_cache.ccc_lru_left, lru_page_max);
        spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
        CFS_INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
 
        cfs_atomic_set(&sbi->ll_cache.ccc_users, 0);
        sbi->ll_cache.ccc_lru_max = lru_page_max;
        cfs_atomic_set(&sbi->ll_cache.ccc_lru_left, lru_page_max);
        spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
        CFS_INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
 
+       cfs_atomic_set(&sbi->ll_cache.ccc_unstable_nr, 0);
+       cfs_waitq_init(&sbi->ll_cache.ccc_unstable_waitq);
+
         sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
                                            SBI_DEFAULT_READAHEAD_MAX);
         sbi->ll_ra_info.ra_max_pages = sbi->ll_ra_info.ra_max_pages_per_file;
         sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
                                            SBI_DEFAULT_READAHEAD_MAX);
         sbi->ll_ra_info.ra_max_pages = sbi->ll_ra_info.ra_max_pages_per_file;
@@ -1089,7 +1092,7 @@ void ll_put_super(struct super_block *sb)
         struct lustre_sb_info *lsi = s2lsi(sb);
         struct ll_sb_info *sbi = ll_s2sbi(sb);
         char *profilenm = get_profile_name(sb);
         struct lustre_sb_info *lsi = s2lsi(sb);
         struct ll_sb_info *sbi = ll_s2sbi(sb);
         char *profilenm = get_profile_name(sb);
-        int force = 1, next;
+       int ccc_count, next, force = 1, rc = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op: sb %p - %s\n", sb, profilenm);
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op: sb %p - %s\n", sb, profilenm);
@@ -1105,6 +1108,19 @@ void ll_put_super(struct super_block *sb)
                         force = obd->obd_force;
         }
 
                         force = obd->obd_force;
         }
 
+       /* Wait for unstable pages to be committed to stable storage */
+       if (force == 0) {
+               struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+               rc = l_wait_event(sbi->ll_cache.ccc_unstable_waitq,
+                       cfs_atomic_read(&sbi->ll_cache.ccc_unstable_nr) == 0,
+                       &lwi);
+       }
+
+       ccc_count = cfs_atomic_read(&sbi->ll_cache.ccc_unstable_nr);
+       if (force == 0 && rc != -EINTR)
+               LASSERTF(ccc_count == 0, "count: %i\n", ccc_count);
+
+
         /* We need to set force before the lov_disconnect in
            lustre_common_put_super, since l_d cleans up osc's as well. */
         if (force) {
         /* We need to set force before the lov_disconnect in
            lustre_common_put_super, since l_d cleans up osc's as well. */
         if (force) {
index a8dba54..dc4fe5d 100644 (file)
@@ -740,6 +740,23 @@ static int ll_rd_sbi_flags(char *page, char **start, off_t off,
        return rc;
 }
 
        return rc;
 }
 
+static int ll_rd_unstable_stats(char *page, char **start, off_t off,
+                             int count, int *eof, void *data)
+{
+       struct super_block      *sb    = data;
+       struct ll_sb_info       *sbi   = ll_s2sbi(sb);
+       struct cl_client_cache  *cache = &sbi->ll_cache;
+       int pages, mb, rc;
+
+       pages = cfs_atomic_read(&cache->ccc_unstable_nr);
+       mb    = (pages * CFS_PAGE_SIZE) >> 20;
+
+       rc = snprintf(page, count, "unstable_pages: %8d\n"
+                                  "unstable_mb:    %8d\n", pages, mb);
+
+       return rc;
+}
+
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -771,6 +788,7 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "lazystatfs",       ll_rd_lazystatfs, ll_wr_lazystatfs, 0 },
         { "max_easize",       ll_rd_maxea_size, 0, 0 },
        { "sbi_flags",        ll_rd_sbi_flags, 0, 0 },
         { "lazystatfs",       ll_rd_lazystatfs, ll_wr_lazystatfs, 0 },
         { "max_easize",       ll_rd_maxea_size, 0, 0 },
        { "sbi_flags",        ll_rd_sbi_flags, 0, 0 },
+       { "unstable_stats",   ll_rd_unstable_stats, 0, 0},
         { 0 }
 };
 
         { 0 }
 };
 
index 81c01f2..e96e65f 100644 (file)
@@ -61,7 +61,7 @@
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <cl_object.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <cl_object.h>
-#include <lclient.h> /* for cl_client_lru */
+#include <lclient.h>
 #include <lustre/ll_fiemap.h>
 #include <lustre_log.h>
 #include <lustre_fid.h>
 #include <lustre/ll_fiemap.h>
 #include <lustre_log.h>
 #include <lustre_fid.h>
index c72a189..1d0619e 100644 (file)
@@ -84,6 +84,8 @@ unsigned int obd_dump_on_eviction;
 EXPORT_SYMBOL(obd_dump_on_eviction);
 unsigned int obd_max_dirty_pages = 256;
 EXPORT_SYMBOL(obd_max_dirty_pages);
 EXPORT_SYMBOL(obd_dump_on_eviction);
 unsigned int obd_max_dirty_pages = 256;
 EXPORT_SYMBOL(obd_max_dirty_pages);
+cfs_atomic_t obd_unstable_pages;
+EXPORT_SYMBOL(obd_unstable_pages);
 cfs_atomic_t obd_dirty_pages;
 EXPORT_SYMBOL(obd_dirty_pages);
 unsigned int obd_timeout = OBD_TIMEOUT_DEFAULT;   /* seconds */
 cfs_atomic_t obd_dirty_pages;
 EXPORT_SYMBOL(obd_dirty_pages);
 unsigned int obd_timeout = OBD_TIMEOUT_DEFAULT;   /* seconds */
index 8ec5be4..ed5e933 100644 (file)
@@ -1310,10 +1310,12 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 #define OSC_DUMP_GRANT(cli, fmt, args...) do {                               \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
 #define OSC_DUMP_GRANT(cli, fmt, args...) do {                               \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
+              "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
+              "reserved: %ld, flight: %d } " fmt,                            \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
+              cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages,     \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
 } while (0)
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
 } while (0)
@@ -1463,7 +1465,8 @@ static int osc_enter_cache_try(struct client_obd *cli,
                return 0;
 
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
                return 0;
 
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+           cfs_atomic_read(&obd_unstable_pages) + 1 +
+           cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
@@ -1576,9 +1579,9 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
 
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
-               if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
+               if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max ||
+                   cfs_atomic_read(&obd_unstable_pages) + 1 +
+                   cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
@@ -1746,6 +1749,85 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                ar->ar_force_sync = 0;
 }
 
                ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+       cfs_atomic_sub(page_count, &obd_unstable_pages);
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+       spin_lock(&req->rq_lock);
+       req->rq_committed = 1;
+       req->rq_unstable  = 0;
+       spin_unlock(&req->rq_lock);
+
+       cfs_waitq_broadcast(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+       cfs_atomic_add(page_count, &obd_unstable_pages);
+
+       spin_lock(&req->rq_lock);
+
+       /* If the request has already been committed (i.e. brw_commit
+        * called via rq_commit_cb), we need to undo the unstable page
+        * increments we just performed because rq_commit_cb wont be
+        * called again. Otherwise, just set the commit callback so the
+        * unstable page accounting is properly updated when the request
+        * is committed */
+       if (req->rq_committed) {
+               /* Drop lock before calling osc_dec_unstable_pages */
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_unstable  = 1;
+               req->rq_commit_cb = osc_dec_unstable_pages;
+       }
+
+       spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1757,6 +1839,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 
        ENTRY;
        if (oap->oap_request != NULL) {
 
        ENTRY;
        if (oap->oap_request != NULL) {
+               if (rc == 0)
+                       osc_inc_unstable_pages(oap->oap_request);
+
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
index 7af9757..078d389 100644 (file)
@@ -204,4 +204,7 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
 int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
                    struct obd_quotactl *oqctl);
 int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
 int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
                    struct obd_quotactl *oqctl);
 int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
+
+void osc_inc_unstable_pages(struct ptlrpc_request *req);
+void osc_dec_unstable_pages(struct ptlrpc_request *req);
 #endif /* OSC_INTERNAL_H */
 #endif /* OSC_INTERNAL_H */
index 43f0c7e..881ab54 100644 (file)
@@ -829,13 +829,16 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
-       } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
+       } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) +
+                           cfs_atomic_read(&obd_dirty_pages) -
                            cfs_atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                            cfs_atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
-               CERROR("dirty %d - %d > system dirty_max %d\n",
+               CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
+                      cli->cl_import->imp_obd->obd_name,
+                      cfs_atomic_read(&obd_unstable_pages),
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
@@ -1748,6 +1751,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
         aa->aa_resends++;
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
         aa->aa_resends++;
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
+       new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
@@ -2041,6 +2045,20 @@ static int brw_interpret(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static void brw_commit(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_lock);
+       /* If osc_inc_unstable_pages (via osc_extent_finish) races with
+        * this called via the rq_commit_cb, I need to ensure
+        * osc_dec_unstable_pages is still called. Otherwise unstable
+        * pages may be leaked. */
+       if (req->rq_unstable)
+               osc_dec_unstable_pages(req);
+       else
+               req->rq_committed = 1;
+       spin_unlock(&req->rq_lock);
+}
+
 /**
  * Build an RPC by the list of extent @ext_list. The caller must ensure
  * that the total pages in this list are NOT over max pages per RPC.
 /**
  * Build an RPC by the list of extent @ext_list. The caller must ensure
  * that the total pages in this list are NOT over max pages per RPC.
@@ -2146,7 +2164,9 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
+       req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
        req->rq_interpret_reply = brw_interpret;
+
        if (mem_tight != 0)
                 req->rq_memalloc = 1;
 
        if (mem_tight != 0)
                 req->rq_memalloc = 1;