From 0a259bd7dbac76d75b89a389bc317720153aa452 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Mon, 30 Sep 2013 15:00:38 -0700 Subject: [PATCH] LU-3321 clio: collapse layer of cl_page Move radix tree to osc layer for performance improvement. Signed-off-by: Jinshan Xiong Change-Id: I93e3cb8352f7be41c23465b12945874316aa1809 Reviewed-on: http://review.whamcloud.com/7892 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Bobi Jam Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 43 ++---- lustre/llite/rw.c | 2 +- lustre/llite/rw26.c | 4 - lustre/llite/vvp_dev.c | 46 +++--- lustre/llite/vvp_io.c | 1 - lustre/llite/vvp_object.c | 28 +++- lustre/llite/vvp_page.c | 70 ++++----- lustre/lov/lov_object.c | 13 +- lustre/lov/lov_page.c | 53 +++---- lustre/obdclass/cl_io.c | 22 +-- lustre/obdclass/cl_lock.c | 130 +--------------- lustre/obdclass/cl_object.c | 50 +++--- lustre/obdclass/cl_page.c | 357 +++---------------------------------------- lustre/osc/osc_cache.c | 210 ++++++++++++++++++++++++- lustre/osc/osc_cl_internal.h | 27 +++- lustre/osc/osc_io.c | 14 +- lustre/osc/osc_lock.c | 46 +++--- lustre/osc/osc_object.c | 3 ++ lustre/osc/osc_page.c | 30 +++- 19 files changed, 456 insertions(+), 693 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index cfaa8fc..f57e2ac 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -387,6 +387,12 @@ struct cl_object_operations { */ int (*coo_glimpse)(const struct lu_env *env, const struct cl_object *obj, struct ost_lvb *lvb); + /** + * Object prune method. Called when the layout is going to change on + * this object, therefore each layer has to clean up its cache, + * mainly pages and locks. + */ + int (*coo_prune)(const struct lu_env *env, struct cl_object *obj); }; /** @@ -401,15 +407,9 @@ struct cl_object_header { * mostly useless otherwise. */ /** @{ */ - /** Lock protecting page tree. */ - spinlock_t coh_page_guard; /** Lock protecting lock list. */ spinlock_t coh_lock_guard; /** @} locks */ - /** Radix tree of cl_page's, cached for this object. */ - struct radix_tree_root coh_tree; - /** # of pages in radix tree. */ - unsigned long coh_pages; /** List of cl_lock's granted for this object. */ cfs_list_t coh_locks; @@ -891,14 +891,6 @@ struct cl_page_operations { void (*cpo_export)(const struct lu_env *env, const struct cl_page_slice *slice, int uptodate); /** - * Unmaps page from the user space (if it is mapped). - * - * \see cl_page_unmap() - * \see vvp_page_unmap() - */ - int (*cpo_unmap)(const struct lu_env *env, - const struct cl_page_slice *slice, struct cl_io *io); - /** * Checks whether underlying VM page is locked (in the suitable * sense). Used for assertions.
* @@ -2787,25 +2779,16 @@ enum { CLP_GANG_AGAIN, CLP_GANG_ABORT }; - /* callback of cl_page_gang_lookup() */ -typedef int (*cl_page_gang_cb_t) (const struct lu_env *, struct cl_io *, - struct cl_page *, void *); -int cl_page_gang_lookup (const struct lu_env *env, - struct cl_object *obj, - struct cl_io *io, - pgoff_t start, pgoff_t end, - cl_page_gang_cb_t cb, void *cbdata); -struct cl_page *cl_page_lookup (struct cl_object_header *hdr, - pgoff_t index); + struct cl_page *cl_page_find (const struct lu_env *env, struct cl_object *obj, pgoff_t idx, struct page *vmpage, enum cl_page_type type); -struct cl_page *cl_page_find_sub (const struct lu_env *env, - struct cl_object *obj, - pgoff_t idx, struct page *vmpage, - struct cl_page *parent); +struct cl_page *cl_page_alloc (const struct lu_env *env, + struct cl_object *o, pgoff_t ind, + struct page *vmpage, + enum cl_page_type type); void cl_page_get (struct cl_page *page); void cl_page_put (const struct lu_env *env, struct cl_page *page); @@ -2876,8 +2859,6 @@ int cl_page_flush (const struct lu_env *env, struct cl_io *io, void cl_page_discard (const struct lu_env *env, struct cl_io *io, struct cl_page *pg); void cl_page_delete (const struct lu_env *env, struct cl_page *pg); -int cl_page_unmap (const struct lu_env *env, struct cl_io *io, - struct cl_page *pg); int cl_page_is_vmlocked (const struct lu_env *env, const struct cl_page *pg); void cl_page_export (const struct lu_env *env, @@ -3158,8 +3139,6 @@ void cl_page_list_assume (const struct lu_env *env, struct cl_io *io, struct cl_page_list *plist); void cl_page_list_discard(const struct lu_env *env, struct cl_io *io, struct cl_page_list *plist); -int cl_page_list_unmap (const struct lu_env *env, - struct cl_io *io, struct cl_page_list *plist); void cl_page_list_fini (const struct lu_env *env, struct cl_page_list *plist); void cl_2queue_init (struct cl_2queue *queue); diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index ec52fc4..bab5c65 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -483,7 +483,7 @@ static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io, cl_page_list_add(queue, page); rc = 1; } else { - cl_page_delete(env, page); + cl_page_discard(env, io, page); rc = -ENOLCK; } } else { diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index b97edd0..0a4b9a9 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -101,11 +101,7 @@ static void ll_invalidatepage(struct page *vmpage, unsigned long offset) if (obj != NULL) { page = cl_vmpage_page(vmpage, obj); if (page != NULL) { - lu_ref_add(&page->cp_reference, - "delete", vmpage); cl_page_delete(env, page); - lu_ref_del(&page->cp_reference, - "delete", vmpage); cl_page_put(env, page); } } else diff --git a/lustre/llite/vvp_dev.c b/lustre/llite/vvp_dev.c index 9e5d09d..8b9e09a 100644 --- a/lustre/llite/vvp_dev.c +++ b/lustre/llite/vvp_dev.c @@ -36,6 +36,7 @@ * cl_device and cl_device_type implementation for VVP layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_LLITE @@ -361,23 +362,18 @@ static loff_t vvp_pgcache_find(const struct lu_env *env, return ~0ULL; clob = vvp_pgcache_obj(env, dev, &id); if (clob != NULL) { - struct cl_object_header *hdr; - int nr; - struct cl_page *pg; + struct inode *inode = ccc_object_inode(clob); + struct page *vmpage; + int nr; - /* got an object. Find next page. 
*/ - hdr = cl_object_header(clob); - - spin_lock(&hdr->coh_page_guard); - nr = radix_tree_gang_lookup(&hdr->coh_tree, - (void **)&pg, - id.vpi_index, 1); + nr = find_get_pages_contig(inode->i_mapping, + id.vpi_index, 1, &vmpage); if (nr > 0) { - id.vpi_index = pg->cp_index; + id.vpi_index = vmpage->index; /* Cant support over 16T file */ - nr = !(pg->cp_index > 0xffffffff); + nr = !(vmpage->index > 0xffffffff); + page_cache_release(vmpage); } - spin_unlock(&hdr->coh_page_guard); lu_object_ref_del(&clob->co_lu, "dump", current); cl_object_put(env, clob); @@ -436,8 +432,6 @@ static int vvp_pgcache_show(struct seq_file *f, void *v) struct ll_sb_info *sbi; struct cl_object *clob; struct lu_env *env; - struct cl_page *page; - struct cl_object_header *hdr; struct vvp_pgcache_id id; int refcheck; int result; @@ -449,19 +443,27 @@ static int vvp_pgcache_show(struct seq_file *f, void *v) sbi = f->private; clob = vvp_pgcache_obj(env, &sbi->ll_cl->cd_lu_dev, &id); if (clob != NULL) { - hdr = cl_object_header(clob); - - spin_lock(&hdr->coh_page_guard); - page = cl_page_lookup(hdr, id.vpi_index); + struct inode *inode = ccc_object_inode(clob); + struct cl_page *page = NULL; + struct page *vmpage; + + result = find_get_pages_contig(inode->i_mapping, + id.vpi_index, 1, &vmpage); + if (result > 0) { + lock_page(vmpage); + page = cl_vmpage_page(vmpage, clob); + unlock_page(vmpage); + + page_cache_release(vmpage); + } - seq_printf(f, "%8x@"DFID": ", - id.vpi_index, PFID(&hdr->coh_lu.loh_fid)); + seq_printf(f, "%8x@"DFID": ", id.vpi_index, + PFID(lu_object_fid(&clob->co_lu))); if (page != NULL) { vvp_pgcache_page_show(env, f, page); cl_page_put(env, page); } else seq_puts(f, "missing\n"); - spin_unlock(&hdr->coh_page_guard); lu_object_ref_del(&clob->co_lu, "dump", current); cl_object_put(env, clob); } else diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index b10de06..82b8b91 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -781,7 +781,6 @@ static int vvp_io_fault_start(const struct lu_env *env, vmpage = NULL; if (result < 0) { - cl_page_unmap(env, io, page); cl_page_discard(env, io, page); cl_page_disown(env, io, page); diff --git a/lustre/llite/vvp_object.c b/lustre/llite/vvp_object.c index 65a6e45..92c7763 100644 --- a/lustre/llite/vvp_object.c +++ b/lustre/llite/vvp_object.c @@ -153,14 +153,28 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj, return 0; } +static int vvp_prune(const struct lu_env *env, struct cl_object *obj) +{ + struct inode *inode = ccc_object_inode(obj); + int rc; + ENTRY; + + rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_ALL, 1); + if (rc == 0) + truncate_inode_pages(inode->i_mapping, 0); + + RETURN(rc); +} + static const struct cl_object_operations vvp_ops = { - .coo_page_init = vvp_page_init, - .coo_lock_init = vvp_lock_init, - .coo_io_init = vvp_io_init, - .coo_attr_get = vvp_attr_get, - .coo_attr_set = vvp_attr_set, - .coo_conf_set = vvp_conf_set, - .coo_glimpse = ccc_object_glimpse + .coo_page_init = vvp_page_init, + .coo_lock_init = vvp_lock_init, + .coo_io_init = vvp_io_init, + .coo_attr_get = vvp_attr_get, + .coo_attr_set = vvp_attr_set, + .coo_conf_set = vvp_conf_set, + .coo_prune = vvp_prune, + .coo_glimpse = ccc_object_glimpse }; static const struct lu_object_operations vvp_lu_obj_ops = { diff --git a/lustre/llite/vvp_page.c b/lustre/llite/vvp_page.c index 1323b36..76bed3c 100644 --- a/lustre/llite/vvp_page.c +++ b/lustre/llite/vvp_page.c @@ -139,62 +139,55 @@ static void vvp_page_discard(const struct lu_env *env, 
const struct cl_page_slice *slice, struct cl_io *unused) { - struct page *vmpage = cl2vm_page(slice); + struct page *vmpage = cl2vm_page(slice); struct address_space *mapping; struct ccc_page *cpg = cl2ccc_page(slice); + __u64 offset; - LASSERT(vmpage != NULL); - LASSERT(PageLocked(vmpage)); + LASSERT(vmpage != NULL); + LASSERT(PageLocked(vmpage)); mapping = vmpage->mapping; - if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used) - ll_ra_stats_inc(mapping, RA_STAT_DISCARDED); - - /* - * truncate_complete_page() calls - * a_ops->invalidatepage()->cl_page_delete()->vvp_page_delete(). - */ - truncate_complete_page(mapping, vmpage); -} - -static int vvp_page_unmap(const struct lu_env *env, - const struct cl_page_slice *slice, - struct cl_io *unused) -{ - struct page *vmpage = cl2vm_page(slice); - __u64 offset; - - LASSERT(vmpage != NULL); - LASSERT(PageLocked(vmpage)); + if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used) + ll_ra_stats_inc(mapping, RA_STAT_DISCARDED); - offset = vmpage->index << PAGE_CACHE_SHIFT; + offset = vmpage->index << PAGE_CACHE_SHIFT; + ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_CACHE_SIZE); /* - * XXX is it safe to call this with the page lock held? + * truncate_complete_page() calls + * a_ops->invalidatepage()->cl_page_delete()->vvp_page_delete(). */ - ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_CACHE_SIZE); - return 0; + truncate_complete_page(mapping, vmpage); } static void vvp_page_delete(const struct lu_env *env, const struct cl_page_slice *slice) { - struct page *vmpage = cl2vm_page(slice); + struct page *vmpage = cl2vm_page(slice); struct inode *inode = vmpage->mapping->host; struct cl_object *obj = slice->cpl_obj; + struct cl_page *page = slice->cpl_page; + int refc; - LASSERT(PageLocked(vmpage)); - LASSERT((struct cl_page *)vmpage->private == slice->cpl_page); - LASSERT(inode == ccc_object_inode(obj)); + LASSERT(PageLocked(vmpage)); + LASSERT((struct cl_page *)vmpage->private == page); + LASSERT(inode == ccc_object_inode(obj)); - vvp_write_complete(cl2ccc(obj), cl2ccc_page(slice)); - ClearPagePrivate(vmpage); - vmpage->private = 0; - /* - * Reference from vmpage to cl_page is removed, but the reference back - * is still here. It is removed later in vvp_page_fini(). - */ + vvp_write_complete(cl2ccc(obj), cl2ccc_page(slice)); + + /* Drop the reference count held in vvp_page_init */ + refc = atomic_dec_return(&page->cp_ref); + LASSERTF(refc >= 1, "page = %p, refc = %d\n", page, refc); + + ClearPageUptodate(vmpage); + ClearPagePrivate(vmpage); + vmpage->private = 0; + /* + * Reference from vmpage to cl_page is removed, but the reference back + * is still here. It is removed later in vvp_page_fini.
+ */ } static void vvp_page_export(const struct lu_env *env, @@ -408,7 +401,6 @@ static const struct cl_page_operations vvp_page_ops = { .cpo_vmpage = ccc_page_vmpage, .cpo_discard = vvp_page_discard, .cpo_delete = vvp_page_delete, - .cpo_unmap = vvp_page_unmap, .cpo_export = vvp_page_export, .cpo_is_vmlocked = vvp_page_is_vmlocked, .cpo_fini = vvp_page_fini, @@ -545,6 +537,8 @@ int vvp_page_init(const struct lu_env *env, struct cl_object *obj, CFS_INIT_LIST_HEAD(&cpg->cpg_pending_linkage); if (page->cp_type == CPT_CACHEABLE) { + /* in cache, decref in vvp_page_delete */ + atomic_inc(&page->cp_ref); SetPagePrivate(vmpage); vmpage->private = (unsigned long)page; cl_page_slice_add(page, &cpg->cpg_cl, obj, diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index ae80756..8e1cd43 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -284,7 +284,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, lov_layout_wait(env, lov); - cl_object_prune(env, &lov->lo_cl); + cl_locks_prune(env, &lov->lo_cl, 0); return 0; } @@ -359,9 +359,9 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, */ lov_subobject_kill(env, lov, los, i); } - } - } - cl_object_prune(env, &lov->lo_cl); + } + } + cl_locks_prune(env, &lov->lo_cl, 0); RETURN(0); } @@ -669,7 +669,6 @@ static int lov_layout_change(const struct lu_env *unused, const struct lov_layout_operations *old_ops; const struct lov_layout_operations *new_ops; - struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); void *cookie; struct lu_env *env; int refcheck; @@ -695,13 +694,13 @@ static int lov_layout_change(const struct lu_env *unused, old_ops = &lov_dispatch[lov->lo_type]; new_ops = &lov_dispatch[llt]; + cl_object_prune(env, &lov->lo_cl); + result = old_ops->llo_delete(env, lov, &lov->u); if (result == 0) { old_ops->llo_fini(env, lov, &lov->u); LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0); - LASSERT(hdr->coh_tree.rnode == NULL); - LASSERT(hdr->coh_pages == 0); lov->lo_type = LLT_EMPTY; result = new_ops->llo_init(env, diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index c3c9915..85edd0e 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -36,6 +36,7 @@ * Implementation of cl_page for LOV layer. 
* * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_LOV @@ -178,39 +179,29 @@ int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj, stripe = lov_stripe_number(loo->lo_lsm, offset); LASSERT(stripe < r0->lo_nr); rc = lov_stripe_offset(loo->lo_lsm, offset, stripe, - &suboff); - LASSERT(rc == 0); - - lpg->lps_invalid = 1; - cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops); - - sub = lov_sub_get(env, lio, stripe); - if (IS_ERR(sub)) - GOTO(out, rc = PTR_ERR(sub)); - - subobj = lovsub2cl(r0->lo_sub[stripe]); - subpage = cl_page_find_sub(sub->sub_env, subobj, - cl_index(subobj, suboff), vmpage, page); - lov_sub_put(sub); - if (IS_ERR(subpage)) - GOTO(out, rc = PTR_ERR(subpage)); - - if (likely(subpage->cp_parent == page)) { - lu_ref_add(&subpage->cp_reference, "lov", page); - lpg->lps_invalid = 0; - rc = 0; - } else { - CL_PAGE_DEBUG(D_ERROR, env, page, "parent page\n"); - CL_PAGE_DEBUG(D_ERROR, env, subpage, "child page\n"); - LASSERT(0); - } - - EXIT; -out: - return rc; + &suboff); + LASSERT(rc == 0); + + lpg->lps_invalid = 1; + cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops); + + sub = lov_sub_get(env, lio, stripe); + if (IS_ERR(sub)) + RETURN(PTR_ERR(sub)); + + subobj = lovsub2cl(r0->lo_sub[stripe]); + subpage = cl_page_alloc(sub->sub_env, subobj, cl_index(subobj, suboff), + vmpage, page->cp_type); + if (!IS_ERR(subpage)) { + subpage->cp_parent = page; + page->cp_child = subpage; + lpg->lps_invalid = 0; + } else + rc = PTR_ERR(subpage); + lov_sub_put(sub); + RETURN(rc); } - static const struct cl_page_operations lov_empty_page_ops = { .cpo_fini = lov_empty_page_fini, .cpo_print = lov_page_print diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index af0fca0..b7ddaed 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -36,6 +36,7 @@ * Client IO. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_CLASS @@ -1287,27 +1288,6 @@ void cl_page_list_discard(const struct lu_env *env, struct cl_io *io, EXPORT_SYMBOL(cl_page_list_discard); /** - * Unmaps all pages in a queue from user virtual memory. - */ -int cl_page_list_unmap(const struct lu_env *env, struct cl_io *io, - struct cl_page_list *plist) -{ - struct cl_page *page; - int result; - - LINVRNT(plist->pl_owner == current); - ENTRY; - result = 0; - cl_page_list_for_each(page, plist) { - result = cl_page_unmap(env, io, page); - if (result != 0) - break; - } - RETURN(result); -} -EXPORT_SYMBOL(cl_page_list_unmap); - -/** * Initialize dual page queue. */ void cl_2queue_init(struct cl_2queue *queue) diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index ff5e6ee..a2a1624 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -36,6 +36,7 @@ * Client Extent Lock. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_CLASS @@ -1894,129 +1895,6 @@ struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env, EXPORT_SYMBOL(cl_lock_at_pgoff); /** - * Calculate the page offset at the layer of @lock. - * At the time of this writing, @page is top page and @lock is sub lock. - */ -static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock) -{ - struct lu_device_type *dtype; - const struct cl_page_slice *slice; - - dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type; - slice = cl_page_at(page, dtype); - LASSERT(slice != NULL); - return slice->cpl_page->cp_index; -} - -/** - * Check if page @page is covered by an extra lock or discard it. 
- */ -static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, void *cbdata) -{ - struct cl_thread_info *info = cl_env_info(env); - struct cl_lock *lock = cbdata; - pgoff_t index = pgoff_at_lock(page, lock); - - if (index >= info->clt_fn_index) { - struct cl_lock *tmp; - - /* refresh non-overlapped index */ - tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index, - lock, 1, 0); - if (tmp != NULL) { - /* Cache the first-non-overlapped index so as to skip - * all pages within [index, clt_fn_index). This - * is safe because if tmp lock is canceled, it will - * discard these pages. */ - info->clt_fn_index = tmp->cll_descr.cld_end + 1; - if (tmp->cll_descr.cld_end == CL_PAGE_EOF) - info->clt_fn_index = CL_PAGE_EOF; - cl_lock_put(env, tmp); - } else if (cl_page_own(env, io, page) == 0) { - /* discard the page */ - cl_page_unmap(env, io, page); - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - } else { - LASSERT(page->cp_state == CPS_FREEING); - } - } - - info->clt_next_index = index + 1; - return CLP_GANG_OKAY; -} - -static int discard_cb(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, void *cbdata) -{ - struct cl_thread_info *info = cl_env_info(env); - struct cl_lock *lock = cbdata; - - LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE); - KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, - !PageWriteback(cl_page_vmpage(env, page)))); - KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, - !PageDirty(cl_page_vmpage(env, page)))); - - info->clt_next_index = pgoff_at_lock(page, lock) + 1; - if (cl_page_own(env, io, page) == 0) { - /* discard the page */ - cl_page_unmap(env, io, page); - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - } else { - LASSERT(page->cp_state == CPS_FREEING); - } - - return CLP_GANG_OKAY; -} - -/** - * Discard pages protected by the given lock. This function traverses radix - * tree to find all covering pages and discard them. If a page is being covered - * by other locks, it should remain in cache. - * - * If error happens on any step, the process continues anyway (the reasoning - * behind this being that lock cancellation cannot be delayed indefinitely). - */ -int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock) -{ - struct cl_thread_info *info = cl_env_info(env); - struct cl_io *io = &info->clt_io; - struct cl_lock_descr *descr = &lock->cll_descr; - cl_page_gang_cb_t cb; - int res; - int result; - - LINVRNT(cl_lock_invariant(env, lock)); - ENTRY; - - io->ci_obj = cl_object_top(descr->cld_obj); - io->ci_ignore_layout = 1; - result = cl_io_init(env, io, CIT_MISC, io->ci_obj); - if (result != 0) - GOTO(out, result); - - cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb; - info->clt_fn_index = info->clt_next_index = descr->cld_start; - do { - res = cl_page_gang_lookup(env, descr->cld_obj, io, - info->clt_next_index, descr->cld_end, - cb, (void *)lock); - if (info->clt_next_index > descr->cld_end) - break; - - if (res == CLP_GANG_RESCHED) - cond_resched(); - } while (res != CLP_GANG_OKAY); -out: - cl_io_fini(env, io); - RETURN(result); -} -EXPORT_SYMBOL(cl_lock_discard_pages); - -/** * Eliminate all locks for a given object. * * Caller has to guarantee that no lock is in active use. 
@@ -2031,12 +1909,6 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel) ENTRY; head = cl_object_header(obj); - /* - * If locks are destroyed without cancellation, all pages must be - * already destroyed (as otherwise they will be left unprotected). - */ - LASSERT(ergo(!cancel, - head->coh_tree.rnode == NULL && head->coh_pages == 0)); spin_lock(&head->coh_lock_guard); while (!cfs_list_empty(&head->coh_locks)) { diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c index 8f29926..6ad9b0a 100644 --- a/lustre/obdclass/cl_object.c +++ b/lustre/obdclass/cl_object.c @@ -36,6 +36,7 @@ * Client Lustre Object. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ /* @@ -43,7 +44,6 @@ * * i_mutex * PG_locked - * ->coh_page_guard * ->coh_lock_guard * ->coh_attr_guard * ->ls_guard @@ -63,8 +63,6 @@ static struct kmem_cache *cl_env_kmem; -/** Lock class of cl_object_header::coh_page_guard */ -static struct lock_class_key cl_page_guard_class; /** Lock class of cl_object_header::coh_lock_guard */ static struct lock_class_key cl_lock_guard_class; /** Lock class of cl_object_header::coh_attr_guard */ @@ -82,15 +80,10 @@ int cl_object_header_init(struct cl_object_header *h) ENTRY; result = lu_object_header_init(&h->coh_lu); if (result == 0) { - spin_lock_init(&h->coh_page_guard); spin_lock_init(&h->coh_lock_guard); spin_lock_init(&h->coh_attr_guard); - lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class); lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class); lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class); - h->coh_pages = 0; - /* XXX hard coded GFP_* mask. */ - INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC); CFS_INIT_LIST_HEAD(&h->coh_locks); h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8); } @@ -331,6 +324,33 @@ int cl_conf_set(const struct lu_env *env, struct cl_object *obj, EXPORT_SYMBOL(cl_conf_set); /** + * Prunes caches of pages and locks for this object. + */ +void cl_object_prune(const struct lu_env *env, struct cl_object *obj) +{ + struct lu_object_header *top; + struct cl_object *o; + int result; + ENTRY; + + top = obj->co_lu.lo_header; + result = 0; + cfs_list_for_each_entry(o, &top->loh_layers, co_lu.lo_linkage) { + if (o->co_ops->coo_prune != NULL) { + result = o->co_ops->coo_prune(env, o); + if (result != 0) + break; + } + } + + /* TODO: pruning locks will be moved into layers after cl_lock + * simplification is done */ + cl_locks_prune(env, obj, 1); + EXIT; +} +EXPORT_SYMBOL(cl_object_prune); + +/** * Helper function removing all object locks, and marking object for * deletion. All object pages must have been deleted at this point. * @@ -342,8 +362,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj) struct cl_object_header *hdr; hdr = cl_object_header(obj); - LASSERT(hdr->coh_tree.rnode == NULL); - LASSERT(hdr->coh_pages == 0); set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags); /* @@ -358,18 +376,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj) EXPORT_SYMBOL(cl_object_kill); /** - * Prunes caches of pages and locks for this object. - */ -void cl_object_prune(const struct lu_env *env, struct cl_object *obj) -{ - ENTRY; - cl_pages_prune(env, obj); - cl_locks_prune(env, obj, 1); - EXIT; -} -EXPORT_SYMBOL(cl_object_prune); - -/** * Check if the object has locks. 
*/ int cl_object_has_locks(struct cl_object *obj) diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c index 9e57e09..a8f46ea 100644 --- a/lustre/obdclass/cl_page.c +++ b/lustre/obdclass/cl_page.c @@ -36,6 +36,7 @@ * Client Lustre Page. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_CLASS @@ -48,8 +49,7 @@ #include #include "cl_internal.h" -static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg, - int radix); +static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg); #ifdef LIBCFS_DEBUG # define PASSERT(env, page, expr) \ @@ -111,8 +111,7 @@ static struct cl_page *cl_page_top_trusted(struct cl_page *page) * * This function can be used to obtain initial reference to previously * unreferenced cached object. It can be called only if concurrent page - * reclamation is somehow prevented, e.g., by locking page radix-tree - * (cl_object_header::hdr->coh_page_guard), or by keeping a lock on a VM page, + * reclamation is somehow prevented, e.g., by keeping a lock on a VM page, * associated with \a page. * * Use with care! Not exported. @@ -147,132 +146,6 @@ cl_page_at_trusted(const struct cl_page *page, RETURN(NULL); } -/** - * Returns a page with given index in the given object, or NULL if no page is - * found. Acquires a reference on \a page. - * - * Locking: called under cl_object_header::coh_page_guard spin-lock. - */ -struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index) -{ - struct cl_page *page; - - LASSERT(spin_is_locked(&hdr->coh_page_guard)); - - page = radix_tree_lookup(&hdr->coh_tree, index); - if (page != NULL) - cl_page_get_trust(page); - return page; -} -EXPORT_SYMBOL(cl_page_lookup); - -/** - * Returns a list of pages by a given [start, end] of \a obj. - * - * \param resched If not NULL, then we give up before hogging CPU for too - * long and set *resched = 1, in that case caller should implement a retry - * logic. - * - * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely - * crucial in the face of [offset, EOF] locks. - * - * Return at least one page in @queue unless there is no covered page. - */ -int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, - struct cl_io *io, pgoff_t start, pgoff_t end, - cl_page_gang_cb_t cb, void *cbdata) -{ - struct cl_object_header *hdr; - struct cl_page *page; - struct cl_page **pvec; - const struct cl_page_slice *slice; - const struct lu_device_type *dtype; - pgoff_t idx; - unsigned int nr; - unsigned int i; - unsigned int j; - int res = CLP_GANG_OKAY; - int tree_lock = 1; - ENTRY; - - idx = start; - hdr = cl_object_header(obj); - pvec = cl_env_info(env)->clt_pvec; - dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type; - spin_lock(&hdr->coh_page_guard); - while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec, - idx, CLT_PVEC_SIZE)) > 0) { - int end_of_region = 0; - idx = pvec[nr - 1]->cp_index + 1; - for (i = 0, j = 0; i < nr; ++i) { - page = pvec[i]; - pvec[i] = NULL; - - LASSERT(page->cp_type == CPT_CACHEABLE); - if (page->cp_index > end) { - end_of_region = 1; - break; - } - if (page->cp_state == CPS_FREEING) - continue; - - slice = cl_page_at_trusted(page, dtype); - /* - * Pages for lsm-less file has no underneath sub-page - * for osc, in case of ... - */ - PASSERT(env, page, slice != NULL); - - page = slice->cpl_page; - /* - * Can safely call cl_page_get_trust() under - * radix-tree spin-lock. 
- * - * XXX not true, because @page is from object another - * than @hdr and protected by different tree lock. - */ - cl_page_get_trust(page); - lu_ref_add_atomic(&page->cp_reference, - "gang_lookup", current); - pvec[j++] = page; - } - - /* - * Here a delicate locking dance is performed. Current thread - * holds a reference to a page, but has to own it before it - * can be placed into queue. Owning implies waiting, so - * radix-tree lock is to be released. After a wait one has to - * check that pages weren't truncated (cl_page_own() returns - * error in the latter case). - */ - spin_unlock(&hdr->coh_page_guard); - tree_lock = 0; - - for (i = 0; i < j; ++i) { - page = pvec[i]; - if (res == CLP_GANG_OKAY) - res = (*cb)(env, io, page, cbdata); - lu_ref_del(&page->cp_reference, - "gang_lookup", current); - cl_page_put(env, page); - } - if (nr < CLT_PVEC_SIZE || end_of_region) - break; - - if (res == CLP_GANG_OKAY && need_resched()) - res = CLP_GANG_RESCHED; - if (res != CLP_GANG_OKAY) - break; - - spin_lock(&hdr->coh_page_guard); - tree_lock = 1; - } - if (tree_lock) - spin_unlock(&hdr->coh_page_guard); - RETURN(res); -} -EXPORT_SYMBOL(cl_page_gang_lookup); - static void cl_page_free(const struct lu_env *env, struct cl_page *page) { struct cl_object *obj = page->cp_obj; @@ -314,7 +187,7 @@ static inline void cl_page_state_set_trust(struct cl_page *page, *(enum cl_page_state *)&page->cp_state = state; } -static struct cl_page *cl_page_alloc(const struct lu_env *env, +struct cl_page *cl_page_alloc(const struct lu_env *env, struct cl_object *o, pgoff_t ind, struct page *vmpage, enum cl_page_type type) { @@ -327,8 +200,6 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env, if (page != NULL) { int result = 0; cfs_atomic_set(&page->cp_ref, 1); - if (type == CPT_CACHEABLE) /* for radix tree */ - cfs_atomic_inc(&page->cp_ref); page->cp_obj = o; cl_object_get(o); lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page", @@ -348,7 +219,7 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env, result = o->co_ops->coo_page_init(env, o, page, vmpage); if (result != 0) { - cl_page_delete0(env, page, 0); + cl_page_delete0(env, page); cl_page_free(env, page); page = ERR_PTR(result); break; @@ -365,6 +236,7 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env, } RETURN(page); } +EXPORT_SYMBOL(cl_page_alloc); /** * Returns a cl_page with index \a idx at the object \a o, and associated with @@ -377,16 +249,13 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env, * * \see cl_object_find(), cl_lock_find() */ -static struct cl_page *cl_page_find0(const struct lu_env *env, - struct cl_object *o, - pgoff_t idx, struct page *vmpage, - enum cl_page_type type, - struct cl_page *parent) +struct cl_page *cl_page_find(const struct lu_env *env, + struct cl_object *o, + pgoff_t idx, struct page *vmpage, + enum cl_page_type type) { struct cl_page *page = NULL; - struct cl_page *ghost = NULL; struct cl_object_header *hdr; - int err; LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT); might_sleep(); @@ -413,93 +282,20 @@ static struct cl_page *cl_page_find0(const struct lu_env *env, * reference on it. 
*/ page = cl_vmpage_page(vmpage, o); - PINVRNT(env, page, - ergo(page != NULL, - cl_page_vmpage(env, page) == vmpage && - (void *)radix_tree_lookup(&hdr->coh_tree, - idx) == page)); - } - - if (page != NULL) { - CS_PAGE_INC(o, hit); - RETURN(page); + if (page != NULL) { + CS_PAGE_INC(o, hit); + RETURN(page); + } } /* allocate and initialize cl_page */ page = cl_page_alloc(env, o, idx, vmpage, type); - if (IS_ERR(page)) - RETURN(page); - - if (type == CPT_TRANSIENT) { - if (parent) { - LASSERT(page->cp_parent == NULL); - page->cp_parent = parent; - parent->cp_child = page; - } - RETURN(page); - } - - /* - * XXX optimization: use radix_tree_preload() here, and change tree - * gfp mask to GFP_KERNEL in cl_object_header_init(). - */ - spin_lock(&hdr->coh_page_guard); - err = radix_tree_insert(&hdr->coh_tree, idx, page); - if (err != 0) { - ghost = page; - /* - * Noted by Jay: a lock on \a vmpage protects cl_page_find() - * from this race, but - * - * 0. it's better to have cl_page interface "locally - * consistent" so that its correctness can be reasoned - * about without appealing to the (obscure world of) VM - * locking. - * - * 1. handling this race allows ->coh_tree to remain - * consistent even when VM locking is somehow busted, - * which is very useful during diagnosing and debugging. - */ - page = ERR_PTR(err); - CL_PAGE_DEBUG(D_ERROR, env, ghost, - "fail to insert into radix tree: %d\n", err); - } else { - if (parent) { - LASSERT(page->cp_parent == NULL); - page->cp_parent = parent; - parent->cp_child = page; - } - hdr->coh_pages++; - } - spin_unlock(&hdr->coh_page_guard); - - if (unlikely(ghost != NULL)) { - cl_page_delete0(env, ghost, 0); - cl_page_free(env, ghost); - } - RETURN(page); -} - -struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o, - pgoff_t idx, struct page *vmpage, - enum cl_page_type type) -{ - return cl_page_find0(env, o, idx, vmpage, type, NULL); + RETURN(page); } EXPORT_SYMBOL(cl_page_find); - -struct cl_page *cl_page_find_sub(const struct lu_env *env, struct cl_object *o, - pgoff_t idx, struct page *vmpage, - struct cl_page *parent) -{ - return cl_page_find0(env, o, idx, vmpage, parent->cp_type, parent); -} -EXPORT_SYMBOL(cl_page_find_sub); - static inline int cl_page_invariant(const struct cl_page *pg) { - struct cl_object_header *header; struct cl_page *parent; struct cl_page *child; struct cl_io *owner; @@ -509,7 +305,6 @@ static inline int cl_page_invariant(const struct cl_page *pg) */ LINVRNT(cl_page_is_vmlocked(NULL, pg)); - header = cl_object_header(pg->cp_obj); parent = pg->cp_parent; child = pg->cp_child; owner = pg->cp_owner; @@ -522,15 +317,7 @@ static inline int cl_page_invariant(const struct cl_page *pg) ergo(owner != NULL && parent != NULL, parent->cp_owner == pg->cp_owner->ci_parent) && ergo(owner != NULL && child != NULL, - child->cp_owner->ci_parent == owner) && - /* - * Either page is early in initialization (has neither child - * nor parent yet), or it is in the object radix tree. - */ - ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE, - (void *)radix_tree_lookup(&header->coh_tree, - pg->cp_index) == pg || - (child == NULL && parent == NULL)); + child->cp_owner->ci_parent == owner); } static void cl_page_state_set0(const struct lu_env *env, @@ -1081,10 +868,8 @@ EXPORT_SYMBOL(cl_page_discard); * pages, e.g,. in a error handling cl_page_find()->cl_page_delete0() * path. Doesn't check page invariant. 
*/ -static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg, - int radix) +static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg) { - struct cl_page *tmp = pg; ENTRY; PASSERT(env, pg, pg == cl_page_top(pg)); @@ -1095,42 +880,11 @@ static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg, */ cl_page_owner_clear(pg); - /* - * unexport the page firstly before freeing it so that - * the page content is considered to be invalid. - * We have to do this because a CPS_FREEING cl_page may - * be NOT under the protection of a cl_lock. - * Afterwards, if this page is found by other threads, then this - * page will be forced to reread. - */ - cl_page_export(env, pg, 0); cl_page_state_set0(env, pg, CPS_FREEING); - CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete), + CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_delete), (const struct lu_env *, const struct cl_page_slice *)); - if (tmp->cp_type == CPT_CACHEABLE) { - if (!radix) - /* !radix means that @pg is not yet in the radix tree, - * skip removing it. - */ - tmp = pg->cp_child; - for (; tmp != NULL; tmp = tmp->cp_child) { - void *value; - struct cl_object_header *hdr; - - hdr = cl_object_header(tmp->cp_obj); - spin_lock(&hdr->coh_page_guard); - value = radix_tree_delete(&hdr->coh_tree, - tmp->cp_index); - PASSERT(env, tmp, value == tmp); - PASSERT(env, tmp, hdr->coh_pages > 0); - hdr->coh_pages--; - spin_unlock(&hdr->coh_page_guard); - cl_page_put(env, tmp); - } - } - EXIT; } @@ -1161,33 +915,14 @@ static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg, */ void cl_page_delete(const struct lu_env *env, struct cl_page *pg) { - PINVRNT(env, pg, cl_page_invariant(pg)); - ENTRY; - cl_page_delete0(env, pg, 1); - EXIT; + PINVRNT(env, pg, cl_page_invariant(pg)); + ENTRY; + cl_page_delete0(env, pg); + EXIT; } EXPORT_SYMBOL(cl_page_delete); /** - * Unmaps page from user virtual memory. - * - * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The - * layer responsible for VM interaction has to unmap page from user space - * virtual memory. - * - * \see cl_page_operations::cpo_unmap() - */ -int cl_page_unmap(const struct lu_env *env, - struct cl_io *io, struct cl_page *pg) -{ - PINVRNT(env, pg, cl_page_is_owned(pg, io)); - PINVRNT(env, pg, cl_page_invariant(pg)); - - return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap)); -} -EXPORT_SYMBOL(cl_page_unmap); - -/** * Marks page up-to-date. * * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The @@ -1460,54 +1195,6 @@ int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io, } EXPORT_SYMBOL(cl_page_is_under_lock); -static int page_prune_cb(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, void *cbdata) -{ - cl_page_own(env, io, page); - cl_page_unmap(env, io, page); - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - return CLP_GANG_OKAY; -} - -/** - * Purges all cached pages belonging to the object \a obj. - */ -int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj) -{ - struct cl_thread_info *info; - struct cl_object *obj = cl_object_top(clobj); - struct cl_io *io; - int result; - - ENTRY; - info = cl_env_info(env); - io = &info->clt_io; - - /* - * initialize the io. This is ugly since we never do IO in this - * function, we just make cl_page_list functions happy. 
-jay - */ - io->ci_obj = obj; - io->ci_ignore_layout = 1; - result = cl_io_init(env, io, CIT_MISC, obj); - if (result != 0) { - cl_io_fini(env, io); - RETURN(io->ci_result); - } - - do { - result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, - page_prune_cb, NULL); - if (result == CLP_GANG_RESCHED) - cond_resched(); - } while (result != CLP_GANG_OKAY); - - cl_io_fini(env, io); - RETURN(result); -} -EXPORT_SYMBOL(cl_pages_prune); - /** * Tells transfer engine that only part of a page is to be transmitted. * diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 94cc695..a7697f3 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -969,7 +969,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, lu_ref_add(&page->cp_reference, "truncate", current); if (cl_page_own(env, io, page) == 0) { - cl_page_unmap(env, io, page); cl_page_discard(env, io, page); cl_page_disown(env, io, page); } else { @@ -2197,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, cl_object_get(obj); client_obd_list_unlock(&cli->cl_loi_list_lock); - lu_object_ref_add_at(&obj->co_lu, &link, "check", - current); + lu_object_ref_add_at(&obj->co_lu, &link, "check", current); /* attempt some read/write balancing by alternating between * reads and writes in an object. The makes_rpc checks here @@ -2239,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, osc_object_unlock(osc); osc_list_maint(cli, osc); - lu_object_ref_del_at(&obj->co_lu, &link, "check", - current); + lu_object_ref_del_at(&obj->co_lu, &link, "check", current); cl_object_put(env, obj); client_obd_list_lock(&cli->cl_loi_list_lock); @@ -3042,4 +3039,207 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, RETURN(result); } +/** + * Returns a list of pages by a given [start, end] of \a obj. + * + * \param resched If not NULL, then we give up before hogging CPU for too + * long and set *resched = 1, in that case caller should implement a retry + * logic. + * + * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely + * crucial in the face of [offset, EOF] locks. + * + * Return at least one page in @queue unless there is no covered page. + */ +int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, + struct osc_object *osc, pgoff_t start, pgoff_t end, + osc_page_gang_cbt cb, void *cbdata) +{ + struct osc_page *ops; + void **pvec; + pgoff_t idx; + unsigned int nr; + unsigned int i; + unsigned int j; + int res = CLP_GANG_OKAY; + bool tree_lock = true; + ENTRY; + + idx = start; + pvec = osc_env_info(env)->oti_pvec; + spin_lock(&osc->oo_tree_lock); + while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec, + idx, OTI_PVEC_SIZE)) > 0) { + struct cl_page *page; + bool end_of_region = false; + + for (i = 0, j = 0; i < nr; ++i) { + ops = pvec[i]; + pvec[i] = NULL; + + idx = osc_index(ops); + if (idx > end) { + end_of_region = true; + break; + } + + page = cl_page_top(ops->ops_cl.cpl_page); + LASSERT(page->cp_type == CPT_CACHEABLE); + if (page->cp_state == CPS_FREEING) + continue; + + cl_page_get(page); + lu_ref_add_atomic(&page->cp_reference, + "gang_lookup", current); + pvec[j++] = ops; + } + ++idx; + + /* + * Here a delicate locking dance is performed. Current thread + * holds a reference to a page, but has to own it before it + * can be placed into queue. Owning implies waiting, so + * radix-tree lock is to be released. 
After a wait one has to + * check that pages weren't truncated (cl_page_own() returns + * error in the latter case). + */ + spin_unlock(&osc->oo_tree_lock); + tree_lock = false; + + for (i = 0; i < j; ++i) { + ops = pvec[i]; + if (res == CLP_GANG_OKAY) + res = (*cb)(env, io, ops, cbdata); + + page = cl_page_top(ops->ops_cl.cpl_page); + lu_ref_del(&page->cp_reference, "gang_lookup", current); + cl_page_put(env, page); + } + if (nr < OTI_PVEC_SIZE || end_of_region) + break; + + if (res == CLP_GANG_OKAY && need_resched()) + res = CLP_GANG_RESCHED; + if (res != CLP_GANG_OKAY) + break; + + spin_lock(&osc->oo_tree_lock); + tree_lock = true; + } + if (tree_lock) + spin_unlock(&osc->oo_tree_lock); + RETURN(res); +} + +/** + * Check if page @page is covered by an extra lock or discard it. + */ +static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_lock *lock = cbdata; + pgoff_t index; + + index = osc_index(ops); + if (index >= info->oti_fn_index) { + struct cl_lock *tmp; + struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page); + + /* refresh non-overlapped index */ + tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index, + lock, 1, 0); + if (tmp != NULL) { + /* Cache the first-non-overlapped index so as to skip + * all pages within [index, oti_fn_index). This + * is safe because if tmp lock is canceled, it will + * discard these pages. */ + info->oti_fn_index = tmp->cll_descr.cld_end + 1; + if (tmp->cll_descr.cld_end == CL_PAGE_EOF) + info->oti_fn_index = CL_PAGE_EOF; + cl_lock_put(env, tmp); + } else if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } + } + + info->oti_next_index = index + 1; + return CLP_GANG_OKAY; +} + +static int discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_lock *lock = cbdata; + struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page); + + LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE); + KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, + !PageWriteback(cl_page_vmpage(env, page)))); + KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, + !PageDirty(cl_page_vmpage(env, page)))); + + /* page is top page. */ + info->oti_next_index = osc_index(ops) + 1; + if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } + + return CLP_GANG_OKAY; +} + +/** + * Discard pages protected by the given lock. This function traverses radix + * tree to find all covering pages and discard them. If a page is being covered + * by other locks, it should remain in cache. + * + * If error happens on any step, the process continues anyway (the reasoning + * behind this being that lock cancellation cannot be delayed indefinitely). 
+ */ +int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_io *io = &info->oti_io; + struct cl_object *osc = ols->ols_cl.cls_obj; + struct cl_lock *lock = ols->ols_cl.cls_lock; + struct cl_lock_descr *descr = &lock->cll_descr; + osc_page_gang_cbt cb; + int res; + int result; + + ENTRY; + + io->ci_obj = cl_object_top(osc); + io->ci_ignore_layout = 1; + result = cl_io_init(env, io, CIT_MISC, io->ci_obj); + if (result != 0) + GOTO(out, result); + + cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb; + info->oti_fn_index = info->oti_next_index = descr->cld_start; + do { + res = osc_page_gang_lookup(env, io, cl2osc(osc), + info->oti_next_index, descr->cld_end, + cb, (void *)lock); + if (info->oti_next_index > descr->cld_end) + break; + + if (res == CLP_GANG_RESCHED) + cond_resched(); + } while (res != CLP_GANG_OKAY); +out: + cl_io_fini(env, io); + RETURN(result); +} + + /** @} osc */ diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 4a55e96..55e159b 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -114,7 +114,12 @@ struct osc_thread_info { struct lustre_handle oti_handle; struct cl_page_list oti_plist; struct cl_io oti_io; - struct cl_page *oti_pvec[OTI_PVEC_SIZE]; + void *oti_pvec[OTI_PVEC_SIZE]; + /** + * Fields used by osc_lock_discard_pages(). + */ + pgoff_t oti_next_index; + pgoff_t oti_fn_index; /* first non-overlapped index */ }; struct osc_object { @@ -171,6 +176,13 @@ struct osc_object { /** Protect extent tree. Will be used to protect * oo_{read|write}_pages soon. */ spinlock_t oo_lock; + + /** + * Radix tree for caching pages + */ + struct radix_tree_root oo_tree; + spinlock_t oo_tree_lock; + unsigned long oo_npages; }; static inline void osc_object_lock(struct osc_object *obj) @@ -571,6 +583,11 @@ static inline struct osc_page *oap2osc_page(struct osc_async_page *oap) { return (struct osc_page *)container_of(oap, struct osc_page, ops_oap); } +static inline pgoff_t osc_index(struct osc_page *opg) +{ + return opg->ops_cl.cpl_page->cp_index; +} + static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice) { LINVRNT(osc_is_object(&slice->cls_obj->co_lu)); @@ -685,6 +702,14 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, int sent, int rc); int osc_extent_release(const struct lu_env *env, struct osc_extent *ext); +int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *lock); + +typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *, + struct osc_page *, void *); +int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, + struct osc_object *osc, pgoff_t start, pgoff_t end, + osc_page_gang_cbt cb, void *cbdata); + /** @} osc */ #endif /* OSC_CL_INTERNAL_H */ diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index bb5b6dd..a16fb81 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -398,18 +398,13 @@ static int osc_async_upcall(void *a, int rc) * Checks that there are no pages being written in the extent being truncated.
*/ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, void *cbdata) + struct osc_page *ops, void *cbdata) { - const struct cl_page_slice *slice; - struct osc_page *ops; + struct cl_page *page = ops->ops_cl.cpl_page; struct osc_async_page *oap; __u64 start = *(__u64 *)cbdata; - slice = cl_page_at(page, &osc_device_type); - LASSERT(slice != NULL); - ops = cl2osc_page(slice); oap = &ops->ops_oap; - if (oap->oap_cmd & OBD_BRW_WRITE && !cfs_list_empty(&oap->oap_pending_item)) CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n", @@ -442,8 +437,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, /* * Complain if there are pages in the truncated region. */ - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, - trunc_check_cb, (void *)&size); + osc_page_gang_lookup(env, io, cl2osc(clob), + start + partial, CL_PAGE_EOF, + trunc_check_cb, (void *)&size); } #else /* __KERNEL__ */ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 839dfa5..78f3ceb 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -36,6 +36,7 @@ * Implementation of cl_lock for OSC layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_OSC @@ -910,11 +911,8 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) static unsigned long osc_lock_weigh(const struct lu_env *env, const struct cl_lock_slice *slice) { - /* - * don't need to grab coh_page_guard since we don't care the exact # - * of pages.. - */ - return cl_object_header(slice->cls_obj)->coh_pages; + /* TODO: check how many pages are covered by this lock */ + return cl2osc(slice->cls_obj)->oo_npages; } static void osc_lock_build_einfo(const struct lu_env *env, @@ -1290,7 +1288,7 @@ static int osc_lock_flush(struct osc_lock *ols, int discard) result = 0; } - rc = cl_lock_discard_pages(env, lock); + rc = osc_lock_discard_pages(env, ols); if (result == 0 && rc < 0) result = rc; @@ -1360,23 +1358,23 @@ static void osc_lock_cancel(const struct lu_env *env, #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK static int check_cb(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, void *cbdata) + struct osc_page *ops, void *cbdata) { - struct cl_lock *lock = cbdata; - - if (lock->cll_descr.cld_mode == CLM_READ) { - struct cl_lock *tmp; - tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj, - page, lock, 1, 0); - if (tmp != NULL) { - cl_lock_put(env, tmp); - return CLP_GANG_OKAY; - } - } + struct cl_lock *lock = cbdata; + + if (lock->cll_descr.cld_mode == CLM_READ) { + struct cl_lock *tmp; + tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, + osc_index(ops), lock, 1, 0); + if (tmp != NULL) { + cl_lock_put(env, tmp); + return CLP_GANG_OKAY; + } + } - CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n"); - CL_PAGE_DEBUG(D_ERROR, env, page, "\n"); - return CLP_GANG_ABORT; + CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n"); + CL_PAGE_DEBUG(D_ERROR, env, ops->ops_cl.cpl_page, "\n"); + return CLP_GANG_ABORT; } /** @@ -1410,9 +1408,9 @@ static int osc_lock_has_pages(struct osc_lock *olck) io->ci_ignore_layout = 1; cl_io_init(env, io, CIT_MISC, io->ci_obj); do { - result = cl_page_gang_lookup(env, obj, io, - descr->cld_start, descr->cld_end, - check_cb, (void *)lock); + result = osc_page_gang_lookup(env, io, oob, + descr->cld_start, descr->cld_end, + check_cb, (void *)lock); if (result == CLP_GANG_ABORT) break; if (result ==
CLP_GANG_RESCHED) diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c index 8d6eec6..91e3798 100644 --- a/lustre/osc/osc_object.c +++ b/lustre/osc/osc_object.c @@ -36,6 +36,7 @@ * Implementation of cl_object for OSC layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_OSC @@ -97,6 +98,8 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj, cfs_atomic_set(&osc->oo_nr_reads, 0); cfs_atomic_set(&osc->oo_nr_writes, 0); spin_lock_init(&osc->oo_lock); + spin_lock_init(&osc->oo_tree_lock); + INIT_RADIX_TREE(&osc->oo_tree, GFP_ATOMIC); cl_object_page_init(lu2cl(obj), sizeof(struct osc_page)); diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index b7296ad..a673b91 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -36,6 +36,7 @@ * Implementation of cl_page for OSC layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_OSC @@ -435,6 +436,18 @@ static void osc_page_delete(const struct lu_env *env, osc_lru_del(osc_cli(obj), opg); + if (slice->cpl_page->cp_type == CPT_CACHEABLE) { + void *value; + + spin_lock(&obj->oo_tree_lock); + value = radix_tree_delete(&obj->oo_tree, osc_index(opg)); + if (value != NULL) + --obj->oo_npages; + spin_unlock(&obj->oo_tree_lock); + + LASSERT(ergo(value != NULL, value == opg)); + } + EXIT; } @@ -503,7 +516,7 @@ static const struct cl_page_operations osc_page_ops = { }; int osc_page_init(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, struct page *vmpage) + struct cl_page *page, struct page *vmpage) { struct osc_object *osc = cl2osc(obj); struct osc_page *opg = cl_object_page_slice(obj, page); @@ -533,8 +546,18 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj, CFS_INIT_LIST_HEAD(&opg->ops_lru); /* reserve an LRU space for this page */ - if (page->cp_type == CPT_CACHEABLE && result == 0) + if (page->cp_type == CPT_CACHEABLE && result == 0) { result = osc_lru_reserve(env, osc, opg); + if (result == 0) { + spin_lock(&osc->oo_tree_lock); + result = radix_tree_insert(&osc->oo_tree, + page->cp_index, opg); + if (result == 0) + ++osc->oo_npages; + spin_unlock(&osc->oo_tree_lock); + LASSERT(result == 0); + } + } return result; } @@ -744,7 +767,6 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io, struct cl_page *page = pvec[i]; LASSERT(cl_page_is_owned(page, io)); - cl_page_unmap(env, io, page); cl_page_discard(env, io, page); cl_page_disown(env, io, page); cl_page_put(env, page); @@ -785,7 +807,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli, cfs_atomic_inc(&cli->cl_lru_shrinkers); } - pvec = osc_env_info(env)->oti_pvec; + pvec = (struct cl_page **)osc_env_info(env)->oti_pvec; io = &osc_env_info(env)->oti_io; client_obd_list_lock(&cli->cl_lru_list_lock); -- 1.8.3.1
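
Note: the heart of this change is replacing the single radix tree in cl_object_header (guarded by coh_page_guard) with a per-object tree in struct osc_object (oo_tree, guarded by oo_tree_lock), so cached pages are tracked only at the layer that owns them. The sketch below is illustrative and not part of the patch: it shows the same caching pattern in isolation using the stock Linux radix-tree API. The pcache/pcache_page names and helpers are hypothetical, and a real walker must also take a reference on each entry before dropping the lock, as osc_page_gang_lookup() does with cl_page_get(); this sketch omits that.

#include <linux/gfp.h>
#include <linux/kernel.h>
#include <linux/radix-tree.h>
#include <linux/spinlock.h>
#include <linux/types.h>

struct pcache_page {
	pgoff_t pp_index;		/* page index, like cp_index above */
	/* layer-private state would follow */
};

struct pcache {
	spinlock_t		pc_lock;	/* protects pc_tree, pc_npages */
	struct radix_tree_root	pc_tree;	/* index -> pcache_page */
	unsigned long		pc_npages;	/* like oo_npages above */
};

static void pcache_init(struct pcache *pc)
{
	spin_lock_init(&pc->pc_lock);
	/* GFP_ATOMIC: inserts happen with pc_lock held */
	INIT_RADIX_TREE(&pc->pc_tree, GFP_ATOMIC);
	pc->pc_npages = 0;
}

static int pcache_add(struct pcache *pc, struct pcache_page *pp)
{
	int rc;

	spin_lock(&pc->pc_lock);
	rc = radix_tree_insert(&pc->pc_tree, pp->pp_index, pp);
	if (rc == 0)
		pc->pc_npages++;
	spin_unlock(&pc->pc_lock);
	return rc;	/* -EEXIST if the index is already cached */
}

static void pcache_del(struct pcache *pc, pgoff_t index)
{
	spin_lock(&pc->pc_lock);
	if (radix_tree_delete(&pc->pc_tree, index) != NULL)
		pc->pc_npages--;
	spin_unlock(&pc->pc_lock);
}

/*
 * Batched range walk with the same shape as osc_page_gang_lookup()
 * above: the spinlock is dropped between batches so the callback may
 * block (e.g. to own a page), which is the point of the gang lookup.
 */
static void pcache_walk(struct pcache *pc, pgoff_t start, pgoff_t end,
			void (*cb)(struct pcache_page *pp, void *data),
			void *data)
{
	struct pcache_page *pvec[16];
	pgoff_t idx = start;
	unsigned int nr, i;
	bool done = false;

	while (!done) {
		spin_lock(&pc->pc_lock);
		nr = radix_tree_gang_lookup(&pc->pc_tree, (void **)pvec,
					    idx, ARRAY_SIZE(pvec));
		spin_unlock(&pc->pc_lock);
		if (nr == 0)
			break;
		for (i = 0; i < nr; i++) {
			/* indices may be sparse: re-derive the resume
			 * point from the entry, not by counting */
			if (pvec[i]->pp_index > end) {
				done = true;
				break;
			}
			idx = pvec[i]->pp_index + 1;
			cb(pvec[i], data);
		}
		if (nr < ARRAY_SIZE(pvec))
			done = true;	/* tree exhausted */
	}
}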