Whamcloud - gitweb
LU-3430 nrs: Fix a race condition in the ORR policy 23/7623/3
author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
Wed, 11 Sep 2013 22:43:23 +0000 (23:43 +0100)
committer: Oleg Drokin <oleg.drokin@intel.com>
Mon, 23 Sep 2013 16:42:25 +0000 (16:42 +0000)
Checking the atomic oo_ref in nrs_orr_hop_put_free() and then
taking the bucket lock leaves a race window open, by which a
second thread can attempt to free an already-freed
nrs_orr_object. Fix this, and change the hash bucket lock type
from an rwlock to a spinlock, as there are now as many calls on
the write path as there are on the read path. Rehashing is not
used on the hashes, so libcfs_hash API usage can also be
simplified.

Signed-off-by: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
Change-Id: I2e16320c5e4b0803e1bef428f26ea18b0341ac5f
Xyratex-bug-id: MRP-1294
Reviewed-on: http://review.whamcloud.com/7623
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: Emoly Liu <emoly.liu@intel.com>
Reviewed-by: Li Xi <pkuelelixi@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_net.h
lustre/ptlrpc/nrs_orr.c

index 37f1974..de65401 100644 (file)
@@ -1579,7 +1579,7 @@ struct nrs_orr_data {
        struct ptlrpc_nrs_resource      od_res;
        cfs_binheap_t                  *od_binheap;
        cfs_hash_t                     *od_obj_hash;
-       struct kmem_cache                      *od_cache;
+       struct kmem_cache              *od_cache;
        /**
         * Used when a new scheduling round commences, in order to synchronize
         * all object or OST batches with the new round number.
@@ -1636,7 +1636,7 @@ struct nrs_orr_object {
         * scheduling RPCs
         */
        struct nrs_orr_key              oo_key;
-       cfs_atomic_t                    oo_ref;
+       long                            oo_ref;
        /**
         * Round Robin quantum; the maximum number of RPCs that are allowed to
         * be scheduled for the object or OST in a single batch of each round.
index 65b2f7a..b723bca 100644 (file)
@@ -378,11 +378,11 @@ static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
  */
 #define NRS_ORR_BITS           24
 #define NRS_ORR_BKT_BITS       12
-#define NRS_ORR_HASH_FLAGS     (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
+#define NRS_ORR_HASH_FLAGS     (CFS_HASH_SPIN_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
 
 #define NRS_TRR_BITS           4
 #define NRS_TRR_BKT_BITS       2
-#define NRS_TRR_HASH_FLAGS     CFS_HASH_RW_BKTLOCK
+#define NRS_TRR_HASH_FLAGS     CFS_HASH_SPIN_BKTLOCK
 
 static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
@@ -417,7 +417,7 @@ static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
        struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
                                                      struct nrs_orr_object,
                                                      oo_hnode);
-       cfs_atomic_inc(&orro->oo_ref);
+       orro->oo_ref++;
 }
 
 /**
@@ -431,34 +431,21 @@ static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
                                                      oo_hnode);
        struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
                                                   struct nrs_orr_data, od_res);
-       cfs_hash_bd_t          bds[2];
+       cfs_hash_bd_t          bd;
 
-       if (cfs_atomic_dec_return(&orro->oo_ref) > 1)
-               return;
+       cfs_hash_bd_get_and_lock(hs, &orro->oo_key, &bd, 1);
 
-       cfs_hash_lock(hs, 0);
-       cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1);
+       if (--orro->oo_ref > 1) {
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
-       /**
-        * Another thread may have won the race and taken a reference on the
-        * nrs_orr_object.
-        */
-       if (cfs_atomic_read(&orro->oo_ref) > 1)
-               goto lost_race;
+               return;
+       }
+       LASSERT(orro->oo_ref == 1);
 
-       if (bds[1].bd_bucket == NULL)
-               cfs_hash_bd_del_locked(hs, &bds[0], hnode);
-       else
-               hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key,
-                                                       hnode);
-       LASSERT(hnode != NULL);
+       cfs_hash_bd_del_locked(hs, &bd, hnode);
+       cfs_hash_bd_unlock(hs, &bd, 1);
 
        OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
-
-lost_race:
-
-       cfs_hash_dual_bd_unlock(hs, bds, 1);
-       cfs_hash_unlock(hs, 0);
 }
 
 static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
@@ -466,7 +453,7 @@ static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
        struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
                                                      struct nrs_orr_object,
                                                      oo_hnode);
-       cfs_atomic_dec(&orro->oo_ref);
+       orro->oo_ref--;
 }
 
 static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
@@ -486,9 +473,9 @@ static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
        struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
                                                   struct nrs_orr_data, od_res);
 
-       LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0,
-                "Busy NRS TRR policy object for OST with index %u, with %d "
-                "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref));
+       LASSERTF(orro->oo_ref == 0,
+                "Busy NRS TRR policy object for OST with index %u, with %ld "
+                "refs\n", orro->oo_key.ok_idx, orro->oo_ref);
 
        OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
 }
@@ -882,13 +869,12 @@ int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
 
        OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
                                   nrs_pol2cptab(policy), nrs_pol2cptid(policy),
-                                  (moving_req ? GFP_ATOMIC :
-                                   __GFP_IO));
+                                  moving_req ? GFP_ATOMIC : __GFP_IO);
        if (orro == NULL)
                RETURN(-ENOMEM);
 
        orro->oo_key = key;
-       cfs_atomic_set(&orro->oo_ref, 1);
+       orro->oo_ref = 1;
 
        tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
                                      &orro->oo_hnode);