From 548b8ea916625b3697b0e0d0abfc330b4c954633 Mon Sep 17 00:00:00 2001 From: Nikitas Angelinas Date: Wed, 11 Sep 2013 23:43:23 +0100 Subject: [PATCH] LU-3430 nrs: Fix a race condition in the ORR policy Checking the atomic oo_ref in nrs_orr_hop_put_free() and then taking the bucket lock leaves a race window open, by which a second thread can attempt to free an already-freed nrs_orr_object. Fix this, and change the hash bucket lock type from an rwlock to a spinlock, as there are now as many calls on the write path, as there are on the read path. Rehashing is not used on the hashes, so libcfs_hash API usage can also be simplified. Signed-off-by: Nikitas Angelinas Change-Id: I2e16320c5e4b0803e1bef428f26ea18b0341ac5f Xyratex-bug-id: MRP-1294 Reviewed-on: http://review.whamcloud.com/7623 Tested-by: Maloo Reviewed-by: Liang Zhen Reviewed-by: Emoly Liu Reviewed-by: Li Xi Reviewed-by: Oleg Drokin --- lustre/include/lustre_net.h | 4 ++-- lustre/ptlrpc/nrs_orr.c | 50 ++++++++++++++++----------------------------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 37f1974..de65401 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -1579,7 +1579,7 @@ struct nrs_orr_data { struct ptlrpc_nrs_resource od_res; cfs_binheap_t *od_binheap; cfs_hash_t *od_obj_hash; - struct kmem_cache *od_cache; + struct kmem_cache *od_cache; /** * Used when a new scheduling round commences, in order to synchronize * all object or OST batches with the new round number. @@ -1636,7 +1636,7 @@ struct nrs_orr_object { * scheduling RPCs */ struct nrs_orr_key oo_key; - cfs_atomic_t oo_ref; + long oo_ref; /** * Round Robin quantum; the maximum number of RPCs that are allowed to * be scheduled for the object or OST in a single batch of each round. diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c index 65b2f7a..b723bca 100644 --- a/lustre/ptlrpc/nrs_orr.c +++ b/lustre/ptlrpc/nrs_orr.c @@ -378,11 +378,11 @@ static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name) */ #define NRS_ORR_BITS 24 #define NRS_ORR_BKT_BITS 12 -#define NRS_ORR_HASH_FLAGS (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY) +#define NRS_ORR_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY) #define NRS_TRR_BITS 4 #define NRS_TRR_BKT_BITS 2 -#define NRS_TRR_HASH_FLAGS CFS_HASH_RW_BKTLOCK +#define NRS_TRR_HASH_FLAGS CFS_HASH_SPIN_BKTLOCK static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask) { @@ -417,7 +417,7 @@ static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct nrs_orr_object *orro = cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode); - cfs_atomic_inc(&orro->oo_ref); + orro->oo_ref++; } /** @@ -431,34 +431,21 @@ static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode) oo_hnode); struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res); - cfs_hash_bd_t bds[2]; + cfs_hash_bd_t bd; - if (cfs_atomic_dec_return(&orro->oo_ref) > 1) - return; + cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1); - cfs_hash_lock(hs, 0); - cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1); + if (--orro->oo_ref > 1) { + cfs_hash_bd_unlock(hs, &bd, 1); - /** - * Another thread may have won the race and taken a reference on the - * nrs_orr_object. - */ - if (cfs_atomic_read(&orro->oo_ref) > 1) - goto lost_race; + return; + } + LASSERT(orro->oo_ref == 1); - if (bds[1].bd_bucket == NULL) - cfs_hash_bd_del_locked(hs, &bds[0], hnode); - else - hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key, - hnode); - LASSERT(hnode != NULL); + cfs_hash_bd_del_locked(hs, &bd, hnode); + cfs_hash_bd_unlock(hs, &bd, 1); OBD_SLAB_FREE_PTR(orro, orrd->od_cache); - -lost_race: - - cfs_hash_dual_bd_unlock(hs, bds, 1); - cfs_hash_unlock(hs, 0); } static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) @@ -466,7 +453,7 @@ static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct nrs_orr_object *orro = cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode); - cfs_atomic_dec(&orro->oo_ref); + orro->oo_ref--; } static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) @@ -486,9 +473,9 @@ static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct nrs_orr_data *orrd = container_of(orro->oo_res.res_parent, struct nrs_orr_data, od_res); - LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0, - "Busy NRS TRR policy object for OST with index %u, with %d " - "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref)); + LASSERTF(orro->oo_ref == 0, + "Busy NRS TRR policy object for OST with index %u, with %ld " + "refs\n", orro->oo_key.ok_idx, orro->oo_ref); OBD_SLAB_FREE_PTR(orro, orrd->od_cache); } @@ -882,13 +869,12 @@ int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy, OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache, nrs_pol2cptab(policy), nrs_pol2cptid(policy), - (moving_req ? GFP_ATOMIC : - __GFP_IO)); + moving_req ? GFP_ATOMIC : __GFP_IO); if (orro == NULL) RETURN(-ENOMEM); orro->oo_key = key; - cfs_atomic_set(&orro->oo_ref, 1); + orro->oo_ref = 1; tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key, &orro->oo_hnode); -- 1.8.3.1