Whamcloud - gitweb
b=21275
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 static void
59 lh_read_lock(lustre_hash_t *lh)
60 {
61         if ((lh->lh_flags & LH_REHASH) != 0)
62                 read_lock(&lh->lh_rwlock);
63 }
64
65 static void
66 lh_read_unlock(lustre_hash_t *lh)
67 {
68         if ((lh->lh_flags & LH_REHASH) != 0)
69                 read_unlock(&lh->lh_rwlock);
70 }
71
72 static void
73 lh_write_lock(lustre_hash_t *lh)
74 {
75         if ((lh->lh_flags & LH_REHASH) != 0)
76                 write_lock(&lh->lh_rwlock);
77 }
78
79 static void
80 lh_write_unlock(lustre_hash_t *lh)
81 {
82         if ((lh->lh_flags & LH_REHASH) != 0)
83                 write_unlock(&lh->lh_rwlock);
84 }
85
86 /**
87  * Initialize new lustre hash, where:
88  * \param name Descriptive hash name
89  * \param cur_bits Initial hash table size, in bits
90  * \param max_bits Maximum allowed hash table resize, in bits
91  * \param ops Registered hash table operations
92  * \param flags LH_REHASH enables dynamic hash resizing
93  *              LH_SORT enables chained hash sort
94  */
95 lustre_hash_t *
96 lustre_hash_init(char *name, unsigned int cur_bits, unsigned int max_bits,
97                  lustre_hash_ops_t *ops, int flags)
98 {
99         lustre_hash_t *lh;
100         int            i;
101         ENTRY;
102
103         LASSERT(name != NULL);
104         LASSERT(ops != NULL);
105
106         LASSERT(cur_bits > 0);
107         LASSERT(max_bits >= cur_bits);
108         LASSERT(max_bits < 31);
109
110         LIBCFS_ALLOC(lh, sizeof(*lh));
111         if (!lh)
112                 RETURN(NULL);
113
114         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
115         lh->lh_name[sizeof(lh->lh_name) - 1] = '\0';
116         atomic_set(&lh->lh_rehash_count, 0);
117         atomic_set(&lh->lh_count, 0);
118         rwlock_init(&lh->lh_rwlock);
119         lh->lh_cur_bits = cur_bits;
120         lh->lh_cur_mask = (1 << cur_bits) - 1;
121         lh->lh_min_bits = cur_bits;
122         lh->lh_max_bits = max_bits;
123         /* XXX: need to fixup lustre_hash_rehash_bits() before this can be
124          *      anything other than 0.5 and 2.0 */
125         lh->lh_min_theta = 1 << (LH_THETA_BITS - 1);
126         lh->lh_max_theta = 1 << (LH_THETA_BITS + 1);
127         lh->lh_ops = ops;
128         lh->lh_flags = flags;
129         if (cur_bits != max_bits && (lh->lh_flags & LH_REHASH) == 0)
130                 CWARN("Rehash is disabled, ignore max_bits %d\n", max_bits);
131
132         /* theta * 1000 */
133         __lustre_hash_set_theta(lh, 500, 2000);
134
135         LIBCFS_ALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
136         if (lh->lh_buckets == NULL) {
137                 LIBCFS_FREE(lh, sizeof(*lh));
138                 RETURN(NULL);
139         }
140
141         for (i = 0; i <= lh->lh_cur_mask; i++) {
142                 LIBCFS_ALLOC(lh->lh_buckets[i], sizeof(lustre_hash_bucket_t));
143                 if (lh->lh_buckets[i] == NULL) {
144                         lustre_hash_exit(lh);
145                         return NULL;
146                 }
147
148                 INIT_HLIST_HEAD(&lh->lh_buckets[i]->lhb_head);
149                 rwlock_init(&lh->lh_buckets[i]->lhb_rwlock);
150                 atomic_set(&lh->lh_buckets[i]->lhb_count, 0);
151         }
152
153         return lh;
154 }
155 EXPORT_SYMBOL(lustre_hash_init);
156
157 /**
158  * Cleanup lustre hash \a lh.
159  */
160 void
161 lustre_hash_exit(lustre_hash_t *lh)
162 {
163         lustre_hash_bucket_t *lhb;
164         struct hlist_node    *hnode;
165         struct hlist_node    *pos;
166         int                   i;
167         ENTRY;
168
169         LASSERT(lh != NULL);
170
171         lh_write_lock(lh);
172
173         lh_for_each_bucket(lh, lhb, i) {
174                 if (lhb == NULL)
175                         continue;
176
177                 write_lock(&lhb->lhb_rwlock);
178                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
179                         __lustre_hash_bucket_validate(lh, lhb, hnode);
180                         __lustre_hash_bucket_del(lh, lhb, hnode);
181                         lh_exit(lh, hnode);
182                 }
183
184                 LASSERT(hlist_empty(&(lhb->lhb_head)));
185                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
186                 write_unlock(&lhb->lhb_rwlock);
187                 LIBCFS_FREE(lhb, sizeof(*lhb));
188         }
189
190         LASSERT(atomic_read(&lh->lh_count) == 0);
191         lh_write_unlock(lh);
192
193         LIBCFS_FREE(lh->lh_buckets, sizeof(*lh->lh_buckets) << lh->lh_cur_bits);
194         LIBCFS_FREE(lh, sizeof(*lh));
195         EXIT;
196 }
197 EXPORT_SYMBOL(lustre_hash_exit);
198
199 static inline unsigned int lustre_hash_rehash_bits(lustre_hash_t *lh)
200 {
201         if (!(lh->lh_flags & LH_REHASH))
202                 return 0;
203
204         /* XXX: need to handle case with max_theta != 2.0
205          *      and the case with min_theta != 0.5 */
206         if ((lh->lh_cur_bits < lh->lh_max_bits) &&
207             (__lustre_hash_theta(lh) > lh->lh_max_theta))
208                 return lh->lh_cur_bits + 1;
209
210         if ((lh->lh_cur_bits > lh->lh_min_bits) &&
211             (__lustre_hash_theta(lh) < lh->lh_min_theta))
212                 return lh->lh_cur_bits - 1;
213
214         return 0;
215 }
216
217 /**
218  * Add item \a hnode to lustre hash \a lh using \a key.  The registered
219  * ops->lh_get function will be called when the item is added.
220  */
221 void
222 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
223 {
224         lustre_hash_bucket_t *lhb;
225         int                   bits;
226         unsigned              i;
227         ENTRY;
228
229         __lustre_hash_key_validate(lh, key, hnode);
230
231         lh_read_lock(lh);
232         i = lh_hash(lh, key, lh->lh_cur_mask);
233         lhb = lh->lh_buckets[i];
234         LASSERT(i <= lh->lh_cur_mask);
235         LASSERT(hlist_unhashed(hnode));
236
237         write_lock(&lhb->lhb_rwlock);
238         __lustre_hash_bucket_add(lh, lhb, hnode);
239         write_unlock(&lhb->lhb_rwlock);
240
241         bits = lustre_hash_rehash_bits(lh);
242         lh_read_unlock(lh);
243         if (bits)
244                 lustre_hash_rehash(lh, bits);
245
246         EXIT;
247 }
248 EXPORT_SYMBOL(lustre_hash_add);
249
250 static struct hlist_node *
251 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
252                                  struct hlist_node *hnode)
253 {
254         int                   bits = 0;
255         struct hlist_node    *ehnode;
256         lustre_hash_bucket_t *lhb;
257         unsigned              i;
258         ENTRY;
259
260         __lustre_hash_key_validate(lh, key, hnode);
261
262         lh_read_lock(lh);
263         i = lh_hash(lh, key, lh->lh_cur_mask);
264         lhb = lh->lh_buckets[i];
265         LASSERT(i <= lh->lh_cur_mask);
266         LASSERT(hlist_unhashed(hnode));
267
268         write_lock(&lhb->lhb_rwlock);
269         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
270         if (ehnode) {
271                 lh_get(lh, ehnode);
272         } else {
273                 __lustre_hash_bucket_add(lh, lhb, hnode);
274                 ehnode = hnode;
275                 bits = lustre_hash_rehash_bits(lh);
276         }
277         write_unlock(&lhb->lhb_rwlock);
278         lh_read_unlock(lh);
279         if (bits)
280                 lustre_hash_rehash(lh, bits);
281
282         RETURN(ehnode);
283 }
284
285 /**
286  * Add item \a hnode to lustre hash \a lh using \a key.  The registered
287  * ops->lh_get function will be called if the item was added.
288  * Returns 0 on success or -EALREADY on key collisions.
289  */
290 int
291 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
292 {
293         struct hlist_node    *ehnode;
294         ENTRY;
295
296         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
297         if (ehnode != hnode) {
298                 lh_put(lh, ehnode);
299                 RETURN(-EALREADY);
300         }
301         RETURN(0);
302 }
303 EXPORT_SYMBOL(lustre_hash_add_unique);
304
305 /**
306  * Add item \a hnode to lustre hash \a lh using \a key.  If this \a key
307  * already exists in the hash then ops->lh_get will be called on the
308  * conflicting entry and that entry will be returned to the caller.
309  * Otherwise ops->lh_get is called on the item which was added.
310  */
311 void *
312 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
313                            struct hlist_node *hnode)
314 {
315         struct hlist_node    *ehnode;
316         void                 *obj;
317         ENTRY;
318
319         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
320         obj = lh_get(lh, ehnode);
321         lh_put(lh, ehnode);
322         RETURN(obj);
323 }
324 EXPORT_SYMBOL(lustre_hash_findadd_unique);
325
326 /**
327  * Delete item \a hnode from the lustre hash \a lh using \a key.  The \a key
328  * is required to ensure the correct hash bucket is locked since there
329  * is no direct linkage from the item to the bucket.  The object
330  * removed from the hash will be returned and obs->lh_put is called
331  * on the removed object.
332  */
333 void *
334 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
335 {
336         lustre_hash_bucket_t *lhb;
337         unsigned              i;
338         void                 *obj;
339         ENTRY;
340
341         __lustre_hash_key_validate(lh, key, hnode);
342
343         lh_read_lock(lh);
344         i = lh_hash(lh, key, lh->lh_cur_mask);
345         lhb = lh->lh_buckets[i];
346         LASSERT(i <= lh->lh_cur_mask);
347         LASSERT(!hlist_unhashed(hnode));
348
349         write_lock(&lhb->lhb_rwlock);
350         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
351         write_unlock(&lhb->lhb_rwlock);
352         lh_read_unlock(lh);
353
354         RETURN(obj);
355 }
356 EXPORT_SYMBOL(lustre_hash_del);
357
358 /**
359  * Delete item given \a key in lustre hash \a lh.  The first \a key found in
360  * the hash will be removed, if the key exists multiple times in the hash
361  * \a lh this function must be called once per key.  The removed object
362  * will be returned and ops->lh_put is called on the removed object.
363  */
364 void *
365 lustre_hash_del_key(lustre_hash_t *lh, void *key)
366 {
367         struct hlist_node    *hnode;
368         lustre_hash_bucket_t *lhb;
369         unsigned              i;
370         void                 *obj = NULL;
371         ENTRY;
372
373         lh_read_lock(lh);
374         i = lh_hash(lh, key, lh->lh_cur_mask);
375         lhb = lh->lh_buckets[i];
376         LASSERT(i <= lh->lh_cur_mask);
377
378         write_lock(&lhb->lhb_rwlock);
379         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
380         if (hnode)
381                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
382
383         write_unlock(&lhb->lhb_rwlock);
384         lh_read_unlock(lh);
385
386         RETURN(obj);
387 }
388 EXPORT_SYMBOL(lustre_hash_del_key);
389
390 /**
391  * Lookup an item using \a key in the lustre hash \a lh and return it.
392  * If the \a key is found in the hash lh->lh_get() is called and the
393  * matching objects is returned.  It is the callers responsibility
394  * to call the counterpart ops->lh_put using the lh_put() macro
395  * when when finished with the object.  If the \a key was not found
396  * in the hash \a lh NULL is returned.
397  */
398 void *
399 lustre_hash_lookup(lustre_hash_t *lh, void *key)
400 {
401         struct hlist_node    *hnode;
402         lustre_hash_bucket_t *lhb;
403         unsigned              i;
404         void                 *obj = NULL;
405         ENTRY;
406
407         lh_read_lock(lh);
408         i = lh_hash(lh, key, lh->lh_cur_mask);
409         lhb = lh->lh_buckets[i];
410         LASSERT(i <= lh->lh_cur_mask);
411
412         read_lock(&lhb->lhb_rwlock);
413         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
414         if (hnode)
415                 obj = lh_get(lh, hnode);
416
417         read_unlock(&lhb->lhb_rwlock);
418         lh_read_unlock(lh);
419
420         RETURN(obj);
421 }
422 EXPORT_SYMBOL(lustre_hash_lookup);
423
424 /**
425  * For each item in the lustre hash \a lh call the passed callback \a func
426  * and pass to it as an argument each hash item and the private \a data.
427  * Before each callback ops->lh_get will be called, and after each
428  * callback ops->lh_put will be called.  Finally, during the callback
429  * the bucket lock is held so the callback must never sleep.
430  */
431 void
432 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
433 {
434         struct hlist_node    *hnode;
435         lustre_hash_bucket_t *lhb;
436         void                 *obj;
437         int                   i;
438         ENTRY;
439
440         lh_read_lock(lh);
441         lh_for_each_bucket(lh, lhb, i) {
442                 read_lock(&lhb->lhb_rwlock);
443                 hlist_for_each(hnode, &(lhb->lhb_head)) {
444                         __lustre_hash_bucket_validate(lh, lhb, hnode);
445                         obj = lh_get(lh, hnode);
446                         func(obj, data);
447                         (void)lh_put(lh, hnode);
448                 }
449                 read_unlock(&lhb->lhb_rwlock);
450         }
451         lh_read_unlock(lh);
452
453         EXIT;
454 }
455 EXPORT_SYMBOL(lustre_hash_for_each);
456
457 /**
458  * For each item in the lustre hash \a lh call the passed callback \a func
459  * and pass to it as an argument each hash item and the private \a data.
460  * Before each callback ops->lh_get will be called, and after each
461  * callback ops->lh_put will be called.  During the callback the
462  * bucket lock will not be held will allows for the current item
463  * to be removed from the hash during the callback.  However, care
464  * should be taken to prevent other callers from operating on the
465  * hash concurrently or list corruption may occur.
466  */
467 void
468 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
469 {
470         struct hlist_node    *hnode;
471         struct hlist_node    *pos;
472         lustre_hash_bucket_t *lhb;
473         void                 *obj;
474         int                   i;
475         ENTRY;
476
477         lh_read_lock(lh);
478         lh_for_each_bucket(lh, lhb, i) {
479                 read_lock(&lhb->lhb_rwlock);
480                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
481                         __lustre_hash_bucket_validate(lh, lhb, hnode);
482                         obj = lh_get(lh, hnode);
483                         read_unlock(&lhb->lhb_rwlock);
484                         func(obj, data);
485                         read_lock(&lhb->lhb_rwlock);
486                         (void)lh_put(lh, hnode);
487                 }
488                 read_unlock(&lhb->lhb_rwlock);
489         }
490         lh_read_unlock(lh);
491         EXIT;
492 }
493 EXPORT_SYMBOL(lustre_hash_for_each_safe);
494
495 /**
496  * For each hash bucket in the lustre hash \a lh call the passed callback
497  * \a func until all the hash buckets are empty.  The passed callback \a func
498  * or the previously registered callback lh->lh_put must remove the item
499  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
500  * functions.  No rwlocks will be held during the callback \a func it is
501  * safe to sleep if needed.  This function will not terminate until the
502  * hash is empty.  Note it is still possible to concurrently add new
503  * items in to the hash.  It is the callers responsibility to ensure
504  * the required locking is in place to prevent concurrent insertions.
505  */
506 void
507 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
508 {
509         struct hlist_node    *hnode;
510         lustre_hash_bucket_t *lhb;
511         void                 *obj;
512         int                   i;
513         ENTRY;
514
515 restart:
516         lh_read_lock(lh);
517         lh_for_each_bucket(lh, lhb, i) {
518                 write_lock(&lhb->lhb_rwlock);
519                 while (!hlist_empty(&lhb->lhb_head)) {
520                         hnode =  lhb->lhb_head.first;
521                         __lustre_hash_bucket_validate(lh, lhb, hnode);
522                         obj = lh_get(lh, hnode);
523                         write_unlock(&lhb->lhb_rwlock);
524                         lh_read_unlock(lh);
525                         func(obj, data);
526                         (void)lh_put(lh, hnode);
527                         goto restart;
528                 }
529                 write_unlock(&lhb->lhb_rwlock);
530         }
531         lh_read_unlock(lh);
532         EXIT;
533 }
534 EXPORT_SYMBOL(lustre_hash_for_each_empty);
535
536 /*
537  * For each item in the lustre hash \a lh which matches the \a key call
538  * the passed callback \a func and pass to it as an argument each hash
539  * item and the private \a data.  Before each callback ops->lh_get will
540  * be called, and after each callback ops->lh_put will be called.
541  * Finally, during the callback the bucket lock is held so the
542  * callback must never sleep.
543    */
544 void
545 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
546                          lh_for_each_cb func, void *data)
547 {
548         struct hlist_node    *hnode;
549         lustre_hash_bucket_t *lhb;
550         unsigned              i;
551         ENTRY;
552
553         lh_read_lock(lh);
554         i = lh_hash(lh, key, lh->lh_cur_mask);
555         lhb = lh->lh_buckets[i];
556         LASSERT(i <= lh->lh_cur_mask);
557
558         read_lock(&lhb->lhb_rwlock);
559         hlist_for_each(hnode, &(lhb->lhb_head)) {
560                 __lustre_hash_bucket_validate(lh, lhb, hnode);
561
562                 if (!lh_compare(lh, key, hnode))
563                         continue;
564
565                 func(lh_get(lh, hnode), data);
566                 (void)lh_put(lh, hnode);
567         }
568
569         read_unlock(&lhb->lhb_rwlock);
570         lh_read_unlock(lh);
571
572         EXIT;
573 }
574 EXPORT_SYMBOL(lustre_hash_for_each_key);
575
576 /**
577  * Rehash the lustre hash \a lh to the given \a bits.  This can be used
578  * to grow the hash size when excessive chaining is detected, or to
579  * shrink the hash when it is larger than needed.  When the LH_REHASH
580  * flag is set in \a lh the lustre hash may be dynamically rehashed
581  * during addition or removal if the hash's theta value exceeds
582  * either the lh->lh_min_theta or lh->max_theta values.  By default
583  * these values are tuned to keep the chained hash depth small, and
584  * this approach assumes a reasonably uniform hashing function.  The
585  * theta thresholds for \a lh are tunable via lustre_hash_set_theta().
586  */
587 int
588 lustre_hash_rehash(lustre_hash_t *lh, int bits)
589 {
590         struct hlist_node     *hnode;
591         struct hlist_node     *pos;
592         lustre_hash_bucket_t **lh_buckets;
593         lustre_hash_bucket_t **rehash_buckets;
594         lustre_hash_bucket_t  *lh_lhb;
595         lustre_hash_bucket_t  *rehash_lhb;
596         int                    i;
597         int                    theta;
598         int                    lh_mask;
599         int                    lh_bits;
600         int                    mask = (1 << bits) - 1;
601         int                    rc = 0;
602         void                  *key;
603         ENTRY;
604
605         LASSERT(!in_interrupt());
606         LASSERT(mask > 0);
607         LASSERT((lh->lh_flags & LH_REHASH) != 0);
608
609         LIBCFS_ALLOC(rehash_buckets, sizeof(*rehash_buckets) << bits);
610         if (!rehash_buckets)
611                 RETURN(-ENOMEM);
612
613         for (i = 0; i <= mask; i++) {
614                 LIBCFS_ALLOC(rehash_buckets[i], sizeof(*rehash_buckets[i]));
615                 if (rehash_buckets[i] == NULL)
616                         GOTO(free, rc = -ENOMEM);
617
618                 INIT_HLIST_HEAD(&rehash_buckets[i]->lhb_head);
619                 rwlock_init(&rehash_buckets[i]->lhb_rwlock);
620                 atomic_set(&rehash_buckets[i]->lhb_count, 0);
621         }
622
623         lh_write_lock(lh);
624
625         /*
626          * Early return for multiple concurrent racing callers,
627          * ensure we only trigger the rehash if it is still needed.
628          */
629         theta = __lustre_hash_theta(lh);
630         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
631                 lh_write_unlock(lh);
632                 GOTO(free, rc = -EALREADY);
633         }
634
635         lh_bits = lh->lh_cur_bits;
636         lh_buckets = lh->lh_buckets;
637         lh_mask = (1 << lh_bits) - 1;
638
639         lh->lh_cur_bits = bits;
640         lh->lh_cur_mask = (1 << bits) - 1;
641         lh->lh_buckets = rehash_buckets;
642         atomic_inc(&lh->lh_rehash_count);
643
644         for (i = 0; i <= lh_mask; i++) {
645                 lh_lhb = lh_buckets[i];
646
647                 write_lock(&lh_lhb->lhb_rwlock);
648                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
649                         key = lh_key(lh, hnode);
650                         LASSERT(key);
651
652                         /*
653                          * Validate hnode is in the correct bucket.
654                          */
655                         if (unlikely(lh->lh_flags & LH_DEBUG))
656                                 LASSERT(lh_hash(lh, key, lh_mask) == i);
657
658                         /*
659                          * Delete from old hash bucket.
660                          */
661                         hlist_del(hnode);
662                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
663                         atomic_dec(&lh_lhb->lhb_count);
664
665                         /*
666                          * Add to rehash bucket, ops->lh_key must be defined.
667                          */
668                         rehash_lhb = rehash_buckets[lh_hash(lh, key, mask)];
669                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
670                         atomic_inc(&rehash_lhb->lhb_count);
671                 }
672
673                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
674                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
675                 write_unlock(&lh_lhb->lhb_rwlock);
676         }
677
678         lh_write_unlock(lh);
679         rehash_buckets = lh_buckets;
680         i = (1 << lh_bits) - 1;
681         bits = lh_bits;
682  free:
683         while (i-- > 0)
684                 LIBCFS_FREE(rehash_buckets[i], sizeof(*rehash_buckets[i]));
685         LIBCFS_FREE(rehash_buckets, sizeof(*rehash_buckets) << bits);
686         RETURN(rc);
687 }
688 EXPORT_SYMBOL(lustre_hash_rehash);
689
690 /**
691  * Rehash the object referenced by \a hnode in the lustre hash \a lh.  The
692  * \a old_key must be provided to locate the objects previous location
693  * in the hash, and the \a new_key will be used to reinsert the object.
694  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
695  * combo when it is critical that there is no window in time where the
696  * object is missing from the hash.  When an object is being rehashed
697  * the registered lh_get() and lh_put() functions will not be called.
698  */
699 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
700                             struct hlist_node *hnode)
701 {
702         lustre_hash_bucket_t  *old_lhb;
703         lustre_hash_bucket_t  *new_lhb;
704         unsigned               i;
705         unsigned               j;
706         ENTRY;
707
708         __lustre_hash_key_validate(lh, new_key, hnode);
709         LASSERT(!hlist_unhashed(hnode));
710
711         lh_read_lock(lh);
712
713         i = lh_hash(lh, old_key, lh->lh_cur_mask);
714         old_lhb = lh->lh_buckets[i];
715         LASSERT(i <= lh->lh_cur_mask);
716
717         j = lh_hash(lh, new_key, lh->lh_cur_mask);
718         new_lhb = lh->lh_buckets[j];
719         LASSERT(j <= lh->lh_cur_mask);
720
721         if (i < j) { /* write_lock ordering */
722                 write_lock(&old_lhb->lhb_rwlock);
723                 write_lock(&new_lhb->lhb_rwlock);
724         } else if (i > j) {
725                 write_lock(&new_lhb->lhb_rwlock);
726                 write_lock(&old_lhb->lhb_rwlock);
727         } else { /* do nothing */
728                 read_unlock(&lh->lh_rwlock);
729                 EXIT;
730                 return;
731         }
732
733         /*
734          * Migrate item between hash buckets without calling
735          * the lh_get() and lh_put() callback functions.
736          */
737         hlist_del(hnode);
738         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
739         atomic_dec(&old_lhb->lhb_count);
740         hlist_add_head(hnode, &(new_lhb->lhb_head));
741         atomic_inc(&new_lhb->lhb_count);
742
743         write_unlock(&new_lhb->lhb_rwlock);
744         write_unlock(&old_lhb->lhb_rwlock);
745         lh_read_unlock(lh);
746
747         EXIT;
748 }
749 EXPORT_SYMBOL(lustre_hash_rehash_key);
750
751 int lustre_hash_debug_header(char *str, int size)
752 {
753         return snprintf(str, size,
754                  "%-*s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n", LUSTRE_MAX_HASH_NAME,
755                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
756                  "flags", "rehash", "count", " distribution");
757 }
758 EXPORT_SYMBOL(lustre_hash_debug_header);
759
760 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
761 {
762         lustre_hash_bucket_t  *lhb;
763         int                    theta;
764         int                    i;
765         int                    c = 0;
766         int                    dist[8] = { 0, };
767
768         if (str == NULL || size == 0)
769                 return 0;
770
771         lh_read_lock(lh);
772         theta = __lustre_hash_theta(lh);
773
774         c += snprintf(str + c, size - c, "%-*s ",
775                       LUSTRE_MAX_HASH_NAME, lh->lh_name);
776         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_cur_bits);
777         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_min_bits);
778         c += snprintf(str + c, size - c, "%5d ",  1 << lh->lh_max_bits);
779         c += snprintf(str + c, size - c, "%d.%03d ",
780                       __lustre_hash_theta_int(theta),
781                       __lustre_hash_theta_frac(theta));
782         c += snprintf(str + c, size - c, "%d.%03d ",
783                       __lustre_hash_theta_int(lh->lh_min_theta),
784                       __lustre_hash_theta_frac(lh->lh_min_theta));
785         c += snprintf(str + c, size - c, "%d.%03d ",
786                       __lustre_hash_theta_int(lh->lh_max_theta),
787                       __lustre_hash_theta_frac(lh->lh_max_theta));
788         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
789         c += snprintf(str + c, size - c, "%6d ",
790                       atomic_read(&lh->lh_rehash_count));
791         c += snprintf(str + c, size - c, "%5d ",
792                       atomic_read(&lh->lh_count));
793
794         /*
795          * The distribution is a summary of the chained hash depth in
796          * each of the lustre hash buckets.  Each buckets lhb_count is
797          * divided by the hash theta value and used to generate a
798          * histogram of the hash distribution.  A uniform hash will
799          * result in all hash buckets being close to the average thus
800          * only the first few entries in the histogram will be non-zero.
801          * If you hash function results in a non-uniform hash the will
802          * be observable by outlier bucks in the distribution histogram.
803          *
804          * Uniform hash distribution:      128/128/0/0/0/0/0/0
805          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
806          */
807         lh_for_each_bucket(lh, lhb, i)
808                 dist[min(__fls(atomic_read(&lhb->lhb_count)/max(theta,1)),7)]++;
809
810         for (i = 0; i < 8; i++)
811                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
812                               (i == 7) ? '\n' : '/');
813
814         lh_read_unlock(lh);
815
816         return c;
817 }
818 EXPORT_SYMBOL(lustre_hash_debug_str);