/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 * Author: Jinshan Xiong <jinshan.xiong@intel.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include <linux/list.h>
#include <libcfs/libcfs.h>
#include <obd_class.h>
#include <obd_support.h>

#include <cl_object.h>
#include "cl_internal.h"

static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);

/* per-bufsize slab caches, looked up by __cl_page_alloc()/__cl_page_free() */
static struct kmem_cache *cl_page_kmem_array[16];
static unsigned short cl_page_kmem_size_array[16];
static DEFINE_MUTEX(cl_page_kmem_mutex);

#ifdef LIBCFS_DEBUG
# define PASSERT(env, page, expr)					\
do {									\
	if (unlikely(!(expr))) {					\
		CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");	\
		LASSERT(0);						\
	}								\
} while (0)
#else /* !LIBCFS_DEBUG */
# define PASSERT(env, page, exp) \
	((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
#endif /* !LIBCFS_DEBUG */

#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
# define PINVRNT(env, page, expr)					\
do {									\
	if (unlikely(!(expr))) {					\
		CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");	\
		LASSERT(0);						\
	}								\
} while (0)
#else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
# define PINVRNT(env, page, exp) \
	((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
#endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */

/* Page statistics are disabled by default due to the huge performance
 * penalty.
 */
static void cs_page_inc(const struct cl_object *obj,
			enum cache_stats_item item)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
	atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
#endif
}

static void cs_page_dec(const struct cl_object *obj,
			enum cache_stats_item item)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
	atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
#endif
}

static void cs_pagestate_inc(const struct cl_object *obj,
			     enum cl_page_state state)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
	atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
#endif
}

static void cs_pagestate_dec(const struct cl_object *obj,
			     enum cl_page_state state)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
	atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
#endif
}

/**
 * Internal version of cl_page_get().
 *
 * This function can be used to obtain an initial reference to a previously
 * unreferenced cached object. It can be called only if concurrent page
 * reclamation is somehow prevented, e.g., by keeping a lock on the VM page
 * associated with \a page.
 *
 * Use with care! Not exported.
 */
static void cl_page_get_trust(struct cl_page *page)
{
	LASSERT(atomic_read(&page->cp_ref) > 0);
	atomic_inc(&page->cp_ref);
}

/**
 * Returns the slice within a page that corresponds to the given layer in the
 * device stack.
 */
static const struct cl_page_slice *
cl_page_at_trusted(const struct cl_page *page,
		   const struct lu_device_type *dtype)
{
	const struct cl_page_slice *slice;

	list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
		if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
			return slice;
	}
	return NULL;
}

static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
{
	int index = cl_page->cp_kmem_index;

	if (index >= 0) {
		LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
		LASSERT(cl_page_kmem_size_array[index] == bufsize);
		OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
	} else {
		OBD_FREE(cl_page, bufsize);
	}
}

static void cl_page_free(const struct lu_env *env, struct cl_page *page,
			 struct pagevec *pvec)
{
	struct cl_object *obj = page->cp_obj;
	unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;

	PASSERT(env, page, list_empty(&page->cp_batch));
	PASSERT(env, page, page->cp_owner == NULL);
	PASSERT(env, page, page->cp_state == CPS_FREEING);

	while (!list_empty(&page->cp_layers)) {
		struct cl_page_slice *slice;

		slice = list_entry(page->cp_layers.next,
				   struct cl_page_slice, cpl_linkage);
		list_del_init(page->cp_layers.next);
		if (unlikely(slice->cpl_ops->cpo_fini != NULL))
			slice->cpl_ops->cpo_fini(env, slice, pvec);
	}
	cs_page_dec(obj, CS_total);
	cs_pagestate_dec(obj, page->cp_state);
	lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
	cl_object_put(env, obj);
	lu_ref_fini(&page->cp_reference);
	__cl_page_free(page, bufsize);
}

/**
 * Helper function updating page state. This is the only place in the code
 * where the cl_page::cp_state field is mutated.
 */
static inline void cl_page_state_set_trust(struct cl_page *page,
					   enum cl_page_state state)
{
	/* bypass const. */
	*(enum cl_page_state *)&page->cp_state = state;
}

static struct cl_page *__cl_page_alloc(struct cl_object *o)
{
	int i = 0;
	struct cl_page *cl_page = NULL;
	unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;

check:
	/* the number of entries in cl_page_kmem_array is expected to
	 * be only 2-3, so the lookup overhead should be low.
	 */
	for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
		if (smp_load_acquire(&cl_page_kmem_size_array[i])
		    == bufsize) {
			OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
					   bufsize, GFP_NOFS);
			if (cl_page)
				cl_page->cp_kmem_index = i;
			return cl_page;
		}
		if (cl_page_kmem_size_array[i] == 0)
			break;
	}

	if (i < ARRAY_SIZE(cl_page_kmem_array)) {
		char cache_name[32];

		mutex_lock(&cl_page_kmem_mutex);
		if (cl_page_kmem_size_array[i]) {
			/* lost the race: someone else created the cache */
			mutex_unlock(&cl_page_kmem_mutex);
			goto check;
		}
		snprintf(cache_name, sizeof(cache_name),
			 "cl_page_kmem-%u", bufsize);
		cl_page_kmem_array[i] =
			kmem_cache_create(cache_name, bufsize,
					  0, 0, NULL);
		if (cl_page_kmem_array[i] == NULL) {
			mutex_unlock(&cl_page_kmem_mutex);
			return NULL;
		}
		smp_store_release(&cl_page_kmem_size_array[i],
				  bufsize);
		mutex_unlock(&cl_page_kmem_mutex);
		goto check;
	} else {
		OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
		if (cl_page)
			cl_page->cp_kmem_index = -1;
	}

	return cl_page;
}

struct cl_page *cl_page_alloc(const struct lu_env *env,
			      struct cl_object *o, pgoff_t ind,
			      struct page *vmpage,
			      enum cl_page_type type)
{
	struct cl_page *page;
	struct lu_object_header *head;

	ENTRY;

	page = __cl_page_alloc(o);
	if (page != NULL) {
		int result = 0;

		atomic_set(&page->cp_ref, 1);
		page->cp_obj = o;
		cl_object_get(o);
		lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
				     page);
		page->cp_vmpage = vmpage;
		cl_page_state_set_trust(page, CPS_CACHED);
		page->cp_type = type;
		INIT_LIST_HEAD(&page->cp_layers);
		INIT_LIST_HEAD(&page->cp_batch);
		lu_ref_init(&page->cp_reference);
		head = o->co_lu.lo_header;
		list_for_each_entry(o, &head->loh_layers,
				    co_lu.lo_linkage) {
			if (o->co_ops->coo_page_init != NULL) {
				result = o->co_ops->coo_page_init(env, o, page,
								  ind);
				if (result != 0) {
					cl_page_delete0(env, page);
					cl_page_free(env, page, NULL);
					page = ERR_PTR(result);
					break;
				}
			}
		}
		if (result == 0) {
			cs_page_inc(o, CS_total);
			cs_page_inc(o, CS_create);
			cs_pagestate_dec(o, CPS_CACHED);
		}
	} else {
		page = ERR_PTR(-ENOMEM);
	}
	RETURN(page);
}

/**
 * Returns a cl_page with index \a idx at the object \a o, and associated with
 * the VM page \a vmpage.
 *
 * This is the main entry point into the cl_page caching interface. First, a
 * cache (implemented as a per-object radix tree) is consulted. If the page is
 * found there, it is returned immediately. Otherwise a new page is allocated
 * and returned. In either case, an additional reference to the page is
 * acquired.
 *
 * \see cl_object_find(), cl_lock_find()
 */
struct cl_page *cl_page_find(const struct lu_env *env,
			     struct cl_object *o,
			     pgoff_t idx, struct page *vmpage,
			     enum cl_page_type type)
{
	struct cl_page *page = NULL;
	struct cl_object_header *hdr;

	LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
	might_sleep();

	ENTRY;

	hdr = cl_object_header(o);
	cs_page_inc(o, CS_lookup);

	CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
	       idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
	/* fast path. */
	if (type == CPT_CACHEABLE) {
		/* vmpage lock is used to protect the child/parent
		 * relationship */
		KLASSERT(PageLocked(vmpage));
		/*
		 * cl_vmpage_page() can be called here without any locks as
		 *
		 * - "vmpage" is locked (which prevents ->private from
		 *   concurrent updates), and
		 *
		 * - "o" cannot be destroyed while the current thread holds a
		 *   reference on it.
		 */
		page = cl_vmpage_page(vmpage, o);
		if (page != NULL) {
			cs_page_inc(o, CS_hit);
			RETURN(page);
		}
	}

	/* allocate and initialize cl_page */
	page = cl_page_alloc(env, o, idx, vmpage, type);
	RETURN(page);
}
EXPORT_SYMBOL(cl_page_find);

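/**
 * Example (illustrative sketch, not part of the original source): a
 * hypothetical cacheable-page lookup as an llite-like caller might issue it.
 * The names "env", "clob" and "vmpage" stand for the caller's environment,
 * cl_object and locked VM page; cl_page_find() requires the VM page lock for
 * CPT_CACHEABLE pages, as asserted above.
 *
 * \code
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
 *	if (IS_ERR(page)) {
 *		unlock_page(vmpage);
 *		return PTR_ERR(page);
 *	}
 *	...
 *	cl_page_put(env, page);
 *	unlock_page(vmpage);
 * \endcode
 *
 * The reference returned by cl_page_find() pins the cl_page until the
 * matching cl_page_put().
 */
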
static inline int cl_page_invariant(const struct cl_page *pg)
{
	return cl_page_in_use_noref(pg);
}

static void cl_page_state_set0(const struct lu_env *env,
			       struct cl_page *page, enum cl_page_state state)
{
	enum cl_page_state old;

	/*
	 * Matrix of allowed state transitions [old][new], for sanity
	 * checking.
	 */
	static const int allowed_transitions[CPS_NR][CPS_NR] = {
		[CPS_CACHED] = {
			[CPS_CACHED]  = 0,
			[CPS_OWNED]   = 1, /* io finds existing cached page */
			[CPS_PAGEIN]  = 0,
			[CPS_PAGEOUT] = 1, /* write-out from the cache */
			[CPS_FREEING] = 1, /* eviction on the memory pressure */
		},
		[CPS_OWNED] = {
			[CPS_CACHED]  = 1, /* release to the cache */
			[CPS_OWNED]   = 0,
			[CPS_PAGEIN]  = 1, /* start read immediately */
			[CPS_PAGEOUT] = 1, /* start write immediately */
			[CPS_FREEING] = 1, /* lock invalidation or truncate */
		},
		[CPS_PAGEIN] = {
			[CPS_CACHED]  = 1, /* io completion */
			[CPS_OWNED]   = 0,
			[CPS_PAGEIN]  = 0,
			[CPS_PAGEOUT] = 0,
			[CPS_FREEING] = 0,
		},
		[CPS_PAGEOUT] = {
			[CPS_CACHED]  = 1, /* io completion */
			[CPS_OWNED]   = 0,
			[CPS_PAGEIN]  = 0,
			[CPS_PAGEOUT] = 0,
			[CPS_FREEING] = 0,
		},
		[CPS_FREEING] = {
			[CPS_CACHED]  = 0,
			[CPS_OWNED]   = 0,
			[CPS_PAGEIN]  = 0,
			[CPS_PAGEOUT] = 0,
			[CPS_FREEING] = 0,
		},
	};

	ENTRY;
	old = page->cp_state;
	PASSERT(env, page, allowed_transitions[old][state]);
	CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
	PASSERT(env, page, page->cp_state == old);
	PASSERT(env, page, equi(state == CPS_OWNED, page->cp_owner != NULL));

	cs_pagestate_dec(page->cp_obj, page->cp_state);
	cs_pagestate_inc(page->cp_obj, state);
	cl_page_state_set_trust(page, state);
	EXIT;
}

static void cl_page_state_set(const struct lu_env *env,
			      struct cl_page *page, enum cl_page_state state)
{
	cl_page_state_set0(env, page, state);
}

/**
 * Acquires an additional reference to a page.
 *
 * This can be called only by a caller already possessing a reference to
 * \a page.
 *
 * \see cl_object_get(), cl_lock_get().
 */
void cl_page_get(struct cl_page *page)
{
	ENTRY;
	cl_page_get_trust(page);
	EXIT;
}
EXPORT_SYMBOL(cl_page_get);

/**
 * Releases a reference to a page; uses the pagevec to release the pages
 * in batch if one is provided.
 *
 * Users need to do a final pagevec_release() to release any trailing pages.
 */
void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
		    struct pagevec *pvec)
{
	ENTRY;
	CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
		       atomic_read(&page->cp_ref));

	if (atomic_dec_and_test(&page->cp_ref)) {
		LASSERT(page->cp_state == CPS_FREEING);

		LASSERT(atomic_read(&page->cp_ref) == 0);
		PASSERT(env, page, page->cp_owner == NULL);
		PASSERT(env, page, list_empty(&page->cp_batch));
		/*
		 * Page is no longer reachable by other threads. Tear
		 * it down.
		 */
		cl_page_free(env, page, pvec);
	}

	EXIT;
}
EXPORT_SYMBOL(cl_pagevec_put);

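/**
 * Example (illustrative sketch, not part of the original source): dropping
 * several page references through one pagevec so the underlying VM pages can
 * be released in batch. "env", "pages" and "nr" are assumed caller context;
 * note that pagevec_init()'s signature differs between kernel versions.
 *
 * \code
 *	struct pagevec pvec;
 *	int i;
 *
 *	pagevec_init(&pvec);
 *	for (i = 0; i < nr; i++)
 *		cl_pagevec_put(env, pages[i], &pvec);
 *	pagevec_release(&pvec);
 * \endcode
 *
 * The final pagevec_release() releases any trailing pages, as noted above.
 */
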
/**
 * Releases a reference to a page; a wrapper around cl_pagevec_put().
 *
 * When the last reference is released, the page is returned to the cache,
 * unless it is in the cl_page_state::CPS_FREEING state, in which case it is
 * immediately destroyed.
 *
 * \see cl_object_put(), cl_lock_put().
 */
void cl_page_put(const struct lu_env *env, struct cl_page *page)
{
	cl_pagevec_put(env, page, NULL);
}
EXPORT_SYMBOL(cl_page_put);

/**
 * Returns the cl_page associated with a VM page and the given cl_object.
 */
struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
{
	struct cl_page *page;

	ENTRY;
	KLASSERT(PageLocked(vmpage));

	/*
	 * NOTE: absence of races and liveness of data are guaranteed by page
	 * lock on a "vmpage". That works because object destruction has
	 * bottom-to-top pass.
	 */
	page = (struct cl_page *)vmpage->private;
	if (page != NULL) {
		cl_page_get_trust(page);
		LASSERT(page->cp_type == CPT_CACHEABLE);
	}
	RETURN(page);
}
EXPORT_SYMBOL(cl_vmpage_page);

const struct cl_page_slice *cl_page_at(const struct cl_page *page,
				       const struct lu_device_type *dtype)
{
	return cl_page_at_trusted(page, dtype);
}
EXPORT_SYMBOL(cl_page_at);

static void cl_page_owner_clear(struct cl_page *page)
{
	ENTRY;
	if (page->cp_owner != NULL) {
		LASSERT(page->cp_owner->ci_owned_nr > 0);
		page->cp_owner->ci_owned_nr--;
		page->cp_owner = NULL;
	}
	EXIT;
}

static void cl_page_owner_set(struct cl_page *page)
{
	ENTRY;
	LASSERT(page->cp_owner != NULL);
	page->cp_owner->ci_owned_nr++;
	EXIT;
}

void cl_page_disown0(const struct lu_env *env,
		     struct cl_io *io, struct cl_page *pg)
{
	const struct cl_page_slice *slice;
	enum cl_page_state state;

	ENTRY;
	state = pg->cp_state;
	PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
	PINVRNT(env, pg, cl_page_invariant(pg) || state == CPS_FREEING);
	cl_page_owner_clear(pg);

	if (state == CPS_OWNED)
		cl_page_state_set(env, pg, CPS_CACHED);
	/*
	 * Completion call-backs are executed in the bottom-up order, so that
	 * the uppermost layer (llite), responsible for VFS/VM interaction,
	 * runs last and can release locks safely.
	 */
	list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_disown != NULL)
			(*slice->cpl_ops->cpo_disown)(env, slice, io);
	}

	EXIT;
}

/**
 * Returns true iff the page is owned by the given io.
 */
int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
{
	struct cl_io *top = cl_io_top((struct cl_io *)io);
	LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
	ENTRY;
	RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
}
EXPORT_SYMBOL(cl_page_is_owned);

/**
 * Try to own a page by IO.
 *
 * Waits until the page is in cl_page_state::CPS_CACHED state, and then
 * switches it into cl_page_state::CPS_OWNED state.
 *
 * \pre  !cl_page_is_owned(pg, io)
 * \post result == 0 iff cl_page_is_owned(pg, io)
 *
 * \retval 0   success
 *
 * \retval -ve failure, e.g., the page was destroyed (and landed in
 *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
 *             or the page was owned by another thread, or in IO.
 *
 * \see cl_page_disown()
 * \see cl_page_operations::cpo_own()
 * \see cl_page_own_try()
 */
static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
			struct cl_page *pg, int nonblock)
{
	int result = 0;
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, !cl_page_is_owned(pg, io));

	ENTRY;
	io = cl_io_top(io);

	if (pg->cp_state == CPS_FREEING) {
		result = -ENOENT;
		goto out;
	}

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_own)
			result = (*slice->cpl_ops->cpo_own)(env, slice,
							    io, nonblock);
		if (result != 0)
			break;
	}
	if (result > 0)
		result = 0;

	if (result == 0) {
		PASSERT(env, pg, pg->cp_owner == NULL);
		pg->cp_owner = cl_io_top(io);
		cl_page_owner_set(pg);
		if (pg->cp_state != CPS_FREEING) {
			cl_page_state_set(env, pg, CPS_OWNED);
		} else {
			cl_page_disown0(env, io, pg);
			result = -ENOENT;
		}
	}

out:
	PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
	RETURN(result);
}

/**
 * Own a page; the caller might block.
 *
 * \see cl_page_own0()
 */
int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
{
	return cl_page_own0(env, io, pg, 0);
}
EXPORT_SYMBOL(cl_page_own);

/**
 * Nonblocking version of cl_page_own().
 *
 * \see cl_page_own0()
 */
int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
		    struct cl_page *pg)
{
	return cl_page_own0(env, io, pg, 1);
}
EXPORT_SYMBOL(cl_page_own_try);

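/**
 * Example (illustrative sketch, not part of the original source): the usual
 * own/use/disown sequence for a single page. "env", "io" and "page" are
 * assumed caller context.
 *
 * \code
 *	int rc;
 *
 *	rc = cl_page_own(env, io, page);
 *	if (rc == 0) {
 *		cl_page_clip(env, page, 0, PAGE_SIZE);
 *		cl_page_disown(env, io, page);
 *	}
 * \endcode
 *
 * Between cl_page_own() and cl_page_disown() the page is in CPS_OWNED state,
 * which is what operations such as cl_page_clip() rely on. A caller that must
 * not block would use cl_page_own_try() instead and treat failure as "retry
 * later".
 */
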
/**
 * Assume page ownership.
 *
 * Called when the page is already locked by the hosting VM.
 *
 * \pre  !cl_page_is_owned(pg, io)
 * \post cl_page_is_owned(pg, io)
 *
 * \see cl_page_operations::cpo_assume()
 */
void cl_page_assume(const struct lu_env *env,
		    struct cl_io *io, struct cl_page *pg)
{
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));

	ENTRY;
	io = cl_io_top(io);

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_assume != NULL)
			(*slice->cpl_ops->cpo_assume)(env, slice, io);
	}

	PASSERT(env, pg, pg->cp_owner == NULL);
	pg->cp_owner = cl_io_top(io);
	cl_page_owner_set(pg);
	cl_page_state_set(env, pg, CPS_OWNED);
	EXIT;
}
EXPORT_SYMBOL(cl_page_assume);

/**
 * Releases page ownership without unlocking the page.
 *
 * Moves the page into cl_page_state::CPS_CACHED without releasing a lock on
 * the underlying VM page (as the VM is supposed to do this itself).
 *
 * \pre   cl_page_is_owned(pg, io)
 * \post !cl_page_is_owned(pg, io)
 *
 * \see cl_page_assume()
 */
void cl_page_unassume(const struct lu_env *env,
		      struct cl_io *io, struct cl_page *pg)
{
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, cl_page_is_owned(pg, io));
	PINVRNT(env, pg, cl_page_invariant(pg));

	ENTRY;
	io = cl_io_top(io);
	cl_page_owner_clear(pg);
	cl_page_state_set(env, pg, CPS_CACHED);

	list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_unassume != NULL)
			(*slice->cpl_ops->cpo_unassume)(env, slice, io);
	}

	EXIT;
}
EXPORT_SYMBOL(cl_page_unassume);

/**
 * Releases page ownership.
 *
 * Moves the page into cl_page_state::CPS_CACHED.
 *
 * \pre   cl_page_is_owned(pg, io)
 * \post !cl_page_is_owned(pg, io)
 *
 * \see cl_page_operations::cpo_disown()
 */
void cl_page_disown(const struct lu_env *env,
		    struct cl_io *io, struct cl_page *pg)
{
	PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
		pg->cp_state == CPS_FREEING);

	ENTRY;
	io = cl_io_top(io);
	cl_page_disown0(env, io, pg);
	EXIT;
}
EXPORT_SYMBOL(cl_page_disown);

/**
 * Called when a page is to be removed from the object, e.g., as a result of
 * truncate.
 *
 * Calls cl_page_operations::cpo_discard() top-to-bottom.
 *
 * \pre cl_page_is_owned(pg, io)
 *
 * \see cl_page_operations::cpo_discard()
 */
void cl_page_discard(const struct lu_env *env,
		     struct cl_io *io, struct cl_page *pg)
{
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, cl_page_is_owned(pg, io));
	PINVRNT(env, pg, cl_page_invariant(pg));

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_discard != NULL)
			(*slice->cpl_ops->cpo_discard)(env, slice, io);
	}
}
EXPORT_SYMBOL(cl_page_discard);

/**
 * Version of cl_page_delete() that can be called for not fully constructed
 * pages, e.g. in an error handling cl_page_find()->cl_page_delete0()
 * path. Doesn't check the page invariant.
 */
static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg)
{
	const struct cl_page_slice *slice;

	ENTRY;

	PASSERT(env, pg, pg->cp_state != CPS_FREEING);

	/*
	 * Sever all ways to obtain new pointers to @pg.
	 */
	cl_page_owner_clear(pg);
	cl_page_state_set0(env, pg, CPS_FREEING);

	list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_delete != NULL)
			(*slice->cpl_ops->cpo_delete)(env, slice);
	}

	EXIT;
}

/**
 * Called when a decision is made to throw the page out of memory.
 *
 * Notifies all layers about page destruction by calling the
 * cl_page_operations::cpo_delete() method top-to-bottom.
 *
 * Moves the page into cl_page_state::CPS_FREEING state (this is the only
 * place where a transition to this state happens).
 *
 * Eliminates all venues through which new references to the page can be
 * obtained:
 *
 *     - removes the page from the radix trees,
 *
 *     - breaks the linkage from the VM page to the cl_page.
 *
 * Once the page reaches cl_page_state::CPS_FREEING, all remaining references
 * will drain after some time, at which point the page will be recycled.
 *
 * \pre  VM page is locked
 * \post pg->cp_state == CPS_FREEING
 *
 * \see cl_page_operations::cpo_delete()
 */
void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
{
	PINVRNT(env, pg, cl_page_invariant(pg));
	ENTRY;
	cl_page_delete0(env, pg);
	EXIT;
}
EXPORT_SYMBOL(cl_page_delete);

/**
 * Marks the page up-to-date.
 *
 * Calls cl_page_operations::cpo_export() through all layers top-to-bottom.
 * The layer responsible for VM interaction has to mark/clear the page as
 * up-to-date according to the \a uptodate argument.
 *
 * \see cl_page_operations::cpo_export()
 */
void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
{
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, cl_page_invariant(pg));

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_export != NULL)
			(*slice->cpl_ops->cpo_export)(env, slice, uptodate);
	}
}
EXPORT_SYMBOL(cl_page_export);

/**
 * Returns true iff \a pg is VM locked in a suitable sense by the calling
 * thread.
 */
int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
{
	const struct cl_page_slice *slice;
	int result;

	ENTRY;
	slice = container_of(pg->cp_layers.next,
			     const struct cl_page_slice, cpl_linkage);
	PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
	/*
	 * Call ->cpo_is_vmlocked() directly instead of going through
	 * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
	 * cl_page_invariant().
	 */
	result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
	PASSERT(env, pg, result == -EBUSY || result == -ENODATA);

	RETURN(result == -EBUSY);
}
EXPORT_SYMBOL(cl_page_is_vmlocked);

void cl_page_touch(const struct lu_env *env, const struct cl_page *pg,
		   size_t to)
{
	const struct cl_page_slice *slice;

	ENTRY;

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_page_touch != NULL)
			(*slice->cpl_ops->cpo_page_touch)(env, slice, to);
	}

	EXIT;
}
EXPORT_SYMBOL(cl_page_touch);

static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
{
	ENTRY;
	RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
}

static void cl_page_io_start(const struct lu_env *env,
			     struct cl_page *pg, enum cl_req_type crt)
{
	/*
	 * Page is queued for IO, change its state.
	 */
	ENTRY;
	cl_page_owner_clear(pg);
	cl_page_state_set(env, pg, cl_req_type_state(crt));
	EXIT;
}

/**
 * Prepares a page for immediate transfer. cl_page_operations::cpo_prep() is
 * called top-to-bottom. Every layer either agrees to submit this page (by
 * returning 0), or requests to omit this page (by returning -EALREADY). The
 * layer handling interactions with the VM also has to inform the VM that the
 * page is under transfer.
 */
int cl_page_prep(const struct lu_env *env, struct cl_io *io,
		 struct cl_page *pg, enum cl_req_type crt)
{
	const struct cl_page_slice *slice;
	int result = 0;

	PINVRNT(env, pg, cl_page_is_owned(pg, io));
	PINVRNT(env, pg, cl_page_invariant(pg));
	PINVRNT(env, pg, crt < CRT_NR);

	/*
	 * XXX this has to be called bottom-to-top, so that llite can set up
	 * PG_writeback without risking other layers deciding to skip this
	 * page.
	 */
	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->io[crt].cpo_prep)
			result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
								     slice,
								     io);
		if (result != 0)
			break;
	}

	if (result >= 0) {
		result = 0;
		cl_page_io_start(env, pg, crt);
	}

	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
	RETURN(result);
}
EXPORT_SYMBOL(cl_page_prep);

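/**
 * Example (illustrative sketch, not part of the original source): a
 * hypothetical layer's cpo_prep() method for the write direction. Returning
 * -EALREADY asks cl_page_prep() to omit this page from the transfer; the
 * "foo_" names and the skip condition are invented for illustration only.
 *
 * \code
 *	static int foo_page_prep_write(const struct lu_env *env,
 *				       const struct cl_page_slice *slice,
 *				       struct cl_io *io)
 *	{
 *		if (foo_page_already_in_flight(slice))
 *			return -EALREADY;
 *
 *		return 0;
 *	}
 * \endcode
 */
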
/**
 * Notify layers about transfer completion.
 *
 * Invoked by the transfer sub-system (which is a part of osc) to notify
 * layers that a transfer, of which this page is a part, has completed.
 *
 * Completion call-backs are executed in the bottom-up order, so that the
 * uppermost layer (llite), responsible for the VFS/VM interaction, runs last
 * and can release locks safely.
 *
 * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
 * \post pg->cp_state == CPS_CACHED
 *
 * \see cl_page_operations::cpo_completion()
 */
void cl_page_completion(const struct lu_env *env,
			struct cl_page *pg, enum cl_req_type crt, int ioret)
{
	const struct cl_page_slice *slice;
	struct cl_sync_io *anchor = pg->cp_sync_io;

	PASSERT(env, pg, crt < CRT_NR);
	PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));

	ENTRY;
	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
	cl_page_state_set(env, pg, CPS_CACHED);
	if (crt >= CRT_NR)
		return;

	list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->io[crt].cpo_completion != NULL)
			(*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
								  ioret);
	}

	if (anchor != NULL) {
		LASSERT(pg->cp_sync_io == anchor);
		pg->cp_sync_io = NULL;
		cl_sync_io_note(env, anchor, ioret);
	}
	EXIT;
}
EXPORT_SYMBOL(cl_page_completion);

/**
 * Notify layers that the transfer formation engine decided to yank this page
 * from the cache and to make it a part of a transfer.
 *
 * \pre  pg->cp_state == CPS_CACHED
 * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
 *
 * \see cl_page_operations::cpo_make_ready()
 */
int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
		       enum cl_req_type crt)
{
	const struct cl_page_slice *sli;
	int result = 0;

	PINVRNT(env, pg, crt < CRT_NR);

	ENTRY;

	list_for_each_entry(sli, &pg->cp_layers, cpl_linkage) {
		if (sli->cpl_ops->io[crt].cpo_make_ready != NULL)
			result = (*sli->cpl_ops->io[crt].cpo_make_ready)(env,
									 sli);
		if (result != 0)
			break;
	}

	if (result >= 0) {
		result = 0;
		PASSERT(env, pg, pg->cp_state == CPS_CACHED);
		cl_page_io_start(env, pg, crt);
	}
	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);

	RETURN(result);
}
EXPORT_SYMBOL(cl_page_make_ready);

/**
 * Called if a page is being written back at the kernel's request.
 *
 * \pre  cl_page_is_owned(pg, io)
 * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
 *
 * \see cl_page_operations::cpo_flush()
 */
int cl_page_flush(const struct lu_env *env, struct cl_io *io,
		  struct cl_page *pg)
{
	const struct cl_page_slice *slice;
	int result = 0;

	PINVRNT(env, pg, cl_page_is_owned(pg, io));
	PINVRNT(env, pg, cl_page_invariant(pg));

	ENTRY;

	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_flush != NULL)
			result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
		if (result != 0)
			break;
	}
	if (result > 0)
		result = 0;

	CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
	RETURN(result);
}
EXPORT_SYMBOL(cl_page_flush);

/**
 * Tells the transfer engine that only part of a page is to be transmitted.
 *
 * \see cl_page_operations::cpo_clip()
 */
void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
		  int from, int to)
{
	const struct cl_page_slice *slice;

	PINVRNT(env, pg, cl_page_invariant(pg));

	CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_clip != NULL)
			(*slice->cpl_ops->cpo_clip)(env, slice, from, to);
	}
}
EXPORT_SYMBOL(cl_page_clip);

/**
 * Prints a human-readable representation of the \a pg header via \a printer.
 */
void cl_page_header_print(const struct lu_env *env, void *cookie,
			  lu_printer_t printer, const struct cl_page *pg)
{
	(*printer)(env, cookie,
		   "page@%p[%d %p %d %d %p]\n",
		   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
		   pg->cp_state, pg->cp_type,
		   pg->cp_owner);
}
EXPORT_SYMBOL(cl_page_header_print);

/**
 * Prints a human-readable representation of \a pg via \a printer.
 */
void cl_page_print(const struct lu_env *env, void *cookie,
		   lu_printer_t printer, const struct cl_page *pg)
{
	const struct cl_page_slice *slice;
	int result = 0;

	cl_page_header_print(env, cookie, printer, pg);
	list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
		if (slice->cpl_ops->cpo_print != NULL)
			result = (*slice->cpl_ops->cpo_print)(env, slice,
							      cookie, printer);
		if (result != 0)
			break;
	}
	(*printer)(env, cookie, "end page@%p\n", pg);
}
EXPORT_SYMBOL(cl_page_print);

/**
 * Converts a page index into a byte offset within object \a obj.
 */
loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
{
	return (loff_t)idx << PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_offset);

/**
 * Converts a byte offset within object \a obj into a page index.
 */
pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
{
	return offset >> PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_index);

size_t cl_page_size(const struct cl_object *obj)
{
	return 1UL << PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_page_size);

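/**
 * Worked example (illustrative, not part of the original source): with a
 * 4096-byte PAGE_SIZE, PAGE_SHIFT is 12, so
 *
 *	cl_offset(obj, 3)    == 3 << 12     == 12288
 *	cl_index(obj, 12288) == 12288 >> 12 == 3
 *	cl_index(obj, 12289) == 3   (any offset inside a page maps to its index)
 *	cl_page_size(obj)    == 4096
 *
 * and cl_index(obj, cl_offset(obj, idx)) == idx for any page index idx.
 */
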
/**
 * Adds a page slice to the compound page.
 *
 * This is called by cl_object_operations::coo_page_init() methods to add a
 * per-layer state to the page. New state is added at the end of the
 * cl_page::cp_layers list; that is, it is at the bottom of the stack.
 *
 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
 */
void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
		       struct cl_object *obj,
		       const struct cl_page_operations *ops)
{
	ENTRY;
	list_add_tail(&slice->cpl_linkage, &page->cp_layers);
	slice->cpl_obj = obj;
	slice->cpl_ops = ops;
	slice->cpl_page = page;
	EXIT;
}
EXPORT_SYMBOL(cl_page_slice_add);

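/**
 * Example (illustrative sketch, not part of the original source): a
 * hypothetical layer's coo_page_init() method embedding its slice into the
 * compound page. "struct foo_page", "foo_page_ops" and the use of
 * cl_object_page_slice() to locate the per-layer slice follow the pattern of
 * the in-tree layers (vvp, lov, osc), but the names here are invented.
 *
 * \code
 *	struct foo_page {
 *		struct cl_page_slice fp_cl;
 *	};
 *
 *	static const struct cl_page_operations foo_page_ops;
 *
 *	static int foo_page_init(const struct lu_env *env,
 *				 struct cl_object *obj,
 *				 struct cl_page *page, pgoff_t index)
 *	{
 *		struct foo_page *fp = cl_object_page_slice(obj, page);
 *
 *		cl_page_slice_add(page, &fp->fp_cl, obj, &foo_page_ops);
 *		return 0;
 *	}
 * \endcode
 */
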
/**
 * Allocate and initialize the cl_cache; called by ll_init_sbi().
 */
struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
{
	struct cl_client_cache *cache = NULL;

	ENTRY;
	OBD_ALLOC(cache, sizeof(*cache));
	if (cache == NULL)
		RETURN(NULL);

	/* Initialize cache data */
	atomic_set(&cache->ccc_users, 1);
	cache->ccc_lru_max = lru_page_max;
	atomic_long_set(&cache->ccc_lru_left, lru_page_max);
	spin_lock_init(&cache->ccc_lru_lock);
	INIT_LIST_HEAD(&cache->ccc_lru);

	/* turn unstable check off by default as it impacts performance */
	cache->ccc_unstable_check = 0;
	atomic_long_set(&cache->ccc_unstable_nr, 0);
	init_waitqueue_head(&cache->ccc_unstable_waitq);
	mutex_init(&cache->ccc_max_cache_mb_lock);

	RETURN(cache);
}
EXPORT_SYMBOL(cl_cache_init);

/**
 * Increase the cl_cache refcount.
 */
void cl_cache_incref(struct cl_client_cache *cache)
{
	atomic_inc(&cache->ccc_users);
}
EXPORT_SYMBOL(cl_cache_incref);

/**
 * Decrease the cl_cache refcount and free the cache when the refcount drops
 * to 0. Since llite, lov and osc all hold a cl_cache reference, the free
 * cannot race with any user. (LU-6173)
 */
void cl_cache_decref(struct cl_client_cache *cache)
{
	if (atomic_dec_and_test(&cache->ccc_users))
		OBD_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(cl_cache_decref);
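
/**
 * Example (illustrative sketch, not part of the original source): typical
 * lifetime of a cl_client_cache shared between layers, with llite creating
 * the cache and a second holder (e.g. osc) taking an extra reference.
 * "max_lru_pages" is an assumed caller-provided value.
 *
 * \code
 *	struct cl_client_cache *cache;
 *
 *	cache = cl_cache_init(max_lru_pages);
 *	if (cache == NULL)
 *		return -ENOMEM;
 *
 *	cl_cache_incref(cache);		(second holder)
 *	...
 *	cl_cache_decref(cache);		(second holder drops its reference)
 *	cl_cache_decref(cache);		(last reference: cache is freed)
 * \endcode
 */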