From: NeilBrown Date: Fri, 28 Dec 2018 14:26:29 +0000 (-0500) Subject: LU-11089 obdclass: use an rwsem instead of lu_key_initing_cnt. X-Git-Tag: 2.12.52~52 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=99bb9f91f5c5ca6a380b22efa04a3c00c8f520ca LU-11089 obdclass: use an rwsem instead of lu_key_initing_cnt. The main use of lu_key_initing_cnt is to wait for it to be zero, so that lu_context_key_quiesce() can continue. This is a lot like the behavior of a semaphore. So use an rwsem instead. When keys_fill() calls down_read() it will opportunistically spin while the writer is running. As the writer is very short - just setting a bit for keys_fill() to see, this is likey to always be the case. lu_context_key_quiesce() will now, if necessary, go to sleep until woken, rather than spin repeatedly calling schedule. Code is much more readable this way and lu_keys_guard is no longer involved in this locking. We can remove the write_lock from lu_context_key_revive() as there is nothing to protect against. This already mustn't race with lu_context_key_quiesce(), and if keys_fill() runs concurrently and doesn't see that LCT_QUIESCENT has been cleared, it hardly matters. After it is cleared, lu_context_refill() will need to be run anyway. Change-Id: Id183a9372ca42267cc50f2547823585ff383ea1d Signed-off-by: NeilBrown Signed-off-by: James Simmons Reviewed-on: https://review.whamcloud.com/32712 Reviewed-by: Mike Pershin Reviewed-by: Alex Zhuravlev Reviewed-by: Gu Zheng Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 16bb8bb..63729fe 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -1340,7 +1340,7 @@ enum { static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; DEFINE_RWLOCK(lu_keys_guard); -static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0); +static DECLARE_RWSEM(lu_key_initing); /** * Global counter incremented whenever key is registered, unregistered, @@ -1551,40 +1551,25 @@ void lu_context_key_quiesce(struct lu_context_key *key) if (!(key->lct_tags & LCT_QUIESCENT)) { /* - * XXX memory barrier has to go here. + * The write-lock on lu_key_initing will ensure that any + * keys_fill() which didn't see LCT_QUIESCENT will have + * finished before we call key_fini(). */ - write_lock(&lu_keys_guard); + down_write(&lu_key_initing); key->lct_tags |= LCT_QUIESCENT; + up_write(&lu_key_initing); - /** - * Wait until all lu_context_key::lct_init() methods - * have completed. - */ - while (atomic_read(&lu_key_initing_cnt) > 0) { - write_unlock(&lu_keys_guard); - CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\"" - " %p, %d (%d)\n", - key->lct_owner ? key->lct_owner->name : "", - key, atomic_read(&key->lct_used), - atomic_read(&lu_key_initing_cnt)); - schedule(); - write_lock(&lu_keys_guard); - } - - list_for_each_entry(ctx, &lu_context_remembered, - lc_remember) + write_lock(&lu_keys_guard); + list_for_each_entry(ctx, &lu_context_remembered, lc_remember) key_fini(ctx, key->lct_index); - write_unlock(&lu_keys_guard); } } void lu_context_key_revive(struct lu_context_key *key) { - write_lock(&lu_keys_guard); key->lct_tags &= ~LCT_QUIESCENT; atomic_inc(&key_set_version); - write_unlock(&lu_keys_guard); } static void keys_fini(struct lu_context *ctx) @@ -1604,19 +1589,16 @@ static void keys_fini(struct lu_context *ctx) static int keys_fill(struct lu_context *ctx) { unsigned int i; + int rc = 0; /* - * A serialisation with lu_context_key_quiesce() is needed, but some - * "key->lct_init()" are calling kernel memory allocation routine and - * can't be called while holding a spin_lock. - * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt" - * to ensure the start of the serialisation. - * An atomic_t variable is still used, in order not to reacquire the - * lock when decrementing the counter. + * A serialisation with lu_context_key_quiesce() is needed, to + * ensure we see LCT_QUIESCENT and don't allocate a new value + * after it freed one. The rwsem provides this. As down_read() + * does optimistic spinning while the writer is active, this is + * unlikely to ever sleep. */ - read_lock(&lu_keys_guard); - atomic_inc(&lu_key_initing_cnt); - read_unlock(&lu_keys_guard); + down_read(&lu_key_initing); ctx->lc_version = atomic_read(&key_set_version); LINVRNT(ctx->lc_value); @@ -1645,8 +1627,8 @@ static int keys_fill(struct lu_context *ctx) value = key->lct_init(ctx, key); if (unlikely(IS_ERR(value))) { - atomic_dec(&lu_key_initing_cnt); - return PTR_ERR(value); + rc = PTR_ERR(value); + break; } lu_ref_add_atomic(&key->lct_reference, "ctx", ctx); @@ -1662,8 +1644,8 @@ static int keys_fill(struct lu_context *ctx) } } - atomic_dec(&lu_key_initing_cnt); - return 0; + up_read(&lu_key_initing); + return rc; } static int keys_init(struct lu_context *ctx)