/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Oleg Drokin <green@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * Cache of triples - object, lock, extent
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/list.h>
#else                           /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_cache.h>
#include <obd.h>
#include <lustre_debug.h>

#include "osc_internal.h"

/* Add @lock to @cache */
int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (!lock)      /* Lock disappeared under us. */
                return 0;

        spin_lock(&cache->lc_locks_list_lock);
        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Tries to add @extent to the lock represented by @lockh if non-NULL,
   otherwise just tries to match some suitable lock by the resource and the
   data contained in @extent. */
/* Should be called with oap->lock held (except on initial addition; see the
   comment in osc_request.c). */
int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                     struct osc_async_page *extent, struct lustre_handle *lockh)
{
        struct lustre_handle tmplockh;
        ldlm_policy_data_t tmpex;
        struct ldlm_lock *lock = NULL;
        ENTRY;

        /* Don't add anything a second time */
        if (!list_empty(&extent->oap_page_list)) {
                LBUG();
                RETURN(0);
        }

        if (lockh && lustre_handle_is_used(lockh)) {
                lock = ldlm_handle2lock(lockh);
                if (!lock)
                        RETURN(-ENOLCK);

                LASSERTF(lock->l_policy_data.l_extent.start <=
                         extent->oap_obj_off &&
                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
                         lock->l_policy_data.l_extent.end,
                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
                         "offset " LPU64 "\n",
                         lock->l_policy_data.l_extent.start,
                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
        } else {
                int mode;
                /* Real extent width calculation will go here once we have
                 * real (multi-page) extents.
                 */
                tmpex.l_extent.start = extent->oap_obj_off;
                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;

                /* XXX find lock from extent or something like that */
                /* The lock mode does not matter. If this is a dirty page,
                 * there can be only one PW lock. If the page is clean,
                 * any PR lock is good.
                 */

                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);

                if (mode <= 0) {
                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
                               " extent to!\n", tmpex.l_extent.start,
                               tmpex.l_extent.end);
                        RETURN((mode < 0) ? mode : -ENOLCK);
                }

                lock = ldlm_handle2lock(&tmplockh);
                if (!lock) {    /* Race - lock disappeared under us (eviction?) */
                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
                               "under us\n");
                        RETURN(-ENOLCK);
                }
                ldlm_lock_decref(&tmplockh, mode);
        }

        spin_lock(&lock->l_extents_list_lock);
        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
        spin_unlock(&lock->l_extents_list_lock);
        extent->oap_ldlm_lock = lock;
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}

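/*
 * Illustrative sketch, not part of the original file: how a caller might
 * attach an osc_async_page to the cache. The enclosing function and the way
 * @oap and @res are obtained are hypothetical; only the cache_add_extent()
 * call itself comes from this file.
 */
#if 0
static int example_cache_page(struct lustre_cache *cache,
                              struct ldlm_res_id *res,
                              struct osc_async_page *oap,
                              struct lustre_handle *lockh)
{
        int rc;

        /* With a used @lockh the extent is attached to that lock directly;
         * with a NULL or unused handle, a matching PR or PW lock is looked
         * up by resource and page offset instead. */
        rc = cache_add_extent(cache, res, oap, lockh);
        if (rc == -ENOLCK)
                CDEBUG(D_CACHE, "no covering lock, page stays uncached\n");
        return rc;
}
#endif
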
static void cache_extent_removal_get(struct page_removal_cb_element *element)
{
        atomic_inc(&element->prce_refcnt);
}

static void cache_extent_removal_put(struct page_removal_cb_element *element)
{
        if (atomic_dec_and_test(&element->prce_refcnt))
                OBD_FREE_PTR(element);
}

static int cache_extent_removal_event(struct lustre_cache *cache,
                                      void *data, int discard)
{
        struct page *page = data;
        struct list_head *iter;
        struct page_removal_cb_element *element;

        read_lock(&cache->lc_page_removal_cb_lock);
        iter = cache->lc_page_removal_callback_list.next;
        while (iter != &cache->lc_page_removal_callback_list) {
                element = list_entry(iter, struct page_removal_cb_element,
                                     prce_list);
                /* Take a reference on the element so it cannot be freed
                 * while we invoke the callback with the read lock dropped. */
                cache_extent_removal_get(element);
                read_unlock(&cache->lc_page_removal_cb_lock);

                element->prce_callback(page, discard);

                read_lock(&cache->lc_page_removal_cb_lock);
                iter = iter->next;
                cache_extent_removal_put(element);
        }
        read_unlock(&cache->lc_page_removal_cb_lock);

        return 0;
}

/* Register a set of pin/removal callbacks for extents. The current
   limitation is that there can be only one pin_cb per cache.
   @pin_cb is called while we hold the page locked, to pin the page in memory
   so that it does not disappear after we release the page lock (which we
   need to do to avoid deadlocks).
   @func_cb is the removal callback; it is called after the page and all
   spinlocks are released, and is supposed to clean the page and remove it
   from all (vfs) caches it might be in. */
int cache_add_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb,
                                obd_pin_extent_cb pin_cb)
{
        struct page_removal_cb_element *element;

        if (!func_cb)
                return 0;

        OBD_ALLOC_PTR(element);
        if (!element)
                return -ENOMEM;
        element->prce_callback = func_cb;
        atomic_set(&element->prce_refcnt, 1);

        write_lock(&cache->lc_page_removal_cb_lock);
        list_add_tail(&element->prce_list,
                      &cache->lc_page_removal_callback_list);
        write_unlock(&cache->lc_page_removal_cb_lock);

        cache->lc_pin_extent_cb = pin_cb;
        return 0;
}
EXPORT_SYMBOL(cache_add_extent_removal_cb);

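/*
 * Illustrative sketch, not part of the original file: registering and later
 * unregistering a page-removal/pin callback pair. The callback names and
 * bodies are hypothetical, and the signatures are assumptions inferred from
 * the prce_callback(page, discard) and lc_pin_extent_cb(page) call sites in
 * this file; the authoritative typedefs live in the obd headers.
 */
#if 0
static int example_removal_cb(void *page, int discard)
{
        /* Clean @page and drop it from any (vfs) caches; honour @discard. */
        return 0;
}

static void example_pin_cb(void *page)
{
        /* Take an extra reference so @page survives until the removal
         * callback runs. */
}

static int example_register(struct lustre_cache *cache)
{
        int rc;

        rc = cache_add_extent_removal_cb(cache, example_removal_cb,
                                         example_pin_cb);
        if (rc)
                return rc;
        /* ... use the cache ... */
        return cache_del_extent_removal_cb(cache, example_removal_cb);
}
#endif
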
/* Unregister an extent removal callback registered earlier. If the list of
   registered removal callbacks becomes empty, we also clear the pin callback,
   since there can be only one. */
int cache_del_extent_removal_cb(struct lustre_cache *cache,
                                obd_page_removal_cb_t func_cb)
{
        int found = 0;
        struct page_removal_cb_element *element, *t;

        write_lock(&cache->lc_page_removal_cb_lock);
        list_for_each_entry_safe(element, t,
                                 &cache->lc_page_removal_callback_list,
                                 prce_list) {
                if (element->prce_callback == func_cb) {
                        list_del(&element->prce_list);
                        write_unlock(&cache->lc_page_removal_cb_lock);
                        found = 1;
                        cache_extent_removal_put(element);
                        write_lock(&cache->lc_page_removal_cb_lock);
                        /* We continue iterating the list in case this function
                           was registered more than once */
                }
        }
        write_unlock(&cache->lc_page_removal_cb_lock);

        if (list_empty(&cache->lc_page_removal_callback_list))
                cache->lc_pin_extent_cb = NULL;

        return !found;
}
EXPORT_SYMBOL(cache_del_extent_removal_cb);

static int cache_remove_extent_nolock(struct lustre_cache *cache,
                                      struct osc_async_page *extent)
{
        int have_lock = !!extent->oap_ldlm_lock;
        /* We used to check oap_ldlm_lock for non-NULL here, but it might in
           fact be NULL, due to a parallel page eviction clearing it and then
           waiting on the lock's page list lock */
        extent->oap_ldlm_lock = NULL;

        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);

        return have_lock;
}

/* Request that @extent be removed from the cache and from the lock it
   belongs to. */
void cache_remove_extent(struct lustre_cache *cache,
                         struct osc_async_page *extent)
{
        struct ldlm_lock *lock;

        spin_lock(&extent->oap_lock);
        lock = extent->oap_ldlm_lock;

        extent->oap_ldlm_lock = NULL;
        spin_unlock(&extent->oap_lock);

        /* No lock means this extent is not in any list */
        if (!lock)
                return;

        spin_lock(&lock->l_extents_list_lock);
        if (!list_empty(&extent->oap_page_list))
                list_del_init(&extent->oap_page_list);
        spin_unlock(&lock->l_extents_list_lock);
}

/* Iterate through the list of extents in the lock identified by @lockh,
   calling @cb_func for every such extent; @data is passed to every call.
   Stops iterating prematurely if @cb_func returns nonzero. */
int cache_iterate_extents(struct lustre_cache *cache,
                          struct lustre_handle *lockh,
                          cache_iterate_extents_cb_t cb_func, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        struct osc_async_page *extent, *t;

        if (!lock)      /* Lock disappeared */
                return 0;
        /* Parallel page removal from mem pressure can race with us */
        spin_lock(&lock->l_extents_list_lock);
        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
                                 oap_page_list) {
                if (cb_func(cache, lockh, extent, data))
                        break;
        }
        spin_unlock(&lock->l_extents_list_lock);
        LDLM_LOCK_PUT(lock);

        return 0;
}

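/*
 * Illustrative sketch, not part of the original file: a callback suitable
 * for cache_iterate_extents(). The name and counting logic are hypothetical;
 * the parameter list mirrors the cb_func(cache, lockh, extent, data) call
 * above, and returning nonzero would stop the iteration early.
 */
#if 0
static int example_count_extents_cb(struct lustre_cache *cache,
                                    struct lustre_handle *lockh,
                                    struct osc_async_page *extent,
                                    void *data)
{
        int *count = data;

        (*count)++;
        return 0;       /* keep iterating */
}

/* Usage:
 *      int n = 0;
 *      cache_iterate_extents(cache, lockh, example_count_extents_cb, &n);
 */
#endif
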
static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                                          struct ldlm_lock *lock, void *data)
{
        struct osc_async_page *extent;
        void *ext_data;

        LASSERT(lock);

        spin_lock(&lock->l_extents_list_lock);
        while (!list_empty(&lock->l_extents_list)) {
                extent = list_entry(lock->l_extents_list.next,
                                    struct osc_async_page, oap_page_list);

                spin_lock(&extent->oap_lock);
                /* If there is no lock referenced from this oap, it means
                   there is a parallel page-removal process waiting to free
                   that page on l_extents_list_lock, and it holds the page
                   lock. We need this page to completely go away, and for
                   that to happen we will just try to truncate it here too.
                   Serialisation on the page lock will achieve that goal
                   for us. */
                /* Try to add the extent back to the cache first, but only if
                 * we are cancelling a read lock; write locks cannot have
                 * other overlapping locks. If adding is not possible (or we
                 * are cancelling a PW lock), then remove the extent from the
                 * cache. */
                if (!cache_remove_extent_nolock(cache, extent) ||
                    (lock->l_granted_mode == LCK_PW) ||
                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
                                     NULL)) {
                        /* We need to remember this oap_page value now;
                           once we release the spinlocks, the extent struct
                           might be freed and we would end up requesting
                           the page with address 0x5a5a5a5a in
                           cache_extent_removal_event. */
                        ext_data = extent->oap_page;
                        cache->lc_pin_extent_cb(extent->oap_page);
                        spin_unlock(&extent->oap_lock);
                        spin_unlock(&lock->l_extents_list_lock);
                        cache_extent_removal_event(cache, ext_data,
                                                   lock->l_flags &
                                                   LDLM_FL_DISCARD_DATA);
                        spin_lock(&lock->l_extents_list_lock);
                } else {
                        spin_unlock(&extent->oap_lock);
                }
        }
        spin_unlock(&lock->l_extents_list_lock);

        return 0;
}

/* Removes @lock from the cache after the necessary checks. */
int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* The lock was removed by somebody just now, nothing to do. */
        if (!lock)
                return 0;

        cache_remove_extents_from_lock(cache, lock, NULL /* data */);

        spin_lock(&cache->lc_locks_list_lock);
        list_del_init(&lock->l_cache_locks_list);
        spin_unlock(&cache->lc_locks_list_lock);

        LDLM_LOCK_PUT(lock);

        return 0;
}

/* Supposed to iterate through all locks in the cache for a given resource.
   Not implemented at the moment. */
int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
                        cache_iterate_locks_cb_t cb_fun, void *data)
{
        return -ENOTSUPP;
}

/* Create a lustre cache and attach it to @obd */
struct lustre_cache *cache_create(struct obd_device *obd)
{
        struct lustre_cache *cache;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                GOTO(out, NULL);
        spin_lock_init(&cache->lc_locks_list_lock);
        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
        rwlock_init(&cache->lc_page_removal_cb_lock);
        cache->lc_obd = obd;

out:
        return cache;
}

/* Destroy @cache and free its memory */
int cache_destroy(struct lustre_cache *cache)
{
        if (cache) {
                spin_lock(&cache->lc_locks_list_lock);
                if (!list_empty(&cache->lc_locks_list)) {
                        struct ldlm_lock *lock, *tmp;
                        CERROR("still have locks in the list on cleanup:\n");

                        list_for_each_entry_safe(lock, tmp,
                                                 &cache->lc_locks_list,
                                                 l_cache_locks_list) {
                                list_del_init(&lock->l_cache_locks_list);
                                /* XXX: Of course the natural idea would be to
                                   print the offending locks here, but if we
                                   use e.g. LDLM_ERROR, we will likely crash,
                                   as the LDLM error path tries to access e.g.
                                   a nonexisting namespace. Normally this kind
                                   of case can only happen when somebody did
                                   not release a lock reference, and we have
                                   other ways to detect that. */
                                /* Make sure there are no pages left under the
                                   lock */
                                LASSERT(list_empty(&lock->l_extents_list));
                        }
                }
                spin_unlock(&cache->lc_locks_list_lock);
                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
                OBD_FREE(cache, sizeof(*cache));
        }

        return 0;
}
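
/*
 * Illustrative sketch, not part of the original file: the expected lifetime
 * of a lustre_cache, pieced together from the functions above. The setup and
 * cleanup function names are hypothetical.
 */
#if 0
static int example_setup(struct obd_device *obd)
{
        struct lustre_cache *cache = cache_create(obd);

        if (!cache)
                return -ENOMEM;
        /* ... cache_add_lock()/cache_add_extent() as locks are granted and
         * pages are cached; cache_remove_lock() before a lock goes away ... */
        return 0;
}

static int example_cleanup(struct lustre_cache *cache)
{
        /* All locks must have been removed and all removal callbacks
         * unregistered by now, or cache_destroy() will complain. */
        return cache_destroy(cache);
}
#endif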