LU-3321 clio: add pages into writeback cache in batch

diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index d11ff0e..3dbb375 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -811,6 +811,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 
        ext->oe_rc = rc ?: ext->oe_nr_pages;
        EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
+
+       osc_lru_add_batch(cli, &ext->oe_pages);
        cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                     oap_pending_item) {
                cfs_list_del_init(&oap->oap_rpc_item);
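
osc_lru_add_batch() itself is defined elsewhere in this patch; the call added above means the finished extent's pages join the client LRU under a single lock acquisition instead of one lock round-trip per page, which is the batching the commit subject refers to. A purely illustrative sketch of that batching idea, with placeholder names rather than the patch's actual implementation:

    #include <linux/list.h>
    #include <linux/spinlock.h>

    /* Illustrative only: splice a whole batch of pages onto an LRU list
     * with a single lock/unlock instead of locking once per page. */
    static void lru_add_batch_sketch(spinlock_t *lru_lock,
                                     struct list_head *lru,
                                     struct list_head *batch)
    {
            spin_lock(lru_lock);
            list_splice_tail_init(batch, lru); /* one pass for the whole extent */
            spin_unlock(lru_lock);
    }
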
@@ -832,9 +834,9 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
-               int offset = oap->oap_page_off & ~CFS_PAGE_MASK;
-               int count = oap->oap_count + (offset & (blocksize - 1));
-               int end = (offset + oap->oap_count) & (blocksize - 1);
+               int offset = last_off & ~CFS_PAGE_MASK;
+               int count = last_count + (offset & (blocksize - 1));
+               int end = (offset + last_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;
 
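
The accounting above rounds the last, possibly partial, page of a short write out to the OST block ("chunk") boundaries it touches, so the grant consumed on the client matches what filter_grant_check accounts on the server. A self-contained sketch of the same arithmetic with illustrative numbers (blocksize, last_off and last_count are assumptions chosen for the example):

    #include <stdio.h>

    int main(void)
    {
            const int blocksize  = 1024;  /* OST backend block size (example)  */
            const int last_off   = 100;   /* in-file offset of the short write */
            const int last_count = 200;   /* bytes actually written            */

            /* offset within the page, i.e. last_off & ~CFS_PAGE_MASK */
            int offset = last_off & (4096 - 1);
            int count  = last_count + (offset & (blocksize - 1));   /* 300 */
            int end    = (offset + last_count) & (blocksize - 1);   /* 300 */

            if (end)                   /* round up to the next block boundary */
                    count += blocksize - end;

            /* 200 bytes written at offset 100 consume one full 1024-byte block */
            printf("grant charged: %d bytes\n", count);             /* 1024 */
            return 0;
    }
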
@@ -964,10 +966,9 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                cfs_list_del_init(&oap->oap_pending_item);
 
                cl_page_get(page);
-               lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
-                       cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
@@ -975,7 +976,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                        LASSERT(0);
                }
 
-               lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+               lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);
 
                --ext->oe_nr_pages;
@@ -1263,8 +1264,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
 
        cmd &= ~OBD_BRW_NOQUOTA;
-       LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
-       LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+       LASSERTF(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ),
+                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+       LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
        LASSERT(opg->ops_transfer_pinned);
 
        /*
@@ -1316,11 +1319,13 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                          \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
-              "dropped: %ld avail: %ld, reserved: %ld, flight: %d } "        \
-              "lru {in list: %d, left: %d, waiters: %d }" fmt,               \
+              "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
+              "reserved: %ld, flight: %d } lru {in list: %d, "               \
+              "left: %d, waiters: %d }" fmt,                                 \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages,        \
+              cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages,     \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
               cfs_atomic_read(&__tmp->cl_lru_in_list),                       \
@@ -1473,7 +1478,8 @@ static int osc_enter_cache_try(struct client_obd *cli,
                return 0;
 
        if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
-           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+           cfs_atomic_read(&obd_unstable_pages) + 1 +
+           cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += PAGE_CACHE_SIZE;
@@ -1606,8 +1612,8 @@ void osc_wake_cache_waiters(struct client_obd *cli)
                ocw->ocw_rc = -EDQUOT;
                /* we can't dirty more */
                if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
+                   (cfs_atomic_read(&obd_unstable_pages) + 1 +
+                    cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
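
The two hunks above apply the same policy in both places the client checks its dirty budget: pages that have been written out but not yet committed by the OST ("unstable" pages) still pin client memory, so they are charged against obd_max_dirty_pages together with dirty pages. A minimal sketch of the combined test, with the atomic counters reduced to plain integers:

    /* Simplified model: one more page may enter the writeback cache only if
     * dirty plus unstable pages stay within the global cap. */
    static int can_dirty_one_more_page(int dirty_pages, int unstable_pages,
                                       int max_dirty_pages)
    {
            return unstable_pages + 1 + dirty_pages <= max_dirty_pages;
    }
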
@@ -1775,6 +1781,91 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                ar->ar_force_sync = 0;
 }
 
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request commit callback (rq_commit_cb) and runs
+ * when the bulk RPC is committed on the server, at which point the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+       cfs_atomic_sub(page_count, &cli->cl_unstable_count);
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+
+       cfs_atomic_sub(page_count, &obd_unstable_pages);
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+       spin_lock(&req->rq_lock);
+       req->rq_committed = 1;
+       req->rq_unstable  = 0;
+       spin_unlock(&req->rq_lock);
+
+       wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
+       obd_count                page_count = desc->bd_iov_count;
+       int i;
+
+       /* No unstable page tracking */
+       if (cli->cl_cache == NULL)
+               return;
+
+       LASSERT(page_count >= 0);
+
+       for (i = 0; i < page_count; i++)
+               inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+       LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+       LASSERT(cfs_atomic_read(&cli->cl_unstable_count) >= 0);
+       cfs_atomic_add(page_count, &cli->cl_unstable_count);
+
+       LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+       cfs_atomic_add(page_count, &obd_unstable_pages);
+
+       spin_lock(&req->rq_lock);
+
+       /* If the request has already been committed (i.e. brw_commit
+        * called via rq_commit_cb), we need to undo the unstable page
+        * increments we just performed because rq_commit_cb won't be
+        * called again. Otherwise, just set the commit callback so the
+        * unstable page accounting is properly updated when the request
+        * is committed. */
+       if (req->rq_committed) {
+               /* Drop lock before calling osc_dec_unstable_pages */
+               spin_unlock(&req->rq_lock);
+               osc_dec_unstable_pages(req);
+               spin_lock(&req->rq_lock);
+       } else {
+               req->rq_unstable  = 1;
+               req->rq_commit_cb = osc_dec_unstable_pages;
+       }
+
+       spin_unlock(&req->rq_lock);
+}
+
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1786,6 +1877,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 
        ENTRY;
        if (oap->oap_request != NULL) {
+               if (rc == 0)
+                       osc_inc_unstable_pages(oap->oap_request);
+
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
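
Taken together, the hunks above tie unstable-page accounting to the RPC life cycle: osc_ap_completion() charges the pages of a successfully sent write as unstable, and osc_dec_unstable_pages(), installed as rq_commit_cb, releases the charge once the server commits the transaction. The increment has to cope with the commit racing ahead of it, because a request that is already committed will never run rq_commit_cb again. A stand-alone sketch of that "register the callback or undo immediately" pattern, using a simplified request structure rather than the ptlrpc API:

    #include <pthread.h>
    #include <stdbool.h>

    struct fake_req {
            pthread_mutex_t lock;
            bool            committed;                     /* set by the commit path */
            void          (*commit_cb)(struct fake_req *); /* runs on commit, once   */
    };

    static void dec_unstable(struct fake_req *req)
    {
            (void)req;  /* placeholder: drop the accounting taken in inc_unstable() */
    }

    /* Charge the request as "unstable", then arrange for the charge to be
     * dropped on commit; if the commit already happened, undo it right away. */
    static void inc_unstable(struct fake_req *req)
    {
            /* ...take the accounting here... */

            pthread_mutex_lock(&req->lock);
            if (req->committed) {
                    /* the commit callback has already run and won't run again */
                    pthread_mutex_unlock(&req->lock);
                    dec_unstable(req);
                    return;
            }
            req->commit_cb = dec_unstable;                 /* drop charge on commit */
            pthread_mutex_unlock(&req->lock);
    }
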
@@ -1832,7 +1926,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
                RETURN(0);
 
        cfs_list_for_each_entry(tmp, rpclist, oe_link) {
-               EASSERT(tmp->oe_owner == cfs_current(), tmp);
+               EASSERT(tmp->oe_owner == current, tmp);
 #if 0
                if (overlapped(tmp, ext)) {
                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
@@ -1850,7 +1944,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
        *pc += ext->oe_nr_pages;
        cfs_list_move_tail(&ext->oe_link, rpclist);
-       ext->oe_owner = cfs_current();
+       ext->oe_owner = current;
        RETURN(1);
 }
 
@@ -2102,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
-               lu_object_ref_add_at(&obj->co_lu, &link, "check",
-                                    cfs_current());
+               lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2144,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                osc_object_unlock(osc);
 
                osc_list_maint(cli, osc);
-               lu_object_ref_del_at(&obj->co_lu, &link, "check",
-                                    cfs_current());
+               lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -2274,6 +2366,9 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                        RETURN(rc);
        }
 
+       if (osc_over_unstable_soft_limit(cli))
+               brw_flags |= OBD_BRW_SOFT_SYNC;
+
        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
        oap->oap_count = ops->ops_to - ops->ops_from;
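
osc_over_unstable_soft_limit() is defined elsewhere in the patch; the hunk above only consumes it, tagging the write with OBD_BRW_SOFT_SYNC so the server is encouraged to commit sooner when the client already holds many uncommitted pages. A purely hypothetical illustration of what such a soft-limit test looks like in principle (the threshold and the argument names are made up for the example and are not the patch's logic):

    #include <stdbool.h>

    /* Hypothetical: report "over the soft limit" once uncommitted (unstable)
     * pages exceed some share of the client's dirty-page budget. */
    static bool over_unstable_soft_limit_example(long unstable_pages,
                                                 long max_dirty_pages)
    {
            return unstable_pages > max_dirty_pages / 2; /* threshold is illustrative */
    }
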
@@ -2879,7 +2974,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
-                               ext->oe_owner = cfs_current();
+                               ext->oe_owner = current;
                                cfs_list_move_tail(&ext->oe_link,
                                                   &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,
@@ -2944,4 +3039,206 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        RETURN(result);
 }
 
+/**
+ * Invokes \a cb on every osc page of \a osc within [start, end].
+ *
+ * If the current thread needs to yield the CPU, the lookup stops and
+ * CLP_GANG_RESCHED is returned; the caller is expected to reschedule
+ * and retry from where it left off.
+ *
+ * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
+ * crucial in the face of [offset, EOF] locks.
+ *
+ * Processes at least one page in the range unless there is no covered page.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                       struct osc_object *osc, pgoff_t start, pgoff_t end,
+                       osc_page_gang_cbt cb, void *cbdata)
+{
+       struct osc_page *ops;
+       void            **pvec;
+       pgoff_t         idx;
+       unsigned int    nr;
+       unsigned int    i;
+       unsigned int    j;
+       int             res = CLP_GANG_OKAY;
+       bool            tree_lock = true;
+       ENTRY;
+
+       idx = start;
+       pvec = osc_env_info(env)->oti_pvec;
+       spin_lock(&osc->oo_tree_lock);
+       while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+                                           idx, OTI_PVEC_SIZE)) > 0) {
+               struct cl_page *page;
+               bool end_of_region = false;
+
+               for (i = 0, j = 0; i < nr; ++i) {
+                       ops = pvec[i];
+                       pvec[i] = NULL;
+
+                       idx = osc_index(ops);
+                       if (idx > end) {
+                               end_of_region = true;
+                               break;
+                       }
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       LASSERT(page->cp_type == CPT_CACHEABLE);
+                       if (page->cp_state == CPS_FREEING)
+                               continue;
+
+                       cl_page_get(page);
+                       lu_ref_add_atomic(&page->cp_reference,
+                                         "gang_lookup", current);
+                       pvec[j++] = ops;
+               }
+               ++idx;
+
+               /*
+                * Here a delicate locking dance is performed. Current thread
+                * holds a reference to a page, but has to own it before it
+                * can be placed into queue. Owning implies waiting, so
+                * radix-tree lock is to be released. After a wait one has to
+                * check that pages weren't truncated (cl_page_own() returns
+                * error in the latter case).
+                */
+               spin_unlock(&osc->oo_tree_lock);
+               tree_lock = false;
+
+               for (i = 0; i < j; ++i) {
+                       ops = pvec[i];
+                       if (res == CLP_GANG_OKAY)
+                               res = (*cb)(env, io, ops, cbdata);
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       lu_ref_del(&page->cp_reference, "gang_lookup", current);
+                       cl_page_put(env, page);
+               }
+               if (nr < OTI_PVEC_SIZE || end_of_region)
+                       break;
+
+               if (res == CLP_GANG_OKAY && need_resched())
+                       res = CLP_GANG_RESCHED;
+               if (res != CLP_GANG_OKAY)
+                       break;
+
+               spin_lock(&osc->oo_tree_lock);
+               tree_lock = true;
+       }
+       if (tree_lock)
+               spin_unlock(&osc->oo_tree_lock);
+       RETURN(res);
+}
+
+/**
+ * Check whether the page is covered by an additional lock; if not, discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                               struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       pgoff_t index;
+
+       index = osc_index(ops);
+       if (index >= info->oti_fn_index) {
+               struct cl_lock *tmp;
+               struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+               /* refresh non-overlapped index */
+               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+                                      lock, 1, 0);
+               if (tmp != NULL) {
+                       /* Cache the first-non-overlapped index so as to skip
+                        * all pages within [index, oti_fn_index). This
+                        * is safe because if tmp lock is canceled, it will
+                        * discard these pages. */
+                       info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+                       if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                               info->oti_fn_index = CL_PAGE_EOF;
+                       cl_lock_put(env, tmp);
+               } else if (cl_page_own(env, io, page) == 0) {
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
+       }
+
+       info->oti_next_index = index + 1;
+       return CLP_GANG_OKAY;
+}
+
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+                     struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+       LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+
+       /* page is top page. */
+       info->oti_next_index = osc_index(ops) + 1;
+       if (cl_page_own(env, io, page) == 0) {
+               KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                             !PageDirty(cl_page_vmpage(env, page))));
+
+               /* discard the page */
+               cl_page_discard(env, io, page);
+               cl_page_disown(env, io, page);
+       } else {
+               LASSERT(page->cp_state == CPS_FREEING);
+       }
+
+       return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses the
+ * radix tree to find all covered pages and discards them. If a page is
+ * covered by another lock, it is left in the cache.
+ *
+ * If an error happens at any step, the process continues anyway (the
+ * reasoning being that lock cancellation cannot be delayed indefinitely).
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_io *io = &info->oti_io;
+       struct cl_object *osc = ols->ols_cl.cls_obj;
+       struct cl_lock *lock = ols->ols_cl.cls_lock;
+       struct cl_lock_descr *descr = &lock->cll_descr;
+       osc_page_gang_cbt cb;
+       int res;
+       int result;
+
+       ENTRY;
+
+       io->ci_obj = cl_object_top(osc);
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               GOTO(out, result);
+
+       cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       info->oti_fn_index = info->oti_next_index = descr->cld_start;
+       do {
+               res = osc_page_gang_lookup(env, io, cl2osc(osc),
+                                          info->oti_next_index, descr->cld_end,
+                                          cb, (void *)lock);
+               if (info->oti_next_index > descr->cld_end)
+                       break;
+
+               if (res == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (res != CLP_GANG_OKAY);
+out:
+       cl_io_fini(env, io);
+       RETURN(result);
+}
+
+
 /** @} osc */
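
For other osc code that wants to reuse osc_page_gang_lookup(), the calling convention is exactly what osc_lock_discard_pages() shows above: supply a callback that records how far it got, and re-enter the lookup after every CLP_GANG_RESCHED. A hedged usage sketch with a hypothetical callback (count_page_cb and count_data are illustrations, not part of the patch):

    struct count_data {
            pgoff_t       cd_next;   /* where to resume after a reschedule */
            unsigned long cd_count;  /* pages seen so far                  */
    };

    /* Hypothetical callback: count each page and remember where to resume. */
    static int count_page_cb(const struct lu_env *env, struct cl_io *io,
                             struct osc_page *ops, void *cbdata)
    {
            struct count_data *cd = cbdata;

            cd->cd_next = osc_index(ops) + 1;
            cd->cd_count++;
            return CLP_GANG_OKAY;
    }

    /* Hypothetical caller: walk [start, end] of \a osc, resuming after every
     * CLP_GANG_RESCHED, mirroring the loop in osc_lock_discard_pages(). */
    static unsigned long count_pages_in_range(const struct lu_env *env,
                                              struct cl_io *io,
                                              struct osc_object *osc,
                                              pgoff_t start, pgoff_t end)
    {
            struct count_data cd = { .cd_next = start, .cd_count = 0 };
            int res;

            do {
                    res = osc_page_gang_lookup(env, io, osc, cd.cd_next, end,
                                               count_page_cb, &cd);
                    if (cd.cd_next > end)
                            break;
                    if (res == CLP_GANG_RESCHED)
                            cond_resched();
            } while (res != CLP_GANG_OKAY);

            return cd.cd_count;
    }
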