X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fclass_hash.c;h=46b4c5edda8ca73291a19d079e1604d80a48ff86;hb=1a24137e8f26eaae9a2dac39a1e8a8a0bed46b6b;hp=02ef3f4a173320a452de043f38162ac722d27302;hpb=c7342f2d57745fe020d87012be52354d48a1bf7e;p=fs%2Flustre-release.git

diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c
index 02ef3f4..46b4c5ed 100644
--- a/lustre/obdclass/class_hash.c
+++ b/lustre/obdclass/class_hash.c
@@ -38,6 +38,14 @@
  * Implement a hash class for hash process in lustre system.
  *
  * Author: YuZhangyong
+ *
+ * 2008-08-15: Brian Behlendorf
+ *   - Simplified API and improved documentation
+ *   - Added per-hash feature flags:
+ *     * LH_DEBUG    additional validation
+ *     * LH_REHASH   dynamic rehashing
+ *   - Added per-hash statistics
+ *   - General performance enhancements
  */
 
 #ifndef __KERNEL__
@@ -45,658 +53,721 @@
 #include 
 #endif
 
-#include 
 #include 
-#include 
-#include 
-#include 
 
-int lustre_hash_init(struct lustre_class_hash_body **hash_body_new,
-                     char *hashname, __u32 hashsize,
-                     struct lustre_hash_operations *hash_operations)
+/**
+ * Initialize new lustre hash, where:
+ * @name     - Descriptive hash name
+ * @cur_size - Initial hash table size
+ * @max_size - Maximum allowed hash table resize
+ * @ops      - Registered hash table operations
+ * @flags    - LH_REHASH enable dynamic hash resizing
+ *           - LH_SORT enable chained hash sort
+ */
+lustre_hash_t *
+lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
+                 lustre_hash_ops_t *ops, int flags)
 {
-        int i, n = 0;
-        struct lustre_class_hash_body *hash_body = NULL;
-
-        LASSERT(hashsize > 0);
-        LASSERT(hash_operations != NULL);
+        lustre_hash_t *lh;
+        int            i;
         ENTRY;
-
-        i = hashsize;
-        while (i != 0) {
-                if (i & 0x1)
-                        n++;
-                i >>= 1;
-        }
-
-        LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize);
-
-        /* alloc space for hash_body */
-        OBD_ALLOC(hash_body, sizeof(*hash_body));
-
-        if (hash_body == NULL) {
-                CERROR("Cannot alloc space for hash body, hashname = %s \n",
-                       hashname);
-                RETURN(-ENOMEM);
+
+        LASSERT(name != NULL);
+        LASSERT(ops != NULL);
+
+        /*
+         * Ensure hash is a power of two to allow the use of a bitmask
+         * in the hash function instead of a more expensive modulus.
+         */
+        LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0,
+                 "Size (%u) is not power of 2\n", cur_size);
+        LASSERTF(max_size && (max_size & (max_size - 1)) == 0,
+                 "Size (%u) is not power of 2\n", max_size);
+
+        OBD_ALLOC_PTR(lh);
+        if (!lh)
+                RETURN(NULL);
+
+        lh->lh_name_size = strlen(name) + 1;
+        rwlock_init(&lh->lh_rwlock);
+
+        OBD_ALLOC(lh->lh_name, lh->lh_name_size);
+        if (!lh->lh_name) {
+                OBD_FREE_PTR(lh);
+                RETURN(NULL);
         }
-
-        LASSERT(hashname != NULL &&
-                strlen(hashname) <= sizeof(hash_body->hashname));
-        strcpy(hash_body->hashname, hashname);
-        hash_body->lchb_hash_max_size = hashsize;
-        hash_body->lchb_hash_operations = hash_operations;
-
-        /* alloc space for the hash tables */
-        OBD_ALLOC(hash_body->lchb_hash_tables,
-                  sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);
-
-        if (hash_body->lchb_hash_tables == NULL) {
-                OBD_FREE(hash_body, sizeof(*hash_body));
-                CERROR("Cannot alloc space for hashtables, hashname = %s \n",
-                       hash_body->hashname);
-                RETURN(-ENOMEM);
+
+        strncpy(lh->lh_name, name, lh->lh_name_size);
+
+        atomic_set(&lh->lh_count, 0);
+        atomic_set(&lh->lh_rehash_count, 0);
+        lh->lh_cur_size = cur_size;
+        lh->lh_min_size = cur_size;
+        lh->lh_max_size = max_size;
+        lh->lh_min_theta = 500;  /* theta * 1000 */
+        lh->lh_max_theta = 2000; /* theta * 1000 */
+        lh->lh_ops = ops;
+        lh->lh_flags = flags;
+
+        OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
+        if (!lh->lh_buckets) {
+                OBD_FREE(lh->lh_name, lh->lh_name_size);
+                OBD_FREE_PTR(lh);
+                RETURN(NULL);
         }
-
-        spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */
-
-        for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) {
-                /* initial the bucket lock and list_head */
-                INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head);
-                spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock);
+
+        for (i = 0; i < lh->lh_cur_size; i++) {
+                INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
+                rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
+                atomic_set(&lh->lh_buckets[i].lhb_count, 0);
         }
-        *hash_body_new = hash_body;
-
-        RETURN(0);
+
+        return lh;
 }
 EXPORT_SYMBOL(lustre_hash_init);
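
A hypothetical caller sketch, not part of this patch. The lustre_hash_ops_t field names below follow the lh_* accessor macros used throughout this file, but the authoritative layout lives in lustre_hash.h, which is not shown in this diff, so treat them as assumptions; the example_* helpers are likewise illustrative. Because both sizes are powers of two, the registered hash callback can reduce a key with hash & (size - 1) rather than a modulus, as the comment above explains.

    /* Hypothetical usage sketch; assumed ops field names and helpers. */
    static lustre_hash_ops_t example_ops = {
            .lh_hash    = example_hash,     /* must respect the mask arg */
            .lh_key     = example_key,      /* node -> key               */
            .lh_compare = example_compare,  /* key vs. node              */
            .lh_get     = example_get,      /* take a reference          */
            .lh_put     = example_put,      /* drop a reference          */
    };

    static int example_create(void)
    {
            lustre_hash_t *lh;

            /* 32 buckets initially, growable to 32768, resizing enabled */
            lh = lustre_hash_init("EXAMPLE_HASH", 32, 32768,
                                  &example_ops, LH_REHASH);
            if (lh == NULL)
                    return -ENOMEM;

            /* ... add/lookup/del against lh ... */
            lustre_hash_exit(lh);
            return 0;
    }
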
-
-void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body)
+
+/**
+ * Cleanup lustre hash @lh.
+ */
+void
+lustre_hash_exit(lustre_hash_t *lh)
 {
-        int i;
-        struct lustre_class_hash_body *hash_body = NULL;
+        lustre_hash_bucket_t *lhb;
+        struct hlist_node    *hnode;
+        struct hlist_node    *pos;
+        int                   i;
         ENTRY;
-
-        hash_body = *new_hash_body;
-
-        if (hash_body == NULL) {
-                CWARN("hash body has been deleted\n");
-                goto out_hash;
-        }
-
-        spin_lock(&hash_body->lchb_lock); /* lock the hash tables */
-
-        if (hash_body->lchb_hash_tables == NULL ) {
-                spin_unlock(&hash_body->lchb_lock);
-                CWARN("hash tables has been deleted\n");
-                goto out_hash;
-        }
-
-        for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
-                struct lustre_hash_bucket * bucket;
-                struct hlist_node * actual_hnode, *pos;
-
-                bucket = &hash_body->lchb_hash_tables[i];
-                spin_lock(&bucket->lhb_lock); /* lock the bucket */
-                hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
-                        lustre_hash_delitem_nolock(hash_body, i, actual_hnode);
+
+        if (!lh)
+                return;
+
+        write_lock(&lh->lh_rwlock);
+
+        lh_for_each_bucket(lh, lhb, i) {
+                write_lock(&lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        __lustre_hash_bucket_del(lh, lhb, hnode);
+                        lh_exit(lh, hnode);
                 }
-                spin_unlock(&bucket->lhb_lock);
+
+                LASSERT(hlist_empty(&(lhb->lhb_head)));
+                LASSERT(atomic_read(&lhb->lhb_count) == 0);
+                write_unlock(&lhb->lhb_rwlock);
         }
+
+        OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
+        OBD_FREE(lh->lh_name, lh->lh_name_size);
+
+        LASSERT(atomic_read(&lh->lh_count) == 0);
+        write_unlock(&lh->lh_rwlock);
+
+        OBD_FREE_PTR(lh);
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_exit);
 
-        /* free the hash_tables's memory space */
-        OBD_FREE(hash_body->lchb_hash_tables,
-                 sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);
+static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
+{
+        if (!(lh->lh_flags & LH_REHASH))
+                return 0;
 
-        hash_body->lchb_hash_tables = NULL;
+        if ((lh->lh_cur_size < lh->lh_max_size) &&
+            (__lustre_hash_theta(lh) > lh->lh_max_theta))
+                return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
 
-        spin_unlock(&hash_body->lchb_lock);
+        if ((lh->lh_cur_size > lh->lh_min_size) &&
+            (__lustre_hash_theta(lh) < lh->lh_min_theta))
+                return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
 
-out_hash :
-        /* free the hash_body's memory space */
-        if (hash_body != NULL) {
-                OBD_FREE(hash_body, sizeof(*hash_body));
-                *new_hash_body = NULL;
-        }
+        return 0;
+}
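
A worked example to make the resize thresholds concrete. __lustre_hash_theta() is defined in the header and not shown in this diff; the arithmetic below assumes it returns the average chain depth scaled by 1000, which is consistent with the theta/1000 formatting in lustre_hash_debug_str() later in this file.

    /*
     * Assumed: theta = (item count * 1000) / bucket count.
     *
     * Grow:   cur_size = 128, count = 300
     *         theta = 300 * 1000 / 128 = 2343 > lh_max_theta (2000)
     *         => rehash to MIN(128 * 2, lh_max_size) = 256 buckets
     *
     * Shrink: cur_size = 128, count = 50 (assuming lh_min_size <= 64)
     *         theta = 50 * 1000 / 128 = 390 < lh_min_theta (500)
     *         => rehash to MAX(128 / 2, lh_min_size) = 64 buckets
     */
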
+
+/**
+ * Add item @hnode to lustre hash @lh using @key.  The registered
+ * ops->lh_get function will be called when the item is added.
+ */
+void
+lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
+{
+        lustre_hash_bucket_t *lhb;
+        int                   size;
+        unsigned              i;
+        ENTRY;
+
+        __lustre_hash_key_validate(lh, key, hnode);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+        LASSERT(hlist_unhashed(hnode));
+
+        write_lock(&lhb->lhb_rwlock);
+        __lustre_hash_bucket_add(lh, lhb, hnode);
+        write_unlock(&lhb->lhb_rwlock);
+
+        size = lustre_hash_rehash_size(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (size)
+                lustre_hash_rehash(lh, size);
+        EXIT;
+}
-EXPORT_SYMBOL(lustre_hash_exit);
-
-/*
- * only allow unique @key in hashtables, if the same @key has existed
- * in hashtables, it will return with fails.
+EXPORT_SYMBOL(lustre_hash_add);
+
+/**
+ * Add item @hnode to lustre hash @lh using @key.  The registered
+ * ops->lh_get function will be called if the item was added.
+ * Returns 0 on success or -EALREADY on key collisions.
+ */
-int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body,
-                               void *key, struct hlist_node *actual_hnode)
+int
+lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        lustre_hash_bucket_t *lhb;
+        int                   size;
+        int                   rc = -EALREADY;
+        unsigned              i;
         ENTRY;
-
-        LASSERT(hlist_unhashed(actual_hnode));
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        /* get the hash-bucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) {
-                /* the added-item exist in hashtables, so cannot add it again */
-                spin_unlock(&bucket->lhb_lock);
-
-                CWARN("Already found the key in hash [%s]\n",
-                      hash_body->hashname);
-                RETURN(-EALREADY);
+
+        __lustre_hash_key_validate(lh, key, hnode);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+        LASSERT(hlist_unhashed(hnode));
+
+        write_lock(&lhb->lhb_rwlock);
+        if (!__lustre_hash_bucket_lookup(lh, lhb, key)) {
+                __lustre_hash_bucket_add(lh, lhb, hnode);
+                rc = 0;
         }
-
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
-
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++;
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
-               hash_body->hashname, hashent,
-               hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif
-        hop->lustre_hash_object_refcount_get(actual_hnode);
-
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(0);
+        write_unlock(&lhb->lhb_rwlock);
+
+        size = lustre_hash_rehash_size(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (size)
+                lustre_hash_rehash(lh, size);
+
+        RETURN(rc);
 }
-EXPORT_SYMBOL(lustre_hash_additem_unique);
-
-/*
- * only allow unique @key in hashtables, if the same @key has existed
- * in hashtables, it will return with fails.
+EXPORT_SYMBOL(lustre_hash_add_unique);
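
A sketch of how a caller is expected to wire these up: the object embeds the struct hlist_node, and the key lives in the object itself so ops->lh_key can map the node back to its key. All names below are illustrative, not part of this patch.

    struct example_obj {
            __u64             eo_id;       /* the hash key */
            struct hlist_node eo_hnode;    /* hash linkage, starts unhashed */
    };

    /* Duplicate keys permitted: */
    lustre_hash_add(lh, &obj->eo_id, &obj->eo_hnode);

    /* Duplicate keys rejected: */
    rc = lustre_hash_add_unique(lh, &obj->eo_id, &obj->eo_hnode);
    if (rc == -EALREADY)
            handle_duplicate(obj);   /* obj was not added, no ref taken */
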
+
+/**
+ * Add item @hnode to lustre hash @lh using @key.  If this @key
+ * already exists in the hash then ops->lh_get will be called on the
+ * conflicting entry and that entry will be returned to the caller.
+ * Otherwise ops->lh_get is called on the item which was added.
+ */
-void* lustre_hash_findadd_unique(struct lustre_class_hash_body *hash_body,
-                                 void *key, struct hlist_node *actual_hnode)
+void *
+lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
+                           struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        struct hlist_node * hash_item_hnode = NULL;
-        void *obj;
+        struct hlist_node    *existing_hnode;
+        lustre_hash_bucket_t *lhb;
+        int                   size;
+        unsigned              i;
+        void                 *obj;
         ENTRY;
-
-        LASSERT(hlist_unhashed(actual_hnode));
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        /* get the hash-bucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body,
-                                                               hashent, key);
-        if ( hash_item_hnode != NULL) {
-                /* the added-item exist in hashtables, so cannot add it again */
-                obj = hop->lustre_hash_object_refcount_get(hash_item_hnode);
-                spin_unlock(&bucket->lhb_lock);
-                RETURN(obj);
-        }
-
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
-
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++;
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
-               hash_body->hashname, hashent,
-               hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif
-        obj = hop->lustre_hash_object_refcount_get(actual_hnode);
-
-        spin_unlock(&bucket->lhb_lock);
-
+
+        __lustre_hash_key_validate(lh, key, hnode);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+        LASSERT(hlist_unhashed(hnode));
+
+        write_lock(&lhb->lhb_rwlock);
+        existing_hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (existing_hnode)
+                obj = lh_get(lh, existing_hnode);
+        else
+                obj = __lustre_hash_bucket_add(lh, lhb, hnode);
+        write_unlock(&lhb->lhb_rwlock);
+
+        size = lustre_hash_rehash_size(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (size)
+                lustre_hash_rehash(lh, size);
+
         RETURN(obj);
 }
 EXPORT_SYMBOL(lustre_hash_findadd_unique);
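
The atomic find-or-insert pattern this enables, sketched with the hypothetical example_obj from above: allocate optimistically, then discard the allocation if a concurrent thread won the race to insert the same key.

    new = example_obj_alloc(id);       /* illustrative helper */
    obj = lustre_hash_findadd_unique(lh, &new->eo_id, &new->eo_hnode);
    if (obj != new) {
            /* A conflicting entry existed; obj is that entry, with a
             * reference held via ops->lh_get.  Free the loser. */
            example_obj_free(new);
    }
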
 
-/*
- * this version of additem, it allow multi same @key in hashtables.
- * in this additem version, we don't need to check if exist same @key in hash
- * tables, we only add it to related hashbucket.
- * example: maybe same nid will be related to multi difference export
+
+/**
+ * Delete item @hnode from the lustre hash @lh using @key.  The @key
+ * is required to ensure the correct hash bucket is locked since there
+ * is no direct linkage from the item to the bucket.  The object
+ * removed from the hash will be returned and ops->lh_put is called
+ * on the removed object.
+ */
-int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key,
-                        struct hlist_node *actual_hnode)
+void *
+lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        lustre_hash_bucket_t *lhb;
+        int                   size;
+        unsigned              i;
+        void                 *obj;
         ENTRY;
-
-        LASSERT(hlist_unhashed(actual_hnode));
-
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        /* get the hashbucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
-
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++;
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
-               hash_body->hashname, hashent,
-               hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif
-        hop->lustre_hash_object_refcount_get(actual_hnode);
-
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(0);
+
+        __lustre_hash_key_validate(lh, key, hnode);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+        LASSERT(!hlist_unhashed(hnode));
+
+        write_lock(&lhb->lhb_rwlock);
+        obj = __lustre_hash_bucket_del(lh, lhb, hnode);
+        write_unlock(&lhb->lhb_rwlock);
+
+        size = lustre_hash_rehash_size(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (size)
+                lustre_hash_rehash(lh, size);
+
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_additem);
-
-
-/*
- * this version of delitem will delete a hashitem with given @key,
- * we need to search the <@key, @value> in hashbucket with @key,
- * if match, the hashitem will be delete.
- * we have a no-search version of delitem, it will directly delete a hashitem,
- * doesn't need to search it in hashtables, so it is a O(1) delete.
+EXPORT_SYMBOL(lustre_hash_del);
+
+/**
+ * Delete item given @key in lustre hash @lh.  The first @key found in
+ * the hash will be removed; if the key exists multiple times in the
+ * hash @lh, this function must be called once per instance of @key.
+ * The removed object will be returned and ops->lh_put is called on
+ * the removed object.
+ */
-int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body,
-                               void *key)
+void *
+lustre_hash_del_key(lustre_hash_t *lh, void *key)
 {
-        int hashent ;
-        struct hlist_node * hash_item;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        int retval = 0;
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        int                   size;
+        unsigned              i;
+        void                 *obj = NULL;
         ENTRY;
-
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        /* first, lock the hashbucket */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        /* get the hash_item from hash_bucket */
-        hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent,
-                                                         key);
-
-        if (hash_item == NULL) {
-                spin_unlock(&bucket->lhb_lock);
-                RETURN(-ENOENT);
-        }
-
-        /* call delitem_nolock() to delete the hash_item */
-        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
-
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(retval);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+
+        write_lock(&lhb->lhb_rwlock);
+        hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (hnode)
+                obj = __lustre_hash_bucket_del(lh, lhb, hnode);
+
+        write_unlock(&lhb->lhb_rwlock);
+
+        size = lustre_hash_rehash_size(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (size)
+                lustre_hash_rehash(lh, size);
+
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_delitem_by_key);
-
-/*
- * the O(1) version of delete hash item,
- * it will directly delete the hashitem with given @hash_item,
- * the parameter @key used to get the relation hash bucket and lock it.
+EXPORT_SYMBOL(lustre_hash_del_key);
+
+/**
+ * Lookup an item using @key in the lustre hash @lh and return it.
+ * If the @key is found in the hash, ops->lh_get() is called and the
+ * matching object is returned.  It is the caller's responsibility
+ * to call the counterpart ops->lh_put using the lh_put() macro when
+ * finished with the object.  If the @key was not found in the hash
+ * @lh, NULL is returned.
+ */
-int lustre_hash_delitem(struct lustre_class_hash_body *hash_body,
-                        void *key, struct hlist_node * hash_item)
-{
-        int hashent = 0;
-        int retval = 0;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+void *
+lustre_hash_lookup(lustre_hash_t *lh, void *key)
+{
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        void                 *obj = NULL;
         ENTRY;
-
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        /* call delitem_nolock() to delete the hash_item */
-        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
-
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(retval);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+
+        read_lock(&lhb->lhb_rwlock);
+        hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (hnode)
+                obj = lh_get(lh, hnode);
+
+        read_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
+
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_delitem);
-
-void lustre_hash_bucket_iterate(struct lustre_class_hash_body *hash_body,
-                                void *key, hash_item_iterate_cb func, void *data)
+EXPORT_SYMBOL(lustre_hash_lookup);
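
The lookup/put pairing described above, sketched with the hypothetical example_obj: every successful lustre_hash_lookup() must be balanced by lh_put() once the caller is finished with the object.

    obj = lustre_hash_lookup(lh, &id);
    if (obj != NULL) {
            example_obj_use(obj);           /* illustrative */
            lh_put(lh, &obj->eo_hnode);     /* drop the ops->lh_get ref */
    }
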
+
+/**
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
+ * Before each callback ops->lh_get will be called, and after each
+ * callback ops->lh_put will be called.  Finally, during the callback
+ * the bucket lock is held so the callback must never sleep.
+ */
+void
+lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int hashent, find = 0;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct hlist_node *hash_item_node = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        struct obd_export *tmp = NULL;
-
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
-
-        hashent = hop->lustre_hashfn(hash_body, key);
-        bucket = &hash_body->lchb_hash_tables[hashent];
-
-        spin_lock(&bucket->lhb_lock);
-        hlist_for_each(hash_item_node, &(bucket->lhb_head)) {
-                find = hop->lustre_hash_key_compare(key, hash_item_node);
-                if (find) {
-                        tmp = hop->lustre_hash_object_refcount_get(hash_item_node);
-                        func(tmp, data);
-                        hop->lustre_hash_object_refcount_put(hash_item_node);
+
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                read_lock(&lhb->lhb_rwlock);
+                hlist_for_each(hnode, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        func(obj, data);
+                        (void)lh_put(lh, hnode);
                 }
+                read_unlock(&lhb->lhb_rwlock);
         }
-        spin_unlock(&bucket->lhb_lock);
-}
-EXPORT_SYMBOL(lustre_hash_bucket_iterate);
+        read_unlock(&lh->lh_rwlock);
 
-void lustre_hash_iterate_all(struct lustre_class_hash_body *hash_body,
-                             hash_item_iterate_cb func, void *data)
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_for_each);
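
A sketch of a conforming callback. The lh_for_each_cb signature is assumed from how func() is invoked above: the object pointer first, the opaque @data second. Because the bucket lock is held while it runs, the body must not sleep or call back into the hash.

    static void example_count_cb(void *obj, void *data)
    {
            int *count = data;

            (*count)++;     /* non-blocking work only */
    }

    int count = 0;
    lustre_hash_for_each(lh, example_count_cb, &count);
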
+
+/**
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
+ * Before each callback ops->lh_get will be called, and after each
+ * callback ops->lh_put will be called.  During the callback the
+ * bucket lock will not be held, which allows the current item to be
+ * removed from the hash during the callback.  However, care should
+ * be taken to prevent other callers from operating on the hash
+ * concurrently, or list corruption may occur.
+ */
+void
+lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int i;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        struct hlist_node    *hnode;
+        struct hlist_node    *pos;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
-
-        for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
-                struct lustre_hash_bucket * bucket;
-                struct hlist_node * actual_hnode, *pos;
-                void *obj;
-
-                bucket = &hash_body->lchb_hash_tables[i];
-#ifdef LUSTRE_HASH_DEBUG
-                CDEBUG(D_INFO, "idx %d - bucket %p\n", i, bucket);
-#endif
-                spin_lock(&bucket->lhb_lock); /* lock the bucket */
-                hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
-                        obj = hop->lustre_hash_object_refcount_get(actual_hnode);
+
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                read_lock(&lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        read_unlock(&lhb->lhb_rwlock);
                         func(obj, data);
-                        hop->lustre_hash_object_refcount_put(actual_hnode);
+                        read_lock(&lhb->lhb_rwlock);
+                        (void)lh_put(lh, hnode);
                 }
-                spin_unlock(&bucket->lhb_lock);
+                read_unlock(&lhb->lhb_rwlock);
         }
+        read_unlock(&lh->lh_rwlock);
         EXIT;
 }
-EXPORT_SYMBOL(lustre_hash_iterate_all);
-
-
-void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
-                                     void *key)
+EXPORT_SYMBOL(lustre_hash_for_each_safe);
+
+/**
+ * For each hash bucket in the lustre hash @lh call the passed callback
+ * @func until all the hash buckets are empty.  The passed callback @func
+ * or the previously registered callback ops->lh_put must remove the item
+ * from the hash.  You may use either the lustre_hash_del() or hlist_del()
+ * functions.  No rwlocks will be held during the callback @func, so it is
+ * safe to sleep if needed.  This function will not terminate until the
+ * hash is empty.  Note it is still possible to concurrently add new
+ * items into the hash.  It is the caller's responsibility to ensure
+ * the required locking is in place to prevent concurrent insertions.
+ */
+void
+lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int hashent ;
-        struct hlist_node * hash_item_hnode = NULL;
-        void * obj_value = NULL;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations * hop = hash_body->lchb_hash_operations;
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
-
-        /* get the hash value from the given item */
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock); /* lock the bucket */
-
-        hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body,
-                                                               hashent, key);
-
-        if (hash_item_hnode == NULL) {
-                spin_unlock(&bucket->lhb_lock); /* unlock the bucket */
-                RETURN(NULL);
+
+restart:
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                write_lock(&lhb->lhb_rwlock);
+                while (!hlist_empty(&lhb->lhb_head)) {
+                        hnode = lhb->lhb_head.first;
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        write_unlock(&lhb->lhb_rwlock);
+                        read_unlock(&lh->lh_rwlock);
+                        func(obj, data);
+                        (void)lh_put(lh, hnode);
+                        goto restart;
+                }
+                write_unlock(&lhb->lhb_rwlock);
         }
-
-        obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode);
-        spin_unlock(&bucket->lhb_lock); /* unlock the bucket */
-
-        RETURN(obj_value);
-}
-EXPORT_SYMBOL(lustre_hash_get_object_by_key);
-
-/* string hashing using djb2 hash algorithm */
-__u32 djb2_hashfn(struct lustre_class_hash_body *hash_body, void* key,
-                  size_t size)
-{
-        __u32 hash = 5381;
-        int i;
-        char *ptr = key;
-
-        LASSERT(key != NULL);
-
-        for( i = 0; i < size; i++ )
-                hash = hash * 33 + ptr[i];
-
-        hash &= (hash_body->lchb_hash_max_size - 1);
-
-        RETURN(hash);
-}
-
-/*
- * define (uuid <-> export) hash operations and function define
- */
-
-/* define the uuid hash operations */
-struct lustre_hash_operations uuid_hash_operations = {
-        .lustre_hashfn = uuid_hashfn,
-        .lustre_hash_key_compare = uuid_hash_key_compare,
-        .lustre_hash_object_refcount_get = uuid_export_refcount_get,
-        .lustre_hash_object_refcount_put = uuid_export_refcount_put,
-};
-
-__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key)
-{
-        struct obd_uuid * uuid_key = key;
-
-        return djb2_hashfn(hash_body, uuid_key->uuid, sizeof(uuid_key->uuid));
-}
-
-/* Note, it is impossible to find an export that is in failed state with
- * this function */
-int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
-{
-        struct obd_export *export = NULL;
-        struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL;
-
-        LASSERT( key != NULL);
-
-        uuid_key = (struct obd_uuid*)key;
-
-        export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash);
-
-        compared_uuid = &export->exp_client_uuid;
-
-        RETURN(obd_uuid_equals(uuid_key, compared_uuid) &&
-               !export->exp_failed);
-}
-
-void * uuid_export_refcount_get(struct hlist_node * actual_hnode)
-{
-        struct obd_export *export = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
-
-        LASSERT(export != NULL);
-
-        class_export_get(export);
-
-        RETURN(export);
+        read_unlock(&lh->lh_rwlock);
+        EXIT;
 }
-
-void uuid_export_refcount_put(struct hlist_node * actual_hnode)
+EXPORT_SYMBOL(lustre_hash_for_each_empty);
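
A teardown sketch. The callback must remove the item (here via lustre_hash_del(), passing the hash through @data) or the loop above never terminates; sleeping is fine since no locks are held around func(). Names are illustrative.

    static void example_drain_cb(void *obj, void *data)
    {
            lustre_hash_t      *lh = data;
            struct example_obj *eo = obj;

            lustre_hash_del(lh, &eo->eo_id, &eo->eo_hnode);
            example_obj_free(eo);       /* may sleep: no locks held */
    }

    lustre_hash_for_each_empty(lh, example_drain_cb, lh);
    lustre_hash_exit(lh);
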
+
+/**
+ * For each item in the lustre hash @lh which matches the @key call
+ * the passed callback @func and pass to it as an argument each hash
+ * item and the private @data.  Before each callback ops->lh_get will
+ * be called, and after each callback ops->lh_put will be called.
+ * Finally, during the callback the bucket lock is held so the
+ * callback must never sleep.
+ */
+void
+lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
+                         lh_for_each_cb func, void *data)
 {
-        struct obd_export *export = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
-
-        LASSERT(export != NULL);
-
-        class_export_put(export);
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        ENTRY;
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_size - 1);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+
+        read_lock(&lhb->lhb_rwlock);
+        hlist_for_each(hnode, &(lhb->lhb_head)) {
+                __lustre_hash_bucket_validate(lh, lhb, hnode);
+
+                if (!lh_compare(lh, key, hnode))
+                        continue;
+
+                func(lh_get(lh, hnode), data);
+                (void)lh_put(lh, hnode);
+        }
+
+        read_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
+
+        EXIT;
 }
-
-/*
- * define (nid <-> export) hash operations and function define
- */
+EXPORT_SYMBOL(lustre_hash_for_each_key);
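
When duplicates were inserted with lustre_hash_add(), this is the natural way to visit all of them. A sketch reusing the non-blocking example_count_cb from the lustre_hash_for_each() example above:

    int count = 0;

    /* Count every object chained under a single key. */
    lustre_hash_for_each_key(lh, &id, example_count_cb, &count);
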
+
+/**
+ * Rehash the lustre hash @lh to the given @size.  This can be used
+ * to grow the hash size when excessive chaining is detected, or to
+ * shrink the hash when it is larger than needed.  When the LH_REHASH
+ * flag is set in @lh the lustre hash may be dynamically rehashed
+ * during addition or removal if the hash's theta value falls outside
+ * the lh->lh_min_theta and lh->lh_max_theta bounds.  By default
+ * these values are tuned to keep the chained hash depth small, and
+ * this approach assumes a reasonably uniform hashing function.  The
+ * theta thresholds for @lh are tunable via lustre_hash_set_theta().
+ */
-
-/* define the nid hash operations */
-struct lustre_hash_operations nid_hash_operations = {
-        .lustre_hashfn = nid_hashfn,
-        .lustre_hash_key_compare = nid_hash_key_compare,
-        .lustre_hash_object_refcount_get = nid_export_refcount_get,
-        .lustre_hash_object_refcount_put = nid_export_refcount_put,
-};
-
-__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key)
+int
+lustre_hash_rehash(lustre_hash_t *lh, int size)
 {
-        return djb2_hashfn(hash_body, key, sizeof(lnet_nid_t));
-}
-
-/* Note, it is impossible to find an export that is in failed state with
- * this function */
-int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
-{
-        struct obd_export *export = NULL;
-        lnet_nid_t *nid_key = NULL;
-
-        LASSERT( key != NULL);
-
-        nid_key = (lnet_nid_t*)key;
-
-        export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash);
-
-        return (export->exp_connection->c_peer.nid == *nid_key &&
-                !export->exp_failed);
-}
-
-void *nid_export_refcount_get(struct hlist_node *actual_hnode)
-{
-        struct obd_export *export = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
-
-        LASSERT(export != NULL);
-
-        class_export_get(export);
-
-        RETURN(export);
-}
-
-void nid_export_refcount_put(struct hlist_node *actual_hnode)
-{
-        struct obd_export *export = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
-
-        LASSERT(export != NULL);
-
-        class_export_put(export);
-}
-
-/*
- * define (net_peer <-> connection) hash operations and function define
- */
-
-/* define the conn hash operations */
-struct lustre_hash_operations conn_hash_operations = {
-        .lustre_hashfn = conn_hashfn,
-        .lustre_hash_key_compare = conn_hash_key_compare,
-        .lustre_hash_object_refcount_get = conn_refcount_get,
-        .lustre_hash_object_refcount_put = conn_refcount_put,
-};
-EXPORT_SYMBOL(conn_hash_operations);
-
-__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key)
-{
-        return djb2_hashfn(hash_body, key, sizeof(lnet_process_id_t));
-}
-
-int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode)
-{
-        struct ptlrpc_connection *c = NULL;
-        lnet_process_id_t *conn_key = NULL;
-
-        LASSERT( key != NULL);
-
-        conn_key = (lnet_process_id_t*)key;
-
-        c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash);
-
-        return (conn_key->nid == c->c_peer.nid &&
-                conn_key->pid == c->c_peer.pid);
-}
-
-void *conn_refcount_get(struct hlist_node *actual_hnode)
-{
-        struct ptlrpc_connection *c = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
-
-        LASSERT(c != NULL);
-
-        atomic_inc(&c->c_refcount);
-
-        RETURN(c);
-}
+        struct hlist_node    *hnode;
+        struct hlist_node    *pos;
+        lustre_hash_bucket_t *lh_buckets;
+        lustre_hash_bucket_t *rehash_buckets;
+        lustre_hash_bucket_t *lh_lhb;
+        lustre_hash_bucket_t *rehash_lhb;
+        int                   i;
+        int                   lh_size;
+        int                   theta;
+        void                 *key;
+        ENTRY;
+
+        LASSERT(size > 0);
+
+        OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
+        if (!rehash_buckets)
+                RETURN(-ENOMEM);
+
+        for (i = 0; i < size; i++) {
+                INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
+                rwlock_init(&rehash_buckets[i].lhb_rwlock);
+                atomic_set(&rehash_buckets[i].lhb_count, 0);
+        }
+
+        write_lock(&lh->lh_rwlock);
+
+        /*
+         * Early return for multiple concurrent racing callers,
+         * ensure we only trigger the rehash if it is still needed.
+         */
+        theta = __lustre_hash_theta(lh);
+        if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
+                OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size);
+                write_unlock(&lh->lh_rwlock);
+                RETURN(-EALREADY);
+        }
+
+        lh_size = lh->lh_cur_size;
+        lh_buckets = lh->lh_buckets;
+
+        lh->lh_cur_size = size;
+        lh->lh_buckets = rehash_buckets;
+        atomic_inc(&lh->lh_rehash_count);
+
+        for (i = 0; i < lh_size; i++) {
+                lh_lhb = &lh_buckets[i];
+
+                write_lock(&lh_lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
+                        key = lh_key(lh, hnode);
+                        LASSERT(key);
+
+                        /*
+                         * Validate hnode is in the correct bucket.
+                         */
+                        if (unlikely(lh->lh_flags & LH_DEBUG))
+                                LASSERT(lh_hash(lh, key, lh_size - 1) == i);
+
+                        /*
+                         * Delete from old hash bucket.
+                         */
+                        hlist_del(hnode);
+                        LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
+                        atomic_dec(&lh_lhb->lhb_count);
+
+                        /*
+                         * Add to rehash bucket, ops->lh_key must be defined.
+                         */
+                        rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)];
+                        hlist_add_head(hnode, &(rehash_lhb->lhb_head));
+                        atomic_inc(&rehash_lhb->lhb_count);
+                }
+
+                LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
+                LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
+                write_unlock(&lh_lhb->lhb_rwlock);
+        }
+
+        OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size);
+        write_unlock(&lh->lh_rwlock);
+
+        RETURN(0);
+}
-
-void conn_refcount_put(struct hlist_node *actual_hnode)
+EXPORT_SYMBOL(lustre_hash_rehash);
+
+/**
+ * Rehash the object referenced by @hnode in the lustre hash @lh.  The
+ * @old_key must be provided to locate the object's previous location
+ * in the hash, and the @new_key will be used to reinsert the object.
+ * Use this function instead of a lustre_hash_add() + lustre_hash_del()
+ * combo when it is critical that there is no window in time where the
+ * object is missing from the hash.  When an object is being rehashed
+ * the registered lh_get() and lh_put() functions will not be called.
+ */
-{
-        struct ptlrpc_connection *c = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
-
-        LASSERT(c != NULL);
-
-        atomic_dec(&c->c_refcount);
-}
+void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
+                            struct hlist_node *hnode)
+{
+        lustre_hash_bucket_t *old_lhb;
+        lustre_hash_bucket_t *new_lhb;
+        unsigned              i;
+        int                   j;
+        ENTRY;
+
+        __lustre_hash_key_validate(lh, new_key, hnode);
+        LASSERT(!hlist_unhashed(hnode));
+
+        read_lock(&lh->lh_rwlock);
+
+        i = lh_hash(lh, old_key, lh->lh_cur_size - 1);
+        old_lhb = &lh->lh_buckets[i];
+        LASSERT(i < lh->lh_cur_size);
+
+        j = lh_hash(lh, new_key, lh->lh_cur_size - 1);
+        new_lhb = &lh->lh_buckets[j];
+        LASSERT(j < lh->lh_cur_size);
+
+        write_lock(&old_lhb->lhb_rwlock);
+        write_lock(&new_lhb->lhb_rwlock);
+
+        /*
+         * Migrate item between hash buckets without calling
+         * the lh_get() and lh_put() callback functions.
+         */
+        hlist_del(hnode);
+        LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
+        atomic_dec(&old_lhb->lhb_count);
+        hlist_add_head(hnode, &(new_lhb->lhb_head));
+        atomic_inc(&new_lhb->lhb_count);
+
+        write_unlock(&new_lhb->lhb_rwlock);
+        write_unlock(&old_lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
+
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_rehash_key);
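
A sketch of a key change with the hypothetical example_obj. Since __lustre_hash_key_validate() checks @new_key against the item, the key stored in the object should be updated before the call; both buckets are write-locked for the duration, so no concurrent lookup can observe the object missing.

    obj->eo_id = new_id;    /* update the key embedded in the object */
    lustre_hash_rehash_key(lh, &old_id, &new_id, &obj->eo_hnode);
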
+ */ + hlist_del(hnode); + LASSERT(atomic_read(&old_lhb->lhb_count) > 0); + atomic_dec(&old_lhb->lhb_count); + hlist_add_head(hnode, &(new_lhb->lhb_head)); + atomic_inc(&new_lhb->lhb_count); + + write_unlock(&new_lhb->lhb_rwlock); + write_unlock(&old_lhb->lhb_rwlock); + read_unlock(&lh->lh_rwlock); + + EXIT; } - -void* nidstats_refcount_get(struct hlist_node * actual_hnode) +EXPORT_SYMBOL(lustre_hash_rehash_key); + +int lustre_hash_debug_header(char *str, int size) { - struct nid_stat *data; - - data = hlist_entry(actual_hnode, struct nid_stat, nid_hash); - data->nid_exp_ref_count++; - - RETURN(data); + return snprintf(str, size, + "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", + "name", "cur", "min", "max", "theta", "t-min", "t-max", + "flags", "rehash", "count", " distribution"); } +EXPORT_SYMBOL(lustre_hash_debug_header); -void nidstats_refcount_put(struct hlist_node * actual_hnode) +int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size) { - struct nid_stat *data; - - data = hlist_entry(actual_hnode, struct nid_stat, nid_hash); - data->nid_exp_ref_count--; - EXIT; + lustre_hash_bucket_t *lhb; + int theta; + int i; + int c = 0; + int dist[8] = { 0, }; + + if (str == NULL || size == 0) + return 0; + + read_lock(&lh->lh_rwlock); + theta = __lustre_hash_theta(lh); + + c += snprintf(str + c, size - c, "%-36s ",lh->lh_name); + c += snprintf(str + c, size - c, "%5d ", lh->lh_cur_size); + c += snprintf(str + c, size - c, "%5d ", lh->lh_min_size); + c += snprintf(str + c, size - c, "%5d ", lh->lh_max_size); + c += snprintf(str + c, size - c, "%d.%03d ", + theta / 1000, theta % 1000); + c += snprintf(str + c, size - c, "%d.%03d ", + lh->lh_min_theta / 1000, lh->lh_min_theta % 1000); + c += snprintf(str + c, size - c, "%d.%03d ", + lh->lh_max_theta / 1000, lh->lh_max_theta % 1000); + c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags); + c += snprintf(str + c, size - c, "%6d ", + atomic_read(&lh->lh_rehash_count)); + c += snprintf(str + c, size - c, "%5d ", + atomic_read(&lh->lh_count)); + + /* + * The distribution is a summary of the chained hash depth in + * each of the lustre hash buckets. Each buckets lhb_count is + * divided by the hash theta value and used to generate a + * histogram of the hash distribution. A uniform hash will + * result in all hash buckets being close to the average thus + * only the first few entries in the histogram will be non-zero. + * If you hash function results in a non-uniform hash the will + * be observable by outlier bucks in the distribution histogram. + * + * Uniform hash distribution: 128/128/0/0/0/0/0/0 + * Non-Uniform hash distribution: 128/125/0/0/0/0/2/1 + */ + lh_for_each_bucket(lh, lhb, i) + dist[MIN(fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++; + + for (i = 0; i < 8; i++) + c += snprintf(str + c, size - c, "%d%c", dist[i], + (i == 7) ? '\n' : '/'); + + read_unlock(&lh->lh_rwlock); + + return c; } - -/*******************************************************************************/ +EXPORT_SYMBOL(lustre_hash_debug_str);