/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/cache.c
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Add @lock to the @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      /* Lock disappeared under us. */
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
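
/* A hedged usage sketch, not part of the original file: a hypothetical
 * caller on an enqueue-completion path records a freshly granted lock in
 * the cache by handle. ldlm_lock2handle() is the stock LDLM helper; the
 * surrounding function is invented for illustration. */
static void example_record_granted_lock(struct lustre_cache *cache,
                                        struct ldlm_lock *lock)
{
        struct lustre_handle lockh;

        ldlm_lock2handle(lock, &lockh);
        /* cache_add_lock() tolerates the lock disappearing under us,
         * which is why the return value can be ignored */
        cache_add_lock(cache, &lockh);
}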

/* Try to add @extent to the lock represented by @lockh if it is non-NULL,
 * otherwise just try to match some suitable lock by resource and the data
 * contained in @extent. */
/* Should be called with oap->lock held (except on initial addition, see
 * the comment in osc_request.c). */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        int mode = 0;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                if (lock->l_policy_data.l_extent.start > extent->oap_obj_off ||
                    extent->oap_obj_off + CFS_PAGE_SIZE - 1 >
                    lock->l_policy_data.l_extent.end) {
                        CDEBUG(D_CACHE, "Got wrong lock [" LPU64 "," LPU64 "] "
                               "for page with offset " LPU64 "\n",
                               lock->l_policy_data.l_extent.start,
                               lock->l_policy_data.l_extent.end,
                               extent->oap_obj_off);
                        LDLM_LOCK_PUT(lock);
                        RETURN(-ENOLCK);
                }
        } else {
                /* Real extent width calculation here once we have real
                 * extents */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page,
                 * there can be only one PW lock. If the page is clean,
                 * any PR lock is good. */
                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    /* Race - lock disappeared under us (eviction?) */
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LASSERTF(!(lock->l_flags & LDLM_FL_CANCEL), "Adding a page to an "
                 "already cancelled lock %p", lock);
        if (mode)
                ldlm_lock_decref(&tmplockh, mode);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
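
/* A hedged usage sketch (the caller and names below are invented for
 * illustration): with oap->oap_lock held, a page is attached either to an
 * explicitly supplied lock or, via a NULL handle, to whatever granted
 * PR/PW lock already covers its offset. */
static int example_attach_page(struct lustre_cache *cache,
                               struct ldlm_res_id *res,
                               struct osc_async_page *oap,
                               struct lustre_handle *lockh)
{
        int rc;

        /* lockh may be NULL; cache_add_extent() then matches a lock by
         * resource and the page's file offset */
        rc = cache_add_extent(cache, res, oap, lockh);
        if (rc == -ENOLCK)
                CDEBUG(D_CACHE, "no covering lock, page stays uncached\n");
        return rc;
}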

static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                /* Pin the element so it survives while we drop the read
                 * lock to run the callback, which may sleep */
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Register a pin/remove callback pair for extents. The current limitation
 * is that there can be only one pin_cb per cache.
 * @pin_cb is called while we hold the page lock, to pin the page in memory
 * so that it does not disappear after we release the page lock (which we
 * need to do to avoid deadlocks).
 * @func_cb is the removal callback; it is called after the page lock and
 * all spinlocks are released, and is supposed to clean the page and remove
 * it from all (VFS) caches it might be in. */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);
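
/* A hedged registration sketch; both callbacks below are hypothetical and
 * their signatures are assumed from the call sites in this file
 * (prce_callback(page, discard) and lc_pin_extent_cb(page)) - the
 * authoritative typedefs are obd_page_removal_cb_t and obd_pin_extent_cb
 * in the obd headers. */
static void example_pin_page(void *page)
{
        /* called with the page locked: take a reference so @page outlives
         * the release of its page lock */
}

static int example_remove_page(void *page, int discard)
{
        /* called lock-free later: clean @page and drop it from the VFS
         * caches; @discard is set when the data must not be written back */
        return 0;
}

static int example_register_callbacks(struct lustre_cache *cache)
{
        return cache_add_extent_removal_cb(cache, example_remove_page,
                                           example_pin_page);
}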

/* Unregister an extent removal callback registered earlier. If the list of
 * registered removal callbacks becomes empty, we also clear the pin
 * callback, since there can be only one. */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;
        ENTRY;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this
                         * function was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);
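
/* Hedged companion to the registration sketch above: tearing the callback
 * down again. A nonzero return means @func_cb was never registered. */
static int example_unregister_callbacks(struct lustre_cache *cache)
{
        return cache_del_extent_removal_cb(cache, example_remove_page);
}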

static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it may in
         * fact be NULL due to parallel page eviction clearing it while
         * waiting on a lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Request that @extent be removed from the cache and from the locks it
 * belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}
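
/* A hedged teardown sketch (hypothetical caller): before an
 * osc_async_page is freed or reused, it is detached so that no lock's
 * l_extents_list still points at it. */
static void example_teardown_oap(struct lustre_cache *cache,
                                 struct osc_async_page *oap)
{
        cache_remove_extent(cache, oap);
        /* now no lock references the oap, so it is safe to free */
}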

/* Iterate through the list of extents in the lock identified by @lockh,
 * calling @cb_func for every such extent; @data is passed to every call.
 * Stops iterating early if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      /* Lock disappeared */
                return 0;
        /* Parallel page removal from memory pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}
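
/* A hedged iteration sketch (callback and wrapper invented for
 * illustration). The callback runs under l_extents_list_lock, so it must
 * not sleep; returning nonzero would stop the walk early. */
static int example_count_extents_cb(struct lustre_cache *cache,
                                    struct lustre_handle *lockh,
                                    struct osc_async_page *extent,
                                    void *data)
{
        int *count = data;

        (*count)++;
        return 0;       /* keep iterating */
}

static int example_count_extents(struct lustre_cache *cache,
                                 struct lustre_handle *lockh)
{
        int count = 0;

        cache_iterate_extents(cache, lockh, example_count_extents_cb,
                              &count);
        return count;
}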

static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                 * there is a parallel page-removal process waiting to free
                 * that page on l_extents_list_lock, and it holds the page
                 * lock. We need this page to go away completely, and for
                 * that to happen we will just try to truncate it here too.
                 * Serialisation on the page lock will achieve that goal
                 * for us. */
                /* Try to add the extent back to the cache first, but only
                 * if we are cancelling a read lock; write locks cannot
                 * have other overlapping locks. If re-adding is not
                 * possible (or we are cancelling a PW lock), then remove
                 * the extent from the cache */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                         * once we release the spinlocks, the extent struct
                         * might be freed and we would end up requesting
                         * the page with address 0x5a5a5a5a in
                         * cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        LASSERT(cache->lc_pin_extent_cb != NULL);
                        cache->lc_pin_extent_cb(extent->oap_page);

                        if (lock->l_flags & LDLM_FL_BL_AST)
                                extent->oap_async_flags |= ASYNC_HP;
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)  /* The lock was removed by somebody just now, nothing to do */
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
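
/* A hedged cancellation sketch (hypothetical caller): a path that is about
 * to cancel @lock, e.g. a blocking AST, detaches the lock and all of its
 * cached extents in one call. */
static void example_on_lock_cancel(struct lustre_cache *cache,
                                   struct ldlm_lock *lock)
{
        struct lustre_handle lockh;

        ldlm_lock2handle(lock, &lockh);
        cache_remove_lock(cache, &lockh);
}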

/* Supposed to iterate through all locks in the cache for a given resource.
 * Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create a lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);

        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (!cache)
                RETURN(0);

        spin_lock(&cache->lc_locks_list_lock);
        if (!list_empty(&cache->lc_locks_list)) {
                struct ldlm_lock *lock, *tmp;
                CERROR("still have locks in the list on cleanup:\n");

                list_for_each_entry_safe(lock, tmp,
                                         &cache->lc_locks_list,
                                         l_cache_locks_list) {
                        list_del_init(&lock->l_cache_locks_list);
                        /* XXX: The natural idea would be to print the
                         * offending locks here, but if we use e.g.
                         * LDLM_ERROR we will likely crash, as the LDLM
                         * error path tries to access e.g. a nonexistent
                         * namespace. Normally this kind of case can only
                         * happen when somebody did not release a lock
                         * reference, and we have other ways to detect
                         * that. */
                        /* Make sure there are no pages left under the
                         * lock */
                        LASSERT(list_empty(&lock->l_extents_list));
                }
        }
        spin_unlock(&cache->lc_locks_list_lock);
        LASSERT(list_empty(&cache->lc_page_removal_callback_list));

        OBD_FREE(cache, sizeof(*cache));
        return 0;
}
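
/* A hedged lifecycle sketch (setup/cleanup hooks invented for
 * illustration): the cache is created at device setup and destroyed at
 * cleanup. cache_destroy() LASSERTs that the removal-callback list is
 * empty, so callbacks must be unregistered before this point. */
static int example_setup(struct obd_device *obd, struct lustre_cache **pc)
{
        *pc = cache_create(obd);
        return *pc ? 0 : -ENOMEM;
}

static int example_cleanup(struct lustre_cache *cache)
{
        return cache_destroy(cache);    /* NULL-safe, returns 0 */
}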