/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Supplementary groups cache.
 *
 *  Copyright (c) 2004 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_SEC

#ifndef AUTOCONF_INCLUDED
#include <linux/config.h>
#endif
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/kmod.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/version.h>
#include <linux/unistd.h>

#include <asm/system.h>
#include <asm/uaccess.h>

#include <linux/fs.h>
#include <linux/slab.h>
#ifdef HAVE_SEGMENT_H
# include <asm/segment.h>
#endif

#include <obd_support.h>
#include <lustre_lib.h>

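/*
 * Generic upcall cache.  Entries are hashed on a 64-bit key and move
 * through the states NEW -> ACQUIRING -> VALID, or become INVALID or
 * EXPIRED on failure or timeout.  A cache miss triggers an upcall to
 * userspace through uc_ops->do_upcall(); the reply arrives later via
 * upcall_cache_downcall(), which fills in the entry and wakes any
 * waiters.
 */
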
static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
                                              __u64 key, void *args)
{
        struct upcall_cache_entry *entry;

        OBD_ALLOC_PTR(entry);
        if (!entry)
                return NULL;

        UC_CACHE_SET_NEW(entry);
        INIT_LIST_HEAD(&entry->ue_hash);
        entry->ue_key = key;
        atomic_set(&entry->ue_refcount, 0);
        init_waitqueue_head(&entry->ue_waitq);
        if (cache->uc_ops->init_entry)
                cache->uc_ops->init_entry(entry, args);
        return entry;
}

/* protected by cache lock */
static void free_entry(struct upcall_cache *cache,
                       struct upcall_cache_entry *entry)
{
        if (cache->uc_ops->free_entry)
                cache->uc_ops->free_entry(cache, entry);

        list_del(&entry->ue_hash);
        CDEBUG(D_OTHER, "destroy cache entry %p for key "LPU64"\n",
               entry, entry->ue_key);
        OBD_FREE_PTR(entry);
}

static inline int upcall_compare(struct upcall_cache *cache,
                                 struct upcall_cache_entry *entry,
                                 __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->upcall_compare)
                return cache->uc_ops->upcall_compare(cache, entry, key, args);

        return 0;
}

static inline int downcall_compare(struct upcall_cache *cache,
                                   struct upcall_cache_entry *entry,
                                   __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->downcall_compare)
                return cache->uc_ops->downcall_compare(cache, entry, key, args);

        return 0;
}

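/* Entry reference counting.  Both helpers are called with
 * cache->uc_lock held; put_entry() frees the entry once the last
 * reference is dropped, but only if it is already invalid or expired.
 */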
static inline void get_entry(struct upcall_cache_entry *entry)
{
        atomic_inc(&entry->ue_refcount);
}

static inline void put_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry)
{
        if (atomic_dec_and_test(&entry->ue_refcount) &&
            (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry))) {
                free_entry(cache, entry);
        }
}

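/* Unlink an entry that is no longer usable.  Returns 0 if the entry is
 * still valid or still within its acquire window; otherwise marks it
 * expired, removes it from the hash (freeing it if unreferenced) and
 * returns 1.  Called with cache->uc_lock held.
 */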
static int check_unlink_entry(struct upcall_cache *cache,
                              struct upcall_cache_entry *entry)
{
        if (UC_CACHE_IS_VALID(entry) &&
            time_before(jiffies, entry->ue_expire))
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (time_before(jiffies, entry->ue_acquire_expire))
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                wake_up_all(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        list_del_init(&entry->ue_hash);
        if (!atomic_read(&entry->ue_refcount))
                free_entry(cache, entry);
        return 1;
}

static inline int refresh_entry(struct upcall_cache *cache,
                                struct upcall_cache_entry *entry)
{
        LASSERT(cache->uc_ops->do_upcall);
        return cache->uc_ops->do_upcall(cache, entry);
}

struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                  __u64 key, void *args)
{
        struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
        struct list_head *head;
        wait_queue_t wait;
        int rc, found;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
find_again:
        found = 0;
        spin_lock(&cache->uc_lock);
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
                if (check_unlink_entry(cache, entry))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (!found) { /* didn't find it */
                if (!new) {
                        spin_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
                                CERROR("failed to allocate entry\n");
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
                } else {
                        list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
                }
                list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* acquire for new one */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                entry->ue_acquire_expire = jiffies + cache->uc_acquire_expire;
                spin_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
                spin_lock(&cache->uc_lock);
                if (rc < 0) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                }
                /* fall through */
        }
        /* someone (and only one) is doing the upcall on this item;
         * just wait for it to complete
         */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                unsigned long expiry = jiffies + cache->uc_acquire_expire;

                init_waitqueue_entry(&wait, current);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                spin_unlock(&cache->uc_lock);

                schedule_timeout(cache->uc_acquire_expire);

                spin_lock(&cache->uc_lock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted, or the upcall timed out;
                         * log before put_entry() since that may free the
                         * entry */
                        rc = time_before(jiffies, expiry) ? -EINTR : -ETIMEDOUT;
                        CERROR("acquire for key "LPU64" failed: rc = %d\n",
                               entry->ue_key, rc);
                        put_entry(cache, entry);
                        GOTO(out, entry = ERR_PTR(rc));
                }
                /* fall through */
        }

        /* invalid means error, don't need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(cache, entry);
                GOTO(out, entry = ERR_PTR(-EIDRM));
        }

        /* Check for expiry.  We can't refresh the existing entry in
         * place because its memory may be shared by multiple processes.
         */
        if (check_unlink_entry(cache, entry)) {
                /* If expired, try again.  But if this entry was just
                 * created by us and expired immediately without any
                 * error, give the caller at least one chance to use it.
                 */
                if (entry != new) {
                        put_entry(cache, entry);
                        spin_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
out:
        spin_unlock(&cache->uc_lock);
        RETURN(entry);
}
EXPORT_SYMBOL(upcall_cache_get_entry);
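
/*
 * Typical caller pattern, as a minimal sketch ("my_cache" and MY_KEY
 * are placeholders, not names defined in this file):
 *
 *      struct upcall_cache_entry *entry;
 *
 *      entry = upcall_cache_get_entry(my_cache, MY_KEY, NULL);
 *      if (IS_ERR(entry))
 *              return PTR_ERR(entry);
 *      ... use the data the ops attached to the entry ...
 *      upcall_cache_put_entry(my_cache, entry);
 */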

void upcall_cache_put_entry(struct upcall_cache *cache,
                            struct upcall_cache_entry *entry)
{
        ENTRY;

        if (!entry) {
                EXIT;
                return;
        }

        LASSERT(atomic_read(&entry->ue_refcount) > 0);
        spin_lock(&cache->uc_lock);
        put_entry(cache, entry);
        spin_unlock(&cache->uc_lock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_put_entry);

int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
                          void *args)
{
        struct upcall_cache_entry *entry = NULL;
        struct list_head *head;
        int found = 0, rc = 0;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        get_entry(entry);
                        break;
                }
        }

        if (!found) {
                CDEBUG(D_OTHER, "%s: downcall for key "LPU64" not expected\n",
                       cache->uc_name, key);
                /* not found; this can legitimately happen */
                spin_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }

        if (err) {
                CDEBUG(D_OTHER, "%s: upcall for key "LPU64" returned %d\n",
                       cache->uc_name, entry->ue_key, err);
                GOTO(out, rc = -EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key "LPU64")\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = 0);
        }

        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
                CERROR("%s: found a stale entry %p (key "LPU64") in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = -EINVAL);
        }

        spin_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
        spin_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);

        entry->ue_expire = jiffies + cache->uc_entry_expire;
        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key "LPU64"\n",
               cache->uc_name, entry, entry->ue_key);
out:
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
        spin_unlock(&cache->uc_lock);
        wake_up_all(&entry->ue_waitq);
        put_entry(cache, entry);

        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);
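
/*
 * Note: the do_upcall hook typically execs a userspace helper, and the
 * helper writes its answer back through a /proc or ioctl interface
 * whose handler calls upcall_cache_downcall() above, filling in the
 * entry and waking any thread blocked in upcall_cache_get_entry().
 */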

static void cache_flush(struct upcall_cache *cache, int force)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        spin_lock(&cache->uc_lock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!atomic_read(&entry->ue_refcount));
                        free_entry(cache, entry);
                }
        }
        spin_unlock(&cache->uc_lock);
        EXIT;
}

void upcall_cache_flush_idle(struct upcall_cache *cache)
{
        cache_flush(cache, 0);
}
EXPORT_SYMBOL(upcall_cache_flush_idle);

void upcall_cache_flush_all(struct upcall_cache *cache)
{
        cache_flush(cache, 1);
}
EXPORT_SYMBOL(upcall_cache_flush_all);

void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        int found = 0;
        ENTRY;

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      jiffies, entry->ue_acquire_expire,
                      entry->ue_expire);
                UC_CACHE_SET_EXPIRED(entry);
                if (!atomic_read(&entry->ue_refcount))
                        free_entry(cache, entry);
        }
        spin_unlock(&cache->uc_lock);
}
EXPORT_SYMBOL(upcall_cache_flush_one);

struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
                                       struct upcall_cache_ops *ops)
{
        struct upcall_cache *cache;
        int i;
        ENTRY;

        OBD_ALLOC(cache, sizeof(*cache));
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));

        spin_lock_init(&cache->uc_lock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
                INIT_LIST_HEAD(&cache->uc_hashtable[i]);
        strncpy(cache->uc_name, name, sizeof(cache->uc_name) - 1);
        /* the upcall pathname is tunable via /proc */
        strncpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall) - 1);
        cache->uc_entry_expire = 10 * 60 * HZ;
        cache->uc_acquire_expire = 15 * HZ;
        cache->uc_ops = ops;

        RETURN(cache);
}
EXPORT_SYMBOL(upcall_cache_init);
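
/*
 * Setup/teardown sketch with hypothetical names; "my_ops" must provide
 * at least do_upcall() (asserted in refresh_entry()), the other
 * upcall_cache_ops hooks are optional:
 *
 *      struct upcall_cache *cache;
 *
 *      cache = upcall_cache_init("my_cache", "/usr/sbin/my_upcall",
 *                                &my_ops);
 *      if (IS_ERR(cache))
 *              return PTR_ERR(cache);
 *      ...
 *      upcall_cache_cleanup(cache);
 */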

void upcall_cache_cleanup(struct upcall_cache *cache)
{
        if (!cache)
                return;
        upcall_cache_flush_all(cache);
        OBD_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(upcall_cache_cleanup);