/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/upcall_cache.c
 *
 * Supplementary groups cache.
 */
#define DEBUG_SUBSYSTEM S_SEC

#include <libcfs/libcfs.h>
#include <uapi/linux/lnet/lnet-types.h>
#include <upcall_cache.h>

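/* Allocate and initialise an entry for @key.  The new entry starts in
 * the NEW state with a zero refcount and is not yet hashed; returns
 * NULL if the allocation fails. */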
static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
                                              __u64 key, void *args)
{
        struct upcall_cache_entry *entry;

        LIBCFS_ALLOC(entry, sizeof(*entry));
        if (!entry)
                return NULL;

        UC_CACHE_SET_NEW(entry);
        INIT_LIST_HEAD(&entry->ue_hash);
        entry->ue_key = key;
        atomic_set(&entry->ue_refcount, 0);
        init_waitqueue_head(&entry->ue_waitq);
        if (cache->uc_ops->init_entry)
                cache->uc_ops->init_entry(entry, args);
        return entry;
}

/* protected by cache lock */
static void free_entry(struct upcall_cache *cache,
                       struct upcall_cache_entry *entry)
{
        if (cache->uc_ops->free_entry)
                cache->uc_ops->free_entry(cache, entry);

        list_del(&entry->ue_hash);
        CDEBUG(D_OTHER, "destroy cache entry %p for key %llu\n",
                entry, entry->ue_key);
        LIBCFS_FREE(entry, sizeof(*entry));
}

static inline int upcall_compare(struct upcall_cache *cache,
                                 struct upcall_cache_entry *entry,
                                 __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->upcall_compare)
                return cache->uc_ops->upcall_compare(cache, entry, key, args);

        return 0;
}

static inline int downcall_compare(struct upcall_cache *cache,
                                   struct upcall_cache_entry *entry,
                                   __u64 key, void *args)
{
        if (entry->ue_key != key)
                return -1;

        if (cache->uc_ops->downcall_compare)
                return cache->uc_ops->downcall_compare(cache, entry, key, args);

        return 0;
}

static inline void get_entry(struct upcall_cache_entry *entry)
{
        atomic_inc(&entry->ue_refcount);
}

static inline void put_entry(struct upcall_cache *cache,
                             struct upcall_cache_entry *entry)
{
        if (atomic_dec_and_test(&entry->ue_refcount) &&
            (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry))) {
                free_entry(cache, entry);
        }
}

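/* Unhash @entry if it is invalid or has expired, freeing it when nobody
 * holds a reference any more.  Returns 1 if the entry was unlinked, 0 if
 * it is still valid or an upcall for it is still within its deadline.
 * Must be called with uc_lock held. */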
static int check_unlink_entry(struct upcall_cache *cache,
                              struct upcall_cache_entry *entry)
{
        time64_t now = ktime_get_seconds();

        if (UC_CACHE_IS_VALID(entry) && now < entry->ue_expire)
                return 0;

        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (entry->ue_acquire_expire == 0 ||
                    now < entry->ue_acquire_expire)
                        return 0;

                UC_CACHE_SET_EXPIRED(entry);
                wake_up_all(&entry->ue_waitq);
        } else if (!UC_CACHE_IS_INVALID(entry)) {
                UC_CACHE_SET_EXPIRED(entry);
        }

        list_del_init(&entry->ue_hash);
        if (!atomic_read(&entry->ue_refcount))
                free_entry(cache, entry);
        return 1;
}

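/* Kick off the upcall that will (asynchronously) fill in @entry;
 * the do_upcall method is mandatory. */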
static inline int refresh_entry(struct upcall_cache *cache,
                                struct upcall_cache_entry *entry)
{
        LASSERT(cache->uc_ops->do_upcall);
        return cache->uc_ops->do_upcall(cache, entry);
}

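/* Look up (or create) the entry for @key and return it with an extra
 * reference held.  A NEW entry triggers the upcall, and callers sleep
 * until the matching downcall arrives or uc_acquire_expire seconds
 * pass; concurrent lookups for the same key wait on the same entry
 * instead of issuing duplicate upcalls.  Returns an ERR_PTR on
 * allocation failure, interruption, timeout or a failed downcall. */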
struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                  __u64 key, void *args)
{
        struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
        struct list_head *head;
        wait_queue_entry_t wait;
        int rc, found;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
find_again:
        found = 0;
        spin_lock(&cache->uc_lock);
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
                if (check_unlink_entry(cache, entry))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                if (!new) {
                        spin_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
                                CERROR("failed to alloc entry\n");
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
                } else {
                        list_add(&new->ue_hash, head);
                        entry = new;
                }
        } else {
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
                }
                list_move(&entry->ue_hash, head);
        }
        get_entry(entry);

        /* acquire for new one */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
                spin_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
                spin_lock(&cache->uc_lock);
                entry->ue_acquire_expire = ktime_get_seconds() +
                                           cache->uc_acquire_expire;
                if (rc < 0) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
                        wake_up_all(&entry->ue_waitq);
                        if (unlikely(rc == -EREMCHG)) {
                                put_entry(cache, entry);
                                GOTO(out, entry = ERR_PTR(rc));
                        }
                }
        }
        /* someone (and only one) is doing an upcall for this item,
         * wait for it to complete */
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                long expiry = (entry == new) ?
                              cfs_time_seconds(cache->uc_acquire_expire) :
                              MAX_SCHEDULE_TIMEOUT;
                long left;

                init_waitqueue_entry(&wait, current);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                spin_unlock(&cache->uc_lock);

                left = schedule_timeout(expiry);

                spin_lock(&cache->uc_lock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we were interrupted or the upcall failed
                         * in the middle */
                        rc = left > 0 ? -EINTR : -ETIMEDOUT;
                        CERROR("acquire for key %llu: error %d\n",
                               entry->ue_key, rc);
                        put_entry(cache, entry);
                        GOTO(out, entry = ERR_PTR(rc));
                }
        }

        /* invalid means error, don't need to try again */
        if (UC_CACHE_IS_INVALID(entry)) {
                put_entry(cache, entry);
                GOTO(out, entry = ERR_PTR(-EIDRM));
        }

        /* check expired
         * We can't refresh the existing one because some
         * memory might be shared by multiple processes.
         */
        if (check_unlink_entry(cache, entry)) {
                /* if expired, try again. But if this entry was created
                 * by me and expired too quickly without any error, it
                 * should at least get a chance to be used once.
                 */
                if (entry != new) {
                        put_entry(cache, entry);
                        spin_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
        }

        /* Now we know it's good */
out:
        spin_unlock(&cache->uc_lock);
        RETURN(entry);
}
EXPORT_SYMBOL(upcall_cache_get_entry);

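/* Release a reference obtained from upcall_cache_get_entry().  The
 * entry is freed once the last reference is dropped and it has been
 * marked invalid or expired. */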
void upcall_cache_put_entry(struct upcall_cache *cache,
                            struct upcall_cache_entry *entry)
{
        ENTRY;

        if (!entry) {
                EXIT;
                return;
        }

        LASSERT(atomic_read(&entry->ue_refcount) > 0);
        spin_lock(&cache->uc_lock);
        put_entry(cache, entry);
        spin_unlock(&cache->uc_lock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_put_entry);

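/* Handle the downcall from the userspace helper (delivered via ioctl):
 * @err is the helper's status and @args carries the payload consumed by
 * the parse_downcall method.  On success the entry becomes VALID for
 * uc_entry_expire seconds; on any failure it is invalidated and
 * unhashed.  Waiters on ue_waitq are woken in either case. */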
int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
                          void *args)
{
        struct upcall_cache_entry *entry = NULL;
        struct list_head *head;
        int found = 0, rc = 0;
        ENTRY;

        LASSERT(cache);

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        get_entry(entry);
                        break;
                }
        }

        if (!found) {
                CDEBUG(D_OTHER, "%s: upcall for key %llu not expected\n",
                       cache->uc_name, key);
                /* not found; this is possible if the entry has already
                 * expired or been flushed */
                spin_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }

        if (err) {
                CDEBUG(D_OTHER, "%s: upcall for key %llu returned %d\n",
                       cache->uc_name, entry->ue_key, err);
                GOTO(out, rc = -EINVAL);
        }

        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key %llu)\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = 0);
        }

        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
                CERROR("%s: found a stale entry %p (key %llu) in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
                GOTO(out, rc = -EINVAL);
        }

        spin_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
        spin_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);

        entry->ue_expire = ktime_get_seconds() + cache->uc_entry_expire;
        UC_CACHE_SET_VALID(entry);
        CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key %llu\n",
               cache->uc_name, entry, entry->ue_key);
out:
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
        spin_unlock(&cache->uc_lock);
        wake_up_all(&entry->ue_waitq);
        put_entry(cache, entry);

        RETURN(rc);
}
EXPORT_SYMBOL(upcall_cache_downcall);

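/* Flush the whole cache.  Entries still referenced are only marked
 * EXPIRED unless @force is set; when forcing, callers must guarantee
 * that no entry is busy. */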
void upcall_cache_flush(struct upcall_cache *cache, int force)
{
        struct upcall_cache_entry *entry, *next;
        int i;
        ENTRY;

        spin_lock(&cache->uc_lock);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
                                UC_CACHE_SET_EXPIRED(entry);
                                continue;
                        }
                        LASSERT(!atomic_read(&entry->ue_refcount));
                        free_entry(cache, entry);
                }
        }
        spin_unlock(&cache->uc_lock);
        EXIT;
}
EXPORT_SYMBOL(upcall_cache_flush);

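/* Expire the single entry matching @key, if any: it is marked EXPIRED
 * so the next lookup refreshes it, and freed right away when it is not
 * referenced. */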
void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
{
        struct list_head *head;
        struct upcall_cache_entry *entry;
        int found = 0;
        ENTRY;

        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];

        spin_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
                        break;
                }
        }

        if (found) {
                CWARN("%s: flush entry %p: key %llu, ref %d, fl %x, cur %lld, ex %lld/%lld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      ktime_get_real_seconds(), entry->ue_acquire_expire,
                      entry->ue_expire);
                UC_CACHE_SET_EXPIRED(entry);
                if (!atomic_read(&entry->ue_refcount))
                        free_entry(cache, entry);
        }
        spin_unlock(&cache->uc_lock);
}
EXPORT_SYMBOL(upcall_cache_flush_one);

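/* Create a cache named @name whose helper binary is @upcall.  By
 * default entries stay valid for 20 minutes and an upcall may take up
 * to 30 seconds before waiters time out. */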
struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
                                       struct upcall_cache_ops *ops)
{
        struct upcall_cache *cache;
        int i;
        ENTRY;

        LIBCFS_ALLOC(cache, sizeof(*cache));
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));

        spin_lock_init(&cache->uc_lock);
        init_rwsem(&cache->uc_upcall_rwsem);
        for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
                INIT_LIST_HEAD(&cache->uc_hashtable[i]);
        strlcpy(cache->uc_name, name, sizeof(cache->uc_name));
        /* upcall pathname proc tunable */
        strlcpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall));
        cache->uc_entry_expire = 20 * 60;
        cache->uc_acquire_expire = 30;
        cache->uc_ops = ops;

        RETURN(cache);
}
EXPORT_SYMBOL(upcall_cache_init);
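
/* Minimal usage sketch (illustrative only: "my_ops", @key and @args are
 * placeholders; see the MDT identity cache, which drives the
 * l_getidentity helper, for a real consumer):
 *
 *      struct upcall_cache *cache;
 *      struct upcall_cache_entry *entry;
 *
 *      cache = upcall_cache_init("identity", "/usr/sbin/l_getidentity",
 *                                &my_ops);
 *      if (IS_ERR(cache))
 *              return PTR_ERR(cache);
 *
 *      entry = upcall_cache_get_entry(cache, key, args);
 *      if (IS_ERR(entry))
 *              return PTR_ERR(entry);
 *      ...use entry...
 *      upcall_cache_put_entry(cache, entry);
 *
 *      upcall_cache_cleanup(cache);
 */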

/* Tear the cache down: force-flush every entry (callers must ensure
 * nothing is still in use), then free the cache itself.  A NULL cache
 * is a no-op. */
void upcall_cache_cleanup(struct upcall_cache *cache)
{
        if (!cache)
                return;
        upcall_cache_flush_all(cache);
        LIBCFS_FREE(cache, sizeof(*cache));
}
EXPORT_SYMBOL(upcall_cache_cleanup);