LU-3321 clio: collapse layer of cl_page
author Jinshan Xiong <jinshan.xiong@intel.com>
Mon, 30 Sep 2013 22:00:38 +0000 (15:00 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Fri, 15 Nov 2013 06:55:39 +0000 (06:55 +0000)
Move radix tree to osc layer for performance improvement.
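
With the per-object radix tree gone from cl_object_header, a cached page
is found through the kernel VM page cache instead: look the vmpage up in
the inode's mapping, then map it back to its cl_page through the
vmpage->private back-pointer. A minimal sketch of the pattern, distilled
from the vvp_pgcache hunks below (the helper name pgcache_lookup and the
trimmed error handling are illustrative, not part of this patch):

static struct cl_page *pgcache_lookup(struct inode *inode,
                                      struct cl_object *clob, pgoff_t index)
{
        struct cl_page *page = NULL;
        struct page *vmpage;

        /* grab one vmpage at @index from the VM page cache, if present */
        if (find_get_pages_contig(inode->i_mapping, index, 1, &vmpage) > 0) {
                lock_page(vmpage);
                /* returns the cl_page with a reference held, or NULL if
                 * the vmpage no longer belongs to @clob */
                page = cl_vmpage_page(vmpage, clob);
                unlock_page(vmpage);
                page_cache_release(vmpage);
        }
        return page;    /* caller releases it with cl_page_put() */
}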

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I93e3cb8352f7be41c23465b12945874316aa1809
Reviewed-on: http://review.whamcloud.com/7892
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
19 files changed:
lustre/include/cl_object.h
lustre/llite/rw.c
lustre/llite/rw26.c
lustre/llite/vvp_dev.c
lustre/llite/vvp_io.c
lustre/llite/vvp_object.c
lustre/llite/vvp_page.c
lustre/lov/lov_object.c
lustre/lov/lov_page.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_lock.c
lustre/obdclass/cl_object.c
lustre/obdclass/cl_page.c
lustre/osc/osc_cache.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_object.c
lustre/osc/osc_page.c

index cfaa8fc..f57e2ac 100644 (file)
@@ -387,6 +387,12 @@ struct cl_object_operations {
          */
         int (*coo_glimpse)(const struct lu_env *env,
                            const struct cl_object *obj, struct ost_lvb *lvb);
+       /**
+        * Object prune method. Called when the layout is going to change on
+        * this object, therefore each layer has to clean up their cache,
+        * mainly pages and locks.
+        */
+       int (*coo_prune)(const struct lu_env *env, struct cl_object *obj);
 };
 
 /**
@@ -401,15 +407,9 @@ struct cl_object_header {
          * mostly useless otherwise.
          */
         /** @{ */
-        /** Lock protecting page tree. */
-       spinlock_t               coh_page_guard;
        /** Lock protecting lock list. */
        spinlock_t               coh_lock_guard;
         /** @} locks */
-        /** Radix tree of cl_page's, cached for this object. */
-        struct radix_tree_root   coh_tree;
-        /** # of pages in radix tree. */
-        unsigned long            coh_pages;
         /** List of cl_lock's granted for this object. */
         cfs_list_t               coh_locks;
 
@@ -891,14 +891,6 @@ struct cl_page_operations {
         void  (*cpo_export)(const struct lu_env *env,
                             const struct cl_page_slice *slice, int uptodate);
         /**
-         * Unmaps page from the user space (if it is mapped).
-         *
-         * \see cl_page_unmap()
-         * \see vvp_page_unmap()
-         */
-        int (*cpo_unmap)(const struct lu_env *env,
-                         const struct cl_page_slice *slice, struct cl_io *io);
-        /**
          * Checks whether underlying VM page is locked (in the suitable
          * sense). Used for assertions.
          *
@@ -2787,25 +2779,16 @@ enum {
         CLP_GANG_AGAIN,
         CLP_GANG_ABORT
 };
-
 /* callback of cl_page_gang_lookup() */
-typedef int   (*cl_page_gang_cb_t)  (const struct lu_env *, struct cl_io *,
-                                     struct cl_page *, void *);
-int             cl_page_gang_lookup (const struct lu_env *env,
-                                     struct cl_object *obj,
-                                     struct cl_io *io,
-                                     pgoff_t start, pgoff_t end,
-                                     cl_page_gang_cb_t cb, void *cbdata);
-struct cl_page *cl_page_lookup      (struct cl_object_header *hdr,
-                                     pgoff_t index);
+
 struct cl_page *cl_page_find        (const struct lu_env *env,
                                      struct cl_object *obj,
                                      pgoff_t idx, struct page *vmpage,
                                      enum cl_page_type type);
-struct cl_page *cl_page_find_sub    (const struct lu_env *env,
-                                     struct cl_object *obj,
-                                     pgoff_t idx, struct page *vmpage,
-                                     struct cl_page *parent);
+struct cl_page *cl_page_alloc       (const struct lu_env *env,
+                                    struct cl_object *o, pgoff_t ind,
+                                    struct page *vmpage,
+                                    enum cl_page_type type);
 void            cl_page_get         (struct cl_page *page);
 void            cl_page_put         (const struct lu_env *env,
                                      struct cl_page *page);
@@ -2876,8 +2859,6 @@ int  cl_page_flush      (const struct lu_env *env, struct cl_io *io,
 void    cl_page_discard      (const struct lu_env *env, struct cl_io *io,
                               struct cl_page *pg);
 void    cl_page_delete       (const struct lu_env *env, struct cl_page *pg);
-int     cl_page_unmap        (const struct lu_env *env, struct cl_io *io,
-                              struct cl_page *pg);
 int     cl_page_is_vmlocked  (const struct lu_env *env,
                               const struct cl_page *pg);
 void    cl_page_export       (const struct lu_env *env,
@@ -3158,8 +3139,6 @@ void cl_page_list_assume (const struct lu_env *env,
                           struct cl_io *io, struct cl_page_list *plist);
 void cl_page_list_discard(const struct lu_env *env,
                           struct cl_io *io, struct cl_page_list *plist);
-int  cl_page_list_unmap  (const struct lu_env *env,
-                          struct cl_io *io, struct cl_page_list *plist);
 void cl_page_list_fini   (const struct lu_env *env, struct cl_page_list *plist);
 
 void cl_2queue_init     (struct cl_2queue *queue);
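
The new coo_prune method added above gives each layer a hook to drop its
cached pages before a layout change. A layer is expected to flush dirty
data first and only then evict; a minimal sketch modeled on the
vvp_prune() introduced later in this patch (the name example_prune is
illustrative, and the ENTRY/RETURN debug macros are trimmed):

static int example_prune(const struct lu_env *env, struct cl_object *obj)
{
        struct inode *inode = ccc_object_inode(obj);
        int rc;

        /* write out and wait for all dirty pages under the old layout */
        rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_ALL, 1);
        if (rc == 0)
                /* then drop everything from the VM page cache */
                truncate_inode_pages(inode->i_mapping, 0);
        return rc;
}

cl_object_prune() in lustre/obdclass/cl_object.c below walks the layers
top to bottom calling coo_prune, then prunes the locks.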
index ec52fc4..bab5c65 100644 (file)
@@ -483,7 +483,7 @@ static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                        cl_page_list_add(queue, page);
                        rc = 1;
                } else {
-                       cl_page_delete(env, page);
+                       cl_page_discard(env, io, page);
                        rc = -ENOLCK;
                }
        } else {
index b97edd0..0a4b9a9 100644 (file)
@@ -101,11 +101,7 @@ static void ll_invalidatepage(struct page *vmpage, unsigned long offset)
                         if (obj != NULL) {
                                 page = cl_vmpage_page(vmpage, obj);
                                 if (page != NULL) {
-                                        lu_ref_add(&page->cp_reference,
-                                                   "delete", vmpage);
                                         cl_page_delete(env, page);
-                                        lu_ref_del(&page->cp_reference,
-                                                   "delete", vmpage);
                                         cl_page_put(env, page);
                                 }
                         } else
index 9e5d09d..8b9e09a 100644 (file)
@@ -36,6 +36,7 @@
  * cl_device and cl_device_type implementation for VVP layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
@@ -361,23 +362,18 @@ static loff_t vvp_pgcache_find(const struct lu_env *env,
                        return ~0ULL;
                clob = vvp_pgcache_obj(env, dev, &id);
                if (clob != NULL) {
-                       struct cl_object_header *hdr;
-                       int                      nr;
-                       struct cl_page          *pg;
+                       struct inode *inode = ccc_object_inode(clob);
+                       struct page *vmpage;
+                       int nr;
 
-                       /* got an object. Find next page. */
-                       hdr = cl_object_header(clob);
-
-                       spin_lock(&hdr->coh_page_guard);
-                       nr = radix_tree_gang_lookup(&hdr->coh_tree,
-                                                   (void **)&pg,
-                                                   id.vpi_index, 1);
+                       nr = find_get_pages_contig(inode->i_mapping,
+                                                  id.vpi_index, 1, &vmpage);
                        if (nr > 0) {
-                               id.vpi_index = pg->cp_index;
+                               id.vpi_index = vmpage->index;
                                /* Cant support over 16T file */
-                               nr = !(pg->cp_index > 0xffffffff);
+                               nr = !(vmpage->index > 0xffffffff);
+                               page_cache_release(vmpage);
                        }
-                       spin_unlock(&hdr->coh_page_guard);
 
                        lu_object_ref_del(&clob->co_lu, "dump", current);
                        cl_object_put(env, clob);
@@ -436,8 +432,6 @@ static int vvp_pgcache_show(struct seq_file *f, void *v)
        struct ll_sb_info       *sbi;
        struct cl_object        *clob;
        struct lu_env           *env;
-       struct cl_page          *page;
-       struct cl_object_header *hdr;
        struct vvp_pgcache_id    id;
        int                      refcheck;
        int                      result;
@@ -449,19 +443,27 @@ static int vvp_pgcache_show(struct seq_file *f, void *v)
                sbi = f->private;
                clob = vvp_pgcache_obj(env, &sbi->ll_cl->cd_lu_dev, &id);
                if (clob != NULL) {
-                       hdr = cl_object_header(clob);
-
-                       spin_lock(&hdr->coh_page_guard);
-                       page = cl_page_lookup(hdr, id.vpi_index);
+                       struct inode *inode = ccc_object_inode(clob);
+                       struct cl_page *page = NULL;
+                       struct page *vmpage;
+
+                       result = find_get_pages_contig(inode->i_mapping,
+                                                     id.vpi_index, 1, &vmpage);
+                       if (result > 0) {
+                               lock_page(vmpage);
+                               page = cl_vmpage_page(vmpage, clob);
+                               unlock_page(vmpage);
+
+                               page_cache_release(vmpage);
+                       }
 
-                       seq_printf(f, "%8x@"DFID": ",
-                                  id.vpi_index, PFID(&hdr->coh_lu.loh_fid));
+                       seq_printf(f, "%8x@"DFID": ", id.vpi_index,
+                                  PFID(lu_object_fid(&clob->co_lu)));
                        if (page != NULL) {
                                vvp_pgcache_page_show(env, f, page);
                                cl_page_put(env, page);
                        } else
                                seq_puts(f, "missing\n");
-                       spin_unlock(&hdr->coh_page_guard);
                        lu_object_ref_del(&clob->co_lu, "dump", current);
                        cl_object_put(env, clob);
                } else
index b10de06..82b8b91 100644 (file)
@@ -781,7 +781,6 @@ static int vvp_io_fault_start(const struct lu_env *env,
 
                        vmpage = NULL;
                        if (result < 0) {
-                               cl_page_unmap(env, io, page);
                                cl_page_discard(env, io, page);
                                cl_page_disown(env, io, page);
 
index 65a6e45..92c7763 100644 (file)
@@ -153,14 +153,28 @@ int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
        return 0;
 }
 
+static int vvp_prune(const struct lu_env *env, struct cl_object *obj)
+{
+       struct inode *inode = ccc_object_inode(obj);
+       int rc;
+       ENTRY;
+
+       rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_ALL, 1);
+       if (rc == 0)
+               truncate_inode_pages(inode->i_mapping, 0);
+
+       RETURN(rc);
+}
+
 static const struct cl_object_operations vvp_ops = {
-        .coo_page_init = vvp_page_init,
-        .coo_lock_init = vvp_lock_init,
-        .coo_io_init   = vvp_io_init,
-        .coo_attr_get  = vvp_attr_get,
-        .coo_attr_set  = vvp_attr_set,
-        .coo_conf_set  = vvp_conf_set,
-        .coo_glimpse   = ccc_object_glimpse
+       .coo_page_init = vvp_page_init,
+       .coo_lock_init = vvp_lock_init,
+       .coo_io_init   = vvp_io_init,
+       .coo_attr_get  = vvp_attr_get,
+       .coo_attr_set  = vvp_attr_set,
+       .coo_conf_set  = vvp_conf_set,
+       .coo_prune     = vvp_prune,
+       .coo_glimpse   = ccc_object_glimpse
 };
 
 static const struct lu_object_operations vvp_lu_obj_ops = {
index 1323b36..76bed3c 100644 (file)
@@ -139,62 +139,55 @@ static void vvp_page_discard(const struct lu_env *env,
                              const struct cl_page_slice *slice,
                              struct cl_io *unused)
 {
-       struct page           *vmpage  = cl2vm_page(slice);
+       struct page          *vmpage  = cl2vm_page(slice);
        struct address_space *mapping;
        struct ccc_page      *cpg     = cl2ccc_page(slice);
+       __u64 offset;
 
-        LASSERT(vmpage != NULL);
-        LASSERT(PageLocked(vmpage));
+       LASSERT(vmpage != NULL);
+       LASSERT(PageLocked(vmpage));
 
        mapping = vmpage->mapping;
 
-        if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
-                ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
-
-        /*
-         * truncate_complete_page() calls
-         * a_ops->invalidatepage()->cl_page_delete()->vvp_page_delete().
-         */
-        truncate_complete_page(mapping, vmpage);
-}
-
-static int vvp_page_unmap(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         struct cl_io *unused)
-{
-       struct page *vmpage = cl2vm_page(slice);
-       __u64       offset;
-
-       LASSERT(vmpage != NULL);
-       LASSERT(PageLocked(vmpage));
+       if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
+               ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
 
-       offset = vmpage->index << PAGE_CACHE_SHIFT;
+       offset = vmpage->index << PAGE_SHIFT;
+       ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_SIZE);
 
        /*
-        * XXX is it safe to call this with the page lock held?
+        * truncate_complete_page() calls
+        * a_ops->invalidatepage()->cl_page_delete()->vvp_page_delete().
         */
-       ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_CACHE_SIZE);
-       return 0;
+       truncate_complete_page(mapping, vmpage);
 }
 
 static void vvp_page_delete(const struct lu_env *env,
                             const struct cl_page_slice *slice)
 {
-       struct page       *vmpage = cl2vm_page(slice);
+       struct page      *vmpage = cl2vm_page(slice);
        struct inode     *inode  = vmpage->mapping->host;
        struct cl_object *obj    = slice->cpl_obj;
+       struct cl_page   *page   = slice->cpl_page;
+       int refc;
 
 
-        LASSERT((struct cl_page *)vmpage->private == slice->cpl_page);
-        LASSERT(inode == ccc_object_inode(obj));
+       LASSERT(PageLocked(vmpage));
+       LASSERT((struct cl_page *)vmpage->private == page);
+       LASSERT(inode == ccc_object_inode(obj));
 
 
-        ClearPagePrivate(vmpage);
-        vmpage->private = 0;
-        /*
-         * Reference from vmpage to cl_page is removed, but the reference back
-         * is still here. It is removed later in vvp_page_fini().
-         */
+       vvp_write_complete(cl2ccc(obj), cl2ccc_page(slice));
+
+       /* Drop the reference count held in vvp_page_init */
+       refc = atomic_dec_return(&page->cp_ref);
+       LASSERTF(refc >= 1, "page = %p, refc = %d\n", page, refc);
+
+       ClearPageUptodate(vmpage);
+       ClearPagePrivate(vmpage);
+       vmpage->private = 0;
+       /*
+        * Reference from vmpage to cl_page is removed, but the reference back
+        * is still here. It is removed later in vvp_page_fini().
+        */
 }
 
 static void vvp_page_export(const struct lu_env *env,
@@ -408,7 +401,6 @@ static const struct cl_page_operations vvp_page_ops = {
         .cpo_vmpage        = ccc_page_vmpage,
         .cpo_discard       = vvp_page_discard,
         .cpo_delete        = vvp_page_delete,
-        .cpo_unmap         = vvp_page_unmap,
         .cpo_export        = vvp_page_export,
         .cpo_is_vmlocked   = vvp_page_is_vmlocked,
         .cpo_fini          = vvp_page_fini,
@@ -545,6 +537,8 @@ int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
 
        CFS_INIT_LIST_HEAD(&cpg->cpg_pending_linkage);
        if (page->cp_type == CPT_CACHEABLE) {
+               /* in cache, decref in vvp_page_delete */
+               atomic_inc(&page->cp_ref);
                SetPagePrivate(vmpage);
                vmpage->private = (unsigned long)page;
                cl_page_slice_add(page, &cpg->cpg_cl, obj,
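
The extra cp_ref that cl_page_alloc() used to take for the radix tree is
now taken here in vvp_page_init() for the vmpage->private back-pointer
and dropped in vvp_page_delete() above. A hypothetical check (not part
of the patch) stating the resulting invariant for a cacheable page:

static void cacheable_page_check(struct cl_page *page, struct page *vmpage)
{
        if (page->cp_type == CPT_CACHEABLE && PagePrivate(vmpage)) {
                /* vmpage->private points back at the cl_page... */
                LASSERT((struct cl_page *)vmpage->private == page);
                /* ...and that back-pointer itself pins one reference */
                LASSERT(atomic_read(&page->cp_ref) >= 1);
        }
}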
index ae80756..8e1cd43 100644 (file)
@@ -284,7 +284,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
 
        lov_layout_wait(env, lov);
 
-       cl_object_prune(env, &lov->lo_cl);
+       cl_locks_prune(env, &lov->lo_cl, 0);
        return 0;
 }
 
@@ -359,9 +359,9 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
                                  */
                                 lov_subobject_kill(env, lov, los, i);
                        }
-                }
-        }
-       cl_object_prune(env, &lov->lo_cl);
+               }
+       }
+       cl_locks_prune(env, &lov->lo_cl, 0);
        RETURN(0);
 }
 
@@ -669,7 +669,6 @@ static int lov_layout_change(const struct lu_env *unused,
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
 
-       struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
        void *cookie;
        struct lu_env *env;
        int refcheck;
@@ -695,13 +694,13 @@ static int lov_layout_change(const struct lu_env *unused,
        old_ops = &lov_dispatch[lov->lo_type];
        new_ops = &lov_dispatch[llt];
 
+       cl_object_prune(env, &lov->lo_cl);
+
        result = old_ops->llo_delete(env, lov, &lov->u);
        if (result == 0) {
                old_ops->llo_fini(env, lov, &lov->u);
 
                LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
-               LASSERT(hdr->coh_tree.rnode == NULL);
-               LASSERT(hdr->coh_pages == 0);
 
                lov->lo_type = LLT_EMPTY;
                result = new_ops->llo_init(env,
index c3c9915..85edd0e 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_page for LOV layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOV
@@ -178,39 +179,29 @@ int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
        stripe = lov_stripe_number(loo->lo_lsm, offset);
        LASSERT(stripe < r0->lo_nr);
        rc = lov_stripe_offset(loo->lo_lsm, offset, stripe,
-                                   &suboff);
-        LASSERT(rc == 0);
-
-        lpg->lps_invalid = 1;
-        cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
-
-        sub = lov_sub_get(env, lio, stripe);
-        if (IS_ERR(sub))
-                GOTO(out, rc = PTR_ERR(sub));
-
-        subobj = lovsub2cl(r0->lo_sub[stripe]);
-        subpage = cl_page_find_sub(sub->sub_env, subobj,
-                                   cl_index(subobj, suboff), vmpage, page);
-        lov_sub_put(sub);
-        if (IS_ERR(subpage))
-                GOTO(out, rc = PTR_ERR(subpage));
-
-        if (likely(subpage->cp_parent == page)) {
-                lu_ref_add(&subpage->cp_reference, "lov", page);
-                lpg->lps_invalid = 0;
-               rc = 0;
-        } else {
-                CL_PAGE_DEBUG(D_ERROR, env, page, "parent page\n");
-                CL_PAGE_DEBUG(D_ERROR, env, subpage, "child page\n");
-                LASSERT(0);
-        }
-
-        EXIT;
-out:
-        return rc;
+                              &suboff);
+       LASSERT(rc == 0);
+
+       lpg->lps_invalid = 1;
+       cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
+
+       sub = lov_sub_get(env, lio, stripe);
+       if (IS_ERR(sub))
+               RETURN(PTR_ERR(sub));
+
+       subobj = lovsub2cl(r0->lo_sub[stripe]);
+       subpage = cl_page_alloc(sub->sub_env, subobj, cl_index(subobj, suboff),
+                               vmpage, page->cp_type);
+       if (!IS_ERR(subpage)) {
+               subpage->cp_parent = page;
+               page->cp_child = subpage;
+               lpg->lps_invalid = 0;
+       } else
+               rc = PTR_ERR(subpage);
+       lov_sub_put(sub);
+       RETURN(rc);
 }
 
-
 static const struct cl_page_operations lov_empty_page_ops = {
         .cpo_fini   = lov_empty_page_fini,
         .cpo_print  = lov_page_print
index af0fca0..b7ddaed 100644 (file)
@@ -36,6 +36,7 @@
  * Client IO.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_CLASS
@@ -1287,27 +1288,6 @@ void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
 EXPORT_SYMBOL(cl_page_list_discard);
 
 /**
- * Unmaps all pages in a queue from user virtual memory.
- */
-int cl_page_list_unmap(const struct lu_env *env, struct cl_io *io,
-                      struct cl_page_list *plist)
-{
-       struct cl_page *page;
-       int result;
-
-       LINVRNT(plist->pl_owner == current);
-       ENTRY;
-       result = 0;
-       cl_page_list_for_each(page, plist) {
-               result = cl_page_unmap(env, io, page);
-               if (result != 0)
-                       break;
-       }
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_page_list_unmap);
-
-/**
  * Initialize dual page queue.
  */
 void cl_2queue_init(struct cl_2queue *queue)
index ff5e6ee..a2a1624 100644 (file)
@@ -36,6 +36,7 @@
  * Client Extent Lock.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_CLASS
@@ -1894,129 +1895,6 @@ struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env,
 EXPORT_SYMBOL(cl_lock_at_pgoff);
 
 /**
- * Calculate the page offset at the layer of @lock.
- * At the time of this writing, @page is top page and @lock is sub lock.
- */
-static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
-{
-        struct lu_device_type *dtype;
-        const struct cl_page_slice *slice;
-
-        dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
-        slice = cl_page_at(page, dtype);
-        LASSERT(slice != NULL);
-        return slice->cpl_page->cp_index;
-}
-
-/**
- * Check if page @page is covered by an extra lock or discard it.
- */
-static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
-                                struct cl_page *page, void *cbdata)
-{
-        struct cl_thread_info *info = cl_env_info(env);
-        struct cl_lock *lock = cbdata;
-        pgoff_t index = pgoff_at_lock(page, lock);
-
-        if (index >= info->clt_fn_index) {
-                struct cl_lock *tmp;
-
-               /* refresh non-overlapped index */
-               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
-                                       lock, 1, 0);
-                if (tmp != NULL) {
-                        /* Cache the first-non-overlapped index so as to skip
-                         * all pages within [index, clt_fn_index). This
-                         * is safe because if tmp lock is canceled, it will
-                         * discard these pages. */
-                        info->clt_fn_index = tmp->cll_descr.cld_end + 1;
-                        if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
-                                info->clt_fn_index = CL_PAGE_EOF;
-                        cl_lock_put(env, tmp);
-                } else if (cl_page_own(env, io, page) == 0) {
-                        /* discard the page */
-                        cl_page_unmap(env, io, page);
-                        cl_page_discard(env, io, page);
-                        cl_page_disown(env, io, page);
-                } else {
-                        LASSERT(page->cp_state == CPS_FREEING);
-                }
-        }
-
-        info->clt_next_index = index + 1;
-        return CLP_GANG_OKAY;
-}
-
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
-                      struct cl_page *page, void *cbdata)
-{
-       struct cl_thread_info *info = cl_env_info(env);
-       struct cl_lock *lock   = cbdata;
-
-       LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
-       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
-                     !PageWriteback(cl_page_vmpage(env, page))));
-       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
-                     !PageDirty(cl_page_vmpage(env, page))));
-
-       info->clt_next_index = pgoff_at_lock(page, lock) + 1;
-       if (cl_page_own(env, io, page) == 0) {
-               /* discard the page */
-               cl_page_unmap(env, io, page);
-               cl_page_discard(env, io, page);
-               cl_page_disown(env, io, page);
-       } else {
-               LASSERT(page->cp_state == CPS_FREEING);
-       }
-
-       return CLP_GANG_OKAY;
-}
-
-/**
- * Discard pages protected by the given lock. This function traverses radix
- * tree to find all covering pages and discard them. If a page is being covered
- * by other locks, it should remain in cache.
- *
- * If error happens on any step, the process continues anyway (the reasoning
- * behind this being that lock cancellation cannot be delayed indefinitely).
- */
-int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock)
-{
-        struct cl_thread_info *info  = cl_env_info(env);
-        struct cl_io          *io    = &info->clt_io;
-        struct cl_lock_descr  *descr = &lock->cll_descr;
-        cl_page_gang_cb_t      cb;
-        int res;
-        int result;
-
-        LINVRNT(cl_lock_invariant(env, lock));
-        ENTRY;
-
-        io->ci_obj = cl_object_top(descr->cld_obj);
-       io->ci_ignore_layout = 1;
-        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
-        if (result != 0)
-                GOTO(out, result);
-
-       cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
-        info->clt_fn_index = info->clt_next_index = descr->cld_start;
-        do {
-                res = cl_page_gang_lookup(env, descr->cld_obj, io,
-                                          info->clt_next_index, descr->cld_end,
-                                          cb, (void *)lock);
-                if (info->clt_next_index > descr->cld_end)
-                        break;
-
-               if (res == CLP_GANG_RESCHED)
-                       cond_resched();
-       } while (res != CLP_GANG_OKAY);
-out:
-       cl_io_fini(env, io);
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_lock_discard_pages);
-
-/**
  * Eliminate all locks for a given object.
  *
  * Caller has to guarantee that no lock is in active use.
@@ -2031,12 +1909,6 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
 
        ENTRY;
        head = cl_object_header(obj);
-       /*
-        * If locks are destroyed without cancellation, all pages must be
-        * already destroyed (as otherwise they will be left unprotected).
-        */
-       LASSERT(ergo(!cancel,
-                    head->coh_tree.rnode == NULL && head->coh_pages == 0));
 
        spin_lock(&head->coh_lock_guard);
        while (!cfs_list_empty(&head->coh_locks)) {
index 8f29926..6ad9b0a 100644 (file)
@@ -36,6 +36,7 @@
  * Client Lustre Object.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 /*
@@ -43,7 +44,6 @@
  *
  *  i_mutex
  *      PG_locked
- *          ->coh_page_guard
  *          ->coh_lock_guard
  *          ->coh_attr_guard
  *          ->ls_guard
@@ -63,8 +63,6 @@
 
 static struct kmem_cache *cl_env_kmem;
 
-/** Lock class of cl_object_header::coh_page_guard */
-static struct lock_class_key cl_page_guard_class;
 /** Lock class of cl_object_header::coh_lock_guard */
 static struct lock_class_key cl_lock_guard_class;
 /** Lock class of cl_object_header::coh_attr_guard */
@@ -82,15 +80,10 @@ int cl_object_header_init(struct cl_object_header *h)
         ENTRY;
         result = lu_object_header_init(&h->coh_lu);
         if (result == 0) {
-               spin_lock_init(&h->coh_page_guard);
                spin_lock_init(&h->coh_lock_guard);
                spin_lock_init(&h->coh_attr_guard);
-               lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
                lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
                lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
-                h->coh_pages = 0;
-                /* XXX hard coded GFP_* mask. */
-                INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
                 CFS_INIT_LIST_HEAD(&h->coh_locks);
                h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
         }
@@ -331,6 +324,33 @@ int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
 EXPORT_SYMBOL(cl_conf_set);
 
 /**
+ * Prunes caches of pages and locks for this object.
+ */
+void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
+{
+       struct lu_object_header *top;
+       struct cl_object *o;
+       int result;
+       ENTRY;
+
+       top = obj->co_lu.lo_header;
+       result = 0;
+       cfs_list_for_each_entry(o, &top->loh_layers, co_lu.lo_linkage) {
+               if (o->co_ops->coo_prune != NULL) {
+                       result = o->co_ops->coo_prune(env, o);
+                       if (result != 0)
+                               break;
+               }
+       }
+
+       /* TODO: pruning locks will be moved into layers after cl_lock
+        * simplification is done */
+       cl_locks_prune(env, obj, 1);
+       EXIT;
+}
+EXPORT_SYMBOL(cl_object_prune);
+
+/**
  * Helper function removing all object locks, and marking object for
  * deletion. All object pages must have been deleted at this point.
  *
@@ -342,8 +362,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
         struct cl_object_header *hdr;
 
         hdr = cl_object_header(obj);
-        LASSERT(hdr->coh_tree.rnode == NULL);
-        LASSERT(hdr->coh_pages == 0);
 
        set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
         /*
@@ -358,18 +376,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
 EXPORT_SYMBOL(cl_object_kill);
 
 /**
- * Prunes caches of pages and locks for this object.
- */
-void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
-{
-        ENTRY;
-        cl_pages_prune(env, obj);
-        cl_locks_prune(env, obj, 1);
-        EXIT;
-}
-EXPORT_SYMBOL(cl_object_prune);
-
-/**
  * Check if the object has locks.
  */
 int cl_object_has_locks(struct cl_object *obj)
index 9e57e09..a8f46ea 100644 (file)
@@ -36,6 +36,7 @@
  * Client Lustre Page.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_CLASS
@@ -48,8 +49,7 @@
 #include <cl_object.h>
 #include "cl_internal.h"
 
-static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
-                            int radix);
+static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
 
 #ifdef LIBCFS_DEBUG
 # define PASSERT(env, page, expr)                                       \
@@ -111,8 +111,7 @@ static struct cl_page *cl_page_top_trusted(struct cl_page *page)
  *
  * This function can be used to obtain initial reference to previously
  * unreferenced cached object. It can be called only if concurrent page
- * reclamation is somehow prevented, e.g., by locking page radix-tree
- * (cl_object_header::hdr->coh_page_guard), or by keeping a lock on a VM page,
+ * reclamation is somehow prevented, e.g., by keeping a lock on a VM page,
  * associated with \a page.
  *
  * Use with care! Not exported.
@@ -147,132 +146,6 @@ cl_page_at_trusted(const struct cl_page *page,
         RETURN(NULL);
 }
 
-/**
- * Returns a page with given index in the given object, or NULL if no page is
- * found. Acquires a reference on \a page.
- *
- * Locking: called under cl_object_header::coh_page_guard spin-lock.
- */
-struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
-{
-       struct cl_page *page;
-
-       LASSERT(spin_is_locked(&hdr->coh_page_guard));
-
-       page = radix_tree_lookup(&hdr->coh_tree, index);
-       if (page != NULL)
-               cl_page_get_trust(page);
-       return page;
-}
-EXPORT_SYMBOL(cl_page_lookup);
-
-/**
- * Returns a list of pages by a given [start, end] of \a obj.
- *
- * \param resched If not NULL, then we give up before hogging CPU for too
- * long and set *resched = 1, in that case caller should implement a retry
- * logic.
- *
- * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
- * crucial in the face of [offset, EOF] locks.
- *
- * Return at least one page in @queue unless there is no covered page.
- */
-int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_io *io, pgoff_t start, pgoff_t end,
-                       cl_page_gang_cb_t cb, void *cbdata)
-{
-        struct cl_object_header *hdr;
-        struct cl_page          *page;
-        struct cl_page         **pvec;
-        const struct cl_page_slice  *slice;
-        const struct lu_device_type *dtype;
-        pgoff_t                  idx;
-        unsigned int             nr;
-        unsigned int             i;
-        unsigned int             j;
-        int                      res = CLP_GANG_OKAY;
-        int                      tree_lock = 1;
-        ENTRY;
-
-        idx = start;
-        hdr = cl_object_header(obj);
-        pvec = cl_env_info(env)->clt_pvec;
-        dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type;
-       spin_lock(&hdr->coh_page_guard);
-        while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
-                                            idx, CLT_PVEC_SIZE)) > 0) {
-                int end_of_region = 0;
-                idx = pvec[nr - 1]->cp_index + 1;
-                for (i = 0, j = 0; i < nr; ++i) {
-                        page = pvec[i];
-                        pvec[i] = NULL;
-
-                        LASSERT(page->cp_type == CPT_CACHEABLE);
-                        if (page->cp_index > end) {
-                                end_of_region = 1;
-                                break;
-                        }
-                        if (page->cp_state == CPS_FREEING)
-                                continue;
-
-                        slice = cl_page_at_trusted(page, dtype);
-                        /*
-                         * Pages for lsm-less file has no underneath sub-page
-                         * for osc, in case of ...
-                         */
-                        PASSERT(env, page, slice != NULL);
-
-                       page = slice->cpl_page;
-                       /*
-                        * Can safely call cl_page_get_trust() under
-                        * radix-tree spin-lock.
-                        *
-                        * XXX not true, because @page is from object another
-                        * than @hdr and protected by different tree lock.
-                        */
-                       cl_page_get_trust(page);
-                       lu_ref_add_atomic(&page->cp_reference,
-                                         "gang_lookup", current);
-                       pvec[j++] = page;
-               }
-
-               /*
-                * Here a delicate locking dance is performed. Current thread
-                * holds a reference to a page, but has to own it before it
-                * can be placed into queue. Owning implies waiting, so
-                * radix-tree lock is to be released. After a wait one has to
-                * check that pages weren't truncated (cl_page_own() returns
-                * error in the latter case).
-                */
-               spin_unlock(&hdr->coh_page_guard);
-               tree_lock = 0;
-
-               for (i = 0; i < j; ++i) {
-                       page = pvec[i];
-                       if (res == CLP_GANG_OKAY)
-                               res = (*cb)(env, io, page, cbdata);
-                       lu_ref_del(&page->cp_reference,
-                                  "gang_lookup", current);
-                       cl_page_put(env, page);
-               }
-               if (nr < CLT_PVEC_SIZE || end_of_region)
-                       break;
-
-               if (res == CLP_GANG_OKAY && need_resched())
-                       res = CLP_GANG_RESCHED;
-               if (res != CLP_GANG_OKAY)
-                       break;
-
-               spin_lock(&hdr->coh_page_guard);
-               tree_lock = 1;
-       }
-       if (tree_lock)
-               spin_unlock(&hdr->coh_page_guard);
-       RETURN(res);
-}
-EXPORT_SYMBOL(cl_page_gang_lookup);
-
 static void cl_page_free(const struct lu_env *env, struct cl_page *page)
 {
         struct cl_object *obj  = page->cp_obj;
@@ -314,7 +187,7 @@ static inline void cl_page_state_set_trust(struct cl_page *page,
         *(enum cl_page_state *)&page->cp_state = state;
 }
 
-static struct cl_page *cl_page_alloc(const struct lu_env *env,
+struct cl_page *cl_page_alloc(const struct lu_env *env,
                struct cl_object *o, pgoff_t ind, struct page *vmpage,
                enum cl_page_type type)
 {
@@ -327,8 +200,6 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env,
        if (page != NULL) {
                int result = 0;
                cfs_atomic_set(&page->cp_ref, 1);
-               if (type == CPT_CACHEABLE) /* for radix tree */
-                       cfs_atomic_inc(&page->cp_ref);
                page->cp_obj = o;
                cl_object_get(o);
                lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
@@ -348,7 +219,7 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env,
                                result = o->co_ops->coo_page_init(env, o,
                                                                  page, vmpage);
                                if (result != 0) {
-                                       cl_page_delete0(env, page, 0);
+                                       cl_page_delete0(env, page);
                                        cl_page_free(env, page);
                                        page = ERR_PTR(result);
                                        break;
@@ -365,6 +236,7 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env,
        }
        RETURN(page);
 }
+EXPORT_SYMBOL(cl_page_alloc);
 
 /**
  * Returns a cl_page with index \a idx at the object \a o, and associated with
@@ -377,16 +249,13 @@ static struct cl_page *cl_page_alloc(const struct lu_env *env,
  *
  * \see cl_object_find(), cl_lock_find()
  */
-static struct cl_page *cl_page_find0(const struct lu_env *env,
-                                     struct cl_object *o,
-                                     pgoff_t idx, struct page *vmpage,
-                                     enum cl_page_type type,
-                                     struct cl_page *parent)
+struct cl_page *cl_page_find(const struct lu_env *env,
+                            struct cl_object *o,
+                            pgoff_t idx, struct page *vmpage,
+                            enum cl_page_type type)
 {
        struct cl_page          *page = NULL;
-       struct cl_page          *ghost = NULL;
        struct cl_object_header *hdr;
-       int err;
 
        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
        might_sleep();
@@ -413,93 +282,20 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
                  *       reference on it.
                  */
                 page = cl_vmpage_page(vmpage, o);
-                PINVRNT(env, page,
-                        ergo(page != NULL,
-                             cl_page_vmpage(env, page) == vmpage &&
-                             (void *)radix_tree_lookup(&hdr->coh_tree,
-                                                       idx) == page));
-        }
-
-        if (page != NULL) {
-               CS_PAGE_INC(o, hit);
-                RETURN(page);
+               if (page != NULL) {
+                       CS_PAGE_INC(o, hit);
+                       RETURN(page);
+               }
         }
 
         /* allocate and initialize cl_page */
         page = cl_page_alloc(env, o, idx, vmpage, type);
-        if (IS_ERR(page))
-                RETURN(page);
-
-        if (type == CPT_TRANSIENT) {
-                if (parent) {
-                        LASSERT(page->cp_parent == NULL);
-                        page->cp_parent = parent;
-                        parent->cp_child = page;
-                }
-                RETURN(page);
-        }
-
-        /*
-         * XXX optimization: use radix_tree_preload() here, and change tree
-         * gfp mask to GFP_KERNEL in cl_object_header_init().
-         */
-       spin_lock(&hdr->coh_page_guard);
-        err = radix_tree_insert(&hdr->coh_tree, idx, page);
-        if (err != 0) {
-                ghost = page;
-                /*
-                 * Noted by Jay: a lock on \a vmpage protects cl_page_find()
-                 * from this race, but
-                 *
-                 *     0. it's better to have cl_page interface "locally
-                 *     consistent" so that its correctness can be reasoned
-                 *     about without appealing to the (obscure world of) VM
-                 *     locking.
-                 *
-                 *     1. handling this race allows ->coh_tree to remain
-                 *     consistent even when VM locking is somehow busted,
-                 *     which is very useful during diagnosing and debugging.
-                 */
-                page = ERR_PTR(err);
-                CL_PAGE_DEBUG(D_ERROR, env, ghost,
-                              "fail to insert into radix tree: %d\n", err);
-        } else {
-                if (parent) {
-                        LASSERT(page->cp_parent == NULL);
-                        page->cp_parent = parent;
-                        parent->cp_child = page;
-                }
-                hdr->coh_pages++;
-        }
-       spin_unlock(&hdr->coh_page_guard);
-
-        if (unlikely(ghost != NULL)) {
-                cl_page_delete0(env, ghost, 0);
-                cl_page_free(env, ghost);
-        }
-        RETURN(page);
-}
-
-struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o,
-                             pgoff_t idx, struct page *vmpage,
-                             enum cl_page_type type)
-{
-        return cl_page_find0(env, o, idx, vmpage, type, NULL);
+       RETURN(page);
 }
 EXPORT_SYMBOL(cl_page_find);
 
-
-struct cl_page *cl_page_find_sub(const struct lu_env *env, struct cl_object *o,
-                                 pgoff_t idx, struct page *vmpage,
-                                 struct cl_page *parent)
-{
-        return cl_page_find0(env, o, idx, vmpage, parent->cp_type, parent);
-}
-EXPORT_SYMBOL(cl_page_find_sub);
-
 static inline int cl_page_invariant(const struct cl_page *pg)
 {
-        struct cl_object_header *header;
         struct cl_page          *parent;
         struct cl_page          *child;
         struct cl_io            *owner;
@@ -509,7 +305,6 @@ static inline int cl_page_invariant(const struct cl_page *pg)
          */
         LINVRNT(cl_page_is_vmlocked(NULL, pg));
 
-        header = cl_object_header(pg->cp_obj);
         parent = pg->cp_parent;
         child  = pg->cp_child;
         owner  = pg->cp_owner;
@@ -522,15 +317,7 @@ static inline int cl_page_invariant(const struct cl_page *pg)
                 ergo(owner != NULL && parent != NULL,
                      parent->cp_owner == pg->cp_owner->ci_parent) &&
                 ergo(owner != NULL && child != NULL,
-                     child->cp_owner->ci_parent == owner) &&
-                /*
-                 * Either page is early in initialization (has neither child
-                 * nor parent yet), or it is in the object radix tree.
-                 */
-                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
-                     (void *)radix_tree_lookup(&header->coh_tree,
-                                               pg->cp_index) == pg ||
-                     (child == NULL && parent == NULL));
+                     child->cp_owner->ci_parent == owner);
 }
 
 static void cl_page_state_set0(const struct lu_env *env,
@@ -1081,10 +868,8 @@ EXPORT_SYMBOL(cl_page_discard);
  * pages, e.g,. in a error handling cl_page_find()->cl_page_delete0()
  * path. Doesn't check page invariant.
  */
-static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
-                            int radix)
+static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg)
 {
-        struct cl_page *tmp = pg;
         ENTRY;
 
         PASSERT(env, pg, pg == cl_page_top(pg));
@@ -1095,42 +880,11 @@ static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
          */
         cl_page_owner_clear(pg);
 
-        /* 
-         * unexport the page firstly before freeing it so that
-         * the page content is considered to be invalid.
-         * We have to do this because a CPS_FREEING cl_page may
-         * be NOT under the protection of a cl_lock.
-         * Afterwards, if this page is found by other threads, then this
-         * page will be forced to reread.
-         */
-        cl_page_export(env, pg, 0);
         cl_page_state_set0(env, pg, CPS_FREEING);
 
-        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
+        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_delete),
                        (const struct lu_env *, const struct cl_page_slice *));
 
-        if (tmp->cp_type == CPT_CACHEABLE) {
-                if (!radix)
-                        /* !radix means that @pg is not yet in the radix tree,
-                         * skip removing it.
-                         */
-                        tmp = pg->cp_child;
-                for (; tmp != NULL; tmp = tmp->cp_child) {
-                        void                    *value;
-                        struct cl_object_header *hdr;
-
-                        hdr = cl_object_header(tmp->cp_obj);
-                       spin_lock(&hdr->coh_page_guard);
-                       value = radix_tree_delete(&hdr->coh_tree,
-                                                 tmp->cp_index);
-                       PASSERT(env, tmp, value == tmp);
-                       PASSERT(env, tmp, hdr->coh_pages > 0);
-                       hdr->coh_pages--;
-                       spin_unlock(&hdr->coh_page_guard);
-                       cl_page_put(env, tmp);
-                }
-        }
-
         EXIT;
 }
 
@@ -1161,33 +915,14 @@ static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
  */
 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
 {
-        PINVRNT(env, pg, cl_page_invariant(pg));
-        ENTRY;
-        cl_page_delete0(env, pg, 1);
-        EXIT;
+       PINVRNT(env, pg, cl_page_invariant(pg));
+       ENTRY;
+       cl_page_delete0(env, pg);
+       EXIT;
 }
 EXPORT_SYMBOL(cl_page_delete);
 
 /**
- * Unmaps page from user virtual memory.
- *
- * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The
- * layer responsible for VM interaction has to unmap page from user space
- * virtual memory.
- *
- * \see cl_page_operations::cpo_unmap()
- */
-int cl_page_unmap(const struct lu_env *env,
-                  struct cl_io *io, struct cl_page *pg)
-{
-        PINVRNT(env, pg, cl_page_is_owned(pg, io));
-        PINVRNT(env, pg, cl_page_invariant(pg));
-
-        return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap));
-}
-EXPORT_SYMBOL(cl_page_unmap);
-
-/**
  * Marks page up-to-date.
  *
  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
@@ -1460,54 +1195,6 @@ int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
 }
 EXPORT_SYMBOL(cl_page_is_under_lock);
 
-static int page_prune_cb(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page, void *cbdata)
-{
-        cl_page_own(env, io, page);
-        cl_page_unmap(env, io, page);
-        cl_page_discard(env, io, page);
-        cl_page_disown(env, io, page);
-        return CLP_GANG_OKAY;
-}
-
-/**
- * Purges all cached pages belonging to the object \a obj.
- */
-int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
-{
-       struct cl_thread_info   *info;
-       struct cl_object        *obj = cl_object_top(clobj);
-       struct cl_io            *io;
-       int                      result;
-
-       ENTRY;
-       info  = cl_env_info(env);
-       io    = &info->clt_io;
-
-       /*
-        * initialize the io. This is ugly since we never do IO in this
-        * function, we just make cl_page_list functions happy. -jay
-        */
-       io->ci_obj = obj;
-       io->ci_ignore_layout = 1;
-       result = cl_io_init(env, io, CIT_MISC, obj);
-       if (result != 0) {
-               cl_io_fini(env, io);
-               RETURN(io->ci_result);
-       }
-
-       do {
-               result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
-                                            page_prune_cb, NULL);
-               if (result == CLP_GANG_RESCHED)
-                       cond_resched();
-       } while (result != CLP_GANG_OKAY);
-
-       cl_io_fini(env, io);
-       RETURN(result);
-}
-EXPORT_SYMBOL(cl_pages_prune);
-
 /**
  * Tells transfer engine that only part of a page is to be transmitted.
  *
index 94cc695..a7697f3 100644 (file)
@@ -969,7 +969,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                lu_ref_add(&page->cp_reference, "truncate", current);
 
                if (cl_page_own(env, io, page) == 0) {
-                       cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
@@ -2197,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 
                cl_object_get(obj);
                client_obd_list_unlock(&cli->cl_loi_list_lock);
-               lu_object_ref_add_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
 
                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
@@ -2239,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                osc_object_unlock(osc);
 
                osc_list_maint(cli, osc);
-               lu_object_ref_del_at(&obj->co_lu, &link, "check",
-                                    current);
+               lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
                cl_object_put(env, obj);
 
                client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -3042,4 +3039,207 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
        RETURN(result);
 }
 
+/**
+ * Invokes callback \a cb on each cached page of \a osc within the index
+ * range [start, end].
+ *
+ * The radix-tree lock is dropped around each callback invocation so that
+ * pages can be owned. If the walk has held the CPU for too long,
+ * CLP_GANG_RESCHED is returned; the caller should cond_resched() and call
+ * again, resuming from the index where the walk stopped (callbacks
+ * typically record it, see oti_next_index).
+ *
+ * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
+ * crucial in the face of [offset, EOF] locks.
+ */
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                       struct osc_object *osc, pgoff_t start, pgoff_t end,
+                       osc_page_gang_cbt cb, void *cbdata)
+{
+       struct osc_page *ops;
+       void            **pvec;
+       pgoff_t         idx;
+       unsigned int    nr;
+       unsigned int    i;
+       unsigned int    j;
+       int             res = CLP_GANG_OKAY;
+       bool            tree_lock = true;
+       ENTRY;
+
+       idx = start;
+       pvec = osc_env_info(env)->oti_pvec;
+       spin_lock(&osc->oo_tree_lock);
+       while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
+                                           idx, OTI_PVEC_SIZE)) > 0) {
+               struct cl_page *page;
+               bool end_of_region = false;
+
+               for (i = 0, j = 0; i < nr; ++i) {
+                       ops = pvec[i];
+                       pvec[i] = NULL;
+
+                       idx = osc_index(ops);
+                       if (idx > end) {
+                               end_of_region = true;
+                               break;
+                       }
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       LASSERT(page->cp_type == CPT_CACHEABLE);
+                       if (page->cp_state == CPS_FREEING)
+                               continue;
+
+                       cl_page_get(page);
+                       lu_ref_add_atomic(&page->cp_reference,
+                                         "gang_lookup", current);
+                       pvec[j++] = ops;
+               }
+               ++idx;
+
+               /*
+                * Here a delicate locking dance is performed. Current thread
+                * holds a reference to a page, but has to own it before it
+                * can be placed into queue. Owning implies waiting, so
+                * radix-tree lock is to be released. After a wait one has to
+                * check that pages weren't truncated (cl_page_own() returns
+                * error in the latter case).
+                */
+               spin_unlock(&osc->oo_tree_lock);
+               tree_lock = false;
+
+               for (i = 0; i < j; ++i) {
+                       ops = pvec[i];
+                       if (res == CLP_GANG_OKAY)
+                               res = (*cb)(env, io, ops, cbdata);
+
+                       page = cl_page_top(ops->ops_cl.cpl_page);
+                       lu_ref_del(&page->cp_reference, "gang_lookup", current);
+                       cl_page_put(env, page);
+               }
+               if (nr < OTI_PVEC_SIZE || end_of_region)
+                       break;
+
+               if (res == CLP_GANG_OKAY && need_resched())
+                       res = CLP_GANG_RESCHED;
+               if (res != CLP_GANG_OKAY)
+                       break;
+
+               spin_lock(&osc->oo_tree_lock);
+               tree_lock = true;
+       }
+       if (tree_lock)
+               spin_unlock(&osc->oo_tree_lock);
+       RETURN(res);
+}
+
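The intended calling pattern is a retry loop that resumes from the last index a callback recorded; osc_lock_discard_pages() below does exactly this through oti_next_index. A minimal self-contained sketch of the same pattern (count_data, count_cb and count_cached_pages are hypothetical names, not part of this patch):

	struct count_data {
		pgoff_t		cd_next;	/* resume point between calls */
		unsigned long	cd_count;
	};

	static int count_cb(const struct lu_env *env, struct cl_io *io,
			    struct osc_page *ops, void *cbdata)
	{
		struct count_data *cd = cbdata;

		cd->cd_next = osc_index(ops) + 1;  /* remember where to resume */
		cd->cd_count++;
		return CLP_GANG_OKAY;              /* keep walking */
	}

	/* Count every cached page of @osc; @io must already be initialized. */
	static unsigned long count_cached_pages(const struct lu_env *env,
						struct cl_io *io,
						struct osc_object *osc)
	{
		struct count_data cd = { .cd_next = 0, .cd_count = 0 };
		int res;

		do {
			res = osc_page_gang_lookup(env, io, osc, cd.cd_next,
						   CL_PAGE_EOF, count_cb, &cd);
			if (res == CLP_GANG_RESCHED)
				cond_resched();    /* the walk yielded the CPU */
		} while (res != CLP_GANG_OKAY);
		return cd.cd_count;
	}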
+/**
+ * Check if the page is covered by an additional lock; if not, discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                               struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       pgoff_t index;
+
+       index = osc_index(ops);
+       if (index >= info->oti_fn_index) {
+               struct cl_lock *tmp;
+               struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+               /* refresh non-overlapped index */
+               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
+                                      lock, 1, 0);
+               if (tmp != NULL) {
+                       /* Cache the first-non-overlapped index so as to skip
+                        * all pages within [index, oti_fn_index). This
+                        * is safe because if tmp lock is canceled, it will
+                        * discard these pages. */
+                       info->oti_fn_index = tmp->cll_descr.cld_end + 1;
+                       if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                               info->oti_fn_index = CL_PAGE_EOF;
+                       cl_lock_put(env, tmp);
+               } else if (cl_page_own(env, io, page) == 0) {
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
+       }
+
+       info->oti_next_index = index + 1;
+       return CLP_GANG_OKAY;
+}
+
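A worked example of the oti_fn_index shortcut above: suppose a CLM_READ lock on pages [0, 99] is being cancelled while another read lock still covers [40, 59] (numbers illustrative only):

	/*
	 * Index  0..39: no other lock found, pages are owned and discarded.
	 * Index     40: cl_lock_at_pgoff() finds the [40, 59] lock, so
	 *               oti_fn_index becomes 60 and page 40 is kept.
	 * Index 41..59: index < oti_fn_index, kept without a lock lookup.
	 * Index 60..99: discarded as before; the [40, 59] lock discards its
	 *               own pages when it is cancelled in turn.
	 */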
+static int discard_cb(const struct lu_env *env, struct cl_io *io,
+                     struct osc_page *ops, void *cbdata)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_lock *lock = cbdata;
+       struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
+
+       LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                     !PageWriteback(cl_page_vmpage(env, page))));
+       KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
+                     !PageDirty(cl_page_vmpage(env, page))));
+
+       /* page is top page. */
+       info->oti_next_index = osc_index(ops) + 1;
+       if (cl_page_own(env, io, page) == 0) {
+               /* discard the page */
+               cl_page_discard(env, io, page);
+               cl_page_disown(env, io, page);
+       } else {
+               LASSERT(page->cp_state == CPS_FREEING);
+       }
+
+       return CLP_GANG_OKAY;
+}
+
+/**
+ * Discard pages protected by the given lock. This function traverses the
+ * radix tree to find all covered pages and discards them. A page still
+ * covered by another lock remains in cache.
+ *
+ * If an error happens on any step, the process continues anyway (the reasoning
+ * behind this being that lock cancellation cannot be delayed indefinitely).
+ */
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+{
+       struct osc_thread_info *info = osc_env_info(env);
+       struct cl_io *io = &info->oti_io;
+       struct cl_object *osc = ols->ols_cl.cls_obj;
+       struct cl_lock *lock = ols->ols_cl.cls_lock;
+       struct cl_lock_descr *descr = &lock->cll_descr;
+       osc_page_gang_cbt cb;
+       int res;
+       int result;
+
+       ENTRY;
+
+       io->ci_obj = cl_object_top(osc);
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               GOTO(out, result);
+
+       cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
+       info->oti_fn_index = info->oti_next_index = descr->cld_start;
+       do {
+               res = osc_page_gang_lookup(env, io, cl2osc(osc),
+                                          info->oti_next_index, descr->cld_end,
+                                          cb, (void *)lock);
+               if (info->oti_next_index > descr->cld_end)
+                       break;
+
+               if (res == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (res != CLP_GANG_OKAY);
+out:
+       cl_io_fini(env, io);
+       RETURN(result);
+}
+
+
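The callback chosen above encodes the lock-mode semantics; a condensed statement of the reasoning (summary sketch, not patch content):

	/*
	 * CLM_READ:  other read locks may overlap the extent being
	 *            cancelled, so check_and_discard_cb() keeps any page
	 *            that some other lock still covers.
	 * CLM_WRITE and stronger: the extent is treated as exclusively
	 *            held, so discard_cb() drops every covered page; its
	 *            KLASSERTs check that writeback has already finished.
	 */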
 /** @} osc */
index 4a55e96..55e159b 100644 (file)
@@ -114,7 +114,12 @@ struct osc_thread_info {
         struct lustre_handle    oti_handle;
         struct cl_page_list     oti_plist;
        struct cl_io            oti_io;
-       struct cl_page         *oti_pvec[OTI_PVEC_SIZE];
+       void                    *oti_pvec[OTI_PVEC_SIZE];
+       /**
+        * Fields used by cl_lock_discard_pages().
+        */
+       pgoff_t                 oti_next_index;
+       pgoff_t                 oti_fn_index; /* first non-overlapped index */
 };
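oti_pvec becomes untyped because two users now share the scratch array: osc_page_gang_lookup() in osc_cache.c fills it with struct osc_page pointers, while osc_lru_shrink() in osc_page.c (later in this patch) casts it to struct cl_page **. A sketch of the two views (illustrative only):

	void **pvec = osc_env_info(env)->oti_pvec;              /* raw slots */

	struct osc_page **ops_vec  = (struct osc_page **)pvec;  /* gang lookup */
	struct cl_page  **page_vec = (struct cl_page **)pvec;   /* LRU shrink  */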
 
 struct osc_object {
@@ -171,6 +176,13 @@ struct osc_object {
        /** Protect extent tree. Will be used to protect
         * oo_{read|write}_pages soon. */
        spinlock_t          oo_lock;
+
+       /**
+        * Radix tree for caching pages
+        */
+       struct radix_tree_root  oo_tree;
+       spinlock_t              oo_tree_lock;
+       unsigned long           oo_npages;
 };
 
 static inline void osc_object_lock(struct osc_object *obj)
@@ -571,6 +583,11 @@ static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
        return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
 }
 
+static inline pgoff_t osc_index(struct osc_page *opg)
+{
+       return opg->ops_cl.cpl_page->cp_index;
+}
+
 static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
 {
         LINVRNT(osc_is_object(&slice->cls_obj->co_lu));
@@ -685,6 +702,14 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc);
 int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
 
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *lock);
+
+typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
+                                struct osc_page *, void *);
+int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
+                        struct osc_object *osc, pgoff_t start, pgoff_t end,
+                        osc_page_gang_cbt cb, void *cbdata);
+
 /** @} osc */
 
 #endif /* OSC_CL_INTERNAL_H */
index bb5b6dd..a16fb81 100644 (file)
@@ -398,18 +398,13 @@ static int osc_async_upcall(void *a, int rc)
  * Checks that there are no pages being written in the extent being truncated.
  */
 static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page, void *cbdata)
+                         struct osc_page *ops, void *cbdata)
 {
-       const struct cl_page_slice *slice;
-       struct osc_page *ops;
+       struct cl_page *page = ops->ops_cl.cpl_page;
        struct osc_async_page *oap;
        __u64 start = *(__u64 *)cbdata;
 
-       slice = cl_page_at(page, &osc_device_type);
-       LASSERT(slice != NULL);
-       ops = cl2osc_page(slice);
        oap = &ops->ops_oap;
        oap = &ops->ops_oap;
-
        if (oap->oap_cmd & OBD_BRW_WRITE &&
            !cfs_list_empty(&oap->oap_pending_item))
                CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
        if (oap->oap_cmd & OBD_BRW_WRITE &&
            !cfs_list_empty(&oap->oap_pending_item))
                CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
@@ -442,8 +437,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
         /*
          * Complain if there are pages in the truncated region.
          */
         /*
          * Complain if there are pages in the truncated region.
          */
-       cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
-                           trunc_check_cb, (void *)&size);
+       osc_page_gang_lookup(env, io, cl2osc(clob),
+                               start + partial, CL_PAGE_EOF,
+                               trunc_check_cb, (void *)&size);
 }
 #else /* __KERNEL__ */
 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
index 839dfa5..78f3ceb 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_lock for OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
@@ -910,11 +911,8 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 static unsigned long osc_lock_weigh(const struct lu_env *env,
                                     const struct cl_lock_slice *slice)
 {
-        /*
-         * don't need to grab coh_page_guard since we don't care the exact #
-         * of pages..
-         */
-        return cl_object_header(slice->cls_obj)->coh_pages;
+       /* TODO: check how many pages are covered by this lock */
+       return cl2osc(slice->cls_obj)->oo_npages;
 }
 
 static void osc_lock_build_einfo(const struct lu_env *env,
@@ -1290,7 +1288,7 @@ static int osc_lock_flush(struct osc_lock *ols, int discard)
                                result = 0;
                }
 
-               rc = cl_lock_discard_pages(env, lock);
+               rc = osc_lock_discard_pages(env, ols);
                if (result == 0 && rc < 0)
                        result = rc;
 
@@ -1360,23 +1358,23 @@ static void osc_lock_cancel(const struct lu_env *env,
 
 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
 static int check_cb(const struct lu_env *env, struct cl_io *io,
-                    struct cl_page *page, void *cbdata)
+                   struct osc_page *ops, void *cbdata)
 {
 {
-        struct cl_lock *lock = cbdata;
-
-        if (lock->cll_descr.cld_mode == CLM_READ) {
-                struct cl_lock *tmp;
-                tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj,
-                                     page, lock, 1, 0);
-                if (tmp != NULL) {
-                        cl_lock_put(env, tmp);
-                        return CLP_GANG_OKAY;
-                }
-        }
+       struct cl_lock *lock = cbdata;
+
+       if (lock->cll_descr.cld_mode == CLM_READ) {
+               struct cl_lock *tmp;
+               tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj,
+                                      osc_index(ops), lock, 1, 0);
+               if (tmp != NULL) {
+                       cl_lock_put(env, tmp);
+                       return CLP_GANG_OKAY;
+               }
+       }

-        CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
-        CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
-        return CLP_GANG_ABORT;
+       CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
+       CL_PAGE_DEBUG(D_ERROR, env, ops->ops_cl.cpl_page, "\n");
+       return CLP_GANG_ABORT;
 }
 
 /**
@@ -1410,9 +1408,9 @@ static int osc_lock_has_pages(struct osc_lock *olck)
        io->ci_ignore_layout = 1;
         cl_io_init(env, io, CIT_MISC, io->ci_obj);
        do {
-               result = cl_page_gang_lookup(env, obj, io,
-                                            descr->cld_start, descr->cld_end,
-                                            check_cb, (void *)lock);
+               result = osc_page_gang_lookup(env, io, oob,
+                                             descr->cld_start, descr->cld_end,
+                                             check_cb, (void *)lock);
                if (result == CLP_GANG_ABORT)
                        break;
                if (result == CLP_GANG_RESCHED)
index 8d6eec6..91e3798 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_object for OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
@@ -97,6 +98,7 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
        cfs_atomic_set(&osc->oo_nr_reads, 0);
        cfs_atomic_set(&osc->oo_nr_writes, 0);
        spin_lock_init(&osc->oo_lock);
+       spin_lock_init(&osc->oo_tree_lock);
 
        cl_object_page_init(lu2cl(obj), sizeof(struct osc_page));

index b7296ad..a673b91 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_page for OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
@@ -435,6 +436,18 @@ static void osc_page_delete(const struct lu_env *env,
 
        osc_lru_del(osc_cli(obj), opg);

+       if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
+               void *value;
+
+               spin_lock(&obj->oo_tree_lock);
+               value = radix_tree_delete(&obj->oo_tree, osc_index(opg));
+               if (value != NULL)
+                       --obj->oo_npages;
+               spin_unlock(&obj->oo_tree_lock);
+
+               LASSERT(ergo(value != NULL, value == opg));
+       }
+
        EXIT;
 }
 
@@ -503,7 +516,7 @@ static const struct cl_page_operations osc_page_ops = {
 };
 
 int osc_page_init(const struct lu_env *env, struct cl_object *obj,
-               struct cl_page *page, struct page *vmpage)
+                 struct cl_page *page, struct page *vmpage)
 {
        struct osc_object *osc = cl2osc(obj);
        struct osc_page   *opg = cl_object_page_slice(obj, page);
@@ -533,8 +546,18 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
        CFS_INIT_LIST_HEAD(&opg->ops_lru);
 
        /* reserve an LRU space for this page */
-       if (page->cp_type == CPT_CACHEABLE && result == 0)
+       if (page->cp_type == CPT_CACHEABLE && result == 0) {
                result = osc_lru_reserve(env, osc, opg);
                result = osc_lru_reserve(env, osc, opg);
+               if (result == 0) {
+                       spin_lock(&osc->oo_tree_lock);
+                       result = radix_tree_insert(&osc->oo_tree,
+                                                  page->cp_index, opg);
+                       if (result == 0)
+                               ++osc->oo_npages;
+                       spin_unlock(&osc->oo_tree_lock);
+                       LASSERT(result == 0);
+               }
+       }
 
        return result;
 }
 
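Together with the radix_tree_delete() in osc_page_delete() above, the bookkeeping added here is symmetric; in summary (sketch, not patch content):

	/*
	 * osc_page_init():   radix_tree_insert(&oo_tree, cp_index, opg) and
	 *                    ++oo_npages, both under oo_tree_lock;
	 * osc_page_delete(): radix_tree_delete(&oo_tree, osc_index(opg)) and
	 *                    --oo_npages, both under oo_tree_lock.
	 *
	 * Only CPT_CACHEABLE pages enter the tree, so transient pages stay
	 * invisible to osc_page_gang_lookup() and to osc_lock_weigh().
	 */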
@@ -744,7 +767,6 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
                 struct cl_page *page = pvec[i];
 
                LASSERT(cl_page_is_owned(page, io));
-               cl_page_unmap(env, io, page);
                cl_page_discard(env, io, page);
                cl_page_disown(env, io, page);
                 cl_page_put(env, page);
@@ -785,7 +807,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                cfs_atomic_inc(&cli->cl_lru_shrinkers);
        }
 
-       pvec = osc_env_info(env)->oti_pvec;
+       pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
        io = &osc_env_info(env)->oti_io;
 
        client_obd_list_lock(&cli->cl_lru_list_lock);