LU-4429 llite: fix open lock matching in ll_md_blocking_ast()
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 94cc695..e6b111b 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -241,7 +241,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
 
        page_count = 0;
        cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
-               pgoff_t index = oap2cl_page(oap)->cp_index;
+               pgoff_t index = osc_index(oap2osc(oap));
                ++page_count;
                if (index > ext->oe_end || index < ext->oe_start)
                        GOTO(out, rc = 110);
@@ -834,9 +834,9 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
-               int offset = oap->oap_page_off & ~CFS_PAGE_MASK;
-               int count = oap->oap_count + (offset & (blocksize - 1));
-               int end = (offset + oap->oap_count) & (blocksize - 1);
+               int offset = last_off & ~CFS_PAGE_MASK;
+               int count = last_count + (offset & (blocksize - 1));
+               int end = (offset + last_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;
 
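For reference, the rounding above can be exercised in isolation. The sketch below is illustrative only and not part of the patch: the 4 KiB page, the 1024-byte OST block size, and the input values are assumptions, and a plain page_mask constant stands in for ~CFS_PAGE_MASK. It shows how the partial tail of a write is extended on both ends to whole-block boundaries so that the client's grant accounting matches filter_grant_check() on the OST.

/* Illustrative only: mirrors the offset/count/end arithmetic above with
 * made-up numbers, outside of Lustre. */
#include <assert.h>

int main(void)
{
        const int page_mask = 4095;     /* stand-in for ~CFS_PAGE_MASK (4 KiB page assumed) */
        const int blocksize = 1024;     /* assumed OST block size */
        int last_off   = 8192 + 1500;   /* object offset of the start of the final chunk (example) */
        int last_count = 700;           /* bytes written in that final, partial chunk (example) */

        int offset = last_off & page_mask;                     /* 1500: offset within the page */
        int count  = last_count + (offset & (blocksize - 1));  /* 700 + 476 = 1176 */
        int end    = (offset + last_count) & (blocksize - 1);  /* 2200 % 1024 = 152 */

        if (end)
                count += blocksize - end;                      /* round up: 1176 + 872 = 2048 */

        /* the 700-byte tail straddles a block boundary, so two full blocks are charged */
        assert(count == 2 * blocksize);
        return 0;
}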
@@ -947,18 +947,18 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        /* discard all pages with index greater than trunc_index */
        cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                     oap_pending_item) {
-               struct cl_page  *sub  = oap2cl_page(oap);
-               struct cl_page  *page = cl_page_top(sub);
+               pgoff_t index = osc_index(oap2osc(oap));
+               struct cl_page  *page = oap2cl_page(oap);
 
                LASSERT(cfs_list_empty(&oap->oap_rpc_item));
 
                /* only discard the pages with their index greater than
                 * trunc_index, and ... */
-               if (sub->cp_index < trunc_index ||
-                   (sub->cp_index == trunc_index && partial)) {
+               if (index < trunc_index ||
+                   (index == trunc_index && partial)) {
                       /* account for how many pages remain in the chunk
                        * so that we can calculate grants correctly. */
-                       if (sub->cp_index >> ppc_bits == trunc_chunk)
+                       if (index >> ppc_bits == trunc_chunk)
                                ++pages_in_chunk;
                        continue;
                }
@@ -969,7 +969,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
-                       cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
@@ -1209,7 +1208,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
 {
        struct osc_page *opg  = oap2osc_page(oap);
-       struct cl_page  *page = cl_page_top(oap2cl_page(oap));
+       struct cl_page  *page = oap2cl_page(oap);
        int result;
 
        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
@@ -1225,7 +1224,7 @@ static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd)
 {
        struct osc_page  *opg = oap2osc_page(oap);
-       struct cl_page   *page = oap2cl_page(oap);
+       pgoff_t index = osc_index(oap2osc(oap));
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
 
@@ -1243,10 +1242,10 @@ static int osc_refresh_count(const struct lu_env *env,
        if (result < 0)
                return result;
        kms = attr->cat_kms;
-       if (cl_offset(obj, page->cp_index) >= kms)
+       if (cl_offset(obj, index) >= kms)
                /* catch race with truncate */
                return 0;
-       else if (cl_offset(obj, page->cp_index + 1) > kms)
+       else if (cl_offset(obj, index + 1) > kms)
                /* catch sub-page write at end of file */
                return kms % PAGE_CACHE_SIZE;
        else
@@ -1257,7 +1256,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd, int rc)
 {
        struct osc_page   *opg  = oap2osc_page(oap);
-       struct cl_page    *page = cl_page_top(oap2cl_page(oap));
+       struct cl_page    *page = oap2cl_page(oap);
        struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type   crt;
        int srvlock;
@@ -2197,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
-               lu_object_ref_add_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2239,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                osc_object_unlock(osc);
 
                osc_list_maint(cli, osc);
-               lu_object_ref_del_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -2383,7 +2380,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
                     oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
 
-       index = oap2cl_page(oap)->cp_index;
+       index = osc_index(oap2osc(oap));
 
        /* Add this page into extent by the following steps:
         * 1. if there exists an active extent for this IO, mostly this page
@@ -2493,20 +2490,20 @@ int osc_teardown_async_page(const struct lu_env *env,
        LASSERT(oap->oap_magic == OAP_MAGIC);
 
        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
-              oap, ops, oap2cl_page(oap)->cp_index);
+              oap, ops, osc_index(oap2osc(oap)));
 
        osc_object_lock(obj);
        if (!cfs_list_empty(&oap->oap_rpc_item)) {
                CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
                rc = -EBUSY;
        } else if (!cfs_list_empty(&oap->oap_pending_item)) {
-               ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
+               ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
                /* only truncated pages are allowed to be taken out.
                 * See osc_extent_truncate() and osc_cache_truncate_start()
                 * for details. */
                if (ext != NULL && ext->oe_state != OES_TRUNC) {
                        OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
-                                       oap2cl_page(oap)->cp_index);
+                                       osc_index(oap2osc(oap)));
                        rc = -EBUSY;
                }
        }
@@ -2529,7 +2526,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        struct osc_extent *ext   = NULL;
        struct osc_object *obj   = cl2osc(ops->ops_cl.cpl_obj);
        struct cl_page    *cp    = ops->ops_cl.cpl_page;
-       pgoff_t            index = cp->cp_index;
+       pgoff_t            index = osc_index(ops);
        struct osc_async_page *oap = &ops->ops_oap;
        bool unplug = false;
        int rc = 0;
@@ -2545,8 +2542,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        switch (ext->oe_state) {
        case OES_RPC:
        case OES_LOCK_DONE:
-               CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
-                             "flush an in-rpc page?\n");
+               CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
                LASSERT(0);
                break;
        case OES_LOCKING:
@@ -2558,12 +2554,18 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                 * really sending the RPC. */
        case OES_TRUNC:
                /* race with truncate, page will be redirtied */
+       case OES_ACTIVE:
+               /* The extent is active, so we need to abort and let the caller
+                * re-dirty the page. If we continued here, and we were the one
+                * that made the extent active, we could deadlock waiting for
+                * the page writeback to clear, which will never happen because
+                * the extent is active and will not be written out. */
                GOTO(out, rc = -EAGAIN);
        default:
                break;
        }
 
-       rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
+       rc = cl_page_prep(env, io, cp, CRT_WRITE);
        if (rc)
                GOTO(out, rc);
 
@@ -2608,7 +2610,7 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
        struct osc_extent     *ext;
        struct osc_extent     *found = NULL;
        cfs_list_t            *plist;
-       pgoff_t index = oap2cl_page(oap)->cp_index;
+       pgoff_t index = osc_index(ops);
        int     rc = -EBUSY;
        int     cmd;
        ENTRY;
@@ -2671,11 +2673,11 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        ENTRY;
 
        cfs_list_for_each_entry(oap, list, oap_pending_item) {
-               struct cl_page *cp = oap2cl_page(oap);
-               if (cp->cp_index > end)
-                       end = cp->cp_index;
-               if (cp->cp_index < start)
-                       start = cp->cp_index;
+               pgoff_t index = osc_index(oap2osc(oap));
+               if (index > end)
+                       end = index;
+               if (index < start)
+                       start = index;
                ++page_count;
                mppr <<= (page_count > mppr);
        }
@@ -3042,4 +3044,206 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        RETURN(result);
 }
 
+/**
+ * Walk the pages of \a osc covering [start, end] and invoke \a cb on each
+ * covered page found.  If the scan needs to reschedule before the range is
+ * exhausted, the walk stops and CLP_GANG_RESCHED is returned; the caller
+ * should yield the CPU and retry.
+ *
+ * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
+ * crucial in the face of [offset, EOF] locks.
+ *
+ * At least one covered page is processed unless there is no covered page
+ * in the range.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                       struct osc_object *osc, pgoff_t start, pgoff_t end,
+                       osc_page_gang_cbt cb, void *cbdata)
+{
+       struct osc_page *ops;
+       void            **pvec;
+       pgoff_t         idx;
+       unsigned int    nr;
+       unsigned int    i;
+       unsigned int    j;
+       int             res = CLP_GANG_OKAY;
+       bool            tree_lock = true;
+       ENTRY;
+
+       idx = start;
+       pvec = osc_env_info(env)->oti_pvec;
+       spin_lock(&osc->oo_tree_lock);
+       while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+                                           idx, OTI_PVEC_SIZE)) > 0) {
+               struct cl_page *page;
+               bool end_of_region = false;
+
+               for (i = 0, j = 0; i < nr; ++i) {
+                       ops = pvec[i];
+                       pvec[i] = NULL;
+
+                       idx = osc_index(ops);
+                       if (idx > end) {
+                               end_of_region = true;
+                               break;
+                       }
+
+                       page = ops->ops_cl.cpl_page;
+                       LASSERT(page->cp_type == CPT_CACHEABLE);
+                       if (page->cp_state == CPS_FREEING)
+                               continue;
+
+                       cl_page_get(page);
+                       lu_ref_add_atomic(&page->cp_reference,
+                                         "gang_lookup", current);
+                       pvec[j++] = ops;
+               }
+               ++idx;
+
+               /*
+                * Here a delicate locking dance is performed. The current
+                * thread holds a reference to a page, but has to own it
+                * before the callback may act on it. Owning implies waiting,
+                * so the radix-tree lock has to be released first. After the
+                * wait, check that the page was not truncated in the meantime
+                * (cl_page_own() returns an error in that case).
+                */
+               spin_unlock(&osc->oo_tree_lock);
+               tree_lock = false;
+
+               for (i = 0; i < j; ++i) {
+                       ops = pvec[i];
+                       if (res == CLP_GANG_OKAY)
+                               res = (*cb)(env, io, ops, cbdata);
+
+                       page = ops->ops_cl.cpl_page;
+                       lu_ref_del(&page->cp_reference, "gang_lookup", current);
+                       cl_page_put(env, page);
+               }
+               if (nr < OTI_PVEC_SIZE || end_of_region)
+                       break;
+
+               if (res == CLP_GANG_OKAY && need_resched())
+                       res = CLP_GANG_RESCHED;
+               if (res != CLP_GANG_OKAY)
+                       break;
+
+               spin_lock(&osc->oo_tree_lock);
+               tree_lock = true;
+       }
+       if (tree_lock)
+               spin_unlock(&osc->oo_tree_lock);
+       RETURN(res);
+}
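Callers drive osc_page_gang_lookup() with an osc_page_gang_cbt callback and retry after CLP_GANG_RESCHED; the in-tree user is osc_lock_discard_pages() further down. The sketch below is a hypothetical caller, not part of the patch: count_cb, count_pages and struct count_arg are made-up names, and the callback records the next index so the scan can resume where it stopped.

/* Hypothetical example: count the cached pages in [start, end] of an osc
 * object using the gang-lookup helper above. */
struct count_arg {
        pgoff_t         ca_next;        /* index to resume from after a reschedule */
        unsigned long   ca_count;       /* pages seen so far */
};

static int count_cb(const struct lu_env *env, struct cl_io *io,
                    struct osc_page *ops, void *cbdata)
{
        struct count_arg *arg = cbdata;

        arg->ca_next = osc_index(ops) + 1;
        arg->ca_count++;
        return CLP_GANG_OKAY;           /* keep scanning */
}

static unsigned long count_pages(const struct lu_env *env, struct cl_io *io,
                                 struct osc_object *osc,
                                 pgoff_t start, pgoff_t end)
{
        struct count_arg arg = { .ca_next = start, .ca_count = 0 };
        int res;

        do {
                res = osc_page_gang_lookup(env, io, osc, arg.ca_next, end,
                                           count_cb, &arg);
                if (res == CLP_GANG_RESCHED)
                        cond_resched(); /* yield, then resume where the callback left off */
        } while (res == CLP_GANG_RESCHED);

        return arg.ca_count;
}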
+
+/**
+ * Check if the page at \a ops is covered by another lock; if not, discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                               struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       pgoff_t index;
+
+       index = osc_index(ops);
+       if (index >= info->oti_fn_index) {
+               struct cl_lock *tmp;
+               struct cl_page *page = ops->ops_cl.cpl_page;
+
+               /* refresh non-overlapped index */
+               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+                                      lock, 1, 0);
+               if (tmp != NULL) {
+                       /* Cache the first-non-overlapped index so as to skip
+                        * all pages within [index, oti_fn_index). This
+                        * is safe because if tmp lock is canceled, it will
+                        * discard these pages. */
+                       info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+                       if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                               info->oti_fn_index = CL_PAGE_EOF;
+                       cl_lock_put(env, tmp);
+               } else if (cl_page_own(env, io, page) == 0) {
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
+       }
+
+       info->oti_next_index = index + 1;
+       return CLP_GANG_OKAY;
+}
+
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+                     struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       struct cl_page *page = ops->ops_cl.cpl_page;
+
+       LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+
+       /* page is top page. */
+       info->oti_next_index = osc_index(ops) + 1;
+       if (cl_page_own(env, io, page) == 0) {
+               KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                             !PageDirty(cl_page_vmpage(page))));
+
+               /* discard the page */
+               cl_page_discard(env, io, page);
+               cl_page_disown(env, io, page);
+       } else {
+               LASSERT(page->cp_state == CPS_FREEING);
+       }
+
+       return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses the
+ * radix tree to find all covered pages and discards them. If a page is
+ * covered by another lock, it remains in the cache.
+ *
+ * If an error occurs at any step, the process continues anyway (the
+ * reasoning being that lock cancellation cannot be delayed indefinitely).
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_io *io = &info->oti_io;
+       struct cl_object *osc = ols->ols_cl.cls_obj;
+       struct cl_lock *lock = ols->ols_cl.cls_lock;
+       struct cl_lock_descr *descr = &lock->cll_descr;
+       osc_page_gang_cbt cb;
+       int res;
+       int result;
+
+       ENTRY;
+
+       io->ci_obj = cl_object_top(osc);
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               GOTO(out, result);
+
+       cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       info->oti_fn_index = info->oti_next_index = descr->cld_start;
+       do {
+               res = osc_page_gang_lookup(env, io, cl2osc(osc),
+                                          info->oti_next_index, descr->cld_end,
+                                          cb, (void *)lock);
+               if (info->oti_next_index > descr->cld_end)
+                       break;
+
+               if (res == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (res != CLP_GANG_OKAY);
+out:
+       cl_io_fini(env, io);
+       RETURN(result);
+}
+
+
 /** @} osc */