Use a special macro for printing time_t; clean up includes.
[fs/lustre-release.git] lustre/obdclass/lu_object.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
/* nr_free_pages() */
#include <linux/swap.h>
/* hash_long() */
#include <linux/hash.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
/* lu_time_global_{init,fini}() */
#include <lu_time.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/*
 * Decrease the reference counter on an object. If the last reference is
 * released, return the object to the cache, unless lu_object_is_dying(o)
 * holds. In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        write_lock(&site->ls_guard);
        if (atomic_dec_and_test(&top->loh_ref)) {
                /*
                 * When the last reference is released, iterate over object
                 * layers, and notify them that the object is no longer busy.
                 */
                list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
                --site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If the object is dying (will not be cached), remove
                         * it from the hash table and LRU.
                         *
                         * This is done with the hash table and LRU list
                         * locked. As the only ways to acquire a first
                         * reference to a previously unreferenced object are
                         * hash-table lookup (lu_object_find()) and LRU
                         * scanning (lu_site_purge()), both done under the
                         * hash-table and LRU lock, no race with a concurrent
                         * object lookup is possible and we can safely destroy
                         * the object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        --site->ls_total;
                        kill_it = 1;
                }
        }
        write_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * The object was already removed from the hash table and LRU
                 * above, so it can be killed now.
                 */
                lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate a new object.
 *
 * This follows the object creation protocol described in the comment within
 * the struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;
        ENTRY;

        /*
         * Create the top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
                                                      NULL, s->ls_top_dev);
        if (top == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        /*
         * This is the only place where the object fid is assigned. It is
         * constant after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid  = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created++;
        RETURN(top);
}

/*
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call the ->loo_object_delete() method to release all
         * resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then splice the object layers into a stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. The splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        CFS_INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        CFS_INIT_LIST_HEAD(&dispose);
        /*
         * Under the LRU list lock, scan the LRU list and move unreferenced
         * objects to the dispose list, removing them from the LRU and hash
         * table.
         */
        write_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                /*
                 * Objects are sorted in LRU order, and "busy" objects (ones
                 * with h->loh_ref > 0) naturally tend to live near the hot
                 * end that we scan last. Unfortunately, sites usually have a
                 * small (fewer than ten) number of busy yet rarely accessed
                 * objects (some global objects, accessed directly through
                 * pointers, bypassing the hash table). Currently the
                 * algorithm scans them over and over again. Probably we
                 * should move busy objects out of the LRU, or we can live
                 * with that.
                 */
                if (nr-- == 0)
                        break;
                if (atomic_read(&h->loh_ref) > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
                s->ls_total--;
        }
        write_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                 struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(env, lu_object_top(h));
                s->ls_stats.s_lru_purged++;
        }
        return nr;
}
EXPORT_SYMBOL(lu_site_purge);
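
/*
 * Usage sketch (illustrative only): the return value is the remaining count,
 * which lu_cache_shrink() below carries over from one site to the next;
 * passing ~0, as lu_stack_fini() does, effectively empties a site:
 *
 *     lu_site_purge(env, site, ~0);
 */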

/*
 * Object printing.
 *
 * The code below has to jump through hoops to output an object description
 * into a libcfs_debug_msg()-based log. The problem is that lu_object_print()
 * composes the object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by a newline). This does not
 * fit very well into the libcfs_debug_msg() interface, which assumes that
 * each message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_global_key key), until a terminating
 * newline character is detected.
 */

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
        /*
         * fid staging area used by dt_store_open().
         */
        struct lu_fid_pack lck_pack;
};

/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/*
 * Key holding the temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_global_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                 (char *)info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/*
 * Print the object header.
 */
static void lu_object_header_print(const struct lu_env *env,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist":"");
}

/*
 * Print a human-readable representation of @o to @printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);
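
/*
 * A minimal usage sketch (illustrative, not part of the original code):
 * dumping an object through the debug log by pairing lu_object_print() with
 * lu_cdebug_printer(), mirroring the pattern used by lu_stack_fini() below.
 * "env" and "obj" are assumed to be a caller-supplied, entered environment
 * and a valid object.
 *
 *     static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);
 *
 *     lu_object_print(env, &cookie, lu_cdebug_printer, obj);
 */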

/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check++;
                if (likely(lu_fid_eq(&h->loh_fid, f) &&
                           !lu_object_is_dying(h))) {
                        /* bump reference count... */
                        if (atomic_add_return(1, &h->loh_ref) == 1)
                                ++s->ls_busy;
                        /* and move to the head of the LRU */
                        /*
                         * XXX temporarily disabled to measure the effects of
                         * read-write locking.
                         */
                        /* list_move_tail(&h->loh_lru, &s->ls_lru); */
                        s->ls_stats.s_cache_hit++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f, int bits)
{
        /* all objects with the same id and different versions will belong to
         * the same collision list. */
        return hash_long(fid_flatten(f), bits);
}

/*
 * Search the cache for an object with the fid @f. If such an object is
 * found, return it. Otherwise, create a new object, insert it into the cache
 * and return it. In either case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object     *o;
        struct lu_object     *shadow;
        struct hlist_head *bucket;

        /*
         * This uses the standard index maintenance protocol:
         *
         *     - search the index under lock, and return the object if found;
         *     - otherwise, unlock the index and allocate a new object;
         *     - lock the index and search again;
         *     - if nothing is found (usual case), insert the newly created
         *       object into the index;
         *     - otherwise (race: another thread inserted the object), free
         *       the object just allocated;
         *     - unlock the index;
         *     - return the object.
         */

        bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);

        read_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);
        read_unlock(&s->ls_guard);

        if (o != NULL)
                return o;

        /*
         * Allocate a new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, s, f);
        if (unlikely(IS_ERR(o)))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        write_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (likely(shadow == NULL)) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++s->ls_busy;
                ++s->ls_total;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race++;
        write_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(env, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
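
/*
 * A minimal lookup/release sketch (illustrative only, with caller-supplied
 * "env", "site" and "fid"): the reference obtained from lu_object_find() is
 * dropped with lu_object_put() once the caller is done with the object.
 *
 *     struct lu_object *o;
 *
 *     o = lu_object_find(env, site, fid);
 *     if (IS_ERR(o))
 *             return PTR_ERR(o);
 *     ... use the object, e.g. lu_object_locate(o->lo_header, dtype) ...
 *     lu_object_put(env, o);
 */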

/*
 * Global list of all sites on this node
 */
static CFS_LIST_HEAD(lu_sites);
static DECLARE_MUTEX(lu_sites_guard);

/*
 * Global environment used by site shrinker.
 */
static struct lu_env lu_shrink_env;

/*
 * Print all objects in @s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        int i;

        for (i = 0; i < s->ls_hash_size; ++i) {
                struct lu_object_header *h;
                struct hlist_node       *scan;

                read_lock(&s->ls_guard);
                hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {

                        if (!list_empty(&h->loh_layers)) {
                                const struct lu_object *obj;

                                obj = lu_object_top(h);
                                lu_object_print(env, cookie, printer, obj);
                        } else
                                lu_object_header_print(env, cookie, printer, h);
                }
                read_unlock(&s->ls_guard);
        }
}
EXPORT_SYMBOL(lu_site_print);

enum {
        LU_CACHE_PERCENT   = 30,
};

/*
 * Return the desired hash table order.
 */
static int lu_htable_order(void)
{
        int bits;
        unsigned long cache_size;

        /*
         * Calculate the hash table size, assuming that we want reasonable
         * performance when 30% of available memory is occupied by the cache
         * of lu_objects.
         *
         * The size of an lu_object is (arbitrarily) taken to be 1K (together
         * with the inode).
         */
        cache_size = ll_nr_free_buffer_pages() / 100 *
                LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024);

        for (bits = 1; (1 << bits) < cache_size; ++bits) {
                ;
        }
        return bits;
}
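
/*
 * Worked example (illustrative only): with 4K pages and 262144 free buffer
 * pages (1GB), cache_size = 262144 / 100 * 30 * 4 = 314520 (integer
 * arithmetic), and the smallest order with (1 << bits) >= cache_size is
 * bits = 19.
 */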

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int bits;
        int size;
        int i;
        ENTRY;

        memset(s, 0, sizeof *s);
        rwlock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);

        for (bits = lu_htable_order(), size = 1 << bits;
             (s->ls_hash =
              cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
             --bits, size >>= 1) {
                /*
                 * Scale the hash table down, until the allocation succeeds.
                 */
                ;
        }

        s->ls_hash_size = size;
        s->ls_hash_bits = bits;
        s->ls_hash_mask = size - 1;

        for (i = 0; i < size; i++)
                INIT_HLIST_HEAD(&s->ls_hash[i]);

        RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);

        down(&lu_sites_guard);
        list_del_init(&s->ls_linkage);
        up(&lu_sites_guard);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < s->ls_hash_size; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                cfs_free_large(s->ls_hash);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);

/*
 * Called when initialization of the stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;
        down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                list_add(&s->ls_linkage, &lu_sites);
        up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/*
 * Acquire an additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release a reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        if (d->ld_obd != NULL)
                /* finish lprocfs */
                lprocfs_obd_cleanup(d->ld_obd);

        LASSERTF(atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", atomic_read(&d->ld_ref));
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as the first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of a compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        atomic_set(&h->loh_ref, 1);
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);


/*
 * Finalize and free devices in the device stack.
 *
 * Finalize the device stack by purging the object cache, and calling
 * lu_device_type_operations::ldto_device_fini() and
 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
 */
void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
{
        struct lu_site   *site = top->ld_site;
        struct lu_device *scan;
        struct lu_device *next;

        lu_site_purge(env, site, ~0);
        for (scan = top; scan != NULL; scan = next) {
                next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
                lu_device_put(scan);
        }

        /* purge again. */
        lu_site_purge(env, site, ~0);

        if (!list_empty(&site->ls_lru) || site->ls_total != 0) {
                /*
                 * Uh-oh, objects still exist.
                 */
                static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);

                lu_site_print(env, site, &cookie, lu_cdebug_printer);
        }

        for (scan = top; scan != NULL; scan = next) {
                const struct lu_device_type *ldt = scan->ld_type;
                struct obd_type             *type;

                next = ldt->ldt_ops->ldto_device_free(env, scan);
                type = ldt->ldt_obd_type;
                type->typ_refcnt--;
                class_put_type(type);
        }
}
EXPORT_SYMBOL(lu_stack_fini);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);
        LASSERT(key->lct_owner != NULL);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                atomic_dec(&key->lct_used);
                LASSERT(key->lct_owner != NULL);
                if (!(ctx->lc_tags & LCT_NOREF)) {
                        LASSERT(module_refcount(key->lct_owner) > 0);
                        module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
}

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        key_fini(&lu_shrink_env.le_ctx, key->lct_index);

        if (atomic_read(&key->lct_used) > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(ctx->lc_state == LCS_ENTERED);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
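
/*
 * A minimal context-key sketch (illustrative only; "foo_thread_key" and
 * "struct foo_thread_info" are hypothetical): a module declares a key,
 * registers it, and fetches its per-context value, following the same
 * pattern as lu_global_key and lu_cdebug_printer() above.
 *
 *     LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *     struct lu_context_key foo_thread_key = {
 *             .lct_tags = LCT_MD_THREAD,
 *             .lct_init = foo_key_init,
 *             .lct_fini = foo_key_fini
 *     };
 *
 *     LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *     result = lu_context_key_register(&foo_thread_key);
 *     ...
 *     info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *     ...
 *     lu_context_key_degister(&foo_thread_key);
 */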

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_fill(const struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL &&
                    key != NULL && key->lct_tags & ctx->lc_tags) {
                        void *value;

                        LASSERT(key->lct_init != NULL);
                        LASSERT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value)))
                                return PTR_ERR(value);
                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF))
                                try_module_get(key->lct_owner);
                        atomic_inc(&key->lct_used);
                        ctx->lc_value[i] = value;
                }
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (likely(ctx->lc_value != NULL))
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_FINALIZED;
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
        LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_ENTERED;
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        LASSERT(ctx->lc_state == LCS_ENTERED);
        ctx->lc_state = LCS_LEFT;
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

/*
 * Allocate, for the context, all missing keys that were registered after the
 * context was created.
 */
int lu_context_refill(const struct lu_context *ctx)
{
        LASSERT(ctx->lc_value != NULL);
        return keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);

static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
                        __u32 tags, int noref)
{
        int result;

        LASSERT(ergo(!noref, !(tags & LCT_NOREF)));

        env->le_ses = ses;
        result = lu_context_init(&env->le_ctx, tags);
        if (likely(result == 0))
                lu_context_enter(&env->le_ctx);
        return result;
}

static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
                             __u32 tags)
{
        return lu_env_setup(env, ses, tags, 1);
}

int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
{
        return lu_env_setup(env, ses, tags, 0);
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);
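
/*
 * A minimal environment life-cycle sketch (illustrative only; LCT_MD_THREAD
 * is just an example tag): lu_env_init() initializes and enters the context,
 * and lu_env_fini() exits and finalizes it.
 *
 *     struct lu_env env;
 *     int rc;
 *
 *     rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
 *     if (rc != 0)
 *             return rc;
 *     ... use &env, e.g. with lu_object_find() ...
 *     lu_env_fini(&env);
 */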

static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        struct lu_site *s;
        struct lu_site *tmp;
        int cached = 0;
        int remain = nr;
        CFS_LIST_HEAD(splice);

        if (nr != 0 && !(gfp_mask & __GFP_FS))
                return -1;

        down(&lu_sites_guard);
        list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                if (nr != 0) {
                        remain = lu_site_purge(&lu_shrink_env, s, remain);
                        /*
                         * Move the just-shrunk site to the tail of the site
                         * list to ensure shrinking fairness.
                         */
                        list_move_tail(&s->ls_linkage, &splice);
                }
                read_lock(&s->ls_guard);
                cached += s->ls_total - s->ls_busy;
                read_unlock(&s->ls_guard);
                if (remain <= 0)
                        break;
        }
        list_splice(&splice, lu_sites.prev);
        up(&lu_sites_guard);
        return cached;
}

static struct shrinker *lu_site_shrinker = NULL;

/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        LU_CONTEXT_KEY_INIT(&lu_global_key);
        result = lu_context_key_register(&lu_global_key);
        if (result == 0) {
                /*
                 * At this level, we don't know what tags are needed, so
                 * allocate them conservatively. This should not be too bad,
                 * because this environment is global.
                 */
                down(&lu_sites_guard);
                result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
                up(&lu_sites_guard);
                if (result == 0) {
                        /*
                         * seeks estimation: 3 seeks to read a record from oi,
                         * one to read inode, one for ea. Unfortunately
                         * setting this high value results in lu_object/inode
                         * cache consuming all the memory.
                         */
                        lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
                                                        lu_cache_shrink);
                        if (result == 0)
                                result = lu_time_global_init();
                }
        }
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_time_global_fini();
        if (lu_site_shrinker != NULL) {
                remove_shrinker(lu_site_shrinker);
                lu_site_shrinker = NULL;
        }

        lu_context_key_degister(&lu_global_key);

        /*
         * Tear shrinker environment down _after_ de-registering
         * lu_global_key, because the latter has a value in the former.
         */
        down(&lu_sites_guard);
        lu_env_fini(&lu_shrink_env);
        up(&lu_sites_guard);
}

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);

/*
 * XXX: The functions below logically belong to the fid module, but they are
 * used by dt_store_open(). Put them here until a better place is found.
 */

void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
              struct lu_fid *befider)
{
        int recsize;
        __u64 seq;
        __u32 oid;

        seq = fid_seq(fid);
        oid = fid_oid(fid);

        /*
         * Two cases: a compact 6-byte representation for the common case, and
         * a full 17-byte representation for an "unusual" fid.
         */

        /*
         * Check that the usual case is really usual.
         */
        CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);

        if (fid_is_igif(fid) ||
            seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
                fid_cpu_to_be(befider, fid);
                recsize = sizeof *befider;
        } else {
                unsigned char *small_befider;

                small_befider = (unsigned char *)befider;

                small_befider[0] = seq >> 16;
                small_befider[1] = seq >> 8;
                small_befider[2] = seq;

                small_befider[3] = oid >> 8;
                small_befider[4] = oid;

                recsize = 5;
        }
        memcpy(pack->fp_area, befider, recsize);
        pack->fp_len = recsize + 1;
}
EXPORT_SYMBOL(fid_pack);

int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
{
        int result;

        result = 0;
        switch (pack->fp_len) {
        case sizeof *fid + 1:
                memcpy(fid, pack->fp_area, sizeof *fid);
                fid_be_to_cpu(fid, fid);
                break;
        case 6: {
                const unsigned char *area;

                area = pack->fp_area;
                fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
                fid->f_oid = (area[3] << 8) | area[4];
                fid->f_ver = 0;
                break;
        }
        default:
                CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
                result = -EIO;
        }
        return result;
}
EXPORT_SYMBOL(fid_unpack);
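
/*
 * Worked example (illustrative only): a fid with seq = 0x12345, oid = 0x6789
 * and ver = 0 takes the compact form, so fid_pack() stores the five bytes
 * 01 23 45 67 89 in fp_area and sets fp_len = 6; fid_unpack() then rebuilds
 * f_seq = (0x01 << 16) | (0x23 << 8) | 0x45 and f_oid = (0x67 << 8) | 0x89.
 */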

const char *lu_time_names[LU_TIME_NR] = {
        [LU_TIME_FIND_LOOKUP] = "find_lookup",
        [LU_TIME_FIND_ALLOC]  = "find_alloc",
        [LU_TIME_FIND_INSERT] = "find_insert"
};
EXPORT_SYMBOL(lu_time_names);
1257 EXPORT_SYMBOL(lu_time_names);