Whamcloud - gitweb
LU-2139 osc: Use SOFT_SYNC to urge server commit
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index d11ff0e..4d97b3e 100644 (file)
@@ -964,7 +964,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                cfs_list_del_init(&oap->oap_pending_item);
 
                cl_page_get(page);
                cfs_list_del_init(&oap->oap_pending_item);
 
                cl_page_get(page);
-               lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
 
                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
@@ -975,7 +975,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                        LASSERT(0);
                }
 
                        LASSERT(0);
                }
 
-               lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);
 
                --ext->oe_nr_pages;
                cl_page_put(env, page);
 
                --ext->oe_nr_pages;
@@ -1263,8 +1263,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
 
        cmd &= ~OBD_BRW_NOQUOTA;
        ENTRY;
 
        cmd &= ~OBD_BRW_NOQUOTA;
-       LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
-       LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+       LASSERTF(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ),
+                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+       LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+               "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
        LASSERT(opg->ops_transfer_pinned);
 
        /*
        LASSERT(opg->ops_transfer_pinned);
 
        /*
@@ -1316,11 +1318,13 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                          \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                          \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d } "        \
-              "lru {in list: %d, left: %d, waiters: %d }" fmt,               \
+              "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
+              "reserved: %ld, flight: %d } lru {in list: %d, "               \
+              "left: %d, waiters: %d }" fmt,                                 \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
+              cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages,     \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
               cfs_atomic_read(&__tmp->cl_lru_in_list),                       \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
               cfs_atomic_read(&__tmp->cl_lru_in_list),                       \
@@ -1473,7 +1477,8 @@ static int osc_enter_cache_try(struct client_obd *cli,
                return 0;
 
        if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
                return 0;
 
        if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
-           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+           cfs_atomic_read(&obd_unstable_pages) + 1 +
+           cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += PAGE_CACHE_SIZE;
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += PAGE_CACHE_SIZE;
@@ -1606,8 +1611,8 @@ void osc_wake_cache_waiters(struct client_obd *cli)
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
                if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
                if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
+                   (cfs_atomic_read(&obd_unstable_pages) + 1 +
+                    cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
@@ -1775,6 +1780,91 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                ar->ar_force_sync = 0;
 }
 
                ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+       cfs_atomic_sub(page_count, &cli->cl_unstable_count);
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+
+       cfs_atomic_sub(page_count, &obd_unstable_pages);
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+       spin_lock(&req->rq_lock);
+       req->rq_committed = 1;
+       req->rq_unstable  = 0;
+       spin_unlock(&req->rq_lock);
+
+       wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_unstable_count);
+
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+       cfs_atomic_add(page_count, &obd_unstable_pages);
+
+       spin_lock(&req->rq_lock);
+
+       /* If the request has already been committed (i.e. brw_commit
+        * called via rq_commit_cb), we need to undo the unstable page
+        * increments we just performed because rq_commit_cb wont be
+        * called again. Otherwise, just set the commit callback so the
+        * unstable page accounting is properly updated when the request
+        * is committed */
+       if (req->rq_committed) {
+               /* Drop lock before calling osc_dec_unstable_pages */
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_unstable  = 1;
+               req->rq_commit_cb = osc_dec_unstable_pages;
+       }
+
+       spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1786,6 +1876,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 
        ENTRY;
        if (oap->oap_request != NULL) {
 
        ENTRY;
        if (oap->oap_request != NULL) {
+               if (rc == 0)
+                       osc_inc_unstable_pages(oap->oap_request);
+
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
@@ -1832,7 +1925,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                RETURN(0);
 
        cfs_list_for_each_entry(tmp, rpclist, oe_link) {
                RETURN(0);
 
        cfs_list_for_each_entry(tmp, rpclist, oe_link) {
-               EASSERT(tmp->oe_owner == cfs_current(), tmp);
+               EASSERT(tmp->oe_owner == current, tmp);
 #if 0
                if (overlapped(tmp, ext)) {
                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
 #if 0
                if (overlapped(tmp, ext)) {
                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
@@ -1850,7 +1943,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        *pc += ext->oe_nr_pages;
        cfs_list_move_tail(&ext->oe_link, rpclist);
 
        *pc += ext->oe_nr_pages;
        cfs_list_move_tail(&ext->oe_link, rpclist);
-       ext->oe_owner = cfs_current();
+       ext->oe_owner = current;
        RETURN(1);
 }
 
        RETURN(1);
 }
 
@@ -2103,7 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                lu_object_ref_add_at(&obj->co_lu, &link, "check",
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                lu_object_ref_add_at(&obj->co_lu, &link, "check",
-                                    cfs_current());
+                                    current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2145,7 +2238,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                osc_list_maint(cli, osc);
                lu_object_ref_del_at(&obj->co_lu, &link, "check",
 
                osc_list_maint(cli, osc);
                lu_object_ref_del_at(&obj->co_lu, &link, "check",
-                                    cfs_current());
+                                    current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -2274,6 +2367,9 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                        RETURN(rc);
        }
 
                        RETURN(rc);
        }
 
+       if (osc_over_unstable_soft_limit(cli))
+               brw_flags |= OBD_BRW_SOFT_SYNC;
+
        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
        oap->oap_count = ops->ops_to - ops->ops_from;
        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
        oap->oap_count = ops->ops_to - ops->ops_from;
@@ -2879,7 +2975,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
-                               ext->oe_owner = cfs_current();
+                               ext->oe_owner = current;
                                cfs_list_move_tail(&ext->oe_link,
                                                   &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,
                                cfs_list_move_tail(&ext->oe_link,
                                                   &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,