/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/cache.c
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Add @lock to the @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      // Lock disappeared under us.
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by resource and the data
   contained in @extent */
/* Should be called with oap->lock held (except on initial addition, see the
   comment in osc_request.c) */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        int mode = 0;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                LASSERTF(lock->l_policy_data.l_extent.start <=
                         extent->oap_obj_off &&
                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
                         lock->l_policy_data.l_extent.end,
                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
                         "offset " LPU64 "\n",
                         lock->l_policy_data.l_extent.start,
                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
        } else {
                /* Real extent width calculation here once we have real
                 * extents
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page, then
                 * there could be only one PW lock. If the page is clean,
                 * any PR lock is good
                 */
                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    // Race - lock disappeared under us (eviction?)
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LASSERTF(!(lock->l_flags & LDLM_FL_CANCEL), "Adding a page to an "
                 "already cancelled lock %p", lock);
        if (mode)
                ldlm_lock_decref(&tmplockh, mode);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}

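/* Reference counting helpers for page_removal_cb_element.  A reference is
 * taken before lc_page_removal_cb_lock is dropped so the element cannot be
 * freed by cache_del_extent_removal_cb() while its callback is running;
 * dropping the last reference frees the element. */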
static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

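/* Notify every registered page-removal callback that the page passed in
 * @data is being removed from the cache.  Each list element is pinned with
 * cache_extent_removal_get() so that lc_page_removal_cb_lock does not have
 * to be held across the callback; @discard tells the callback whether the
 * page data is being discarded (e.g. on LDLM_FL_DISCARD_DATA) rather than
 * written back. */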
static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Registers a set of pin/removal callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we have the page locked, to pin it in memory so
   that it does not disappear after we release the page lock (which we need
   to do to avoid deadlocks).
   @func_cb is the removal callback, called after the page and all spinlocks
   are released; it is supposed to clean the page and remove it from all
   (vfs) caches it might be in */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);

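/*
 * Illustrative usage sketch (callback names are hypothetical): a client
 * layer would typically register its callbacks once at setup and unregister
 * them at cleanup, matching every add with a del:
 *
 *      rc = cache_add_extent_removal_cb(cache, my_page_removal_cb,
 *                                       my_pin_page_cb);
 *      if (rc)
 *              GOTO(cleanup, rc);
 *      ...
 *      cache_del_extent_removal_cb(cache, my_page_removal_cb);
 *
 * If the same func_cb is registered more than once, it must be deleted the
 * same number of times; the pin callback is cleared only once the removal
 * callback list becomes empty.
 */
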
/* Unregister an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback
   since there can only be one */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;
        ENTRY;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this function
                           was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);

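/* Detach @extent from the lock it is attached to without taking any locks;
 * the caller is expected to hold the lock's l_extents_list_lock and the
 * extent's oap_lock.  Returns nonzero if the extent still referenced a lock,
 * zero if a parallel page removal has already cleared oap_ldlm_lock. */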
static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it might in
           fact be NULL due to a parallel page eviction that has cleared it
           and is waiting on the lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Request that @extent be removed from the cache and from the lock it
   belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock - means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterate through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent. @data is passed to every call.
   Stops iterating prematurely if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      // Lock disappeared
                return 0;
        /* Parallel page removal from memory pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}

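/*
 * Illustrative sketch (callback name is hypothetical) of a
 * cache_iterate_extents user.  The callback runs under
 * l_extents_list_lock, so it must not sleep, and returning nonzero stops
 * the iteration early:
 *
 *      static int count_extents_cb(struct lustre_cache *cache,
 *                                  struct lustre_handle *lockh,
 *                                  struct osc_async_page *extent, void *data)
 *      {
 *              (*(int *)data)++;
 *              return 0;       // keep iterating
 *      }
 *
 *      int count = 0;
 *      cache_iterate_extents(cache, lockh, count_extents_cb, &count);
 */
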
/* Detach every extent still attached to @lock (called when @lock is being
 * removed from the cache).  An extent is re-attached to another matching
 * lock where possible; otherwise its page is pinned and the page-removal
 * callbacks are invoked so that upper layers drop it from their caches. */
static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means a
                   parallel page-removal process is waiting on
                   l_extents_list_lock to free that page, and it holds the
                   page lock.  We need this page to go away completely, and
                   for that to happen we will just try to truncate it here
                   too.  Serialisation on the page lock will achieve that
                   goal for us. */
                /* Try to add the extent back to the cache first, but only if
                 * we are cancelling a read lock; write locks cannot have
                 * other overlapping locks. If adding is not possible (or we
                 * are cancelling a PW lock), then remove the extent from the
                 * cache */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting
                           the page with address 0x5a5a5a5a in
                           cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        LASSERT(cache->lc_pin_extent_cb != NULL);
                        cache->lc_pin_extent_cb(extent->oap_page);

                        if (lock->l_flags & LDLM_FL_BL_AST)
                                extent->oap_async_flags |= ASYNC_HP;
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)  // The lock was removed by somebody just now, nothing to do
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /*data */ );

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

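/*
 * Illustrative lifecycle sketch (variable names are hypothetical, assuming
 * an OSC-like obd as the owner): the cache is created at device setup, fed
 * locks and extents as they are granted and queued, and torn down at
 * cleanup once all locks are gone.
 *
 *      cache = cache_create(obd);
 *      if (cache == NULL)
 *              return -ENOMEM;
 *      ...
 *      cache_add_lock(cache, &lockh);                    // on lock grant
 *      cache_add_extent(cache, &res_id, oap, &lockh);    // on page caching
 *      ...
 *      cache_remove_lock(cache, &lockh);                 // on lock cancel
 *      cache_destroy(cache);                             // at cleanup
 */
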
/* Create lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);

        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (!cache)
                RETURN(0);

        spin_lock(&cache->lc_locks_list_lock);
        if (!list_empty(&cache->lc_locks_list)) {
                struct ldlm_lock *lock, *tmp;
                CERROR("still have locks in the list on cleanup:\n");

                list_for_each_entry_safe(lock, tmp,
                                         &cache->lc_locks_list,
                                         l_cache_locks_list) {
                        list_del_init(&lock->l_cache_locks_list);
                        /* XXX: Of course the natural idea would be to print
                         * the offending locks here, but if we use
                         * e.g. LDLM_ERROR, we will likely crash here,
                         * as the LDLM error path tries to access e.g.
                         * a nonexistent namespace. Normally this kind of
                         * case could only happen when somebody did not
                         * release a lock reference and we have other ways
                         * to detect this. */
                        /* Make sure there are no pages left under the
                         * lock */
                        LASSERT(list_empty(&lock->l_extents_list));
                }
        }
        spin_unlock(&cache->lc_locks_list_lock);
        LASSERT(list_empty(&cache->lc_page_removal_callback_list));

        OBD_FREE(cache, sizeof(*cache));
        return 0;
}