LU-848 clio: page writeback support
lustre/obdclass/cl_page.c (fs/lustre-release.git)
index 926035e..5c8973f 100644
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -184,11 +187,12 @@ EXPORT_SYMBOL(cl_page_lookup);
  *
  * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
  * crucial in the face of [offset, EOF] locks.
+ *
+ * Return at least one page in @queue unless the region contains no pages.
  */
-void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
-                         struct cl_io *io, pgoff_t start, pgoff_t end,
-                         struct cl_page_list *queue, int nonblock,
-                         int *resched)
+int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_io *io, pgoff_t start, pgoff_t end,
+                        struct cl_page_list *queue)
 {
         struct cl_object_header *hdr;
         struct cl_page          *page;
@@ -199,15 +203,10 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
         unsigned int             nr;
         unsigned int             i;
         unsigned int             j;
-        int                    (*page_own)(const struct lu_env *env,
-                                           struct cl_io *io,
-                                           struct cl_page *pg);
+        int                      res = CLP_GANG_OKAY;
+        int                      tree_lock = 1;
         ENTRY;
 
-        if (resched != NULL)
-                *resched = 0;
-        page_own = nonblock ? cl_page_own_try : cl_page_own;
-
         idx = start;
         hdr = cl_object_header(obj);
         pvec = cl_env_info(env)->clt_pvec;
@@ -215,18 +214,19 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
         cfs_spin_lock(&hdr->coh_page_guard);
         while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
                                             idx, CLT_PVEC_SIZE)) > 0) {
+                int end_of_region = 0;
                 idx = pvec[nr - 1]->cp_index + 1;
                 for (i = 0, j = 0; i < nr; ++i) {
                         page = pvec[i];
                         pvec[i] = NULL;
-                        if (page->cp_index > end)
+
+                        LASSERT(page->cp_type == CPT_CACHEABLE);
+                        if (page->cp_index > end) {
+                                end_of_region = 1;
                                 break;
+                        }
                         if (page->cp_state == CPS_FREEING)
                                 continue;
-                        if (page->cp_type == CPT_TRANSIENT) {
-                                /* God, we found a transient page!*/
-                                continue;
-                        }
 
                         slice = cl_page_at_trusted(page, dtype);
                         /*
@@ -258,24 +258,44 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                  * error in the latter case).
                  */
                 cfs_spin_unlock(&hdr->coh_page_guard);
+                tree_lock = 0;
+
                 for (i = 0; i < j; ++i) {
                         page = pvec[i];
-                        if (page_own(env, io, page) == 0)
-                                cl_page_list_add(queue, page);
+                        if (res == CLP_GANG_OKAY) {
+                                typeof(cl_page_own) *page_own;
+
+                                page_own = queue->pl_nr ?
+                                           cl_page_own_try : cl_page_own;
+                                if (page_own(env, io, page) == 0) {
+                                        cl_page_list_add(queue, page);
+                                } else if (page->cp_state != CPS_FREEING) {
+                                        /* cl_page_own() won't fail unless
+                                         * the page is being freed. */
+                                        LASSERT(queue->pl_nr != 0);
+                                        res = CLP_GANG_AGAIN;
+                                }
+                        }
                         lu_ref_del(&page->cp_reference,
                                    "page_list", cfs_current());
                         cl_page_put(env, page);
                 }
-                cfs_spin_lock(&hdr->coh_page_guard);
-                if (nr < CLT_PVEC_SIZE)
+                if (nr < CLT_PVEC_SIZE || end_of_region)
                         break;
-                if (resched != NULL && cfs_need_resched()) {
-                        *resched = 1;
+
+                /* Only ask for a reschedule once some pages are queued:
+                 * returning CLP_GANG_RESCHED with an empty queue would
+                 * mislead the caller into thinking no pages are left. */
+                if (queue->pl_nr && cfs_need_resched())
+                        res = CLP_GANG_RESCHED;
+                if (res != CLP_GANG_OKAY)
                         break;
-                }
+
+                cfs_spin_lock(&hdr->coh_page_guard);
+                tree_lock = 1;
         }
-        cfs_spin_unlock(&hdr->coh_page_guard);
-        EXIT;
+        if (tree_lock)
+                cfs_spin_unlock(&hdr->coh_page_guard);
+        RETURN(res);
 }
 EXPORT_SYMBOL(cl_page_gang_lookup);
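
The CLP_GANG_* codes used above are not defined in this file; presumably the
companion header change (in cl_object.h) introduces them. Below is a minimal
sketch of that assumed definition plus a hypothetical caller loop, inferred
only from the usage visible in this diff:

/* Assumed definition; only these three values appear in this file. */
enum {
        CLP_GANG_OKAY = 0,  /* the whole [start, end] region was scanned */
        CLP_GANG_RESCHED,   /* caller should yield the CPU, then rescan  */
        CLP_GANG_AGAIN,     /* a page could not be owned; process the
                             * queued pages, then rescan                 */
};

/* Hypothetical caller, mirroring the cl_pages_prune() loop at the end of
 * this patch: rescan until the region is exhausted. */
static void scan_range(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io, struct cl_page_list *plist,
                       pgoff_t start, pgoff_t end)
{
        int rc;

        do {
                cl_page_list_init(plist);
                rc = cl_page_gang_lookup(env, obj, io, start, end, plist);
                /* ... operate on the owned pages in plist ... */
                cl_page_list_disown(env, io, plist);
                cl_page_list_fini(env, plist);
                if (rc == CLP_GANG_RESCHED)
                        cfs_cond_resched();
        } while (rc != CLP_GANG_OKAY);
}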
 
@@ -357,8 +377,7 @@ static int cl_page_alloc(const struct lu_env *env, struct cl_object *o,
                                 err = o->co_ops->coo_page_init(env, o,
                                                                page, vmpage);
                                 if (err != NULL) {
-                                        cl_page_state_set_trust(page,
-                                                                CPS_FREEING);
+                                        cl_page_delete0(env, page, 0);
                                         cl_page_free(env, page);
                                         page = err;
                                         break;
@@ -398,13 +417,13 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
                                      enum cl_page_type type,
                                      struct cl_page *parent)
 {
-        struct cl_page          *page;
+        struct cl_page          *page = NULL;
         struct cl_page          *ghost = NULL;
         struct cl_object_header *hdr;
         struct cl_site          *site = cl_object_site(o);
         int err;
 
-        LINVRNT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
+        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
         cfs_might_sleep();
 
         ENTRY;
@@ -431,11 +450,8 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
                              cl_page_vmpage(env, page) == vmpage &&
                              (void *)radix_tree_lookup(&hdr->coh_tree,
                                                        idx) == page));
-        } else {
-                cfs_spin_lock(&hdr->coh_page_guard);
-                page = cl_page_lookup(hdr, idx);
-                cfs_spin_unlock(&hdr->coh_page_guard);
         }
+
         if (page != NULL) {
                 cfs_atomic_inc(&site->cs_pages.cs_hit);
                 RETURN(page);
@@ -445,6 +461,16 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
         err = cl_page_alloc(env, o, idx, vmpage, type, &page);
         if (err != 0)
                 RETURN(page);
+
+        if (type == CPT_TRANSIENT) {
+                if (parent) {
+                        LASSERT(page->cp_parent == NULL);
+                        page->cp_parent = parent;
+                        parent->cp_child = page;
+                }
+                RETURN(page);
+        }
+
         /*
          * XXX optimization: use radix_tree_preload() here, and change tree
          * gfp mask to GFP_KERNEL in cl_object_header_init().
@@ -467,27 +493,8 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
                  *     which is very useful when diagnosing and debugging.
                  */
                 page = ERR_PTR(err);
-                if (err == -EEXIST) {
-                        /*
-                         * XXX in case of a lookup for CPT_TRANSIENT page,
-                         * nothing protects a CPT_CACHEABLE page from being
-                         * concurrently moved into CPS_FREEING state.
-                         */
-                        page = cl_page_lookup(hdr, idx);
-                        PASSERT(env, page, page != NULL);
-                        if (page->cp_type == CPT_TRANSIENT &&
-                            type == CPT_CACHEABLE) {
-                                /* XXX: We should make sure that inode sem
-                                 * keeps being held in the lifetime of
-                                 * transient pages, so it is impossible to
-                                 * have conflicting transient pages.
-                                 */
-                                cfs_spin_unlock(&hdr->coh_page_guard);
-                                cl_page_put(env, page);
-                                cfs_spin_lock(&hdr->coh_page_guard);
-                                page = ERR_PTR(-EBUSY);
-                        }
-                }
+                CL_PAGE_DEBUG(D_ERROR, env, ghost,
+                              "failed to insert into radix tree: %d\n", err);
         } else {
                 if (parent) {
                         LASSERT(page->cp_parent == NULL);
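
With the CPT_TRANSIENT short-circuit above, transient pages are handed back
without ever touching coh_tree, which is why the -EEXIST special case removed
here can no longer arise. A minimal usage sketch, assuming the existing
cl_page_find() entry point and the cfs_page_t VM page type used elsewhere in
this tree:

/* Illustrative only: a transient page stays private to its creator, so
 * the final cl_page_put() frees it with no radix-tree bookkeeping. */
static int use_transient_page(const struct lu_env *env,
                              struct cl_object *obj, pgoff_t idx,
                              cfs_page_t *vmpage)
{
        struct cl_page *page;

        page = cl_page_find(env, obj, idx, vmpage, CPT_TRANSIENT);
        if (IS_ERR(page))
                return PTR_ERR(page);
        /* ... use the page for the transfer ... */
        cl_page_put(env, page);
        return 0;
}
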
@@ -553,7 +560,7 @@ static inline int cl_page_invariant(const struct cl_page *pg)
                  * Either page is early in initialization (has neither child
                  * nor parent yet), or it is in the object radix tree.
                  */
-                ergo(pg->cp_state < CPS_FREEING,
+                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
                      (void *)radix_tree_lookup(&header->coh_tree,
                                                pg->cp_index) == pg ||
                      (child == NULL && parent == NULL));
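
For reference, ergo() is the libcfs logical-implication macro, so the
tightened clause binds only cacheable pages that are not yet being freed;
transient pages are exempt because, after the cl_page_find0() change above,
they never enter coh_tree:

/* From libcfs: ergo(a, b) reads "a implies b". */
#define ergo(a, b) (!(a) || (b))

/* The clause above, restated: IF the page is cacheable and not being
 * freed, THEN it is either present in its object's radix tree or still
 * so early in its setup that it has neither parent nor child. */
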
@@ -630,7 +637,6 @@ static void cl_page_state_set0(const struct lu_env *env,
 static void cl_page_state_set(const struct lu_env *env,
                               struct cl_page *page, enum cl_page_state state)
 {
-        PINVRNT(env, page, cl_page_invariant(page));
         cl_page_state_set0(env, page, state);
 }
 
@@ -975,7 +981,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
         io = cl_io_top(io);
 
         if (pg->cp_state == CPS_FREEING) {
-                result = -EAGAIN;
+                result = -ENOENT;
         } else {
                 result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
                                         (const struct lu_env *,
@@ -992,7 +998,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
                                 cl_page_state_set(env, pg, CPS_OWNED);
                         } else {
                                 cl_page_disown0(env, io, pg);
-                                result = -EAGAIN;
+                                result = -ENOENT;
                         }
                 }
         }
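
With -EAGAIN replaced by -ENOENT, an ownership failure now unambiguously
means the page is on its way out. A hedged sketch of the resulting caller
pattern (the same reasoning the reworked gang-lookup loop applies above):

/* Illustrative: own a page before operating on it, skipping pages that
 * are already dying. */
static int own_or_skip(const struct lu_env *env, struct cl_io *io,
                       struct cl_page *page, struct cl_page_list *queue)
{
        int rc;

        rc = cl_page_own(env, io, page);
        if (rc == 0) {
                cl_page_list_add(queue, page);
                return 0;
        }
        /* rc == -ENOENT: the page reached CPS_FREEING and will never be
         * visible through this object again, so it is safe to skip. */
        return rc;
}
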
@@ -1037,7 +1043,6 @@ EXPORT_SYMBOL(cl_page_own_try);
 void cl_page_assume(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *pg)
 {
-        PASSERT(env, pg, pg->cp_state < CPS_OWNED);
         PASSERT(env, pg, pg->cp_owner == NULL);
         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
         PINVRNT(env, pg, cl_page_invariant(pg));
@@ -1159,23 +1164,25 @@ static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
         cl_page_export(env, pg, 0);
         cl_page_state_set0(env, pg, CPS_FREEING);
 
-        if (!radix)
-                /*
-                 * !radix means that @pg is not yet in the radix tree, skip
-                 * removing it.
-                 */
-                tmp = pg->cp_child;
-        for (; tmp != NULL; tmp = tmp->cp_child) {
-                void                    *value;
-                struct cl_object_header *hdr;
-
-                hdr = cl_object_header(tmp->cp_obj);
-                cfs_spin_lock(&hdr->coh_page_guard);
-                value = radix_tree_delete(&hdr->coh_tree, tmp->cp_index);
-                PASSERT(env, tmp, value == tmp);
-                PASSERT(env, tmp, hdr->coh_pages > 0);
-                hdr->coh_pages--;
-                cfs_spin_unlock(&hdr->coh_page_guard);
+        if (tmp->cp_type == CPT_CACHEABLE) {
+                if (!radix)
+                        /* !radix means that @pg is not yet in the radix tree,
+                         * skip removing it.
+                         */
+                        tmp = pg->cp_child;
+                for (; tmp != NULL; tmp = tmp->cp_child) {
+                        void                    *value;
+                        struct cl_object_header *hdr;
+
+                        hdr = cl_object_header(tmp->cp_obj);
+                        cfs_spin_lock(&hdr->coh_page_guard);
+                        value = radix_tree_delete(&hdr->coh_tree,
+                                                  tmp->cp_index);
+                        PASSERT(env, tmp, value == tmp);
+                        PASSERT(env, tmp, hdr->coh_pages > 0);
+                        hdr->coh_pages--;
+                        cfs_spin_unlock(&hdr->coh_page_guard);
+                }
         }
 
         CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
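
The loop above walks cp_child because a file page is a stack of per-layer
pages (e.g. lov above osc), and each cacheable layer keeps its own entry,
keyed by cp_index, in its object's coh_tree. A minimal traversal sketch
using the existing cl_page_top() helper:

/* Illustrative: visit every layer of a page stack, top to bottom. */
static void walk_page_stack(struct cl_page *pg)
{
        struct cl_page *scan;

        for (scan = cl_page_top(pg); scan != NULL; scan = scan->cp_child)
                CDEBUG(D_TRACE, "layer %p: type %d index %lu\n",
                       scan, scan->cp_type,
                       (unsigned long)scan->cp_index);
}
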
@@ -1354,7 +1361,6 @@ void cl_page_completion(const struct lu_env *env,
         /* cl_page::cp_req already cleared by the caller (osc_completion()) */
         PASSERT(env, pg, pg->cp_req == NULL);
         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
-        PINVRNT(env, pg, cl_page_invariant(pg));
 
         ENTRY;
         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
@@ -1368,6 +1374,7 @@ void cl_page_completion(const struct lu_env *env,
                                (const struct lu_env *,
                                 const struct cl_page_slice *, int), ioret);
         if (anchor) {
+                LASSERT(cl_page_is_vmlocked(env, pg));
                 LASSERT(pg->cp_sync_io == anchor);
                 pg->cp_sync_io = NULL;
                 cl_sync_io_note(anchor, ioret);
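
The new LASSERT ties sync-I/O completion to the vmpage lock. For context, a
sketch of the completion side of the cl_sync_io protocol as this hunk uses
it: cl_sync_io_init() arms the anchor with the number of in-flight pages
before submission, and the final cl_sync_io_note() wakes the waiter.

/* Illustrative completion hook, mirroring the hunk above: detach the
 * anchor from the page and account one finished transfer. */
static void sync_page_done(const struct lu_env *env, struct cl_page *pg,
                           int ioret)
{
        struct cl_sync_io *anchor = pg->cp_sync_io;

        if (anchor != NULL) {
                LASSERT(cl_page_is_vmlocked(env, pg));
                pg->cp_sync_io = NULL;
                cl_sync_io_note(anchor, ioret);
        }
}
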
@@ -1478,7 +1485,6 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
         struct cl_object        *obj = cl_object_top(clobj);
         struct cl_io            *io;
         struct cl_page_list     *plist;
-        int                      resched;
         int                      result;
 
         ENTRY;
@@ -1499,8 +1505,8 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
 
         do {
                 cl_page_list_init(plist);
-                cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0,
-                                    &resched);
+                result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
+                                             plist);
                 /*
                  * Since we're purging the pages of an object, we don't care
                  * about the possible outcomes of the following functions.
@@ -1510,9 +1516,9 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
                 cl_page_list_disown(env, io, plist);
                 cl_page_list_fini(env, plist);
 
-                if (resched)
+                if (result == CLP_GANG_RESCHED)
                         cfs_cond_resched();
-        } while (resched);
+        } while (result != CLP_GANG_OKAY);
 
         cl_io_fini(env, io);
         RETURN(result);