/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_LOV
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/init.h>

#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_idl.h>
#include <linux/obd_class.h>
#include <linux/lustre_ucache.h>

/* FIXME
 * The current ucache implementation was taken from the group hash code
 * almost without change.  It is very simple, has very limited
 * functionality, and is probably only suitable for the group hash
 * use case.
 */
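
/*
 * Illustrative setup sketch (not part of this file; the identifiers
 * below are hypothetical): the cache owner supplies the geometry and
 * the callbacks that this code invokes.  Hash table, lock and
 * upcall-path initialization are omitted here.
 *
 *      static struct upcall_cache group_hash = {
 *              .uc_name             = "group_hash",
 *              .uc_hashsize         = 128,
 *              .uc_entry_expire     = 5 * 60,  (seconds a good entry lives)
 *              .uc_err_entry_expire = 30,      (seconds an error result lives)
 *              .uc_acquire_expire   = 15,      (seconds to wait for an upcall)
 *              .hash                = gh_hash_cb,
 *              .alloc_entry         = gh_alloc_cb,
 *              .free_entry          = gh_free_cb,
 *              .make_upcall         = gh_upcall_cb,
 *              .parse_downcall      = gh_downcall_cb,
 *      };
 */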

void upcall_cache_init_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry,
                             __u64 key)
{
        UC_CACHE_SET_NEW(entry);
        INIT_LIST_HEAD(&entry->ue_hash);
        atomic_set(&entry->ue_refcount, 0);
        entry->ue_key = key;
        entry->ue_cache = cache;
        init_waitqueue_head(&entry->ue_waitq);
}
EXPORT_SYMBOL(upcall_cache_init_entry);

static inline struct upcall_cache_entry *
alloc_entry(struct upcall_cache *cache, __u64 key)
{
        LASSERT(cache->alloc_entry);
        return cache->alloc_entry(cache, key);
}

static void free_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        LASSERT(cache);
        LASSERT(cache->free_entry);
        LASSERT(atomic_read(&entry->ue_refcount) == 0);

        CDEBUG(D_OTHER, "destroy %s entry %p for key "LPU64"\n",
               cache->uc_name, entry, entry->ue_key);

        list_del(&entry->ue_hash);
        cache->free_entry(cache, entry);
}

static inline void get_entry(struct upcall_cache_entry *entry)
{
        atomic_inc(&entry->ue_refcount);
}

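/* Drop a reference.  An entry whose last reference goes away while it
 * is not VALID (i.e. it is expired or invalid) is freed on the spot;
 * valid entries stay cached until they expire.  Called with uc_hashlock
 * held for writing. */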
static inline void put_entry(struct upcall_cache_entry *entry)
{
        if (atomic_dec_and_test(&entry->ue_refcount) &&
            !UC_CACHE_IS_VALID(entry)) {
                free_entry(entry);
        }
}

static inline int refresh_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        LASSERT(cache);
        LASSERT(cache->make_upcall);

        return cache->make_upcall(cache, entry);
}

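/* Return 0 if the entry is still usable: valid and unexpired, or with
 * an upcall still in flight inside its acquire window.  Otherwise mark
 * it expired, unlink it from its hash chain, free it if nobody holds a
 * reference, and return 1.  A timed-out ACQUIRING entry also has its
 * waiters woken.  Called with uc_hashlock held for writing. */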
static int check_unlink_entry(struct upcall_cache_entry *entry)
{
        if (UC_CACHE_IS_VALID(entry) &&
            time_before(get_seconds(), entry->ue_expire))
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (time_before(get_seconds(), entry->ue_acquire_expire))
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                wake_up_all(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        list_del_init(&entry->ue_hash);
        if (!atomic_read(&entry->ue_refcount))
                free_entry(entry);
        return 1;
}

/* XXX
 * currently we always take the write lock
 */
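/*
 * Entry lifecycle: a freshly allocated entry is NEW; the first lookup
 * switches it to ACQUIRING and fires the upcall.  The matching downcall
 * (upcall_cache_downcall) marks it VALID, or INVALID on a hard error.
 * VALID entries age out once ue_expire passes; an upcall that never
 * completes is expired once ue_acquire_expire passes.
 */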
static struct upcall_cache_entry *
__get_entry(struct upcall_cache *cache, unsigned int hash, __u64 key,
            int create, int async)
{
        struct list_head *head;
        struct upcall_cache_entry *entry, *next, *new = NULL;
        int found = 0, rc;
        ENTRY;

        LASSERT(hash < cache->uc_hashsize);

        head = &cache->uc_hashtable[hash];

find_again:
        write_lock(&cache->uc_hashlock);
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                if (check_unlink_entry(entry))
                        continue;
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                if (!create)
                        RETURN(NULL);
                if (!new) {
                        write_unlock(&cache->uc_hashlock);
                        new = alloc_entry(cache, key);
                        if (!new) {
                                CERROR("failed to allocate entry\n");
                                RETURN(NULL);
                        }
                        goto find_again;
                } else {
                        list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(new);
                        new = NULL;
                }
                list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* At this point we have found a matching entry and hold a
         * reference on it.  If it's NEW (we just created it), we must
         * kick off a refresh.
         */
        if (UC_CACHE_IS_NEW(entry)) {
                LASSERT(entry == new);
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                entry->ue_acquire_expire = get_seconds() +
                                           cache->uc_acquire_expire;

                write_unlock(&cache->uc_hashlock);
                rc = refresh_entry(entry);
                write_lock(&cache->uc_hashlock);
                if (rc) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                }
        }

        /* the caller doesn't want to wait */
        if (async) {
                write_unlock(&cache->uc_hashlock);
                RETURN(entry);
        }

        /* someone (and only one) is performing the upcall for this
         * entry; just wait for it to complete
         */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                wait_queue_t wait;

                init_waitqueue_entry(&wait, current);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                write_unlock(&cache->uc_hashlock);

                schedule_timeout(cache->uc_acquire_expire * HZ);

                write_lock(&cache->uc_hashlock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted, or the upcall failed
                         * midway
                         */
                        CERROR("entry %p not refreshed: cur %lu, key "LPU64", "
                               "ref %d fl %u, ac %ld, ex %ld\n",
                               entry, get_seconds(), entry->ue_key,
                               atomic_read(&entry->ue_refcount),
                               entry->ue_flags, entry->ue_acquire_expire,
                               entry->ue_expire);
                        put_entry(entry);
                        write_unlock(&cache->uc_hashlock);
                        CERROR("Interrupted? Or check whether %s is in place\n",
                               cache->uc_upcall);
                        RETURN(NULL);
                }
                /* fall through */
        }

        /* invalid means error; no need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(entry);
                write_unlock(&cache->uc_hashlock);
                RETURN(NULL);
        }

        /* Check for expiry.  We can't refresh the existing entry in
         * place because its memory might be shared by multiple
         * processes.
         */
        if (check_unlink_entry(entry)) {
                /* If expired, try again.  But if this entry was just
                 * created by us and expired immediately without any
                 * error, give the caller a chance to use it at least
                 * once.
                 */
                if (entry != new) {
                        put_entry(entry);
                        write_unlock(&cache->uc_hashlock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
        LASSERT(UC_CACHE_IS_VALID(entry));
        write_unlock(&cache->uc_hashlock);

        RETURN(entry);
}

struct upcall_cache_entry *
upcall_cache_get_entry(struct upcall_cache *cache, __u64 key)
{
        unsigned int hash;

        LASSERT(cache->hash);

        hash = cache->hash(cache, key);

        return __get_entry(cache, hash, key, 1, 0);
}
EXPORT_SYMBOL(upcall_cache_get_entry);

void upcall_cache_put_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        write_lock(&cache->uc_hashlock);
        LASSERTF(atomic_read(&entry->ue_refcount) > 0,
                 "entry %p: ref %d\n", entry, atomic_read(&entry->ue_refcount));
        put_entry(entry);
        write_unlock(&cache->uc_hashlock);
}
EXPORT_SYMBOL(upcall_cache_put_entry);

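/*
 * Usage sketch (illustrative, not part of this file): a typical
 * blocking lookup/release cycle.  'my_cache' and the error handling
 * are hypothetical; upcall_cache_get_entry() blocks until the upcall
 * completes, fails, or times out.
 *
 *      struct upcall_cache_entry *e;
 *
 *      e = upcall_cache_get_entry(my_cache, key);
 *      if (!e)
 *              return -EINVAL;   (upcall failed or timed out)
 *      ... use the data the parse_downcall callback attached to 'e' ...
 *      upcall_cache_put_entry(e);
 */
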
int upcall_cache_downcall(struct upcall_cache *cache, __u64 key, void *args)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        int found = 0, rc;
        unsigned int hash;
        ENTRY;

        hash = cache->hash(cache, key);
        LASSERT(hash < cache->uc_hashsize);

        head = &cache->uc_hashtable[hash];

        write_lock(&cache->uc_hashlock);
        list_for_each_entry(entry, head, ue_hash) {
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }
        if (!found) {
                /* not found; this can legitimately happen, e.g. if the
                 * entry has already expired */
                write_unlock(&cache->uc_hashlock);
                CWARN("no entry found for key "LPU64"\n", key);
                RETURN(-EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry) ||
            UC_CACHE_IS_INVALID(entry) ||
            UC_CACHE_IS_EXPIRED(entry)) {
                CWARN("stale entry %p: cur %lu, key "LPU64", ref %d, "
                      "fl %u, ac %ld, ex %ld\n",
                      entry, get_seconds(), entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      entry->ue_acquire_expire, entry->ue_expire);
                GOTO(out, rc = -EINVAL);
        }

        atomic_inc(&entry->ue_refcount);
        write_unlock(&cache->uc_hashlock);
        rc = cache->parse_downcall(cache, entry, args);
        write_lock(&cache->uc_hashlock);
        atomic_dec(&entry->ue_refcount);

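        /* parse_downcall result: rc < 0 is a hard failure and the entry
         * is invalidated; rc == 0 caches the result for uc_entry_expire
         * seconds; rc > 0 caches the (error) result for
         * uc_err_entry_expire seconds instead. */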
        if (rc < 0) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
                GOTO(out, rc);
        } else if (rc == 0) {
                entry->ue_expire = get_seconds() + cache->uc_entry_expire;
        } else {
                entry->ue_expire = get_seconds() + cache->uc_err_entry_expire;
        }

        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_OTHER, "create ucache entry %p (key "LPU64")\n",
               entry, entry->ue_key);
out:
        wake_up_all(&entry->ue_waitq);
        write_unlock(&cache->uc_hashlock);
        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);

void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        unsigned int hash;
        int found = 0;
        ENTRY;

        hash = cache->hash(cache, key);
        LASSERT(hash < cache->uc_hashsize);

        head = &cache->uc_hashtable[hash];

        write_lock(&cache->uc_hashlock);
        list_for_each_entry(entry, head, ue_hash) {
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                UC_CACHE_SET_EXPIRED(entry);
                if (!atomic_read(&entry->ue_refcount))
                        free_entry(entry);
        }
        write_unlock(&cache->uc_hashlock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_flush_one);

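/* Flush the whole cache.  Without 'force', entries that are still
 * referenced are only marked EXPIRED and left in place; with 'force',
 * every entry is expected to be idle (asserted) and is freed.  The
 * 'sync' argument is currently unused. */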
static void cache_flush(struct upcall_cache *cache, int force, int sync)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        write_lock(&cache->uc_hashlock);
        for (i = 0; i < cache->uc_hashsize; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!atomic_read(&entry->ue_refcount));
                        free_entry(entry);
                }
        }
        write_unlock(&cache->uc_hashlock);
        EXIT;
}

void upcall_cache_flush_idle(struct upcall_cache *cache)
{
        cache_flush(cache, 0, 0);
}

void upcall_cache_flush_all(struct upcall_cache *cache)
{
        cache_flush(cache, 1, 0);
}
EXPORT_SYMBOL(upcall_cache_flush_idle);
EXPORT_SYMBOL(upcall_cache_flush_all);