Whamcloud - gitweb
LU-3188 osc: shorten IO calling path
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index d4765f2..e7bfc47 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  *
  */
 /*
@@ -1283,8 +1283,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
-       cl_page_completion(env, page, crt, rc);
-
        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
@@ -1303,22 +1301,21 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);
-       /*
-        * As page->cp_obj is pinned by a reference from page->cp_req, it is
-        * safe to call cl_page_put() without risking object destruction in a
-        * non-blocking context.
-        */
-       cl_page_put(env, page);
+
+       cl_page_completion(env, page, crt, rc);
+
        RETURN(0);
 }
 
 #define OSC_DUMP_GRANT(cli, fmt, args...) do {                               \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
+              "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
+              "reserved: %ld, flight: %d } " fmt,                            \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
+              cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages,     \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
 } while (0)
@@ -1393,8 +1390,6 @@ static void __osc_unreserve_grant(struct client_obd *cli,
        } else {
                cli->cl_avail_grant += unused;
        }
-       if (unused > 0)
-               osc_wake_cache_waiters(cli);
 }
 
 void osc_unreserve_grant(struct client_obd *cli,
@@ -1402,6 +1397,8 @@ void osc_unreserve_grant(struct client_obd *cli,
 {
        client_obd_list_lock(&cli->cl_loi_list_lock);
        __osc_unreserve_grant(cli, reserved, unused);
+       if (unused > 0)
+               osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
@@ -1468,7 +1465,8 @@ static int osc_enter_cache_try(struct client_obd *cli,
                return 0;
 
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+           cfs_atomic_read(&obd_unstable_pages) + 1 +
+           cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
@@ -1483,6 +1481,15 @@ static int osc_enter_cache_try(struct client_obd *cli,
        return rc;
 }
 
+static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+{
+       int rc;
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       rc = cfs_list_empty(&ocw->ocw_entry);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       return rc;
+}
+
 /**
  * The main entry to reserve dirty page accounting. Usually the grant reserved
  * in this function will be freed in bulk in osc_free_grant() unless it fails
@@ -1529,29 +1536,30 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                ocw.ocw_rc = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-               osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+               osc_io_unplug_async(env, cli, NULL);
 
                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
 
-               rc = l_wait_event(ocw.ocw_waitq,
-                                 cfs_list_empty(&ocw.ocw_entry), &lwi);
+               rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
-               cfs_list_del_init(&ocw.ocw_entry);
-               if (rc < 0)
-                       break;
 
+               /* l_wait_event is interrupted by signal */
+               if (rc < 0) {
+                       cfs_list_del_init(&ocw.ocw_entry);
+                       GOTO(out, rc);
+               }
+
+               LASSERT(cfs_list_empty(&ocw.ocw_entry));
                rc = ocw.ocw_rc;
+
                if (rc != -EDQUOT)
-                       break;
-               if (osc_enter_cache_try(cli, oap, bytes, 0)) {
-                       rc = 0;
-                       break;
-               }
+                       GOTO(out, rc);
+               if (osc_enter_cache_try(cli, oap, bytes, 0))
+                       GOTO(out, rc = 0);
        }
        EXIT;
-
 out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
@@ -1566,31 +1574,25 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 
        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-               /* if we can't dirty more, we must wait until some is written */
-               if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
+               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+               cfs_list_del_init(&ocw->ocw_entry);
+
+               ocw->ocw_rc = -EDQUOT;
+               /* we can't dirty more */
+               if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max ||
+                   cfs_atomic_read(&obd_unstable_pages) + 1 +
+                   cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
-                       return;
+                       goto wakeup;
                }
 
-               /* if still dirty cache but no grant wait for pending RPCs that
-                * may yet return us some grant before doing sync writes */
-               if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                       CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-                              cli->cl_w_in_flight);
-                       return;
-               }
-
-               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-               cfs_list_del_init(&ocw->ocw_entry);
-
                ocw->ocw_rc = 0;
                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
                        ocw->ocw_rc = -EDQUOT;
 
+wakeup:
                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
 
@@ -1747,6 +1749,85 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+       cfs_atomic_sub(page_count, &obd_unstable_pages);
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+       spin_lock(&req->rq_lock);
+       req->rq_committed = 1;
+       req->rq_unstable  = 0;
+       spin_unlock(&req->rq_lock);
+
+       cfs_waitq_broadcast(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+       cfs_atomic_add(page_count, &obd_unstable_pages);
+
+       spin_lock(&req->rq_lock);
+
+       /* If the request has already been committed (i.e. brw_commit
+        * called via rq_commit_cb), we need to undo the unstable page
+        * increments we just performed because rq_commit_cb wont be
+        * called again. Otherwise, just set the commit callback so the
+        * unstable page accounting is properly updated when the request
+        * is committed */
+       if (req->rq_committed) {
+               /* Drop lock before calling osc_dec_unstable_pages */
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_unstable  = 1;
+               req->rq_commit_cb = osc_dec_unstable_pages;
+       }
+
+       spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1758,6 +1839,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 
        ENTRY;
        if (oap->oap_request != NULL) {
+               if (rc == 0)
+                       osc_inc_unstable_pages(oap->oap_request);
+
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
@@ -2133,7 +2217,11 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
                has_rpcs = __osc_list_maint(cli, osc);
        if (has_rpcs) {
                if (!async) {
+                       /* disable osc_lru_shrink() temporarily to avoid
+                        * potential stack overrun problem. LU-2859 */
+                       cfs_atomic_inc(&cli->cl_lru_shrinkers);
                        osc_check_rpcs(env, cli, pol);
+                       cfs_atomic_dec(&cli->cl_lru_shrinkers);
                } else {
                        CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
                               cli);
@@ -2444,10 +2532,11 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                ext->oe_memalloc = 1;
 
        ext->oe_urgent = 1;
-       if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) {
+       if (ext->oe_state == OES_CACHE) {
                OSC_EXTENT_DUMP(D_CACHE, ext,
                                "flush page %p make it urgent.\n", oap);
-               cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+               if (cfs_list_empty(&ext->oe_link))
+                       cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                unplug = true;
        }
        rc = 0;
@@ -2579,7 +2668,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        }
        osc_object_unlock(obj);
 
-       osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
+       osc_io_unplug_async(env, cli, obj);
        RETURN(0);
 }
 
@@ -2624,6 +2713,8 @@ again:
                        break;
                }
 
+               OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+
                osc_extent_get(ext);
                if (ext->oe_state == OES_ACTIVE) {
                        /* though we grab inode mutex for write path, but we
@@ -2688,13 +2779,17 @@ again:
                osc_extent_put(env, ext);
        }
        if (waiting != NULL) {
-               if (result == 0)
-                       result = osc_extent_wait(env, waiting, OES_INV);
+               int rc;
+
+               /* ignore the result of osc_extent_wait the write initiator
+                * should take care of it. */
+               rc = osc_extent_wait(env, waiting, OES_INV);
+               if (rc < 0)
+                       OSC_EXTENT_DUMP(D_CACHE, ext, "wait error: %d.\n", rc);
 
                osc_extent_put(env, waiting);
                waiting = NULL;
-               if (result == 0)
-                       goto again;
+               goto again;
        }
        RETURN(result);
 }
@@ -2830,10 +2925,9 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
-                               if (list != NULL) {
+                               if (list != NULL)
                                        cfs_list_move_tail(&ext->oe_link, list);
-                                       unplug = true;
-                               }
+                               unplug = true;
                        } else {
                                /* the only discarder is lock cancelling, so
                                 * [start, end] must contain this extent */