fs/lustre-release.git: lustre/obdclass/cl_page.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client Lustre Page.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/list.h>
41 #include <libcfs/libcfs.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44
45 #include <cl_object.h>
46 #include "cl_internal.h"
47
48 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
49
50 #ifdef LIBCFS_DEBUG
51 # define PASSERT(env, page, expr)                                       \
52   do {                                                                    \
53           if (unlikely(!(expr))) {                                      \
54                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
55                   LASSERT(0);                                           \
56           }                                                             \
57   } while (0)
58 #else /* !LIBCFS_DEBUG */
59 # define PASSERT(env, page, exp) \
60         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
61 #endif /* !LIBCFS_DEBUG */
62
63 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
64 # define PINVRNT(env, page, expr)                                       \
65   do {                                                                    \
66           if (unlikely(!(expr))) {                                      \
67                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
68                   LINVRNT(0);                                           \
69           }                                                             \
70   } while (0)
71 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
72 # define PINVRNT(env, page, exp) \
73          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
74 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
75
76 /* Page statistics are disabled by default due to the huge performance penalty. */
77 static void cs_page_inc(const struct cl_object *obj,
78                         enum cache_stats_item item)
79 {
80 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
81         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
82 #endif
83 }
84
85 static void cs_page_dec(const struct cl_object *obj,
86                         enum cache_stats_item item)
87 {
88 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
89         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
90 #endif
91 }
92
93 static void cs_pagestate_inc(const struct cl_object *obj,
94                              enum cl_page_state state)
95 {
96 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
97         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
98 #endif
99 }
100
101 static void cs_pagestate_dec(const struct cl_object *obj,
102                               enum cl_page_state state)
103 {
104 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
105         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
106 #endif
107 }
108
109 /**
110  * Internal version of cl_page_get().
111  *
112  * This function can be used to obtain an initial reference to a previously
113  * unreferenced cached object. It can be called only if concurrent page
114  * reclamation is somehow prevented, e.g., by holding a lock on the VM page
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
119 static void cl_page_get_trust(struct cl_page *page)
120 {
121         LASSERT(atomic_read(&page->cp_ref) > 0);
122         atomic_inc(&page->cp_ref);
123 }
124
125 /**
126  * Returns a slice within a page, corresponding to the given layer in the
127  * device stack.
128  *
129  * \see cl_lock_at()
130  */
131 static const struct cl_page_slice *
132 cl_page_at_trusted(const struct cl_page *page,
133                    const struct lu_device_type *dtype)
134 {
135         const struct cl_page_slice *slice;
136         ENTRY;
137
138         list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
139                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
140                         RETURN(slice);
141         }
142         RETURN(NULL);
143 }
144
145 static void cl_page_free(const struct lu_env *env, struct cl_page *page,
146                          struct pagevec *pvec)
147 {
148         struct cl_object *obj  = page->cp_obj;
149         int pagesize = cl_object_header(obj)->coh_page_bufsize;
150
151         PASSERT(env, page, list_empty(&page->cp_batch));
152         PASSERT(env, page, page->cp_owner == NULL);
153         PASSERT(env, page, page->cp_state == CPS_FREEING);
154
155         ENTRY;
156         while (!list_empty(&page->cp_layers)) {
157                 struct cl_page_slice *slice;
158
159                 slice = list_entry(page->cp_layers.next,
160                                    struct cl_page_slice, cpl_linkage);
161                 list_del_init(page->cp_layers.next);
162                 if (unlikely(slice->cpl_ops->cpo_fini != NULL))
163                         slice->cpl_ops->cpo_fini(env, slice, pvec);
164         }
165         cs_page_dec(obj, CS_total);
166         cs_pagestate_dec(obj, page->cp_state);
167         lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
168         cl_object_put(env, obj);
169         lu_ref_fini(&page->cp_reference);
170         OBD_FREE(page, pagesize);
171         EXIT;
172 }
173
174 /**
175  * Helper function updating page state. This is the only place in the code
176  * where cl_page::cp_state field is mutated.
177  */
178 static inline void cl_page_state_set_trust(struct cl_page *page,
179                                            enum cl_page_state state)
180 {
181         /* bypass const. */
182         *(enum cl_page_state *)&page->cp_state = state;
183 }
184
185 struct cl_page *cl_page_alloc(const struct lu_env *env,
186                 struct cl_object *o, pgoff_t ind, struct page *vmpage,
187                 enum cl_page_type type)
188 {
189         struct cl_page          *page;
190         struct lu_object_header *head;
191
192         ENTRY;
193         OBD_ALLOC_GFP(page, cl_object_header(o)->coh_page_bufsize,
194                         GFP_NOFS);
195         if (page != NULL) {
196                 int result = 0;
197                 atomic_set(&page->cp_ref, 1);
198                 page->cp_obj = o;
199                 cl_object_get(o);
200                 lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
201                                      page);
202                 page->cp_vmpage = vmpage;
203                 cl_page_state_set_trust(page, CPS_CACHED);
204                 page->cp_type = type;
205                 INIT_LIST_HEAD(&page->cp_layers);
206                 INIT_LIST_HEAD(&page->cp_batch);
207                 lu_ref_init(&page->cp_reference);
208                 head = o->co_lu.lo_header;
209                 list_for_each_entry(o, &head->loh_layers,
210                                     co_lu.lo_linkage) {
211                         if (o->co_ops->coo_page_init != NULL) {
212                                 result = o->co_ops->coo_page_init(env, o, page,
213                                                                   ind);
214                                 if (result != 0) {
215                                         cl_page_delete0(env, page);
216                                         cl_page_free(env, page, NULL);
217                                         page = ERR_PTR(result);
218                                         break;
219                                 }
220                         }
221                 }
222                 if (result == 0) {
223                         cs_page_inc(o, CS_total);
224                         cs_page_inc(o, CS_create);
225                         cs_pagestate_inc(o, CPS_CACHED);
226                 }
227         } else {
228                 page = ERR_PTR(-ENOMEM);
229         }
230         RETURN(page);
231 }
232
233 /**
234  * Returns a cl_page with index \a idx at the object \a o, and associated with
235  * the VM page \a vmpage.
236  *
237  * This is the main entry point into the cl_page caching interface. First, the
238  * cache is consulted (via the cl_page pointer stored in the VM page). If the
239  * page is found there, it is returned immediately. Otherwise a new page is
240  * allocated and returned. In either case, an additional reference is acquired.
241  *
242  * \see cl_object_find(), cl_lock_find()
243  */
244 struct cl_page *cl_page_find(const struct lu_env *env,
245                              struct cl_object *o,
246                              pgoff_t idx, struct page *vmpage,
247                              enum cl_page_type type)
248 {
249         struct cl_page          *page = NULL;
250         struct cl_object_header *hdr;
251
252         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
253         might_sleep();
254
255         ENTRY;
256
257         hdr = cl_object_header(o);
258         cs_page_inc(o, CS_lookup);
259
260         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
261                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
262         /* fast path. */
263         if (type == CPT_CACHEABLE) {
264                 /* vmpage lock is used to protect the child/parent
265                  * relationship */
266                 KLASSERT(PageLocked(vmpage));
267                 /*
268                  * cl_vmpage_page() can be called here without any locks as
269                  *
270                  *     - "vmpage" is locked (which prevents ->private from
271                  *       concurrent updates), and
272                  *
273                  *     - "o" cannot be destroyed while current thread holds a
274                  *       reference on it.
275                  */
276                 page = cl_vmpage_page(vmpage, o);
277                 if (page != NULL) {
278                         cs_page_inc(o, CS_hit);
279                         RETURN(page);
280                 }
281         }
282
283         /* allocate and initialize cl_page */
284         page = cl_page_alloc(env, o, idx, vmpage, type);
285         RETURN(page);
286 }
287 EXPORT_SYMBOL(cl_page_find);
288
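/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * a typical caller looks up or creates the cl_page for a locked VM page,
 * uses it, and drops its reference.  The object pointer "clob" and the way
 * "vmpage" was obtained are assumptions made for this example.
 *
 * \code
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
 *	if (IS_ERR(page)) {
 *		unlock_page(vmpage);
 *		return PTR_ERR(page);
 *	}
 *	...
 *	cl_page_put(env, page);
 *	unlock_page(vmpage);
 * \endcode
 */
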
289 static inline int cl_page_invariant(const struct cl_page *pg)
290 {
291         return cl_page_in_use_noref(pg);
292 }
293
294 static void cl_page_state_set0(const struct lu_env *env,
295                                struct cl_page *page, enum cl_page_state state)
296 {
297         enum cl_page_state old;
298
299         /*
300          * Matrix of allowed state transitions [old][new], for sanity
301          * checking.
302          */
303         static const int allowed_transitions[CPS_NR][CPS_NR] = {
304                 [CPS_CACHED] = {
305                         [CPS_CACHED]  = 0,
306                         [CPS_OWNED]   = 1, /* io finds existing cached page */
307                         [CPS_PAGEIN]  = 0,
308                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
309                         [CPS_FREEING] = 1, /* eviction on the memory pressure */
310                 },
311                 [CPS_OWNED] = {
312                         [CPS_CACHED]  = 1, /* release to the cache */
313                         [CPS_OWNED]   = 0,
314                         [CPS_PAGEIN]  = 1, /* start read immediately */
315                         [CPS_PAGEOUT] = 1, /* start write immediately */
316                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
317                 },
318                 [CPS_PAGEIN] = {
319                         [CPS_CACHED]  = 1, /* io completion */
320                         [CPS_OWNED]   = 0,
321                         [CPS_PAGEIN]  = 0,
322                         [CPS_PAGEOUT] = 0,
323                         [CPS_FREEING] = 0,
324                 },
325                 [CPS_PAGEOUT] = {
326                         [CPS_CACHED]  = 1, /* io completion */
327                         [CPS_OWNED]   = 0,
328                         [CPS_PAGEIN]  = 0,
329                         [CPS_PAGEOUT] = 0,
330                         [CPS_FREEING] = 0,
331                 },
332                 [CPS_FREEING] = {
333                         [CPS_CACHED]  = 0,
334                         [CPS_OWNED]   = 0,
335                         [CPS_PAGEIN]  = 0,
336                         [CPS_PAGEOUT] = 0,
337                         [CPS_FREEING] = 0,
338                 }
339         };
340
341         ENTRY;
342         old = page->cp_state;
343         PASSERT(env, page, allowed_transitions[old][state]);
344         CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
345         PASSERT(env, page, page->cp_state == old);
346         PASSERT(env, page, equi(state == CPS_OWNED, page->cp_owner != NULL));
347
348         cs_pagestate_dec(page->cp_obj, page->cp_state);
349         cs_pagestate_inc(page->cp_obj, state);
350         cl_page_state_set_trust(page, state);
351         EXIT;
352 }
353
354 static void cl_page_state_set(const struct lu_env *env,
355                               struct cl_page *page, enum cl_page_state state)
356 {
357         cl_page_state_set0(env, page, state);
358 }
359
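/*
 * Worked example (editor's addition): starting from the CPS_CACHED state set
 * in cl_page_alloc(), the matrix above permits the common lifecycles
 *
 *	read:        CACHED -> OWNED -> PAGEIN  -> CACHED -> ... -> FREEING
 *	write-back:  CACHED -> PAGEOUT          -> CACHED -> ... -> FREEING
 *	             (or OWNED -> PAGEOUT for an immediate write)
 *	truncate:    CACHED -> OWNED -> FREEING
 *
 * Transitions with a zero entry, e.g. CPS_PAGEIN -> CPS_PAGEOUT or anything
 * out of CPS_FREEING, trip the PASSERT() in cl_page_state_set0().
 */
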
360 /**
361  * Acquires an additional reference to a page.
362  *
363  * This can be called only by caller already possessing a reference to \a
364  * page.
365  *
366  * \see cl_object_get(), cl_lock_get().
367  */
368 void cl_page_get(struct cl_page *page)
369 {
370         ENTRY;
371         cl_page_get_trust(page);
372         EXIT;
373 }
374 EXPORT_SYMBOL(cl_page_get);
375
376 /**
377  * Releases a reference to a page; if a pagevec is provided, it is used to
378  * release the pages in batches.
379  *
380  * Users need to do a final pagevec_release() to release any trailing pages.
381  */
382 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
383                   struct pagevec *pvec)
384 {
385         ENTRY;
386         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
387                        atomic_read(&page->cp_ref));
388
389         if (atomic_dec_and_test(&page->cp_ref)) {
390                 LASSERT(page->cp_state == CPS_FREEING);
391
392                 LASSERT(atomic_read(&page->cp_ref) == 0);
393                 PASSERT(env, page, page->cp_owner == NULL);
394                 PASSERT(env, page, list_empty(&page->cp_batch));
395                 /*
396                  * Page is no longer reachable by other threads. Tear
397                  * it down.
398                  */
399                 cl_page_free(env, page, pvec);
400         }
401
402         EXIT;
403 }
404 EXPORT_SYMBOL(cl_pagevec_put);
405
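/*
 * Illustrative sketch (editor's addition): releasing a batch of pages through
 * a pagevec and finishing with pagevec_release(), as required above.  The
 * "pages" array and "count" are assumptions, and note that older kernels use
 * the two-argument form pagevec_init(&pvec, 0).
 *
 * \code
 *	struct pagevec pvec;
 *	int i;
 *
 *	pagevec_init(&pvec);
 *	for (i = 0; i < count; i++)
 *		cl_pagevec_put(env, pages[i], &pvec);
 *	pagevec_release(&pvec);
 * \endcode
 */
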
406 /**
407  * Releases a reference to a page; a wrapper around cl_pagevec_put().
408  *
409  * When last reference is released, page is returned to the cache, unless it
410  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
411  * destroyed.
412  *
413  * \see cl_object_put(), cl_lock_put().
414  */
415 void cl_page_put(const struct lu_env *env, struct cl_page *page)
416 {
417         cl_pagevec_put(env, page, NULL);
418 }
419 EXPORT_SYMBOL(cl_page_put);
420
421 /**
422  * Returns a cl_page associated with a VM page, and given cl_object.
423  */
424 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
425 {
426         struct cl_page *page;
427
428         ENTRY;
429         KLASSERT(PageLocked(vmpage));
430
431         /*
432          * NOTE: absence of races and liveness of data are guaranteed by the
433          *       page lock on "vmpage". That works because object destruction
434          *       proceeds bottom-to-top.
435          */
436
437         page = (struct cl_page *)vmpage->private;
438         if (page != NULL) {
439                 cl_page_get_trust(page);
440                 LASSERT(page->cp_type == CPT_CACHEABLE);
441         }
442         RETURN(page);
443 }
444 EXPORT_SYMBOL(cl_vmpage_page);
445
446 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
447                                        const struct lu_device_type *dtype)
448 {
449         return cl_page_at_trusted(page, dtype);
450 }
451 EXPORT_SYMBOL(cl_page_at);
452
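/*
 * Illustrative sketch (editor's addition): a layer usually recovers its own
 * slice from a cl_page with cl_page_at() and its lu_device_type, and then
 * converts the generic slice into its private per-page structure.  The names
 * "my_device_type", "struct my_page" and "mp_cl" are hypothetical stand-ins
 * for a real layer such as osc or lov.
 *
 * \code
 *	const struct cl_page_slice *slice = cl_page_at(page, &my_device_type);
 *	struct my_page *mp;
 *
 *	if (slice != NULL)
 *		mp = container_of(slice, struct my_page, mp_cl);
 * \endcode
 */
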
453 static void cl_page_owner_clear(struct cl_page *page)
454 {
455         ENTRY;
456         if (page->cp_owner != NULL) {
457                 LASSERT(page->cp_owner->ci_owned_nr > 0);
458                 page->cp_owner->ci_owned_nr--;
459                 page->cp_owner = NULL;
460         }
461         EXIT;
462 }
463
464 static void cl_page_owner_set(struct cl_page *page)
465 {
466         ENTRY;
467         LASSERT(page->cp_owner != NULL);
468         page->cp_owner->ci_owned_nr++;
469         EXIT;
470 }
471
472 void cl_page_disown0(const struct lu_env *env,
473                      struct cl_io *io, struct cl_page *pg)
474 {
475         const struct cl_page_slice *slice;
476         enum cl_page_state state;
477
478         ENTRY;
479         state = pg->cp_state;
480         PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
481         PINVRNT(env, pg, cl_page_invariant(pg) || state == CPS_FREEING);
482         cl_page_owner_clear(pg);
483
484         if (state == CPS_OWNED)
485                 cl_page_state_set(env, pg, CPS_CACHED);
486         /*
487          * Completion call-backs are executed in bottom-up order, so that the
488          * uppermost layer (llite), responsible for VFS/VM interaction, runs
489          * last and can release locks safely.
490          */
491         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
492                 if (slice->cpl_ops->cpo_disown != NULL)
493                         (*slice->cpl_ops->cpo_disown)(env, slice, io);
494         }
495
496         EXIT;
497 }
498
499 /**
500  * Returns true iff the page is owned by the given io.
501  */
502 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
503 {
504         struct cl_io *top = cl_io_top((struct cl_io *)io);
505         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
506         ENTRY;
507         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
508 }
509 EXPORT_SYMBOL(cl_page_is_owned);
510
511 /**
512  * Try to own a page by IO.
513  *
514  * Waits until page is in cl_page_state::CPS_CACHED state, and then switch it
515  * into cl_page_state::CPS_OWNED state.
516  *
517  * \pre  !cl_page_is_owned(pg, io)
518  * \post result == 0 iff cl_page_is_owned(pg, io)
519  *
520  * \retval 0   success
521  *
522  * \retval -ve failure, e.g., the page was destroyed (and landed in
523  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
524  *             or the page was owned by another thread, or is part of an IO.
525  *
526  * \see cl_page_disown()
527  * \see cl_page_operations::cpo_own()
528  * \see cl_page_own_try()
529  * \see cl_page_own
530  */
531 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
532                         struct cl_page *pg, int nonblock)
533 {
534         int result = 0;
535         const struct cl_page_slice *slice;
536
537         PINVRNT(env, pg, !cl_page_is_owned(pg, io));
538
539         ENTRY;
540         io = cl_io_top(io);
541
542         if (pg->cp_state == CPS_FREEING) {
543                 result = -ENOENT;
544                 goto out;
545         }
546
547         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
548                 if (slice->cpl_ops->cpo_own)
549                         result = (*slice->cpl_ops->cpo_own)(env, slice,
550                                                             io, nonblock);
551
552                 if (result != 0)
553                         break;
554
555         }
556         if (result > 0)
557                 result = 0;
558
559         if (result == 0) {
560                 PASSERT(env, pg, pg->cp_owner == NULL);
561                 pg->cp_owner = cl_io_top(io);
562                 cl_page_owner_set(pg);
563                 if (pg->cp_state != CPS_FREEING) {
564                         cl_page_state_set(env, pg, CPS_OWNED);
565                 } else {
566                         cl_page_disown0(env, io, pg);
567                         result = -ENOENT;
568                 }
569         }
570
571 out:
572         PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
573         RETURN(result);
574 }
575
576 /**
577  * Own a page; the call may block.
578  *
579  * \see cl_page_own0()
580  */
581 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
582 {
583         return cl_page_own0(env, io, pg, 0);
584 }
585 EXPORT_SYMBOL(cl_page_own);
586
587 /**
588  * Nonblock version of cl_page_own().
589  *
590  * \see cl_page_own0()
591  */
592 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
593                     struct cl_page *pg)
594 {
595         return cl_page_own0(env, io, pg, 1);
596 }
597 EXPORT_SYMBOL(cl_page_own_try);
598
599
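/*
 * Illustrative sketch (editor's addition): the usual ownership pattern.  The
 * page is owned on behalf of the top-level IO, worked on while it is in
 * CPS_OWNED, and then disowned, which returns it to CPS_CACHED.  -ENOENT
 * means the page is being freed and should simply be skipped.
 *
 * \code
 *	int rc;
 *
 *	rc = cl_page_own(env, io, pg);
 *	if (rc == 0) {
 *		...
 *		cl_page_disown(env, io, pg);
 *	} else if (rc != -ENOENT) {
 *		return rc;
 *	}
 * \endcode
 *
 * cl_page_own_try() follows the same pattern but does not block when another
 * thread already owns the page.
 */
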
600 /**
601  * Assume page ownership.
602  *
603  * Called when page is already locked by the hosting VM.
604  *
605  * \pre !cl_page_is_owned(pg, io)
606  * \post cl_page_is_owned(pg, io)
607  *
608  * \see cl_page_operations::cpo_assume()
609  */
610 void cl_page_assume(const struct lu_env *env,
611                     struct cl_io *io, struct cl_page *pg)
612 {
613         const struct cl_page_slice *slice;
614
615         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
616
617         ENTRY;
618         io = cl_io_top(io);
619
620         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
621                 if (slice->cpl_ops->cpo_assume != NULL)
622                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
623         }
624
625         PASSERT(env, pg, pg->cp_owner == NULL);
626         pg->cp_owner = cl_io_top(io);
627         cl_page_owner_set(pg);
628         cl_page_state_set(env, pg, CPS_OWNED);
629         EXIT;
630 }
631 EXPORT_SYMBOL(cl_page_assume);
632
633 /**
634  * Releases page ownership without unlocking the page.
635  *
636  * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
637  * underlying VM page (as VM is supposed to do this itself).
638  *
639  * \pre   cl_page_is_owned(pg, io)
640  * \post !cl_page_is_owned(pg, io)
641  *
642  * \see cl_page_assume()
643  */
644 void cl_page_unassume(const struct lu_env *env,
645                       struct cl_io *io, struct cl_page *pg)
646 {
647         const struct cl_page_slice *slice;
648
649         PINVRNT(env, pg, cl_page_is_owned(pg, io));
650         PINVRNT(env, pg, cl_page_invariant(pg));
651
652         ENTRY;
653         io = cl_io_top(io);
654         cl_page_owner_clear(pg);
655         cl_page_state_set(env, pg, CPS_CACHED);
656
657         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
658                 if (slice->cpl_ops->cpo_unassume != NULL)
659                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
660         }
661
662         EXIT;
663 }
664 EXPORT_SYMBOL(cl_page_unassume);
665
666 /**
667  * Releases page ownership.
668  *
669  * Moves page into cl_page_state::CPS_CACHED.
670  *
671  * \pre   cl_page_is_owned(pg, io)
672  * \post !cl_page_is_owned(pg, io)
673  *
674  * \see cl_page_own()
675  * \see cl_page_operations::cpo_disown()
676  */
677 void cl_page_disown(const struct lu_env *env,
678                     struct cl_io *io, struct cl_page *pg)
679 {
680         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
681                 pg->cp_state == CPS_FREEING);
682
683         ENTRY;
684         io = cl_io_top(io);
685         cl_page_disown0(env, io, pg);
686         EXIT;
687 }
688 EXPORT_SYMBOL(cl_page_disown);
689
690 /**
691  * Called when page is to be removed from the object, e.g., as a result of
692  * truncate.
693  *
694  * Calls cl_page_operations::cpo_discard() top-to-bottom.
695  *
696  * \pre cl_page_is_owned(pg, io)
697  *
698  * \see cl_page_operations::cpo_discard()
699  */
700 void cl_page_discard(const struct lu_env *env,
701                      struct cl_io *io, struct cl_page *pg)
702 {
703         const struct cl_page_slice *slice;
704
705         PINVRNT(env, pg, cl_page_is_owned(pg, io));
706         PINVRNT(env, pg, cl_page_invariant(pg));
707
708         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
709                 if (slice->cpl_ops->cpo_discard != NULL)
710                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
711         }
712 }
713 EXPORT_SYMBOL(cl_page_discard);
714
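/*
 * Illustrative sketch (editor's addition): a truncate-style caller honouring
 * the \pre above owns the page, lets every layer discard it, and releases
 * ownership again; whether cl_page_delete() is called afterwards depends on
 * the caller and is not shown here.
 *
 * \code
 *	if (cl_page_own(env, io, pg) == 0) {
 *		cl_page_discard(env, io, pg);
 *		cl_page_disown(env, io, pg);
 *	}
 * \endcode
 */
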
715 /**
716  * Version of cl_page_delete() that can be called for not fully constructed
717  * pages, e.g. in an error handling cl_page_find()->cl_page_delete0()
718  * path. Doesn't check page invariant.
719  */
720 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg)
721 {
722         const struct cl_page_slice *slice;
723
724         ENTRY;
725
726         PASSERT(env, pg, pg->cp_state != CPS_FREEING);
727
728         /*
729          * Sever all ways to obtain new pointers to @pg.
730          */
731         cl_page_owner_clear(pg);
732         cl_page_state_set0(env, pg, CPS_FREEING);
733
734         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
735                 if (slice->cpl_ops->cpo_delete != NULL)
736                         (*slice->cpl_ops->cpo_delete)(env, slice);
737         }
738
739         EXIT;
740 }
741
742 /**
743  * Called when a decision is made to throw page out of memory.
744  *
745  * Notifies all layers about page destruction by calling
746  * cl_page_operations::cpo_delete() method top-to-bottom.
747  *
748  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
749  * where transition to this state happens).
750  *
751  * Eliminates all venues through which new references to the page can be
752  * obtained:
753  *
754  *     - removes page from the radix trees,
755  *
756  *     - breaks linkage from VM page to cl_page.
757  *
758  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
759  * drain after some time, at which point page will be recycled.
760  *
761  * \pre  VM page is locked
762  * \post pg->cp_state == CPS_FREEING
763  *
764  * \see cl_page_operations::cpo_delete()
765  */
766 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
767 {
768         PINVRNT(env, pg, cl_page_invariant(pg));
769         ENTRY;
770         cl_page_delete0(env, pg);
771         EXIT;
772 }
773 EXPORT_SYMBOL(cl_page_delete);
774
775 /**
776  * Marks page up-to-date.
777  *
778  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
779  * layer responsible for VM interaction has to mark/clear page as up-to-date
780  * by the \a uptodate argument.
781  *
782  * \see cl_page_operations::cpo_export()
783  */
784 void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
785 {
786         const struct cl_page_slice *slice;
787
788         PINVRNT(env, pg, cl_page_invariant(pg));
789
790         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
791                 if (slice->cpl_ops->cpo_export != NULL)
792                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
793         }
794 }
795 EXPORT_SYMBOL(cl_page_export);
796
797 /**
798  * Returns true, iff \a pg is VM locked in a suitable sense by the calling
799  * thread.
800  */
801 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
802 {
803         const struct cl_page_slice *slice;
804         int result;
805
806         ENTRY;
807         slice = container_of(pg->cp_layers.next,
808                              const struct cl_page_slice, cpl_linkage);
809         PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
810         /*
811          * Call ->cpo_is_vmlocked() directly instead of going through
812          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
813          * cl_page_invariant().
814          */
815         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
816         PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
817         RETURN(result == -EBUSY);
818 }
819 EXPORT_SYMBOL(cl_page_is_vmlocked);
820
821 void cl_page_touch(const struct lu_env *env, const struct cl_page *pg,
822                   size_t to)
823 {
824         const struct cl_page_slice *slice;
825
826         ENTRY;
827
828         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
829                 if (slice->cpl_ops->cpo_page_touch != NULL)
830                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
831         }
832
833         EXIT;
834 }
835 EXPORT_SYMBOL(cl_page_touch);
836
837 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
838 {
839         ENTRY;
840         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
841 }
842
843 static void cl_page_io_start(const struct lu_env *env,
844                              struct cl_page *pg, enum cl_req_type crt)
845 {
846         /*
847          * Page is queued for IO, change its state.
848          */
849         ENTRY;
850         cl_page_owner_clear(pg);
851         cl_page_state_set(env, pg, cl_req_type_state(crt));
852         EXIT;
853 }
854
855 /**
856  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
857  * called top-to-bottom. Every layer either agrees to submit this page (by
858  * returning 0), or requests to omit this page (by returning -EALREADY). The
859  * layer handling interactions with the VM also has to inform the VM that the
860  * page is under transfer now.
861  */
862 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
863                  struct cl_page *pg, enum cl_req_type crt)
864 {
865         const struct cl_page_slice *slice;
866         int result = 0;
867
868         PINVRNT(env, pg, cl_page_is_owned(pg, io));
869         PINVRNT(env, pg, cl_page_invariant(pg));
870         PINVRNT(env, pg, crt < CRT_NR);
871
872         /*
873          * XXX this has to be called bottom-to-top, so that llite can set up
874          * PG_writeback without risking other layers deciding to skip this
875          * page.
876          */
877         if (crt >= CRT_NR)
878                 return -EINVAL;
879
880         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
881                 if (slice->cpl_ops->io[crt].cpo_prep != NULL)
882                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
883                                                                      slice,
884                                                                      io);
885
886                 if (result != 0)
887                         break;
888
889         }
890
891         if (result >= 0) {
892                 result = 0;
893                 cl_page_io_start(env, pg, crt);
894         }
895
896         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
897         return result;
898 }
899 EXPORT_SYMBOL(cl_page_prep);
900
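/*
 * Illustrative sketch (editor's addition): a simplified submission step for
 * write-back.  On success the page has moved to CPS_PAGEOUT and is no longer
 * owned by the IO (cl_page_io_start() clears the owner), so it is simply
 * queued for transfer and comes back through cl_page_completion().
 * "queue_for_transfer()" is a hypothetical placeholder for the caller's
 * queueing logic; how -EALREADY and other errors are handled varies by
 * caller, and disowning the page here is only one reasonable choice.
 *
 * \code
 *	int rc;
 *
 *	rc = cl_page_prep(env, io, pg, CRT_WRITE);
 *	if (rc == 0) {
 *		queue_for_transfer(pg);
 *	} else {
 *		cl_page_disown(env, io, pg);
 *		if (rc != -EALREADY)
 *			return rc;
 *	}
 * \endcode
 */
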
901 /**
902  * Notify layers about transfer completion.
903  *
904  * Invoked by transfer sub-system (which is a part of osc) to notify layers
905  * that a transfer, of which this page is a part, has completed.
906  *
907  * Completion call-backs are executed in the bottom-up order, so that
908  * uppermost layer (llite), responsible for the VFS/VM interaction runs last
909  * and can release locks safely.
910  *
911  * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
912  * \post pg->cp_state == CPS_CACHED
913  *
914  * \see cl_page_operations::cpo_completion()
915  */
916 void cl_page_completion(const struct lu_env *env,
917                         struct cl_page *pg, enum cl_req_type crt, int ioret)
918 {
919         const struct cl_page_slice *slice;
920         struct cl_sync_io *anchor = pg->cp_sync_io;
921
922         PASSERT(env, pg, crt < CRT_NR);
923         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
924
925         ENTRY;
926         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
927         cl_page_state_set(env, pg, CPS_CACHED);
928         if (crt >= CRT_NR)
929                 return;
930
931         list_for_each_entry_reverse(slice, &pg->cp_layers, cpl_linkage) {
932                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
933                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
934                                                                   ioret);
935         }
936
937         if (anchor != NULL) {
938                 LASSERT(pg->cp_sync_io == anchor);
939                 pg->cp_sync_io = NULL;
940                 cl_sync_io_note(env, anchor, ioret);
941         }
942         EXIT;
943 }
944 EXPORT_SYMBOL(cl_page_completion);
945
946 /**
947  * Notify layers that the transfer formation engine decided to yank this page
948  * from the cache and to make it a part of a transfer.
949  *
950  * \pre  pg->cp_state == CPS_CACHED
951  * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
952  *
953  * \see cl_page_operations::cpo_make_ready()
954  */
955 int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
956                        enum cl_req_type crt)
957 {
958         const struct cl_page_slice *sli;
959         int result = 0;
960
961         PINVRNT(env, pg, crt < CRT_NR);
962
963         ENTRY;
964         if (crt >= CRT_NR)
965                 RETURN(-EINVAL);
966
967         list_for_each_entry(sli, &pg->cp_layers, cpl_linkage) {
968                 if (sli->cpl_ops->io[crt].cpo_make_ready != NULL)
969                         result = (*sli->cpl_ops->io[crt].cpo_make_ready)(env,
970                                                                          sli);
971                 if (result != 0)
972                         break;
973         }
974
975         if (result >= 0) {
976                 result = 0;
977                 PASSERT(env, pg, pg->cp_state == CPS_CACHED);
978                 cl_page_io_start(env, pg, crt);
979         }
980         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
981         RETURN(result);
982 }
983 EXPORT_SYMBOL(cl_page_make_ready);
984
985 /**
986  * Called when a page is being written back at the kernel's request.
987  *
988  * \pre  cl_page_is_owned(pg, io)
989  * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
990  *
991  * \see cl_page_operations::cpo_flush()
992  */
993 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
994                   struct cl_page *pg)
995 {
996         const struct cl_page_slice *slice;
997         int result = 0;
998
999         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1000         PINVRNT(env, pg, cl_page_invariant(pg));
1001
1002         ENTRY;
1003
1004         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1005                 if (slice->cpl_ops->cpo_flush != NULL)
1006                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1007                 if (result != 0)
1008                         break;
1009         }
1010         if (result > 0)
1011                 result = 0;
1012
1013         CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
1014         RETURN(result);
1015 }
1016 EXPORT_SYMBOL(cl_page_flush);
1017
1018 /**
1019  * Tells transfer engine that only part of a page is to be transmitted.
1020  *
1021  * \see cl_page_operations::cpo_clip()
1022  */
1023 void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1024                   int from, int to)
1025 {
1026         const struct cl_page_slice *slice;
1027
1028         PINVRNT(env, pg, cl_page_invariant(pg));
1029
1030         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1031         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1032                 if (slice->cpl_ops->cpo_clip != NULL)
1033                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1034         }
1035 }
1036 EXPORT_SYMBOL(cl_page_clip);
1037
1038 /**
1039  * Prints a human-readable representation of \a pg through \a printer.
1040  */
1041 void cl_page_header_print(const struct lu_env *env, void *cookie,
1042                           lu_printer_t printer, const struct cl_page *pg)
1043 {
1044         (*printer)(env, cookie,
1045                    "page@%p[%d %p %d %d %p]\n",
1046                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1047                    pg->cp_state, pg->cp_type,
1048                    pg->cp_owner);
1049 }
1050 EXPORT_SYMBOL(cl_page_header_print);
1051
1052 /**
1053  * Prints a human-readable representation of \a pg through \a printer.
1054  */
1055 void cl_page_print(const struct lu_env *env, void *cookie,
1056                    lu_printer_t printer, const struct cl_page *pg)
1057 {
1058         const struct cl_page_slice *slice;
1059         int result = 0;
1060
1061         cl_page_header_print(env, cookie, printer, pg);
1062         list_for_each_entry(slice, &pg->cp_layers, cpl_linkage) {
1063                 if (slice->cpl_ops->cpo_print != NULL)
1064                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1065                                                              cookie, printer);
1066                 if (result != 0)
1067                         break;
1068         }
1069         (*printer)(env, cookie, "end page@%p\n", pg);
1070 }
1071 EXPORT_SYMBOL(cl_page_print);
1072
1073 /**
1074  * Cancel a page which is still in a transfer.
1075  */
1076 int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1077 {
1078         const struct cl_page_slice *slice;
1079         int                         result = 0;
1080
1081         list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
1082                 if (slice->cpl_ops->cpo_cancel != NULL)
1083                         result = (*slice->cpl_ops->cpo_cancel)(env, slice);
1084                 if (result != 0)
1085                         break;
1086         }
1087         if (result > 0)
1088                 result = 0;
1089
1090         return result;
1091 }
1092
1093 /**
1094  * Converts a page index within object \a obj into a byte offset.
1095  */
1096 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1097 {
1098         return (loff_t)idx << PAGE_SHIFT;
1099 }
1100 EXPORT_SYMBOL(cl_offset);
1101
1102 /**
1103  * Converts a byte offset within object \a obj into a page index.
1104  */
1105 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1106 {
1107         return offset >> PAGE_SHIFT;
1108 }
1109 EXPORT_SYMBOL(cl_index);
1110
1111 size_t cl_page_size(const struct cl_object *obj)
1112 {
1113         return 1UL << PAGE_SHIFT;
1114 }
1115 EXPORT_SYMBOL(cl_page_size);
1116
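/*
 * Worked example (editor's addition), assuming the common 4KiB PAGE_SIZE
 * (PAGE_SHIFT == 12):
 *
 *	cl_offset(obj, 3)     == 3 << 12     == 12288
 *	cl_index(obj, 12288)  == 12288 >> 12 == 3
 *	cl_index(obj, 12289)  == 3              (offsets round down)
 *	cl_page_size(obj)     == 4096
 */
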
1117 /**
1118  * Adds page slice to the compound page.
1119  *
1120  * This is called by cl_object_operations::coo_page_init() methods to add a
1121  * per-layer state to the page. New state is added at the end of
1122  * cl_page::cp_layers list, that is, it is at the bottom of the stack.
1123  *
1124  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1125  */
1126 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1127                        struct cl_object *obj, pgoff_t index,
1128                        const struct cl_page_operations *ops)
1129 {
1130         ENTRY;
1131         list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1132         slice->cpl_obj  = obj;
1133         slice->cpl_index = index;
1134         slice->cpl_ops  = ops;
1135         slice->cpl_page = page;
1136         EXIT;
1137 }
1138 EXPORT_SYMBOL(cl_page_slice_add);
1139
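/*
 * Illustrative sketch (editor's addition): how a hypothetical layer's
 * coo_page_init() method might register its slice.  "struct my_page",
 * "my_page_init" and "my_page_ops" are invented names; existing layers
 * typically locate their per-page buffer with cl_object_page_slice(), carved
 * out of the coh_page_bufsize allocation made in cl_page_alloc().
 *
 * \code
 *	struct my_page {
 *		struct cl_page_slice mp_cl;
 *		...
 *	};
 *
 *	static int my_page_init(const struct lu_env *env, struct cl_object *obj,
 *				struct cl_page *page, pgoff_t index)
 *	{
 *		struct my_page *mp = cl_object_page_slice(obj, page);
 *
 *		cl_page_slice_add(page, &mp->mp_cl, obj, index, &my_page_ops);
 *		return 0;
 *	}
 * \endcode
 */
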
1140 /**
1141  * Allocate and initialize cl_cache, called by ll_init_sbi().
1142  */
1143 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1144 {
1145         struct cl_client_cache  *cache = NULL;
1146
1147         ENTRY;
1148         OBD_ALLOC(cache, sizeof(*cache));
1149         if (cache == NULL)
1150                 RETURN(NULL);
1151
1152         /* Initialize cache data */
1153         atomic_set(&cache->ccc_users, 1);
1154         cache->ccc_lru_max = lru_page_max;
1155         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1156         spin_lock_init(&cache->ccc_lru_lock);
1157         INIT_LIST_HEAD(&cache->ccc_lru);
1158
1159         /* turn unstable check off by default as it impacts performance */
1160         cache->ccc_unstable_check = 0;
1161         atomic_long_set(&cache->ccc_unstable_nr, 0);
1162         init_waitqueue_head(&cache->ccc_unstable_waitq);
1163
1164         RETURN(cache);
1165 }
1166 EXPORT_SYMBOL(cl_cache_init);
1167
1168 /**
1169  * Increase cl_cache refcount
1170  */
1171 void cl_cache_incref(struct cl_client_cache *cache)
1172 {
1173         atomic_inc(&cache->ccc_users);
1174 }
1175 EXPORT_SYMBOL(cl_cache_incref);
1176
1177 /**
1178  * Decrease cl_cache refcount and free the cache if refcount=0.
1179  * Since llite, lov and osc all hold cl_cache refcount,
1180  * the free will not cause race. (LU-6173)
1181  */
1182 void cl_cache_decref(struct cl_client_cache *cache)
1183 {
1184         if (atomic_dec_and_test(&cache->ccc_users))
1185                 OBD_FREE(cache, sizeof(*cache));
1186 }
1187 EXPORT_SYMBOL(cl_cache_decref);
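
/*
 * Illustrative sketch (editor's addition): the cl_client_cache lifecycle
 * described above.  The creator (llite) holds the initial reference returned
 * by cl_cache_init(); every additional user (lov, osc) takes one with
 * cl_cache_incref(), and each user, including the creator, drops its
 * reference with cl_cache_decref() at teardown.  The last decref frees the
 * structure.
 *
 * \code
 *	struct cl_client_cache *cache = cl_cache_init(lru_page_max);
 *
 *	if (cache == NULL)
 *		return -ENOMEM;
 *	cl_cache_incref(cache);
 *	...
 *	cl_cache_decref(cache);
 *	cl_cache_decref(cache);
 * \endcode
 */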