LU-13134 obdclass: use slab allocation for cl_page
[fs/lustre-release.git] / lustre / obdclass / cl_page.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client Lustre Page.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/list.h>
41 #include <libcfs/libcfs.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44
45 #include <cl_object.h>
46 #include "cl_internal.h"
47
48 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
49 static DEFINE_MUTEX(cl_page_kmem_mutex);
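/*
 * Per-size slab caches used by __cl_page_alloc() and __cl_page_free() below.
 * NOTE: these declarations (including the array length of 16) are assumptions
 * added so that the references below resolve; they are not taken from the
 * original listing.
 */
static struct kmem_cache *cl_page_kmem_array[16];
static unsigned short cl_page_kmem_size_array[16];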
50
51 #ifdef LIBCFS_DEBUG
52 # define PASSERT(env, page, expr)                                       \
53   do {                                                                    \
54           if (unlikely(!(expr))) {                                      \
55                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
56                   LASSERT(0);                                           \
57           }                                                             \
58   } while (0)
59 #else /* !LIBCFS_DEBUG */
60 # define PASSERT(env, page, exp) \
61         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
62 #endif /* !LIBCFS_DEBUG */
63
64 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
65 # define PINVRNT(env, page, expr)                                       \
66   do {                                                                    \
67           if (unlikely(!(expr))) {                                      \
68                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
69                   LINVRNT(0);                                           \
70           }                                                             \
71   } while (0)
72 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
73 # define PINVRNT(env, page, exp) \
74          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
75 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
76
77 /* Page statistics are disabled by default due to the huge performance penalty. */
78 static void cs_page_inc(const struct cl_object *obj,
79                         enum cache_stats_item item)
80 {
81 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
82         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
83 #endif
84 }
85
86 static void cs_page_dec(const struct cl_object *obj,
87                         enum cache_stats_item item)
88 {
89 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
90         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
91 #endif
92 }
93
94 static void cs_pagestate_inc(const struct cl_object *obj,
95                              enum cl_page_state state)
96 {
97 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
98         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
99 #endif
100 }
101
102 static void cs_pagestate_dec(const struct cl_object *obj,
103                               enum cl_page_state state)
104 {
105 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
106         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
107 #endif
108 }
109
110 /**
111  * Internal version of cl_page_get().
112  *
113  * This function can be used to obtain an initial reference to a previously
114  * unreferenced cached object. It can be called only if concurrent page
115  * reclamation is somehow prevented, e.g., by holding a lock on the VM page
116  * associated with \a page.
117  *
118  * Use with care! Not exported.
119  */
120 static void cl_page_get_trust(struct cl_page *page)
121 {
122         LASSERT(atomic_read(&page->cp_ref) > 0);
123         atomic_inc(&page->cp_ref);
124 }
125
126 /**
127  * Returns a slice within a page, corresponding to the given layer in the
128  * device stack.
129  *
130  * \see cl_lock_at()
131  */
132 static const struct cl_page_slice *
133 cl_page_at_trusted(const struct cl_page *page,
134                    const struct lu_device_type *dtype)
135 {
136         const struct cl_page_slice *slice;
137         ENTRY;
138
139         list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
140                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
141                         RETURN(slice);
142         }
143         RETURN(NULL);
144 }
145
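/*
 * Returns the cl_page buffer to the per-size slab cache it was allocated
 * from (cp_kmem_index >= 0), or frees it directly if no cache matched the
 * buffer size at allocation time.
 */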
146 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
147 {
148         int index = cl_page->cp_kmem_index;
149
150         if (index >= 0) {
151                 LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
152                 LASSERT(cl_page_kmem_size_array[index] == bufsize);
153                 OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
154         } else {
155                 OBD_FREE(cl_page, bufsize);
156         }
157 }
158
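/*
 * Tears down a page in CPS_FREEING state: runs each slice's cpo_fini()
 * method, drops the reference on the owning object and releases the buffer.
 */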
159 static void cl_page_free(const struct lu_env *env, struct cl_page *page,
160                          struct pagevec *pvec)
161 {
162         struct cl_object *obj  = page->cp_obj;
163         unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;
164
165         PASSERT(env, page, list_empty(&page->cp_batch));
166         PASSERT(env, page, page->cp_owner == NULL);
167         PASSERT(env, page, page->cp_state == CPS_FREEING);
168
169         ENTRY;
170         while (!list_empty(&page->cp_layers)) {
171                 struct cl_page_slice *slice;
172
173                 slice = list_entry(page->cp_layers.next,
174                                    struct cl_page_slice, cpl_linkage);
175                 list_del_init(page->cp_layers.next);
176                 if (unlikely(slice->cpl_ops->cpo_fini != NULL))
177                         slice->cpl_ops->cpo_fini(env, slice, pvec);
178         }
179         cs_page_dec(obj, CS_total);
180         cs_pagestate_dec(obj, page->cp_state);
181         lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
182         cl_object_put(env, obj);
183         lu_ref_fini(&page->cp_reference);
184         __cl_page_free(page, bufsize);
185         EXIT;
186 }
187
188 /**
189  * Helper function updating page state. This is the only place in the code
190  * where cl_page::cp_state field is mutated.
191  */
192 static inline void cl_page_state_set_trust(struct cl_page *page,
193                                            enum cl_page_state state)
194 {
195         /* bypass const. */
196         *(enum cl_page_state *)&page->cp_state = state;
197 }
198
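/*
 * Allocates a cl_page buffer of coh_page_bufsize bytes.  A per-size slab
 * cache is looked up (and created lazily under cl_page_kmem_mutex) so that
 * common layouts reuse dedicated kmem caches; if every cache slot is taken
 * by another size, fall back to OBD_ALLOC_GFP() and set cp_kmem_index = -1.
 */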
199 static struct cl_page *__cl_page_alloc(struct cl_object *o)
200 {
201         int i = 0;
202         struct cl_page *cl_page = NULL;
203         unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;
204
205 check:
206         /* the number of entries in cl_page_kmem_array is expected to
207          * only be 2-3 entries, so the lookup overhead should be low.
208          */
209         for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
210                 if (smp_load_acquire(&cl_page_kmem_size_array[i])
211                     == bufsize) {
212                         OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
213                                            bufsize, GFP_NOFS);
214                         if (cl_page)
215                                 cl_page->cp_kmem_index = i;
216                         return cl_page;
217                 }
218                 if (cl_page_kmem_size_array[i] == 0)
219                         break;
220         }
221
222         if (i < ARRAY_SIZE(cl_page_kmem_array)) {
223                 char cache_name[32];
224
225                 mutex_lock(&cl_page_kmem_mutex);
226                 if (cl_page_kmem_size_array[i]) {
227                         mutex_unlock(&cl_page_kmem_mutex);
228                         goto check;
229                 }
230                 snprintf(cache_name, sizeof(cache_name),
231                          "cl_page_kmem-%u", bufsize);
232                 cl_page_kmem_array[i] =
233                         kmem_cache_create(cache_name, bufsize,
234                                           0, 0, NULL);
235                 if (cl_page_kmem_array[i] == NULL) {
236                         mutex_unlock(&cl_page_kmem_mutex);
237                         return NULL;
238                 }
239                 smp_store_release(&cl_page_kmem_size_array[i],
240                                   bufsize);
241                 mutex_unlock(&cl_page_kmem_mutex);
242                 goto check;
243         } else {
244                 OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
245                 if (cl_page)
246                         cl_page->cp_kmem_index = -1;
247         }
248
249         return cl_page;
250 }
251
252 struct cl_page *cl_page_alloc(const struct lu_env *env,
253                 struct cl_object *o, pgoff_t ind, struct page *vmpage,
254                 enum cl_page_type type)
255 {
256         struct cl_page          *page;
257         struct lu_object_header *head;
258
259         ENTRY;
260
261         page = __cl_page_alloc(o);
262         if (page != NULL) {
263                 int result = 0;
264                 atomic_set(&page->cp_ref, 1);
265                 page->cp_obj = o;
266                 cl_object_get(o);
267                 lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
268                                      page);
269                 page->cp_vmpage = vmpage;
270                 cl_page_state_set_trust(page, CPS_CACHED);
271                 page->cp_type = type;
272                 INIT_LIST_HEAD(&page->cp_layers);
273                 INIT_LIST_HEAD(&page->cp_batch);
274                 lu_ref_init(&page->cp_reference);
275                 head = o->co_lu.lo_header;
276                 list_for_each_entry(o, &head->loh_layers,
277                                     co_lu.lo_linkage) {
278                         if (o->co_ops->coo_page_init != NULL) {
279                                 result = o->co_ops->coo_page_init(env, o, page,
280                                                                   ind);
281                                 if (result != 0) {
282                                         cl_page_delete0(env, page);
283                                         cl_page_free(env, page, NULL);
284                                         page = ERR_PTR(result);
285                                         break;
286                                 }
287                         }
288                 }
289                 if (result == 0) {
290                         cs_page_inc(o, CS_total);
291                         cs_page_inc(o, CS_create);
292                         cs_pagestate_dec(o, CPS_CACHED);
293                 }
294         } else {
295                 page = ERR_PTR(-ENOMEM);
296         }
297         RETURN(page);
298 }
299
300 /**
301  * Returns a cl_page with index \a idx at the object \a o, and associated with
302  * the VM page \a vmpage.
303  *
304  * This is the main entry point into the cl_page caching interface. First, a
305  * cache (implemented as a per-object radix tree) is consulted. If the page is
306  * found there, it is returned immediately. Otherwise a new page is allocated
307  * and returned. In any case, an additional reference to the page is acquired.
308  *
309  * \see cl_object_find(), cl_lock_find()
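 *
 * A minimal usage sketch (the variables "env", "clob" and the locked
 * "vmpage" are assumptions supplied by the caller, not part of this file):
 *
 *	page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
 *	if (IS_ERR(page))
 *		return PTR_ERR(page);
 *	...
 *	cl_page_put(env, page);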
310  */
311 struct cl_page *cl_page_find(const struct lu_env *env,
312                              struct cl_object *o,
313                              pgoff_t idx, struct page *vmpage,
314                              enum cl_page_type type)
315 {
316         struct cl_page          *page = NULL;
317         struct cl_object_header *hdr;
318
319         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
320         might_sleep();
321
322         ENTRY;
323
324         hdr = cl_object_header(o);
325         cs_page_inc(o, CS_lookup);
326
327         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
328                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
329         /* fast path. */
330         if (type == CPT_CACHEABLE) {
331                 /* vmpage lock is used to protect the child/parent
332                  * relationship */
333                 KLASSERT(PageLocked(vmpage));
334                 /*
335                  * cl_vmpage_page() can be called here without any locks as
336                  *
337                  *     - "vmpage" is locked (which prevents ->private from
338                  *       concurrent updates), and
339                  *
340                  *     - "o" cannot be destroyed while current thread holds a
341                  *       reference on it.
342                  */
343                 page = cl_vmpage_page(vmpage, o);
344                 if (page != NULL) {
345                         cs_page_inc(o, CS_hit);
346                         RETURN(page);
347                 }
348         }
349
350         /* allocate and initialize cl_page */
351         page = cl_page_alloc(env, o, idx, vmpage, type);
352         RETURN(page);
353 }
354 EXPORT_SYMBOL(cl_page_find);
355
356 static inline int cl_page_invariant(const struct cl_page *pg)
357 {
358         return cl_page_in_use_noref(pg);
359 }
360
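/*
 * Sets the page state after checking the transition against the
 * allowed_transitions matrix below, and updates the page-state statistics.
 */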
361 static void cl_page_state_set0(const struct lu_env *env,
362                                struct cl_page *page, enum cl_page_state state)
363 {
364         enum cl_page_state old;
365
366         /*
367          * Matrix of allowed state transitions [old][new], for sanity
368          * checking.
369          */
370         static const int allowed_transitions[CPS_NR][CPS_NR] = {
371                 [CPS_CACHED] = {
372                         [CPS_CACHED]  = 0,
373                         [CPS_OWNED]   = 1, /* io finds existing cached page */
374                         [CPS_PAGEIN]  = 0,
375                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
376                         [CPS_FREEING] = 1, /* eviction under memory pressure */
377                 },
378                 [CPS_OWNED] = {
379                         [CPS_CACHED]  = 1, /* release to the cache */
380                         [CPS_OWNED]   = 0,
381                         [CPS_PAGEIN]  = 1, /* start read immediately */
382                         [CPS_PAGEOUT] = 1, /* start write immediately */
383                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
384                 },
385                 [CPS_PAGEIN] = {
386                         [CPS_CACHED]  = 1, /* io completion */
387                         [CPS_OWNED]   = 0,
388                         [CPS_PAGEIN]  = 0,
389                         [CPS_PAGEOUT] = 0,
390                         [CPS_FREEING] = 0,
391                 },
392                 [CPS_PAGEOUT] = {
393                         [CPS_CACHED]  = 1, /* io completion */
394                         [CPS_OWNED]   = 0,
395                         [CPS_PAGEIN]  = 0,
396                         [CPS_PAGEOUT] = 0,
397                         [CPS_FREEING] = 0,
398                 },
399                 [CPS_FREEING] = {
400                         [CPS_CACHED]  = 0,
401                         [CPS_OWNED]   = 0,
402                         [CPS_PAGEIN]  = 0,
403                         [CPS_PAGEOUT] = 0,
404                         [CPS_FREEING] = 0,
405                 }
406         };
407
408         ENTRY;
409         old = page->cp_state;
410         PASSERT(env, page, allowed_transitions[old][state]);
411         CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
412         PASSERT(env, page, page->cp_state == old);
413         PASSERT(env, page, equi(state == CPS_OWNED, page->cp_owner != NULL));
414
415         cs_pagestate_dec(page->cp_obj, page->cp_state);
416         cs_pagestate_inc(page->cp_obj, state);
417         cl_page_state_set_trust(page, state);
418         EXIT;
419 }
420
421 static void cl_page_state_set(const struct lu_env *env,
422                               struct cl_page *page, enum cl_page_state state)
423 {
424         cl_page_state_set0(env, page, state);
425 }
426
427 /**
428  * Acquires an additional reference to a page.
429  *
430  * This can be called only by caller already possessing a reference to \a
431  * page.
432  *
433  * \see cl_object_get(), cl_lock_get().
434  */
435 void cl_page_get(struct cl_page *page)
436 {
437         ENTRY;
438         cl_page_get_trust(page);
439         EXIT;
440 }
441 EXPORT_SYMBOL(cl_page_get);
442
443 /**
444  * Releases a reference to a page, using the pagevec to release the pages
445  * in batch if one is provided.
446  *
447  * Users need to do a final pagevec_release() to release any trailing pages.
448  */
449 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
450                   struct pagevec *pvec)
451 {
452         ENTRY;
453         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
454                        atomic_read(&page->cp_ref));
455
456         if (atomic_dec_and_test(&page->cp_ref)) {
457                 LASSERT(page->cp_state == CPS_FREEING);
458
459                 LASSERT(atomic_read(&page->cp_ref) == 0);
460                 PASSERT(env, page, page->cp_owner == NULL);
461                 PASSERT(env, page, list_empty(&page->cp_batch));
462                 /*
463                  * Page is no longer reachable by other threads. Tear
464                  * it down.
465                  */
466                 cl_page_free(env, page, pvec);
467         }
468
469         EXIT;
470 }
471 EXPORT_SYMBOL(cl_pagevec_put);
472
473 /**
474  * Releases a reference to a page; wrapper around cl_pagevec_put().
475  *
476  * When last reference is released, page is returned to the cache, unless it
477  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
478  * destroyed.
479  *
480  * \see cl_object_put(), cl_lock_put().
481  */
482 void cl_page_put(const struct lu_env *env, struct cl_page *page)
483 {
484         cl_pagevec_put(env, page, NULL);
485 }
486 EXPORT_SYMBOL(cl_page_put);
487
488 /**
489  * Returns the cl_page associated with a VM page and the given cl_object.
490  */
491 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
492 {
493         struct cl_page *page;
494
495         ENTRY;
496         KLASSERT(PageLocked(vmpage));
497
498         /*
499          * NOTE: absence of races and liveness of data are guaranteed by page
500          *       lock on a "vmpage". That works because object destruction
501          *       proceeds bottom-to-top.
502          */
503
504         page = (struct cl_page *)vmpage->private;
505         if (page != NULL) {
506                 cl_page_get_trust(page);
507                 LASSERT(page->cp_type == CPT_CACHEABLE);
508         }
509         RETURN(page);
510 }
511 EXPORT_SYMBOL(cl_vmpage_page);
512
513 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
514                                        const struct lu_device_type *dtype)
515 {
516         return cl_page_at_trusted(page, dtype);
517 }
518 EXPORT_SYMBOL(cl_page_at);
519
520 static void cl_page_owner_clear(struct cl_page *page)
521 {
522         ENTRY;
523         if (page->cp_owner != NULL) {
524                 LASSERT(page->cp_owner->ci_owned_nr > 0);
525                 page->cp_owner->ci_owned_nr--;
526                 page->cp_owner = NULL;
527         }
528         EXIT;
529 }
530
531 static void cl_page_owner_set(struct cl_page *page)
532 {
533         ENTRY;
534         LASSERT(page->cp_owner != NULL);
535         page->cp_owner->ci_owned_nr++;
536         EXIT;
537 }
538
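/*
 * Common disown path: clears the owner, moves an owned page back to
 * CPS_CACHED and calls cl_page_operations::cpo_disown() bottom-to-top.
 * Pages already in CPS_FREEING keep their state.
 */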
539 void cl_page_disown0(const struct lu_env *env,
540                      struct cl_io *io, struct cl_page *pg)
541 {
542         const struct cl_page_slice *slice;
543         enum cl_page_state state;
544
545         ENTRY;
546         state = pg->cp_state;
547         PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
548         PINVRNT(env, pg, cl_page_invariant(pg) || state == CPS_FREEING);
549         cl_page_owner_clear(pg);
550
551         if (state == CPS_OWNED)
552                 cl_page_state_set(env, pg, CPS_CACHED);
553         /*
554          * Layer call-backs are executed in bottom-up order, so that the
555          * uppermost layer (llite), responsible for VFS/VM interaction, runs
556          * last and can release locks safely.
557          */
558         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
559                 if (slice->cpl_ops->cpo_disown != NULL)
560                         (*slice->cpl_ops->cpo_disown)(env, slice, io);
561         }
562
563         EXIT;
564 }
565
566 /**
567  * Returns true iff the page is owned by the given io.
568  */
569 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
570 {
571         struct cl_io *top = cl_io_top((struct cl_io *)io);
572         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
573         ENTRY;
574         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
575 }
576 EXPORT_SYMBOL(cl_page_is_owned);
577
578 /**
579  * Try to own a page by IO.
580  *
581  * Waits until the page is in cl_page_state::CPS_CACHED state, and then switches it
582  * into cl_page_state::CPS_OWNED state.
583  *
584  * \pre  !cl_page_is_owned(pg, io)
585  * \post result == 0 iff cl_page_is_owned(pg, io)
586  *
587  * \retval 0   success
588  *
589  * \retval -ve failure, e.g., the page was destroyed (and landed in
590  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
591  *             or the page was owned by another thread, or is in IO.
592  *
593  * \see cl_page_disown()
594  * \see cl_page_operations::cpo_own()
595  * \see cl_page_own_try()
596  * \see cl_page_own
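 *
 * A minimal usage sketch, assuming "env", "io" and "pg" are supplied by the
 * caller:
 *
 *	if (cl_page_own(env, io, pg) == 0) {
 *		... operate on the owned page ...
 *		cl_page_disown(env, io, pg);
 *	}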
597  */
598 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
599                         struct cl_page *pg, int nonblock)
600 {
601         int result = 0;
602         const struct cl_page_slice *slice;
603
604         PINVRNT(env, pg, !cl_page_is_owned(pg, io));
605
606         ENTRY;
607         io = cl_io_top(io);
608
609         if (pg->cp_state == CPS_FREEING) {
610                 result = -ENOENT;
611                 goto out;
612         }
613
614         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
615                 if (slice->cpl_ops->cpo_own)
616                         result = (*slice->cpl_ops->cpo_own)(env, slice,
617                                                             io, nonblock);
618
619                 if (result != 0)
620                         break;
621
622         }
623         if (result > 0)
624                 result = 0;
625
626         if (result == 0) {
627                 PASSERT(env, pg, pg->cp_owner == NULL);
628                 pg->cp_owner = cl_io_top(io);
629                 cl_page_owner_set(pg);
630                 if (pg->cp_state != CPS_FREEING) {
631                         cl_page_state_set(env, pg, CPS_OWNED);
632                 } else {
633                         cl_page_disown0(env, io, pg);
634                         result = -ENOENT;
635                 }
636         }
637
638 out:
639         PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
640         RETURN(result);
641 }
642
643 /**
644  * Own a page; may block.
645  *
646  * \see cl_page_own0()
647  */
648 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
649 {
650         return cl_page_own0(env, io, pg, 0);
651 }
652 EXPORT_SYMBOL(cl_page_own);
653
654 /**
655  * Nonblock version of cl_page_own().
656  *
657  * \see cl_page_own0()
658  */
659 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
660                     struct cl_page *pg)
661 {
662         return cl_page_own0(env, io, pg, 1);
663 }
664 EXPORT_SYMBOL(cl_page_own_try);
665
666
667 /**
668  * Assume page ownership.
669  *
670  * Called when page is already locked by the hosting VM.
671  *
672  * \pre !cl_page_is_owned(pg, io)
673  * \post cl_page_is_owned(pg, io)
674  *
675  * \see cl_page_operations::cpo_assume()
676  */
677 void cl_page_assume(const struct lu_env *env,
678                     struct cl_io *io, struct cl_page *pg)
679 {
680         const struct cl_page_slice *slice;
681
682         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
683
684         ENTRY;
685         io = cl_io_top(io);
686
687         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
688                 if (slice->cpl_ops->cpo_assume != NULL)
689                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
690         }
691
692         PASSERT(env, pg, pg->cp_owner == NULL);
693         pg->cp_owner = cl_io_top(io);
694         cl_page_owner_set(pg);
695         cl_page_state_set(env, pg, CPS_OWNED);
696         EXIT;
697 }
698 EXPORT_SYMBOL(cl_page_assume);
699
700 /**
701  * Releases page ownership without unlocking the page.
702  *
703  * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
704  * underlying VM page (as VM is supposed to do this itself).
705  *
706  * \pre   cl_page_is_owned(pg, io)
707  * \post !cl_page_is_owned(pg, io)
708  *
709  * \see cl_page_assume()
710  */
711 void cl_page_unassume(const struct lu_env *env,
712                       struct cl_io *io, struct cl_page *pg)
713 {
714         const struct cl_page_slice *slice;
715
716         PINVRNT(env, pg, cl_page_is_owned(pg, io));
717         PINVRNT(env, pg, cl_page_invariant(pg));
718
719         ENTRY;
720         io = cl_io_top(io);
721         cl_page_owner_clear(pg);
722         cl_page_state_set(env, pg, CPS_CACHED);
723
724         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
725                 if (slice->cpl_ops->cpo_unassume != NULL)
726                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
727         }
728
729         EXIT;
730 }
731 EXPORT_SYMBOL(cl_page_unassume);
732
733 /**
734  * Releases page ownership.
735  *
736  * Moves page into cl_page_state::CPS_CACHED.
737  *
738  * \pre   cl_page_is_owned(pg, io)
739  * \post !cl_page_is_owned(pg, io)
740  *
741  * \see cl_page_own()
742  * \see cl_page_operations::cpo_disown()
743  */
744 void cl_page_disown(const struct lu_env *env,
745                     struct cl_io *io, struct cl_page *pg)
746 {
747         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
748                 pg->cp_state == CPS_FREEING);
749
750         ENTRY;
751         io = cl_io_top(io);
752         cl_page_disown0(env, io, pg);
753         EXIT;
754 }
755 EXPORT_SYMBOL(cl_page_disown);
756
757 /**
758  * Called when page is to be removed from the object, e.g., as a result of
759  * truncate.
760  *
761  * Calls cl_page_operations::cpo_discard() top-to-bottom.
762  *
763  * \pre cl_page_is_owned(pg, io)
764  *
765  * \see cl_page_operations::cpo_discard()
766  */
767 void cl_page_discard(const struct lu_env *env,
768                      struct cl_io *io, struct cl_page *pg)
769 {
770         const struct cl_page_slice *slice;
771
772         PINVRNT(env, pg, cl_page_is_owned(pg, io));
773         PINVRNT(env, pg, cl_page_invariant(pg));
774
775         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
776                 if (slice->cpl_ops->cpo_discard != NULL)
777                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
778         }
779 }
780 EXPORT_SYMBOL(cl_page_discard);
781
782 /**
783  * Version of cl_page_delete() that can be called for pages that are not fully
784  * constructed, e.g. in the error handling cl_page_find()->cl_page_delete0()
785  * path. Doesn't check the page invariant.
786  */
787 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg)
788 {
789         const struct cl_page_slice *slice;
790
791         ENTRY;
792
793         PASSERT(env, pg, pg->cp_state != CPS_FREEING);
794
795         /*
796          * Sever all ways to obtain new pointers to @pg.
797          */
798         cl_page_owner_clear(pg);
799         cl_page_state_set0(env, pg, CPS_FREEING);
800
801         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
802                 if (slice->cpl_ops->cpo_delete != NULL)
803                         (*slice->cpl_ops->cpo_delete)(env, slice);
804         }
805
806         EXIT;
807 }
808
809 /**
810  * Called when a decision is made to throw page out of memory.
811  *
812  * Notifies all layers about page destruction by calling
813  * cl_page_operations::cpo_delete() method top-to-bottom.
814  *
815  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
816  * where transition to this state happens).
817  *
818  * Eliminates all venues through which new references to the page can be
819  * obtained:
820  *
821  *     - removes page from the radix trees,
822  *
823  *     - breaks linkage from VM page to cl_page.
824  *
825  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
826  * drain after some time, at which point page will be recycled.
827  *
828  * \pre  VM page is locked
829  * \post pg->cp_state == CPS_FREEING
830  *
831  * \see cl_page_operations::cpo_delete()
832  */
833 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
834 {
835         PINVRNT(env, pg, cl_page_invariant(pg));
836         ENTRY;
837         cl_page_delete0(env, pg);
838         EXIT;
839 }
840 EXPORT_SYMBOL(cl_page_delete);
841
842 /**
843  * Marks page up-to-date.
844  *
845  * Calls cl_page_operations::cpo_export() through all layers top-to-bottom. The
846  * layer responsible for VM interaction has to mark/clear the page as up-to-date
847  * according to the \a uptodate argument.
848  *
849  * \see cl_page_operations::cpo_export()
850  */
851 void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
852 {
853         const struct cl_page_slice *slice;
854
855         PINVRNT(env, pg, cl_page_invariant(pg));
856
857         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
858                 if (slice->cpl_ops->cpo_export != NULL)
859                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
860         }
861 }
862 EXPORT_SYMBOL(cl_page_export);
863
864 /**
865  * Returns true iff \a pg is VM locked in a suitable sense by the calling
866  * thread.
867  */
868 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
869 {
870         const struct cl_page_slice *slice;
871         int result;
872
873         ENTRY;
874         slice = container_of(pg->cp_layers.next,
875                              const struct cl_page_slice, cpl_linkage);
876         PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
877         /*
878          * Call ->cpo_is_vmlocked() directly instead of going through
879          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
880          * cl_page_invariant().
881          */
882         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
883         PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
884         RETURN(result == -EBUSY);
885 }
886 EXPORT_SYMBOL(cl_page_is_vmlocked);
887
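/**
 * Notify layers that \a pg has been touched (written) up to byte \a to;
 * calls cl_page_operations::cpo_page_touch() top-to-bottom.
 */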
888 void cl_page_touch(const struct lu_env *env, const struct cl_page *pg,
889                   size_t to)
890 {
891         const struct cl_page_slice *slice;
892
893         ENTRY;
894
895         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
896                 if (slice->cpl_ops->cpo_page_touch != NULL)
897                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
898         }
899
900         EXIT;
901 }
902 EXPORT_SYMBOL(cl_page_touch);
903
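/* Page state that a page enters while a transfer of type \a crt is active. */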
904 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
905 {
906         ENTRY;
907         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
908 }
909
910 static void cl_page_io_start(const struct lu_env *env,
911                              struct cl_page *pg, enum cl_req_type crt)
912 {
913         /*
914          * The page is queued for IO; change its state.
915          */
916         ENTRY;
917         cl_page_owner_clear(pg);
918         cl_page_state_set(env, pg, cl_req_type_state(crt));
919         EXIT;
920 }
921
922 /**
923  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
924  * called top-to-bottom. Every layer either agrees to submit this page (by
925  * returning 0), or requests to omit this page (by returning -EALREADY). The
926  * layer handling interactions with the VM also has to inform the VM that the
927  * page is now under transfer.
928  */
929 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
930                  struct cl_page *pg, enum cl_req_type crt)
931 {
932         const struct cl_page_slice *slice;
933         int result = 0;
934
935         PINVRNT(env, pg, cl_page_is_owned(pg, io));
936         PINVRNT(env, pg, cl_page_invariant(pg));
937         PINVRNT(env, pg, crt < CRT_NR);
938
939         /*
940          * XXX this has to be called bottom-to-top, so that llite can set up
941          * PG_writeback without risking other layers deciding to skip this
942          * page.
943          */
944         if (crt >= CRT_NR)
945                 return -EINVAL;
946
947         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
948                 if (slice->cpl_ops->io[crt].cpo_prep != NULL)
949                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
950                                                                      slice,
951                                                                      io);
952
953                 if (result != 0)
954                         break;
955
956         }
957
958         if (result >= 0) {
959                 result = 0;
960                 cl_page_io_start(env, pg, crt);
961         }
962
963         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
964         return result;
965 }
966 EXPORT_SYMBOL(cl_page_prep);
967
968 /**
969  * Notify layers about transfer completion.
970  *
971  * Invoked by transfer sub-system (which is a part of osc) to notify layers
972  * that a transfer, of which this page is a part, has completed.
973  *
974  * Completion call-backs are executed in the bottom-up order, so that
975  * uppermost layer (llite), responsible for the VFS/VM interaction runs last
976  * and can release locks safely.
977  *
978  * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
979  * \post pg->cp_state == CPS_CACHED
980  *
981  * \see cl_page_operations::cpo_completion()
982  */
983 void cl_page_completion(const struct lu_env *env,
984                         struct cl_page *pg, enum cl_req_type crt, int ioret)
985 {
986         const struct cl_page_slice *slice;
987         struct cl_sync_io *anchor = pg->cp_sync_io;
988
989         PASSERT(env, pg, crt < CRT_NR);
990         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
991
992         ENTRY;
993         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
994         cl_page_state_set(env, pg, CPS_CACHED);
995         if (crt >= CRT_NR)
996                 return;
997
998         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
999                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
1000                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
1001                                                                   ioret);
1002         }
1003
1004         if (anchor != NULL) {
1005                 LASSERT(pg->cp_sync_io == anchor);
1006                 pg->cp_sync_io = NULL;
1007                 cl_sync_io_note(env, anchor, ioret);
1008         }
1009         EXIT;
1010 }
1011 EXPORT_SYMBOL(cl_page_completion);
1012
1013 /**
1014  * Notify layers that the transfer formation engine decided to yank this page from
1015  * the cache and to make it a part of a transfer.
1016  *
1017  * \pre  pg->cp_state == CPS_CACHED
1018  * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1019  *
1020  * \see cl_page_operations::cpo_make_ready()
1021  */
1022 int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
1023                        enum cl_req_type crt)
1024 {
1025         const struct cl_page_slice *sli;
1026         int result = 0;
1027
1028         PINVRNT(env, pg, crt < CRT_NR);
1029
1030         ENTRY;
1031         if (crt >= CRT_NR)
1032                 RETURN(-EINVAL);
1033
1034         list_for_each_entry(sli, &pg->cp_layers, cpl_linkage) {
1035                 if (sli->cpl_ops->io[crt].cpo_make_ready != NULL)
1036                         result = (*sli->cpl_ops->io[crt].cpo_make_ready)(env,
1037                                                                          sli);
1038                 if (result != 0)
1039                         break;
1040         }
1041
1042         if (result >= 0) {
1043                 result = 0;
1044                 PASSERT(env, pg, pg->cp_state == CPS_CACHED);
1045                 cl_page_io_start(env, pg, crt);
1046         }
1047         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1048         RETURN(result);
1049 }
1050 EXPORT_SYMBOL(cl_page_make_ready);
1051
1052 /**
1053  * Called when a page is being written back at the kernel's request.
1054  *
1055  * \pre  cl_page_is_owned(pg, io)
1056  * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
1057  *
1058  * \see cl_page_operations::cpo_flush()
1059  */
1060 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1061                   struct cl_page *pg)
1062 {
1063         const struct cl_page_slice *slice;
1064         int result = 0;
1065
1066         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1067         PINVRNT(env, pg, cl_page_invariant(pg));
1068
1069         ENTRY;
1070
1071         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1072                 if (slice->cpl_ops->cpo_flush != NULL)
1073                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1074                 if (result != 0)
1075                         break;
1076         }
1077         if (result > 0)
1078                 result = 0;
1079
1080         CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
1081         RETURN(result);
1082 }
1083 EXPORT_SYMBOL(cl_page_flush);
1084
1085 /**
1086  * Tells transfer engine that only part of a page is to be transmitted.
1087  *
1088  * \see cl_page_operations::cpo_clip()
1089  */
1090 void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1091                   int from, int to)
1092 {
1093         const struct cl_page_slice *slice;
1094
1095         PINVRNT(env, pg, cl_page_invariant(pg));
1096
1097         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1098         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1099                 if (slice->cpl_ops->cpo_clip != NULL)
1100                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1101         }
1102 }
1103 EXPORT_SYMBOL(cl_page_clip);
1104
1105 /**
1106  * Prints a human-readable representation of \a pg through \a printer.
1107  */
1108 void cl_page_header_print(const struct lu_env *env, void *cookie,
1109                           lu_printer_t printer, const struct cl_page *pg)
1110 {
1111         (*printer)(env, cookie,
1112                    "page@%p[%d %p %d %d %p]\n",
1113                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1114                    pg->cp_state, pg->cp_type,
1115                    pg->cp_owner);
1116 }
1117 EXPORT_SYMBOL(cl_page_header_print);
1118
1119 /**
1120  * Prints a human-readable representation of \a pg through \a printer.
1121  */
1122 void cl_page_print(const struct lu_env *env, void *cookie,
1123                    lu_printer_t printer, const struct cl_page *pg)
1124 {
1125         const struct cl_page_slice *slice;
1126         int result = 0;
1127
1128         cl_page_header_print(env, cookie, printer, pg);
1129         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1130                 if (slice->cpl_ops->cpo_print != NULL)
1131                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1132                                                              cookie, printer);
1133                 if (result != 0)
1134                         break;
1135         }
1136         (*printer)(env, cookie, "end page@%p\n", pg);
1137 }
1138 EXPORT_SYMBOL(cl_page_print);
1139
1140 /**
1141  * Converts a page index \a idx into a byte offset within object \a obj.
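 * For example, with 4KiB pages (PAGE_SHIFT == 12), cl_offset(obj, 3)
 * returns 12288.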
1142  */
1143 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1144 {
1145         return (loff_t)idx << PAGE_SHIFT;
1146 }
1147 EXPORT_SYMBOL(cl_offset);
1148
1149 /**
1150  * Converts a byte offset \a offset within object \a obj into a page index.
1151  */
1152 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1153 {
1154         return offset >> PAGE_SHIFT;
1155 }
1156 EXPORT_SYMBOL(cl_index);
1157
1158 size_t cl_page_size(const struct cl_object *obj)
1159 {
1160         return 1UL << PAGE_SHIFT;
1161 }
1162 EXPORT_SYMBOL(cl_page_size);
1163
1164 /**
1165  * Adds page slice to the compound page.
1166  *
1167  * This is called by cl_object_operations::coo_page_init() methods to add a
1168  * per-layer state to the page. New state is added at the end of
1169  * cl_page::cp_layers list, that is, it is at the bottom of the stack.
1170  *
1171  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1172  */
1173 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1174                        struct cl_object *obj, pgoff_t index,
1175                        const struct cl_page_operations *ops)
1176 {
1177         ENTRY;
1178         list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1179         slice->cpl_obj  = obj;
1180         slice->cpl_index = index;
1181         slice->cpl_ops  = ops;
1182         slice->cpl_page = page;
1183         EXIT;
1184 }
1185 EXPORT_SYMBOL(cl_page_slice_add);
1186
1187 /**
1188  * Allocate and initialize cl_cache, called by ll_init_sbi().
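 *
 * A minimal usage sketch, assuming the caller computes "lru_page_max" and
 * stores the result in its own superblock info:
 *
 *	cache = cl_cache_init(lru_page_max);
 *	if (cache == NULL)
 *		return -ENOMEM;
 *	...
 *	cl_cache_decref(cache);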
1189  */
1190 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1191 {
1192         struct cl_client_cache  *cache = NULL;
1193
1194         ENTRY;
1195         OBD_ALLOC(cache, sizeof(*cache));
1196         if (cache == NULL)
1197                 RETURN(NULL);
1198
1199         /* Initialize cache data */
1200         atomic_set(&cache->ccc_users, 1);
1201         cache->ccc_lru_max = lru_page_max;
1202         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1203         spin_lock_init(&cache->ccc_lru_lock);
1204         INIT_LIST_HEAD(&cache->ccc_lru);
1205
1206         /* turn unstable check off by default as it impacts performance */
1207         cache->ccc_unstable_check = 0;
1208         atomic_long_set(&cache->ccc_unstable_nr, 0);
1209         init_waitqueue_head(&cache->ccc_unstable_waitq);
1210         mutex_init(&cache->ccc_max_cache_mb_lock);
1211
1212         RETURN(cache);
1213 }
1214 EXPORT_SYMBOL(cl_cache_init);
1215
1216 /**
1217  * Increase cl_cache refcount
1218  */
1219 void cl_cache_incref(struct cl_client_cache *cache)
1220 {
1221         atomic_inc(&cache->ccc_users);
1222 }
1223 EXPORT_SYMBOL(cl_cache_incref);
1224
1225 /**
1226  * Decrease cl_cache refcount and free the cache if refcount=0.
1227  * Since llite, lov and osc all hold a cl_cache refcount,
1228  * the free will not cause a race. (LU-6173)
1229  */
1230 void cl_cache_decref(struct cl_client_cache *cache)
1231 {
1232         if (atomic_dec_and_test(&cache->ccc_users))
1233                 OBD_FREE(cache, sizeof(*cache));
1234 }
1235 EXPORT_SYMBOL(cl_cache_decref);