Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / cache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Oleg Drokin <green@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  * Cache of triples - object, lock, extent
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_OSC
32
33 #ifdef __KERNEL__
34 # include <linux/version.h>
35 # include <linux/module.h>
36 # include <linux/list.h>
37 #else                           /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <lustre_dlm.h>
42 #include <lustre_cache.h>
43 #include <obd.h>
44 #include <lustre_debug.h>
45
46 #include "osc_internal.h"
47
48 /* Adding @lock to the @cache */
49 int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
50 {
51         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
52
53         if (!lock)      // Lock disappeared under us.
54                 return 0;
55
56         spin_lock(&cache->lc_locks_list_lock);
57         list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
58         spin_unlock(&cache->lc_locks_list_lock);
59
60         LDLM_LOCK_PUT(lock);
61
62         return 0;
63 }
64
65 /* Tries to add @extent to lock represented by @lockh if non-NULL, otherwise
66    just tries to match some suitable lock by resource and data contained in
67    @extent */
68 /* Should be called with oap->lock held (except on initial addition, see
69    comment in osc_request.c*/
70 int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
71                      struct osc_async_page *extent, struct lustre_handle *lockh)
72 {
73         struct lustre_handle tmplockh;
74         ldlm_policy_data_t tmpex;
75         struct ldlm_lock *lock = NULL;
76         ENTRY;
77
78         /* Don't add anything second time */
79         if (!list_empty(&extent->oap_page_list)) {
80                 LBUG();
81                 RETURN(0);
82         }
83
84         if (lockh && lustre_handle_is_used(lockh)) {
85                 lock = ldlm_handle2lock(lockh);
86                 if (!lock)
87                         RETURN(-ENOLCK);
88
89                 LASSERTF(lock->l_policy_data.l_extent.start <=
90                          extent->oap_obj_off &&
91                          extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
92                          lock->l_policy_data.l_extent.end,
93                          "Got wrong lock [" LPU64 "," LPU64 "] for page with "
94                          "offset " LPU64 "\n",
95                          lock->l_policy_data.l_extent.start,
96                          lock->l_policy_data.l_extent.end, extent->oap_obj_off);
97         } else {
98                 int mode;
99                 /* Real extent width calculation here once we have real
100                  * extents
101                  */
102                 tmpex.l_extent.start = extent->oap_obj_off;
103                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
104
105                 /* XXX find lock from extent or something like that */
106                 /* The lock mode does not matter. If this is dirty page - then
107                  * there could be only one PW lock. If the page is clean,
108                  * any PR lock is good
109                  */
110
111                 mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
112                                        LDLM_FL_BLOCK_GRANTED |
113                                        LDLM_FL_CBPENDING, res, LDLM_EXTENT,
114                                        &tmpex, LCK_PW | LCK_PR, &tmplockh);
115
116                 if (mode <= 0) {
117                         CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
118                                " extent to!\n", tmpex.l_extent.start,
119                                tmpex.l_extent.end);
120                         RETURN((mode < 0) ? mode : -ENOLCK);
121                 }
122
123                 lock = ldlm_handle2lock(&tmplockh);
124                 if (!lock) {    // Race - lock disappeared under us (eviction?)
125                         CDEBUG(D_CACHE, "Newly matched lock just disappeared "
126                                "under us\n");
127                         RETURN(-ENOLCK);
128                 }
129                 ldlm_lock_decref(&tmplockh, mode);
130         }
131
132         spin_lock(&lock->l_extents_list_lock);
133         list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
134         spin_unlock(&lock->l_extents_list_lock);
135         extent->oap_ldlm_lock = lock;
136         LDLM_LOCK_PUT(lock);
137
138         RETURN(0);
139 }
140
141 static int cache_extent_removal_event(struct lustre_cache *cache,
142                                       void *data, int discard)
143 {
144         struct page *page = data;
145         struct page_removal_cb_element *element;
146
147         list_for_each_entry(element, &cache->lc_page_removal_callback_list,
148                             prce_list) {
149                 element->prce_callback(page, discard);
150         }
151         return 0;
152 }
153
154 /* Registers set of pin/remove callbacks for extents. Current limitation is
155    there could be only one pin_cb per cache.
156    @pin_cb is called when we have the page locked to pin it in memory so that
157    it does not disappear after we release page lock (which we need to do
158    to avoid deadlocks).
159    @func_cb is removal callback that is called after page and all spinlocks are
160    released, and is supposed to clean the page and remove it from all
161    (vfs) caches it might be in */
162 int cache_add_extent_removal_cb(struct lustre_cache *cache,
163                                 obd_page_removal_cb_t func_cb,
164                                 obd_pin_extent_cb pin_cb)
165 {
166         struct page_removal_cb_element *element;
167
168         if (!func_cb)
169                 return 0;
170         OBD_ALLOC(element, sizeof(*element));
171         if (!element)
172                 return -ENOMEM;
173         element->prce_callback = func_cb;
174         list_add_tail(&element->prce_list,
175                       &cache->lc_page_removal_callback_list);
176
177         cache->lc_pin_extent_cb = pin_cb;
178         return 0;
179 }
180 EXPORT_SYMBOL(cache_add_extent_removal_cb);
181
182 /* Unregister exntent removal callback registered earlier. If the list of
183    registered removal callbacks becomes empty, we also clear pin callback
184    since it could only be one */
185 int cache_del_extent_removal_cb(struct lustre_cache *cache,
186                                 obd_page_removal_cb_t func_cb)
187 {
188         int found = 0;
189         struct page_removal_cb_element *element, *t;
190
191         list_for_each_entry_safe(element, t,
192                                  &cache->lc_page_removal_callback_list,
193                                  prce_list) {
194                 if (element->prce_callback == func_cb) {
195                         list_del(&element->prce_list);
196                         OBD_FREE(element, sizeof(*element));
197                         found = 1;
198                         /* We continue iterating the list in case this function
199                            was registered more than once */
200                 }
201         }
202
203         if (list_empty(&cache->lc_page_removal_callback_list))
204                 cache->lc_pin_extent_cb = NULL;
205
206         return !found;
207 }
208 EXPORT_SYMBOL(cache_del_extent_removal_cb);
209
210 static int cache_remove_extent_nolock(struct lustre_cache *cache,
211                                       struct osc_async_page *extent)
212 {
213         int have_lock = !!extent->oap_ldlm_lock;
214         /* We used to check oap_ldlm_lock for non NULL here, but it might be
215            NULL, in fact, due to parallel page eviction clearing it and waiting
216            on a lock's page list lock */
217         extent->oap_ldlm_lock = NULL;
218
219         if (!list_empty(&extent->oap_page_list))
220                 list_del_init(&extent->oap_page_list);
221
222         return have_lock;
223 }
224
225 /* Request the @extent to be removed from cache and locks it belongs to. */
226 void cache_remove_extent(struct lustre_cache *cache,
227                          struct osc_async_page *extent)
228 {
229         struct ldlm_lock *lock;
230
231         spin_lock(&extent->oap_lock);
232         lock = extent->oap_ldlm_lock;
233
234         extent->oap_ldlm_lock = NULL;
235         spin_unlock(&extent->oap_lock);
236
237         /* No lock - means this extent is not in any list */
238         if (!lock)
239                 return;
240
241         spin_lock(&lock->l_extents_list_lock);
242         if (!list_empty(&extent->oap_page_list))
243                 list_del_init(&extent->oap_page_list);
244         spin_unlock(&lock->l_extents_list_lock);
245 }
246
247 /* iterate through list of extents in given lock identified by @lockh,
248    calling @cb_func for every such extent. also passed @data to every call.
249    stops iterating prematurely if @cb_func returns nonzero. */
250 int cache_iterate_extents(struct lustre_cache *cache,
251                           struct lustre_handle *lockh,
252                           cache_iterate_extents_cb_t cb_func, void *data)
253 {
254         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
255         struct osc_async_page *extent, *t;
256
257         if (!lock)      // Lock disappeared
258                 return 0;
259         /* Parallel page removal from mem pressure can race with us */
260         spin_lock(&lock->l_extents_list_lock);
261         list_for_each_entry_safe(extent, t, &lock->l_extents_list,
262                                  oap_page_list) {
263                 if (cb_func(cache, lockh, extent, data))
264                         break;
265         }
266         spin_unlock(&lock->l_extents_list_lock);
267         LDLM_LOCK_PUT(lock);
268
269         return 0;
270 }
271
272 static int cache_remove_extents_from_lock(struct lustre_cache *cache,
273                                           struct ldlm_lock *lock, void *data)
274 {
275         struct osc_async_page *extent;
276         void *ext_data;
277
278         LASSERT(lock);
279
280         spin_lock(&lock->l_extents_list_lock);
281         while (!list_empty(&lock->l_extents_list)) {
282                 extent = list_entry(lock->l_extents_list.next,
283                                     struct osc_async_page, oap_page_list);
284
285                 spin_lock(&extent->oap_lock);
286                 /* If there is no lock referenced from this oap, it means
287                    there is parallel page-removal process waiting to free that
288                    page on l_extents_list_lock and it holds page lock.
289                    We need this page to completely go away and for that to
290                    happen we will just try to truncate it here too.
291                    Serialisation on page lock will achieve that goal for us. */
292                 /* Try to add extent back to the cache first, but only if we
293                  * cancel read lock, write locks cannot have other overlapping
294                  * locks. If adding is not possible (or canceling pw lock),
295                  * then remove extent from cache */
296                 if (!cache_remove_extent_nolock(cache, extent) ||
297                     (lock->l_granted_mode == LCK_PW) ||
298                     cache_add_extent(cache, &lock->l_resource->lr_name, extent,
299                                      NULL)) {
300                         /* We need to remember this oap_page value now,
301                            once we release spinlocks, extent struct
302                            might be freed and we endup requesting
303                            page with address 0x5a5a5a5a in
304                            cache_extent_removal_event */
305                         ext_data = extent->oap_page;
306                         cache->lc_pin_extent_cb(extent->oap_page);
307                         spin_unlock(&extent->oap_lock);
308                         spin_unlock(&lock->l_extents_list_lock);
309                         cache_extent_removal_event(cache, ext_data,
310                                                    lock->
311                                                    l_flags &
312                                                    LDLM_FL_DISCARD_DATA);
313                         spin_lock(&lock->l_extents_list_lock);
314                 } else {
315                         spin_unlock(&extent->oap_lock);
316                 }
317         }
318         spin_unlock(&lock->l_extents_list_lock);
319
320         return 0;
321 }
322
323 /* Remoes @lock from cache after necessary checks. */
324 int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
325 {
326         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
327
328         if (!lock)  // The lock was removed by somebody just now, nothing to do
329                 return 0;
330
331         cache_remove_extents_from_lock(cache, lock, NULL /*data */ );
332
333         spin_lock(&cache->lc_locks_list_lock);
334         list_del_init(&lock->l_cache_locks_list);
335         spin_unlock(&cache->lc_locks_list_lock);
336
337         LDLM_LOCK_PUT(lock);
338
339         return 0;
340 }
341
342 /* Supposed to iterate through all locks in the cache for given resource.
343    Not implemented atthe moment. */
344 int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
345                         cache_iterate_locks_cb_t cb_fun, void *data)
346 {
347         return -ENOTSUPP;
348 }
349
350 /* Create lustre cache and attach it to @obd */
351 struct lustre_cache *cache_create(struct obd_device *obd)
352 {
353         struct lustre_cache *cache;
354
355         OBD_ALLOC(cache, sizeof(*cache));
356         if (!cache)
357                 GOTO(out, NULL);
358         spin_lock_init(&cache->lc_locks_list_lock);
359         CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
360         CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
361         cache->lc_obd = obd;
362
363       out:
364         return cache;
365 }
366
367 /* Destroy @cache and free its memory */
368 int cache_destroy(struct lustre_cache *cache)
369 {
370         if (cache) {
371                 spin_lock(&cache->lc_locks_list_lock);
372                 if (!list_empty(&cache->lc_locks_list)) {
373                         struct ldlm_lock *lock, *tmp;
374                         CERROR("still have locks in the list on cleanup:\n");
375
376                         list_for_each_entry_safe(lock, tmp,
377                                                  &cache->lc_locks_list,
378                                                  l_cache_locks_list) {
379                                 list_del_init(&lock->l_cache_locks_list);
380                                 /* XXX: Of course natural idea would be to print
381                                    offending locks here, but if we use
382                                    e.g. LDLM_ERROR, we will likely crash here,
383                                    as LDLM error tries to access e.g.
384                                    nonexisting namespace. Normally this kind of
385                                    case could only happen when somebody did not
386                                    release lock reference and we have other ways
387                                    to detect this. */
388                                 /* Make sure there are no pages left under the
389                                    lock */
390                                 LASSERT(list_empty(&lock->l_extents_list));
391                         }
392                 }
393                 spin_unlock(&cache->lc_locks_list_lock);
394                 LASSERT(list_empty(&cache->lc_page_removal_callback_list));
395                 OBD_FREE(cache, sizeof(*cache));
396         }
397
398         return 0;
399 }