1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  * Author: Nikita Danilov <nikita@clusterfs.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef EXPORT_SYMTAB
47 # define EXPORT_SYMTAB
48 #endif
49
50 #include <linux/seq_file.h>
51 #include <linux/module.h>
52 /* nr_free_pages() */
53 #include <linux/swap.h>
54 /* hash_long() */
55 #include <libcfs/libcfs_hash.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <libcfs/list.h>
61 /* lu_time_global_{init,fini}() */
62 #include <lu_time.h>
63
64 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
65
66 /*
67  * Decrease the reference counter on an object. If the last reference is
68  * released, return the object to the cache, unless lu_object_is_dying(o)
69  * holds. In the latter case, free the object immediately.
70  */
71 void lu_object_put(const struct lu_env *env, struct lu_object *o)
72 {
73         struct lu_object_header *top;
74         struct lu_site          *site;
75         struct lu_object        *orig;
76         int                      kill_it;
77
78         top = o->lo_header;
79         site = o->lo_dev->ld_site;
80         orig = o;
81         kill_it = 0;
82         write_lock(&site->ls_guard);
83         if (atomic_dec_and_test(&top->loh_ref)) {
84                 /*
85                  * When last reference is released, iterate over object
86                  * layers, and notify them that object is no longer busy.
87                  */
88                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
89                         if (o->lo_ops->loo_object_release != NULL)
90                                 o->lo_ops->loo_object_release(env, o);
91                 }
92                 -- site->ls_busy;
93                 if (lu_object_is_dying(top)) {
94                         /*
95                          * If the object is dying (will not be cached), remove it
96                          * from the hash table and LRU.
97                          *
98                          * This is done with the hash table and LRU lists
99                          * locked. As the only ways to acquire the first
100                          * reference to a previously unreferenced object are
101                          * hash-table lookup (lu_object_find()) and LRU
102                          * scanning (lu_site_purge()), both done under the
103                          * hash-table and LRU lock, no race with a concurrent
104                          * object lookup is possible and we can safely destroy
105                          * the object below.
106                          */
107                         hlist_del_init(&top->loh_hash);
108                         list_del_init(&top->loh_lru);
109                         -- site->ls_total;
110                         kill_it = 1;
111                 }
112         }
113         write_unlock(&site->ls_guard);
114         if (kill_it)
115                 /*
116                  * Object was already removed from hash and lru above, can
117                  * kill it.
118                  */
119                 lu_object_free(env, orig);
120 }
121 EXPORT_SYMBOL(lu_object_put);
122
123 /*
124  * Allocate new object.
125  *
126  * This follows object creation protocol, described in the comment within
127  * struct lu_device_operations definition.
128  */
129 static struct lu_object *lu_object_alloc(const struct lu_env *env,
130                                          struct lu_site *s,
131                                          const struct lu_fid *f)
132 {
133         struct lu_object *scan;
134         struct lu_object *top;
135         struct list_head *layers;
136         int clean;
137         int result;
138         ENTRY;
139
140         /*
141          * Create top-level object slice. This will also create
142          * lu_object_header.
143          */
144         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
145                                                       NULL, s->ls_top_dev);
146         if (top == NULL)
147                 RETURN(ERR_PTR(-ENOMEM));
148         /*
149          * This is the only place where object fid is assigned. It's constant
150          * after this point.
151          */
152         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
153         top->lo_header->loh_fid  = *f;
154         layers = &top->lo_header->loh_layers;
155         do {
156                 /*
157                  * Call ->loo_object_init() repeatedly, until no more new
158                  * object slices are created.
159                  */
160                 clean = 1;
161                 list_for_each_entry(scan, layers, lo_linkage) {
162                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
163                                 continue;
164                         clean = 0;
165                         scan->lo_header = top->lo_header;
166                         result = scan->lo_ops->loo_object_init(env, scan);
167                         if (result != 0) {
168                                 lu_object_free(env, top);
169                                 RETURN(ERR_PTR(result));
170                         }
171                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
172                 }
173         } while (!clean);
174
175         list_for_each_entry_reverse(scan, layers, lo_linkage) {
176                 if (scan->lo_ops->loo_object_start != NULL) {
177                         result = scan->lo_ops->loo_object_start(env, scan);
178                         if (result != 0) {
179                                 lu_object_free(env, top);
180                                 RETURN(ERR_PTR(result));
181                         }
182                 }
183         }
184
185         s->ls_stats.s_created ++;
186         RETURN(top);
187 }
188
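/*
 * Illustrative sketch (not part of the original file): how a mid-layer device
 * typically participates in the iterative allocation protocol implemented by
 * lu_object_alloc() above. Its ->loo_object_init() allocates the slice of the
 * device below it and links it into the compound object; the loop above then
 * initializes that new slice on its next pass. The "my_*" names and the
 * md_child field are assumptions, not part of this code.
 */
#if 0
struct my_device {
        struct lu_device  md_lu_dev;
        struct lu_device *md_child;     /* next device down the stack */
};

static int my_object_init(const struct lu_env *env, struct lu_object *o)
{
        struct my_device *d = container_of0(o->lo_dev,
                                            struct my_device, md_lu_dev);
        struct lu_device *under = d->md_child;
        struct lu_object *below;

        /* ask the lower device for its slice of the same compound object */
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        if (below == NULL)
                return -ENOMEM;
        /* append the new slice; lu_object_alloc() will init it on next pass */
        lu_object_add(o, below);
        return 0;
}
#endif
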
189 /*
190  * Free object.
191  */
192 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
193 {
194         struct list_head splice;
195         struct lu_object *scan;
196
197         /*
198          * First call ->loo_object_delete() method to release all resources.
199          */
200         list_for_each_entry_reverse(scan,
201                                     &o->lo_header->loh_layers, lo_linkage) {
202                 if (scan->lo_ops->loo_object_delete != NULL)
203                         scan->lo_ops->loo_object_delete(env, scan);
204         }
205
206         /*
207          * Then, splice object layers into stand-alone list, and call
208          * ->loo_object_free() on all layers to free memory. Splice is
209          * necessary, because lu_object_header is freed together with the
210          * top-level slice.
211          */
212         CFS_INIT_LIST_HEAD(&splice);
213         list_splice_init(&o->lo_header->loh_layers, &splice);
214         while (!list_empty(&splice)) {
215                 o = container_of0(splice.next, struct lu_object, lo_linkage);
216                 list_del_init(&o->lo_linkage);
217                 LASSERT(o->lo_ops->loo_object_free != NULL);
218                 o->lo_ops->loo_object_free(env, o);
219         }
220 }
221
222 /*
223  * Free @nr objects from the cold end of the site LRU list.
224  */
225 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
226 {
227         struct list_head         dispose;
228         struct lu_object_header *h;
229         struct lu_object_header *temp;
230
231         CFS_INIT_LIST_HEAD(&dispose);
232         /*
233          * Under LRU list lock, scan LRU list and move unreferenced objects to
234          * the dispose list, removing them from LRU and hash table.
235          */
236         write_lock(&s->ls_guard);
237         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
238                 /*
239                  * Objects are sorted in LRU order, and "busy" objects (ones
240                  * with h->loh_ref > 0) naturally tend to live near the hot end
241                  * that we scan last. Unfortunately, sites usually have a small
242                  * number (fewer than ten) of busy yet rarely accessed objects
243                  * (some global objects, accessed directly through pointers,
244                  * bypassing the hash table). Currently the algorithm scans them
245                  * over and over again. We should probably move busy objects out
246                  * of the LRU, or we can live with that.
247                  */
248                 if (nr-- == 0)
249                         break;
250                 if (atomic_read(&h->loh_ref) > 0)
251                         continue;
252                 hlist_del_init(&h->loh_hash);
253                 list_move(&h->loh_lru, &dispose);
254                 s->ls_total --;
255         }
256         write_unlock(&s->ls_guard);
257         /*
258          * Free everything on the dispose list. This is safe against races for
259          * the reasons described in lu_object_put().
260          */
261         while (!list_empty(&dispose)) {
262                 h = container_of0(dispose.next,
263                                  struct lu_object_header, loh_lru);
264                 list_del_init(&h->loh_lru);
265                 lu_object_free(env, lu_object_top(h));
266                 s->ls_stats.s_lru_purged ++;
267         }
268         return nr;
269 }
270 EXPORT_SYMBOL(lu_site_purge);
271
272 /*
273  * Object printing.
274  *
275  * The code below has to jump through certain hoops to output an object
276  * description into the libcfs_debug_msg-based log. The problem is that
277  * lu_object_print() composes the object description from strings that are
278  * parts of _lines_ of output (i.e., strings not terminated by a newline).
279  * This doesn't fit very well into the libcfs_debug_msg() interface, which
280  * assumes that each message supplied to it is a self-contained output line.
281  *
282  * To work around this, strings are collected in a temporary buffer (stored
283  * as the value of the lu_global_key key) until a terminating newline
284  * character is detected.
285  *
286  */
287
288 enum {
289         /*
290          * Maximal line size.
291          *
292          * XXX overflow is not handled correctly.
293          */
294         LU_CDEBUG_LINE = 256
295 };
296
297 struct lu_cdebug_data {
298         /*
299          * Temporary buffer.
300          */
301         char lck_area[LU_CDEBUG_LINE];
302         /*
303          * fid staging area used by dt_store_open().
304          */
305         struct lu_fid_pack lck_pack;
306 };
307
308 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
309 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
310
311 /*
312  * Key, holding temporary buffer. This key is registered very early by
313  * lu_global_init().
314  */
315 struct lu_context_key lu_global_key = {
316         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
317         .lct_init = lu_global_key_init,
318         .lct_fini = lu_global_key_fini
319 };
320
321 /*
322  * Printer function emitting messages through libcfs_debug_msg().
323  */
324 int lu_cdebug_printer(const struct lu_env *env,
325                       void *cookie, const char *format, ...)
326 {
327         struct lu_cdebug_print_info *info = cookie;
328         struct lu_cdebug_data       *key;
329         int used;
330         int complete;
331         va_list args;
332
333         va_start(args, format);
334
335         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
336         LASSERT(key != NULL);
337
338         used = strlen(key->lck_area);
339         complete = format[strlen(format) - 1] == '\n';
340         /*
341          * Append new chunk to the buffer.
342          */
343         vsnprintf(key->lck_area + used,
344                   ARRAY_SIZE(key->lck_area) - used, format, args);
345         if (complete) {
346                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
347                                  (char *)info->lpi_file, info->lpi_fn,
348                                  info->lpi_line, "%s", key->lck_area);
349                 key->lck_area[0] = 0;
350         }
351         va_end(args);
352         return 0;
353 }
354 EXPORT_SYMBOL(lu_cdebug_printer);
355
356 /*
357  * Print object header.
358  */
359 static void lu_object_header_print(const struct lu_env *env,
360                                    void *cookie, lu_printer_t printer,
361                                    const struct lu_object_header *hdr)
362 {
363         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
364                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
365                    PFID(&hdr->loh_fid),
366                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
367                    list_empty(&hdr->loh_lru) ? "" : " lru",
368                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
369 }
370
371 /*
372  * Print a human-readable representation of @o through @printer.
373  */
374 void lu_object_print(const struct lu_env *env, void *cookie,
375                      lu_printer_t printer, const struct lu_object *o)
376 {
377         static const char ruler[] = "........................................";
378         struct lu_object_header *top;
379         int depth;
380
381         top = o->lo_header;
382         lu_object_header_print(env, cookie, printer, top);
383         (*printer)(env, cookie, "\n");
384         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
385                 depth = o->lo_depth + 4;
386                 LASSERT(o->lo_ops->loo_object_print != NULL);
387                 /*
388                  * print `.' @depth times.
389                  */
390                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
391                 o->lo_ops->loo_object_print(env, cookie, printer, o);
392                 (*printer)(env, cookie, "\n");
393         }
394 }
395 EXPORT_SYMBOL(lu_object_print);
396
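/*
 * Usage sketch (illustrative, not part of the original file): dump a single
 * object into the debug log using the chunk-collecting printer above.
 * DECLARE_LU_CDEBUG_PRINT_INFO() builds the cookie that lu_cdebug_printer()
 * expects, exactly as lu_stack_fini() does below; the helper name and the
 * D_INFO mask are arbitrary choices for the example.
 */
#if 0
static void my_dump_object(const struct lu_env *env, const struct lu_object *o)
{
        static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);

        /*
         * Chunks accumulate in the per-thread lu_global_key buffer and are
         * flushed to libcfs_debug_msg() whenever a newline is printed.
         */
        lu_object_print(env, &cookie, lu_cdebug_printer, o);
}
#endif
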
397 /*
398  * Check object consistency.
399  */
400 int lu_object_invariant(const struct lu_object *o)
401 {
402         struct lu_object_header *top;
403
404         top = o->lo_header;
405         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
406                 if (o->lo_ops->loo_object_invariant != NULL &&
407                     !o->lo_ops->loo_object_invariant(o))
408                         return 0;
409         }
410         return 1;
411 }
412 EXPORT_SYMBOL(lu_object_invariant);
413
414 static struct lu_object *htable_lookup(struct lu_site *s,
415                                        const struct hlist_head *bucket,
416                                        const struct lu_fid *f)
417 {
418         struct lu_object_header *h;
419         struct hlist_node *scan;
420
421         hlist_for_each_entry(h, scan, bucket, loh_hash) {
422                 s->ls_stats.s_cache_check ++;
423                 if (likely(lu_fid_eq(&h->loh_fid, f) &&
424                            !lu_object_is_dying(h))) {
425                         /* bump reference count... */
426                         if (atomic_add_return(1, &h->loh_ref) == 1)
427                                 ++ s->ls_busy;
428                         /* and move to the head of the LRU */
429                         /*
430                          * XXX temporarily disabled to measure the effects of
431                          * read-write locking.
432                          */
433                         /* list_move_tail(&h->loh_lru, &s->ls_lru); */
434                         s->ls_stats.s_cache_hit ++;
435                         return lu_object_top(h);
436                 }
437         }
438         s->ls_stats.s_cache_miss ++;
439         return NULL;
440 }
441
442 static __u32 fid_hash(const struct lu_fid *f, int bits)
443 {
444         /* All objects with the same id but different versions will belong to
445          * the same collision list. */
446         return hash_long(fid_flatten(f), bits);
447 }
448
449 /*
450  * Search the cache for an object with fid @f. If such an object is found,
451  * return it. Otherwise, create a new object, insert it into the cache and
452  * return it. Either way, an additional reference is acquired on the result.
453  */
454 struct lu_object *lu_object_find(const struct lu_env *env,
455                                  struct lu_site *s, const struct lu_fid *f)
456 {
457         struct lu_object     *o;
458         struct lu_object     *shadow;
459         struct hlist_head *bucket;
460
461         /*
462          * This uses standard index maintenance protocol:
463          *
464          *     - search index under lock, and return object if found;
465          *     - otherwise, unlock index, allocate new object;
466          *     - lock index and search again;
467          *     - if nothing is found (usual case), insert newly created
468          *       object into index;
469          *     - otherwise (race: other thread inserted object), free
470          *       object just allocated.
471          *     - unlock index;
472          *     - return object.
473          */
474
475         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
476
477         read_lock(&s->ls_guard);
478         o = htable_lookup(s, bucket, f);
479         read_unlock(&s->ls_guard);
480
481         if (o != NULL)
482                 return o;
483
484         /*
485          * Allocate new object. This may result in rather complicated
486          * operations, including fld queries, inode loading, etc.
487          */
488         o = lu_object_alloc(env, s, f);
489         if (unlikely(IS_ERR(o)))
490                 return o;
491
492         LASSERT(lu_fid_eq(lu_object_fid(o), f));
493
494         write_lock(&s->ls_guard);
495         shadow = htable_lookup(s, bucket, f);
496         if (likely(shadow == NULL)) {
497                 hlist_add_head(&o->lo_header->loh_hash, bucket);
498                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
499                 ++ s->ls_busy;
500                 ++ s->ls_total;
501                 shadow = o;
502                 o = NULL;
503         } else
504                 s->ls_stats.s_cache_race ++;
505         write_unlock(&s->ls_guard);
506         if (o != NULL)
507                 lu_object_free(env, o);
508         return shadow;
509 }
510 EXPORT_SYMBOL(lu_object_find);
511
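/*
 * Typical caller pattern (illustrative sketch, not part of the original
 * code): look an object up by fid, use it, and drop the reference that
 * lu_object_find() acquired. The helper name is an assumption.
 */
#if 0
static int my_use_object(const struct lu_env *env, struct lu_site *site,
                         const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, site, fid);     /* takes a reference */
        if (IS_ERR(o))
                return PTR_ERR(o);

        /* ... the object cannot be freed while this reference is held ... */

        lu_object_put(env, o);                  /* releases the reference */
        return 0;
}
#endif
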
512 /*
513  * Global list of all sites on this node
514  */
515 static CFS_LIST_HEAD(lu_sites);
516 static DECLARE_MUTEX(lu_sites_guard);
517
518 /*
519  * Global environment used by site shrinker.
520  */
521 static struct lu_env lu_shrink_env;
522
523 /*
524  * Print all objects in @s.
525  */
526 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
527                    lu_printer_t printer)
528 {
529         int i;
530
531         for (i = 0; i < s->ls_hash_size; ++i) {
532                 struct lu_object_header *h;
533                 struct hlist_node       *scan;
534
535                 read_lock(&s->ls_guard);
536                 hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {
537
538                         if (!list_empty(&h->loh_layers)) {
539                                 const struct lu_object *obj;
540
541                                 obj = lu_object_top(h);
542                                 lu_object_print(env, cookie, printer, obj);
543                         } else
544                                 lu_object_header_print(env, cookie, printer, h);
545                 }
546                 read_unlock(&s->ls_guard);
547         }
548 }
549 EXPORT_SYMBOL(lu_site_print);
550
551 enum {
552         LU_CACHE_PERCENT   = 20,
553 };
554
555 /*
556  * Return desired hash table order.
557  */
558 static int lu_htable_order(void)
559 {
560         unsigned long cache_size;
561         int bits;
562
563         /*
564          * Calculate hash table size, assuming that we want reasonable
565          * performance when 20% of total memory is occupied by cache of
566          * lu_objects.
567          *
568          * The size of an lu_object is (arbitrarily) taken as 1K (together with its inode).
569          */
570         cache_size = num_physpages;
571
572 #if BITS_PER_LONG == 32
573         /* limit hashtable size for lowmem systems to low RAM */
574         if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
575                 cache_size = (1 << (30 - CFS_PAGE_SHIFT)) * 3 / 4;
576 #endif
577
578         cache_size = cache_size / 100 * LU_CACHE_PERCENT *
579                 (CFS_PAGE_SIZE / 1024);
580
581         for (bits = 1; (1 << bits) < cache_size; ++bits) {
582                 ;
583         }
584         return bits;
585 }
586
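/*
 * Worked example (illustrative): on a 64-bit machine with 4GB of RAM and 4KB
 * pages, num_physpages is about 2^20, so
 *
 *     cache_size = 2^20 / 100 * 20 * (4096 / 1024) ~= 838,800
 *
 * and the loop above picks the smallest "bits" with 2^bits >= 838,800, which
 * is bits = 20 (2^20 = 1,048,576 buckets): roughly one hash bucket per
 * 1K-sized cached object, matching the assumption in the comment above.
 */
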
587 /*
588  * Initialize site @s, with @top as the top-level device.
589  */
590 int lu_site_init(struct lu_site *s, struct lu_device *top)
591 {
592         int bits;
593         int size;
594         int i;
595         ENTRY;
596
597         memset(s, 0, sizeof *s);
598         rwlock_init(&s->ls_guard);
599         CFS_INIT_LIST_HEAD(&s->ls_lru);
600         CFS_INIT_LIST_HEAD(&s->ls_linkage);
601         s->ls_top_dev = top;
602         top->ld_site = s;
603         lu_device_get(top);
604
605         for (bits = lu_htable_order(), size = 1 << bits;
606              (s->ls_hash =
607               cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
608              --bits, size >>= 1) {
609                 /*
610                  * Scale hash table down, until allocation succeeds.
611                  */
612                 ;
613         }
614
615         s->ls_hash_size = size;
616         s->ls_hash_bits = bits;
617         s->ls_hash_mask = size - 1;
618
619         for (i = 0; i < size; i++)
620                 INIT_HLIST_HEAD(&s->ls_hash[i]);
621
622         RETURN(0);
623 }
624 EXPORT_SYMBOL(lu_site_init);
625
626 /*
627  * Finalize @s and release its resources.
628  */
629 void lu_site_fini(struct lu_site *s)
630 {
631         LASSERT(list_empty(&s->ls_lru));
632         LASSERT(s->ls_total == 0);
633
634         down(&lu_sites_guard);
635         list_del_init(&s->ls_linkage);
636         up(&lu_sites_guard);
637
638         if (s->ls_hash != NULL) {
639                 int i;
640                 for (i = 0; i < s->ls_hash_size; i++)
641                         LASSERT(hlist_empty(&s->ls_hash[i]));
642                 cfs_free_large(s->ls_hash);
643                 s->ls_hash = NULL;
644         }
645         if (s->ls_top_dev != NULL) {
646                 s->ls_top_dev->ld_site = NULL;
647                 lu_device_put(s->ls_top_dev);
648                 s->ls_top_dev = NULL;
649         }
650 }
651 EXPORT_SYMBOL(lu_site_fini);
652
653 /*
654  * Called when initialization of stack for this site is completed.
655  */
656 int lu_site_init_finish(struct lu_site *s)
657 {
658         int result;
659         down(&lu_sites_guard);
660         result = lu_context_refill(&lu_shrink_env.le_ctx);
661         if (result == 0)
662                 list_add(&s->ls_linkage, &lu_sites);
663         up(&lu_sites_guard);
664         return result;
665 }
666 EXPORT_SYMBOL(lu_site_init_finish);
667
668 /*
669  * Acquire additional reference on device @d
670  */
671 void lu_device_get(struct lu_device *d)
672 {
673         atomic_inc(&d->ld_ref);
674 }
675 EXPORT_SYMBOL(lu_device_get);
676
677 /*
678  * Release reference on device @d.
679  */
680 void lu_device_put(struct lu_device *d)
681 {
682         atomic_dec(&d->ld_ref);
683 }
684 EXPORT_SYMBOL(lu_device_put);
685
686 /*
687  * Initialize device @d of type @t.
688  */
689 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
690 {
691         memset(d, 0, sizeof *d);
692         atomic_set(&d->ld_ref, 0);
693         d->ld_type = t;
694         return 0;
695 }
696 EXPORT_SYMBOL(lu_device_init);
697
698 /*
699  * Finalize device @d.
700  */
701 void lu_device_fini(struct lu_device *d)
702 {
703         if (d->ld_obd != NULL)
704                 /* finish lprocfs */
705                 lprocfs_obd_cleanup(d->ld_obd);
706
707         LASSERTF(atomic_read(&d->ld_ref) == 0,
708                  "Refcount is %u\n", atomic_read(&d->ld_ref));
709 }
710 EXPORT_SYMBOL(lu_device_fini);
711
712 /*
713  * Initialize object @o that is part of compound object @h and was created by
714  * device @d.
715  */
716 int lu_object_init(struct lu_object *o,
717                    struct lu_object_header *h, struct lu_device *d)
718 {
719         memset(o, 0, sizeof *o);
720         o->lo_header = h;
721         o->lo_dev    = d;
722         lu_device_get(d);
723         CFS_INIT_LIST_HEAD(&o->lo_linkage);
724         return 0;
725 }
726 EXPORT_SYMBOL(lu_object_init);
727
728 /*
729  * Finalize object and release its resources.
730  */
731 void lu_object_fini(struct lu_object *o)
732 {
733         LASSERT(list_empty(&o->lo_linkage));
734
735         if (o->lo_dev != NULL) {
736                 lu_device_put(o->lo_dev);
737                 o->lo_dev = NULL;
738         }
739 }
740 EXPORT_SYMBOL(lu_object_fini);
741
742 /*
743  * Add object @o as first layer of compound object @h
744  *
745  * This is typically called by the ->ldo_object_alloc() method of top-level
746  * device.
747  */
748 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
749 {
750         list_move(&o->lo_linkage, &h->loh_layers);
751 }
752 EXPORT_SYMBOL(lu_object_add_top);
753
754 /*
755  * Add object @o as a layer of a compound object, going after @before.
756  *
757  * This is typically called by the ->ldo_object_alloc() method of
758  * @before->lo_dev.
759  */
760 void lu_object_add(struct lu_object *before, struct lu_object *o)
761 {
762         list_move(&o->lo_linkage, &before->lo_linkage);
763 }
764 EXPORT_SYMBOL(lu_object_add);
765
766 /*
767  * Initialize compound object.
768  */
769 int lu_object_header_init(struct lu_object_header *h)
770 {
771         memset(h, 0, sizeof *h);
772         atomic_set(&h->loh_ref, 1);
773         INIT_HLIST_NODE(&h->loh_hash);
774         CFS_INIT_LIST_HEAD(&h->loh_lru);
775         CFS_INIT_LIST_HEAD(&h->loh_layers);
776         return 0;
777 }
778 EXPORT_SYMBOL(lu_object_header_init);
779
780 /*
781  * Finalize compound object.
782  */
783 void lu_object_header_fini(struct lu_object_header *h)
784 {
785         LASSERT(list_empty(&h->loh_layers));
786         LASSERT(list_empty(&h->loh_lru));
787         LASSERT(hlist_unhashed(&h->loh_hash));
788 }
789 EXPORT_SYMBOL(lu_object_header_fini);
790
791 /*
792  * Given a compound object, find its slice corresponding to the device type
793  * @dtype.
794  */
795 struct lu_object *lu_object_locate(struct lu_object_header *h,
796                                    struct lu_device_type *dtype)
797 {
798         struct lu_object *o;
799
800         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
801                 if (o->lo_dev->ld_type == dtype)
802                         return o;
803         }
804         return NULL;
805 }
806 EXPORT_SYMBOL(lu_object_locate);
807
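/*
 * Illustrative sketch (not part of the original code): a layer usually finds
 * its own slice of a cached object by device type after a lookup returns the
 * top slice. "my_device_type" and the helper name are assumptions.
 */
#if 0
extern struct lu_device_type my_device_type;    /* assumed to exist */

static struct lu_object *my_own_slice(const struct lu_env *env,
                                      struct lu_site *site,
                                      const struct lu_fid *fid)
{
        struct lu_object *top;
        struct lu_object *slice;

        top = lu_object_find(env, site, fid);
        if (IS_ERR(top))
                return top;
        slice = lu_object_locate(top->lo_header, &my_device_type);
        if (slice == NULL) {
                lu_object_put(env, top);
                return ERR_PTR(-ENOENT);
        }
        return slice;   /* the caller still owns the reference taken above */
}
#endif
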
808
809
810 /*
811  * Finalize and free devices in the device stack.
812  * 
813  * Finalize device stack by purging object cache, and calling
814  * lu_device_type_operations::ldto_device_fini() and
815  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
816  */
817 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
818 {
819         struct lu_site   *site = top->ld_site;
820         struct lu_device *scan;
821         struct lu_device *next;
822
823         lu_site_purge(env, site, ~0);
824         for (scan = top; scan != NULL; scan = next) {
825                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
826                 lu_device_put(scan);
827         }
828
829         /* purge again. */
830         lu_site_purge(env, site, ~0);
831
832         if (!list_empty(&site->ls_lru) || site->ls_total != 0) {
833                 /*
834                  * Uh-oh, objects still exist.
835                  */
836                 static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
837
838                 lu_site_print(env, site, &cookie, lu_cdebug_printer);
839         }
840
841         for (scan = top; scan != NULL; scan = next) {
842                 const struct lu_device_type *ldt = scan->ld_type;
843                 struct obd_type             *type;
844
845                 next = ldt->ldt_ops->ldto_device_free(env, scan);
846                 type = ldt->ldt_obd_type;
847                 type->typ_refcnt--;
848                 class_put_type(type);
849         }
850 }
851 EXPORT_SYMBOL(lu_stack_fini);
852
853 enum {
854         /*
855          * Maximal number of thread-local data (TLD) slots.
856          */
857         LU_CONTEXT_KEY_NR = 16
858 };
859
860 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
861
862 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
863
864 /*
865  * Register new key.
866  */
867 int lu_context_key_register(struct lu_context_key *key)
868 {
869         int result;
870         int i;
871
872         LASSERT(key->lct_init != NULL);
873         LASSERT(key->lct_fini != NULL);
874         LASSERT(key->lct_tags != 0);
875         LASSERT(key->lct_owner != NULL);
876
877         result = -ENFILE;
878         spin_lock(&lu_keys_guard);
879         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
880                 if (lu_keys[i] == NULL) {
881                         key->lct_index = i;
882                         atomic_set(&key->lct_used, 1);
883                         lu_keys[i] = key;
884                         result = 0;
885                         break;
886                 }
887         }
888         spin_unlock(&lu_keys_guard);
889         return result;
890 }
891 EXPORT_SYMBOL(lu_context_key_register);
892
893 static void key_fini(struct lu_context *ctx, int index)
894 {
895         if (ctx->lc_value[index] != NULL) {
896                 struct lu_context_key *key;
897
898                 key = lu_keys[index];
899                 LASSERT(key != NULL);
900                 LASSERT(key->lct_fini != NULL);
901                 LASSERT(atomic_read(&key->lct_used) > 1);
902
903                 key->lct_fini(ctx, key, ctx->lc_value[index]);
904                 atomic_dec(&key->lct_used);
905                 LASSERT(key->lct_owner != NULL);
906                 if (!(ctx->lc_tags & LCT_NOREF)) {
907                         LASSERT(module_refcount(key->lct_owner) > 0);
908                         module_put(key->lct_owner);
909                 }
910                 ctx->lc_value[index] = NULL;
911         }
912 }
913
914 /*
915  * Deregister key.
916  */
917 void lu_context_key_degister(struct lu_context_key *key)
918 {
919         LASSERT(atomic_read(&key->lct_used) >= 1);
920         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
921
922         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
923
924         if (atomic_read(&key->lct_used) > 1)
925                 CERROR("key has instances.\n");
926         spin_lock(&lu_keys_guard);
927         lu_keys[key->lct_index] = NULL;
928         spin_unlock(&lu_keys_guard);
929 }
930 EXPORT_SYMBOL(lu_context_key_degister);
931
932 /*
933  * Return value associated with key @key in context @ctx.
934  */
935 void *lu_context_key_get(const struct lu_context *ctx,
936                          struct lu_context_key *key)
937 {
938         LASSERT(ctx->lc_state == LCS_ENTERED);
939         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
940         return ctx->lc_value[key->lct_index];
941 }
942 EXPORT_SYMBOL(lu_context_key_get);
943
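/*
 * Sketch of the context-key pattern (illustrative, not part of the original
 * file): a module defines per-thread data, registers a key for it, and later
 * fetches the value from an entered context. The "my_*" names are
 * assumptions; LU_KEY_INIT_FINI() and LU_CONTEXT_KEY_INIT() are used the same
 * way as for lu_global_key above and in lu_global_init() below.
 */
#if 0
struct my_thread_info {
        char mti_scratch[512];
};

/* generates my_key_init()/my_key_fini(), allocating/freeing the info */
LU_KEY_INIT_FINI(my, struct my_thread_info);

static struct lu_context_key my_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = my_key_init,
        .lct_fini = my_key_fini
};

static int my_module_init(void)
{
        /* fill in the owner field, then register the key */
        LU_CONTEXT_KEY_INIT(&my_thread_key);
        return lu_context_key_register(&my_thread_key);
}

static struct my_thread_info *my_info_get(const struct lu_env *env)
{
        /* valid only between lu_context_enter() and lu_context_exit() */
        return lu_context_key_get(&env->le_ctx, &my_thread_key);
}
#endif
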
944 static void keys_fini(struct lu_context *ctx)
945 {
946         int i;
947
948         if (ctx->lc_value != NULL) {
949                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
950                         key_fini(ctx, i);
951                 OBD_FREE(ctx->lc_value,
952                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
953                 ctx->lc_value = NULL;
954         }
955 }
956
957 static int keys_fill(const struct lu_context *ctx)
958 {
959         int i;
960
961         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
962                 struct lu_context_key *key;
963
964                 key = lu_keys[i];
965                 if (ctx->lc_value[i] == NULL &&
966                     key != NULL && key->lct_tags & ctx->lc_tags) {
967                         void *value;
968
969                         LASSERT(key->lct_init != NULL);
970                         LASSERT(key->lct_index == i);
971
972                         value = key->lct_init(ctx, key);
973                         if (unlikely(IS_ERR(value)))
974                                 return PTR_ERR(value);
975                         LASSERT(key->lct_owner != NULL);
976                         if (!(ctx->lc_tags & LCT_NOREF))
977                                 try_module_get(key->lct_owner);
978                         atomic_inc(&key->lct_used);
979                         ctx->lc_value[i] = value;
980                 }
981         }
982         return 0;
983 }
984
985 static int keys_init(struct lu_context *ctx)
986 {
987         int result;
988
989         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
990         if (likely(ctx->lc_value != NULL))
991                 result = keys_fill(ctx);
992         else
993                 result = -ENOMEM;
994
995         if (result != 0)
996                 keys_fini(ctx);
997         return result;
998 }
999
1000 /*
1001  * Initialize context data-structure. Create values for all keys.
1002  */
1003 int lu_context_init(struct lu_context *ctx, __u32 tags)
1004 {
1005         memset(ctx, 0, sizeof *ctx);
1006         ctx->lc_state = LCS_INITIALIZED;
1007         ctx->lc_tags = tags;
1008         return keys_init(ctx);
1009 }
1010 EXPORT_SYMBOL(lu_context_init);
1011
1012 /*
1013  * Finalize context data-structure. Destroy key values.
1014  */
1015 void lu_context_fini(struct lu_context *ctx)
1016 {
1017         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1018         ctx->lc_state = LCS_FINALIZED;
1019         keys_fini(ctx);
1020 }
1021 EXPORT_SYMBOL(lu_context_fini);
1022
1023 /*
1024  * Called before entering context.
1025  */
1026 void lu_context_enter(struct lu_context *ctx)
1027 {
1028         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1029         ctx->lc_state = LCS_ENTERED;
1030 }
1031 EXPORT_SYMBOL(lu_context_enter);
1032
1033 /*
1034  * Called after exiting from @ctx
1035  */
1036 void lu_context_exit(struct lu_context *ctx)
1037 {
1038         int i;
1039
1040         LASSERT(ctx->lc_state == LCS_ENTERED);
1041         ctx->lc_state = LCS_LEFT;
1042         if (ctx->lc_value != NULL) {
1043                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1044                         if (ctx->lc_value[i] != NULL) {
1045                                 struct lu_context_key *key;
1046
1047                                 key = lu_keys[i];
1048                                 LASSERT(key != NULL);
1049                                 if (key->lct_exit != NULL)
1050                                         key->lct_exit(ctx,
1051                                                       key, ctx->lc_value[i]);
1052                         }
1053                 }
1054         }
1055 }
1056 EXPORT_SYMBOL(lu_context_exit);
1057
1058 /*
1059  * Allocate, for the given context, values for all keys that were registered
1060  * after the context was created.
1061  */
1062 int lu_context_refill(const struct lu_context *ctx)
1063 {
1064         LASSERT(ctx->lc_value != NULL);
1065         return keys_fill(ctx);
1066 }
1067 EXPORT_SYMBOL(lu_context_refill);
1068
1069 static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
1070                         __u32 tags, int noref)
1071 {
1072         int result;
1073
1074         LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
1075
1076         env->le_ses = ses;
1077         result = lu_context_init(&env->le_ctx, tags);
1078         if (likely(result == 0))
1079                 lu_context_enter(&env->le_ctx);
1080         return result;
1081 }
1082
1083 static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
1084                              __u32 tags)
1085 {
1086         return lu_env_setup(env, ses, tags, 1);
1087 }
1088
1089 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
1090 {
1091         return lu_env_setup(env, ses, tags, 0);
1092 }
1093 EXPORT_SYMBOL(lu_env_init);
1094
1095 void lu_env_fini(struct lu_env *env)
1096 {
1097         lu_context_exit(&env->le_ctx);
1098         lu_context_fini(&env->le_ctx);
1099         env->le_ses = NULL;
1100 }
1101 EXPORT_SYMBOL(lu_env_fini);
1102
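/*
 * Illustrative sketch (not part of the original file): the usual environment
 * lifecycle in a service thread. lu_env_init() creates and enters le_ctx,
 * lu_env_fini() exits and destroys it. The helper name and the LCT_MD_THREAD
 * tag are arbitrary choices for the example.
 */
#if 0
static int my_thread_body(struct lu_site *site, const struct lu_fid *fid)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
        if (rc != 0)
                return rc;

        /* ... lu_object_find(&env, site, fid), etc. ... */

        lu_env_fini(&env);
        return 0;
}
#endif
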
1103 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1104 {
1105         struct lu_site *s;
1106         struct lu_site *tmp;
1107         int cached = 0;
1108         int remain = nr;
1109         CFS_LIST_HEAD(splice);
1110
1111         if (nr != 0 && !(gfp_mask & __GFP_FS))
1112                 return -1;
1113
1114         down(&lu_sites_guard);
1115         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1116                 if (nr != 0) {
1117                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1118                         /*
1119                          * Move the just-shrunk site to the tail of the site
1120                          * list to ensure shrinking fairness.
1121                          */
1122                         list_move_tail(&s->ls_linkage, &splice);
1123                 }
1124                 read_lock(&s->ls_guard);
1125                 cached += s->ls_total - s->ls_busy;
1126                 read_unlock(&s->ls_guard);
1127                 if (remain <= 0)
1128                         break;
1129         }
1130         list_splice(&splice, lu_sites.prev);
1131         up(&lu_sites_guard);
1132         return cached;
1133 }
1134
1135 static struct shrinker *lu_site_shrinker = NULL;
1136
1137 /*
1138  * Initialization of global lu_* data.
1139  */
1140 int lu_global_init(void)
1141 {
1142         int result;
1143
1144         LU_CONTEXT_KEY_INIT(&lu_global_key);
1145         result = lu_context_key_register(&lu_global_key);
1146         if (result == 0) {
1147                 /*
1148                  * At this level, we don't know what tags are needed, so
1149                  * allocate them conservatively. This should not be too bad,
1150                  * because this environment is global.
1151                  */
1152                 down(&lu_sites_guard);
1153                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
1154                 up(&lu_sites_guard);
1155                 if (result == 0) {
1156                         /*
1157                          * seeks estimation: 3 seeks to read a record from oi,
1158                          * one to read inode, one for ea. Unfortunately
1159                          * setting this high value results in lu_object/inode
1160                          * cache consuming all the memory.
1161                          */
1162                         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
1163                                                         lu_cache_shrink);
1164                         result = lu_site_shrinker != NULL ?
1165                                  lu_time_global_init() : -ENOMEM;
1166                 }
1167         }
1168         return result;
1169 }
1170
1171 /*
1172  * Dual to lu_global_init().
1173  */
1174 void lu_global_fini(void)
1175 {
1176         lu_time_global_fini();
1177         if (lu_site_shrinker != NULL) {
1178                 remove_shrinker(lu_site_shrinker);
1179                 lu_site_shrinker = NULL;
1180         }
1181
1182         lu_context_key_degister(&lu_global_key);
1183
1184         /*
1185          * Tear shrinker environment down _after_ de-registering
1186          * lu_global_key, because the latter has a value in the former.
1187          */
1188         down(&lu_sites_guard);
1189         lu_env_fini(&lu_shrink_env);
1190         up(&lu_sites_guard);
1191 }
1192
1193 struct lu_buf LU_BUF_NULL = {
1194         .lb_buf = NULL,
1195         .lb_len = 0
1196 };
1197 EXPORT_SYMBOL(LU_BUF_NULL);
1198
1199 /*
1200  * XXX: Functions below logically belong to fid module, but they are used by
1201  * dt_store_open(). Put them here until better place is found.
1202  */
1203
1204 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
1205               struct lu_fid *befider)
1206 {
1207         int recsize;
1208         __u64 seq;
1209         __u32 oid;
1210
1211         seq = fid_seq(fid);
1212         oid = fid_oid(fid);
1213
1214         /*
1215          * Two cases: a compact 6-byte representation for the common case, and a
1216          * full 17-byte representation for an "unusual" fid.
1217          */
1218
1219         /*
1220          * Check that usual case is really usual.
1221          */
1222         CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);
1223
1224         if (fid_is_igif(fid) ||
1225             seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
1226                 fid_cpu_to_be(befider, fid);
1227                 recsize = sizeof *befider;
1228         } else {
1229                 unsigned char *small_befider;
1230
1231                 small_befider = (unsigned char *)befider;
1232
1233                 small_befider[0] = seq >> 16;
1234                 small_befider[1] = seq >> 8;
1235                 small_befider[2] = seq;
1236
1237                 small_befider[3] = oid >> 8;
1238                 small_befider[4] = oid;
1239
1240                 recsize = 5;
1241         }
1242         memcpy(pack->fp_area, befider, recsize);
1243         pack->fp_len = recsize + 1;
1244 }
1245 EXPORT_SYMBOL(fid_pack);
1246
1247 int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
1248 {
1249         int result;
1250
1251         result = 0;
1252         switch (pack->fp_len) {
1253         case sizeof *fid + 1:
1254                 memcpy(fid, pack->fp_area, sizeof *fid);
1255                 fid_be_to_cpu(fid, fid);
1256                 break;
1257         case 6: {
1258                 const unsigned char *area;
1259
1260                 area = pack->fp_area;
1261                 fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
1262                 fid->f_oid = (area[3] << 8) | area[4];
1263                 fid->f_ver = 0;
1264                 break;
1265         }
1266         default:
1267                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1268                 result = -EIO;
1269         }
1270         return result;
1271 }
1272 EXPORT_SYMBOL(fid_unpack);
1273
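/*
 * Round-trip sketch (illustrative, not part of the original code): pack a fid
 * and unpack it again. The fid values and the helper name are arbitrary.
 */
#if 0
static void my_fid_pack_selftest(void)
{
        struct lu_fid      fid = { .f_seq = 0x1234, .f_oid = 7, .f_ver = 0 };
        struct lu_fid      scratch;     /* staging area for the packed form */
        struct lu_fid      out;
        struct lu_fid_pack pack;
        int                rc;

        fid_pack(&pack, &fid, &scratch);
        rc = fid_unpack(&pack, &out);
        LASSERT(rc == 0 && lu_fid_eq(&fid, &out));
}
#endif
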
1274 const char *lu_time_names[LU_TIME_NR] = {
1275         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1276         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1277         [LU_TIME_FIND_INSERT] = "find_insert"
1278 };
1279 EXPORT_SYMBOL(lu_time_names);