LU-9855 obdclass: Code cleanup
[fs/lustre-release.git] / lustre / obdclass / cl_page.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client Lustre Page.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/list.h>
41 #include <libcfs/libcfs.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44
45 #include <cl_object.h>
46 #include "cl_internal.h"
47
48 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
49
50 #ifdef LIBCFS_DEBUG
51 # define PASSERT(env, page, expr)                                       \
52   do {                                                                    \
53           if (unlikely(!(expr))) {                                      \
54                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
55                   LASSERT(0);                                           \
56           }                                                             \
57   } while (0)
58 #else /* !LIBCFS_DEBUG */
59 # define PASSERT(env, page, exp) \
60         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
61 #endif /* !LIBCFS_DEBUG */
62
63 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
64 # define PINVRNT(env, page, expr)                                       \
65   do {                                                                    \
66           if (unlikely(!(expr))) {                                      \
67                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
68                   LINVRNT(0);                                           \
69           }                                                             \
70   } while (0)
71 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
72 # define PINVRNT(env, page, exp) \
73          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
74 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
75
76 /* Page statistics are disabled by default due to the huge performance penalty. */
77 static void cs_page_inc(const struct cl_object *obj,
78                         enum cache_stats_item item)
79 {
80 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
81         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
82 #endif
83 }
84
85 static void cs_page_dec(const struct cl_object *obj,
86                         enum cache_stats_item item)
87 {
88 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
89         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
90 #endif
91 }
92
93 static void cs_pagestate_inc(const struct cl_object *obj,
94                              enum cl_page_state state)
95 {
96 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
97         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
98 #endif
99 }
100
101 static void cs_pagestate_dec(const struct cl_object *obj,
102                               enum cl_page_state state)
103 {
104 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
105         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
106 #endif
107 }
108
109 /**
110  * Internal version of cl_page_get().
111  *
112  * This function can be used to obtain an initial reference to a previously
113  * unreferenced cached object. It can be called only if concurrent page
114  * reclamation is somehow prevented, e.g., by keeping a lock on the VM page
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
119 static void cl_page_get_trust(struct cl_page *page)
120 {
121         LASSERT(atomic_read(&page->cp_ref) > 0);
122         atomic_inc(&page->cp_ref);
123 }
124
125 /**
126  * Returns a slice within a page, corresponding to the given layer in the
127  * device stack.
128  *
129  * \see cl_lock_at()
130  */
131 static const struct cl_page_slice *
132 cl_page_at_trusted(const struct cl_page *page,
133                    const struct lu_device_type *dtype)
134 {
135         const struct cl_page_slice *slice;
136         ENTRY;
137
138         list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
139                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
140                         RETURN(slice);
141         }
142         RETURN(NULL);
143 }
144
145 static void cl_page_free(const struct lu_env *env, struct cl_page *page)
146 {
147         struct cl_object *obj  = page->cp_obj;
148         int pagesize = cl_object_header(obj)->coh_page_bufsize;
149
150         PASSERT(env, page, list_empty(&page->cp_batch));
151         PASSERT(env, page, page->cp_owner == NULL);
152         PASSERT(env, page, page->cp_state == CPS_FREEING);
153
154         ENTRY;
155         while (!list_empty(&page->cp_layers)) {
156                 struct cl_page_slice *slice;
157
158                 slice = list_entry(page->cp_layers.next,
159                                    struct cl_page_slice, cpl_linkage);
160                 list_del_init(page->cp_layers.next);
161                 if (unlikely(slice->cpl_ops->cpo_fini != NULL))
162                         slice->cpl_ops->cpo_fini(env, slice);
163         }
164         cs_page_dec(obj, CS_total);
165         cs_pagestate_dec(obj, page->cp_state);
166         lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
167         cl_object_put(env, obj);
168         lu_ref_fini(&page->cp_reference);
169         OBD_FREE(page, pagesize);
170         EXIT;
171 }
172
173 /**
174  * Helper function updating page state. This is the only place in the code
175  * where cl_page::cp_state field is mutated.
176  */
177 static inline void cl_page_state_set_trust(struct cl_page *page,
178                                            enum cl_page_state state)
179 {
180         /* bypass const. */
181         *(enum cl_page_state *)&page->cp_state = state;
182 }
183
184 struct cl_page *cl_page_alloc(const struct lu_env *env,
185                 struct cl_object *o, pgoff_t ind, struct page *vmpage,
186                 enum cl_page_type type)
187 {
188         struct cl_page          *page;
189         struct lu_object_header *head;
190
191         ENTRY;
192         OBD_ALLOC_GFP(page, cl_object_header(o)->coh_page_bufsize,
193                         GFP_NOFS);
194         if (page != NULL) {
195                 int result = 0;
196                 atomic_set(&page->cp_ref, 1);
197                 page->cp_obj = o;
198                 cl_object_get(o);
199                 lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
200                                      page);
201                 page->cp_vmpage = vmpage;
202                 cl_page_state_set_trust(page, CPS_CACHED);
203                 page->cp_type = type;
204                 INIT_LIST_HEAD(&page->cp_layers);
205                 INIT_LIST_HEAD(&page->cp_batch);
206                 lu_ref_init(&page->cp_reference);
207                 head = o->co_lu.lo_header;
208                 list_for_each_entry(o, &head->loh_layers,
209                                     co_lu.lo_linkage) {
210                         if (o->co_ops->coo_page_init != NULL) {
211                                 result = o->co_ops->coo_page_init(env, o, page,
212                                                                   ind);
213                                 if (result != 0) {
214                                         cl_page_delete0(env, page);
215                                         cl_page_free(env, page);
216                                         page = ERR_PTR(result);
217                                         break;
218                                 }
219                         }
220                 }
221                 if (result == 0) {
222                         cs_page_inc(o, CS_total);
223                         cs_page_inc(o, CS_create);
224                         cs_pagestate_dec(o, CPS_CACHED);
225                 }
226         } else {
227                 page = ERR_PTR(-ENOMEM);
228         }
229         RETURN(page);
230 }
231
232 /**
233  * Returns a cl_page with index \a idx of the object \a o, associated with
234  * the VM page \a vmpage.
235  *
236  * This is the main entry point into the cl_page caching interface. The
237  * cache is consulted first (via the cl_page linked from \a vmpage's
238  * ->private field); if the page is found there, it is returned immediately.
239  * Otherwise a new page is allocated; either way an extra reference is taken.
240  *
241  * \see cl_object_find(), cl_lock_find()
242  */
243 struct cl_page *cl_page_find(const struct lu_env *env,
244                              struct cl_object *o,
245                              pgoff_t idx, struct page *vmpage,
246                              enum cl_page_type type)
247 {
248         struct cl_page          *page = NULL;
249         struct cl_object_header *hdr;
250
251         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
252         might_sleep();
253
254         ENTRY;
255
256         hdr = cl_object_header(o);
257         cs_page_inc(o, CS_lookup);
258
259         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
260                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
261         /* fast path. */
262         if (type == CPT_CACHEABLE) {
263                 /* vmpage lock is used to protect the vmpage->cl_page
264                  * (->private) linkage */
265                 KLASSERT(PageLocked(vmpage));
266                 /*
267                  * cl_vmpage_page() can be called here without any locks as
268                  *
269                  *     - "vmpage" is locked (which prevents ->private from
270                  *       concurrent updates), and
271                  *
272                  *     - "o" cannot be destroyed while current thread holds a
273                  *       reference on it.
274                  */
275                 page = cl_vmpage_page(vmpage, o);
276                 if (page != NULL) {
277                         cs_page_inc(o, CS_hit);
278                         RETURN(page);
279                 }
280         }
281
282         /* allocate and initialize cl_page */
283         page = cl_page_alloc(env, o, idx, vmpage, type);
284         RETURN(page);
285 }
286 EXPORT_SYMBOL(cl_page_find);
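
/*
 * Editor's sketch (not part of the original file): a minimal caller of
 * cl_page_find().  The helper name is hypothetical; it assumes the caller
 * already holds the lock on a VM page belonging to "obj".  The returned
 * reference is the caller's to drop with cl_page_put().
 */
static struct cl_page *example_grab_page(const struct lu_env *env,
                                         struct cl_object *obj,
                                         struct page *vmpage, pgoff_t index)
{
        struct cl_page *page;

        KLASSERT(PageLocked(vmpage));   /* required for CPT_CACHEABLE pages */

        page = cl_page_find(env, obj, index, vmpage, CPT_CACHEABLE);
        if (IS_ERR(page))
                return page;            /* e.g. ERR_PTR(-ENOMEM) */

        /* ... use the page, then release it with cl_page_put(env, page) ... */
        return page;
}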
287
288 static inline int cl_page_invariant(const struct cl_page *pg)
289 {
290         return cl_page_in_use_noref(pg);
291 }
292
293 static void cl_page_state_set0(const struct lu_env *env,
294                                struct cl_page *page, enum cl_page_state state)
295 {
296         enum cl_page_state old;
297
298         /*
299          * Matrix of allowed state transitions [old][new], for sanity
300          * checking.
301          */
302         static const int allowed_transitions[CPS_NR][CPS_NR] = {
303                 [CPS_CACHED] = {
304                         [CPS_CACHED]  = 0,
305                         [CPS_OWNED]   = 1, /* io finds existing cached page */
306                         [CPS_PAGEIN]  = 0,
307                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
308                         [CPS_FREEING] = 1, /* eviction under memory pressure */
309                 },
310                 [CPS_OWNED] = {
311                         [CPS_CACHED]  = 1, /* release to the cache */
312                         [CPS_OWNED]   = 0,
313                         [CPS_PAGEIN]  = 1, /* start read immediately */
314                         [CPS_PAGEOUT] = 1, /* start write immediately */
315                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
316                 },
317                 [CPS_PAGEIN] = {
318                         [CPS_CACHED]  = 1, /* io completion */
319                         [CPS_OWNED]   = 0,
320                         [CPS_PAGEIN]  = 0,
321                         [CPS_PAGEOUT] = 0,
322                         [CPS_FREEING] = 0,
323                 },
324                 [CPS_PAGEOUT] = {
325                         [CPS_CACHED]  = 1, /* io completion */
326                         [CPS_OWNED]   = 0,
327                         [CPS_PAGEIN]  = 0,
328                         [CPS_PAGEOUT] = 0,
329                         [CPS_FREEING] = 0,
330                 },
331                 [CPS_FREEING] = {
332                         [CPS_CACHED]  = 0,
333                         [CPS_OWNED]   = 0,
334                         [CPS_PAGEIN]  = 0,
335                         [CPS_PAGEOUT] = 0,
336                         [CPS_FREEING] = 0,
337                 }
338         };
339
340         ENTRY;
341         old = page->cp_state;
342         PASSERT(env, page, allowed_transitions[old][state]);
343         CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
344         PASSERT(env, page, page->cp_state == old);
345         PASSERT(env, page, equi(state == CPS_OWNED, page->cp_owner != NULL));
346
347         cs_pagestate_dec(page->cp_obj, page->cp_state);
348         cs_pagestate_inc(page->cp_obj, state);
349         cl_page_state_set_trust(page, state);
350         EXIT;
351 }
352
353 static void cl_page_state_set(const struct lu_env *env,
354                               struct cl_page *page, enum cl_page_state state)
355 {
356         cl_page_state_set0(env, page, state);
357 }
358
359 /**
360  * Acquires an additional reference to a page.
361  *
362  * This can be called only by caller already possessing a reference to \a
363  * page.
364  *
365  * \see cl_object_get(), cl_lock_get().
366  */
367 void cl_page_get(struct cl_page *page)
368 {
369         ENTRY;
370         cl_page_get_trust(page);
371         EXIT;
372 }
373 EXPORT_SYMBOL(cl_page_get);
374
375 /**
376  * Releases a reference to a page.
377  *
378  * When last reference is released, page is returned to the cache, unless it
379  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
380  * destroyed.
381  *
382  * \see cl_object_put(), cl_lock_put().
383  */
384 void cl_page_put(const struct lu_env *env, struct cl_page *page)
385 {
386         ENTRY;
387         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
388                        atomic_read(&page->cp_ref));
389
390         if (atomic_dec_and_test(&page->cp_ref)) {
391                 LASSERT(page->cp_state == CPS_FREEING);
392
393                 LASSERT(atomic_read(&page->cp_ref) == 0);
394                 PASSERT(env, page, page->cp_owner == NULL);
395                 PASSERT(env, page, list_empty(&page->cp_batch));
396                 /*
397                  * Page is no longer reachable by other threads. Tear
398                  * it down.
399                  */
400                 cl_page_free(env, page);
401         }
402
403         EXIT;
404 }
405 EXPORT_SYMBOL(cl_page_put);
406
407 /**
408  * Returns a cl_page associated with a VM page, and given cl_object.
409  */
410 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
411 {
412         struct cl_page *page;
413
414         ENTRY;
415         KLASSERT(PageLocked(vmpage));
416
417         /*
418          * NOTE: absence of races and liveness of data are guaranteed by the
419          *       page lock on the "vmpage". That works because object destruction
420          *       proceeds bottom-to-top.
421          */
422
423         page = (struct cl_page *)vmpage->private;
424         if (page != NULL) {
425                 cl_page_get_trust(page);
426                 LASSERT(page->cp_type == CPT_CACHEABLE);
427         }
428         RETURN(page);
429 }
430 EXPORT_SYMBOL(cl_vmpage_page);
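
/*
 * Editor's sketch: querying whether a locked VM page already has a cl_page
 * attached, using the ->private contract documented above.  The helper name
 * is hypothetical; cl_vmpage_page() returns with an extra reference that
 * must be dropped with cl_page_put().
 */
static int example_page_is_cached(const struct lu_env *env,
                                  struct page *vmpage, struct cl_object *obj)
{
        struct cl_page *page;

        KLASSERT(PageLocked(vmpage));   /* caller must hold the VM page lock */

        page = cl_vmpage_page(vmpage, obj);
        if (page == NULL)
                return 0;

        cl_page_put(env, page);         /* drop the reference taken above */
        return 1;
}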
431
432 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
433                                        const struct lu_device_type *dtype)
434 {
435         return cl_page_at_trusted(page, dtype);
436 }
437 EXPORT_SYMBOL(cl_page_at);
438
439 static void cl_page_owner_clear(struct cl_page *page)
440 {
441         ENTRY;
442         if (page->cp_owner != NULL) {
443                 LASSERT(page->cp_owner->ci_owned_nr > 0);
444                 page->cp_owner->ci_owned_nr--;
445                 page->cp_owner = NULL;
446         }
447         EXIT;
448 }
449
450 static void cl_page_owner_set(struct cl_page *page)
451 {
452         ENTRY;
453         LASSERT(page->cp_owner != NULL);
454         page->cp_owner->ci_owned_nr++;
455         EXIT;
456 }
457
458 void cl_page_disown0(const struct lu_env *env,
459                      struct cl_io *io, struct cl_page *pg)
460 {
461         const struct cl_page_slice *slice;
462         enum cl_page_state state;
463
464         ENTRY;
465         state = pg->cp_state;
466         PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
467         PINVRNT(env, pg, cl_page_invariant(pg) || state == CPS_FREEING);
468         cl_page_owner_clear(pg);
469
470         if (state == CPS_OWNED)
471                 cl_page_state_set(env, pg, CPS_CACHED);
472         /*
473          * Completion call-backs are executed in the bottom-up order, so that
474          * uppermost layer (llite), responsible for VFS/VM interaction, runs
475          * last and can release locks safely.
476          */
477         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
478                 if (slice->cpl_ops->cpo_disown != NULL)
479                         (*slice->cpl_ops->cpo_disown)(env, slice, io);
480         }
481
482         EXIT;
483 }
484
485 /**
486  * Returns true iff the page is owned by the given IO.
487  */
488 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
489 {
490         struct cl_io *top = cl_io_top((struct cl_io *)io);
491         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
492         ENTRY;
493         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
494 }
495 EXPORT_SYMBOL(cl_page_is_owned);
496
497 /**
498  * Try to own a page by IO.
499  *
500  * Waits until the page is in cl_page_state::CPS_CACHED state, and then
501  * switches it into cl_page_state::CPS_OWNED state.
502  *
503  * \pre  !cl_page_is_owned(pg, io)
504  * \post result == 0 iff cl_page_is_owned(pg, io)
505  *
506  * \retval 0   success
507  *
508  * \retval -ve failure, e.g., the page was destroyed (and landed in
509  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
510  *             or the page was owned by another thread, or is under IO.
511  *
512  * \see cl_page_disown()
513  * \see cl_page_operations::cpo_own()
514  * \see cl_page_own_try()
515  * \see cl_page_own
516  */
517 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
518                         struct cl_page *pg, int nonblock)
519 {
520         int result = 0;
521         const struct cl_page_slice *slice;
522
523         PINVRNT(env, pg, !cl_page_is_owned(pg, io));
524
525         ENTRY;
526         io = cl_io_top(io);
527
528         if (pg->cp_state == CPS_FREEING) {
529                 result = -ENOENT;
530                 goto out;
531         }
532
533         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
534                 if (slice->cpl_ops->cpo_own)
535                         result = (*slice->cpl_ops->cpo_own)(env, slice,
536                                                             io, nonblock);
537
538                 if (result != 0)
539                         break;
540
541         }
542         if (result > 0)
543                 result = 0;
544
545         if (result == 0) {
546                 PASSERT(env, pg, pg->cp_owner == NULL);
547                 pg->cp_owner = cl_io_top(io);
548                 cl_page_owner_set(pg);
549                 if (pg->cp_state != CPS_FREEING) {
550                         cl_page_state_set(env, pg, CPS_OWNED);
551                 } else {
552                         cl_page_disown0(env, io, pg);
553                         result = -ENOENT;
554                 }
555         }
556
557 out:
558         PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
559         RETURN(result);
560 }
561
562 /**
563  * Own a page, might be blocked.
564  *
565  * \see cl_page_own0()
566  */
567 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
568 {
569         return cl_page_own0(env, io, pg, 0);
570 }
571 EXPORT_SYMBOL(cl_page_own);
572
573 /**
574  * Nonblock version of cl_page_own().
575  *
576  * \see cl_page_own0()
577  */
578 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
579                     struct cl_page *pg)
580 {
581         return cl_page_own0(env, io, pg, 1);
582 }
583 EXPORT_SYMBOL(cl_page_own_try);
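
/*
 * Editor's sketch of the non-blocking ownership pattern: a caller that must
 * not sleep tries cl_page_own_try() and skips the page on failure, while a
 * blocking caller would use cl_page_own().  The helper name is illustrative
 * only.
 */
static int example_try_own(const struct lu_env *env, struct cl_io *io,
                           struct cl_page *pg)
{
        if (cl_page_own_try(env, io, pg) != 0)
                return -EBUSY;          /* owned elsewhere or being freed */

        /* ... operate on the owned page ... */

        cl_page_disown(env, io, pg);
        return 0;
}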
584
585
586 /**
587  * Assume page ownership.
588  *
589  * Called when page is already locked by the hosting VM.
590  *
591  * \pre !cl_page_is_owned(pg, io)
592  * \post cl_page_is_owned(pg, io)
593  *
594  * \see cl_page_operations::cpo_assume()
595  */
596 void cl_page_assume(const struct lu_env *env,
597                     struct cl_io *io, struct cl_page *pg)
598 {
599         const struct cl_page_slice *slice;
600
601         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
602
603         ENTRY;
604         io = cl_io_top(io);
605
606         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
607                 if (slice->cpl_ops->cpo_assume != NULL)
608                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
609         }
610
611         PASSERT(env, pg, pg->cp_owner == NULL);
612         pg->cp_owner = cl_io_top(io);
613         cl_page_owner_set(pg);
614         cl_page_state_set(env, pg, CPS_OWNED);
615         EXIT;
616 }
617 EXPORT_SYMBOL(cl_page_assume);
618
619 /**
620  * Releases page ownership without unlocking the page.
621  *
622  * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
623  * underlying VM page (as VM is supposed to do this itself).
624  *
625  * \pre   cl_page_is_owned(pg, io)
626  * \post !cl_page_is_owned(pg, io)
627  *
628  * \see cl_page_assume()
629  */
630 void cl_page_unassume(const struct lu_env *env,
631                       struct cl_io *io, struct cl_page *pg)
632 {
633         const struct cl_page_slice *slice;
634
635         PINVRNT(env, pg, cl_page_is_owned(pg, io));
636         PINVRNT(env, pg, cl_page_invariant(pg));
637
638         ENTRY;
639         io = cl_io_top(io);
640         cl_page_owner_clear(pg);
641         cl_page_state_set(env, pg, CPS_CACHED);
642
643         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
644                 if (slice->cpl_ops->cpo_unassume != NULL)
645                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
646         }
647
648         EXIT;
649 }
650 EXPORT_SYMBOL(cl_page_unassume);
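
/*
 * Editor's sketch: assume/unassume pairing for a path where the VM page is
 * already locked by the kernel (e.g. a write_begin-style hook).  Purely
 * illustrative; a real caller would do useful work between the two calls.
 */
static void example_under_vm_lock(const struct lu_env *env, struct cl_io *io,
                                  struct cl_page *pg)
{
        cl_page_assume(env, io, pg);    /* take ownership; VM page stays locked */

        /* ... modify the page while it is CPS_OWNED ... */

        cl_page_unassume(env, io, pg);  /* back to CPS_CACHED, still VM-locked */
}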
651
652 /**
653  * Releases page ownership.
654  *
655  * Moves page into cl_page_state::CPS_CACHED.
656  *
657  * \pre   cl_page_is_owned(pg, io)
658  * \post !cl_page_is_owned(pg, io)
659  *
660  * \see cl_page_own()
661  * \see cl_page_operations::cpo_disown()
662  */
663 void cl_page_disown(const struct lu_env *env,
664                     struct cl_io *io, struct cl_page *pg)
665 {
666         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
667                 pg->cp_state == CPS_FREEING);
668
669         ENTRY;
670         io = cl_io_top(io);
671         cl_page_disown0(env, io, pg);
672         EXIT;
673 }
674 EXPORT_SYMBOL(cl_page_disown);
675
676 /**
677  * Called when page is to be removed from the object, e.g., as a result of
678  * truncate.
679  *
680  * Calls cl_page_operations::cpo_discard() top-to-bottom.
681  *
682  * \pre cl_page_is_owned(pg, io)
683  *
684  * \see cl_page_operations::cpo_discard()
685  */
686 void cl_page_discard(const struct lu_env *env,
687                      struct cl_io *io, struct cl_page *pg)
688 {
689         const struct cl_page_slice *slice;
690
691         PINVRNT(env, pg, cl_page_is_owned(pg, io));
692         PINVRNT(env, pg, cl_page_invariant(pg));
693
694         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
695                 if (slice->cpl_ops->cpo_discard != NULL)
696                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
697         }
698 }
699 EXPORT_SYMBOL(cl_page_discard);
700
701 /**
702  * Version of cl_page_delete() that can be called for pages that are not fully
703  * constructed, e.g. in the error-handling cl_page_find()->cl_page_delete0()
704  * path. Does not check the page invariant.
705  */
706 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg)
707 {
708         const struct cl_page_slice *slice;
709
710         ENTRY;
711
712         PASSERT(env, pg, pg->cp_state != CPS_FREEING);
713
714         /*
715          * Sever all ways to obtain new pointers to @pg.
716          */
717         cl_page_owner_clear(pg);
718         cl_page_state_set0(env, pg, CPS_FREEING);
719
720         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
721                 if (slice->cpl_ops->cpo_delete != NULL)
722                         (*slice->cpl_ops->cpo_delete)(env, slice);
723         }
724
725         EXIT;
726 }
727
728 /**
729  * Called when a decision is made to throw page out of memory.
730  *
731  * Notifies all layers about page destruction by calling
732  * cl_page_operations::cpo_delete() method top-to-bottom.
733  *
734  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
735  * where transition to this state happens).
736  *
737  * Eliminates all venues through which new references to the page can be
738  * obtained:
739  *
740  *     - removes page from the radix trees,
741  *
742  *     - breaks linkage from VM page to cl_page.
743  *
744  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
745  * drain after some time, at which point page will be recycled.
746  *
747  * \pre  VM page is locked
748  * \post pg->cp_state == CPS_FREEING
749  *
750  * \see cl_page_operations::cpo_delete()
751  */
752 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
753 {
754         PINVRNT(env, pg, cl_page_invariant(pg));
755         ENTRY;
756         cl_page_delete0(env, pg);
757         EXIT;
758 }
759 EXPORT_SYMBOL(cl_page_delete);
760
761 /**
762  * Marks page up-to-date.
763  *
764  * Calls cl_page_operations::cpo_export() through all layers top-to-bottom. The
765  * layer responsible for VM interaction has to mark/clear the page as up-to-date
766  * according to the \a uptodate argument.
767  *
768  * \see cl_page_operations::cpo_export()
769  */
770 void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
771 {
772         const struct cl_page_slice *slice;
773
774         PINVRNT(env, pg, cl_page_invariant(pg));
775
776         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
777                 if (slice->cpl_ops->cpo_export != NULL)
778                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
779         }
780 }
781 EXPORT_SYMBOL(cl_page_export);
782
783 /**
784  * Returns true, iff \a pg is VM locked in a suitable sense by the calling
785  * thread.
786  */
787 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
788 {
789         const struct cl_page_slice *slice;
790         int result;
791
792         ENTRY;
793         slice = container_of(pg->cp_layers.next,
794                              const struct cl_page_slice, cpl_linkage);
795         PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
796         /*
797          * Call ->cpo_is_vmlocked() directly instead of going through
798          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
799          * cl_page_invariant().
800          */
801         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
802         PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
803         RETURN(result == -EBUSY);
804 }
805 EXPORT_SYMBOL(cl_page_is_vmlocked);
806
807 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
808 {
809         ENTRY;
810         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
811 }
812
813 static void cl_page_io_start(const struct lu_env *env,
814                              struct cl_page *pg, enum cl_req_type crt)
815 {
816         /*
817          * Page is queued for IO, change its state.
818          */
819         ENTRY;
820         cl_page_owner_clear(pg);
821         cl_page_state_set(env, pg, cl_req_type_state(crt));
822         EXIT;
823 }
824
825 /**
826  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
827  * called top-to-bottom. Every layer either agrees to submit this page (by
828  * returning 0), or requests to omit this page (by returning -EALREADY). The
829  * layer handling interactions with the VM also has to inform the VM that the
830  * page is now under transfer.
831  */
832 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
833                  struct cl_page *pg, enum cl_req_type crt)
834 {
835         const struct cl_page_slice *slice;
836         int result = 0;
837
838         PINVRNT(env, pg, cl_page_is_owned(pg, io));
839         PINVRNT(env, pg, cl_page_invariant(pg));
840         PINVRNT(env, pg, crt < CRT_NR);
841
842         /*
843          * XXX this has to be called bottom-to-top, so that llite can set up
844          * PG_writeback without risking other layers deciding to skip this
845          * page.
846          */
847         if (crt >= CRT_NR)
848                 return -EINVAL;
849
850         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
851                 if (slice->cpl_ops->io[crt].cpo_prep != NULL)
852                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
853                                                                      slice,
854                                                                      io);
855
856                 if (result != 0)
857                         break;
858
859         }
860
861         if (result >= 0) {
862                 result = 0;
863                 cl_page_io_start(env, pg, crt);
864         }
865
866         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
867         return result;
868 }
869 EXPORT_SYMBOL(cl_page_prep);
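
/*
 * Editor's sketch of the submission handshake: own the page, then ask every
 * layer to prepare it for the requested transfer type.  -EALREADY means the
 * page does not need this transfer and is skipped.  The helper name is
 * hypothetical and error handling is reduced to the essentials.
 */
static int example_submit_one(const struct lu_env *env, struct cl_io *io,
                              struct cl_page *pg, enum cl_req_type crt)
{
        int rc;

        rc = cl_page_own(env, io, pg);
        if (rc != 0)
                return rc;

        rc = cl_page_prep(env, io, pg, crt);
        if (rc != 0) {
                cl_page_disown(env, io, pg);
                return rc == -EALREADY ? 0 : rc;
        }

        /* the page is now in CPS_PAGEIN/CPS_PAGEOUT and owned by the transfer */
        return 0;
}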
870
871 /**
872  * Notify layers about transfer completion.
873  *
874  * Invoked by the transfer sub-system (which is a part of osc) to notify layers
875  * that a transfer of which this page is a part has completed.
876  *
877  * Completion call-backs are executed in the bottom-up order, so that
878  * uppermost layer (llite), responsible for the VFS/VM interaction, runs last
879  * and can release locks safely.
880  *
881  * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
882  * \post pg->cp_state == CPS_CACHED
883  *
884  * \see cl_page_operations::cpo_completion()
885  */
886 void cl_page_completion(const struct lu_env *env,
887                         struct cl_page *pg, enum cl_req_type crt, int ioret)
888 {
889         const struct cl_page_slice *slice;
890         struct cl_sync_io *anchor = pg->cp_sync_io;
891
892         PASSERT(env, pg, crt < CRT_NR);
893         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
894
895         ENTRY;
896         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
897         cl_page_state_set(env, pg, CPS_CACHED);
898         if (crt >= CRT_NR)
899                 return;
900
901         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
902                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
903                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
904                                                                   ioret);
905         }
906
907         if (anchor != NULL) {
908                 LASSERT(pg->cp_sync_io == anchor);
909                 pg->cp_sync_io = NULL;
910                 cl_sync_io_note(env, anchor, ioret);
911         }
912         EXIT;
913 }
914 EXPORT_SYMBOL(cl_page_completion);
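
/*
 * Editor's sketch of the completion side: once the RPC covering this page
 * finishes, the transfer engine reports the result ("ioret" is 0 or a
 * negative errno) and drops the reference it held while the page was in
 * flight (an assumption made here).  The helper name is illustrative only.
 */
static void example_finish_one(const struct lu_env *env, struct cl_page *pg,
                               enum cl_req_type crt, int ioret)
{
        /* the page must still be in the state set by cl_page_io_start() */
        cl_page_completion(env, pg, crt, ioret);
        cl_page_put(env, pg);
}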
915
916 /**
917  * Notify layers that the transfer formation engine decided to yank this page from
918  * the cache and to make it a part of a transfer.
919  *
920  * \pre  pg->cp_state == CPS_CACHED
921  * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
922  *
923  * \see cl_page_operations::cpo_make_ready()
924  */
925 int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
926                        enum cl_req_type crt)
927 {
928         const struct cl_page_slice *sli;
929         int result = 0;
930
931         PINVRNT(env, pg, crt < CRT_NR);
932
933         ENTRY;
934         if (crt >= CRT_NR)
935                 RETURN(-EINVAL);
936
937         list_for_each_entry(sli, &pg->cp_layers, cpl_linkage) {
938                 if (sli->cpl_ops->io[crt].cpo_make_ready != NULL)
939                         result = (*sli->cpl_ops->io[crt].cpo_make_ready)(env,
940                                                                          sli);
941                 if (result != 0)
942                         break;
943         }
944
945         if (result >= 0) {
946                 result = 0;
947                 PASSERT(env, pg, pg->cp_state == CPS_CACHED);
948                 cl_page_io_start(env, pg, crt);
949         }
950         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
951         RETURN(result);
952 }
953 EXPORT_SYMBOL(cl_page_make_ready);
954
955 /**
956  * Called when a page is being written back at the kernel's initiative.
957  *
958  * \pre  cl_page_is_owned(pg, io)
959  * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
960  *
961  * \see cl_page_operations::cpo_flush()
962  */
963 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
964                   struct cl_page *pg)
965 {
966         const struct cl_page_slice *slice;
967         int result = 0;
968
969         PINVRNT(env, pg, cl_page_is_owned(pg, io));
970         PINVRNT(env, pg, cl_page_invariant(pg));
971
972         ENTRY;
973
974         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
975                 if (slice->cpl_ops->cpo_flush != NULL)
976                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
977                 if (result != 0)
978                         break;
979         }
980         if (result > 0)
981                 result = 0;
982
983         CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
984         RETURN(result);
985 }
986 EXPORT_SYMBOL(cl_page_flush);
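
/*
 * Editor's sketch: flushing a single page from an fsync-like path.  Only the
 * ownership required around cl_page_flush() is shown; the helper name is
 * hypothetical.
 */
static int example_flush_one(const struct lu_env *env, struct cl_io *io,
                             struct cl_page *pg)
{
        int rc;

        rc = cl_page_own(env, io, pg);
        if (rc != 0)
                return rc;

        rc = cl_page_flush(env, io, pg);
        cl_page_disown(env, io, pg);
        return rc;
}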
987
988 /**
989  * Tells transfer engine that only part of a page is to be transmitted.
990  *
991  * \see cl_page_operations::cpo_clip()
992  */
993 void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
994                   int from, int to)
995 {
996         const struct cl_page_slice *slice;
997
998         PINVRNT(env, pg, cl_page_invariant(pg));
999
1000         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1001         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1002                 if (slice->cpl_ops->cpo_clip != NULL)
1003                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1004         }
1005 }
1006 EXPORT_SYMBOL(cl_page_clip);
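
/*
 * Editor's sketch: clipping the last page of a file so that only bytes up to
 * EOF are transmitted.  "file_size" is an illustrative parameter, not
 * something cl_page_clip() knows about by itself.
 */
static void example_clip_tail(const struct lu_env *env, struct cl_page *pg,
                              pgoff_t index, loff_t file_size)
{
        loff_t start = cl_offset(pg->cp_obj, index);

        if (file_size > start && file_size < start + cl_page_size(pg->cp_obj))
                cl_page_clip(env, pg, 0, (int)(file_size - start));
}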
1007
1008 /**
1009  * Prints a human-readable representation of \a pg's header via \a printer.
1010  */
1011 void cl_page_header_print(const struct lu_env *env, void *cookie,
1012                           lu_printer_t printer, const struct cl_page *pg)
1013 {
1014         (*printer)(env, cookie,
1015                    "page@%p[%d %p %d %d %p]\n",
1016                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1017                    pg->cp_state, pg->cp_type,
1018                    pg->cp_owner);
1019 }
1020 EXPORT_SYMBOL(cl_page_header_print);
1021
1022 /**
1023  * Prints a human-readable representation of \a pg via \a printer.
1024  */
1025 void cl_page_print(const struct lu_env *env, void *cookie,
1026                    lu_printer_t printer, const struct cl_page *pg)
1027 {
1028         const struct cl_page_slice *slice;
1029         int result = 0;
1030
1031         cl_page_header_print(env, cookie, printer, pg);
1032         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1033                 if (slice->cpl_ops->cpo_print != NULL)
1034                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1035                                                              cookie, printer);
1036                 if (result != 0)
1037                         break;
1038         }
1039         (*printer)(env, cookie, "end page@%p\n", pg);
1040 }
1041 EXPORT_SYMBOL(cl_page_print);
1042
1043 /**
1044  * Cancel a page which is still in a transfer.
1045  */
1046 int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1047 {
1048         const struct cl_page_slice *slice;
1049         int                         result = 0;
1050
1051         list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
1052                 if (slice->cpl_ops->cpo_cancel != NULL)
1053                         result = (*slice->cpl_ops->cpo_cancel)(env, slice);
1054                 if (result != 0)
1055                         break;
1056         }
1057         if (result > 0)
1058                 result = 0;
1059
1060         return result;
1061 }
1062
1063 /**
1064  * Converts a page index into a byte offset within object \a obj.
1065  */
1066 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1067 {
1068         return (loff_t)idx << PAGE_SHIFT;
1069 }
1070 EXPORT_SYMBOL(cl_offset);
1071
1072 /**
1073  * Converts a byte offset within object \a obj into a page index.
1074  */
1075 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1076 {
1077         return offset >> PAGE_SHIFT;
1078 }
1079 EXPORT_SYMBOL(cl_index);
1080
1081 size_t cl_page_size(const struct cl_object *obj)
1082 {
1083         return 1UL << PAGE_SHIFT;
1084 }
1085 EXPORT_SYMBOL(cl_page_size);
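
/*
 * Editor's worked example of the helpers above (illustrative only): with the
 * usual 4096-byte PAGE_SIZE, byte offset 10000 lies in page index 2, and that
 * page starts at byte offset 8192.
 */
static void example_offset_math(const struct cl_object *obj)
{
        pgoff_t idx   = cl_index(obj, 10000);   /* offset >> PAGE_SHIFT */
        loff_t  start = cl_offset(obj, idx);    /* index  << PAGE_SHIFT */
        size_t  size  = cl_page_size(obj);      /* always one VM page   */

        CDEBUG(D_PAGE, "idx=%lu start=%lld size=%zu\n",
               (unsigned long)idx, (long long)start, size);
}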
1086
1087 /**
1088  * Adds page slice to the compound page.
1089  *
1090  * This is called by cl_object_operations::coo_page_init() methods to add a
1091  * per-layer state to the page. The new state is added at the end of the
1092  * cl_page::cp_layers list, that is, at the bottom of the stack.
1093  *
1094  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1095  */
1096 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1097                        struct cl_object *obj, pgoff_t index,
1098                        const struct cl_page_operations *ops)
1099 {
1100         ENTRY;
1101         list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1102         slice->cpl_obj  = obj;
1103         slice->cpl_index = index;
1104         slice->cpl_ops  = ops;
1105         slice->cpl_page = page;
1106         EXIT;
1107 }
1108 EXPORT_SYMBOL(cl_page_slice_add);
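
/*
 * Editor's sketch of how a layer registers its per-page slice from its
 * coo_page_init() method.  "example_page" and "example_page_ops" are
 * hypothetical stand-ins for a real layer's types (e.g. vvp_page, osc_page);
 * a real layer normally embeds its slice in the buffer sized by
 * coh_page_bufsize rather than allocating it separately as done here to keep
 * the sketch self-contained.
 */
struct example_page {
        struct cl_page_slice ep_cl;
};

static void example_page_fini(const struct lu_env *env,
                              struct cl_page_slice *slice)
{
        struct example_page *ep = container_of(slice, struct example_page,
                                               ep_cl);

        OBD_FREE_PTR(ep);               /* undo the allocation from init */
}

static const struct cl_page_operations example_page_ops = {
        .cpo_fini = example_page_fini,
        /* a real layer also fills in cpo_own, cpo_completion, ... */
};

static int example_page_init(const struct lu_env *env, struct cl_object *obj,
                             struct cl_page *page, pgoff_t index)
{
        struct example_page *ep;

        OBD_ALLOC_PTR(ep);
        if (ep == NULL)
                return -ENOMEM;

        cl_page_slice_add(page, &ep->ep_cl, obj, index, &example_page_ops);
        return 0;
}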
1109
1110 /**
1111  * Allocate and initialize cl_cache, called by ll_init_sbi().
1112  */
1113 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1114 {
1115         struct cl_client_cache  *cache = NULL;
1116
1117         ENTRY;
1118         OBD_ALLOC(cache, sizeof(*cache));
1119         if (cache == NULL)
1120                 RETURN(NULL);
1121
1122         /* Initialize cache data */
1123         atomic_set(&cache->ccc_users, 1);
1124         cache->ccc_lru_max = lru_page_max;
1125         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1126         spin_lock_init(&cache->ccc_lru_lock);
1127         INIT_LIST_HEAD(&cache->ccc_lru);
1128
1129         /* turn unstable check off by default as it impacts performance */
1130         cache->ccc_unstable_check = 0;
1131         atomic_long_set(&cache->ccc_unstable_nr, 0);
1132         init_waitqueue_head(&cache->ccc_unstable_waitq);
1133
1134         RETURN(cache);
1135 }
1136 EXPORT_SYMBOL(cl_cache_init);
1137
1138 /**
1139  * Increase cl_cache refcount
1140  */
1141 void cl_cache_incref(struct cl_client_cache *cache)
1142 {
1143         atomic_inc(&cache->ccc_users);
1144 }
1145 EXPORT_SYMBOL(cl_cache_incref);
1146
1147 /**
1148  * Decrease the cl_cache refcount and free the cache if the refcount drops to 0.
1149  * Since llite, lov and osc all hold a cl_cache reference,
1150  * the free cannot race with a concurrent user. (LU-6173)
1151  */
1152 void cl_cache_decref(struct cl_client_cache *cache)
1153 {
1154         if (atomic_dec_and_test(&cache->ccc_users))
1155                 OBD_FREE(cache, sizeof(*cache));
1156 }
1157 EXPORT_SYMBOL(cl_cache_decref);