lustre/lvfs/upcall_cache.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Supplementary groups cache.
 *
 *  Copyright (c) 2004 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_SEC

#ifndef AUTOCONF_INCLUDED
#include <linux/config.h>
#endif
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/kmod.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/version.h>
#include <linux/unistd.h>

#include <asm/system.h>
#include <asm/uaccess.h>

#include <linux/fs.h>
#include <linux/slab.h>
#ifdef HAVE_SEGMENT_H
# include <asm/segment.h>
#endif

#include <obd_support.h>
#include <lustre_lib.h>

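/*
 * Allocate and initialize a cache entry for @key.  The new entry starts in
 * the NEW state with a zero refcount; the ops-specific init_entry() hook,
 * if any, gets a chance to attach its own data.
 */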
static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
                                              __u64 key, void *args)
{
        struct upcall_cache_entry *entry;

        OBD_ALLOC_PTR(entry);
        if (!entry)
                return NULL;

        UC_CACHE_SET_NEW(entry);
        INIT_LIST_HEAD(&entry->ue_hash);
        entry->ue_key = key;
        atomic_set(&entry->ue_refcount, 0);
        init_waitqueue_head(&entry->ue_waitq);
        if (cache->uc_ops->init_entry)
                cache->uc_ops->init_entry(entry, args);
        return entry;
}

/* protected by cache lock */
static void free_entry(struct upcall_cache *cache,
                       struct upcall_cache_entry *entry)
{
        if (cache->uc_ops->free_entry)
                cache->uc_ops->free_entry(cache, entry);

        list_del(&entry->ue_hash);
        CDEBUG(D_OTHER, "destroy cache entry %p for key "LPU64"\n",
               entry, entry->ue_key);
        OBD_FREE_PTR(entry);
}

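/*
 * Entry matching: compare the 64-bit key first, then let the cache user
 * refine the match via the optional upcall_compare/downcall_compare hooks.
 */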
static inline int upcall_compare(struct upcall_cache *cache,
                                 struct upcall_cache_entry *entry,
                                 __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->upcall_compare)
                return cache->uc_ops->upcall_compare(cache, entry, key, args);

        return 0;
}

static inline int downcall_compare(struct upcall_cache *cache,
                                   struct upcall_cache_entry *entry,
                                   __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->downcall_compare)
                return cache->uc_ops->downcall_compare(cache, entry, key, args);

        return 0;
}

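/*
 * Entry reference counting.  Dropping the last reference frees the entry
 * only if it has already been marked invalid or expired; valid entries stay
 * in the hash table for later lookups.
 */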
static inline void get_entry(struct upcall_cache_entry *entry)
{
        atomic_inc(&entry->ue_refcount);
}

static inline void put_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry)
{
        if (atomic_dec_and_test(&entry->ue_refcount) &&
            (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry))) {
                free_entry(cache, entry);
        }
}

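/*
 * Unlink @entry from the hash if it is no longer usable: a valid entry past
 * its expiry, an acquiring entry whose upcall timed out, or an entry already
 * marked invalid.  Returns 1 if the entry was unlinked (and freed when
 * unreferenced), 0 if it is still good.  Caller holds cache->uc_lock.
 */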
static int check_unlink_entry(struct upcall_cache *cache,
                              struct upcall_cache_entry *entry)
{
        if (UC_CACHE_IS_VALID(entry) &&
            time_before(jiffies, entry->ue_expire))
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (time_before(jiffies, entry->ue_acquire_expire))
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                wake_up_all(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        list_del_init(&entry->ue_hash);
        if (!atomic_read(&entry->ue_refcount))
                free_entry(cache, entry);
        return 1;
}

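/* Kick off the upcall that will (asynchronously) fill in @entry. */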
static inline int refresh_entry(struct upcall_cache *cache,
                                struct upcall_cache_entry *entry)
{
        LASSERT(cache->uc_ops->do_upcall);
        return cache->uc_ops->do_upcall(cache, entry);
}

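/*
 * Look up the entry for @key, creating it if necessary.  A freshly created
 * entry triggers an upcall and the caller sleeps until the matching downcall
 * arrives or uc_acquire_expire elapses.  On success an entry is returned
 * with a reference held; drop it with upcall_cache_put_entry().  Failures
 * are returned as ERR_PTR() values.
 */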
struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                  __u64 key, void *args)
{
        struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
        struct list_head *head;
        wait_queue_t wait;
        int rc, found;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
find_again:
        found = 0;
        spin_lock(&cache->uc_lock);
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
                if (check_unlink_entry(cache, entry))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (!found) { /* didn't find it */
                if (!new) {
                        spin_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
                                CERROR("failed to alloc entry\n");
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
                } else {
                        list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
                }
                list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* acquire for new one */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                entry->ue_acquire_expire = jiffies + cache->uc_acquire_expire;
                spin_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
                spin_lock(&cache->uc_lock);
                if (rc < 0) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                        if (unlikely(rc == -EREMCHG)) {
                                put_entry(cache, entry);
                                GOTO(out, entry = ERR_PTR(rc));
                        }
                }
                /* fall through */
        }
        /* someone (and only one) is doing an upcall on
         * this item, just wait for it to complete
         */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                unsigned long expiry = jiffies + cache->uc_acquire_expire;

                init_waitqueue_entry(&wait, current);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                spin_unlock(&cache->uc_lock);

                schedule_timeout(cache->uc_acquire_expire);

                spin_lock(&cache->uc_lock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted, or the upcall failed midway */
                        rc = time_before(jiffies, expiry) ? -EINTR : -ETIMEDOUT;
                        CERROR("acquire for key "LPU64" failed: %d\n",
                               entry->ue_key, rc);
                        put_entry(cache, entry);
                        GOTO(out, entry = ERR_PTR(rc));
                }
                /* fall through */
        }

        /* invalid means error, don't need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(cache, entry);
                GOTO(out, entry = ERR_PTR(-EIDRM));
        }

        /* check expired
         * We can't refresh the existing one because some
         * memory might be shared by multiple processes.
         */
        if (check_unlink_entry(cache, entry)) {
                /* If expired, try again.  But if this entry was created
                 * by us and expired too quickly without any error, give
                 * it at least one chance to be used.
                 */
                if (entry != new) {
                        put_entry(cache, entry);
                        spin_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
out:
        spin_unlock(&cache->uc_lock);
        RETURN(entry);
}
EXPORT_SYMBOL(upcall_cache_get_entry);

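/* Release a reference obtained from upcall_cache_get_entry(). */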
void upcall_cache_put_entry(struct upcall_cache *cache,
                            struct upcall_cache_entry *entry)
{
        ENTRY;

        if (!entry) {
                EXIT;
                return;
        }

        LASSERT(atomic_read(&entry->ue_refcount) > 0);
        spin_lock(&cache->uc_lock);
        put_entry(cache, entry);
        spin_unlock(&cache->uc_lock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_put_entry);

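/*
 * Complete an upcall: the userspace helper reports status @err for @key and,
 * via @args, the payload that the ops parse_downcall() hook turns into
 * cached data.  Any threads sleeping in upcall_cache_get_entry() are woken.
 */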
int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
                          void *args)
{
        struct upcall_cache_entry *entry = NULL;
        struct list_head *head;
        int found = 0, rc = 0;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        get_entry(entry);
                        break;
                }
        }

        if (!found) {
                CDEBUG(D_OTHER, "%s: downcall for key "LPU64" not expected\n",
                       cache->uc_name, key);
                /* not finding the entry is possible and not an error */
                spin_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }

        if (err) {
                CDEBUG(D_OTHER, "%s: upcall for key "LPU64" returned %d\n",
                       cache->uc_name, entry->ue_key, err);
                GOTO(out, rc = -EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                CDEBUG(D_RPCTRACE,"%s: found uptodate entry %p (key "LPU64")\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = 0);
        }

        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
                CERROR("%s: found a stale entry %p (key "LPU64") in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = -EINVAL);
        }

        spin_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
        spin_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);

        entry->ue_expire = jiffies + cache->uc_entry_expire;
        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key "LPU64"\n",
               cache->uc_name, entry, entry->ue_key);
out:
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
        spin_unlock(&cache->uc_lock);
        wake_up_all(&entry->ue_waitq);
        put_entry(cache, entry);

        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);

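/*
 * Drop every entry in the cache.  Without @force, entries that are still
 * referenced are only marked expired; with @force they are expected to be
 * unreferenced already (enforced by the LASSERT below).
 */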
static void cache_flush(struct upcall_cache *cache, int force)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        spin_lock(&cache->uc_lock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!atomic_read(&entry->ue_refcount));
                        free_entry(cache, entry);
                }
        }
        spin_unlock(&cache->uc_lock);
        EXIT;
}

void upcall_cache_flush_idle(struct upcall_cache *cache)
{
        cache_flush(cache, 0);
}
EXPORT_SYMBOL(upcall_cache_flush_idle);

void upcall_cache_flush_all(struct upcall_cache *cache)
{
        cache_flush(cache, 1);
}
EXPORT_SYMBOL(upcall_cache_flush_all);

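/* Expire the single entry matching @key/@args, freeing it if unreferenced. */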
void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        int found = 0;
        ENTRY;

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      get_seconds(), entry->ue_acquire_expire,
                      entry->ue_expire);
                UC_CACHE_SET_EXPIRED(entry);
                if (!atomic_read(&entry->ue_refcount))
                        free_entry(cache, entry);
        }
        spin_unlock(&cache->uc_lock);
}
EXPORT_SYMBOL(upcall_cache_flush_one);

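/*
 * Create a cache.  By default entries stay valid for 10 minutes and an
 * upcall is given 15 seconds to complete before waiters give up.
 */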
struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
                                       struct upcall_cache_ops *ops)
{
        struct upcall_cache *cache;
        int i;
        ENTRY;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));

        spin_lock_init(&cache->uc_lock);
        rwlock_init(&cache->uc_upcall_rwlock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
                INIT_LIST_HEAD(&cache->uc_hashtable[i]);
        strncpy(cache->uc_name, name, sizeof(cache->uc_name) - 1);
        /* upcall pathname proc tunable */
        strncpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall) - 1);
        cache->uc_entry_expire = 10 * 60 * HZ;
        cache->uc_acquire_expire = 15 * HZ;
        cache->uc_ops = ops;

        RETURN(cache);
}
EXPORT_SYMBOL(upcall_cache_init);

void upcall_cache_cleanup(struct upcall_cache *cache)
{
        if (!cache)
                return;
        upcall_cache_flush_all(cache);
        OBD_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(upcall_cache_cleanup);
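
/*
 * Illustrative usage sketch (not part of the original file): how a cache
 * user might wire this API together.  The ops callbacks, the "example"
 * names and the upcall path below are hypothetical placeholders; only the
 * exported upcall_cache_* functions and the ops hook names come from this
 * file.  The userspace helper eventually reports its result back through
 * upcall_cache_downcall().
 *
 *      static struct upcall_cache_ops example_ops = {
 *              .init_entry     = example_init_entry,
 *              .free_entry     = example_free_entry,
 *              .do_upcall      = example_do_upcall,
 *              .parse_downcall = example_parse_downcall,
 *      };
 *
 *      struct upcall_cache *cache;
 *      struct upcall_cache_entry *entry;
 *
 *      cache = upcall_cache_init("example", "/usr/sbin/example_upcall",
 *                                &example_ops);
 *      if (IS_ERR(cache))
 *              return PTR_ERR(cache);
 *
 *      entry = upcall_cache_get_entry(cache, key, args);
 *      if (IS_ERR(entry))
 *              return PTR_ERR(entry);
 *      ... use the ops-specific data attached to entry ...
 *      upcall_cache_put_entry(cache, entry);
 *
 *      upcall_cache_cleanup(cache);
 */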