- merge with 1_5, some fixes.
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 7ece821..e3340fb 100644
@@ -4,6 +4,7 @@
  * Lustre Object.
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *   Author: Nikita Danilov <nikita@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
 
 #include <linux/seq_file.h>
 #include <linux/module.h>
-#include <linux/obd_support.h>
-#include <linux/lu_object.h>
-
+#include <obd_support.h>
+#include <lustre_disk.h>
+#include <lustre_fid.h>
+#include <lu_object.h>
 #include <libcfs/list.h>
 
-static void lu_object_free(struct lu_context *ctx, struct lu_object *o);
+static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);
 
-void lu_object_put(struct lu_context *ctxt, struct lu_object *o)
+/*
+ * Decrease reference counter on object. If this is the last reference, return
+ * object to the cache, unless lu_object_is_dying(o) holds. In the latter
+ * case, free object immediately.
+ */
+void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
 {
         struct lu_object_header *top;
         struct lu_site          *site;
+        struct lu_object        *orig;
+        int                      kill_it;
 
         top = o->lo_header;
         site = o->lo_dev->ld_site;
+        orig = o;
+        kill_it = 0;
         spin_lock(&site->ls_guard);
         if (-- top->loh_ref == 0) {
+                /*
+                 * When last reference is released, iterate over object
+                 * layers, and notify them that object is no longer busy.
+                 */
                 list_for_each_entry(o, &top->loh_layers, lo_linkage) {
-                        if (lu_object_ops(o)->ldo_object_release != NULL)
-                                lu_object_ops(o)->ldo_object_release(ctxt, o);
+                        if (o->lo_ops->loo_object_release != NULL)
+                                o->lo_ops->loo_object_release(ctxt, o);
                 }
                 -- site->ls_busy;
                 if (lu_object_is_dying(top)) {
+                        /*
+                         * If object is dying (will not be cached), remove it
+                         * from hash table and LRU.
+                         *
+                         * This is done with hash table and LRU lists
+                         * locked. As the only way to acquire first reference
+                         * to previously unreferenced object is through
+                         * hash-table lookup (lu_object_find()), or LRU
+                         * scanning (lu_site_purge()), both of which are done
+                         * under hash-table and LRU lock, no race with concurrent
+                         * object lookup is possible and we can safely destroy
+                         * object below.
+                         */
                         hlist_del_init(&top->loh_hash);
                         list_del_init(&top->loh_lru);
+                        kill_it = 1;
                 }
         }
         spin_unlock(&site->ls_guard);
-        if (lu_object_is_dying(top))
+        if (kill_it)
                 /*
                  * Object was already removed from hash and lru above, can
                  * kill it.
                  */
-                lu_object_free(ctxt, o);
+                lu_object_free(ctxt, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
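For orientation, here is a minimal caller sketch (not part of the patch) showing the find/put pairing that the reference-counting comment above describes; example_lookup() and its arguments are hypothetical, while lu_object_find() is the lookup routine changed later in this file:

    static int example_lookup(const struct lu_context *ctx,
                              struct lu_site *site, const struct lu_fid *fid)
    {
            struct lu_object *o;

            /* takes a reference on the cached or newly created object */
            o = lu_object_find(ctx, site, fid);
            if (IS_ERR(o))
                    return PTR_ERR(o);
            /* ... use the object ... */
            lu_object_put(ctx, o); /* drop the reference taken above */
            return 0;
    }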
 
-struct lu_object *lu_object_alloc(struct lu_context *ctxt,
-                                  struct lu_site *s, const struct lu_fid *f)
+/*
+ * Allocate new object.
+ *
+ * This follows object creation protocol, described in the comment within
+ * struct lu_device_operations definition.
+ */
+static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
+                                         struct lu_site *s,
+                                         const struct lu_fid *f)
 {
         struct lu_object *scan;
         struct lu_object *top;
+        struct list_head *layers;
         int clean;
         int result;
 
-        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt, s->ls_top_dev);
+        /*
+         * Create top-level object slice. This will also create
+         * lu_object_header.
+         */
+        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
+                                                      NULL, s->ls_top_dev);
         if (IS_ERR(top))
                 RETURN(top);
-        *lu_object_fid(top) = *f;
+        s->ls_total ++;
+        /*
+         * This is the only place where object fid is assigned. It's constant
+         * after this point.
+         */
+        top->lo_header->loh_fid = *f;
+        layers = &top->lo_header->loh_layers;
         do {
+                /*
+                 * Call ->loo_object_init() repeatedly, until no more new
+                 * object slices are created.
+                 */
                 clean = 1;
-                list_for_each_entry(scan,
-                                    &top->lo_header->loh_layers, lo_linkage) {
+                list_for_each_entry(scan, layers, lo_linkage) {
                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                 continue;
                         clean = 0;
                         scan->lo_header = top->lo_header;
-                        result = lu_object_ops(scan)->ldo_object_init(ctxt,
-                                                                      scan);
+                        result = scan->lo_ops->loo_object_init(ctxt, scan);
                         if (result != 0) {
                                 lu_object_free(ctxt, top);
                                 RETURN(ERR_PTR(result));
@@ -99,38 +149,68 @@ struct lu_object *lu_object_alloc(struct lu_context *ctxt,
                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
                 }
         } while (!clean);
+
+        list_for_each_entry_reverse(scan, layers, lo_linkage) {
+                if (scan->lo_ops->loo_object_start != NULL) {
+                        result = scan->lo_ops->loo_object_start(ctxt, scan);
+                        if (result != 0) {
+                                lu_object_free(ctxt, top);
+                                RETURN(ERR_PTR(result));
+                        }
+                }
+        }
+
         s->ls_stats.s_created ++;
         RETURN(top);
 }
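To illustrate the allocation protocol that the do/while loop above implements, here is a hypothetical slice initializer for a middle layer. All foo_* names are invented, and treating the second ->ldo_object_alloc() argument as the compound object's header is an assumption suggested by the NULL passed for the top-level slice:

    static int foo_object_init(const struct lu_context *ctx, struct lu_object *o)
    {
            /* hypothetical helper returning the device stacked below this one */
            struct lu_device *next = foo_device_below(o->lo_dev);
            struct lu_object *child;

            /*
             * Allocate the slice of the layer below; it is initialized on the
             * next pass of the do/while loop in lu_object_alloc().
             */
            child = next->ld_ops->ldo_object_alloc(ctx, o->lo_header, next);
            if (IS_ERR(child))
                    return PTR_ERR(child);
            lu_object_add(o, child);
            return 0;
    }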
 
-static void lu_object_free(struct lu_context *ctx, struct lu_object *o)
+/*
+ * Free object.
+ */
+static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
 {
         struct list_head splice;
         struct lu_object *scan;
 
+        /*
+         * First call ->loo_object_delete() method to release all resources.
+         */
         list_for_each_entry_reverse(scan,
                                     &o->lo_header->loh_layers, lo_linkage) {
-                if (lu_object_ops(scan)->ldo_object_delete != NULL)
-                        lu_object_ops(scan)->ldo_object_delete(ctx, scan);
+                if (scan->lo_ops->loo_object_delete != NULL)
+                        scan->lo_ops->loo_object_delete(ctx, scan);
         }
         -- o->lo_dev->ld_site->ls_total;
+        /*
+         * Then, splice object layers into stand-alone list, and call
+         * ->loo_object_free() on all layers to free memory. Splice is
+         * necessary, because lu_object_header is freed together with the
+         * top-level slice.
+         */
         INIT_LIST_HEAD(&splice);
         list_splice_init(&o->lo_header->loh_layers, &splice);
         while (!list_empty(&splice)) {
-                o = container_of(splice.next, struct lu_object, lo_linkage);
+                o = container_of0(splice.next, struct lu_object, lo_linkage);
                 list_del_init(&o->lo_linkage);
-                LASSERT(lu_object_ops(o)->ldo_object_free != NULL);
-                lu_object_ops(o)->ldo_object_free(ctx, o);
+                LASSERT(o->lo_ops->loo_object_free != NULL);
+                o->lo_ops->loo_object_free(ctx, o);
         }
 }
 
-void lu_site_purge(struct lu_context *ctx, struct lu_site *s, int nr)
+/*
+ * Free @nr objects from the cold end of the site LRU list.
+ */
+void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
 {
         struct list_head         dispose;
         struct lu_object_header *h;
         struct lu_object_header *temp;
 
         INIT_LIST_HEAD(&dispose);
+        /*
+         * Under LRU list lock, scan LRU list and move unreferenced objects to
+         * the dispose list, removing them from LRU and hash table.
+         */
         spin_lock(&s->ls_guard);
         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                 if (nr-- == 0)
@@ -141,8 +221,12 @@ void lu_site_purge(struct lu_context *ctx, struct lu_site *s, int nr)
                 list_move(&h->loh_lru, &dispose);
         }
         spin_unlock(&s->ls_guard);
+        /*
+         * Free everything on the dispose list. This is safe against races for
+         * the reasons described in lu_object_put().
+         */
         while (!list_empty(&dispose)) {
-                h = container_of(dispose.next,
+                h = container_of0(dispose.next,
                                  struct lu_object_header, loh_lru);
                 list_del_init(&h->loh_lru);
                 lu_object_free(ctx, lu_object_top(h));
@@ -151,29 +235,157 @@ void lu_site_purge(struct lu_context *ctx, struct lu_site *s, int nr)
 }
 EXPORT_SYMBOL(lu_site_purge);
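A small hypothetical use of the function above: shed a bounded batch of unused objects, e.g. under memory pressure. The helper name and the batch size of 128 are illustrative only:

    static void example_shrink_site(const struct lu_context *ctx, struct lu_site *s)
    {
            /* free up to 128 unreferenced objects from the cold end of the LRU */
            lu_site_purge(ctx, s, 128);
    }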
 
-int lu_object_print(struct lu_context *ctx,
-                    struct seq_file *f, const struct lu_object *o)
+/*
+ * Object printing.
+ *
+ * Code below has to jump through certain hoops to output object description
+ * into libcfs_debug_msg-based log. The problem is that lu_object_print()
+ * composes object description from strings that are parts of _lines_ of
+ * output (i.e., strings that are not terminated by newline). This doesn't fit
+ * very well into libcfs_debug_msg() interface that assumes that each message
+ * supplied to it is a self-contained output line.
+ *
+ * To work around this, strings are collected in a temporary buffer
+ * (implemented as a value of the lu_cdebug_key key), until terminating newline
+ * character is detected.
+ *
+ */
+
+enum {
+        /*
+         * Maximal line size.
+         *
+         * XXX overflow is not handled correctly.
+         */
+        LU_CDEBUG_LINE = 256
+};
+
+struct lu_cdebug_data {
+        /*
+         * Temporary buffer.
+         */
+        char lck_area[LU_CDEBUG_LINE];
+};
+
+static void *lu_cdebug_key_init(const struct lu_context *ctx,
+                                struct lu_context_key *key)
+{
+        struct lu_cdebug_data *value;
+
+        OBD_ALLOC_PTR(value);
+        if (value == NULL)
+                value = ERR_PTR(-ENOMEM);
+        return value;
+}
+
+static void lu_cdebug_key_fini(const struct lu_context *ctx,
+                               struct lu_context_key *key, void *data)
+{
+        struct lu_cdebug_data *value = data;
+        OBD_FREE_PTR(value);
+}
+
+/*
+ * Key, holding temporary buffer. This key is registered very early by
+ * lu_global_init().
+ */
+static struct lu_context_key lu_cdebug_key = {
+        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+        .lct_init = lu_cdebug_key_init,
+        .lct_fini = lu_cdebug_key_fini
+};
+
+/*
+ * Printer function emitting messages through libcfs_debug_msg().
+ */
+int lu_cdebug_printer(const struct lu_context *ctx,
+                      void *cookie, const char *format, ...)
 {
-        static char ruler[] = "........................................";
-        const struct lu_object *scan;
-        int nob;
+        struct lu_cdebug_print_info *info = cookie;
+        struct lu_cdebug_data       *key;
+        int used;
+        int complete;
+        va_list args;
+
+        va_start(args, format);
+
+        key = lu_context_key_get(ctx, &lu_cdebug_key);
+        LASSERT(key != NULL);
+
+        used = strlen(key->lck_area);
+        complete = format[strlen(format) - 1] == '\n';
+        /*
+         * Append new chunk to the buffer.
+         */
+        vsnprintf(key->lck_area + used,
+                  ARRAY_SIZE(key->lck_area) - used, format, args);
+        if (complete) {
+                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
+                                 (char *)info->lpi_file, info->lpi_fn,
+                                 info->lpi_line, "%s", key->lck_area);
+                key->lck_area[0] = 0;
+        }
+        va_end(args);
+        return 0;
+}
+EXPORT_SYMBOL(lu_cdebug_printer);
+
+/*
+ * Print object header.
+ */
+static void lu_object_header_print(const struct lu_context *ctx,
+                                   void *cookie, lu_printer_t printer,
+                                   const struct lu_object_header *hdr)
+{
+        (*printer)(ctx, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
+                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
+                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
+                   list_empty(&hdr->loh_lru) ? "" : " lru");
+}
+
+/*
+ * Print human readable representation of the @o to the @printer.
+ */
+void lu_object_print(const struct lu_context *ctx, void *cookie,
+                     lu_printer_t printer, const struct lu_object *o)
+{
+        static const char ruler[] = "........................................";
+        struct lu_object_header *top;
         int depth;
 
-        nob = 0;
-        scan = o;
-        list_for_each_entry_continue(scan, &o->lo_linkage, lo_linkage) {
-                depth = scan->lo_depth;
-                if (depth <= o->lo_depth && scan != o)
-                        break;
-                LASSERT(lu_object_ops(scan)->ldo_object_print != NULL);
-                nob += seq_printf(f, "%*.*s", depth, depth, ruler);
-                nob += lu_object_ops(scan)->ldo_object_print(ctx, f, scan);
-                nob += seq_printf(f, "\n");
+        top = o->lo_header;
+        lu_object_header_print(ctx, cookie, printer, top);
+        (*printer)(ctx, cookie, "\n");
+        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+                depth = o->lo_depth + 4;
+                LASSERT(o->lo_ops->loo_object_print != NULL);
+                /*
+                 * print `.' @depth times.
+                 */
+                (*printer)(ctx, cookie, "%*.*s", depth, depth, ruler);
+                o->lo_ops->loo_object_print(ctx, cookie, printer, o);
+                (*printer)(ctx, cookie, "\n");
         }
-        return nob;
 }
 EXPORT_SYMBOL(lu_object_print);
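A hypothetical way to wire the pieces above together: dump an object into the debug log by passing lu_cdebug_printer and a struct lu_cdebug_print_info cookie to lu_object_print(). The field initializers follow the usage inside lu_cdebug_printer(); the wrapper itself, the D_INFO mask choice, and the field types are assumptions for the example:

    static void example_object_debug(const struct lu_context *ctx,
                                     const struct lu_object *o)
    {
            struct lu_cdebug_print_info info = {
                    .lpi_subsys = DEBUG_SUBSYSTEM, /* per-file debug subsystem */
                    .lpi_mask   = D_INFO,
                    .lpi_file   = __FILE__,
                    .lpi_fn     = __FUNCTION__,
                    .lpi_line   = __LINE__
            };

            lu_object_print(ctx, &info, lu_cdebug_printer, o);
    }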
 
+/*
+ * Check object consistency.
+ */
+int lu_object_invariant(const struct lu_object *o)
+{
+        struct lu_object_header *top;
+
+        top = o->lo_header;
+        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+                if (o->lo_ops->loo_object_invariant != NULL &&
+                    !o->lo_ops->loo_object_invariant(o))
+                        return 0;
+        }
+        return 1;
+}
+EXPORT_SYMBOL(lu_object_invariant);
+
 static struct lu_object *htable_lookup(struct lu_site *s,
                                        const struct hlist_head *bucket,
                                        const struct lu_fid *f)
@@ -201,35 +413,58 @@ static __u32 fid_hash(const struct lu_fid *f)
 {
         /* all objects with same id and different versions will belong to same
          * collisions list. */
-        return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
+        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
 }
 
-struct lu_object *lu_object_find(struct lu_context *ctxt, struct lu_site *s,
-                                 const struct lu_fid *f)
+/*
+ * Search cache for an object with the fid @f. If such object is found, return
+ * it. Otherwise, create new object, insert it into cache and return it. In
+ * any case, additional reference is acquired on the returned object.
+ */
+struct lu_object *lu_object_find(const struct lu_context *ctxt,
+                                 struct lu_site *s, const struct lu_fid *f)
 {
         struct lu_object  *o;
         struct lu_object  *shadow;
         struct hlist_head *bucket;
 
+        /*
+         * This uses standard index maintenance protocol:
+         *
+         *     - search index under lock, and return object if found;
+         *     - otherwise, unlock index, allocate new object;
+         *     - lock index and search again;
+         *     - if nothing is found (usual case), insert newly created
+         *       object into index;
+         *     - otherwise (race: other thread inserted object), free
+         *       object just allocated.
+         *     - unlock index;
+         *     - return object.
+         */
+
         bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
         spin_lock(&s->ls_guard);
         o = htable_lookup(s, bucket, f);
+
         spin_unlock(&s->ls_guard);
         if (o != NULL)
                 return o;
-
+        /*
+         * Allocate new object. This may result in rather complicated
+         * operations, including fld queries, inode loading, etc.
+         */
         o = lu_object_alloc(ctxt, s, f);
         if (IS_ERR(o))
                 return o;
 
-        ++ s->ls_total;
         LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
         spin_lock(&s->ls_guard);
         shadow = htable_lookup(s, bucket, f);
         if (shadow == NULL) {
                 hlist_add_head(&o->lo_header->loh_hash, bucket);
-                list_add_tail(&s->ls_lru, &o->lo_header->loh_lru);
+                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
+                ++ s->ls_busy;
                 shadow = o;
                 o = NULL;
         } else
@@ -247,10 +482,15 @@ enum {
         LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
 };
 
+/*
+ * Initialize site @s, with @d as the top level device.
+ */
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
-        memset(s, 0, sizeof *s);
+        int result;
+        ENTRY;
 
+        memset(s, 0, sizeof *s);
         spin_lock_init(&s->ls_guard);
         CFS_INIT_LIST_HEAD(&s->ls_lru);
         s->ls_top_dev = top;
@@ -265,12 +505,18 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
                 int i;
                 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                         INIT_HLIST_HEAD(&s->ls_hash[i]);
-                return 0;
-        } else
-                return -ENOMEM;
+                result = 0;
+        } else {
+                result = -ENOMEM;
+        }
+
+        RETURN(result);
 }
 EXPORT_SYMBOL(lu_site_init);
 
+/*
+ * Finalize @s and release its resources.
+ */
 void lu_site_fini(struct lu_site *s)
 {
         LASSERT(list_empty(&s->ls_lru));
@@ -286,25 +532,34 @@ void lu_site_fini(struct lu_site *s)
                 s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
-               lu_device_put(s->ls_top_dev);
                s->ls_top_dev->ld_site = NULL;
+               lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
  }
 EXPORT_SYMBOL(lu_site_fini);
 
+/*
+ * Acquire additional reference on device @d
+ */
 void lu_device_get(struct lu_device *d)
 {
         atomic_inc(&d->ld_ref);
 }
 EXPORT_SYMBOL(lu_device_get);
 
+/*
+ * Release reference on device @d.
+ */
 void lu_device_put(struct lu_device *d)
 {
         atomic_dec(&d->ld_ref);
 }
 EXPORT_SYMBOL(lu_device_put);
 
+/*
+ * Initialize device @d of type @t.
+ */
 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 {
         memset(d, 0, sizeof *d);
@@ -314,12 +569,19 @@ int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 }
 EXPORT_SYMBOL(lu_device_init);
 
+/*
+ * Finalize device @d.
+ */
 void lu_device_fini(struct lu_device *d)
 {
         LASSERT(atomic_read(&d->ld_ref) == 0);
 }
 EXPORT_SYMBOL(lu_device_fini);
 
+/*
+ * Initialize object @o that is part of compound object @h and was created by
+ * device @d.
+ */
 int lu_object_init(struct lu_object *o,
                    struct lu_object_header *h, struct lu_device *d)
 {
@@ -332,32 +594,51 @@ int lu_object_init(struct lu_object *o,
 }
 EXPORT_SYMBOL(lu_object_init);
 
+/*
+ * Finalize object and release its resources.
+ */
 void lu_object_fini(struct lu_object *o)
 {
         LASSERT(list_empty(&o->lo_linkage));
 
         if (o->lo_dev != NULL) {
-                lu_device_get(o->lo_dev);
+                lu_device_put(o->lo_dev);
                 o->lo_dev = NULL;
         }
 }
 EXPORT_SYMBOL(lu_object_fini);
 
+/*
+ * Add object @o as first layer of compound object @h
+ *
+ * This is typically called by the ->ldo_object_alloc() method of top-level
+ * device.
+ */
 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
 {
         list_move(&o->lo_linkage, &h->loh_layers);
 }
 EXPORT_SYMBOL(lu_object_add_top);
 
+/*
+ * Add object @o as a layer of compound object, going after @before.
+ *
+ * This is typically called by the ->ldo_object_alloc() method of
+ * @before->lo_dev.
+ */
 void lu_object_add(struct lu_object *before, struct lu_object *o)
 {
         list_move(&o->lo_linkage, &before->lo_linkage);
 }
 EXPORT_SYMBOL(lu_object_add);
 
+/*
+ * Initialize compound object.
+ */
 int lu_object_header_init(struct lu_object_header *h)
 {
         memset(h, 0, sizeof *h);
+        h->loh_ref = 1;
         INIT_HLIST_NODE(&h->loh_hash);
         CFS_INIT_LIST_HEAD(&h->loh_lru);
         CFS_INIT_LIST_HEAD(&h->loh_layers);
@@ -365,6 +646,9 @@ int lu_object_header_init(struct lu_object_header *h)
 }
 EXPORT_SYMBOL(lu_object_header_init);
 
+/*
+ * Finalize compound object.
+ */
 void lu_object_header_fini(struct lu_object_header *h)
 {
         LASSERT(list_empty(&h->loh_layers));
@@ -373,6 +657,10 @@ void lu_object_header_fini(struct lu_object_header *h)
 }
 EXPORT_SYMBOL(lu_object_header_fini);
 
+/*
+ * Given a compound object, find its slice corresponding to the device type
+ * @dtype.
+ */
 struct lu_object *lu_object_locate(struct lu_object_header *h,
                                    struct lu_device_type *dtype)
 {
@@ -397,11 +685,18 @@ static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
 
+/*
+ * Register new key.
+ */
 int lu_context_key_register(struct lu_context_key *key)
 {
         int result;
         int i;
 
+        LASSERT(key->lct_init != NULL);
+        LASSERT(key->lct_fini != NULL);
+        LASSERT(key->lct_tags != 0);
+
         result = -ENFILE;
         spin_lock(&lu_keys_guard);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
@@ -418,6 +713,9 @@ int lu_context_key_register(struct lu_context_key *key)
 }
 EXPORT_SYMBOL(lu_context_key_register);
 
+/*
+ * Deregister key.
+ */
 void lu_context_key_degister(struct lu_context_key *key)
 {
         LASSERT(key->lct_used >= 1);
@@ -431,7 +729,11 @@ void lu_context_key_degister(struct lu_context_key *key)
 }
 EXPORT_SYMBOL(lu_context_key_degister);
 
-void *lu_context_key_get(struct lu_context *ctx, struct lu_context_key *key)
+/*
+ * Return value associated with key @key in context @ctx.
+ */
+void *lu_context_key_get(const struct lu_context *ctx,
+                         struct lu_context_key *key)
 {
         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
         return ctx->lc_value[key->lct_index];
@@ -452,7 +754,7 @@ static void keys_fini(struct lu_context *ctx)
                                 LASSERT(key->lct_fini != NULL);
                                 LASSERT(key->lct_used > 1);
 
-                                key->lct_fini(ctx, ctx->lc_value[i]);
+                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                 key->lct_used--;
                                 ctx->lc_value[i] = NULL;
                         }
@@ -474,13 +776,13 @@ static int keys_init(struct lu_context *ctx)
                         struct lu_context_key *key;
 
                         key = lu_keys[i];
-                        if (key != NULL) {
+                        if (key != NULL && key->lct_tags & ctx->lc_tags) {
                                 void *value;
 
                                 LASSERT(key->lct_init != NULL);
                                 LASSERT(key->lct_index == i);
 
-                                value = key->lct_init(ctx);
+                                value = key->lct_init(ctx, key);
                                 if (IS_ERR(value)) {
                                         keys_fini(ctx);
                                         return PTR_ERR(value);
@@ -495,26 +797,73 @@ static int keys_init(struct lu_context *ctx)
         return result;
 }
 
-int lu_context_init(struct lu_context *ctx)
+/*
+ * Initialize context data-structure. Create values for all keys.
+ */
+int lu_context_init(struct lu_context *ctx, __u32 tags)
 {
         memset(ctx, 0, sizeof *ctx);
+        ctx->lc_tags = tags;
         keys_init(ctx);
         return 0;
 }
 EXPORT_SYMBOL(lu_context_init);
 
+/*
+ * Finalize context data-structure. Destroy key values.
+ */
 void lu_context_fini(struct lu_context *ctx)
 {
         keys_fini(ctx);
 }
 EXPORT_SYMBOL(lu_context_fini);
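A hypothetical end-to-end context life cycle, tying lu_context_init(), lu_context_enter(), lu_context_key_get() and lu_context_fini() together. It reuses the lu_cdebug_key defined earlier in this file; the bar_use_context() wrapper and the LCT_MD_THREAD tag choice are assumptions for the example:

    static int bar_use_context(void)
    {
            struct lu_context ctx;
            struct lu_cdebug_data *buf;
            int rc;

            /* creates values for all keys whose tags intersect LCT_MD_THREAD */
            rc = lu_context_init(&ctx, LCT_MD_THREAD);
            if (rc != 0)
                    return rc;
            lu_context_enter(&ctx);

            buf = lu_context_key_get(&ctx, &lu_cdebug_key);
            /* ... buf->lck_area is private to this context ... */

            lu_context_exit(&ctx);
            lu_context_fini(&ctx);
            return 0;
    }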
 
+/*
+ * Called before entering context.
+ */
 void lu_context_enter(struct lu_context *ctx)
 {
 }
 EXPORT_SYMBOL(lu_context_enter);
 
+/*
+ * Called after exiting from @ctx
+ */
 void lu_context_exit(struct lu_context *ctx)
 {
+        int i;
+
+        if (ctx->lc_value != NULL) {
+                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
+                        if (ctx->lc_value[i] != NULL) {
+                                struct lu_context_key *key;
+
+                                key = lu_keys[i];
+                                LASSERT(key != NULL);
+                                if (key->lct_exit != NULL)
+                                        key->lct_exit(ctx,
+                                                      key, ctx->lc_value[i]);
+                        }
+                }
+        }
 }
 EXPORT_SYMBOL(lu_context_exit);
+
+/*
+ * Initialization of global lu_* data.
+ */
+int lu_global_init(void)
+{
+        int result;
+
+        result = lu_context_key_register(&lu_cdebug_key);
+        return result;
+}
+
+/*
+ * Dual to lu_global_init().
+ */
+void lu_global_fini(void)
+{
+        lu_context_key_degister(&lu_cdebug_key);
+}