lustre/obdclass/cl_page.c (fs/lustre-release.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client Lustre Page.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/list.h>
41 #include <libcfs/libcfs.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44
45 #include <cl_object.h>
46 #include "cl_internal.h"
47
48 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
49 static DEFINE_MUTEX(cl_page_kmem_mutex);
50
51 #ifdef LIBCFS_DEBUG
52 # define PASSERT(env, page, expr)                                       \
53   do {                                                                    \
54           if (unlikely(!(expr))) {                                      \
55                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
56                   LASSERT(0);                                           \
57           }                                                             \
58   } while (0)
59 #else /* !LIBCFS_DEBUG */
60 # define PASSERT(env, page, exp) \
61         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
62 #endif /* !LIBCFS_DEBUG */
63
64 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
65 # define PINVRNT(env, page, expr)                                       \
66   do {                                                                    \
67           if (unlikely(!(expr))) {                                      \
68                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
69                   LINVRNT(0);                                           \
70           }                                                             \
71   } while (0)
72 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
73 # define PINVRNT(env, page, exp) \
74          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
75 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
76
77 /* Page statistics are disabled by default due to the huge performance penalty. */
78 static void cs_page_inc(const struct cl_object *obj,
79                         enum cache_stats_item item)
80 {
81 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
82         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
83 #endif
84 }
85
86 static void cs_page_dec(const struct cl_object *obj,
87                         enum cache_stats_item item)
88 {
89 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
90         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
91 #endif
92 }
93
94 static void cs_pagestate_inc(const struct cl_object *obj,
95                              enum cl_page_state state)
96 {
97 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
98         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
99 #endif
100 }
101
102 static void cs_pagestate_dec(const struct cl_object *obj,
103                               enum cl_page_state state)
104 {
105 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
106         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
107 #endif
108 }
109
110 /**
111  * Internal version of cl_page_get().
112  *
113  * This function can be used to obtain initial reference to previously
114  * unreferenced cached object. It can be called only if concurrent page
115  * reclamation is somehow prevented, e.g., by keeping a lock on a VM page,
116  * associated with \a page.
117  *
118  * Use with care! Not exported.
119  */
120 static void cl_page_get_trust(struct cl_page *page)
121 {
122         LASSERT(atomic_read(&page->cp_ref) > 0);
123         atomic_inc(&page->cp_ref);
124 }
125
126 static struct cl_page_slice *
127 cl_page_slice_get(const struct cl_page *cl_page, int index)
128 {
129         if (index < 0 || index >= cl_page->cp_layer_count)
130                 return NULL;
131
132         /* To keep the cp_layer_offset values under 256 bytes, they are
133          * stored as offsets from the end of struct cl_page.
134          */
135         return (struct cl_page_slice *)((char *)cl_page + sizeof(*cl_page) +
136                                         cl_page->cp_layer_offset[index]);
137 }
138
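/*
 * Iterate over the per-layer slices of a cl_page.  The plain variant walks
 * the layers in the order they were added by cl_page_slice_add() (top of the
 * device stack first); the _reverse variant walks bottom-up.  Both update
 * 'slice' and the index 'i' on every step.
 */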
139 #define cl_page_slice_for_each(cl_page, slice, i)               \
140         for (i = 0, slice = cl_page_slice_get(cl_page, 0);      \
141              i < (cl_page)->cp_layer_count;                     \
142              slice = cl_page_slice_get(cl_page, ++i))
143
144 #define cl_page_slice_for_each_reverse(cl_page, slice, i)       \
145         for (i = (cl_page)->cp_layer_count - 1,                 \
146              slice = cl_page_slice_get(cl_page, i); i >= 0;     \
147              slice = cl_page_slice_get(cl_page, --i))
148
149 /**
150  * Returns a slice within a cl_page, corresponding to the given layer in the
151  * device stack.
152  *
153  * \see cl_lock_at()
154  */
155 static const struct cl_page_slice *
156 cl_page_at_trusted(const struct cl_page *cl_page,
157                    const struct lu_device_type *dtype)
158 {
159         const struct cl_page_slice *slice;
160         int i;
161
162         ENTRY;
163
164         cl_page_slice_for_each(cl_page, slice, i) {
165                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
166                         RETURN(slice);
167         }
168
169         RETURN(NULL);
170 }
171
172 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
173 {
174         int index = cl_page->cp_kmem_index;
175
176         if (index >= 0) {
177                 LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
178                 LASSERT(cl_page_kmem_size_array[index] == bufsize);
179                 OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
180         } else {
181                 OBD_FREE(cl_page, bufsize);
182         }
183 }
184
185 static void cl_page_free(const struct lu_env *env, struct cl_page *cl_page,
186                          struct pagevec *pvec)
187 {
188         struct cl_object *obj  = cl_page->cp_obj;
189         unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;
190         struct cl_page_slice *slice;
191         int i;
192
193         ENTRY;
194         PASSERT(env, cl_page, list_empty(&cl_page->cp_batch));
195         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
196         PASSERT(env, cl_page, cl_page->cp_state == CPS_FREEING);
197
198         cl_page_slice_for_each(cl_page, slice, i) {
199                 if (unlikely(slice->cpl_ops->cpo_fini != NULL))
200                         slice->cpl_ops->cpo_fini(env, slice, pvec);
201         }
202         cl_page->cp_layer_count = 0;
203         cs_page_dec(obj, CS_total);
204         cs_pagestate_dec(obj, cl_page->cp_state);
205         lu_object_ref_del_at(&obj->co_lu, &cl_page->cp_obj_ref,
206                              "cl_page", cl_page);
207         cl_object_put(env, obj);
208         lu_ref_fini(&cl_page->cp_reference);
209         __cl_page_free(cl_page, bufsize);
210         EXIT;
211 }
212
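/*
 * cl_page allocation strategy: coh_page_bufsize can differ between objects,
 * so cl_pages are carved out of a small set of slab caches, one per buffer
 * size seen so far (cl_page_kmem_array[]).  The fast path only reads
 * cl_page_kmem_size_array[] (paired smp_load_acquire()/smp_store_release());
 * cl_page_kmem_mutex is taken only when a new cache has to be created.  If
 * the array is full, the allocation falls back to OBD_ALLOC_GFP() and
 * cp_kmem_index is set to -1 so that __cl_page_free() knows how to free it.
 */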
213 static struct cl_page *__cl_page_alloc(struct cl_object *o)
214 {
215         int i = 0;
216         struct cl_page *cl_page = NULL;
217         unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;
218
219 check:
220         /* cl_page_kmem_array is expected to hold only 2-3 entries,
221          * so the lookup overhead should be low.
222          */
223         for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
224                 if (smp_load_acquire(&cl_page_kmem_size_array[i])
225                     == bufsize) {
226                         OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
227                                            bufsize, GFP_NOFS);
228                         if (cl_page)
229                                 cl_page->cp_kmem_index = i;
230                         return cl_page;
231                 }
232                 if (cl_page_kmem_size_array[i] == 0)
233                         break;
234         }
235
236         if (i < ARRAY_SIZE(cl_page_kmem_array)) {
237                 char cache_name[32];
238
239                 mutex_lock(&cl_page_kmem_mutex);
240                 if (cl_page_kmem_size_array[i]) {
241                         mutex_unlock(&cl_page_kmem_mutex);
242                         goto check;
243                 }
244                 snprintf(cache_name, sizeof(cache_name),
245                          "cl_page_kmem-%u", bufsize);
246                 cl_page_kmem_array[i] =
247                         kmem_cache_create(cache_name, bufsize,
248                                           0, 0, NULL);
249                 if (cl_page_kmem_array[i] == NULL) {
250                         mutex_unlock(&cl_page_kmem_mutex);
251                         return NULL;
252                 }
253                 smp_store_release(&cl_page_kmem_size_array[i],
254                                   bufsize);
255                 mutex_unlock(&cl_page_kmem_mutex);
256                 goto check;
257         } else {
258                 OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
259                 if (cl_page)
260                         cl_page->cp_kmem_index = -1;
261         }
262
263         return cl_page;
264 }
265
266 struct cl_page *cl_page_alloc(const struct lu_env *env, struct cl_object *o,
267                               pgoff_t ind, struct page *vmpage,
268                               enum cl_page_type type)
269 {
270         struct cl_page *cl_page;
271         struct lu_object_header *head;
272
273         ENTRY;
274
275         cl_page = __cl_page_alloc(o);
276         if (cl_page != NULL) {
277                 int result = 0;
278
279                 /*
280                  * Please fix cl_page::cp_state/type declaration if
281                  * these assertions fail in the future.
282                  */
283                 BUILD_BUG_ON((1 << CP_STATE_BITS) < CPS_NR); /* cp_state */
284                 BUILD_BUG_ON((1 << CP_TYPE_BITS) < CPT_NR); /* cp_type */
285                 atomic_set(&cl_page->cp_ref, 1);
286                 cl_page->cp_obj = o;
287                 cl_object_get(o);
288                 lu_object_ref_add_at(&o->co_lu, &cl_page->cp_obj_ref,
289                                      "cl_page", cl_page);
290                 cl_page->cp_vmpage = vmpage;
291                 cl_page->cp_state = CPS_CACHED;
292                 cl_page->cp_type = type;
293                 cl_page->cp_inode = NULL;
294                 INIT_LIST_HEAD(&cl_page->cp_batch);
295                 lu_ref_init(&cl_page->cp_reference);
296                 head = o->co_lu.lo_header;
297                 list_for_each_entry(o, &head->loh_layers,
298                                     co_lu.lo_linkage) {
299                         if (o->co_ops->coo_page_init != NULL) {
300                                 result = o->co_ops->coo_page_init(env, o,
301                                                         cl_page, ind);
302                                 if (result != 0) {
303                                         cl_page_delete0(env, cl_page);
304                                         cl_page_free(env, cl_page, NULL);
305                                         cl_page = ERR_PTR(result);
306                                         break;
307                                 }
308                         }
309                 }
310                 if (result == 0) {
311                         cs_page_inc(o, CS_total);
312                         cs_page_inc(o, CS_create);
313                         cs_pagestate_dec(o, CPS_CACHED);
314                 }
315         } else {
316                 cl_page = ERR_PTR(-ENOMEM);
317         }
318         RETURN(cl_page);
319 }
320
321 /**
322  * Returns a cl_page with index \a idx at the object \a o, and associated with
323  * the VM page \a vmpage.
324  *
325  * This is the main entry point into the cl_page caching interface. First, a
326  * cache (implemented as a per-object radix tree) is consulted. If the page is
327  * found there, it is returned immediately. Otherwise a new page is allocated
328  * and returned. In either case, an additional reference to the page is acquired.
329  *
330  * \see cl_object_find(), cl_lock_find()
331  */
332 struct cl_page *cl_page_find(const struct lu_env *env,
333                              struct cl_object *o,
334                              pgoff_t idx, struct page *vmpage,
335                              enum cl_page_type type)
336 {
337         struct cl_page          *page = NULL;
338         struct cl_object_header *hdr;
339
340         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
341         might_sleep();
342
343         ENTRY;
344
345         hdr = cl_object_header(o);
346         cs_page_inc(o, CS_lookup);
347
348         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
349                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
350         /* fast path. */
351         if (type == CPT_CACHEABLE) {
352                 /* vmpage lock is used to protect the child/parent
353                  * relationship */
354                 LASSERT(PageLocked(vmpage));
355                 /*
356                  * cl_vmpage_page() can be called here without any locks as
357                  *
358                  *     - "vmpage" is locked (which prevents ->private from
359                  *       concurrent updates), and
360                  *
361                  *     - "o" cannot be destroyed while current thread holds a
362                  *       reference on it.
363                  */
364                 page = cl_vmpage_page(vmpage, o);
365                 if (page != NULL) {
366                         cs_page_inc(o, CS_hit);
367                         RETURN(page);
368                 }
369         }
370
371         /* allocate and initialize cl_page */
372         page = cl_page_alloc(env, o, idx, vmpage, type);
373         RETURN(page);
374 }
375 EXPORT_SYMBOL(cl_page_find);
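/*
 * A minimal usage sketch: look up or create the cl_page for a locked VM page,
 * own it for the IO and drop the reference afterwards.  Error handling is
 * abbreviated and the "example_" name is illustrative only (not built).
 */
#if 0
static int example_find_and_own(const struct lu_env *env, struct cl_io *io,
                                struct cl_object *obj, struct page *vmpage,
                                pgoff_t idx)
{
        struct cl_page *pg;
        int rc;

        /* vmpage must be locked for a CPT_CACHEABLE lookup */
        pg = cl_page_find(env, obj, idx, vmpage, CPT_CACHEABLE);
        if (IS_ERR(pg))
                return PTR_ERR(pg);

        rc = cl_page_own(env, io, pg);
        if (rc == 0) {
                /* ... operate on the CPS_OWNED page ... */
                cl_page_disown(env, io, pg);
        }
        cl_page_put(env, pg);
        return rc;
}
#endif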
376
377 static inline int cl_page_invariant(const struct cl_page *pg)
378 {
379         return cl_page_in_use_noref(pg);
380 }
381
382 static void cl_page_state_set0(const struct lu_env *env,
383                                struct cl_page *cl_page,
384                                enum cl_page_state state)
385 {
386         enum cl_page_state old;
387
388         /*
389          * Matrix of allowed state transitions [old][new], for sanity
390          * checking.
391          */
392         static const int allowed_transitions[CPS_NR][CPS_NR] = {
393                 [CPS_CACHED] = {
394                         [CPS_CACHED]  = 0,
395                         [CPS_OWNED]   = 1, /* io finds existing cached page */
396                         [CPS_PAGEIN]  = 0,
397                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
398                         [CPS_FREEING] = 1, /* eviction on the memory pressure */
399                 },
400                 [CPS_OWNED] = {
401                         [CPS_CACHED]  = 1, /* release to the cache */
402                         [CPS_OWNED]   = 0,
403                         [CPS_PAGEIN]  = 1, /* start read immediately */
404                         [CPS_PAGEOUT] = 1, /* start write immediately */
405                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
406                 },
407                 [CPS_PAGEIN] = {
408                         [CPS_CACHED]  = 1, /* io completion */
409                         [CPS_OWNED]   = 0,
410                         [CPS_PAGEIN]  = 0,
411                         [CPS_PAGEOUT] = 0,
412                         [CPS_FREEING] = 0,
413                 },
414                 [CPS_PAGEOUT] = {
415                         [CPS_CACHED]  = 1, /* io completion */
416                         [CPS_OWNED]   = 0,
417                         [CPS_PAGEIN]  = 0,
418                         [CPS_PAGEOUT] = 0,
419                         [CPS_FREEING] = 0,
420                 },
421                 [CPS_FREEING] = {
422                         [CPS_CACHED]  = 0,
423                         [CPS_OWNED]   = 0,
424                         [CPS_PAGEIN]  = 0,
425                         [CPS_PAGEOUT] = 0,
426                         [CPS_FREEING] = 0,
427                 }
428         };
429
430         ENTRY;
431         old = cl_page->cp_state;
432         PASSERT(env, cl_page, allowed_transitions[old][state]);
433         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d -> %d\n", old, state);
434         PASSERT(env, cl_page, cl_page->cp_state == old);
435         PASSERT(env, cl_page, equi(state == CPS_OWNED,
436                                    cl_page->cp_owner != NULL));
437
438         cs_pagestate_dec(cl_page->cp_obj, cl_page->cp_state);
439         cs_pagestate_inc(cl_page->cp_obj, state);
440         cl_page->cp_state = state;
441         EXIT;
442 }
443
444 static void cl_page_state_set(const struct lu_env *env,
445                               struct cl_page *page, enum cl_page_state state)
446 {
447         cl_page_state_set0(env, page, state);
448 }
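/*
 * Typical lifecycles allowed by the transition matrix above:
 *
 *   CPS_CACHED -> CPS_OWNED -> CPS_PAGEIN/CPS_PAGEOUT -> CPS_CACHED -> ...
 *   CPS_CACHED -> CPS_PAGEOUT               (write-out from the cache)
 *   CPS_CACHED/CPS_OWNED -> CPS_FREEING     (truncate, lock invalidation,
 *                                            memory pressure)
 *
 * CPS_FREEING is terminal: no transition leaves it.
 */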
449
450 /**
451  * Acquires an additional reference to a page.
452  *
453  * This can be called only by caller already possessing a reference to \a
454  * page.
455  *
456  * \see cl_object_get(), cl_lock_get().
457  */
458 void cl_page_get(struct cl_page *page)
459 {
460         ENTRY;
461         cl_page_get_trust(page);
462         EXIT;
463 }
464 EXPORT_SYMBOL(cl_page_get);
465
466 /**
467  * Releases a reference to a page, use the pagevec to release the pages
468  * in batch if provided.
469  *
470  * Users need to do a final pagevec_release() to release any trailing pages.
471  */
472 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
473                   struct pagevec *pvec)
474 {
475         ENTRY;
476         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
477                        atomic_read(&page->cp_ref));
478
479         if (atomic_dec_and_test(&page->cp_ref)) {
480                 LASSERT(page->cp_state == CPS_FREEING);
481
482                 LASSERT(atomic_read(&page->cp_ref) == 0);
483                 PASSERT(env, page, page->cp_owner == NULL);
484                 PASSERT(env, page, list_empty(&page->cp_batch));
485                 /*
486                  * Page is no longer reachable by other threads. Tear
487                  * it down.
488                  */
489                 cl_page_free(env, page, pvec);
490         }
491
492         EXIT;
493 }
494 EXPORT_SYMBOL(cl_pagevec_put);
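/*
 * Batched release sketch (illustrative only, assuming a kernel that still
 * provides struct pagevec; the pagevec_init() signature varies between
 * kernel versions):
 */
#if 0
static void example_put_pages(const struct lu_env *env,
                              struct cl_page **pages, int count)
{
        struct pagevec pvec;
        int i;

        pagevec_init(&pvec);
        for (i = 0; i < count; i++)
                cl_pagevec_put(env, pages[i], &pvec);
        /* release any vmpages still batched in the pagevec */
        pagevec_release(&pvec);
}
#endif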
495
496 /**
497  * Releases a reference to a page; wrapper around cl_pagevec_put().
498  *
499  * When last reference is released, page is returned to the cache, unless it
500  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
501  * destroyed.
502  *
503  * \see cl_object_put(), cl_lock_put().
504  */
505 void cl_page_put(const struct lu_env *env, struct cl_page *page)
506 {
507         cl_pagevec_put(env, page, NULL);
508 }
509 EXPORT_SYMBOL(cl_page_put);
510
511 /**
512  * Returns a cl_page associated with a VM page, and given cl_object.
513  */
514 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
515 {
516         struct cl_page *page;
517
518         ENTRY;
519         LASSERT(PageLocked(vmpage));
520
521         /*
522          * NOTE: absence of races and liveness of data are guaranteed by page
523  *       lock on a "vmpage". That works because object destruction proceeds
524  *       bottom-to-top.
525          */
526
527         page = (struct cl_page *)vmpage->private;
528         if (page != NULL) {
529                 cl_page_get_trust(page);
530                 LASSERT(page->cp_type == CPT_CACHEABLE);
531         }
532         RETURN(page);
533 }
534 EXPORT_SYMBOL(cl_vmpage_page);
535
536 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
537                                        const struct lu_device_type *dtype)
538 {
539         return cl_page_at_trusted(page, dtype);
540 }
541 EXPORT_SYMBOL(cl_page_at);
542
543 static void cl_page_owner_clear(struct cl_page *page)
544 {
545         ENTRY;
546         if (page->cp_owner != NULL) {
547                 LASSERT(page->cp_owner->ci_owned_nr > 0);
548                 page->cp_owner->ci_owned_nr--;
549                 page->cp_owner = NULL;
550         }
551         EXIT;
552 }
553
554 static void cl_page_owner_set(struct cl_page *page)
555 {
556         ENTRY;
557         LASSERT(page->cp_owner != NULL);
558         page->cp_owner->ci_owned_nr++;
559         EXIT;
560 }
561
562 void cl_page_disown0(const struct lu_env *env,
563                      struct cl_io *io, struct cl_page *cl_page)
564 {
565         const struct cl_page_slice *slice;
566         enum cl_page_state state;
567         int i;
568
569         ENTRY;
570         state = cl_page->cp_state;
571         PINVRNT(env, cl_page, state == CPS_OWNED ||
572                 state == CPS_FREEING);
573         PINVRNT(env, cl_page, cl_page_invariant(cl_page) ||
574                 state == CPS_FREEING);
575         cl_page_owner_clear(cl_page);
576
577         if (state == CPS_OWNED)
578                 cl_page_state_set(env, cl_page, CPS_CACHED);
579         /*
580          * Completion call-backs are executed in the bottom-up order, so that
581          * uppermost layer (llite), responsible for VFS/VM interaction runs
582          * last and can release locks safely.
583          */
584         cl_page_slice_for_each_reverse(cl_page, slice, i) {
585                 if (slice->cpl_ops->cpo_disown != NULL)
586                         (*slice->cpl_ops->cpo_disown)(env, slice, io);
587         }
588
589         EXIT;
590 }
591
592 /**
593  * returns true, iff page is owned by the given io.
594  * Returns true iff the page is owned by the given io.
595 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
596 {
597         struct cl_io *top = cl_io_top((struct cl_io *)io);
598         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
599         ENTRY;
600         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
601 }
602 EXPORT_SYMBOL(cl_page_is_owned);
603
604 /**
605  * Try to own a page by IO.
606  *
607  * Waits until page is in cl_page_state::CPS_CACHED state, and then switch it
608  * into cl_page_state::CPS_OWNED state.
609  *
610  * \pre  !cl_page_is_owned(cl_page, io)
611  * \post result == 0 iff cl_page_is_owned(cl_page, io)
612  *
613  * \retval 0   success
614  *
615  * \retval -ve failure, e.g., the cl_page was destroyed (and landed in
616  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
617  *             or the page was owned by another thread, or was already in IO.
618  *
619  * \see cl_page_disown()
620  * \see cl_page_operations::cpo_own()
621  * \see cl_page_own_try()
622  * \see cl_page_own
623  */
624 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
625                         struct cl_page *cl_page, int nonblock)
626 {
627         const struct cl_page_slice *slice;
628         int result = 0;
629         int i;
630
631         ENTRY;
632         PINVRNT(env, cl_page, !cl_page_is_owned(cl_page, io));
633         io = cl_io_top(io);
634
635         if (cl_page->cp_state == CPS_FREEING) {
636                 result = -ENOENT;
637                 goto out;
638         }
639
640         cl_page_slice_for_each(cl_page, slice, i) {
641                 if (slice->cpl_ops->cpo_own)
642                         result = (*slice->cpl_ops->cpo_own)(env, slice,
643                                                             io, nonblock);
644                 if (result != 0)
645                         break;
646         }
647         if (result > 0)
648                 result = 0;
649
650         if (result == 0) {
651                 PASSERT(env, cl_page, cl_page->cp_owner == NULL);
652                 cl_page->cp_owner = cl_io_top(io);
653                 cl_page_owner_set(cl_page);
654                 if (cl_page->cp_state != CPS_FREEING) {
655                         cl_page_state_set(env, cl_page, CPS_OWNED);
656                 } else {
657                         cl_page_disown0(env, io, cl_page);
658                         result = -ENOENT;
659                 }
660         }
661
662 out:
663         PINVRNT(env, cl_page, ergo(result == 0,
664                 cl_page_invariant(cl_page)));
665         RETURN(result);
666 }
667
668 /**
669  * Own a page, might be blocked.
670  *
671  * \see cl_page_own0()
672  */
673 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
674 {
675         return cl_page_own0(env, io, pg, 0);
676 }
677 EXPORT_SYMBOL(cl_page_own);
678
679 /**
680  * Nonblock version of cl_page_own().
681  *
682  * \see cl_page_own0()
683  */
684 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
685                     struct cl_page *pg)
686 {
687         return cl_page_own0(env, io, pg, 1);
688 }
689 EXPORT_SYMBOL(cl_page_own_try);
690
691
692 /**
693  * Assume page ownership.
694  *
695  * Called when page is already locked by the hosting VM.
696  *
697  * \pre !cl_page_is_owned(cl_page, io)
698  * \post cl_page_is_owned(cl_page, io)
699  *
700  * \see cl_page_operations::cpo_assume()
701  */
702 void cl_page_assume(const struct lu_env *env,
703                     struct cl_io *io, struct cl_page *cl_page)
704 {
705         const struct cl_page_slice *slice;
706         int i;
707
708         ENTRY;
709
710         PINVRNT(env, cl_page,
711                 cl_object_same(cl_page->cp_obj, io->ci_obj));
712         io = cl_io_top(io);
713
714         cl_page_slice_for_each(cl_page, slice, i) {
715                 if (slice->cpl_ops->cpo_assume != NULL)
716                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
717         }
718
719         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
720         cl_page->cp_owner = cl_io_top(io);
721         cl_page_owner_set(cl_page);
722         cl_page_state_set(env, cl_page, CPS_OWNED);
723         EXIT;
724 }
725 EXPORT_SYMBOL(cl_page_assume);
726
727 /**
728  * Releases page ownership without unlocking the page.
729  *
730  * Moves cl_page into cl_page_state::CPS_CACHED without releasing a lock
731  * on the underlying VM page (as VM is supposed to do this itself).
732  *
733  * \pre   cl_page_is_owned(cl_page, io)
734  * \post !cl_page_is_owned(cl_page, io)
735  *
736  * \see cl_page_assume()
737  */
738 void cl_page_unassume(const struct lu_env *env,
739                       struct cl_io *io, struct cl_page *cl_page)
740 {
741         const struct cl_page_slice *slice;
742         int i;
743
744         ENTRY;
745         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
746         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
747
748         io = cl_io_top(io);
749         cl_page_owner_clear(cl_page);
750         cl_page_state_set(env, cl_page, CPS_CACHED);
751
752         cl_page_slice_for_each_reverse(cl_page, slice, i) {
753                 if (slice->cpl_ops->cpo_unassume != NULL)
754                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
755         }
756
757         EXIT;
758 }
759 EXPORT_SYMBOL(cl_page_unassume);
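/*
 * Sketch of the assume/unassume pairing (illustrative): when the VM has
 * already locked the vmpage, e.g. in a fault path, the IO takes ownership
 * with cl_page_assume() rather than cl_page_own(), and hands it back with
 * cl_page_unassume() without touching the VM page lock.
 */
#if 0
        /* vmpage is already locked by the VM here */
        cl_page_assume(env, io, pg);
        /* ... the page is CPS_OWNED by the top-level IO ... */
        cl_page_unassume(env, io, pg);
        /* vmpage is still locked; the VM unlocks it itself */
#endif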
760
761 /**
762  * Releases page ownership.
763  *
764  * Moves page into cl_page_state::CPS_CACHED.
765  *
766  * \pre   cl_page_is_owned(pg, io)
767  * \post !cl_page_is_owned(pg, io)
768  *
769  * \see cl_page_own()
770  * \see cl_page_operations::cpo_disown()
771  */
772 void cl_page_disown(const struct lu_env *env,
773                     struct cl_io *io, struct cl_page *pg)
774 {
775         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
776                 pg->cp_state == CPS_FREEING);
777
778         ENTRY;
779         io = cl_io_top(io);
780         cl_page_disown0(env, io, pg);
781         EXIT;
782 }
783 EXPORT_SYMBOL(cl_page_disown);
784
785 /**
786  * Called when cl_page is to be removed from the object, e.g.,
787  * as a result of truncate.
788  *
789  * Calls cl_page_operations::cpo_discard() top-to-bottom.
790  *
791  * \pre cl_page_is_owned(cl_page, io)
792  *
793  * \see cl_page_operations::cpo_discard()
794  */
795 void cl_page_discard(const struct lu_env *env,
796                      struct cl_io *io, struct cl_page *cl_page)
797 {
798         const struct cl_page_slice *slice;
799         int i;
800
801         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
802         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
803
804         cl_page_slice_for_each(cl_page, slice, i) {
805                 if (slice->cpl_ops->cpo_discard != NULL)
806                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
807         }
808 }
809 EXPORT_SYMBOL(cl_page_discard);
810
811 /**
812  * Version of cl_page_delete() that can be called for not fully constructed
813  * cl_pages, e.g. in an error handling cl_page_find()->cl_page_delete0()
814  * path. Doesn't check cl_page invariant.
815  */
816 static void cl_page_delete0(const struct lu_env *env,
817                             struct cl_page *cl_page)
818 {
819         const struct cl_page_slice *slice;
820         int i;
821
822         ENTRY;
823
824         PASSERT(env, cl_page, cl_page->cp_state != CPS_FREEING);
825
826         /*
827          * Sever all ways to obtain new pointers to @cl_page.
828          */
829         cl_page_owner_clear(cl_page);
830         cl_page_state_set0(env, cl_page, CPS_FREEING);
831
832         cl_page_slice_for_each_reverse(cl_page, slice, i) {
833                 if (slice->cpl_ops->cpo_delete != NULL)
834                         (*slice->cpl_ops->cpo_delete)(env, slice);
835         }
836
837         EXIT;
838 }
839
840 /**
841  * Called when a decision is made to throw page out of memory.
842  *
843  * Notifies all layers about page destruction by calling
844  * cl_page_operations::cpo_delete() method top-to-bottom.
845  *
846  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
847  * where transition to this state happens).
848  *
849  * Eliminates all venues through which new references to the page can be
850  * obtained:
851  *
852  *     - removes page from the radix trees,
853  *
854  *     - breaks linkage from VM page to cl_page.
855  *
856  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
857  * drain after some time, at which point page will be recycled.
858  *
859  * \pre  VM page is locked
860  * \post pg->cp_state == CPS_FREEING
861  *
862  * \see cl_page_operations::cpo_delete()
863  */
864 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
865 {
866         PINVRNT(env, pg, cl_page_invariant(pg));
867         ENTRY;
868         cl_page_delete0(env, pg);
869         EXIT;
870 }
871 EXPORT_SYMBOL(cl_page_delete);
872
873 /**
874  * Marks page up-to-date.
875  *
876  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
877  * layer responsible for VM interaction has to mark/clear the page as
878  * up-to-date according to the \a uptodate argument.
879  *
880  * \see cl_page_operations::cpo_export()
881  */
882 void cl_page_export(const struct lu_env *env, struct cl_page *cl_page,
883                     int uptodate)
884 {
885         const struct cl_page_slice *slice;
886         int i;
887
888         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
889
890         cl_page_slice_for_each(cl_page, slice, i) {
891                 if (slice->cpl_ops->cpo_export != NULL)
892                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
893         }
894 }
895 EXPORT_SYMBOL(cl_page_export);
896
897 /**
898  * Returns true, if \a page is VM locked in a suitable sense by the calling
899  * thread.
900  */
901 int cl_page_is_vmlocked(const struct lu_env *env,
902                         const struct cl_page *cl_page)
903 {
904         const struct cl_page_slice *slice;
905         int result;
906
907         ENTRY;
908         slice = cl_page_slice_get(cl_page, 0);
909         PASSERT(env, cl_page, slice->cpl_ops->cpo_is_vmlocked != NULL);
910         /*
911          * Call ->cpo_is_vmlocked() directly instead of going through
912          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
913          * cl_page_invariant().
914          */
915         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
916         PASSERT(env, cl_page, result == -EBUSY || result == -ENODATA);
917
918         RETURN(result == -EBUSY);
919 }
920 EXPORT_SYMBOL(cl_page_is_vmlocked);
921
922 void cl_page_touch(const struct lu_env *env,
923                    const struct cl_page *cl_page, size_t to)
924 {
925         const struct cl_page_slice *slice;
926         int i;
927
928         ENTRY;
929
930         cl_page_slice_for_each(cl_page, slice, i) {
931                 if (slice->cpl_ops->cpo_page_touch != NULL)
932                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
933         }
934
935         EXIT;
936 }
937 EXPORT_SYMBOL(cl_page_touch);
938
939 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
940 {
941         ENTRY;
942         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
943 }
944
945 static void cl_page_io_start(const struct lu_env *env,
946                              struct cl_page *pg, enum cl_req_type crt)
947 {
948         /*
949          * Page is queued for IO, change its state.
950          */
951         ENTRY;
952         cl_page_owner_clear(pg);
953         cl_page_state_set(env, pg, cl_req_type_state(crt));
954         EXIT;
955 }
956
957 /**
958  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
959  * called top-to-bottom. Every layer either agrees to submit this page (by
960  * returning 0), or requests to omit this page (by returning -EALREADY). The
961  * layer handling interactions with the VM also has to inform the VM that the
962  * page is now under transfer.
963  */
964 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
965                  struct cl_page *cl_page, enum cl_req_type crt)
966 {
967         const struct cl_page_slice *slice;
968         int result = 0;
969         int i;
970
971         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
972         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
973         PINVRNT(env, cl_page, crt < CRT_NR);
974
975         /*
976          * this has to be called bottom-to-top, so that llite can set up
977          * PG_writeback without risking other layers deciding to skip this
978          * page.
979          */
980         if (crt >= CRT_NR)
981                 return -EINVAL;
982
983         cl_page_slice_for_each(cl_page, slice, i) {
984                 if (slice->cpl_ops->io[crt].cpo_prep != NULL)
985                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
986                                                                      slice,
987                                                                      io);
988                 if (result != 0)
989                         break;
990         }
991
992         if (result >= 0) {
993                 result = 0;
994                 cl_page_io_start(env, cl_page, crt);
995         }
996
997         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);
998         return result;
999 }
1000 EXPORT_SYMBOL(cl_page_prep);
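/*
 * Transfer flow, in terms of the entry points in this file: a page owned by
 * an IO enters transfer through cl_page_prep(), which moves it to
 * CPS_PAGEIN/CPS_PAGEOUT; a cached page is pulled into a transfer by
 * cl_page_make_ready().  In both cases the transfer sub-system eventually
 * calls cl_page_completion(), which returns the page to CPS_CACHED and
 * notifies the cl_sync_io anchor, if any.
 */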
1001
1002 /**
1003  * Notify layers about transfer completion.
1004  *
1005  * Invoked by transfer sub-system (which is a part of osc) to notify layers
1006  * that a transfer, of which this page is a part, has completed.
1007  *
1008  * Completion call-backs are executed in the bottom-up order, so that
1009  * uppermost layer (llite), responsible for the VFS/VM interaction runs last
1010  * and can release locks safely.
1011  *
1012  * \pre  cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
1013  * \post cl_page->cl_page_state == CPS_CACHED
1014  *
1015  * \see cl_page_operations::cpo_completion()
1016  */
1017 void cl_page_completion(const struct lu_env *env,
1018                         struct cl_page *cl_page, enum cl_req_type crt,
1019                         int ioret)
1020 {
1021         const struct cl_page_slice *slice;
1022         struct cl_sync_io *anchor = cl_page->cp_sync_io;
1023         int i;
1024
1025         ENTRY;
1026         PASSERT(env, cl_page, crt < CRT_NR);
1027         PASSERT(env, cl_page, cl_page->cp_state == cl_req_type_state(crt));
1028
1029         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, ioret);
1030         cl_page_state_set(env, cl_page, CPS_CACHED);
1031         if (crt >= CRT_NR)
1032                 return;
1033
1034         cl_page_slice_for_each_reverse(cl_page, slice, i) {
1035                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
1036                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
1037                                                                   ioret);
1038         }
1039
1040         if (anchor != NULL) {
1041                 LASSERT(cl_page->cp_sync_io == anchor);
1042                 cl_page->cp_sync_io = NULL;
1043                 cl_sync_io_note(env, anchor, ioret);
1044         }
1045         EXIT;
1046 }
1047 EXPORT_SYMBOL(cl_page_completion);
1048
1049 /**
1050  * Notify layers that transfer formation engine decided to yank this page from
1051  * the cache and to make it a part of a transfer.
1052  *
1053  * \pre  cl_page->cp_state == CPS_CACHED
1054  * \post cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
1055  *
1056  * \see cl_page_operations::cpo_make_ready()
1057  */
1058 int cl_page_make_ready(const struct lu_env *env, struct cl_page *cl_page,
1059                        enum cl_req_type crt)
1060 {
1061         const struct cl_page_slice *slice;
1062         int result = 0;
1063         int i;
1064
1065         ENTRY;
1066         PINVRNT(env, cl_page, crt < CRT_NR);
1067         if (crt >= CRT_NR)
1068                 RETURN(-EINVAL);
1069
1070         cl_page_slice_for_each(cl_page, slice, i) {
1071                 if (slice->cpl_ops->io[crt].cpo_make_ready != NULL)
1072                         result = (*slice->cpl_ops->io[crt].cpo_make_ready)(env, slice);
1073                 if (result != 0)
1074                         break;
1075         }
1076
1077         if (result >= 0) {
1078                 result = 0;
1079                 PASSERT(env, cl_page, cl_page->cp_state == CPS_CACHED);
1080                 cl_page_io_start(env, cl_page, crt);
1081         }
1082         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);
1083
1084         RETURN(result);
1085 }
1086 EXPORT_SYMBOL(cl_page_make_ready);
1087
1088 /**
1089  * Called when a page is being written back at the kernel's request.
1090  *
1091  * \pre  cl_page_is_owned(cl_page, io)
1092  * \post ergo(result == 0, cl_page->cp_state == CPS_PAGEOUT)
1093  *
1094  * \see cl_page_operations::cpo_flush()
1095  */
1096 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1097                   struct cl_page *cl_page)
1098 {
1099         const struct cl_page_slice *slice;
1100         int result = 0;
1101         int i;
1102
1103         ENTRY;
1104         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
1105         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1106
1107         cl_page_slice_for_each(cl_page, slice, i) {
1108                 if (slice->cpl_ops->cpo_flush != NULL)
1109                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1110                 if (result != 0)
1111                         break;
1112         }
1113         if (result > 0)
1114                 result = 0;
1115
1116         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d\n", result);
1117         RETURN(result);
1118 }
1119 EXPORT_SYMBOL(cl_page_flush);
1120
1121 /**
1122  * Tells transfer engine that only part of a page is to be transmitted.
1123  *
1124  * \see cl_page_operations::cpo_clip()
1125  */
1126 void cl_page_clip(const struct lu_env *env, struct cl_page *cl_page,
1127                   int from, int to)
1128 {
1129         const struct cl_page_slice *slice;
1130         int i;
1131
1132         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1133
1134         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", from, to);
1135         cl_page_slice_for_each(cl_page, slice, i) {
1136                 if (slice->cpl_ops->cpo_clip != NULL)
1137                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1138         }
1139 }
1140 EXPORT_SYMBOL(cl_page_clip);
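/*
 * For example, a write touching only the first 512 bytes of a page would be
 * clipped with cl_page_clip(env, pg, 0, 512) before the page is submitted
 * for transfer (values are illustrative).
 */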
1141
1142 /**
1143  * Prints a human-readable representation of \a pg via \a printer.
1144  */
1145 void cl_page_header_print(const struct lu_env *env, void *cookie,
1146                           lu_printer_t printer, const struct cl_page *pg)
1147 {
1148         (*printer)(env, cookie,
1149                    "page@%p[%d %p %d %d %p]\n",
1150                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1151                    pg->cp_state, pg->cp_type,
1152                    pg->cp_owner);
1153 }
1154 EXPORT_SYMBOL(cl_page_header_print);
1155
1156 /**
1157  * Prints a human-readable representation of \a cl_page via \a printer.
1158  */
1159 void cl_page_print(const struct lu_env *env, void *cookie,
1160                    lu_printer_t printer, const struct cl_page *cl_page)
1161 {
1162         const struct cl_page_slice *slice;
1163         int result = 0;
1164         int i;
1165
1166         cl_page_header_print(env, cookie, printer, cl_page);
1167         cl_page_slice_for_each(cl_page, slice, i) {
1168                 if (slice->cpl_ops->cpo_print != NULL)
1169                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1170                                                              cookie, printer);
1171                 if (result != 0)
1172                         break;
1173         }
1174         (*printer)(env, cookie, "end page@%p\n", cl_page);
1175 }
1176 EXPORT_SYMBOL(cl_page_print);
1177
1178 /**
1179  * Converts a page index within object \a obj into a byte offset.
1180  */
1181 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1182 {
1183         return (loff_t)idx << PAGE_SHIFT;
1184 }
1185 EXPORT_SYMBOL(cl_offset);
1186
1187 /**
1188  * Converts a byte offset within object \a obj into a page index.
1189  */
1190 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1191 {
1192         return offset >> PAGE_SHIFT;
1193 }
1194 EXPORT_SYMBOL(cl_index);
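/*
 * Worked example: with 4KiB pages (PAGE_SHIFT == 12), cl_offset(obj, 3)
 * returns 12288, and cl_index(obj, 12291) returns 3, i.e. any byte offset
 * inside a page maps back to that page's index.
 */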
1195
1196 size_t cl_page_size(const struct cl_object *obj)
1197 {
1198         return 1UL << PAGE_SHIFT;
1199 }
1200 EXPORT_SYMBOL(cl_page_size);
1201
1202 /**
1203  * Adds page slice to the compound page.
1204  *
1205  * This is called by cl_object_operations::coo_page_init() methods to add a
1206  * per-layer state to the page. New state is added at the end of
1207  * the cl_page layer table (cl_page::cp_layer_offset), that is, at the bottom of the stack.
1208  *
1209  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1210  */
1211 void cl_page_slice_add(struct cl_page *cl_page, struct cl_page_slice *slice,
1212                        struct cl_object *obj,
1213                        const struct cl_page_operations *ops)
1214 {
1215         unsigned int offset = (char *)slice -
1216                         ((char *)cl_page + sizeof(*cl_page));
1217
1218         ENTRY;
1219         LASSERT(cl_page->cp_layer_count < CP_MAX_LAYER);
1220         LASSERT(offset < (1 << sizeof(cl_page->cp_layer_offset[0]) * 8));
1221         cl_page->cp_layer_offset[cl_page->cp_layer_count++] = offset;
1222         slice->cpl_obj  = obj;
1223         slice->cpl_ops  = ops;
1224         slice->cpl_page = cl_page;
1225
1226         EXIT;
1227 }
1228 EXPORT_SYMBOL(cl_page_slice_add);
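/*
 * Sketch of how a layer's cl_object_operations::coo_page_init() typically
 * hooks per-layer state into the page.  The "example_" names and the way the
 * slice memory is located inside the cl_page buffer are hypothetical:
 */
#if 0
struct example_page {
        struct cl_page_slice ep_cl;
};

static const struct cl_page_operations example_page_ops;

static int example_page_init(const struct lu_env *env, struct cl_object *obj,
                             struct cl_page *cl_page, pgoff_t index)
{
        struct example_page *ep = example_slice_of(obj, cl_page);

        cl_page_slice_add(cl_page, &ep->ep_cl, obj, &example_page_ops);
        return 0;
}
#endif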
1229
1230 /**
1231  * Allocate and initialize cl_cache, called by ll_init_sbi().
1232  */
1233 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1234 {
1235         struct cl_client_cache  *cache = NULL;
1236
1237         ENTRY;
1238         OBD_ALLOC(cache, sizeof(*cache));
1239         if (cache == NULL)
1240                 RETURN(NULL);
1241
1242         /* Initialize cache data */
1243         atomic_set(&cache->ccc_users, 1);
1244         cache->ccc_lru_max = lru_page_max;
1245         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1246         spin_lock_init(&cache->ccc_lru_lock);
1247         INIT_LIST_HEAD(&cache->ccc_lru);
1248
1249         /* turn unstable check off by default as it impacts performance */
1250         cache->ccc_unstable_check = 0;
1251         atomic_long_set(&cache->ccc_unstable_nr, 0);
1252         init_waitqueue_head(&cache->ccc_unstable_waitq);
1253         mutex_init(&cache->ccc_max_cache_mb_lock);
1254
1255         RETURN(cache);
1256 }
1257 EXPORT_SYMBOL(cl_cache_init);
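/*
 * Lifecycle sketch (illustrative): the client mount allocates the cache,
 * each layer sharing it takes a reference, and the last cl_cache_decref()
 * frees it.
 */
#if 0
        struct cl_client_cache *cache;

        cache = cl_cache_init(lru_page_max);    /* refcount == 1 */
        if (cache == NULL)
                return -ENOMEM;
        cl_cache_incref(cache);                 /* handed to another layer */
        /* ... */
        cl_cache_decref(cache);                 /* that layer's reference */
        cl_cache_decref(cache);                 /* last put frees the cache */
#endif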
1258
1259 /**
1260  * Increase cl_cache refcount
1261  */
1262 void cl_cache_incref(struct cl_client_cache *cache)
1263 {
1264         atomic_inc(&cache->ccc_users);
1265 }
1266 EXPORT_SYMBOL(cl_cache_incref);
1267
1268 /**
1269  * Decrease cl_cache refcount and free the cache if refcount=0.
1270  * Since llite, lov and osc all hold cl_cache refcount,
1271  * the free will not cause race. (LU-6173)
1272  */
1273 void cl_cache_decref(struct cl_client_cache *cache)
1274 {
1275         if (atomic_dec_and_test(&cache->ccc_users))
1276                 OBD_FREE(cache, sizeof(*cache));
1277 }
1278 EXPORT_SYMBOL(cl_cache_decref);