Whamcloud - gitweb
LU-14627 lnet: Ensure ref taken when queueing for discovery
[fs/lustre-release.git] / lustre / obdclass / cl_page.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Client Lustre Page.
32  *
33  *   Author: Nikita Danilov <nikita.danilov@sun.com>
34  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_CLASS
38
39 #include <linux/list.h>
40 #include <libcfs/libcfs.h>
41 #include <obd_class.h>
42 #include <obd_support.h>
43
44 #include <cl_object.h>
45 #include "cl_internal.h"
46
47 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg);
48 static DEFINE_MUTEX(cl_page_kmem_mutex);
49
#ifdef LIBCFS_DEBUG
/* Page-level assertion: on failure, dump the cl_page state and LBUG. */
# define PASSERT(env, page, expr)                                       \
  do {                                                                    \
          if (unlikely(!(expr))) {                                      \
                  CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
                  LASSERT(0);                                           \
          }                                                             \
  } while (0)
#else /* !LIBCFS_DEBUG */
/* Compiled-out variant: "touch" arguments via sizeof so no unused-variable
 * warnings fire, without evaluating them. */
# define PASSERT(env, page, exp) \
        ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
#endif /* !LIBCFS_DEBUG */

#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
/* Page-level invariant check, only enabled with expensive debugging. */
# define PINVRNT(env, page, expr)                                       \
  do {                                                                    \
          if (unlikely(!(expr))) {                                      \
                  CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
                  LINVRNT(0);                                           \
          }                                                             \
  } while (0)
#else /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
/* Compiled-out variant; see PASSERT above. */
# define PINVRNT(env, page, exp) \
         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
#endif /* !CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
75
/* Disable page statistic by default due to huge performance penalty. */

/* Bump the site-wide per-item page counter; no-op unless page-state
 * tracking is compiled in. */
static void cs_page_inc(const struct cl_object *obj,
                        enum cache_stats_item item)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
        atomic_inc(&cl_object_site(obj)->cs_pages.cs_stats[item]);
#endif
}

/* Counterpart of cs_page_inc(): drop the per-item page counter. */
static void cs_page_dec(const struct cl_object *obj,
                        enum cache_stats_item item)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
        atomic_dec(&cl_object_site(obj)->cs_pages.cs_stats[item]);
#endif
}

/* Bump the site-wide counter of pages in \a state. */
static void cs_pagestate_inc(const struct cl_object *obj,
                             enum cl_page_state state)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
        atomic_inc(&cl_object_site(obj)->cs_pages_state[state]);
#endif
}

/* Counterpart of cs_pagestate_inc(). */
static void cs_pagestate_dec(const struct cl_object *obj,
                              enum cl_page_state state)
{
#ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
        atomic_dec(&cl_object_site(obj)->cs_pages_state[state]);
#endif
}
108
109 /**
110  * Internal version of cl_page_get().
111  *
112  * This function can be used to obtain initial reference to previously
113  * unreferenced cached object. It can be called only if concurrent page
114  * reclamation is somehow prevented, e.g., by keeping a lock on a VM page,
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
static void cl_page_get_trust(struct cl_page *page)
{
        /* A zero refcount would mean the page is already being freed. */
        LASSERT(atomic_read(&page->cp_ref) > 0);
        atomic_inc(&page->cp_ref);
}
124
125 static struct cl_page_slice *
126 cl_page_slice_get(const struct cl_page *cl_page, int index)
127 {
128         if (index < 0 || index >= cl_page->cp_layer_count)
129                 return NULL;
130
131         /* To get the cp_layer_offset values fit under 256 bytes, we
132          * use the offset beyond the end of struct cl_page.
133          */
134         return (struct cl_page_slice *)((char *)cl_page + sizeof(*cl_page) +
135                                         cl_page->cp_layer_offset[index]);
136 }
137
/* Iterate over page slices from the top layer (index 0) downwards. */
#define cl_page_slice_for_each(cl_page, slice, i)               \
        for (i = 0, slice = cl_page_slice_get(cl_page, 0);      \
             i < (cl_page)->cp_layer_count;                     \
             slice = cl_page_slice_get(cl_page, ++i))

/* Iterate over page slices from the bottom layer upwards. */
#define cl_page_slice_for_each_reverse(cl_page, slice, i)       \
        for (i = (cl_page)->cp_layer_count - 1,                 \
             slice = cl_page_slice_get(cl_page, i); i >= 0;     \
             slice = cl_page_slice_get(cl_page, --i))
147
148 /**
149  * Returns a slice within a cl_page, corresponding to the given layer in the
150  * device stack.
151  *
152  * \see cl_lock_at()
153  */
154 static const struct cl_page_slice *
155 cl_page_at_trusted(const struct cl_page *cl_page,
156                    const struct lu_device_type *dtype)
157 {
158         const struct cl_page_slice *slice;
159         int i;
160
161         ENTRY;
162
163         cl_page_slice_for_each(cl_page, slice, i) {
164                 if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
165                         RETURN(slice);
166         }
167
168         RETURN(NULL);
169 }
170
171 static void __cl_page_free(struct cl_page *cl_page, unsigned short bufsize)
172 {
173         int index = cl_page->cp_kmem_index;
174
175         if (index >= 0) {
176                 LASSERT(index < ARRAY_SIZE(cl_page_kmem_array));
177                 LASSERT(cl_page_kmem_size_array[index] == bufsize);
178                 OBD_SLAB_FREE(cl_page, cl_page_kmem_array[index], bufsize);
179         } else {
180                 OBD_FREE(cl_page, bufsize);
181         }
182 }
183
/* Final destruction of a cl_page: run per-layer cpo_fini(), drop the
 * object reference taken at allocation and release the buffer. */
static void cl_page_free(const struct lu_env *env, struct cl_page *cl_page,
                         struct pagevec *pvec)
{
        struct cl_object *obj  = cl_page->cp_obj;
        /* bufsize must be sampled before the object reference is dropped */
        unsigned short bufsize = cl_object_header(obj)->coh_page_bufsize;
        struct cl_page_slice *slice;
        int i;

        ENTRY;
        /* The page must be fully detached: not batched, unowned, and
         * already moved to CPS_FREEING. */
        PASSERT(env, cl_page, list_empty(&cl_page->cp_batch));
        PASSERT(env, cl_page, cl_page->cp_owner == NULL);
        PASSERT(env, cl_page, cl_page->cp_state == CPS_FREEING);

        /* Let each layer release its private slice state. */
        cl_page_slice_for_each(cl_page, slice, i) {
                if (unlikely(slice->cpl_ops->cpo_fini != NULL))
                        slice->cpl_ops->cpo_fini(env, slice, pvec);
        }
        cl_page->cp_layer_count = 0;
        cs_page_dec(obj, CS_total);
        cs_pagestate_dec(obj, cl_page->cp_state);
        /* Drop the reference on the backing object taken in cl_page_alloc. */
        lu_object_ref_del_at(&obj->co_lu, &cl_page->cp_obj_ref,
                             "cl_page", cl_page);
        cl_object_put(env, obj);
        lu_ref_fini(&cl_page->cp_reference);
        __cl_page_free(cl_page, bufsize);
        EXIT;
}
211
/* Allocate a cl_page buffer of coh_page_bufsize bytes, preferring a
 * size-matched kmem cache (created on demand) over plain allocation. */
static struct cl_page *__cl_page_alloc(struct cl_object *o)
{
        int i = 0;
        struct cl_page *cl_page = NULL;
        unsigned short bufsize = cl_object_header(o)->coh_page_bufsize;

check:
        /* the number of entries in cl_page_kmem_array is expected to
         * only be 2-3 entries, so the lookup overhead should be low.
         */
        for ( ; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
                /* smp_load_acquire() pairs with smp_store_release() below:
                 * once the size is observed, the cache pointer for that
                 * slot is guaranteed to be initialized too. */
                if (smp_load_acquire(&cl_page_kmem_size_array[i])
                    == bufsize) {
                        OBD_SLAB_ALLOC_GFP(cl_page, cl_page_kmem_array[i],
                                           bufsize, GFP_NOFS);
                        if (cl_page)
                                cl_page->cp_kmem_index = i;
                        return cl_page;
                }
                /* A zero size marks the first unused slot: stop and try
                 * to create a cache for this bufsize there. */
                if (cl_page_kmem_size_array[i] == 0)
                        break;
        }

        if (i < ARRAY_SIZE(cl_page_kmem_array)) {
                char cache_name[32];

                /* Serialize cache creation; re-check slot i under the
                 * mutex in case another thread just filled it. */
                mutex_lock(&cl_page_kmem_mutex);
                if (cl_page_kmem_size_array[i]) {
                        mutex_unlock(&cl_page_kmem_mutex);
                        goto check;
                }
                snprintf(cache_name, sizeof(cache_name),
                         "cl_page_kmem-%u", bufsize);
                cl_page_kmem_array[i] =
                        kmem_cache_create(cache_name, bufsize,
                                          0, 0, NULL);
                if (cl_page_kmem_array[i] == NULL) {
                        mutex_unlock(&cl_page_kmem_mutex);
                        return NULL;
                }
                /* Publish the size only after the cache pointer is set. */
                smp_store_release(&cl_page_kmem_size_array[i],
                                  bufsize);
                mutex_unlock(&cl_page_kmem_mutex);
                goto check;
        } else {
                /* All cache slots hold other sizes: fall back to a plain
                 * allocation, marked by cp_kmem_index == -1. */
                OBD_ALLOC_GFP(cl_page, bufsize, GFP_NOFS);
                if (cl_page)
                        cl_page->cp_kmem_index = -1;
        }

        return cl_page;
}
264
/**
 * Allocate a cl_page for object \a o at index \a ind, backed by \a vmpage,
 * and let each layer of the object stack initialize its slice.
 *
 * \return the new page with one reference held, or an ERR_PTR() on failure.
 */
struct cl_page *cl_page_alloc(const struct lu_env *env, struct cl_object *o,
                              pgoff_t ind, struct page *vmpage,
                              enum cl_page_type type)
{
        struct cl_page *cl_page;
        struct cl_object *head;

        ENTRY;

        cl_page = __cl_page_alloc(o);
        if (cl_page != NULL) {
                int result = 0;

                /*
                 * Please fix cl_page:cp_state/type declaration if
                 * these assertions fail in the future.
                 */
                BUILD_BUG_ON((1 << CP_STATE_BITS) < CPS_NR); /* cp_state */
                BUILD_BUG_ON((1 << CP_TYPE_BITS) < CPT_NR); /* cp_type */
                atomic_set(&cl_page->cp_ref, 1);
                cl_page->cp_obj = o;
                /* The page holds a reference on its object until freed. */
                cl_object_get(o);
                lu_object_ref_add_at(&o->co_lu, &cl_page->cp_obj_ref,
                                     "cl_page", cl_page);
                cl_page->cp_vmpage = vmpage;
                cl_page->cp_state = CPS_CACHED;
                cl_page->cp_type = type;
                cl_page->cp_inode = NULL;
                INIT_LIST_HEAD(&cl_page->cp_batch);
                lu_ref_init(&cl_page->cp_reference);
                head = o;
                /* Ask every layer, top to bottom, to attach its slice. */
                cl_object_for_each(o, head) {
                        if (o->co_ops->coo_page_init != NULL) {
                                result = o->co_ops->coo_page_init(env, o,
                                                        cl_page, ind);
                                if (result != 0) {
                                        /* Partially initialized pages are
                                         * torn down via the FREEING path. */
                                        cl_page_delete0(env, cl_page);
                                        cl_page_free(env, cl_page, NULL);
                                        cl_page = ERR_PTR(result);
                                        break;
                                }
                        }
                }
                if (result == 0) {
                        cs_page_inc(o, CS_total);
                        cs_page_inc(o, CS_create);
                        /* NOTE(review): this decrement appears to balance
                         * the state counters because the initial CPS_CACHED
                         * state was assigned directly, bypassing
                         * cl_page_state_set() — confirm. */
                        cs_pagestate_dec(o, CPS_CACHED);
                }
        } else {
                cl_page = ERR_PTR(-ENOMEM);
        }
        RETURN(cl_page);
}
318
319 /**
320  * Returns a cl_page with index \a idx at the object \a o, and associated with
321  * the VM page \a vmpage.
322  *
323  * This is the main entry point into the cl_page caching interface. First, a
324  * cache (implemented as a per-object radix tree) is consulted. If page is
325  * found there, it is returned immediately. Otherwise new page is allocated
326  * and returned. In any case, additional reference to page is acquired.
327  *
328  * \see cl_object_find(), cl_lock_find()
329  */
struct cl_page *cl_page_find(const struct lu_env *env,
                             struct cl_object *o,
                             pgoff_t idx, struct page *vmpage,
                             enum cl_page_type type)
{
        struct cl_page          *page = NULL;
        struct cl_object_header *hdr;

        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
        might_sleep();

        ENTRY;

        hdr = cl_object_header(o);
        cs_page_inc(o, CS_lookup);

        /* NOTE(review): vmpage->private is read here for both page types;
         * for CPT_TRANSIENT the vmpage is not locked below — confirm the
         * debug read is safe in that case. */
        CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
               idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
        /* fast path. */
        if (type == CPT_CACHEABLE) {
                /* vmpage lock is used to protect the child/parent
                 * relationship */
                LASSERT(PageLocked(vmpage));
                /*
                 * cl_vmpage_page() can be called here without any locks as
                 *
                 *     - "vmpage" is locked (which prevents ->private from
                 *       concurrent updates), and
                 *
                 *     - "o" cannot be destroyed while current thread holds a
                 *       reference on it.
                 */
                page = cl_vmpage_page(vmpage, o);
                if (page != NULL) {
                        /* Cache hit: cl_vmpage_page() already took the
                         * reference returned to the caller. */
                        cs_page_inc(o, CS_hit);
                        RETURN(page);
                }
        }

        /* allocate and initialize cl_page */
        page = cl_page_alloc(env, o, idx, vmpage, type);
        RETURN(page);
}
EXPORT_SYMBOL(cl_page_find);
374
/* Minimal page sanity check; delegates to cl_page_in_use_noref()
 * (see cl_internal.h for its exact semantics). */
static inline int cl_page_invariant(const struct cl_page *pg)
{
        return cl_page_in_use_noref(pg);
}
379
/* Move \a cl_page to \a state, asserting that the transition is one of
 * the legal moves in the state machine below, and keep the per-state
 * statistics in sync. */
static void cl_page_state_set0(const struct lu_env *env,
                               struct cl_page *cl_page,
                               enum cl_page_state state)
{
        enum cl_page_state old;

        /*
         * Matrix of allowed state transitions [old][new], for sanity
         * checking.
         */
        static const int allowed_transitions[CPS_NR][CPS_NR] = {
                [CPS_CACHED] = {
                        [CPS_CACHED]  = 0,
                        [CPS_OWNED]   = 1, /* io finds existing cached page */
                        [CPS_PAGEIN]  = 0,
                        [CPS_PAGEOUT] = 1, /* write-out from the cache */
                        [CPS_FREEING] = 1, /* eviction on the memory pressure */
                },
                [CPS_OWNED] = {
                        [CPS_CACHED]  = 1, /* release to the cache */
                        [CPS_OWNED]   = 0,
                        [CPS_PAGEIN]  = 1, /* start read immediately */
                        [CPS_PAGEOUT] = 1, /* start write immediately */
                        [CPS_FREEING] = 1, /* lock invalidation or truncate */
                },
                [CPS_PAGEIN] = {
                        [CPS_CACHED]  = 1, /* io completion */
                        [CPS_OWNED]   = 0,
                        [CPS_PAGEIN]  = 0,
                        [CPS_PAGEOUT] = 0,
                        [CPS_FREEING] = 0,
                },
                [CPS_PAGEOUT] = {
                        [CPS_CACHED]  = 1, /* io completion */
                        [CPS_OWNED]   = 0,
                        [CPS_PAGEIN]  = 0,
                        [CPS_PAGEOUT] = 0,
                        [CPS_FREEING] = 0,
                },
                [CPS_FREEING] = {
                        [CPS_CACHED]  = 0,
                        [CPS_OWNED]   = 0,
                        [CPS_PAGEIN]  = 0,
                        [CPS_PAGEOUT] = 0,
                        [CPS_FREEING] = 0, /* FREEING is terminal */
                }
        };

        ENTRY;
        old = cl_page->cp_state;
        PASSERT(env, cl_page, allowed_transitions[old][state]);
        CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d -> %d\n", old, state);
        PASSERT(env, cl_page, cl_page->cp_state == old);
        /* A page is owned exactly when it is in CPS_OWNED. */
        PASSERT(env, cl_page, equi(state == CPS_OWNED,
                                   cl_page->cp_owner != NULL));

        cs_pagestate_dec(cl_page->cp_obj, cl_page->cp_state);
        cs_pagestate_inc(cl_page->cp_obj, state);
        cl_page->cp_state = state;
        EXIT;
}
441
/* Thin wrapper over cl_page_state_set0(); kept as the conventional entry
 * point for state transitions from this file. */
static void cl_page_state_set(const struct lu_env *env,
                              struct cl_page *page, enum cl_page_state state)
{
        cl_page_state_set0(env, page, state);
}
447
448 /**
449  * Acquires an additional reference to a page.
450  *
451  * This can be called only by caller already possessing a reference to \a
452  * page.
453  *
454  * \see cl_object_get(), cl_lock_get().
455  */
void cl_page_get(struct cl_page *page)
{
        ENTRY;
        /* Caller already holds a reference, so this cannot race with free. */
        cl_page_get_trust(page);
        EXIT;
}
EXPORT_SYMBOL(cl_page_get);
463
464 /**
465  * Releases a reference to a page, use the pagevec to release the pages
466  * in batch if provided.
467  *
468  * Users need to do a final pagevec_release() to release any trailing pages.
469  */
470 void cl_pagevec_put(const struct lu_env *env, struct cl_page *page,
471                   struct pagevec *pvec)
472 {
473         ENTRY;
474         CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
475                        atomic_read(&page->cp_ref));
476
477         if (atomic_dec_and_test(&page->cp_ref)) {
478                 LASSERT(page->cp_state == CPS_FREEING);
479
480                 LASSERT(atomic_read(&page->cp_ref) == 0);
481                 PASSERT(env, page, page->cp_owner == NULL);
482                 PASSERT(env, page, list_empty(&page->cp_batch));
483                 /*
484                  * Page is no longer reachable by other threads. Tear
485                  * it down.
486                  */
487                 cl_page_free(env, page, pvec);
488         }
489
490         EXIT;
491 }
492 EXPORT_SYMBOL(cl_pagevec_put);
493
494 /**
495  * Releases a reference to a page, wrapper to cl_pagevec_put
496  *
497  * When last reference is released, page is returned to the cache, unless it
498  * is in cl_page_state::CPS_FREEING state, in which case it is immediately
499  * destroyed.
500  *
501  * \see cl_object_put(), cl_lock_put().
502  */
void cl_page_put(const struct lu_env *env, struct cl_page *page)
{
        /* NULL pagevec: release pages one at a time. */
        cl_pagevec_put(env, page, NULL);
}
EXPORT_SYMBOL(cl_page_put);
508
509 /**
510  * Returns a cl_page associated with a VM page, and given cl_object.
511  */
512 struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
513 {
514         struct cl_page *page;
515
516         ENTRY;
517         LASSERT(PageLocked(vmpage));
518
519         /*
520          * NOTE: absence of races and liveness of data are guaranteed by page
521          *       lock on a "vmpage". That works because object destruction has
522          *       bottom-to-top pass.
523          */
524
525         page = (struct cl_page *)vmpage->private;
526         if (page != NULL) {
527                 cl_page_get_trust(page);
528                 LASSERT(page->cp_type == CPT_CACHEABLE);
529         }
530         RETURN(page);
531 }
532 EXPORT_SYMBOL(cl_vmpage_page);
533
/**
 * Exported wrapper over cl_page_at_trusted(): return the page slice
 * belonging to the layer of device type \a dtype, or NULL.
 */
const struct cl_page_slice *cl_page_at(const struct cl_page *page,
                                       const struct lu_device_type *dtype)
{
        return cl_page_at_trusted(page, dtype);
}
EXPORT_SYMBOL(cl_page_at);
540
541 static void cl_page_owner_clear(struct cl_page *page)
542 {
543         ENTRY;
544         if (page->cp_owner != NULL) {
545                 LASSERT(page->cp_owner->ci_owned_nr > 0);
546                 page->cp_owner->ci_owned_nr--;
547                 page->cp_owner = NULL;
548         }
549         EXIT;
550 }
551
/* Account the page against its owner IO; cp_owner must be set already. */
static void cl_page_owner_set(struct cl_page *page)
{
        ENTRY;
        LASSERT(page->cp_owner != NULL);
        page->cp_owner->ci_owned_nr++;
        EXIT;
}
559
/* Core of cl_page_disown(): release ownership and notify the layers.
 * Also used on the error path of cl_page_own0(), hence the CPS_FREEING
 * allowance. */
void cl_page_disown0(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *cl_page)
{
        const struct cl_page_slice *slice;
        enum cl_page_state state;
        int i;

        ENTRY;
        state = cl_page->cp_state;
        PINVRNT(env, cl_page, state == CPS_OWNED ||
                state == CPS_FREEING);
        PINVRNT(env, cl_page, cl_page_invariant(cl_page) ||
                state == CPS_FREEING);
        cl_page_owner_clear(cl_page);

        /* Freeing pages stay in CPS_FREEING; owned pages return to cache. */
        if (state == CPS_OWNED)
                cl_page_state_set(env, cl_page, CPS_CACHED);
        /*
         * Completion call-backs are executed in the bottom-up order, so that
         * uppermost layer (llite), responsible for VFS/VM interaction runs
         * last and can release locks safely.
         */
        cl_page_slice_for_each_reverse(cl_page, slice, i) {
                if (slice->cpl_ops->cpo_disown != NULL)
                        (*slice->cpl_ops->cpo_disown)(env, slice, io);
        }

        EXIT;
}
589
590 /**
591  * returns true, iff page is owned by the given io.
592  */
593 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
594 {
595         struct cl_io *top = cl_io_top((struct cl_io *)io);
596         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
597         ENTRY;
598         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == top);
599 }
600 EXPORT_SYMBOL(cl_page_is_owned);
601
602 /**
603  * Try to own a page by IO.
604  *
605  * Waits until page is in cl_page_state::CPS_CACHED state, and then switch it
606  * into cl_page_state::CPS_OWNED state.
607  *
608  * \pre  !cl_page_is_owned(cl_page, io)
609  * \post result == 0 iff cl_page_is_owned(cl_page, io)
610  *
611  * \retval 0   success
612  *
613  * \retval -ve failure, e.g., cl_page was destroyed (and landed in
614  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED).
615  *             or, page was owned by another thread, or in IO.
616  *
617  * \see cl_page_disown()
618  * \see cl_page_operations::cpo_own()
619  * \see cl_page_own_try()
620  * \see cl_page_own
621  */
static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
                        struct cl_page *cl_page, int nonblock)
{
        const struct cl_page_slice *slice;
        int result = 0;
        int i;

        ENTRY;
        PINVRNT(env, cl_page, !cl_page_is_owned(cl_page, io));
        io = cl_io_top(io);

        /* A page already being freed cannot be owned. */
        if (cl_page->cp_state == CPS_FREEING) {
                result = -ENOENT;
                goto out;
        }

        /* Ask each layer, top to bottom, to acquire its part of the page;
         * stop on the first non-zero return. */
        cl_page_slice_for_each(cl_page, slice, i) {
                if (slice->cpl_ops->cpo_own)
                        result = (*slice->cpl_ops->cpo_own)(env, slice,
                                                            io, nonblock);
                if (result != 0)
                        break;
        }
        /* Positive returns from cpo_own() are not errors. */
        if (result > 0)
                result = 0;

        if (result == 0) {
                PASSERT(env, cl_page, cl_page->cp_owner == NULL);
                cl_page->cp_owner = cl_io_top(io);
                cl_page_owner_set(cl_page);
                /* The page may have entered CPS_FREEING while ownership
                 * was being acquired; back out and report -ENOENT. */
                if (cl_page->cp_state != CPS_FREEING) {
                        cl_page_state_set(env, cl_page, CPS_OWNED);
                } else {
                        cl_page_disown0(env, io, cl_page);
                        result = -ENOENT;
                }
        }

out:
        PINVRNT(env, cl_page, ergo(result == 0,
                cl_page_invariant(cl_page)));
        RETURN(result);
}
665
/**
 * Own a page, might be blocked.
 *
 * Blocking flavour of page ownership: may wait for the current owner.
 *
 * \see cl_page_own0()
 */
int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
{
        /* nonblock = 0: allowed to wait for the page. */
        return cl_page_own0(env, io, pg, 0);
}
EXPORT_SYMBOL(cl_page_own);
676
/**
 * Nonblock version of cl_page_own(): fails rather than waits when the
 * page is busy.
 *
 * \see cl_page_own0()
 */
int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
                    struct cl_page *pg)
{
        /* nonblock = 1: never wait for the page. */
        return cl_page_own0(env, io, pg, 1);
}
EXPORT_SYMBOL(cl_page_own_try);
688
689
690 /**
691  * Assume page ownership.
692  *
693  * Called when page is already locked by the hosting VM.
694  *
695  * \pre !cl_page_is_owned(cl_page, io)
696  * \post cl_page_is_owned(cl_page, io)
697  *
698  * \see cl_page_operations::cpo_assume()
699  */
700 void cl_page_assume(const struct lu_env *env,
701                     struct cl_io *io, struct cl_page *cl_page)
702 {
703         const struct cl_page_slice *slice;
704         int i;
705
706         ENTRY;
707
708         PINVRNT(env, cl_page,
709                 cl_object_same(cl_page->cp_obj, io->ci_obj));
710         io = cl_io_top(io);
711
712         cl_page_slice_for_each(cl_page, slice, i) {
713                 if (slice->cpl_ops->cpo_assume != NULL)
714                         (*slice->cpl_ops->cpo_assume)(env, slice, io);
715         }
716
717         PASSERT(env, cl_page, cl_page->cp_owner == NULL);
718         cl_page->cp_owner = cl_io_top(io);
719         cl_page_owner_set(cl_page);
720         cl_page_state_set(env, cl_page, CPS_OWNED);
721         EXIT;
722 }
723 EXPORT_SYMBOL(cl_page_assume);
724
725 /**
726  * Releases page ownership without unlocking the page.
727  *
728  * Moves cl_page into cl_page_state::CPS_CACHED without releasing a lock
729  * on the underlying VM page (as VM is supposed to do this itself).
730  *
731  * \pre   cl_page_is_owned(cl_page, io)
732  * \post !cl_page_is_owned(cl_page, io)
733  *
734  * \see cl_page_assume()
735  */
736 void cl_page_unassume(const struct lu_env *env,
737                       struct cl_io *io, struct cl_page *cl_page)
738 {
739         const struct cl_page_slice *slice;
740         int i;
741
742         ENTRY;
743         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
744         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
745
746         io = cl_io_top(io);
747         cl_page_owner_clear(cl_page);
748         cl_page_state_set(env, cl_page, CPS_CACHED);
749
750         cl_page_slice_for_each_reverse(cl_page, slice, i) {
751                 if (slice->cpl_ops->cpo_unassume != NULL)
752                         (*slice->cpl_ops->cpo_unassume)(env, slice, io);
753         }
754
755         EXIT;
756 }
757 EXPORT_SYMBOL(cl_page_unassume);
758
759 /**
760  * Releases page ownership.
761  *
762  * Moves page into cl_page_state::CPS_CACHED.
763  *
764  * \pre   cl_page_is_owned(pg, io)
765  * \post !cl_page_is_owned(pg, io)
766  *
767  * \see cl_page_own()
768  * \see cl_page_operations::cpo_disown()
769  */
770 void cl_page_disown(const struct lu_env *env,
771                     struct cl_io *io, struct cl_page *pg)
772 {
773         PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
774                 pg->cp_state == CPS_FREEING);
775
776         ENTRY;
777         io = cl_io_top(io);
778         cl_page_disown0(env, io, pg);
779         EXIT;
780 }
781 EXPORT_SYMBOL(cl_page_disown);
782
783 /**
784  * Called when cl_page is to be removed from the object, e.g.,
785  * as a result of truncate.
786  *
787  * Calls cl_page_operations::cpo_discard() top-to-bottom.
788  *
789  * \pre cl_page_is_owned(cl_page, io)
790  *
791  * \see cl_page_operations::cpo_discard()
792  */
793 void cl_page_discard(const struct lu_env *env,
794                      struct cl_io *io, struct cl_page *cl_page)
795 {
796         const struct cl_page_slice *slice;
797         int i;
798
799         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
800         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
801
802         cl_page_slice_for_each(cl_page, slice, i) {
803                 if (slice->cpl_ops->cpo_discard != NULL)
804                         (*slice->cpl_ops->cpo_discard)(env, slice, io);
805         }
806 }
807 EXPORT_SYMBOL(cl_page_discard);
808
/**
 * Version of cl_page_delete() that can be called for not fully constructed
 * cl_pages, e.g. in an error handling cl_page_find()->cl_page_delete0()
 * path. Doesn't check cl_page invariant.
 */
static void cl_page_delete0(const struct lu_env *env,
                            struct cl_page *cl_page)
{
        const struct cl_page_slice *slice;
        int i;

        ENTRY;

        /* CPS_FREEING is terminal; a page must be deleted at most once. */
        PASSERT(env, cl_page, cl_page->cp_state != CPS_FREEING);

        /*
         * Sever all ways to obtain new pointers to @pg.
         */
        cl_page_owner_clear(cl_page);
        cl_page_state_set0(env, cl_page, CPS_FREEING);

        /* Run cpo_delete() bottom-to-top, so the VM-facing layer goes
         * last (same ordering rationale as cl_page_disown0()). */
        cl_page_slice_for_each_reverse(cl_page, slice, i) {
                if (slice->cpl_ops->cpo_delete != NULL)
                        (*slice->cpl_ops->cpo_delete)(env, slice);
        }

        EXIT;
}
837
838 /**
839  * Called when a decision is made to throw page out of memory.
840  *
841  * Notifies all layers about page destruction by calling
842  * cl_page_operations::cpo_delete() method top-to-bottom.
843  *
844  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
845  * where transition to this state happens).
846  *
847  * Eliminates all venues through which new references to the page can be
848  * obtained:
849  *
850  *     - removes page from the radix trees,
851  *
852  *     - breaks linkage from VM page to cl_page.
853  *
854  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
855  * drain after some time, at which point page will be recycled.
856  *
857  * \pre  VM page is locked
858  * \post pg->cp_state == CPS_FREEING
859  *
860  * \see cl_page_operations::cpo_delete()
861  */
void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
{
        PINVRNT(env, pg, cl_page_invariant(pg));
        ENTRY;
        /* Full invariant checked above; delegate to the raw version. */
        cl_page_delete0(env, pg);
        EXIT;
}
EXPORT_SYMBOL(cl_page_delete);
870
871 /**
872  * Marks page up-to-date.
873  *
874  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
875  * layer responsible for VM interaction has to mark/clear page as up-to-date
876  * by the \a uptodate argument.
877  *
878  * \see cl_page_operations::cpo_export()
879  */
880 void cl_page_export(const struct lu_env *env, struct cl_page *cl_page,
881                     int uptodate)
882 {
883         const struct cl_page_slice *slice;
884         int i;
885
886         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
887
888         cl_page_slice_for_each(cl_page, slice, i) {
889                 if (slice->cpl_ops->cpo_export != NULL)
890                         (*slice->cpl_ops->cpo_export)(env, slice, uptodate);
891         }
892 }
893 EXPORT_SYMBOL(cl_page_export);
894
/**
 * Returns true, if \a cl_page is VM locked in a suitable sense by the calling
 * thread.
 */
int cl_page_is_vmlocked(const struct lu_env *env,
                        const struct cl_page *cl_page)
{
        const struct cl_page_slice *slice;
        int result;

        ENTRY;
        /* Only slice 0 is consulted — presumably the layer that owns the
         * VM page interaction; confirm against cl_page_slice layout. */
        slice = cl_page_slice_get(cl_page, 0);
        PASSERT(env, cl_page, slice->cpl_ops->cpo_is_vmlocked != NULL);
        /*
         * Call ->cpo_is_vmlocked() directly instead of going through
         * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
         * cl_page_invariant().
         */
        result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
        /* The method encodes its answer as -EBUSY (locked) or -ENODATA
         * (not locked); any other value is a programming error. */
        PASSERT(env, cl_page, result == -EBUSY || result == -ENODATA);

        RETURN(result == -EBUSY);
}
EXPORT_SYMBOL(cl_page_is_vmlocked);
919
920 void cl_page_touch(const struct lu_env *env,
921                    const struct cl_page *cl_page, size_t to)
922 {
923         const struct cl_page_slice *slice;
924         int i;
925
926         ENTRY;
927
928         cl_page_slice_for_each(cl_page, slice, i) {
929                 if (slice->cpl_ops->cpo_page_touch != NULL)
930                         (*slice->cpl_ops->cpo_page_touch)(env, slice, to);
931         }
932
933         EXIT;
934 }
935 EXPORT_SYMBOL(cl_page_touch);
936
937 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
938 {
939         ENTRY;
940         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
941 }
942
/**
 * Moves \a pg into the in-flight state matching \a crt (CPS_PAGEIN or
 * CPS_PAGEOUT), clearing page ownership on the way.
 */
static void cl_page_io_start(const struct lu_env *env,
                             struct cl_page *pg, enum cl_req_type crt)
{
        /*
         * Page is queued for IO, change its state.
         */
        ENTRY;
        cl_page_owner_clear(pg);
        cl_page_state_set(env, pg, cl_req_type_state(crt));
        EXIT;
}
954
955 /**
956  * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
957  * called top-to-bottom. Every layer either agrees to submit this page (by
958  * returning 0), or requests to omit this page (by returning -EALREADY). Layer
959  * handling interactions with the VM also has to inform VM that page is under
960  * transfer now.
961  */
962 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
963                  struct cl_page *cl_page, enum cl_req_type crt)
964 {
965         const struct cl_page_slice *slice;
966         int result = 0;
967         int i;
968
969         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
970         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
971         PINVRNT(env, cl_page, crt < CRT_NR);
972
973         /*
974          * this has to be called bottom-to-top, so that llite can set up
975          * PG_writeback without risking other layers deciding to skip this
976          * page.
977          */
978         if (crt >= CRT_NR)
979                 return -EINVAL;
980
981         cl_page_slice_for_each(cl_page, slice, i) {
982                 if (slice->cpl_ops->cpo_own)
983                         result = (*slice->cpl_ops->io[crt].cpo_prep)(env,
984                                                                      slice,
985                                                                      io);
986                 if (result != 0)
987                         break;
988         }
989
990         if (result >= 0) {
991                 result = 0;
992                 cl_page_io_start(env, cl_page, crt);
993         }
994
995         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);
996         return result;
997 }
998 EXPORT_SYMBOL(cl_page_prep);
999
/**
 * Notify layers about transfer completion.
 *
 * Invoked by transfer sub-system (which is a part of osc) to notify layers
 * that a transfer, of which this page is a part of has completed.
 *
 * Completion call-backs are executed in the bottom-up order, so that
 * uppermost layer (llite), responsible for the VFS/VM interaction runs last
 * and can release locks safely.
 *
 * \pre  cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
 * \post cl_page->cl_page_state == CPS_CACHED
 *
 * \see cl_page_operations::cpo_completion()
 */
void cl_page_completion(const struct lu_env *env,
                        struct cl_page *cl_page, enum cl_req_type crt,
                        int ioret)
{
        const struct cl_page_slice *slice;
        /* Sample the anchor before layer callbacks run, so the sync-io
         * notification below matches what the page carried on entry. */
        struct cl_sync_io *anchor = cl_page->cp_sync_io;
        int i;

        ENTRY;
        PASSERT(env, cl_page, crt < CRT_NR);
        PASSERT(env, cl_page, cl_page->cp_state == cl_req_type_state(crt));

        CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, ioret);
        cl_page_state_set(env, cl_page, CPS_CACHED);
        /* Defensive re-check of crt; NOTE(review): this early return skips
         * both the anchor notification and EXIT tracing — confirm this is
         * intentional (it should be unreachable after the PASSERT above). */
        if (crt >= CRT_NR)
                return;

        /* Reverse slice order: bottom-up, so llite runs last (see above). */
        cl_page_slice_for_each_reverse(cl_page, slice, i) {
                if (slice->cpl_ops->io[crt].cpo_completion != NULL)
                        (*slice->cpl_ops->io[crt].cpo_completion)(env, slice,
                                                                  ioret);
        }

        if (anchor != NULL) {
                LASSERT(cl_page->cp_sync_io == anchor);
                /* Detach before notifying: cl_sync_io_note() may wake a
                 * waiter that frees or reuses the page. */
                cl_page->cp_sync_io = NULL;
                cl_sync_io_note(env, anchor, ioret);
        }
        EXIT;
}
EXPORT_SYMBOL(cl_page_completion);
1046
/**
 * Notify layers that transfer formation engine decided to yank this page from
 * the cache and to make it a part of a transfer.
 *
 * \pre  cl_page->cp_state == CPS_CACHED
 * \post cl_page->cp_state == CPS_PAGEIN || cl_page->cp_state == CPS_PAGEOUT
 *
 * \return 0 on success, -EINVAL for an out-of-range \a crt, or the first
 *         non-zero value returned by a layer's cpo_make_ready() method.
 *
 * \see cl_page_operations::cpo_make_ready()
 */
int cl_page_make_ready(const struct lu_env *env, struct cl_page *cl_page,
                       enum cl_req_type crt)
{
        const struct cl_page_slice *slice;
        int result = 0;
        int i;

        ENTRY;
        PINVRNT(env, cl_page, crt < CRT_NR);
        /* Defensive run-time check mirroring the invariant above. */
        if (crt >= CRT_NR)
                RETURN(-EINVAL);

        /* Ask each layer (top-to-bottom) to agree to the transfer; the
         * first non-zero return aborts the walk. */
        cl_page_slice_for_each(cl_page, slice, i) {
                if (slice->cpl_ops->io[crt].cpo_make_ready != NULL)
                        result = (*slice->cpl_ops->io[crt].cpo_make_ready)(env, slice);
                if (result != 0)
                        break;
        }

        if (result >= 0) {
                /* Positive returns are treated as success. */
                result = 0;
                PASSERT(env, cl_page, cl_page->cp_state == CPS_CACHED);
                cl_page_io_start(env, cl_page, crt);
        }
        CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", crt, result);

        RETURN(result);
}
EXPORT_SYMBOL(cl_page_make_ready);
1085
1086 /**
1087  * Called if a page is being written back by kernel's intention.
1088  *
1089  * \pre  cl_page_is_owned(cl_page, io)
1090  * \post ergo(result == 0, cl_page->cp_state == CPS_PAGEOUT)
1091  *
1092  * \see cl_page_operations::cpo_flush()
1093  */
1094 int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1095                   struct cl_page *cl_page)
1096 {
1097         const struct cl_page_slice *slice;
1098         int result = 0;
1099         int i;
1100
1101         ENTRY;
1102         PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
1103         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1104
1105         cl_page_slice_for_each(cl_page, slice, i) {
1106                 if (slice->cpl_ops->cpo_flush != NULL)
1107                         result = (*slice->cpl_ops->cpo_flush)(env, slice, io);
1108                 if (result != 0)
1109                         break;
1110         }
1111         if (result > 0)
1112                 result = 0;
1113
1114         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d\n", result);
1115         RETURN(result);
1116 }
1117 EXPORT_SYMBOL(cl_page_flush);
1118
1119 /**
1120  * Tells transfer engine that only part of a page is to be transmitted.
1121  *
1122  * \see cl_page_operations::cpo_clip()
1123  */
1124 void cl_page_clip(const struct lu_env *env, struct cl_page *cl_page,
1125                   int from, int to)
1126 {
1127         const struct cl_page_slice *slice;
1128         int i;
1129
1130         PINVRNT(env, cl_page, cl_page_invariant(cl_page));
1131
1132         CL_PAGE_HEADER(D_TRACE, env, cl_page, "%d %d\n", from, to);
1133         cl_page_slice_for_each(cl_page, slice, i) {
1134                 if (slice->cpl_ops->cpo_clip != NULL)
1135                         (*slice->cpl_ops->cpo_clip)(env, slice, from, to);
1136         }
1137 }
1138 EXPORT_SYMBOL(cl_page_clip);
1139
/**
 * Prints human readable representation of \a pg to the \a f.
 *
 * Fields, in order: page pointer, reference count, owning cl_object,
 * state, type, and owning IO (if any).
 */
void cl_page_header_print(const struct lu_env *env, void *cookie,
                          lu_printer_t printer, const struct cl_page *pg)
{
        (*printer)(env, cookie,
                   "page@%p[%d %p %d %d %p]\n",
                   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
                   pg->cp_state, pg->cp_type,
                   pg->cp_owner);
}
EXPORT_SYMBOL(cl_page_header_print);
1153
1154 /**
1155  * Prints human readable representation of \a cl_page to the \a f.
1156  */
1157 void cl_page_print(const struct lu_env *env, void *cookie,
1158                    lu_printer_t printer, const struct cl_page *cl_page)
1159 {
1160         const struct cl_page_slice *slice;
1161         int result = 0;
1162         int i;
1163
1164         cl_page_header_print(env, cookie, printer, cl_page);
1165         cl_page_slice_for_each(cl_page, slice, i) {
1166                 if (slice->cpl_ops->cpo_print != NULL)
1167                         result = (*slice->cpl_ops->cpo_print)(env, slice,
1168                                                              cookie, printer);
1169                 if (result != 0)
1170                         break;
1171         }
1172         (*printer)(env, cookie, "end page@%p\n", cl_page);
1173 }
1174 EXPORT_SYMBOL(cl_page_print);
1175
/**
 * Converts a page index \a idx within object \a obj into a byte offset.
 *
 * (The previous comment described the inverse conversion — it was swapped
 * with cl_index()'s.)
 */
loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
{
        return (loff_t)idx << PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_offset);
1184
/**
 * Converts a byte \a offset within object \a obj into a page index.
 *
 * (The previous comment described the inverse conversion — it was swapped
 * with cl_offset()'s.)
 */
pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
{
        return offset >> PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_index);
1193
/**
 * Returns the page size used for object \a obj; currently a constant
 * equal to the system page size (\a obj is unused).
 */
size_t cl_page_size(const struct cl_object *obj)
{
        return 1UL << PAGE_SHIFT;
}
EXPORT_SYMBOL(cl_page_size);
1199
/**
 * Adds page slice to the compound page.
 *
 * This is called by cl_object_operations::coo_page_init() methods to add a
 * per-layer state to the page. New state is added at the end of
 * cl_page::cp_layers list, that is, it is at the bottom of the stack.
 *
 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
 */
void cl_page_slice_add(struct cl_page *cl_page, struct cl_page_slice *slice,
                       struct cl_object *obj,
                       const struct cl_page_operations *ops)
{
        /* Slice location is stored as a byte offset from the end of the
         * cl_page header rather than as a pointer. */
        unsigned int offset = (char *)slice -
                        ((char *)cl_page + sizeof(*cl_page));

        ENTRY;
        LASSERT(cl_page->cp_layer_count < CP_MAX_LAYER);
        /* The offset must fit in one (narrow) cp_layer_offset element. */
        LASSERT(offset < (1 << sizeof(cl_page->cp_layer_offset[0]) * 8));
        cl_page->cp_layer_offset[cl_page->cp_layer_count++] = offset;
        slice->cpl_obj  = obj;
        slice->cpl_ops  = ops;
        slice->cpl_page = cl_page;

        EXIT;
}
EXPORT_SYMBOL(cl_page_slice_add);
1227
1228 /**
1229  * Allocate and initialize cl_cache, called by ll_init_sbi().
1230  */
1231 struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
1232 {
1233         struct cl_client_cache  *cache = NULL;
1234
1235         ENTRY;
1236         OBD_ALLOC(cache, sizeof(*cache));
1237         if (cache == NULL)
1238                 RETURN(NULL);
1239
1240         /* Initialize cache data */
1241         atomic_set(&cache->ccc_users, 1);
1242         cache->ccc_lru_max = lru_page_max;
1243         atomic_long_set(&cache->ccc_lru_left, lru_page_max);
1244         spin_lock_init(&cache->ccc_lru_lock);
1245         INIT_LIST_HEAD(&cache->ccc_lru);
1246
1247         /* turn unstable check off by default as it impacts performance */
1248         cache->ccc_unstable_check = 0;
1249         atomic_long_set(&cache->ccc_unstable_nr, 0);
1250         init_waitqueue_head(&cache->ccc_unstable_waitq);
1251         mutex_init(&cache->ccc_max_cache_mb_lock);
1252
1253         RETURN(cache);
1254 }
1255 EXPORT_SYMBOL(cl_cache_init);
1256
/**
 * Increase cl_cache refcount
 *
 * Pairs with cl_cache_decref(); the cache is freed when the last
 * reference is dropped.
 */
void cl_cache_incref(struct cl_client_cache *cache)
{
        atomic_inc(&cache->ccc_users);
}
EXPORT_SYMBOL(cl_cache_incref);
1265
/**
 * Decrease cl_cache refcount and free the cache if refcount=0.
 * Since llite, lov and osc all hold cl_cache refcount,
 * the free will not cause race. (LU-6173)
 */
void cl_cache_decref(struct cl_client_cache *cache)
{
        /* atomic_dec_and_test() returns true only for the thread that
         * drops the last reference, so exactly one caller frees. */
        if (atomic_dec_and_test(&cache->ccc_users))
                OBD_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(cl_cache_decref);
1276 EXPORT_SYMBOL(cl_cache_decref);