Whamcloud - gitweb
b=16776
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_size - Initial hash table size
62  * @max_size - Maximum allowed hash table resize
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74   
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         /* 
79          * Ensure hash is a power of two to allow the use of a bitmask
80          * in the hash function instead of a more expensive modulus. 
81          */
82         LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0,
83                  "Size (%u) is not power of 2\n", cur_size);
84         LASSERTF(max_size && (max_size & (max_size - 1)) == 0,
85                  "Size (%u) is not power of 2\n", max_size);
86   
87         OBD_ALLOC_PTR(lh);
88         if (!lh)
89                 RETURN(NULL);
90   
91         lh->lh_name_size = strlen(name) + 1;
92         rwlock_init(&lh->lh_rwlock);
93   
94         OBD_ALLOC(lh->lh_name, lh->lh_name_size);
95         if (!lh->lh_name) {
96                 OBD_FREE_PTR(lh);
97                 RETURN(NULL);
98         }
99   
100         strncpy(lh->lh_name, name, lh->lh_name_size);
101   
102         atomic_set(&lh->lh_count, 0);
103         atomic_set(&lh->lh_rehash_count, 0);
104         lh->lh_cur_size = cur_size;
105         lh->lh_min_size = cur_size;
106         lh->lh_max_size = max_size;
107         lh->lh_min_theta = 500;  /* theta * 1000 */
108         lh->lh_max_theta = 2000; /* theta * 1000 */
109         lh->lh_ops = ops;
110         lh->lh_flags = flags;
111
112         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
113         if (!lh->lh_buckets) {
114                 OBD_FREE(lh->lh_name, lh->lh_name_size);
115                 OBD_FREE_PTR(lh);
116                 RETURN(NULL);
117         }
118   
119         for (i = 0; i < lh->lh_cur_size; i++) {
120                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
121                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
122                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
123         }
124   
125         return lh;
126 }
127 EXPORT_SYMBOL(lustre_hash_init);
128   
129 /**
130  * Cleanup lustre hash @lh.
131  */
132 void
133 lustre_hash_exit(lustre_hash_t *lh)
134 {
135         lustre_hash_bucket_t *lhb;
136         struct hlist_node    *hnode;
137         struct hlist_node    *pos;
138         int                   i;
139         ENTRY;
140   
141         if (!lh)
142                 return;
143   
144         write_lock(&lh->lh_rwlock);
145   
146         lh_for_each_bucket(lh, lhb, i) {
147                 write_lock(&lhb->lhb_rwlock);
148                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
149                         __lustre_hash_bucket_validate(lh, lhb, hnode);
150                         __lustre_hash_bucket_del(lh, lhb, hnode);
151                         lh_exit(lh, hnode);
152                 }
153   
154                 LASSERT(hlist_empty(&(lhb->lhb_head)));
155                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
156                 write_unlock(&lhb->lhb_rwlock);
157         }
158   
159         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
160         OBD_FREE(lh->lh_name, lh->lh_name_size);
161   
162         LASSERT(atomic_read(&lh->lh_count) == 0);
163         write_unlock(&lh->lh_rwlock);
164   
165         OBD_FREE_PTR(lh);
166         EXIT;
167 }
168 EXPORT_SYMBOL(lustre_hash_exit);
169
170 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
171 {
172         if (!(lh->lh_flags & LH_REHASH))
173                 return 0;
174
175         if ((lh->lh_cur_size < lh->lh_max_size) &&
176             (__lustre_hash_theta(lh) > lh->lh_max_theta))
177                 return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
178
179         if ((lh->lh_cur_size > lh->lh_min_size) &&
180             (__lustre_hash_theta(lh) < lh->lh_min_theta))
181                 return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
182
183         return 0;
184 }
185   
186 /**
187  * Add item @hnode to lustre hash @lh using @key.  The registered
188  * ops->lh_get function will be called when the item is added.
189  */
190 void
191 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
192 {
193         lustre_hash_bucket_t *lhb;
194         int                   size;
195         unsigned              i;
196         ENTRY;
197   
198         __lustre_hash_key_validate(lh, key, hnode);
199
200         read_lock(&lh->lh_rwlock);
201         i = lh_hash(lh, key, lh->lh_cur_size - 1);
202         lhb = &lh->lh_buckets[i];
203         LASSERT(i < lh->lh_cur_size);
204         LASSERT(hlist_unhashed(hnode));
205
206         write_lock(&lhb->lhb_rwlock);
207         __lustre_hash_bucket_add(lh, lhb, hnode);
208         write_unlock(&lhb->lhb_rwlock);
209
210         size = lustre_hash_rehash_size(lh);
211         read_unlock(&lh->lh_rwlock);
212         if (size)
213                 lustre_hash_rehash(lh, size);
214   
215         EXIT;
216 }
217 EXPORT_SYMBOL(lustre_hash_add);
218   
219 /**
220  * Add item @hnode to lustre hash @lh using @key.  The registered
221  * ops->lh_get function will be called if the item was added.
222  * Returns 0 on success or -EALREADY on key collisions.
223  */
224 int
225 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
226 {
227         lustre_hash_bucket_t *lhb;
228         int                   size;
229         int                   rc = -EALREADY;
230         unsigned              i;
231         ENTRY;
232   
233         __lustre_hash_key_validate(lh, key, hnode);
234   
235         read_lock(&lh->lh_rwlock);
236         i = lh_hash(lh, key, lh->lh_cur_size - 1);
237         lhb = &lh->lh_buckets[i];
238         LASSERT(i < lh->lh_cur_size);
239         LASSERT(hlist_unhashed(hnode));
240   
241         write_lock(&lhb->lhb_rwlock);
242         if (!__lustre_hash_bucket_lookup(lh, lhb, key)) {
243                 __lustre_hash_bucket_add(lh, lhb, hnode);
244                 rc = 0;
245         }
246         write_unlock(&lhb->lhb_rwlock);
247   
248         size = lustre_hash_rehash_size(lh);
249         read_unlock(&lh->lh_rwlock);
250         if (size)
251                 lustre_hash_rehash(lh, size);
252   
253         RETURN(rc);
254 }
255 EXPORT_SYMBOL(lustre_hash_add_unique);
256   
257 /**
258  * Add item @hnode to lustre hash @lh using @key.  If this @key
259  * already exists in the hash then ops->lh_get will be called on the
260  * conflicting entry and that entry will be returned to the caller.
261  * Otherwise ops->lh_get is called on the item which was added.
262  */
263 void *
264 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
265                            struct hlist_node *hnode)
266 {
267         struct hlist_node    *existing_hnode;
268         lustre_hash_bucket_t *lhb;
269         int                   size;
270         unsigned              i;
271         void                 *obj;
272         ENTRY;
273   
274         __lustre_hash_key_validate(lh, key, hnode);
275   
276         read_lock(&lh->lh_rwlock);
277         i = lh_hash(lh, key, lh->lh_cur_size - 1);
278         lhb = &lh->lh_buckets[i];
279         LASSERT(i < lh->lh_cur_size);
280         LASSERT(hlist_unhashed(hnode));
281
282         write_lock(&lhb->lhb_rwlock);
283         existing_hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
284         if (existing_hnode)
285                 obj = lh_get(lh, existing_hnode);
286         else
287                 obj = __lustre_hash_bucket_add(lh, lhb, hnode);
288         write_unlock(&lhb->lhb_rwlock);
289
290         size = lustre_hash_rehash_size(lh);
291         read_unlock(&lh->lh_rwlock);
292         if (size)
293                 lustre_hash_rehash(lh, size);
294   
295         RETURN(obj);
296 }
297 EXPORT_SYMBOL(lustre_hash_findadd_unique);
298   
299 /**
300  * Delete item @hnode from the lustre hash @lh using @key.  The @key
301  * is required to ensure the correct hash bucket is locked since there
302  * is no direct linkage from the item to the bucket.  The object
303  * removed from the hash will be returned and obs->lh_put is called
304  * on the removed object.
305  */
306 void *
307 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
308 {
309         lustre_hash_bucket_t *lhb;
310         int                   size;
311         unsigned              i;
312         void                 *obj;
313         ENTRY;
314   
315         __lustre_hash_key_validate(lh, key, hnode);
316   
317         read_lock(&lh->lh_rwlock);
318         i = lh_hash(lh, key, lh->lh_cur_size - 1);
319         lhb = &lh->lh_buckets[i];
320         LASSERT(i < lh->lh_cur_size);
321         LASSERT(!hlist_unhashed(hnode));
322
323         write_lock(&lhb->lhb_rwlock);
324         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
325         write_unlock(&lhb->lhb_rwlock);
326
327         size = lustre_hash_rehash_size(lh);
328         read_unlock(&lh->lh_rwlock);
329         if (size)
330                 lustre_hash_rehash(lh, size);
331   
332         RETURN(obj);
333 }
334 EXPORT_SYMBOL(lustre_hash_del);
335   
336 /**
337  * Delete item given @key in lustre hash @lh.  The first @key found in
338  * the hash will be removed, if the key exists multiple times in the hash
339  * @lh this function must be called once per key.  The removed object
340  * will be returned and ops->lh_put is called on the removed object.
341  */
342 void *
343 lustre_hash_del_key(lustre_hash_t *lh, void *key)
344 {
345         struct hlist_node    *hnode;
346         lustre_hash_bucket_t *lhb;
347         int                   size;
348         unsigned              i;
349         void                 *obj = NULL;
350         ENTRY;
351   
352         read_lock(&lh->lh_rwlock);
353         i = lh_hash(lh, key, lh->lh_cur_size - 1);
354         lhb = &lh->lh_buckets[i];
355         LASSERT(i < lh->lh_cur_size);
356
357         write_lock(&lhb->lhb_rwlock);
358         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
359         if (hnode)
360                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
361
362         write_unlock(&lhb->lhb_rwlock);
363
364         size = lustre_hash_rehash_size(lh);
365         read_unlock(&lh->lh_rwlock);
366         if (size)
367                 lustre_hash_rehash(lh, size);
368   
369         RETURN(obj);
370 }
371 EXPORT_SYMBOL(lustre_hash_del_key);
372   
373 /**
374  * Lookup an item using @key in the lustre hash @lh and return it.
375  * If the @key is found in the hash lh->lh_get() is called and the
376  * matching objects is returned.  It is the callers responsibility
377  * to call the counterpart ops->lh_put using the lh_put() macro
378  * when when finished with the object.  If the @key was not found
379  * in the hash @lh NULL is returned.
380  */
381 void *
382 lustre_hash_lookup(lustre_hash_t *lh, void *key)
383 {
384         struct hlist_node    *hnode;
385         lustre_hash_bucket_t *lhb;
386         unsigned              i;
387         void                 *obj = NULL;
388         ENTRY;
389   
390         read_lock(&lh->lh_rwlock);
391         i = lh_hash(lh, key, lh->lh_cur_size - 1);
392         lhb = &lh->lh_buckets[i];
393         LASSERT(i < lh->lh_cur_size);
394
395         read_lock(&lhb->lhb_rwlock);
396         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
397         if (hnode)
398                 obj = lh_get(lh, hnode);
399   
400         read_unlock(&lhb->lhb_rwlock);
401         read_unlock(&lh->lh_rwlock);
402   
403         RETURN(obj);
404 }
405 EXPORT_SYMBOL(lustre_hash_lookup);
406   
407 /**
408  * For each item in the lustre hash @lh call the passed callback @func
409  * and pass to it as an argument each hash item and the private @data.
410  * Before each callback ops->lh_get will be called, and after each
411  * callback ops->lh_put will be called.  Finally, during the callback
412  * the bucket lock is held so the callback must never sleep.
413  */
414 void
415 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
416 {
417         struct hlist_node    *hnode;
418         lustre_hash_bucket_t *lhb;
419         void                 *obj;
420         int                   i;
421         ENTRY;
422   
423         read_lock(&lh->lh_rwlock);
424         lh_for_each_bucket(lh, lhb, i) {
425                 read_lock(&lhb->lhb_rwlock);
426                 hlist_for_each(hnode, &(lhb->lhb_head)) {
427                         __lustre_hash_bucket_validate(lh, lhb, hnode);
428                         obj = lh_get(lh, hnode);
429                         func(obj, data);
430                         (void)lh_put(lh, hnode);
431                 }
432                 read_unlock(&lhb->lhb_rwlock);
433         }
434         read_unlock(&lh->lh_rwlock);
435
436         EXIT;
437 }
438 EXPORT_SYMBOL(lustre_hash_for_each);
439   
440 /**
441  * For each item in the lustre hash @lh call the passed callback @func
442  * and pass to it as an argument each hash item and the private @data.
443  * Before each callback ops->lh_get will be called, and after each
444  * callback ops->lh_put will be called.  During the callback the
445  * bucket lock will not be held will allows for the current item
446  * to be removed from the hash during the callback.  However, care
447  * should be taken to prevent other callers from operating on the
448  * hash concurrently or list corruption may occur.
449  */
450 void
451 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
452 {
453         struct hlist_node    *hnode;
454         struct hlist_node    *pos;
455         lustre_hash_bucket_t *lhb;
456         void                 *obj;
457         int                   i;
458         ENTRY;
459   
460         read_lock(&lh->lh_rwlock);
461         lh_for_each_bucket(lh, lhb, i) {
462                 read_lock(&lhb->lhb_rwlock);
463                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
464                         __lustre_hash_bucket_validate(lh, lhb, hnode);
465                         obj = lh_get(lh, hnode);
466                         read_unlock(&lhb->lhb_rwlock);
467                         func(obj, data);
468                         read_lock(&lhb->lhb_rwlock);
469                         (void)lh_put(lh, hnode);
470                 }
471                 read_unlock(&lhb->lhb_rwlock);
472         }
473         read_unlock(&lh->lh_rwlock);
474         EXIT;
475 }
476 EXPORT_SYMBOL(lustre_hash_for_each_safe);
477   
478 /**
479  * For each hash bucket in the lustre hash @lh call the passed callback
480  * @func until all the hash buckets are empty.  The passed callback @func
481  * or the previously registered callback lh->lh_put must remove the item
482  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
483  * functions.  No rwlocks will be held during the callback @func it is
484  * safe to sleep if needed.  This function will not terminate until the
485  * hash is empty.  Note it is still possible to concurrently add new
486  * items in to the hash.  It is the callers responsibility to ensure
487  * the required locking is in place to prevent concurrent insertions.
488  */
489 void
490 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
491 {
492         struct hlist_node    *hnode;
493         lustre_hash_bucket_t *lhb;
494         void                 *obj;
495         int                   i;
496         ENTRY;
497   
498 restart:
499         read_lock(&lh->lh_rwlock);
500         lh_for_each_bucket(lh, lhb, i) {
501                 write_lock(&lhb->lhb_rwlock);
502                 while (!hlist_empty(&lhb->lhb_head)) {
503                         hnode =  lhb->lhb_head.first;
504                         __lustre_hash_bucket_validate(lh, lhb, hnode);
505                         obj = lh_get(lh, hnode);
506                         write_unlock(&lhb->lhb_rwlock);
507                         read_unlock(&lh->lh_rwlock);
508                         func(obj, data);
509                         (void)lh_put(lh, hnode);
510                         goto restart;
511                 }
512                 write_unlock(&lhb->lhb_rwlock);
513         }
514         read_unlock(&lh->lh_rwlock);
515         EXIT;
516 }
517 EXPORT_SYMBOL(lustre_hash_for_each_empty);
518   
519   /*
520  * For each item in the lustre hash @lh which matches the @key call
521  * the passed callback @func and pass to it as an argument each hash
522  * item and the private @data.  Before each callback ops->lh_get will
523  * be called, and after each callback ops->lh_put will be called.
524  * Finally, during the callback the bucket lock is held so the
525  * callback must never sleep.
526    */
527 void
528 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
529                          lh_for_each_cb func, void *data)
530 {
531         struct hlist_node    *hnode;
532         lustre_hash_bucket_t *lhb;
533         unsigned              i;
534         ENTRY;
535   
536         read_lock(&lh->lh_rwlock);
537         i = lh_hash(lh, key, lh->lh_cur_size - 1);
538         lhb = &lh->lh_buckets[i];
539         LASSERT(i < lh->lh_cur_size);
540   
541         read_lock(&lhb->lhb_rwlock);
542         hlist_for_each(hnode, &(lhb->lhb_head)) {
543                 __lustre_hash_bucket_validate(lh, lhb, hnode);
544   
545                 if (!lh_compare(lh, key, hnode))
546                         continue;
547   
548                 func(lh_get(lh, hnode), data);
549                 (void)lh_put(lh, hnode);
550         }
551   
552         read_unlock(&lhb->lhb_rwlock);
553         read_unlock(&lh->lh_rwlock);
554   
555         EXIT;
556 }
557 EXPORT_SYMBOL(lustre_hash_for_each_key);
558   
559 /**
560  * Rehash the lustre hash @lh to the given @size.  This can be used
561  * to grow the hash size when excessive chaining is detected, or to
562  * shrink the hash when it is larger than needed.  When the LH_REHASH
563  * flag is set in @lh the lustre hash may be dynamically rehashed
564  * during addition or removal if the hash's theta value exceeds
565  * either the lh->lh_min_theta or lh->max_theta values.  By default
566  * these values are tuned to keep the chained hash depth small, and
567  * this approach assumes a reasonably uniform hashing function.  The
568  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
569  */
570 int
571 lustre_hash_rehash(lustre_hash_t *lh, int size)
572 {
573         struct hlist_node     *hnode;
574         struct hlist_node     *pos;
575         lustre_hash_bucket_t  *lh_buckets;
576         lustre_hash_bucket_t  *rehash_buckets;
577         lustre_hash_bucket_t  *lh_lhb;
578         lustre_hash_bucket_t  *rehash_lhb;
579         int                    i;
580         int                    lh_size;
581         int                    theta;
582         void                  *key;
583         ENTRY;
584   
585         LASSERT(size > 0);
586   
587         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
588         if (!rehash_buckets)
589                 RETURN(-ENOMEM);
590   
591         for (i = 0; i < size; i++) {
592                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
593                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
594                 atomic_set(&rehash_buckets[i].lhb_count, 0);
595         }
596   
597         write_lock(&lh->lh_rwlock);
598
599         /* 
600          * Early return for multiple concurrent racing callers,
601          * ensure we only trigger the rehash if it is still needed. 
602          */
603         theta = __lustre_hash_theta(lh);
604         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
605                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size);
606                 write_unlock(&lh->lh_rwlock);
607                 RETURN(-EALREADY);
608         }
609   
610         lh_size = lh->lh_cur_size;
611         lh_buckets = lh->lh_buckets;
612   
613         lh->lh_cur_size = size;
614         lh->lh_buckets = rehash_buckets;
615         atomic_inc(&lh->lh_rehash_count);
616
617         for (i = 0; i < lh_size; i++) {
618                 lh_lhb = &lh_buckets[i];
619
620                 write_lock(&lh_lhb->lhb_rwlock);
621                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
622                         key = lh_key(lh, hnode);
623                         LASSERT(key);
624
625                         /* 
626                          * Validate hnode is in the correct bucket.
627                          */
628                         if (unlikely(lh->lh_flags & LH_DEBUG))
629                                 LASSERT(lh_hash(lh, key, lh_size - 1) == i);
630
631                         /* 
632                          * Delete from old hash bucket.
633                          */
634                         hlist_del(hnode);
635                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
636                         atomic_dec(&lh_lhb->lhb_count);
637
638                         /* 
639                          * Add to rehash bucket, ops->lh_key must be defined. 
640                          */
641                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)];
642                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
643                         atomic_inc(&rehash_lhb->lhb_count);
644                 }
645   
646                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
647                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
648                 write_unlock(&lh_lhb->lhb_rwlock);
649         }
650   
651         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size);
652         write_unlock(&lh->lh_rwlock);
653   
654         RETURN(0);
655 }
656 EXPORT_SYMBOL(lustre_hash_rehash);
657   
658 /**
659  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
660  * @old_key must be provided to locate the objects previous location
661  * in the hash, and the @new_key will be used to reinsert the object.
662  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
663  * combo when it is critical that there is no window in time where the
664  * object is missing from the hash.  When an object is being rehashed
665  * the registered lh_get() and lh_put() functions will not be called.
666  */
667 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
668                             struct hlist_node *hnode)
669 {
670         lustre_hash_bucket_t  *old_lhb;
671         lustre_hash_bucket_t  *new_lhb;
672         unsigned               i;
673         int                    j;
674         ENTRY;
675   
676         __lustre_hash_key_validate(lh, new_key, hnode);
677         LASSERT(!hlist_unhashed(hnode));
678   
679         read_lock(&lh->lh_rwlock);
680   
681         i = lh_hash(lh, old_key, lh->lh_cur_size - 1);
682         old_lhb = &lh->lh_buckets[i];
683         LASSERT(i < lh->lh_cur_size);
684
685         j = lh_hash(lh, new_key, lh->lh_cur_size - 1);
686         new_lhb = &lh->lh_buckets[j];
687         LASSERT(j < lh->lh_cur_size);
688
689         write_lock(&old_lhb->lhb_rwlock);
690         write_lock(&new_lhb->lhb_rwlock);
691
692         /* 
693          * Migrate item between hash buckets without calling
694          * the lh_get() and lh_put() callback functions. 
695          */
696         hlist_del(hnode);
697         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
698         atomic_dec(&old_lhb->lhb_count);
699         hlist_add_head(hnode, &(new_lhb->lhb_head));
700         atomic_inc(&new_lhb->lhb_count);
701
702         write_unlock(&new_lhb->lhb_rwlock);
703         write_unlock(&old_lhb->lhb_rwlock);
704         read_unlock(&lh->lh_rwlock);
705   
706         EXIT;
707 }
708 EXPORT_SYMBOL(lustre_hash_rehash_key);
709   
710 int lustre_hash_debug_header(char *str, int size)
711 {
712         return snprintf(str, size,
713                  "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n",
714                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
715                  "flags", "rehash", "count", " distribution");
716 }
717 EXPORT_SYMBOL(lustre_hash_debug_header);
718
719 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
720 {
721         lustre_hash_bucket_t  *lhb;
722         int                    theta;
723         int                    i;
724         int                    c = 0;
725         int                    dist[8] = { 0, };
726
727         if (str == NULL || size == 0)
728                 return 0;
729
730         read_lock(&lh->lh_rwlock);
731         theta = __lustre_hash_theta(lh);
732
733         c += snprintf(str + c, size - c, "%-36s ",lh->lh_name);
734         c += snprintf(str + c, size - c, "%5d ",  lh->lh_cur_size);
735         c += snprintf(str + c, size - c, "%5d ",  lh->lh_min_size);
736         c += snprintf(str + c, size - c, "%5d ",  lh->lh_max_size);
737         c += snprintf(str + c, size - c, "%d.%03d ",
738                       theta / 1000, theta % 1000);
739         c += snprintf(str + c, size - c, "%d.%03d ",
740                       lh->lh_min_theta / 1000, lh->lh_min_theta % 1000);
741         c += snprintf(str + c, size - c, "%d.%03d ",
742                       lh->lh_max_theta / 1000, lh->lh_max_theta % 1000);
743         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
744         c += snprintf(str + c, size - c, "%6d ",
745                       atomic_read(&lh->lh_rehash_count));
746         c += snprintf(str + c, size - c, "%5d ",
747                       atomic_read(&lh->lh_count));
748
749         /* 
750          * The distribution is a summary of the chained hash depth in
751          * each of the lustre hash buckets.  Each buckets lhb_count is
752          * divided by the hash theta value and used to generate a
753          * histogram of the hash distribution.  A uniform hash will
754          * result in all hash buckets being close to the average thus
755          * only the first few entries in the histogram will be non-zero.
756          * If you hash function results in a non-uniform hash the will
757          * be observable by outlier bucks in the distribution histogram.
758          *
759          * Uniform hash distribution:      128/128/0/0/0/0/0/0
760          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
761          */
762         lh_for_each_bucket(lh, lhb, i)
763                 dist[MIN(fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++;
764
765         for (i = 0; i < 8; i++)
766                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
767                               (i == 7) ? '\n' : '/');
768   
769         read_unlock(&lh->lh_rwlock);
770   
771         return c;
772 }
773 EXPORT_SYMBOL(lustre_hash_debug_str);