Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_bits - Initial hash table size, in bits
62  * @max_bits - Maximum allowed hash table resize, in bits
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         LASSERT(cur_bits > 0);
79         LASSERT(max_bits >= cur_bits);
80         LASSERT(max_bits < 31);
81
82         OBD_ALLOC_PTR(lh);
83         if (!lh)
84                 RETURN(NULL);
85
86         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
87         atomic_set(&lh->lh_rehash_count, 0);
88         atomic_set(&lh->lh_count, 0);
89         rwlock_init(&lh->lh_rwlock);
90         lh->lh_cur_bits = cur_bits;
91         lh->lh_cur_mask = (1 << cur_bits) - 1;
92         lh->lh_min_bits = cur_bits;
93         lh->lh_max_bits = max_bits;
94         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
95          *      anything other than 0.5 and 2.0 */
96         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
97         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
98         lh->lh_ops = ops;
99         lh->lh_flags = flags;
100
101         /* theta * 1000 */
102         __lustre_hash_set_theta(lh, 500, 2000);
103
104         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
105         if (!lh->lh_buckets) {
106                 OBD_FREE_PTR(lh);
107                 RETURN(NULL);
108         }
109
110         for (i = 0; i <= lh->lh_cur_mask; i++) {
111                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
112                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
113                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
114         }
115
116         return lh;
117 }
118 EXPORT_SYMBOL(lustre_hash_init);
119
120 /**
121  * Cleanup lustre hash @lh.
122  */
123 void
124 lustre_hash_exit(lustre_hash_t *lh)
125 {
126         lustre_hash_bucket_t *lhb;
127         struct hlist_node    *hnode;
128         struct hlist_node    *pos;
129         int                   i;
130         ENTRY;
131
132         LASSERT(lh != NULL);
133
134         write_lock(&lh->lh_rwlock);
135
136         lh_for_each_bucket(lh, lhb, i) {
137                 write_lock(&lhb->lhb_rwlock);
138                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
139                         __lustre_hash_bucket_validate(lh, lhb, hnode);
140                         __lustre_hash_bucket_del(lh, lhb, hnode);
141                         lh_exit(lh, hnode);
142                 }
143
144                 LASSERT(hlist_empty(&(lhb->lhb_head)));
145                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
146                 write_unlock(&lhb->lhb_rwlock);
147         }
148
149         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
150         LASSERT(atomic_read(&lh->lh_count) == 0);
151         write_unlock(&lh->lh_rwlock);
152
153         OBD_FREE_PTR(lh);
154         EXIT;
155 }
156 EXPORT_SYMBOL(lustre_hash_exit);
157
158 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
159 {
160         if (!(lh->lh_flags & LH_REHASH))
161                 return 0;
162
163         /* XXX: need to handle case with max_theta != 2.0
164          *      and the case with min_theta != 0.5 */
165         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
166             (__lustre_hash_theta(lh) > lh->lh_max_theta))
167                 return lh->lh_cur_bits + 1;
168
169         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
170             (__lustre_hash_theta(lh) < lh->lh_min_theta))
171                 return lh->lh_cur_bits - 1;
172
173         return 0;
174 }
175
176 /**
177  * Add item @hnode to lustre hash @lh using @key.  The registered
178  * ops->lh_get function will be called when the item is added.
179  */
180 void
181 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
182 {
183         lustre_hash_bucket_t *lhb;
184         int                   bits;
185         unsigned              i;
186         ENTRY;
187
188         __lustre_hash_key_validate(lh, key, hnode);
189
190         read_lock(&lh->lh_rwlock);
191         i = lh_hash(lh, key, lh->lh_cur_mask);
192         lhb = &lh->lh_buckets[i];
193         LASSERT(i <= lh->lh_cur_mask);
194         LASSERT(hlist_unhashed(hnode));
195
196         write_lock(&lhb->lhb_rwlock);
197         __lustre_hash_bucket_add(lh, lhb, hnode);
198         write_unlock(&lhb->lhb_rwlock);
199
200         bits = lustre_hash_rehash_bits(lh);
201         read_unlock(&lh->lh_rwlock);
202         if (bits)
203                 lustre_hash_rehash(lh, bits);
204
205         EXIT;
206 }
207 EXPORT_SYMBOL(lustre_hash_add);
208
209 static struct hlist_node *
210 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
211                                  struct hlist_node *hnode)
212 {
213         int                   bits = 0;
214         struct hlist_node    *ehnode;
215         lustre_hash_bucket_t *lhb;
216         unsigned              i;
217         ENTRY;
218
219         __lustre_hash_key_validate(lh, key, hnode);
220
221         read_lock(&lh->lh_rwlock);
222         i = lh_hash(lh, key, lh->lh_cur_mask);
223         lhb = &lh->lh_buckets[i];
224         LASSERT(i <= lh->lh_cur_mask);
225         LASSERT(hlist_unhashed(hnode));
226
227         write_lock(&lhb->lhb_rwlock);
228         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
229         if (ehnode) {
230                 lh_get(lh, ehnode);
231         } else {
232                 __lustre_hash_bucket_add(lh, lhb, hnode);
233                 ehnode = hnode;
234                 bits = lustre_hash_rehash_bits(lh);
235         }
236         write_unlock(&lhb->lhb_rwlock);
237         read_unlock(&lh->lh_rwlock);
238         if (bits)
239                 lustre_hash_rehash(lh, bits);
240
241         RETURN(ehnode);
242 }
243
244 /**
245  * Add item @hnode to lustre hash @lh using @key.  The registered
246  * ops->lh_get function will be called if the item was added.
247  * Returns 0 on success or -EALREADY on key collisions.
248  */
249 int
250 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
251 {
252         struct hlist_node    *ehnode;
253         ENTRY;
254
255         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
256         if (ehnode != hnode) {
257                 lh_put(lh, ehnode);
258                 RETURN(-EALREADY);
259         }
260         RETURN(0);
261 }
262 EXPORT_SYMBOL(lustre_hash_add_unique);
263
264 /**
265  * Add item @hnode to lustre hash @lh using @key.  If this @key
266  * already exists in the hash then ops->lh_get will be called on the
267  * conflicting entry and that entry will be returned to the caller.
268  * Otherwise ops->lh_get is called on the item which was added.
269  */
270 void *
271 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
272                            struct hlist_node *hnode)
273 {
274         struct hlist_node    *ehnode;
275         void                 *obj;
276         ENTRY;
277
278         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
279         obj = lh_get(lh, ehnode);
280         lh_put(lh, ehnode);
281         RETURN(obj);
282 }
283 EXPORT_SYMBOL(lustre_hash_findadd_unique);
284
285 /**
286  * Delete item @hnode from the lustre hash @lh using @key.  The @key
287  * is required to ensure the correct hash bucket is locked since there
288  * is no direct linkage from the item to the bucket.  The object
289  * removed from the hash will be returned and obs->lh_put is called
290  * on the removed object.
291  */
292 void *
293 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
294 {
295         lustre_hash_bucket_t *lhb;
296         unsigned              i;
297         void                 *obj;
298         ENTRY;
299
300         __lustre_hash_key_validate(lh, key, hnode);
301
302         read_lock(&lh->lh_rwlock);
303         i = lh_hash(lh, key, lh->lh_cur_mask);
304         lhb = &lh->lh_buckets[i];
305         LASSERT(i <= lh->lh_cur_mask);
306         LASSERT(!hlist_unhashed(hnode));
307
308         write_lock(&lhb->lhb_rwlock);
309         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
310         write_unlock(&lhb->lhb_rwlock);
311         read_unlock(&lh->lh_rwlock);
312
313         RETURN(obj);
314 }
315 EXPORT_SYMBOL(lustre_hash_del);
316
317 /**
318  * Delete item given @key in lustre hash @lh.  The first @key found in
319  * the hash will be removed, if the key exists multiple times in the hash
320  * @lh this function must be called once per key.  The removed object
321  * will be returned and ops->lh_put is called on the removed object.
322  */
323 void *
324 lustre_hash_del_key(lustre_hash_t *lh, void *key)
325 {
326         struct hlist_node    *hnode;
327         lustre_hash_bucket_t *lhb;
328         unsigned              i;
329         void                 *obj = NULL;
330         ENTRY;
331
332         read_lock(&lh->lh_rwlock);
333         i = lh_hash(lh, key, lh->lh_cur_mask);
334         lhb = &lh->lh_buckets[i];
335         LASSERT(i <= lh->lh_cur_mask);
336
337         write_lock(&lhb->lhb_rwlock);
338         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
339         if (hnode)
340                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
341
342         write_unlock(&lhb->lhb_rwlock);
343         read_unlock(&lh->lh_rwlock);
344
345         RETURN(obj);
346 }
347 EXPORT_SYMBOL(lustre_hash_del_key);
348
349 /**
350  * Lookup an item using @key in the lustre hash @lh and return it.
351  * If the @key is found in the hash lh->lh_get() is called and the
352  * matching objects is returned.  It is the callers responsibility
353  * to call the counterpart ops->lh_put using the lh_put() macro
354  * when when finished with the object.  If the @key was not found
355  * in the hash @lh NULL is returned.
356  */
357 void *
358 lustre_hash_lookup(lustre_hash_t *lh, void *key)
359 {
360         struct hlist_node    *hnode;
361         lustre_hash_bucket_t *lhb;
362         unsigned              i;
363         void                 *obj = NULL;
364         ENTRY;
365
366         read_lock(&lh->lh_rwlock);
367         i = lh_hash(lh, key, lh->lh_cur_mask);
368         lhb = &lh->lh_buckets[i];
369         LASSERT(i <= lh->lh_cur_mask);
370
371         read_lock(&lhb->lhb_rwlock);
372         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
373         if (hnode)
374                 obj = lh_get(lh, hnode);
375
376         read_unlock(&lhb->lhb_rwlock);
377         read_unlock(&lh->lh_rwlock);
378
379         RETURN(obj);
380 }
381 EXPORT_SYMBOL(lustre_hash_lookup);
382
383 /**
384  * For each item in the lustre hash @lh call the passed callback @func
385  * and pass to it as an argument each hash item and the private @data.
386  * Before each callback ops->lh_get will be called, and after each
387  * callback ops->lh_put will be called.  Finally, during the callback
388  * the bucket lock is held so the callback must never sleep.
389  */
390 void
391 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
392 {
393         struct hlist_node    *hnode;
394         lustre_hash_bucket_t *lhb;
395         void                 *obj;
396         int                   i;
397         ENTRY;
398
399         read_lock(&lh->lh_rwlock);
400         lh_for_each_bucket(lh, lhb, i) {
401                 read_lock(&lhb->lhb_rwlock);
402                 hlist_for_each(hnode, &(lhb->lhb_head)) {
403                         __lustre_hash_bucket_validate(lh, lhb, hnode);
404                         obj = lh_get(lh, hnode);
405                         func(obj, data);
406                         (void)lh_put(lh, hnode);
407                 }
408                 read_unlock(&lhb->lhb_rwlock);
409         }
410         read_unlock(&lh->lh_rwlock);
411
412         EXIT;
413 }
414 EXPORT_SYMBOL(lustre_hash_for_each);
415
416 /**
417  * For each item in the lustre hash @lh call the passed callback @func
418  * and pass to it as an argument each hash item and the private @data.
419  * Before each callback ops->lh_get will be called, and after each
420  * callback ops->lh_put will be called.  During the callback the
421  * bucket lock will not be held will allows for the current item
422  * to be removed from the hash during the callback.  However, care
423  * should be taken to prevent other callers from operating on the
424  * hash concurrently or list corruption may occur.
425  */
426 void
427 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
428 {
429         struct hlist_node    *hnode;
430         struct hlist_node    *pos;
431         lustre_hash_bucket_t *lhb;
432         void                 *obj;
433         int                   i;
434         ENTRY;
435
436         read_lock(&lh->lh_rwlock);
437         lh_for_each_bucket(lh, lhb, i) {
438                 read_lock(&lhb->lhb_rwlock);
439                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
440                         __lustre_hash_bucket_validate(lh, lhb, hnode);
441                         obj = lh_get(lh, hnode);
442                         read_unlock(&lhb->lhb_rwlock);
443                         func(obj, data);
444                         read_lock(&lhb->lhb_rwlock);
445                         (void)lh_put(lh, hnode);
446                 }
447                 read_unlock(&lhb->lhb_rwlock);
448         }
449         read_unlock(&lh->lh_rwlock);
450         EXIT;
451 }
452 EXPORT_SYMBOL(lustre_hash_for_each_safe);
453
454 /**
455  * For each hash bucket in the lustre hash @lh call the passed callback
456  * @func until all the hash buckets are empty.  The passed callback @func
457  * or the previously registered callback lh->lh_put must remove the item
458  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
459  * functions.  No rwlocks will be held during the callback @func it is
460  * safe to sleep if needed.  This function will not terminate until the
461  * hash is empty.  Note it is still possible to concurrently add new
462  * items in to the hash.  It is the callers responsibility to ensure
463  * the required locking is in place to prevent concurrent insertions.
464  */
465 void
466 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
467 {
468         struct hlist_node    *hnode;
469         lustre_hash_bucket_t *lhb;
470         void                 *obj;
471         int                   i;
472         ENTRY;
473
474 restart:
475         read_lock(&lh->lh_rwlock);
476         lh_for_each_bucket(lh, lhb, i) {
477                 write_lock(&lhb->lhb_rwlock);
478                 while (!hlist_empty(&lhb->lhb_head)) {
479                         hnode =  lhb->lhb_head.first;
480                         __lustre_hash_bucket_validate(lh, lhb, hnode);
481                         obj = lh_get(lh, hnode);
482                         write_unlock(&lhb->lhb_rwlock);
483                         read_unlock(&lh->lh_rwlock);
484                         func(obj, data);
485                         (void)lh_put(lh, hnode);
486                         goto restart;
487                 }
488                 write_unlock(&lhb->lhb_rwlock);
489         }
490         read_unlock(&lh->lh_rwlock);
491         EXIT;
492 }
493 EXPORT_SYMBOL(lustre_hash_for_each_empty);
494
495 /*
496  * For each item in the lustre hash @lh which matches the @key call
497  * the passed callback @func and pass to it as an argument each hash
498  * item and the private @data.  Before each callback ops->lh_get will
499  * be called, and after each callback ops->lh_put will be called.
500  * Finally, during the callback the bucket lock is held so the
501  * callback must never sleep.
502    */
503 void
504 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
505                          lh_for_each_cb func, void *data)
506 {
507         struct hlist_node    *hnode;
508         lustre_hash_bucket_t *lhb;
509         unsigned              i;
510         ENTRY;
511
512         read_lock(&lh->lh_rwlock);
513         i = lh_hash(lh, key, lh->lh_cur_mask);
514         lhb = &lh->lh_buckets[i];
515         LASSERT(i <= lh->lh_cur_mask);
516
517         read_lock(&lhb->lhb_rwlock);
518         hlist_for_each(hnode, &(lhb->lhb_head)) {
519                 __lustre_hash_bucket_validate(lh, lhb, hnode);
520
521                 if (!lh_compare(lh, key, hnode))
522                         continue;
523
524                 func(lh_get(lh, hnode), data);
525                 (void)lh_put(lh, hnode);
526         }
527
528         read_unlock(&lhb->lhb_rwlock);
529         read_unlock(&lh->lh_rwlock);
530
531         EXIT;
532 }
533 EXPORT_SYMBOL(lustre_hash_for_each_key);
534
535 /**
536  * Rehash the lustre hash @lh to the given @bits.  This can be used
537  * to grow the hash size when excessive chaining is detected, or to
538  * shrink the hash when it is larger than needed.  When the LH_REHASH
539  * flag is set in @lh the lustre hash may be dynamically rehashed
540  * during addition or removal if the hash's theta value exceeds
541  * either the lh->lh_min_theta or lh->max_theta values.  By default
542  * these values are tuned to keep the chained hash depth small, and
543  * this approach assumes a reasonably uniform hashing function.  The
544  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
545  */
546 int
547 lustre_hash_rehash(lustre_hash_t *lh, int bits)
548 {
549         struct hlist_node     *hnode;
550         struct hlist_node     *pos;
551         lustre_hash_bucket_t  *lh_buckets;
552         lustre_hash_bucket_t  *rehash_buckets;
553         lustre_hash_bucket_t  *lh_lhb;
554         lustre_hash_bucket_t  *rehash_lhb;
555         int                    i;
556         int                    theta;
557         int                    lh_mask;
558         int                    lh_bits;
559         int                    mask = (1 << bits) - 1;
560         void                  *key;
561         ENTRY;
562
563         LASSERT(!in_interrupt());
564         LASSERT(mask > 0);
565
566         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
567         if (!rehash_buckets)
568                 RETURN(-ENOMEM);
569
570         for (i = 0; i <= mask; i++) {
571                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
572                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
573                 atomic_set(&rehash_buckets[i].lhb_count, 0);
574         }
575
576         write_lock(&lh->lh_rwlock);
577
578         /*
579          * Early return for multiple concurrent racing callers,
580          * ensure we only trigger the rehash if it is still needed.
581          */
582         theta = __lustre_hash_theta(lh);
583         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
584                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
585                 write_unlock(&lh->lh_rwlock);
586                 RETURN(-EALREADY);
587         }
588
589         lh_bits = lh->lh_cur_bits;
590         lh_buckets = lh->lh_buckets;
591         lh_mask = (1 << lh_bits) - 1;
592
593         lh->lh_cur_bits = bits;
594         lh->lh_cur_mask = (1 << bits) - 1;
595         lh->lh_buckets = rehash_buckets;
596         atomic_inc(&lh->lh_rehash_count);
597
598         for (i = 0; i <= lh_mask; i++) {
599                 lh_lhb = &lh_buckets[i];
600
601                 write_lock(&lh_lhb->lhb_rwlock);
602                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
603                         key = lh_key(lh, hnode);
604                         LASSERT(key);
605
606                         /*
607                          * Validate hnode is in the correct bucket.
608                          */
609                         if (unlikely(lh->lh_flags & LH_DEBUG))
610                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
611
612                         /*
613                          * Delete from old hash bucket.
614                          */
615                         hlist_del(hnode);
616                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
617                         atomic_dec(&lh_lhb->lhb_count);
618
619                         /*
620                          * Add to rehash bucket, ops->lh_key must be defined.
621                          */
622                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, mask)];
623                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
624                         atomic_inc(&rehash_lhb->lhb_count);
625                 }
626
627                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
628                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
629                 write_unlock(&lh_lhb->lhb_rwlock);
630         }
631
632         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) << lh_bits);
633         write_unlock(&lh->lh_rwlock);
634
635         RETURN(0);
636 }
637 EXPORT_SYMBOL(lustre_hash_rehash);
638
639 /**
640  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
641  * @old_key must be provided to locate the objects previous location
642  * in the hash, and the @new_key will be used to reinsert the object.
643  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
644  * combo when it is critical that there is no window in time where the
645  * object is missing from the hash.  When an object is being rehashed
646  * the registered lh_get() and lh_put() functions will not be called.
647  */
648 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
649                             struct hlist_node *hnode)
650 {
651         lustre_hash_bucket_t  *old_lhb;
652         lustre_hash_bucket_t  *new_lhb;
653         unsigned               i;
654         int                    j;
655         ENTRY;
656
657         __lustre_hash_key_validate(lh, new_key, hnode);
658         LASSERT(!hlist_unhashed(hnode));
659
660         read_lock(&lh->lh_rwlock);
661
662         i = lh_hash(lh, old_key, lh->lh_cur_mask);
663         old_lhb = &lh->lh_buckets[i];
664         LASSERT(i <= lh->lh_cur_mask);
665
666         j = lh_hash(lh, new_key, lh->lh_cur_mask);
667         new_lhb = &lh->lh_buckets[j];
668         LASSERT(j <= lh->lh_cur_mask);
669
670         write_lock(&old_lhb->lhb_rwlock);
671         write_lock(&new_lhb->lhb_rwlock);
672
673         /*
674          * Migrate item between hash buckets without calling
675          * the lh_get() and lh_put() callback functions.
676          */
677         hlist_del(hnode);
678         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
679         atomic_dec(&old_lhb->lhb_count);
680         hlist_add_head(hnode, &(new_lhb->lhb_head));
681         atomic_inc(&new_lhb->lhb_count);
682
683         write_unlock(&new_lhb->lhb_rwlock);
684         write_unlock(&old_lhb->lhb_rwlock);
685         read_unlock(&lh->lh_rwlock);
686
687         EXIT;
688 }
689 EXPORT_SYMBOL(lustre_hash_rehash_key);
690
691 int lustre_hash_debug_header(char *str, int size)
692 {
693         return snprintf(str, size,
694                  "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n",
695                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
696                  "flags", "rehash", "count", " distribution");
697 }
698 EXPORT_SYMBOL(lustre_hash_debug_header);
699
700 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
701 {
702         lustre_hash_bucket_t  *lhb;
703         int                    theta;
704         int                    i;
705         int                    c = 0;
706         int                    dist[8] = { 0, };
707
708         if (str == NULL || size == 0)
709                 return 0;
710
711         read_lock(&lh->lh_rwlock);
712         theta = __lustre_hash_theta(lh);
713
714         c += snprintf(str + c, size - c, "%-36s ", lh->lh_name);
715         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
716         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
717         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
718         c += snprintf(str + c, size - c, "%d.%03d ",
719                       __lustre_hash_theta_int(theta),
720                       __lustre_hash_theta_frac(theta));
721         c += snprintf(str + c, size - c, "%d.%03d ",
722                       __lustre_hash_theta_int(lh->lh_min_theta),
723                       __lustre_hash_theta_frac(lh->lh_min_theta));
724         c += snprintf(str + c, size - c, "%d.%03d ",
725                       __lustre_hash_theta_int(lh->lh_max_theta),
726                       __lustre_hash_theta_frac(lh->lh_max_theta));
727         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
728         c += snprintf(str + c, size - c, "%6d ",
729                       atomic_read(&lh->lh_rehash_count));
730         c += snprintf(str + c, size - c, "%5d ",
731                       atomic_read(&lh->lh_count));
732
733         /*
734          * The distribution is a summary of the chained hash depth in
735          * each of the lustre hash buckets.  Each buckets lhb_count is
736          * divided by the hash theta value and used to generate a
737          * histogram of the hash distribution.  A uniform hash will
738          * result in all hash buckets being close to the average thus
739          * only the first few entries in the histogram will be non-zero.
740          * If you hash function results in a non-uniform hash the will
741          * be observable by outlier bucks in the distribution histogram.
742          *
743          * Uniform hash distribution:      128/128/0/0/0/0/0/0
744          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
745          */
746         lh_for_each_bucket(lh, lhb, i)
747                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
748
749         for (i = 0; i < 8; i++)
750                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
751                               (i == 7) ? '\n' : '/');
752
753         read_unlock(&lh->lh_rwlock);
754
755         return c;
756 }
757 EXPORT_SYMBOL(lustre_hash_debug_str);