/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);

/*
 * Decrease reference counter on object. When the last reference is
 * released, return the object to the cache, unless lu_object_is_dying(o)
 * holds. In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(ctxt, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If object is dying (will not be cached), remove it
                         * from the hash table and LRU.
                         *
                         * This is done with hash table and LRU lists
                         * locked. As the only way to acquire first reference
                         * to previously unreferenced object is through
                         * hash-table lookup (lu_object_find()), or LRU
                         * scanning (lu_site_purge()), that are done under
                         * hash-table and LRU lock, no race with concurrent
                         * object lookup is possible and we can safely destroy
                         * object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from the hash and LRU above,
                 * so it can be freed now.
                 */
                lu_object_free(ctxt, orig);
}
EXPORT_SYMBOL(lu_object_put);
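
/*
 * Example (illustrative sketch, not part of the exported API): a layer that
 * wants to react when an object goes idle can supply a
 * ->loo_object_release() method; lu_object_put() above invokes it on every
 * layer once the last reference is dropped.  The mylayer_* names below are
 * hypothetical:
 *
 *     static void mylayer_object_release(const struct lu_context *ctx,
 *                                        struct lu_object *o)
 *     {
 *             mylayer_flush(ctx, o);  (hypothetical layer-private cleanup)
 *     }
 *
 *     static const struct lu_object_operations mylayer_obj_ops = {
 *             .loo_object_release = mylayer_object_release
 *     };
 */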

/*
 * Allocate new object.
 *
 * This follows the object-creation protocol described in the comment on
 * the struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        s->ls_total ++;
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        top->lo_header->loh_fid = *f;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan,
                                    &top->lo_header->loh_layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(ctxt, scan);
                        if (result != 0) {
                                lu_object_free(ctxt, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);
        s->ls_stats.s_created ++;
        RETURN(top);
}
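
/*
 * Example (illustrative sketch): the do/while loop above keeps calling
 * ->loo_object_init() until no iteration adds a new slice.  A middle
 * layer's init method typically asks the device below it to allocate the
 * next slice and links it in with lu_object_add(), which is what forces
 * another pass.  The mylayer_dev() and md_child names are hypothetical:
 *
 *     static int mylayer_object_init(const struct lu_context *ctx,
 *                                    struct lu_object *o)
 *     {
 *             struct lu_device *next = mylayer_dev(o->lo_dev)->md_child;
 *             struct lu_object *below;
 *
 *             below = next->ld_ops->ldo_object_alloc(ctx, o->lo_header,
 *                                                    next);
 *             if (IS_ERR(below))
 *                     return PTR_ERR(below);
 *             lu_object_add(o, below);
 *             return 0;
 *     }
 */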

/*
 * Free object.
 */
static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First, call the ->loo_object_delete() method on every layer to
         * release its resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(ctx, scan);
        }
        -- o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(ctx, o);
        }
}

/*
 * Free at most @nr unreferenced objects from the cold end of the site LRU
 * list.
 */
void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                  struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(ctx, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
}
EXPORT_SYMBOL(lu_site_purge);
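
/*
 * Example (illustrative sketch): the scan above walks at most @nr LRU
 * entries and frees only the unreferenced ones, so a caller can trim the
 * site in batches under memory pressure or, since a negative count never
 * reaches zero in the nr-- test, drain every idle object before shutdown:
 *
 *     lu_site_purge(ctx, s, 128);     scan up to 128 cold LRU entries
 *     lu_site_purge(ctx, s, ~0);      drain all idle objects
 */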

/*
 * Object printing.
 *
 * Code below has to jump through some hoops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't
 * fit very well into the libcfs_debug_msg() interface, which assumes that
 * each message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of lu_cdebug_key key), until a terminating
 * newline character is detected.
 */

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

static void *lu_cdebug_key_init(const struct lu_context *ctx,
                                struct lu_context_key *key)
{
        struct lu_cdebug_data *value;

        OBD_ALLOC_PTR(value);
        if (value == NULL)
                value = ERR_PTR(-ENOMEM);
        return value;
}

static void lu_cdebug_key_fini(const struct lu_context *ctx,
                               struct lu_context_key *key, void *data)
{
        struct lu_cdebug_data *value = data;
        OBD_FREE_PTR(value);
}

/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
static struct lu_context_key lu_cdebug_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_cdebug_key_init,
        .lct_fini = lu_cdebug_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_context *ctx,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(ctx, &lu_cdebug_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(info->lpi_subsys, info->lpi_mask,
                                 info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_context *ctx,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(ctx, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru");
}

/*
 * Print a human-readable representation of @o through @printer.
 */
void lu_object_print(const struct lu_context *ctx, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(ctx, cookie, printer, top);
        (*printer)(ctx, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(ctx, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(ctx, cookie, printer, o);
                (*printer)(ctx, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);
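
/*
 * Example (illustrative sketch): to route the output into the debug log,
 * pair lu_object_print() with lu_cdebug_printer() above and a
 * lu_cdebug_print_info cookie describing the call site (D_OTHER is just
 * one possible debug mask):
 *
 *     struct lu_cdebug_print_info info = {
 *             .lpi_subsys = DEBUG_SUBSYSTEM,
 *             .lpi_mask   = D_OTHER,
 *             .lpi_file   = __FILE__,
 *             .lpi_fn     = __FUNCTION__,
 *             .lpi_line   = __LINE__
 *     };
 *
 *     lu_object_print(ctx, &info, lu_cdebug_printer, o);
 */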

/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* and move to the hot end (tail) of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong
         * to the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
}

/*
 * Search cache for an object with the fid @f. If such an object is found,
 * return it. Otherwise, create a new object, insert it into the cache, and
 * return it. In either case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_context *ctxt,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                return o;
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(ctxt, s, f);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(ctxt, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
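
/*
 * Example (illustrative sketch): typical lookup pattern.  The returned
 * object holds an extra reference that must be dropped with
 * lu_object_put():
 *
 *     struct lu_object *o;
 *
 *     o = lu_object_find(ctxt, s, fid);
 *     if (IS_ERR(o))
 *             return PTR_ERR(o);
 *     ... use the compound object, e.g. via lu_object_locate() ...
 *     lu_object_put(ctxt, o);
 */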

enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else {
                result = -ENOMEM;
        }

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);
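
/*
 * Example (illustrative sketch): expected site life-cycle.  The LASSERTs
 * in lu_site_fini() require that all cached objects are gone, so the LRU
 * is drained first:
 *
 *     lu_site_init(&site, top_dev);
 *     ... lu_object_find() and lu_object_put() traffic ...
 *     lu_site_purge(ctx, &site, ~0);
 *     lu_site_fini(&site);
 */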

/*
 * Acquire additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        LASSERT(atomic_read(&d->ld_ref) == 0);
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);
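
/*
 * Example (illustrative sketch): a layer normally embeds struct lu_object
 * into its own object and brackets its life with lu_object_init() and
 * lu_object_fini().  The mylayer_* names are hypothetical:
 *
 *     struct mylayer_object {
 *             struct lu_object mlo_obj;
 *             ... layer-private fields ...
 *     };
 *
 *     allocation (inside ->ldo_object_alloc()):
 *             OBD_ALLOC_PTR(obj);
 *             lu_object_init(&obj->mlo_obj, h, d);
 *
 *     destruction (inside ->loo_object_free()):
 *             lu_object_fini(&obj->mlo_obj);
 *             OBD_FREE_PTR(obj);
 */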

/*
 * Add object @o as the first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find the slice corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        key->lct_used = 1;
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(key->lct_used >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (key->lct_used > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
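
/*
 * Example (illustrative sketch): a subsystem defines its own key,
 * mirroring lu_cdebug_key above.  The mykey_* names and struct mykey_data
 * are hypothetical:
 *
 *     static void *mykey_init(const struct lu_context *ctx,
 *                             struct lu_context_key *key)
 *     {
 *             struct mykey_data *value;
 *
 *             OBD_ALLOC_PTR(value);
 *             if (value == NULL)
 *                     value = ERR_PTR(-ENOMEM);
 *             return value;
 *     }
 *
 *     static void mykey_fini(const struct lu_context *ctx,
 *                            struct lu_context_key *key, void *data)
 *     {
 *             struct mykey_data *value = data;
 *             OBD_FREE_PTR(value);
 *     }
 *
 *     static struct lu_context_key mykey = {
 *             .lct_tags = LCT_MD_THREAD,
 *             .lct_init = mykey_init,
 *             .lct_fini = mykey_fini
 *     };
 *
 * After lu_context_key_register(&mykey), any context initialized with a
 * matching tag gets a per-context value:
 *
 *     struct mykey_data *data = lu_context_key_get(ctx, &mykey);
 */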

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(key->lct_used > 1);

                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                key->lct_used--;
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_init(struct lu_context *ctx)
{
        int i;
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        struct lu_context_key *key;

                        key = lu_keys[i];
                        if (key != NULL && key->lct_tags & ctx->lc_tags) {
                                void *value;

                                LASSERT(key->lct_init != NULL);
                                LASSERT(key->lct_index == i);

                                value = key->lct_init(ctx, key);
                                if (IS_ERR(value)) {
                                        keys_fini(ctx);
                                        return PTR_ERR(value);
                                }
                                key->lct_used++;
                                ctx->lc_value[i] = value;
                        }
                }
                result = 0;
        } else
                result = -ENOMEM;
        return result;
}

/*
 * Initialize context data structure. Create values for all registered keys
 * matching @tags.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);
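
/*
 * Example (illustrative sketch): per-thread usage of a context.
 * lu_context_init() can fail (e.g. with -ENOMEM from keys_init()), so its
 * return value is checked:
 *
 *     struct lu_context ctx;
 *     int rc;
 *
 *     rc = lu_context_init(&ctx, LCT_MD_THREAD);
 *     if (rc != 0)
 *             return rc;
 *     lu_context_enter(&ctx);
 *     ... service a request; lu_context_key_get(&ctx, ...) works here ...
 *     lu_context_exit(&ctx);
 *     lu_context_fini(&ctx);
 */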

/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        result = lu_context_key_register(&lu_cdebug_key);
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_context_key_degister(&lu_cdebug_key);
}