/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
/* nr_free_pages() */
#include <linux/swap.h>
/* hash_long() */
#include <linux/hash.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
/* lu_time_global_{init,fini}() */
#include <lu_time.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/*
 * Decrease the reference counter on an object. If the last reference is
 * released, return the object to the cache, unless lu_object_is_dying(o)
 * holds. In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        write_lock(&site->ls_guard);
        if (atomic_dec_and_test(&top->loh_ref)) {
                /*
                 * When the last reference is released, iterate over object
                 * layers, and notify them that the object is no longer busy.
                 */
                list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If the object is dying (will not be cached), remove
                         * it from the hash table and LRU.
                         *
                         * This is done with the hash table and LRU lists
                         * locked. As the only ways to acquire a first
                         * reference to a previously unreferenced object are
                         * hash-table lookup (lu_object_find()) and LRU
                         * scanning (lu_site_purge()), both done under the
                         * hash-table and LRU lock, no race with a concurrent
                         * lookup is possible and we can safely destroy the
                         * object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        -- site->ls_total;
                        kill_it = 1;
                }
        }
        write_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from hash and lru above, can
                 * kill it.
                 */
                lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid  = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created ++;
        RETURN(top);
}

/*
 * Free object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        write_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                /*
                 * Objects are sorted in LRU order, and "busy" objects (ones
                 * with h->loh_ref > 0) naturally tend to live near the hot
                 * end that we scan last. Unfortunately, sites usually have a
                 * small number (fewer than ten) of busy yet rarely accessed
                 * objects (some global objects, accessed directly through
                 * pointers, bypassing the hash table). Currently the
                 * algorithm scans them over and over again. Probably we
                 * should move busy objects out of the LRU, or we can live
                 * with that.
                 */
                if (nr-- == 0)
                        break;
                if (atomic_read(&h->loh_ref) > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
                s->ls_total --;
        }
        write_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                 struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(env, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
        return nr;
}
EXPORT_SYMBOL(lu_site_purge);

/*
 * Object printing.
 *
 * Code below has to jump through certain hoops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into the libcfs_debug_msg() interface, which assumes that each
 * message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_global_key key), until a terminating
 * newline character is detected.
 *
 */

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
        /*
         * fid staging area used by dt_store_open().
         */
        struct lu_fid_pack lck_pack;
};

LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_global_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                 (char *)info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);
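
/*
 * Usage sketch (illustrative only, assuming @info is a lu_cdebug_print_info
 * prepared by the caller): partial chunks accumulate in lck_area until a
 * newline-terminated chunk flushes a single log line.
 *
 *     lu_cdebug_printer(env, &info, "header@%p", hdr);
 *     lu_cdebug_printer(env, &info, " flags: %#lx", flags);
 *     lu_cdebug_printer(env, &info, "\n");   (only this call emits a line)
 */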

/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_env *env,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist":"");
}

/*
 * Print human-readable representation of @o through @printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);

/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

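/*
 * Look up an object with fid @f in hash chain @bucket. On a hit, bump the
 * object's reference count and return its top slice; return NULL on a miss.
 * The caller must hold s->ls_guard.
 */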
static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (likely(lu_fid_eq(&h->loh_fid, f) &&
                           !lu_object_is_dying(h))) {
                        /* bump reference count... */
                        if (atomic_add_return(1, &h->loh_ref) == 1)
                                ++ s->ls_busy;
                        /* and move to the head of the LRU */
                        /*
                         * XXX temporarily disabled to measure effects of
                         * read-write locking.
                         */
                        /* list_move_tail(&h->loh_lru, &s->ls_lru); */
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f, int bits)
{
        /* all objects with the same id but different versions will belong to
         * the same collision list. */
        return hash_long(fid_flatten(f), bits);
}

/*
 * Search the cache for an object with fid @f. If such an object is found,
 * return it. Otherwise, create a new object, insert it into the cache and
 * return it. In any case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object     *o;
        struct lu_object     *shadow;
        struct hlist_head *bucket;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);

        read_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);
        read_unlock(&s->ls_guard);

        if (o != NULL)
                return o;

        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, s, f);
        if (unlikely(IS_ERR(o)))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        write_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (likely(shadow == NULL)) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                ++ s->ls_total;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        write_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(env, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
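
/*
 * Usage sketch (illustrative only): look an object up by fid, locate the
 * slice belonging to a particular device type, and drop the reference when
 * done.
 *
 *     struct lu_object *o = lu_object_find(env, site, fid);
 *
 *     if (!IS_ERR(o)) {
 *             struct lu_object *slice;
 *
 *             slice = lu_object_locate(o->lo_header, dev_type);
 *             ...
 *             lu_object_put(env, o);
 *     }
 */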

/*
 * Global list of all sites on this node
 */
static LIST_HEAD(lu_sites);
static DECLARE_MUTEX(lu_sites_guard);

/*
 * Global environment used by site shrinker.
 */
static struct lu_env lu_shrink_env;

/*
 * Print all objects in @s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        int i;

        for (i = 0; i < s->ls_hash_size; ++i) {
                struct lu_object_header *h;
                struct hlist_node       *scan;

                read_lock(&s->ls_guard);
                hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {

                        if (!list_empty(&h->loh_layers)) {
                                const struct lu_object *obj;

                                obj = lu_object_top(h);
                                lu_object_print(env, cookie, printer, obj);
                        } else
                                lu_object_header_print(env, cookie, printer, h);
                }
                read_unlock(&s->ls_guard);
        }
}
EXPORT_SYMBOL(lu_site_print);

enum {
        LU_CACHE_PERCENT   = 30,
};

/*
 * Return desired hash table order.
 */
static int lu_htable_order(void)
{
        int bits;
        unsigned long cache_size;

        /*
         * Calculate hash table size, assuming that we want reasonable
         * performance when 30% of available memory is occupied by cache of
         * lu_objects.
         *
         * Size of lu_object is arbitrarily taken as 1K (together with inode).
         */
        cache_size = nr_free_buffer_pages() / 100 *
                LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024);
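
        /*
         * For illustration (example numbers only): with 4KB pages and 1GB of
         * free buffer pages (262144 pages), cache_size is about
         * 262144 / 100 * 30 * 4 = 314520, and the loop below settles on
         * bits = 19 (2^19 = 524288).
         */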

        for (bits = 1; (1 << bits) < cache_size; ++bits) {
                ;
        }
        return bits;
}

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int bits;
        int size;
        int i;
        ENTRY;

        memset(s, 0, sizeof *s);
        rwlock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);

        for (bits = lu_htable_order(), size = 1 << bits;
             (s->ls_hash =
              cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
             --bits, size >>= 1) {
                /*
                 * Scale hash table down, until allocation succeeds.
                 */
                ;
        }

        s->ls_hash_size = size;
        s->ls_hash_bits = bits;
        s->ls_hash_mask = size - 1;

        for (i = 0; i < size; i++)
                INIT_HLIST_HEAD(&s->ls_hash[i]);

        RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);

        down(&lu_sites_guard);
        list_del_init(&s->ls_linkage);
        up(&lu_sites_guard);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < s->ls_hash_size; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                cfs_free_large(s->ls_hash);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);

/*
 * Called when initialization of stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;
        down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                list_add(&s->ls_linkage, &lu_sites);
        up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/*
 * Acquire additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        if (d->ld_obd != NULL)
                /* finish lprocfs */
                lprocfs_obd_cleanup(d->ld_obd);

        LASSERTF(atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", atomic_read(&d->ld_ref));
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as the first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        atomic_set(&h->loh_ref, 1);
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice, corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

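/*
 * Array of all registered context keys, indexed by ->lct_index; registration
 * and deregistration are protected by lu_keys_guard.
 */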
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);
        LASSERT(key->lct_owner != NULL);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

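/*
 * Release the value bound to slot @index of @ctx: call the key's ->lct_fini()
 * on it, drop the key's use count and, unless the context is LCT_NOREF, the
 * module reference taken when the value was created.
 */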
static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                atomic_dec(&key->lct_used);
                LASSERT(key->lct_owner != NULL);
                if (!(ctx->lc_tags & LCT_NOREF)) {
                        LASSERT(module_refcount(key->lct_owner) > 0);
                        module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
}

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        key_fini(&lu_shrink_env.le_ctx, key->lct_index);

        if (atomic_read(&key->lct_used) > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

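/*
 * Instantiate values for every registered key whose tags match @ctx->lc_tags
 * and that has no value in this context yet, taking a module reference for
 * each unless the context is LCT_NOREF.
 */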
static int keys_fill(const struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL &&
                    key != NULL && key->lct_tags & ctx->lc_tags) {
                        void *value;

                        LASSERT(key->lct_init != NULL);
                        LASSERT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value)))
                                return PTR_ERR(value);
                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF))
                                try_module_get(key->lct_owner);
                        atomic_inc(&key->lct_used);
                        ctx->lc_value[i] = value;
                }
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (likely(ctx->lc_value != NULL))
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

/*
 * Allocate values in the context for any keys that were registered after the
 * context was created.
 */
int lu_context_refill(const struct lu_context *ctx)
{
        LASSERT(ctx->lc_value != NULL);
        return keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);

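/*
 * Set up environment @env: use @ses as its session context, initialize the
 * operation context with @tags and enter it. When @noref is zero, @tags must
 * not contain LCT_NOREF.
 */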
static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
                        __u32 tags, int noref)
{
        int result;

        LASSERT(ergo(!noref, !(tags & LCT_NOREF)));

        env->le_ses = ses;
        result = lu_context_init(&env->le_ctx, tags);
        if (likely(result == 0))
                lu_context_enter(&env->le_ctx);
        return result;
}

static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
                             __u32 tags)
{
        return lu_env_setup(env, ses, tags, 1);
}

int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
{
        return lu_env_setup(env, ses, tags, 0);
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);

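/*
 * VM shrinker callback: purge up to @nr unreferenced objects across all
 * sites, moving each just-shrunk site to the tail of the site list for
 * fairness, and return an estimate of the number of cached (non-busy)
 * objects remaining. Returns -1 when called without __GFP_FS, since purging
 * may recurse into the file system.
 */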
static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        struct lu_site *s;
        struct lu_site *tmp;
        int cached = 0;
        int remain = nr;
        LIST_HEAD(splice);

        if (nr != 0 && !(gfp_mask & __GFP_FS))
                return -1;

        down(&lu_sites_guard);
        list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                if (nr != 0) {
                        remain = lu_site_purge(&lu_shrink_env, s, remain);
                        /*
                         * Move the just-shrunk site to the tail of the site
                         * list to ensure shrinking fairness.
                         */
                        list_move_tail(&s->ls_linkage, &splice);
                }
                read_lock(&s->ls_guard);
                cached += s->ls_total - s->ls_busy;
                read_unlock(&s->ls_guard);
                if (remain <= 0)
                        break;
        }
        list_splice(&splice, lu_sites.prev);
        up(&lu_sites_guard);
        return cached;
}

static struct shrinker *lu_site_shrinker = NULL;

/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        LU_CONTEXT_KEY_INIT(&lu_global_key);
        result = lu_context_key_register(&lu_global_key);
        if (result == 0) {
                /*
                 * At this level, we don't know what tags are needed, so
                 * allocate them conservatively. This should not be too bad,
                 * because this environment is global.
                 */
                down(&lu_sites_guard);
                result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
                up(&lu_sites_guard);
                if (result == 0) {
                        /*
                         * seeks estimation: 3 seeks to read a record from oi,
                         * one to read inode, one for ea. Unfortunately
                         * setting this high value results in lu_object/inode
                         * cache consuming all the memory.
                         */
                        lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
                                                        lu_cache_shrink);
                        if (lu_site_shrinker != NULL)
                                result = lu_time_global_init();
                        else
                                result = -ENOMEM;
                }
        }
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_time_global_fini();
        if (lu_site_shrinker != NULL) {
                remove_shrinker(lu_site_shrinker);
                lu_site_shrinker = NULL;
        }

        lu_context_key_degister(&lu_global_key);

        /*
         * Tear shrinker environment down _after_ de-registering
         * lu_global_key, because the latter has a value in the former.
         */
        down(&lu_sites_guard);
        lu_env_fini(&lu_shrink_env);
        up(&lu_sites_guard);
}

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);

/*
 * XXX: Functions below logically belong to the fid module, but they are used
 * by dt_store_open(). Put them here until a better place is found.
 */

void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
              struct lu_fid *befider)
{
        int recsize;
        __u64 seq;
        __u32 oid;

        seq = fid_seq(fid);
        oid = fid_oid(fid);

        /*
         * Two cases: a compact 6-byte representation for the common case,
         * and a full 17-byte representation for an "unusual" fid.
         */

        /*
         * Check that the usual case is really usual.
         */
        CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);

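        /*
         * Example (illustrative values): a fid with seq = 0x123456,
         * oid = 0xabcd and ver = 0 takes the compact branch below and packs
         * into the five bytes 12 34 56 ab cd with pack->fp_len set to 6,
         * while an igif or a fid with a larger seq/oid is stored as the full
         * big-endian lu_fid with fp_len == sizeof(struct lu_fid) + 1.
         */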
        if (fid_is_igif(fid) ||
            seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
                fid_cpu_to_be(befider, fid);
                recsize = sizeof *befider;
        } else {
                unsigned char *small_befider;

                small_befider = (unsigned char *)befider;

                small_befider[0] = seq >> 16;
                small_befider[1] = seq >> 8;
                small_befider[2] = seq;

                small_befider[3] = oid >> 8;
                small_befider[4] = oid;

                recsize = 5;
        }
        memcpy(pack->fp_area, befider, recsize);
        pack->fp_len = recsize + 1;
}
EXPORT_SYMBOL(fid_pack);

int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
{
        int result;

        result = 0;
        switch (pack->fp_len) {
        case sizeof *fid + 1:
                memcpy(fid, pack->fp_area, sizeof *fid);
                fid_be_to_cpu(fid, fid);
                break;
        case 6: {
                const unsigned char *area;

                area = pack->fp_area;
                fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
                fid->f_oid = (area[3] << 8) | area[4];
                fid->f_ver = 0;
                break;
        }
        default:
                CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
                result = -EIO;
        }
        return result;
}
EXPORT_SYMBOL(fid_unpack);

const char *lu_time_names[LU_TIME_NR] = {
        [LU_TIME_FIND_LOOKUP] = "find_lookup",
        [LU_TIME_FIND_ALLOC]  = "find_alloc",
        [LU_TIME_FIND_INSERT] = "find_insert"
};
EXPORT_SYMBOL(lu_time_names);