lustre/obdclass/cl_page.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Page.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45
46 #include <libcfs/libcfs.h>
47 #include <obd_class.h>
48 #include <obd_support.h>
49 #include <libcfs/list.h>
50
51 #include <cl_object.h>
52 #include "cl_internal.h"
53
54 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
55                             int radix);
56
57 static cfs_mem_cache_t      *cl_page_kmem = NULL;
58
59 static struct lu_kmem_descr cl_page_caches[] = {
60         {
61                 .ckd_cache = &cl_page_kmem,
62                 .ckd_name  = "cl_page_kmem",
63                 .ckd_size  = sizeof (struct cl_page)
64         },
65         {
66                 .ckd_cache = NULL
67         }
68 };
69
70 #ifdef LIBCFS_DEBUG
71 # define PASSERT(env, page, expr)                                       \
72   do {                                                                    \
73           if (unlikely(!(expr))) {                                      \
74                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
75                   LASSERT(0);                                           \
76           }                                                             \
77   } while (0)
78 #else /* !LIBCFS_DEBUG */
79 # define PASSERT(env, page, exp) \
80         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
81 #endif /* !LIBCFS_DEBUG */
82
83 #ifdef INVARIANT_CHECK
84 # define PINVRNT(env, page, expr)                                       \
85   do {                                                                    \
86           if (unlikely(!(expr))) {                                      \
87                   CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
88                   LINVRNT(0);                                           \
89           }                                                             \
90   } while (0)
91 #else /* !INVARIANT_CHECK */
92 # define PINVRNT(env, page, exp) \
93         ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
94 #endif /* !INVARIANT_CHECK */
95
96 /**
97  * Internal version of cl_page_top(); it must be called with the page
98  * referenced, or with coh_page_guard held.
99  */
100 static struct cl_page *cl_page_top_trusted(struct cl_page *page)
101 {
102         LASSERT(cl_is_page(page));
103         while (page->cp_parent != NULL)
104                 page = page->cp_parent;
105         return page;
106 }
107
108 /**
109  * Internal version of cl_page_get().
110  *
111  * This function can be used to obtain an initial reference to a previously
112  * unreferenced cached object. It can be called only if concurrent page
113  * reclamation is somehow prevented, e.g., by locking the page radix-tree
114  * (cl_object_header::coh_page_guard), or by keeping a lock on the VM page
115  * associated with \a page.
116  *
117  * Use with care! Not exported.
118  */
119 static void cl_page_get_trust(struct cl_page *page)
120 {
121         LASSERT(cl_is_page(page));
122         /*
123          * Checkless version for trusted users.
124          */
125         if (atomic_inc_return(&page->cp_ref) == 1)
126                 atomic_inc(&cl_object_site(page->cp_obj)->cs_pages.cs_busy);
127 }
128
129 /**
130  * Returns a slice within a page, corresponding to the given layer in the
131  * device stack.
132  *
133  * \see cl_lock_at()
134  */
135 static const struct cl_page_slice *
136 cl_page_at_trusted(const struct cl_page *page,
137                    const struct lu_device_type *dtype)
138 {
139         const struct cl_page_slice *slice;
140         struct cl_object_header *ch = cl_object_header(page->cp_obj);
141
142         ENTRY;
143         LINVRNT(ergo(!atomic_read(&page->cp_ref),
144                 spin_is_locked(&ch->coh_page_guard)));
145
146         page = cl_page_top_trusted((struct cl_page *)page);
147         do {
148                 list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
149                         if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
150                                 RETURN(slice);
151                 }
152                 page = page->cp_child;
153         } while (page != NULL);
154         RETURN(NULL);
155 }
156
157 /**
158  * Returns a page with the given index in the given object, or NULL if no
159  * page is found. Acquires a reference on the returned page.
160  *
161  * Locking: called under cl_object_header::coh_page_guard spin-lock.
162  */
163 struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
164 {
165         struct cl_page *page;
166
167         LASSERT(spin_is_locked(&hdr->coh_page_guard));
168
169         page = radix_tree_lookup(&hdr->coh_tree, index);
170         if (page != NULL) {
171                 LASSERT(cl_is_page(page));
172                 cl_page_get_trust(page);
173         }
174         return page;
175 }
176 EXPORT_SYMBOL(cl_page_lookup);
177
178 /**
179  * Returns a list of pages covering the [start, end] index range of \a obj.
180  *
181  * The gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
182  * crucial in the face of [offset, EOF] locks.
183  */
184 void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
185                          struct cl_io *io, pgoff_t start, pgoff_t end,
186                          struct cl_page_list *queue)
187 {
188         struct cl_object_header *hdr;
189         struct cl_page          *page;
190         struct cl_page         **pvec;
191         const struct cl_page_slice  *slice;
192         const struct lu_device_type *dtype;
193         pgoff_t                  idx;
194         unsigned int             nr;
195         unsigned int             i;
196         unsigned int             j;
197         ENTRY;
198
199         idx = start;
200         hdr = cl_object_header(obj);
201         pvec = cl_env_info(env)->clt_pvec;
202         dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type;
203         spin_lock(&hdr->coh_page_guard);
204         while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
205                                             idx, CLT_PVEC_SIZE)) > 0) {
206                 idx = pvec[nr - 1]->cp_index + 1;
207                 for (i = 0, j = 0; i < nr; ++i) {
208                         page = pvec[i];
209                         PASSERT(env, page, cl_is_page(page));
210                         pvec[i] = NULL;
211                         if (page->cp_index > end)
212                                 break;
213                         if (page->cp_state == CPS_FREEING)
214                                 continue;
215                         if (page->cp_type == CPT_TRANSIENT) {
216                                 /* Skip transient pages. */
217                                 continue;
218                         }
219
220                         slice = cl_page_at_trusted(page, dtype);
221                         /*
222                          * Pages of an lsm-less file have no underlying
223                          * sub-page for osc, in case of ...
224                          */
225                         PASSERT(env, page, slice != NULL);
226                         page = slice->cpl_page;
227                         /*
228                          * Can safely call cl_page_get_trust() under
229                          * radix-tree spin-lock.
230                          *
231                          * XXX not true, because \a page belongs to another
232                          * object than \a hdr and is guarded by a different tree lock.
233                          */
234                         cl_page_get_trust(page);
235                         lu_ref_add_atomic(&page->cp_reference,
236                                           "page_list", cfs_current());
237                         pvec[j++] = page;
238                 }
239
240                 /*
241                  * Here a delicate locking dance is performed. Current thread
242                  * holds a reference to a page, but has to own it before it
243                  * can be placed into queue. Owning implies waiting, so
244                  * radix-tree lock is to be released. After the wait one has
245                  * to check that the pages weren't truncated (cl_page_own()
246                  * returns an error in that case).
247                  */
248                 spin_unlock(&hdr->coh_page_guard);
249                 for (i = 0; i < j; ++i) {
250                         page = pvec[i];
251                         if (cl_page_own(env, io, page) == 0)
252                                 cl_page_list_add(queue, page);
253                         lu_ref_del(&page->cp_reference,
254                                    "page_list", cfs_current());
255                         cl_page_put(env, page);
256                 }
257                 spin_lock(&hdr->coh_page_guard);
258                 if (nr < CLT_PVEC_SIZE)
259                         break;
260         }
261         spin_unlock(&hdr->coh_page_guard);
262         EXIT;
263 }
264 EXPORT_SYMBOL(cl_page_gang_lookup);
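
/*
 * A minimal usage sketch (illustration only; "env", "io" and "obj" are assumed
 * to come from the caller's context): collect the cached pages of an object in
 * the [start, end] index range into a local list and disown them afterwards,
 * much as cl_pages_prune() below does for the whole [0, CL_PAGE_EOF] range.
 *
 *      struct cl_page_list *plist = &cl_env_info(env)->clt_list;
 *
 *      cl_page_list_init(plist);
 *      cl_page_gang_lookup(env, obj, io, start, end, plist);
 *      cl_page_list_disown(env, io, plist);
 *      cl_page_list_fini(env, plist);
 */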
265
266 static void cl_page_free(const struct lu_env *env, struct cl_page *page)
267 {
268         struct cl_object *obj  = page->cp_obj;
269         struct cl_site   *site = cl_object_site(obj);
270
271         PASSERT(env, page, cl_is_page(page));
272         PASSERT(env, page, list_empty(&page->cp_batch));
273         PASSERT(env, page, page->cp_owner == NULL);
274         PASSERT(env, page, page->cp_req == NULL);
275         PASSERT(env, page, page->cp_parent == NULL);
276         PASSERT(env, page, page->cp_state == CPS_FREEING);
277
278         ENTRY;
279         might_sleep();
280         while (!list_empty(&page->cp_layers)) {
281                 struct cl_page_slice *slice;
282
283                 slice = list_entry(page->cp_layers.next, struct cl_page_slice,
284                                    cpl_linkage);
285                 list_del_init(page->cp_layers.next);
286                 slice->cpl_ops->cpo_fini(env, slice);
287         }
288         atomic_dec(&site->cs_pages.cs_total);
289         atomic_dec(&site->cs_pages_state[page->cp_state]);
290         lu_object_ref_del_at(&obj->co_lu, page->cp_obj_ref, "cl_page", page);
291         cl_object_put(env, obj);
292         lu_ref_fini(&page->cp_reference);
293         OBD_SLAB_FREE_PTR(page, cl_page_kmem);
294         EXIT;
295 }
296
297 /**
298  * Helper function updating page state. This is the only place in the code
299  * where the cl_page::cp_state field is mutated.
300  */
301 static inline void cl_page_state_set_trust(struct cl_page *page,
302                                            enum cl_page_state state)
303 {
304         /* bypass const. */
305         *(enum cl_page_state *)&page->cp_state = state;
306 }
307
308 static int cl_page_alloc(const struct lu_env *env, struct cl_object *o,
309                          pgoff_t ind, struct page *vmpage,
310                          enum cl_page_type type, struct cl_page **out)
311 {
312         struct cl_page          *page;
313         struct cl_page          *err  = NULL;
314         struct lu_object_header *head;
315         struct cl_site          *site = cl_object_site(o);
316         int                      result;
317
318         ENTRY;
319         result = +1;
320         OBD_SLAB_ALLOC_PTR(page, cl_page_kmem);
321         if (page != NULL) {
322                 atomic_set(&page->cp_ref, 1);
323                 page->cp_obj = o;
324                 cl_object_get(o);
325                 page->cp_obj_ref = lu_object_ref_add(&o->co_lu,
326                                                      "cl_page", page);
327                 page->cp_index = ind;
328                 cl_page_state_set_trust(page, CPS_CACHED);
329                 page->cp_type = type;
330                 CFS_INIT_LIST_HEAD(&page->cp_layers);
331                 CFS_INIT_LIST_HEAD(&page->cp_batch);
332                 CFS_INIT_LIST_HEAD(&page->cp_flight);
333                 mutex_init(&page->cp_mutex);
334                 lu_ref_init(&page->cp_reference);
335                 head = o->co_lu.lo_header;
336                 list_for_each_entry(o, &head->loh_layers, co_lu.lo_linkage) {
337                         if (o->co_ops->coo_page_init != NULL) {
338                                 err = o->co_ops->coo_page_init(env, o,
339                                                                page, vmpage);
340                                 if (err != NULL) {
341                                         cl_page_state_set_trust(page,
342                                                                 CPS_FREEING);
343                                         cl_page_free(env, page);
344                                         page = err;
345                                         break;
346                                 }
347                         }
348                 }
349                 if (err == NULL) {
350                         atomic_inc(&site->cs_pages.cs_busy);
351                         atomic_inc(&site->cs_pages.cs_total);
352                         atomic_inc(&site->cs_pages_state[CPS_CACHED]);
353                         atomic_inc(&site->cs_pages.cs_created);
354                         result = 0;
355                 }
356         } else
357                 page = ERR_PTR(-ENOMEM);
358         *out = page;
359         RETURN(result);
360 }
361
362 /**
363  * Returns a cl_page with index \a idx within the object \a o, associated with
364  * the VM page \a vmpage.
365  *
366  * This is the main entry point into the cl_page caching interface. First, the
367  * cache (implemented as a per-object radix tree) is consulted. If the page is
368  * found there, it is returned immediately. Otherwise a new page is allocated
369  * and returned. In either case, an additional reference to the page is acquired.
370  *
371  * \see cl_object_find(), cl_lock_find()
372  */
373 struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o,
374                              pgoff_t idx, struct page *vmpage,
375                              enum cl_page_type type)
376 {
377         struct cl_page          *page;
378         struct cl_page          *ghost = NULL;
379         struct cl_object_header *hdr;
380         struct cl_site          *site = cl_object_site(o);
381         int err;
382
383         LINVRNT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
384         might_sleep();
385
386         ENTRY;
387
388         hdr = cl_object_header(o);
389         atomic_inc(&site->cs_pages.cs_lookup);
390
391         CDEBUG(D_PAGE, "%lu@"DFID" %p %lu %i\n",
392                idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
393         /* fast path. */
394         if (type == CPT_CACHEABLE) {
395                 /*
396                  * cl_vmpage_page() can be called here without any locks as
397                  *
398                  *     - "vmpage" is locked (which prevents ->private from
399                  *       concurrent updates), and
400                  *
401                  *     - "o" cannot be destroyed while current thread holds a
402                  *       reference on it.
403                  */
404                 page = cl_vmpage_page(vmpage, o);
405                 PINVRNT(env, page,
406                         ergo(page != NULL,
407                              cl_page_vmpage(env, page) == vmpage &&
408                              (void *)radix_tree_lookup(&hdr->coh_tree,
409                                                        idx) == page));
410         } else {
411                 spin_lock(&hdr->coh_page_guard);
412                 page = cl_page_lookup(hdr, idx);
413                 spin_unlock(&hdr->coh_page_guard);
414         }
415         if (page != NULL) {
416                 atomic_inc(&site->cs_pages.cs_hit);
417                 RETURN(page);
418         }
419
420         /* allocate and initialize cl_page */
421         err = cl_page_alloc(env, o, idx, vmpage, type, &page);
422         if (err != 0)
423                 RETURN(page);
424         /*
425          * XXX optimization: use radix_tree_preload() here, and change tree
426          * gfp mask to GFP_KERNEL in cl_object_header_init().
427          */
428         spin_lock(&hdr->coh_page_guard);
429         err = radix_tree_insert(&hdr->coh_tree, idx, page);
430         if (err != 0) {
431                 ghost = page;
432                 /*
433                  * Noted by Jay: a lock on \a vmpage protects cl_page_find()
434                  * from this race, but
435                  *
436                  *     0. it's better to have cl_page interface "locally
437                  *     consistent" so that its correctness can be reasoned
438                  *     about without appealing to the (obscure world of) VM
439                  *     locking.
440                  *
441                  *     1. handling this race allows ->coh_tree to remain
442                  *     consistent even when VM locking is somehow busted,
443                  *     which is very useful during diagnosing and debugging.
444                  */
445                 if (err == -EEXIST) {
446                         /*
447                          * XXX in case of a lookup for CPT_TRANSIENT page,
448                          * nothing protects a CPT_CACHEABLE page from being
449                          * concurrently moved into CPS_FREEING state.
450                          */
451                         page = cl_page_lookup(hdr, idx);
452                         PASSERT(env, page, page != NULL);
453                         if (page->cp_type == CPT_TRANSIENT &&
454                             type == CPT_CACHEABLE) {
455                                 /* XXX: We should make sure that inode sem
456                                  * keeps being held in the lifetime of
457                                  * transient pages, so it is impossible to
458                                  * have conflicting transient pages.
459                                  */
460                                 spin_unlock(&hdr->coh_page_guard);
461                                 cl_page_put(env, page);
462                                 spin_lock(&hdr->coh_page_guard);
463                                 page = ERR_PTR(-EBUSY);
464                         }
465                 } else
466                         page = ERR_PTR(err);
467         } else
468                 hdr->coh_pages++;
469         spin_unlock(&hdr->coh_page_guard);
470
471         if (unlikely(ghost != NULL)) {
472                 atomic_dec(&site->cs_pages.cs_busy);
473                 cl_page_delete0(env, ghost, 0);
474                 cl_page_free(env, ghost);
475         }
476         RETURN(page);
477 }
478 EXPORT_SYMBOL(cl_page_find);
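
/*
 * A minimal usage sketch (illustration only; "env", "obj", "io", "idx" and the
 * locked "vmpage" are assumed to come from the caller's context): find or
 * create the cl_page backing a VM page, own it for the duration of an
 * operation, then drop ownership and the reference acquired by cl_page_find().
 *
 *      page = cl_page_find(env, obj, idx, vmpage, CPT_CACHEABLE);
 *      if (IS_ERR(page))
 *              return PTR_ERR(page);
 *      if (cl_page_own(env, io, page) == 0) {
 *              ... operate on the owned page ...
 *              cl_page_disown(env, io, page);
 *      }
 *      cl_page_put(env, page);
 */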
479
480 static inline int cl_page_invariant(const struct cl_page *pg)
481 {
482         struct cl_object_header *header;
483         struct cl_page          *parent;
484         struct cl_page          *child;
485         struct cl_io            *owner;
486
487         LASSERT(cl_is_page(pg));
488         /*
489          * Page invariant is protected by a VM lock.
490          */
491         LINVRNT(cl_page_is_vmlocked(NULL, pg));
492
493         header = cl_object_header(pg->cp_obj);
494         parent = pg->cp_parent;
495         child  = pg->cp_child;
496         owner  = pg->cp_owner;
497
498         return atomic_read(&pg->cp_ref) > 0 &&
499                 ergo(parent != NULL, parent->cp_child == pg) &&
500                 ergo(child != NULL, child->cp_parent == pg) &&
501                 ergo(child != NULL, pg->cp_obj != child->cp_obj) &&
502                 ergo(parent != NULL, pg->cp_obj != parent->cp_obj) &&
503                 ergo(owner != NULL && parent != NULL,
504                      parent->cp_owner == pg->cp_owner->ci_parent) &&
505                 ergo(owner != NULL && child != NULL,
506                      child->cp_owner->ci_parent == owner) &&
507                 /*
508                  * Either page is early in initialization (has neither child
509                  * nor parent yet), or it is in the object radix tree.
510                  */
511                 ergo(pg->cp_state < CPS_FREEING,
512                      (void *)radix_tree_lookup(&header->coh_tree,
513                                                pg->cp_index) == pg ||
514                      (child == NULL && parent == NULL));
515 }
516
517 static void cl_page_state_set0(const struct lu_env *env,
518                                struct cl_page *page, enum cl_page_state state)
519 {
520         enum cl_page_state old;
521         struct cl_site *site = cl_object_site(page->cp_obj);
522
523         /*
524          * Matrix of allowed state transitions [old][new], for sanity
525          * checking.
526          */
527         static const int allowed_transitions[CPS_NR][CPS_NR] = {
528                 [CPS_CACHED] = {
529                         [CPS_CACHED]  = 0,
530                         [CPS_OWNED]   = 1, /* io finds existing cached page */
531                         [CPS_PAGEIN]  = 0,
532                         [CPS_PAGEOUT] = 1, /* write-out from the cache */
533                         [CPS_FREEING] = 1, /* eviction under memory pressure */
534                 },
535                 [CPS_OWNED] = {
536                         [CPS_CACHED]  = 1, /* release to the cache */
537                         [CPS_OWNED]   = 0,
538                         [CPS_PAGEIN]  = 1, /* start read immediately */
539                         [CPS_PAGEOUT] = 1, /* start write immediately */
540                         [CPS_FREEING] = 1, /* lock invalidation or truncate */
541                 },
542                 [CPS_PAGEIN] = {
543                         [CPS_CACHED]  = 1, /* io completion */
544                         [CPS_OWNED]   = 0,
545                         [CPS_PAGEIN]  = 0,
546                         [CPS_PAGEOUT] = 0,
547                         [CPS_FREEING] = 0,
548                 },
549                 [CPS_PAGEOUT] = {
550                         [CPS_CACHED]  = 1, /* io completion */
551                         [CPS_OWNED]   = 0,
552                         [CPS_PAGEIN]  = 0,
553                         [CPS_PAGEOUT] = 0,
554                         [CPS_FREEING] = 0,
555                 },
556                 [CPS_FREEING] = {
557                         [CPS_CACHED]  = 0,
558                         [CPS_OWNED]   = 0,
559                         [CPS_PAGEIN]  = 0,
560                         [CPS_PAGEOUT] = 0,
561                         [CPS_FREEING] = 0,
562                 }
563         };
564
565         ENTRY;
566         old = page->cp_state;
567         PASSERT(env, page, allowed_transitions[old][state]);
568         CL_PAGE_HEADER(D_TRACE, env, page, "%i -> %i\n", old, state);
569         for (; page != NULL; page = page->cp_child) {
570                 PASSERT(env, page, page->cp_state == old);
571                 PASSERT(env, page,
572                         equi(state == CPS_OWNED, page->cp_owner != NULL));
573
574                 atomic_dec(&site->cs_pages_state[page->cp_state]);
575                 atomic_inc(&site->cs_pages_state[state]);
576                 cl_page_state_set_trust(page, state);
577         }
578         EXIT;
579 }
580
581 static void cl_page_state_set(const struct lu_env *env,
582                               struct cl_page *page, enum cl_page_state state)
583 {
584         PINVRNT(env, page, cl_page_invariant(page));
585         cl_page_state_set0(env, page, state);
586 }
587
588 /**
589  * Acquires an additional reference to a page.
590  *
591  * This can be called only by a caller that already possesses a reference to
592  * \a page.
593  *
594  * \see cl_object_get(), cl_lock_get().
595  */
596 void cl_page_get(struct cl_page *page)
597 {
598         ENTRY;
599         LASSERT(page->cp_state != CPS_FREEING);
600         cl_page_get_trust(page);
601         EXIT;
602 }
603 EXPORT_SYMBOL(cl_page_get);
604
605 /**
606  * Releases a reference to a page.
607  *
608  * When the last reference is released, the page is returned to the cache,
609  * unless it is in cl_page_state::CPS_FREEING state, in which case it is
610  * immediately destroyed.
611  *
612  * \see cl_object_put(), cl_lock_put().
613  */
614 void cl_page_put(const struct lu_env *env, struct cl_page *page)
615 {
616         struct cl_object_header *hdr;
617         struct cl_site *site = cl_object_site(page->cp_obj);
618
619         PASSERT(env, page, atomic_read(&page->cp_ref) > !!page->cp_parent);
620
621         ENTRY;
622         CL_PAGE_HEADER(D_TRACE, env, page, "%i\n", atomic_read(&page->cp_ref));
623         hdr = cl_object_header(page->cp_obj);
624         if (atomic_dec_and_test(&page->cp_ref)) {
625                 atomic_dec(&site->cs_pages.cs_busy);
626                 if (page->cp_state == CPS_FREEING) {
627                         PASSERT(env, page, page->cp_owner == NULL);
628                         PASSERT(env, page, list_empty(&page->cp_batch));
629                         /*
630                          * Page is no longer reachable by other threads. Tear
631                          * it down.
632                          */
633                         cl_page_free(env, page);
634                 }
635         }
636         EXIT;
637 }
638 EXPORT_SYMBOL(cl_page_put);
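
/*
 * Reference-counting sketch (illustration only): every cl_page_get(), and
 * every lookup that returns a referenced page (cl_page_find(), cl_page_lookup(),
 * cl_vmpage_page()), must be balanced by a cl_page_put(); the final put on a
 * CPS_FREEING page tears it down through cl_page_free().
 *
 *      cl_page_get(page);
 *      ... use the page while the extra reference is held ...
 *      cl_page_put(env, page);
 */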
639
640 /**
641  * Returns a VM page associated with a given cl_page.
642  */
643 cfs_page_t *cl_page_vmpage(const struct lu_env *env, struct cl_page *page)
644 {
645         const struct cl_page_slice *slice;
646
647         /*
648          * Find uppermost layer with ->cpo_vmpage() method, and return its
649          * result.
650          */
651         page = cl_page_top(page);
652         do {
653                 list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
654                         if (slice->cpl_ops->cpo_vmpage != NULL)
655                                 RETURN(slice->cpl_ops->cpo_vmpage(env, slice));
656                 }
657                 page = page->cp_child;
658         } while (page != NULL);
659         LBUG(); /* ->cpo_vmpage() has to be defined somewhere in the stack */
660 }
661 EXPORT_SYMBOL(cl_page_vmpage);
662
663 /**
664  * Returns a cl_page associated with a VM page, and given cl_object.
665  */
666 struct cl_page *cl_vmpage_page(cfs_page_t *vmpage, struct cl_object *obj)
667 {
668         struct cl_page *page;
669
670         ENTRY;
671         KLASSERT(PageLocked(vmpage));
672
673         /*
674          * NOTE: absence of races and liveness of data are guaranteed by the
675          *       page lock on "vmpage". That works because object destruction
676          *       proceeds bottom-to-top.
677          */
678
679         /*
680          * This loop assumes that ->private points to the top-most page. This
681          * can be rectified easily.
682          */
683         for (page = (void *)vmpage->private;
684              page != NULL; page = page->cp_child) {
685                 if (cl_object_same(page->cp_obj, obj)) {
686                         cl_page_get_trust(page);
687                         break;
688                 }
689         }
690         LASSERT(ergo(page, cl_is_page(page) && page->cp_type == CPT_CACHEABLE));
691         RETURN(page);
692 }
693 EXPORT_SYMBOL(cl_vmpage_page);
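
/*
 * A minimal sketch (illustration only): map a locked VM page back to the
 * cl_page of a particular object. The returned page, if any, carries a
 * reference that must be dropped with cl_page_put().
 *
 *      lock_page(vmpage);
 *      page = cl_vmpage_page(vmpage, obj);
 *      if (page != NULL) {
 *              ... inspect the page ...
 *              cl_page_put(env, page);
 *      }
 *      unlock_page(vmpage);
 */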
694
695 /**
696  * Returns the top-page for a given page.
697  *
698  * \see cl_object_top(), cl_io_top()
699  */
700 struct cl_page *cl_page_top(struct cl_page *page)
701 {
702         return cl_page_top_trusted(page);
703 }
704 EXPORT_SYMBOL(cl_page_top);
705
706 /**
707  * Returns true if \a addr is an address of an allocated cl_page. Used in
708  * assertions. This check is optimistically imprecise, i.e., it may return
709  * true for an incorrect address, but if it returns false, the address is
710  * guaranteed not to be a cl_page. (Should be named cl_pagep().)
711  *
712  * \see cl_is_lock()
713  */
714 int cl_is_page(const void *addr)
715 {
716         return cfs_mem_is_in_cache(addr, cl_page_kmem);
717 }
718 EXPORT_SYMBOL(cl_is_page);
719
720 const struct cl_page_slice *cl_page_at(const struct cl_page *page,
721                                        const struct lu_device_type *dtype)
722 {
723         return cl_page_at_trusted(page, dtype);
724 }
725 EXPORT_SYMBOL(cl_page_at);
726
727 #define CL_PAGE_OP(opname) offsetof(struct cl_page_operations, opname)
728
729 #define CL_PAGE_INVOKE(_env, _page, _op, _proto, ...)                   \
730 ({                                                                      \
731         const struct lu_env        *__env  = (_env);                    \
732         struct cl_page             *__page = (_page);                   \
733         const struct cl_page_slice *__scan;                             \
734         int                         __result;                           \
735         ptrdiff_t                   __op   = (_op);                     \
736         int                       (*__method)_proto;                    \
737                                                                         \
738         __result = 0;                                                   \
739         __page = cl_page_top(__page);                                   \
740         do {                                                            \
741                 list_for_each_entry(__scan, &__page->cp_layers,         \
742                                     cpl_linkage) {                      \
743                         __method = *(void **)((char *)__scan->cpl_ops + \
744                                               __op);                    \
745                         if (__method != NULL) {                         \
746                                 __result = (*__method)(__env, __scan,   \
747                                                        ## __VA_ARGS__); \
748                                 if (__result != 0)                      \
749                                         break;                          \
750                         }                                               \
751                 }                                                       \
752                 __page = __page->cp_child;                              \
753         } while (__page != NULL && __result == 0);                      \
754         if (__result > 0)                                               \
755                 __result = 0;                                           \
756         __result;                                                       \
757 })
758
759 #define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)                   \
760 do {                                                                    \
761         const struct lu_env        *__env  = (_env);                    \
762         struct cl_page             *__page = (_page);                   \
763         const struct cl_page_slice *__scan;                             \
764         ptrdiff_t                   __op   = (_op);                     \
765         void                      (*__method)_proto;                    \
766                                                                         \
767         __page = cl_page_top(__page);                                   \
768         do {                                                            \
769                 list_for_each_entry(__scan, &__page->cp_layers,         \
770                                     cpl_linkage) {                      \
771                         __method = *(void **)((char *)__scan->cpl_ops + \
772                                               __op);                    \
773                         if (__method != NULL)                           \
774                                 (*__method)(__env, __scan,              \
775                                             ## __VA_ARGS__);            \
776                 }                                                       \
777                 __page = __page->cp_child;                              \
778         } while (__page != NULL);                                       \
779 } while (0)
780
781 #define CL_PAGE_INVOID_REVERSE(_env, _page, _op, _proto, ...)           \
782 do {                                                                    \
783         const struct lu_env        *__env  = (_env);                    \
784         struct cl_page             *__page = (_page);                   \
785         const struct cl_page_slice *__scan;                             \
786         ptrdiff_t                   __op   = (_op);                     \
787         void                      (*__method)_proto;                    \
788                                                                         \
789         /* get to the bottom page. */                                   \
790         while (__page->cp_child != NULL)                                \
791                 __page = __page->cp_child;                              \
792         do {                                                            \
793                 list_for_each_entry_reverse(__scan, &__page->cp_layers, \
794                                             cpl_linkage) {              \
795                         __method = *(void **)((char *)__scan->cpl_ops + \
796                                               __op);                    \
797                         if (__method != NULL)                           \
798                                 (*__method)(__env, __scan,              \
799                                             ## __VA_ARGS__);            \
800                 }                                                       \
801                 __page = __page->cp_parent;                             \
802         } while (__page != NULL);                                       \
803 } while (0)
804
805 static int cl_page_invoke(const struct lu_env *env,
806                           struct cl_io *io, struct cl_page *page, ptrdiff_t op)
807
808 {
809         PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
810         ENTRY;
811         RETURN(CL_PAGE_INVOKE(env, page, op,
812                               (const struct lu_env *,
813                                const struct cl_page_slice *, struct cl_io *),
814                               io));
815 }
816
817 static void cl_page_invoid(const struct lu_env *env,
818                            struct cl_io *io, struct cl_page *page, ptrdiff_t op)
819
820 {
821         PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
822         ENTRY;
823         CL_PAGE_INVOID(env, page, op,
824                        (const struct lu_env *,
825                         const struct cl_page_slice *, struct cl_io *), io);
826         EXIT;
827 }
828
829 static void cl_page_owner_clear(struct cl_page *page)
830 {
831         ENTRY;
832         for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
833                 if (page->cp_owner != NULL) {
834                         LASSERT(page->cp_owner->ci_owned_nr > 0);
835                         page->cp_owner->ci_owned_nr--;
836                         page->cp_owner = NULL;
837                 }
838         }
839         EXIT;
840 }
841
842 static void cl_page_owner_set(struct cl_page *page)
843 {
844         ENTRY;
845         for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
846                 LASSERT(page->cp_owner != NULL);
847                 page->cp_owner->ci_owned_nr++;
848         }
849         EXIT;
850 }
851
852 void cl_page_disown0(const struct lu_env *env,
853                      struct cl_io *io, struct cl_page *pg)
854 {
855         enum cl_page_state state;
856
857         ENTRY;
858         state = pg->cp_state;
859         PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
860         PINVRNT(env, pg, cl_page_invariant(pg));
861         cl_page_owner_clear(pg);
862
863         if (state == CPS_OWNED)
864                 cl_page_state_set(env, pg, CPS_CACHED);
865         /*
866          * Completion call-backs are executed in bottom-to-top order, so that
867          * the uppermost layer (llite), responsible for VFS/VM interaction,
868          * runs last and can release locks safely.
869          */
870         CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_disown),
871                                (const struct lu_env *,
872                                 const struct cl_page_slice *, struct cl_io *),
873                                io);
874         EXIT;
875 }
876
877 /**
878  * Returns true iff the page is owned by the given io.
879  */
880 int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
881 {
882         LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
883         ENTRY;
884         RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == io);
885 }
886 EXPORT_SYMBOL(cl_page_is_owned);
887
888 /**
889  * Owns a page by IO.
890  *
891  * Waits until the page is in cl_page_state::CPS_CACHED state, and then
892  * switches it into cl_page_state::CPS_OWNED state.
893  *
894  * \pre  !cl_page_is_owned(pg, io)
895  * \post result == 0 iff cl_page_is_owned(pg, io)
896  *
897  * \retval 0   success
898  *
899  * \retval -ve failure, e.g., page was destroyed (and landed in
900  *             cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED).
901  *
902  * \see cl_page_disown()
903  * \see cl_page_operations::cpo_own()
904  */
905 int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
906 {
907         int result;
908
909         PINVRNT(env, pg, !cl_page_is_owned(pg, io));
910
911         ENTRY;
912         pg = cl_page_top(pg);
913         io = cl_io_top(io);
914
915         cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_own));
916         PASSERT(env, pg, pg->cp_owner == NULL);
917         PASSERT(env, pg, pg->cp_req == NULL);
918         pg->cp_owner = io;
919         cl_page_owner_set(pg);
920         if (pg->cp_state != CPS_FREEING) {
921                 cl_page_state_set(env, pg, CPS_OWNED);
922                 result = 0;
923         } else {
924                 cl_page_disown0(env, io, pg);
925                 result = -EAGAIN;
926         }
927         PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
928         RETURN(result);
929 }
930 EXPORT_SYMBOL(cl_page_own);
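
/*
 * Ownership protocol sketch (illustration only): an io owns a page before
 * operating on it and disowns it afterwards; -EAGAIN from cl_page_own() means
 * the page concurrently entered CPS_FREEING and should be skipped or retried.
 *
 *      rc = cl_page_own(env, io, page);
 *      if (rc == 0) {
 *              ... read, write or discard the page under ownership ...
 *              cl_page_disown(env, io, page);
 *      }
 */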
931
932 /**
933  * Assume page ownership.
934  *
935  * Called when page is already locked by the hosting VM.
936  *
937  * \pre !cl_page_is_owned(pg, io)
938  * \post cl_page_is_owned(pg, io)
939  *
940  * \see cl_page_operations::cpo_assume()
941  */
942 void cl_page_assume(const struct lu_env *env,
943                     struct cl_io *io, struct cl_page *pg)
944 {
945         PASSERT(env, pg, pg->cp_state < CPS_OWNED);
946         PASSERT(env, pg, pg->cp_owner == NULL);
947         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
948         PINVRNT(env, pg, cl_page_invariant(pg));
949
950         ENTRY;
951         pg = cl_page_top(pg);
952         io = cl_io_top(io);
953
954         cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_assume));
955         pg->cp_owner = io;
956         cl_page_owner_set(pg);
957         cl_page_state_set(env, pg, CPS_OWNED);
958         EXIT;
959 }
960 EXPORT_SYMBOL(cl_page_assume);
961
962 /**
963  * Releases page ownership without unlocking the page.
964  *
965  * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
966  * underlying VM page (as VM is supposed to do this itself).
967  *
968  * \pre   cl_page_is_owned(pg, io)
969  * \post !cl_page_is_owned(pg, io)
970  *
971  * \see cl_page_assume()
972  */
973 void cl_page_unassume(const struct lu_env *env,
974                       struct cl_io *io, struct cl_page *pg)
975 {
976         PINVRNT(env, pg, cl_page_is_owned(pg, io));
977         PINVRNT(env, pg, cl_page_invariant(pg));
978
979         ENTRY;
980         pg = cl_page_top(pg);
981         io = cl_io_top(io);
982         cl_page_owner_clear(pg);
983         cl_page_state_set(env, pg, CPS_CACHED);
984         CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_unassume),
985                                (const struct lu_env *,
986                                 const struct cl_page_slice *, struct cl_io *),
987                                io);
988         EXIT;
989 }
990 EXPORT_SYMBOL(cl_page_unassume);
991
992 /**
993  * Releases page ownership.
994  *
995  * Moves page into cl_page_state::CPS_CACHED.
996  *
997  * \pre   cl_page_is_owned(pg, io)
998  * \post !cl_page_is_owned(pg, io)
999  *
1000  * \see cl_page_own()
1001  * \see cl_page_operations::cpo_disown()
1002  */
1003 void cl_page_disown(const struct lu_env *env,
1004                     struct cl_io *io, struct cl_page *pg)
1005 {
1006         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1007
1008         ENTRY;
1009         pg = cl_page_top(pg);
1010         io = cl_io_top(io);
1011         cl_page_disown0(env, io, pg);
1012         EXIT;
1013 }
1014 EXPORT_SYMBOL(cl_page_disown);
1015
1016 /**
1017  * Called when page is to be removed from the object, e.g., as a result of
1018  * truncate.
1019  *
1020  * Calls cl_page_operations::cpo_discard() top-to-bottom.
1021  *
1022  * \pre cl_page_is_owned(pg, io)
1023  *
1024  * \see cl_page_operations::cpo_discard()
1025  */
1026 void cl_page_discard(const struct lu_env *env,
1027                      struct cl_io *io, struct cl_page *pg)
1028 {
1029         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1030         PINVRNT(env, pg, cl_page_invariant(pg));
1031
1032         cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_discard));
1033 }
1034 EXPORT_SYMBOL(cl_page_discard);
1035
1036 /**
1037  * Version of cl_page_delete() that can be called for pages that are not
1038  * fully constructed, e.g., in an error-handling
1039  * cl_page_find()->cl_page_delete0() path. Doesn't check the page invariant.
1040  */
1041 static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
1042                             int radix)
1043 {
1044         PASSERT(env, pg, pg == cl_page_top(pg));
1045         PASSERT(env, pg, pg->cp_state != CPS_FREEING);
1046
1047         ENTRY;
1048         /*
1049          * Sever all ways to obtain new pointers to \a pg.
1050          */
1051         cl_page_owner_clear(pg);
1052         cl_page_state_set0(env, pg, CPS_FREEING);
1053         CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
1054                        (const struct lu_env *, const struct cl_page_slice *));
1055         if (!radix)
1056                 /*
1057                  * !radix means that @pg is not yet in the radix tree, skip
1058                  * removing it.
1059                  */
1060                 pg = pg->cp_child;
1061         for (; pg != NULL; pg = pg->cp_child) {
1062                 void                    *value;
1063                 struct cl_object_header *hdr;
1064
1065                 hdr = cl_object_header(pg->cp_obj);
1066                 spin_lock(&hdr->coh_page_guard);
1067                 value = radix_tree_delete(&hdr->coh_tree, pg->cp_index);
1068                 PASSERT(env, pg, value == pg);
1069                 PASSERT(env, pg, hdr->coh_pages > 0);
1070                 hdr->coh_pages--;
1071                 spin_unlock(&hdr->coh_page_guard);
1072         }
1073         EXIT;
1074 }
1075
1076 /**
1077  * Called when a decision is made to throw page out of memory.
1078  *
1079  * Notifies all layers about page destruction by calling
1080  * cl_page_operations::cpo_delete() method top-to-bottom.
1081  *
1082  * Moves page into cl_page_state::CPS_FREEING state (this is the only place
1083  * where transition to this state happens).
1084  *
1085  * Eliminates all venues through which new references to the page can be
1086  * obtained:
1087  *
1088  *     - removes page from the radix trees,
1089  *
1090  *     - breaks linkage from VM page to cl_page.
1091  *
1092  * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
1093  * drain after some time, at which point page will be recycled.
1094  *
1095  * \pre  pg == cl_page_top(pg)
1096  * \pre  VM page is locked
1097  * \post pg->cp_state == CPS_FREEING
1098  *
1099  * \see cl_page_operations::cpo_delete()
1100  */
1101 void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
1102 {
1103         PINVRNT(env, pg, cl_page_invariant(pg));
1104         ENTRY;
1105         cl_page_delete0(env, pg, 1);
1106         EXIT;
1107 }
1108 EXPORT_SYMBOL(cl_page_delete);
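
/*
 * A deletion sketch (illustration only; "obj" is assumed to be the top
 * cl_object, so that cl_vmpage_page() returns the top page, and the VM page is
 * locked as the preconditions above require):
 *
 *      lock_page(vmpage);
 *      page = cl_vmpage_page(vmpage, obj);
 *      if (page != NULL) {
 *              cl_page_delete(env, page);
 *              cl_page_put(env, page);
 *      }
 *      unlock_page(vmpage);
 */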
1109
1110 /**
1111  * Unmaps page from user virtual memory.
1112  *
1113  * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The
1114  * layer responsible for VM interaction has to unmap page from user space
1115  * virtual memory.
1116  *
1117  * \see cl_page_operations::cpo_unmap()
1118  */
1119 int cl_page_unmap(const struct lu_env *env,
1120                   struct cl_io *io, struct cl_page *pg)
1121 {
1122         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1123         PINVRNT(env, pg, cl_page_invariant(pg));
1124
1125         return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap));
1126 }
1127 EXPORT_SYMBOL(cl_page_unmap);
1128
1129 /**
1130  * Marks page up-to-date.
1131  *
1132  * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
1133  * layer responsible for VM interaction has to mark page as up-to-date. From
1134  * this moment on, page can be shown to the user space without Lustre being
1135  * notified, hence the name.
1136  *
1137  * \see cl_page_operations::cpo_export()
1138  */
1139 void cl_page_export(const struct lu_env *env, struct cl_page *pg)
1140 {
1141         PINVRNT(env, pg, cl_page_invariant(pg));
1142         CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_export),
1143                        (const struct lu_env *, const struct cl_page_slice *));
1144 }
1145 EXPORT_SYMBOL(cl_page_export);
1146
1147 /**
1148  * Returns true, iff \a pg is VM locked in a suitable sense by the calling
1149  * thread.
1150  */
1151 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
1152 {
1153         int result;
1154         const struct cl_page_slice *slice;
1155
1156         ENTRY;
1157         pg = cl_page_top_trusted((struct cl_page *)pg);
1158         slice = container_of(pg->cp_layers.next,
1159                              const struct cl_page_slice, cpl_linkage);
1160         PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
1161         /*
1162          * Call ->cpo_is_vmlocked() directly instead of going through
1163          * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
1164          * cl_page_invariant().
1165          */
1166         result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
1167         PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
1168         RETURN(result == -EBUSY);
1169 }
1170 EXPORT_SYMBOL(cl_page_is_vmlocked);
1171
1172 static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
1173 {
1174         ENTRY;
1175         RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
1176 }
1177
1178 static void cl_page_io_start(const struct lu_env *env,
1179                              struct cl_page *pg, enum cl_req_type crt)
1180 {
1181         /*
1182          * Page is queued for IO, change its state.
1183          */
1184         ENTRY;
1185         cl_page_owner_clear(pg);
1186         cl_page_state_set(env, pg, cl_req_type_state(crt));
1187         EXIT;
1188 }
1189
1190 /**
1191  * Prepares a page for immediate transfer. cl_page_operations::cpo_prep() is
1192  * called top-to-bottom. Every layer either agrees to submit this page (by
1193  * returning 0), or requests to omit it (by returning -EALREADY). The layer
1194  * handling interactions with the VM also has to inform the VM that the page
1195  * is under transfer now.
1196  */
1197 int cl_page_prep(const struct lu_env *env, struct cl_io *io,
1198                  struct cl_page *pg, enum cl_req_type crt)
1199 {
1200         int result;
1201
1202         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1203         PINVRNT(env, pg, cl_page_invariant(pg));
1204         PINVRNT(env, pg, crt < CRT_NR);
1205
1206         /*
1207          * XXX this has to be called bottom-to-top, so that llite can set up
1208          * PG_writeback without risking other layers deciding to skip this
1209          * page.
1210          */
1211         result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_prep));
1212         if (result == 0)
1213                 cl_page_io_start(env, pg, crt);
1214
1215         KLASSERT(ergo(crt == CRT_WRITE && pg->cp_type == CPT_CACHEABLE,
1216                       equi(result == 0,
1217                            PageWriteback(cl_page_vmpage(env, pg)))));
1218         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", crt, result);
1219         return result;
1220 }
1221 EXPORT_SYMBOL(cl_page_prep);
1222
1223 /**
1224  * Notify layers about transfer completion.
1225  *
1226  * Invoked by the transfer sub-system (which is a part of osc) to notify
1227  * layers that a transfer of which this page is a part has completed.
1228  *
1229  * Completion call-backs are executed in bottom-to-top order, so that the
1230  * uppermost layer (llite), responsible for the VFS/VM interaction, runs last
1231  * and can release locks safely.
1232  *
1233  * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1234  * \post pg->cp_state == CPS_CACHED
1235  *
1236  * \see cl_page_operations::cpo_completion()
1237  */
1238 void cl_page_completion(const struct lu_env *env,
1239                         struct cl_page *pg, enum cl_req_type crt, int ioret)
1240 {
1241         PASSERT(env, pg, crt < CRT_NR);
1242         /* cl_page::cp_req already cleared by the caller (osc_completion()) */
1243         PASSERT(env, pg, pg->cp_req == NULL);
1244         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
1245         PINVRNT(env, pg, cl_page_invariant(pg));
1246
1247         ENTRY;
1248         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", crt, ioret);
1249         if (crt == CRT_READ) {
1250                 PASSERT(env, pg, !(pg->cp_flags & CPF_READ_COMPLETED));
1251                 pg->cp_flags |= CPF_READ_COMPLETED;
1252         }
1253
1254         cl_page_state_set(env, pg, CPS_CACHED);
1255         CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
1256                                (const struct lu_env *,
1257                                 const struct cl_page_slice *, int), ioret);
1258
1259         KLASSERT(!PageWriteback(cl_page_vmpage(env, pg)));
1260         EXIT;
1261 }
1262 EXPORT_SYMBOL(cl_page_completion);
1263
1264 /**
1265  * Notify layers that transfer formation engine decided to yank this page from
1266  * the cache and to make it a part of a transfer.
1267  *
1268  * \pre  pg->cp_state == CPS_CACHED
1269  * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1270  *
1271  * \see cl_page_operations::cpo_make_ready()
1272  */
1273 int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
1274                        enum cl_req_type crt)
1275 {
1276         int result;
1277
1278         PINVRNT(env, pg, crt < CRT_NR);
1279
1280         ENTRY;
1281         result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(io[crt].cpo_make_ready),
1282                                 (const struct lu_env *,
1283                                  const struct cl_page_slice *));
1284         if (result == 0) {
1285                 PASSERT(env, pg, pg->cp_state == CPS_CACHED);
1286                 cl_page_io_start(env, pg, crt);
1287         }
1288         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", crt, result);
1289         RETURN(result);
1290 }
1291 EXPORT_SYMBOL(cl_page_make_ready);
1292
1293 /**
1294  * Notify layers that high level io decided to place this page into a cache
1295  * for future transfer.
1296  *
1297  * The layer implementing transfer engine (osc) has to register this page in
1298  * its queues.
1299  *
1300  * \pre  cl_page_is_owned(pg, io)
1301  * \post ergo(result == 0,
1302  *            pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT)
1303  *
1304  * \see cl_page_operations::cpo_cache_add()
1305  */
1306 int cl_page_cache_add(const struct lu_env *env, struct cl_io *io,
1307                       struct cl_page *pg, enum cl_req_type crt)
1308 {
1309         int result;
1310
1311         PINVRNT(env, pg, crt < CRT_NR);
1312         PINVRNT(env, pg, cl_page_is_owned(pg, io));
1313         PINVRNT(env, pg, cl_page_invariant(pg));
1314
1315         ENTRY;
1316         result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_cache_add));
1317         if (result == 0) {
1318                 cl_page_owner_clear(pg);
1319                 cl_page_state_set(env, pg, CPS_CACHED);
1320         }
1321         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", crt, result);
1322         RETURN(result);
1323 }
1324 EXPORT_SYMBOL(cl_page_cache_add);
1325
1326 /**
1327  * Checks whether the page is protected by an extent lock of at least the
1328  * required mode.
1329  *
1330  * \return the same as in cl_page_operations::cpo_is_under_lock() method.
1331  * \see cl_page_operations::cpo_is_under_lock()
1332  */
1333 int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
1334                           struct cl_page *page)
1335 {
1336         int rc;
1337
1338         PINVRNT(env, page, cl_page_invariant(page));
1339
1340         ENTRY;
1341         rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
1342                             (const struct lu_env *,
1343                              const struct cl_page_slice *, struct cl_io *),
1344                             io);
1345         PASSERT(env, page, rc != 0);
1346         RETURN(rc);
1347 }
1348 EXPORT_SYMBOL(cl_page_is_under_lock);
1349
1350 /**
1351  * Purges all cached pages belonging to the object \a obj.
1352  */
1353 int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
1354 {
1355         struct cl_thread_info   *info;
1356         struct cl_object        *obj = cl_object_top(clobj);
1357         struct cl_io            *io;
1358         struct cl_page_list     *plist;
1359         int                      result;
1360
1361         ENTRY;
1362         info  = cl_env_info(env);
1363         plist = &info->clt_list;
1364         io    = &info->clt_io;
1365
1366         /*
1367          * Initialize the io. This is ugly since we never do IO in this
1368          * function; we just make the cl_page_list functions happy. -jay
1369          */
1370         io->ci_obj = obj;
1371         result = cl_io_init(env, io, CIT_MISC, obj);
1372         if (result != 0) {
1373                 cl_io_fini(env, io);
1374                 RETURN(io->ci_result);
1375         }
1376
1377         cl_page_list_init(plist);
1378         cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist);
1379         /*
1380          * Since we're purging the pages of an object, we don't care about
1381          * the possible outcomes of the following functions.
1382          */
1383         cl_page_list_unmap(env, io, plist);
1384         cl_page_list_discard(env, io, plist);
1385         cl_page_list_disown(env, io, plist);
1386         cl_page_list_fini(env, plist);
1387
1388         cl_io_fini(env, io);
1389         RETURN(result);
1390 }
1391 EXPORT_SYMBOL(cl_pages_prune);
1392
1393 /**
1394  * Tells transfer engine that only part of a page is to be transmitted.
1395  *
1396  * \see cl_page_operations::cpo_clip()
1397  */
1398 void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1399                   int from, int to)
1400 {
1401         PINVRNT(env, pg, cl_page_invariant(pg));
1402
1403         CL_PAGE_HEADER(D_TRACE, env, pg, "%i %i\n", from, to);
1404         CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_clip),
1405                        (const struct lu_env *,
1406                         const struct cl_page_slice *,int, int),
1407                        from, to);
1408 }
1409 EXPORT_SYMBOL(cl_page_clip);
1410
1411 /**
1412  * Prints a human-readable representation of \a pg through \a printer.
1413  */
1414 void cl_page_header_print(const struct lu_env *env, void *cookie,
1415                           lu_printer_t printer, const struct cl_page *pg)
1416 {
1417         (*printer)(env, cookie,
1418                    "page@%p[%d %p:%lu ^%p_%p %d %d %d %p %p %#x]\n",
1419                    pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1420                    pg->cp_index, pg->cp_parent, pg->cp_child,
1421                    pg->cp_state, pg->cp_error, pg->cp_type,
1422                    pg->cp_owner, pg->cp_req, pg->cp_flags);
1423 }
1424 EXPORT_SYMBOL(cl_page_header_print);
1425
1426 /**
1427  * Prints a human-readable representation of \a pg through \a printer.
1428  */
1429 void cl_page_print(const struct lu_env *env, void *cookie,
1430                    lu_printer_t printer, const struct cl_page *pg)
1431 {
1432         struct cl_page *scan;
1433
1434         for (scan = cl_page_top((struct cl_page *)pg);
1435              scan != NULL; scan = scan->cp_child)
1436                 cl_page_header_print(env, cookie, printer, scan);
1437         CL_PAGE_INVOKE(env, (struct cl_page *)pg, CL_PAGE_OP(cpo_print),
1438                        (const struct lu_env *env,
1439                         const struct cl_page_slice *slice,
1440                         void *cookie, lu_printer_t p), cookie, printer);
1441         (*printer)(env, cookie, "end page@%p\n", pg);
1442 }
1443 EXPORT_SYMBOL(cl_page_print);
1444
1445 /**
1446  * Cancel a page which is still in a transfer.
1447  */
1448 int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1449 {
1450         return CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_cancel),
1451                               (const struct lu_env *,
1452                                const struct cl_page_slice *));
1453 }
1454 EXPORT_SYMBOL(cl_page_cancel);
1455
1456 /**
1457  * Converts a page index into a byte offset within the object \a obj.
1458  */
1459 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1460 {
1461         /*
1462          * XXX for now.
1463          */
1464         return (loff_t)idx << CFS_PAGE_SHIFT;
1465 }
1466 EXPORT_SYMBOL(cl_offset);
1467
1468 /**
1469  * Converts a byte offset within the object \a obj into a page index.
1470  */
1471 pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1472 {
1473         /*
1474          * XXX for now.
1475          */
1476         return offset >> CFS_PAGE_SHIFT;
1477 }
1478 EXPORT_SYMBOL(cl_index);
1479
1480 int cl_page_size(const struct cl_object *obj)
1481 {
1482         return 1 << CFS_PAGE_SHIFT;
1483 }
1484 EXPORT_SYMBOL(cl_page_size);
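
/*
 * Conversion sketch (illustration only): cl_offset() and cl_index() are
 * inverse shifts by CFS_PAGE_SHIFT. For example, with 4096-byte pages:
 *
 *      cl_offset(obj, 3)     == 12288
 *      cl_index(obj, 12288)  == 3
 *      cl_index(obj, 12289)  == 3    (offsets inside a page round down)
 */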
1485
1486 /**
1487  * Adds a page slice to the compound page.
1488  *
1489  * This is called by cl_object_operations::coo_page_init() methods to add
1490  * per-layer state to the page. New state is added at the end of the
1491  * cl_page::cp_layers list, that is, at the bottom of the stack.
1492  *
1493  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1494  */
1495 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1496                        struct cl_object *obj,
1497                        const struct cl_page_operations *ops)
1498 {
1499         ENTRY;
1500         list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1501         slice->cpl_obj  = obj;
1502         slice->cpl_ops  = ops;
1503         slice->cpl_page = page;
1504         EXIT;
1505 }
1506 EXPORT_SYMBOL(cl_page_slice_add);
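
/*
 * A layer-registration sketch (illustration only; "foo_page", "fp_cl" and
 * "foo_page_ops" are hypothetical names, and the coo_page_init() prototype is
 * shown as assumed here): a layer's cl_object_operations::coo_page_init()
 * method allocates its slice and hooks it into the page with
 * cl_page_slice_add(), returning NULL on success or an ERR_PTR() on failure,
 * as expected by cl_page_alloc() above.
 *
 *      static struct cl_page *foo_page_init(const struct lu_env *env,
 *                                           struct cl_object *obj,
 *                                           struct cl_page *page,
 *                                           cfs_page_t *vmpage)
 *      {
 *              struct foo_page *fp;
 *
 *              OBD_ALLOC_PTR(fp);
 *              if (fp == NULL)
 *                      return ERR_PTR(-ENOMEM);
 *              cl_page_slice_add(page, &fp->fp_cl, obj, &foo_page_ops);
 *              return NULL;
 *      }
 */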
1507
1508 int  cl_page_init(void)
1509 {
1510         return lu_kmem_init(cl_page_caches);
1511 }
1512
1513 void cl_page_fini(void)
1514 {
1515         lu_kmem_fini(cl_page_caches);
1516 }