lustre/obdclass/cl_page.c (fs/lustre-release.git, commit e345958cce47d885dac304a5663b0979e7c61248)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Client Lustre Page.
32  *
33  *   Author: Nikita Danilov <nikita.danilov@sun.com>
34  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/list.h>
40 #include <libcfs/libcfs.h>
41 #include <obd_class.h>
42 #include <obd_support.h>
43
44 #include <cl_object.h>
45 #include "cl_internal.h"
46
47 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
48 static DEFINE_MUTEX(cl_page_kmem_mutex);
49
50 #ifdef LIBCFS_DEBUG
51 # define PASSERT(env, page, expr)                                       \
52   do {                                                                    \
53           if (unlikely(!(expr))) {                                      \
54                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
55                   LASSERT(0);                                           \
56           }                                                             \
57   } while (0)
58 #else /* !LIBCFS_DEBUG */
59 # define PASSERT(env, page, exp) \
60         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
61 #endif /* !LIBCFS_DEBUG */
62
63 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
64 # define PINVRNT(env, page, expr)                                       \
65   do {                                                                    \
66           if (unlikely(!(expr))) {                                      \
67                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
68                   LINVRNT(0);                                           \
69           }                                                             \
70   } while (0)
71 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
72 # define PINVRNT(env, page, exp) \
73          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
74 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
75
76 /* Page statistics are disabled by default due to the huge performance penalty. */
77 static void cs_page_inc(const struct cl_object *obj,
78                         enum cache_stats_item item)
79 {
80 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
81         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
82 #endif
83 }
84
85 static void cs_page_dec(const struct cl_object *obj,
86                         enum cache_stats_item item)
87 {
88 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
89         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
90 #endif
91 }
92
93 static void cs_pagestate_inc(const struct cl_object *obj,
94                              enum cl_page_state state)
95 {
96 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
97         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
98 #endif
99 }
100
101 static void cs_pagestate_dec(const struct cl_object *obj,
102                               enum cl_page_state state)
103 {
104 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
105         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
106 #endif
107 }
108
109 /**
110  * Internal version of cl_page_get().
111  *
112  * This function can be used to obtain an initial reference to a previously
113  * unreferenced cached object. It can be called only if concurrent page
114  * reclamation is somehow prevented, e.g., by keeping a lock on the VM page
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
119 static void cl_page_get_trust(struct cl_page *page)
120 {
121         LASSERT(atomic_read(&page->cp_ref) > 0);
122         atomic_inc(&page->cp_ref);
123 }
124
125 static struct cl_page_slice *
126 cl_page_slice_get(const struct cl_page *cl_page, int index)
127 {
128         if (index < 0 || index >= cl_page->cp_layer_count)
129                 return NULL;
130
131         /* To keep the cp_layer_offset values under 256 (one byte each),
132          * slices are addressed by their offset beyond the end of struct cl_page.
133          */
134         return (struct cl_page_slice *)((char *)cl_page + sizeof(*cl_page) +
135                                         cl_page->cp_layer_offset[index]);
136 }
137
138 #define cl_page_slice_for_each(cl_page, slice, i)               \
139         for (i = 0, slice = cl_page_slice_get(cl_page, 0);      \
140              i < (cl_page)->cp_layer_count;                     \
141              slice = cl_page_slice_get(cl_page, ++i))
142
143 #define cl_page_slice_for_each_reverse(cl_page, slice, i)       \
144         for (i = (cl_page)->cp_layer_count - 1,                 \
145              slice = cl_page_slice_get(cl_page, i); i >= 0;     \
146              slice = cl_page_slice_get(cl_page, --i))
147
148 /**
149  * Returns a slice within a cl_page, corresponding to the given layer in the
150  * device stack.
151  *
152  * \see cl_lock_at()
153  */
154 static const struct cl_page_slice *
155 cl_page_at_trusted(const struct cl_page *cl_page,
156                    const struct lu_device_type *dtype)
157 {
158         const struct cl_page_slice *slice;
159         int i;
160
161         ENTRY;
162
163         cl_page_slice_for_each(cl_page, slice, i) {
164                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
165                         RETURN(slice);
166         }
167
168         RETURN(NULL);
169 }
170
171 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
172 {
173         int index = cl_page->cp_kmem_index;
174
175         if (index >= 0) {
176                 LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
177                 LASSERT(cl_page_kmem_size_array[index] == bufsize);
178                 OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
179         } else {
180                 OBD_FREE(cl_page, bufsize);
181         }
182 }
183
184 static void cl_page_free(const struct lu_env *env, struct cl_page *cl_page,
185                          struct pagevec *pvec)
186 {
187         struct cl_object *obj  = cl_page->cp_obj;
188         unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;
189         struct cl_page_slice *slice;
190         int i;
191
192         ENTRY;
193         PASSERT(env, cl_page, list_empty(&cl_page->cp_batch));
194         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
195         PASSERT(env, cl_page, cl_page->cp_state == CPS_FREEING);
196
197         cl_page_slice_for_each(cl_page, slice, i) {
198                 if (unlikely(slice->cpl_ops->cpo_fini != NULL))
199                         slice->cpl_ops->cpo_fini(env, slice, pvec);
200         }
201         cl_page->cp_layer_count = 0;
202         cs_page_dec(obj, CS_total);
203         cs_pagestate_dec(obj, cl_page->cp_state);
204         lu_object_ref_del_at(&obj->co_lu, &cl_page->cp_obj_ref,
205                              "cl_page", cl_page);
206         cl_object_put(env, obj);
207         lu_ref_fini(&cl_page->cp_reference);
208         __cl_page_free(cl_page, bufsize);
209         EXIT;
210 }
211
212 static struct cl_page *__cl_page_alloc(struct cl_object *o)
213 {
214         int i = 0;
215         struct cl_page *cl_page = NULL;
216         unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;
217
218         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PAGE_ALLOC))
219                 return NULL;
220
221 check:
222         /* cl_page_kmem_array is expected to hold only 2-3 entries, so
223          * the lookup overhead should be low.
224          */
225         for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
226                 if (smp_load_acquire(&cl_page_kmem_size_array[i])
227                     == bufsize) {
228                         OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
229                                            bufsize, GFP_NOFS);
230                         if (cl_page)
231                                 cl_page->cp_kmem_index = i;
232                         return cl_page;
233                 }
234                 if (cl_page_kmem_size_array[i] == 0)
235                         break;
236         }
237
238         if (i < ARRAY_SIZE(cl_page_kmem_array)) {
239                 char cache_name[32];
240
241                 mutex_lock(&cl_page_kmem_mutex);
242                 if (cl_page_kmem_size_array[i]) {
243                         mutex_unlock(&cl_page_kmem_mutex);
244                         goto check;
245                 }
246                 snprintf(cache_name, sizeof(cache_name),
247                          "cl_page_kmem-%u", bufsize);
248                 cl_page_kmem_array[i] =
249                         kmem_cache_create(cache_name, bufsize,
250                                           0, 0, NULL);
251                 if (cl_page_kmem_array[i] == NULL) {
252                         mutex_unlock(&cl_page_kmem_mutex);
253                         return NULL;
254                 }
255                 smp_store_release(&cl_page_kmem_size_array[i],
256                                   bufsize);
257                 mutex_unlock(&cl_page_kmem_mutex);
258                 goto check;
259         } else {
260                 OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
261                 if (cl_page)
262                         cl_page->cp_kmem_index = -1;
263         }
264
265         return cl_page;
266 }
267
268 struct cl_page *cl_page_alloc(const struct lu_env *env, struct cl_object *o,
269                               pgoff_t ind, struct page *vmpage,
270                               enum cl_page_type type)
271 {
272         struct cl_page *cl_page;
273         struct cl_object *head;
274
275         ENTRY;
276
277         cl_page = __cl_page_alloc(o);
278         if (cl_page != NULL) {
279                 int result = 0;
280
281                 /*
282                  * Please fix cl_page:cp_state/type declaration if
283                  * these assertions fail in the future.
284                  */
285                 BUILD_BUG_ON((1 << CP_STATE_BITS) < CPS_NR); /* cp_state */
286                 BUILD_BUG_ON((1 << CP_TYPE_BITS) < CPT_NR); /* cp_type */
287                 atomic_set(&cl_page->cp_ref, 1);
288                 cl_page->cp_obj = o;
289                 cl_object_get(o);
290                 lu_object_ref_add_at(&o->co_lu, &cl_page->cp_obj_ref,
291                                      "cl_page", cl_page);
292                 cl_page->cp_vmpage = vmpage;
293                 cl_page->cp_state = CPS_CACHED;
294                 cl_page->cp_type = type;
295                 cl_page->cp_inode = NULL;
296                 INIT_LIST_HEAD(&cl_page->cp_batch);
297                 lu_ref_init(&cl_page->cp_reference);
298                 head = o;
299                 cl_object_for_each(o, head) {
300                         if (o->co_ops->coo_page_init != NULL) {
301                                 result = o->co_ops->coo_page_init(env, o,
302                                                         cl_page, ind);
303                                 if (result != 0) {
304                                         cl_page_delete0(env, cl_page);
305                                         cl_page_free(env, cl_page, NULL);
306                                         cl_page = ERR_PTR(result);
307                                         break;
308                                 }
309                         }
310                 }
311                 if (result == 0) {
312                         cs_page_inc(o, CS_total);
313                         cs_page_inc(o, CS_create);
314                         cs_pagestate_dec(o, CPS_CACHED);
315                 }
316         } else {
317                 cl_page = ERR_PTR(-ENOMEM);
318         }
319         RETURN(cl_page);
320 }
321
322 /**
323  * Returns a cl_page with index \a idx at the object \a o, and associated with
324  * the VM page \a vmpage.
325  *
326  * This is the main entry point into the cl_page caching interface. First, a
327  * cache (implemented as a per-object radix tree) is consulted. If the page
328  * is found there, it is returned immediately. Otherwise a new page is
329  * allocated and returned. In either case, an additional reference is acquired.
330  *
331  * \see cl_object_find(), cl_lock_find()
332  */
333 struct cl_page *cl_page_find(const struct lu_env *env,
334                              struct cl_object *o,
335                              pgoff_t idx, struct page *vmpage,
336                              enum cl_page_type type)
337 {
338         struct cl_page          *page = NULL;
339         struct cl_object_header *hdr;
340
341         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
342         might_sleep();
343
344         ENTRY;
345
346         hdr = cl_object_header(o);
347         cs_page_inc(o, CS_lookup);
348
349         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
350                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
351         /* fast path. */
352         if (type == CPT_CACHEABLE) {
353                 /* vmpage lock is used to protect the child/parent
354                  * relationship */
355                 LASSERT(PageLocked(vmpage));
356                 /*
357                  * cl_vmpage_page() can be called here without any locks as
358                  *
359                  *     - "vmpage" is locked (which prevents ->private from
360                  *       concurrent updates), and
361                  *
362                  *     - "o" cannot be destroyed while current thread holds a
363                  *       reference on it.
364                  */
365                 page = cl_vmpage_page(vmpage, o);
366                 if (page != NULL) {
367                         cs_page_inc(o, CS_hit);
368                         RETURN(page);
369                 }
370         }
371
372         /* allocate and initialize cl_page */
373         page = cl_page_alloc(env, o, idx, vmpage, type);
374         RETURN(page);
375 }
376 EXPORT_SYMBOL(cl_page_find);
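/*
 * Illustrative sketch (not part of the original source): a typical lookup of
 * the cl_page backing a locked VM page. It assumes the caller already has a
 * lu_env ("env"), a reference on the cl_object ("obj") and a "vmpage"; those
 * names are placeholders for this example only.
 *
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_page_find(env, obj, vmpage->index, vmpage, CPT_CACHEABLE);
 *	if (IS_ERR(page)) {
 *		unlock_page(vmpage);
 *		return PTR_ERR(page);
 *	}
 *	... use the page under the vmpage lock ...
 *	cl_page_put(env, page);
 *	unlock_page(vmpage);
 */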
377
378 static inline int cl_page_invariant(const struct cl_page *pg)
379 {
380         return cl_page_in_use_noref(pg);
381 }
382
383 static void cl_page_state_set0(const struct lu_env *env,
384                                struct cl_page *cl_page,
385                                enum cl_page_state state)
386 {
387         enum cl_page_state old;
388
389         /*
390          * Matrix of allowed state transitions [old][new], for sanity
391          * checking.
392          */
393         static const int allowed_transitions[CPS_NR][CPS_NR] = {
394                 [CPS_CACHED] = {
395                         [CPS_CACHED]  = 0,
396                         [CPS_OWNED]   = 1, /* io finds existing cached page */
397                         [CPS_PAGEIN]  = 0,
398                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
399                         [CPS_FREEING] = 1, /* eviction on the memory pressure */
400                 },
401                 [CPS_OWNED] = {
402                         [CPS_CACHED]  = 1, /* release to the cache */
403                         [CPS_OWNED]   = 0,
404                         [CPS_PAGEIN]  = 1, /* start read immediately */
405                         [CPS_PAGEOUT] = 1, /* start write immediately */
406                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
407                 },
408                 [CPS_PAGEIN] = {
409                         [CPS_CACHED]  = 1, /* io completion */
410                         [CPS_OWNED]   = 0,
411                         [CPS_PAGEIN]  = 0,
412                         [CPS_PAGEOUT] = 0,
413                         [CPS_FREEING] = 0,
414                 },
415                 [CPS_PAGEOUT] = {
416                         [CPS_CACHED]  = 1, /* io completion */
417                         [CPS_OWNED]   = 0,
418                         [CPS_PAGEIN]  = 0,
419                         [CPS_PAGEOUT] = 0,
420                         [CPS_FREEING] = 0,
421                 },
422                 [CPS_FREEING] = {
423                         [CPS_CACHED]  = 0,
424                         [CPS_OWNED]   = 0,
425                         [CPS_PAGEIN]  = 0,
426                         [CPS_PAGEOUT] = 0,
427                         [CPS_FREEING] = 0,
428                 }
429         };
430
431         ENTRY;
432         old = cl_page->cp_state;
433         PASSERT(env, cl_page, allowed_transitions[old][state]);
434         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d -> %d\n", old, state);
435         PASSERT(env, cl_page, cl_page->cp_state == old);
436         PASSERT(env, cl_page, equi(state == CPS_OWNED,
437                                    cl_page->cp_owner != NULL));
438
439         cs_pagestate_dec(cl_page->cp_obj, cl_page->cp_state);
440         cs_pagestate_inc(cl_page->cp_obj, state);
441         cl_page->cp_state = state;
442         EXIT;
443 }
444
445 static void cl_page_state_set(const struct lu_env *env,
446                               struct cl_page *page, enum cl_page_state state)
447 {
448         cl_page_state_set0(env, page, state);
449 }
450
451 /**
452  * Acquires an additional reference to a page.
453  *
454  * This can be called only by a caller already holding a reference to \a
455  * page.
456  *
457  * \see cl_object_get(), cl_lock_get().
458  */
459 void cl_page_get(struct cl_page *page)
460 {
461         ENTRY;
462         cl_page_get_trust(page);
463         EXIT;
464 }
465 EXPORT_SYMBOL(cl_page_get);
466
467 /**
468  * Releases a reference to a page, using the pagevec to release the pages
469  * in batches if one is provided.
470  *
471  * Users need to do a final pagevec_release() to release any trailing pages.
472  */
473 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
474                   struct pagevec *pvec)
475 {
476         ENTRY;
477         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
478                        atomic_read(&page->cp_ref));
479
480         if (atomic_dec_and_test(&page->cp_ref)) {
481                 LASSERT(page->cp_state == CPS_FREEING);
482
483                 LASSERT(atomic_read(&page->cp_ref) == 0);
484                 PASSERT(env, page, page->cp_owner == NULL);
485                 PASSERT(env, page, list_empty(&page->cp_batch));
486                 /*
487                  * Page is no longer reachable by other threads. Tear
488                  * it down.
489                  */
490                 cl_page_free(env, page, pvec);
491         }
492
493         EXIT;
494 }
495 EXPORT_SYMBOL(cl_pagevec_put);
496
497 /**
498  * Releases a reference to a page; a wrapper around cl_pagevec_put().
499  *
500  * When last reference is released, page is returned to the cache, unless it
501  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
502  * destroyed.
503  *
504  * \see cl_object_put(), cl_lock_put().
505  */
506 void cl_page_put(const struct lu_env *env, struct cl_page *page)
507 {
508         cl_pagevec_put(env, page, NULL);
509 }
510 EXPORT_SYMBOL(cl_page_put);
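/*
 * Illustrative sketch (an assumption, not taken from this file): dropping a
 * list of page references through a pagevec so the underlying VM pages are
 * released in batches. "env" and the "pages" list linked via cp_batch are
 * placeholders; note that pagevec_init() takes an extra "cold" argument on
 * older kernels.
 *
 *	struct cl_page *page, *tmp;
 *	struct pagevec pvec;
 *
 *	pagevec_init(&pvec);
 *	list_for_each_entry_safe(page, tmp, &pages, cp_batch) {
 *		list_del_init(&page->cp_batch);
 *		cl_pagevec_put(env, page, &pvec);
 *	}
 *	pagevec_release(&pvec);
 */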
511
512 /**
513  * Returns a cl_page associated with a VM page, and given cl_object.
514  */
515 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
516 {
517         struct cl_page *page;
518
519         ENTRY;
520         LASSERT(PageLocked(vmpage));
521
522         /*
523          * NOTE: absence of races and liveness of data are guaranteed by page
524          *       lock on a "vmpage". That works because object destruction has
525          *       bottom-to-top pass.
526          */
527
528         page = (struct cl_page *)vmpage->private;
529         if (page != NULL) {
530                 cl_page_get_trust(page);
531                 LASSERT(page->cp_type == CPT_CACHEABLE);
532         }
533         RETURN(page);
534 }
535 EXPORT_SYMBOL(cl_vmpage_page);
536
537 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
538                                        const struct lu_device_type *dtype)
539 {
540         return cl_page_at_trusted(page, dtype);
541 }
542 EXPORT_SYMBOL(cl_page_at);
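/*
 * Illustrative sketch (hypothetical names): a layer can look up its own slice
 * of a compound page by device type. "foo_device_type", "struct foo_page" and
 * its "fp_cl" member are invented for this example and are not defined here.
 *
 *	const struct cl_page_slice *slice;
 *	struct foo_page *fp;
 *
 *	slice = cl_page_at(page, &foo_device_type);
 *	if (slice != NULL)
 *		fp = container_of(slice, struct foo_page, fp_cl);
 */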
543
544 static void cl_page_owner_clear(struct cl_page *page)
545 {
546         ENTRY;
547         if (page->cp_owner != NULL) {
548                 LASSERT(page->cp_owner->ci_owned_nr > 0);
549                 page->cp_owner->ci_owned_nr--;
550                 page->cp_owner = NULL;
551         }
552         EXIT;
553 }
554
555 static void cl_page_owner_set(struct cl_page *page)
556 {
557         ENTRY;
558         LASSERT(page->cp_owner != NULL);
559         page->cp_owner->ci_owned_nr++;
560         EXIT;
561 }
562
563 void cl_page_disown0(const struct lu_env *env,
564                      struct cl_io *io, struct cl_page *cl_page)
565 {
566         const struct cl_page_slice *slice;
567         enum cl_page_state state;
568         int i;
569
570         ENTRY;
571         state = cl_page->cp_state;
572         PINVRNT(env, cl_page, state == CPS_OWNED ||
573                 state == CPS_FREEING);
574         PINVRNT(env, cl_page, cl_page_invariant(cl_page) ||
575                 state == CPS_FREEING);
576         cl_page_owner_clear(cl_page);
577
578         if (state == CPS_OWNED)
579                 cl_page_state_set(env, cl_page, CPS_CACHED);
580         /*
581          * Completion call-backs are executed in bottom-up order, so that
582          * the uppermost layer (llite), responsible for VFS/VM interaction,
583          * runs last and can release locks safely.
584          */
585         cl_page_slice_for_each_reverse(cl_page, slice, i) {
586                 if (slice->cpl_ops->cpo_disown != NULL)
587                         (*slice->cpl_ops->cpo_disown)(env, slice, io);
588         }
589
590         EXIT;
591 }
592
593 /**
594  * Returns true iff the page is owned by the given io.
595  */
596 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
597 {
598         struct cl_io *top = cl_io_top((struct cl_io *)io);
599         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
600         ENTRY;
601         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
602 }
603 EXPORT_SYMBOL(cl_page_is_owned);
604
605 /**
606  * Try to own a page by IO.
607  *
608  * Waits until page is in cl_page_state::CPS_CACHED state, and then switch it
609  * into cl_page_state::CPS_OWNED state.
610  *
611  * \pre  !cl_page_is_owned(cl_page, io)
612  * \post result == 0 iff cl_page_is_owned(cl_page, io)
613  *
614  * \retval 0   success
615  *
616  * \retval -ve failure, e.g., cl_page was destroyed (and landed in
617  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
618  *             or the page was owned by another thread, or is under IO.
619  *
620  * \see cl_page_disown()
621  * \see cl_page_operations::cpo_own()
622  * \see cl_page_own_try()
623  * \see cl_page_own
624  */
625 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
626                         struct cl_page *cl_page, int nonblock)
627 {
628         const struct cl_page_slice *slice;
629         int result = 0;
630         int i;
631
632         ENTRY;
633         PINVRNT(env, cl_page, !cl_page_is_owned(cl_page, io));
634         io = cl_io_top(io);
635
636         if (cl_page->cp_state == CPS_FREEING) {
637                 result = -ENOENT;
638                 goto out;
639         }
640
641         cl_page_slice_for_each(cl_page, slice, i) {
642                 if (slice->cpl_ops->cpo_own)
643                         result = (*slice->cpl_ops->cpo_own)(env, slice,
644                                                             io, nonblock);
645                 if (result != 0)
646                         break;
647         }
648         if (result > 0)
649                 result = 0;
650
651         if (result == 0) {
652                 PASSERT(env, cl_page, cl_page->cp_owner == NULL);
653                 cl_page->cp_owner = cl_io_top(io);
654                 cl_page_owner_set(cl_page);
655                 if (cl_page->cp_state != CPS_FREEING) {
656                         cl_page_state_set(env, cl_page, CPS_OWNED);
657                 } else {
658                         cl_page_disown0(env, io, cl_page);
659                         result = -ENOENT;
660                 }
661         }
662
663 out:
664         PINVRNT(env, cl_page, ergo(result == 0,
665                 cl_page_invariant(cl_page)));
666         RETURN(result);
667 }
668
669 /**
670  * Own a page; the call may block.
671  *
672  * \see cl_page_own0()
673  */
674 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
675 {
676         return cl_page_own0(env, io, pg, 0);
677 }
678 EXPORT_SYMBOL(cl_page_own);
679
680 /**
681  * Nonblock version of cl_page_own().
682  *
683  * \see cl_page_own0()
684  */
685 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
686                     struct cl_page *pg)
687 {
688         return cl_page_own0(env, io, pg, 1);
689 }
690 EXPORT_SYMBOL(cl_page_own_try);
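/*
 * Illustrative sketch (not from the original source): the usual ownership
 * round-trip around an operation that requires CPS_OWNED. A blocking caller
 * uses cl_page_own(), a non-blocking one cl_page_own_try(); a negative return
 * means the page is gone (CPS_FREEING) or busy and is normally just skipped.
 *
 *	int rc;
 *
 *	rc = nonblock ? cl_page_own_try(env, io, page) :
 *			cl_page_own(env, io, page);
 *	if (rc == 0) {
 *		... page is owned by cl_io_top(io) and in CPS_OWNED ...
 *		cl_page_disown(env, io, page);
 *	}
 */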
691
692
693 /**
694  * Assume page ownership.
695  *
696  * Called when page is already locked by the hosting VM.
697  *
698  * \pre !cl_page_is_owned(cl_page, io)
699  * \post cl_page_is_owned(cl_page, io)
700  *
701  * \see cl_page_operations::cpo_assume()
702  */
703 void cl_page_assume(const struct lu_env *env,
704                     struct cl_io *io, struct cl_page *cl_page)
705 {
706         const struct cl_page_slice *slice;
707         int i;
708
709         ENTRY;
710
711         PINVRNT(env, cl_page,
712                 cl_object_same(cl_page->cp_obj, io->ci_obj));
713         io = cl_io_top(io);
714
715         cl_page_slice_for_each(cl_page, slice, i) {
716                 if (slice->cpl_ops->cpo_assume != NULL)
717                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
718         }
719
720         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
721         cl_page->cp_owner = cl_io_top(io);
722         cl_page_owner_set(cl_page);
723         cl_page_state_set(env, cl_page, CPS_OWNED);
724         EXIT;
725 }
726 EXPORT_SYMBOL(cl_page_assume);
727
728 /**
729  * Releases page ownership without unlocking the page.
730  *
731  * Moves cl_page into cl_page_state::CPS_CACHED without releasing a lock
732  * on the underlying VM page (as VM is supposed to do this itself).
733  *
734  * \pre   cl_page_is_owned(cl_page, io)
735  * \post !cl_page_is_owned(cl_page, io)
736  *
737  * \see cl_page_assume()
738  */
739 void cl_page_unassume(const struct lu_env *env,
740                       struct cl_io *io, struct cl_page *cl_page)
741 {
742         const struct cl_page_slice *slice;
743         int i;
744
745         ENTRY;
746         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
747         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
748
749         io = cl_io_top(io);
750         cl_page_owner_clear(cl_page);
751         cl_page_state_set(env, cl_page, CPS_CACHED);
752
753         cl_page_slice_for_each_reverse(cl_page, slice, i) {
754                 if (slice->cpl_ops->cpo_unassume != NULL)
755                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
756         }
757
758         EXIT;
759 }
760 EXPORT_SYMBOL(cl_page_unassume);
761
762 /**
763  * Releases page ownership.
764  *
765  * Moves page into cl_page_state::CPS_CACHED.
766  *
767  * \pre   cl_page_is_owned(pg, io)
768  * \post !cl_page_is_owned(pg, io)
769  *
770  * \see cl_page_own()
771  * \see cl_page_operations::cpo_disown()
772  */
773 void cl_page_disown(const struct lu_env *env,
774                     struct cl_io *io, struct cl_page *pg)
775 {
776         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
777                 pg->cp_state == CPS_FREEING);
778
779         ENTRY;
780         io = cl_io_top(io);
781         cl_page_disown0(env, io, pg);
782         EXIT;
783 }
784 EXPORT_SYMBOL(cl_page_disown);
785
786 /**
787  * Called when cl_page is to be removed from the object, e.g.,
788  * as a result of truncate.
789  *
790  * Calls cl_page_operations::cpo_discard() top-to-bottom.
791  *
792  * \pre cl_page_is_owned(cl_page, io)
793  *
794  * \see cl_page_operations::cpo_discard()
795  */
796 void cl_page_discard(const struct lu_env *env,
797                      struct cl_io *io, struct cl_page *cl_page)
798 {
799         const struct cl_page_slice *slice;
800         int i;
801
802         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
803         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
804
805         cl_page_slice_for_each(cl_page, slice, i) {
806                 if (slice->cpl_ops->cpo_discard != NULL)
807                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
808         }
809 }
810 EXPORT_SYMBOL(cl_page_discard);
811
812 /**
813  * Version of cl_page_delete() that can be called for not-yet-fully-constructed
814  * cl_pages, e.g. in the error handling cl_page_find()->cl_page_delete0()
815  * path. Doesn't check the cl_page invariant.
816  */
817 static void cl_page_delete0(const struct lu_env *env,
818                             struct cl_page *cl_page)
819 {
820         const struct cl_page_slice *slice;
821         int i;
822
823         ENTRY;
824
825         PASSERT(env, cl_page, cl_page->cp_state != CPS_FREEING);
826
827         /*
828          * Sever all ways to obtain new pointers to \a cl_page.
829          */
830         cl_page_owner_clear(cl_page);
831         cl_page_state_set0(env, cl_page, CPS_FREEING);
832
833         cl_page_slice_for_each_reverse(cl_page, slice, i) {
834                 if (slice->cpl_ops->cpo_delete != NULL)
835                         (*slice->cpl_ops->cpo_delete)(env, slice);
836         }
837
838         EXIT;
839 }
840
841 /**
842  * Called when a decision is made to throw page out of memory.
843  *
844  * Notifies all layers about page destruction by calling
845  * cl_page_operations::cpo_delete() method top-to-bottom.
846  *
847  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
848  * where transition to this state happens).
849  *
850  * Eliminates all venues through which new references to the page can be
851  * obtained:
852  *
853  *     - removes page from the radix trees,
854  *
855  *     - breaks linkage from VM page to cl_page.
856  *
857  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
858  * drain after some time, at which point page will be recycled.
859  *
860  * \pre  VM page is locked
861  * \post pg->cp_state == CPS_FREEING
862  *
863  * \see cl_page_operations::cpo_delete()
864  */
865 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
866 {
867         PINVRNT(env, pg, cl_page_invariant(pg));
868         ENTRY;
869         cl_page_delete0(env, pg);
870         EXIT;
871 }
872 EXPORT_SYMBOL(cl_page_delete);
873
874 /**
875  * Marks page up-to-date.
876  *
877  * Calls cl_page_operations::cpo_export() through all layers top-to-bottom. The
878  * layer responsible for VM interaction has to mark/clear the page as up-to-date
879  * according to the \a uptodate argument.
880  *
881  * \see cl_page_operations::cpo_export()
882  */
883 void cl_page_export(const struct lu_env *env, struct cl_page *cl_page,
884                     int uptodate)
885 {
886         const struct cl_page_slice *slice;
887         int i;
888
889         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
890
891         cl_page_slice_for_each(cl_page, slice, i) {
892                 if (slice->cpl_ops->cpo_export != NULL)
893                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
894         }
895 }
896 EXPORT_SYMBOL(cl_page_export);
897
898 /**
899  * Returns true if \a cl_page is VM-locked in a suitable sense by the calling
900  * thread.
901  */
902 int cl_page_is_vmlocked(const struct lu_env *env,
903                         const struct cl_page *cl_page)
904 {
905         const struct cl_page_slice *slice;
906         int result;
907
908         ENTRY;
909         slice = cl_page_slice_get(cl_page, 0);
910         PASSERT(env, cl_page, slice->cpl_ops->cpo_is_vmlocked != NULL);
911         /*
912          * Call ->cpo_is_vmlocked() directly instead of going through
913          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
914          * cl_page_invariant().
915          */
916         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
917         PASSERT(env, cl_page, result == -EBUSY || result == -ENODATA);
918
919         RETURN(result == -EBUSY);
920 }
921 EXPORT_SYMBOL(cl_page_is_vmlocked);
922
923 void cl_page_touch(const struct lu_env *env,
924                    const struct cl_page *cl_page, size_t to)
925 {
926         const struct cl_page_slice *slice;
927         int i;
928
929         ENTRY;
930
931         cl_page_slice_for_each(cl_page, slice, i) {
932                 if (slice->cpl_ops->cpo_page_touch != NULL)
933                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
934         }
935
936         EXIT;
937 }
938 EXPORT_SYMBOL(cl_page_touch);
939
940 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
941 {
942         ENTRY;
943         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
944 }
945
946 static void cl_page_io_start(const struct lu_env *env,
947                              struct cl_page *pg, enum cl_req_type crt)
948 {
949         /*
950          * Page is queued for IO, change its state.
951          */
952         ENTRY;
953         cl_page_owner_clear(pg);
954         cl_page_state_set(env, pg, cl_req_type_state(crt));
955         EXIT;
956 }
957
958 /**
959  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
960  * called top-to-bottom. Every layer either agrees to submit this page (by
961  * returning 0), or requests to omit this page (by returning -EALREADY). The
962  * layer handling interactions with the VM also has to inform the VM that the
963  * page is now under transfer.
964  */
965 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
966                  struct cl_page *cl_page, enum cl_req_type crt)
967 {
968         const struct cl_page_slice *slice;
969         int result = 0;
970         int i;
971
972         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
973         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
974         PINVRNT(env, cl_page, crt < CRT_NR);
975
976         /*
977          * this has to be called bottom-to-top, so that llite can set up
978          * PG_writeback without risking other layers deciding to skip this
979          * page.
980          */
981         if (crt >= CRT_NR)
982                 return -EINVAL;
983
984         cl_page_slice_for_each(cl_page, slice, i) {
985                 if (slice->cpl_ops->io[crt].cpo_prep != NULL)
986                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
987                                                                      slice,
988                                                                      io);
989                 if (result != 0)
990                         break;
991         }
992
993         if (result >= 0) {
994                 result = 0;
995                 cl_page_io_start(env, cl_page, crt);
996         }
997
998         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);
999         return result;
1000 }
1001 EXPORT_SYMBOL(cl_page_prep);
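/*
 * Illustrative sketch (assumption about a caller, not code from this file):
 * a submission path preps each owned page and treats -EALREADY as "some layer
 * asked to omit this page" rather than as an error.
 *
 *	int rc;
 *
 *	rc = cl_page_prep(env, io, page, CRT_WRITE);
 *	if (rc == 0) {
 *		... page is now in CPS_PAGEOUT; add it to the transfer ...
 *	} else if (rc == -EALREADY) {
 *		... page is already under transfer; skip it ...
 *		rc = 0;
 *	}
 */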
1002
1003 /**
1004  * Notify layers about transfer completion.
1005  *
1006  * Invoked by the transfer sub-system (which is part of osc) to notify layers
1007  * that a transfer, of which this page is a part, has completed.
1008  *
1009  * Completion call-backs are executed in bottom-up order, so that the
1010  * uppermost layer (llite), responsible for the VFS/VM interaction, runs last
1011  * and can release locks safely.
1012  *
1013  * \pre  cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
1014  * \post cl_page->cp_state == CPS_CACHED
1015  *
1016  * \see cl_page_operations::cpo_completion()
1017  */
1018 void cl_page_completion(const struct lu_env *env,
1019                         struct cl_page *cl_page, enum cl_req_type crt,
1020                         int ioret)
1021 {
1022         const struct cl_page_slice *slice;
1023         struct cl_sync_io *anchor = cl_page->cp_sync_io;
1024         int i;
1025
1026         ENTRY;
1027         PASSERT(env, cl_page, crt < CRT_NR);
1028         PASSERT(env, cl_page, cl_page->cp_state == cl_req_type_state(crt));
1029
1030         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, ioret);
1031         cl_page_state_set(env, cl_page, CPS_CACHED);
1032         if (crt >= CRT_NR)
1033                 return;
1034
1035         cl_page_slice_for_each_reverse(cl_page, slice, i) {
1036                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
1037                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
1038                                                                   ioret);
1039         }
1040
1041         if (anchor != NULL) {
1042                 LASSERT(cl_page->cp_sync_io == anchor);
1043                 cl_page->cp_sync_io = NULL;
1044                 cl_sync_io_note(env, anchor, ioret);
1045         }
1046         EXIT;
1047 }
1048 EXPORT_SYMBOL(cl_page_completion);
1049
1050 /**
1051  * Notify layers that the transfer formation engine has decided to yank this
1052  * page from the cache and make it part of a transfer.
1053  *
1054  * \pre  cl_page->cp_state == CPS_CACHED
1055  * \post cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
1056  *
1057  * \see cl_page_operations::cpo_make_ready()
1058  */
1059 int cl_page_make_ready(const struct lu_env *env, struct cl_page *cl_page,
1060                        enum cl_req_type crt)
1061 {
1062         const struct cl_page_slice *slice;
1063         int result = 0;
1064         int i;
1065
1066         ENTRY;
1067         PINVRNT(env, cl_page, crt < CRT_NR);
1068         if (crt >= CRT_NR)
1069                 RETURN(-EINVAL);
1070
1071         cl_page_slice_for_each(cl_page, slice, i) {
1072                 if (slice->cpl_ops->io[crt].cpo_make_ready != NULL)
1073                         result = (*slice->cpl_ops->io[crt].cpo_make_ready)(env, slice);
1074                 if (result != 0)
1075                         break;
1076         }
1077
1078         if (result >= 0) {
1079                 result = 0;
1080                 PASSERT(env, cl_page, cl_page->cp_state == CPS_CACHED);
1081                 cl_page_io_start(env, cl_page, crt);
1082         }
1083         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);
1084
1085         RETURN(result);
1086 }
1087 EXPORT_SYMBOL(cl_page_make_ready);
1088
1089 /**
1090  * Called when a page is being written back at the kernel's request.
1091  *
1092  * \pre  cl_page_is_owned(cl_page, io)
1093  * \post ergo(result == 0, cl_page->cp_state == CPS_PAGEOUT)
1094  *
1095  * \see cl_page_operations::cpo_flush()
1096  */
1097 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1098                   struct cl_page *cl_page)
1099 {
1100         const struct cl_page_slice *slice;
1101         int result = 0;
1102         int i;
1103
1104         ENTRY;
1105         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
1106         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1107
1108         cl_page_slice_for_each(cl_page, slice, i) {
1109                 if (slice->cpl_ops->cpo_flush != NULL)
1110                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1111                 if (result != 0)
1112                         break;
1113         }
1114         if (result > 0)
1115                 result = 0;
1116
1117         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d\n", result);
1118         RETURN(result);
1119 }
1120 EXPORT_SYMBOL(cl_page_flush);
1121
1122 /**
1123  * Tells the transfer engine that only part of a page is to be transmitted.
1124  *
1125  * \see cl_page_operations::cpo_clip()
1126  */
1127 void cl_page_clip(const struct lu_env *env, struct cl_page *cl_page,
1128                   int from, int to)
1129 {
1130         const struct cl_page_slice *slice;
1131         int i;
1132
1133         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1134
1135         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", from, to);
1136         cl_page_slice_for_each(cl_page, slice, i) {
1137                 if (slice->cpl_ops->cpo_clip != NULL)
1138                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1139         }
1140 }
1141 EXPORT_SYMBOL(cl_page_clip);
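/*
 * Illustrative sketch (assumption about a caller): clipping the last page of
 * a file before write-out so that only the bytes up to the file size are
 * transmitted. "offset" is the byte offset of the page within the object and
 * "size" is the file size; both are placeholders.
 *
 *	if (offset + PAGE_SIZE > size)
 *		cl_page_clip(env, page, 0, size - offset);
 *	else
 *		cl_page_clip(env, page, 0, PAGE_SIZE);
 */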
1142
1143 /**
1144  * Prints a human-readable representation of \a pg via \a printer.
1145  */
1146 void cl_page_header_print(const struct lu_env *env, void *cookie,
1147                           lu_printer_t printer, const struct cl_page *pg)
1148 {
1149         (*printer)(env, cookie,
1150                    "page@%p[%d %p %d %d %p]\n",
1151                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1152                    pg->cp_state, pg->cp_type,
1153                    pg->cp_owner);
1154 }
1155 EXPORT_SYMBOL(cl_page_header_print);
1156
1157 /**
1158  * Prints a human-readable representation of \a cl_page via \a printer.
1159  */
1160 void cl_page_print(const struct lu_env *env, void *cookie,
1161                    lu_printer_t printer, const struct cl_page *cl_page)
1162 {
1163         const struct cl_page_slice *slice;
1164         int result = 0;
1165         int i;
1166
1167         cl_page_header_print(env, cookie, printer, cl_page);
1168         cl_page_slice_for_each(cl_page, slice, i) {
1169                 if (slice->cpl_ops->cpo_print != NULL)
1170                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1171                                                              cookie, printer);
1172                 if (result != 0)
1173                         break;
1174         }
1175         (*printer)(env, cookie, "end page@%p\n", cl_page);
1176 }
1177 EXPORT_SYMBOL(cl_page_print);
1178
1179 /**
1180  * Converts a page index into a byte offset within object \a obj.
1181  */
1182 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1183 {
1184         return (loff_t)idx << PAGE_SHIFT;
1185 }
1186 EXPORT_SYMBOL(cl_offset);
1187
1188 /**
1189  * Converts a byte offset within object \a obj into a page index.
1190  */
1191 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1192 {
1193         return offset >> PAGE_SHIFT;
1194 }
1195 EXPORT_SYMBOL(cl_index);
1196
1197 size_t cl_page_size(const struct cl_object *obj)
1198 {
1199         return 1UL << PAGE_SHIFT;
1200 }
1201 EXPORT_SYMBOL(cl_page_size);
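/*
 * Illustrative sketch: converting an I/O byte range into page indices, and the
 * first index back into a byte offset. "obj", "start" and "count" are
 * placeholders for this example.
 *
 *	pgoff_t first = cl_index(obj, start);
 *	pgoff_t last  = cl_index(obj, start + count - 1);
 *	loff_t  off   = cl_offset(obj, first);
 *
 *	LASSERT(cl_page_size(obj) == PAGE_SIZE);
 */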
1202
1203 /**
1204  * Adds page slice to the compound page.
1205  *
1206  * This is called by cl_object_operations::coo_page_init() methods to add a
1207  * per-layer state to the page. The new state is added at the end of the
1208  * cl_page::cp_layer_offset[] array, that is, at the bottom of the stack.
1209  *
1210  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1211  */
1212 void cl_page_slice_add(struct cl_page *cl_page, struct cl_page_slice *slice,
1213                        struct cl_object *obj,
1214                        const struct cl_page_operations *ops)
1215 {
1216         unsigned int offset = (char *)slice -
1217                         ((char *)cl_page + sizeof(*cl_page));
1218
1219         ENTRY;
1220         LASSERT(cl_page->cp_layer_count < CP_MAX_LAYER);
1221         LASSERT(offset < (1 << sizeof(cl_page->cp_layer_offset[0]) * 8));
1222         cl_page->cp_layer_offset[cl_page->cp_layer_count++] = offset;
1223         slice->cpl_obj  = obj;
1224         slice->cpl_ops  = ops;
1225         slice->cpl_page = cl_page;
1226
1227         EXIT;
1228 }
1229 EXPORT_SYMBOL(cl_page_slice_add);
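/*
 * Illustrative sketch (hypothetical layer): a cl_object_operations::coo_page_init()
 * method locates the per-layer area reserved after struct cl_page and registers
 * its slice. "struct foo_page", "foo_page_ops" and the "fp_cl"/"fp_index"
 * members are invented for this example; cl_object_page_slice() is assumed to
 * return the slice area reserved for "obj" within the page buffer.
 *
 *	static int foo_page_init(const struct lu_env *env, struct cl_object *obj,
 *				 struct cl_page *page, pgoff_t index)
 *	{
 *		struct foo_page *fp = cl_object_page_slice(obj, page);
 *
 *		fp->fp_index = index;
 *		cl_page_slice_add(page, &fp->fp_cl, obj, &foo_page_ops);
 *		return 0;
 *	}
 */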
1230
1231 /**
1232  * Allocate and initialize cl_cache, called by ll_init_sbi().
1233  */
1234 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1235 {
1236         struct cl_client_cache  *cache = NULL;
1237
1238         ENTRY;
1239         OBD_ALLOC(cache, sizeof(*cache));
1240         if (cache == NULL)
1241                 RETURN(NULL);
1242
1243         /* Initialize cache data */
1244         atomic_set(&cache->ccc_users, 1);
1245         cache->ccc_lru_max = lru_page_max;
1246         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1247         spin_lock_init(&cache->ccc_lru_lock);
1248         INIT_LIST_HEAD(&cache->ccc_lru);
1249
1250         /* turn unstable check off by default as it impacts performance */
1251         cache->ccc_unstable_check = 0;
1252         atomic_long_set(&cache->ccc_unstable_nr, 0);
1253         init_waitqueue_head(&cache->ccc_unstable_waitq);
1254         mutex_init(&cache->ccc_max_cache_mb_lock);
1255
1256         RETURN(cache);
1257 }
1258 EXPORT_SYMBOL(cl_cache_init);
1259
1260 /**
1261  * Increase cl_cache refcount
1262  */
1263 void cl_cache_incref(struct cl_client_cache *cache)
1264 {
1265         atomic_inc(&cache->ccc_users);
1266 }
1267 EXPORT_SYMBOL(cl_cache_incref);
1268
1269 /**
1270  * Decrease the cl_cache refcount and free the cache when the refcount
1271  * drops to 0. Since llite, lov and osc all hold a cl_cache refcount,
1272  * the free will not cause a race. (LU-6173)
1273  */
1274 void cl_cache_decref(struct cl_client_cache *cache)
1275 {
1276         if (atomic_dec_and_test(&cache->ccc_users))
1277                 OBD_FREE(cache, sizeof(*cache));
1278 }
1279 EXPORT_SYMBOL(cl_cache_decref);
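/*
 * Illustrative sketch (not from the original source): typical lifetime of a
 * shared cl_client_cache. The mount code creates it, every additional user
 * takes a reference with cl_cache_incref(), and the last cl_cache_decref()
 * frees it. "lru_page_max" is a placeholder value.
 *
 *	struct cl_client_cache *cache;
 *
 *	cache = cl_cache_init(lru_page_max);
 *	if (cache == NULL)
 *		return -ENOMEM;
 *	cl_cache_incref(cache);		... hand a second reference to another layer ...
 *	...
 *	cl_cache_decref(cache);		... that layer is done with it ...
 *	cl_cache_decref(cache);		... last reference, the cache is freed ...
 */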