Whamcloud - gitweb
0dd18f137d7673827bc46c8598fb58fd3590b87c
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_size - Initial hash table size
62  * @max_size - Maximum allowed hash table resize
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         /*
79          * Ensure hash is a power of two to allow the use of a bitmask
80          * in the hash function instead of a more expensive modulus.
81          */
82         LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0,
83                  "Size (%u) is not power of 2\n", cur_size);
84         LASSERTF(max_size && (max_size & (max_size - 1)) == 0,
85                  "Size (%u) is not power of 2\n", max_size);
86
87         OBD_ALLOC_PTR(lh);
88         if (!lh)
89                 RETURN(NULL);
90
91         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
92         atomic_set(&lh->lh_rehash_count, 0);
93         atomic_set(&lh->lh_count, 0);
94         rwlock_init(&lh->lh_rwlock);
95         lh->lh_cur_size = cur_size;
96         lh->lh_min_size = cur_size;
97         lh->lh_max_size = max_size;
98         lh->lh_min_theta = 500;  /* theta * 1000 */
99         lh->lh_max_theta = 2000; /* theta * 1000 */
100         lh->lh_ops = ops;
101         lh->lh_flags = flags;
102
103         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
104         if (!lh->lh_buckets) {
105                 OBD_FREE_PTR(lh);
106                 RETURN(NULL);
107         }
108
109         for (i = 0; i < lh->lh_cur_size; i++) {
110                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
111                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
112                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
113         }
114
115         return lh;
116 }
117 EXPORT_SYMBOL(lustre_hash_init);
118
119 /**
120  * Cleanup lustre hash @lh.
121  */
122 void
123 lustre_hash_exit(lustre_hash_t *lh)
124 {
125         lustre_hash_bucket_t *lhb;
126         struct hlist_node    *hnode;
127         struct hlist_node    *pos;
128         int                   i;
129         ENTRY;
130
131         LASSERT(lh != NULL);
132
133         write_lock(&lh->lh_rwlock);
134
135         lh_for_each_bucket(lh, lhb, i) {
136                 write_lock(&lhb->lhb_rwlock);
137                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
138                         __lustre_hash_bucket_validate(lh, lhb, hnode);
139                         __lustre_hash_bucket_del(lh, lhb, hnode);
140                         lh_exit(lh, hnode);
141                 }
142
143                 LASSERT(hlist_empty(&(lhb->lhb_head)));
144                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
145                 write_unlock(&lhb->lhb_rwlock);
146         }
147
148         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
149         LASSERT(atomic_read(&lh->lh_count) == 0);
150         write_unlock(&lh->lh_rwlock);
151
152         OBD_FREE_PTR(lh);
153         EXIT;
154 }
155 EXPORT_SYMBOL(lustre_hash_exit);
156
157 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
158 {
159         if (!(lh->lh_flags & LH_REHASH))
160                 return 0;
161
162         if ((lh->lh_cur_size < lh->lh_max_size) &&
163             (__lustre_hash_theta(lh) > lh->lh_max_theta))
164                 return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
165
166         if ((lh->lh_cur_size > lh->lh_min_size) &&
167             (__lustre_hash_theta(lh) < lh->lh_min_theta))
168                 return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
169
170         return 0;
171 }
172
173 /**
174  * Add item @hnode to lustre hash @lh using @key.  The registered
175  * ops->lh_get function will be called when the item is added.
176  */
177 void
178 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
179 {
180         lustre_hash_bucket_t *lhb;
181         int                   size;
182         unsigned              i;
183         ENTRY;
184
185         __lustre_hash_key_validate(lh, key, hnode);
186
187         read_lock(&lh->lh_rwlock);
188         i = lh_hash(lh, key, lh->lh_cur_size - 1);
189         lhb = &lh->lh_buckets[i];
190         LASSERT(i < lh->lh_cur_size);
191         LASSERT(hlist_unhashed(hnode));
192
193         write_lock(&lhb->lhb_rwlock);
194         __lustre_hash_bucket_add(lh, lhb, hnode);
195         write_unlock(&lhb->lhb_rwlock);
196
197         size = lustre_hash_rehash_size(lh);
198         read_unlock(&lh->lh_rwlock);
199         if (size)
200                 lustre_hash_rehash(lh, size);
201
202         EXIT;
203 }
204 EXPORT_SYMBOL(lustre_hash_add);
205
206 static struct hlist_node *
207 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
208                                  struct hlist_node *hnode)
209 {
210         struct hlist_node    *ehnode;
211         lustre_hash_bucket_t *lhb;
212         int                   size;
213         unsigned              i;
214         ENTRY;
215
216         __lustre_hash_key_validate(lh, key, hnode);
217
218         read_lock(&lh->lh_rwlock);
219         i = lh_hash(lh, key, lh->lh_cur_size - 1);
220         lhb = &lh->lh_buckets[i];
221         LASSERT(i < lh->lh_cur_size);
222         LASSERT(hlist_unhashed(hnode));
223
224         write_lock(&lhb->lhb_rwlock);
225         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
226         if (ehnode) {
227                 lh_get(lh, ehnode);
228         } else {
229                 __lustre_hash_bucket_add(lh, lhb, hnode);
230                 ehnode = hnode;
231         }
232         write_unlock(&lhb->lhb_rwlock);
233
234         size = lustre_hash_rehash_size(lh);
235         read_unlock(&lh->lh_rwlock);
236         if (size)
237                 lustre_hash_rehash(lh, size);
238
239         RETURN(ehnode);
240 }
241
242 /**
243  * Add item @hnode to lustre hash @lh using @key.  The registered
244  * ops->lh_get function will be called if the item was added.
245  * Returns 0 on success or -EALREADY on key collisions.
246  */
247 int
248 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
249 {
250         struct hlist_node    *ehnode;
251         ENTRY;
252
253         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
254         if (ehnode != hnode)
255                 RETURN(-EALREADY);
256
257         RETURN(0);
258 }
259 EXPORT_SYMBOL(lustre_hash_add_unique);
260
261 /**
262  * Add item @hnode to lustre hash @lh using @key.  If this @key
263  * already exists in the hash then ops->lh_get will be called on the
264  * conflicting entry and that entry will be returned to the caller.
265  * Otherwise ops->lh_get is called on the item which was added.
266  */
267 void *
268 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
269                            struct hlist_node *hnode)
270 {
271         struct hlist_node    *ehnode;
272         void                 *obj;
273         ENTRY;
274
275         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
276         obj = lh_get(lh, ehnode);
277         lh_put(lh, ehnode);
278         RETURN(obj);
279 }
280 EXPORT_SYMBOL(lustre_hash_findadd_unique);
281
282 /**
283  * Delete item @hnode from the lustre hash @lh using @key.  The @key
284  * is required to ensure the correct hash bucket is locked since there
285  * is no direct linkage from the item to the bucket.  The object
286  * removed from the hash will be returned and obs->lh_put is called
287  * on the removed object.
288  */
289 void *
290 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
291 {
292         lustre_hash_bucket_t *lhb;
293         int                   size;
294         unsigned              i;
295         void                 *obj;
296         ENTRY;
297
298         __lustre_hash_key_validate(lh, key, hnode);
299
300         read_lock(&lh->lh_rwlock);
301         i = lh_hash(lh, key, lh->lh_cur_size - 1);
302         lhb = &lh->lh_buckets[i];
303         LASSERT(i < lh->lh_cur_size);
304         LASSERT(!hlist_unhashed(hnode));
305
306         write_lock(&lhb->lhb_rwlock);
307         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
308         write_unlock(&lhb->lhb_rwlock);
309
310         size = lustre_hash_rehash_size(lh);
311         read_unlock(&lh->lh_rwlock);
312         if (size)
313                 lustre_hash_rehash(lh, size);
314
315         RETURN(obj);
316 }
317 EXPORT_SYMBOL(lustre_hash_del);
318
319 /**
320  * Delete item given @key in lustre hash @lh.  The first @key found in
321  * the hash will be removed, if the key exists multiple times in the hash
322  * @lh this function must be called once per key.  The removed object
323  * will be returned and ops->lh_put is called on the removed object.
324  */
325 void *
326 lustre_hash_del_key(lustre_hash_t *lh, void *key)
327 {
328         struct hlist_node    *hnode;
329         lustre_hash_bucket_t *lhb;
330         int                   size;
331         unsigned              i;
332         void                 *obj = NULL;
333         ENTRY;
334
335         read_lock(&lh->lh_rwlock);
336         i = lh_hash(lh, key, lh->lh_cur_size - 1);
337         lhb = &lh->lh_buckets[i];
338         LASSERT(i < lh->lh_cur_size);
339
340         write_lock(&lhb->lhb_rwlock);
341         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
342         if (hnode)
343                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
344
345         write_unlock(&lhb->lhb_rwlock);
346
347         size = lustre_hash_rehash_size(lh);
348         read_unlock(&lh->lh_rwlock);
349         if (size)
350                 lustre_hash_rehash(lh, size);
351
352         RETURN(obj);
353 }
354 EXPORT_SYMBOL(lustre_hash_del_key);
355
356 /**
357  * Lookup an item using @key in the lustre hash @lh and return it.
358  * If the @key is found in the hash lh->lh_get() is called and the
359  * matching objects is returned.  It is the callers responsibility
360  * to call the counterpart ops->lh_put using the lh_put() macro
361  * when when finished with the object.  If the @key was not found
362  * in the hash @lh NULL is returned.
363  */
364 void *
365 lustre_hash_lookup(lustre_hash_t *lh, void *key)
366 {
367         struct hlist_node    *hnode;
368         lustre_hash_bucket_t *lhb;
369         unsigned              i;
370         void                 *obj = NULL;
371         ENTRY;
372
373         read_lock(&lh->lh_rwlock);
374         i = lh_hash(lh, key, lh->lh_cur_size - 1);
375         lhb = &lh->lh_buckets[i];
376         LASSERT(i < lh->lh_cur_size);
377
378         read_lock(&lhb->lhb_rwlock);
379         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
380         if (hnode)
381                 obj = lh_get(lh, hnode);
382
383         read_unlock(&lhb->lhb_rwlock);
384         read_unlock(&lh->lh_rwlock);
385
386         RETURN(obj);
387 }
388 EXPORT_SYMBOL(lustre_hash_lookup);
389
390 /**
391  * For each item in the lustre hash @lh call the passed callback @func
392  * and pass to it as an argument each hash item and the private @data.
393  * Before each callback ops->lh_get will be called, and after each
394  * callback ops->lh_put will be called.  Finally, during the callback
395  * the bucket lock is held so the callback must never sleep.
396  */
397 void
398 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
399 {
400         struct hlist_node    *hnode;
401         lustre_hash_bucket_t *lhb;
402         void                 *obj;
403         int                   i;
404         ENTRY;
405
406         read_lock(&lh->lh_rwlock);
407         lh_for_each_bucket(lh, lhb, i) {
408                 read_lock(&lhb->lhb_rwlock);
409                 hlist_for_each(hnode, &(lhb->lhb_head)) {
410                         __lustre_hash_bucket_validate(lh, lhb, hnode);
411                         obj = lh_get(lh, hnode);
412                         func(obj, data);
413                         (void)lh_put(lh, hnode);
414                 }
415                 read_unlock(&lhb->lhb_rwlock);
416         }
417         read_unlock(&lh->lh_rwlock);
418
419         EXIT;
420 }
421 EXPORT_SYMBOL(lustre_hash_for_each);
422
423 /**
424  * For each item in the lustre hash @lh call the passed callback @func
425  * and pass to it as an argument each hash item and the private @data.
426  * Before each callback ops->lh_get will be called, and after each
427  * callback ops->lh_put will be called.  During the callback the
428  * bucket lock will not be held will allows for the current item
429  * to be removed from the hash during the callback.  However, care
430  * should be taken to prevent other callers from operating on the
431  * hash concurrently or list corruption may occur.
432  */
433 void
434 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
435 {
436         struct hlist_node    *hnode;
437         struct hlist_node    *pos;
438         lustre_hash_bucket_t *lhb;
439         void                 *obj;
440         int                   i;
441         ENTRY;
442
443         read_lock(&lh->lh_rwlock);
444         lh_for_each_bucket(lh, lhb, i) {
445                 read_lock(&lhb->lhb_rwlock);
446                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
447                         __lustre_hash_bucket_validate(lh, lhb, hnode);
448                         obj = lh_get(lh, hnode);
449                         read_unlock(&lhb->lhb_rwlock);
450                         func(obj, data);
451                         read_lock(&lhb->lhb_rwlock);
452                         (void)lh_put(lh, hnode);
453                 }
454                 read_unlock(&lhb->lhb_rwlock);
455         }
456         read_unlock(&lh->lh_rwlock);
457         EXIT;
458 }
459 EXPORT_SYMBOL(lustre_hash_for_each_safe);
460
461 /**
462  * For each hash bucket in the lustre hash @lh call the passed callback
463  * @func until all the hash buckets are empty.  The passed callback @func
464  * or the previously registered callback lh->lh_put must remove the item
465  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
466  * functions.  No rwlocks will be held during the callback @func it is
467  * safe to sleep if needed.  This function will not terminate until the
468  * hash is empty.  Note it is still possible to concurrently add new
469  * items in to the hash.  It is the callers responsibility to ensure
470  * the required locking is in place to prevent concurrent insertions.
471  */
472 void
473 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
474 {
475         struct hlist_node    *hnode;
476         lustre_hash_bucket_t *lhb;
477         void                 *obj;
478         int                   i;
479         ENTRY;
480
481 restart:
482         read_lock(&lh->lh_rwlock);
483         lh_for_each_bucket(lh, lhb, i) {
484                 write_lock(&lhb->lhb_rwlock);
485                 while (!hlist_empty(&lhb->lhb_head)) {
486                         hnode =  lhb->lhb_head.first;
487                         __lustre_hash_bucket_validate(lh, lhb, hnode);
488                         obj = lh_get(lh, hnode);
489                         write_unlock(&lhb->lhb_rwlock);
490                         read_unlock(&lh->lh_rwlock);
491                         func(obj, data);
492                         (void)lh_put(lh, hnode);
493                         goto restart;
494                 }
495                 write_unlock(&lhb->lhb_rwlock);
496         }
497         read_unlock(&lh->lh_rwlock);
498         EXIT;
499 }
500 EXPORT_SYMBOL(lustre_hash_for_each_empty);
501
502   /*
503  * For each item in the lustre hash @lh which matches the @key call
504  * the passed callback @func and pass to it as an argument each hash
505  * item and the private @data.  Before each callback ops->lh_get will
506  * be called, and after each callback ops->lh_put will be called.
507  * Finally, during the callback the bucket lock is held so the
508  * callback must never sleep.
509    */
510 void
511 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
512                          lh_for_each_cb func, void *data)
513 {
514         struct hlist_node    *hnode;
515         lustre_hash_bucket_t *lhb;
516         unsigned              i;
517         ENTRY;
518
519         read_lock(&lh->lh_rwlock);
520         i = lh_hash(lh, key, lh->lh_cur_size - 1);
521         lhb = &lh->lh_buckets[i];
522         LASSERT(i < lh->lh_cur_size);
523
524         read_lock(&lhb->lhb_rwlock);
525         hlist_for_each(hnode, &(lhb->lhb_head)) {
526                 __lustre_hash_bucket_validate(lh, lhb, hnode);
527
528                 if (!lh_compare(lh, key, hnode))
529                         continue;
530
531                 func(lh_get(lh, hnode), data);
532                 (void)lh_put(lh, hnode);
533         }
534
535         read_unlock(&lhb->lhb_rwlock);
536         read_unlock(&lh->lh_rwlock);
537
538         EXIT;
539 }
540 EXPORT_SYMBOL(lustre_hash_for_each_key);
541
542 /**
543  * Rehash the lustre hash @lh to the given @size.  This can be used
544  * to grow the hash size when excessive chaining is detected, or to
545  * shrink the hash when it is larger than needed.  When the LH_REHASH
546  * flag is set in @lh the lustre hash may be dynamically rehashed
547  * during addition or removal if the hash's theta value exceeds
548  * either the lh->lh_min_theta or lh->max_theta values.  By default
549  * these values are tuned to keep the chained hash depth small, and
550  * this approach assumes a reasonably uniform hashing function.  The
551  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
552  */
553 int
554 lustre_hash_rehash(lustre_hash_t *lh, int size)
555 {
556         struct hlist_node     *hnode;
557         struct hlist_node     *pos;
558         lustre_hash_bucket_t  *lh_buckets;
559         lustre_hash_bucket_t  *rehash_buckets;
560         lustre_hash_bucket_t  *lh_lhb;
561         lustre_hash_bucket_t  *rehash_lhb;
562         int                    i;
563         int                    lh_size;
564         int                    theta;
565         void                  *key;
566         ENTRY;
567
568         LASSERT(size > 0);
569
570         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
571         if (!rehash_buckets)
572                 RETURN(-ENOMEM);
573
574         for (i = 0; i < size; i++) {
575                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
576                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
577                 atomic_set(&rehash_buckets[i].lhb_count, 0);
578         }
579
580         write_lock(&lh->lh_rwlock);
581
582         /*
583          * Early return for multiple concurrent racing callers,
584          * ensure we only trigger the rehash if it is still needed.
585          */
586         theta = __lustre_hash_theta(lh);
587         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
588                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size);
589                 write_unlock(&lh->lh_rwlock);
590                 RETURN(-EALREADY);
591         }
592
593         lh_size = lh->lh_cur_size;
594         lh_buckets = lh->lh_buckets;
595
596         lh->lh_cur_size = size;
597         lh->lh_buckets = rehash_buckets;
598         atomic_inc(&lh->lh_rehash_count);
599
600         for (i = 0; i < lh_size; i++) {
601                 lh_lhb = &lh_buckets[i];
602
603                 write_lock(&lh_lhb->lhb_rwlock);
604                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
605                         key = lh_key(lh, hnode);
606                         LASSERT(key);
607
608                         /*
609                          * Validate hnode is in the correct bucket.
610                          */
611                         if (unlikely(lh->lh_flags & LH_DEBUG))
612                                 LASSERT(lh_hash(lh, key, lh_size - 1) == i);
613
614                         /*
615                          * Delete from old hash bucket.
616                          */
617                         hlist_del(hnode);
618                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
619                         atomic_dec(&lh_lhb->lhb_count);
620
621                         /*
622                          * Add to rehash bucket, ops->lh_key must be defined.
623                          */
624                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)];
625                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
626                         atomic_inc(&rehash_lhb->lhb_count);
627                 }
628
629                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
630                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
631                 write_unlock(&lh_lhb->lhb_rwlock);
632         }
633
634         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size);
635         write_unlock(&lh->lh_rwlock);
636
637         RETURN(0);
638 }
639 EXPORT_SYMBOL(lustre_hash_rehash);
640
641 /**
642  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
643  * @old_key must be provided to locate the objects previous location
644  * in the hash, and the @new_key will be used to reinsert the object.
645  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
646  * combo when it is critical that there is no window in time where the
647  * object is missing from the hash.  When an object is being rehashed
648  * the registered lh_get() and lh_put() functions will not be called.
649  */
650 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
651                             struct hlist_node *hnode)
652 {
653         lustre_hash_bucket_t  *old_lhb;
654         lustre_hash_bucket_t  *new_lhb;
655         unsigned               i;
656         int                    j;
657         ENTRY;
658
659         __lustre_hash_key_validate(lh, new_key, hnode);
660         LASSERT(!hlist_unhashed(hnode));
661
662         read_lock(&lh->lh_rwlock);
663
664         i = lh_hash(lh, old_key, lh->lh_cur_size - 1);
665         old_lhb = &lh->lh_buckets[i];
666         LASSERT(i < lh->lh_cur_size);
667
668         j = lh_hash(lh, new_key, lh->lh_cur_size - 1);
669         new_lhb = &lh->lh_buckets[j];
670         LASSERT(j < lh->lh_cur_size);
671
672         write_lock(&old_lhb->lhb_rwlock);
673         write_lock(&new_lhb->lhb_rwlock);
674
675         /*
676          * Migrate item between hash buckets without calling
677          * the lh_get() and lh_put() callback functions.
678          */
679         hlist_del(hnode);
680         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
681         atomic_dec(&old_lhb->lhb_count);
682         hlist_add_head(hnode, &(new_lhb->lhb_head));
683         atomic_inc(&new_lhb->lhb_count);
684
685         write_unlock(&new_lhb->lhb_rwlock);
686         write_unlock(&old_lhb->lhb_rwlock);
687         read_unlock(&lh->lh_rwlock);
688
689         EXIT;
690 }
691 EXPORT_SYMBOL(lustre_hash_rehash_key);
692
693 int lustre_hash_debug_header(char *str, int size)
694 {
695         return snprintf(str, size,
696                  "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n",
697                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
698                  "flags", "rehash", "count", " distribution");
699 }
700 EXPORT_SYMBOL(lustre_hash_debug_header);
701
702 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
703 {
704         lustre_hash_bucket_t  *lhb;
705         int                    theta;
706         int                    i;
707         int                    c = 0;
708         int                    dist[8] = { 0, };
709
710         if (str == NULL || size == 0)
711                 return 0;
712
713         read_lock(&lh->lh_rwlock);
714         theta = __lustre_hash_theta(lh);
715
716         c += snprintf(str + c, size - c, "%-36s ",lh->lh_name);
717         c += snprintf(str + c, size - c, "%5d ",  lh->lh_cur_size);
718         c += snprintf(str + c, size - c, "%5d ",  lh->lh_min_size);
719         c += snprintf(str + c, size - c, "%5d ",  lh->lh_max_size);
720         c += snprintf(str + c, size - c, "%d.%03d ",
721                       theta / 1000, theta % 1000);
722         c += snprintf(str + c, size - c, "%d.%03d ",
723                       lh->lh_min_theta / 1000, lh->lh_min_theta % 1000);
724         c += snprintf(str + c, size - c, "%d.%03d ",
725                       lh->lh_max_theta / 1000, lh->lh_max_theta % 1000);
726         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
727         c += snprintf(str + c, size - c, "%6d ",
728                       atomic_read(&lh->lh_rehash_count));
729         c += snprintf(str + c, size - c, "%5d ",
730                       atomic_read(&lh->lh_count));
731
732         /*
733          * The distribution is a summary of the chained hash depth in
734          * each of the lustre hash buckets.  Each buckets lhb_count is
735          * divided by the hash theta value and used to generate a
736          * histogram of the hash distribution.  A uniform hash will
737          * result in all hash buckets being close to the average thus
738          * only the first few entries in the histogram will be non-zero.
739          * If you hash function results in a non-uniform hash the will
740          * be observable by outlier bucks in the distribution histogram.
741          *
742          * Uniform hash distribution:      128/128/0/0/0/0/0/0
743          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
744          */
745         lh_for_each_bucket(lh, lhb, i)
746                 dist[MIN(__fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++;
747
748         for (i = 0; i < 8; i++)
749                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
750                               (i == 7) ? '\n' : '/');
751
752         read_unlock(&lh->lh_rwlock);
753
754         return c;
755 }
756 EXPORT_SYMBOL(lustre_hash_debug_str);