#include <class_hash.h>
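+/*
+ * The table-wide lh_rwlock only matters when the table can be resized
+ * underneath us; when LH_REHASH is disabled these wrappers reduce to
+ * no-ops and only the per-bucket lhb_rwlock is ever taken.
+ */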
+static void
+lh_read_lock(lustre_hash_t *lh)
+{
+ if ((lh->lh_flags & LH_REHASH) != 0)
+ read_lock(&lh->lh_rwlock);
+}
+
+static void
+lh_read_unlock(lustre_hash_t *lh)
+{
+ if ((lh->lh_flags & LH_REHASH) != 0)
+ read_unlock(&lh->lh_rwlock);
+}
+
+static void
+lh_write_lock(lustre_hash_t *lh)
+{
+ if ((lh->lh_flags & LH_REHASH) != 0)
+ write_lock(&lh->lh_rwlock);
+}
+
+static void
+lh_write_unlock(lustre_hash_t *lh)
+{
+ if ((lh->lh_flags & LH_REHASH) != 0)
+ write_unlock(&lh->lh_rwlock);
+}
+
/**
* Initialize new lustre hash, where:
- * \param name Descriptive hash name
- * \param cur_bits Initial hash table size, in bits
- * \param max_bits Maximum allowed hash table resize, in bits
- * \param ops Registered hash table operations
- * \param flags LH_REHASH enables dynamic hash resizing
- * LH_SORT enables chained hash sort
+ * @name - Descriptive hash name
+ * @cur_bits - Initial hash table size, in bits
+ * @max_bits - Maximum allowed hash table resize, in bits
+ * @ops - Registered hash table operations
+ * @flags - LH_REHASH enables dynamic hash resizing
+ * - LH_SORT enables chained hash sort
*/
lustre_hash_t *
lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
LASSERT(max_bits >= cur_bits);
LASSERT(max_bits < 31);
- OBD_ALLOC_PTR(lh);
+ LIBCFS_ALLOC_PTR(lh);
if (!lh)
RETURN(NULL);
lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
lh->lh_ops = ops;
lh->lh_flags = flags;
+ if (cur_bits != max_bits && (lh->lh_flags & LH_REHASH) == 0)
+ CERROR("Rehash is disabled for %s, ignoring max_bits %d\n",
+ name, max_bits);
/* theta * 1000 */
__lustre_hash_set_theta(lh, 500, 2000);
- OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
+ LIBCFS_ALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
if (!lh->lh_buckets) {
- OBD_FREE_PTR(lh);
+ LIBCFS_FREE_PTR(lh);
RETURN(NULL);
}
for (i = 0; i <= lh->lh_cur_mask; i++) {
- INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
- rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
- atomic_set(&lh->lh_buckets[i].lhb_count, 0);
+ LIBCFS_ALLOC(lh->lh_buckets[i], sizeof(lustre_hash_bucket_t));
+ if (lh->lh_buckets[i] == NULL) {
+ lustre_hash_exit(lh);
+ return NULL;
+ }
+
+ INIT_HLIST_HEAD(&lh->lh_buckets[i]->lhb_head);
+ rwlock_init(&lh->lh_buckets[i]->lhb_rwlock);
+ atomic_set(&lh->lh_buckets[i]->lhb_count, 0);
}
return lh;
EXPORT_SYMBOL(lustre_hash_init);
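/*
 * A minimal usage sketch (editorial, not part of this patch): assume a
 * hypothetical object type "my_obj_t" keyed by a 64-bit id, with a
 * lustre_hash_ops_t wired up elsewhere.  All "my_obj*" names below are
 * illustrative only.
 */
typedef struct my_obj {
        __u64             mo_id;     /* hash key */
        struct hlist_node mo_hnode;  /* linkage into a hash bucket */
        atomic_t          mo_ref;    /* taken/dropped by lh_get/lh_put */
} my_obj_t;

static lustre_hash_ops_t my_obj_hash_ops; /* lh_hash/lh_key/lh_compare/
                                           * lh_get/lh_put assumed to be
                                           * filled in elsewhere */
static lustre_hash_t *my_obj_hash;

static int my_obj_hash_setup(void)
{
        /* start with 2^7 buckets, allow growth to 2^16 via LH_REHASH */
        my_obj_hash = lustre_hash_init("MY_OBJ_HASH", 7, 16,
                                       &my_obj_hash_ops, LH_REHASH);
        return my_obj_hash == NULL ? -ENOMEM : 0;
}

static void my_obj_hash_teardown(void)
{
        /* all entries must have been removed before this point */
        lustre_hash_exit(my_obj_hash);
}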
/**
- * Cleanup lustre hash \a lh.
+ * Cleanup lustre hash @lh.
*/
void
lustre_hash_exit(lustre_hash_t *lh)
LASSERT(lh != NULL);
- write_lock(&lh->lh_rwlock);
+ lh_write_lock(lh);
lh_for_each_bucket(lh, lhb, i) {
+ if (lhb == NULL)
+ continue;
+
write_lock(&lhb->lhb_rwlock);
hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
__lustre_hash_bucket_validate(lh, lhb, hnode);
lh_exit(lh, hnode);
}
- LASSERT(hlist_empty(&(lhb->lhb_head)));
- LASSERT(atomic_read(&lhb->lhb_count) == 0);
+ LASSERTF(hlist_empty(&(lhb->lhb_head)),
+ "hash bucket %d from %s is not empty\n", i, lh->lh_name);
+ LASSERTF(atomic_read(&lhb->lhb_count) == 0,
+ "hash bucket %d from %s has #entries > 0 (%d)\n", i,
+ lh->lh_name, atomic_read(&lhb->lhb_count));
write_unlock(&lhb->lhb_rwlock);
+ LIBCFS_FREE_PTR(lhb);
}
- LASSERT(atomic_read(&lh->lh_count) == 0);
- write_unlock(&lh->lh_rwlock);
+ LASSERTF(atomic_read(&lh->lh_count) == 0,
+ "hash %s still has #entries > 0 (%d)\n", lh->lh_name,
+ atomic_read(&lh->lh_count));
+ lh_write_unlock(lh);
- OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
- OBD_FREE_PTR(lh);
+ LIBCFS_FREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
+ LIBCFS_FREE_PTR(lh);
EXIT;
}
EXPORT_SYMBOL(lustre_hash_exit);
}
/**
- * Add item \a hnode to lustre hash \a lh using \a key. The registered
+ * Add item @hnode to lustre hash @lh using @key. The registered
* ops->lh_get function will be called when the item is added.
*/
void
__lustre_hash_key_validate(lh, key, hnode);
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
LASSERT(hlist_unhashed(hnode));
write_unlock(&lhb->lhb_rwlock);
bits = lustre_hash_rehash_bits(lh);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
if (bits)
lustre_hash_rehash(lh, bits);
__lustre_hash_key_validate(lh, key, hnode);
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
LASSERT(hlist_unhashed(hnode));
bits = lustre_hash_rehash_bits(lh);
}
write_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
if (bits)
lustre_hash_rehash(lh, bits);
}
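/*
 * Sketch (using the hypothetical my_obj from above): plain add returns
 * nothing; the registered ops->lh_get is invoked on insertion.
 */
static void my_obj_insert(my_obj_t *obj)
{
        lustre_hash_add(my_obj_hash, &obj->mo_id, &obj->mo_hnode);
}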
/**
- * Add item \a hnode to lustre hash \a lh using \a key. The registered
+ * Add item @hnode to lustre hash @lh using @key. The registered
* ops->lh_get function will be called if the item was added.
* Returns 0 on success or -EALREADY on key collisions.
*/
EXPORT_SYMBOL(lustre_hash_add_unique);
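/*
 * Sketch: add_unique refuses key collisions, so callers can key error
 * handling off the -EALREADY return (my_obj names hypothetical).
 */
static int my_obj_insert_unique(my_obj_t *obj)
{
        int rc;

        rc = lustre_hash_add_unique(my_obj_hash, &obj->mo_id,
                                    &obj->mo_hnode);
        if (rc == -EALREADY)
                CERROR("object "LPU64" is already hashed\n", obj->mo_id);
        return rc;
}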
/**
- * Add item \a hnode to lustre hash \a lh using \a key. If this \a key
+ * Add item @hnode to lustre hash @lh using @key. If this @key
* already exists in the hash then ops->lh_get will be called on the
* conflicting entry and that entry will be returned to the caller.
* Otherwise ops->lh_get is called on the item which was added.
EXPORT_SYMBOL(lustre_hash_findadd_unique);
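/*
 * Sketch: findadd either installs @obj or hands back the pre-existing
 * entry; either way ops->lh_get has been called on the returned object,
 * so the caller owns a reference it must later drop.
 */
static my_obj_t *my_obj_find_or_insert(my_obj_t *obj)
{
        return (my_obj_t *)lustre_hash_findadd_unique(my_obj_hash,
                                                      &obj->mo_id,
                                                      &obj->mo_hnode);
}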
/**
- * Delete item \a hnode from the lustre hash \a lh using \a key. The \a key
+ * Delete item @hnode from the lustre hash @lh using @key. The @key
* is required to ensure the correct hash bucket is locked since there
* is no direct linkage from the item to the bucket. The object
* removed from the hash will be returned and ops->lh_put is called
__lustre_hash_key_validate(lh, key, hnode);
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
- LASSERT(!hlist_unhashed(hnode));
write_lock(&lhb->lhb_rwlock);
+ LASSERT(!hlist_unhashed(hnode));
obj = __lustre_hash_bucket_del(lh, lhb, hnode);
write_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
RETURN(obj);
}
EXPORT_SYMBOL(lustre_hash_del);
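/*
 * Sketch: deleting by node still needs the key so the right bucket can
 * be found and locked; ops->lh_put runs against the removed object.
 */
static void my_obj_unhash(my_obj_t *obj)
{
        lustre_hash_del(my_obj_hash, &obj->mo_id, &obj->mo_hnode);
}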
/**
- * Delete item given \a key in lustre hash \a lh. The first \a key found in
+ * Delete item given @key in lustre hash @lh. The first @key found in
* the hash will be removed; if the key exists multiple times in the hash
- * \a lh this function must be called once per key. The removed object
+ * @lh, this function must be called once per key. The removed object
* will be returned and ops->lh_put is called on the removed object.
*/
void *
void *obj = NULL;
ENTRY;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
write_lock(&lhb->lhb_rwlock);
obj = __lustre_hash_bucket_del(lh, lhb, hnode);
write_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
RETURN(obj);
}
EXPORT_SYMBOL(lustre_hash_del_key);
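/*
 * Sketch: deleting by key alone removes only the first match, so with
 * duplicate keys this must be called once per entry.
 */
static my_obj_t *my_obj_unhash_by_id(__u64 id)
{
        return (my_obj_t *)lustre_hash_del_key(my_obj_hash, &id);
}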
/**
- * Lookup an item using \a key in the lustre hash \a lh and return it.
- * If the \a key is found in the hash lh->lh_get() is called and the
+ * Lookup an item using @key in the lustre hash @lh and return it.
+ * If the @key is found in the hash lh->lh_get() is called and the
* matching object is returned. It is the caller's responsibility
* to call the counterpart ops->lh_put using the lh_put() macro
- * when when finished with the object. If the \a key was not found
- * in the hash \a lh NULL is returned.
+ * when finished with the object. If the @key was not found
+ * in the hash @lh NULL is returned.
*/
void *
lustre_hash_lookup(lustre_hash_t *lh, void *key)
void *obj = NULL;
ENTRY;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
read_lock(&lhb->lhb_rwlock);
obj = lh_get(lh, hnode);
read_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
RETURN(obj);
}
EXPORT_SYMBOL(lustre_hash_lookup);
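/*
 * Sketch: lookup returns the object with a reference held through
 * ops->lh_get; the caller pairs it with lh_put() when done.
 */
static my_obj_t *my_obj_find(__u64 id)
{
        return (my_obj_t *)lustre_hash_lookup(my_obj_hash, &id);
}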
/**
- * For each item in the lustre hash \a lh call the passed callback \a func
- * and pass to it as an argument each hash item and the private \a data.
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
* Before each callback ops->lh_get will be called, and after each
* callback ops->lh_put will be called. Finally, during the callback
* the bucket lock is held so the callback must never sleep.
int i;
ENTRY;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
lh_for_each_bucket(lh, lhb, i) {
read_lock(&lhb->lhb_rwlock);
hlist_for_each(hnode, &(lhb->lhb_head)) {
}
read_unlock(&lhb->lhb_rwlock);
}
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
}
EXPORT_SYMBOL(lustre_hash_for_each);
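/*
 * Sketch: the callback runs under the bucket lock and must not sleep;
 * accumulating into @data is safe.
 */
static void my_obj_count_cb(void *obj, void *data)
{
        (*(int *)data)++;
}

static int my_obj_count(void)
{
        int count = 0;

        lustre_hash_for_each(my_obj_hash, my_obj_count_cb, &count);
        return count;
}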
/**
- * For each item in the lustre hash \a lh call the passed callback \a func
- * and pass to it as an argument each hash item and the private \a data.
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
* Before each callback ops->lh_get will be called, and after each
* callback ops->lh_put will be called. During the callback the
* bucket lock will not be held which allows for the current item
int i;
ENTRY;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
lh_for_each_bucket(lh, lhb, i) {
read_lock(&lhb->lhb_rwlock);
hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
}
read_unlock(&lhb->lhb_rwlock);
}
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
}
EXPORT_SYMBOL(lustre_hash_for_each_safe);
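/*
 * Sketch: because the _safe variant drops the bucket lock around the
 * callback, the current item may be unhashed from within it.
 */
static void my_obj_prune_cb(void *obj, void *data)
{
        my_obj_t *mo = obj;

        if (mo->mo_id < *(__u64 *)data)
                lustre_hash_del(my_obj_hash, &mo->mo_id, &mo->mo_hnode);
}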
/**
- * For each hash bucket in the lustre hash \a lh call the passed callback
- * \a func until all the hash buckets are empty. The passed callback \a func
+ * For each hash bucket in the lustre hash @lh call the passed callback
+ * @func until all the hash buckets are empty. The passed callback @func
* or the previously registered callback lh->lh_put must remove the item
* from the hash. You may either use the lustre_hash_del() or hlist_del()
- * functions. No rwlocks will be held during the callback \a func it is
+ * functions. No rwlocks will be held during the callback @func, so it is
* safe to sleep if needed. This function will not terminate until the
* hash is empty. Note it is still possible to concurrently add new
* items into the hash. It is the caller's responsibility to ensure
{
struct hlist_node *hnode;
lustre_hash_bucket_t *lhb;
+ lustre_hash_bucket_t **lhb_last = NULL;
void *obj;
- int i;
+ int i = 0;
ENTRY;
restart:
- read_lock(&lh->lh_rwlock);
- lh_for_each_bucket(lh, lhb, i) {
+ lh_read_lock(lh);
+ /* If the hash table has changed since we last held lh_rwlock,
+ * we need to restart the traversal from the first bucket. */
+ if (lh->lh_buckets != lhb_last) {
+ i = 0;
+ lhb_last = lh->lh_buckets;
+ }
+ lh_for_each_bucket_restart(lh, lhb, i) {
write_lock(&lhb->lhb_rwlock);
while (!hlist_empty(&lhb->lhb_head)) {
hnode = lhb->lhb_head.first;
__lustre_hash_bucket_validate(lh, lhb, hnode);
obj = lh_get(lh, hnode);
write_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
func(obj, data);
(void)lh_put(lh, hnode);
+ cfs_cond_resched();
goto restart;
}
write_unlock(&lhb->lhb_rwlock);
}
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
}
EXPORT_SYMBOL(lustre_hash_for_each_empty);
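/*
 * Sketch: draining the table at shutdown; the callback (or lh_put) must
 * unhash each item, and it may sleep since no locks are held.
 * my_obj_destroy() is a hypothetical destructor.
 */
extern void my_obj_destroy(my_obj_t *mo);

static void my_obj_drain_cb(void *obj, void *data)
{
        my_obj_t *mo = obj;

        lustre_hash_del(my_obj_hash, &mo->mo_id, &mo->mo_hnode);
        my_obj_destroy(mo);
}

static void my_obj_drain(void)
{
        lustre_hash_for_each_empty(my_obj_hash, my_obj_drain_cb, NULL);
}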
/*
- * For each item in the lustre hash \a lh which matches the \a key call
- * the passed callback \a func and pass to it as an argument each hash
- * item and the private \a data. Before each callback ops->lh_get will
+ * For each item in the lustre hash @lh which matches the @key call
+ * the passed callback @func and pass to it as an argument each hash
+ * item and the private @data. Before each callback ops->lh_get will
* be called, and after each callback ops->lh_put will be called.
* Finally, during the callback the bucket lock is held so the
* callback must never sleep.
unsigned i;
ENTRY;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, key, lh->lh_cur_mask);
- lhb = &lh->lh_buckets[i];
+ lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
read_lock(&lhb->lhb_rwlock);
}
read_unlock(&lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
}
EXPORT_SYMBOL(lustre_hash_for_each_key);
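/*
 * Sketch: visiting every entry matching @key; the bucket lock is held,
 * so the callback must stay non-blocking.
 */
static void my_obj_note_cb(void *obj, void *data)
{
        CDEBUG(D_INFO, "visited object "LPU64"\n",
               ((my_obj_t *)obj)->mo_id);
}

static void my_obj_visit(__u64 id)
{
        lustre_hash_for_each_key(my_obj_hash, &id, my_obj_note_cb, NULL);
}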
/**
- * Rehash the lustre hash \a lh to the given \a bits. This can be used
+ * Rehash the lustre hash @lh to the given @bits. This can be used
* to grow the hash size when excessive chaining is detected, or to
* shrink the hash when it is larger than needed. When the LH_REHASH
- * flag is set in \a lh the lustre hash may be dynamically rehashed
+ * flag is set in @lh the lustre hash may be dynamically rehashed
* during addition or removal if the hash's theta value exceeds
* either the lh->lh_min_theta or lh->lh_max_theta values. By default
* these values are tuned to keep the chained hash depth small, and
* this approach assumes a reasonably uniform hashing function. The
- * theta thresholds for \a lh are tunable via lustre_hash_set_theta().
+ * theta thresholds for @lh are tunable via lustre_hash_set_theta().
*/
int
lustre_hash_rehash(lustre_hash_t *lh, int bits)
{
struct hlist_node *hnode;
struct hlist_node *pos;
- lustre_hash_bucket_t *lh_buckets;
- lustre_hash_bucket_t *rehash_buckets;
+ lustre_hash_bucket_t **lh_buckets;
+ lustre_hash_bucket_t **rehash_buckets;
lustre_hash_bucket_t *lh_lhb;
lustre_hash_bucket_t *rehash_lhb;
int i;
int lh_mask;
int lh_bits;
int mask = (1 << bits) - 1;
+ int rc = 0;
void *key;
ENTRY;
LASSERT(!in_interrupt());
LASSERT(mask > 0);
+ LASSERT((lh->lh_flags & LH_REHASH) != 0);
- OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
+ LIBCFS_ALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
if (!rehash_buckets)
RETURN(-ENOMEM);
for (i = 0; i <= mask; i++) {
- INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
- rwlock_init(&rehash_buckets[i].lhb_rwlock);
- atomic_set(&rehash_buckets[i].lhb_count, 0);
+ LIBCFS_ALLOC(rehash_buckets[i], sizeof(*rehash_buckets[i]));
+ if (rehash_buckets[i] == NULL)
+ GOTO(free, rc = -ENOMEM);
+
+ INIT_HLIST_HEAD(&rehash_buckets[i]->lhb_head);
+ rwlock_init(&rehash_buckets[i]->lhb_rwlock);
+ atomic_set(&rehash_buckets[i]->lhb_count, 0);
}
- write_lock(&lh->lh_rwlock);
+ lh_write_lock(lh);
/*
* Early return for multiple concurrent racing callers,
*/
theta = __lustre_hash_theta(lh);
if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
- OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
- write_unlock(&lh->lh_rwlock);
- RETURN(-EALREADY);
+ lh_write_unlock(lh);
+ GOTO(free, rc = -EALREADY);
}
lh_bits = lh->lh_cur_bits;
atomic_inc(&lh->lh_rehash_count);
for (i = 0; i <= lh_mask; i++) {
- lh_lhb = &lh_buckets[i];
+ lh_lhb = lh_buckets[i];
write_lock(&lh_lhb->lhb_rwlock);
hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
/*
* Add to rehash bucket, ops->lh_key must be defined.
*/
- rehash_lhb = &rehash_buckets[lh_hash(lh, key, mask)];
+ rehash_lhb = rehash_buckets[lh_hash(lh, key, mask)];
hlist_add_head(hnode, &(rehash_lhb->lhb_head));
atomic_inc(&rehash_lhb->lhb_count);
}
write_unlock(&lh_lhb->lhb_rwlock);
}
- OBD_VFREE(lh_buckets, sizeof(*lh_buckets) << lh_bits);
- write_unlock(&lh->lh_rwlock);
+ lh_write_unlock(lh);
+ rehash_buckets = lh_buckets;
+ i = (1 << lh_bits);
+ bits = lh_bits;
+free:
+ while (i-- > 0)
+ LIBCFS_FREE(rehash_buckets[i], sizeof(*rehash_buckets[i]));
+ LIBCFS_FREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
- RETURN(0);
+ RETURN(rc);
}
EXPORT_SYMBOL(lustre_hash_rehash);
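/*
 * Sketch: an explicit resize to 2^12 buckets.  With LH_REHASH set this
 * normally happens automatically once theta leaves [lh_min_theta,
 * lh_max_theta], so a manual call is rarely needed.
 */
static int my_obj_hash_resize(void)
{
        return lustre_hash_rehash(my_obj_hash, 12);
}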
/**
- * Rehash the object referenced by \a hnode in the lustre hash \a lh. The
- * \a old_key must be provided to locate the objects previous location
- * in the hash, and the \a new_key will be used to reinsert the object.
+ * Rehash the object referenced by @hnode in the lustre hash @lh. The
+ * @old_key must be provided to locate the object's previous location
+ * in the hash, and the @new_key will be used to reinsert the object.
* Use this function instead of a lustre_hash_add() + lustre_hash_del()
* combo when it is critical that there is no window in time where the
* object is missing from the hash. When an object is being rehashed
__lustre_hash_key_validate(lh, new_key, hnode);
LASSERT(!hlist_unhashed(hnode));
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
i = lh_hash(lh, old_key, lh->lh_cur_mask);
- old_lhb = &lh->lh_buckets[i];
+ old_lhb = lh->lh_buckets[i];
LASSERT(i <= lh->lh_cur_mask);
j = lh_hash(lh, new_key, lh->lh_cur_mask);
- new_lhb = &lh->lh_buckets[j];
+ new_lhb = lh->lh_buckets[j];
LASSERT(j <= lh->lh_cur_mask);
if (i < j) { /* write_lock ordering */
write_lock(&new_lhb->lhb_rwlock);
write_lock(&old_lhb->lhb_rwlock);
} else { /* do nothing */
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
return;
}
write_unlock(&new_lhb->lhb_rwlock);
write_unlock(&old_lhb->lhb_rwlock);
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
EXIT;
}
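/*
 * Sketch: atomically moving the hypothetical my_obj to a new key; the
 * entry never disappears from the table during the move.  The caller is
 * assumed to serialize updates of the key itself.
 */
static void my_obj_change_id(my_obj_t *obj, __u64 new_id)
{
        __u64 old_id = obj->mo_id;

        obj->mo_id = new_id;
        lustre_hash_rehash_key(my_obj_hash, &old_id, &new_id,
                               &obj->mo_hnode);
}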
if (str == NULL || size == 0)
return 0;
- read_lock(&lh->lh_rwlock);
+ lh_read_lock(lh);
theta = __lustre_hash_theta(lh);
c += snprintf(str + c, size - c, "%-*s ",
c += snprintf(str + c, size - c, "%d%c", dist[i],
(i == 7) ? '\n' : '/');
- read_unlock(&lh->lh_rwlock);
+ lh_read_unlock(lh);
return c;
}