Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
index 02ef3f4..5370a4e 100644 (file)
  * Implement a hash class for hash process in lustre system.
  *
  * Author: YuZhangyong <yzy@clusterfs.com>
+ *
+ * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
+ * - Simplified API and improved documentation
+ * - Added per-hash feature flags:
+ *   * LH_DEBUG additional validation
+ *   * LH_REHASH dynamic rehashing
+ * - Added per-hash statistics
+ * - General performance enhancements
  */
 
 #ifndef __KERNEL__
 #include <obd.h>
 #endif
 
-#include <obd_class.h>
 #include <class_hash.h>
-#include <lustre_export.h>
-#include <obd_support.h>
-#include <lustre_net.h>
 
-int lustre_hash_init(struct lustre_class_hash_body **hash_body_new, 
-                     char *hashname, __u32 hashsize, 
-                     struct lustre_hash_operations *hash_operations)
+/**
+ * Initialize new lustre hash, where:
+ * @name     - Descriptive hash name
+ * @cur_bits - Initial hash table size, in bits
+ * @max_bits - Maximum allowed hash table resize, in bits
+ * @ops      - Registered hash table operations
+ * @flags    - LH_REHASH enable dynamic hash resizing
+ *           - LH_SORT enable chained hash sort
+ */
+lustre_hash_t *
+lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
+                 lustre_hash_ops_t *ops, int flags)
 {
-        int i, n = 0;
-        struct lustre_class_hash_body *hash_body = NULL;
-
-        LASSERT(hashsize > 0);
-        LASSERT(hash_operations != NULL);
+        lustre_hash_t *lh;
+        int            i;
         ENTRY;
 
-        i = hashsize;
-        while (i != 0) {
-                if (i & 0x1)
-                        n++;
-                i >>= 1;
-        }
+        LASSERT(name != NULL);
+        LASSERT(ops != NULL);
 
-        LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize);
+        LASSERT(cur_bits > 0);
+        LASSERT(max_bits >= cur_bits);
+        LASSERT(max_bits < 31);
 
-        /* alloc space for hash_body */   
-        OBD_ALLOC(hash_body, sizeof(*hash_body)); 
+        OBD_ALLOC_PTR(lh);
+        if (!lh)
+                RETURN(NULL);
 
-        if (hash_body == NULL) {
-                CERROR("Cannot alloc space for hash body, hashname = %s \n", 
-                        hashname);
-                RETURN(-ENOMEM);
+        strncpy(lh->lh_name, name, sizeof(lh->lh_name));
+        lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
+        atomic_set(&lh->lh_rehash_count, 0);
+        atomic_set(&lh->lh_count, 0);
+        rwlock_init(&lh->lh_rwlock);
+        lh->lh_cur_bits = cur_bits;
+        lh->lh_cur_mask = (1 << cur_bits) - 1;
+        lh->lh_min_bits = cur_bits;
+        lh->lh_max_bits = max_bits;
+        /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
+         *      anything other than 0.5 and 2.0 */
+        lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
+        lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
+        lh->lh_ops = ops;
+        lh->lh_flags = flags;
+
+        /* theta * 1000 */
+        __lustre_hash_set_theta(lh, 500, 2000);
+
+        OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
+        if (!lh->lh_buckets) {
+                OBD_FREE_PTR(lh);
+                RETURN(NULL);
         }
 
-        LASSERT(hashname != NULL && 
-                strlen(hashname) <= sizeof(hash_body->hashname));
-        strcpy(hash_body->hashname, hashname);
-        hash_body->lchb_hash_max_size = hashsize;      
-        hash_body->lchb_hash_operations = hash_operations;  
-
-        /* alloc space for the hash tables */
-        OBD_ALLOC(hash_body->lchb_hash_tables, 
-                  sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);
-
-        if (hash_body->lchb_hash_tables == NULL) {
-                OBD_FREE(hash_body, sizeof(*hash_body)); 
-                CERROR("Cannot alloc space for hashtables, hashname = %s \n", 
-                        hash_body->hashname);
-                RETURN(-ENOMEM);
+        for (i = 0; i <= lh->lh_cur_mask; i++) {
+                INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
+                rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
+                atomic_set(&lh->lh_buckets[i].lhb_count, 0);
         }
 
-        spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */
-
-        for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) {
-                /* initial the bucket lock and list_head */
-                INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head);
-                spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock);
-        }
-        *hash_body_new = hash_body;
-
-        RETURN(0);
+        return lh;
 }
 EXPORT_SYMBOL(lustre_hash_init);
 
-void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body)
+/**
+ * Cleanup lustre hash @lh.
+ */
+void
+lustre_hash_exit(lustre_hash_t *lh)
 {
-        int i;
-        struct lustre_class_hash_body *hash_body = NULL;
+        lustre_hash_bucket_t *lhb;
+        struct hlist_node    *hnode;
+        struct hlist_node    *pos;
+        int                   i;
         ENTRY;
 
-        hash_body = *new_hash_body;
+        LASSERT(lh != NULL);
 
-        if (hash_body == NULL) {
-                CWARN("hash body has been deleted\n");
-                goto out_hash;
-        }
+        write_lock(&lh->lh_rwlock);
 
-        spin_lock(&hash_body->lchb_lock); /* lock the hash tables */
+        lh_for_each_bucket(lh, lhb, i) {
+                write_lock(&lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        __lustre_hash_bucket_del(lh, lhb, hnode);
+                        lh_exit(lh, hnode);
+                }
 
-        if (hash_body->lchb_hash_tables == NULL ) {
-                spin_unlock(&hash_body->lchb_lock);
-                CWARN("hash tables has been deleted\n");
-                goto out_hash;   
+                LASSERT(hlist_empty(&(lhb->lhb_head)));
+                LASSERT(atomic_read(&lhb->lhb_count) == 0);
+                write_unlock(&lhb->lhb_rwlock);
         }
 
-        for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
-                struct lustre_hash_bucket * bucket;
-                struct hlist_node * actual_hnode, *pos;
+        LASSERT(atomic_read(&lh->lh_count) == 0);
+        write_unlock(&lh->lh_rwlock);
 
-                bucket = &hash_body->lchb_hash_tables[i];
-                spin_lock(&bucket->lhb_lock); /* lock the bucket */
-                hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
-                        lustre_hash_delitem_nolock(hash_body, i, actual_hnode);
-                }
-                spin_unlock(&bucket->lhb_lock); 
-        }
+        OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
+        OBD_FREE_PTR(lh);
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_exit);
 
-        /* free the hash_tables's memory space */
-        OBD_FREE(hash_body->lchb_hash_tables,
-                  sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);     
+static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
+{
+        if (!(lh->lh_flags & LH_REHASH))
+                return 0;
 
-        hash_body->lchb_hash_tables = NULL;
+        /* XXX: need to handle case with max_theta != 2.0
+         *      and the case with min_theta != 0.5 */
+        if ((lh->lh_cur_bits < lh->lh_max_bits) &&
+            (__lustre_hash_theta(lh) > lh->lh_max_theta))
+                return lh->lh_cur_bits + 1;
 
-        spin_unlock(&hash_body->lchb_lock);
+        if ((lh->lh_cur_bits > lh->lh_min_bits) &&
+            (__lustre_hash_theta(lh) < lh->lh_min_theta))
+                return lh->lh_cur_bits - 1;
 
-out_hash : 
-        /* free the hash_body's memory space */
-        if (hash_body != NULL) {
-                OBD_FREE(hash_body, sizeof(*hash_body));
-                *new_hash_body = NULL;
-        }
-        EXIT;
+        return 0;
 }
-EXPORT_SYMBOL(lustre_hash_exit);
 
-/*
- * only allow unique @key in hashtables, if the same @key has existed 
- * in hashtables, it will return with fails.
+/**
+ * Add item @hnode to lustre hash @lh using @key.  The registered
+ * ops->lh_get function will be called when the item is added.
  */
-int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, 
-                               void *key, struct hlist_node *actual_hnode)
+void
+lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        lustre_hash_bucket_t *lhb;
+        int                   bits;
+        unsigned              i;
         ENTRY;
 
-        LASSERT(hlist_unhashed(actual_hnode));
-        hashent = hop->lustre_hashfn(hash_body, key);
+        __lustre_hash_key_validate(lh, key, hnode);
 
-        /* get the hash-bucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
+        LASSERT(hlist_unhashed(hnode));
 
-        if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) {
-                /* the added-item exist in hashtables, so cannot add it again */
-                spin_unlock(&bucket->lhb_lock);
+        write_lock(&lhb->lhb_rwlock);
+        __lustre_hash_bucket_add(lh, lhb, hnode);
+        write_unlock(&lhb->lhb_rwlock);
 
-                CWARN("Already found the key in hash [%s]\n", 
-                      hash_body->hashname);
-                RETURN(-EALREADY);
-        }
+        bits = lustre_hash_rehash_bits(lh);
+        read_unlock(&lh->lh_rwlock);
+        if (bits)
+                lustre_hash_rehash(lh, bits);
 
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_add);
 
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++; 
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", 
-                        hash_body->hashname, hashent, 
-                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif  
-        hop->lustre_hash_object_refcount_get(actual_hnode); 
+static struct hlist_node *
+lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
+                                 struct hlist_node *hnode)
+{
+        int                   bits = 0;
+        struct hlist_node    *ehnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        ENTRY;
 
-        spin_unlock(&bucket->lhb_lock);
+        __lustre_hash_key_validate(lh, key, hnode);
+
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
+        LASSERT(hlist_unhashed(hnode));
+
+        write_lock(&lhb->lhb_rwlock);
+        ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (ehnode) {
+                lh_get(lh, ehnode);
+        } else {
+                __lustre_hash_bucket_add(lh, lhb, hnode);
+                ehnode = hnode;
+                bits = lustre_hash_rehash_bits(lh);
+        }
+        write_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
+        if (bits)
+                lustre_hash_rehash(lh, bits);
 
-        RETURN(0);
+        RETURN(ehnode);
 }
-EXPORT_SYMBOL(lustre_hash_additem_unique);
 
-/*
- * only allow unique @key in hashtables, if the same @key has existed
- * in hashtables, it will return with fails.
+/**
+ * Add item @hnode to lustre hash @lh using @key.  The registered
+ * ops->lh_get function will be called if the item was added.
+ * Returns 0 on success or -EALREADY on key collisions.
  */
-void* lustre_hash_findadd_unique(struct lustre_class_hash_body *hash_body,
-                                     void *key, struct hlist_node *actual_hnode)
+int
+lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        struct hlist_node * hash_item_hnode = NULL;
-        void *obj;
+        struct hlist_node    *ehnode;
         ENTRY;
 
-        LASSERT(hlist_unhashed(actual_hnode));
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        /* get the hash-bucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body,
-                                                               hashent, key);
-        if ( hash_item_hnode != NULL) {
-                /* the added-item exist in hashtables, so cannot add it again */
-                obj = hop->lustre_hash_object_refcount_get(hash_item_hnode);
-                spin_unlock(&bucket->lhb_lock);
-                RETURN(obj);
+        ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
+        if (ehnode != hnode) {
+                lh_put(lh, ehnode);
+                RETURN(-EALREADY);
         }
+        RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_add_unique);
 
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
-
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++;
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
-                        hash_body->hashname, hashent,
-                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif
-        obj = hop->lustre_hash_object_refcount_get(actual_hnode);
-
-        spin_unlock(&bucket->lhb_lock);
+/**
+ * Add item @hnode to lustre hash @lh using @key.  If this @key
+ * already exists in the hash then ops->lh_get will be called on the
+ * conflicting entry and that entry will be returned to the caller.
+ * Otherwise ops->lh_get is called on the item which was added.
+ */
+void *
+lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
+                           struct hlist_node *hnode)
+{
+        struct hlist_node    *ehnode;
+        void                 *obj;
+        ENTRY;
 
+        ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
+        obj = lh_get(lh, ehnode);
+        lh_put(lh, ehnode);
         RETURN(obj);
 }
 EXPORT_SYMBOL(lustre_hash_findadd_unique);
 
-/*
- * this version of additem, it allow multi same @key <key, value> in hashtables. 
- * in this additem version, we don't need to check if exist same @key in hash 
- * tables, we only add it to related hashbucket.
- * example: maybe same nid will be related to multi difference export
+/**
+ * Delete item @hnode from the lustre hash @lh using @key.  The @key
+ * is required to ensure the correct hash bucket is locked since there
+ * is no direct linkage from the item to the bucket.  The object
+ * removed from the hash will be returned and ops->lh_put is called
+ * on the removed object.
  */
-int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, 
-                         struct hlist_node *actual_hnode)
+void *
+lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
 {
-        int hashent;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        void                 *obj;
         ENTRY;
 
-        LASSERT(hlist_unhashed(actual_hnode));
-
-        hashent = hop->lustre_hashfn(hash_body, key);
+        __lustre_hash_key_validate(lh, key, hnode);
 
-        /* get the hashbucket and lock it */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
+        LASSERT(!hlist_unhashed(hnode));
 
-        hlist_add_head(actual_hnode, &(bucket->lhb_head));
+        write_lock(&lhb->lhb_rwlock);
+        obj = __lustre_hash_bucket_del(lh, lhb, hnode);
+        write_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
 
-#ifdef LUSTRE_HASH_DEBUG
-        /* hash distribute debug */
-        hash_body->lchb_hash_tables[hashent].lhb_item_count++; 
-        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", 
-                        hash_body->hashname, hashent, 
-                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
-#endif  
-        hop->lustre_hash_object_refcount_get(actual_hnode); 
-
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(0);
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_additem);
-
+EXPORT_SYMBOL(lustre_hash_del);
 
-/*
- * this version of delitem will delete a hashitem with given @key, 
- * we need to search the <@key, @value> in hashbucket with @key, 
- * if match, the hashitem will be delete. 
- * we have a no-search version of delitem, it will directly delete a hashitem, 
- * doesn't need to search it in hashtables, so it is a O(1) delete.
+/**
+ * Delete item given @key in lustre hash @lh.  The first @key found in
+ * the hash will be removed, if the key exists multiple times in the hash
+ * @lh this function must be called once per key.  The removed object
+ * will be returned and ops->lh_put is called on the removed object.
  */
-int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, 
-                               void *key)
+void *
+lustre_hash_del_key(lustre_hash_t *lh, void *key)
 {
-        int hashent ;
-        struct hlist_node * hash_item;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        int retval = 0;
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        void                 *obj = NULL;
         ENTRY;
 
-        hashent = hop->lustre_hashfn(hash_body, key);
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
 
-        /* first, lock the hashbucket */
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
-
-        /* get the hash_item from hash_bucket */
-        hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, 
-                                                         key);
-
-        if (hash_item == NULL) {
-                spin_unlock(&bucket->lhb_lock);
-                RETURN(-ENOENT);
-        }
+        write_lock(&lhb->lhb_rwlock);
+        hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (hnode)
+                obj = __lustre_hash_bucket_del(lh, lhb, hnode);
 
-        /* call delitem_nolock() to delete the hash_item */
-        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+        write_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
 
-        spin_unlock(&bucket->lhb_lock);
-
-        RETURN(retval);
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_delitem_by_key);
-
-/*
- * the O(1) version of delete hash item, 
- * it will directly delete the hashitem with given @hash_item,
- * the parameter @key used to get the relation hash bucket and lock it.
+EXPORT_SYMBOL(lustre_hash_del_key);
+
+/**
+ * Lookup an item using @key in the lustre hash @lh and return it.
+ * If the @key is found in the hash lh->lh_get() is called and the
+ * matching object is returned.  It is the caller's responsibility
+ * to call the counterpart ops->lh_put using the lh_put() macro
+ * when finished with the object.  If the @key was not found
+ * in the hash @lh NULL is returned.
  */
-int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, 
-                        void *key, struct hlist_node * hash_item)
-{  
-        int hashent = 0;
-        int retval = 0;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+void *
+lustre_hash_lookup(lustre_hash_t *lh, void *key)
+{
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        void                 *obj = NULL;
         ENTRY;
 
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock);
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
 
-        /* call delitem_nolock() to delete the hash_item */
-        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+        read_lock(&lhb->lhb_rwlock);
+        hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
+        if (hnode)
+                obj = lh_get(lh, hnode);
 
-        spin_unlock(&bucket->lhb_lock);
+        read_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
 
-        RETURN(retval);
+        RETURN(obj);
 }
-EXPORT_SYMBOL(lustre_hash_delitem);
-
-void lustre_hash_bucket_iterate(struct lustre_class_hash_body *hash_body,
-                                void *key, hash_item_iterate_cb func, void *data)
+EXPORT_SYMBOL(lustre_hash_lookup);
+
+/**
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
+ * Before each callback ops->lh_get will be called, and after each
+ * callback ops->lh_put will be called.  Finally, during the callback
+ * the bucket lock is held so the callback must never sleep.
+ */
+void
+lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int hashent, find = 0;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct hlist_node *hash_item_node = NULL;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
-        struct obd_export *tmp = NULL;
-
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
 
-        hashent = hop->lustre_hashfn(hash_body, key);
-        bucket = &hash_body->lchb_hash_tables[hashent];
-
-        spin_lock(&bucket->lhb_lock);
-        hlist_for_each(hash_item_node, &(bucket->lhb_head)) {
-                find = hop->lustre_hash_key_compare(key, hash_item_node);
-                if (find) {
-                        tmp = hop->lustre_hash_object_refcount_get(hash_item_node);
-                        func(tmp, data);
-                        hop->lustre_hash_object_refcount_put(hash_item_node);
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                read_lock(&lhb->lhb_rwlock);
+                hlist_for_each(hnode, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        func(obj, data);
+                        (void)lh_put(lh, hnode);
                 }
+                read_unlock(&lhb->lhb_rwlock);
         }
-        spin_unlock(&bucket->lhb_lock);
-}
-EXPORT_SYMBOL(lustre_hash_bucket_iterate);
+        read_unlock(&lh->lh_rwlock);
 
-void lustre_hash_iterate_all(struct lustre_class_hash_body *hash_body,
-                            hash_item_iterate_cb func, void *data)
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_for_each);
+
+/**
+ * For each item in the lustre hash @lh call the passed callback @func
+ * and pass to it as an argument each hash item and the private @data.
+ * Before each callback ops->lh_get will be called, and after each
+ * callback ops->lh_put will be called.  During the callback the
+ * bucket lock will not be held, which allows for the current item
+ * to be removed from the hash during the callback.  However, care
+ * should be taken to prevent other callers from operating on the
+ * hash concurrently or list corruption may occur.
+ */
+void
+lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int i;
-        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        struct hlist_node    *hnode;
+        struct hlist_node    *pos;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
 
-        for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
-                struct lustre_hash_bucket * bucket;
-                struct hlist_node * actual_hnode, *pos;
-                void *obj;
-
-                bucket = &hash_body->lchb_hash_tables[i];
-#ifdef LUSTRE_HASH_DEBUG
-                CDEBUG(D_INFO, "idx %d - bucket %p\n", i, bucket);
-#endif
-                spin_lock(&bucket->lhb_lock); /* lock the bucket */
-                hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
-                        obj = hop->lustre_hash_object_refcount_get(actual_hnode);
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                read_lock(&lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        read_unlock(&lhb->lhb_rwlock);
                         func(obj, data);
-                        hop->lustre_hash_object_refcount_put(actual_hnode);
+                        read_lock(&lhb->lhb_rwlock);
+                        (void)lh_put(lh, hnode);
                 }
-                spin_unlock(&bucket->lhb_lock);
+                read_unlock(&lhb->lhb_rwlock);
         }
+        read_unlock(&lh->lh_rwlock);
         EXIT;
 }
-EXPORT_SYMBOL(lustre_hash_iterate_all);
-
-
-void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
-                                     void *key)
+EXPORT_SYMBOL(lustre_hash_for_each_safe);
+
+/**
+ * For each hash bucket in the lustre hash @lh call the passed callback
+ * @func until all the hash buckets are empty.  The passed callback @func
+ * or the previously registered callback lh->lh_put must remove the item
+ * from the hash.  You may either use the lustre_hash_del() or hlist_del()
+ * functions.  No rwlocks will be held during the callback @func it is
+ * safe to sleep if needed.  This function will not terminate until the
+ * hash is empty.  Note it is still possible to concurrently add new
+ * items into the hash.  It is the callers responsibility to ensure
+ * the required locking is in place to prevent concurrent insertions.
+ */
+void
+lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
 {
-        int hashent ;
-        struct hlist_node * hash_item_hnode = NULL;
-        void * obj_value = NULL;
-        struct lustre_hash_bucket *bucket = NULL;
-        struct lustre_hash_operations * hop = hash_body->lchb_hash_operations;
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        void                 *obj;
+        int                   i;
         ENTRY;
 
-        /* get the hash value from the given item */
-        hashent = hop->lustre_hashfn(hash_body, key);
-
-        bucket = &hash_body->lchb_hash_tables[hashent];
-        spin_lock(&bucket->lhb_lock); /* lock the bucket */
-
-        hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body, 
-                                                               hashent, key);
-
-        if (hash_item_hnode == NULL) {
-                spin_unlock(&bucket->lhb_lock); /* lock the bucket */
-                RETURN(NULL);
+restart:
+        read_lock(&lh->lh_rwlock);
+        lh_for_each_bucket(lh, lhb, i) {
+                write_lock(&lhb->lhb_rwlock);
+                while (!hlist_empty(&lhb->lhb_head)) {
+                        hnode =  lhb->lhb_head.first;
+                        __lustre_hash_bucket_validate(lh, lhb, hnode);
+                        obj = lh_get(lh, hnode);
+                        write_unlock(&lhb->lhb_rwlock);
+                        read_unlock(&lh->lh_rwlock);
+                        func(obj, data);
+                        (void)lh_put(lh, hnode);
+                        goto restart;
+                }
+                write_unlock(&lhb->lhb_rwlock);
         }
-
-        obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode);
-        spin_unlock(&bucket->lhb_lock); /* lock the bucket */
-
-        RETURN(obj_value);
-}
-EXPORT_SYMBOL(lustre_hash_get_object_by_key);
-
-/* string hashing using djb2 hash algorithm */
-__u32 djb2_hashfn(struct lustre_class_hash_body *hash_body,  void* key,
-                  size_t size)
-{
-        __u32 hash = 5381;
-        int i;
-        char *ptr = key;
-
-        LASSERT(key != NULL);
-
-        for( i = 0; i < size; i++ )
-                hash = hash * 33 + ptr[i];
-
-        hash &= (hash_body->lchb_hash_max_size - 1);
-
-        RETURN(hash);
+        read_unlock(&lh->lh_rwlock);
+        EXIT;
 }
+EXPORT_SYMBOL(lustre_hash_for_each_empty);
 
 /*
- * define (uuid <-> export) hash operations and function define
- */
-
-/* define the uuid hash operations */
-struct lustre_hash_operations uuid_hash_operations = {
-        .lustre_hashfn = uuid_hashfn,
-        .lustre_hash_key_compare = uuid_hash_key_compare,
-        .lustre_hash_object_refcount_get = uuid_export_refcount_get,
-        .lustre_hash_object_refcount_put = uuid_export_refcount_put,
-};
-
-__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
-{
-        struct obd_uuid * uuid_key = key;
-
-        return djb2_hashfn(hash_body, uuid_key->uuid, sizeof(uuid_key->uuid));
-}
-
-/* Note, it is impossible to find an export that is in failed state with
- * this function */
-int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+ * For each item in the lustre hash @lh which matches the @key call
+ * the passed callback @func and pass to it as an argument each hash
+ * item and the private @data.  Before each callback ops->lh_get will
+ * be called, and after each callback ops->lh_put will be called.
+ * Finally, during the callback the bucket lock is held so the
+ * callback must never sleep.
+ */
+void
+lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
+                         lh_for_each_cb func, void *data)
 {
-        struct obd_export *export = NULL;
-        struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL;
-
-        LASSERT( key != NULL);
-
-        uuid_key = (struct obd_uuid*)key;
-
-        export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash);
-
-        compared_uuid = &export->exp_client_uuid;
-
-        RETURN(obd_uuid_equals(uuid_key, compared_uuid) &&
-               !export->exp_failed);
-}
-
-void * uuid_export_refcount_get(struct hlist_node * actual_hnode)
-{
-        struct obd_export *export = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
-
-        LASSERT(export != NULL);
-
-        class_export_get(export);
+        struct hlist_node    *hnode;
+        lustre_hash_bucket_t *lhb;
+        unsigned              i;
+        ENTRY;
 
-        RETURN(export);
-}
+        read_lock(&lh->lh_rwlock);
+        i = lh_hash(lh, key, lh->lh_cur_mask);
+        lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
 
-void uuid_export_refcount_put(struct hlist_node * actual_hnode)
-{
-        struct obd_export *export = NULL;
+        read_lock(&lhb->lhb_rwlock);
+        hlist_for_each(hnode, &(lhb->lhb_head)) {
+                __lustre_hash_bucket_validate(lh, lhb, hnode);
 
-        LASSERT(actual_hnode != NULL);
+                if (!lh_compare(lh, key, hnode))
+                        continue;
 
-        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
+                func(lh_get(lh, hnode), data);
+                (void)lh_put(lh, hnode);
+        }
 
-        LASSERT(export != NULL);
+        read_unlock(&lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
 
-        class_export_put(export);
+        EXIT;
 }
-
-/*
- * define (nid <-> export) hash operations and function define
+EXPORT_SYMBOL(lustre_hash_for_each_key);
+
+/**
+ * Rehash the lustre hash @lh to the given @bits.  This can be used
+ * to grow the hash size when excessive chaining is detected, or to
+ * shrink the hash when it is larger than needed.  When the LH_REHASH
+ * flag is set in @lh the lustre hash may be dynamically rehashed
+ * during addition or removal if the hash's theta value exceeds
+ * either the lh->lh_min_theta or lh->lh_max_theta values.  By default
+ * these values are tuned to keep the chained hash depth small, and
+ * this approach assumes a reasonably uniform hashing function.  The
+ * theta thresholds for @lh are tunable via lustre_hash_set_theta().
  */
-
-/* define the nid hash operations */
-struct lustre_hash_operations nid_hash_operations = {
-        .lustre_hashfn = nid_hashfn,
-        .lustre_hash_key_compare = nid_hash_key_compare,
-        .lustre_hash_object_refcount_get = nid_export_refcount_get,
-        .lustre_hash_object_refcount_put = nid_export_refcount_put,
-};
-
-__u32 nid_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
-{
-        return djb2_hashfn(hash_body, key, sizeof(lnet_nid_t));
-}
-
-/* Note, it is impossible to find an export that is in failed state with
- * this function */
-int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+int
+lustre_hash_rehash(lustre_hash_t *lh, int bits)
 {
-        struct obd_export *export = NULL;
-        lnet_nid_t *nid_key = NULL;
-
-        LASSERT( key != NULL);
-
-        nid_key = (lnet_nid_t*)key;
-
-        export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash);
-
-        return (export->exp_connection->c_peer.nid == *nid_key &&
-                !export->exp_failed);
-}
-
-void *nid_export_refcount_get(struct hlist_node *actual_hnode)
-{
-        struct obd_export *export = NULL;
+        struct hlist_node     *hnode;
+        struct hlist_node     *pos;
+        lustre_hash_bucket_t  *lh_buckets;
+        lustre_hash_bucket_t  *rehash_buckets;
+        lustre_hash_bucket_t  *lh_lhb;
+        lustre_hash_bucket_t  *rehash_lhb;
+        int                    i;
+        int                    theta;
+        int                    lh_mask;
+        int                    lh_bits;
+        int                    mask = (1 << bits) - 1;
+        void                  *key;
+        ENTRY;
 
-        LASSERT(actual_hnode != NULL);
+        LASSERT(!in_interrupt());
+        LASSERT(mask > 0);
 
-        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+        OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
+        if (!rehash_buckets)
+                RETURN(-ENOMEM);
 
-        LASSERT(export != NULL);
+        for (i = 0; i <= mask; i++) {
+                INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
+                rwlock_init(&rehash_buckets[i].lhb_rwlock);
+                atomic_set(&rehash_buckets[i].lhb_count, 0);
+        }
 
-        class_export_get(export);
+        write_lock(&lh->lh_rwlock);
 
-        RETURN(export);
-}
-
-void nid_export_refcount_put(struct hlist_node *actual_hnode)
-{
-        struct obd_export *export = NULL;
+        /*
+         * Early return for multiple concurrent racing callers,
+         * ensure we only trigger the rehash if it is still needed.
+         */
+        theta = __lustre_hash_theta(lh);
+        if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
+                OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
+                write_unlock(&lh->lh_rwlock);
+                RETURN(-EALREADY);
+        }
 
-        LASSERT(actual_hnode != NULL);
+        lh_bits = lh->lh_cur_bits;
+        lh_buckets = lh->lh_buckets;
+        lh_mask = (1 << lh_bits) - 1;
+
+        lh->lh_cur_bits = bits;
+        lh->lh_cur_mask = (1 << bits) - 1;
+        lh->lh_buckets = rehash_buckets;
+        atomic_inc(&lh->lh_rehash_count);
+
+        for (i = 0; i <= lh_mask; i++) {
+                lh_lhb = &lh_buckets[i];
+
+                write_lock(&lh_lhb->lhb_rwlock);
+                hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
+                        key = lh_key(lh, hnode);
+                        LASSERT(key);
+
+                        /*
+                         * Validate hnode is in the correct bucket.
+                         */
+                        if (unlikely(lh->lh_flags & LH_DEBUG))
+                                LASSERT(lh_hash(lh, key, lh_mask) == i);
+
+                        /*
+                         * Delete from old hash bucket.
+                         */
+                        hlist_del(hnode);
+                        LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
+                        atomic_dec(&lh_lhb->lhb_count);
+
+                        /*
+                         * Add to rehash bucket, ops->lh_key must be defined.
+                         */
+                        rehash_lhb = &rehash_buckets[lh_hash(lh, key, mask)];
+                        hlist_add_head(hnode, &(rehash_lhb->lhb_head));
+                        atomic_inc(&rehash_lhb->lhb_count);
+                }
 
-        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+                LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
+                LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
+                write_unlock(&lh_lhb->lhb_rwlock);
+        }
 
-        LASSERT(export != NULL);
+        OBD_VFREE(lh_buckets, sizeof(*lh_buckets) << lh_bits);
+        write_unlock(&lh->lh_rwlock);
 
-        class_export_put(export);
+        RETURN(0);
 }
-
-/*
- * define (net_peer <-> connection) hash operations and function define
+EXPORT_SYMBOL(lustre_hash_rehash);
+
+/**
+ * Rehash the object referenced by @hnode in the lustre hash @lh.  The
+ * @old_key must be provided to locate the objects previous location
+ * in the hash, and the @new_key will be used to reinsert the object.
+ * Use this function instead of a lustre_hash_add() + lustre_hash_del()
+ * combo when it is critical that there is no window in time where the
+ * object is missing from the hash.  When an object is being rehashed
+ * the registered lh_get() and lh_put() functions will not be called.
  */
-
-/* define the conn hash operations */
-struct lustre_hash_operations conn_hash_operations = {
-        .lustre_hashfn = conn_hashfn,
-        .lustre_hash_key_compare = conn_hash_key_compare,
-        .lustre_hash_object_refcount_get = conn_refcount_get,
-        .lustre_hash_object_refcount_put = conn_refcount_put,
-};
-EXPORT_SYMBOL(conn_hash_operations);
-
-__u32 conn_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
+void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
+                            struct hlist_node *hnode)
 {
-        return djb2_hashfn(hash_body, key, sizeof(lnet_process_id_t));
-}
-
-int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode)
-{
-        struct ptlrpc_connection *c = NULL;
-        lnet_process_id_t *conn_key = NULL;
-
-        LASSERT( key != NULL);
-
-        conn_key = (lnet_process_id_t*)key;
-
-        c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash);
-
-        return (conn_key->nid == c->c_peer.nid &&
-                conn_key->pid == c->c_peer.pid);
-}
-
-void *conn_refcount_get(struct hlist_node *actual_hnode)
-{
-        struct ptlrpc_connection *c = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
-
-        LASSERT(c != NULL);
-
-        atomic_inc(&c->c_refcount);
-
-        RETURN(c);
-}
-
-void conn_refcount_put(struct hlist_node *actual_hnode)
-{
-        struct ptlrpc_connection *c = NULL;
-
-        LASSERT(actual_hnode != NULL);
-
-        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
-
-        LASSERT(c != NULL);
-
-        atomic_dec(&c->c_refcount);
-}
-
-/*******************************************************************************/
-/* ( nid<>nidstats ) hash operations define */
-
-struct lustre_hash_operations nid_stat_hash_operations = {
-        .lustre_hashfn = nid_hashfn,
-        .lustre_hash_key_compare = nidstats_hash_key_compare,
-        .lustre_hash_object_refcount_get = nidstats_refcount_get,
-        .lustre_hash_object_refcount_put = nidstats_refcount_put,
-};
-EXPORT_SYMBOL(nid_stat_hash_operations);
+        lustre_hash_bucket_t  *old_lhb;
+        lustre_hash_bucket_t  *new_lhb;
+        unsigned               i;
+        unsigned               j;
+        ENTRY;
 
-int nidstats_hash_key_compare(void *key, struct hlist_node * compared_hnode)
-{
-        struct nid_stat *data;
-        lnet_nid_t *nid_key;
+        __lustre_hash_key_validate(lh, new_key, hnode);
+        LASSERT(!hlist_unhashed(hnode));
+
+        read_lock(&lh->lh_rwlock);
+
+        i = lh_hash(lh, old_key, lh->lh_cur_mask);
+        old_lhb = &lh->lh_buckets[i];
+        LASSERT(i <= lh->lh_cur_mask);
+
+        j = lh_hash(lh, new_key, lh->lh_cur_mask);
+        new_lhb = &lh->lh_buckets[j];
+        LASSERT(j <= lh->lh_cur_mask);
+
+        if (i < j) { /* write_lock ordering */
+                write_lock(&old_lhb->lhb_rwlock);
+                write_lock(&new_lhb->lhb_rwlock);
+        } else if (i > j) {
+                write_lock(&new_lhb->lhb_rwlock);
+                write_lock(&old_lhb->lhb_rwlock);
+        } else { /* do nothing */
+                read_unlock(&lh->lh_rwlock);
+                EXIT;
+                return;
+        }
 
-        LASSERT( key != NULL);
+        /*
+         * Migrate item between hash buckets without calling
+         * the lh_get() and lh_put() callback functions.
+         */
+        hlist_del(hnode);
+        LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
+        atomic_dec(&old_lhb->lhb_count);
+        hlist_add_head(hnode, &(new_lhb->lhb_head));
+        atomic_inc(&new_lhb->lhb_count);
 
-        nid_key = (lnet_nid_t*)key;
-        data = hlist_entry(compared_hnode, struct nid_stat, nid_hash);
+        write_unlock(&new_lhb->lhb_rwlock);
+        write_unlock(&old_lhb->lhb_rwlock);
+        read_unlock(&lh->lh_rwlock);
 
-        return (data->nid == *nid_key);
+        EXIT;
 }
+EXPORT_SYMBOL(lustre_hash_rehash_key);
 
-void* nidstats_refcount_get(struct hlist_node * actual_hnode)
+int lustre_hash_debug_header(char *str, int size)
 {
-        struct nid_stat *data;
-
-        data = hlist_entry(actual_hnode, struct nid_stat, nid_hash);
-        data->nid_exp_ref_count++;
-
-        RETURN(data);
+        return snprintf(str, size,
+                 "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
+                 "name", "cur", "min", "max", "theta", "t-min", "t-max",
+                 "flags", "rehash", "count", " distribution");
 }
+EXPORT_SYMBOL(lustre_hash_debug_header);
 
-void nidstats_refcount_put(struct hlist_node * actual_hnode)
+int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
 {
-        struct nid_stat *data;
-
-        data = hlist_entry(actual_hnode, struct nid_stat, nid_hash);
-        data->nid_exp_ref_count--;
-        EXIT;
+        lustre_hash_bucket_t  *lhb;
+        int                    theta;
+        int                    i;
+        int                    c = 0;
+        int                    dist[8] = { 0, };
+
+        if (str == NULL || size == 0)
+                return 0;
+
+        read_lock(&lh->lh_rwlock);
+        theta = __lustre_hash_theta(lh);
+
+        c += snprintf(str + c, size - c, "%-*s ",
+                      LUSTRE_MAX_HASH_NAME, lh->lh_name);
+        c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
+        c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
+        c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
+        c += snprintf(str + c, size - c, "%d.%03d ",
+                      __lustre_hash_theta_int(theta),
+                      __lustre_hash_theta_frac(theta));
+        c += snprintf(str + c, size - c, "%d.%03d ",
+                      __lustre_hash_theta_int(lh->lh_min_theta),
+                      __lustre_hash_theta_frac(lh->lh_min_theta));
+        c += snprintf(str + c, size - c, "%d.%03d ",
+                      __lustre_hash_theta_int(lh->lh_max_theta),
+                      __lustre_hash_theta_frac(lh->lh_max_theta));
+        c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
+        c += snprintf(str + c, size - c, "%6d ",
+                      atomic_read(&lh->lh_rehash_count));
+        c += snprintf(str + c, size - c, "%5d ",
+                      atomic_read(&lh->lh_count));
+
+        /*
+         * The distribution is a summary of the chained hash depth in
+         * each of the lustre hash buckets.  Each bucket's lhb_count is
+         * divided by the hash theta value and used to generate a
+         * histogram of the hash distribution.  A uniform hash will
+         * result in all hash buckets being close to the average thus
+         * only the first few entries in the histogram will be non-zero.
+         * If your hash function results in a non-uniform hash this will
+         * be observable by outlier buckets in the distribution histogram.
+         *
+         * Uniform hash distribution:      128/128/0/0/0/0/0/0
+         * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
+         */
+        lh_for_each_bucket(lh, lhb, i)
+                dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
+
+        for (i = 0; i < 8; i++)
+                c += snprintf(str + c, size - c, "%d%c",  dist[i],
+                              (i == 7) ? '\n' : '/');
+
+        read_unlock(&lh->lh_rwlock);
+
+        return c;
 }
-
-/*******************************************************************************/
+EXPORT_SYMBOL(lustre_hash_debug_str);