LU-10994 clio: remove unused convenience functions
lustre/obdclass/cl_page.c (fs/lustre-release.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Client Lustre Page.
32  *
33  *   Author: Nikita Danilov <nikita.danilov@sun.com>
34  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/list.h>
40 #include <libcfs/libcfs.h>
41 #include <obd_class.h>
42 #include <obd_support.h>
43
44 #include <cl_object.h>
45 #include "cl_internal.h"
46
47 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
48 static DEFINE_MUTEX(cl_page_kmem_mutex);
49
50 #ifdef LIBCFS_DEBUG
51 # define PASSERT(env, page, expr)                                       \
52   do {                                                                    \
53           if (unlikely(!(expr))) {                                      \
54                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
55                   LASSERT(0);                                           \
56           }                                                             \
57   } while (0)
58 #else /* !LIBCFS_DEBUG */
59 # define PASSERT(env, page, exp) \
60         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
61 #endif /* !LIBCFS_DEBUG */
62
63 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
64 # define PINVRNT(env, page, expr)                                       \
65   do {                                                                    \
66           if (unlikely(!(expr))) {                                      \
67                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
68                   LINVRNT(0);                                           \
69           }                                                             \
70   } while (0)
71 #else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
72 # define PINVRNT(env, page, exp) \
73          ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
74 #endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
75
76 /* Disable page statistics by default due to the huge performance penalty. */
77 static void cs_page_inc(const struct cl_object *obj,
78                         enum cache_stats_item item)
79 {
80 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
81         atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
82 #endif
83 }
84
85 static void cs_page_dec(const struct cl_object *obj,
86                         enum cache_stats_item item)
87 {
88 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
89         atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
90 #endif
91 }
92
93 static void cs_pagestate_inc(const struct cl_object *obj,
94                              enum cl_page_state state)
95 {
96 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
97         atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
98 #endif
99 }
100
101 static void cs_pagestate_dec(const struct cl_object *obj,
102                               enum cl_page_state state)
103 {
104 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
105         atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
106 #endif
107 }
108
109 /**
110  * Internal version of cl_page_get().
111  *
112  * This function can be used to obtain an initial reference to a previously
113  * unreferenced cached object. It can be called only if concurrent page
114  * reclamation is somehow prevented, e.g., by keeping a lock on the VM page
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
119 static void cl_page_get_trust(struct cl_page *page)
120 {
121         LASSERT(atomic_read(&page->cp_ref) > 0);
122         atomic_inc(&page->cp_ref);
123 }
124
125 static struct cl_page_slice *
126 cl_page_slice_get(const struct cl_page *cl_page, int index)
127 {
128         if (index < 0 || index >= cl_page->cp_layer_count)
129                 return NULL;
130
131         /* To keep the cp_layer_offset values under 256 bytes, the offsets
132          * are taken relative to the end of struct cl_page.
133          */
134         return (struct cl_page_slice *)((char *)cl_page + sizeof(*cl_page) +
135                                         cl_page->cp_layer_offset[index]);
136 }
137
138 #define cl_page_slice_for_each(cl_page, slice, i)               \
139         for (i = 0, slice = cl_page_slice_get(cl_page, 0);      \
140              i < (cl_page)->cp_layer_count;                     \
141              slice = cl_page_slice_get(cl_page, ++i))
142
143 #define cl_page_slice_for_each_reverse(cl_page, slice, i)       \
144         for (i = (cl_page)->cp_layer_count - 1,                 \
145              slice = cl_page_slice_get(cl_page, i); i >= 0;     \
146              slice = cl_page_slice_get(cl_page, --i))
147
148 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
149 {
150         int index = cl_page->cp_kmem_index;
151
152         if (index >= 0) {
153                 LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
154                 LASSERT(cl_page_kmem_size_array[index] == bufsize);
155                 OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
156         } else {
157                 OBD_FREE(cl_page, bufsize);
158         }
159 }
160
161 static void cl_page_free(const struct lu_env *env, struct cl_page *cp,
162                          struct pagevec *pvec)
163 {
164         struct cl_object *obj  = cp->cp_obj;
165         unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;
166         struct page *vmpage;
167
168         ENTRY;
169         PASSERT(env, cp, list_empty(&cp->cp_batch));
170         PASSERT(env, cp, cp->cp_owner == NULL);
171         PASSERT(env, cp, cp->cp_state == CPS_FREEING);
172
173         if (cp->cp_type == CPT_CACHEABLE) {
174                 /* vmpage->private was already cleared when page was
175                  * moved into CPS_FREEING state. */
176                 vmpage = cp->cp_vmpage;
177                 LASSERT(vmpage != NULL);
178                 LASSERT((struct cl_page *)vmpage->private != cp);
179
180                 if (pvec != NULL) {
181                         if (!pagevec_add(pvec, vmpage))
182                                 pagevec_release(pvec);
183                 } else {
184                         put_page(vmpage);
185                 }
186         }
187
188         cp->cp_layer_count = 0;
189         cs_page_dec(obj, CS_total);
190         cs_pagestate_dec(obj, cp->cp_state);
191         lu_object_ref_del_at(&obj->co_lu, &cp->cp_obj_ref, "cl_page", cp);
192         if (cp->cp_type != CPT_TRANSIENT)
193                 cl_object_put(env, obj);
194         lu_ref_fini(&cp->cp_reference);
195         __cl_page_free(cp, bufsize);
196         EXIT;
197 }
198
199 static struct cl_page *__cl_page_alloc(struct cl_object *o)
200 {
201         int i = 0;
202         struct cl_page *cl_page = NULL;
203         unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;
204
205         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PAGE_ALLOC))
206                 return NULL;
207
208 check:
209         /* cl_page_kmem_array is expected to hold only 2-3 entries,
210          * so the lookup overhead should be low.
211          */
212         for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
213                 if (smp_load_acquire(&cl_page_kmem_size_array[i])
214                     == bufsize) {
215                         OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
216                                            bufsize, GFP_NOFS);
217                         if (cl_page)
218                                 cl_page->cp_kmem_index = i;
219                         return cl_page;
220                 }
221                 if (cl_page_kmem_size_array[i] == 0)
222                         break;
223         }
224
225         if (i < ARRAY_SIZE(cl_page_kmem_array)) {
226                 char cache_name[32];
227
228                 mutex_lock(&cl_page_kmem_mutex);
229                 if (cl_page_kmem_size_array[i]) {
230                         mutex_unlock(&cl_page_kmem_mutex);
231                         goto check;
232                 }
233                 snprintf(cache_name, sizeof(cache_name),
234                          "cl_page_kmem-%u", bufsize);
235                 cl_page_kmem_array[i] =
236                         kmem_cache_create(cache_name, bufsize,
237                                           0, 0, NULL);
238                 if (cl_page_kmem_array[i] == NULL) {
239                         mutex_unlock(&cl_page_kmem_mutex);
240                         return NULL;
241                 }
242                 smp_store_release(&cl_page_kmem_size_array[i],
243                                   bufsize);
244                 mutex_unlock(&cl_page_kmem_mutex);
245                 goto check;
246         } else {
247                 OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
248                 if (cl_page)
249                         cl_page->cp_kmem_index = -1;
250         }
251
252         return cl_page;
253 }
254
255 struct cl_page *cl_page_alloc(const struct lu_env *env, struct cl_object *o,
256                               pgoff_t ind, struct page *vmpage,
257                               enum cl_page_type type)
258 {
259         struct cl_page *cl_page;
260         struct cl_object *head;
261
262         ENTRY;
263
264         cl_page = __cl_page_alloc(o);
265         if (cl_page != NULL) {
266                 int result = 0;
267
268                 /*
269                  * Please fix the cl_page::cp_state/cp_type declarations if
270                  * these assertions fail in the future.
271                  */
272                 BUILD_BUG_ON((1 << CP_STATE_BITS) < CPS_NR); /* cp_state */
273                 BUILD_BUG_ON((1 << CP_TYPE_BITS) < CPT_NR); /* cp_type */
274                 atomic_set(&cl_page->cp_ref, 1);
275                 cl_page->cp_obj = o;
276                 if (type != CPT_TRANSIENT)
277                         cl_object_get(o);
278                 lu_object_ref_add_at(&o->co_lu, &cl_page->cp_obj_ref,
279                                      "cl_page", cl_page);
280                 cl_page->cp_vmpage = vmpage;
281                 cl_page->cp_state = CPS_CACHED;
282                 cl_page->cp_type = type;
283                 if (type == CPT_TRANSIENT)
284                         /* ref to correct inode will be added
285                          * in ll_direct_rw_pages
286                          */
287                         cl_page->cp_inode = NULL;
288                 else
289                         cl_page->cp_inode = page2inode(vmpage);
290                 INIT_LIST_HEAD(&cl_page->cp_batch);
291                 lu_ref_init(&cl_page->cp_reference);
292                 head = o;
293                 cl_page->cp_page_index = ind;
294                 cl_object_for_each(o, head) {
295                         if (o->co_ops->coo_page_init != NULL) {
296                                 result = o->co_ops->coo_page_init(env, o,
297                                                         cl_page, ind);
298                                 if (result != 0) {
299                                         cl_page_delete0(env, cl_page);
300                                         cl_page_free(env, cl_page, NULL);
301                                         cl_page = ERR_PTR(result);
302                                         break;
303                                 }
304                         }
305                 }
306                 if (result == 0) {
307                         cs_page_inc(o, CS_total);
308                         cs_page_inc(o, CS_create);
309                         cs_pagestate_dec(o, CPS_CACHED);
310                 }
311         } else {
312                 cl_page = ERR_PTR(-ENOMEM);
313         }
314         RETURN(cl_page);
315 }
316
317 /**
318  * Returns a cl_page with index \a idx at the object \a o, and associated with
319  * the VM page \a vmpage.
320  *
321  * This is the main entry point into the cl_page caching interface. First, a
322  * cache (implemented as a per-object radix tree) is consulted. If the page is
323  * found there, it is returned immediately. Otherwise a new page is allocated
324  * and returned. In any case, an additional reference to the page is acquired.
325  *
326  * \see cl_object_find(), cl_lock_find()
327  */
328 struct cl_page *cl_page_find(const struct lu_env *env,
329                              struct cl_object *o,
330                              pgoff_t idx, struct page *vmpage,
331                              enum cl_page_type type)
332 {
333         struct cl_page          *page = NULL;
334         struct cl_object_header *hdr;
335
336         LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
337         might_sleep();
338
339         ENTRY;
340
341         hdr = cl_object_header(o);
342         cs_page_inc(o, CS_lookup);
343
344         CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
345                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
346         /* fast path. */
347         if (type == CPT_CACHEABLE) {
348                 /* vmpage lock is used to protect the child/parent
349                  * relationship */
350                 LASSERT(PageLocked(vmpage));
351                 /*
352                  * cl_vmpage_page() can be called here without any locks as
353                  *
354                  *     - "vmpage" is locked (which prevents ->private from
355                  *       concurrent updates), and
356                  *
357                  *     - "o" cannot be destroyed while current thread holds a
358                  *       reference on it.
359                  */
360                 page = cl_vmpage_page(vmpage, o);
361                 if (page != NULL) {
362                         cs_page_inc(o, CS_hit);
363                         RETURN(page);
364                 }
365         }
366
367         /* allocate and initialize cl_page */
368         page = cl_page_alloc(env, o, idx, vmpage, type);
369         RETURN(page);
370 }
371 EXPORT_SYMBOL(cl_page_find);
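
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * a typical cl_page_find() call site.  The caller is assumed to hold a
 * locked VM page and a reference on the cl_object; "example_attach_page"
 * and its arguments are hypothetical names used only for illustration.
 */
#if 0
static int example_attach_page(const struct lu_env *env,
			       struct cl_object *clob, struct page *vmpage)
{
	struct cl_page *cp;

	LASSERT(PageLocked(vmpage));

	/* Either returns the cached cl_page (fast path through
	 * cl_vmpage_page()) or allocates and initializes a new one;
	 * a reference is held in both cases. */
	cp = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
	if (IS_ERR(cp))
		return PTR_ERR(cp);

	/* ... use the page ... */

	cl_page_put(env, cp);
	return 0;
}
#endif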
372
373 static inline int cl_page_invariant(const struct cl_page *pg)
374 {
375         return cl_page_in_use_noref(pg);
376 }
377
378 static void cl_page_state_set0(const struct lu_env *env,
379                                struct cl_page *cl_page,
380                                enum cl_page_state state)
381 {
382         enum cl_page_state old;
383
384         /*
385          * Matrix of allowed state transitions [old][new], for sanity
386          * checking.
387          */
388         static const int allowed_transitions[CPS_NR][CPS_NR] = {
389                 [CPS_CACHED] = {
390                         [CPS_CACHED]  = 0,
391                         [CPS_OWNED]   = 1, /* io finds existing cached page */
392                         [CPS_PAGEIN]  = 0,
393                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
394                         [CPS_FREEING] = 1, /* eviction on the memory pressure */
395                 },
396                 [CPS_OWNED] = {
397                         [CPS_CACHED]  = 1, /* release to the cache */
398                         [CPS_OWNED]   = 0,
399                         [CPS_PAGEIN]  = 1, /* start read immediately */
400                         [CPS_PAGEOUT] = 1, /* start write immediately */
401                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
402                 },
403                 [CPS_PAGEIN] = {
404                         [CPS_CACHED]  = 1, /* io completion */
405                         [CPS_OWNED]   = 0,
406                         [CPS_PAGEIN]  = 0,
407                         [CPS_PAGEOUT] = 0,
408                         [CPS_FREEING] = 0,
409                 },
410                 [CPS_PAGEOUT] = {
411                         [CPS_CACHED]  = 1, /* io completion */
412                         [CPS_OWNED]   = 0,
413                         [CPS_PAGEIN]  = 0,
414                         [CPS_PAGEOUT] = 0,
415                         [CPS_FREEING] = 0,
416                 },
417                 [CPS_FREEING] = {
418                         [CPS_CACHED]  = 0,
419                         [CPS_OWNED]   = 0,
420                         [CPS_PAGEIN]  = 0,
421                         [CPS_PAGEOUT] = 0,
422                         [CPS_FREEING] = 0,
423                 }
424         };
425
426         ENTRY;
427         old = cl_page->cp_state;
428         PASSERT(env, cl_page, allowed_transitions[old][state]);
429         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d -> %d\n", old, state);
430         PASSERT(env, cl_page, cl_page->cp_state == old);
431         PASSERT(env, cl_page, equi(state == CPS_OWNED,
432                                    cl_page->cp_owner != NULL));
433
434         cs_pagestate_dec(cl_page->cp_obj, cl_page->cp_state);
435         cs_pagestate_inc(cl_page->cp_obj, state);
436         cl_page->cp_state = state;
437         EXIT;
438 }
439
440 static void cl_page_state_set(const struct lu_env *env,
441                               struct cl_page *page, enum cl_page_state state)
442 {
443         cl_page_state_set0(env, page, state);
444 }
445
446 /**
447  * Acquires an additional reference to a page.
448  *
449  * This can be called only by a caller already possessing a reference to
450  * \a page.
451  *
452  * \see cl_object_get(), cl_lock_get().
453  */
454 void cl_page_get(struct cl_page *page)
455 {
456         ENTRY;
457         cl_page_get_trust(page);
458         EXIT;
459 }
460 EXPORT_SYMBOL(cl_page_get);
461
462 /**
463  * Releases a reference to a page, using the pagevec to release the pages
464  * in batches if one is provided.
465  *
466  * Users need to do a final pagevec_release() to release any trailing pages.
467  */
468 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
469                   struct pagevec *pvec)
470 {
471         ENTRY;
472         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
473                        atomic_read(&page->cp_ref));
474
475         if (atomic_dec_and_test(&page->cp_ref)) {
476                 LASSERT(page->cp_state == CPS_FREEING);
477
478                 LASSERT(atomic_read(&page->cp_ref) == 0);
479                 PASSERT(env, page, page->cp_owner == NULL);
480                 PASSERT(env, page, list_empty(&page->cp_batch));
481                 /*
482                  * Page is no longer reachable by other threads. Tear
483                  * it down.
484                  */
485                 cl_page_free(env, page, pvec);
486         }
487
488         EXIT;
489 }
490 EXPORT_SYMBOL(cl_pagevec_put);
491
492 /**
493  * Releases a reference to a page; wrapper around cl_pagevec_put().
494  *
495  * When the last reference is released, the page is returned to the cache, unless
496  * it is in cl_page_state::CPS_FREEING state, in which case it is immediately
497  * destroyed.
498  *
499  * \see cl_object_put(), cl_lock_put().
500  */
501 void cl_page_put(const struct lu_env *env, struct cl_page *page)
502 {
503         cl_pagevec_put(env, page, NULL);
504 }
505 EXPORT_SYMBOL(cl_page_put);
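
/*
 * Illustrative sketch (editorial addition): releasing a set of cl_pages
 * through a pagevec so the underlying VM pages are freed in batches rather
 * than one put_page() at a time.  "example_put_pages" is a hypothetical
 * helper; the single-argument pagevec_init() assumed here matches recent
 * kernels and may need a compat wrapper on older ones.
 */
#if 0
static void example_put_pages(const struct lu_env *env,
			      struct cl_page **pages, int count)
{
	struct pagevec pvec;
	int i;

	pagevec_init(&pvec);

	for (i = 0; i < count; i++)
		cl_pagevec_put(env, pages[i], &pvec);

	/* release any vmpages still queued in the pagevec */
	pagevec_release(&pvec);
}
#endif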
506
507 /**
508  * Returns a cl_page associated with a VM page, and given cl_object.
509  */
510 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
511 {
512         struct cl_page *page;
513
514         ENTRY;
515         LASSERT(PageLocked(vmpage));
516
517         /*
518          * NOTE: absence of races and liveness of data are guaranteed by the
519          *       page lock on the "vmpage". That works because object
520          *       destruction is done in a bottom-to-top pass.
521          */
522
523         page = (struct cl_page *)vmpage->private;
524         if (page != NULL) {
525                 cl_page_get_trust(page);
526                 LASSERT(page->cp_type == CPT_CACHEABLE);
527         }
528         RETURN(page);
529 }
530 EXPORT_SYMBOL(cl_vmpage_page);
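
/*
 * Illustrative sketch (editorial addition): checking whether a locked VM
 * page already has a cl_page attached for a given object.  The helper name
 * is hypothetical; the reference returned by cl_vmpage_page() must be
 * dropped with cl_page_put().
 */
#if 0
static bool example_vmpage_is_known(const struct lu_env *env,
				    struct page *vmpage, struct cl_object *obj)
{
	struct cl_page *cp;

	LASSERT(PageLocked(vmpage));

	cp = cl_vmpage_page(vmpage, obj);
	if (cp == NULL)
		return false;

	cl_page_put(env, cp);
	return true;
}
#endif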
531
532 static void cl_page_owner_clear(struct cl_page *page)
533 {
534         ENTRY;
535         if (page->cp_owner != NULL) {
536                 LASSERT(page->cp_owner->ci_owned_nr > 0);
537                 page->cp_owner->ci_owned_nr--;
538                 page->cp_owner = NULL;
539         }
540         EXIT;
541 }
542
543 static void cl_page_owner_set(struct cl_page *page)
544 {
545         ENTRY;
546         LASSERT(page->cp_owner != NULL);
547         page->cp_owner->ci_owned_nr++;
548         EXIT;
549 }
550
551 void cl_page_disown0(const struct lu_env *env, struct cl_page *cp)
552 {
553         struct page *vmpage;
554         enum cl_page_state state;
555
556         ENTRY;
557         state = cp->cp_state;
558         PINVRNT(env, cp, state == CPS_OWNED || state == CPS_FREEING);
559         PINVRNT(env, cp, cl_page_invariant(cp) || state == CPS_FREEING);
560         cl_page_owner_clear(cp);
561
562         if (state == CPS_OWNED)
563                 cl_page_state_set(env, cp, CPS_CACHED);
564
565         if (cp->cp_type == CPT_CACHEABLE) {
566                 vmpage = cp->cp_vmpage;
567                 LASSERT(vmpage != NULL);
568                 LASSERT(PageLocked(vmpage));
569                 unlock_page(vmpage);
570         }
571
572         EXIT;
573 }
574
575 /**
576  * Returns true iff the page is owned by the given io.
577  */
578 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
579 {
580         struct cl_io *top = cl_io_top((struct cl_io *)io);
581         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
582         ENTRY;
583         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
584 }
585 EXPORT_SYMBOL(cl_page_is_owned);
586
587 /**
588  * Try to own a page by IO.
589  *
590  * Waits until the page is in cl_page_state::CPS_CACHED state, and then switches
591  * it into cl_page_state::CPS_OWNED state.
592  *
593  * \pre  !cl_page_is_owned(cl_page, io)
594  * \post result == 0 iff cl_page_is_owned(cl_page, io)
595  *
596  * \retval 0   success
597  *
598  * \retval -ve failure, e.g., the cl_page was destroyed (and landed in
599  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
600  *             or the page was owned by another thread, or is under IO.
601  *
602  * \see cl_page_disown()
603  * \see cl_page_own_try()
604  * \see cl_page_own
605  */
606 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
607                         struct cl_page *cl_page, int nonblock)
608 {
609         struct page *vmpage = cl_page->cp_vmpage;
610         int result;
611
612         ENTRY;
613         PINVRNT(env, cl_page, !cl_page_is_owned(cl_page, io));
614
615         if (cl_page->cp_state == CPS_FREEING) {
616                 result = -ENOENT;
617                 goto out;
618         }
619
620         LASSERT(vmpage != NULL);
621
622         if (cl_page->cp_type == CPT_TRANSIENT) {
623                 /* OK */
624         } else if (nonblock) {
625                 if (!trylock_page(vmpage)) {
626                         result = -EAGAIN;
627                         goto out;
628                 }
629
630                 if (unlikely(PageWriteback(vmpage))) {
631                         unlock_page(vmpage);
632                         result = -EAGAIN;
633                         goto out;
634                 }
635         } else {
636                 lock_page(vmpage);
637                 wait_on_page_writeback(vmpage);
638         }
639
640         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
641         cl_page->cp_owner = cl_io_top(io);
642         cl_page_owner_set(cl_page);
643
644         if (cl_page->cp_state == CPS_FREEING) {
645                 cl_page_disown0(env, cl_page);
646                 result = -ENOENT;
647                 goto out;
648         }
649
650         cl_page_state_set(env, cl_page, CPS_OWNED);
651         result = 0;
652 out:
653         PINVRNT(env, cl_page, ergo(result == 0,
654                 cl_page_invariant(cl_page)));
655         RETURN(result);
656 }
657
658 /**
659  * Own a page; may block.
660  *
661  * \see cl_page_own0()
662  */
663 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
664 {
665         return cl_page_own0(env, io, pg, 0);
666 }
667 EXPORT_SYMBOL(cl_page_own);
668
669 /**
670  * Nonblocking version of cl_page_own().
671  *
672  * \see cl_page_own0()
673  */
674 int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
675                     struct cl_page *pg)
676 {
677         return cl_page_own0(env, io, pg, 1);
678 }
679 EXPORT_SYMBOL(cl_page_own_try);
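
/*
 * Illustrative sketch (editorial addition): taking and releasing page
 * ownership.  A scanning path that must not block can use cl_page_own_try()
 * and simply skip busy pages; "example_try_process" is a hypothetical helper.
 */
#if 0
static int example_try_process(const struct lu_env *env, struct cl_io *io,
			       struct cl_page *cp)
{
	int rc;

	rc = cl_page_own_try(env, io, cp);
	if (rc != 0)
		/* -EAGAIN: locked/busy elsewhere, -ENOENT: being freed */
		return rc;

	/* ... operate on the owned page (state is now CPS_OWNED) ... */

	cl_page_disown(env, io, cp);
	return 0;
}
#endif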
680
681
682 /**
683  * Assume page ownership.
684  *
685  * Called when page is already locked by the hosting VM.
686  *
687  * \pre !cl_page_is_owned(cp, io)
688  * \post cl_page_is_owned(cp, io)
689  */
690 void cl_page_assume(const struct lu_env *env,
691                     struct cl_io *io, struct cl_page *cp)
692 {
693         struct page *vmpage;
694
695         ENTRY;
696         PINVRNT(env, cp, cl_object_same(cp->cp_obj, io->ci_obj));
697
698         if (cp->cp_type == CPT_CACHEABLE) {
699                 vmpage = cp->cp_vmpage;
700                 LASSERT(vmpage != NULL);
701                 LASSERT(PageLocked(vmpage));
702                 wait_on_page_writeback(vmpage);
703         }
704
705         PASSERT(env, cp, cp->cp_owner == NULL);
706         cp->cp_owner = cl_io_top(io);
707         cl_page_owner_set(cp);
708         cl_page_state_set(env, cp, CPS_OWNED);
709         EXIT;
710 }
711 EXPORT_SYMBOL(cl_page_assume);
712
713 /**
714  * Releases page ownership without unlocking the page.
715  *
716  * Moves cl_page into cl_page_state::CPS_CACHED without releasing a lock
717  * on the underlying VM page (as VM is supposed to do this itself).
718  *
719  * \pre   cl_page_is_owned(cp, io)
720  * \post !cl_page_is_owned(cp, io)
721  */
722 void cl_page_unassume(const struct lu_env *env,
723                       struct cl_io *io, struct cl_page *cp)
724 {
725         struct page *vmpage;
726
727         ENTRY;
728         PINVRNT(env, cp, cl_page_is_owned(cp, io));
729         PINVRNT(env, cp, cl_page_invariant(cp));
730
731         cl_page_owner_clear(cp);
732         cl_page_state_set(env, cp, CPS_CACHED);
733
734         if (cp->cp_type == CPT_CACHEABLE) {
735                 vmpage = cp->cp_vmpage;
736                 LASSERT(vmpage != NULL);
737                 LASSERT(PageLocked(vmpage));
738         }
739
740         EXIT;
741 }
742 EXPORT_SYMBOL(cl_page_unassume);
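
/*
 * Illustrative sketch (editorial addition): assume/unassume is used when the
 * VM page is already locked by the caller (e.g. in a ->write_begin()-like
 * path), so ownership is taken without locking the vmpage again and released
 * without unlocking it.  The helper name is hypothetical.
 */
#if 0
static void example_with_locked_vmpage(const struct lu_env *env,
				       struct cl_io *io, struct cl_page *cp)
{
	/* the vmpage is already locked by the VM/VFS caller */
	cl_page_assume(env, io, cp);

	/* ... modify the page while it is in CPS_OWNED ... */

	/* back to CPS_CACHED; the vmpage stays locked for the caller */
	cl_page_unassume(env, io, cp);
}
#endif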
743
744 /**
745  * Releases page ownership.
746  *
747  * Moves page into cl_page_state::CPS_CACHED.
748  *
749  * \pre   cl_page_is_owned(pg, io)
750  * \post !cl_page_is_owned(pg, io)
751  *
752  * \see cl_page_own()
753  */
754 void cl_page_disown(const struct lu_env *env,
755                     struct cl_io *io, struct cl_page *pg)
756 {
757         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
758                 pg->cp_state == CPS_FREEING);
759
760         cl_page_disown0(env, pg);
761 }
762 EXPORT_SYMBOL(cl_page_disown);
763
764 /**
765  * Called when cl_page is to be removed from the object, e.g.,
766  * as a result of truncate.
767  *
768  * Calls cl_page_operations::cpo_discard() top-to-bottom.
769  *
770  * \pre cl_page_is_owned(cl_page, io)
771  *
772  * \see cl_page_operations::cpo_discard()
773  */
774 void cl_page_discard(const struct lu_env *env,
775                      struct cl_io *io, struct cl_page *cp)
776 {
777         struct page *vmpage;
778         const struct cl_page_slice *slice;
779         int i;
780
781         PINVRNT(env, cp, cl_page_is_owned(cp, io));
782         PINVRNT(env, cp, cl_page_invariant(cp));
783
784         cl_page_slice_for_each(cp, slice, i) {
785                 if (slice->cpl_ops->cpo_discard != NULL)
786                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
787         }
788
789         if (cp->cp_type == CPT_CACHEABLE) {
790                 vmpage = cp->cp_vmpage;
791                 LASSERT(vmpage != NULL);
792                 LASSERT(PageLocked(vmpage));
793                 generic_error_remove_page(vmpage->mapping, vmpage);
794         } else {
795                 cl_page_delete(env, cp);
796         }
797 }
798 EXPORT_SYMBOL(cl_page_discard);
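
/*
 * Illustrative sketch (editorial addition): a truncate-like path discarding
 * one page.  The page must be owned before cl_page_discard() is called; the
 * helper name and the error handling policy are hypothetical.
 */
#if 0
static int example_discard_one(const struct lu_env *env, struct cl_io *io,
			       struct cl_page *cp)
{
	int rc;

	rc = cl_page_own(env, io, cp);
	if (rc != 0)
		return rc;	/* e.g. -ENOENT if the page is being freed */

	cl_page_discard(env, io, cp);
	cl_page_disown(env, io, cp);
	return 0;
}
#endif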
799
800 /**
801  * Version of cl_page_delete() that can be called for not yet fully constructed
802  * cl_pages, e.g. in the error handling cl_page_find()->cl_page_delete0()
803  * path. Does not check the cl_page invariant.
804  */
805 static void cl_page_delete0(const struct lu_env *env, struct cl_page *cp)
806 {
807         struct page *vmpage;
808         const struct cl_page_slice *slice;
809         int refc;
810         int i;
811
812         ENTRY;
813         PASSERT(env, cp, cp->cp_state != CPS_FREEING);
814
815         /*
816          * Sever all ways to obtain new pointers to @cp.
817          */
818         cl_page_owner_clear(cp);
819         cl_page_state_set0(env, cp, CPS_FREEING);
820
821         cl_page_slice_for_each_reverse(cp, slice, i) {
822                 if (slice->cpl_ops->cpo_delete != NULL)
823                         (*slice->cpl_ops->cpo_delete)(env, slice);
824         }
825
826         if (cp->cp_type == CPT_CACHEABLE) {
827                 vmpage = cp->cp_vmpage;
828                 LASSERT(PageLocked(vmpage));
829                 LASSERT((struct cl_page *)vmpage->private == cp);
830
831                 /* Drop the reference count held in vvp_page_init */
832                 refc = atomic_dec_return(&cp->cp_ref);
833                 LASSERTF(refc >= 1, "page = %p, refc = %d\n", cp, refc);
834
835                 ClearPagePrivate(vmpage);
836                 vmpage->private = 0;
837
838                 /*
839                  * The reference from vmpage to cl_page is removed,
840                  * but the reference back is still here. It is removed
841                  * later in cl_page_free().
842                  */
843         }
844
845         EXIT;
846 }
847
848 /**
849  * Called when a decision is made to throw page out of memory.
850  *
851  * Notifies all layers about page destruction by calling
852  * cl_page_operations::cpo_delete() method top-to-bottom.
853  *
854  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
855  * where transition to this state happens).
856  *
857  * Eliminates all venues through which new references to the page can be
858  * obtained:
859  *
860  *     - removes page from the radix trees,
861  *
862  *     - breaks linkage from VM page to cl_page.
863  *
864  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
865  * drain after some time, at which point page will be recycled.
866  *
867  * \pre  VM page is locked
868  * \post pg->cp_state == CPS_FREEING
869  *
870  * \see cl_page_operations::cpo_delete()
871  */
872 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
873 {
874         PINVRNT(env, pg, cl_page_invariant(pg));
875         ENTRY;
876         cl_page_delete0(env, pg);
877         EXIT;
878 }
879 EXPORT_SYMBOL(cl_page_delete);
880
881 void cl_page_touch(const struct lu_env *env,
882                    const struct cl_page *cl_page, size_t to)
883 {
884         const struct cl_page_slice *slice;
885         int i;
886
887         ENTRY;
888
889         cl_page_slice_for_each(cl_page, slice, i) {
890                 if (slice->cpl_ops->cpo_page_touch != NULL)
891                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
892         }
893
894         EXIT;
895 }
896 EXPORT_SYMBOL(cl_page_touch);
897
898 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
899 {
900         ENTRY;
901         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
902 }
903
904 static void cl_page_io_start(const struct lu_env *env,
905                              struct cl_page *pg, enum cl_req_type crt)
906 {
907         /*
908          * Page is queued for IO, change its state.
909          */
910         ENTRY;
911         cl_page_owner_clear(pg);
912         cl_page_state_set(env, pg, cl_req_type_state(crt));
913         EXIT;
914 }
915
916 /**
917  * Prepares a page for immediate transfer. Returns -EALREADY if this page
918  * should be omitted from the transfer.
919  */
920 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
921                  struct cl_page *cp, enum cl_req_type crt)
922 {
923         struct page *vmpage = cp->cp_vmpage;
924         int rc;
925
926         PASSERT(env, cp, crt < CRT_NR);
927         PINVRNT(env, cp, cl_page_is_owned(cp, io));
928         PINVRNT(env, cp, cl_page_invariant(cp));
929
930         if (cp->cp_type == CPT_TRANSIENT) {
931                 /* Nothing to do. */
932         } else if (crt == CRT_READ) {
933                 if (PageUptodate(vmpage))
934                         GOTO(out, rc = -EALREADY);
935         } else {
936                 LASSERT(PageLocked(vmpage));
937                 LASSERT(!PageDirty(vmpage));
938
939                 /* The ll_writepage path is not a sync write, so we need to
940                  * set the page writeback flag.
941                  */
942                 if (cp->cp_sync_io == NULL)
943                         set_page_writeback(vmpage);
944         }
945
946         cl_page_io_start(env, cp, crt);
947         rc = 0;
948 out:
949         CL_PAGE_HEADER(D_TRACE, env, cp, "%d %d\n", crt, rc);
950
951         return rc;
952 }
953 EXPORT_SYMBOL(cl_page_prep);
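
/*
 * Illustrative sketch (editorial addition): queueing an owned page for read.
 * A return of -EALREADY (vmpage already uptodate) is not an error and simply
 * means the page can be skipped; the helper name is hypothetical.
 */
#if 0
static int example_queue_for_read(const struct lu_env *env, struct cl_io *io,
				  struct cl_page *cp)
{
	int rc;

	rc = cl_page_prep(env, io, cp, CRT_READ);
	if (rc == -EALREADY)
		return 0;	/* already uptodate, nothing to transfer */
	if (rc != 0)
		return rc;

	/* ... add the page to a transfer request; it is now CPS_PAGEIN ... */
	return 0;
}
#endif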
954
955 /**
956  * Notify layers about transfer completion.
957  *
958  * Invoked by the transfer sub-system (which is a part of osc) to notify layers
959  * that a transfer, of which this page is a part, has completed.
960  *
961  * Completion call-backs are executed in bottom-up order, so that the
962  * uppermost layer (llite), responsible for the VFS/VM interaction, runs
963  * last and can release locks safely.
964  *
965  * \pre  cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
966  * \post cl_page->cp_state == CPS_CACHED
967  *
968  * \see cl_page_operations::cpo_completion()
969  */
970 void cl_page_completion(const struct lu_env *env,
971                         struct cl_page *cl_page, enum cl_req_type crt,
972                         int ioret)
973 {
974         const struct cl_page_slice *slice;
975         struct cl_sync_io *anchor = cl_page->cp_sync_io;
976         int i;
977
978         ENTRY;
979         PASSERT(env, cl_page, crt < CRT_NR);
980         PASSERT(env, cl_page, cl_page->cp_state == cl_req_type_state(crt));
981
982         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, ioret);
983         cl_page_state_set(env, cl_page, CPS_CACHED);
984         if (crt >= CRT_NR)
985                 return;
986
987         cl_page_slice_for_each_reverse(cl_page, slice, i) {
988                 if (slice->cpl_ops->io[crt].cpo_completion != NULL)
989                         (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
990                                                                   ioret);
991         }
992
993         if (anchor != NULL) {
994                 LASSERT(cl_page->cp_sync_io == anchor);
995                 cl_page->cp_sync_io = NULL;
996                 cl_sync_io_note(env, anchor, ioret);
997         }
998         EXIT;
999 }
1000 EXPORT_SYMBOL(cl_page_completion);
1001
1002 /**
1003  * Notify layers that the transfer formation engine decided to yank this page from
1004  * the cache and to make it a part of a transfer.
1005  *
1006  * \pre  cl_page->cp_state == CPS_CACHED
1007  * \post cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
1008  */
1009 int cl_page_make_ready(const struct lu_env *env, struct cl_page *cp,
1010                        enum cl_req_type crt)
1011 {
1012         struct page *vmpage = cp->cp_vmpage;
1013         int rc;
1014
1015         ENTRY;
1016         PASSERT(env, cp, crt == CRT_WRITE);
1017
1018         if (cp->cp_type == CPT_TRANSIENT)
1019                 GOTO(out, rc = 0);
1020
1021         lock_page(vmpage);
1022
1023         if (clear_page_dirty_for_io(vmpage)) {
1024                 LASSERT(cp->cp_state == CPS_CACHED);
1025                 /* This actually clears the dirty bit in the
1026                  * radix tree.
1027                  */
1028                 set_page_writeback(vmpage);
1029                 CL_PAGE_HEADER(D_PAGE, env, cp, "readied\n");
1030                 rc = 0;
1031         } else if (cp->cp_state == CPS_PAGEOUT) {
1032                 /* Is it possible for osc_flush_async_page()
1033                  * to have already made it ready?
1034                  */
1035                 rc = -EALREADY;
1036         } else {
1037                 CL_PAGE_DEBUG(D_ERROR, env, cp,
1038                               "unexpected page state %d\n",
1039                               cp->cp_state);
1040                 LBUG();
1041         }
1042
1043         unlock_page(vmpage);
1044 out:
1045         if (rc == 0) {
1046                 PASSERT(env, cp, cp->cp_state == CPS_CACHED);
1047                 cl_page_io_start(env, cp, crt);
1048         }
1049
1050         CL_PAGE_HEADER(D_TRACE, env, cp, "%d %d\n", crt, rc);
1051
1052         return rc;
1053 }
1054 EXPORT_SYMBOL(cl_page_make_ready);
1055
1056 /**
1057  * Called when a page is being written back at the kernel's initiative.
1058  *
1059  * \pre  cl_page_is_owned(cl_page, io)
1060  * \post ergo(result == 0, cl_page->cp_state == CPS_PAGEOUT)
1061  *
1062  * \see cl_page_operations::cpo_flush()
1063  */
1064 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1065                   struct cl_page *cl_page)
1066 {
1067         const struct cl_page_slice *slice;
1068         int result = 0;
1069         int i;
1070
1071         ENTRY;
1072         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
1073         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1074
1075         cl_page_slice_for_each(cl_page, slice, i) {
1076                 if (slice->cpl_ops->cpo_flush != NULL)
1077                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1078                 if (result != 0)
1079                         break;
1080         }
1081         if (result > 0)
1082                 result = 0;
1083
1084         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d\n", result);
1085         RETURN(result);
1086 }
1087 EXPORT_SYMBOL(cl_page_flush);
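
/*
 * Illustrative sketch (editorial addition): a writepage-like path asking the
 * layers to start write-out of a page it owns, then dropping ownership.  The
 * helper name is hypothetical and the error handling is simplified.
 */
#if 0
static int example_flush_owned(const struct lu_env *env, struct cl_io *io,
			       struct cl_page *cp)
{
	int rc;

	/* walks the slices top-to-bottom calling cpo_flush() */
	rc = cl_page_flush(env, io, cp);

	/* ownership is unaffected by the flush; give it back to the cache */
	cl_page_disown(env, io, cp);
	return rc;
}
#endif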
1088
1089 /**
1090  * Tells the transfer engine that only part of a page is to be transmitted.
1091  *
1092  * \see cl_page_operations::cpo_clip()
1093  */
1094 void cl_page_clip(const struct lu_env *env, struct cl_page *cl_page,
1095                   int from, int to)
1096 {
1097         const struct cl_page_slice *slice;
1098         int i;
1099
1100         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1101
1102         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", from, to);
1103         cl_page_slice_for_each(cl_page, slice, i) {
1104                 if (slice->cpl_ops->cpo_clip != NULL)
1105                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1106         }
1107 }
1108 EXPORT_SYMBOL(cl_page_clip);
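
/*
 * Illustrative sketch (editorial addition): clipping the last page of a file
 * so that only the bytes up to the file size are sent.  "example_clip_tail"
 * and the way the size is obtained are hypothetical.
 */
#if 0
static void example_clip_tail(const struct lu_env *env, struct cl_page *cp,
			      loff_t file_size)
{
	int to = file_size & ~PAGE_MASK;

	/* only bytes [0, to) of the tail page are meaningful */
	if (to != 0)
		cl_page_clip(env, cp, 0, to);
}
#endif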
1109
1110 /**
1111  * Prints a human-readable representation of \a pg via \a printer.
1112  */
1113 void cl_page_header_print(const struct lu_env *env, void *cookie,
1114                           lu_printer_t printer, const struct cl_page *pg)
1115 {
1116         (*printer)(env, cookie,
1117                    "page@%p[%d %p %d %d %p]\n",
1118                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1119                    pg->cp_state, pg->cp_type,
1120                    pg->cp_owner);
1121 }
1122 EXPORT_SYMBOL(cl_page_header_print);
1123
1124 /**
1125  * Prints a human-readable representation of \a cp via \a printer.
1126  */
1127 void cl_page_print(const struct lu_env *env, void *cookie,
1128                    lu_printer_t printer, const struct cl_page *cp)
1129 {
1130         struct page *vmpage = cp->cp_vmpage;
1131         const struct cl_page_slice *slice;
1132         int result = 0;
1133         int i;
1134
1135         cl_page_header_print(env, cookie, printer, cp);
1136
1137         (*printer)(env, cookie, "vmpage @%p", vmpage);
1138
1139         if (vmpage != NULL) {
1140                 (*printer)(env, cookie, " %lx %d:%d %lx %lu %slru",
1141                            (long)vmpage->flags, page_count(vmpage),
1142                            page_mapcount(vmpage), vmpage->private,
1143                            page_index(vmpage),
1144                            list_empty(&vmpage->lru) ? "not-" : "");
1145         }
1146
1147         (*printer)(env, cookie, "\n");
1148
1149         cl_page_slice_for_each(cp, slice, i) {
1150                 if (slice->cpl_ops->cpo_print != NULL)
1151                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1152                                                               cookie, printer);
1153                 if (result != 0)
1154                         break;
1155         }
1156
1157         (*printer)(env, cookie, "end page@%p\n", cp);
1158 }
1159 EXPORT_SYMBOL(cl_page_print);
1160
1161 /**
1162  * Converts a page index within object \a obj into a byte offset.
1163  */
1164 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1165 {
1166         return (loff_t)idx << PAGE_SHIFT;
1167 }
1168 EXPORT_SYMBOL(cl_offset);
1169
1170 /**
1171  * Converts a byte offset within object \a obj into a page index.
1172  */
1173 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1174 {
1175         return offset >> PAGE_SHIFT;
1176 }
1177 EXPORT_SYMBOL(cl_index);
1178
1179 size_t cl_page_size(const struct cl_object *obj)
1180 {
1181         return 1UL << PAGE_SHIFT;
1182 }
1183 EXPORT_SYMBOL(cl_page_size);
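
/*
 * Illustrative sketch (editorial addition): the helpers above are simple
 * page-size arithmetic.  For any object, cl_offset(obj, cl_index(obj, pos))
 * rounds "pos" down to a page boundary, and cl_page_size() gives the stride
 * between consecutive page offsets.  The helper name is hypothetical.
 */
#if 0
static loff_t example_page_start(const struct cl_object *obj, loff_t pos)
{
	pgoff_t idx = cl_index(obj, pos);

	/* start offset of the page that contains "pos" */
	return cl_offset(obj, idx);
}
#endif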
1184
1185 /**
1186  * Adds page slice to the compound page.
1187  *
1188  * This is called by cl_object_operations::coo_page_init() methods to add a
1189  * per-layer state to the page. The new state is added at the end of the
1190  * cl_page layer array (cl_page::cp_layer_offset), that is, at the bottom of the stack.
1191  *
1192  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1193  */
1194 void cl_page_slice_add(struct cl_page *cl_page, struct cl_page_slice *slice,
1195                        struct cl_object *obj,
1196                        const struct cl_page_operations *ops)
1197 {
1198         unsigned int offset = (char *)slice -
1199                         ((char *)cl_page + sizeof(*cl_page));
1200
1201         ENTRY;
1202         LASSERT(cl_page->cp_layer_count < CP_MAX_LAYER);
1203         LASSERT(offset < (1 << sizeof(cl_page->cp_layer_offset[0]) * 8));
1204         cl_page->cp_layer_offset[cl_page->cp_layer_count++] = offset;
1205         slice->cpl_obj  = obj;
1206         slice->cpl_ops  = ops;
1207         slice->cpl_page = cl_page;
1208
1209         EXIT;
1210 }
1211 EXPORT_SYMBOL(cl_page_slice_add);
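
/*
 * Illustrative sketch (editorial addition): how a layer's coo_page_init()
 * method attaches its slice.  "example_page", "example_page_ops" and
 * "example_page_init" are hypothetical; cl_object_page_slice() is assumed to
 * return the layer's reserved area inside the compound page buffer, as the
 * real vvp/lov/osc layers do.
 */
#if 0
struct example_page {
	struct cl_page_slice	ep_cl;
};

static const struct cl_page_operations example_page_ops = {
	/* per-layer callbacks (cpo_discard, cpo_flush, ...) would go here */
};

static int example_page_init(const struct lu_env *env, struct cl_object *obj,
			     struct cl_page *cl_page, pgoff_t index)
{
	struct example_page *ep = cl_object_page_slice(obj, cl_page);

	cl_page_slice_add(cl_page, &ep->ep_cl, obj, &example_page_ops);
	return 0;
}
#endif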
1212
1213 /**
1214  * Allocate and initialize cl_cache, called by ll_init_sbi().
1215  */
1216 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1217 {
1218         struct cl_client_cache  *cache = NULL;
1219
1220         ENTRY;
1221         OBD_ALLOC(cache, sizeof(*cache));
1222         if (cache == NULL)
1223                 RETURN(NULL);
1224
1225         /* Initialize cache data */
1226         atomic_set(&cache->ccc_users, 1);
1227         cache->ccc_lru_max = lru_page_max;
1228         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1229         spin_lock_init(&cache->ccc_lru_lock);
1230         INIT_LIST_HEAD(&cache->ccc_lru);
1231
1232         /* turn unstable check off by default as it impacts performance */
1233         cache->ccc_unstable_check = 0;
1234         atomic_long_set(&cache->ccc_unstable_nr, 0);
1235         init_waitqueue_head(&cache->ccc_unstable_waitq);
1236         mutex_init(&cache->ccc_max_cache_mb_lock);
1237
1238         RETURN(cache);
1239 }
1240 EXPORT_SYMBOL(cl_cache_init);
1241
1242 /**
1243  * Increase cl_cache refcount
1244  */
1245 void cl_cache_incref(struct cl_client_cache *cache)
1246 {
1247         atomic_inc(&cache->ccc_users);
1248 }
1249 EXPORT_SYMBOL(cl_cache_incref);
1250
1251 /**
1252  * Decrease the cl_cache refcount and free the cache if the refcount reaches 0.
1253  * Since llite, lov and osc all hold a cl_cache refcount,
1254  * the free will not cause a race. (LU-6173)
1255  */
1256 void cl_cache_decref(struct cl_client_cache *cache)
1257 {
1258         if (atomic_dec_and_test(&cache->ccc_users))
1259                 OBD_FREE(cache, sizeof(*cache));
1260 }
1261 EXPORT_SYMBOL(cl_cache_decref);
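
/*
 * Illustrative sketch (editorial addition): the cl_client_cache life cycle.
 * llite creates the cache at mount time, each additional user (e.g. osc)
 * takes a reference with cl_cache_incref(), and the last cl_cache_decref()
 * frees it.  The helper name and the LRU limit below are hypothetical.
 */
#if 0
static void example_cache_lifecycle(void)
{
	struct cl_client_cache *cache;

	cache = cl_cache_init(64 << (20 - PAGE_SHIFT));	/* ~64 MiB of pages */
	if (cache == NULL)
		return;

	cl_cache_incref(cache);		/* second user, e.g. osc */

	cl_cache_decref(cache);		/* second user goes away */
	cl_cache_decref(cache);		/* last reference: cache is freed */
}
#endif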