/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/cache.c
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Adding @lock to the @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      // Lock disappeared under us.
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
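
/* A minimal usage sketch (illustrative only, not an actual call site from
 * this tree): a lock is added to the cache once it is granted and removed
 * again from the cancellation path via cache_remove_lock() below.
 *
 *      cache_add_lock(cache, lockh);           // after the lock is granted
 *      ...
 *      cache_remove_lock(cache, lockh);        // when it is cancelled
 */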

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by the resource and the
   data contained in @extent */
/* Should be called with oap->lock held (except on initial addition, see the
   comment in osc_request.c) */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                LASSERTF(lock->l_policy_data.l_extent.start <=
                         extent->oap_obj_off &&
                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
                         lock->l_policy_data.l_extent.end,
                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
                         "offset " LPU64 "\n",
                         lock->l_policy_data.l_extent.start,
                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
        } else {
                int mode;
                /* Real extent width calculation goes here once we have real
                 * extents
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page, then
                 * there can be only one PW lock. If the page is clean,
                 * any PR lock is good
                 */
                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    // Race - lock disappeared under us (eviction?)
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
                ldlm_lock_decref(&tmplockh, mode);
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
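
/* Usage sketch (illustrative only; "res" and "oap" are assumed to come from
 * the caller).  With a used lock handle the extent is attached under that
 * lock after an assertion that the lock covers the page; with a NULL or
 * unused handle, ldlm_lock_match() is used to find a granted PR/PW lock
 * covering [oap_obj_off, oap_obj_off + CFS_PAGE_SIZE - 1]:
 *
 *      rc = cache_add_extent(cache, &res, oap, lockh); // known covering lock
 *      rc = cache_add_extent(cache, &res, oap, NULL);  // match a PR/PW lock
 */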

static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

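/* Call every registered page-removal callback for @data (a struct page).
   Each element is pinned with a reference before the read lock is dropped,
   so the callback can run (and possibly sleep) without holding
   lc_page_removal_cb_lock; the lock is retaken to step to the next element
   and only then is the reference dropped. */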
static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Registers a set of pin/remove callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we have the page locked, to pin it in memory so
   that it does not disappear after we release the page lock (which we need
   to do to avoid deadlocks).
   @func_cb is the removal callback that is called after the page and all
   spinlocks are released, and is supposed to clean the page and remove it
   from all (vfs) caches it might be in */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);

/* Unregister an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback,
   since there can be only one */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this function
                           was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);

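/* Registration sketch (illustrative only; the exact typedefs for
 * obd_page_removal_cb_t and obd_pin_extent_cb live in the Lustre headers
 * and are assumed here from the call sites in this file):
 *
 *      // my_page_removal_cb(page, discard): drop @page from the client
 *      // caches; if @discard is set, throw the data away rather than
 *      // write it back.  Its return value is ignored at the call site.
 *      rc = cache_add_extent_removal_cb(cache, my_page_removal_cb, my_pin_cb);
 *      ...
 *      rc = cache_del_extent_removal_cb(cache, my_page_removal_cb);
 */

/* Detach @extent from its lock's extent list without taking any locks; the
   caller is expected to hold both l_extents_list_lock and oap_lock (see the
   call site in cache_remove_extents_from_lock()).  Returns nonzero if the
   extent still referenced a lock on entry. */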
static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it might in
           fact be NULL, due to parallel page eviction clearing it and then
           waiting on the lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Request that @extent be removed from the cache and from the locks it
   belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock - means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterate through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent; @data is also passed on each call.
   Stops iterating prematurely if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      // Lock disappeared
                return 0;
        /* Parallel page removal from mem pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}
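
/* Example callback (illustrative only; cache_iterate_extents_cb_t is defined
 * in lustre_cache.h and is assumed here to match the cb_func call above).
 * Note that the callback runs under l_extents_list_lock, so it must not
 * sleep:
 *
 *      static int count_extents_cb(struct lustre_cache *cache,
 *                                  struct lustre_handle *lockh,
 *                                  struct osc_async_page *extent, void *data)
 *      {
 *              (*(int *)data)++;
 *              return 0;       // nonzero stops the iteration early
 *      }
 *
 *      int count = 0;
 *      cache_iterate_extents(cache, lockh, count_extents_cb, &count);
 */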

static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                   there is a parallel page-removal process waiting to free
                   that page on l_extents_list_lock, and it holds the page
                   lock.  We need this page to go away completely, and for
                   that to happen we will just try to truncate it here too.
                   Serialisation on the page lock will achieve that goal for
                   us. */
                /* Try to add the extent back to the cache first, but only if
                 * we are cancelling a read lock; write locks cannot have
                 * other overlapping locks.  If adding is not possible (or we
                 * are cancelling a PW lock), then remove the extent from the
                 * cache */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting
                           the page with address 0x5a5a5a5a in
                           cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        cache->lc_pin_extent_cb(extent->oap_page);

                        if (lock->l_flags & LDLM_FL_BL_AST)
                                extent->oap_async_flags |= ASYNC_HP;
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)  // The lock was removed by somebody just now, nothing to do
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create a lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);
        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}
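
/* Lifecycle sketch (illustrative only): a cache is created once per
 * obd_device at setup time and torn down at cleanup, after all locks and
 * extents have been removed:
 *
 *      cache = cache_create(obd);
 *      if (cache == NULL)
 *              RETURN(-ENOMEM);
 *      ...
 *      rc = cache_destroy(cache);
 */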

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (cache) {
                spin_lock(&cache->lc_locks_list_lock);
                if (!list_empty(&cache->lc_locks_list)) {
                        struct ldlm_lock *lock, *tmp;
                        CERROR("still have locks in the list on cleanup:\n");

                        list_for_each_entry_safe(lock, tmp,
                                                 &cache->lc_locks_list,
                                                 l_cache_locks_list) {
                                list_del_init(&lock->l_cache_locks_list);
                                /* XXX: The natural idea would be to print the
                                   offending locks here, but if we use
                                   e.g. LDLM_ERROR, we will likely crash,
                                   as the LDLM error path tries to access e.g.
                                   a nonexistent namespace.  Normally this kind
                                   of situation can only happen when somebody
                                   did not release a lock reference, and we
                                   have other ways to detect that. */
                                /* Make sure there are no pages left under the
                                   lock */
                                LASSERT(list_empty(&lock->l_extents_list));
                        }
                }
                spin_unlock(&cache->lc_locks_list_lock);
                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
                OBD_FREE(cache, sizeof(*cache));
        }

        return 0;
}