lustre/obdclass/lu_object.c @ 8924d25b00efd6b1094daabcacd8e5f47e53d6fe
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: Nikita Danilov <nikita@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  *
27  * These are the only exported functions; they provide some generic
28  * infrastructure for managing object devices.
29  */
30
31 #define DEBUG_SUBSYSTEM S_CLASS
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35
36 #include <linux/seq_file.h>
37 #include <linux/module.h>
38 /* nr_free_pages() */
39 #include <linux/swap.h>
40 /* hash_long() */
41 #include <linux/hash.h>
42 #include <obd_support.h>
43 #include <lustre_disk.h>
44 #include <lustre_fid.h>
45 #include <lu_object.h>
46 #include <libcfs/list.h>
47 /* lu_time_global_{init,fini}() */
48 #include <lu_time.h>
49
50 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
51
52 /*
53  * Decrease reference counter on object. If the last reference is dropped,
54  * return the object to the cache, unless lu_object_is_dying(o) holds. In the
55  * latter case, free the object immediately.
56  */
57 void lu_object_put(const struct lu_env *env, struct lu_object *o)
58 {
59         struct lu_object_header *top;
60         struct lu_site          *site;
61         struct lu_object        *orig;
62         int                      kill_it;
63
64         top = o->lo_header;
65         site = o->lo_dev->ld_site;
66         orig = o;
67         kill_it = 0;
68         write_lock(&site->ls_guard);
69         if (atomic_dec_and_test(&top->loh_ref)) {
70                 /*
71                  * When last reference is released, iterate over object
72                  * layers, and notify them that object is no longer busy.
73                  */
74                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
75                         if (o->lo_ops->loo_object_release != NULL)
76                                 o->lo_ops->loo_object_release(env, o);
77                 }
78                 -- site->ls_busy;
79                 if (lu_object_is_dying(top)) {
80                         /*
81                          * If the object is dying (will not be cached), remove
82                          * it from the hash table and LRU.
83                          *
84                          * This is done with hash table and LRU lists
85                          * locked. As the only way to acquire first reference
86                          * to previously unreferenced object is through
87                          * hash-table lookup (lu_object_find()), or LRU
88                          * scanning (lu_site_purge()), that are done under
89                          * hash-table and LRU lock, no race with concurrent
90                          * object lookup is possible and we can safely destroy
91                          * object below.
92                          */
93                         hlist_del_init(&top->loh_hash);
94                         list_del_init(&top->loh_lru);
95                         -- site->ls_total;
96                         kill_it = 1;
97                 }
98         }
99         write_unlock(&site->ls_guard);
100         if (kill_it)
101                 /*
102                  * Object was already removed from hash and lru above, can
103                  * kill it.
104                  */
105                 lu_object_free(env, orig);
106 }
107 EXPORT_SYMBOL(lu_object_put);
108
109 /*
110  * Allocate new object.
111  *
112  * This follows object creation protocol, described in the comment within
113  * struct lu_device_operations definition.
114  */
115 static struct lu_object *lu_object_alloc(const struct lu_env *env,
116                                          struct lu_site *s,
117                                          const struct lu_fid *f)
118 {
119         struct lu_object *scan;
120         struct lu_object *top;
121         struct list_head *layers;
122         int clean;
123         int result;
124         ENTRY;
125
126         /*
127          * Create top-level object slice. This will also create
128          * lu_object_header.
129          */
130         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
131                                                       NULL, s->ls_top_dev);
132         if (top == NULL)
133                 RETURN(ERR_PTR(-ENOMEM));
134         /*
135          * This is the only place where object fid is assigned. It's constant
136          * after this point.
137          */
138         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
139         top->lo_header->loh_fid  = *f;
140         layers = &top->lo_header->loh_layers;
141         do {
142                 /*
143                  * Call ->loo_object_init() repeatedly, until no more new
144                  * object slices are created.
145                  */
146                 clean = 1;
147                 list_for_each_entry(scan, layers, lo_linkage) {
148                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
149                                 continue;
150                         clean = 0;
151                         scan->lo_header = top->lo_header;
152                         result = scan->lo_ops->loo_object_init(env, scan);
153                         if (result != 0) {
154                                 lu_object_free(env, top);
155                                 RETURN(ERR_PTR(result));
156                         }
157                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
158                 }
159         } while (!clean);
160
161         list_for_each_entry_reverse(scan, layers, lo_linkage) {
162                 if (scan->lo_ops->loo_object_start != NULL) {
163                         result = scan->lo_ops->loo_object_start(env, scan);
164                         if (result != 0) {
165                                 lu_object_free(env, top);
166                                 RETURN(ERR_PTR(result));
167                         }
168                 }
169         }
170
171         s->ls_stats.s_created ++;
172         RETURN(top);
173 }
174
175 /*
176  * Free object.
177  */
178 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
179 {
180         struct list_head splice;
181         struct lu_object *scan;
182
183         /*
184          * First call ->loo_object_delete() method to release all resources.
185          */
186         list_for_each_entry_reverse(scan,
187                                     &o->lo_header->loh_layers, lo_linkage) {
188                 if (scan->lo_ops->loo_object_delete != NULL)
189                         scan->lo_ops->loo_object_delete(env, scan);
190         }
191
192         /*
193          * Then, splice object layers into stand-alone list, and call
194          * ->loo_object_free() on all layers to free memory. Splice is
195          * necessary, because lu_object_header is freed together with the
196          * top-level slice.
197          */
198         CFS_INIT_LIST_HEAD(&splice);
199         list_splice_init(&o->lo_header->loh_layers, &splice);
200         while (!list_empty(&splice)) {
201                 o = container_of0(splice.next, struct lu_object, lo_linkage);
202                 list_del_init(&o->lo_linkage);
203                 LASSERT(o->lo_ops->loo_object_free != NULL);
204                 o->lo_ops->loo_object_free(env, o);
205         }
206 }
207
208 /*
209  * Free @nr objects from the cold end of the site LRU list.
210  */
211 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
212 {
213         struct list_head         dispose;
214         struct lu_object_header *h;
215         struct lu_object_header *temp;
216
217         CFS_INIT_LIST_HEAD(&dispose);
218         /*
219          * Under LRU list lock, scan LRU list and move unreferenced objects to
220          * the dispose list, removing them from LRU and hash table.
221          */
222         write_lock(&s->ls_guard);
223         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
224                 /*
225                  * Objects are sorted in LRU order, and "busy" objects (ones
226                  * with h->loh_ref > 0) naturally tend to live near the hot end
227                  * that we scan last. Unfortunately, sites usually have a small
228                  * (fewer than ten) number of busy yet rarely accessed objects
229                  * (some global objects, accessed directly through pointers,
230                  * bypassing the hash table). Currently the algorithm scans them
231                  * over and over again. Probably we should move busy objects out
232                  * of the LRU, or we can live with that.
233                  */
234                 if (nr-- == 0)
235                         break;
236                 if (atomic_read(&h->loh_ref) > 0)
237                         continue;
238                 hlist_del_init(&h->loh_hash);
239                 list_move(&h->loh_lru, &dispose);
240                 s->ls_total --;
241         }
242         write_unlock(&s->ls_guard);
243         /*
244          * Free everything on the dispose list. This is safe against races due
245          * to the reasons described in lu_object_put().
246          */
247         while (!list_empty(&dispose)) {
248                 h = container_of0(dispose.next,
249                                  struct lu_object_header, loh_lru);
250                 list_del_init(&h->loh_lru);
251                 lu_object_free(env, lu_object_top(h));
252                 s->ls_stats.s_lru_purged ++;
253         }
254         return nr;
255 }
256 EXPORT_SYMBOL(lu_site_purge);
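/*
 * Usage note (added illustration, not part of the original file): passing ~0
 * as @nr, as lu_stack_fini() does further below, drains every unreferenced
 * object from the site:
 *
 *     lu_site_purge(env, site, ~0);
 */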
257
258 /*
259  * Object printing.
260  *
261  * The code below has to jump through some hoops to output an object
262  * description into a libcfs_debug_msg-based log. The problem is that
263  * lu_object_print() composes the object description from strings that are
264  * parts of _lines_ of output (i.e., strings not terminated by a newline).
265  * This doesn't fit very well into the libcfs_debug_msg() interface, which
266  * assumes that each message supplied to it is a self-contained output
267  * line.
268  * To work around this, strings are collected in a temporary buffer
269  * (implemented as a value of lu_cdebug_key key), until terminating newline
270  * character is detected.
271  *
272  */
273
274 enum {
275         /*
276          * Maximal line size.
277          *
278          * XXX overflow is not handled correctly.
279          */
280         LU_CDEBUG_LINE = 256
281 };
282
283 struct lu_cdebug_data {
284         /*
285          * Temporary buffer.
286          */
287         char lck_area[LU_CDEBUG_LINE];
288         /*
289          * fid staging area used by dt_store_open().
290          */
291         struct lu_fid_pack lck_pack;
292 };
293
294 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
295 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
296
297 /*
298  * Key, holding temporary buffer. This key is registered very early by
299  * lu_global_init().
300  */
301 struct lu_context_key lu_global_key = {
302         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
303         .lct_init = lu_global_key_init,
304         .lct_fini = lu_global_key_fini
305 };
306
307 /*
308  * Printer function emitting messages through libcfs_debug_msg().
309  */
310 int lu_cdebug_printer(const struct lu_env *env,
311                       void *cookie, const char *format, ...)
312 {
313         struct lu_cdebug_print_info *info = cookie;
314         struct lu_cdebug_data       *key;
315         int used;
316         int complete;
317         va_list args;
318
319         va_start(args, format);
320
321         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
322         LASSERT(key != NULL);
323
324         used = strlen(key->lck_area);
325         complete = format[strlen(format) - 1] == '\n';
326         /*
327          * Append new chunk to the buffer.
328          */
329         vsnprintf(key->lck_area + used,
330                   ARRAY_SIZE(key->lck_area) - used, format, args);
331         if (complete) {
332                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
333                                  (char *)info->lpi_file, info->lpi_fn,
334                                  info->lpi_line, "%s", key->lck_area);
335                 key->lck_area[0] = 0;
336         }
337         va_end(args);
338         return 0;
339 }
340 EXPORT_SYMBOL(lu_cdebug_printer);
341
342 /*
343  * Print object header.
344  */
345 static void lu_object_header_print(const struct lu_env *env,
346                                    void *cookie, lu_printer_t printer,
347                                    const struct lu_object_header *hdr)
348 {
349         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
350                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
351                    PFID(&hdr->loh_fid),
352                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
353                    list_empty(&hdr->loh_lru) ? "" : " lru",
354                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
355 }
356
357 /*
358  * Print human-readable representation of @o to @printer.
359  */
360 void lu_object_print(const struct lu_env *env, void *cookie,
361                      lu_printer_t printer, const struct lu_object *o)
362 {
363         static const char ruler[] = "........................................";
364         struct lu_object_header *top;
365         int depth;
366
367         top = o->lo_header;
368         lu_object_header_print(env, cookie, printer, top);
369         (*printer)(env, cookie, "\n");
370         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
371                 depth = o->lo_depth + 4;
372                 LASSERT(o->lo_ops->loo_object_print != NULL);
373                 /*
374                  * print `.' @depth times.
375                  */
376                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
377                 o->lo_ops->loo_object_print(env, cookie, printer, o);
378                 (*printer)(env, cookie, "\n");
379         }
380 }
381 EXPORT_SYMBOL(lu_object_print);
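/*
 * Usage sketch (added illustration, not part of the original file): dumping a
 * single object into the debug log through the buffered printer above. The
 * wrapper sample_object_dump() and the D_INFO mask choice are hypothetical;
 * DECLARE_LU_CDEBUG_PRINT_INFO() and lu_cdebug_printer are defined in this file.
 */
#if 0
static void sample_object_dump(const struct lu_env *env,
                               const struct lu_object *o)
{
        static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);

        lu_object_print(env, &cookie, lu_cdebug_printer, o);
}
#endif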
382
383 /*
384  * Check object consistency.
385  */
386 int lu_object_invariant(const struct lu_object *o)
387 {
388         struct lu_object_header *top;
389
390         top = o->lo_header;
391         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
392                 if (o->lo_ops->loo_object_invariant != NULL &&
393                     !o->lo_ops->loo_object_invariant(o))
394                         return 0;
395         }
396         return 1;
397 }
398 EXPORT_SYMBOL(lu_object_invariant);
399
400 static struct lu_object *htable_lookup(struct lu_site *s,
401                                        const struct hlist_head *bucket,
402                                        const struct lu_fid *f)
403 {
404         struct lu_object_header *h;
405         struct hlist_node *scan;
406
407         hlist_for_each_entry(h, scan, bucket, loh_hash) {
408                 s->ls_stats.s_cache_check ++;
409                 if (likely(lu_fid_eq(&h->loh_fid, f) &&
410                            !lu_object_is_dying(h))) {
411                         /* bump reference count... */
412                         if (atomic_add_return(1, &h->loh_ref) == 1)
413                                 ++ s->ls_busy;
414                         /* and move to the head of the LRU */
415                         /*
416                          * XXX temporary disable this to measure effects of
417                          * read-write locking.
418                          */
419                         /* list_move_tail(&h->loh_lru, &s->ls_lru); */
420                         s->ls_stats.s_cache_hit ++;
421                         return lu_object_top(h);
422                 }
423         }
424         s->ls_stats.s_cache_miss ++;
425         return NULL;
426 }
427
428 static __u32 fid_hash(const struct lu_fid *f, int bits)
429 {
430         /* all objects with the same id and different versions will belong
431          * to the same collision list. */
432         return hash_long(fid_flatten(f), bits);
433 }
434
435 /*
436  * Search the cache for an object with fid @f. If such an object is found,
437  * return it. Otherwise, create a new object, insert it into the cache, and
438  * return it. In either case, an additional reference is acquired on the result.
439  */
440 struct lu_object *lu_object_find(const struct lu_env *env,
441                                  struct lu_site *s, const struct lu_fid *f)
442 {
443         struct lu_object     *o;
444         struct lu_object     *shadow;
445         struct hlist_head *bucket;
446
447         /*
448          * This uses standard index maintenance protocol:
449          *
450          *     - search index under lock, and return object if found;
451          *     - otherwise, unlock index, allocate new object;
452          *     - lock index and search again;
453          *     - if nothing is found (usual case), insert newly created
454          *       object into index;
455          *     - otherwise (race: other thread inserted object), free
456          *       object just allocated.
457          *     - unlock index;
458          *     - return object.
459          */
460
461         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
462
463         read_lock(&s->ls_guard);
464         o = htable_lookup(s, bucket, f);
465         read_unlock(&s->ls_guard);
466
467         if (o != NULL)
468                 return o;
469
470         /*
471          * Allocate new object. This may result in rather complicated
472          * operations, including fld queries, inode loading, etc.
473          */
474         o = lu_object_alloc(env, s, f);
475         if (unlikely(IS_ERR(o)))
476                 return o;
477
478         LASSERT(lu_fid_eq(lu_object_fid(o), f));
479
480         write_lock(&s->ls_guard);
481         shadow = htable_lookup(s, bucket, f);
482         if (likely(shadow == NULL)) {
483                 hlist_add_head(&o->lo_header->loh_hash, bucket);
484                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
485                 ++ s->ls_busy;
486                 ++ s->ls_total;
487                 shadow = o;
488                 o = NULL;
489         } else
490                 s->ls_stats.s_cache_race ++;
491         write_unlock(&s->ls_guard);
492         if (o != NULL)
493                 lu_object_free(env, o);
494         return shadow;
495 }
496 EXPORT_SYMBOL(lu_object_find);
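/*
 * Usage sketch (added illustration, not part of the original file): the
 * typical caller pattern for the cache interface above. A reference obtained
 * from lu_object_find() must be dropped with lu_object_put(); allocation
 * failures come back as error pointers. The function sample_lookup() is
 * hypothetical.
 */
#if 0
static int sample_lookup(const struct lu_env *env, struct lu_site *site,
                         const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, site, fid);     /* acquires a reference */
        if (IS_ERR(o))
                return PTR_ERR(o);

        /* ... use the object here ... */

        lu_object_put(env, o);                  /* releases the reference */
        return 0;
}
#endif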
497
498 /*
499  * Global list of all sites on this node
500  */
501 static CFS_LIST_HEAD(lu_sites);
502 static DECLARE_MUTEX(lu_sites_guard);
503
504 /*
505  * Global environment used by site shrinker.
506  */
507 static struct lu_env lu_shrink_env;
508
509 /*
510  * Print all objects in @s.
511  */
512 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
513                    lu_printer_t printer)
514 {
515         int i;
516
517         for (i = 0; i < s->ls_hash_size; ++i) {
518                 struct lu_object_header *h;
519                 struct hlist_node       *scan;
520
521                 read_lock(&s->ls_guard);
522                 hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {
523
524                         if (!list_empty(&h->loh_layers)) {
525                                 const struct lu_object *obj;
526
527                                 obj = lu_object_top(h);
528                                 lu_object_print(env, cookie, printer, obj);
529                         } else
530                                 lu_object_header_print(env, cookie, printer, h);
531                 }
532                 read_unlock(&s->ls_guard);
533         }
534 }
535 EXPORT_SYMBOL(lu_site_print);
536
537 enum {
538         LU_CACHE_PERCENT   = 20,
539 };
540
541 /*
542  * Return desired hash table order.
543  */
544 static int lu_htable_order(void)
545 {
546         unsigned long cache_size;
547         int bits;
548
549         /*
550          * Calculate hash table size, assuming that we want reasonable
551          * performance when 20% of total memory is occupied by cache of
552          * lu_objects.
553          *
554          * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
555          */
556         cache_size = num_physpages;
557
558 #if BITS_PER_LONG == 32
559         /* limit hashtable size for lowmem systems to low RAM */
560         if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
561                 cache_size = (1 << (30 - CFS_PAGE_SHIFT)) * 3 / 4;
562 #endif
563
564         cache_size = cache_size / 100 * LU_CACHE_PERCENT *
565                 (CFS_PAGE_SIZE / 1024);
566
567         for (bits = 1; (1 << bits) < cache_size; ++bits) {
568                 ;
569         }
570         return bits;
571 }
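/*
 * Worked example (added illustration, assuming 4KiB pages and 1GiB of RAM):
 * num_physpages = 262144, so
 *
 *     cache_size = 262144 / 100 * 20 * (4096 / 1024) ~= 209680
 *
 * i.e. roughly 200k cached objects of ~1KiB each (20% of RAM). The loop above
 * then picks the smallest bits with (1 << bits) >= cache_size, here bits = 18,
 * giving a hash table of 2^18 buckets: about one bucket per cached object.
 */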
572
573 /*
574  * Initialize site @s, with @top as the top level device.
575  */
576 int lu_site_init(struct lu_site *s, struct lu_device *top)
577 {
578         int bits;
579         int size;
580         int i;
581         ENTRY;
582
583         memset(s, 0, sizeof *s);
584         rwlock_init(&s->ls_guard);
585         CFS_INIT_LIST_HEAD(&s->ls_lru);
586         CFS_INIT_LIST_HEAD(&s->ls_linkage);
587         s->ls_top_dev = top;
588         top->ld_site = s;
589         lu_device_get(top);
590
591         for (bits = lu_htable_order(), size = 1 << bits;
592              (s->ls_hash =
593               cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
594              --bits, size >>= 1) {
595                 /*
596                  * Scale hash table down, until allocation succeeds.
597                  */
598                 ;
599         }
600
601         s->ls_hash_size = size;
602         s->ls_hash_bits = bits;
603         s->ls_hash_mask = size - 1;
604
605         for (i = 0; i < size; i++)
606                 INIT_HLIST_HEAD(&s->ls_hash[i]);
607
608         RETURN(0);
609 }
610 EXPORT_SYMBOL(lu_site_init);
611
612 /*
613  * Finalize @s and release its resources.
614  */
615 void lu_site_fini(struct lu_site *s)
616 {
617         LASSERT(list_empty(&s->ls_lru));
618         LASSERT(s->ls_total == 0);
619
620         down(&lu_sites_guard);
621         list_del_init(&s->ls_linkage);
622         up(&lu_sites_guard);
623
624         if (s->ls_hash != NULL) {
625                 int i;
626                 for (i = 0; i < s->ls_hash_size; i++)
627                         LASSERT(hlist_empty(&s->ls_hash[i]));
628                 cfs_free_large(s->ls_hash);
629                 s->ls_hash = NULL;
630         }
631         if (s->ls_top_dev != NULL) {
632                 s->ls_top_dev->ld_site = NULL;
633                 lu_device_put(s->ls_top_dev);
634                 s->ls_top_dev = NULL;
635         }
636 }
637 EXPORT_SYMBOL(lu_site_fini);
638
639 /*
640  * Called when initialization of stack for this site is completed.
641  */
642 int lu_site_init_finish(struct lu_site *s)
643 {
644         int result;
645         down(&lu_sites_guard);
646         result = lu_context_refill(&lu_shrink_env.le_ctx);
647         if (result == 0)
648                 list_add(&s->ls_linkage, &lu_sites);
649         up(&lu_sites_guard);
650         return result;
651 }
652 EXPORT_SYMBOL(lu_site_init_finish);
653
654 /*
655  * Acquire additional reference on device @d
656  */
657 void lu_device_get(struct lu_device *d)
658 {
659         atomic_inc(&d->ld_ref);
660 }
661 EXPORT_SYMBOL(lu_device_get);
662
663 /*
664  * Release reference on device @d.
665  */
666 void lu_device_put(struct lu_device *d)
667 {
668         atomic_dec(&d->ld_ref);
669 }
670 EXPORT_SYMBOL(lu_device_put);
671
672 /*
673  * Initialize device @d of type @t.
674  */
675 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
676 {
677         memset(d, 0, sizeof *d);
678         atomic_set(&d->ld_ref, 0);
679         d->ld_type = t;
680         return 0;
681 }
682 EXPORT_SYMBOL(lu_device_init);
683
684 /*
685  * Finalize device @d.
686  */
687 void lu_device_fini(struct lu_device *d)
688 {
689         if (d->ld_obd != NULL)
690                 /* finish lprocfs */
691                 lprocfs_obd_cleanup(d->ld_obd);
692
693         LASSERTF(atomic_read(&d->ld_ref) == 0,
694                  "Refcount is %u\n", atomic_read(&d->ld_ref));
695 }
696 EXPORT_SYMBOL(lu_device_fini);
697
698 /*
699  * Initialize object @o that is part of compound object @h and was created by
700  * device @d.
701  */
702 int lu_object_init(struct lu_object *o,
703                    struct lu_object_header *h, struct lu_device *d)
704 {
705         memset(o, 0, sizeof *o);
706         o->lo_header = h;
707         o->lo_dev    = d;
708         lu_device_get(d);
709         CFS_INIT_LIST_HEAD(&o->lo_linkage);
710         return 0;
711 }
712 EXPORT_SYMBOL(lu_object_init);
713
714 /*
715  * Finalize object and release its resources.
716  */
717 void lu_object_fini(struct lu_object *o)
718 {
719         LASSERT(list_empty(&o->lo_linkage));
720
721         if (o->lo_dev != NULL) {
722                 lu_device_put(o->lo_dev);
723                 o->lo_dev = NULL;
724         }
725 }
726 EXPORT_SYMBOL(lu_object_fini);
727
728 /*
729  * Add object @o as first layer of compound object @h
730  *
731  * This is typically called by the ->ldo_object_alloc() method of top-level
732  * device.
733  */
734 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
735 {
736         list_move(&o->lo_linkage, &h->loh_layers);
737 }
738 EXPORT_SYMBOL(lu_object_add_top);
739
740 /*
741  * Add object @o as a layer of compound object, going after @before.
742  *
743  * This is typically called by the ->ldo_object_alloc() method of
744  * @before->lo_dev.
745  */
746 void lu_object_add(struct lu_object *before, struct lu_object *o)
747 {
748         list_move(&o->lo_linkage, &before->lo_linkage);
749 }
750 EXPORT_SYMBOL(lu_object_add);
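/*
 * Usage sketch (added illustration, not part of the original file): how the
 * two helpers above are typically used while a compound object is assembled.
 * The top-level device's ->ldo_object_alloc() creates the header plus its own
 * slice and links it with lu_object_add_top(); a lower layer's
 * ->loo_object_init() then allocates the next slice and appends it after its
 * own with lu_object_add(). All "sample_" identifiers are hypothetical.
 */
#if 0
static int sample_object_init(const struct lu_env *env, struct lu_object *o)
{
        struct lu_device *next = sample_child_device(o->lo_dev);
        struct lu_object *below;

        below = next->ld_ops->ldo_object_alloc(env, o->lo_header, next);
        if (below == NULL)
                return -ENOMEM;
        lu_object_add(o, below);
        return 0;
}
#endif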
751
752 /*
753  * Initialize compound object.
754  */
755 int lu_object_header_init(struct lu_object_header *h)
756 {
757         memset(h, 0, sizeof *h);
758         atomic_set(&h->loh_ref, 1);
759         INIT_HLIST_NODE(&h->loh_hash);
760         CFS_INIT_LIST_HEAD(&h->loh_lru);
761         CFS_INIT_LIST_HEAD(&h->loh_layers);
762         return 0;
763 }
764 EXPORT_SYMBOL(lu_object_header_init);
765
766 /*
767  * Finalize compound object.
768  */
769 void lu_object_header_fini(struct lu_object_header *h)
770 {
771         LASSERT(list_empty(&h->loh_layers));
772         LASSERT(list_empty(&h->loh_lru));
773         LASSERT(hlist_unhashed(&h->loh_hash));
774 }
775 EXPORT_SYMBOL(lu_object_header_fini);
776
777 /*
778  * Given a compound object, find its slice, corresponding to the device type
779  * @dtype.
780  */
781 struct lu_object *lu_object_locate(struct lu_object_header *h,
782                                    struct lu_device_type *dtype)
783 {
784         struct lu_object *o;
785
786         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
787                 if (o->lo_dev->ld_type == dtype)
788                         return o;
789         }
790         return NULL;
791 }
792 EXPORT_SYMBOL(lu_object_locate);
793
794
795
796 /*
797  * Finalize and free devices in the device stack.
798  * 
799  * Finalize device stack by purging object cache, and calling
800  * lu_device_type_operations::ldto_device_fini() and
801  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
802  */
803 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
804 {
805         struct lu_site   *site = top->ld_site;
806         struct lu_device *scan;
807         struct lu_device *next;
808
809         lu_site_purge(env, site, ~0);
810         for (scan = top; scan != NULL; scan = next) {
811                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
812                 lu_device_put(scan);
813         }
814
815         /* purge again. */
816         lu_site_purge(env, site, ~0);
817
818         if (!list_empty(&site->ls_lru) || site->ls_total != 0) {
819                 /*
820                  * Uh-oh, objects still exist.
821                  */
822                 static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
823
824                 lu_site_print(env, site, &cookie, lu_cdebug_printer);
825         }
826
827         for (scan = top; scan != NULL; scan = next) {
828                 const struct lu_device_type *ldt = scan->ld_type;
829                 struct obd_type             *type;
830
831                 next = ldt->ldt_ops->ldto_device_free(env, scan);
832                 type = ldt->ldt_obd_type;
833                 type->typ_refcnt--;
834                 class_put_type(type);
835         }
836 }
837 EXPORT_SYMBOL(lu_stack_fini);
838
839 enum {
840         /*
841          * Maximal number of tld slots.
842          */
843         LU_CONTEXT_KEY_NR = 16
844 };
845
846 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
847
848 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
849
850 /*
851  * Register new key.
852  */
853 int lu_context_key_register(struct lu_context_key *key)
854 {
855         int result;
856         int i;
857
858         LASSERT(key->lct_init != NULL);
859         LASSERT(key->lct_fini != NULL);
860         LASSERT(key->lct_tags != 0);
861         LASSERT(key->lct_owner != NULL);
862
863         result = -ENFILE;
864         spin_lock(&lu_keys_guard);
865         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
866                 if (lu_keys[i] == NULL) {
867                         key->lct_index = i;
868                         atomic_set(&key->lct_used, 1);
869                         lu_keys[i] = key;
870                         result = 0;
871                         break;
872                 }
873         }
874         spin_unlock(&lu_keys_guard);
875         return result;
876 }
877 EXPORT_SYMBOL(lu_context_key_register);
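/*
 * Usage sketch (added illustration, not part of the original file): defining
 * and registering a context key, mirroring lu_global_key above. The
 * LU_KEY_INIT_FINI() macro generates sample_key_init()/sample_key_fini(), as
 * the lu_global_key comment earlier in this file describes. All "sample_"
 * identifiers are hypothetical.
 */
#if 0
struct sample_thread_info {
        char sti_scratch[128];
};

LU_KEY_INIT_FINI(sample, struct sample_thread_info);

struct lu_context_key sample_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = sample_key_init,
        .lct_fini = sample_key_fini
};

static int sample_module_init(void)
{
        LU_CONTEXT_KEY_INIT(&sample_thread_key);
        return lu_context_key_register(&sample_thread_key);
}
#endif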
878
879 static void key_fini(struct lu_context *ctx, int index)
880 {
881         if (ctx->lc_value[index] != NULL) {
882                 struct lu_context_key *key;
883
884                 key = lu_keys[index];
885                 LASSERT(key != NULL);
886                 LASSERT(key->lct_fini != NULL);
887                 LASSERT(atomic_read(&key->lct_used) > 1);
888
889                 key->lct_fini(ctx, key, ctx->lc_value[index]);
890                 atomic_dec(&key->lct_used);
891                 LASSERT(key->lct_owner != NULL);
892                 if (!(ctx->lc_tags & LCT_NOREF)) {
893                         LASSERT(module_refcount(key->lct_owner) > 0);
894                         module_put(key->lct_owner);
895                 }
896                 ctx->lc_value[index] = NULL;
897         }
898 }
899
900 /*
901  * Deregister key.
902  */
903 void lu_context_key_degister(struct lu_context_key *key)
904 {
905         LASSERT(atomic_read(&key->lct_used) >= 1);
906         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
907
908         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
909
910         if (atomic_read(&key->lct_used) > 1)
911                 CERROR("key has instances.\n");
912         spin_lock(&lu_keys_guard);
913         lu_keys[key->lct_index] = NULL;
914         spin_unlock(&lu_keys_guard);
915 }
916 EXPORT_SYMBOL(lu_context_key_degister);
917
918 /*
919  * Return value associated with key @key in context @ctx.
920  */
921 void *lu_context_key_get(const struct lu_context *ctx,
922                          struct lu_context_key *key)
923 {
924         LASSERT(ctx->lc_state == LCS_ENTERED);
925         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
926         return ctx->lc_value[key->lct_index];
927 }
928 EXPORT_SYMBOL(lu_context_key_get);
929
930 static void keys_fini(struct lu_context *ctx)
931 {
932         int i;
933
934         if (ctx->lc_value != NULL) {
935                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
936                         key_fini(ctx, i);
937                 OBD_FREE(ctx->lc_value,
938                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
939                 ctx->lc_value = NULL;
940         }
941 }
942
943 static int keys_fill(const struct lu_context *ctx)
944 {
945         int i;
946
947         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
948                 struct lu_context_key *key;
949
950                 key = lu_keys[i];
951                 if (ctx->lc_value[i] == NULL &&
952                     key != NULL && key->lct_tags & ctx->lc_tags) {
953                         void *value;
954
955                         LASSERT(key->lct_init != NULL);
956                         LASSERT(key->lct_index == i);
957
958                         value = key->lct_init(ctx, key);
959                         if (unlikely(IS_ERR(value)))
960                                 return PTR_ERR(value);
961                         LASSERT(key->lct_owner != NULL);
962                         if (!(ctx->lc_tags & LCT_NOREF))
963                                 try_module_get(key->lct_owner);
964                         atomic_inc(&key->lct_used);
965                         ctx->lc_value[i] = value;
966                 }
967         }
968         return 0;
969 }
970
971 static int keys_init(struct lu_context *ctx)
972 {
973         int result;
974
975         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
976         if (likely(ctx->lc_value != NULL))
977                 result = keys_fill(ctx);
978         else
979                 result = -ENOMEM;
980
981         if (result != 0)
982                 keys_fini(ctx);
983         return result;
984 }
985
986 /*
987  * Initialize context data-structure. Create values for all keys.
988  */
989 int lu_context_init(struct lu_context *ctx, __u32 tags)
990 {
991         memset(ctx, 0, sizeof *ctx);
992         ctx->lc_state = LCS_INITIALIZED;
993         ctx->lc_tags = tags;
994         return keys_init(ctx);
995 }
996 EXPORT_SYMBOL(lu_context_init);
997
998 /*
999  * Finalize context data-structure. Destroy key values.
1000  */
1001 void lu_context_fini(struct lu_context *ctx)
1002 {
1003         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1004         ctx->lc_state = LCS_FINALIZED;
1005         keys_fini(ctx);
1006 }
1007 EXPORT_SYMBOL(lu_context_fini);
1008
1009 /*
1010  * Called before entering context.
1011  */
1012 void lu_context_enter(struct lu_context *ctx)
1013 {
1014         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1015         ctx->lc_state = LCS_ENTERED;
1016 }
1017 EXPORT_SYMBOL(lu_context_enter);
1018
1019 /*
1020  * Called after exiting from @ctx
1021  */
1022 void lu_context_exit(struct lu_context *ctx)
1023 {
1024         int i;
1025
1026         LASSERT(ctx->lc_state == LCS_ENTERED);
1027         ctx->lc_state = LCS_LEFT;
1028         if (ctx->lc_value != NULL) {
1029                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1030                         if (ctx->lc_value[i] != NULL) {
1031                                 struct lu_context_key *key;
1032
1033                                 key = lu_keys[i];
1034                                 LASSERT(key != NULL);
1035                                 if (key->lct_exit != NULL)
1036                                         key->lct_exit(ctx,
1037                                                       key, ctx->lc_value[i]);
1038                         }
1039                 }
1040         }
1041 }
1042 EXPORT_SYMBOL(lu_context_exit);
1043
1044 /*
1045  * Allocate values for any keys that were registered after this context was
1046  * created and are therefore still missing from it.
1047  */
1048 int lu_context_refill(const struct lu_context *ctx)
1049 {
1050         LASSERT(ctx->lc_value != NULL);
1051         return keys_fill(ctx);
1052 }
1053 EXPORT_SYMBOL(lu_context_refill);
1054
1055 static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
1056                         __u32 tags, int noref)
1057 {
1058         int result;
1059
1060         LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
1061
1062         env->le_ses = ses;
1063         result = lu_context_init(&env->le_ctx, tags);
1064         if (likely(result == 0))
1065                 lu_context_enter(&env->le_ctx);
1066         return result;
1067 }
1068
1069 static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
1070                              __u32 tags)
1071 {
1072         return lu_env_setup(env, ses, tags, 1);
1073 }
1074
1075 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
1076 {
1077         return lu_env_setup(env, ses, tags, 0);
1078 }
1079 EXPORT_SYMBOL(lu_env_init);
1080
1081 void lu_env_fini(struct lu_env *env)
1082 {
1083         lu_context_exit(&env->le_ctx);
1084         lu_context_fini(&env->le_ctx);
1085         env->le_ses = NULL;
1086 }
1087 EXPORT_SYMBOL(lu_env_fini);
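/*
 * Usage sketch (added illustration, not part of the original file): a caller
 * typically keeps a struct lu_env on the stack or per thread, initializes it
 * with the tags matching the context keys it needs, and tears it down when
 * done. The function name and tag choice below are illustrative only.
 */
#if 0
static int sample_with_env(void)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, NULL, LCT_DT_THREAD);
        if (rc != 0)
                return rc;

        /* ... use env with lu_object_find(), lu_object_put(), etc. ... */

        lu_env_fini(&env);
        return 0;
}
#endif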
1088
1089 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1090 {
1091         struct lu_site *s;
1092         struct lu_site *tmp;
1093         int cached = 0;
1094         int remain = nr;
1095         CFS_LIST_HEAD(splice);
1096
1097         if (nr != 0 && !(gfp_mask & __GFP_FS))
1098                 return -1;
1099
1100         down(&lu_sites_guard);
1101         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1102                 if (nr != 0) {
1103                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1104                         /*
1105                          * Move just shrunk site to the tail of site list to
1106                          * assure shrinking fairness.
1107                          */
1108                         list_move_tail(&s->ls_linkage, &splice);
1109                 }
1110                 read_lock(&s->ls_guard);
1111                 cached += s->ls_total - s->ls_busy;
1112                 read_unlock(&s->ls_guard);
1113                 if (remain <= 0)
1114                         break;
1115         }
1116         list_splice(&splice, lu_sites.prev);
1117         up(&lu_sites_guard);
1118         return cached;
1119 }
1120
1121 static struct shrinker *lu_site_shrinker = NULL;
1122
1123 /*
1124  * Initialization of global lu_* data.
1125  */
1126 int lu_global_init(void)
1127 {
1128         int result;
1129
1130         LU_CONTEXT_KEY_INIT(&lu_global_key);
1131         result = lu_context_key_register(&lu_global_key);
1132         if (result == 0) {
1133                 /*
1134                  * At this level, we don't know what tags are needed, so
1135                  * allocate them conservatively. This should not be too bad,
1136                  * because this environment is global.
1137                  */
1138                 down(&lu_sites_guard);
1139                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
1140                 up(&lu_sites_guard);
1141                 if (result == 0) {
1142                         /*
1143                          * seeks estimation: 3 seeks to read a record from oi,
1144                          * one to read inode, one for ea. Unfortunately,
1145                          * setting this to a high value results in the
1146                          * lu_object/inode cache consuming all the memory.
1147                          */
1148                         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
1149                                                         lu_cache_shrink);
1150                         if (result == 0)
1151                                 result = lu_time_global_init();
1152                 }
1153         }
1154         return result;
1155 }
1156
1157 /*
1158  * Dual to lu_global_init().
1159  */
1160 void lu_global_fini(void)
1161 {
1162         lu_time_global_fini();
1163         if (lu_site_shrinker != NULL) {
1164                 remove_shrinker(lu_site_shrinker);
1165                 lu_site_shrinker = NULL;
1166         }
1167
1168         lu_context_key_degister(&lu_global_key);
1169
1170         /*
1171          * Tear shrinker environment down _after_ de-registering
1172          * lu_global_key, because the latter has a value in the former.
1173          */
1174         down(&lu_sites_guard);
1175         lu_env_fini(&lu_shrink_env);
1176         up(&lu_sites_guard);
1177 }
1178
1179 struct lu_buf LU_BUF_NULL = {
1180         .lb_buf = NULL,
1181         .lb_len = 0
1182 };
1183 EXPORT_SYMBOL(LU_BUF_NULL);
1184
1185 /*
1186  * XXX: Functions below logically belong to the fid module, but they are used
1187  * by dt_store_open(). Put them here until a better place is found.
1188  */
1189
1190 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
1191               struct lu_fid *befider)
1192 {
1193         int recsize;
1194         __u64 seq;
1195         __u32 oid;
1196
1197         seq = fid_seq(fid);
1198         oid = fid_oid(fid);
1199
1200         /*
1201          * Two cases: a compact 6-byte representation for the common case, and
1202          * a full 17-byte representation for an "unusual" fid.
1203          */
1204
1205         /*
1206          * Check that usual case is really usual.
1207          */
1208         CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);
1209
1210         if (fid_is_igif(fid) ||
1211             seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
1212                 fid_cpu_to_be(befider, fid);
1213                 recsize = sizeof *befider;
1214         } else {
1215                 unsigned char *small_befider;
1216
1217                 small_befider = (unsigned char *)befider;
1218
1219                 small_befider[0] = seq >> 16;
1220                 small_befider[1] = seq >> 8;
1221                 small_befider[2] = seq;
1222
1223                 small_befider[3] = oid >> 8;
1224                 small_befider[4] = oid;
1225
1226                 recsize = 5;
1227         }
1228         memcpy(pack->fp_area, befider, recsize);
1229         pack->fp_len = recsize + 1;
1230 }
1231 EXPORT_SYMBOL(fid_pack);
1232
1233 int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
1234 {
1235         int result;
1236
1237         result = 0;
1238         switch (pack->fp_len) {
1239         case sizeof *fid + 1:
1240                 memcpy(fid, pack->fp_area, sizeof *fid);
1241                 fid_be_to_cpu(fid, fid);
1242                 break;
1243         case 6: {
1244                 const unsigned char *area;
1245
1246                 area = pack->fp_area;
1247                 fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
1248                 fid->f_oid = (area[3] << 8) | area[4];
1249                 fid->f_ver = 0;
1250                 break;
1251         }
1252         default:
1253                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1254                 result = -EIO;
1255         }
1256         return result;
1257 }
1258 EXPORT_SYMBOL(fid_unpack);
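/*
 * Usage sketch (added illustration, not part of the original file): packing a
 * fid into its on-disk record and unpacking it again. The scratch lu_fid
 * ("befider") receives the big-endian form used by the full-size
 * representation; sample_fid_roundtrip() is hypothetical.
 */
#if 0
static int sample_fid_roundtrip(const struct lu_fid *fid)
{
        struct lu_fid_pack pack;
        struct lu_fid      scratch;
        struct lu_fid      out;

        fid_pack(&pack, fid, &scratch);    /* 6- or 17-byte record */
        return fid_unpack(&pack, &out);    /* 0 on success, -EIO on bad size */
}
#endif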
1259
1260 const char *lu_time_names[LU_TIME_NR] = {
1261         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1262         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1263         [LU_TIME_FIND_INSERT] = "find_insert"
1264 };
1265 EXPORT_SYMBOL(lu_time_names);