From: Sebastien Buisson
Date: Thu, 14 Sep 2023 16:00:04 +0000 (+0200)
Subject: LU-16498 obdclass: change uc_lock to rwlock
X-Git-Tag: 2.15.61~29
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=c7833db8e0bbed29908d5ec00dccc2a5ddac5305;p=fs%2Flustre-release.git

LU-16498 obdclass: change uc_lock to rwlock

Change the upcall cache uc_lock to a read-write lock so that threads
can take the read lock to do concurrent lookups in the upcall cache,
and only grab the write lock in the rare case when a new entry is
added or old entries are expired. That reduces serialization between
server threads during normal operation, and avoids having all of the
threads spin for some time before they sleep if the requested key
(UID or gss context) is not in the cache at all.

Test-Parameters: kerberos=true testlist=sanity-krb5
Signed-off-by: Sebastien Buisson
Change-Id: I812400104fd2115d19386fb4a03bb3ce99c49383
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52395
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Andreas Dilger
Reviewed-by: Aurelien Degremont
Reviewed-by: Oleg Drokin
---

diff --git a/lustre/include/upcall_cache.h b/lustre/include/upcall_cache.h
index e5891aa..fa2c590 100644
--- a/lustre/include/upcall_cache.h
+++ b/lustre/include/upcall_cache.h
@@ -140,7 +140,7 @@ struct upcall_cache_ops {
 struct upcall_cache {
 	struct list_head	*uc_hashtable;
 	int			uc_hashsize;
-	spinlock_t		uc_lock;
+	rwlock_t		uc_lock;
 	struct rw_semaphore	uc_upcall_rwsem;
 
 	char			uc_name[40];	/* for upcall */
diff --git a/lustre/obdclass/upcall_cache.c b/lustre/obdclass/upcall_cache.c
index cdf2e44..39eb199 100644
--- a/lustre/obdclass/upcall_cache.c
+++ b/lustre/obdclass/upcall_cache.c
@@ -112,8 +112,18 @@ static inline void put_entry(struct upcall_cache *cache,
 	}
 }
 
+static inline void write_lock_from_read(rwlock_t *lock, bool *writelock)
+{
+	if (!*writelock) {
+		read_unlock(lock);
+		write_lock(lock);
+		*writelock = true;
+	}
+}
+
 static int check_unlink_entry(struct upcall_cache *cache,
-			      struct upcall_cache_entry *entry)
+			      struct upcall_cache_entry *entry,
+			      bool writelock)
 {
 	time64_t now = ktime_get_seconds();
 
@@ -125,15 +135,19 @@ static int check_unlink_entry(struct upcall_cache *cache,
 		    now < entry->ue_acquire_expire)
			return 0;
 
-		UC_CACHE_SET_EXPIRED(entry);
-		wake_up(&entry->ue_waitq);
-	} else if (!UC_CACHE_IS_INVALID(entry)) {
+		if (writelock) {
+			UC_CACHE_SET_EXPIRED(entry);
+			wake_up(&entry->ue_waitq);
+		}
+	} else if (!UC_CACHE_IS_INVALID(entry) && writelock) {
 		UC_CACHE_SET_EXPIRED(entry);
 	}
 
-	list_del_init(&entry->ue_hash);
-	if (!atomic_read(&entry->ue_refcount))
-		free_entry(cache, entry);
+	if (writelock) {
+		list_del_init(&entry->ue_hash);
+		if (!atomic_read(&entry->ue_refcount))
+			free_entry(cache, entry);
+	}
 	return 1;
 }
 
@@ -151,7 +165,9 @@ struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
 	bool failedacquiring = false;
 	struct list_head *head;
 	wait_queue_entry_t wait;
+	bool writelock;
 	int rc, found;
+
 	ENTRY;
 
 	LASSERT(cache);
@@ -160,10 +176,17 @@ struct upcall_cache_entry *upcall_cache_get_entry(struct upcall_cache *cache,
 			       cache->uc_hashsize)];
 find_again:
 	found = 0;
-	spin_lock(&cache->uc_lock);
+	if (new) {
+		write_lock(&cache->uc_lock);
+		writelock = true;
+	} else {
+		read_lock(&cache->uc_lock);
+		writelock = false;
+	}
+find_with_lock:
 	list_for_each_entry_safe(entry, next, head, ue_hash) {
 		/* check invalid & expired items */
-		if (check_unlink_entry(cache, entry))
+		if (check_unlink_entry(cache, entry, writelock))
 			continue;
 		if (upcall_compare(cache, entry, key, args) == 0) {
 			found = 1;
@@ -173,10 +196,14 @@ find_again:
 
 	if (!found) {
 		if (!new) {
-			spin_unlock(&cache->uc_lock);
+			if (writelock)
+				write_unlock(&cache->uc_lock);
+			else
+				read_unlock(&cache->uc_lock);
 			new = alloc_entry(cache, key, args);
 			if (!new) {
-				CERROR("fail to alloc entry\n");
+				CERROR("%s: fail to alloc entry: rc = %d\n",
+				       cache->uc_name, -ENOMEM);
 				RETURN(ERR_PTR(-ENOMEM));
 			}
 			goto find_again;
@@ -188,18 +215,27 @@ find_again:
 		if (new) {
 			free_entry(cache, new);
 			new = NULL;
+		} else if (!writelock) {
+			/* We found an entry while holding the read lock, so
+			 * convert it to a write lock and find again, to check
+			 * that entry was not modified/freed in between.
+			 */
+			write_lock_from_read(&cache->uc_lock, &writelock);
+			found = 0;
+			goto find_with_lock;
 		}
 		list_move(&entry->ue_hash, head);
 	}
+	/* now we hold a write lock */
 	get_entry(entry);
 
 	/* acquire for new one */
 	if (UC_CACHE_IS_NEW(entry)) {
 		UC_CACHE_SET_ACQUIRING(entry);
 		UC_CACHE_CLEAR_NEW(entry);
-		spin_unlock(&cache->uc_lock);
+		write_unlock(&cache->uc_lock);
 		rc = refresh_entry(cache, entry);
-		spin_lock(&cache->uc_lock);
+		write_lock(&cache->uc_lock);
 		entry->ue_acquire_expire = ktime_get_seconds() +
 					   cache->uc_acquire_expire;
 		if (rc < 0) {
@@ -223,11 +259,11 @@ find_again:
 		init_wait(&wait);
 		add_wait_queue(&entry->ue_waitq, &wait);
 		set_current_state(TASK_INTERRUPTIBLE);
-		spin_unlock(&cache->uc_lock);
+		write_unlock(&cache->uc_lock);
 
 		left = schedule_timeout(expiry);
 
-		spin_lock(&cache->uc_lock);
+		write_lock(&cache->uc_lock);
 		remove_wait_queue(&entry->ue_waitq, &wait);
 		if (UC_CACHE_IS_ACQUIRING(entry)) {
 			/* we're interrupted or upcall failed in the middle */
@@ -239,7 +275,7 @@ find_again:
 				failedacquiring = true;
 			put_entry(cache, entry);
 			if (!failedacquiring) {
-				spin_unlock(&cache->uc_lock);
+				write_unlock(&cache->uc_lock);
 				failedacquiring = true;
 				new = NULL;
 				CDEBUG(D_OTHER,
@@ -247,8 +283,9 @@ find_again:
 				       entry->ue_key, rc);
 				goto find_again;
 			}
-			CERROR("acquire for key %llu: error %d\n",
-			       entry->ue_key, rc);
+			CERROR("%s: acquire for key %lld after %llu: rc = %d\n",
+			       cache->uc_name, entry->ue_key,
+			       cache->uc_acquire_expire, rc);
 			GOTO(out, entry = ERR_PTR(rc));
 		}
 	}
@@ -263,15 +300,16 @@ find_again:
 		 * We can't refresh the existing one because some
 		 * memory might be shared by multiple processes.
 		 */
-		if (check_unlink_entry(cache, entry)) {
+		if (check_unlink_entry(cache, entry, writelock)) {
 			/* if expired, try again. but if this entry is
 			 * created by me but too quickly turn to expired
 			 * without any error, should at least give a
 			 * chance to use it once.
 			 */
			if (entry != new) {
+				/* as stated above, we already hold a write lock */
 				put_entry(cache, entry);
-				spin_unlock(&cache->uc_lock);
+				write_unlock(&cache->uc_lock);
 				new = NULL;
 				goto find_again;
 			}
@@ -279,7 +317,10 @@ find_again:
 
 	/* Now we know it's good */
 out:
-	spin_unlock(&cache->uc_lock);
+	if (writelock)
+		write_unlock(&cache->uc_lock);
+	else
+		read_unlock(&cache->uc_lock);
 	RETURN(entry);
 }
 EXPORT_SYMBOL(upcall_cache_get_entry);
@@ -294,13 +335,13 @@ void upcall_cache_update_entry(struct upcall_cache *cache,
 			       struct upcall_cache_entry *entry,
 			       time64_t expire, int state)
 {
-	spin_lock(&cache->uc_lock);
+	write_lock(&cache->uc_lock);
 	entry->ue_expire = expire;
 	if (!state)
 		UC_CACHE_SET_VALID(entry);
 	else
 		entry->ue_flags |= state;
-	spin_unlock(&cache->uc_lock);
+	write_unlock(&cache->uc_lock);
 }
 EXPORT_SYMBOL(upcall_cache_update_entry);
 
@@ -315,9 +356,9 @@ void upcall_cache_put_entry(struct upcall_cache *cache,
 	}
 
 	LASSERT(atomic_read(&entry->ue_refcount) > 0);
-	spin_lock(&cache->uc_lock);
+	write_lock(&cache->uc_lock);
 	put_entry(cache, entry);
-	spin_unlock(&cache->uc_lock);
+	write_unlock(&cache->uc_lock);
 	EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_put_entry);
@@ -328,6 +369,7 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
 	struct upcall_cache_entry *entry = NULL;
 	struct list_head *head;
 	int found = 0, rc = 0;
+	bool writelock = false;
 	ENTRY;
 
 	LASSERT(cache);
@@ -335,7 +377,7 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
 	head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
				    cache->uc_hashsize)];
 
-	spin_lock(&cache->uc_lock);
+	read_lock(&cache->uc_lock);
 	list_for_each_entry(entry, head, ue_hash) {
 		if (downcall_compare(cache, entry, key, args) == 0) {
 			found = 1;
@@ -348,32 +390,35 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
 		CDEBUG(D_OTHER, "%s: upcall for key %llu not expected\n",
 		       cache->uc_name, key);
 		/* haven't found, it's possible */
-		spin_unlock(&cache->uc_lock);
+		read_unlock(&cache->uc_lock);
 		RETURN(-EINVAL);
 	}
 
 	if (err) {
 		CDEBUG(D_OTHER, "%s: upcall for key %llu returned %d\n",
 		       cache->uc_name, entry->ue_key, err);
+		write_lock_from_read(&cache->uc_lock, &writelock);
 		GOTO(out, rc = err);
 	}
 
 	if (!UC_CACHE_IS_ACQUIRING(entry)) {
 		CDEBUG(D_RPCTRACE, "%s: found uptodate entry %p (key %llu)"
 		       "\n", cache->uc_name, entry, entry->ue_key);
+		write_lock_from_read(&cache->uc_lock, &writelock);
 		GOTO(out, rc = 0);
 	}
 
 	if (UC_CACHE_IS_INVALID(entry) || UC_CACHE_IS_EXPIRED(entry)) {
 		CERROR("%s: found a stale entry %p (key %llu) in ioctl\n",
 		       cache->uc_name, entry, entry->ue_key);
+		write_lock_from_read(&cache->uc_lock, &writelock);
 		GOTO(out, rc = -EINVAL);
 	}
 
-	spin_unlock(&cache->uc_lock);
+	read_unlock(&cache->uc_lock);
 	if (cache->uc_ops->parse_downcall)
 		rc = cache->uc_ops->parse_downcall(cache, entry, args);
-	spin_lock(&cache->uc_lock);
+	write_lock(&cache->uc_lock);
 	if (rc)
 		GOTO(out, rc);
 
@@ -383,14 +428,15 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u32 err, __u64 key,
 	CDEBUG(D_OTHER, "%s: created upcall cache entry %p for key %llu\n",
 	       cache->uc_name, entry, entry->ue_key);
 out:
+	/* 'goto out' needs to make sure to take a write lock first */
 	if (rc) {
 		UC_CACHE_SET_INVALID(entry);
 		list_del_init(&entry->ue_hash);
 	}
 	UC_CACHE_CLEAR_ACQUIRING(entry);
-	spin_unlock(&cache->uc_lock);
 	wake_up(&entry->ue_waitq);
 	put_entry(cache, entry);
+	write_unlock(&cache->uc_lock);
 
 	RETURN(rc);
 }
@@ -402,7 +448,7 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
 	int i;
 	ENTRY;
 
-	spin_lock(&cache->uc_lock);
+	write_lock(&cache->uc_lock);
 	for (i = 0; i < cache->uc_hashsize; i++) {
 		list_for_each_entry_safe(entry, next,
 					 &cache->uc_hashtable[i], ue_hash) {
@@ -414,7 +460,7 @@ void upcall_cache_flush(struct upcall_cache *cache, int force)
 				free_entry(cache, entry);
 		}
 	}
-	spin_unlock(&cache->uc_lock);
+	write_unlock(&cache->uc_lock);
 	EXIT;
 }
 EXPORT_SYMBOL(upcall_cache_flush);
@@ -429,7 +475,7 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
 	head = &cache->uc_hashtable[UC_CACHE_HASH_INDEX(key,
				    cache->uc_hashsize)];
 
-	spin_lock(&cache->uc_lock);
+	write_lock(&cache->uc_lock);
 	list_for_each_entry(entry, head, ue_hash) {
 		if (upcall_compare(cache, entry, key, args) == 0) {
 			found = 1;
@@ -444,11 +490,11 @@ void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key, void *args)
 		       atomic_read(&entry->ue_refcount), entry->ue_flags,
 		       ktime_get_real_seconds(), entry->ue_acquire_expire,
 		       entry->ue_expire);
+		get_entry(entry);
 		UC_CACHE_SET_EXPIRED(entry);
-		if (!atomic_read(&entry->ue_refcount))
-			free_entry(cache, entry);
+		put_entry(cache, entry);
 	}
-	spin_unlock(&cache->uc_lock);
+	write_unlock(&cache->uc_lock);
 }
 EXPORT_SYMBOL(upcall_cache_flush_one);
 
@@ -465,7 +511,7 @@ struct upcall_cache *upcall_cache_init(const char *name, const char *upcall,
 	if (!cache)
 		RETURN(ERR_PTR(-ENOMEM));
 
-	spin_lock_init(&cache->uc_lock);
+	rwlock_init(&cache->uc_lock);
 	init_rwsem(&cache->uc_upcall_rwsem);
 	cache->uc_hashsize = hashsz;
 	LIBCFS_ALLOC(cache->uc_hashtable,
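
For reference, below is a minimal userspace sketch of the lookup pattern this patch adopts, written against POSIX pthread_rwlock_t rather than the kernel rwlock_t. It is illustration only, not Lustre code: all names in it (demo_cache, demo_entry, demo_lookup, demo_find_locked) are invented and do not exist in the Lustre tree. As in upcall_cache_get_entry(), lookups run under the read lock; only when the key is missing is the read lock dropped, the write lock taken, and the search repeated, because an rwlock cannot be upgraded atomically and another thread may have inserted the same key in between. This is the same drop, retake and recheck discipline that write_lock_from_read() and the find_with_lock retry label implement in upcall_cache.c.

/*
 * Minimal userspace sketch (illustration only, not Lustre code).
 * Compile with: cc -c demo_rwlock_cache.c -pthread
 */
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

struct demo_entry {
	uint64_t		 de_key;
	struct demo_entry	*de_next;
};

struct demo_cache {
	/* protects dc_head; initialize with PTHREAD_RWLOCK_INITIALIZER */
	pthread_rwlock_t	 dc_lock;
	struct demo_entry	*dc_head;
};

/* Walk the list; the caller must hold dc_lock (read or write). */
static struct demo_entry *demo_find_locked(struct demo_cache *c, uint64_t key)
{
	struct demo_entry *e;

	for (e = c->dc_head; e != NULL; e = e->de_next)
		if (e->de_key == key)
			return e;
	return NULL;
}

/* Look up key, inserting a fresh entry if it is not cached yet. */
static struct demo_entry *demo_lookup(struct demo_cache *c, uint64_t key)
{
	struct demo_entry *e;

	/* Fast path: concurrent lookups only serialize against writers. */
	pthread_rwlock_rdlock(&c->dc_lock);
	e = demo_find_locked(c, key);
	pthread_rwlock_unlock(&c->dc_lock);
	if (e)
		return e;

	/*
	 * Slow path: the rwlock cannot be upgraded atomically, so drop the
	 * read lock, take the write lock, and search again, since another
	 * thread may have inserted the same key while no lock was held.
	 */
	pthread_rwlock_wrlock(&c->dc_lock);
	e = demo_find_locked(c, key);
	if (e == NULL) {
		e = calloc(1, sizeof(*e));
		if (e != NULL) {
			e->de_key = key;
			e->de_next = c->dc_head;
			c->dc_head = e;
		}
	}
	pthread_rwlock_unlock(&c->dc_lock);
	return e;	/* NULL only if allocation failed */
}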