Whamcloud - gitweb
682cdc8700e83a1b79d086ae9a12f26d11b58242
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 static void
59 lh_read_lock(lustre_hash_t *lh)
60 {
61         if ((lh->lh_flags & LH_REHASH) != 0)
62                 read_lock(&lh->lh_rwlock);
63 }
64
65 static void
66 lh_read_unlock(lustre_hash_t *lh)
67 {
68         if ((lh->lh_flags & LH_REHASH) != 0)
69                 read_unlock(&lh->lh_rwlock);
70 }
71
72 static void
73 lh_write_lock(lustre_hash_t *lh)
74 {
75         if ((lh->lh_flags & LH_REHASH) != 0)
76                 write_lock(&lh->lh_rwlock);
77 }
78
79 static void
80 lh_write_unlock(lustre_hash_t *lh)
81 {
82         if ((lh->lh_flags & LH_REHASH) != 0)
83                 write_unlock(&lh->lh_rwlock);
84 }
85
86 /**
87  * Initialize new lustre hash, where:
88  * @name     - Descriptive hash name
89  * @cur_bits - Initial hash table size, in bits
90  * @max_bits - Maximum allowed hash table resize, in bits
91  * @ops      - Registered hash table operations
92  * @flags    - LH_REHASH enable synamic hash resizing
93  *           - LH_SORT enable chained hash sort
94  */
95 lustre_hash_t *
96 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
97                  lustre_hash_ops_t *ops, int flags)
98 {
99         lustre_hash_t *lh;
100         int            i;
101         ENTRY;
102
103         LASSERT(name != NULL);
104         LASSERT(ops != NULL);
105
106         LASSERT(cur_bits > 0);
107         LASSERT(max_bits >= cur_bits);
108         LASSERT(max_bits < 31);
109
110         LIBCFS_ALLOC_PTR(lh);
111         if (!lh)
112                 RETURN(NULL);
113
114         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
115         lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
116         atomic_set(&lh->lh_rehash_count, 0);
117         atomic_set(&lh->lh_count, 0);
118         rwlock_init(&lh->lh_rwlock);
119         lh->lh_cur_bits = cur_bits;
120         lh->lh_cur_mask = (1 << cur_bits) - 1;
121         lh->lh_min_bits = cur_bits;
122         lh->lh_max_bits = max_bits;
123         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
124          *      anything other than 0.5 and 2.0 */
125         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
126         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
127         lh->lh_ops = ops;
128         lh->lh_flags = flags;
129         if (cur_bits != max_bits && (lh->lh_flags & LH_REHASH) == 0)
130                 CERROR("Rehash is disabled for %s, ignore max_bits %d\n",
131                        name, max_bits);
132
133         /* theta * 1000 */
134         __lustre_hash_set_theta(lh, 500, 2000);
135
136         LIBCFS_ALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
137         if (!lh->lh_buckets) {
138                 LIBCFS_FREE_PTR(lh);
139                 RETURN(NULL);
140         }
141
142         for (i = 0; i <= lh->lh_cur_mask; i++) {
143                 LIBCFS_ALLOC(lh->lh_buckets[i], sizeof(lustre_hash_bucket_t));
144                 if (lh->lh_buckets[i] == NULL) {
145                         lustre_hash_exit(lh);
146                         return NULL;
147                 }
148
149                 INIT_HLIST_HEAD(&lh->lh_buckets[i]->lhb_head);
150                 rwlock_init(&lh->lh_buckets[i]->lhb_rwlock);
151                 atomic_set(&lh->lh_buckets[i]->lhb_count, 0);
152         }
153
154         return lh;
155 }
156 EXPORT_SYMBOL(lustre_hash_init);
157
158 /**
159  * Cleanup lustre hash @lh.
160  */
161 void
162 lustre_hash_exit(lustre_hash_t *lh)
163 {
164         lustre_hash_bucket_t *lhb;
165         struct hlist_node    *hnode;
166         struct hlist_node    *pos;
167         int                   i;
168         ENTRY;
169
170         LASSERT(lh != NULL);
171
172         lh_write_lock(lh);
173
174         lh_for_each_bucket(lh, lhb, i) {
175                 if (lhb == NULL)
176                         continue;
177
178                 write_lock(&lhb->lhb_rwlock);
179                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
180                         __lustre_hash_bucket_validate(lh, lhb, hnode);
181                         __lustre_hash_bucket_del(lh, lhb, hnode);
182                         lh_exit(lh, hnode);
183                 }
184
185                 LASSERT(hlist_empty(&(lhb->lhb_head)));
186                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
187                 write_unlock(&lhb->lhb_rwlock);
188                 LIBCFS_FREE_PTR(lhb);
189         }
190
191         LASSERT(atomic_read(&lh->lh_count) == 0);
192         lh_write_unlock(lh);
193
194         LIBCFS_FREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
195         LIBCFS_FREE_PTR(lh);
196         EXIT;
197 }
198 EXPORT_SYMBOL(lustre_hash_exit);
199
200 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
201 {
202         if (!(lh->lh_flags & LH_REHASH))
203                 return 0;
204
205         /* XXX: need to handle case with max_theta != 2.0
206          *      and the case with min_theta != 0.5 */
207         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
208             (__lustre_hash_theta(lh) > lh->lh_max_theta))
209                 return lh->lh_cur_bits + 1;
210
211         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
212             (__lustre_hash_theta(lh) < lh->lh_min_theta))
213                 return lh->lh_cur_bits - 1;
214
215         return 0;
216 }
217
218 /**
219  * Add item @hnode to lustre hash @lh using @key.  The registered
220  * ops->lh_get function will be called when the item is added.
221  */
222 void
223 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
224 {
225         lustre_hash_bucket_t *lhb;
226         int                   bits;
227         unsigned              i;
228         ENTRY;
229
230         __lustre_hash_key_validate(lh, key, hnode);
231
232         lh_read_lock(lh);
233         i = lh_hash(lh, key, lh->lh_cur_mask);
234         lhb = lh->lh_buckets[i];
235         LASSERT(i <= lh->lh_cur_mask);
236         LASSERT(hlist_unhashed(hnode));
237
238         write_lock(&lhb->lhb_rwlock);
239         __lustre_hash_bucket_add(lh, lhb, hnode);
240         write_unlock(&lhb->lhb_rwlock);
241
242         bits = lustre_hash_rehash_bits(lh);
243         lh_read_unlock(lh);
244         if (bits)
245                 lustre_hash_rehash(lh, bits);
246
247         EXIT;
248 }
249 EXPORT_SYMBOL(lustre_hash_add);
250
251 static struct hlist_node *
252 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
253                                  struct hlist_node *hnode)
254 {
255         int                   bits = 0;
256         struct hlist_node    *ehnode;
257         lustre_hash_bucket_t *lhb;
258         unsigned              i;
259         ENTRY;
260
261         __lustre_hash_key_validate(lh, key, hnode);
262
263         lh_read_lock(lh);
264         i = lh_hash(lh, key, lh->lh_cur_mask);
265         lhb = lh->lh_buckets[i];
266         LASSERT(i <= lh->lh_cur_mask);
267         LASSERT(hlist_unhashed(hnode));
268
269         write_lock(&lhb->lhb_rwlock);
270         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
271         if (ehnode) {
272                 lh_get(lh, ehnode);
273         } else {
274                 __lustre_hash_bucket_add(lh, lhb, hnode);
275                 ehnode = hnode;
276                 bits = lustre_hash_rehash_bits(lh);
277         }
278         write_unlock(&lhb->lhb_rwlock);
279         lh_read_unlock(lh);
280         if (bits)
281                 lustre_hash_rehash(lh, bits);
282
283         RETURN(ehnode);
284 }
285
286 /**
287  * Add item @hnode to lustre hash @lh using @key.  The registered
288  * ops->lh_get function will be called if the item was added.
289  * Returns 0 on success or -EALREADY on key collisions.
290  */
291 int
292 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
293 {
294         struct hlist_node    *ehnode;
295         ENTRY;
296
297         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
298         if (ehnode != hnode) {
299                 lh_put(lh, ehnode);
300                 RETURN(-EALREADY);
301         }
302         RETURN(0);
303 }
304 EXPORT_SYMBOL(lustre_hash_add_unique);
305
306 /**
307  * Add item @hnode to lustre hash @lh using @key.  If this @key
308  * already exists in the hash then ops->lh_get will be called on the
309  * conflicting entry and that entry will be returned to the caller.
310  * Otherwise ops->lh_get is called on the item which was added.
311  */
312 void *
313 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
314                            struct hlist_node *hnode)
315 {
316         struct hlist_node    *ehnode;
317         void                 *obj;
318         ENTRY;
319
320         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
321         obj = lh_get(lh, ehnode);
322         lh_put(lh, ehnode);
323         RETURN(obj);
324 }
325 EXPORT_SYMBOL(lustre_hash_findadd_unique);
326
327 /**
328  * Delete item @hnode from the lustre hash @lh using @key.  The @key
329  * is required to ensure the correct hash bucket is locked since there
330  * is no direct linkage from the item to the bucket.  The object
331  * removed from the hash will be returned and obs->lh_put is called
332  * on the removed object.
333  */
334 void *
335 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
336 {
337         lustre_hash_bucket_t *lhb;
338         unsigned              i;
339         void                 *obj;
340         ENTRY;
341
342         __lustre_hash_key_validate(lh, key, hnode);
343
344         lh_read_lock(lh);
345         i = lh_hash(lh, key, lh->lh_cur_mask);
346         lhb = lh->lh_buckets[i];
347         LASSERT(i <= lh->lh_cur_mask);
348         LASSERT(!hlist_unhashed(hnode));
349
350         write_lock(&lhb->lhb_rwlock);
351         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
352         write_unlock(&lhb->lhb_rwlock);
353         lh_read_unlock(lh);
354
355         RETURN(obj);
356 }
357 EXPORT_SYMBOL(lustre_hash_del);
358
359 /**
360  * Delete item given @key in lustre hash @lh.  The first @key found in
361  * the hash will be removed, if the key exists multiple times in the hash
362  * @lh this function must be called once per key.  The removed object
363  * will be returned and ops->lh_put is called on the removed object.
364  */
365 void *
366 lustre_hash_del_key(lustre_hash_t *lh, void *key)
367 {
368         struct hlist_node    *hnode;
369         lustre_hash_bucket_t *lhb;
370         unsigned              i;
371         void                 *obj = NULL;
372         ENTRY;
373
374         lh_read_lock(lh);
375         i = lh_hash(lh, key, lh->lh_cur_mask);
376         lhb = lh->lh_buckets[i];
377         LASSERT(i <= lh->lh_cur_mask);
378
379         write_lock(&lhb->lhb_rwlock);
380         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
381         if (hnode)
382                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
383
384         write_unlock(&lhb->lhb_rwlock);
385         lh_read_unlock(lh);
386
387         RETURN(obj);
388 }
389 EXPORT_SYMBOL(lustre_hash_del_key);
390
391 /**
392  * Lookup an item using @key in the lustre hash @lh and return it.
393  * If the @key is found in the hash lh->lh_get() is called and the
394  * matching objects is returned.  It is the callers responsibility
395  * to call the counterpart ops->lh_put using the lh_put() macro
396  * when when finished with the object.  If the @key was not found
397  * in the hash @lh NULL is returned.
398  */
399 void *
400 lustre_hash_lookup(lustre_hash_t *lh, void *key)
401 {
402         struct hlist_node    *hnode;
403         lustre_hash_bucket_t *lhb;
404         unsigned              i;
405         void                 *obj = NULL;
406         ENTRY;
407
408         lh_read_lock(lh);
409         i = lh_hash(lh, key, lh->lh_cur_mask);
410         lhb = lh->lh_buckets[i];
411         LASSERT(i <= lh->lh_cur_mask);
412
413         read_lock(&lhb->lhb_rwlock);
414         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
415         if (hnode)
416                 obj = lh_get(lh, hnode);
417
418         read_unlock(&lhb->lhb_rwlock);
419         lh_read_unlock(lh);
420
421         RETURN(obj);
422 }
423 EXPORT_SYMBOL(lustre_hash_lookup);
424
425 /**
426  * For each item in the lustre hash @lh call the passed callback @func
427  * and pass to it as an argument each hash item and the private @data.
428  * Before each callback ops->lh_get will be called, and after each
429  * callback ops->lh_put will be called.  Finally, during the callback
430  * the bucket lock is held so the callback must never sleep.
431  */
432 void
433 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
434 {
435         struct hlist_node    *hnode;
436         lustre_hash_bucket_t *lhb;
437         void                 *obj;
438         int                   i;
439         ENTRY;
440
441         lh_read_lock(lh);
442         lh_for_each_bucket(lh, lhb, i) {
443                 read_lock(&lhb->lhb_rwlock);
444                 hlist_for_each(hnode, &(lhb->lhb_head)) {
445                         __lustre_hash_bucket_validate(lh, lhb, hnode);
446                         obj = lh_get(lh, hnode);
447                         func(obj, data);
448                         (void)lh_put(lh, hnode);
449                 }
450                 read_unlock(&lhb->lhb_rwlock);
451         }
452         lh_read_unlock(lh);
453
454         EXIT;
455 }
456 EXPORT_SYMBOL(lustre_hash_for_each);
457
458 /**
459  * For each item in the lustre hash @lh call the passed callback @func
460  * and pass to it as an argument each hash item and the private @data.
461  * Before each callback ops->lh_get will be called, and after each
462  * callback ops->lh_put will be called.  During the callback the
463  * bucket lock will not be held will allows for the current item
464  * to be removed from the hash during the callback.  However, care
465  * should be taken to prevent other callers from operating on the
466  * hash concurrently or list corruption may occur.
467  */
468 void
469 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
470 {
471         struct hlist_node    *hnode;
472         struct hlist_node    *pos;
473         lustre_hash_bucket_t *lhb;
474         void                 *obj;
475         int                   i;
476         ENTRY;
477
478         lh_read_lock(lh);
479         lh_for_each_bucket(lh, lhb, i) {
480                 read_lock(&lhb->lhb_rwlock);
481                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
482                         __lustre_hash_bucket_validate(lh, lhb, hnode);
483                         obj = lh_get(lh, hnode);
484                         read_unlock(&lhb->lhb_rwlock);
485                         func(obj, data);
486                         read_lock(&lhb->lhb_rwlock);
487                         (void)lh_put(lh, hnode);
488                 }
489                 read_unlock(&lhb->lhb_rwlock);
490         }
491         lh_read_unlock(lh);
492         EXIT;
493 }
494 EXPORT_SYMBOL(lustre_hash_for_each_safe);
495
496 /**
497  * For each hash bucket in the lustre hash @lh call the passed callback
498  * @func until all the hash buckets are empty.  The passed callback @func
499  * or the previously registered callback lh->lh_put must remove the item
500  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
501  * functions.  No rwlocks will be held during the callback @func it is
502  * safe to sleep if needed.  This function will not terminate until the
503  * hash is empty.  Note it is still possible to concurrently add new
504  * items in to the hash.  It is the callers responsibility to ensure
505  * the required locking is in place to prevent concurrent insertions.
506  */
507 void
508 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
509 {
510         struct hlist_node    *hnode;
511         lustre_hash_bucket_t *lhb;
512         lustre_hash_bucket_t **lhb_last = NULL;
513         void                 *obj;
514         int                   i = 0;
515         ENTRY;
516
517 restart:
518         lh_read_lock(lh);
519         /* If the hash table has changed since we last held lh_rwlock,
520          * we need to start traversing the list from the start. */
521         if (lh->lh_buckets != lhb_last) {
522                 i = 0;
523                 lhb_last = lh->lh_buckets;
524         }
525         lh_for_each_bucket_restart(lh, lhb, i) {
526                 write_lock(&lhb->lhb_rwlock);
527                 while (!hlist_empty(&lhb->lhb_head)) {
528                         hnode =  lhb->lhb_head.first;
529                         __lustre_hash_bucket_validate(lh, lhb, hnode);
530                         obj = lh_get(lh, hnode);
531                         write_unlock(&lhb->lhb_rwlock);
532                         lh_read_unlock(lh);
533                         func(obj, data);
534                         (void)lh_put(lh, hnode);
535                         cfs_cond_resched();
536                         goto restart;
537                 }
538                 write_unlock(&lhb->lhb_rwlock);
539         }
540         lh_read_unlock(lh);
541         EXIT;
542 }
543 EXPORT_SYMBOL(lustre_hash_for_each_empty);
544
545 /*
546  * For each item in the lustre hash @lh which matches the @key call
547  * the passed callback @func and pass to it as an argument each hash
548  * item and the private @data.  Before each callback ops->lh_get will
549  * be called, and after each callback ops->lh_put will be called.
550  * Finally, during the callback the bucket lock is held so the
551  * callback must never sleep.
552    */
553 void
554 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
555                          lh_for_each_cb func, void *data)
556 {
557         struct hlist_node    *hnode;
558         lustre_hash_bucket_t *lhb;
559         unsigned              i;
560         ENTRY;
561
562         lh_read_lock(lh);
563         i = lh_hash(lh, key, lh->lh_cur_mask);
564         lhb = lh->lh_buckets[i];
565         LASSERT(i <= lh->lh_cur_mask);
566
567         read_lock(&lhb->lhb_rwlock);
568         hlist_for_each(hnode, &(lhb->lhb_head)) {
569                 __lustre_hash_bucket_validate(lh, lhb, hnode);
570
571                 if (!lh_compare(lh, key, hnode))
572                         continue;
573
574                 func(lh_get(lh, hnode), data);
575                 (void)lh_put(lh, hnode);
576         }
577
578         read_unlock(&lhb->lhb_rwlock);
579         lh_read_unlock(lh);
580
581         EXIT;
582 }
583 EXPORT_SYMBOL(lustre_hash_for_each_key);
584
585 /**
586  * Rehash the lustre hash @lh to the given @bits.  This can be used
587  * to grow the hash size when excessive chaining is detected, or to
588  * shrink the hash when it is larger than needed.  When the LH_REHASH
589  * flag is set in @lh the lustre hash may be dynamically rehashed
590  * during addition or removal if the hash's theta value exceeds
591  * either the lh->lh_min_theta or lh->max_theta values.  By default
592  * these values are tuned to keep the chained hash depth small, and
593  * this approach assumes a reasonably uniform hashing function.  The
594  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
595  */
596 int
597 lustre_hash_rehash(lustre_hash_t *lh, int bits)
598 {
599         struct hlist_node     *hnode;
600         struct hlist_node     *pos;
601         lustre_hash_bucket_t **lh_buckets;
602         lustre_hash_bucket_t **rehash_buckets;
603         lustre_hash_bucket_t  *lh_lhb;
604         lustre_hash_bucket_t  *rehash_lhb;
605         int                    i;
606         int                    theta;
607         int                    lh_mask;
608         int                    lh_bits;
609         int                    mask = (1 << bits) - 1;
610         int                    rc = 0;
611         void                  *key;
612         ENTRY;
613
614         LASSERT(!in_interrupt());
615         LASSERT(mask > 0);
616         LASSERT((lh->lh_flags & LH_REHASH) != 0);
617
618         LIBCFS_ALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
619         if (!rehash_buckets)
620                 RETURN(-ENOMEM);
621
622         for (i = 0; i <= mask; i++) {
623                 LIBCFS_ALLOC(rehash_buckets[i], sizeof(*rehash_buckets[i]));
624                 if (rehash_buckets[i] == NULL)
625                         GOTO(free, rc = -ENOMEM);
626
627                 INIT_HLIST_HEAD(&rehash_buckets[i]->lhb_head);
628                 rwlock_init(&rehash_buckets[i]->lhb_rwlock);
629                 atomic_set(&rehash_buckets[i]->lhb_count, 0);
630         }
631
632         lh_write_lock(lh);
633
634         /*
635          * Early return for multiple concurrent racing callers,
636          * ensure we only trigger the rehash if it is still needed.
637          */
638         theta = __lustre_hash_theta(lh);
639         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
640                 lh_write_unlock(lh);
641                 GOTO(free, rc = -EALREADY);
642         }
643
644         lh_bits = lh->lh_cur_bits;
645         lh_buckets = lh->lh_buckets;
646         lh_mask = (1 << lh_bits) - 1;
647
648         lh->lh_cur_bits = bits;
649         lh->lh_cur_mask = (1 << bits) - 1;
650         lh->lh_buckets = rehash_buckets;
651         atomic_inc(&lh->lh_rehash_count);
652
653         for (i = 0; i <= lh_mask; i++) {
654                 lh_lhb = lh_buckets[i];
655
656                 write_lock(&lh_lhb->lhb_rwlock);
657                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
658                         key = lh_key(lh, hnode);
659                         LASSERT(key);
660
661                         /*
662                          * Validate hnode is in the correct bucket.
663                          */
664                         if (unlikely(lh->lh_flags & LH_DEBUG))
665                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
666
667                         /*
668                          * Delete from old hash bucket.
669                          */
670                         hlist_del(hnode);
671                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
672                         atomic_dec(&lh_lhb->lhb_count);
673
674                         /*
675                          * Add to rehash bucket, ops->lh_key must be defined.
676                          */
677                         rehash_lhb = rehash_buckets[lh_hash(lh, key, mask)];
678                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
679                         atomic_inc(&rehash_lhb->lhb_count);
680                 }
681
682                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
683                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
684                 write_unlock(&lh_lhb->lhb_rwlock);
685         }
686
687         lh_write_unlock(lh);
688         rehash_buckets = lh_buckets;
689         i = (1 << lh_bits);
690         bits = lh_bits;
691 free:
692         while (i-- > 0)
693                 LIBCFS_FREE(rehash_buckets[i], sizeof(*rehash_buckets[i]));
694         LIBCFS_FREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
695
696         RETURN(rc);
697 }
698 EXPORT_SYMBOL(lustre_hash_rehash);
699
700 /**
701  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
702  * @old_key must be provided to locate the objects previous location
703  * in the hash, and the @new_key will be used to reinsert the object.
704  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
705  * combo when it is critical that there is no window in time where the
706  * object is missing from the hash.  When an object is being rehashed
707  * the registered lh_get() and lh_put() functions will not be called.
708  */
709 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
710                             struct hlist_node *hnode)
711 {
712         lustre_hash_bucket_t  *old_lhb;
713         lustre_hash_bucket_t  *new_lhb;
714         unsigned               i;
715         unsigned               j;
716         ENTRY;
717
718         __lustre_hash_key_validate(lh, new_key, hnode);
719         LASSERT(!hlist_unhashed(hnode));
720
721         lh_read_lock(lh);
722
723         i = lh_hash(lh, old_key, lh->lh_cur_mask);
724         old_lhb = lh->lh_buckets[i];
725         LASSERT(i <= lh->lh_cur_mask);
726
727         j = lh_hash(lh, new_key, lh->lh_cur_mask);
728         new_lhb = lh->lh_buckets[j];
729         LASSERT(j <= lh->lh_cur_mask);
730
731         if (i < j) { /* write_lock ordering */
732                 write_lock(&old_lhb->lhb_rwlock);
733                 write_lock(&new_lhb->lhb_rwlock);
734         } else if (i > j) {
735                 write_lock(&new_lhb->lhb_rwlock);
736                 write_lock(&old_lhb->lhb_rwlock);
737         } else { /* do nothing */
738                 read_unlock(&lh->lh_rwlock);
739                 EXIT;
740                 return;
741         }
742
743         /*
744          * Migrate item between hash buckets without calling
745          * the lh_get() and lh_put() callback functions.
746          */
747         hlist_del(hnode);
748         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
749         atomic_dec(&old_lhb->lhb_count);
750         hlist_add_head(hnode, &(new_lhb->lhb_head));
751         atomic_inc(&new_lhb->lhb_count);
752
753         write_unlock(&new_lhb->lhb_rwlock);
754         write_unlock(&old_lhb->lhb_rwlock);
755         lh_read_unlock(lh);
756
757         EXIT;
758 }
759 EXPORT_SYMBOL(lustre_hash_rehash_key);
760
761 int lustre_hash_debug_header(char *str, int size)
762 {
763         return snprintf(str, size,
764                  "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
765                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
766                  "flags", "rehash", "count", " distribution");
767 }
768 EXPORT_SYMBOL(lustre_hash_debug_header);
769
770 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
771 {
772         lustre_hash_bucket_t  *lhb;
773         int                    theta;
774         int                    i;
775         int                    c = 0;
776         int                    dist[8] = { 0, };
777
778         if (str == NULL || size == 0)
779                 return 0;
780
781         lh_read_lock(lh);
782         theta = __lustre_hash_theta(lh);
783
784         c += snprintf(str + c, size - c, "%-*s ",
785                       LUSTRE_MAX_HASH_NAME, lh->lh_name);
786         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
787         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
788         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
789         c += snprintf(str + c, size - c, "%d.%03d ",
790                       __lustre_hash_theta_int(theta),
791                       __lustre_hash_theta_frac(theta));
792         c += snprintf(str + c, size - c, "%d.%03d ",
793                       __lustre_hash_theta_int(lh->lh_min_theta),
794                       __lustre_hash_theta_frac(lh->lh_min_theta));
795         c += snprintf(str + c, size - c, "%d.%03d ",
796                       __lustre_hash_theta_int(lh->lh_max_theta),
797                       __lustre_hash_theta_frac(lh->lh_max_theta));
798         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
799         c += snprintf(str + c, size - c, "%6d ",
800                       atomic_read(&lh->lh_rehash_count));
801         c += snprintf(str + c, size - c, "%5d ",
802                       atomic_read(&lh->lh_count));
803
804         /*
805          * The distribution is a summary of the chained hash depth in
806          * each of the lustre hash buckets.  Each buckets lhb_count is
807          * divided by the hash theta value and used to generate a
808          * histogram of the hash distribution.  A uniform hash will
809          * result in all hash buckets being close to the average thus
810          * only the first few entries in the histogram will be non-zero.
811          * If you hash function results in a non-uniform hash the will
812          * be observable by outlier bucks in the distribution histogram.
813          *
814          * Uniform hash distribution:      128/128/0/0/0/0/0/0
815          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
816          */
817         lh_for_each_bucket(lh, lhb, i)
818                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
819
820         for (i = 0; i < 8; i++)
821                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
822                               (i == 7) ? '\n' : '/');
823
824         lh_read_unlock(lh);
825
826         return c;
827 }
828 EXPORT_SYMBOL(lustre_hash_debug_str);