/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/cache.c
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Add @lock to @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      /* Lock disappeared under us. */
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by resource and the data
   contained in @extent. */
/* Should be called with oap->lock held (except on initial addition, see the
   comment in osc_request.c). */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                LASSERTF(lock->l_policy_data.l_extent.start <=
                         extent->oap_obj_off &&
                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
                         lock->l_policy_data.l_extent.end,
                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
                         "offset " LPU64 "\n",
                         lock->l_policy_data.l_extent.start,
                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
        } else {
                int mode;
                /* Real extent width calculation goes here once we have real
                 * extents.
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page,
                 * then there can be only one PW lock. If the page is clean,
                 * any PR lock is good.
                 */

                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    /* Race - lock disappeared under us
                                 * (eviction?) */
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
                ldlm_lock_decref(&tmplockh, mode);
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
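
/*
 * Example (illustrative sketch only; @oap and @res are assumed to come from
 * the caller's context, as in osc_request.c): when no lock handle is known,
 * passing a NULL @lockh lets cache_add_extent() match a granted PR/PW lock
 * covering the page by its extent:
 *
 *      rc = cache_add_extent(cache, res, oap, NULL);
 *      if (rc != 0)
 *              CDEBUG(D_CACHE, "cannot attach extent to a lock: %d\n", rc);
 */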

static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                /* Pin the element so that it cannot be freed while we drop
                 * the read lock to run the callback without spinlocks held. */
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Register a set of pin/removal callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we have the page locked, to pin it in memory so
   that it does not disappear after we release the page lock (which we need
   to do to avoid deadlocks).
   @func_cb is the removal callback, called after the page and all spinlocks
   are released; it is supposed to clean the page and remove it from all
   (vfs) caches it might be in. */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);

/* Unregister an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback,
   since there can be only one. */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this
                           function was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);
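
/*
 * Example (illustrative sketch; ll_page_removal_cb and ll_pin_extent_cb are
 * hypothetical client-side callbacks, not defined in this file): a client
 * would typically register its callbacks once at connect time and unregister
 * the removal callback on disconnect:
 *
 *      rc = cache_add_extent_removal_cb(cache, ll_page_removal_cb,
 *                                       ll_pin_extent_cb);
 *      ...
 *      rc = cache_del_extent_removal_cb(cache, ll_page_removal_cb);
 */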

static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it might in
           fact be NULL due to a parallel page eviction clearing it while
           waiting on a lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Remove @extent from the cache and from the lock it belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock - means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterate through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent and passing @data to every call;
   stop iterating early if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      /* Lock disappeared */
                return 0;
        /* Parallel page removal from mem pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}
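
/*
 * Example (illustrative sketch; count_extents_cb is a hypothetical callback,
 * not defined in this file): count the extents still attached to a lock.
 * Returning nonzero from the callback would stop the iteration early.
 *
 *      static int count_extents_cb(struct lustre_cache *cache,
 *                                  struct lustre_handle *lockh,
 *                                  struct osc_async_page *extent, void *data)
 *      {
 *              (*(int *)data)++;
 *              return 0;
 *      }
 *
 *      int count = 0;
 *      cache_iterate_extents(cache, lockh, count_extents_cb, &count);
 */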

static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                   there is a parallel page-removal process waiting to free
                   that page on l_extents_list_lock, and it holds the page
                   lock. We need this page to go away completely, and for
                   that to happen we will just try to truncate it here too.
                   Serialisation on the page lock will achieve that goal for
                   us. */
                /* Try to add the extent back to the cache first, but only if
                 * we are cancelling a read lock; write locks cannot have
                 * other overlapping locks. If adding is not possible (or we
                 * are cancelling a PW lock), then remove the extent from the
                 * cache. */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting
                           the page with address 0x5a5a5a5a in
                           cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        cache->lc_pin_extent_cb(extent->oap_page);
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      /* The lock was removed by somebody just now,
                         * nothing to do */
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
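
/*
 * Example (illustrative sketch of a caller; not taken from this file): a
 * blocking callback would detach the pages under the lock and drop the lock
 * from the cache before proceeding with cancellation, e.g.:
 *
 *      cache_remove_lock(cache, lockh);
 *      rc = ldlm_cli_cancel(lockh);
 */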

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create a lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);
        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (cache) {
                spin_lock(&cache->lc_locks_list_lock);
                if (!list_empty(&cache->lc_locks_list)) {
                        struct ldlm_lock *lock, *tmp;
                        CERROR("still have locks in the list on cleanup:\n");

                        list_for_each_entry_safe(lock, tmp,
                                                 &cache->lc_locks_list,
                                                 l_cache_locks_list) {
                                list_del_init(&lock->l_cache_locks_list);
                                /* XXX: The natural idea would be to print the
                                   offending locks here, but if we use e.g.
                                   LDLM_ERROR, we will likely crash, as an
                                   LDLM error tries to access e.g. a
                                   nonexistent namespace. Normally this kind
                                   of case could only happen when somebody
                                   did not release a lock reference, and we
                                   have other ways to detect that. */
                                /* Make sure there are no pages left under the
                                   lock */
                                LASSERT(list_empty(&lock->l_extents_list));
                        }
                }
                spin_unlock(&cache->lc_locks_list_lock);
                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
                OBD_FREE(cache, sizeof(*cache));
        }

        return 0;
}
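
/*
 * Example (illustrative sketch of the cache lifecycle; error handling
 * trimmed, and the cl_cache field of the client obd is an assumption about
 * the caller): the owning OBD creates the cache at setup time and destroys
 * it at cleanup, once all locks and extents are gone:
 *
 *      cli->cl_cache = cache_create(obd);
 *      if (!cli->cl_cache)
 *              RETURN(-ENOMEM);
 *      ...
 *      cache_destroy(cli->cl_cache);
 *      cli->cl_cache = NULL;
 */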