/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);

/*
 * Decrease the reference counter on an object. If the last reference is
 * released, return the object to the cache, unless lu_object_is_dying(o)
 * holds. In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(ctxt, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If object is dying (will not be cached), remove it
                         * from hash table and LRU.
                         *
                         * This is done with hash table and LRU lists
                         * locked. As the only way to acquire the first
                         * reference to a previously unreferenced object is
                         * through hash-table lookup (lu_object_find()) or LRU
                         * scanning (lu_site_purge()), both of which are done
                         * under the hash-table and LRU lock, no race with a
                         * concurrent object lookup is possible and we can
                         * safely destroy the object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from hash and lru above, can
                 * kill it.
                 */
                lu_object_free(ctxt, orig);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate a new object.
 *
 * This follows the object creation protocol described in the comment on the
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        s->ls_total ++;
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        top->lo_header->loh_fid = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(ctxt, scan);
                        if (result != 0) {
                                lu_object_free(ctxt, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(ctxt, scan);
                        if (result != 0) {
                                lu_object_free(ctxt, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created ++;
        RETURN(top);
}

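/*
 * Illustrative sketch (not part of the original file): how an intermediate
 * layer's ->loo_object_init() can extend the layered object so that the
 * iteration in lu_object_alloc() picks up the new slice on its next pass.
 * The names my_object_init() and my_device_lower() are hypothetical, and the
 * ->ldo_object_alloc() signature is assumed to match the call made from
 * lu_object_alloc() above.
 *
 *      static int my_object_init(const struct lu_context *ctx,
 *                                struct lu_object *o)
 *      {
 *              struct lu_device *next = my_device_lower(o->lo_dev);
 *              struct lu_object *below;
 *
 *              below = next->ld_ops->ldo_object_alloc(ctx, o->lo_header,
 *                                                     next);
 *              if (IS_ERR(below))
 *                      return PTR_ERR(below);
 *              lu_object_add(o, below);
 *              return 0;
 *      }
 */
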
/*
 * Free object.
 */
static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(ctx, scan);
        }
        -- o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(ctx, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                  struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(ctx, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
}
EXPORT_SYMBOL(lu_site_purge);

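/*
 * Usage sketch (illustrative, not in the original file): a cache-pressure
 * handler can trim a fixed number of unreferenced objects, while a caller
 * that needs the whole cache drained can pass a negative count so that the
 * "nr-- == 0" check above never terminates the scan early.
 *
 *      lu_site_purge(ctx, site, 128);
 *      lu_site_purge(ctx, site, ~0);
 */
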
/*
 * Object printing.
 *
 * Code below has to jump through certain loops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into the libcfs_debug_msg() interface, which assumes that each
 * message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of lu_cdebug_key key), until a terminating newline
 * character is detected.
 */

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

static void *lu_cdebug_key_init(const struct lu_context *ctx,
                                struct lu_context_key *key)
{
        struct lu_cdebug_data *value;

        OBD_ALLOC_PTR(value);
        if (value == NULL)
                value = ERR_PTR(-ENOMEM);
        return value;
}

static void lu_cdebug_key_fini(const struct lu_context *ctx,
                               struct lu_context_key *key, void *data)
{
        struct lu_cdebug_data *value = data;
        OBD_FREE_PTR(value);
}

/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
static struct lu_context_key lu_cdebug_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_cdebug_key_init,
        .lct_fini = lu_cdebug_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_context *ctx,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(ctx, &lu_cdebug_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(info->lpi_subsys, info->lpi_mask,
                                 info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_context *ctx,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(ctx, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru");
}

/*
 * Print a human-readable representation of @o through @printer.
 */
void lu_object_print(const struct lu_context *ctx, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(ctx, cookie, printer, top);
        (*printer)(ctx, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(ctx, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(ctx, cookie, printer, o);
                (*printer)(ctx, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);

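/*
 * Usage sketch (illustrative, not in the original file): dumping an object
 * into the debug log by pairing lu_object_print() with lu_cdebug_printer().
 * The initializer only fills the lu_cdebug_print_info fields that
 * lu_cdebug_printer() actually consumes; D_INFO is used here as an example
 * debug mask.
 *
 *      struct lu_cdebug_print_info info = {
 *              .lpi_subsys = DEBUG_SUBSYSTEM,
 *              .lpi_mask   = D_INFO,
 *              .lpi_file   = __FILE__,
 *              .lpi_fn     = __FUNCTION__,
 *              .lpi_line   = __LINE__
 *      };
 *
 *      lu_object_print(ctx, &info, lu_cdebug_printer, o);
 */
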
/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* and move to the hot end of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong to
         * the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
}

/*
 * Search cache for an object with the fid @f. If such an object is found,
 * return it. Otherwise, create a new object, insert it into the cache and
 * return it. In any case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_context *ctxt,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;

        /*
         * This uses the standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated;
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);

        spin_unlock(&s->ls_guard);
        if (o != NULL)
                return o;
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(ctxt, s, f);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(ctxt, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);

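/*
 * Usage sketch (illustrative, not in the original file): the common
 * lookup/use/release pattern built from lu_object_find(), lu_object_locate()
 * and lu_object_put(). The function name my_use_object() and the device type
 * my_device_type are hypothetical place-holders.
 *
 *      static int my_use_object(const struct lu_context *ctx,
 *                               struct lu_site *site,
 *                               const struct lu_fid *fid)
 *      {
 *              struct lu_object *top;
 *              struct lu_object *slice;
 *
 *              top = lu_object_find(ctx, site, fid);
 *              if (IS_ERR(top))
 *                      return PTR_ERR(top);
 *              slice = lu_object_locate(top->lo_header, &my_device_type);
 *              if (slice == NULL) {
 *                      lu_object_put(ctx, top);
 *                      return -ENOENT;
 *              }
 *              LASSERT(lu_object_invariant(slice));
 *              lu_object_put(ctx, top);
 *              return 0;
 *      }
 */
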
enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else {
                result = -ENOMEM;
        }

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);

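/*
 * Usage sketch (illustrative, not in the original file): the life cycle of a
 * site built on a hypothetical top-level device @topdev. lu_site_fini()
 * asserts that the site is empty, so the cache is drained with a negative
 * purge count first.
 *
 *      struct lu_site site;
 *      int rc;
 *
 *      rc = lu_site_init(&site, topdev);
 *      if (rc != 0)
 *              return rc;
 *      ...
 *      lu_site_purge(ctx, &site, ~0);
 *      lu_site_fini(&site);
 */
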
/*
 * Acquire additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        LASSERT(atomic_read(&d->ld_ref) == 0);
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as the first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of a compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

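/*
 * Illustrative sketch (not part of the original file): how a top-level
 * device's ->ldo_object_alloc() might combine the helpers above. The
 * structure my_object (embedding both the header and the top slice), the
 * operations vector my_obj_ops and the exact ->ldo_object_alloc() signature
 * are all hypothetical.
 *
 *      struct my_object {
 *              struct lu_object_header mo_header;
 *              struct lu_object        mo_obj;
 *      };
 *
 *      static struct lu_object *my_object_alloc(const struct lu_context *ctx,
 *                                               struct lu_object_header *hdr,
 *                                               struct lu_device *d)
 *      {
 *              struct my_object *mo;
 *
 *              OBD_ALLOC_PTR(mo);
 *              if (mo == NULL)
 *                      return ERR_PTR(-ENOMEM);
 *              lu_object_header_init(&mo->mo_header);
 *              lu_object_init(&mo->mo_obj, &mo->mo_header, d);
 *              lu_object_add_top(&mo->mo_header, &mo->mo_obj);
 *              mo->mo_obj.lo_ops = &my_obj_ops;
 *              return &mo->mo_obj;
 *      }
 */
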
/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of thread-local data (TLD) slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register a new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        key->lct_used = 1;
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(key->lct_used >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (key->lct_used > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return the value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

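/*
 * Usage sketch (illustrative, not in the original file): defining and using a
 * context key, following the lu_cdebug_key pattern above. my_thread_key and
 * its init/fini callbacks are hypothetical; the key would be registered from
 * the owning module's own initialization code.
 *
 *      static struct lu_context_key my_thread_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = my_key_init,
 *              .lct_fini = my_key_fini
 *      };
 *
 *      rc = lu_context_key_register(&my_thread_key);
 *
 *      info = lu_context_key_get(ctx, &my_thread_key);
 */
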
static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(key->lct_used > 1);

                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                key->lct_used--;
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_init(struct lu_context *ctx)
{
        int i;
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        struct lu_context_key *key;

                        key = lu_keys[i];
                        if (key != NULL && key->lct_tags & ctx->lc_tags) {
                                void *value;

                                LASSERT(key->lct_init != NULL);
                                LASSERT(key->lct_index == i);

                                value = key->lct_init(ctx, key);
                                if (IS_ERR(value)) {
                                        keys_fini(ctx);
                                        return PTR_ERR(value);
                                }
                                key->lct_used++;
                                ctx->lc_value[i] = value;
                        }
                }
                result = 0;
        } else
                result = -ENOMEM;
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

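/*
 * Usage sketch (illustrative, not in the original file): a service thread
 * sets up a context once, brackets each request with enter/exit, and tears
 * the context down when the thread stops. The tag choice is an example.
 *
 *      struct lu_context ctx;
 *      int rc;
 *
 *      rc = lu_context_init(&ctx, LCT_MD_THREAD);
 *      if (rc != 0)
 *              return rc;
 *      lu_context_enter(&ctx);
 *      ...
 *      lu_context_exit(&ctx);
 *      lu_context_fini(&ctx);
 */
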
/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        result = lu_context_key_register(&lu_cdebug_key);
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_context_key_degister(&lu_cdebug_key);
}
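
/*
 * Usage sketch (illustrative, not in the original file): lu_global_init() and
 * lu_global_fini() are meant to be paired from the owning module's load and
 * unload paths, before any lu_context is created and after the last one is
 * finalized. The caller shown here is hypothetical.
 *
 *      rc = lu_global_init();
 *      if (rc != 0)
 *              return rc;
 *      ...
 *      lu_global_fini();
 */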