/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/osc/cache.c
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

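/*
 * Overview: the entry points below maintain a cache of (object, lock,
 * extent) triples for the OSC.  cache_add_lock()/cache_remove_lock()
 * track which DLM locks are cached, cache_add_extent()/
 * cache_remove_extent() attach osc_async_page extents to the lock
 * covering them, and the extent-removal callbacks let upper layers drop
 * pages from any (vfs) caches when a lock goes away.
 */
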
/* Add @lock to the @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      // Lock disappeared under us.
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
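
/*
 * Illustrative call-site sketch (hypothetical; cl_cache as the cache
 * pointer is an assumption, not taken from this file).  Note that
 * cache_add_lock() takes and drops its own LDLM reference via
 * ldlm_handle2lock()/LDLM_LOCK_PUT(), so the caller's references on
 * @lockh are unaffected:
 *
 *      cache_add_lock(cli->cl_cache, lockh);
 */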

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by the resource and the
   data contained in @extent */
/* Should be called with oap->lock held (except on initial addition, see
   comment in osc_request.c) */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        int mode = 0;
        ENTRY;

        /* Don't add the same extent a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                if (lock->l_policy_data.l_extent.start > extent->oap_obj_off ||
                    extent->oap_obj_off + CFS_PAGE_SIZE - 1 >
                    lock->l_policy_data.l_extent.end) {
                         CDEBUG(D_CACHE, "Got wrong lock [" LPU64 "," LPU64 "] "
                                         "for page with offset " LPU64 "\n",
                                         lock->l_policy_data.l_extent.start,
                                         lock->l_policy_data.l_extent.end,
                                         extent->oap_obj_off);
                         LDLM_LOCK_PUT(lock);
                         RETURN(-ENOLCK);
                }
        } else {
                /* Real extent width calculation goes here once we have real
                 * extents
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page,
                 * there can be only one PW lock. If the page is clean,
                 * any PR lock is good
                 */
                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    // Race - lock disappeared under us (eviction?)
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }

                /* XXX Note! If the caller passed an unused lock handle,
                 * it expects us to return the lockh of the lock we matched.
                 * A reference (LCK_PR) on the lock is taken here to assure
                 * its validity, and the caller should drop the reference
                 * when it isn't used any more. */
                if (lockh && !lustre_handle_is_used(lockh)) {
                        ldlm_lock_addref(&tmplockh, LCK_PR);
                        lustre_handle_copy(lockh, &tmplockh);
                }
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LASSERTF(!(lock->l_flags & LDLM_FL_CANCEL), "Adding a page to an "
                 "already cancelled lock %p", lock);
        if (mode)
                ldlm_lock_decref(&tmplockh, mode);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
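
/*
 * Illustrative caller sketch (hypothetical): when no lock handle is known
 * in advance, pass an unused handle; per the XXX note above, the matched
 * lock is returned in it with an LCK_PR reference that the caller must
 * eventually drop:
 *
 *      struct lustre_handle lockh = { 0 };
 *      int rc;
 *
 *      rc = cache_add_extent(cache, &res, oap, &lockh);
 *      if (rc == 0 && lustre_handle_is_used(&lockh)) {
 *              ... use the extent under the lock ...
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      }
 */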

static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                /* Pin the element so it stays valid while we drop the
                 * rwlock around the callback */
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Registers a set of pin/remove callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we have the page locked, to pin it in memory so
   that it does not disappear after we release the page lock (which we need
   to do to avoid deadlocks).
   @func_cb is a removal callback that is called after the page and all
   spinlocks are released, and is supposed to clean the page and remove it
   from all (vfs) caches it might be in */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);
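
/*
 * Illustrative registration sketch.  my_removal_cb()/my_pin_cb() are
 * hypothetical; their argument lists are assumed from the invocations in
 * cache_extent_removal_event() above and cache_remove_extents_from_lock()
 * below, not from the obd typedefs:
 *
 *      static int my_removal_cb(void *page, int discard)
 *      {
 *              // clean the page and drop it from any (vfs) caches
 *              return 0;
 *      }
 *
 *      static void my_pin_cb(void *page)
 *      {
 *              // take an extra reference so the page cannot vanish
 *      }
 *
 *      rc = cache_add_extent_removal_cb(cache, my_removal_cb, my_pin_cb);
 */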

/* Unregister an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback
   since there can be only one */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;
        ENTRY;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        /* Drop the rwlock across the put and re-take it
                         * to keep scanning */
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this
                           function was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);
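
/*
 * Matching unregistration sketch: the return value is !found, so zero
 * means the callback was found and removed:
 *
 *      if (cache_del_extent_removal_cb(cache, my_removal_cb))
 *              CERROR("callback was never registered\n");
 */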

static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it might be
           NULL, in fact, due to parallel page eviction clearing it while
           waiting on a lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Request that @extent be removed from the cache and from the lock it
   belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock - means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterate through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent. @data is also passed to every
   call. Stops iterating prematurely if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      // Lock disappeared
                return 0;
        /* Parallel page removal from mem pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}
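
/*
 * Illustrative iterator sketch (hypothetical callback; the parameter
 * order mirrors the cb_func() invocation above).  The walk runs under
 * l_extents_list_lock, so the callback must not sleep; returning nonzero
 * stops the walk early:
 *
 *      static int count_extents_cb(struct lustre_cache *cache,
 *                                  struct lustre_handle *lockh,
 *                                  struct osc_async_page *extent,
 *                                  void *data)
 *      {
 *              (*(int *)data)++;
 *              return 0;
 *      }
 *
 *      int count = 0;
 *      cache_iterate_extents(cache, lockh, count_extents_cb, &count);
 */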

static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                if (unlikely(cfs_cond_resched_lock(&lock->l_extents_list_lock)))
                        continue;

                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                   a parallel page-removal process is waiting to free that
                   page on l_extents_list_lock while holding the page lock.
                   We need this page to go away completely, and to make that
                   happen we will just try to truncate it here too.
                   Serialisation on the page lock will achieve that goal
                   for us. */
                /* Try to add the extent back to the cache first, but only
                 * if we are cancelling a read lock; write locks cannot have
                 * other overlapping locks. If re-adding is not possible (or
                 * we are cancelling a PW lock), then remove the extent from
                 * the cache */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting
                           a page with address 0x5a5a5a5a in
                           cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        LASSERT(cache->lc_pin_extent_cb != NULL);
                        cache->lc_pin_extent_cb(extent->oap_page);

                        if (lock->l_flags & LDLM_FL_BL_AST)
                                extent->oap_async_flags |= ASYNC_HP;
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)  // The lock was removed by somebody just now, nothing to do
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}
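
/*
 * Illustrative call-site sketch (hypothetical; the blocking-AST path and
 * cl_cache are assumptions, not taken from this file): before a cached
 * lock is cancelled, its handle would be passed here so that no extents
 * remain attached to it:
 *
 *      struct lustre_handle lockh;
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      cache_remove_lock(cli->cl_cache, &lockh);
 */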

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create a lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);

        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}
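
/*
 * Illustrative setup/cleanup pairing (hypothetical call sites; cl_cache is
 * an assumption, not taken from this file):
 *
 *      cli->cl_cache = cache_create(obd);
 *      if (cli->cl_cache == NULL)
 *              RETURN(-ENOMEM);
 *      ...
 *      cache_destroy(cli->cl_cache);
 */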

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (!cache)
                RETURN(0);

        spin_lock(&cache->lc_locks_list_lock);
        if (!list_empty(&cache->lc_locks_list)) {
                struct ldlm_lock *lock, *tmp;
                CERROR("still have locks in the list on cleanup:\n");

                list_for_each_entry_safe(lock, tmp,
                                         &cache->lc_locks_list,
                                         l_cache_locks_list) {
                        list_del_init(&lock->l_cache_locks_list);
                        /* XXX: Of course the natural idea would be to print
                         * the offending locks here, but if we use
                         * e.g. LDLM_ERROR, we will likely crash here,
                         * as the LDLM error path tries to access e.g.
                         * a nonexisting namespace. Normally this kind of
                         * case could only happen when somebody did not
                         * release a lock reference and we have other ways
                         * to detect that. */
                        /* Make sure there are no pages left under the
                         * lock */
                        LASSERT(list_empty(&lock->l_extents_list));
                }
        }
        spin_unlock(&cache->lc_locks_list_lock);
        LASSERT(list_empty(&cache->lc_page_removal_callback_list));

        OBD_FREE(cache, sizeof(*cache));
        return 0;
}