/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * libcfs/libcfs/upcall_cache.c
 *
 * Supplementary groups cache.
 */
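/*
 * Overview (added commentary, derived from the code below): each cache
 * entry is hashed by a 64-bit key (e.g. a uid).  A lookup miss allocates
 * a NEW entry, fires the configured upcall (uc_ops->do_upcall) to a
 * userspace helper, and puts the caller to sleep on ue_waitq while the
 * entry is ACQUIRING.  The helper answers through upcall_cache_downcall(),
 * which parses the reply, marks the entry VALID and wakes all waiters.
 * VALID entries expire uc_entry_expire seconds later; a pending upcall
 * times out after uc_acquire_expire seconds.
 */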
#define DEBUG_SUBSYSTEM S_SEC

#include <libcfs/lucache.h>

static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
                                              __u64 key, void *args)
{
        struct upcall_cache_entry *entry;

        LIBCFS_ALLOC(entry, sizeof(*entry));
        if (!entry)
                return NULL;

        UC_CACHE_SET_NEW(entry);
        CFS_INIT_LIST_HEAD(&entry->ue_hash);
        entry->ue_key = key;
        cfs_atomic_set(&entry->ue_refcount, 0);
        cfs_waitq_init(&entry->ue_waitq);
        if (cache->uc_ops->init_entry)
                cache->uc_ops->init_entry(entry, args);
        return entry;
}

/* protected by cache lock */
static void free_entry(struct upcall_cache *cache,
                       struct upcall_cache_entry *entry)
{
        if (cache->uc_ops->free_entry)
                cache->uc_ops->free_entry(cache, entry);

        cfs_list_del(&entry->ue_hash);
        CDEBUG(D_OTHER, "destroy cache entry %p for key "LPU64"\n",
               entry, entry->ue_key);
        LIBCFS_FREE(entry, sizeof(*entry));
}

static inline int upcall_compare(struct upcall_cache *cache,
                                 struct upcall_cache_entry *entry,
                                 __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->upcall_compare)
                return cache->uc_ops->upcall_compare(cache, entry, key, args);

        return 0;
}

static inline int downcall_compare(struct upcall_cache *cache,
                                   struct upcall_cache_entry *entry,
                                   __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->downcall_compare)
                return cache->uc_ops->downcall_compare(cache, entry, key, args);

        return 0;
}

static inline void get_entry(struct upcall_cache_entry *entry)
{
        cfs_atomic_inc(&entry->ue_refcount);
}

static inline void put_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry)
{
        if (cfs_atomic_dec_and_test(&entry->ue_refcount) &&
            (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry))) {
                free_entry(cache, entry);
        }
}

/*
 * Return 0 if the entry is still usable.  Otherwise mark it expired,
 * unlink it from the hash (freeing it if nobody holds a reference) and
 * return 1.
 */
static int check_unlink_entry(struct upcall_cache *cache,
                              struct upcall_cache_entry *entry)
{
        if (UC_CACHE_IS_VALID(entry) &&
            cfs_time_before(cfs_time_current(), entry->ue_expire))
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (entry->ue_acquire_expire == 0 ||
                    cfs_time_before(cfs_time_current(),
                                    entry->ue_acquire_expire))
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                cfs_waitq_broadcast(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        cfs_list_del_init(&entry->ue_hash);
        if (!cfs_atomic_read(&entry->ue_refcount))
                free_entry(cache, entry);
        return 1;
}

static inline int refresh_entry(struct upcall_cache *cache,
                                struct upcall_cache_entry *entry)
{
        LASSERT(cache->uc_ops->do_upcall);
        return cache->uc_ops->do_upcall(cache, entry);
}

struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                  __u64 key, void *args)
{
        struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
        cfs_list_t *head;
        cfs_waitlink_t wait;
        int rc, found;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
find_again:
        found = 0;
        spin_lock(&cache->uc_lock);
        cfs_list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
                if (check_unlink_entry(cache, entry))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                if (!new) {
                        spin_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
                                CERROR("failed to allocate entry\n");
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
                } else {
                        cfs_list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
                }
                cfs_list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* fire the upcall for a newly created entry */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                spin_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
                spin_lock(&cache->uc_lock);
                entry->ue_acquire_expire =
                        cfs_time_shift(cache->uc_acquire_expire);
                if (rc < 0) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                        cfs_waitq_broadcast(&entry->ue_waitq);
                        if (unlikely(rc == -EREMCHG)) {
                                put_entry(cache, entry);
                                GOTO(out, entry = ERR_PTR(rc));
                        }
                }
        }
        /* someone (and only one) is doing an upcall for this item;
         * wait for it to complete */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                long expiry = (entry == new) ?
                              cfs_time_seconds(cache->uc_acquire_expire) :
                              CFS_MAX_SCHEDULE_TIMEOUT;
                long left;

                cfs_waitlink_init(&wait);
                cfs_waitq_add(&entry->ue_waitq, &wait);
                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                spin_unlock(&cache->uc_lock);

                left = cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE,
                                           expiry);

                spin_lock(&cache->uc_lock);
                cfs_waitq_del(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted or the upcall failed
                         * in the middle */
                        rc = left > 0 ? -EINTR : -ETIMEDOUT;
                        CERROR("acquire for key "LPU64": error %d\n",
                               entry->ue_key, rc);
                        put_entry(cache, entry);
                        GOTO(out, entry = ERR_PTR(rc));
                }
        }

        /* invalid means error; no need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(cache, entry);
                GOTO(out, entry = ERR_PTR(-EIDRM));
        }

        /* check expired
         * We can't refresh the existing entry in place because its
         * memory might be shared by multiple processes.
         */
        if (check_unlink_entry(cache, entry)) {
                /* If expired, try again.  But if this entry was created
                 * by us and expired too quickly without any error, give
                 * it at least one chance to be used.
                 */
                if (entry != new) {
                        put_entry(cache, entry);
                        spin_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
out:
        spin_unlock(&cache->uc_lock);
        RETURN(entry);
}
EXPORT_SYMBOL(upcall_cache_get_entry);
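
/*
 * Usage sketch (added illustration, not part of the original source): a
 * typical caller pairs upcall_cache_get_entry() with
 * upcall_cache_put_entry().  The function below and the way the cached
 * data is consumed are hypothetical; only the calls shown are this
 * file's API.
 *
 *      static int example_check_identity(struct upcall_cache *cache,
 *                                        __u64 uid, void *args)
 *      {
 *              struct upcall_cache_entry *entry;
 *
 *              entry = upcall_cache_get_entry(cache, uid, args);
 *              if (IS_ERR(entry))
 *                      return PTR_ERR(entry);
 *
 *              (... use the data that uc_ops->parse_downcall()
 *               attached to the entry ...)
 *
 *              upcall_cache_put_entry(cache, entry);
 *              return 0;
 *      }
 */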

void upcall_cache_put_entry(struct upcall_cache *cache,
                            struct upcall_cache_entry *entry)
{
        ENTRY;

        if (!entry) {
                EXIT;
                return;
        }

        LASSERT(cfs_atomic_read(&entry->ue_refcount) > 0);
        spin_lock(&cache->uc_lock);
        put_entry(cache, entry);
        spin_unlock(&cache->uc_lock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_put_entry);

int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
                          void *args)
{
        struct upcall_cache_entry *entry = NULL;
        cfs_list_t *head;
        int found = 0, rc = 0;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        cfs_list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        get_entry(entry);
                        break;
                }
        }

        if (!found) {
                CDEBUG(D_OTHER, "%s: upcall for key "LPU64" not expected\n",
                       cache->uc_name, key);
                /* entry not found; this can legitimately happen */
                spin_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }

        if (err) {
                CDEBUG(D_OTHER, "%s: upcall for key "LPU64" returned %d\n",
                       cache->uc_name, entry->ue_key, err);
                GOTO(out, rc = -EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key "LPU64")\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = 0);
        }

        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
                CERROR("%s: found a stale entry %p (key "LPU64") in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = -EINVAL);
        }

        spin_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
        spin_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);

        entry->ue_expire = cfs_time_shift(cache->uc_entry_expire);
        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key "LPU64"\n",
               cache->uc_name, entry, entry->ue_key);
out:
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                cfs_list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
        spin_unlock(&cache->uc_lock);
        cfs_waitq_broadcast(&entry->ue_waitq);
        put_entry(cache, entry);

        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);
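
/*
 * Downcall flow sketch (added illustration): the helper started by
 * uc_ops->do_upcall() reports back through some kernel entry point
 * (in Lustre, typically a procfs write handler) that forwards the
 * reply into upcall_cache_downcall().  Everything named "example_*"
 * below is hypothetical; only upcall_cache_downcall() itself is real.
 *
 *      struct example_reply {
 *              __u32 err;
 *              __u64 key;
 *              (... payload for uc_ops->parse_downcall() ...)
 *      };
 *
 *      static int example_handle_reply(struct upcall_cache *cache,
 *                                      struct example_reply *reply)
 *      {
 *              return upcall_cache_downcall(cache, reply->err,
 *                                           reply->key, reply);
 *      }
 */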

/*
 * Remove all entries.  If @force is zero, entries that are still
 * referenced are only marked EXPIRED and left for put_entry() to free.
 */
static void cache_flush(struct upcall_cache *cache, int force)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        spin_lock(&cache->uc_lock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
                cfs_list_for_each_entry_safe(entry, next,
                                             &cache->uc_hashtable[i], ue_hash) {
                        if (!force && cfs_atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!cfs_atomic_read(&entry->ue_refcount));
                        free_entry(cache, entry);
                }
        }
        spin_unlock(&cache->uc_lock);
        EXIT;
}

void upcall_cache_flush_idle(struct upcall_cache *cache)
{
        cache_flush(cache, 0);
}
EXPORT_SYMBOL(upcall_cache_flush_idle);

void upcall_cache_flush_all(struct upcall_cache *cache)
{
        cache_flush(cache, 1);
}
EXPORT_SYMBOL(upcall_cache_flush_all);

void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
{
        cfs_list_t *head;
        struct upcall_cache_entry *entry;
        int found = 0;
        ENTRY;

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        cfs_list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
                      "cur %lu, ex %ld/%ld\n",
                      cache->uc_name, entry, entry->ue_key,
                      cfs_atomic_read(&entry->ue_refcount), entry->ue_flags,
                      cfs_time_current_sec(), entry->ue_acquire_expire,
                      entry->ue_expire);
                UC_CACHE_SET_EXPIRED(entry);
                if (!cfs_atomic_read(&entry->ue_refcount))
                        free_entry(cache, entry);
        }
        spin_unlock(&cache->uc_lock);
}
EXPORT_SYMBOL(upcall_cache_flush_one);

struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
                                       struct upcall_cache_ops *ops)
{
        struct upcall_cache *cache;
        int i;
        ENTRY;

        LIBCFS_ALLOC(cache, sizeof(*cache));
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));

        spin_lock_init(&cache->uc_lock);
        rwlock_init(&cache->uc_upcall_rwlock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&cache->uc_hashtable[i]);
        strncpy(cache->uc_name, name, sizeof(cache->uc_name) - 1);
        /* upcall pathname proc tunable */
        strncpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall) - 1);
        cache->uc_entry_expire = 20 * 60;       /* entries valid for 20 min */
        cache->uc_acquire_expire = 30;          /* upcall timeout: 30 sec */
        cache->uc_ops = ops;

        RETURN(cache);
}
EXPORT_SYMBOL(upcall_cache_init);
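
/*
 * Initialization sketch (added illustration): a subsystem supplies an
 * upcall_cache_ops table and creates its cache at setup time.  The
 * "example_*" names and helper path below are hypothetical placeholders;
 * the callback slots are the ones dereferenced throughout this file.
 *
 *      static struct upcall_cache_ops example_ops = {
 *              .init_entry     = example_init_entry,
 *              .free_entry     = example_free_entry,
 *              .do_upcall      = example_do_upcall,
 *              .parse_downcall = example_parse_downcall,
 *      };
 *
 *      cache = upcall_cache_init("example", "/usr/sbin/example_upcall",
 *                                &example_ops);
 *      if (IS_ERR(cache))
 *              return PTR_ERR(cache);
 *      (... use the cache; at teardown call upcall_cache_cleanup() ...)
 */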

void upcall_cache_cleanup(struct upcall_cache *cache)
{
        if (!cache)
                return;
        upcall_cache_flush_all(cache);
        LIBCFS_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(upcall_cache_cleanup);