lustre/obdclass/lu_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: Nikita Danilov <nikita@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  *
27  * These are the only exported functions; they provide some generic
28  * infrastructure for managing object devices.
29  */
30
31 #define DEBUG_SUBSYSTEM S_CLASS
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35
36 #include <linux/seq_file.h>
37 #include <linux/module.h>
38 /* nr_free_pages() */
39 #include <linux/swap.h>
40 /* hash_long() */
41 #include <linux/hash.h>
42 #include <obd_support.h>
43 #include <lustre_disk.h>
44 #include <lustre_fid.h>
45 #include <lu_object.h>
46 #include <libcfs/list.h>
47 /* lu_time_global_{init,fini}() */
48 #include <lu_time.h>
49
50 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
51
52 /*
53  * Decrease reference counter on object. When the last reference is released,
54  * return the object to the cache, unless lu_object_is_dying(o) holds. In the
55  * latter case, the object is freed immediately.
56  */
57 void lu_object_put(const struct lu_env *env, struct lu_object *o)
58 {
59         struct lu_object_header *top;
60         struct lu_site          *site;
61         struct lu_object        *orig;
62         int                      kill_it;
63
64         top = o->lo_header;
65         site = o->lo_dev->ld_site;
66         orig = o;
67         kill_it = 0;
68         write_lock(&site->ls_guard);
69         if (atomic_dec_and_test(&top->loh_ref)) {
70                 /*
71                  * When last reference is released, iterate over object
72                  * layers, and notify them that object is no longer busy.
73                  */
74                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
75                         if (o->lo_ops->loo_object_release != NULL)
76                                 o->lo_ops->loo_object_release(env, o);
77                 }
78                 -- site->ls_busy;
79                 if (lu_object_is_dying(top)) {
80                         /*
81                          * If object is dying (will not be cached), remove it
82                          * from hash table and LRU.
83                          *
84                          * This is done with hash table and LRU lists
85                          * locked. As the only way to acquire first reference
86                          * to previously unreferenced object is through
87                          * hash-table lookup (lu_object_find()), or LRU
88                          * scanning (lu_site_purge()), both of which are done under
89                          * hash-table and LRU lock, no race with concurrent
90                          * object lookup is possible and we can safely destroy
91                          * object below.
92                          */
93                         hlist_del_init(&top->loh_hash);
94                         list_del_init(&top->loh_lru);
95                         -- site->ls_total;
96                         kill_it = 1;
97                 }
98         }
99         write_unlock(&site->ls_guard);
100         if (kill_it)
101                 /*
102                  * Object was already removed from hash and lru above, can
103                  * kill it.
104                  */
105                 lu_object_free(env, orig);
106 }
107 EXPORT_SYMBOL(lu_object_put);
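
/*
 * Usage sketch (illustrative only; env, site and fid are assumed to come
 * from the caller's context): every reference obtained through
 * lu_object_find() below must eventually be dropped with lu_object_put();
 * the final put either leaves the object cached or, if it was marked
 * dying, frees it immediately.
 *
 *      struct lu_object *obj;
 *
 *      obj = lu_object_find(env, site, fid);
 *      if (!IS_ERR(obj)) {
 *              ... use obj ...
 *              lu_object_put(env, obj);
 *      }
 */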
108
109 /*
110  * Allocate new object.
111  *
112  * This follows object creation protocol, described in the comment within
113  * struct lu_device_operations definition.
114  */
115 static struct lu_object *lu_object_alloc(const struct lu_env *env,
116                                          struct lu_site *s,
117                                          const struct lu_fid *f)
118 {
119         struct lu_object *scan;
120         struct lu_object *top;
121         struct list_head *layers;
122         int clean;
123         int result;
124         ENTRY;
125
126         /*
127          * Create top-level object slice. This will also create
128          * lu_object_header.
129          */
130         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
131                                                       NULL, s->ls_top_dev);
132         if (top == NULL)
133                 RETURN(ERR_PTR(-ENOMEM));
134         /*
135          * This is the only place where object fid is assigned. It's constant
136          * after this point.
137          */
138         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
139         top->lo_header->loh_fid  = *f;
140         layers = &top->lo_header->loh_layers;
141         do {
142                 /*
143                  * Call ->loo_object_init() repeatedly, until no more new
144                  * object slices are created.
145                  */
146                 clean = 1;
147                 list_for_each_entry(scan, layers, lo_linkage) {
148                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
149                                 continue;
150                         clean = 0;
151                         scan->lo_header = top->lo_header;
152                         result = scan->lo_ops->loo_object_init(env, scan);
153                         if (result != 0) {
154                                 lu_object_free(env, top);
155                                 RETURN(ERR_PTR(result));
156                         }
157                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
158                 }
159         } while (!clean);
160
161         list_for_each_entry_reverse(scan, layers, lo_linkage) {
162                 if (scan->lo_ops->loo_object_start != NULL) {
163                         result = scan->lo_ops->loo_object_start(env, scan);
164                         if (result != 0) {
165                                 lu_object_free(env, top);
166                                 RETURN(ERR_PTR(result));
167                         }
168                 }
169         }
170
171         s->ls_stats.s_created ++;
172         RETURN(top);
173 }
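
/*
 * Illustrative sketch of how a layered device's ->loo_object_init()
 * typically adds the slice below it, which is what makes the do/while
 * loop above re-scan the layer list until no new slices appear. This is
 * an assumed example: "my_object_init" and "my_device_next" are
 * hypothetical names, not part of this file.
 *
 *      static int my_object_init(const struct lu_env *env,
 *                                struct lu_object *o)
 *      {
 *              struct lu_device *next = my_device_next(o->lo_dev);
 *              struct lu_object *below;
 *
 *              below = next->ld_ops->ldo_object_alloc(env, o->lo_header,
 *                                                     next);
 *              if (below == NULL)
 *                      return -ENOMEM;
 *              lu_object_add(o, below);
 *              return 0;
 *      }
 */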
174
175 /*
176  * Free object.
177  */
178 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
179 {
180         struct list_head splice;
181         struct lu_object *scan;
182
183         /*
184          * First call ->loo_object_delete() method to release all resources.
185          */
186         list_for_each_entry_reverse(scan,
187                                     &o->lo_header->loh_layers, lo_linkage) {
188                 if (scan->lo_ops->loo_object_delete != NULL)
189                         scan->lo_ops->loo_object_delete(env, scan);
190         }
191
192         /*
193          * Then, splice object layers into stand-alone list, and call
194          * ->loo_object_free() on all layers to free memory. Splice is
195          * necessary, because lu_object_header is freed together with the
196          * top-level slice.
197          */
198         INIT_LIST_HEAD(&splice);
199         list_splice_init(&o->lo_header->loh_layers, &splice);
200         while (!list_empty(&splice)) {
201                 o = container_of0(splice.next, struct lu_object, lo_linkage);
202                 list_del_init(&o->lo_linkage);
203                 LASSERT(o->lo_ops->loo_object_free != NULL);
204                 o->lo_ops->loo_object_free(env, o);
205         }
206 }
207
208 /*
209  * Free at most @nr objects from the cold end of the site LRU list.
210  */
211 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
212 {
213         struct list_head         dispose;
214         struct lu_object_header *h;
215         struct lu_object_header *temp;
216
217         INIT_LIST_HEAD(&dispose);
218         /*
219          * Under LRU list lock, scan LRU list and move unreferenced objects to
220          * the dispose list, removing them from LRU and hash table.
221          */
222         write_lock(&s->ls_guard);
223         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
224                 /*
225                  * Objects are sorted in LRU order, and "busy" objects (ones
226                  * with h->loh_ref > 0) naturally tend to live near the hot end
227                  * that we scan last. Unfortunately, sites usually have a small
228                  * (fewer than ten) number of busy yet rarely accessed objects
229                  * (some global objects, accessed directly through pointers,
230                  * bypassing hash table). Currently the algorithm scans them over
231                  * and over again. Probably we should move busy objects out of
232                  * LRU, or we can live with that.
233                  */
234                 if (nr-- == 0)
235                         break;
236                 if (atomic_read(&h->loh_ref) > 0)
237                         continue;
238                 hlist_del_init(&h->loh_hash);
239                 list_move(&h->loh_lru, &dispose);
240                 s->ls_total --;
241         }
242         write_unlock(&s->ls_guard);
243         /*
244          * Free everything on the dispose list. This is safe against races due
245          * to the reasons described in lu_object_put().
246          */
247         while (!list_empty(&dispose)) {
248                 h = container_of0(dispose.next,
249                                  struct lu_object_header, loh_lru);
250                 list_del_init(&h->loh_lru);
251                 lu_object_free(env, lu_object_top(h));
252                 s->ls_stats.s_lru_purged ++;
253         }
254         return nr;
255 }
256 EXPORT_SYMBOL(lu_site_purge);
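
/*
 * Usage sketch (illustrative assumption): a caller that wants to drain
 * every unreferenced object from a site, e.g. before tearing the site
 * down, can pass an effectively unbounded count:
 *
 *      lu_site_purge(env, site, ~0);
 */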
257
258 /*
259  * Object printing.
260  *
261  * Code below has to jump through certain hoops to output object description
262  * into libcfs_debug_msg-based log. The problem is that lu_object_print()
263  * composes object description from strings that are parts of _lines_ of
264  * output (i.e., strings that are not terminated by newline). This doesn't fit
265  * very well into the libcfs_debug_msg() interface, which assumes that each
266  * message supplied to it is a self-contained output line.
267  *
268  * To work around this, strings are collected in a temporary buffer
269  * (implemented as a value of lu_global_key), until a terminating newline
270  * character is detected.
271  *
272  */
273
274 enum {
275         /*
276          * Maximal line size.
277          *
278          * XXX overflow is not handled correctly.
279          */
280         LU_CDEBUG_LINE = 256
281 };
282
283 struct lu_cdebug_data {
284         /*
285          * Temporary buffer.
286          */
287         char lck_area[LU_CDEBUG_LINE];
288         /*
289          * fid staging area used by dt_store_open().
290          */
291         struct lu_fid_pack lck_pack;
292 };
293
294 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
295 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
296
297 /*
298  * Key, holding temporary buffer. This key is registered very early by
299  * lu_global_init().
300  */
301 struct lu_context_key lu_global_key = {
302         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
303         .lct_init = lu_global_key_init,
304         .lct_fini = lu_global_key_fini
305 };
306
307 /*
308  * Printer function emitting messages through libcfs_debug_msg().
309  */
310 int lu_cdebug_printer(const struct lu_env *env,
311                       void *cookie, const char *format, ...)
312 {
313         struct lu_cdebug_print_info *info = cookie;
314         struct lu_cdebug_data       *key;
315         int used;
316         int complete;
317         va_list args;
318
319         va_start(args, format);
320
321         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
322         LASSERT(key != NULL);
323
324         used = strlen(key->lck_area);
325         complete = format[strlen(format) - 1] == '\n';
326         /*
327          * Append new chunk to the buffer.
328          */
329         vsnprintf(key->lck_area + used,
330                   ARRAY_SIZE(key->lck_area) - used, format, args);
331         if (complete) {
332                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
333                                  (char *)info->lpi_file, info->lpi_fn,
334                                  info->lpi_line, "%s", key->lck_area);
335                 key->lck_area[0] = 0;
336         }
337         va_end(args);
338         return 0;
339 }
340 EXPORT_SYMBOL(lu_cdebug_printer);
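
/*
 * Illustrative sketch (assumed, not taken from this file; real callers
 * normally go through helper macros in lu_object.h): emitting an object
 * description into the debug log by pairing lu_object_print() with
 * lu_cdebug_printer() and a lu_cdebug_print_info cookie.
 *
 *      struct lu_cdebug_print_info info = {
 *              .lpi_subsys = DEBUG_SUBSYSTEM,
 *              .lpi_mask   = D_INFO,
 *              .lpi_file   = __FILE__,
 *              .lpi_fn     = __FUNCTION__,
 *              .lpi_line   = __LINE__
 *      };
 *
 *      lu_object_print(env, &info, lu_cdebug_printer, obj);
 */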
341
342 /*
343  * Print object header.
344  */
345 static void lu_object_header_print(const struct lu_env *env,
346                                    void *cookie, lu_printer_t printer,
347                                    const struct lu_object_header *hdr)
348 {
349         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
350                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
351                    PFID(&hdr->loh_fid),
352                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
353                    list_empty(&hdr->loh_lru) ? "" : " lru",
354                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
355 }
356
357 /*
358  * Print human-readable representation of the @o to the @printer.
359  */
360 void lu_object_print(const struct lu_env *env, void *cookie,
361                      lu_printer_t printer, const struct lu_object *o)
362 {
363         static const char ruler[] = "........................................";
364         struct lu_object_header *top;
365         int depth;
366
367         top = o->lo_header;
368         lu_object_header_print(env, cookie, printer, top);
369         (*printer)(env, cookie, "\n");
370         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
371                 depth = o->lo_depth + 4;
372                 LASSERT(o->lo_ops->loo_object_print != NULL);
373                 /*
374                  * print `.' @depth times.
375                  */
376                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
377                 o->lo_ops->loo_object_print(env, cookie, printer, o);
378                 (*printer)(env, cookie, "\n");
379         }
380 }
381 EXPORT_SYMBOL(lu_object_print);
382
383 /*
384  * Check object consistency.
385  */
386 int lu_object_invariant(const struct lu_object *o)
387 {
388         struct lu_object_header *top;
389
390         top = o->lo_header;
391         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
392                 if (o->lo_ops->loo_object_invariant != NULL &&
393                     !o->lo_ops->loo_object_invariant(o))
394                         return 0;
395         }
396         return 1;
397 }
398 EXPORT_SYMBOL(lu_object_invariant);
399
400 static struct lu_object *htable_lookup(struct lu_site *s,
401                                        const struct hlist_head *bucket,
402                                        const struct lu_fid *f)
403 {
404         struct lu_object_header *h;
405         struct hlist_node *scan;
406
407         hlist_for_each_entry(h, scan, bucket, loh_hash) {
408                 s->ls_stats.s_cache_check ++;
409                 if (likely(lu_fid_eq(&h->loh_fid, f) &&
410                            !lu_object_is_dying(h))) {
411                         /* bump reference count... */
412                         if (atomic_add_return(1, &h->loh_ref) == 1)
413                                 ++ s->ls_busy;
414                         /* and move to the head of the LRU */
415                         /*
416                          * XXX temporarily disabled to measure effects of
417                          * read-write locking.
418                          */
419                         /* list_move_tail(&h->loh_lru, &s->ls_lru); */
420                         s->ls_stats.s_cache_hit ++;
421                         return lu_object_top(h);
422                 }
423         }
424         s->ls_stats.s_cache_miss ++;
425         return NULL;
426 }
427
428 static __u32 fid_hash(const struct lu_fid *f, int bits)
429 {
430         /* all objects with the same id and different versions will belong
431          * to the same collision list. */
432         return hash_long(fid_flatten(f), bits);
433 }
434
435 /*
436  * Search cache for an object with the fid @f. If such object is found, return
437  * it. Otherwise, create new object, insert it into cache and return it. In
438  * any case, additional reference is acquired on the returned object.
439  */
440 struct lu_object *lu_object_find(const struct lu_env *env,
441                                  struct lu_site *s, const struct lu_fid *f)
442 {
443         struct lu_object     *o;
444         struct lu_object     *shadow;
445         struct hlist_head *bucket;
446
447         /*
448          * This uses standard index maintenance protocol:
449          *
450          *     - search index under lock, and return object if found;
451          *     - otherwise, unlock index, allocate new object;
452          *     - lock index and search again;
453          *     - if nothing is found (usual case), insert newly created
454          *       object into index;
455          *     - otherwise (race: other thread inserted object), free
456          *       object just allocated;
457          *     - unlock index;
458          *     - return object.
459          */
460
461         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
462
463         read_lock(&s->ls_guard);
464         o = htable_lookup(s, bucket, f);
465         read_unlock(&s->ls_guard);
466
467         if (o != NULL)
468                 return o;
469
470         /*
471          * Allocate new object. This may result in rather complicated
472          * operations, including fld queries, inode loading, etc.
473          */
474         o = lu_object_alloc(env, s, f);
475         if (unlikely(IS_ERR(o)))
476                 return o;
477
478         LASSERT(lu_fid_eq(lu_object_fid(o), f));
479
480         write_lock(&s->ls_guard);
481         shadow = htable_lookup(s, bucket, f);
482         if (likely(shadow == NULL)) {
483                 hlist_add_head(&o->lo_header->loh_hash, bucket);
484                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
485                 ++ s->ls_busy;
486                 ++ s->ls_total;
487                 shadow = o;
488                 o = NULL;
489         } else
490                 s->ls_stats.s_cache_race ++;
491         write_unlock(&s->ls_guard);
492         if (o != NULL)
493                 lu_object_free(env, o);
494         return shadow;
495 }
496 EXPORT_SYMBOL(lu_object_find);
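
/*
 * Illustrative sketch (assumption; "my_device_type" is a hypothetical
 * device type): a caller that needs the slice belonging to a particular
 * layer usually combines lu_object_find() with lu_object_locate(),
 * defined later in this file, and drops the reference when done.
 *
 *      struct lu_object *top;
 *      struct lu_object *slice;
 *
 *      top = lu_object_find(env, site, fid);
 *      if (IS_ERR(top))
 *              return PTR_ERR(top);
 *      slice = lu_object_locate(top->lo_header, &my_device_type);
 *      ... use slice ...
 *      lu_object_put(env, top);
 */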
497
498 /*
499  * Global list of all sites on this node
500  */
501 static LIST_HEAD(lu_sites);
502 static DECLARE_MUTEX(lu_sites_guard);
503
504 /*
505  * Global environment used by site shrinker.
506  */
507 static struct lu_env lu_shrink_env;
508
509 /*
510  * Print all objects in @s.
511  */
512 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
513                    lu_printer_t printer)
514 {
515         int i;
516
517         for (i = 0; i < s->ls_hash_size; ++i) {
518                 struct lu_object_header *h;
519                 struct hlist_node       *scan;
520
521                 read_lock(&s->ls_guard);
522                 hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) {
523
524                         if (!list_empty(&h->loh_layers)) {
525                                 const struct lu_object *obj;
526
527                                 obj = lu_object_top(h);
528                                 lu_object_print(env, cookie, printer, obj);
529                         } else
530                                 lu_object_header_print(env, cookie, printer, h);
531                 }
532                 read_unlock(&s->ls_guard);
533         }
534 }
535 EXPORT_SYMBOL(lu_site_print);
536
537 enum {
538         LU_CACHE_PERCENT   = 30,
539 };
540
541 /*
542  * Return desired hash table order.
543  */
544 static int lu_htable_order(void)
545 {
546         int bits;
547         unsigned long cache_size;
548
549         /*
550          * Calculate hash table size, assuming that we want reasonable
551          * performance when 30% of available memory is occupied by cache of
552          * lu_objects.
553          *
554          * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
555          */
556         cache_size = nr_free_buffer_pages() / 100 *
557                 LU_CACHE_PERCENT * (CFS_PAGE_SIZE / 1024);
558
559         for (bits = 1; (1 << bits) < cache_size; ++bits) {
560                 ;
561         }
562         return bits;
563 }
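
/*
 * Worked example (illustrative only, assuming 4KiB pages and roughly
 * 1GiB of free buffer pages, i.e. nr_free_buffer_pages() == 262144):
 * cache_size = 262144 / 100 * 30 * 4 == 314520, the approximate number
 * of 1K objects that fit into 30% of that memory, and the loop above
 * returns bits == 19, since 1 << 19 is the first power of two not less
 * than 314520.
 */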
564
565 /*
566  * Initialize site @s, with @top as the top level device.
567  */
568 int lu_site_init(struct lu_site *s, struct lu_device *top)
569 {
570         int bits;
571         int size;
572         int i;
573         ENTRY;
574
575         memset(s, 0, sizeof *s);
576         rwlock_init(&s->ls_guard);
577         CFS_INIT_LIST_HEAD(&s->ls_lru);
578         CFS_INIT_LIST_HEAD(&s->ls_linkage);
579         s->ls_top_dev = top;
580         top->ld_site = s;
581         lu_device_get(top);
582
583         for (bits = lu_htable_order(), size = 1 << bits;
584              (s->ls_hash =
585               cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL;
586              --bits, size >>= 1) {
587                 /*
588                  * Scale hash table down, until allocation succeeds.
589                  */
590                 ;
591         }
592
593         s->ls_hash_size = size;
594         s->ls_hash_bits = bits;
595         s->ls_hash_mask = size - 1;
596
597         for (i = 0; i < size; i++)
598                 INIT_HLIST_HEAD(&s->ls_hash[i]);
599
600         RETURN(0);
601 }
602 EXPORT_SYMBOL(lu_site_init);
603
604 /*
605  * Finalize @s and release its resources.
606  */
607 void lu_site_fini(struct lu_site *s)
608 {
609         LASSERT(list_empty(&s->ls_lru));
610         LASSERT(s->ls_total == 0);
611
612         down(&lu_sites_guard);
613         list_del_init(&s->ls_linkage);
614         up(&lu_sites_guard);
615
616         if (s->ls_hash != NULL) {
617                 int i;
618                 for (i = 0; i < s->ls_hash_size; i++)
619                         LASSERT(hlist_empty(&s->ls_hash[i]));
620                 cfs_free_large(s->ls_hash);
621                 s->ls_hash = NULL;
622         }
623         if (s->ls_top_dev != NULL) {
624                 s->ls_top_dev->ld_site = NULL;
625                 lu_device_put(s->ls_top_dev);
626                 s->ls_top_dev = NULL;
627         }
628 }
629 EXPORT_SYMBOL(lu_site_fini);
630
631 /*
632  * Called when initialization of stack for this site is completed.
633  */
634 int lu_site_init_finish(struct lu_site *s)
635 {
636         int result;
637         down(&lu_sites_guard);
638         result = lu_context_refill(&lu_shrink_env.le_ctx);
639         if (result == 0)
640                 list_add(&s->ls_linkage, &lu_sites);
641         up(&lu_sites_guard);
642         return result;
643 }
644 EXPORT_SYMBOL(lu_site_init_finish);
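
/*
 * Usage sketch (illustrative assumption; "site" and "top_dev" are the
 * caller's variables): a typical site life cycle pairs the calls in
 * this file as follows.
 *
 *      result = lu_site_init(site, top_dev);
 *      if (result == 0)
 *              result = lu_site_init_finish(site);
 *      ...
 *      lu_site_fini(site);
 */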
645
646 /*
647  * Acquire additional reference on device @d
648  */
649 void lu_device_get(struct lu_device *d)
650 {
651         atomic_inc(&d->ld_ref);
652 }
653 EXPORT_SYMBOL(lu_device_get);
654
655 /*
656  * Release reference on device @d.
657  */
658 void lu_device_put(struct lu_device *d)
659 {
660         atomic_dec(&d->ld_ref);
661 }
662 EXPORT_SYMBOL(lu_device_put);
663
664 /*
665  * Initialize device @d of type @t.
666  */
667 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
668 {
669         memset(d, 0, sizeof *d);
670         atomic_set(&d->ld_ref, 0);
671         d->ld_type = t;
672         return 0;
673 }
674 EXPORT_SYMBOL(lu_device_init);
675
676 /*
677  * Finalize device @d.
678  */
679 void lu_device_fini(struct lu_device *d)
680 {
681         if (d->ld_obd != NULL)
682                 /* finish lprocfs */
683                 lprocfs_obd_cleanup(d->ld_obd);
684
685         LASSERTF(atomic_read(&d->ld_ref) == 0,
686                  "Refcount is %u\n", atomic_read(&d->ld_ref));
687 }
688 EXPORT_SYMBOL(lu_device_fini);
689
690 /*
691  * Initialize object @o that is part of compound object @h and was created by
692  * device @d.
693  */
694 int lu_object_init(struct lu_object *o,
695                    struct lu_object_header *h, struct lu_device *d)
696 {
697         memset(o, 0, sizeof *o);
698         o->lo_header = h;
699         o->lo_dev    = d;
700         lu_device_get(d);
701         CFS_INIT_LIST_HEAD(&o->lo_linkage);
702         return 0;
703 }
704 EXPORT_SYMBOL(lu_object_init);
705
706 /*
707  * Finalize object and release its resources.
708  */
709 void lu_object_fini(struct lu_object *o)
710 {
711         LASSERT(list_empty(&o->lo_linkage));
712
713         if (o->lo_dev != NULL) {
714                 lu_device_put(o->lo_dev);
715                 o->lo_dev = NULL;
716         }
717 }
718 EXPORT_SYMBOL(lu_object_fini);
719
720 /*
721  * Add object @o as first layer of compound object @h
722  *
723  * This is typically called by the ->ldo_object_alloc() method of top-level
724  * device.
725  */
726 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
727 {
728         list_move(&o->lo_linkage, &h->loh_layers);
729 }
730 EXPORT_SYMBOL(lu_object_add_top);
731
732 /*
733  * Add object @o as a layer of compound object, going after @before.
734  *
735  * This is typically called by the ->ldo_object_alloc() method of
736  * @before->lo_dev.
737  */
738 void lu_object_add(struct lu_object *before, struct lu_object *o)
739 {
740         list_move(&o->lo_linkage, &before->lo_linkage);
741 }
742 EXPORT_SYMBOL(lu_object_add);
743
744 /*
745  * Initialize compound object.
746  */
747 int lu_object_header_init(struct lu_object_header *h)
748 {
749         memset(h, 0, sizeof *h);
750         atomic_set(&h->loh_ref, 1);
751         INIT_HLIST_NODE(&h->loh_hash);
752         CFS_INIT_LIST_HEAD(&h->loh_lru);
753         CFS_INIT_LIST_HEAD(&h->loh_layers);
754         return 0;
755 }
756 EXPORT_SYMBOL(lu_object_header_init);
757
758 /*
759  * Finalize compound object.
760  */
761 void lu_object_header_fini(struct lu_object_header *h)
762 {
763         LASSERT(list_empty(&h->loh_layers));
764         LASSERT(list_empty(&h->loh_lru));
765         LASSERT(hlist_unhashed(&h->loh_hash));
766 }
767 EXPORT_SYMBOL(lu_object_header_fini);
768
769 /*
770  * Given a compound object, find its slice, corresponding to the device type
771  * @dtype.
772  */
773 struct lu_object *lu_object_locate(struct lu_object_header *h,
774                                    struct lu_device_type *dtype)
775 {
776         struct lu_object *o;
777
778         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
779                 if (o->lo_dev->ld_type == dtype)
780                         return o;
781         }
782         return NULL;
783 }
784 EXPORT_SYMBOL(lu_object_locate);
785
786 enum {
787         /*
788          * Maximal number of thread-local data (TLD) slots.
789          */
790         LU_CONTEXT_KEY_NR = 16
791 };
792
793 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
794
795 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
796
797 /*
798  * Register new key.
799  */
800 int lu_context_key_register(struct lu_context_key *key)
801 {
802         int result;
803         int i;
804
805         LASSERT(key->lct_init != NULL);
806         LASSERT(key->lct_fini != NULL);
807         LASSERT(key->lct_tags != 0);
808         LASSERT(key->lct_owner != NULL);
809
810         result = -ENFILE;
811         spin_lock(&lu_keys_guard);
812         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
813                 if (lu_keys[i] == NULL) {
814                         key->lct_index = i;
815                         atomic_set(&key->lct_used, 1);
816                         lu_keys[i] = key;
817                         result = 0;
818                         break;
819                 }
820         }
821         spin_unlock(&lu_keys_guard);
822         return result;
823 }
824 EXPORT_SYMBOL(lu_context_key_register);
825
826 static void key_fini(struct lu_context *ctx, int index)
827 {
828         if (ctx->lc_value[index] != NULL) {
829                 struct lu_context_key *key;
830
831                 key = lu_keys[index];
832                 LASSERT(key != NULL);
833                 LASSERT(key->lct_fini != NULL);
834                 LASSERT(atomic_read(&key->lct_used) > 1);
835
836                 key->lct_fini(ctx, key, ctx->lc_value[index]);
837                 atomic_dec(&key->lct_used);
838                 LASSERT(key->lct_owner != NULL);
839                 if (!(ctx->lc_tags & LCT_NOREF)) {
840                         LASSERT(module_refcount(key->lct_owner) > 0);
841                         module_put(key->lct_owner);
842                 }
843                 ctx->lc_value[index] = NULL;
844         }
845 }
846
847 /*
848  * Deregister key.
849  */
850 void lu_context_key_degister(struct lu_context_key *key)
851 {
852         LASSERT(atomic_read(&key->lct_used) >= 1);
853         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
854
855         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
856
857         if (atomic_read(&key->lct_used) > 1)
858                 CERROR("key has instances.\n");
859         spin_lock(&lu_keys_guard);
860         lu_keys[key->lct_index] = NULL;
861         spin_unlock(&lu_keys_guard);
862 }
863 EXPORT_SYMBOL(lu_context_key_degister);
864
865 /*
866  * Return value associated with key @key in context @ctx.
867  */
868 void *lu_context_key_get(const struct lu_context *ctx,
869                          struct lu_context_key *key)
870 {
871         LASSERT(ctx->lc_state == LCS_ENTERED);
872         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
873         return ctx->lc_value[key->lct_index];
874 }
875 EXPORT_SYMBOL(lu_context_key_get);
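
/*
 * Illustrative sketch (assumption; "my_thread_key" and struct
 * my_thread_info are hypothetical): per-thread data attached through a
 * registered key is retrieved inside an entered context, exactly as
 * lu_cdebug_printer() above does with lu_global_key.
 *
 *      struct my_thread_info *info;
 *
 *      info = lu_context_key_get(&env->le_ctx, &my_thread_key);
 *      LASSERT(info != NULL);
 */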
876
877 static void keys_fini(struct lu_context *ctx)
878 {
879         int i;
880
881         if (ctx->lc_value != NULL) {
882                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
883                         key_fini(ctx, i);
884                 OBD_FREE(ctx->lc_value,
885                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
886                 ctx->lc_value = NULL;
887         }
888 }
889
890 static int keys_fill(const struct lu_context *ctx)
891 {
892         int i;
893
894         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
895                 struct lu_context_key *key;
896
897                 key = lu_keys[i];
898                 if (ctx->lc_value[i] == NULL &&
899                     key != NULL && key->lct_tags & ctx->lc_tags) {
900                         void *value;
901
902                         LASSERT(key->lct_init != NULL);
903                         LASSERT(key->lct_index == i);
904
905                         value = key->lct_init(ctx, key);
906                         if (unlikely(IS_ERR(value)))
907                                 return PTR_ERR(value);
908                         LASSERT(key->lct_owner != NULL);
909                         if (!(ctx->lc_tags & LCT_NOREF))
910                                 try_module_get(key->lct_owner);
911                         atomic_inc(&key->lct_used);
912                         ctx->lc_value[i] = value;
913                 }
914         }
915         return 0;
916 }
917
918 static int keys_init(struct lu_context *ctx)
919 {
920         int result;
921
922         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
923         if (likely(ctx->lc_value != NULL))
924                 result = keys_fill(ctx);
925         else
926                 result = -ENOMEM;
927
928         if (result != 0)
929                 keys_fini(ctx);
930         return result;
931 }
932
933 /*
934  * Initialize context data-structure. Create values for all keys.
935  */
936 int lu_context_init(struct lu_context *ctx, __u32 tags)
937 {
938         memset(ctx, 0, sizeof *ctx);
939         ctx->lc_state = LCS_INITIALIZED;
940         ctx->lc_tags = tags;
941         return keys_init(ctx);
942 }
943 EXPORT_SYMBOL(lu_context_init);
944
945 /*
946  * Finalize context data-structure. Destroy key values.
947  */
948 void lu_context_fini(struct lu_context *ctx)
949 {
950         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
951         ctx->lc_state = LCS_FINALIZED;
952         keys_fini(ctx);
953 }
954 EXPORT_SYMBOL(lu_context_fini);
955
956 /*
957  * Called before entering context.
958  */
959 void lu_context_enter(struct lu_context *ctx)
960 {
961         LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
962         ctx->lc_state = LCS_ENTERED;
963 }
964 EXPORT_SYMBOL(lu_context_enter);
965
966 /*
967  * Called after exiting from @ctx
968  */
969 void lu_context_exit(struct lu_context *ctx)
970 {
971         int i;
972
973         LASSERT(ctx->lc_state == LCS_ENTERED);
974         ctx->lc_state = LCS_LEFT;
975         if (ctx->lc_value != NULL) {
976                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
977                         if (ctx->lc_value[i] != NULL) {
978                                 struct lu_context_key *key;
979
980                                 key = lu_keys[i];
981                                 LASSERT(key != NULL);
982                                 if (key->lct_exit != NULL)
983                                         key->lct_exit(ctx,
984                                                       key, ctx->lc_value[i]);
985                         }
986                 }
987         }
988 }
989 EXPORT_SYMBOL(lu_context_exit);
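
/*
 * Usage sketch (illustrative assumption): when a context is used
 * directly, rather than through struct lu_env, the calls above pair up
 * as follows.
 *
 *      result = lu_context_init(&ctx, LCT_MD_THREAD);
 *      if (result == 0) {
 *              lu_context_enter(&ctx);
 *              ... lu_context_key_get(&ctx, ...) ...
 *              lu_context_exit(&ctx);
 *              lu_context_fini(&ctx);
 *      }
 */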
990
991 /*
992  * Allocate for context all missing keys that were registered after context
993  * creation.
994  */
995 int lu_context_refill(const struct lu_context *ctx)
996 {
997         LASSERT(ctx->lc_value != NULL);
998         return keys_fill(ctx);
999 }
1000 EXPORT_SYMBOL(lu_context_refill);
1001
1002 static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
1003                         __u32 tags, int noref)
1004 {
1005         int result;
1006
1007         LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
1008
1009         env->le_ses = ses;
1010         result = lu_context_init(&env->le_ctx, tags);
1011         if (likely(result == 0))
1012                 lu_context_enter(&env->le_ctx);
1013         return result;
1014 }
1015
1016 static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses,
1017                              __u32 tags)
1018 {
1019         return lu_env_setup(env, ses, tags, 1);
1020 }
1021
1022 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
1023 {
1024         return lu_env_setup(env, ses, tags, 0);
1025 }
1026 EXPORT_SYMBOL(lu_env_init);
1027
1028 void lu_env_fini(struct lu_env *env)
1029 {
1030         lu_context_exit(&env->le_ctx);
1031         lu_context_fini(&env->le_ctx);
1032         env->le_ses = NULL;
1033 }
1034 EXPORT_SYMBOL(lu_env_fini);
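
/*
 * Usage sketch (illustrative assumption): a thread usually brackets its
 * work with lu_env_init()/lu_env_fini(); passing NULL for the session
 * context mirrors the way lu_shrink_env is set up below.
 *
 *      struct lu_env env;
 *      int rc;
 *
 *      rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
 *      if (rc == 0) {
 *              ... use &env ...
 *              lu_env_fini(&env);
 *      }
 */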
1035
1036 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1037 {
1038         struct lu_site *s;
1039         struct lu_site *tmp;
1040         int cached = 0;
1041         int remain = nr;
1042         LIST_HEAD(splice);
1043
1044         if (nr != 0 && !(gfp_mask & __GFP_FS))
1045                 return -1;
1046
1047         down(&lu_sites_guard);
1048         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1049                 if (nr != 0) {
1050                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1051                         /*
1052                          * Move just shrunk site to the tail of site list to
1053                          * assure shrinking fairness.
1054                          */
1055                         list_move_tail(&s->ls_linkage, &splice);
1056                 }
1057                 read_lock(&s->ls_guard);
1058                 cached += s->ls_total - s->ls_busy;
1059                 read_unlock(&s->ls_guard);
1060                 if (remain <= 0)
1061                         break;
1062         }
1063         list_splice(&splice, lu_sites.prev);
1064         up(&lu_sites_guard);
1065         return cached;
1066 }
1067
1068 static struct shrinker *lu_site_shrinker = NULL;
1069
1070 /*
1071  * Initialization of global lu_* data.
1072  */
1073 int lu_global_init(void)
1074 {
1075         int result;
1076
1077         LU_CONTEXT_KEY_INIT(&lu_global_key);
1078         result = lu_context_key_register(&lu_global_key);
1079         if (result == 0) {
1080                 /*
1081                  * At this level, we don't know what tags are needed, so
1082                  * allocate them conservatively. This should not be too bad,
1083                  * because this environment is global.
1084                  */
1085                 down(&lu_sites_guard);
1086                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
1087                 up(&lu_sites_guard);
1088                 if (result == 0) {
1089                         /*
1090                          * seeks estimation: 3 seeks to read a record from oi,
1091                          * one to read inode, one for ea. Unfortunately
1092                          * setting this high value results in lu_object/inode
1093                          * cache consuming all the memory.
1094                          */
1095                         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
1096                                                         lu_cache_shrink);
1097                         if (result == 0)
1098                                 result = lu_time_global_init();
1099                 }
1100         }
1101         return result;
1102 }
1103
1104 /*
1105  * Dual to lu_global_init().
1106  */
1107 void lu_global_fini(void)
1108 {
1109         lu_time_global_fini();
1110         if (lu_site_shrinker != NULL) {
1111                 remove_shrinker(lu_site_shrinker);
1112                 lu_site_shrinker = NULL;
1113         }
1114
1115         lu_context_key_degister(&lu_global_key);
1116
1117         /*
1118          * Tear shrinker environment down _after_ de-registering
1119          * lu_global_key, because the latter has a value in the former.
1120          */
1121         down(&lu_sites_guard);
1122         lu_env_fini(&lu_shrink_env);
1123         up(&lu_sites_guard);
1124 }
1125
1126 struct lu_buf LU_BUF_NULL = {
1127         .lb_buf = NULL,
1128         .lb_len = 0
1129 };
1130 EXPORT_SYMBOL(LU_BUF_NULL);
1131
1132 /*
1133  * XXX: Functions below logically belong to fid module, but they are used by
1134  * dt_store_open(). Put them here until better place is found.
1135  */
1136
1137 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,
1138               struct lu_fid *befider)
1139 {
1140         int recsize;
1141         __u64 seq;
1142         __u32 oid;
1143
1144         seq = fid_seq(fid);
1145         oid = fid_oid(fid);
1146
1147         /*
1148          * Two cases: compact 6-byte representation for the common case, and
1149          * full 17-byte representation for an "unusual" fid.
1150          */
1151
1152         /*
1153          * Check that usual case is really usual.
1154          */
1155         CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull);
1156
1157         if (fid_is_igif(fid) ||
1158             seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) {
1159                 fid_cpu_to_be(befider, fid);
1160                 recsize = sizeof *befider;
1161         } else {
1162                 unsigned char *small_befider;
1163
1164                 small_befider = (unsigned char *)befider;
1165
1166                 small_befider[0] = seq >> 16;
1167                 small_befider[1] = seq >> 8;
1168                 small_befider[2] = seq;
1169
1170                 small_befider[3] = oid >> 8;
1171                 small_befider[4] = oid;
1172
1173                 recsize = 5;
1174         }
1175         memcpy(pack->fp_area, befider, recsize);
1176         pack->fp_len = recsize + 1;
1177 }
1178 EXPORT_SYMBOL(fid_pack);
1179
1180 int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid)
1181 {
1182         int result;
1183
1184         result = 0;
1185         switch (pack->fp_len) {
1186         case sizeof *fid + 1:
1187                 memcpy(fid, pack->fp_area, sizeof *fid);
1188                 fid_be_to_cpu(fid, fid);
1189                 break;
1190         case 6: {
1191                 const unsigned char *area;
1192
1193                 area = pack->fp_area;
1194                 fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2];
1195                 fid->f_oid = (area[3] << 8) | area[4];
1196                 fid->f_ver = 0;
1197                 break;
1198         }
1199         default:
1200                 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
1201                 result = -EIO;
1202         }
1203         return result;
1204 }
1205 EXPORT_SYMBOL(fid_unpack);
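
/*
 * Worked example (illustrative): a fid with seq == 0x123, oid == 0x45
 * and ver == 0 takes the compact branch of fid_pack() above; the five
 * payload bytes are 00 01 23 00 45 and fp_len is 6. fid_unpack()
 * reverses this through its "case 6" branch, yielding the original
 * seq/oid/ver values.
 */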
1206
1207 const char *lu_time_names[LU_TIME_NR] = {
1208         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1209         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1210         [LU_TIME_FIND_INSERT] = "find_insert"
1211 };
1212 EXPORT_SYMBOL(lu_time_names);