LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] lustre/obdclass/upcall_cache.c
index 4d9ef1c..f71901b 100644
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/obdclass/upcall_cache.c
  *
@@ -40,7 +35,7 @@
 #define DEBUG_SUBSYSTEM S_SEC
 
 #include <libcfs/libcfs.h>
-#include <lnet/types.h>
+#include <uapi/linux/lnet/lnet-types.h>
 #include <upcall_cache.h>
 
 static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
@@ -57,6 +52,8 @@ static struct upcall_cache_entry *alloc_entry(struct upcall_cache *cache,
        entry->ue_key = key;
        atomic_set(&entry->ue_refcount, 0);
        init_waitqueue_head(&entry->ue_waitq);
+       entry->ue_acquire_expire = 0;
+       entry->ue_expire = 0;
        if (cache->uc_ops->init_entry)
                cache->uc_ops->init_entry(entry, args);
        return entry;
@@ -70,7 +67,7 @@ static void free_entry(struct upcall_cache *cache,
                cache->uc_ops->free_entry(cache, entry);
 
        list_del(&entry->ue_hash);
-       CDEBUG(D_OTHER, "destroy cache entry %p for key "LPU64"\n",
+       CDEBUG(D_OTHER, "destroy cache entry %p for key %llu\n",
                entry, entry->ue_key);
        LIBCFS_FREE(entry, sizeof(*entry));
 }
@@ -115,31 +112,86 @@ static inline void put_entry(struct upcall_cache *cache,
        }
 }
 
+static inline void write_lock_from_read(rwlock_t *lock, bool *writelock)
+{
+       if (!*writelock) {
+               read_unlock(lock);
+               write_lock(lock);
+               *writelock = true;
+       }
+}
+
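
A note on write_lock_from_read() above: the read lock is dropped before
the write lock is taken, so anything found under the read lock may have
been modified or freed in the window between the two. Callers must
re-validate after upgrading, which is why upcall_cache_get_entry()
below jumps back to find_with_lock. A minimal sketch of the pattern
(the lookup and check helpers are hypothetical):

        bool writelock = false;
        struct upcall_cache_entry *entry;

        read_lock(&cache->uc_lock);
        entry = lookup(cache, key);             /* hypothetical */
        if (entry && needs_change(entry)) {     /* hypothetical */
                write_lock_from_read(&cache->uc_lock, &writelock);
                /* the lock was dropped: repeat the lookup */
                entry = lookup(cache, key);
        }
        if (writelock)
                write_unlock(&cache->uc_lock);
        else
                read_unlock(&cache->uc_lock);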
 static int check_unlink_entry(struct upcall_cache *cache,
-                             struct upcall_cache_entry *entry)
+                             struct upcall_cache_entry *entry,
+                             bool writelock)
 {
-       if (UC_CACHE_IS_VALID(entry) &&
-           cfs_time_before(cfs_time_current(), entry->ue_expire))
+       time64_t now = ktime_get_seconds();
+
+       if (UC_CACHE_IS_VALID(entry) && now < entry->ue_expire)
                return 0;
 
        if (UC_CACHE_IS_ACQUIRING(entry)) {
                if (entry->ue_acquire_expire == 0 ||
-                   cfs_time_before(cfs_time_current(),
-                                   entry->ue_acquire_expire))
+                   now < entry->ue_acquire_expire)
                        return 0;
 
-               UC_CACHE_SET_EXPIRED(entry);
-               wake_up_all(&entry->ue_waitq);
-       } else if (!UC_CACHE_IS_INVALID(entry)) {
+               if (writelock) {
+                       UC_CACHE_SET_EXPIRED(entry);
+                       wake_up(&entry->ue_waitq);
+               }
+       } else if (!UC_CACHE_IS_INVALID(entry) && writelock) {
                UC_CACHE_SET_EXPIRED(entry);
        }
 
-       list_del_init(&entry->ue_hash);
-       if (!atomic_read(&entry->ue_refcount))
-               free_entry(cache, entry);
+       if (writelock) {
+               list_del_init(&entry->ue_hash);
+               if (!atomic_read(&entry->ue_refcount))
+                       free_entry(cache, entry);
+       }
        return 1;
 }
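
With the cfs_time_* jiffies helpers gone, ue_expire and
ue_acquire_expire hold absolute times in seconds, so expiry becomes a
plain comparison against ktime_get_seconds(). A sketch of the invariant
check_unlink_entry() relies on (the consumer is hypothetical):

        /* when an entry becomes valid */
        entry->ue_expire = ktime_get_seconds() + cache->uc_entry_expire;

        /* later: the entry stays usable only until ue_expire */
        if (UC_CACHE_IS_VALID(entry) &&
            ktime_get_seconds() < entry->ue_expire)
                use_entry(entry);       /* hypothetical */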
 
+int upcall_cache_set_upcall(struct upcall_cache *cache, const char *buffer,
+                           size_t count, bool path_only)
+{
+       char *upcall;
+
+       if (count >= UC_CACHE_UPCALL_MAXPATH)
+               return -E2BIG;
+
+       OBD_ALLOC(upcall, count + 1);
+       if (upcall == NULL)
+               return -ENOMEM;
+
+       /* Remove any extraneous bits from the upcall (e.g. linefeeds) */
+       if (sscanf(buffer, "%s", upcall) != 1)
+               goto invalid;
+
+       if (upcall[0] == '/')
+               goto valid;
+
+       if (path_only)
+               goto invalid;
+
+       if (strcasecmp(upcall, "NONE") == 0) {
+               snprintf(upcall, count + 1, "NONE");
+               goto valid;
+       }
+
+invalid:
+       OBD_FREE(upcall, count + 1);
+       return -EINVAL;
+
+valid:
+       down_write(&cache->uc_upcall_rwsem);
+       strcpy(cache->uc_upcall, upcall);
+       up_write(&cache->uc_upcall_rwsem);
+
+       OBD_FREE(upcall, count + 1);
+       return 0;
+}
+EXPORT_SYMBOL(upcall_cache_set_upcall);
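
upcall_cache_set_upcall() backs the upcall tunables: it accepts an
absolute path, or the literal "NONE" when path_only is false. A
hypothetical write handler might use it like this (how 'cache' is
obtained is outside this file):

        static ssize_t upcall_store(struct upcall_cache *cache,
                                    const char *buffer, size_t count)
        {
                /* path_only=false also permits "NONE" */
                int rc = upcall_cache_set_upcall(cache, buffer, count,
                                                 false);

                return rc < 0 ? rc : count;
        }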
+
 static inline int refresh_entry(struct upcall_cache *cache,
                         struct upcall_cache_entry *entry)
 {
@@ -151,20 +203,31 @@ struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                  __u64 key, void *args)
 {
        struct upcall_cache_entry *entry = NULL, *new = NULL, *next;
+       bool failedacquiring = false;
        struct list_head *head;
-       wait_queue_t wait;
+       wait_queue_entry_t wait;
+       bool writelock;
        int rc, found;
+
        ENTRY;
 
        LASSERT(cache);
 
-       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
+       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
+                                                       cache->uc_hashsize)];
 find_again:
        found = 0;
-       spin_lock(&cache->uc_lock);
+       if (new) {
+               write_lock(&cache->uc_lock);
+               writelock = true;
+       } else {
+               read_lock(&cache->uc_lock);
+               writelock = false;
+       }
+find_with_lock:
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
-               if (check_unlink_entry(cache, entry))
+               if (check_unlink_entry(cache, entry, writelock))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -174,10 +237,14 @@ find_again:
 
        if (!found) {
                if (!new) {
-                       spin_unlock(&cache->uc_lock);
+                       if (writelock)
+                               write_unlock(&cache->uc_lock);
+                       else
+                               read_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
-                               CERROR("fail to alloc entry\n");
+                               CERROR("%s: fail to alloc entry: rc = %d\n",
+                                      cache->uc_name, -ENOMEM);
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
@@ -189,24 +256,33 @@ find_again:
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
+               } else if (!writelock) {
+                       /* We found an entry while holding the read lock, so
+                        * convert it to a write lock and find again, to check
+                        * that entry was not modified/freed in between.
+                        */
+                       write_lock_from_read(&cache->uc_lock, &writelock);
+                       found = 0;
+                       goto find_with_lock;
                }
                list_move(&entry->ue_hash, head);
        }
+       /* now we hold a write lock */
        get_entry(entry);
 
        /* acquire for new one */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
-               spin_unlock(&cache->uc_lock);
+               write_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
-               spin_lock(&cache->uc_lock);
-               entry->ue_acquire_expire =
-                       cfs_time_shift(cache->uc_acquire_expire);
+               write_lock(&cache->uc_lock);
+               entry->ue_acquire_expire = ktime_get_seconds() +
+                                          cache->uc_acquire_expire;
                if (rc < 0) {
                        UC_CACHE_CLEAR_ACQUIRING(entry);
                        UC_CACHE_SET_INVALID(entry);
-                       wake_up_all(&entry->ue_waitq);
+                       wake_up(&entry->ue_waitq);
                        if (unlikely(rc == -EREMCHG)) {
                                put_entry(cache, entry);
                                GOTO(out, entry = ERR_PTR(rc));
@@ -221,22 +297,37 @@ find_again:
                              MAX_SCHEDULE_TIMEOUT;
                long left;
 
-               init_waitqueue_entry_current(&wait);
+               init_wait(&wait);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
-               spin_unlock(&cache->uc_lock);
+               write_unlock(&cache->uc_lock);
 
-               left = waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
-                                          expiry);
+               left = schedule_timeout(expiry);
 
-               spin_lock(&cache->uc_lock);
+               write_lock(&cache->uc_lock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we're interrupted or upcall failed in the middle */
                        rc = left > 0 ? -EINTR : -ETIMEDOUT;
-                       CERROR("acquire for key "LPU64": error %d\n",
-                              entry->ue_key, rc);
+                       /* if we waited uc_acquire_expire, we can try again
+                        * with the same data, but only if acquire is replayable
+                        */
+                       if (left <= 0 && !cache->uc_acquire_replay)
+                               failedacquiring = true;
                        put_entry(cache, entry);
+                       if (!failedacquiring) {
+                               write_unlock(&cache->uc_lock);
+                               failedacquiring = true;
+                               new = NULL;
+                               CDEBUG(D_OTHER,
+                                      "retry acquire for key %llu (got %d)\n",
+                                      entry->ue_key, rc);
+                               goto find_again;
+                       }
+                       wake_up_all(&entry->ue_waitq);
+                       CERROR("%s: acquire for key %llu after %lld: rc = %d\n",
+                              cache->uc_name, entry->ue_key,
+                              cache->uc_acquire_expire, rc);
                        GOTO(out, entry = ERR_PTR(rc));
                }
        }
@@ -251,15 +342,16 @@ find_again:
         * We can't refresh the existing one because some
         * memory might be shared by multiple processes.
         */
-       if (check_unlink_entry(cache, entry)) {
+       if (check_unlink_entry(cache, entry, writelock)) {
                /* if expired, try again. But if this entry was
                 * created by me and expired too quickly without
                 * any error, it should at least get a chance to
                 * be used once.
                 */
                if (entry != new) {
+                       /* as stated above, we already hold a write lock */
                        put_entry(cache, entry);
-                       spin_unlock(&cache->uc_lock);
+                       write_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
@@ -267,11 +359,34 @@ find_again:
 
        /* Now we know it's good */
 out:
-       spin_unlock(&cache->uc_lock);
+       if (writelock)
+               write_unlock(&cache->uc_lock);
+       else
+               read_unlock(&cache->uc_lock);
        RETURN(entry);
 }
 EXPORT_SYMBOL(upcall_cache_get_entry);
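
For reference, the expected pattern around upcall_cache_get_entry() is
get/use/put; a sketch with error handling abbreviated:

        struct upcall_cache_entry *entry;

        entry = upcall_cache_get_entry(cache, key, args);
        if (IS_ERR(entry))
                return PTR_ERR(entry);
        /* data attached by uc_ops->parse_downcall() is valid here */
        upcall_cache_put_entry(cache, entry);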
 
+void upcall_cache_get_entry_raw(struct upcall_cache_entry *entry)
+{
+       get_entry(entry);
+}
+EXPORT_SYMBOL(upcall_cache_get_entry_raw);
+
+void upcall_cache_update_entry(struct upcall_cache *cache,
+                              struct upcall_cache_entry *entry,
+                              time64_t expire, int state)
+{
+       write_lock(&cache->uc_lock);
+       entry->ue_expire = expire;
+       if (!state)
+               UC_CACHE_SET_VALID(entry);
+       else
+               entry->ue_flags |= state;
+       write_unlock(&cache->uc_lock);
+}
+EXPORT_SYMBOL(upcall_cache_update_entry);
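
upcall_cache_update_entry() lets a caller that learns the actual
credential lifetime from the downcall set ue_expire directly instead of
relying on uc_entry_expire; 'lifetime' below is hypothetical:

        upcall_cache_update_entry(cache, entry,
                                  ktime_get_seconds() + lifetime, 0);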
+
 void upcall_cache_put_entry(struct upcall_cache *cache,
                            struct upcall_cache_entry *entry)
 {
@@ -283,9 +398,9 @@ void upcall_cache_put_entry(struct upcall_cache *cache,
        }
 
        LASSERT(atomic_read(&entry->ue_refcount) > 0);
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        put_entry(cache, entry);
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
        EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_put_entry);
@@ -296,13 +411,15 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
        struct upcall_cache_entry *entry = NULL;
        struct list_head *head;
        int found = 0, rc = 0;
+       bool writelock = false;
        ENTRY;
 
        LASSERT(cache);
 
-       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
+       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
+                                                       cache->uc_hashsize)];
 
-       spin_lock(&cache->uc_lock);
+       read_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -312,51 +429,56 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
        }
 
        if (!found) {
-               CDEBUG(D_OTHER, "%s: upcall for key "LPU64" not expected\n",
+               CDEBUG(D_OTHER, "%s: upcall for key %llu not expected\n",
                       cache->uc_name, key);
                /* not found, which is possible */
-               spin_unlock(&cache->uc_lock);
+               read_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }
 
        if (err) {
-               CDEBUG(D_OTHER, "%s: upcall for key "LPU64" returned %d\n",
+               CDEBUG(D_OTHER, "%s: upcall for key %llu returned %d\n",
                       cache->uc_name, entry->ue_key, err);
-               GOTO(out, rc = -EINVAL);
+               write_lock_from_read(&cache->uc_lock, &writelock);
+               GOTO(out, rc = err);
        }
 
        if (!UC_CACHE_IS_ACQUIRING(entry)) {
-               CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key "LPU64")"
+               CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key %llu)"
                       "\n", cache->uc_name, entry, entry->ue_key);
+               write_lock_from_read(&cache->uc_lock, &writelock);
                GOTO(out, rc = 0);
        }
 
        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
-               CERROR("%s: found a stale entry %p (key "LPU64") in ioctl\n",
+               CERROR("%s: found a stale entry %p (key %llu) in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
+               write_lock_from_read(&cache->uc_lock, &writelock);
                GOTO(out, rc = -EINVAL);
        }
 
-       spin_unlock(&cache->uc_lock);
+       read_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);
 
-       entry->ue_expire = cfs_time_shift(cache->uc_entry_expire);
+       if (!entry->ue_expire)
+               entry->ue_expire = ktime_get_seconds() + cache->uc_entry_expire;
        UC_CACHE_SET_VALID(entry);
-       CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key "LPU64"\n",
+       CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key %llu\n",
               cache->uc_name, entry, entry->ue_key);
 out:
+       /* 'goto out' needs to make sure to take a write lock first */
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
-       spin_unlock(&cache->uc_lock);
-       wake_up_all(&entry->ue_waitq);
+       wake_up(&entry->ue_waitq);
        put_entry(cache, entry);
+       write_unlock(&cache->uc_lock);
 
        RETURN(rc);
 }
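
The downcall path is driven from userspace: the upcall helper writes
its result back (typically through an ioctl), and that handler forwards
it here. A sketch with a hypothetical result structure:

        /* 'res' layout is hypothetical; err/key mirror the
         * upcall_cache_downcall() parameters */
        rc = upcall_cache_downcall(cache, res->err, res->key, res);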
@@ -368,8 +490,8 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
        int i;
        ENTRY;
 
-       spin_lock(&cache->uc_lock);
-       for (i = 0; i < UC_CACHE_HASH_SIZE; i++) {
+       write_lock(&cache->uc_lock);
+       for (i = 0; i < cache->uc_hashsize; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
                        if (!force && atomic_read(&entry->ue_refcount)) {
@@ -380,7 +502,7 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
                        free_entry(cache, entry);
                }
        }
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
        EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_flush);
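
The force flag separates idle cleanup from teardown: with force == 0,
entries still holding references are only marked expired, while a
non-zero force frees everything. Sketch:

        upcall_cache_flush(cache, 0);   /* drop unreferenced entries */
        upcall_cache_flush(cache, 1);   /* drop everything, e.g. teardown */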
@@ -392,9 +514,10 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
        int found = 0;
        ENTRY;
 
-       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key)];
+       head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
+                                                       cache->uc_hashsize)];
 
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -403,21 +526,23 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
        }
 
        if (found) {
-               CWARN("%s: flush entry %p: key "LPU64", ref %d, fl %x, "
-                     "cur %lu, ex %ld/%ld\n",
+               CWARN("%s: flush entry %p: key %llu, ref %d, fl %x, "
+                     "cur %lld, ex %lld/%lld\n",
                      cache->uc_name, entry, entry->ue_key,
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
-                     cfs_time_current_sec(), entry->ue_acquire_expire,
+                     ktime_get_real_seconds(), entry->ue_acquire_expire,
                      entry->ue_expire);
+               get_entry(entry);
                UC_CACHE_SET_EXPIRED(entry);
-               if (!atomic_read(&entry->ue_refcount))
-                       free_entry(cache, entry);
+               put_entry(cache, entry);
        }
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
 }
 EXPORT_SYMBOL(upcall_cache_flush_one);
 
 struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
+                                      int hashsz, time64_t entry_expire,
+                                      time64_t acquire_expire, bool replayable,
                                       struct upcall_cache_ops *ops)
 {
        struct upcall_cache *cache;
@@ -428,15 +553,21 @@ struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));
 
-       spin_lock_init(&cache->uc_lock);
-       rwlock_init(&cache->uc_upcall_rwlock);
-       for (i = 0; i < UC_CACHE_HASH_SIZE; i++)
+       rwlock_init(&cache->uc_lock);
+       init_rwsem(&cache->uc_upcall_rwsem);
+       cache->uc_hashsize = hashsz;
+       LIBCFS_ALLOC(cache->uc_hashtable,
+                    sizeof(*cache->uc_hashtable) * cache->uc_hashsize);
+       if (!cache->uc_hashtable) {
+               LIBCFS_FREE(cache, sizeof(*cache));
+               RETURN(ERR_PTR(-ENOMEM));
+       }
+       for (i = 0; i < cache->uc_hashsize; i++)
                INIT_LIST_HEAD(&cache->uc_hashtable[i]);
-       strlcpy(cache->uc_name, name, sizeof(cache->uc_name));
+       strscpy(cache->uc_name, name, sizeof(cache->uc_name));
        /* upcall pathname proc tunable */
-       strlcpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall));
-       cache->uc_entry_expire = 20 * 60;
-       cache->uc_acquire_expire = 30;
+       strscpy(cache->uc_upcall, upcall, sizeof(cache->uc_upcall));
+       cache->uc_entry_expire = entry_expire;
+       cache->uc_acquire_expire = acquire_expire;
+       cache->uc_acquire_replay = replayable;
        cache->uc_ops = ops;
 
        RETURN(cache);
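
With the expiry defaults moved out of upcall_cache_init(), callers now
pass them in. A sketch reusing the values this patch removed (20 minute
entry TTL, 30 second acquire timeout); the name, hash size and ops are
illustrative:

        cache = upcall_cache_init("identity", "NONE", 128,
                                  20 * 60, 30, false, &my_ops);
        if (IS_ERR(cache))
                return PTR_ERR(cache);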
@@ -448,6 +579,8 @@ void upcall_cache_cleanup(struct upcall_cache *cache)
        if (!cache)
                return;
        upcall_cache_flush_all(cache);
+       LIBCFS_FREE(cache->uc_hashtable,
+                   sizeof(*cache->uc_hashtable) * cache->uc_hashsize);
        LIBCFS_FREE(cache, sizeof(*cache));
 }
 EXPORT_SYMBOL(upcall_cache_cleanup);