1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: Nikita Danilov <nikita@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  *
27 * These are the only exported functions; they provide some generic
28  * infrastructure for managing object devices.
29  */
30
31 #define DEBUG_SUBSYSTEM S_CLASS
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35
36 #include <linux/seq_file.h>
37 #include <linux/module.h>
38 /* nr_free_pages() */
39 #include <linux/swap.h>
40 /* hash_long() */
41 #include <linux/hash.h>
42 #include <obd_support.h>
43 #include <lustre_disk.h>
44 #include <lustre_fid.h>
45 #include <lu_object.h>
46 #include <libcfs/list.h>
47 /* lu_time_global_{init,fini}() */
48 #include <lu_time.h>
49
50 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
51
52 /*
53  * Decrease the reference counter on an object. If the last reference is
54  * released, return the object to the cache, unless lu_object_is_dying(o)
55  * holds. In the latter case, free the object immediately.
56  */
57 void lu_object_put(const struct lu_env *env, struct lu_object *o)
58 {
59         struct lu_object_header *top;
60         struct lu_site          *site;
61         struct lu_object        *orig;
62         int                      kill_it;
63
64         top = o->lo_header;
65         site = o->lo_dev->ld_site;
66         orig = o;
67         kill_it = 0;
68         write_lock(&site->ls_guard);
69         if (atomic_dec_and_test(&top->loh_ref)) {
70                 /*
71                  * When last reference is released, iterate over object
72                  * layers, and notify them that object is no longer busy.
73                  */
74                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
75                         if (o->lo_ops->loo_object_release != NULL)
76                                 o->lo_ops->loo_object_release(env, o);
77                 }
78                 -- site->ls_busy;
79                 if (lu_object_is_dying(top)) {
80                         /*
81                          * If the object is dying (will not be cached), remove it
82                          * from the hash table and LRU.
83                          *
84                          * This is done while the hash table and LRU lists are
85                          * locked. As the only ways to acquire the first reference
86                          * to a previously unreferenced object are hash-table
87                          * lookup (lu_object_find()) and LRU scanning
88                          * (lu_site_purge()), both of which are done under the
89                          * hash-table and LRU lock, no race with a concurrent
90                          * lookup is possible and we can safely destroy the
91                          * object below.
92                          */
93                         hlist_del_init(&top->loh_hash);
94                         list_del_init(&top->loh_lru);
95                         -- site->ls_total;
96                         kill_it = 1;
97                 }
98         }
99         write_unlock(&site->ls_guard);
100         if (kill_it)
101                 /*
102                  * Object was already removed from hash and lru above, can
103                  * kill it.
104                  */
105                 lu_object_free(env, orig);
106 }
107 EXPORT_SYMBOL(lu_object_put);
108
109 /*
110  * Allocate new object.
111  *
112  * This follows object creation protocol, described in the comment within
113  * struct lu_device_operations definition.
114  */
115 static struct lu_object *lu_object_alloc(const struct lu_env *env,
116                                          struct lu_site *s,
117                                          const struct lu_fid *f)
118 {
119         struct lu_object *scan;
120         struct lu_object *top;
121         struct list_head *layers;
122         int clean;
123         int result;
124
125         /*
126          * Create top-level object slice. This will also create
127          * lu_object_header.
128          */
129         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
130                                                       NULL, s->ls_top_dev);
131         if (IS_ERR(top))
132                 RETURN(top);
133         /*
134          * This is the only place where object fid is assigned. It's constant
135          * after this point.
136          */
137         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
138         top->lo_header->loh_fid  = *f;
139         layers = &top->lo_header->loh_layers;
140         do {
141                 /*
142                  * Call ->loo_object_init() repeatedly, until no more new
143                  * object slices are created.
144                  */
145                 clean = 1;
146                 list_for_each_entry(scan, layers, lo_linkage) {
147                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
148                                 continue;
149                         clean = 0;
150                         scan->lo_header = top->lo_header;
151                         result = scan->lo_ops->loo_object_init(env, scan);
152                         if (result != 0) {
153                                 lu_object_free(env, top);
154                                 RETURN(ERR_PTR(result));
155                         }
156                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
157                 }
158         } while (!clean);
159
160         list_for_each_entry_reverse(scan, layers, lo_linkage) {
161                 if (scan->lo_ops->loo_object_start != NULL) {
162                         result = scan->lo_ops->loo_object_start(env, scan);
163                         if (result != 0) {
164                                 lu_object_free(env, top);
165                                 RETURN(ERR_PTR(result));
166                         }
167                 }
168         }
169
170         s->ls_stats.s_created ++;
171         RETURN(top);
172 }
173
174 /*
175  * Free object.
176  */
177 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
178 {
179         struct list_head splice;
180         struct lu_object *scan;
181
182         /*
183          * First call ->loo_object_delete() method to release all resources.
184          */
185         list_for_each_entry_reverse(scan,
186                                     &o->lo_header->loh_layers, lo_linkage) {
187                 if (scan->lo_ops->loo_object_delete != NULL)
188                         scan->lo_ops->loo_object_delete(env, scan);
189         }
190
191         /*
192          * Then, splice object layers into stand-alone list, and call
193          * ->loo_object_free() on all layers to free memory. Splice is
194          * necessary, because lu_object_header is freed together with the
195          * top-level slice.
196          */
197         INIT_LIST_HEAD(&splice);
198         list_splice_init(&o->lo_header->loh_layers, &splice);
199         while (!list_empty(&splice)) {
200                 o = container_of0(splice.next, struct lu_object, lo_linkage);
201                 list_del_init(&o->lo_linkage);
202                 LASSERT(o->lo_ops->loo_object_free != NULL);
203                 o->lo_ops->loo_object_free(env, o);
204         }
205 }
206
207 /*
208  * Free up to @nr objects from the cold end of the site LRU list; return the unused part of @nr.
209  */
210 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
211 {
212         struct list_head         dispose;
213         struct lu_object_header *h;
214         struct lu_object_header *temp;
215
216         INIT_LIST_HEAD(&dispose);
217         /*
218          * Under LRU list lock, scan LRU list and move unreferenced objects to
219          * the dispose list, removing them from LRU and hash table.
220          */
221         write_lock(&s->ls_guard);
222         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
223                 /*
224                  * Objects are sorted in lru order, and "busy" objects (ones
225                  * with h->loh_ref > 0) naturally tend to live near the hot end
226                  * that we scan last. Unfortunately, sites usually have a small
227                  * (fewer than ten) number of busy yet rarely accessed objects
228                  * (some global objects, accessed directly through pointers and
229                  * bypassing the hash table). Currently the algorithm scans them
230                  * over and over again. We should probably move busy objects out
231                  * of the LRU, or we can live with that.
232                  */
233                 if (nr-- == 0)
234                         break;
235                 if (atomic_read(&h->loh_ref) > 0)
236                         continue;
237                 hlist_del_init(&h->loh_hash);
238                 list_move(&h->loh_lru, &dispose);
239                 s->ls_total --;
240         }
241         write_unlock(&s->ls_guard);
242         /*
243          * Free everything on the dispose list. This is safe against races due
244          * to the reasons described in lu_object_put().
245          */
246         while (!list_empty(&dispose)) {
247                 h = container_of0(dispose.next,
248                                  struct lu_object_header, loh_lru);
249                 list_del_init(&h->loh_lru);
250                 lu_object_free(env, lu_object_top(h));
251                 s->ls_stats.s_lru_purged ++;
252         }
253         return nr;
254 }
255 EXPORT_SYMBOL(lu_site_purge);
256
257 /*
258  * Object printing.
259  *
260  * The code below has to jump through certain hoops to output an object
261  * description into the libcfs_debug_msg-based log. The problem is that
262  * lu_object_print() composes the object description from strings that are
263  * parts of _lines_ of output (i.e., strings not terminated by a newline).
264  * This doesn't fit well into the libcfs_debug_msg() interface, which assumes
265  * that each message supplied to it is a self-contained output line.
266  *
267  * To work around this, strings are collected in a temporary buffer
268  * (implemented as a value of the lu_global_key key), until a terminating
269  * newline character is detected.
270  *
271  */
272
273 enum {
274         /*
275          * Maximal line size.
276          *
277          * XXX overflow is not handled correctly.
278          */
279         LU_CDEBUG_LINE = 256
280 };
281
282 struct lu_cdebug_data {
283         /*
284          * Temporary buffer.
285          */
286         char lck_area[LU_CDEBUG_LINE];
287         /*
288          * fid staging area used by dt_store_open().
289          */
290         struct lu_fid_pack lck_pack;
291 };
292
293 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
294 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
295
296 /*
297  * Key, holding temporary buffer. This key is registered very early by
298  * lu_global_init().
299  */
300 struct lu_context_key lu_global_key = {
301         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
302         .lct_init = lu_global_key_init,
303         .lct_fini = lu_global_key_fini
304 };
305
306 /*
307  * Printer function emitting messages through libcfs_debug_msg().
308  */
309 int lu_cdebug_printer(const struct lu_env *env,
310                       void *cookie, const char *format, ...)
311 {
312         struct lu_cdebug_print_info *info = cookie;
313         struct lu_cdebug_data       *key;
314         int used;
315         int complete;
316         va_list args;
317
318         va_start(args, format);
319
320         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
321         LASSERT(key != NULL);
322
323         used = strlen(key->lck_area);
324         complete = format[strlen(format) - 1] == '\n';
325         /*
326          * Append new chunk to the buffer.
327          */
328         vsnprintf(key->lck_area + used,
329                   ARRAY_SIZE(key->lck_area) - used, format, args);
330         if (complete) {
331                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
332                                  (char *)info->lpi_file, info->lpi_fn,
333                                  info->lpi_line, "%s", key->lck_area);
334                 key->lck_area[0] = 0;
335         }
336         va_end(args);
337         return 0;
338 }
339 EXPORT_SYMBOL(lu_cdebug_printer);
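/*
 * Hypothetical usage sketch (not part of the build): a caller that wants an
 * object dumped into the debug log can pass lu_cdebug_printer() as the
 * printer callback of lu_object_print(), with a struct lu_cdebug_print_info
 * cookie describing where the message should be attributed. The field names
 * below are taken from the use in lu_cdebug_printer() above; the initializer
 * actually used elsewhere in the tree may differ.
 *
 *      struct lu_cdebug_print_info info = {
 *              .lpi_subsys = DEBUG_SUBSYSTEM,
 *              .lpi_mask   = D_OTHER,
 *              .lpi_file   = __FILE__,
 *              .lpi_fn     = __FUNCTION__,
 *              .lpi_line   = __LINE__
 *      };
 *
 *      lu_object_print(env, &info, lu_cdebug_printer, o);
 *
 * Because lu_cdebug_printer() buffers partial lines in the lu_global_key
 * value, a line is emitted only once a chunk ending in '\n' has been printed.
 */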
340
341 /*
342  * Print object header.
343  */
344 static void lu_object_header_print(const struct lu_env *env,
345                                    void *cookie, lu_printer_t printer,
346                                    const struct lu_object_header *hdr)
347 {
348         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
349                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
350                    PFID(&hdr->loh_fid),
351                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
352                    list_empty(&hdr->loh_lru) ? "" : " lru",
353                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
354 }
355
356 /*
357  * Print human readable representation of the @o to the @printer.
358  */
359 void lu_object_print(const struct lu_env *env, void *cookie,
360                      lu_printer_t printer, const struct lu_object *o)
361 {
362         static const char ruler[] = "........................................";
363         struct lu_object_header *top;
364         int depth;
365
366         top = o->lo_header;
367         lu_object_header_print(env, cookie, printer, top);
368         (*printer)(env, cookie, "\n");
369         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
370                 depth = o->lo_depth + 4;
371                 LASSERT(o->lo_ops->loo_object_print != NULL);
372                 /*
373                  * print `.' @depth times.
374                  */
375                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
376                 o->lo_ops->loo_object_print(env, cookie, printer, o);
377                 (*printer)(env, cookie, "\n");
378         }
379 }
380 EXPORT_SYMBOL(lu_object_print);
381
382 /*
383  * Check object consistency.
384  */
385 int lu_object_invariant(const struct lu_object *o)
386 {
387         struct lu_object_header *top;
388
389         top = o->lo_header;
390         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
391                 if (o->lo_ops->loo_object_invariant != NULL &&
392                     !o->lo_ops->loo_object_invariant(o))
393                         return 0;
394         }
395         return 1;
396 }
397 EXPORT_SYMBOL(lu_object_invariant);
398
399 static struct lu_object *htable_lookup(struct lu_site *s,
400                                        const struct hlist_head *bucket,
401                                        const struct lu_fid *f)
402 {
403         struct lu_object_header *h;
404         struct hlist_node *scan;
405
406         hlist_for_each_entry(h, scan, bucket, loh_hash) {
407                 s->ls_stats.s_cache_check ++;
408                 if (likely(lu_fid_eq(&h->loh_fid, f) &&
409                            !lu_object_is_dying(h))) {
410                         /* bump reference count... */
411                         if (atomic_add_return(1, &h->loh_ref) == 1)
412                                 ++ s->ls_busy;
413                         /* and move to the head of the LRU */
414                         /*
415                          * XXX temporarily disabled to measure the effects of
416                          * read-write locking.
417                          */
418                         /* list_move_tail(&h->loh_lru, &s->ls_lru); */
419                         s->ls_stats.s_cache_hit ++;
420                         return lu_object_top(h);
421                 }
422         }
423         s->ls_stats.s_cache_miss ++;
424         return NULL;
425 }
426
427 static __u32 fid_hash(const struct lu_fid *f, int bits)
428 {
429         /* all objects with the same id and different versions will belong to the
430          * same collision list. */
431         return hash_long(fid_flatten(f), bits);
432 }
433
434 /*
435  * Search the cache for an object with fid @f. If such an object is found, return
436  * it. Otherwise, create a new object, insert it into the cache and return it. In
437  * either case, an additional reference is acquired on the returned object.
438  */
439 struct lu_object *lu_object_find(const struct lu_env *env,
440                                  struct lu_site *s, const struct lu_fid *f)
441 {
442         struct lu_object     *o;
443         struct lu_object     *shadow;
444         struct hlist_head *bucket;
445
446         /*
447          * This uses standard index maintenance protocol:
448          *
449          *     - search index under lock, and return object if found;
450          *     - otherwise, unlock index, allocate new object;
451          *     - lock index and search again;
452          *     - if nothing is found (usual case), insert newly created
453          *       object into index;
454          *     - otherwise (race: other thread inserted object), free
455          *       object just allocated.
456          *     - unlock index;
457          *     - return object.
458          */
459
460         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
461
462         read_lock(&s->ls_guard);
463         o = htable_lookup(s, bucket, f);
464         read_unlock(&s->ls_guard);
465
466         if (o != NULL)
467                 return o;
468
469         /*
470          * Allocate new object. This may result in rather complicated
471          * operations, including fld queries, inode loading, etc.
472          */
473         o = lu_object_alloc(env, s, f);
474         if (unlikely(IS_ERR(o)))
475                 return o;
476
477         LASSERT(lu_fid_eq(lu_object_fid(o), f));
478
479         write_lock(&s->ls_guard);
480         shadow = htable_lookup(s, bucket, f);
481         if (likely(shadow == NULL)) {
482                 hlist_add_head(&o->lo_header->loh_hash, bucket);
483                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
484                 ++ s->ls_busy;
485                 ++ s->ls_total;
486                 shadow = o;
487                 o = NULL;
488         } else
489                 s->ls_stats.s_cache_race ++;
490         write_unlock(&s->ls_guard);
491         if (o != NULL)
492                 lu_object_free(env, o);
493         return shadow;
494 }
495 EXPORT_SYMBOL(lu_object_find);
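/*
 * Minimal caller-side sketch (for illustration only): the reference obtained
 * from lu_object_find() is dropped with lu_object_put() once the caller is
 * done with the object; a particular layer can be reached through
 * lu_object_locate().
 *
 *      struct lu_object *o;
 *
 *      o = lu_object_find(env, site, fid);
 *      if (IS_ERR(o))
 *              return PTR_ERR(o);
 *      ... use lu_object_locate(o->lo_header, dev_type) to get a slice ...
 *      lu_object_put(env, o);
 */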
496
497 /*
498  * Global list of all sites on this node
499  */
500 static LIST_HEAD(lu_sites);
501 static DECLARE_MUTEX(lu_sites_guard);
502
503 /*
504  * Global environment used by site shrinker.
505  */
506 static struct lu_env lu_shrink_env;
507
508 /*
509  * Print all objects in @s.
510  */
511 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
512                    lu_printer_t printer)
513 {
514         int i;
515
516         for (i = 0; i < s->ls_hash_size; ++i) {
517                 struct lu_object_header *h;
518                 struct hlist_node       *scan;
519
520                 read_lock(&s->ls_guard);
521                 hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {
522
523                         if (!list_empty(&h->loh_layers)) {
524                                 const struct lu_object *obj;
525
526                                 obj = lu_object_top(h);
527                                 lu_object_print(env, cookie, printer, obj);
528                         } else
529                                 lu_object_header_print(env, cookie, printer, h);
530                 }
531                 read_unlock(&s->ls_guard);
532         }
533 }
534 EXPORT_SYMBOL(lu_site_print);
535
536 enum {
537         LU_CACHE_PERCENT   = 30,
538 };
539
540 /*
541  * Return desired hash table order.
542  */
543 static int lu_htable_order(void)
544 {
545         int bits;
546         unsigned long cache_size;
547
548         /*
549          * Calculate hash table size, assuming that we want reasonable
550          * performance when 30% of available memory is occupied by cache of
551          * lu_objects.
552          *
553  * The size of an lu_object is (arbitrarily) taken as 1K (together with the inode).
554          */
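        /*
         * Worked example (hypothetical numbers): with 4 GiB worth of free
         * buffer pages (1048576 pages of 4 KiB), cache_size is
         * 1048576 / 100 * 30 * 4 = 1258200 buckets, and the loop below picks
         * the smallest bits with (1 << bits) >= cache_size, i.e. bits = 21
         * (2^21 = 2097152).
         */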
555         cache_size = nr_free_buffer_pages() / 100 *
556                 LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024);
557
558         for (bits = 1; (1 << bits) < cache_size; ++bits) {
559                 ;
560         }
561         return bits;
562 }
563
564 /*
565  * Initialize site @s, with @top as the top-level device.
566  */
567 int lu_site_init(struct lu_site *s, struct lu_device *top)
568 {
569         int bits;
570         int size;
571         int i;
572         ENTRY;
573
574         memset(s, 0, sizeof *s);
575         rwlock_init(&s->ls_guard);
576         CFS_INIT_LIST_HEAD(&s->ls_lru);
577         CFS_INIT_LIST_HEAD(&s->ls_linkage);
578         s->ls_top_dev = top;
579         top->ld_site = s;
580         lu_device_get(top);
581
582         for (bits = lu_htable_order(), size = 1 << bits;
583              (s->ls_hash =
584               cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
585              --bits, size >>= 1) {
586                 /*
587                  * Scale hash table down, until allocation succeeds.
588                  */
589                 ;
590         }
591
592         s->ls_hash_size = size;
593         s->ls_hash_bits = bits;
594         s->ls_hash_mask = size - 1;
595
596         for (i = 0; i < size; i++)
597                 INIT_HLIST_HEAD(&s->ls_hash[i]);
598
599         RETURN(0);
600 }
601 EXPORT_SYMBOL(lu_site_init);
602
603 /*
604  * Finalize @s and release its resources.
605  */
606 void lu_site_fini(struct lu_site *s)
607 {
608         LASSERT(list_empty(&s->ls_lru));
609         LASSERT(s->ls_total == 0);
610
611         down(&lu_sites_guard);
612         list_del_init(&s->ls_linkage);
613         up(&lu_sites_guard);
614
615         if (s->ls_hash != NULL) {
616                 int i;
617                 for (i = 0; i < s->ls_hash_size; i++)
618                         LASSERT(hlist_empty(&s->ls_hash[i]));
619                 cfs_free_large(s->ls_hash);
620                 s->ls_hash = NULL;
621         }
622         if (s->ls_top_dev != NULL) {
623                 s->ls_top_dev->ld_site = NULL;
624                 lu_device_put(s->ls_top_dev);
625                 s->ls_top_dev = NULL;
626         }
627 }
628 EXPORT_SYMBOL(lu_site_fini);
629
630 /*
631  * Called when initialization of stack for this site is completed.
632  */
633 int lu_site_init_finish(struct lu_site *s)
634 {
635         int result;
636         down(&lu_sites_guard);
637         result = lu_context_refill(&lu_shrink_env.le_ctx);
638         if (result == 0)
639                 list_add(&s->ls_linkage, &lu_sites);
640         up(&lu_sites_guard);
641         return result;
642 }
643 EXPORT_SYMBOL(lu_site_init_finish);
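/*
 * Rough site lifecycle (illustrative summary): a site is set up with
 * lu_site_init() on top of the top-level device, published on the global
 * site list with lu_site_init_finish() once the whole device stack has been
 * configured, optionally trimmed with lu_site_purge(), and finally torn down
 * with lu_site_fini() after all objects have been released.
 */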
644
645 /*
646  * Acquire additional reference on device @d
647  */
648 void lu_device_get(struct lu_device *d)
649 {
650         atomic_inc(&d->ld_ref);
651 }
652 EXPORT_SYMBOL(lu_device_get);
653
654 /*
655  * Release reference on device @d.
656  */
657 void lu_device_put(struct lu_device *d)
658 {
659         atomic_dec(&d->ld_ref);
660 }
661 EXPORT_SYMBOL(lu_device_put);
662
663 /*
664  * Initialize device @d of type @t.
665  */
666 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
667 {
668         memset(d, 0, sizeof *d);
669         atomic_set(&d->ld_ref, 0);
670         d->ld_type = t;
671         return 0;
672 }
673 EXPORT_SYMBOL(lu_device_init);
674
675 /*
676  * Finalize device @d.
677  */
678 void lu_device_fini(struct lu_device *d)
679 {
680         if (d->ld_obd != NULL)
681                 /* finish lprocfs */
682                 lprocfs_obd_cleanup(d->ld_obd);
683
684         LASSERTF(atomic_read(&d->ld_ref) == 0,
685                  "Refcount is %u\n", atomic_read(&d->ld_ref));
686 }
687 EXPORT_SYMBOL(lu_device_fini);
688
689 /*
690  * Initialize object @o that is part of compound object @h and was created by
691  * device @d.
692  */
693 int lu_object_init(struct lu_object *o,
694                    struct lu_object_header *h, struct lu_device *d)
695 {
696         memset(o, 0, sizeof *o);
697         o->lo_header = h;
698         o->lo_dev    = d;
699         lu_device_get(d);
700         CFS_INIT_LIST_HEAD(&o->lo_linkage);
701         return 0;
702 }
703 EXPORT_SYMBOL(lu_object_init);
704
705 /*
706  * Finalize object and release its resources.
707  */
708 void lu_object_fini(struct lu_object *o)
709 {
710         LASSERT(list_empty(&o->lo_linkage));
711
712         if (o->lo_dev != NULL) {
713                 lu_device_put(o->lo_dev);
714                 o->lo_dev = NULL;
715         }
716 }
717 EXPORT_SYMBOL(lu_object_fini);
718
719 /*
720  * Add object @o as first layer of compound object @h
721  *
722  * This is typically called by the ->ldo_object_alloc() method of top-level
723  * device.
724  */
725 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
726 {
727         list_move(&o->lo_linkage, &h->loh_layers);
728 }
729 EXPORT_SYMBOL(lu_object_add_top);
730
731 /*
732  * Add object @o as a layer of a compound object, going after @before.
733  *
734  * This is typically called by the ->ldo_object_alloc() method of
735  * @before->lo_dev.
736  */
737 void lu_object_add(struct lu_object *before, struct lu_object *o)
738 {
739         list_move(&o->lo_linkage, &before->lo_linkage);
740 }
741 EXPORT_SYMBOL(lu_object_add);
742
743 /*
744  * Initialize compound object.
745  */
746 int lu_object_header_init(struct lu_object_header *h)
747 {
748         memset(h, 0, sizeof *h);
749         atomic_set(&h->loh_ref, 1);
750         INIT_HLIST_NODE(&h->loh_hash);
751         CFS_INIT_LIST_HEAD(&h->loh_lru);
752         CFS_INIT_LIST_HEAD(&h->loh_layers);
753         return 0;
754 }
755 EXPORT_SYMBOL(lu_object_header_init);
756
757 /*
758  * Finalize compound object.
759  */
760 void lu_object_header_fini(struct lu_object_header *h)
761 {
762         LASSERT(list_empty(&h->loh_layers));
763         LASSERT(list_empty(&h->loh_lru));
764         LASSERT(hlist_unhashed(&h->loh_hash));
765 }
766 EXPORT_SYMBOL(lu_object_header_fini);
767
768 /*
769  * Given a compound object, find its slice, corresponding to the device type
770  * @dtype.
771  */
772 struct lu_object *lu_object_locate(struct lu_object_header *h,
773                                    struct lu_device_type *dtype)
774 {
775         struct lu_object *o;
776
777         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
778                 if (o->lo_dev->ld_type == dtype)
779                         return o;
780         }
781         return NULL;
782 }
783 EXPORT_SYMBOL(lu_object_locate);
784
785 enum {
786         /*
787          * Maximal number of thread-local data (tld) slots.
788          */
789         LU_CONTEXT_KEY_NR = 16
790 };
791
792 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
793
794 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
795
796 /*
797  * Register new key.
798  */
799 int lu_context_key_register(struct lu_context_key *key)
800 {
801         int result;
802         int i;
803
804         LASSERT(key->lct_init != NULL);
805         LASSERT(key->lct_fini != NULL);
806         LASSERT(key->lct_tags != 0);
807         LASSERT(key->lct_owner != NULL);
808
809         result = -ENFILE;
810         spin_lock(&lu_keys_guard);
811         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
812                 if (lu_keys[i] == NULL) {
813                         key->lct_index = i;
814                         atomic_set(&key->lct_used, 1);
815                         lu_keys[i] = key;
816                         result = 0;
817                         break;
818                 }
819         }
820         spin_unlock(&lu_keys_guard);
821         return result;
822 }
823 EXPORT_SYMBOL(lu_context_key_register);
824
825 static void key_fini(struct lu_context *ctx, int index)
826 {
827         if (ctx->lc_value[index] != NULL) {
828                 struct lu_context_key *key;
829
830                 key = lu_keys[index];
831                 LASSERT(key != NULL);
832                 LASSERT(key->lct_fini != NULL);
833                 LASSERT(atomic_read(&key->lct_used) > 1);
834
835                 key->lct_fini(ctx, key, ctx->lc_value[index]);
836                 atomic_dec(&key->lct_used);
837                 LASSERT(key->lct_owner != NULL);
838                 if (!(ctx->lc_tags & LCT_NOREF)) {
839                         LASSERT(module_refcount(key->lct_owner) > 0);
840                         module_put(key->lct_owner);
841                 }
842                 ctx->lc_value[index] = NULL;
843         }
844 }
845
846 /*
847  * Deregister key.
848  */
849 void lu_context_key_degister(struct lu_context_key *key)
850 {
851         LASSERT(atomic_read(&key->lct_used) >= 1);
852         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
853
854         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
855
856         if (atomic_read(&key->lct_used) > 1)
857                 CERROR("key has instances.\n");
858         spin_lock(&lu_keys_guard);
859         lu_keys[key->lct_index] = NULL;
860         spin_unlock(&lu_keys_guard);
861 }
862 EXPORT_SYMBOL(lu_context_key_degister);
863
864 /*
865  * Return value associated with key @key in context @ctx.
866  */
867 void *lu_context_key_get(const struct lu_context *ctx,
868                          struct lu_context_key *key)
869 {
870         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
871         return ctx->lc_value[key->lct_index];
872 }
873 EXPORT_SYMBOL(lu_context_key_get);
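/*
 * Hypothetical sketch of how a layer typically uses the key machinery (the
 * names "foo_thread_info" and "foo_thread_key" are invented for
 * illustration):
 *
 *      LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *      struct lu_context_key foo_thread_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = foo_key_init,
 *              .lct_fini = foo_key_fini
 *      };
 *
 *      LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *      result = lu_context_key_register(&foo_thread_key);
 *
 * and later, in request context:
 *
 *      info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *
 * Values for keys registered after a context was created are picked up by
 * lu_context_refill().
 */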
874
875 static void keys_fini(struct lu_context *ctx)
876 {
877         int i;
878
879         if (ctx->lc_value != NULL) {
880                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
881                         key_fini(ctx, i);
882                 OBD_FREE(ctx->lc_value,
883                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
884                 ctx->lc_value = NULL;
885         }
886 }
887
888 static int keys_fill(const struct lu_context *ctx)
889 {
890         int i;
891
892         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
893                 struct lu_context_key *key;
894
895                 key = lu_keys[i];
896                 if (ctx->lc_value[i] == NULL &&
897                     key != NULL && key->lct_tags & ctx->lc_tags) {
898                         void *value;
899
900                         LASSERT(key->lct_init != NULL);
901                         LASSERT(key->lct_index == i);
902
903                         value = key->lct_init(ctx, key);
904                         if (unlikely(IS_ERR(value)))
905                                 return PTR_ERR(value);
906                         LASSERT(key->lct_owner != NULL);
907                         if (!(ctx->lc_tags & LCT_NOREF))
908                                 try_module_get(key->lct_owner);
909                         atomic_inc(&key->lct_used);
910                         ctx->lc_value[i] = value;
911                 }
912         }
913         return 0;
914 }
915
916 static int keys_init(struct lu_context *ctx)
917 {
918         int result;
919
920         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
921         if (likely(ctx->lc_value != NULL))
922                 result = keys_fill(ctx);
923         else
924                 result = -ENOMEM;
925
926         if (result != 0)
927                 keys_fini(ctx);
928         return result;
929 }
930
931 /*
932  * Initialize context data-structure. Create values for all keys.
933  */
934 int lu_context_init(struct lu_context *ctx, __u32 tags)
935 {
936         memset(ctx, 0, sizeof *ctx);
937         ctx->lc_tags = tags;
938         return keys_init(ctx);
939 }
940 EXPORT_SYMBOL(lu_context_init);
941
942 /*
943  * Finalize context data-structure. Destroy key values.
944  */
945 void lu_context_fini(struct lu_context *ctx)
946 {
947         keys_fini(ctx);
948 }
949 EXPORT_SYMBOL(lu_context_fini);
950
951 /*
952  * Called before entering context.
953  */
954 void lu_context_enter(struct lu_context *ctx)
955 {
956 }
957 EXPORT_SYMBOL(lu_context_enter);
958
959 /*
960  * Called after exiting from @ctx
961  */
962 void lu_context_exit(struct lu_context *ctx)
963 {
964         int i;
965
966         if (ctx->lc_value != NULL) {
967                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
968                         if (ctx->lc_value[i] != NULL) {
969                                 struct lu_context_key *key;
970
971                                 key = lu_keys[i];
972                                 LASSERT(key != NULL);
973                                 if (key->lct_exit != NULL)
974                                         key->lct_exit(ctx,
975                                                       key, ctx->lc_value[i]);
976                         }
977                 }
978         }
979 }
980 EXPORT_SYMBOL(lu_context_exit);
981
982 /*
983  * Allocate values of all missing keys for @ctx, i.e. keys that were
984  * registered after the context was created.
985  */
986 int lu_context_refill(const struct lu_context *ctx)
987 {
988         LASSERT(ctx->lc_value != NULL);
989         return keys_fill(ctx);
990 }
991 EXPORT_SYMBOL(lu_context_refill);
992
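/*
 * Common helper behind lu_env_init() and lu_env_init_noref(): remember the
 * session context @ses, then initialize and enter the per-environment
 * context with the given @tags. The assertion below ensures that LCT_NOREF
 * tags are only used through the noref variant, which serves the global
 * shrinker environment.
 */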
993 static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
994                         __u32 tags, int noref)
995 {
996         int result;
997
998         LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
999
1000         env->le_ses = ses;
1001         result = lu_context_init(&env->le_ctx, tags);
1002         if (likely(result == 0))
1003                 lu_context_enter(&env->le_ctx);
1004         return result;
1005 }
1006
1007 static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
1008                              __u32 tags)
1009 {
1010         return lu_env_setup(env, ses, tags, 1);
1011 }
1012
1013 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
1014 {
1015         return lu_env_setup(env, ses, tags, 0);
1016 }
1017 EXPORT_SYMBOL(lu_env_init);
1018
1019 void lu_env_fini(struct lu_env *env)
1020 {
1021         lu_context_exit(&env->le_ctx);
1022         lu_context_fini(&env->le_ctx);
1023         env->le_ses = NULL;
1024 }
1025 EXPORT_SYMBOL(lu_env_fini);
1026
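/*
 * Cache shrinker callback registered with set_shrinker() in lu_global_init().
 * Following the kernel shrinker convention, @nr == 0 only asks for an
 * estimate of how many objects could be freed, while a non-zero @nr asks to
 * scan and free up to that many objects, which is delegated to
 * lu_site_purge() for each registered site. Returns the number of cached
 * (unreferenced) objects that remain, or -1 when shrinking is not allowed in
 * this allocation context (no __GFP_FS).
 */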
1027 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1028 {
1029         struct lu_site *s;
1030         struct lu_site *tmp;
1031         int cached = 0;
1032         int remain = nr;
1033         LIST_HEAD(splice);
1034
1035         if (nr != 0 && !(gfp_mask & __GFP_FS))
1036                 return -1;
1037
1038         down(&lu_sites_guard);
1039         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1040                 if (nr != 0) {
1041                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1042                         /*
1043                          * Move just shrunk site to the tail of site list to
1044                          * assure shrinking fairness.
1045                          */
1046                         list_move_tail(&s->ls_linkage, &splice);
1047                 }
1048                 read_lock(&s->ls_guard);
1049                 cached += s->ls_total - s->ls_busy;
1050                 read_unlock(&s->ls_guard);
1051                 if (remain <= 0)
1052                         break;
1053         }
1054         list_splice(&splice, lu_sites.prev);
1055         up(&lu_sites_guard);
1056         return cached;
1057 }
1058
1059 static struct shrinker *lu_site_shrinker = NULL;
1060
1061 /*
1062  * Initialization of global lu_* data.
1063  */
1064 int lu_global_init(void)
1065 {
1066         int result;
1067
1068         LU_CONTEXT_KEY_INIT(&lu_global_key);
1069         result = lu_context_key_register(&lu_global_key);
1070         if (result == 0) {
1071                 /*
1072                  * At this level, we don't know what tags are needed, so
1073                  * allocate them conservatively. This should not be too bad,
1074                  * because this environment is global.
1075                  */
1076                 down(&lu_sites_guard);
1077                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
1078                 up(&lu_sites_guard);
1079                 if (result == 0) {
1080                         /*
1081                          * seeks estimation: 3 seeks to read a record from oi,
1082                          * one to read inode, one for ea. Unfortunately
1083                          * setting this high value results in lu_object/inode
1084                          * cache consuming all the memory.
1085                          */
1086                         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
1087                                                         lu_cache_shrink);
1088                         if (result == 0)
1089                                 result = lu_time_global_init();
1090                 }
1091         }
1092         return result;
1093 }
1094
1095 /*
1096  * Dual to lu_global_init().
1097  */
1098 void lu_global_fini(void)
1099 {
1100         lu_time_global_fini();
1101         if (lu_site_shrinker != NULL) {
1102                 remove_shrinker(lu_site_shrinker);
1103                 lu_site_shrinker = NULL;
1104         }
1105
1106         lu_context_key_degister(&lu_global_key);
1107
1108         /*
1109          * Tear shrinker environment down _after_ de-registering
1110          * lu_global_key, because the latter has a value in the former.
1111          */
1112         down(&lu_sites_guard);
1113         lu_env_fini(&lu_shrink_env);
1114         up(&lu_sites_guard);
1115 }
1116
1117 struct lu_buf LU_BUF_NULL = {
1118         .lb_buf = NULL,
1119         .lb_len = 0
1120 };
1121 EXPORT_SYMBOL(LU_BUF_NULL);
1122
1123 /*
1124  * XXX: The functions below logically belong to the fid module, but they are
1125  * used by dt_store_open(). Keep them here until a better place is found.
1126  */
1127
1128 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
1129               struct lu_fid *befider)
1130 {
1131         int recsize;
1132         __u64 seq;
1133         __u32 oid;
1134
1135         seq = fid_seq(fid);
1136         oid = fid_oid(fid);
1137
1138         /*
1139          * Two cases: a compact 6-byte representation for the common case, and
1140          * a full 17-byte representation for an "unusual" fid.
1141          */
1142
1143         /*
1144          * Check that usual case is really usual.
1145          */
1146         CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);
1147
1148         if (fid_is_igif(fid) ||
1149             seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
1150                 fid_cpu_to_be(befider, fid);
1151                 recsize = sizeof *befider;
1152         } else {
1153                 unsigned char *small_befider;
1154
1155                 small_befider = (unsigned char *)befider;
1156
1157                 small_befider[0] = seq >> 16;
1158                 small_befider[1] = seq >> 8;
1159                 small_befider[2] = seq;
1160
1161                 small_befider[3] = oid >> 8;
1162                 small_befider[4] = oid;
1163
1164                 recsize = 5;
1165         }
1166         memcpy(pack->fp_area, befider, recsize);
1167         pack->fp_len = recsize + 1;
1168 }
1169 EXPORT_SYMBOL(fid_pack);
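/*
 * Worked example (illustrative): a fid with seq = 0x1234, oid = 0x56 and
 * ver = 0 takes the compact branch above and is packed into the 5 record
 * bytes 00 12 34 00 56 (3 bytes of seq followed by 2 bytes of oid, both
 * big-endian), with pack->fp_len = 6. A fid that does not fit (igif,
 * seq > 0xffffff, oid > 0xffff, or a non-zero version) is stored as the full
 * big-endian struct lu_fid instead, with fp_len = sizeof(struct lu_fid) + 1,
 * i.e. 17.
 */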
1170
1171 int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
1172 {
1173         int result;
1174
1175         result = 0;
1176         switch (pack->fp_len) {
1177         case sizeof *fid + 1:
1178                 memcpy(fid, pack->fp_area, sizeof *fid);
1179                 fid_be_to_cpu(fid, fid);
1180                 break;
1181         case 6: {
1182                 const unsigned char *area;
1183
1184                 area = pack->fp_area;
1185                 fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
1186                 fid->f_oid = (area[3] << 8) | area[4];
1187                 fid->f_ver = 0;
1188                 break;
1189         }
1190         default:
1191                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1192                 result = -EIO;
1193         }
1194         return result;
1195 }
1196 EXPORT_SYMBOL(fid_unpack);
1197
1198 const char *lu_time_names[LU_TIME_NR] = {
1199         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1200         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1201         [LU_TIME_FIND_INSERT] = "find_insert"
1202 };
1203 EXPORT_SYMBOL(lu_time_names);