Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / lvfs / upcall_cache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Supplementary groups cache.
5  *
6  *  Copyright (c) 2004 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_SEC
25
26 #ifdef HAVE_KERNEL_CONFIG_H
27 #include <linux/config.h>
28 #endif
29 #include <linux/module.h>
30 #include <linux/kernel.h>
31 #include <linux/mm.h>
32 #include <linux/kmod.h>
33 #include <linux/string.h>
34 #include <linux/stat.h>
35 #include <linux/errno.h>
36 #include <linux/version.h>
37 #include <linux/unistd.h>
38
39 #include <asm/system.h>
40 #include <asm/uaccess.h>
41
42 #include <linux/fs.h>
43 #include <linux/stat.h>
44 #include <asm/uaccess.h>
45 #include <linux/slab.h>
46 #include <asm/segment.h>
47
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50
51 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,4)
52 struct group_info *groups_alloc(int ngroups)
53 {
54         struct group_info *ginfo;
55
56         LASSERT(ngroups <= NGROUPS_SMALL);
57
58         OBD_ALLOC(ginfo, sizeof(*ginfo) + 1 * sizeof(gid_t *));
59         if (!ginfo)
60                 return NULL;
61         ginfo->ngroups = ngroups;
62         ginfo->nblocks = 1;
63         ginfo->blocks[0] = ginfo->small_block;
64         atomic_set(&ginfo->usage, 1);
65
66         return ginfo;
67 }
68
69 void groups_free(struct group_info *ginfo)
70 {
71         LASSERT(ginfo->ngroups <= NGROUPS_SMALL);
72         LASSERT(ginfo->nblocks == 1);
73         LASSERT(ginfo->blocks[0] == ginfo->small_block);
74
75         OBD_FREE(ginfo, sizeof(*ginfo) + 1 * sizeof(gid_t *));
76 }
77 #endif
78
79 static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
80                                               __u64 key, void *args)
81 {
82         struct upcall_cache_entry *entry;
83
84         OBD_ALLOC_PTR(entry);
85         if (!entry)
86                 return NULL;
87
88         UC_CACHE_SET_NEW(entry);
89         INIT_LIST_HEAD(&entry->ue_hash);
90         entry->ue_key = key;
91         atomic_set(&entry->ue_refcount, 0);
92         init_waitqueue_head(&entry->ue_waitq);
93         if (cache->uc_ops->init_entry)
94                 cache->uc_ops->init_entry(entry, args);
95         return entry;
96 }
97
98 /* protected by cache lock */
99 static void free_entry(struct upcall_cache *cache,
100                        struct upcall_cache_entry *entry)
101 {
102         if (cache->uc_ops->free_entry)
103                 cache->uc_ops->free_entry(cache, entry);
104
105         list_del(&entry->ue_hash);
106         CDEBUG(D_OTHER, "destroy cache entry %p for key "LPU64"\n",
107                entry, entry->ue_key);
108         OBD_FREE_PTR(entry);
109 }
110
111 static inline int upcall_compare(struct upcall_cache *cache,
112                                  struct upcall_cache_entry *entry,
113                                  __u64 key, void *args)
114 {
115         if (entry->ue_key != key)
116                 return -1;
117
118         if (cache->uc_ops->upcall_compare)
119                 return cache->uc_ops->upcall_compare(cache, entry, key, args);
120
121         return 0;
122 }
123
124 static inline int downcall_compare(struct upcall_cache *cache,
125                                    struct upcall_cache_entry *entry,
126                                    __u64 key, void *args)
127 {
128         if (entry->ue_key != key)
129                 return -1;
130
131         if (cache->uc_ops->downcall_compare)
132                 return cache->uc_ops->downcall_compare(cache, entry, key, args);
133
134         return 0;
135 }
136
137 static inline void get_entry(struct upcall_cache_entry *entry)
138 {
139         atomic_inc(&entry->ue_refcount);
140 }
141
142 static inline void put_entry(struct upcall_cache *cache,
143                              struct upcall_cache_entry *entry)
144 {
145         if (atomic_dec_and_test(&entry->ue_refcount) &&
146             (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry))) {
147                 free_entry(cache, entry);
148         }
149 }
150
151 static int check_unlink_entry(struct upcall_cache *cache,
152                               struct upcall_cache_entry *entry)
153 {
154         if (UC_CACHE_IS_VALID(entry) &&
155             time_before(jiffies, entry->ue_expire))
156                 return 0;
157
158         if (UC_CACHE_IS_ACQUIRING(entry)) {
159                 if (time_before(jiffies, entry->ue_acquire_expire))
160                         return 0;
161
162                 UC_CACHE_SET_EXPIRED(entry);
163                 wake_up_all(&entry->ue_waitq);
164         } else if (!UC_CACHE_IS_INVALID(entry)) {
165                 UC_CACHE_SET_EXPIRED(entry);
166         }
167
168         list_del_init(&entry->ue_hash);
169         if (!atomic_read(&entry->ue_refcount))
170                 free_entry(cache, entry);
171         return 1;
172 }
173
174 static inline int refresh_entry(struct upcall_cache *cache,
175                          struct upcall_cache_entry *entry)
176 {
177         LASSERT(cache->uc_ops->do_upcall);
178         return cache->uc_ops->do_upcall(cache, entry);
179 }
180
181 struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
182                                                   __u64 key, void *args)
183 {
184         struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
185         struct list_head *head;
186         wait_queue_t wait;
187         int rc, found;
188         ENTRY;
189
190         LASSERT(cache);
191
192         head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
193 find_again:
194         found = 0;
195         spin_lock(&cache->uc_lock);
196         list_for_each_entry_safe(entry, next, head, ue_hash) {
197                 /* check invalid & expired items */
198                 if (check_unlink_entry(cache, entry))
199                         continue;
200                 if (upcall_compare(cache, entry, key, args) == 0) {
201                         found = 1;
202                         break;
203                 }
204         }
205
206         if (!found) { /* didn't find it */
207                 if (!new) {
208                         spin_unlock(&cache->uc_lock);
209                         new = alloc_entry(cache, key, args);
210                         if (!new) {
211                                 CERROR("fail to alloc entry\n");
212                                 RETURN(ERR_PTR(-ENOMEM));
213                         }
214                         goto find_again;
215                 } else {
216                         list_add(&new->ue_hash, head);
217                         entry = new;
218                 }
219         } else {
220                 if (new) {
221                         free_entry(cache, new);
222                         new = NULL;
223                 }
224                 list_move(&entry->ue_hash, head);
225         }
226         get_entry(entry);
227
228         /* acquire for new one */
229         if (UC_CACHE_IS_NEW(entry)) {
230                 UC_CACHE_SET_ACQUIRING(entry);
231                 UC_CACHE_CLEAR_NEW(entry);
232                 entry->ue_acquire_expire = jiffies + cache->uc_acquire_expire;
233                 spin_unlock(&cache->uc_lock);
234                 rc = refresh_entry(cache, entry);
235                 spin_lock(&cache->uc_lock);
236                 if (rc < 0) {
237                         UC_CACHE_CLEAR_ACQUIRING(entry);
238                         UC_CACHE_SET_INVALID(entry);
239                 }
240                 /* fall through */
241         }
242         /* someone (and only one) is doing upcall upon
243          * this item, just wait it complete
244          */
245         if (UC_CACHE_IS_ACQUIRING(entry)) {
246                 unsigned long expiry = jiffies + cache->uc_acquire_expire;
247
248                 init_waitqueue_entry(&wait, current);
249                 add_wait_queue(&entry->ue_waitq, &wait);
250                 set_current_state(TASK_INTERRUPTIBLE);
251                 spin_unlock(&cache->uc_lock);
252
253                 schedule_timeout(cache->uc_acquire_expire);
254
255                 spin_lock(&cache->uc_lock);
256                 remove_wait_queue(&entry->ue_waitq, &wait);
257                 if (UC_CACHE_IS_ACQUIRING(entry)) {
258                         /* we're interrupted or upcall failed in the middle */
259                         rc = time_before(jiffies, expiry) ? -EINTR : -ETIMEDOUT;
260                         put_entry(cache, entry);
261                         CERROR("acquire timeout exceeded for key "LPU64
262                                "\n", entry->ue_key);
263                         GOTO(out, entry = ERR_PTR(rc));
264                 }
265                 /* fall through */
266         }
267
268         /* invalid means error, don't need to try again */
269         if (UC_CACHE_IS_INVALID(entry)) {
270                 put_entry(cache, entry);
271                 GOTO(out, entry = ERR_PTR(-EIDRM));
272         }
273
274         /* check expired
275          * We can't refresh the existing one because some
276          * memory might be shared by multiple processes.
277          */
278         if (check_unlink_entry(cache, entry)) {
279                 /* if expired, try again. but if this entry is
280                  * created by me but too quickly turn to expired
281                  * without any error, should at least give a
282                  * chance to use it once.
283                  */
284                 if (entry != new) {
285                         put_entry(cache, entry);
286                         spin_unlock(&cache->uc_lock);
287                         new = NULL;
288                         goto find_again;
289                 }
290         }
291
292         /* Now we know it's good */
293 out:
294         spin_unlock(&cache->uc_lock);
295         RETURN(entry);
296 }
297 EXPORT_SYMBOL(upcall_cache_get_entry);
298
299 void upcall_cache_put_entry(struct upcall_cache *cache,
300                             struct upcall_cache_entry *entry)
301 {
302         ENTRY;
303
304         if (!entry) {
305                 EXIT;
306                 return;
307         }
308
309         LASSERT(atomic_read(&entry->ue_refcount) > 0);
310         spin_lock(&cache->uc_lock);
311         put_entry(cache, entry);
312         spin_unlock(&cache->uc_lock);
313         EXIT;
314 }
315 EXPORT_SYMBOL(upcall_cache_put_entry);
316
317 int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
318                           void *args)
319 {
320         struct upcall_cache_entry *entry = NULL;
321         struct list_head *head;
322         int found = 0, rc = 0;
323         ENTRY;
324
325         LASSERT(cache);
326
327         head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
328
329         spin_lock(&cache->uc_lock);
330         list_for_each_entry(entry, head, ue_hash) {
331                 if (downcall_compare(cache, entry, key, args) == 0) {
332                         found = 1;
333                         get_entry(entry);
334                         break;
335                 }
336         }
337
338         if (!found) {
339                 CDEBUG(D_OTHER, "%s: upcall for key "LPU64" not expected\n",
340                        cache->uc_name, key);
341                 /* haven't found, it's possible */
342                 spin_unlock(&cache->uc_lock);
343                 RETURN(-EINVAL);
344         }
345
346         if (err) {
347                 CDEBUG(D_OTHER, "%s: upcall for key "LPU64" returned %d\n",
348                        cache->uc_name, entry->ue_key, err);
349                 GOTO(out, rc = -EINVAL);
350         }
351
352         if (!UC_CACHE_IS_ACQUIRING(entry)) {
353                 CDEBUG(D_HA, "%s: found uptodate entry %p (key "LPU64")\n",
354                        cache->uc_name, entry, entry->ue_key);
355                 GOTO(out, rc = 0);
356         }
357
358         if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
359                 CERROR("%s: found a stale entry %p (key "LPU64") in ioctl\n",
360                        cache->uc_name, entry, entry->ue_key);
361                 GOTO(out, rc = -EINVAL);
362         }
363
364         spin_unlock(&cache->uc_lock);
365         if (cache->uc_ops->parse_downcall)
366                 rc = cache->uc_ops->parse_downcall(cache, entry, args);
367         spin_lock(&cache->uc_lock);
368         if (rc)
369                 GOTO(out, rc);
370
371         entry->ue_expire = jiffies + cache->uc_entry_expire;
372         UC_CACHE_SET_VALID(entry);
373         CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key "LPU64"\n",
374                cache->uc_name, entry, entry->ue_key);
375 out:
376         if (rc) {
377                 UC_CACHE_SET_INVALID(entry);
378                 list_del_init(&entry->ue_hash);
379         }
380         UC_CACHE_CLEAR_ACQUIRING(entry);
381         spin_unlock(&cache->uc_lock);
382         wake_up_all(&entry->ue_waitq);
383         put_entry(cache, entry);
384
385         RETURN(rc);
386 }
387 EXPORT_SYMBOL(upcall_cache_downcall);
388
389 static void cache_flush(struct upcall_cache *cache, int force)
390 {
391         struct upcall_cache_entry *entry, *next;
392         int i;
393         ENTRY;
394
395         spin_lock(&cache->uc_lock);
396         for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
397                 list_for_each_entry_safe(entry, next,
398                                          &cache->uc_hashtable[i], ue_hash) {
399                         if (!force && atomic_read(&entry->ue_refcount)) {
400                                 UC_CACHE_SET_EXPIRED(entry);
401                                 continue;
402                         }
403                         LASSERT(!atomic_read(&entry->ue_refcount));
404                         free_entry(cache, entry);
405                 }
406         }
407         spin_unlock(&cache->uc_lock);
408         EXIT;
409 }
410
411 void upcall_cache_flush_idle(struct upcall_cache *cache)
412 {
413         cache_flush(cache, 0);
414 }
415 EXPORT_SYMBOL(upcall_cache_flush_idle);
416
417 void upcall_cache_flush_all(struct upcall_cache *cache)
418 {
419         cache_flush(cache, 1);
420 }
421 EXPORT_SYMBOL(upcall_cache_flush_all);
422
423 void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
424 {
425         struct list_head *head;
426         struct upcall_cache_entry *entry;
427         int found = 0;
428         ENTRY;
429
430         head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
431
432         spin_lock(&cache->uc_lock);
433         list_for_each_entry(entry, head, ue_hash) {
434                 if (upcall_compare(cache, entry, key, args) == 0) {
435                         found = 1;
436                         break;
437                 }
438         }
439
440         if (found) {
441                 CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
442                       "cur %lu, ex %ld/%ld\n",
443                       cache->uc_name, entry, entry->ue_key,
444                       atomic_read(&entry->ue_refcount), entry->ue_flags,
445                       get_seconds(), entry->ue_acquire_expire,
446                       entry->ue_expire);
447                 UC_CACHE_SET_EXPIRED(entry);
448                 if (!atomic_read(&entry->ue_refcount))
449                         free_entry(cache, entry);
450         }
451         spin_unlock(&cache->uc_lock);
452 }
453 EXPORT_SYMBOL(upcall_cache_flush_one);
454
455 struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
456                                        struct upcall_cache_ops *ops)
457 {
458         struct upcall_cache *cache;
459         int i;
460         ENTRY;
461
462         OBD_ALLOC(cache, sizeof(*cache));
463         if (!cache)
464                 RETURN(ERR_PTR(-ENOMEM));
465
466         spin_lock_init(&cache->uc_lock);
467         for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
468                 INIT_LIST_HEAD(&cache->uc_hashtable[i]);
469         strncpy(cache->uc_name, name, sizeof(cache->uc_name) - 1);
470         /* upcall pathname proc tunable */
471         strncpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall) - 1);
472         cache->uc_entry_expire = 10 * 60 * HZ;
473         cache->uc_acquire_expire = 15 * HZ;
474         cache->uc_ops = ops;
475
476         RETURN(cache);
477 }
478 EXPORT_SYMBOL(upcall_cache_init);
479
480 void upcall_cache_cleanup(struct upcall_cache *cache)
481 {
482         if (!cache)
483                 return;
484         upcall_cache_flush_all(cache);
485         OBD_FREE(cache, sizeof(*cache));
486 }
487 EXPORT_SYMBOL(upcall_cache_cleanup);