Whamcloud - gitweb
b=17511
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_size - Initial hash table size
62  * @max_size - Maximum allowed hash table resize
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         /*
79          * Ensure hash is a power of two to allow the use of a bitmask
80          * in the hash function instead of a more expensive modulus.
81          */
82         LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0,
83                  "Size (%u) is not power of 2\n", cur_size);
84         LASSERTF(max_size && (max_size & (max_size - 1)) == 0,
85                  "Size (%u) is not power of 2\n", max_size);
86
87         OBD_ALLOC_PTR(lh);
88         if (!lh)
89                 RETURN(NULL);
90
91         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
92         atomic_set(&lh->lh_rehash_count, 0);
93         atomic_set(&lh->lh_count, 0);
94         rwlock_init(&lh->lh_rwlock);
95         lh->lh_cur_size = cur_size;
96         lh->lh_min_size = cur_size;
97         lh->lh_max_size = max_size;
98         lh->lh_ops = ops;
99         lh->lh_flags = flags;
100
101         /* theta * 1000 */
102         __lustre_hash_set_theta(lh, 500, 2000);
103
104         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
105         if (!lh->lh_buckets) {
106                 OBD_FREE_PTR(lh);
107                 RETURN(NULL);
108         }
109
110         for (i = 0; i < lh->lh_cur_size; i++) {
111                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
112                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
113                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
114         }
115
116         return lh;
117 }
118 EXPORT_SYMBOL(lustre_hash_init);
119
120 /**
121  * Cleanup lustre hash @lh.
122  */
123 void
124 lustre_hash_exit(lustre_hash_t *lh)
125 {
126         lustre_hash_bucket_t *lhb;
127         struct hlist_node    *hnode;
128         struct hlist_node    *pos;
129         int                   i;
130         ENTRY;
131
132         LASSERT(lh != NULL);
133
134         write_lock(&lh->lh_rwlock);
135
136         lh_for_each_bucket(lh, lhb, i) {
137                 write_lock(&lhb->lhb_rwlock);
138                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
139                         __lustre_hash_bucket_validate(lh, lhb, hnode);
140                         __lustre_hash_bucket_del(lh, lhb, hnode);
141                         lh_exit(lh, hnode);
142                 }
143
144                 LASSERT(hlist_empty(&(lhb->lhb_head)));
145                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
146                 write_unlock(&lhb->lhb_rwlock);
147         }
148
149         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
150         LASSERT(atomic_read(&lh->lh_count) == 0);
151         write_unlock(&lh->lh_rwlock);
152
153         OBD_FREE_PTR(lh);
154         EXIT;
155 }
156 EXPORT_SYMBOL(lustre_hash_exit);
157
158 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
159 {
160         int size;
161
162         if (!(lh->lh_flags & LH_REHASH))
163                 return 0;
164
165         if ((lh->lh_cur_size < lh->lh_max_size) &&
166             (__lustre_hash_theta(lh) > lh->lh_max_theta))
167                 size = min(lh->lh_cur_size * 2, lh->lh_max_size);
168         else if ((lh->lh_cur_size > lh->lh_min_size) &&
169             (__lustre_hash_theta(lh) < lh->lh_min_theta))
170                 size = max(lh->lh_cur_size / 2, lh->lh_min_size);
171         else
172                 size = 0;
173
174         if (lh->lh_cur_size == size)
175                 size = 0;
176
177         return size;
178 }
179
180 /**
181  * Add item @hnode to lustre hash @lh using @key.  The registered
182  * ops->lh_get function will be called when the item is added.
183  */
184 void
185 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
186 {
187         lustre_hash_bucket_t *lhb;
188         int                   size;
189         unsigned              i;
190         ENTRY;
191
192         __lustre_hash_key_validate(lh, key, hnode);
193
194         read_lock(&lh->lh_rwlock);
195         i = lh_hash(lh, key, lh->lh_cur_size - 1);
196         lhb = &lh->lh_buckets[i];
197         LASSERT(i < lh->lh_cur_size);
198         LASSERT(hlist_unhashed(hnode));
199
200         write_lock(&lhb->lhb_rwlock);
201         __lustre_hash_bucket_add(lh, lhb, hnode);
202         write_unlock(&lhb->lhb_rwlock);
203
204         size = lustre_hash_rehash_size(lh);
205         read_unlock(&lh->lh_rwlock);
206         if (size)
207                 lustre_hash_rehash(lh, size);
208
209         EXIT;
210 }
211 EXPORT_SYMBOL(lustre_hash_add);
212
213 static struct hlist_node *
214 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
215                                  struct hlist_node *hnode)
216 {
217         int                   size = 0;
218         struct hlist_node    *ehnode;
219         lustre_hash_bucket_t *lhb;
220         unsigned              i;
221         ENTRY;
222
223         __lustre_hash_key_validate(lh, key, hnode);
224
225         read_lock(&lh->lh_rwlock);
226         i = lh_hash(lh, key, lh->lh_cur_size - 1);
227         lhb = &lh->lh_buckets[i];
228         LASSERT(i < lh->lh_cur_size);
229         LASSERT(hlist_unhashed(hnode));
230
231         write_lock(&lhb->lhb_rwlock);
232         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
233         if (ehnode) {
234                 lh_get(lh, ehnode);
235         } else {
236                 __lustre_hash_bucket_add(lh, lhb, hnode);
237                 ehnode = hnode;
238                 size = lustre_hash_rehash_size(lh);
239         }
240         write_unlock(&lhb->lhb_rwlock);
241         read_unlock(&lh->lh_rwlock);
242         if (size)
243                 lustre_hash_rehash(lh, size);
244
245         RETURN(ehnode);
246 }
247
248 /**
249  * Add item @hnode to lustre hash @lh using @key.  The registered
250  * ops->lh_get function will be called if the item was added.
251  * Returns 0 on success or -EALREADY on key collisions.
252  */
253 int
254 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
255 {
256         struct hlist_node    *ehnode;
257         ENTRY;
258
259         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
260         if (ehnode != hnode) {
261                 lh_put(lh, ehnode);
262                 RETURN(-EALREADY);
263         }
264
265         RETURN(0);
266 }
267 EXPORT_SYMBOL(lustre_hash_add_unique);
268
269 /**
270  * Add item @hnode to lustre hash @lh using @key.  If this @key
271  * already exists in the hash then ops->lh_get will be called on the
272  * conflicting entry and that entry will be returned to the caller.
273  * Otherwise ops->lh_get is called on the item which was added.
274  */
275 void *
276 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
277                            struct hlist_node *hnode)
278 {
279         struct hlist_node    *ehnode;
280         void                 *obj;
281         ENTRY;
282
283         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
284         obj = lh_get(lh, ehnode);
285         lh_put(lh, ehnode);
286         RETURN(obj);
287 }
288 EXPORT_SYMBOL(lustre_hash_findadd_unique);
289
290 /**
291  * Delete item @hnode from the lustre hash @lh using @key.  The @key
292  * is required to ensure the correct hash bucket is locked since there
293  * is no direct linkage from the item to the bucket.  The object
294  * removed from the hash will be returned and obs->lh_put is called
295  * on the removed object.
296  */
297 void *
298 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
299 {
300         lustre_hash_bucket_t *lhb;
301         unsigned              i;
302         void                 *obj;
303         ENTRY;
304
305         __lustre_hash_key_validate(lh, key, hnode);
306
307         read_lock(&lh->lh_rwlock);
308         i = lh_hash(lh, key, lh->lh_cur_size - 1);
309         lhb = &lh->lh_buckets[i];
310         LASSERT(i < lh->lh_cur_size);
311         LASSERT(!hlist_unhashed(hnode));
312
313         write_lock(&lhb->lhb_rwlock);
314         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
315         write_unlock(&lhb->lhb_rwlock);
316         read_unlock(&lh->lh_rwlock);
317
318         RETURN(obj);
319 }
320 EXPORT_SYMBOL(lustre_hash_del);
321
322 /**
323  * Delete item given @key in lustre hash @lh.  The first @key found in
324  * the hash will be removed, if the key exists multiple times in the hash
325  * @lh this function must be called once per key.  The removed object
326  * will be returned and ops->lh_put is called on the removed object.
327  */
328 void *
329 lustre_hash_del_key(lustre_hash_t *lh, void *key)
330 {
331         struct hlist_node    *hnode;
332         lustre_hash_bucket_t *lhb;
333         unsigned              i;
334         void                 *obj = NULL;
335         ENTRY;
336
337         read_lock(&lh->lh_rwlock);
338         i = lh_hash(lh, key, lh->lh_cur_size - 1);
339         lhb = &lh->lh_buckets[i];
340         LASSERT(i < lh->lh_cur_size);
341
342         write_lock(&lhb->lhb_rwlock);
343         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
344         if (hnode)
345                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
346
347         write_unlock(&lhb->lhb_rwlock);
348         read_unlock(&lh->lh_rwlock);
349
350         RETURN(obj);
351 }
352 EXPORT_SYMBOL(lustre_hash_del_key);
353
354 /**
355  * Lookup an item using @key in the lustre hash @lh and return it.
356  * If the @key is found in the hash lh->lh_get() is called and the
357  * matching objects is returned.  It is the callers responsibility
358  * to call the counterpart ops->lh_put using the lh_put() macro
359  * when when finished with the object.  If the @key was not found
360  * in the hash @lh NULL is returned.
361  */
362 void *
363 lustre_hash_lookup(lustre_hash_t *lh, void *key)
364 {
365         struct hlist_node    *hnode;
366         lustre_hash_bucket_t *lhb;
367         unsigned              i;
368         void                 *obj = NULL;
369         ENTRY;
370
371         read_lock(&lh->lh_rwlock);
372         i = lh_hash(lh, key, lh->lh_cur_size - 1);
373         lhb = &lh->lh_buckets[i];
374         LASSERT(i < lh->lh_cur_size);
375
376         read_lock(&lhb->lhb_rwlock);
377         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
378         if (hnode)
379                 obj = lh_get(lh, hnode);
380
381         read_unlock(&lhb->lhb_rwlock);
382         read_unlock(&lh->lh_rwlock);
383
384         RETURN(obj);
385 }
386 EXPORT_SYMBOL(lustre_hash_lookup);
387
388 /**
389  * For each item in the lustre hash @lh call the passed callback @func
390  * and pass to it as an argument each hash item and the private @data.
391  * Before each callback ops->lh_get will be called, and after each
392  * callback ops->lh_put will be called.  Finally, during the callback
393  * the bucket lock is held so the callback must never sleep.
394  */
395 void
396 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
397 {
398         struct hlist_node    *hnode;
399         lustre_hash_bucket_t *lhb;
400         void                 *obj;
401         int                   i;
402         ENTRY;
403
404         read_lock(&lh->lh_rwlock);
405         lh_for_each_bucket(lh, lhb, i) {
406                 read_lock(&lhb->lhb_rwlock);
407                 hlist_for_each(hnode, &(lhb->lhb_head)) {
408                         __lustre_hash_bucket_validate(lh, lhb, hnode);
409                         obj = lh_get(lh, hnode);
410                         func(obj, data);
411                         (void)lh_put(lh, hnode);
412                 }
413                 read_unlock(&lhb->lhb_rwlock);
414         }
415         read_unlock(&lh->lh_rwlock);
416
417         EXIT;
418 }
419 EXPORT_SYMBOL(lustre_hash_for_each);
420
421 /**
422  * For each item in the lustre hash @lh call the passed callback @func
423  * and pass to it as an argument each hash item and the private @data.
424  * Before each callback ops->lh_get will be called, and after each
425  * callback ops->lh_put will be called.  During the callback the
426  * bucket lock will not be held will allows for the current item
427  * to be removed from the hash during the callback.  However, care
428  * should be taken to prevent other callers from operating on the
429  * hash concurrently or list corruption may occur.
430  */
431 void
432 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
433 {
434         struct hlist_node    *hnode;
435         struct hlist_node    *pos;
436         lustre_hash_bucket_t *lhb;
437         void                 *obj;
438         int                   i;
439         ENTRY;
440
441         read_lock(&lh->lh_rwlock);
442         lh_for_each_bucket(lh, lhb, i) {
443                 read_lock(&lhb->lhb_rwlock);
444                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
445                         __lustre_hash_bucket_validate(lh, lhb, hnode);
446                         obj = lh_get(lh, hnode);
447                         read_unlock(&lhb->lhb_rwlock);
448                         func(obj, data);
449                         read_lock(&lhb->lhb_rwlock);
450                         (void)lh_put(lh, hnode);
451                 }
452                 read_unlock(&lhb->lhb_rwlock);
453         }
454         read_unlock(&lh->lh_rwlock);
455         EXIT;
456 }
457 EXPORT_SYMBOL(lustre_hash_for_each_safe);
458
459 /**
460  * For each hash bucket in the lustre hash @lh call the passed callback
461  * @func until all the hash buckets are empty.  The passed callback @func
462  * or the previously registered callback lh->lh_put must remove the item
463  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
464  * functions.  No rwlocks will be held during the callback @func it is
465  * safe to sleep if needed.  This function will not terminate until the
466  * hash is empty.  Note it is still possible to concurrently add new
467  * items in to the hash.  It is the callers responsibility to ensure
468  * the required locking is in place to prevent concurrent insertions.
469  */
470 void
471 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
472 {
473         struct hlist_node    *hnode;
474         lustre_hash_bucket_t *lhb;
475         void                 *obj;
476         int                   i;
477         ENTRY;
478
479 restart:
480         read_lock(&lh->lh_rwlock);
481         lh_for_each_bucket(lh, lhb, i) {
482                 write_lock(&lhb->lhb_rwlock);
483                 while (!hlist_empty(&lhb->lhb_head)) {
484                         hnode =  lhb->lhb_head.first;
485                         __lustre_hash_bucket_validate(lh, lhb, hnode);
486                         obj = lh_get(lh, hnode);
487                         write_unlock(&lhb->lhb_rwlock);
488                         read_unlock(&lh->lh_rwlock);
489                         func(obj, data);
490                         (void)lh_put(lh, hnode);
491                         goto restart;
492                 }
493                 write_unlock(&lhb->lhb_rwlock);
494         }
495         read_unlock(&lh->lh_rwlock);
496         EXIT;
497 }
498 EXPORT_SYMBOL(lustre_hash_for_each_empty);
499
500 /**
501  * For each item in the lustre hash @lh which matches the @key call
502  * the passed callback @func and pass to it as an argument each hash
503  * item and the private @data.  Before each callback ops->lh_get will
504  * be called, and after each callback ops->lh_put will be called.
505  * Finally, during the callback the bucket lock is held so the
506  * callback must never sleep.
507    */
508 void
509 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
510                          lh_for_each_cb func, void *data)
511 {
512         struct hlist_node    *hnode;
513         lustre_hash_bucket_t *lhb;
514         unsigned              i;
515         ENTRY;
516
517         read_lock(&lh->lh_rwlock);
518         i = lh_hash(lh, key, lh->lh_cur_size - 1);
519         lhb = &lh->lh_buckets[i];
520         LASSERT(i < lh->lh_cur_size);
521
522         read_lock(&lhb->lhb_rwlock);
523         hlist_for_each(hnode, &(lhb->lhb_head)) {
524                 __lustre_hash_bucket_validate(lh, lhb, hnode);
525
526                 if (!lh_compare(lh, key, hnode))
527                         continue;
528
529                 func(lh_get(lh, hnode), data);
530                 (void)lh_put(lh, hnode);
531         }
532
533         read_unlock(&lhb->lhb_rwlock);
534         read_unlock(&lh->lh_rwlock);
535
536         EXIT;
537 }
538 EXPORT_SYMBOL(lustre_hash_for_each_key);
539
540 /**
541  * Rehash the lustre hash @lh to the given @size.  This can be used
542  * to grow the hash size when excessive chaining is detected, or to
543  * shrink the hash when it is larger than needed.  When the LH_REHASH
544  * flag is set in @lh the lustre hash may be dynamically rehashed
545  * during addition or removal if the hash's theta value exceeds
546  * either the lh->lh_min_theta or lh->max_theta values.  By default
547  * these values are tuned to keep the chained hash depth small, and
548  * this approach assumes a reasonably uniform hashing function.  The
549  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
550  */
551 int
552 lustre_hash_rehash(lustre_hash_t *lh, int size)
553 {
554         struct hlist_node     *hnode;
555         struct hlist_node     *pos;
556         lustre_hash_bucket_t  *lh_buckets;
557         lustre_hash_bucket_t  *rehash_buckets;
558         lustre_hash_bucket_t  *lh_lhb;
559         lustre_hash_bucket_t  *rehash_lhb;
560         int                    i;
561         int                    lh_size;
562         int                    theta;
563         void                  *key;
564         ENTRY;
565
566         LASSERT(size > 0);
567         LASSERT(!in_interrupt());
568
569         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
570         if (!rehash_buckets)
571                 RETURN(-ENOMEM);
572
573         for (i = 0; i < size; i++) {
574                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
575                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
576                 atomic_set(&rehash_buckets[i].lhb_count, 0);
577         }
578
579         write_lock(&lh->lh_rwlock);
580
581         /*
582          * Early return for multiple concurrent racing callers,
583          * ensure we only trigger the rehash if it is still needed.
584          */
585         theta = __lustre_hash_theta(lh);
586         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
587                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size);
588                 write_unlock(&lh->lh_rwlock);
589                 RETURN(-EALREADY);
590         }
591
592         lh_size = lh->lh_cur_size;
593         lh_buckets = lh->lh_buckets;
594
595         lh->lh_cur_size = size;
596         lh->lh_buckets = rehash_buckets;
597         atomic_inc(&lh->lh_rehash_count);
598
599         for (i = 0; i < lh_size; i++) {
600                 lh_lhb = &lh_buckets[i];
601
602                 write_lock(&lh_lhb->lhb_rwlock);
603                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
604                         key = lh_key(lh, hnode);
605                         LASSERT(key);
606
607                         /*
608                          * Validate hnode is in the correct bucket.
609                          */
610                         if (unlikely(lh->lh_flags & LH_DEBUG))
611                                 LASSERT(lh_hash(lh, key, lh_size - 1) == i);
612
613                         /*
614                          * Delete from old hash bucket.
615                          */
616                         hlist_del(hnode);
617                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
618                         atomic_dec(&lh_lhb->lhb_count);
619
620                         /*
621                          * Add to rehash bucket, ops->lh_key must be defined.
622                          */
623                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)];
624                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
625                         atomic_inc(&rehash_lhb->lhb_count);
626                 }
627
628                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
629                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
630                 write_unlock(&lh_lhb->lhb_rwlock);
631         }
632
633         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size);
634         write_unlock(&lh->lh_rwlock);
635
636         RETURN(0);
637 }
638 EXPORT_SYMBOL(lustre_hash_rehash);
639
640 /**
641  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
642  * @old_key must be provided to locate the objects previous location
643  * in the hash, and the @new_key will be used to reinsert the object.
644  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
645  * combo when it is critical that there is no window in time where the
646  * object is missing from the hash.  When an object is being rehashed
647  * the registered lh_get() and lh_put() functions will not be called.
648  */
649 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
650                             struct hlist_node *hnode)
651 {
652         lustre_hash_bucket_t  *old_lhb;
653         lustre_hash_bucket_t  *new_lhb;
654         unsigned               i;
655         int                    j;
656         ENTRY;
657
658         __lustre_hash_key_validate(lh, new_key, hnode);
659         LASSERT(!hlist_unhashed(hnode));
660
661         read_lock(&lh->lh_rwlock);
662
663         i = lh_hash(lh, old_key, lh->lh_cur_size - 1);
664         old_lhb = &lh->lh_buckets[i];
665         LASSERT(i < lh->lh_cur_size);
666
667         j = lh_hash(lh, new_key, lh->lh_cur_size - 1);
668         new_lhb = &lh->lh_buckets[j];
669         LASSERT(j < lh->lh_cur_size);
670
671         write_lock(&old_lhb->lhb_rwlock);
672         write_lock(&new_lhb->lhb_rwlock);
673
674         /*
675          * Migrate item between hash buckets without calling
676          * the lh_get() and lh_put() callback functions.
677          */
678         hlist_del(hnode);
679         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
680         atomic_dec(&old_lhb->lhb_count);
681         hlist_add_head(hnode, &(new_lhb->lhb_head));
682         atomic_inc(&new_lhb->lhb_count);
683
684         write_unlock(&new_lhb->lhb_rwlock);
685         write_unlock(&old_lhb->lhb_rwlock);
686         read_unlock(&lh->lh_rwlock);
687
688         EXIT;
689 }
690 EXPORT_SYMBOL(lustre_hash_rehash_key);
691
692 int lustre_hash_debug_header(char *str, int size)
693 {
694         return snprintf(str, size,
695                  "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n",
696                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
697                  "flags", "rehash", "count", " distribution");
698 }
699 EXPORT_SYMBOL(lustre_hash_debug_header);
700
701 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
702 {
703         lustre_hash_bucket_t  *lhb;
704         int                    theta;
705         int                    i;
706         int                    c = 0;
707         int                    dist[8] = { 0, };
708
709         if (str == NULL || size == 0)
710                 return 0;
711
712         read_lock(&lh->lh_rwlock);
713         theta = __lustre_hash_theta(lh);
714
715         c += snprintf(str + c, size - c, "%-36s ",lh->lh_name);
716         c += snprintf(str + c, size - c, "%5d ",  lh->lh_cur_size);
717         c += snprintf(str + c, size - c, "%5d ",  lh->lh_min_size);
718         c += snprintf(str + c, size - c, "%5d ",  lh->lh_max_size);
719         c += snprintf(str + c, size - c, "%d.%03d ",
720                       theta / 1000, theta % 1000);
721         c += snprintf(str + c, size - c, "%d.%03d ",
722                       lh->lh_min_theta / 1000, lh->lh_min_theta % 1000);
723         c += snprintf(str + c, size - c, "%d.%03d ",
724                       lh->lh_max_theta / 1000, lh->lh_max_theta % 1000);
725         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
726         c += snprintf(str + c, size - c, "%6d ",
727                       atomic_read(&lh->lh_rehash_count));
728         c += snprintf(str + c, size - c, "%5d ",
729                       atomic_read(&lh->lh_count));
730
731         /*
732          * The distribution is a summary of the chained hash depth in
733          * each of the lustre hash buckets.  Each buckets lhb_count is
734          * divided by the hash theta value and used to generate a
735          * histogram of the hash distribution.  A uniform hash will
736          * result in all hash buckets being close to the average thus
737          * only the first few entries in the histogram will be non-zero.
738          * If you hash function results in a non-uniform hash the will
739          * be observable by outlier bucks in the distribution histogram.
740          *
741          * Uniform hash distribution:      128/128/0/0/0/0/0/0
742          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
743          */
744         lh_for_each_bucket(lh, lhb, i)
745                 dist[MIN(__fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++;
746
747         for (i = 0; i < 8; i++)
748                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
749                               (i == 7) ? '\n' : '/');
750
751         read_unlock(&lh->lh_rwlock);
752
753         return c;
754 }
755 EXPORT_SYMBOL(lustre_hash_debug_str);