From: Bob Glossman
Date: Mon, 4 Nov 2013 16:33:16 +0000 (-0800)
Subject: LU-3430 nrs: Fix a race condition in the ORR policy
X-Git-Tag: 2.4.2-RC1~42
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=2bae04d48c6410c5498a71ec95a70dcd70c4cf22;p=fs%2Flustre-release.git

LU-3430 nrs: Fix a race condition in the ORR policy

Checking the atomic oo_ref in nrs_orr_hop_put_free() and then taking
the bucket lock leaves a race window open, in which a second thread
can attempt to free an already-freed nrs_orr_object. Fix this, and
change the hash bucket lock type from an rwlock to a spinlock, as
there are now as many calls on the write path as there are on the
read path. Rehashing is not used on the hashes, so libcfs_hash API
usage can also be simplified.

Lustre-commit: 548b8ea916625b3697b0e0d0abfc330b4c954633
Lustre-change: http://review.whamcloud.com/7623

Signed-off-by: Bob Glossman
Signed-off-by: Nikitas Angelinas
Xyratex-bug-id: MRP-1294
Reviewed-by: Liang Zhen
Reviewed-by: Emoly Liu
Reviewed-by: Li Xi
Reviewed-by: Oleg Drokin
Change-Id: I08365b9888d781cb2bcbf871bf74d089bcf7ba9f
Reviewed-on: http://review.whamcloud.com/8162
Tested-by: Jenkins
Tested-by: Maloo
---

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index a435957..2e1669b 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -1636,7 +1636,7 @@ struct nrs_orr_object {
 	 * scheduling RPCs
 	 */
 	struct nrs_orr_key		oo_key;
-	cfs_atomic_t			oo_ref;
+	long				oo_ref;
 	/**
 	 * Round Robin quantum; the maximum number of RPCs that are allowed to
 	 * be scheduled for the object or OST in a single batch of each round.
diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c
index 66e8baa..1d31c0f 100644
--- a/lustre/ptlrpc/nrs_orr.c
+++ b/lustre/ptlrpc/nrs_orr.c
@@ -378,11 +378,11 @@ static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
  */
 #define NRS_ORR_BITS		24
 #define NRS_ORR_BKT_BITS	12
-#define NRS_ORR_HASH_FLAGS	(CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
+#define NRS_ORR_HASH_FLAGS	(CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
 
 #define NRS_TRR_BITS		4
 #define NRS_TRR_BKT_BITS	2
-#define NRS_TRR_HASH_FLAGS	CFS_HASH_RW_BKTLOCK
+#define NRS_TRR_HASH_FLAGS	CFS_HASH_SPIN_BKTLOCK
 
 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
@@ -417,7 +417,7 @@ static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 	struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
 						      struct nrs_orr_object,
 						      oo_hnode);
-	cfs_atomic_inc(&orro->oo_ref);
+	orro->oo_ref++;
 }
 
 /**
@@ -431,34 +431,21 @@ static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 						      oo_hnode);
 	struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
 						   struct nrs_orr_data, od_res);
-	cfs_hash_bd_t	       bds[2];
+	cfs_hash_bd_t	       bd;
 
-	if (cfs_atomic_dec_return(&orro->oo_ref) > 1)
-		return;
+	cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
 
-	cfs_hash_lock(hs, 0);
-	cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1);
+	if (--orro->oo_ref > 1) {
+		cfs_hash_bd_unlock(hs, &bd, 1);
 
-	/**
-	 * Another thread may have won the race and taken a reference on the
-	 * nrs_orr_object.
-	 */
-	if (cfs_atomic_read(&orro->oo_ref) > 1)
-		goto lost_race;
+		return;
+	}
+	LASSERT(orro->oo_ref == 1);
 
-	if (bds[1].bd_bucket == NULL)
-		cfs_hash_bd_del_locked(hs, &bds[0], hnode);
-	else
-		hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key,
-							hnode);
-	LASSERT(hnode != NULL);
+	cfs_hash_bd_del_locked(hs, &bd, hnode);
+	cfs_hash_bd_unlock(hs, &bd, 1);
 
 	OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
-
-lost_race:
-
-	cfs_hash_dual_bd_unlock(hs, bds, 1);
-	cfs_hash_unlock(hs, 0);
 }
 
 static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
@@ -466,7 +453,7 @@ static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 	struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
 						      struct nrs_orr_object,
 						      oo_hnode);
-	cfs_atomic_dec(&orro->oo_ref);
+	orro->oo_ref--;
 }
 
 static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
@@ -486,9 +473,9 @@ static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 	struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
 						   struct nrs_orr_data, od_res);
 
-	LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0,
-		 "Busy NRS TRR policy object for OST with index %u, with %d "
-		 "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref));
+	LASSERTF(orro->oo_ref == 0,
+		 "Busy NRS TRR policy object for OST with index %u, with %ld "
+		 "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
 
 	OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
 }
@@ -882,13 +869,12 @@ int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
 		OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
 					   nrs_pol2cptab(policy),
 					   nrs_pol2cptid(policy),
-					   (moving_req ? CFS_ALLOC_ATOMIC :
-					    CFS_ALLOC_IO));
+					   moving_req ? GFP_ATOMIC : __GFP_IO);
 		if (orro == NULL)
 			RETURN(-ENOMEM);
 
 		orro->oo_key = key;
-		cfs_atomic_set(&orro->oo_ref, 1);
+		orro->oo_ref = 1;
 
 		tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
 					      &orro->oo_hnode);
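As an illustration of the lock-then-decrement pattern the patch adopts in
nrs_orr_hop_put_free(), the sketch below is a minimal userspace analogue,
not Lustre code: the struct and function names (obj, bucket, obj_put_free)
are hypothetical, and a pthread spinlock stands in for the cfs_hash bucket
spinlock. It only shows why taking the bucket lock before touching the
reference count closes the double-free window.

/* Compile with: cc -pthread race_sketch.c */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins for nrs_orr_object and a cfs_hash bucket. */
struct obj {
	long ref;			/* protected by bkt->lock, like oo_ref */
};

struct bucket {
	pthread_spinlock_t lock;
	struct obj *obj;		/* stands in for the bucket's hash chain */
};

/*
 * Drop a reference and free the object on the last put.  The decrement
 * and the test happen while the bucket lock is held, so a concurrent
 * caller cannot observe a stale count and free the object a second
 * time -- the window the old check-then-lock sequence left open.
 */
static void obj_put_free(struct bucket *bkt, struct obj *o)
{
	pthread_spin_lock(&bkt->lock);
	if (--o->ref > 0) {
		pthread_spin_unlock(&bkt->lock);
		return;
	}
	bkt->obj = NULL;		/* unhash while still locked */
	pthread_spin_unlock(&bkt->lock);
	free(o);			/* no other thread can reach it now */
}

int main(void)
{
	struct bucket bkt;
	struct obj *o = calloc(1, sizeof(*o));

	pthread_spin_init(&bkt.lock, PTHREAD_PROCESS_PRIVATE);
	o->ref = 2;			/* two holders, e.g. hash plus request */
	bkt.obj = o;

	obj_put_free(&bkt, o);		/* ref 2 -> 1, object survives */
	obj_put_free(&bkt, o);		/* ref 1 -> 0, object is freed */

	pthread_spin_destroy(&bkt.lock);
	printf("freed exactly once\n");
	return 0;
}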