Whamcloud - gitweb
b=20433 decrease the usage of memory on clients.
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 static void
59 lh_read_lock(lustre_hash_t *lh)
60 {
61         if ((lh->lh_flags & LH_REHASH) != 0)
62                 read_lock(&lh->lh_rwlock);
63 }
64
65 static void
66 lh_read_unlock(lustre_hash_t *lh)
67 {
68         if ((lh->lh_flags & LH_REHASH) != 0)
69                 read_unlock(&lh->lh_rwlock);
70 }
71
72 static void
73 lh_write_lock(lustre_hash_t *lh)
74 {
75         if ((lh->lh_flags & LH_REHASH) != 0)
76                 write_lock(&lh->lh_rwlock);
77 }
78
79 static void
80 lh_write_unlock(lustre_hash_t *lh)
81 {
82         if ((lh->lh_flags & LH_REHASH) != 0)
83                 write_unlock(&lh->lh_rwlock);
84 }
85
86 /**
87  * Initialize new lustre hash, where:
88  * @name     - Descriptive hash name
89  * @cur_bits - Initial hash table size, in bits
90  * @max_bits - Maximum allowed hash table resize, in bits
91  * @ops      - Registered hash table operations
92  * @flags    - LH_REHASH enable synamic hash resizing
93  *           - LH_SORT enable chained hash sort
94  */
95 lustre_hash_t *
96 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
97                  lustre_hash_ops_t *ops, int flags)
98 {
99         lustre_hash_t *lh;
100         int            i;
101         ENTRY;
102
103         LASSERT(name != NULL);
104         LASSERT(ops != NULL);
105
106         LASSERT(cur_bits > 0);
107         LASSERT(max_bits >= cur_bits);
108         LASSERT(max_bits < 31);
109
110         LIBCFS_ALLOC_PTR(lh);
111         if (!lh)
112                 RETURN(NULL);
113
114         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
115         lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
116         atomic_set(&lh->lh_rehash_count, 0);
117         atomic_set(&lh->lh_count, 0);
118         rwlock_init(&lh->lh_rwlock);
119         lh->lh_cur_bits = cur_bits;
120         lh->lh_cur_mask = (1 << cur_bits) - 1;
121         lh->lh_min_bits = cur_bits;
122         lh->lh_max_bits = max_bits;
123         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
124          *      anything other than 0.5 and 2.0 */
125         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
126         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
127         lh->lh_ops = ops;
128         lh->lh_flags = flags;
129         if (cur_bits != max_bits && (lh->lh_flags & LH_REHASH) == 0)
130                 CERROR("Rehash is disabled for %s, ignore max_bits %d\n",
131                        name, max_bits);
132
133         /* theta * 1000 */
134         __lustre_hash_set_theta(lh, 500, 2000);
135
136         LIBCFS_ALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
137         if (!lh->lh_buckets) {
138                 LIBCFS_FREE_PTR(lh);
139                 RETURN(NULL);
140         }
141
142         for (i = 0; i <= lh->lh_cur_mask; i++) {
143                 LIBCFS_ALLOC(lh->lh_buckets[i], sizeof(lustre_hash_bucket_t));
144                 if (lh->lh_buckets[i] == NULL) {
145                         lustre_hash_exit(lh);
146                         return NULL;
147                 }
148
149                 INIT_HLIST_HEAD(&lh->lh_buckets[i]->lhb_head);
150                 rwlock_init(&lh->lh_buckets[i]->lhb_rwlock);
151                 atomic_set(&lh->lh_buckets[i]->lhb_count, 0);
152         }
153
154         return lh;
155 }
156 EXPORT_SYMBOL(lustre_hash_init);
157
158 /**
159  * Cleanup lustre hash @lh.
160  */
161 void
162 lustre_hash_exit(lustre_hash_t *lh)
163 {
164         lustre_hash_bucket_t *lhb;
165         struct hlist_node    *hnode;
166         struct hlist_node    *pos;
167         int                   i;
168         ENTRY;
169
170         LASSERT(lh != NULL);
171
172         lh_write_lock(lh);
173
174         lh_for_each_bucket(lh, lhb, i) {
175                 if (lhb == NULL)
176                         continue;
177
178                 write_lock(&lhb->lhb_rwlock);
179                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
180                         __lustre_hash_bucket_validate(lh, lhb, hnode);
181                         __lustre_hash_bucket_del(lh, lhb, hnode);
182                         lh_exit(lh, hnode);
183                 }
184
185                 LASSERTF(hlist_empty(&(lhb->lhb_head)),
186                          "hash bucket %d from %s is not empty\n", i, lh->lh_name);
187                 LASSERTF(atomic_read(&lhb->lhb_count) == 0,
188                          "hash bucket %d from %s has #entries > 0 (%d)\n", i,
189                          lh->lh_name, atomic_read(&lhb->lhb_count));
190                 write_unlock(&lhb->lhb_rwlock);
191                 LIBCFS_FREE_PTR(lhb);
192         }
193
194         LASSERTF(atomic_read(&lh->lh_count) == 0,
195                  "hash %s still has #entries > 0 (%d)\n", lh->lh_name,
196                  atomic_read(&lh->lh_count));
197         lh_write_unlock(lh);
198
199         LIBCFS_FREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
200         LIBCFS_FREE_PTR(lh);
201         EXIT;
202 }
203 EXPORT_SYMBOL(lustre_hash_exit);
204
205 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
206 {
207         if (!(lh->lh_flags & LH_REHASH))
208                 return 0;
209
210         /* XXX: need to handle case with max_theta != 2.0
211          *      and the case with min_theta != 0.5 */
212         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
213             (__lustre_hash_theta(lh) > lh->lh_max_theta))
214                 return lh->lh_cur_bits + 1;
215
216         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
217             (__lustre_hash_theta(lh) < lh->lh_min_theta))
218                 return lh->lh_cur_bits - 1;
219
220         return 0;
221 }
222
223 /**
224  * Add item @hnode to lustre hash @lh using @key.  The registered
225  * ops->lh_get function will be called when the item is added.
226  */
227 void
228 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
229 {
230         lustre_hash_bucket_t *lhb;
231         int                   bits;
232         unsigned              i;
233         ENTRY;
234
235         __lustre_hash_key_validate(lh, key, hnode);
236
237         lh_read_lock(lh);
238         i = lh_hash(lh, key, lh->lh_cur_mask);
239         lhb = lh->lh_buckets[i];
240         LASSERT(i <= lh->lh_cur_mask);
241         LASSERT(hlist_unhashed(hnode));
242
243         write_lock(&lhb->lhb_rwlock);
244         __lustre_hash_bucket_add(lh, lhb, hnode);
245         write_unlock(&lhb->lhb_rwlock);
246
247         bits = lustre_hash_rehash_bits(lh);
248         lh_read_unlock(lh);
249         if (bits)
250                 lustre_hash_rehash(lh, bits);
251
252         EXIT;
253 }
254 EXPORT_SYMBOL(lustre_hash_add);
255
256 static struct hlist_node *
257 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
258                                  struct hlist_node *hnode)
259 {
260         int                   bits = 0;
261         struct hlist_node    *ehnode;
262         lustre_hash_bucket_t *lhb;
263         unsigned              i;
264         ENTRY;
265
266         __lustre_hash_key_validate(lh, key, hnode);
267
268         lh_read_lock(lh);
269         i = lh_hash(lh, key, lh->lh_cur_mask);
270         lhb = lh->lh_buckets[i];
271         LASSERT(i <= lh->lh_cur_mask);
272         LASSERT(hlist_unhashed(hnode));
273
274         write_lock(&lhb->lhb_rwlock);
275         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
276         if (ehnode) {
277                 lh_get(lh, ehnode);
278         } else {
279                 __lustre_hash_bucket_add(lh, lhb, hnode);
280                 ehnode = hnode;
281                 bits = lustre_hash_rehash_bits(lh);
282         }
283         write_unlock(&lhb->lhb_rwlock);
284         lh_read_unlock(lh);
285         if (bits)
286                 lustre_hash_rehash(lh, bits);
287
288         RETURN(ehnode);
289 }
290
291 /**
292  * Add item @hnode to lustre hash @lh using @key.  The registered
293  * ops->lh_get function will be called if the item was added.
294  * Returns 0 on success or -EALREADY on key collisions.
295  */
296 int
297 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
298 {
299         struct hlist_node    *ehnode;
300         ENTRY;
301
302         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
303         if (ehnode != hnode) {
304                 lh_put(lh, ehnode);
305                 RETURN(-EALREADY);
306         }
307         RETURN(0);
308 }
309 EXPORT_SYMBOL(lustre_hash_add_unique);
310
311 /**
312  * Add item @hnode to lustre hash @lh using @key.  If this @key
313  * already exists in the hash then ops->lh_get will be called on the
314  * conflicting entry and that entry will be returned to the caller.
315  * Otherwise ops->lh_get is called on the item which was added.
316  */
317 void *
318 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
319                            struct hlist_node *hnode)
320 {
321         struct hlist_node    *ehnode;
322         void                 *obj;
323         ENTRY;
324
325         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
326         obj = lh_get(lh, ehnode);
327         lh_put(lh, ehnode);
328         RETURN(obj);
329 }
330 EXPORT_SYMBOL(lustre_hash_findadd_unique);
331
332 /**
333  * Delete item @hnode from the lustre hash @lh using @key.  The @key
334  * is required to ensure the correct hash bucket is locked since there
335  * is no direct linkage from the item to the bucket.  The object
336  * removed from the hash will be returned and obs->lh_put is called
337  * on the removed object.
338  */
339 void *
340 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
341 {
342         lustre_hash_bucket_t *lhb;
343         unsigned              i;
344         void                 *obj;
345         ENTRY;
346
347         __lustre_hash_key_validate(lh, key, hnode);
348
349         lh_read_lock(lh);
350         i = lh_hash(lh, key, lh->lh_cur_mask);
351         lhb = lh->lh_buckets[i];
352         LASSERT(i <= lh->lh_cur_mask);
353
354         write_lock(&lhb->lhb_rwlock);
355         LASSERT(!hlist_unhashed(hnode));
356         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
357         write_unlock(&lhb->lhb_rwlock);
358         lh_read_unlock(lh);
359
360         RETURN(obj);
361 }
362 EXPORT_SYMBOL(lustre_hash_del);
363
364 /**
365  * Delete item given @key in lustre hash @lh.  The first @key found in
366  * the hash will be removed, if the key exists multiple times in the hash
367  * @lh this function must be called once per key.  The removed object
368  * will be returned and ops->lh_put is called on the removed object.
369  */
370 void *
371 lustre_hash_del_key(lustre_hash_t *lh, void *key)
372 {
373         struct hlist_node    *hnode;
374         lustre_hash_bucket_t *lhb;
375         unsigned              i;
376         void                 *obj = NULL;
377         ENTRY;
378
379         lh_read_lock(lh);
380         i = lh_hash(lh, key, lh->lh_cur_mask);
381         lhb = lh->lh_buckets[i];
382         LASSERT(i <= lh->lh_cur_mask);
383
384         write_lock(&lhb->lhb_rwlock);
385         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
386         if (hnode)
387                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
388
389         write_unlock(&lhb->lhb_rwlock);
390         lh_read_unlock(lh);
391
392         RETURN(obj);
393 }
394 EXPORT_SYMBOL(lustre_hash_del_key);
395
396 /**
397  * Lookup an item using @key in the lustre hash @lh and return it.
398  * If the @key is found in the hash lh->lh_get() is called and the
399  * matching objects is returned.  It is the callers responsibility
400  * to call the counterpart ops->lh_put using the lh_put() macro
401  * when when finished with the object.  If the @key was not found
402  * in the hash @lh NULL is returned.
403  */
404 void *
405 lustre_hash_lookup(lustre_hash_t *lh, void *key)
406 {
407         struct hlist_node    *hnode;
408         lustre_hash_bucket_t *lhb;
409         unsigned              i;
410         void                 *obj = NULL;
411         ENTRY;
412
413         lh_read_lock(lh);
414         i = lh_hash(lh, key, lh->lh_cur_mask);
415         lhb = lh->lh_buckets[i];
416         LASSERT(i <= lh->lh_cur_mask);
417
418         read_lock(&lhb->lhb_rwlock);
419         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
420         if (hnode)
421                 obj = lh_get(lh, hnode);
422
423         read_unlock(&lhb->lhb_rwlock);
424         lh_read_unlock(lh);
425
426         RETURN(obj);
427 }
428 EXPORT_SYMBOL(lustre_hash_lookup);
429
430 /**
431  * For each item in the lustre hash @lh call the passed callback @func
432  * and pass to it as an argument each hash item and the private @data.
433  * Before each callback ops->lh_get will be called, and after each
434  * callback ops->lh_put will be called.  Finally, during the callback
435  * the bucket lock is held so the callback must never sleep.
436  */
437 void
438 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
439 {
440         struct hlist_node    *hnode;
441         lustre_hash_bucket_t *lhb;
442         void                 *obj;
443         int                   i;
444         ENTRY;
445
446         lh_read_lock(lh);
447         lh_for_each_bucket(lh, lhb, i) {
448                 read_lock(&lhb->lhb_rwlock);
449                 hlist_for_each(hnode, &(lhb->lhb_head)) {
450                         __lustre_hash_bucket_validate(lh, lhb, hnode);
451                         obj = lh_get(lh, hnode);
452                         func(obj, data);
453                         (void)lh_put(lh, hnode);
454                 }
455                 read_unlock(&lhb->lhb_rwlock);
456         }
457         lh_read_unlock(lh);
458
459         EXIT;
460 }
461 EXPORT_SYMBOL(lustre_hash_for_each);
462
463 /**
464  * For each item in the lustre hash @lh call the passed callback @func
465  * and pass to it as an argument each hash item and the private @data.
466  * Before each callback ops->lh_get will be called, and after each
467  * callback ops->lh_put will be called.  During the callback the
468  * bucket lock will not be held will allows for the current item
469  * to be removed from the hash during the callback.  However, care
470  * should be taken to prevent other callers from operating on the
471  * hash concurrently or list corruption may occur.
472  */
473 void
474 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
475 {
476         struct hlist_node    *hnode;
477         struct hlist_node    *pos;
478         lustre_hash_bucket_t *lhb;
479         void                 *obj;
480         int                   i;
481         ENTRY;
482
483         lh_read_lock(lh);
484         lh_for_each_bucket(lh, lhb, i) {
485                 read_lock(&lhb->lhb_rwlock);
486                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
487                         __lustre_hash_bucket_validate(lh, lhb, hnode);
488                         obj = lh_get(lh, hnode);
489                         read_unlock(&lhb->lhb_rwlock);
490                         func(obj, data);
491                         read_lock(&lhb->lhb_rwlock);
492                         (void)lh_put(lh, hnode);
493                 }
494                 read_unlock(&lhb->lhb_rwlock);
495         }
496         lh_read_unlock(lh);
497         EXIT;
498 }
499 EXPORT_SYMBOL(lustre_hash_for_each_safe);
500
501 /**
502  * For each hash bucket in the lustre hash @lh call the passed callback
503  * @func until all the hash buckets are empty.  The passed callback @func
504  * or the previously registered callback lh->lh_put must remove the item
505  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
506  * functions.  No rwlocks will be held during the callback @func it is
507  * safe to sleep if needed.  This function will not terminate until the
508  * hash is empty.  Note it is still possible to concurrently add new
509  * items in to the hash.  It is the callers responsibility to ensure
510  * the required locking is in place to prevent concurrent insertions.
511  */
512 void
513 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
514 {
515         struct hlist_node    *hnode;
516         lustre_hash_bucket_t *lhb;
517         lustre_hash_bucket_t **lhb_last = NULL;
518         void                 *obj;
519         int                   i = 0;
520         ENTRY;
521
522 restart:
523         lh_read_lock(lh);
524         /* If the hash table has changed since we last held lh_rwlock,
525          * we need to start traversing the list from the start. */
526         if (lh->lh_buckets != lhb_last) {
527                 i = 0;
528                 lhb_last = lh->lh_buckets;
529         }
530         lh_for_each_bucket_restart(lh, lhb, i) {
531                 write_lock(&lhb->lhb_rwlock);
532                 while (!hlist_empty(&lhb->lhb_head)) {
533                         hnode =  lhb->lhb_head.first;
534                         __lustre_hash_bucket_validate(lh, lhb, hnode);
535                         obj = lh_get(lh, hnode);
536                         write_unlock(&lhb->lhb_rwlock);
537                         lh_read_unlock(lh);
538                         func(obj, data);
539                         (void)lh_put(lh, hnode);
540                         cfs_cond_resched();
541                         goto restart;
542                 }
543                 write_unlock(&lhb->lhb_rwlock);
544         }
545         lh_read_unlock(lh);
546         EXIT;
547 }
548 EXPORT_SYMBOL(lustre_hash_for_each_empty);
549
550 /*
551  * For each item in the lustre hash @lh which matches the @key call
552  * the passed callback @func and pass to it as an argument each hash
553  * item and the private @data.  Before each callback ops->lh_get will
554  * be called, and after each callback ops->lh_put will be called.
555  * Finally, during the callback the bucket lock is held so the
556  * callback must never sleep.
557    */
558 void
559 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
560                          lh_for_each_cb func, void *data)
561 {
562         struct hlist_node    *hnode;
563         lustre_hash_bucket_t *lhb;
564         unsigned              i;
565         ENTRY;
566
567         lh_read_lock(lh);
568         i = lh_hash(lh, key, lh->lh_cur_mask);
569         lhb = lh->lh_buckets[i];
570         LASSERT(i <= lh->lh_cur_mask);
571
572         read_lock(&lhb->lhb_rwlock);
573         hlist_for_each(hnode, &(lhb->lhb_head)) {
574                 __lustre_hash_bucket_validate(lh, lhb, hnode);
575
576                 if (!lh_compare(lh, key, hnode))
577                         continue;
578
579                 func(lh_get(lh, hnode), data);
580                 (void)lh_put(lh, hnode);
581         }
582
583         read_unlock(&lhb->lhb_rwlock);
584         lh_read_unlock(lh);
585
586         EXIT;
587 }
588 EXPORT_SYMBOL(lustre_hash_for_each_key);
589
590 /**
591  * Rehash the lustre hash @lh to the given @bits.  This can be used
592  * to grow the hash size when excessive chaining is detected, or to
593  * shrink the hash when it is larger than needed.  When the LH_REHASH
594  * flag is set in @lh the lustre hash may be dynamically rehashed
595  * during addition or removal if the hash's theta value exceeds
596  * either the lh->lh_min_theta or lh->max_theta values.  By default
597  * these values are tuned to keep the chained hash depth small, and
598  * this approach assumes a reasonably uniform hashing function.  The
599  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
600  */
601 int
602 lustre_hash_rehash(lustre_hash_t *lh, int bits)
603 {
604         struct hlist_node     *hnode;
605         struct hlist_node     *pos;
606         lustre_hash_bucket_t **lh_buckets;
607         lustre_hash_bucket_t **rehash_buckets;
608         lustre_hash_bucket_t  *lh_lhb;
609         lustre_hash_bucket_t  *rehash_lhb;
610         int                    i;
611         int                    theta;
612         int                    lh_mask;
613         int                    lh_bits;
614         int                    mask = (1 << bits) - 1;
615         int                    rc = 0;
616         void                  *key;
617         ENTRY;
618
619         LASSERT(!in_interrupt());
620         LASSERT(mask > 0);
621         LASSERT((lh->lh_flags & LH_REHASH) != 0);
622
623         LIBCFS_ALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
624         if (!rehash_buckets)
625                 RETURN(-ENOMEM);
626
627         for (i = 0; i <= mask; i++) {
628                 LIBCFS_ALLOC(rehash_buckets[i], sizeof(*rehash_buckets[i]));
629                 if (rehash_buckets[i] == NULL)
630                         GOTO(free, rc = -ENOMEM);
631
632                 INIT_HLIST_HEAD(&rehash_buckets[i]->lhb_head);
633                 rwlock_init(&rehash_buckets[i]->lhb_rwlock);
634                 atomic_set(&rehash_buckets[i]->lhb_count, 0);
635         }
636
637         lh_write_lock(lh);
638
639         /*
640          * Early return for multiple concurrent racing callers,
641          * ensure we only trigger the rehash if it is still needed.
642          */
643         theta = __lustre_hash_theta(lh);
644         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
645                 lh_write_unlock(lh);
646                 GOTO(free, rc = -EALREADY);
647         }
648
649         lh_bits = lh->lh_cur_bits;
650         lh_buckets = lh->lh_buckets;
651         lh_mask = (1 << lh_bits) - 1;
652
653         lh->lh_cur_bits = bits;
654         lh->lh_cur_mask = (1 << bits) - 1;
655         lh->lh_buckets = rehash_buckets;
656         atomic_inc(&lh->lh_rehash_count);
657
658         for (i = 0; i <= lh_mask; i++) {
659                 lh_lhb = lh_buckets[i];
660
661                 write_lock(&lh_lhb->lhb_rwlock);
662                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
663                         key = lh_key(lh, hnode);
664                         LASSERT(key);
665
666                         /*
667                          * Validate hnode is in the correct bucket.
668                          */
669                         if (unlikely(lh->lh_flags & LH_DEBUG))
670                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
671
672                         /*
673                          * Delete from old hash bucket.
674                          */
675                         hlist_del(hnode);
676                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
677                         atomic_dec(&lh_lhb->lhb_count);
678
679                         /*
680                          * Add to rehash bucket, ops->lh_key must be defined.
681                          */
682                         rehash_lhb = rehash_buckets[lh_hash(lh, key, mask)];
683                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
684                         atomic_inc(&rehash_lhb->lhb_count);
685                 }
686
687                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
688                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
689                 write_unlock(&lh_lhb->lhb_rwlock);
690         }
691
692         lh_write_unlock(lh);
693         rehash_buckets = lh_buckets;
694         i = (1 << lh_bits);
695         bits = lh_bits;
696 free:
697         while (i-- > 0)
698                 LIBCFS_FREE(rehash_buckets[i], sizeof(*rehash_buckets[i]));
699         LIBCFS_FREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
700
701         RETURN(rc);
702 }
703 EXPORT_SYMBOL(lustre_hash_rehash);
704
705 /**
706  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
707  * @old_key must be provided to locate the objects previous location
708  * in the hash, and the @new_key will be used to reinsert the object.
709  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
710  * combo when it is critical that there is no window in time where the
711  * object is missing from the hash.  When an object is being rehashed
712  * the registered lh_get() and lh_put() functions will not be called.
713  */
714 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
715                             struct hlist_node *hnode)
716 {
717         lustre_hash_bucket_t  *old_lhb;
718         lustre_hash_bucket_t  *new_lhb;
719         unsigned               i;
720         unsigned               j;
721         ENTRY;
722
723         __lustre_hash_key_validate(lh, new_key, hnode);
724         LASSERT(!hlist_unhashed(hnode));
725
726         lh_read_lock(lh);
727
728         i = lh_hash(lh, old_key, lh->lh_cur_mask);
729         old_lhb = lh->lh_buckets[i];
730         LASSERT(i <= lh->lh_cur_mask);
731
732         j = lh_hash(lh, new_key, lh->lh_cur_mask);
733         new_lhb = lh->lh_buckets[j];
734         LASSERT(j <= lh->lh_cur_mask);
735
736         if (i < j) { /* write_lock ordering */
737                 write_lock(&old_lhb->lhb_rwlock);
738                 write_lock(&new_lhb->lhb_rwlock);
739         } else if (i > j) {
740                 write_lock(&new_lhb->lhb_rwlock);
741                 write_lock(&old_lhb->lhb_rwlock);
742         } else { /* do nothing */
743                 lh_read_unlock(lh);
744                 EXIT;
745                 return;
746         }
747
748         /*
749          * Migrate item between hash buckets without calling
750          * the lh_get() and lh_put() callback functions.
751          */
752         hlist_del(hnode);
753         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
754         atomic_dec(&old_lhb->lhb_count);
755         hlist_add_head(hnode, &(new_lhb->lhb_head));
756         atomic_inc(&new_lhb->lhb_count);
757
758         write_unlock(&new_lhb->lhb_rwlock);
759         write_unlock(&old_lhb->lhb_rwlock);
760         lh_read_unlock(lh);
761
762         EXIT;
763 }
764 EXPORT_SYMBOL(lustre_hash_rehash_key);
765
766 int lustre_hash_debug_header(char *str, int size)
767 {
768         return snprintf(str, size,
769                  "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
770                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
771                  "flags", "rehash", "count", " distribution");
772 }
773 EXPORT_SYMBOL(lustre_hash_debug_header);
774
775 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
776 {
777         lustre_hash_bucket_t  *lhb;
778         int                    theta;
779         int                    i;
780         int                    c = 0;
781         int                    dist[8] = { 0, };
782
783         if (str == NULL || size == 0)
784                 return 0;
785
786         lh_read_lock(lh);
787         theta = __lustre_hash_theta(lh);
788
789         c += snprintf(str + c, size - c, "%-*s ",
790                       LUSTRE_MAX_HASH_NAME, lh->lh_name);
791         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
792         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
793         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
794         c += snprintf(str + c, size - c, "%d.%03d ",
795                       __lustre_hash_theta_int(theta),
796                       __lustre_hash_theta_frac(theta));
797         c += snprintf(str + c, size - c, "%d.%03d ",
798                       __lustre_hash_theta_int(lh->lh_min_theta),
799                       __lustre_hash_theta_frac(lh->lh_min_theta));
800         c += snprintf(str + c, size - c, "%d.%03d ",
801                       __lustre_hash_theta_int(lh->lh_max_theta),
802                       __lustre_hash_theta_frac(lh->lh_max_theta));
803         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
804         c += snprintf(str + c, size - c, "%6d ",
805                       atomic_read(&lh->lh_rehash_count));
806         c += snprintf(str + c, size - c, "%5d ",
807                       atomic_read(&lh->lh_count));
808
809         /*
810          * The distribution is a summary of the chained hash depth in
811          * each of the lustre hash buckets.  Each buckets lhb_count is
812          * divided by the hash theta value and used to generate a
813          * histogram of the hash distribution.  A uniform hash will
814          * result in all hash buckets being close to the average thus
815          * only the first few entries in the histogram will be non-zero.
816          * If you hash function results in a non-uniform hash the will
817          * be observable by outlier bucks in the distribution histogram.
818          *
819          * Uniform hash distribution:      128/128/0/0/0/0/0/0
820          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
821          */
822         lh_for_each_bucket(lh, lhb, i)
823                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
824
825         for (i = 0; i < 8; i++)
826                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
827                               (i == 7) ? '\n' : '/');
828
829         lh_read_unlock(lh);
830
831         return c;
832 }
833 EXPORT_SYMBOL(lustre_hash_debug_str);