LU-16498 obdclass: change uc_lock to rwlock 95/52395/13
author     Sebastien Buisson <sbuisson@ddn.com>
           Thu, 14 Sep 2023 16:00:04 +0000 (18:00 +0200)
committer  Oleg Drokin <green@whamcloud.com>
           Thu, 15 Feb 2024 07:06:10 +0000 (07:06 +0000)
Change the upcall cache uc_lock to a read-write lock so that threads
can get the read lock to do concurrent lookups in the upcall cache,
and only grab the write lock in the rare case when a new entry is
added or old entries are expired. That reduces serialization between
server threads during normal operation and, when the requested key
(UID or GSS context) is not in the cache at all, avoids having all of
the threads spin on the lock before they go to sleep.
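
A minimal, self-contained sketch of the locking pattern this change
relies on (the struct and function names here are illustrative only,
not part of the patch): lookups run under the read lock, and the lock
is only upgraded when an entry has to be unlinked or otherwise
modified. Because a Linux rwlock_t cannot be upgraded atomically, the
read lock is dropped before the write lock is taken, and the list must
be re-scanned afterwards since another thread may have changed it in
the unlocked window; this is exactly why upcall_cache_get_entry()
below restarts its search at the find_with_lock label after calling
write_lock_from_read().

#include <linux/list.h>
#include <linux/spinlock.h>     /* rwlock_t, read_lock(), write_lock() */
#include <linux/types.h>

/* Illustrative-only types and lookup; not part of the Lustre patch below. */
struct demo_entry {
        struct list_head        de_hash;
        u64                     de_key;
        bool                    de_expired;
};

struct demo_cache {
        rwlock_t                dc_lock;
        struct list_head        dc_head;
};

/* Look up a key; unlinking an expired entry requires the write lock. */
static struct demo_entry *demo_lookup(struct demo_cache *cache, u64 key)
{
        struct demo_entry *entry, *next, *found = NULL;
        bool writelock = false;

        read_lock(&cache->dc_lock);     /* lookups proceed concurrently */
restart:
        list_for_each_entry_safe(entry, next, &cache->dc_head, de_hash) {
                if (entry->de_key != key)
                        continue;
                if (!entry->de_expired) {
                        found = entry;
                        break;
                }
                if (!writelock) {
                        /* rwlock_t has no atomic read-to-write upgrade:
                         * drop the read lock, take the write lock, and
                         * rescan, since the list may have been modified
                         * while no lock was held.
                         */
                        read_unlock(&cache->dc_lock);
                        write_lock(&cache->dc_lock);
                        writelock = true;
                        goto restart;
                }
                list_del_init(&entry->de_hash); /* needs the exclusive lock */
                break;
        }
        if (writelock)
                write_unlock(&cache->dc_lock);
        else
                read_unlock(&cache->dc_lock);
        return found;
}

The same rule is followed in upcall_cache_downcall() below: the hash
list is scanned under the read lock, and the lock is upgraded (or
re-taken as a write lock around parse_downcall) before the entry's
flags are changed at the out: label.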

Test-Parameters: kerberos=true testlist=sanity-krb5
Signed-off-by: Sebastien Buisson <sbuisson@ddn.com>
Change-Id: I812400104fd2115d19386fb4a03bb3ce99c49383
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52395
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Aurelien Degremont <adegremont@nvidia.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/upcall_cache.h
lustre/obdclass/upcall_cache.c

diff --git a/lustre/include/upcall_cache.h b/lustre/include/upcall_cache.h
index e5891aa..fa2c590 100644
--- a/lustre/include/upcall_cache.h
+++ b/lustre/include/upcall_cache.h
@@ -140,7 +140,7 @@ struct upcall_cache_ops {
 struct upcall_cache {
        struct list_head        *uc_hashtable;
        int                     uc_hashsize;
-       spinlock_t              uc_lock;
+       rwlock_t                uc_lock;
        struct rw_semaphore     uc_upcall_rwsem;
 
        char                    uc_name[40];            /* for upcall */
diff --git a/lustre/obdclass/upcall_cache.c b/lustre/obdclass/upcall_cache.c
index cdf2e44..39eb199 100644
--- a/lustre/obdclass/upcall_cache.c
+++ b/lustre/obdclass/upcall_cache.c
@@ -112,8 +112,18 @@ static inline void put_entry(struct upcall_cache *cache,
        }
 }
 
+static inline void write_lock_from_read(rwlock_t *lock, bool *writelock)
+{
+       if (!*writelock) {
+               read_unlock(lock);
+               write_lock(lock);
+               *writelock = true;
+       }
+}
+
 static int check_unlink_entry(struct upcall_cache *cache,
-                             struct upcall_cache_entry *entry)
+                             struct upcall_cache_entry *entry,
+                             bool writelock)
 {
        time64_t now = ktime_get_seconds();
 
@@ -125,15 +135,19 @@ static int check_unlink_entry(struct upcall_cache *cache,
                    now < entry->ue_acquire_expire)
                        return 0;
 
-               UC_CACHE_SET_EXPIRED(entry);
-               wake_up(&entry->ue_waitq);
-       } else if (!UC_CACHE_IS_INVALID(entry)) {
+               if (writelock) {
+                       UC_CACHE_SET_EXPIRED(entry);
+                       wake_up(&entry->ue_waitq);
+               }
+       } else if (!UC_CACHE_IS_INVALID(entry) && writelock) {
                UC_CACHE_SET_EXPIRED(entry);
        }
 
-       list_del_init(&entry->ue_hash);
-       if (!atomic_read(&entry->ue_refcount))
-               free_entry(cache, entry);
+       if (writelock) {
+               list_del_init(&entry->ue_hash);
+               if (!atomic_read(&entry->ue_refcount))
+                       free_entry(cache, entry);
+       }
        return 1;
 }
 
@@ -151,7 +165,9 @@ struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
        bool failedacquiring = false;
        struct list_head *head;
        wait_queue_entry_t wait;
+       bool writelock;
        int rc, found;
+
        ENTRY;
 
        LASSERT(cache);
@@ -160,10 +176,17 @@ struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
                                                        cache->uc_hashsize)];
 find_again:
        found = 0;
-       spin_lock(&cache->uc_lock);
+       if (new) {
+               write_lock(&cache->uc_lock);
+               writelock = true;
+       } else {
+               read_lock(&cache->uc_lock);
+               writelock = false;
+       }
+find_with_lock:
        list_for_each_entry_safe(entry, next, head, ue_hash) {
                /* check invalid & expired items */
-               if (check_unlink_entry(cache, entry))
+               if (check_unlink_entry(cache, entry, writelock))
                        continue;
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -173,10 +196,14 @@ find_again:
 
        if (!found) {
                if (!new) {
-                       spin_unlock(&cache->uc_lock);
+                       if (writelock)
+                               write_unlock(&cache->uc_lock);
+                       else
+                               read_unlock(&cache->uc_lock);
                        new = alloc_entry(cache, key, args);
                        if (!new) {
-                               CERROR("fail to alloc entry\n");
+                               CERROR("%s: fail to alloc entry: rc = %d\n",
+                                      cache->uc_name, -ENOMEM);
                                RETURN(ERR_PTR(-ENOMEM));
                        }
                        goto find_again;
@@ -188,18 +215,27 @@ find_again:
                if (new) {
                        free_entry(cache, new);
                        new = NULL;
+               } else if (!writelock) {
+                       /* We found an entry while holding the read lock, so
+                        * convert it to a write lock and find again, to check
+                        * that entry was not modified/freed in between.
+                        */
+                       write_lock_from_read(&cache->uc_lock, &writelock);
+                       found = 0;
+                       goto find_with_lock;
                }
                list_move(&entry->ue_hash, head);
        }
+       /* now we hold a write lock */
        get_entry(entry);
 
        /* acquire for new one */
        if (UC_CACHE_IS_NEW(entry)) {
                UC_CACHE_SET_ACQUIRING(entry);
                UC_CACHE_CLEAR_NEW(entry);
-               spin_unlock(&cache->uc_lock);
+               write_unlock(&cache->uc_lock);
                rc = refresh_entry(cache, entry);
-               spin_lock(&cache->uc_lock);
+               write_lock(&cache->uc_lock);
                entry->ue_acquire_expire = ktime_get_seconds() +
                                           cache->uc_acquire_expire;
                if (rc < 0) {
@@ -223,11 +259,11 @@ find_again:
                init_wait(&wait);
                add_wait_queue(&entry->ue_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
-               spin_unlock(&cache->uc_lock);
+               write_unlock(&cache->uc_lock);
 
                left = schedule_timeout(expiry);
 
-               spin_lock(&cache->uc_lock);
+               write_lock(&cache->uc_lock);
                remove_wait_queue(&entry->ue_waitq, &wait);
                if (UC_CACHE_IS_ACQUIRING(entry)) {
                        /* we're interrupted or upcall failed in the middle */
@@ -239,7 +275,7 @@ find_again:
                                failedacquiring = true;
                        put_entry(cache, entry);
                        if (!failedacquiring) {
-                               spin_unlock(&cache->uc_lock);
+                               write_unlock(&cache->uc_lock);
                                failedacquiring = true;
                                new = NULL;
                                CDEBUG(D_OTHER,
@@ -247,8 +283,9 @@ find_again:
                                       entry->ue_key, rc);
                                goto find_again;
                        }
-                       CERROR("acquire for key %llu: error %d\n",
-                              entry->ue_key, rc);
+                       CERROR("%s: acquire for key %lld after %llu: rc = %d\n",
+                              cache->uc_name, entry->ue_key,
+                              cache->uc_acquire_expire, rc);
                        GOTO(out, entry = ERR_PTR(rc));
                }
        }
@@ -263,15 +300,16 @@ find_again:
         * We can't refresh the existing one because some
         * memory might be shared by multiple processes.
         */
-       if (check_unlink_entry(cache, entry)) {
+       if (check_unlink_entry(cache, entry, writelock)) {
                /* if expired, try again. but if this entry is
                 * created by me but too quickly turn to expired
                 * without any error, should at least give a
                 * chance to use it once.
                 */
                if (entry != new) {
+                       /* as stated above, we already hold a write lock */
                        put_entry(cache, entry);
-                       spin_unlock(&cache->uc_lock);
+                       write_unlock(&cache->uc_lock);
                        new = NULL;
                        goto find_again;
                }
@@ -279,7 +317,10 @@ find_again:
 
        /* Now we know it's good */
 out:
-       spin_unlock(&cache->uc_lock);
+       if (writelock)
+               write_unlock(&cache->uc_lock);
+       else
+               read_unlock(&cache->uc_lock);
        RETURN(entry);
 }
 EXPORT_SYMBOL(upcall_cache_get_entry);
@@ -294,13 +335,13 @@ void upcall_cache_update_entry(struct upcall_cache *cache,
                               struct upcall_cache_entry *entry,
                               time64_t expire, int state)
 {
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        entry->ue_expire = expire;
        if (!state)
                UC_CACHE_SET_VALID(entry);
        else
                entry->ue_flags |= state;
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
 }
 EXPORT_SYMBOL(upcall_cache_update_entry);
 
@@ -315,9 +356,9 @@ void upcall_cache_put_entry(struct upcall_cache *cache,
        }
 
        LASSERT(atomic_read(&entry->ue_refcount) > 0);
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        put_entry(cache, entry);
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
        EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_put_entry);
@@ -328,6 +369,7 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
        struct upcall_cache_entry *entry = NULL;
        struct list_head *head;
        int found = 0, rc = 0;
+       bool writelock = false;
        ENTRY;
 
        LASSERT(cache);
@@ -335,7 +377,7 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
                                                        cache->uc_hashsize)];
 
-       spin_lock(&cache->uc_lock);
+       read_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (downcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -348,32 +390,35 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
                CDEBUG(D_OTHER, "%s: upcall for key %llu not expected\n",
                       cache->uc_name, key);
                /* haven't found, it's possible */
-               spin_unlock(&cache->uc_lock);
+               read_unlock(&cache->uc_lock);
                RETURN(-EINVAL);
        }
 
        if (err) {
                CDEBUG(D_OTHER, "%s: upcall for key %llu returned %d\n",
                       cache->uc_name, entry->ue_key, err);
+               write_lock_from_read(&cache->uc_lock, &writelock);
                GOTO(out, rc = err);
        }
 
        if (!UC_CACHE_IS_ACQUIRING(entry)) {
                CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key %llu)"
                       "\n", cache->uc_name, entry, entry->ue_key);
+               write_lock_from_read(&cache->uc_lock, &writelock);
                GOTO(out, rc = 0);
        }
 
        if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
                CERROR("%s: found a stale entry %p (key %llu) in ioctl\n",
                       cache->uc_name, entry, entry->ue_key);
+               write_lock_from_read(&cache->uc_lock, &writelock);
                GOTO(out, rc = -EINVAL);
        }
 
-       spin_unlock(&cache->uc_lock);
+       read_unlock(&cache->uc_lock);
        if (cache->uc_ops->parse_downcall)
                rc = cache->uc_ops->parse_downcall(cache, entry, args);
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        if (rc)
                GOTO(out, rc);
 
@@ -383,14 +428,15 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
        CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key %llu\n",
               cache->uc_name, entry, entry->ue_key);
 out:
+       /* 'goto out' needs to make sure to take a write lock first */
        if (rc) {
                UC_CACHE_SET_INVALID(entry);
                list_del_init(&entry->ue_hash);
        }
        UC_CACHE_CLEAR_ACQUIRING(entry);
-       spin_unlock(&cache->uc_lock);
        wake_up(&entry->ue_waitq);
        put_entry(cache, entry);
+       write_unlock(&cache->uc_lock);
 
        RETURN(rc);
 }
@@ -402,7 +448,7 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
        int i;
        ENTRY;
 
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        for (i = 0; i < cache->uc_hashsize; i++) {
                list_for_each_entry_safe(entry, next,
                                         &cache->uc_hashtable[i], ue_hash) {
@@ -414,7 +460,7 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
                        free_entry(cache, entry);
                }
        }
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
        EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_flush);
@@ -429,7 +475,7 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
        head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
                                                        cache->uc_hashsize)];
 
-       spin_lock(&cache->uc_lock);
+       write_lock(&cache->uc_lock);
        list_for_each_entry(entry, head, ue_hash) {
                if (upcall_compare(cache, entry, key, args) == 0) {
                        found = 1;
@@ -444,11 +490,11 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
                      atomic_read(&entry->ue_refcount), entry->ue_flags,
                      ktime_get_real_seconds(), entry->ue_acquire_expire,
                      entry->ue_expire);
+               get_entry(entry);
                UC_CACHE_SET_EXPIRED(entry);
-               if (!atomic_read(&entry->ue_refcount))
-                       free_entry(cache, entry);
+               put_entry(cache, entry);
        }
-       spin_unlock(&cache->uc_lock);
+       write_unlock(&cache->uc_lock);
 }
 EXPORT_SYMBOL(upcall_cache_flush_one);
 
@@ -465,7 +511,7 @@ struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
        if (!cache)
                RETURN(ERR_PTR(-ENOMEM));
 
-       spin_lock_init(&cache->uc_lock);
+       rwlock_init(&cache->uc_lock);
        init_rwsem(&cache->uc_upcall_rwsem);
        cache->uc_hashsize = hashsz;
        LIBCFS_ALLOC(cache->uc_hashtable,