Whamcloud - gitweb
55b41dfe46f2ca6a62704d1402c828a42e98096e
[fs/lustre-release.git] / lustre / obdclass / class_hash.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/class_hash.c
37  *
38  * Implement a hash class for hash process in lustre system.
39  *
40  * Author: YuZhangyong <yzy@clusterfs.com>
41  *
42  * 2008-08-15: Brian Behlendorf <behlendorf1@llnl.gov>
43  * - Simplified API and improved documentation
44  * - Added per-hash feature flags:
45  *   * LH_DEBUG additional validation
46  *   * LH_REHASH dynamic rehashing
47  * - Added per-hash statistics
48  * - General performance enhancements
49  */
50
51 #ifndef __KERNEL__
52 #include <liblustre.h>
53 #include <obd.h>
54 #endif
55
56 #include <class_hash.h>
57
58 /**
59  * Initialize new lustre hash, where:
60  * @name     - Descriptive hash name
61  * @cur_size - Initial hash table size
62  * @max_size - Maximum allowed hash table resize
63  * @ops      - Registered hash table operations
64  * @flags    - LH_REHASH enable synamic hash resizing
65  *           - LH_SORT enable chained hash sort
66  */
67 lustre_hash_t *
68 lustre_hash_init(char *name, unsigned int cur_size, unsigned int max_size,
69                  lustre_hash_ops_t *ops, int flags)
70 {
71         lustre_hash_t *lh;
72         int            i;
73         ENTRY;
74
75         LASSERT(name != NULL);
76         LASSERT(ops != NULL);
77
78         /*
79          * Ensure hash is a power of two to allow the use of a bitmask
80          * in the hash function instead of a more expensive modulus.
81          */
82         LASSERTF(cur_size && (cur_size & (cur_size - 1)) == 0,
83                  "Size (%u) is not power of 2\n", cur_size);
84         LASSERTF(max_size && (max_size & (max_size - 1)) == 0,
85                  "Size (%u) is not power of 2\n", max_size);
86
87         OBD_ALLOC_PTR(lh);
88         if (!lh)
89                 RETURN(NULL);
90
91         strncpy(lh->lh_name, name, sizeof(lh->lh_name));
92         atomic_set(&lh->lh_rehash_count, 0);
93         atomic_set(&lh->lh_count, 0);
94         rwlock_init(&lh->lh_rwlock);
95         lh->lh_cur_size = cur_size;
96         lh->lh_min_size = cur_size;
97         lh->lh_max_size = max_size;
98         lh->lh_min_theta = 500;  /* theta * 1000 */
99         lh->lh_max_theta = 2000; /* theta * 1000 */
100         lh->lh_ops = ops;
101         lh->lh_flags = flags;
102
103         OBD_VMALLOC(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
104         if (!lh->lh_buckets) {
105                 OBD_FREE_PTR(lh);
106                 RETURN(NULL);
107         }
108
109         for (i = 0; i < lh->lh_cur_size; i++) {
110                 INIT_HLIST_HEAD(&lh->lh_buckets[i].lhb_head);
111                 rwlock_init(&lh->lh_buckets[i].lhb_rwlock);
112                 atomic_set(&lh->lh_buckets[i].lhb_count, 0);
113         }
114
115         return lh;
116 }
117 EXPORT_SYMBOL(lustre_hash_init);
118
119 /**
120  * Cleanup lustre hash @lh.
121  */
122 void
123 lustre_hash_exit(lustre_hash_t *lh)
124 {
125         lustre_hash_bucket_t *lhb;
126         struct hlist_node    *hnode;
127         struct hlist_node    *pos;
128         int                   i;
129         ENTRY;
130
131         if (!lh)
132                 return;
133
134         write_lock(&lh->lh_rwlock);
135
136         lh_for_each_bucket(lh, lhb, i) {
137                 write_lock(&lhb->lhb_rwlock);
138                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
139                         __lustre_hash_bucket_validate(lh, lhb, hnode);
140                         __lustre_hash_bucket_del(lh, lhb, hnode);
141                         lh_exit(lh, hnode);
142                 }
143
144                 LASSERT(hlist_empty(&(lhb->lhb_head)));
145                 LASSERT(atomic_read(&lhb->lhb_count) == 0);
146                 write_unlock(&lhb->lhb_rwlock);
147         }
148
149         OBD_VFREE(lh->lh_buckets, sizeof(*lh->lh_buckets) * lh->lh_cur_size);
150         LASSERT(atomic_read(&lh->lh_count) == 0);
151         write_unlock(&lh->lh_rwlock);
152
153         OBD_FREE_PTR(lh);
154         EXIT;
155 }
156 EXPORT_SYMBOL(lustre_hash_exit);
157
158 static inline unsigned int lustre_hash_rehash_size(lustre_hash_t *lh)
159 {
160         if (!(lh->lh_flags & LH_REHASH))
161                 return 0;
162
163         if ((lh->lh_cur_size < lh->lh_max_size) &&
164             (__lustre_hash_theta(lh) > lh->lh_max_theta))
165                 return MIN(lh->lh_cur_size * 2, lh->lh_max_size);
166
167         if ((lh->lh_cur_size > lh->lh_min_size) &&
168             (__lustre_hash_theta(lh) < lh->lh_min_theta))
169                 return MAX(lh->lh_cur_size / 2, lh->lh_min_size);
170
171         return 0;
172 }
173
174 /**
175  * Add item @hnode to lustre hash @lh using @key.  The registered
176  * ops->lh_get function will be called when the item is added.
177  */
178 void
179 lustre_hash_add(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
180 {
181         lustre_hash_bucket_t *lhb;
182         int                   size;
183         unsigned              i;
184         ENTRY;
185
186         __lustre_hash_key_validate(lh, key, hnode);
187
188         read_lock(&lh->lh_rwlock);
189         i = lh_hash(lh, key, lh->lh_cur_size - 1);
190         lhb = &lh->lh_buckets[i];
191         LASSERT(i < lh->lh_cur_size);
192         LASSERT(hlist_unhashed(hnode));
193
194         write_lock(&lhb->lhb_rwlock);
195         __lustre_hash_bucket_add(lh, lhb, hnode);
196         write_unlock(&lhb->lhb_rwlock);
197
198         size = lustre_hash_rehash_size(lh);
199         read_unlock(&lh->lh_rwlock);
200         if (size)
201                 lustre_hash_rehash(lh, size);
202
203         EXIT;
204 }
205 EXPORT_SYMBOL(lustre_hash_add);
206
207 static struct hlist_node *
208 lustre_hash_findadd_unique_hnode(lustre_hash_t *lh, void *key,
209                                  struct hlist_node *hnode)
210 {
211         struct hlist_node    *ehnode;
212         lustre_hash_bucket_t *lhb;
213         int                   size;
214         unsigned              i;
215         ENTRY;
216
217         __lustre_hash_key_validate(lh, key, hnode);
218
219         read_lock(&lh->lh_rwlock);
220         i = lh_hash(lh, key, lh->lh_cur_size - 1);
221         lhb = &lh->lh_buckets[i];
222         LASSERT(i < lh->lh_cur_size);
223         LASSERT(hlist_unhashed(hnode));
224
225         write_lock(&lhb->lhb_rwlock);
226         ehnode = __lustre_hash_bucket_lookup(lh, lhb, key);
227         if (ehnode) {
228                 lh_get(lh, ehnode);
229         } else {
230                 __lustre_hash_bucket_add(lh, lhb, hnode);
231                 ehnode = hnode;
232         }
233         write_unlock(&lhb->lhb_rwlock);
234
235         size = lustre_hash_rehash_size(lh);
236         read_unlock(&lh->lh_rwlock);
237         if (size)
238                 lustre_hash_rehash(lh, size);
239
240         RETURN(ehnode);
241 }
242
243 /**
244  * Add item @hnode to lustre hash @lh using @key.  The registered
245  * ops->lh_get function will be called if the item was added.
246  * Returns 0 on success or -EALREADY on key collisions.
247  */
248 int
249 lustre_hash_add_unique(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
250 {
251         struct hlist_node    *ehnode;
252         ENTRY;
253
254         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
255         if (ehnode != hnode)
256                 RETURN(-EALREADY);
257
258         RETURN(0);
259 }
260 EXPORT_SYMBOL(lustre_hash_add_unique);
261
262 /**
263  * Add item @hnode to lustre hash @lh using @key.  If this @key
264  * already exists in the hash then ops->lh_get will be called on the
265  * conflicting entry and that entry will be returned to the caller.
266  * Otherwise ops->lh_get is called on the item which was added.
267  */
268 void *
269 lustre_hash_findadd_unique(lustre_hash_t *lh, void *key,
270                            struct hlist_node *hnode)
271 {
272         struct hlist_node    *ehnode;
273         void                 *obj;
274         ENTRY;
275
276         ehnode = lustre_hash_findadd_unique_hnode(lh, key, hnode);
277         obj = lh_get(lh, ehnode);
278         lh_put(lh, ehnode);
279         RETURN(obj);
280 }
281 EXPORT_SYMBOL(lustre_hash_findadd_unique);
282
283 /**
284  * Delete item @hnode from the lustre hash @lh using @key.  The @key
285  * is required to ensure the correct hash bucket is locked since there
286  * is no direct linkage from the item to the bucket.  The object
287  * removed from the hash will be returned and obs->lh_put is called
288  * on the removed object.
289  */
290 void *
291 lustre_hash_del(lustre_hash_t *lh, void *key, struct hlist_node *hnode)
292 {
293         lustre_hash_bucket_t *lhb;
294         int                   size;
295         unsigned              i;
296         void                 *obj;
297         ENTRY;
298
299         __lustre_hash_key_validate(lh, key, hnode);
300
301         read_lock(&lh->lh_rwlock);
302         i = lh_hash(lh, key, lh->lh_cur_size - 1);
303         lhb = &lh->lh_buckets[i];
304         LASSERT(i < lh->lh_cur_size);
305         LASSERT(!hlist_unhashed(hnode));
306
307         write_lock(&lhb->lhb_rwlock);
308         obj = __lustre_hash_bucket_del(lh, lhb, hnode);
309         write_unlock(&lhb->lhb_rwlock);
310
311         size = lustre_hash_rehash_size(lh);
312         read_unlock(&lh->lh_rwlock);
313         if (size)
314                 lustre_hash_rehash(lh, size);
315
316         RETURN(obj);
317 }
318 EXPORT_SYMBOL(lustre_hash_del);
319
320 /**
321  * Delete item given @key in lustre hash @lh.  The first @key found in
322  * the hash will be removed, if the key exists multiple times in the hash
323  * @lh this function must be called once per key.  The removed object
324  * will be returned and ops->lh_put is called on the removed object.
325  */
326 void *
327 lustre_hash_del_key(lustre_hash_t *lh, void *key)
328 {
329         struct hlist_node    *hnode;
330         lustre_hash_bucket_t *lhb;
331         int                   size;
332         unsigned              i;
333         void                 *obj = NULL;
334         ENTRY;
335
336         read_lock(&lh->lh_rwlock);
337         i = lh_hash(lh, key, lh->lh_cur_size - 1);
338         lhb = &lh->lh_buckets[i];
339         LASSERT(i < lh->lh_cur_size);
340
341         write_lock(&lhb->lhb_rwlock);
342         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
343         if (hnode)
344                 obj = __lustre_hash_bucket_del(lh, lhb, hnode);
345
346         write_unlock(&lhb->lhb_rwlock);
347
348         size = lustre_hash_rehash_size(lh);
349         read_unlock(&lh->lh_rwlock);
350         if (size)
351                 lustre_hash_rehash(lh, size);
352
353         RETURN(obj);
354 }
355 EXPORT_SYMBOL(lustre_hash_del_key);
356
357 /**
358  * Lookup an item using @key in the lustre hash @lh and return it.
359  * If the @key is found in the hash lh->lh_get() is called and the
360  * matching objects is returned.  It is the callers responsibility
361  * to call the counterpart ops->lh_put using the lh_put() macro
362  * when when finished with the object.  If the @key was not found
363  * in the hash @lh NULL is returned.
364  */
365 void *
366 lustre_hash_lookup(lustre_hash_t *lh, void *key)
367 {
368         struct hlist_node    *hnode;
369         lustre_hash_bucket_t *lhb;
370         unsigned              i;
371         void                 *obj = NULL;
372         ENTRY;
373
374         read_lock(&lh->lh_rwlock);
375         i = lh_hash(lh, key, lh->lh_cur_size - 1);
376         lhb = &lh->lh_buckets[i];
377         LASSERT(i < lh->lh_cur_size);
378
379         read_lock(&lhb->lhb_rwlock);
380         hnode = __lustre_hash_bucket_lookup(lh, lhb, key);
381         if (hnode)
382                 obj = lh_get(lh, hnode);
383
384         read_unlock(&lhb->lhb_rwlock);
385         read_unlock(&lh->lh_rwlock);
386
387         RETURN(obj);
388 }
389 EXPORT_SYMBOL(lustre_hash_lookup);
390
391 /**
392  * For each item in the lustre hash @lh call the passed callback @func
393  * and pass to it as an argument each hash item and the private @data.
394  * Before each callback ops->lh_get will be called, and after each
395  * callback ops->lh_put will be called.  Finally, during the callback
396  * the bucket lock is held so the callback must never sleep.
397  */
398 void
399 lustre_hash_for_each(lustre_hash_t *lh, lh_for_each_cb func, void *data)
400 {
401         struct hlist_node    *hnode;
402         lustre_hash_bucket_t *lhb;
403         void                 *obj;
404         int                   i;
405         ENTRY;
406
407         read_lock(&lh->lh_rwlock);
408         lh_for_each_bucket(lh, lhb, i) {
409                 read_lock(&lhb->lhb_rwlock);
410                 hlist_for_each(hnode, &(lhb->lhb_head)) {
411                         __lustre_hash_bucket_validate(lh, lhb, hnode);
412                         obj = lh_get(lh, hnode);
413                         func(obj, data);
414                         (void)lh_put(lh, hnode);
415                 }
416                 read_unlock(&lhb->lhb_rwlock);
417         }
418         read_unlock(&lh->lh_rwlock);
419
420         EXIT;
421 }
422 EXPORT_SYMBOL(lustre_hash_for_each);
423
424 /**
425  * For each item in the lustre hash @lh call the passed callback @func
426  * and pass to it as an argument each hash item and the private @data.
427  * Before each callback ops->lh_get will be called, and after each
428  * callback ops->lh_put will be called.  During the callback the
429  * bucket lock will not be held will allows for the current item
430  * to be removed from the hash during the callback.  However, care
431  * should be taken to prevent other callers from operating on the
432  * hash concurrently or list corruption may occur.
433  */
434 void
435 lustre_hash_for_each_safe(lustre_hash_t *lh, lh_for_each_cb func, void *data)
436 {
437         struct hlist_node    *hnode;
438         struct hlist_node    *pos;
439         lustre_hash_bucket_t *lhb;
440         void                 *obj;
441         int                   i;
442         ENTRY;
443
444         read_lock(&lh->lh_rwlock);
445         lh_for_each_bucket(lh, lhb, i) {
446                 read_lock(&lhb->lhb_rwlock);
447                 hlist_for_each_safe(hnode, pos, &(lhb->lhb_head)) {
448                         __lustre_hash_bucket_validate(lh, lhb, hnode);
449                         obj = lh_get(lh, hnode);
450                         read_unlock(&lhb->lhb_rwlock);
451                         func(obj, data);
452                         read_lock(&lhb->lhb_rwlock);
453                         (void)lh_put(lh, hnode);
454                 }
455                 read_unlock(&lhb->lhb_rwlock);
456         }
457         read_unlock(&lh->lh_rwlock);
458         EXIT;
459 }
460 EXPORT_SYMBOL(lustre_hash_for_each_safe);
461
462 /**
463  * For each hash bucket in the lustre hash @lh call the passed callback
464  * @func until all the hash buckets are empty.  The passed callback @func
465  * or the previously registered callback lh->lh_put must remove the item
466  * from the hash.  You may either use the lustre_hash_del() or hlist_del()
467  * functions.  No rwlocks will be held during the callback @func it is
468  * safe to sleep if needed.  This function will not terminate until the
469  * hash is empty.  Note it is still possible to concurrently add new
470  * items in to the hash.  It is the callers responsibility to ensure
471  * the required locking is in place to prevent concurrent insertions.
472  */
473 void
474 lustre_hash_for_each_empty(lustre_hash_t *lh, lh_for_each_cb func, void *data)
475 {
476         struct hlist_node    *hnode;
477         lustre_hash_bucket_t *lhb;
478         void                 *obj;
479         int                   i;
480         ENTRY;
481
482 restart:
483         read_lock(&lh->lh_rwlock);
484         lh_for_each_bucket(lh, lhb, i) {
485                 write_lock(&lhb->lhb_rwlock);
486                 while (!hlist_empty(&lhb->lhb_head)) {
487                         hnode =  lhb->lhb_head.first;
488                         __lustre_hash_bucket_validate(lh, lhb, hnode);
489                         obj = lh_get(lh, hnode);
490                         write_unlock(&lhb->lhb_rwlock);
491                         read_unlock(&lh->lh_rwlock);
492                         func(obj, data);
493                         (void)lh_put(lh, hnode);
494                         goto restart;
495                 }
496                 write_unlock(&lhb->lhb_rwlock);
497         }
498         read_unlock(&lh->lh_rwlock);
499         EXIT;
500 }
501 EXPORT_SYMBOL(lustre_hash_for_each_empty);
502
503   /*
504  * For each item in the lustre hash @lh which matches the @key call
505  * the passed callback @func and pass to it as an argument each hash
506  * item and the private @data.  Before each callback ops->lh_get will
507  * be called, and after each callback ops->lh_put will be called.
508  * Finally, during the callback the bucket lock is held so the
509  * callback must never sleep.
510    */
511 void
512 lustre_hash_for_each_key(lustre_hash_t *lh, void *key,
513                          lh_for_each_cb func, void *data)
514 {
515         struct hlist_node    *hnode;
516         lustre_hash_bucket_t *lhb;
517         unsigned              i;
518         ENTRY;
519
520         read_lock(&lh->lh_rwlock);
521         i = lh_hash(lh, key, lh->lh_cur_size - 1);
522         lhb = &lh->lh_buckets[i];
523         LASSERT(i < lh->lh_cur_size);
524
525         read_lock(&lhb->lhb_rwlock);
526         hlist_for_each(hnode, &(lhb->lhb_head)) {
527                 __lustre_hash_bucket_validate(lh, lhb, hnode);
528
529                 if (!lh_compare(lh, key, hnode))
530                         continue;
531
532                 func(lh_get(lh, hnode), data);
533                 (void)lh_put(lh, hnode);
534         }
535
536         read_unlock(&lhb->lhb_rwlock);
537         read_unlock(&lh->lh_rwlock);
538
539         EXIT;
540 }
541 EXPORT_SYMBOL(lustre_hash_for_each_key);
542
543 /**
544  * Rehash the lustre hash @lh to the given @size.  This can be used
545  * to grow the hash size when excessive chaining is detected, or to
546  * shrink the hash when it is larger than needed.  When the LH_REHASH
547  * flag is set in @lh the lustre hash may be dynamically rehashed
548  * during addition or removal if the hash's theta value exceeds
549  * either the lh->lh_min_theta or lh->max_theta values.  By default
550  * these values are tuned to keep the chained hash depth small, and
551  * this approach assumes a reasonably uniform hashing function.  The
552  * theta thresholds for @lh are tunable via lustre_hash_set_theta().
553  */
554 int
555 lustre_hash_rehash(lustre_hash_t *lh, int size)
556 {
557         struct hlist_node     *hnode;
558         struct hlist_node     *pos;
559         lustre_hash_bucket_t  *lh_buckets;
560         lustre_hash_bucket_t  *rehash_buckets;
561         lustre_hash_bucket_t  *lh_lhb;
562         lustre_hash_bucket_t  *rehash_lhb;
563         int                    i;
564         int                    lh_size;
565         int                    theta;
566         void                  *key;
567         ENTRY;
568
569         LASSERT(size > 0);
570
571         OBD_VMALLOC(rehash_buckets, sizeof(*rehash_buckets) * size);
572         if (!rehash_buckets)
573                 RETURN(-ENOMEM);
574
575         for (i = 0; i < size; i++) {
576                 INIT_HLIST_HEAD(&rehash_buckets[i].lhb_head);
577                 rwlock_init(&rehash_buckets[i].lhb_rwlock);
578                 atomic_set(&rehash_buckets[i].lhb_count, 0);
579         }
580
581         write_lock(&lh->lh_rwlock);
582
583         /*
584          * Early return for multiple concurrent racing callers,
585          * ensure we only trigger the rehash if it is still needed.
586          */
587         theta = __lustre_hash_theta(lh);
588         if ((theta >= lh->lh_min_theta) && (theta <= lh->lh_max_theta)) {
589                 OBD_VFREE(rehash_buckets, sizeof(*rehash_buckets) * size);
590                 write_unlock(&lh->lh_rwlock);
591                 RETURN(-EALREADY);
592         }
593
594         lh_size = lh->lh_cur_size;
595         lh_buckets = lh->lh_buckets;
596
597         lh->lh_cur_size = size;
598         lh->lh_buckets = rehash_buckets;
599         atomic_inc(&lh->lh_rehash_count);
600
601         for (i = 0; i < lh_size; i++) {
602                 lh_lhb = &lh_buckets[i];
603
604                 write_lock(&lh_lhb->lhb_rwlock);
605                 hlist_for_each_safe(hnode, pos, &(lh_lhb->lhb_head)) {
606                         key = lh_key(lh, hnode);
607                         LASSERT(key);
608
609                         /*
610                          * Validate hnode is in the correct bucket.
611                          */
612                         if (unlikely(lh->lh_flags & LH_DEBUG))
613                                 LASSERT(lh_hash(lh, key, lh_size - 1) == i);
614
615                         /*
616                          * Delete from old hash bucket.
617                          */
618                         hlist_del(hnode);
619                         LASSERT(atomic_read(&lh_lhb->lhb_count) > 0);
620                         atomic_dec(&lh_lhb->lhb_count);
621
622                         /*
623                          * Add to rehash bucket, ops->lh_key must be defined.
624                          */
625                         rehash_lhb = &rehash_buckets[lh_hash(lh, key, size-1)];
626                         hlist_add_head(hnode, &(rehash_lhb->lhb_head));
627                         atomic_inc(&rehash_lhb->lhb_count);
628                 }
629
630                 LASSERT(hlist_empty(&(lh_lhb->lhb_head)));
631                 LASSERT(atomic_read(&lh_lhb->lhb_count) == 0);
632                 write_unlock(&lh_lhb->lhb_rwlock);
633         }
634
635         OBD_VFREE(lh_buckets, sizeof(*lh_buckets) * lh_size);
636         write_unlock(&lh->lh_rwlock);
637
638         RETURN(0);
639 }
640 EXPORT_SYMBOL(lustre_hash_rehash);
641
642 /**
643  * Rehash the object referenced by @hnode in the lustre hash @lh.  The
644  * @old_key must be provided to locate the objects previous location
645  * in the hash, and the @new_key will be used to reinsert the object.
646  * Use this function instead of a lustre_hash_add() + lustre_hash_del()
647  * combo when it is critical that there is no window in time where the
648  * object is missing from the hash.  When an object is being rehashed
649  * the registered lh_get() and lh_put() functions will not be called.
650  */
651 void lustre_hash_rehash_key(lustre_hash_t *lh, void *old_key, void *new_key,
652                             struct hlist_node *hnode)
653 {
654         lustre_hash_bucket_t  *old_lhb;
655         lustre_hash_bucket_t  *new_lhb;
656         unsigned               i;
657         int                    j;
658         ENTRY;
659
660         __lustre_hash_key_validate(lh, new_key, hnode);
661         LASSERT(!hlist_unhashed(hnode));
662
663         read_lock(&lh->lh_rwlock);
664
665         i = lh_hash(lh, old_key, lh->lh_cur_size - 1);
666         old_lhb = &lh->lh_buckets[i];
667         LASSERT(i < lh->lh_cur_size);
668
669         j = lh_hash(lh, new_key, lh->lh_cur_size - 1);
670         new_lhb = &lh->lh_buckets[j];
671         LASSERT(j < lh->lh_cur_size);
672
673         write_lock(&old_lhb->lhb_rwlock);
674         write_lock(&new_lhb->lhb_rwlock);
675
676         /*
677          * Migrate item between hash buckets without calling
678          * the lh_get() and lh_put() callback functions.
679          */
680         hlist_del(hnode);
681         LASSERT(atomic_read(&old_lhb->lhb_count) > 0);
682         atomic_dec(&old_lhb->lhb_count);
683         hlist_add_head(hnode, &(new_lhb->lhb_head));
684         atomic_inc(&new_lhb->lhb_count);
685
686         write_unlock(&new_lhb->lhb_rwlock);
687         write_unlock(&old_lhb->lhb_rwlock);
688         read_unlock(&lh->lh_rwlock);
689
690         EXIT;
691 }
692 EXPORT_SYMBOL(lustre_hash_rehash_key);
693
694 int lustre_hash_debug_header(char *str, int size)
695 {
696         return snprintf(str, size,
697                  "%-36s%6s%6s%6s%6s%6s%6s%6s%7s%6s%s\n",
698                  "name", "cur", "min", "max", "theta", "t-min", "t-max",
699                  "flags", "rehash", "count", " distribution");
700 }
701 EXPORT_SYMBOL(lustre_hash_debug_header);
702
703 int lustre_hash_debug_str(lustre_hash_t *lh, char *str, int size)
704 {
705         lustre_hash_bucket_t  *lhb;
706         int                    theta;
707         int                    i;
708         int                    c = 0;
709         int                    dist[8] = { 0, };
710
711         if (str == NULL || size == 0)
712                 return 0;
713
714         read_lock(&lh->lh_rwlock);
715         theta = __lustre_hash_theta(lh);
716
717         c += snprintf(str + c, size - c, "%-36s ",lh->lh_name);
718         c += snprintf(str + c, size - c, "%5d ",  lh->lh_cur_size);
719         c += snprintf(str + c, size - c, "%5d ",  lh->lh_min_size);
720         c += snprintf(str + c, size - c, "%5d ",  lh->lh_max_size);
721         c += snprintf(str + c, size - c, "%d.%03d ",
722                       theta / 1000, theta % 1000);
723         c += snprintf(str + c, size - c, "%d.%03d ",
724                       lh->lh_min_theta / 1000, lh->lh_min_theta % 1000);
725         c += snprintf(str + c, size - c, "%d.%03d ",
726                       lh->lh_max_theta / 1000, lh->lh_max_theta % 1000);
727         c += snprintf(str + c, size - c, " 0x%02x ", lh->lh_flags);
728         c += snprintf(str + c, size - c, "%6d ",
729                       atomic_read(&lh->lh_rehash_count));
730         c += snprintf(str + c, size - c, "%5d ",
731                       atomic_read(&lh->lh_count));
732
733         /*
734          * The distribution is a summary of the chained hash depth in
735          * each of the lustre hash buckets.  Each buckets lhb_count is
736          * divided by the hash theta value and used to generate a
737          * histogram of the hash distribution.  A uniform hash will
738          * result in all hash buckets being close to the average thus
739          * only the first few entries in the histogram will be non-zero.
740          * If you hash function results in a non-uniform hash the will
741          * be observable by outlier bucks in the distribution histogram.
742          *
743          * Uniform hash distribution:      128/128/0/0/0/0/0/0
744          * Non-Uniform hash distribution:  128/125/0/0/0/0/2/1
745          */
746         lh_for_each_bucket(lh, lhb, i)
747                 dist[MIN(__fls(atomic_read(&lhb->lhb_count)/MAX(theta,1)),7)]++;
748
749         for (i = 0; i < 8; i++)
750                 c += snprintf(str + c, size - c, "%d%c",  dist[i],
751                               (i == 7) ? '\n' : '/');
752
753         read_unlock(&lh->lh_rwlock);
754
755         return c;
756 }
757 EXPORT_SYMBOL(lustre_hash_debug_str);