/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_SEC
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/init.h>

#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_idl.h>
#include <linux/obd_class.h>
#include <linux/lustre_ucache.h>

/* FIXME
 * The current ucache implementation is simply lifted from the group
 * hash code, almost without change.  It is very simple, has very
 * limited functionality, and is probably only suitable for the group
 * hash use case.
 */
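
/*
 * A minimal sketch of how a user of this cache wires up the callback
 * table (hedged: the gh_* names and struct gh_entry below are
 * hypothetical illustrations inferred from how the callbacks are
 * invoked in this file; the upcall_cache fields themselves come from
 * lustre_ucache.h):
 *
 *      struct gh_entry {
 *              struct upcall_cache_entry ge_base;
 *              // user-defined payload goes here
 *      };
 *
 *      static struct upcall_cache_entry *
 *      gh_alloc_entry(struct upcall_cache *cache, __u64 key)
 *      {
 *              struct gh_entry *ge;
 *
 *              OBD_ALLOC(ge, sizeof(*ge));
 *              if (!ge)
 *                      return NULL;
 *              upcall_cache_init_entry(cache, &ge->ge_base, key);
 *              return &ge->ge_base;
 *      }
 *
 *      cache->alloc_entry    = gh_alloc_entry;    // embed + init entry
 *      cache->free_entry     = gh_free_entry;     // undo alloc_entry
 *      cache->make_upcall    = gh_make_upcall;    // kick userspace helper
 *      cache->parse_downcall = gh_parse_downcall; // consume helper's reply
 *      cache->hash           = gh_hash;           // key -> [0, uc_hashsize)
 */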

void upcall_cache_init_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry,
                             __u64 key)
{
        UC_CACHE_SET_NEW(entry);
        INIT_LIST_HEAD(&entry->ue_hash);
        atomic_set(&entry->ue_refcount, 0);
        entry->ue_key = key;
        entry->ue_cache = cache;
        init_waitqueue_head(&entry->ue_waitq);
}
EXPORT_SYMBOL(upcall_cache_init_entry);

static inline struct upcall_cache_entry *
alloc_entry(struct upcall_cache *cache, __u64 key)
{
        LASSERT(cache->alloc_entry);
        return cache->alloc_entry(cache, key);
}

static void free_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        LASSERT(cache);
        LASSERT(cache->free_entry);
        LASSERT(atomic_read(&entry->ue_refcount) == 0);

        CDEBUG(D_SEC, "%s: destroy entry %p for key "LPU64"\n",
               cache->uc_name, entry, entry->ue_key);

        list_del(&entry->ue_hash);
        cache->free_entry(cache, entry);
}

static inline void get_entry(struct upcall_cache_entry *entry)
{
        atomic_inc(&entry->ue_refcount);
}

static inline void put_entry(struct upcall_cache_entry *entry)
{
        if (atomic_dec_and_test(&entry->ue_refcount) &&
            !UC_CACHE_IS_VALID(entry)) {
                free_entry(entry);
        }
}
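
/*
 * Note the asymmetry in put_entry(): dropping the last reference only
 * frees the entry when it is not VALID.  A valid, unreferenced entry
 * stays in the hash so later lookups can hit it; it is reaped by
 * check_unlink_entry() once it expires, or by the flush routines at
 * the bottom of this file.
 */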

static inline int refresh_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        LASSERT(cache);
        LASSERT(cache->make_upcall);

        return cache->make_upcall(cache, entry);
}

static int check_unlink_entry(struct upcall_cache_entry *entry)
{
        /* an upcall is issued on new entries immediately after they
         * are created, so we should never see a NEW entry here
         */
        LASSERT(!UC_CACHE_IS_NEW(entry));

        if (UC_CACHE_IS_VALID(entry) &&
            time_before(get_seconds(), entry->ue_expire))
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (time_before(get_seconds(), entry->ue_acquire_expire))
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                wake_up_all(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        list_del_init(&entry->ue_hash);
        if (!atomic_read(&entry->ue_refcount))
                free_entry(entry);
        return 1;
}
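
/*
 * Entry life cycle, summarizing the flag transitions used in this file:
 *
 *      NEW --> ACQUIRING --> VALID --> EXPIRED
 *                  |
 *                  +-------> INVALID
 *
 * NEW:       just allocated; an upcall is triggered immediately.
 * ACQUIRING: upcall in flight; other lookups sleep on ue_waitq.
 * VALID:     downcall parsed; usable until ue_expire (set from
 *            uc_entry_expire, or uc_err_entry_expire when the helper
 *            reported an error).
 * INVALID:   upcall or downcall failed; lookups give up, and the entry
 *            is freed once the last reference is dropped.
 * EXPIRED:   timed out or flushed; unlinked by check_unlink_entry().
 */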

/* XXX
 * currently we always take the write lock
 */
static struct upcall_cache_entry *
__get_entry(struct upcall_cache *cache, unsigned int hash, __u64 key,
            int create, int async)
{
        struct list_head *head;
        struct upcall_cache_entry *entry, *next, *new = NULL;
        int found, rc;
        ENTRY;

        LASSERT(hash < cache->uc_hashsize);
        head = &cache->uc_hashtable[hash];

find_again:
        found = 0;
        write_lock(&cache->uc_hashlock);
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                if (check_unlink_entry(entry))
                        continue;
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                if (!create) {
                        write_unlock(&cache->uc_hashlock);
                        RETURN(NULL);
                }
                if (!new) {
                        write_unlock(&cache->uc_hashlock);
                        new = alloc_entry(cache, key);
                        if (!new) {
                                CERROR("failed to allocate entry\n");
                                RETURN(NULL);
                        }
                        goto find_again;
                } else {
                        list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(new);
                        new = NULL;
                }
                list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* at this point we have found a matching entry and hold a
         * reference on it.  if it's NEW (i.e. we created it), we must
         * give it a push to refresh
         */
        if (UC_CACHE_IS_NEW(entry)) {
                LASSERT(entry == new);
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                entry->ue_acquire_expire = get_seconds() +
                                           cache->uc_acquire_expire;
                CWARN("%s: %p: cur %lu(%lu), cache ex %ld\n",
                      cache->uc_name, entry, get_seconds(), jiffies,
                      entry->ue_acquire_expire); //XXX

                write_unlock(&cache->uc_hashlock);
                rc = refresh_entry(entry);
                write_lock(&cache->uc_hashlock);
                if (rc) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                }
        }

        /* caller doesn't want to wait */
        if (async) {
                write_unlock(&cache->uc_hashlock);
                RETURN(entry);
        }

        /* someone (and only one) is doing an upcall for this entry;
         * just wait for it to complete
         */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                signed long tmp1;
                wait_queue_t wait;

                init_waitqueue_entry(&wait, current);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                write_unlock(&cache->uc_hashlock);

                tmp1 = schedule_timeout(cache->uc_acquire_expire * HZ);

                write_lock(&cache->uc_hashlock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted, or the upcall failed
                         * in the middle
                         */
                        CERROR("cur %lu(%ld), scheduled %ld, sigpending %d\n",
                               get_seconds(), jiffies, tmp1,
                               signal_pending(current)); //XXX
                        CERROR("%s: entry %p not refreshed: key "LPU64", "
                               "ref %d fl %u, cur %lu, ex %ld/%ld\n",
                               cache->uc_name, entry, entry->ue_key,
                               atomic_read(&entry->ue_refcount),
                               entry->ue_flags, get_seconds(),
                               entry->ue_acquire_expire, entry->ue_expire);
                        put_entry(entry);
                        write_unlock(&cache->uc_hashlock);
                        CERROR("Interrupted? Or check whether %s is in place\n",
                               cache->uc_upcall);
                        RETURN(NULL);
                }
                /* fall through */
        }

        /* invalid means error, no need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(entry);
                write_unlock(&cache->uc_hashlock);
                RETURN(NULL);
        }

        /* check for expiry.
         * We can't refresh the existing entry in place because its
         * memory might be shared by multiple processes.
         */
        if (check_unlink_entry(entry)) {
                /* if expired, try again.  but if this entry was
                 * created by us and turned expired so quickly without
                 * any error, it should at least be given a chance to
                 * be used once.
                 */
                if (entry != new) {
                        put_entry(entry);
                        write_unlock(&cache->uc_hashlock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
        write_unlock(&cache->uc_hashlock);

        RETURN(entry);
}

struct upcall_cache_entry *
upcall_cache_get_entry(struct upcall_cache *cache, __u64 key)
{
        unsigned int hash;

        LASSERT(cache->hash);

        hash = cache->hash(cache, key);

        return __get_entry(cache, hash, key, 1, 0);
}
EXPORT_SYMBOL(upcall_cache_get_entry);
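
/*
 * A hedged usage sketch: a synchronous lookup and release by a caller.
 * The gid key and the -ENOENT mapping are illustrative only, not taken
 * from this file:
 *
 *      struct upcall_cache_entry *e;
 *
 *      e = upcall_cache_get_entry(cache, (__u64)gid);
 *      if (!e)
 *              return -ENOENT; // upcall failed, timed out, or invalid
 *      // ... read the user-defined payload embedding 'e' ...
 *      upcall_cache_put_entry(e);
 */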

void upcall_cache_put_entry(struct upcall_cache_entry *entry)
{
        struct upcall_cache *cache = entry->ue_cache;

        write_lock(&cache->uc_hashlock);
        LASSERTF(atomic_read(&entry->ue_refcount) > 0,
                 "%s: entry %p: ref %d\n", cache->uc_name, entry,
                 atomic_read(&entry->ue_refcount));
        put_entry(entry);
        write_unlock(&cache->uc_hashlock);
}
EXPORT_SYMBOL(upcall_cache_put_entry);

int upcall_cache_downcall(struct upcall_cache *cache, __u64 key, void *args)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        int found = 0, rc;
        unsigned int hash;
        ENTRY;

        hash = cache->hash(cache, key);
        LASSERT(hash < cache->uc_hashsize);

        head = &cache->uc_hashtable[hash];

        write_lock(&cache->uc_hashlock);
        list_for_each_entry(entry, head, ue_hash) {
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }
        if (!found) {
                /* not found; this can legitimately happen */
                write_unlock(&cache->uc_hashlock);
                CWARN("%s: no entry found for key "LPU64"\n",
                      cache->uc_name, key);
                RETURN(-EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                if (UC_CACHE_IS_VALID(entry)) {
                        /* This should not happen; just give a warning
                         * for now.
                         */
                        CWARN("%s: entry %p(key "LPU64", cur %lu, ex %ld/%ld) "
                              "already valid\n", cache->uc_name,
                              entry, entry->ue_key, get_seconds(),
                              entry->ue_acquire_expire, entry->ue_expire);
                        GOTO(out, rc = 0);
                }

                CWARN("%s: stale entry %p: key "LPU64", ref %d, fl %u, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount),
                      entry->ue_flags, get_seconds(),
                      entry->ue_acquire_expire, entry->ue_expire);
                GOTO(out, rc = -EINVAL);
        }

        /* the entry is known to be ACQUIRING here, but it may have
         * been marked INVALID or EXPIRED while the upcall was in
         * flight
         */
        if (UC_CACHE_IS_INVALID(entry) ||
            UC_CACHE_IS_EXPIRED(entry)) {
                CWARN("%s: invalid entry %p: key "LPU64", ref %d, fl %u, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount),
                      entry->ue_flags, get_seconds(),
                      entry->ue_acquire_expire, entry->ue_expire);
                GOTO(out, rc = -EINVAL);
        }

        atomic_inc(&entry->ue_refcount);
        write_unlock(&cache->uc_hashlock);
        rc = cache->parse_downcall(cache, entry, args);
        write_lock(&cache->uc_hashlock);
        atomic_dec(&entry->ue_refcount);

        if (rc < 0) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
                GOTO(out, rc);
        } else if (rc == 0) {
                entry->ue_expire = get_seconds() + cache->uc_entry_expire;
        } else {
                entry->ue_expire = get_seconds() + cache->uc_err_entry_expire;
        }

        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_SEC, "%s: refreshed ucache entry %p(key "LPU64")\n",
               cache->uc_name, entry, entry->ue_key);
out:
        wake_up_all(&entry->ue_waitq);
        write_unlock(&cache->uc_hashlock);
        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);
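
/*
 * How the round trip fits together (a summary; the userspace half is
 * inferred from cache->uc_upcall being a helper checked with "is in
 * place" above, and is not spelled out in this file): make_upcall()
 * launches the helper named by cache->uc_upcall for a key; the helper
 * resolves the key and delivers its answer back to the kernel, which
 * lands here as upcall_cache_downcall(cache, key, args).
 * parse_downcall() fills in the entry, and waiters sleeping in
 * __get_entry() are woken via ue_waitq.
 */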

void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        unsigned int hash;
        int found = 0;
        ENTRY;

        hash = cache->hash(cache, key);
        LASSERT(hash < cache->uc_hashsize);
        head = &cache->uc_hashtable[hash];

        write_lock(&cache->uc_hashlock);
        list_for_each_entry(entry, head, ue_hash) {
                if (entry->ue_key == key) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      get_seconds(), entry->ue_acquire_expire,
                      entry->ue_expire);
                UC_CACHE_SET_EXPIRED(entry);
                if (!atomic_read(&entry->ue_refcount))
                        free_entry(entry);
        }
        write_unlock(&cache->uc_hashlock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_flush_one);
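
/*
 * Note on cache_flush() below: "force" only controls whether entries
 * that still have references are skipped (with force == 0 they are
 * merely marked EXPIRED); callers of the force path must guarantee no
 * references remain, or the LASSERT fires.  The "sync" argument is
 * currently unused.
 */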

static void cache_flush(struct upcall_cache *cache, int force, int sync)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        write_lock(&cache->uc_hashlock);
        for (i = 0; i < cache->uc_hashsize; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!atomic_read(&entry->ue_refcount));
                        free_entry(entry);
                }
        }
        write_unlock(&cache->uc_hashlock);
        EXIT;
}

void upcall_cache_flush_idle(struct upcall_cache *cache)
{
        cache_flush(cache, 0, 0);
}

void upcall_cache_flush_all(struct upcall_cache *cache)
{
        cache_flush(cache, 1, 0);
}
EXPORT_SYMBOL(upcall_cache_flush_idle);
EXPORT_SYMBOL(upcall_cache_flush_all);