add refcount protection for osc callbacks, to avoid panic on shutdown
lustre/osc/cache.c (fs/lustre-release.git)

/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Oleg Drokin <green@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Adding @lock to the @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      // Lock disappeared under us.
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by the resource and the
   data contained in @extent */
/* Should be called with oap->lock held (except on initial addition, see
   comment in osc_request.c) */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                LASSERTF(lock->l_policy_data.l_extent.start <=
                         extent->oap_obj_off &&
                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
                         lock->l_policy_data.l_extent.end,
                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
                         "offset " LPU64 "\n",
                         lock->l_policy_data.l_extent.start,
                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
        } else {
                int mode;
                /* Real extent width calculation will go here once we have
                 * real extents
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter: if this is a dirty page,
                 * there can be only one PW lock; if the page is clean,
                 * any PR lock is good
                 */
                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    // Race - the lock disappeared under us (eviction?)
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
                ldlm_lock_decref(&tmplockh, mode);
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}
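
/*
 * Example (an illustrative sketch only; the caller context and the @res_id
 * variable are assumptions, not taken from this file).  With oap->oap_lock
 * held, a caller such as osc_request.c can either attach a page to the lock
 * whose handle it already holds, or let the cache match a covering granted
 * lock by passing a NULL handle:
 *
 *	rc = cache_add_extent(cache, &res_id, oap,
 *			      lustre_handle_is_used(lockh) ? lockh : NULL);
 *	if (rc == -ENOLCK)
 *		rc = 0;		(no covering lock: the page is simply not cached)
 */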

static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

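/*
 * The get/put pair above is the refcount protection for the removal
 * callbacks: cache_extent_removal_event() below has to drop
 * lc_page_removal_cb_lock before invoking a callback (the callback does page
 * cleanup that cannot run under this lock), so it pins the current element
 * first.  A concurrent cache_del_extent_removal_cb() may then unlink the
 * element from the list, but the element itself cannot be freed until the
 * last reference is put, so the callback never runs on freed memory during
 * shutdown.
 */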
static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Registers a set of pin/removal callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we hold the page locked, to pin the page in memory
   so that it does not disappear after we release the page lock (which we
   need to do to avoid deadlocks).
   @func_cb is the removal callback that is called after the page and all
   spinlocks are released, and is supposed to clean up the page and remove it
   from all (vfs) caches it might be in */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);

/* Unregisters an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback,
   since there can be only one */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this function
                           was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);
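
/*
 * Example of registering and unregistering a callback pair (an illustrative
 * sketch; my_pin_cb() and my_page_removal_cb() are hypothetical client
 * callbacks, e.g. something llite-like):
 *
 *	static void my_pin_cb(void *page)
 *	{
 *		take a reference so @page survives the page-lock release
 *	}
 *
 *	static void my_page_removal_cb(void *page, int discard)
 *	{
 *		flush (or drop, if @discard) @page, then release the pin
 *	}
 *
 *	rc = cache_add_extent_removal_cb(cache, my_page_removal_cb, my_pin_cb);
 *	...
 *	rc = cache_del_extent_removal_cb(cache, my_page_removal_cb);
 *	(a nonzero return here means the callback was never registered)
 */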

static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but in fact it
           might be NULL due to parallel page eviction clearing it while
           waiting on a lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Requests the @extent to be removed from the cache and the locks it belongs
   to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterates through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent and passing @data to every call.
   Stops iterating prematurely if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      // Lock disappeared
                return 0;
        /* Parallel page removal from memory pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}
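
/*
 * Example of an iteration callback (illustrative only; count_extents_cb() is
 * hypothetical).  Note that @cb_func runs under l_extents_list_lock, so it
 * must not sleep:
 *
 *	static int count_extents_cb(struct lustre_cache *cache,
 *				    struct lustre_handle *lockh,
 *				    struct osc_async_page *extent, void *data)
 *	{
 *		(*(int *)data)++;
 *		return 0;	(returning nonzero would stop the walk)
 *	}
 *
 *	int count = 0;
 *	cache_iterate_extents(cache, lockh, count_extents_cb, &count);
 */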

static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                   a parallel page-removal process is waiting to free that
                   page on l_extents_list_lock while holding the page lock.
                   We need this page to go away completely, and for that to
                   happen we will just try to truncate it here too.
                   Serialisation on the page lock will achieve that goal for
                   us. */
                /* Try to add the extent back to the cache first, but only if
                 * we are cancelling a read lock; write locks cannot have
                 * other overlapping locks. If re-adding is not possible (or
                 * we are cancelling a PW lock), then remove the extent from
                 * the cache */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting the
                           page with address 0x5a5a5a5a in
                           cache_extent_removal_event */
                        ext_data = extent->oap_page;
                        cache->lc_pin_extent_cb(extent->oap_page);
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)  // The lock was removed by somebody just now, nothing to do
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);
        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}

/* Destroys @cache and frees its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (cache) {
                spin_lock(&cache->lc_locks_list_lock);
                if (!list_empty(&cache->lc_locks_list)) {
                        struct ldlm_lock *lock, *tmp;
                        CERROR("still have locks in the list on cleanup:\n");

                        list_for_each_entry_safe(lock, tmp,
                                                 &cache->lc_locks_list,
                                                 l_cache_locks_list) {
                                list_del_init(&lock->l_cache_locks_list);
                                /* XXX: Of course the natural idea would be to
                                   print the offending locks here, but if we
                                   use e.g. LDLM_ERROR, we will likely crash,
                                   as the LDLM error path tries to access e.g.
                                   a nonexistent namespace. Normally this kind
                                   of situation can only happen when somebody
                                   did not release a lock reference, and we
                                   have other ways to detect that. */
                                /* Make sure there are no pages left under the
                                   lock */
                                LASSERT(list_empty(&lock->l_extents_list));
                        }
                }
                spin_unlock(&cache->lc_locks_list_lock);
                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
                OBD_FREE(cache, sizeof(*cache));
        }

        return 0;
}
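
/*
 * Example of the overall life cycle (an illustrative sketch; the surrounding
 * obd setup/cleanup code is elided):
 *
 *	struct lustre_cache *cache = cache_create(obd);
 *
 *	if (cache == NULL)
 *		return -ENOMEM;
 *	... cache_add_extent_removal_cb(), then cache_add_lock() and
 *	    cache_add_extent() as locks are granted and pages are cached,
 *	    cache_remove_lock() as locks are cancelled ...
 *	cache_destroy(cache);	(expects all locks and extents removed by now)
 */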