lustre/obdclass/lu_object.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: Nikita Danilov <nikita@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  *
27  * These are the only exported functions; they provide some generic
28  * infrastructure for managing object devices.
29  */
30
31 #define DEBUG_SUBSYSTEM S_CLASS
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35
36 #include <linux/seq_file.h>
37 #include <linux/module.h>
38 /* nr_free_buffer_pages() */
39 #include <linux/swap.h>
40 /* hash_long() */
41 #include <linux/hash.h>
42 #include <obd_support.h>
43 #include <lustre_disk.h>
44 #include <lustre_fid.h>
45 #include <lu_object.h>
46 #include <libcfs/list.h>
47 /* lu_time_global_{init,fini}() */
48 #include <lu_time.h>
49
50 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
51
52 /*
53  * Decrease the reference counter on an object. When the last reference is
54  * released, return the object to the cache, unless lu_object_is_dying(o)
55  * holds; in the latter case, free the object immediately.
56  */
57 void lu_object_put(const struct lu_env *env, struct lu_object *o)
58 {
59         struct lu_object_header *top;
60         struct lu_site          *site;
61         struct lu_object        *orig;
62         int                      kill_it;
63
64         top = o->lo_header;
65         site = o->lo_dev->ld_site;
66         orig = o;
67         kill_it = 0;
68         write_lock(&site->ls_guard);
69         if (atomic_dec_and_test(&top->loh_ref)) {
70                 /*
71                  * When last reference is released, iterate over object
72                  * layers, and notify them that object is no longer busy.
73                  */
74                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
75                         if (o->lo_ops->loo_object_release != NULL)
76                                 o->lo_ops->loo_object_release(env, o);
77                 }
78                 -- site->ls_busy;
79                 if (lu_object_is_dying(top)) {
80                         /*
81                          * If object is dying (will not be cached), remove it
82                          * from the hash table and LRU.
83                          *
84                          * This is done with hash table and LRU lists
85                          * locked. As the only way to acquire first reference
86                          * to previously unreferenced object is through
87                          * hash-table lookup (lu_object_find()), or LRU
88                          * scanning (lu_site_purge()), that are done under
89                          * hash-table and LRU lock, no race with concurrent
90                          * object lookup is possible and we can safely destroy
91                          * object below.
92                          */
93                         hlist_del_init(&top->loh_hash);
94                         list_del_init(&top->loh_lru);
95                         -- site->ls_total;
96                         kill_it = 1;
97                 }
98         }
99         write_unlock(&site->ls_guard);
100         if (kill_it)
101                 /*
102                  * Object was already removed from hash and lru above, can
103                  * kill it.
104                  */
105                 lu_object_free(env, orig);
106 }
107 EXPORT_SYMBOL(lu_object_put);
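
Every reference acquired on an object (for example through lu_object_find(), defined later in this file) must eventually be dropped with lu_object_put(). The sketch below is illustrative only: the helper name and fid value are made up, and error handling is minimal.

/* Sketch: acquire and release a reference on a cached object. */
static int example_touch_object(const struct lu_env *env, struct lu_site *site)
{
        struct lu_fid fid = { .f_seq = 1, .f_oid = 2, .f_ver = 0 };
        struct lu_object *o;

        o = lu_object_find(env, site, &fid);    /* takes a reference */
        if (IS_ERR(o))
                return PTR_ERR(o);
        /* ... use the object: lu_object_fid(o), per-layer methods, ... */
        lu_object_put(env, o);                  /* drops the reference */
        return 0;
}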
108
109 /*
110  * Allocate a new object.
111  *
112  * This follows the object creation protocol described in the comment within
113  * the struct lu_device_operations definition.
114  */
115 static struct lu_object *lu_object_alloc(const struct lu_env *env,
116                                          struct lu_site *s,
117                                          const struct lu_fid *f)
118 {
119         struct lu_object *scan;
120         struct lu_object *top;
121         struct list_head *layers;
122         int clean;
123         int result;
124
125         /*
126          * Create top-level object slice. This will also create
127          * lu_object_header.
128          */
129         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
130                                                       NULL, s->ls_top_dev);
131         if (IS_ERR(top))
132                 RETURN(top);
133         /*
134          * This is the only place where object fid is assigned. It's constant
135          * after this point.
136          */
137         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
138         top->lo_header->loh_fid  = *f;
139         layers = &top->lo_header->loh_layers;
140         do {
141                 /*
142                  * Call ->loo_object_init() repeatedly, until no more new
143                  * object slices are created.
144                  */
145                 clean = 1;
146                 list_for_each_entry(scan, layers, lo_linkage) {
147                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
148                                 continue;
149                         clean = 0;
150                         scan->lo_header = top->lo_header;
151                         result = scan->lo_ops->loo_object_init(env, scan);
152                         if (result != 0) {
153                                 lu_object_free(env, top);
154                                 RETURN(ERR_PTR(result));
155                         }
156                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
157                 }
158         } while (!clean);
159
160         list_for_each_entry_reverse(scan, layers, lo_linkage) {
161                 if (scan->lo_ops->loo_object_start != NULL) {
162                         result = scan->lo_ops->loo_object_start(env, scan);
163                         if (result != 0) {
164                                 lu_object_free(env, top);
165                                 RETURN(ERR_PTR(result));
166                         }
167                 }
168         }
169
170         s->ls_stats.s_created ++;
171         RETURN(top);
172 }
173
174 /*
175  * Free object.
176  */
177 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
178 {
179         struct list_head splice;
180         struct lu_object *scan;
181
182         /*
183          * First call ->loo_object_delete() method to release all resources.
184          */
185         list_for_each_entry_reverse(scan,
186                                     &o->lo_header->loh_layers, lo_linkage) {
187                 if (scan->lo_ops->loo_object_delete != NULL)
188                         scan->lo_ops->loo_object_delete(env, scan);
189         }
190
191         /*
192          * Then, splice object layers into stand-alone list, and call
193          * ->loo_object_free() on all layers to free memory. Splice is
194          * necessary, because lu_object_header is freed together with the
195          * top-level slice.
196          */
197         INIT_LIST_HEAD(&splice);
198         list_splice_init(&o->lo_header->loh_layers, &splice);
199         while (!list_empty(&splice)) {
200                 o = container_of0(splice.next, struct lu_object, lo_linkage);
201                 list_del_init(&o->lo_linkage);
202                 LASSERT(o->lo_ops->loo_object_free != NULL);
203                 o->lo_ops->loo_object_free(env, o);
204         }
205 }
206
207 /*
208  * Free @nr objects from the cold end of the site LRU list.
209  */
210 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
211 {
212         struct list_head         dispose;
213         struct lu_object_header *h;
214         struct lu_object_header *temp;
215
216         INIT_LIST_HEAD(&dispose);
217         /*
218          * Under LRU list lock, scan LRU list and move unreferenced objects to
219          * the dispose list, removing them from LRU and hash table.
220          */
221         write_lock(&s->ls_guard);
222         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
223                 /*
224                  * Objects are sorted in LRU order, and "busy" objects (ones
225                  * with h->loh_ref > 0) naturally tend to live near the hot end
226                  * that we scan last. Unfortunately, sites usually have a small
227                  * (fewer than ten) number of busy yet rarely accessed objects
228                  * (some global objects, accessed directly through pointers,
229                  * bypassing the hash table). Currently the algorithm scans them
230                  * over and over again. Probably we should move busy objects out
231                  * of the LRU, or we can live with that.
232                  */
233                 if (nr-- == 0)
234                         break;
235                 if (atomic_read(&h->loh_ref) > 0)
236                         continue;
237                 hlist_del_init(&h->loh_hash);
238                 list_move(&h->loh_lru, &dispose);
239                 s->ls_total --;
240         }
241         write_unlock(&s->ls_guard);
242         /*
243          * Free everything on the dispose list. This is safe against races due
244          * to the reasons described in lu_object_put().
245          */
246         while (!list_empty(&dispose)) {
247                 h = container_of0(dispose.next,
248                                  struct lu_object_header, loh_lru);
249                 list_del_init(&h->loh_lru);
250                 lu_object_free(env, lu_object_top(h));
251                 s->ls_stats.s_lru_purged ++;
252         }
253         return nr;
254 }
255 EXPORT_SYMBOL(lu_site_purge);
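
As a usage sketch (the budget of 128 is arbitrary): callers hand lu_site_purge() a scan budget and get back whatever part of it was not consumed; passing a very large value such as ~0 effectively asks for every unreferenced object to be dropped.

/* Sketch: evict up to 128 objects from the cold end of a site's LRU. */
static void example_trim_site(const struct lu_env *env, struct lu_site *site)
{
        int remain;

        remain = lu_site_purge(env, site, 128);
        /* "remain" is the unused part of the scan budget. */
        (void)remain;
}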
256
257 /*
258  * Object printing.
259  *
260  * The code below has to jump through certain hoops to output an object
261  * description into the libcfs_debug_msg-based log. The problem is that
262  * lu_object_print() composes the object description from strings that are
263  * parts of _lines_ of output (i.e., strings that are not terminated by a
264  * newline). This does not fit well into the libcfs_debug_msg() interface,
265  * which assumes that each message supplied to it is a self-contained line.
266  *
267  * To work around this, strings are collected in a temporary buffer
268  * (implemented as a value of the lu_global_key key), until a terminating
269  * newline character is detected.
270  *
271  */
272
273 enum {
274         /*
275          * Maximal line size.
276          *
277          * XXX overflow is not handled correctly.
278          */
279         LU_CDEBUG_LINE = 256
280 };
281
282 struct lu_cdebug_data {
283         /*
284          * Temporary buffer.
285          */
286         char lck_area[LU_CDEBUG_LINE];
287         /*
288          * fid staging area used by dt_store_open().
289          */
290         struct lu_fid_pack lck_pack;
291 };
292
293 static void *lu_global_key_init(const struct lu_context *ctx,
294                                 struct lu_context_key *key)
295 {
296         struct lu_cdebug_data *value;
297
298         OBD_ALLOC_PTR(value);
299         if (value == NULL)
300                 value = ERR_PTR(-ENOMEM);
301         return value;
302 }
303
304 static void lu_global_key_fini(const struct lu_context *ctx,
305                                struct lu_context_key *key, void *data)
306 {
307         struct lu_cdebug_data *value = data;
308         OBD_FREE_PTR(value);
309 }
310
311 /*
312  * Key, holding temporary buffer. This key is registered very early by
313  * lu_global_init().
314  */
315 struct lu_context_key lu_global_key = {
316         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
317         .lct_init = lu_global_key_init,
318         .lct_fini = lu_global_key_fini
319 };
320
321 /*
322  * Printer function emitting messages through libcfs_debug_msg().
323  */
324 int lu_cdebug_printer(const struct lu_env *env,
325                       void *cookie, const char *format, ...)
326 {
327         struct lu_cdebug_print_info *info = cookie;
328         struct lu_cdebug_data       *key;
329         int used;
330         int complete;
331         va_list args;
332
333         va_start(args, format);
334
335         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
336         LASSERT(key != NULL);
337
338         used = strlen(key->lck_area);
339         complete = format[strlen(format) - 1] == '\n';
340         /*
341          * Append new chunk to the buffer.
342          */
343         vsnprintf(key->lck_area + used,
344                   ARRAY_SIZE(key->lck_area) - used, format, args);
345         if (complete) {
346                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
347                                  (char *)info->lpi_file, info->lpi_fn,
348                                  info->lpi_line, "%s", key->lck_area);
349                 key->lck_area[0] = 0;
350         }
351         va_end(args);
352         return 0;
353 }
354 EXPORT_SYMBOL(lu_cdebug_printer);
355
356 /*
357  * Print object header.
358  */
359 static void lu_object_header_print(const struct lu_env *env,
360                                    void *cookie, lu_printer_t printer,
361                                    const struct lu_object_header *hdr)
362 {
363         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
364                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
365                    PFID(&hdr->loh_fid),
366                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
367                    list_empty(&hdr->loh_lru) ? "" : " lru",
368                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
369 }
370
371 /*
372  * Print a human-readable representation of @o via @printer.
373  */
374 void lu_object_print(const struct lu_env *env, void *cookie,
375                      lu_printer_t printer, const struct lu_object *o)
376 {
377         static const char ruler[] = "........................................";
378         struct lu_object_header *top;
379         int depth;
380
381         top = o->lo_header;
382         lu_object_header_print(env, cookie, printer, top);
383         (*printer)(env, cookie, "\n");
384         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
385                 depth = o->lo_depth + 4;
386                 LASSERT(o->lo_ops->loo_object_print != NULL);
387                 /*
388                  * print `.' @depth times.
389                  */
390                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
391                 o->lo_ops->loo_object_print(env, cookie, printer, o);
392                 (*printer)(env, cookie, "\n");
393         }
394 }
395 EXPORT_SYMBOL(lu_object_print);
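
A hedged sketch of how this is typically driven through the libcfs printer above; the lu_cdebug_print_info field names are taken from their use in lu_cdebug_printer(), and D_OTHER is just one possible debug mask.

/* Sketch: dump an object's layer-by-layer description into the debug log. */
static void example_debug_print(const struct lu_env *env,
                                const struct lu_object *o)
{
        struct lu_cdebug_print_info info = {
                .lpi_subsys = DEBUG_SUBSYSTEM,
                .lpi_mask   = D_OTHER,
                .lpi_file   = __FILE__,
                .lpi_fn     = __FUNCTION__,
                .lpi_line   = __LINE__,
        };

        lu_object_print(env, &info, lu_cdebug_printer, o);
}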
396
397 /*
398  * Check object consistency.
399  */
400 int lu_object_invariant(const struct lu_object *o)
401 {
402         struct lu_object_header *top;
403
404         top = o->lo_header;
405         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
406                 if (o->lo_ops->loo_object_invariant != NULL &&
407                     !o->lo_ops->loo_object_invariant(o))
408                         return 0;
409         }
410         return 1;
411 }
412 EXPORT_SYMBOL(lu_object_invariant);
413
414 static struct lu_object *htable_lookup(struct lu_site *s,
415                                        const struct hlist_head *bucket,
416                                        const struct lu_fid *f)
417 {
418         struct lu_object_header *h;
419         struct hlist_node *scan;
420
421         hlist_for_each_entry(h, scan, bucket, loh_hash) {
422                 s->ls_stats.s_cache_check ++;
423                 if (likely(lu_fid_eq(&h->loh_fid, f) &&
424                            !lu_object_is_dying(h))) {
425                         /* bump reference count... */
426                         if (atomic_add_return(1, &h->loh_ref) == 1)
427                                 ++ s->ls_busy;
428                         /* and move to the head of the LRU */
429                         /*
430                          * XXX temporary disable this to measure effects of
431                          * read-write locking.
432                          */
433                         /* list_move_tail(&h->loh_lru, &s->ls_lru); */
434                         s->ls_stats.s_cache_hit ++;
435                         return lu_object_top(h);
436                 }
437         }
438         s->ls_stats.s_cache_miss ++;
439         return NULL;
440 }
441
442 static __u32 fid_hash(const struct lu_fid *f, int bits)
443 {
444         /* all objects with the same id and different versions will belong
445          * to the same collision list. */
446         return hash_long(fid_flatten(f), bits);
447 }
448
449 /*
450  * Search the cache for an object with fid @f. If found, return it.
451  * Otherwise, create a new object, insert it into the cache and return it.
452  * In either case, an additional reference is acquired on the returned object.
453  */
454 struct lu_object *lu_object_find(const struct lu_env *env,
455                                  struct lu_site *s, const struct lu_fid *f)
456 {
457         struct lu_object     *o;
458         struct lu_object     *shadow;
459         struct hlist_head *bucket;
460
461         /*
462          * This uses standard index maintenance protocol:
463          *
464          *     - search index under lock, and return object if found;
465          *     - otherwise, unlock index, allocate new object;
466          *     - lock index and search again;
467          *     - if nothing is found (usual case), insert newly created
468          *       object into index;
469          *     - otherwise (race: other thread inserted object), free
470          *       object just allocated.
471          *       object just allocated;
472          *     - return object.
473          */
474
475         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
476
477         read_lock(&s->ls_guard);
478         o = htable_lookup(s, bucket, f);
479         read_unlock(&s->ls_guard);
480
481         if (o != NULL)
482                 return o;
483
484         /*
485          * Allocate new object. This may result in rather complicated
486          * operations, including fld queries, inode loading, etc.
487          */
488         o = lu_object_alloc(env, s, f);
489         if (unlikely(IS_ERR(o)))
490                 return o;
491
492         LASSERT(lu_fid_eq(lu_object_fid(o), f));
493
494         write_lock(&s->ls_guard);
495         shadow = htable_lookup(s, bucket, f);
496         if (likely(shadow == NULL)) {
497                 hlist_add_head(&o->lo_header->loh_hash, bucket);
498                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
499                 ++ s->ls_busy;
500                 ++ s->ls_total;
501                 shadow = o;
502                 o = NULL;
503         } else
504                 s->ls_stats.s_cache_race ++;
505         write_unlock(&s->ls_guard);
506         if (o != NULL)
507                 lu_object_free(env, o);
508         return shadow;
509 }
510 EXPORT_SYMBOL(lu_object_find);
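
A brief usage sketch; treating the LOHA_EXISTS bit in loh_attr as "the object is backed by an on-disk entity" follows its use in lu_object_header_print() above and is shown here for illustration only.

/* Sketch: look up (or create) an object and test whether it exists. */
static int example_lookup(const struct lu_env *env, struct lu_site *site,
                          const struct lu_fid *fid)
{
        struct lu_object *o;
        int exists;

        o = lu_object_find(env, site, fid);
        if (IS_ERR(o))
                return PTR_ERR(o);

        exists = !!(o->lo_header->loh_attr & LOHA_EXISTS);
        lu_object_put(env, o);
        return exists;
}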
511
512 /*
513  * Global list of all sites on this node
514  */
515 static LIST_HEAD(lu_sites);
516 static DECLARE_MUTEX(lu_sites_guard);
517
518 /*
519  * Global environment used by site shrinker.
520  */
521 static struct lu_env lu_shrink_env;
522
523 /*
524  * Print all objects in @s.
525  */
526 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
527                    lu_printer_t printer)
528 {
529         int i;
530
531         for (i = 0; i < s->ls_hash_size; ++i) {
532                 struct lu_object_header *h;
533                 struct hlist_node       *scan;
534
535                 read_lock(&s->ls_guard);
536                 hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {
537
538                         if (!list_empty(&h->loh_layers)) {
539                                 const struct lu_object *obj;
540
541                                 obj = lu_object_top(h);
542                                 lu_object_print(env, cookie, printer, obj);
543                         } else
544                                 lu_object_header_print(env, cookie, printer, h);
545                 }
546                 read_unlock(&s->ls_guard);
547         }
548 }
549 EXPORT_SYMBOL(lu_site_print);
550
551 enum {
552         LU_CACHE_PERCENT   = 30,
553 };
554
555 /*
556  * Return desired hash table order.
557  */
558 static int lu_htable_order(void)
559 {
560         int bits;
561         unsigned long cache_size;
562
563         /*
564          * Calculate hash table size, assuming that we want reasonable
565          * performance when 30% of available memory is occupied by cache of
566          * lu_objects.
567          *
568          * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
569          */
570         cache_size = nr_free_buffer_pages() / 100 *
571                 LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024);
572
573         for (bits = 1; (1 << bits) < cache_size; ++bits) {
574                 ;
575         }
576         return bits;
577 }
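
As a rough worked example of this sizing (assuming 4 KB pages and about 1 GB, i.e. 262144 pages, of free buffer pages): cache_size = 262144 / 100 * 30 * (4096 / 1024) = 314520 cacheable objects, and the smallest power of two not below that is 2^19, so lu_htable_order() would return 19 and the site would get a 524288-bucket hash table.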
578
579 /*
580  * Initialize site @s, with @top as the top-level device.
581  */
582 int lu_site_init(struct lu_site *s, struct lu_device *top)
583 {
584         int bits;
585         int size;
586         int i;
587         ENTRY;
588
589         memset(s, 0, sizeof *s);
590         rwlock_init(&s->ls_guard);
591         CFS_INIT_LIST_HEAD(&s->ls_lru);
592         CFS_INIT_LIST_HEAD(&s->ls_linkage);
593         s->ls_top_dev = top;
594         top->ld_site = s;
595         lu_device_get(top);
596
597         for (bits = lu_htable_order(), size = 1 << bits;
598              (s->ls_hash =
599               cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
600              --bits, size >>= 1) {
601                 /*
602                  * Scale hash table down, until allocation succeeds.
603                  */
604                 ;
605         }
606
607         s->ls_hash_size = size;
608         s->ls_hash_bits = bits;
609         s->ls_hash_mask = size - 1;
610
611         for (i = 0; i < size; i++)
612                 INIT_HLIST_HEAD(&s->ls_hash[i]);
613
614         RETURN(0);
615 }
616 EXPORT_SYMBOL(lu_site_init);
617
618 /*
619  * Finalize @s and release its resources.
620  */
621 void lu_site_fini(struct lu_site *s)
622 {
623         LASSERT(list_empty(&s->ls_lru));
624         LASSERT(s->ls_total == 0);
625
626         down(&lu_sites_guard);
627         list_del_init(&s->ls_linkage);
628         up(&lu_sites_guard);
629
630         if (s->ls_hash != NULL) {
631                 int i;
632                 for (i = 0; i < s->ls_hash_size; i++)
633                         LASSERT(hlist_empty(&s->ls_hash[i]));
634                 cfs_free_large(s->ls_hash);
635                 s->ls_hash = NULL;
636         }
637         if (s->ls_top_dev != NULL) {
638                 s->ls_top_dev->ld_site = NULL;
639                 lu_device_put(s->ls_top_dev);
640                 s->ls_top_dev = NULL;
641         }
642 }
643 EXPORT_SYMBOL(lu_site_fini);
644
645 /*
646  * Called when initialization of stack for this site is completed.
647  */
648 int lu_site_init_finish(struct lu_site *s)
649 {
650         int result;
651         down(&lu_sites_guard);
652         result = lu_context_refill(&lu_shrink_env.le_ctx);
653         if (result == 0)
654                 list_add(&s->ls_linkage, &lu_sites);
655         up(&lu_sites_guard);
656         return result;
657 }
658 EXPORT_SYMBOL(lu_site_init_finish);
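
A hedged sketch of the expected sequence when bringing a site up and tearing it down; the top-level device is assumed to have been created by the caller, and all objects must have been released before lu_site_fini().

/* Sketch: set up a site on top of an existing device, then tear it down. */
static int example_site_lifecycle(struct lu_site *site, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(site, top);
        if (rc != 0)
                return rc;

        rc = lu_site_init_finish(site);     /* link into the global site list */
        if (rc != 0) {
                lu_site_fini(site);
                return rc;
        }

        /* ... the site is usable: lu_object_find(), lu_site_purge(), ... */

        lu_site_fini(site);
        return 0;
}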
659
660 /*
661  * Acquire additional reference on device @d
662  */
663 void lu_device_get(struct lu_device *d)
664 {
665         atomic_inc(&d->ld_ref);
666 }
667 EXPORT_SYMBOL(lu_device_get);
668
669 /*
670  * Release reference on device @d.
671  */
672 void lu_device_put(struct lu_device *d)
673 {
674         atomic_dec(&d->ld_ref);
675 }
676 EXPORT_SYMBOL(lu_device_put);
677
678 /*
679  * Initialize device @d of type @t.
680  */
681 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
682 {
683         memset(d, 0, sizeof *d);
684         atomic_set(&d->ld_ref, 0);
685         d->ld_type = t;
686         return 0;
687 }
688 EXPORT_SYMBOL(lu_device_init);
689
690 /*
691  * Finalize device @d.
692  */
693 void lu_device_fini(struct lu_device *d)
694 {
695         if (d->ld_obd != NULL)
696                 /* finish lprocfs */
697                 lprocfs_obd_cleanup(d->ld_obd);
698
699         LASSERTF(atomic_read(&d->ld_ref) == 0,
700                  "Refcount is %u\n", atomic_read(&d->ld_ref));
701 }
702 EXPORT_SYMBOL(lu_device_fini);
703
704 /*
705  * Initialize object @o that is part of compound object @h and was created by
706  * device @d.
707  */
708 int lu_object_init(struct lu_object *o,
709                    struct lu_object_header *h, struct lu_device *d)
710 {
711         memset(o, 0, sizeof *o);
712         o->lo_header = h;
713         o->lo_dev    = d;
714         lu_device_get(d);
715         CFS_INIT_LIST_HEAD(&o->lo_linkage);
716         return 0;
717 }
718 EXPORT_SYMBOL(lu_object_init);
719
720 /*
721  * Finalize object and release its resources.
722  */
723 void lu_object_fini(struct lu_object *o)
724 {
725         LASSERT(list_empty(&o->lo_linkage));
726
727         if (o->lo_dev != NULL) {
728                 lu_device_put(o->lo_dev);
729                 o->lo_dev = NULL;
730         }
731 }
732 EXPORT_SYMBOL(lu_object_fini);
733
734 /*
735  * Add object @o as first layer of compound object @h
736  *
737  * This is typically called by the ->ldo_object_alloc() method of top-level
738  * device.
739  */
740 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
741 {
742         list_move(&o->lo_linkage, &h->loh_layers);
743 }
744 EXPORT_SYMBOL(lu_object_add_top);
745
746 /*
747  * Add object @o as a layer of compound object, going after @before.
748  *
749  * This is typically called by the ->ldo_object_alloc() method of
750  * @before->lo_dev.
751  */
752 void lu_object_add(struct lu_object *before, struct lu_object *o)
753 {
754         list_move(&o->lo_linkage, &before->lo_linkage);
755 }
756 EXPORT_SYMBOL(lu_object_add);
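
To illustrate how these helpers are meant to be used from a device's ->ldo_object_alloc() method, here is a rough sketch; the slice structure, its names, and the exact ->ldo_object_alloc() signature are assumptions modeled on the call made in lu_object_alloc() earlier in this file.

/* Hypothetical top-level slice that embeds the compound object's header. */
struct example_top_object {
        struct lu_object_header eto_header;
        struct lu_object        eto_obj;
};

static struct lu_object *example_object_alloc(const struct lu_env *env,
                                              const struct lu_object_header *hdr,
                                              struct lu_device *dev)
{
        struct example_top_object *obj;

        OBD_ALLOC_PTR(obj);
        if (obj == NULL)
                return ERR_PTR(-ENOMEM);

        lu_object_header_init(&obj->eto_header);
        lu_object_init(&obj->eto_obj, &obj->eto_header, dev);
        lu_object_add_top(&obj->eto_header, &obj->eto_obj);
        /* a real slice would also set obj->eto_obj.lo_ops here; lower layers
         * are added later through the ->loo_object_init() iteration. */
        return &obj->eto_obj;
}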
757
758 /*
759  * Initialize compound object.
760  */
761 int lu_object_header_init(struct lu_object_header *h)
762 {
763         memset(h, 0, sizeof *h);
764         atomic_set(&h->loh_ref, 1);
765         INIT_HLIST_NODE(&h->loh_hash);
766         CFS_INIT_LIST_HEAD(&h->loh_lru);
767         CFS_INIT_LIST_HEAD(&h->loh_layers);
768         return 0;
769 }
770 EXPORT_SYMBOL(lu_object_header_init);
771
772 /*
773  * Finalize compound object.
774  */
775 void lu_object_header_fini(struct lu_object_header *h)
776 {
777         LASSERT(list_empty(&h->loh_layers));
778         LASSERT(list_empty(&h->loh_lru));
779         LASSERT(hlist_unhashed(&h->loh_hash));
780 }
781 EXPORT_SYMBOL(lu_object_header_fini);
782
783 /*
784  * Given a compound object, find its slice, corresponding to the device type
785  * @dtype.
786  */
787 struct lu_object *lu_object_locate(struct lu_object_header *h,
788                                    struct lu_device_type *dtype)
789 {
790         struct lu_object *o;
791
792         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
793                 if (o->lo_dev->ld_type == dtype)
794                         return o;
795         }
796         return NULL;
797 }
798 EXPORT_SYMBOL(lu_object_locate);
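
An illustrative combination with lu_object_find(); the helper name is made up, and the device type is whatever the caller is interested in.

/* Sketch: return the slice of a cached object created by a given device
 * type, taking a reference on the compound object. */
static struct lu_object *example_find_slice(const struct lu_env *env,
                                            struct lu_site *site,
                                            const struct lu_fid *fid,
                                            struct lu_device_type *dtype)
{
        struct lu_object *top;
        struct lu_object *slice;

        top = lu_object_find(env, site, fid);
        if (IS_ERR(top))
                return top;

        slice = lu_object_locate(top->lo_header, dtype);
        if (slice == NULL) {
                lu_object_put(env, top);
                return ERR_PTR(-ENOENT);
        }
        /* the caller still owns the reference taken by lu_object_find() */
        return slice;
}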
799
800 enum {
801         /*
802          * Maximal number of tld slots.
803          */
804         LU_CONTEXT_KEY_NR = 16
805 };
806
807 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
808
809 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
810
811 /*
812  * Register new key.
813  */
814 int lu_context_key_register(struct lu_context_key *key)
815 {
816         int result;
817         int i;
818
819         LASSERT(key->lct_init != NULL);
820         LASSERT(key->lct_fini != NULL);
821         LASSERT(key->lct_tags != 0);
822         LASSERT(key->lct_owner != NULL);
823
824         result = -ENFILE;
825         spin_lock(&lu_keys_guard);
826         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
827                 if (lu_keys[i] == NULL) {
828                         key->lct_index = i;
829                         atomic_set(&key->lct_used, 1);
830                         lu_keys[i] = key;
831                         result = 0;
832                         break;
833                 }
834         }
835         spin_unlock(&lu_keys_guard);
836         return result;
837 }
838 EXPORT_SYMBOL(lu_context_key_register);
839
840 static void key_fini(struct lu_context *ctx, int index)
841 {
842         if (ctx->lc_value[index] != NULL) {
843                 struct lu_context_key *key;
844
845                 key = lu_keys[index];
846                 LASSERT(key != NULL);
847                 LASSERT(key->lct_fini != NULL);
848                 LASSERT(atomic_read(&key->lct_used) > 1);
849
850                 key->lct_fini(ctx, key, ctx->lc_value[index]);
851                 atomic_dec(&key->lct_used);
852                 LASSERT(key->lct_owner != NULL);
853                 if (!(ctx->lc_tags & LCT_NOREF)) {
854                         LASSERT(module_refcount(key->lct_owner) > 0);
855                         module_put(key->lct_owner);
856                 }
857                 ctx->lc_value[index] = NULL;
858         }
859 }
860
861 /*
862  * Deregister key.
863  */
864 void lu_context_key_degister(struct lu_context_key *key)
865 {
866         LASSERT(atomic_read(&key->lct_used) >= 1);
867         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
868
869         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
870
871         if (atomic_read(&key->lct_used) > 1)
872                 CERROR("key has instances.\n");
873         spin_lock(&lu_keys_guard);
874         lu_keys[key->lct_index] = NULL;
875         spin_unlock(&lu_keys_guard);
876 }
877 EXPORT_SYMBOL(lu_context_key_degister);
878
879 /*
880  * Return value associated with key @key in context @ctx.
881  */
882 void *lu_context_key_get(const struct lu_context *ctx,
883                          struct lu_context_key *key)
884 {
885         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
886         return ctx->lc_value[key->lct_index];
887 }
888 EXPORT_SYMBOL(lu_context_key_get);
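
A hedged sketch of defining and using a key, modeled directly on lu_global_key above; the value type and key name are made up, and LU_CONTEXT_KEY_INIT() is assumed to fill in ->lct_owner the same way lu_global_init() uses it below.

struct example_thread_info {
        char eti_scratch[64];          /* arbitrary per-context scratch area */
};

static void *example_key_init(const struct lu_context *ctx,
                              struct lu_context_key *key)
{
        struct example_thread_info *info;

        OBD_ALLOC_PTR(info);
        if (info == NULL)
                info = ERR_PTR(-ENOMEM);
        return info;
}

static void example_key_fini(const struct lu_context *ctx,
                             struct lu_context_key *key, void *data)
{
        struct example_thread_info *info = data;
        OBD_FREE_PTR(info);
}

static struct lu_context_key example_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = example_key_init,
        .lct_fini = example_key_fini
};

/* Registration (e.g. from module init) and per-context access:
 *
 *      LU_CONTEXT_KEY_INIT(&example_key);
 *      rc = lu_context_key_register(&example_key);
 *      ...
 *      info = lu_context_key_get(&env->le_ctx, &example_key);
 */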
889
890 static void keys_fini(struct lu_context *ctx)
891 {
892         int i;
893
894         if (ctx->lc_value != NULL) {
895                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
896                         key_fini(ctx, i);
897                 OBD_FREE(ctx->lc_value,
898                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
899                 ctx->lc_value = NULL;
900         }
901 }
902
903 static int keys_fill(const struct lu_context *ctx)
904 {
905         int i;
906
907         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
908                 struct lu_context_key *key;
909
910                 key = lu_keys[i];
911                 if (ctx->lc_value[i] == NULL &&
912                     key != NULL && key->lct_tags & ctx->lc_tags) {
913                         void *value;
914
915                         LASSERT(key->lct_init != NULL);
916                         LASSERT(key->lct_index == i);
917
918                         value = key->lct_init(ctx, key);
919                         if (unlikely(IS_ERR(value)))
920                                 return PTR_ERR(value);
921                         LASSERT(key->lct_owner != NULL);
922                         if (!(ctx->lc_tags & LCT_NOREF))
923                                 try_module_get(key->lct_owner);
924                         atomic_inc(&key->lct_used);
925                         ctx->lc_value[i] = value;
926                 }
927         }
928         return 0;
929 }
930
931 static int keys_init(struct lu_context *ctx)
932 {
933         int result;
934
935         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
936         if (likely(ctx->lc_value != NULL))
937                 result = keys_fill(ctx);
938         else
939                 result = -ENOMEM;
940
941         if (result != 0)
942                 keys_fini(ctx);
943         return result;
944 }
945
946 /*
947  * Initialize context data-structure. Create values for all keys.
948  */
949 int lu_context_init(struct lu_context *ctx, __u32 tags)
950 {
951         memset(ctx, 0, sizeof *ctx);
952         ctx->lc_tags = tags;
953         return keys_init(ctx);
954 }
955 EXPORT_SYMBOL(lu_context_init);
956
957 /*
958  * Finalize context data-structure. Destroy key values.
959  */
960 void lu_context_fini(struct lu_context *ctx)
961 {
962         keys_fini(ctx);
963 }
964 EXPORT_SYMBOL(lu_context_fini);
965
966 /*
967  * Called before entering context.
968  */
969 void lu_context_enter(struct lu_context *ctx)
970 {
971 }
972 EXPORT_SYMBOL(lu_context_enter);
973
974 /*
975  * Called after exiting from @ctx
976  */
977 void lu_context_exit(struct lu_context *ctx)
978 {
979         int i;
980
981         if (ctx->lc_value != NULL) {
982                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
983                         if (ctx->lc_value[i] != NULL) {
984                                 struct lu_context_key *key;
985
986                                 key = lu_keys[i];
987                                 LASSERT(key != NULL);
988                                 if (key->lct_exit != NULL)
989                                         key->lct_exit(ctx,
990                                                       key, ctx->lc_value[i]);
991                         }
992                 }
993         }
994 }
995 EXPORT_SYMBOL(lu_context_exit);
996
997 /*
998  * Allocate for context all missing keys that were registered after context
999  * creation.
1000  */
1001 int lu_context_refill(const struct lu_context *ctx)
1002 {
1003         LASSERT(ctx->lc_value != NULL);
1004         return keys_fill(ctx);
1005 }
1006 EXPORT_SYMBOL(lu_context_refill);
1007
1008 static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
1009                         __u32 tags, int noref)
1010 {
1011         int result;
1012
1013         LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
1014
1015         env->le_ses = ses;
1016         result = lu_context_init(&env->le_ctx, tags);
1017         if (likely(result == 0))
1018                 lu_context_enter(&env->le_ctx);
1019         return result;
1020 }
1021
1022 static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
1023                              __u32 tags)
1024 {
1025         return lu_env_setup(env, ses, tags, 1);
1026 }
1027
1028 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
1029 {
1030         return lu_env_setup(env, ses, tags, 0);
1031 }
1032 EXPORT_SYMBOL(lu_env_init);
1033
1034 void lu_env_fini(struct lu_env *env)
1035 {
1036         lu_context_exit(&env->le_ctx);
1037         lu_context_fini(&env->le_ctx);
1038         env->le_ses = NULL;
1039 }
1040 EXPORT_SYMBOL(lu_env_fini);
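
A minimal sketch of the environment lifecycle as seen by a caller; the LCT_MD_THREAD tag is just one plausible choice.

/* Sketch: run some work inside a temporary, stack-allocated environment. */
static int example_with_env(void)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
        if (rc != 0)
                return rc;

        /* ... lu_object_find(&env, ...), lu_object_put(&env, ...), ... */

        lu_env_fini(&env);
        return rc;
}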
1041
1042 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1043 {
1044         struct lu_site *s;
1045         struct lu_site *tmp;
1046         int cached = 0;
1047         int remain = nr;
1048         LIST_HEAD(splice);
1049
1050         if (nr != 0 && !(gfp_mask & __GFP_FS))
1051                 return -1;
1052
1053         down(&lu_sites_guard);
1054         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1055                 if (nr != 0) {
1056                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1057                         /*
1058                          * Move the just-shrunk site to the tail of the site
1059                          * list to ensure shrinking fairness.
1060                          */
1061                         list_move_tail(&s->ls_linkage, &splice);
1062                 }
1063                 read_lock(&s->ls_guard);
1064                 cached += s->ls_total - s->ls_busy;
1065                 read_unlock(&s->ls_guard);
1066                 if (remain <= 0)
1067                         break;
1068         }
1069         list_splice(&splice, lu_sites.prev);
1070         up(&lu_sites_guard);
1071         return cached;
1072 }
1073
1074 static struct shrinker *lu_site_shrinker = NULL;
1075
1076 /*
1077  * Initialization of global lu_* data.
1078  */
1079 int lu_global_init(void)
1080 {
1081         int result;
1082
1083         LU_CONTEXT_KEY_INIT(&lu_global_key);
1084         result = lu_context_key_register(&lu_global_key);
1085         if (result == 0) {
1086                 /*
1087                  * At this level, we don't know what tags are needed, so
1088                  * allocate them conservatively. This should not be too bad,
1089                  * because this environment is global.
1090                  */
1091                 down(&lu_sites_guard);
1092                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
1093                 up(&lu_sites_guard);
1094                 if (result == 0) {
1095                         /*
1096                          * Seeks estimation: 3 seeks to read a record from
1097                          * the OI, one to read the inode, one for the EA.
1098                          * Unfortunately, setting such a high value results in
1099                          * the lu_object/inode cache consuming all the memory.
1100                          */
1101                         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
1102                                                         lu_cache_shrink);
1103                         if (result == 0)
1104                                 result = lu_time_global_init();
1105                 }
1106         }
1107         return result;
1108 }
1109
1110 /*
1111  * Dual to lu_global_init().
1112  */
1113 void lu_global_fini(void)
1114 {
1115         lu_time_global_fini();
1116         if (lu_site_shrinker != NULL) {
1117                 remove_shrinker(lu_site_shrinker);
1118                 lu_site_shrinker = NULL;
1119         }
1120
1121         lu_context_key_degister(&lu_global_key);
1122
1123         /*
1124          * Tear shrinker environment down _after_ de-registering
1125          * lu_global_key, because the latter has a value in the former.
1126          */
1127         down(&lu_sites_guard);
1128         lu_env_fini(&lu_shrink_env);
1129         up(&lu_sites_guard);
1130 }
1131
1132 struct lu_buf LU_BUF_NULL = {
1133         .lb_buf = NULL,
1134         .lb_len = 0
1135 };
1136 EXPORT_SYMBOL(LU_BUF_NULL);
1137
1138 /*
1139  * XXX: The functions below logically belong to the fid module, but they are
1140  * used by dt_store_open(). Put them here until a better place is found.
1141  */
1142
1143 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
1144               struct lu_fid *befider)
1145 {
1146         int recsize;
1147         __u64 seq;
1148         __u32 oid;
1149
1150         seq = fid_seq(fid);
1151         oid = fid_oid(fid);
1152
1153         /*
1154          * Two cases: a compact 6-byte representation for the common case,
1155          * and a full 17-byte representation for an "unusual" fid.
1156          */
1157
1158         /*
1159          * Check that usual case is really usual.
1160          */
1161         CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);
1162
1163         if (fid_is_igif(fid) ||
1164             seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
1165                 fid_cpu_to_be(befider, fid);
1166                 recsize = sizeof *befider;
1167         } else {
1168                 unsigned char *small_befider;
1169
1170                 small_befider = (unsigned char *)befider;
1171
1172                 small_befider[0] = seq >> 16;
1173                 small_befider[1] = seq >> 8;
1174                 small_befider[2] = seq;
1175
1176                 small_befider[3] = oid >> 8;
1177                 small_befider[4] = oid;
1178
1179                 recsize = 5;
1180         }
1181         memcpy(pack->fp_area, befider, recsize);
1182         pack->fp_len = recsize + 1;
1183 }
1184 EXPORT_SYMBOL(fid_pack);
1185
1186 void fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
1187 {
1188         switch (pack->fp_len) {
1189         case sizeof *fid + 1:
1190                 memcpy(fid, pack->fp_area, sizeof *fid);
1191                 fid_be_to_cpu(fid, fid);
1192                 break;
1193         case 6: {
1194                 const unsigned char *area;
1195
1196                 area = pack->fp_area;
1197                 fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
1198                 fid->f_oid = (area[3] << 8) | area[4];
1199                 fid->f_ver = 0;
1200                 break;
1201         }
1202         default:
1203                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1204                 LBUG();
1205         }
1206 }
1207 EXPORT_SYMBOL(fid_unpack);
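
A small hedged round-trip sketch; the fid values are arbitrary, and whether the compact or the full record is produced depends on the checks in fid_pack() above.

/* Sketch: pack a fid into its on-disk record and unpack it again. */
static void example_fid_roundtrip(void)
{
        struct lu_fid      fid = { .f_seq = 0x400000, .f_oid = 0x42, .f_ver = 0 };
        struct lu_fid      scratch;
        struct lu_fid      out;
        struct lu_fid_pack pack;

        fid_pack(&pack, &fid, &scratch);
        fid_unpack(&pack, &out);
        LASSERT(lu_fid_eq(&fid, &out));
}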
1208
1209 const char *lu_time_names[LU_TIME_NR] = {
1210         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1211         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1212         [LU_TIME_FIND_INSERT] = "find_insert"
1213 };
1214 EXPORT_SYMBOL(lu_time_names);