lu_object: return back old hash: hash_long() proved to be a disadvantage.
lustre/obdclass/lu_object.c (fs/lustre-release.git)

/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions, they provide some generic
 * infrastructure for managing object devices
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
/* nr_free_pages() */
#include <linux/swap.h>
/* hash_long() */
#include <linux/hash.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/*
 * Decrease reference counter on object. If last reference is freed, return
 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
 * case, free object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If the object is dying (will not be cached), remove
                         * it from the hash table and LRU.
                         *
                         * This is done with the hash table and LRU lists
                         * locked. Since the only ways to acquire the first
                         * reference to a previously unreferenced object are a
                         * hash-table lookup (lu_object_find()) or an LRU scan
                         * (lu_site_purge()), both of which run under the
                         * hash-table and LRU lock, no race with a concurrent
                         * object lookup is possible and we can safely destroy
                         * the object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        -- site->ls_total;
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from hash and lru above, can
                 * kill it.
                 */
                lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);
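
/*
 * Note: lu_object_put() drops the reference that lu_object_find() (below)
 * acquires on the returned object, so the two are normally used as a pair.
 */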

/*
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid  = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created ++;
        RETURN(top);
}

/*
 * Free object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                /*
                 * Objects are sorted in lru order, and "busy" objects (ones
                 * with h->loh_ref > 0) naturally tend to live near the hot
                 * end that we scan last. Unfortunately, sites usually have a
                 * small (fewer than ten) number of busy yet rarely accessed
                 * objects (some global objects, accessed directly through
                 * pointers and bypassing the hash table). Currently the
                 * algorithm scans them over and over again. Probably we
                 * should move busy objects out of the LRU, or we can live
                 * with that.
                 */
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
                s->ls_total --;
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                 struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(env, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
        return nr;
}
EXPORT_SYMBOL(lu_site_purge);
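
/*
 * Example caller: lu_cache_shrink() (below) walks all sites and threads the
 * value returned by lu_site_purge() from one site to the next, so that at
 * most @nr objects are freed across the whole node.
 */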

/*
 * Object printing.
 *
 * The code below has to jump through certain hoops to output an object
 * description into a libcfs_debug_msg()-based log. The problem is that
 * lu_object_print() composes the object description from strings that are
 * parts of _lines_ of output (i.e., strings that are not terminated by a
 * newline). This does not fit well into the libcfs_debug_msg() interface,
 * which assumes that each message supplied to it is a self-contained output
 * line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of lu_cdebug_key key), until terminating newline
 * character is detected.
 *
 */

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

static void *lu_cdebug_key_init(const struct lu_context *ctx,
                                struct lu_context_key *key)
{
        struct lu_cdebug_data *value;

        OBD_ALLOC_PTR(value);
        if (value == NULL)
                value = ERR_PTR(-ENOMEM);
        return value;
}

static void lu_cdebug_key_fini(const struct lu_context *ctx,
                               struct lu_context_key *key, void *data)
{
        struct lu_cdebug_data *value = data;
        OBD_FREE_PTR(value);
}

/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
static struct lu_context_key lu_cdebug_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_cdebug_key_init,
        .lct_fini = lu_cdebug_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_cdebug_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                 (char *)info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_env *env,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru");
}

/*
 * Print human readable representation of the @o to the @printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);

/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* and move it to the hot end of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

/*
 * Hash-table parameters. Initialized in lu_global_init(). This assumes single
 * site per node.
 */
static int lu_site_htable_bits;
static int lu_site_htable_size;
static int lu_site_htable_mask;

static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong
         * to the same collision list. */
#if 1
        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
#else
        unsigned long hash;
        __u64 seq;

        seq  = fid_seq(f);
        hash = seq ^ fid_oid(f);
        if (sizeof hash != sizeof seq)
                hash ^= seq >> 32;
        return hash_long(hash, lu_site_htable_bits);
#endif
}
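
/*
 * Note: the first branch above is the "old" hash restored by this change
 * (see the commit message: hash_long() proved to be a disadvantage), so
 * consecutive object ids within a sequence map to consecutive buckets once
 * lu_object_find() masks the result with ->ls_hash_mask. The hash_long()
 * variant is kept under "#else" only for reference.
 */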

/*
 * Search cache for an object with the fid @f. If such object is found, return
 * it. Otherwise, create new object, insert it into cache and return it. In
 * any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);

        spin_unlock(&s->ls_guard);
        if (o != NULL)
                return o;

        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, s, f);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                ++ s->ls_total;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(env, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
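
/*
 * Typical usage (a sketch only; the enclosing lu_env setup and the real
 * error handling are assumed to be provided by the caller):
 *
 *     o = lu_object_find(env, site, fid);
 *     if (!IS_ERR(o)) {
 *             ... work with the object's layers ...
 *             lu_object_put(env, o);
 *     }
 */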

/*
 * Global list of all sites on this node
 */
static LIST_HEAD(lu_sites);
static DECLARE_MUTEX(lu_sites_guard);

/*
 * Global environment used by site shrinker.
 */
static struct lu_env lu_shrink_env;

/*
 * Print all objects in @s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        int i;

        for (i = 0; i < lu_site_htable_size; ++i) {
                struct lu_object_header *h;
                struct hlist_node       *scan;

                spin_lock(&s->ls_guard);
                hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {

                        if (!list_empty(&h->loh_layers)) {
                                const struct lu_object *obj;

                                obj = lu_object_top(h);
                                lu_object_print(env, cookie, printer, obj);
                        } else
                                lu_object_header_print(env, cookie, printer, h);
                }
                spin_unlock(&s->ls_guard);
        }
}
EXPORT_SYMBOL(lu_site_print);

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = lu_site_htable_mask;
        OBD_ALLOC(s->ls_hash, lu_site_htable_size * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < lu_site_htable_size; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else
                result = -ENOMEM;

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        down(&lu_sites_guard);
        list_del_init(&s->ls_linkage);
        up(&lu_sites_guard);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < lu_site_htable_size; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         lu_site_htable_size * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);

/*
 * Called when initialization of stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;
        down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                list_add(&s->ls_linkage, &lu_sites);
        up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/*
 * Acquire additional reference on device @d
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        if (d->ld_obd != NULL)
                /* finish lprocfs */
                lprocfs_obd_cleanup(d->ld_obd);

        LASSERTF(atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", atomic_read(&d->ld_ref));
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as first layer of compound object @h
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);
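
/*
 * Both helpers above use list_move() rather than list_add(): lo_linkage is
 * initialized to an empty list head in lu_object_init(), so list_move()
 * works whether or not the slice is already linked somewhere.
 */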

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice, corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);
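
/*
 * For an example of the key pattern, see lu_cdebug_key above: a static
 * struct lu_context_key with ->lct_init/->lct_fini callbacks and the thread
 * tags it applies to, registered once via lu_context_key_register() and
 * later looked up per-context with lu_context_key_get().
 */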

static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                atomic_dec(&key->lct_used);
                ctx->lc_value[index] = NULL;
        }
}

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        key_fini(&lu_shrink_env.le_ctx, key->lct_index);

        if (atomic_read(&key->lct_used) > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_fill(const struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL &&
                    key != NULL && key->lct_tags & ctx->lc_tags) {
                        void *value;

                        LASSERT(key->lct_init != NULL);
                        LASSERT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (IS_ERR(value))
                                return PTR_ERR(value);
                        atomic_inc(&key->lct_used);
                        ctx->lc_value[i] = value;
                }
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL)
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

/*
 * Allocate for context all missing keys that were registered after context
 * creation.
 */
int lu_context_refill(const struct lu_context *ctx)
{
        LASSERT(ctx->lc_value != NULL);
        return keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);

int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
{
        int result;

        env->le_ses = ses;
        result = lu_context_init(&env->le_ctx, tags);
        if (result == 0)
                lu_context_enter(&env->le_ctx);
        return result;
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);
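
/*
 * lu_env_init() and lu_env_fini() are used as a pair; lu_shrink_env in
 * lu_global_init()/lu_global_fini() below is an example of this lifecycle.
 */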

static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        struct lu_site *s;
        struct lu_site *tmp;
        int cached = 0;
        int remain = nr;
        LIST_HEAD(splice);

        if (nr != 0 && !(gfp_mask & __GFP_FS))
                return -1;

        down(&lu_sites_guard);
        list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                if (nr != 0) {
                        remain = lu_site_purge(&lu_shrink_env, s, remain);
                        /*
                         * Move the just-shrunk site to the tail of the site
                         * list to ensure shrinking fairness.
                         */
                        list_move_tail(&s->ls_linkage, &splice);
                }
                spin_lock(&s->ls_guard);
                cached += s->ls_total - s->ls_busy;
                spin_unlock(&s->ls_guard);
                if (remain <= 0)
                        break;
        }
        list_splice(&splice, lu_sites.prev);
        up(&lu_sites_guard);
        return cached;
}

static struct shrinker *lu_site_shrinker = NULL;

enum {
        LU_CACHE_PERCENT   = 30,
        LU_CACHE_CAP_PAGES = 8
};

/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;
        unsigned long cache_size;

        /*
         * Calculate hash table size, assuming that we want reasonable
         * performance when 30% of available memory is occupied by cache of
         * lu_objects.
         *
         * The size of a lu_object is (arbitrarily) taken as 1K (together
         * with the inode).
         */
        cache_size = min(nr_free_pages() / 100 *
                         LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024),
                         /*
                          * And cap it at some reasonable upper bound (total
                          * hash table size is 8 pages), so as to avoid
                          * high-order allocations, which are unlikely to
                          * ever succeed.
                          */
                         LU_CACHE_CAP_PAGES * CFS_PAGE_SIZE /
                         sizeof(struct hlist_head));

        for (lu_site_htable_bits = 1;
             (1 << lu_site_htable_bits) <= cache_size; ++lu_site_htable_bits);

        lu_site_htable_size = 1 << lu_site_htable_bits;
        lu_site_htable_mask = lu_site_htable_size - 1;
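
        /*
         * Worked example (assuming 4096-byte pages and an 8-byte hlist_head,
         * which is not guaranteed on every platform): the cap above is
         * 8 * 4096 / 8 = 4096 entries, and the loop then selects the
         * smallest power of two strictly greater than cache_size, i.e. a
         * table of 8192 buckets with lu_site_htable_bits == 13.
         */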

        result = lu_context_key_register(&lu_cdebug_key);
        if (result == 0) {
                /*
                 * At this level, we don't know what tags are needed, so
                 * allocate them conservatively. This should not be too bad,
                 * because this environment is global.
                 */
                down(&lu_sites_guard);
                result = lu_env_init(&lu_shrink_env, NULL, LCT_SHRINKER);
                up(&lu_sites_guard);
                if (result == 0)
                        lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
                                                        lu_cache_shrink);
        }
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        if (lu_site_shrinker != NULL) {
                remove_shrinker(lu_site_shrinker);
                lu_site_shrinker = NULL;
        }

        lu_context_key_degister(&lu_cdebug_key);

        /*
         * Tear shrinker environment down _after_ de-registering
         * lu_cdebug_key, because the latter has a value in the former.
         */
        down(&lu_sites_guard);
        lu_env_fini(&lu_shrink_env);
        up(&lu_sites_guard);
}

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);