LU-3321 clio: collapse layer of cl_page
[fs/lustre-release.git] / lustre/osc/osc_cache.c
index 94cc695..a7697f3 100644
@@ -969,7 +969,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
-                       cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
@@ -2197,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
-               lu_object_ref_add_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2239,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                osc_object_unlock(osc);
 
                osc_list_maint(cli, osc);
-               lu_object_ref_del_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -3042,4 +3039,207 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        RETURN(result);
 }
 
+/**
+ * Look up pages covering the given [\a start, \a end] range of \a obj and
+ * pass each of them to the callback \a cb.
+ *
+ * If the lookup threatens to hog the CPU for too long, the function gives
+ * up and returns CLP_GANG_RESCHED; the caller is then expected to
+ * reschedule (e.g. via cond_resched()) and retry.
+ *
+ * The gang tree lookup (radix_tree_gang_lookup()) optimization is
+ * absolutely crucial in the face of [offset, EOF] locks.
+ *
+ * At least one page is passed to \a cb unless the range contains no
+ * covered pages.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                       struct osc_object *osc, pgoff_t start, pgoff_t end,
+                       osc_page_gang_cbt cb, void *cbdata)
+{
+       struct osc_page *ops;
+       void            **pvec;
+       pgoff_t         idx;
+       unsigned int    nr;
+       unsigned int    i;
+       unsigned int    j;
+       int             res = CLP_GANG_OKAY;
+       bool            tree_lock = true;
+       ENTRY;
+
+       idx = start;
+       pvec = osc_env_info(env)->oti_pvec;
+       spin_lock(&osc->oo_tree_lock);
+       while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+                                           idx, OTI_PVEC_SIZE)) > 0) {
+               struct cl_page *page;
+               bool end_of_region = false;
+
+               for (i = 0, j = 0; i < nr; ++i) {
+                       ops = pvec[i];
+                       pvec[i] = NULL;
+
+                       idx = osc_index(ops);
+                       if (idx > end) {
+                               end_of_region = true;
+                               break;
+                       }
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       LASSERT(page->cp_type == CPT_CACHEABLE);
+                       if (page->cp_state == CPS_FREEING)
+                               continue;
+
+                       cl_page_get(page);
+                       lu_ref_add_atomic(&page->cp_reference,
+                                         "gang_lookup", current);
+                       pvec[j++] = ops;
+               }
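+               /* resume the next gang lookup just past the last index seen */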
+               ++idx;
+
+               /*
+                * Here a delicate locking dance is performed. The current
+                * thread holds a reference to a page, but the callback has
+                * to own the page before it can act on it. Owning implies
+                * waiting, so the radix-tree lock must be released first.
+                * After the wait one has to check that the page wasn't
+                * truncated in the meantime (cl_page_own() returns an
+                * error in that case).
+                */
+               spin_unlock(&osc->oo_tree_lock);
+               tree_lock = false;
+
+               for (i = 0; i < j; ++i) {
+                       ops = pvec[i];
+                       if (res == CLP_GANG_OKAY)
+                               res = (*cb)(env, io, ops, cbdata);
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       lu_ref_del(&page->cp_reference, "gang_lookup", current);
+                       cl_page_put(env, page);
+               }
+               if (nr < OTI_PVEC_SIZE || end_of_region)
+                       break;
+
+               if (res == CLP_GANG_OKAY && need_resched())
+                       res = CLP_GANG_RESCHED;
+               if (res != CLP_GANG_OKAY)
+                       break;
+
+               spin_lock(&osc->oo_tree_lock);
+               tree_lock = true;
+       }
+       if (tree_lock)
+               spin_unlock(&osc->oo_tree_lock);
+       RETURN(res);
+}
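+
+/*
+ * A minimal caller sketch (illustrative; osc_lock_discard_pages() below is
+ * the in-tree user): drive the lookup in a loop and reschedule whenever
+ * CLP_GANG_RESCHED is returned, e.g.
+ *
+ *     do {
+ *             res = osc_page_gang_lookup(env, io, osc, start, end,
+ *                                        cb, cbdata);
+ *             if (res == CLP_GANG_RESCHED)
+ *                     cond_resched();
+ *     } while (res != CLP_GANG_OKAY);
+ */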
+
+/**
+ * Check whether the page at \a ops is still covered by another lock; if it
+ * is, leave it in cache, otherwise discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                               struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       pgoff_t index;
+
+       index = osc_index(ops);
+       if (index >= info->oti_fn_index) {
+               struct cl_lock *tmp;
+               struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+               /* refresh non-overlapped index */
+               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+                                      lock, 1, 0);
+               if (tmp != NULL) {
+                       /* Cache the first-non-overlapped index so as to skip
+                        * all pages within [index, oti_fn_index). This
+                        * is safe because if tmp lock is canceled, it will
+                        * discard these pages. */
+                       info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+                       if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                               info->oti_fn_index = CL_PAGE_EOF;
+                       cl_lock_put(env, tmp);
+               } else if (cl_page_own(env, io, page) == 0) {
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
+       }
+
+       info->oti_next_index = index + 1;
+       return CLP_GANG_OKAY;
+}
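+
+/*
+ * For example, when a CLM_READ lock over [0, 100] is cancelled while a
+ * second lock covers [10, 50]: the callback at index 10 finds that lock
+ * and caches oti_fn_index = 51, so indices 11..50 fall through without
+ * any further lock lookup and those pages stay in cache.
+ */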
+
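+/**
+ * Discard the page unconditionally. This callback is used when a lock of
+ * CLM_WRITE or a stronger mode is cancelled; every covered page must
+ * already be clean, as the assertions below verify, so it is safe to
+ * drop it.
+ */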
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+                     struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+       LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                     !PageWriteback(cl_page_vmpage(env, page))));
+       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                     !PageDirty(cl_page_vmpage(env, page))));
+
+       /* page is top page. */
+       info->oti_next_index = osc_index(ops) + 1;
+       if (cl_page_own(env, io, page) == 0) {
+               /* discard the page */
+               cl_page_discard(env, io, page);
+               cl_page_disown(env, io, page);
+       } else {
+               LASSERT(page->cp_state == CPS_FREEING);
+       }
+
+       return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard the pages protected by the given lock. This function traverses
+ * the radix tree to find all covered pages and discards them. If a page is
+ * still covered by another lock, it is left in cache.
+ *
+ * If an error happens at any step, the process continues anyway, because
+ * lock cancellation cannot be delayed indefinitely.
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_io *io = &info->oti_io;
+       struct cl_object *osc = ols->ols_cl.cls_obj;
+       struct cl_lock *lock = ols->ols_cl.cls_lock;
+       struct cl_lock_descr *descr = &lock->cll_descr;
+       osc_page_gang_cbt cb;
+       int res;
+       int result;
+
+       ENTRY;
+
+       io->ci_obj = cl_object_top(osc);
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               GOTO(out, result);
+
+       cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       info->oti_fn_index = info->oti_next_index = descr->cld_start;
+       do {
+               res = osc_page_gang_lookup(env, io, cl2osc(osc),
+                                          info->oti_next_index, descr->cld_end,
+                                          cb, (void *)lock);
+               if (info->oti_next_index > descr->cld_end)
+                       break;
+
+               if (res == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (res != CLP_GANG_OKAY);
+out:
+       cl_io_fini(env, io);
+       RETURN(result);
+}
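+
+/*
+ * A sketch of the expected caller, the osc lock cancellation path (names
+ * and exact arguments are indicative): dirty pages covered by the lock
+ * are flushed first, then the remainder is discarded, e.g.
+ *
+ *     if (descr->cld_mode >= CLM_WRITE)
+ *             result = osc_cache_writeback_range(env, obj,
+ *                                                descr->cld_start,
+ *                                                descr->cld_end,
+ *                                                1, discard);
+ *     rc = osc_lock_discard_pages(env, ols);
+ */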
+
 /** @} osc */