Whamcloud - gitweb
6940f0d0384755f5bdded0b897146227db1aab4c
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_bits - Initial hash table size, in bits
62  * @max_bits - Maximum allowed hash table resize, in bits
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         LASSERT(cur_bits > 0);
79         LASSERT(max_bits >= cur_bits);
80         LASSERT(max_bits < 31);
81
82         OBD_ALLOC_PTR(lh);
83         if (!lh)
84                 RETURN(NULL);
85
86         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
87         lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
88         atomic_set(&lh->lh_rehash_count, 0);
89         atomic_set(&lh->lh_count, 0);
90         rwlock_init(&lh->lh_rwlock);
91         lh->lh_cur_bits = cur_bits;
92         lh->lh_cur_mask = (1 << cur_bits) - 1;
93         lh->lh_min_bits = cur_bits;
94         lh->lh_max_bits = max_bits;
95         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
96          *      anything other than 0.5 and 2.0 */
97         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
98         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
99         lh->lh_ops = ops;
100         lh->lh_flags = flags;
101
102         /* theta * 1000 */
103         __lustre_hash_set_theta(lh, 500, 2000);
104
105         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
106         if (!lh->lh_buckets) {
107                 OBD_FREE_PTR(lh);
108                 RETURN(NULL);
109         }
110
111         for (i = 0; i <= lh->lh_cur_mask; i++) {
112                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
113                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
114                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
115         }
116
117         return lh;
118 }
119 EXPORT_SYMBOL(lustre_hash_init);
120
121 /**
122  * Cleanup lustre hash @lh.
123  */
124 void
125 lustre_hash_exit(lustre_hash_t *lh)
126 {
127         lustre_hash_bucket_t *lhb;
128         struct hlist_node    *hnode;
129         struct hlist_node    *pos;
130         int                   i;
131         ENTRY;
132
133         LASSERT(lh != NULL);
134
135         write_lock(&lh->lh_rwlock);
136
137         lh_for_each_bucket(lh, lhb, i) {
138                 write_lock(&lhb->lhb_rwlock);
139                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
140                         __lustre_hash_bucket_validate(lh, lhb, hnode);
141                         __lustre_hash_bucket_del(lh, lhb, hnode);
142                         lh_exit(lh, hnode);
143                 }
144
145                 LASSERT(hlist_empty(&(lhb->lhb_head)));
146                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
147                 write_unlock(&lhb->lhb_rwlock);
148         }
149
150         LASSERT(atomic_read(&lh->lh_count) == 0);
151         write_unlock(&lh->lh_rwlock);
152
153         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
154         OBD_FREE_PTR(lh);
155         EXIT;
156 }
157 EXPORT_SYMBOL(lustre_hash_exit);
158
159 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
160 {
161         if (!(lh->lh_flags & LH_REHASH))
162                 return 0;
163
164         /* XXX: need to handle case with max_theta != 2.0
165          *      and the case with min_theta != 0.5 */
166         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
167             (__lustre_hash_theta(lh) > lh->lh_max_theta))
168                 return lh->lh_cur_bits + 1;
169
170         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
171             (__lustre_hash_theta(lh) < lh->lh_min_theta))
172                 return lh->lh_cur_bits - 1;
173
174         return 0;
175 }
176
177 /**
178  * Add item @hnode to lustre hash @lh using @key.  The registered
179  * ops->lh_get function will be called when the item is added.
180  */
181 void
182 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
183 {
184         lustre_hash_bucket_t *lhb;
185         int                   bits;
186         unsigned              i;
187         ENTRY;
188
189         __lustre_hash_key_validate(lh, key, hnode);
190
191         read_lock(&lh->lh_rwlock);
192         i = lh_hash(lh, key, lh->lh_cur_mask);
193         lhb = &lh->lh_buckets[i];
194         LASSERT(i <= lh->lh_cur_mask);
195         LASSERT(hlist_unhashed(hnode));
196
197         write_lock(&lhb->lhb_rwlock);
198         __lustre_hash_bucket_add(lh, lhb, hnode);
199         write_unlock(&lhb->lhb_rwlock);
200
201         bits = lustre_hash_rehash_bits(lh);
202         read_unlock(&lh->lh_rwlock);
203         if (bits)
204                 lustre_hash_rehash(lh, bits);
205
206         EXIT;
207 }
208 EXPORT_SYMBOL(lustre_hash_add);
209
210 static struct hlist_node *
211 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
212                                  struct hlist_node *hnode)
213 {
214         int                   bits = 0;
215         struct hlist_node    *ehnode;
216         lustre_hash_bucket_t *lhb;
217         unsigned              i;
218         ENTRY;
219
220         __lustre_hash_key_validate(lh, key, hnode);
221
222         read_lock(&lh->lh_rwlock);
223         i = lh_hash(lh, key, lh->lh_cur_mask);
224         lhb = &lh->lh_buckets[i];
225         LASSERT(i <= lh->lh_cur_mask);
226         LASSERT(hlist_unhashed(hnode));
227
228         write_lock(&lhb->lhb_rwlock);
229         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
230         if (ehnode) {
231                 lh_get(lh, ehnode);
232         } else {
233                 __lustre_hash_bucket_add(lh, lhb, hnode);
234                 ehnode = hnode;
235                 bits = lustre_hash_rehash_bits(lh);
236         }
237         write_unlock(&lhb->lhb_rwlock);
238         read_unlock(&lh->lh_rwlock);
239         if (bits)
240                 lustre_hash_rehash(lh, bits);
241
242         RETURN(ehnode);
243 }
244
245 /**
246  * Add item @hnode to lustre hash @lh using @key.  The registered
247  * ops->lh_get function will be called if the item was added.
248  * Returns 0 on success or -EALREADY on key collisions.
249  */
250 int
251 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
252 {
253         struct hlist_node    *ehnode;
254         ENTRY;
255
256         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
257         if (ehnode != hnode) {
258                 lh_put(lh, ehnode);
259                 RETURN(-EALREADY);
260         }
261         RETURN(0);
262 }
263 EXPORT_SYMBOL(lustre_hash_add_unique);
264
265 /**
266  * Add item @hnode to lustre hash @lh using @key.  If this @key
267  * already exists in the hash then ops->lh_get will be called on the
268  * conflicting entry and that entry will be returned to the caller.
269  * Otherwise ops->lh_get is called on the item which was added.
270  */
271 void *
272 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
273                            struct hlist_node *hnode)
274 {
275         struct hlist_node    *ehnode;
276         void                 *obj;
277         ENTRY;
278
279         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
280         obj = lh_get(lh, ehnode);
281         lh_put(lh, ehnode);
282         RETURN(obj);
283 }
284 EXPORT_SYMBOL(lustre_hash_findadd_unique);
285
286 /**
287  * Delete item @hnode from the lustre hash @lh using @key.  The @key
288  * is required to ensure the correct hash bucket is locked since there
289  * is no direct linkage from the item to the bucket.  The object
290  * removed from the hash will be returned and obs->lh_put is called
291  * on the removed object.
292  */
293 void *
294 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
295 {
296         lustre_hash_bucket_t *lhb;
297         unsigned              i;
298         void                 *obj;
299         ENTRY;
300
301         __lustre_hash_key_validate(lh, key, hnode);
302
303         read_lock(&lh->lh_rwlock);
304         i = lh_hash(lh, key, lh->lh_cur_mask);
305         lhb = &lh->lh_buckets[i];
306         LASSERT(i <= lh->lh_cur_mask);
307         LASSERT(!hlist_unhashed(hnode));
308
309         write_lock(&lhb->lhb_rwlock);
310         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
311         write_unlock(&lhb->lhb_rwlock);
312         read_unlock(&lh->lh_rwlock);
313
314         RETURN(obj);
315 }
316 EXPORT_SYMBOL(lustre_hash_del);
317
318 /**
319  * Delete item given @key in lustre hash @lh.  The first @key found in
320  * the hash will be removed, if the key exists multiple times in the hash
321  * @lh this function must be called once per key.  The removed object
322  * will be returned and ops->lh_put is called on the removed object.
323  */
324 void *
325 lustre_hash_del_key(lustre_hash_t *lh, void *key)
326 {
327         struct hlist_node    *hnode;
328         lustre_hash_bucket_t *lhb;
329         unsigned              i;
330         void                 *obj = NULL;
331         ENTRY;
332
333         read_lock(&lh->lh_rwlock);
334         i = lh_hash(lh, key, lh->lh_cur_mask);
335         lhb = &lh->lh_buckets[i];
336         LASSERT(i <= lh->lh_cur_mask);
337
338         write_lock(&lhb->lhb_rwlock);
339         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
340         if (hnode)
341                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
342
343         write_unlock(&lhb->lhb_rwlock);
344         read_unlock(&lh->lh_rwlock);
345
346         RETURN(obj);
347 }
348 EXPORT_SYMBOL(lustre_hash_del_key);
349
350 /**
351  * Lookup an item using @key in the lustre hash @lh and return it.
352  * If the @key is found in the hash lh->lh_get() is called and the
353  * matching objects is returned.  It is the callers responsibility
354  * to call the counterpart ops->lh_put using the lh_put() macro
355  * when when finished with the object.  If the @key was not found
356  * in the hash @lh NULL is returned.
357  */
358 void *
359 lustre_hash_lookup(lustre_hash_t *lh, void *key)
360 {
361         struct hlist_node    *hnode;
362         lustre_hash_bucket_t *lhb;
363         unsigned              i;
364         void                 *obj = NULL;
365         ENTRY;
366
367         read_lock(&lh->lh_rwlock);
368         i = lh_hash(lh, key, lh->lh_cur_mask);
369         lhb = &lh->lh_buckets[i];
370         LASSERT(i <= lh->lh_cur_mask);
371
372         read_lock(&lhb->lhb_rwlock);
373         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
374         if (hnode)
375                 obj = lh_get(lh, hnode);
376
377         read_unlock(&lhb->lhb_rwlock);
378         read_unlock(&lh->lh_rwlock);
379
380         RETURN(obj);
381 }
382 EXPORT_SYMBOL(lustre_hash_lookup);
383
384 /**
385  * For each item in the lustre hash @lh call the passed callback @func
386  * and pass to it as an argument each hash item and the private @data.
387  * Before each callback ops->lh_get will be called, and after each
388  * callback ops->lh_put will be called.  Finally, during the callback
389  * the bucket lock is held so the callback must never sleep.
390  */
391 void
392 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
393 {
394         struct hlist_node    *hnode;
395         lustre_hash_bucket_t *lhb;
396         void                 *obj;
397         int                   i;
398         ENTRY;
399
400         read_lock(&lh->lh_rwlock);
401         lh_for_each_bucket(lh, lhb, i) {
402                 read_lock(&lhb->lhb_rwlock);
403                 hlist_for_each(hnode, &(lhb->lhb_head)) {
404                         __lustre_hash_bucket_validate(lh, lhb, hnode);
405                         obj = lh_get(lh, hnode);
406                         func(obj, data);
407                         (void)lh_put(lh, hnode);
408                 }
409                 read_unlock(&lhb->lhb_rwlock);
410         }
411         read_unlock(&lh->lh_rwlock);
412
413         EXIT;
414 }
415 EXPORT_SYMBOL(lustre_hash_for_each);
416
417 /**
418  * For each item in the lustre hash @lh call the passed callback @func
419  * and pass to it as an argument each hash item and the private @data.
420  * Before each callback ops->lh_get will be called, and after each
421  * callback ops->lh_put will be called.  During the callback the
422  * bucket lock will not be held will allows for the current item
423  * to be removed from the hash during the callback.  However, care
424  * should be taken to prevent other callers from operating on the
425  * hash concurrently or list corruption may occur.
426  */
427 void
428 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
429 {
430         struct hlist_node    *hnode;
431         struct hlist_node    *pos;
432         lustre_hash_bucket_t *lhb;
433         void                 *obj;
434         int                   i;
435         ENTRY;
436
437         read_lock(&lh->lh_rwlock);
438         lh_for_each_bucket(lh, lhb, i) {
439                 read_lock(&lhb->lhb_rwlock);
440                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
441                         __lustre_hash_bucket_validate(lh, lhb, hnode);
442                         obj = lh_get(lh, hnode);
443                         read_unlock(&lhb->lhb_rwlock);
444                         func(obj, data);
445                         read_lock(&lhb->lhb_rwlock);
446                         (void)lh_put(lh, hnode);
447                 }
448                 read_unlock(&lhb->lhb_rwlock);
449         }
450         read_unlock(&lh->lh_rwlock);
451         EXIT;
452 }
453 EXPORT_SYMBOL(lustre_hash_for_each_safe);
454
455 /**
456  * For each hash bucket in the lustre hash @lh call the passed callback
457  * @func until all the hash buckets are empty.  The passed callback @func
458  * or the previously registered callback lh->lh_put must remove the item
459  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
460  * functions.  No rwlocks will be held during the callback @func it is
461  * safe to sleep if needed.  This function will not terminate until the
462  * hash is empty.  Note it is still possible to concurrently add new
463  * items in to the hash.  It is the callers responsibility to ensure
464  * the required locking is in place to prevent concurrent insertions.
465  */
466 void
467 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
468 {
469         struct hlist_node    *hnode;
470         lustre_hash_bucket_t *lhb;
471         void                 *obj;
472         int                   i;
473         ENTRY;
474
475 restart:
476         read_lock(&lh->lh_rwlock);
477         lh_for_each_bucket(lh, lhb, i) {
478                 write_lock(&lhb->lhb_rwlock);
479                 while (!hlist_empty(&lhb->lhb_head)) {
480                         hnode =  lhb->lhb_head.first;
481                         __lustre_hash_bucket_validate(lh, lhb, hnode);
482                         obj = lh_get(lh, hnode);
483                         write_unlock(&lhb->lhb_rwlock);
484                         read_unlock(&lh->lh_rwlock);
485                         func(obj, data);
486                         (void)lh_put(lh, hnode);
487                         goto restart;
488                 }
489                 write_unlock(&lhb->lhb_rwlock);
490         }
491         read_unlock(&lh->lh_rwlock);
492         EXIT;
493 }
494 EXPORT_SYMBOL(lustre_hash_for_each_empty);
495
496 /*
497  * For each item in the lustre hash @lh which matches the @key call
498  * the passed callback @func and pass to it as an argument each hash
499  * item and the private @data.  Before each callback ops->lh_get will
500  * be called, and after each callback ops->lh_put will be called.
501  * Finally, during the callback the bucket lock is held so the
502  * callback must never sleep.
503    */
504 void
505 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
506                          lh_for_each_cb func, void *data)
507 {
508         struct hlist_node    *hnode;
509         lustre_hash_bucket_t *lhb;
510         unsigned              i;
511         ENTRY;
512
513         read_lock(&lh->lh_rwlock);
514         i = lh_hash(lh, key, lh->lh_cur_mask);
515         lhb = &lh->lh_buckets[i];
516         LASSERT(i <= lh->lh_cur_mask);
517
518         read_lock(&lhb->lhb_rwlock);
519         hlist_for_each(hnode, &(lhb->lhb_head)) {
520                 __lustre_hash_bucket_validate(lh, lhb, hnode);
521
522                 if (!lh_compare(lh, key, hnode))
523                         continue;
524
525                 func(lh_get(lh, hnode), data);
526                 (void)lh_put(lh, hnode);
527         }
528
529         read_unlock(&lhb->lhb_rwlock);
530         read_unlock(&lh->lh_rwlock);
531
532         EXIT;
533 }
534 EXPORT_SYMBOL(lustre_hash_for_each_key);
535
536 /**
537  * Rehash the lustre hash @lh to the given @bits.  This can be used
538  * to grow the hash size when excessive chaining is detected, or to
539  * shrink the hash when it is larger than needed.  When the LH_REHASH
540  * flag is set in @lh the lustre hash may be dynamically rehashed
541  * during addition or removal if the hash's theta value exceeds
542  * either the lh->lh_min_theta or lh->max_theta values.  By default
543  * these values are tuned to keep the chained hash depth small, and
544  * this approach assumes a reasonably uniform hashing function.  The
545  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
546  */
547 int
548 lustre_hash_rehash(lustre_hash_t *lh, int bits)
549 {
550         struct hlist_node     *hnode;
551         struct hlist_node     *pos;
552         lustre_hash_bucket_t  *lh_buckets;
553         lustre_hash_bucket_t  *rehash_buckets;
554         lustre_hash_bucket_t  *lh_lhb;
555         lustre_hash_bucket_t  *rehash_lhb;
556         int                    i;
557         int                    theta;
558         int                    lh_mask;
559         int                    lh_bits;
560         int                    mask = (1 << bits) - 1;
561         void                  *key;
562         ENTRY;
563
564         LASSERT(!in_interrupt());
565         LASSERT(mask > 0);
566
567         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
568         if (!rehash_buckets)
569                 RETURN(-ENOMEM);
570
571         for (i = 0; i <= mask; i++) {
572                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
573                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
574                 atomic_set(&rehash_buckets[i].lhb_count, 0);
575         }
576
577         write_lock(&lh->lh_rwlock);
578
579         /*
580          * Early return for multiple concurrent racing callers,
581          * ensure we only trigger the rehash if it is still needed.
582          */
583         theta = __lustre_hash_theta(lh);
584         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
585                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
586                 write_unlock(&lh->lh_rwlock);
587                 RETURN(-EALREADY);
588         }
589
590         lh_bits = lh->lh_cur_bits;
591         lh_buckets = lh->lh_buckets;
592         lh_mask = (1 << lh_bits) - 1;
593
594         lh->lh_cur_bits = bits;
595         lh->lh_cur_mask = (1 << bits) - 1;
596         lh->lh_buckets = rehash_buckets;
597         atomic_inc(&lh->lh_rehash_count);
598
599         for (i = 0; i <= lh_mask; i++) {
600                 lh_lhb = &lh_buckets[i];
601
602                 write_lock(&lh_lhb->lhb_rwlock);
603                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
604                         key = lh_key(lh, hnode);
605                         LASSERT(key);
606
607                         /*
608                          * Validate hnode is in the correct bucket.
609                          */
610                         if (unlikely(lh->lh_flags & LH_DEBUG))
611                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
612
613                         /*
614                          * Delete from old hash bucket.
615                          */
616                         hlist_del(hnode);
617                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
618                         atomic_dec(&lh_lhb->lhb_count);
619
620                         /*
621                          * Add to rehash bucket, ops->lh_key must be defined.
622                          */
623                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, mask)];
624                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
625                         atomic_inc(&rehash_lhb->lhb_count);
626                 }
627
628                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
629                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
630                 write_unlock(&lh_lhb->lhb_rwlock);
631         }
632
633         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) << lh_bits);
634         write_unlock(&lh->lh_rwlock);
635
636         RETURN(0);
637 }
638 EXPORT_SYMBOL(lustre_hash_rehash);
639
640 /**
641  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
642  * @old_key must be provided to locate the objects previous location
643  * in the hash, and the @new_key will be used to reinsert the object.
644  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
645  * combo when it is critical that there is no window in time where the
646  * object is missing from the hash.  When an object is being rehashed
647  * the registered lh_get() and lh_put() functions will not be called.
648  */
649 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
650                             struct hlist_node *hnode)
651 {
652         lustre_hash_bucket_t  *old_lhb;
653         lustre_hash_bucket_t  *new_lhb;
654         unsigned               i;
655         int                    j;
656         ENTRY;
657
658         __lustre_hash_key_validate(lh, new_key, hnode);
659         LASSERT(!hlist_unhashed(hnode));
660
661         read_lock(&lh->lh_rwlock);
662
663         i = lh_hash(lh, old_key, lh->lh_cur_mask);
664         old_lhb = &lh->lh_buckets[i];
665         LASSERT(i <= lh->lh_cur_mask);
666
667         j = lh_hash(lh, new_key, lh->lh_cur_mask);
668         new_lhb = &lh->lh_buckets[j];
669         LASSERT(j <= lh->lh_cur_mask);
670
671         write_lock(&old_lhb->lhb_rwlock);
672         write_lock(&new_lhb->lhb_rwlock);
673
674         /*
675          * Migrate item between hash buckets without calling
676          * the lh_get() and lh_put() callback functions.
677          */
678         hlist_del(hnode);
679         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
680         atomic_dec(&old_lhb->lhb_count);
681         hlist_add_head(hnode, &(new_lhb->lhb_head));
682         atomic_inc(&new_lhb->lhb_count);
683
684         write_unlock(&new_lhb->lhb_rwlock);
685         write_unlock(&old_lhb->lhb_rwlock);
686         read_unlock(&lh->lh_rwlock);
687
688         EXIT;
689 }
690 EXPORT_SYMBOL(lustre_hash_rehash_key);
691
692 int lustre_hash_debug_header(char *str, int size)
693 {
694         return snprintf(str, size,
695                  "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
696                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
697                  "flags", "rehash", "count", " distribution");
698 }
699 EXPORT_SYMBOL(lustre_hash_debug_header);
700
701 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
702 {
703         lustre_hash_bucket_t  *lhb;
704         int                    theta;
705         int                    i;
706         int                    c = 0;
707         int                    dist[8] = { 0, };
708
709         if (str == NULL || size == 0)
710                 return 0;
711
712         read_lock(&lh->lh_rwlock);
713         theta = __lustre_hash_theta(lh);
714
715         c += snprintf(str + c, size - c, "%-*s ",
716                       LUSTRE_MAX_HASH_NAME, lh->lh_name);
717         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
718         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
719         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
720         c += snprintf(str + c, size - c, "%d.%03d ",
721                       __lustre_hash_theta_int(theta),
722                       __lustre_hash_theta_frac(theta));
723         c += snprintf(str + c, size - c, "%d.%03d ",
724                       __lustre_hash_theta_int(lh->lh_min_theta),
725                       __lustre_hash_theta_frac(lh->lh_min_theta));
726         c += snprintf(str + c, size - c, "%d.%03d ",
727                       __lustre_hash_theta_int(lh->lh_max_theta),
728                       __lustre_hash_theta_frac(lh->lh_max_theta));
729         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
730         c += snprintf(str + c, size - c, "%6d ",
731                       atomic_read(&lh->lh_rehash_count));
732         c += snprintf(str + c, size - c, "%5d ",
733                       atomic_read(&lh->lh_count));
734
735         /*
736          * The distribution is a summary of the chained hash depth in
737          * each of the lustre hash buckets.  Each buckets lhb_count is
738          * divided by the hash theta value and used to generate a
739          * histogram of the hash distribution.  A uniform hash will
740          * result in all hash buckets being close to the average thus
741          * only the first few entries in the histogram will be non-zero.
742          * If you hash function results in a non-uniform hash the will
743          * be observable by outlier bucks in the distribution histogram.
744          *
745          * Uniform hash distribution:      128/128/0/0/0/0/0/0
746          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
747          */
748         lh_for_each_bucket(lh, lhb, i)
749                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
750
751         for (i = 0; i < 8; i++)
752                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
753                               (i == 7) ? '\n' : '/');
754
755         read_unlock(&lh->lh_rwlock);
756
757         return c;
758 }
759 EXPORT_SYMBOL(lustre_hash_debug_str);