b=19906
diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c
index e88427b..b277cc5 100644
@@ -179,14 +179,14 @@ struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
 EXPORT_SYMBOL(cl_page_lookup);
 
 /**
- * Returns a list of pages by a given [start, end] of @obj.
+ * Returns a list of pages by a given [start, end] of \a obj.
  *
  * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
  * crucial in the face of [offset, EOF] locks.
  */
 void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                          struct cl_io *io, pgoff_t start, pgoff_t end,
-                         struct cl_page_list *queue)
+                         struct cl_page_list *queue, int nonblock)
 {
         struct cl_object_header *hdr;
         struct cl_page          *page;
@@ -197,8 +197,13 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
         unsigned int             nr;
         unsigned int             i;
         unsigned int             j;
+        int                    (*page_own)(const struct lu_env *env,
+                                           struct cl_io *io,
+                                           struct cl_page *pg);
         ENTRY;
 
+        page_own = nonblock ? cl_page_own_try : cl_page_own;
+
         idx = start;
         hdr = cl_object_header(obj);
         pvec = cl_env_info(env)->clt_pvec;
@@ -251,7 +256,7 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                 spin_unlock(&hdr->coh_page_guard);
                 for (i = 0; i < j; ++i) {
                         page = pvec[i];
-                        if (cl_page_own(env, io, page) == 0)
+                        if (page_own(env, io, page) == 0)
                                 cl_page_list_add(queue, page);
                         lu_ref_del(&page->cp_reference,
                                    "page_list", cfs_current());
@@ -320,7 +325,7 @@ static int cl_page_alloc(const struct lu_env *env, struct cl_object *o,
 
         ENTRY;
         result = +1;
-        OBD_SLAB_ALLOC_PTR(page, cl_page_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(page, cl_page_kmem, CFS_ALLOC_IO);
         if (page != NULL) {
                 atomic_set(&page->cp_ref, 1);
                 page->cp_obj = o;
@@ -623,10 +628,22 @@ void cl_page_put(const struct lu_env *env, struct cl_page *page)
 
         ENTRY;
         CL_PAGE_HEADER(D_TRACE, env, page, "%i\n", atomic_read(&page->cp_ref));
-        hdr = cl_object_header(page->cp_obj);
-        if (atomic_dec_and_test(&page->cp_ref)) {
+
+        hdr = cl_object_header(cl_object_top(page->cp_obj));
+        if (atomic_dec_and_lock(&page->cp_ref, &hdr->coh_page_guard)) {
                 atomic_dec(&site->cs_pages.cs_busy);
+                /* We are about to access the page without holding a
+                 * reference, but that is safe because we hold
+                 * coh_page_guard, so nobody can free the page behind us.
+                 */
                 if (page->cp_state == CPS_FREEING) {
+                        /* The reference was dropped and the state checked
+                         * while holding coh_page_guard, so if we get here
+                         * this really was the last reference to the page.
+                         */
+                        spin_unlock(&hdr->coh_page_guard);
+
+                        LASSERT(atomic_read(&page->cp_ref) == 0);
                         PASSERT(env, page, page->cp_owner == NULL);
                         PASSERT(env, page, list_empty(&page->cp_batch));
                         /*
@@ -634,8 +651,13 @@ void cl_page_put(const struct lu_env *env, struct cl_page *page)
                          * it down.
                          */
                         cl_page_free(env, page);
+
+                        EXIT;
+                        return;
                 }
+                spin_unlock(&hdr->coh_page_guard);
         }
+
         EXIT;
 }
 EXPORT_SYMBOL(cl_page_put);
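
The change above relies on the kernel's atomic_dec_and_lock() helper: the final reference is dropped only while coh_page_guard is held, the same lock every lookup path takes, so a concurrent lookup can never return a page whose refcount has already reached zero. A minimal, generic sketch of that pattern (the structure, lock and helper names below are illustrative, not Lustre code):

/* Generic sketch of the atomic_dec_and_lock() pattern; 'obj',
 * 'table_lock', 'table_remove()' and 'obj_free()' are made-up names. */
static void obj_put(struct obj *o)
{
        /* Returns non-zero only when the counter reaches zero, and in
         * that case table_lock is held on return. */
        if (atomic_dec_and_lock(&o->o_ref, &table_lock)) {
                table_remove(o);           /* no lookup can find it now */
                spin_unlock(&table_lock);
                obj_free(o);               /* safe: ours was the last ref */
        }
}
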
@@ -669,6 +691,7 @@ EXPORT_SYMBOL(cl_page_vmpage);
 struct cl_page *cl_vmpage_page(cfs_page_t *vmpage, struct cl_object *obj)
 {
         struct cl_page *page;
+        struct cl_object_header *hdr;
 
         ENTRY;
         KLASSERT(PageLocked(vmpage));
@@ -683,6 +706,8 @@ struct cl_page *cl_vmpage_page(cfs_page_t *vmpage, struct cl_object *obj)
          * This loop assumes that ->private points to the top-most page. This
          * can be rectified easily.
          */
+        hdr = cl_object_header(cl_object_top(obj));
+        spin_lock(&hdr->coh_page_guard);
         for (page = (void *)vmpage->private;
              page != NULL; page = page->cp_child) {
                 if (cl_object_same(page->cp_obj, obj)) {
@@ -690,6 +715,7 @@ struct cl_page *cl_vmpage_page(cfs_page_t *vmpage, struct cl_object *obj)
                         break;
                 }
         }
+        spin_unlock(&hdr->coh_page_guard);
         LASSERT(ergo(page, cl_is_page(page) && page->cp_type == CPT_CACHEABLE));
         RETURN(page);
 }
@@ -837,6 +863,7 @@ static void cl_page_owner_clear(struct cl_page *page)
                         LASSERT(page->cp_owner->ci_owned_nr > 0);
                         page->cp_owner->ci_owned_nr--;
                         page->cp_owner = NULL;
+                        page->cp_task = NULL;
                 }
         }
         EXIT;
@@ -889,7 +916,7 @@ int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
 EXPORT_SYMBOL(cl_page_is_owned);
 
 /**
- * Owns a page by IO.
+ * Try to own a page by IO.
  *
  * Waits until the page is in cl_page_state::CPS_CACHED state, and then
  * switches it into cl_page_state::CPS_OWNED state.
@@ -901,11 +928,15 @@ EXPORT_SYMBOL(cl_page_is_owned);
  *
  * \retval -ve failure, e.g., page was destroyed (and landed in
  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED).
+ *             or the page is owned by another thread, or is in IO.
  *
  * \see cl_page_disown()
  * \see cl_page_operations::cpo_own()
+ * \see cl_page_own_try()
+ * \see cl_page_own()
  */
-int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
+static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
+                        struct cl_page *pg, int nonblock)
 {
         int result;
 
@@ -915,24 +946,57 @@ int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
         pg = cl_page_top(pg);
         io = cl_io_top(io);
 
-        cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_own));
-        PASSERT(env, pg, pg->cp_owner == NULL);
-        PASSERT(env, pg, pg->cp_req == NULL);
-        pg->cp_owner = io;
-        cl_page_owner_set(pg);
-        if (pg->cp_state != CPS_FREEING) {
-                cl_page_state_set(env, pg, CPS_OWNED);
-                result = 0;
-        } else {
-                cl_page_disown0(env, io, pg);
+        if (pg->cp_state == CPS_FREEING) {
                 result = -EAGAIN;
+        } else {
+                result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
+                                        (const struct lu_env *,
+                                         const struct cl_page_slice *,
+                                         struct cl_io *, int),
+                                        io, nonblock);
+                if (result == 0) {
+                        PASSERT(env, pg, pg->cp_owner == NULL);
+                        PASSERT(env, pg, pg->cp_req == NULL);
+                        pg->cp_owner = io;
+                        pg->cp_task  = current;
+                        cl_page_owner_set(pg);
+                        if (pg->cp_state != CPS_FREEING) {
+                                cl_page_state_set(env, pg, CPS_OWNED);
+                        } else {
+                                cl_page_disown0(env, io, pg);
+                                result = -EAGAIN;
+                        }
+                }
         }
         PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
         RETURN(result);
 }
+
+/**
+ * Own a page; the caller may block until ownership is obtained.
+ *
+ * \see cl_page_own0()
+ */
+int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
+{
+        return cl_page_own0(env, io, pg, 0);
+}
 EXPORT_SYMBOL(cl_page_own);
 
 /**
+ * Non-blocking version of cl_page_own().
+ *
+ * \see cl_page_own0()
+ */
+int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
+                    struct cl_page *pg)
+{
+        return cl_page_own0(env, io, pg, 1);
+}
+EXPORT_SYMBOL(cl_page_own_try);
+
+
+/**
  * Assume page ownership.
  *
  * Called when page is already locked by the hosting VM.
@@ -956,6 +1020,7 @@ void cl_page_assume(const struct lu_env *env,
 
         cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_assume));
         pg->cp_owner = io;
+        pg->cp_task = current;
         cl_page_owner_set(pg);
         cl_page_state_set(env, pg, CPS_OWNED);
         EXIT;
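
As a usage sketch of the new non-blocking entry point (the helper below is hypothetical; cl_page_own_try(), cl_page_list_add() and the negative return convention come from this patch), a caller that must not sleep tries to own the page and simply gives up when it is being freed, owned by another thread, or in IO:

/* Hypothetical helper: queue the page if it can be owned right now,
 * otherwise return the error and let the caller skip the page. */
static int try_queue_page(const struct lu_env *env, struct cl_io *io,
                          struct cl_page *page, struct cl_page_list *queue)
{
        int rc;

        rc = cl_page_own_try(env, io, page);
        if (rc == 0)
                cl_page_list_add(queue, page);
        /* rc < 0: page is CPS_FREEING, owned elsewhere, or in IO */
        return rc;
}
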
@@ -1044,35 +1109,49 @@ EXPORT_SYMBOL(cl_page_discard);
 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
                             int radix)
 {
+        struct cl_page *tmp = pg;
+        ENTRY;
+
         PASSERT(env, pg, pg == cl_page_top(pg));
         PASSERT(env, pg, pg->cp_state != CPS_FREEING);
 
-        ENTRY;
         /*
          * Sever all ways to obtain new pointers to @pg.
          */
         cl_page_owner_clear(pg);
+
+        /*
+         * Unexport the page before freeing it, so that its content
+         * is considered invalid. We have to do this because a
+         * CPS_FREEING cl_page may not be under the protection of
+         * a cl_lock.
+         * If other threads find this page afterwards, they will be
+         * forced to re-read it.
+         */
+        cl_page_export(env, pg, 0);
         cl_page_state_set0(env, pg, CPS_FREEING);
-        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
-                       (const struct lu_env *, const struct cl_page_slice *));
+
         if (!radix)
                 /*
                  * !radix means that @pg is not yet in the radix tree, skip
                  * removing it.
                  */
-                pg = pg->cp_child;
-        for (; pg != NULL; pg = pg->cp_child) {
+                tmp = pg->cp_child;
+        for (; tmp != NULL; tmp = tmp->cp_child) {
                 void                    *value;
                 struct cl_object_header *hdr;
 
-                hdr = cl_object_header(pg->cp_obj);
+                hdr = cl_object_header(tmp->cp_obj);
                 spin_lock(&hdr->coh_page_guard);
-                value = radix_tree_delete(&hdr->coh_tree, pg->cp_index);
-                PASSERT(env, pg, value == pg);
-                PASSERT(env, pg, hdr->coh_pages > 0);
+                value = radix_tree_delete(&hdr->coh_tree, tmp->cp_index);
+                PASSERT(env, tmp, value == tmp);
+                PASSERT(env, tmp, hdr->coh_pages > 0);
                 hdr->coh_pages--;
                 spin_unlock(&hdr->coh_page_guard);
         }
+
+        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
+                       (const struct lu_env *, const struct cl_page_slice *));
         EXIT;
 }
 
@@ -1133,17 +1212,17 @@ EXPORT_SYMBOL(cl_page_unmap);
  * Marks page up-to-date.
  *
  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
- * layer responsible for VM interaction has to mark page as up-to-date. From
- * this moment on, page can be shown to the user space without Lustre being
- * notified, hence the name.
+ * layer responsible for VM interaction has to mark or clear the page as
+ * up-to-date according to the \a uptodate argument.
  *
  * \see cl_page_operations::cpo_export()
  */
-void cl_page_export(const struct lu_env *env, struct cl_page *pg)
+void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
 {
         PINVRNT(env, pg, cl_page_invariant(pg));
         CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_export),
-                       (const struct lu_env *, const struct cl_page_slice *));
+                       (const struct lu_env *,
+                        const struct cl_page_slice *, int), uptodate);
 }
 EXPORT_SYMBOL(cl_page_export);
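
For context, here is a hedged sketch of what a VM-facing cpo_export() method might do with the new \a uptodate argument. This is an illustrative guess, not code from this patch: the function name is made up, slice->cpl_page follows the usual cl_page_slice layout, and SetPageUptodate()/ClearPageUptodate() are the standard Linux page-flag helpers.

/* Illustrative cpo_export() implementation: propagate the uptodate flag
 * to the VM page. Names are assumptions, not taken from this patch. */
static void demo_page_export(const struct lu_env *env,
                             const struct cl_page_slice *slice, int uptodate)
{
        cfs_page_t *vmpage = cl_page_vmpage(env, slice->cpl_page);

        if (uptodate)
                SetPageUptodate(vmpage);
        else
                ClearPageUptodate(vmpage);
}
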
 
@@ -1241,6 +1320,8 @@ EXPORT_SYMBOL(cl_page_prep);
 void cl_page_completion(const struct lu_env *env,
                         struct cl_page *pg, enum cl_req_type crt, int ioret)
 {
+        struct cl_sync_io *anchor = pg->cp_sync_io;
+
         PASSERT(env, pg, crt < CRT_NR);
         /* cl_page::cp_req already cleared by the caller (osc_completion()) */
         PASSERT(env, pg, pg->cp_req == NULL);
@@ -1249,7 +1330,7 @@ void cl_page_completion(const struct lu_env *env,
 
         ENTRY;
         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", crt, ioret);
-        if (crt == CRT_READ) {
+        if (crt == CRT_READ && ioret == 0) {
                 PASSERT(env, pg, !(pg->cp_flags & CPF_READ_COMPLETED));
                 pg->cp_flags |= CPF_READ_COMPLETED;
         }
@@ -1258,6 +1339,11 @@ void cl_page_completion(const struct lu_env *env,
         CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
                                (const struct lu_env *,
                                 const struct cl_page_slice *, int), ioret);
+        if (anchor) {
+                LASSERT(pg->cp_sync_io == anchor);
+                pg->cp_sync_io = NULL;
+                cl_sync_io_note(anchor, ioret);
+        }
 
         /* Don't assert the page writeback bit here because the Lustre file
          * may be used as a backend of swap space. In this case, the page writeback
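
The cp_sync_io hook added above enables synchronous transfers: a submitter publishes a cl_sync_io anchor on each page and waits on it, while cl_page_completion() signals the anchor through cl_sync_io_note(). A rough sketch of that submit-and-wait pattern follows; cl_sync_io_init() and cl_sync_io_wait() are assumed to be the companion helpers in cl_io, and their signatures are approximated here rather than taken from this patch.

/* Rough sketch of a synchronous submit-and-wait using cp_sync_io; the
 * cl_sync_io_init()/cl_sync_io_wait() signatures are assumptions. */
static int submit_and_wait(const struct lu_env *env, struct cl_io *io,
                           struct cl_page_list *queue)
{
        struct cl_sync_io  anchor;
        struct cl_page    *page;

        cl_sync_io_init(&anchor, queue->pl_nr);   /* one note per page */
        cl_page_list_for_each(page, queue)
                page->cp_sync_io = &anchor;       /* completion calls
                                                   * cl_sync_io_note() */
        /* ... submit the queue for CRT_READ or CRT_WRITE ... */
        return cl_sync_io_wait(env, io, queue, &anchor);
}
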
@@ -1383,7 +1469,7 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
         }
 
         cl_page_list_init(plist);
-        cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist);
+        cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0);
         /*
          * Since we're purging the pages of an object, we don't care
          * about the possible outcomes of the following functions.
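
The call above passes nonblock = 0, preserving the original blocking behaviour for cl_pages_prune(). For contrast, a hedged sketch of a non-blocking caller (the wrapper name is hypothetical; cl_page_list_init(), cl_page_gang_lookup() and CL_PAGE_EOF all appear in this patch):

/* Hypothetical caller: gather the cached pages of an object without
 * ever blocking on page ownership; pages owned by other threads or
 * busy in IO are skipped via cl_page_own_try(). */
static void gather_pages_nonblock(const struct lu_env *env,
                                  struct cl_object *obj, struct cl_io *io,
                                  struct cl_page_list *queue)
{
        cl_page_list_init(queue);
        /* nonblock = 1: cl_page_own_try() is used internally */
        cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, queue, 1);
}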