Whamcloud - gitweb
b=21587 don't LBUG if transno has changed during replay
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 static void
59 lh_read_lock(lustre_hash_t *lh)
60 {
61         if ((lh->lh_flags & LH_REHASH) != 0)
62                 read_lock(&lh->lh_rwlock);
63 }
64
65 static void
66 lh_read_unlock(lustre_hash_t *lh)
67 {
68         if ((lh->lh_flags & LH_REHASH) != 0)
69                 read_unlock(&lh->lh_rwlock);
70 }
71
72 static void
73 lh_write_lock(lustre_hash_t *lh)
74 {
75         if ((lh->lh_flags & LH_REHASH) != 0)
76                 write_lock(&lh->lh_rwlock);
77 }
78
79 static void
80 lh_write_unlock(lustre_hash_t *lh)
81 {
82         if ((lh->lh_flags & LH_REHASH) != 0)
83                 write_unlock(&lh->lh_rwlock);
84 }
85
86 /**
87  * Initialize new lustre hash, where:
88  * @name     - Descriptive hash name
89  * @cur_bits - Initial hash table size, in bits
90  * @max_bits - Maximum allowed hash table resize, in bits
91  * @ops      - Registered hash table operations
92  * @flags    - LH_REHASH enable synamic hash resizing
93  *           - LH_SORT enable chained hash sort
94  */
95 lustre_hash_t *
96 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
97                  lustre_hash_ops_t *ops, int flags)
98 {
99         lustre_hash_t *lh;
100         int            i;
101         ENTRY;
102
103         LASSERT(name != NULL);
104         LASSERT(ops != NULL);
105
106         LASSERT(cur_bits > 0);
107         LASSERT(max_bits >= cur_bits);
108         LASSERT(max_bits < 31);
109
110         LIBCFS_ALLOC_PTR(lh);
111         if (!lh)
112                 RETURN(NULL);
113
114         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
115         lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
116         atomic_set(&lh->lh_rehash_count, 0);
117         atomic_set(&lh->lh_count, 0);
118         rwlock_init(&lh->lh_rwlock);
119         lh->lh_cur_bits = cur_bits;
120         lh->lh_cur_mask = (1 << cur_bits) - 1;
121         lh->lh_min_bits = cur_bits;
122         lh->lh_max_bits = max_bits;
123         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
124          *      anything other than 0.5 and 2.0 */
125         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
126         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
127         lh->lh_ops = ops;
128         lh->lh_flags = flags;
129         if (cur_bits != max_bits && (lh->lh_flags & LH_REHASH) == 0)
130                 CERROR("Rehash is disabled for %s, ignore max_bits %d\n",
131                        name, max_bits);
132
133         /* theta * 1000 */
134         __lustre_hash_set_theta(lh, 500, 2000);
135
136         LIBCFS_ALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
137         if (!lh->lh_buckets) {
138                 LIBCFS_FREE_PTR(lh);
139                 RETURN(NULL);
140         }
141
142         for (i = 0; i <= lh->lh_cur_mask; i++) {
143                 LIBCFS_ALLOC(lh->lh_buckets[i], sizeof(lustre_hash_bucket_t));
144                 if (lh->lh_buckets[i] == NULL) {
145                         lustre_hash_exit(lh);
146                         return NULL;
147                 }
148
149                 INIT_HLIST_HEAD(&lh->lh_buckets[i]->lhb_head);
150                 rwlock_init(&lh->lh_buckets[i]->lhb_rwlock);
151                 atomic_set(&lh->lh_buckets[i]->lhb_count, 0);
152         }
153
154         return lh;
155 }
156 EXPORT_SYMBOL(lustre_hash_init);
157
158 /**
159  * Cleanup lustre hash @lh.
160  */
161 void
162 lustre_hash_exit(lustre_hash_t *lh)
163 {
164         lustre_hash_bucket_t *lhb;
165         struct hlist_node    *hnode;
166         struct hlist_node    *pos;
167         int                   i;
168         ENTRY;
169
170         LASSERT(lh != NULL);
171
172         lh_write_lock(lh);
173
174         lh_for_each_bucket(lh, lhb, i) {
175                 if (lhb == NULL)
176                         continue;
177
178                 write_lock(&lhb->lhb_rwlock);
179                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
180                         __lustre_hash_bucket_validate(lh, lhb, hnode);
181                         __lustre_hash_bucket_del(lh, lhb, hnode);
182                         lh_exit(lh, hnode);
183                 }
184
185                 LASSERTF(hlist_empty(&(lhb->lhb_head)),
186                          "hash bucket %d from %s is not empty\n", i, lh->lh_name);
187                 LASSERTF(atomic_read(&lhb->lhb_count) == 0,
188                          "hash bucket %d from %s has #entries > 0 (%d)\n", i,
189                          lh->lh_name, atomic_read(&lhb->lhb_count));
190                 write_unlock(&lhb->lhb_rwlock);
191                 LIBCFS_FREE_PTR(lhb);
192         }
193
194         LASSERTF(atomic_read(&lh->lh_count) == 0,
195                  "hash %s still has #entries > 0 (%d)\n", lh->lh_name,
196                  atomic_read(&lh->lh_count));
197         lh_write_unlock(lh);
198
199         LIBCFS_FREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
200         LIBCFS_FREE_PTR(lh);
201         EXIT;
202 }
203 EXPORT_SYMBOL(lustre_hash_exit);
204
205 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
206 {
207         if (!(lh->lh_flags & LH_REHASH))
208                 return 0;
209
210         /* XXX: need to handle case with max_theta != 2.0
211          *      and the case with min_theta != 0.5 */
212         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
213             (__lustre_hash_theta(lh) > lh->lh_max_theta))
214                 return lh->lh_cur_bits + 1;
215
216         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
217             (__lustre_hash_theta(lh) < lh->lh_min_theta))
218                 return lh->lh_cur_bits - 1;
219
220         return 0;
221 }
222
223 /**
224  * Add item @hnode to lustre hash @lh using @key.  The registered
225  * ops->lh_get function will be called when the item is added.
226  */
227 void
228 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
229 {
230         lustre_hash_bucket_t *lhb;
231         int                   bits;
232         unsigned              i;
233         ENTRY;
234
235         __lustre_hash_key_validate(lh, key, hnode);
236
237         lh_read_lock(lh);
238         i = lh_hash(lh, key, lh->lh_cur_mask);
239         lhb = lh->lh_buckets[i];
240         LASSERT(i <= lh->lh_cur_mask);
241         LASSERT(hlist_unhashed(hnode));
242
243         write_lock(&lhb->lhb_rwlock);
244         __lustre_hash_bucket_add(lh, lhb, hnode);
245         write_unlock(&lhb->lhb_rwlock);
246
247         bits = lustre_hash_rehash_bits(lh);
248         lh_read_unlock(lh);
249         if (bits)
250                 lustre_hash_rehash(lh, bits);
251
252         EXIT;
253 }
254 EXPORT_SYMBOL(lustre_hash_add);
255
256 static struct hlist_node *
257 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
258                                  struct hlist_node *hnode)
259 {
260         int                   bits = 0;
261         struct hlist_node    *ehnode;
262         lustre_hash_bucket_t *lhb;
263         unsigned              i;
264         ENTRY;
265
266         __lustre_hash_key_validate(lh, key, hnode);
267
268         lh_read_lock(lh);
269         i = lh_hash(lh, key, lh->lh_cur_mask);
270         lhb = lh->lh_buckets[i];
271         LASSERT(i <= lh->lh_cur_mask);
272         LASSERT(hlist_unhashed(hnode));
273
274         write_lock(&lhb->lhb_rwlock);
275         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
276         if (ehnode) {
277                 lh_get(lh, ehnode);
278         } else {
279                 __lustre_hash_bucket_add(lh, lhb, hnode);
280                 ehnode = hnode;
281                 bits = lustre_hash_rehash_bits(lh);
282         }
283         write_unlock(&lhb->lhb_rwlock);
284         lh_read_unlock(lh);
285         if (bits)
286                 lustre_hash_rehash(lh, bits);
287
288         RETURN(ehnode);
289 }
290
291 /**
292  * Add item @hnode to lustre hash @lh using @key.  The registered
293  * ops->lh_get function will be called if the item was added.
294  * Returns 0 on success or -EALREADY on key collisions.
295  */
296 int
297 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
298 {
299         struct hlist_node    *ehnode;
300         ENTRY;
301
302         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
303         if (ehnode != hnode) {
304                 lh_put(lh, ehnode);
305                 RETURN(-EALREADY);
306         }
307         RETURN(0);
308 }
309 EXPORT_SYMBOL(lustre_hash_add_unique);
310
311 /**
312  * Add item @hnode to lustre hash @lh using @key.  If this @key
313  * already exists in the hash then ops->lh_get will be called on the
314  * conflicting entry and that entry will be returned to the caller.
315  * Otherwise ops->lh_get is called on the item which was added.
316  */
317 void *
318 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
319                            struct hlist_node *hnode)
320 {
321         struct hlist_node    *ehnode;
322         void                 *obj;
323         ENTRY;
324
325         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
326         obj = lh_get(lh, ehnode);
327         lh_put(lh, ehnode);
328         RETURN(obj);
329 }
330 EXPORT_SYMBOL(lustre_hash_findadd_unique);
331
332 /**
333  * Delete item @hnode from the lustre hash @lh using @key.  The @key
334  * is required to ensure the correct hash bucket is locked since there
335  * is no direct linkage from the item to the bucket.  The object
336  * removed from the hash will be returned and obs->lh_put is called
337  * on the removed object.
338  */
339 void *
340 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
341 {
342         lustre_hash_bucket_t *lhb;
343         unsigned              i;
344         void                 *obj;
345         ENTRY;
346
347         __lustre_hash_key_validate(lh, key, hnode);
348
349         lh_read_lock(lh);
350         i = lh_hash(lh, key, lh->lh_cur_mask);
351         lhb = lh->lh_buckets[i];
352         LASSERT(i <= lh->lh_cur_mask);
353
354         write_lock(&lhb->lhb_rwlock);
355         LASSERT(!hlist_unhashed(hnode));
356         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
357         write_unlock(&lhb->lhb_rwlock);
358         lh_read_unlock(lh);
359
360         RETURN(obj);
361 }
362 EXPORT_SYMBOL(lustre_hash_del);
363
364 /**
365  * Delete item from the lustre hash @lh when @func return true.
366  * The write lock being hold during loop for each bucket to avoid
367  * any object be reference.
368  */
369 void
370 lustre_hash_cond_del(lustre_hash_t *lh, lh_cond_opt_cb func, void *data)
371 {
372         lustre_hash_bucket_t *lhb;
373         struct hlist_node    *hnode;
374         struct hlist_node    *pos;
375         int                   i;
376         ENTRY;
377
378         LASSERT(lh != NULL);
379
380         lh_write_lock(lh);
381         lh_for_each_bucket(lh, lhb, i) {
382                 if (lhb == NULL)
383                         continue;
384
385                 write_lock(&lhb->lhb_rwlock);
386                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
387                         __lustre_hash_bucket_validate(lh, lhb, hnode);
388                         if (func(lh_get(lh, hnode), data))
389                                 __lustre_hash_bucket_del(lh, lhb, hnode);
390                         (void)lh_put(lh, hnode);
391                 }
392                 write_unlock(&lhb->lhb_rwlock);
393         }
394         lh_write_unlock(lh);
395
396         EXIT;
397 }
398 EXPORT_SYMBOL(lustre_hash_cond_del);
399
400 /**
401  * Delete item given @key in lustre hash @lh.  The first @key found in
402  * the hash will be removed, if the key exists multiple times in the hash
403  * @lh this function must be called once per key.  The removed object
404  * will be returned and ops->lh_put is called on the removed object.
405  */
406 void *
407 lustre_hash_del_key(lustre_hash_t *lh, void *key)
408 {
409         struct hlist_node    *hnode;
410         lustre_hash_bucket_t *lhb;
411         unsigned              i;
412         void                 *obj = NULL;
413         ENTRY;
414
415         lh_read_lock(lh);
416         i = lh_hash(lh, key, lh->lh_cur_mask);
417         lhb = lh->lh_buckets[i];
418         LASSERT(i <= lh->lh_cur_mask);
419
420         write_lock(&lhb->lhb_rwlock);
421         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
422         if (hnode)
423                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
424
425         write_unlock(&lhb->lhb_rwlock);
426         lh_read_unlock(lh);
427
428         RETURN(obj);
429 }
430 EXPORT_SYMBOL(lustre_hash_del_key);
431
432 /**
433  * Lookup an item using @key in the lustre hash @lh and return it.
434  * If the @key is found in the hash lh->lh_get() is called and the
435  * matching objects is returned.  It is the callers responsibility
436  * to call the counterpart ops->lh_put using the lh_put() macro
437  * when when finished with the object.  If the @key was not found
438  * in the hash @lh NULL is returned.
439  */
440 void *
441 lustre_hash_lookup(lustre_hash_t *lh, void *key)
442 {
443         struct hlist_node    *hnode;
444         lustre_hash_bucket_t *lhb;
445         unsigned              i;
446         void                 *obj = NULL;
447         ENTRY;
448
449         lh_read_lock(lh);
450         i = lh_hash(lh, key, lh->lh_cur_mask);
451         lhb = lh->lh_buckets[i];
452         LASSERT(i <= lh->lh_cur_mask);
453
454         read_lock(&lhb->lhb_rwlock);
455         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
456         if (hnode)
457                 obj = lh_get(lh, hnode);
458
459         read_unlock(&lhb->lhb_rwlock);
460         lh_read_unlock(lh);
461
462         RETURN(obj);
463 }
464 EXPORT_SYMBOL(lustre_hash_lookup);
465
466 /**
467  * For each item in the lustre hash @lh call the passed callback @func
468  * and pass to it as an argument each hash item and the private @data.
469  * Before each callback ops->lh_get will be called, and after each
470  * callback ops->lh_put will be called.  Finally, during the callback
471  * the bucket lock is held so the callback must never sleep.
472  */
473 void
474 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
475 {
476         struct hlist_node    *hnode;
477         lustre_hash_bucket_t *lhb;
478         void                 *obj;
479         int                   i;
480         ENTRY;
481
482         lh_read_lock(lh);
483         lh_for_each_bucket(lh, lhb, i) {
484                 read_lock(&lhb->lhb_rwlock);
485                 hlist_for_each(hnode, &(lhb->lhb_head)) {
486                         __lustre_hash_bucket_validate(lh, lhb, hnode);
487                         obj = lh_get(lh, hnode);
488                         func(obj, data);
489                         (void)lh_put(lh, hnode);
490                 }
491                 read_unlock(&lhb->lhb_rwlock);
492         }
493         lh_read_unlock(lh);
494
495         EXIT;
496 }
497 EXPORT_SYMBOL(lustre_hash_for_each);
498
499 /**
500  * For each item in the lustre hash @lh call the passed callback @func
501  * and pass to it as an argument each hash item and the private @data.
502  * Before each callback ops->lh_get will be called, and after each
503  * callback ops->lh_put will be called.  During the callback the
504  * bucket lock will not be held will allows for the current item
505  * to be removed from the hash during the callback.  However, care
506  * should be taken to prevent other callers from operating on the
507  * hash concurrently or list corruption may occur.
508  */
509 void
510 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
511 {
512         struct hlist_node    *hnode;
513         struct hlist_node    *pos;
514         lustre_hash_bucket_t *lhb;
515         void                 *obj;
516         int                   i;
517         ENTRY;
518
519         lh_read_lock(lh);
520         lh_for_each_bucket(lh, lhb, i) {
521                 read_lock(&lhb->lhb_rwlock);
522                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
523                         __lustre_hash_bucket_validate(lh, lhb, hnode);
524                         obj = lh_get(lh, hnode);
525                         read_unlock(&lhb->lhb_rwlock);
526                         func(obj, data);
527                         read_lock(&lhb->lhb_rwlock);
528                         (void)lh_put(lh, hnode);
529                 }
530                 read_unlock(&lhb->lhb_rwlock);
531         }
532         lh_read_unlock(lh);
533         EXIT;
534 }
535 EXPORT_SYMBOL(lustre_hash_for_each_safe);
536
537 /**
538  * For each hash bucket in the lustre hash @lh call the passed callback
539  * @func until all the hash buckets are empty.  The passed callback @func
540  * or the previously registered callback lh->lh_put must remove the item
541  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
542  * functions.  No rwlocks will be held during the callback @func it is
543  * safe to sleep if needed.  This function will not terminate until the
544  * hash is empty.  Note it is still possible to concurrently add new
545  * items in to the hash.  It is the callers responsibility to ensure
546  * the required locking is in place to prevent concurrent insertions.
547  */
548 void
549 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
550 {
551         struct hlist_node    *hnode;
552         lustre_hash_bucket_t *lhb;
553         lustre_hash_bucket_t **lhb_last = NULL;
554         void                 *obj;
555         int                   i = 0;
556         ENTRY;
557
558 restart:
559         lh_read_lock(lh);
560         /* If the hash table has changed since we last held lh_rwlock,
561          * we need to start traversing the list from the start. */
562         if (lh->lh_buckets != lhb_last) {
563                 i = 0;
564                 lhb_last = lh->lh_buckets;
565         }
566         lh_for_each_bucket_restart(lh, lhb, i) {
567                 write_lock(&lhb->lhb_rwlock);
568                 while (!hlist_empty(&lhb->lhb_head)) {
569                         hnode =  lhb->lhb_head.first;
570                         __lustre_hash_bucket_validate(lh, lhb, hnode);
571                         obj = lh_get(lh, hnode);
572                         write_unlock(&lhb->lhb_rwlock);
573                         lh_read_unlock(lh);
574                         func(obj, data);
575                         (void)lh_put(lh, hnode);
576                         cfs_cond_resched();
577                         goto restart;
578                 }
579                 write_unlock(&lhb->lhb_rwlock);
580         }
581         lh_read_unlock(lh);
582         EXIT;
583 }
584 EXPORT_SYMBOL(lustre_hash_for_each_empty);
585
586 /*
587  * For each item in the lustre hash @lh which matches the @key call
588  * the passed callback @func and pass to it as an argument each hash
589  * item and the private @data.  Before each callback ops->lh_get will
590  * be called, and after each callback ops->lh_put will be called.
591  * Finally, during the callback the bucket lock is held so the
592  * callback must never sleep.
593    */
594 void
595 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
596                          lh_for_each_cb func, void *data)
597 {
598         struct hlist_node    *hnode;
599         lustre_hash_bucket_t *lhb;
600         unsigned              i;
601         ENTRY;
602
603         lh_read_lock(lh);
604         i = lh_hash(lh, key, lh->lh_cur_mask);
605         lhb = lh->lh_buckets[i];
606         LASSERT(i <= lh->lh_cur_mask);
607
608         read_lock(&lhb->lhb_rwlock);
609         hlist_for_each(hnode, &(lhb->lhb_head)) {
610                 __lustre_hash_bucket_validate(lh, lhb, hnode);
611
612                 if (!lh_compare(lh, key, hnode))
613                         continue;
614
615                 func(lh_get(lh, hnode), data);
616                 (void)lh_put(lh, hnode);
617         }
618
619         read_unlock(&lhb->lhb_rwlock);
620         lh_read_unlock(lh);
621
622         EXIT;
623 }
624 EXPORT_SYMBOL(lustre_hash_for_each_key);
625
626 /**
627  * Rehash the lustre hash @lh to the given @bits.  This can be used
628  * to grow the hash size when excessive chaining is detected, or to
629  * shrink the hash when it is larger than needed.  When the LH_REHASH
630  * flag is set in @lh the lustre hash may be dynamically rehashed
631  * during addition or removal if the hash's theta value exceeds
632  * either the lh->lh_min_theta or lh->max_theta values.  By default
633  * these values are tuned to keep the chained hash depth small, and
634  * this approach assumes a reasonably uniform hashing function.  The
635  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
636  */
637 int
638 lustre_hash_rehash(lustre_hash_t *lh, int bits)
639 {
640         struct hlist_node     *hnode;
641         struct hlist_node     *pos;
642         lustre_hash_bucket_t **lh_buckets;
643         lustre_hash_bucket_t **rehash_buckets;
644         lustre_hash_bucket_t  *lh_lhb;
645         lustre_hash_bucket_t  *rehash_lhb;
646         int                    i;
647         int                    theta;
648         int                    lh_mask;
649         int                    lh_bits;
650         int                    mask = (1 << bits) - 1;
651         int                    rc = 0;
652         void                  *key;
653         ENTRY;
654
655         LASSERT(!in_interrupt());
656         LASSERT(mask > 0);
657         LASSERT((lh->lh_flags & LH_REHASH) != 0);
658
659         LIBCFS_ALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
660         if (!rehash_buckets)
661                 RETURN(-ENOMEM);
662
663         for (i = 0; i <= mask; i++) {
664                 LIBCFS_ALLOC(rehash_buckets[i], sizeof(*rehash_buckets[i]));
665                 if (rehash_buckets[i] == NULL)
666                         GOTO(free, rc = -ENOMEM);
667
668                 INIT_HLIST_HEAD(&rehash_buckets[i]->lhb_head);
669                 rwlock_init(&rehash_buckets[i]->lhb_rwlock);
670                 atomic_set(&rehash_buckets[i]->lhb_count, 0);
671         }
672
673         lh_write_lock(lh);
674
675         /*
676          * Early return for multiple concurrent racing callers,
677          * ensure we only trigger the rehash if it is still needed.
678          */
679         theta = __lustre_hash_theta(lh);
680         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
681                 lh_write_unlock(lh);
682                 GOTO(free, rc = -EALREADY);
683         }
684
685         lh_bits = lh->lh_cur_bits;
686         lh_buckets = lh->lh_buckets;
687         lh_mask = (1 << lh_bits) - 1;
688
689         lh->lh_cur_bits = bits;
690         lh->lh_cur_mask = (1 << bits) - 1;
691         lh->lh_buckets = rehash_buckets;
692         atomic_inc(&lh->lh_rehash_count);
693
694         for (i = 0; i <= lh_mask; i++) {
695                 lh_lhb = lh_buckets[i];
696
697                 write_lock(&lh_lhb->lhb_rwlock);
698                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
699                         key = lh_key(lh, hnode);
700                         LASSERT(key);
701
702                         /*
703                          * Validate hnode is in the correct bucket.
704                          */
705                         if (unlikely(lh->lh_flags & LH_DEBUG))
706                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
707
708                         /*
709                          * Delete from old hash bucket.
710                          */
711                         hlist_del(hnode);
712                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
713                         atomic_dec(&lh_lhb->lhb_count);
714
715                         /*
716                          * Add to rehash bucket, ops->lh_key must be defined.
717                          */
718                         rehash_lhb = rehash_buckets[lh_hash(lh, key, mask)];
719                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
720                         atomic_inc(&rehash_lhb->lhb_count);
721                 }
722
723                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
724                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
725                 write_unlock(&lh_lhb->lhb_rwlock);
726         }
727
728         lh_write_unlock(lh);
729         rehash_buckets = lh_buckets;
730         i = (1 << lh_bits);
731         bits = lh_bits;
732 free:
733         while (i-- > 0)
734                 LIBCFS_FREE(rehash_buckets[i], sizeof(*rehash_buckets[i]));
735         LIBCFS_FREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
736
737         RETURN(rc);
738 }
739 EXPORT_SYMBOL(lustre_hash_rehash);
740
741 /**
742  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
743  * @old_key must be provided to locate the objects previous location
744  * in the hash, and the @new_key will be used to reinsert the object.
745  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
746  * combo when it is critical that there is no window in time where the
747  * object is missing from the hash.  When an object is being rehashed
748  * the registered lh_get() and lh_put() functions will not be called.
749  */
750 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
751                             struct hlist_node *hnode)
752 {
753         lustre_hash_bucket_t  *old_lhb;
754         lustre_hash_bucket_t  *new_lhb;
755         unsigned               i;
756         unsigned               j;
757         ENTRY;
758
759         __lustre_hash_key_validate(lh, new_key, hnode);
760         LASSERT(!hlist_unhashed(hnode));
761
762         lh_read_lock(lh);
763
764         i = lh_hash(lh, old_key, lh->lh_cur_mask);
765         old_lhb = lh->lh_buckets[i];
766         LASSERT(i <= lh->lh_cur_mask);
767
768         j = lh_hash(lh, new_key, lh->lh_cur_mask);
769         new_lhb = lh->lh_buckets[j];
770         LASSERT(j <= lh->lh_cur_mask);
771
772         if (i < j) { /* write_lock ordering */
773                 write_lock(&old_lhb->lhb_rwlock);
774                 write_lock(&new_lhb->lhb_rwlock);
775         } else if (i > j) {
776                 write_lock(&new_lhb->lhb_rwlock);
777                 write_lock(&old_lhb->lhb_rwlock);
778         } else { /* do nothing */
779                 lh_read_unlock(lh);
780                 EXIT;
781                 return;
782         }
783
784         /*
785          * Migrate item between hash buckets without calling
786          * the lh_get() and lh_put() callback functions.
787          */
788         hlist_del(hnode);
789         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
790         atomic_dec(&old_lhb->lhb_count);
791         hlist_add_head(hnode, &(new_lhb->lhb_head));
792         atomic_inc(&new_lhb->lhb_count);
793
794         write_unlock(&new_lhb->lhb_rwlock);
795         write_unlock(&old_lhb->lhb_rwlock);
796         lh_read_unlock(lh);
797
798         EXIT;
799 }
800 EXPORT_SYMBOL(lustre_hash_rehash_key);
801
802 int lustre_hash_debug_header(char *str, int size)
803 {
804         return snprintf(str, size,
805                  "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
806                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
807                  "flags", "rehash", "count", " distribution");
808 }
809 EXPORT_SYMBOL(lustre_hash_debug_header);
810
811 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
812 {
813         lustre_hash_bucket_t  *lhb;
814         int                    theta;
815         int                    i;
816         int                    c = 0;
817         int                    dist[8] = { 0, };
818
819         if (str == NULL || size == 0)
820                 return 0;
821
822         lh_read_lock(lh);
823         theta = __lustre_hash_theta(lh);
824
825         c += snprintf(str + c, size - c, "%-*s ",
826                       LUSTRE_MAX_HASH_NAME, lh->lh_name);
827         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
828         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
829         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
830         c += snprintf(str + c, size - c, "%d.%03d ",
831                       __lustre_hash_theta_int(theta),
832                       __lustre_hash_theta_frac(theta));
833         c += snprintf(str + c, size - c, "%d.%03d ",
834                       __lustre_hash_theta_int(lh->lh_min_theta),
835                       __lustre_hash_theta_frac(lh->lh_min_theta));
836         c += snprintf(str + c, size - c, "%d.%03d ",
837                       __lustre_hash_theta_int(lh->lh_max_theta),
838                       __lustre_hash_theta_frac(lh->lh_max_theta));
839         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
840         c += snprintf(str + c, size - c, "%6d ",
841                       atomic_read(&lh->lh_rehash_count));
842         c += snprintf(str + c, size - c, "%5d ",
843                       atomic_read(&lh->lh_count));
844
845         /*
846          * The distribution is a summary of the chained hash depth in
847          * each of the lustre hash buckets.  Each buckets lhb_count is
848          * divided by the hash theta value and used to generate a
849          * histogram of the hash distribution.  A uniform hash will
850          * result in all hash buckets being close to the average thus
851          * only the first few entries in the histogram will be non-zero.
852          * If you hash function results in a non-uniform hash the will
853          * be observable by outlier bucks in the distribution histogram.
854          *
855          * Uniform hash distribution:      128/128/0/0/0/0/0/0
856          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
857          */
858         lh_for_each_bucket(lh, lhb, i)
859                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
860
861         for (i = 0; i < 8; i++)
862                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
863                               (i == 7) ? '\n' : '/');
864
865         lh_read_unlock(lh);
866
867         return c;
868 }
869 EXPORT_SYMBOL(lustre_hash_debug_str);