Whamcloud - gitweb
- merge with 1_5,some fixes.
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 7c2f25c..e3340fb 100644 (file)
@@ -4,6 +4,7 @@
  * Lustre Object.
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *   Author: Nikita Danilov <nikita@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
@@ -36,6 +37,7 @@
 #include <linux/module.h>
 #include <obd_support.h>
 #include <lustre_disk.h>
+#include <lustre_fid.h>
 #include <lu_object.h>
 #include <libcfs/list.h>
 
@@ -50,9 +52,13 @@ void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
 {
         struct lu_object_header *top;
         struct lu_site          *site;
+        struct lu_object        *orig;
+        int                      kill_it;
 
         top = o->lo_header;
         site = o->lo_dev->ld_site;
+        orig = o;
+        kill_it = 0;
         spin_lock(&site->ls_guard);
         if (-- top->loh_ref == 0) {
                 /*
@@ -80,15 +86,16 @@ void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
                          */
                         hlist_del_init(&top->loh_hash);
                         list_del_init(&top->loh_lru);
+                        kill_it = 1;
                 }
         }
         spin_unlock(&site->ls_guard);
-        if (lu_object_is_dying(top))
+        if (kill_it)
                 /*
                  * Object was already removed from hash and lru above, can
                  * kill it.
                  */
-                lu_object_free(ctxt, o);
+                lu_object_free(ctxt, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
 
@@ -104,6 +111,7 @@ static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
 {
         struct lu_object *scan;
         struct lu_object *top;
+        struct list_head *layers;
         int clean;
         int result;
 
@@ -121,14 +129,14 @@ static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
          * after this point.
          */
         top->lo_header->loh_fid = *f;
+        layers = &top->lo_header->loh_layers;
         do {
                 /*
                  * Call ->loo_object_init() repeatedly, until no more new
                  * object slices are created.
                  */
                 clean = 1;
-                list_for_each_entry(scan,
-                                    &top->lo_header->loh_layers, lo_linkage) {
+                list_for_each_entry(scan, layers, lo_linkage) {
                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                 continue;
                         clean = 0;
@@ -141,6 +149,17 @@ static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
                 }
         } while (!clean);
+
+        list_for_each_entry_reverse(scan, layers, lo_linkage) {
+                if (scan->lo_ops->loo_object_start != NULL) {
+                        result = scan->lo_ops->loo_object_start(ctxt, scan);
+                        if (result != 0) {
+                                lu_object_free(ctxt, top);
+                                RETURN(ERR_PTR(result));
+                        }
+                }
+        }
+
         s->ls_stats.s_created ++;
         RETURN(top);
 }
@@ -217,34 +236,155 @@ void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
 EXPORT_SYMBOL(lu_site_purge);
 
 /*
- * Print human readable representation of the @o to the @f.
+ * Object printing.
+ *
+ * Code below has to jump through certain loops to output object description
+ * into libcfs_debug_msg-based log. The problem is that lu_object_print()
+ * composes object description from strings that are parts of _lines_ of
+ * output (i.e., strings that are not terminated by newline). This doesn't fit
+ * very well into libcfs_debug_msg() interface that assumes that each message
+ * supplied to it is a self-contained output line.
+ *
+ * To work around this, strings are collected in a temporary buffer
+ * (implemented as a value of lu_cdebug_key key), until terminating newline
+ * character is detected.
+ *
+ */
+
+enum {
+        /*
+         * Maximal line size.
+         *
+         * XXX overflow is not handled correctly.
+         */
+        LU_CDEBUG_LINE = 256
+};
+
+struct lu_cdebug_data {
+        /*
+         * Temporary buffer.
+         */
+        char lck_area[LU_CDEBUG_LINE];
+};
+
+static void *lu_cdebug_key_init(const struct lu_context *ctx,
+                                struct lu_context_key *key)
+{
+        struct lu_cdebug_data *value;
+
+        OBD_ALLOC_PTR(value);
+        if (value == NULL)
+                value = ERR_PTR(-ENOMEM);
+        return value;
+}
+
+static void lu_cdebug_key_fini(const struct lu_context *ctx,
+                               struct lu_context_key *key, void *data)
+{
+        struct lu_cdebug_data *value = data;
+        OBD_FREE_PTR(value);
+}
+
+/*
+ * Key, holding temporary buffer. This key is registered very early by
+ * lu_global_init().
+ */
+static struct lu_context_key lu_cdebug_key = {
+        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+        .lct_init = lu_cdebug_key_init,
+        .lct_fini = lu_cdebug_key_fini
+};
+
+/*
+ * Printer function emitting messages through libcfs_debug_msg().
+ */
+int lu_cdebug_printer(const struct lu_context *ctx,
+                      void *cookie, const char *format, ...)
+{
+        struct lu_cdebug_print_info *info = cookie;
+        struct lu_cdebug_data       *key;
+        int used;
+        int complete;
+       va_list args;
+
+        va_start(args, format);
+
+        key = lu_context_key_get(ctx, &lu_cdebug_key);
+        LASSERT(key != NULL);
+
+        used = strlen(key->lck_area);
+        complete = format[strlen(format) - 1] == '\n';
+        /*
+         * Append new chunk to the buffer.
+         */
+        vsnprintf(key->lck_area + used,
+                  ARRAY_SIZE(key->lck_area) - used, format, args);
+        if (complete) {
+                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
+                                 (char *)info->lpi_file, info->lpi_fn,
+                                 info->lpi_line, "%s", key->lck_area);
+                key->lck_area[0] = 0;
+        }
+        va_end(args);
+        return 0;
+}
+EXPORT_SYMBOL(lu_cdebug_printer);
+
+/*
+ * Print object header.
+ */
+static void lu_object_header_print(const struct lu_context *ctx,
+                                   void *cookie, lu_printer_t printer,
+                                   const struct lu_object_header *hdr)
+{
+        (*printer)(ctx, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
+                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
+                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
+                   list_empty(&hdr->loh_lru) ? "" : " lru");
+}
+
+/*
+ * Print human readable representation of the @o to the @printer.
  */
-int lu_object_print(const struct lu_context *ctx,
-                    struct seq_file *f, const struct lu_object *o)
+void lu_object_print(const struct lu_context *ctx, void *cookie,
+                     lu_printer_t printer, const struct lu_object *o)
 {
-        static char ruler[] = "........................................";
-        const struct lu_object *scan;
-        int nob;
+        static const char ruler[] = "........................................";
+        struct lu_object_header *top;
         int depth;
 
-        nob = 0;
-        scan = o;
-        list_for_each_entry_continue(scan, &o->lo_linkage, lo_linkage) {
-                depth = scan->lo_depth;
-                if (depth <= o->lo_depth && scan != o)
-                        break;
-                LASSERT(scan->lo_ops->loo_object_print != NULL);
+        top = o->lo_header;
+        lu_object_header_print(ctx, cookie, printer, top);
+        (*printer)(ctx, cookie, "\n");
+        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+                depth = o->lo_depth + 4;
+                LASSERT(o->lo_ops->loo_object_print != NULL);
                 /*
                  * print `.' @depth times.
                  */
-                nob += seq_printf(f, "%*.*s", depth, depth, ruler);
-                nob += scan->lo_ops->loo_object_print(ctx, f, scan);
-                nob += seq_printf(f, "\n");
+                (*printer)(ctx, cookie, "%*.*s", depth, depth, ruler);
+                o->lo_ops->loo_object_print(ctx, cookie, printer, o);
+                (*printer)(ctx, cookie, "\n");
         }
-        return nob;
 }
 EXPORT_SYMBOL(lu_object_print);
 
+/*
+ * Check object consistency.
+ */
+int lu_object_invariant(const struct lu_object *o)
+{
+        struct lu_object_header *top;
+
+        top = o->lo_header;
+        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+                if (o->lo_ops->loo_object_invariant != NULL &&
+                    !o->lo_ops->loo_object_invariant(o))
+                        return 0;
+        }
+        return 1;
+}
+EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                        const struct hlist_head *bucket,
@@ -273,7 +413,7 @@ static __u32 fid_hash(const struct lu_fid *f)
 {
         /* all objects with same id and different versions will belong to same
          * collisions list. */
-        return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
+        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
 }
 
 /*
@@ -323,7 +463,7 @@ struct lu_object *lu_object_find(const struct lu_context *ctxt,
         shadow = htable_lookup(s, bucket, f);
         if (shadow == NULL) {
                 hlist_add_head(&o->lo_header->loh_hash, bucket);
-                list_add_tail(&s->ls_lru, &o->lo_header->loh_lru);
+                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                 ++ s->ls_busy;
                 shadow = o;
                 o = NULL;
@@ -553,6 +693,10 @@ int lu_context_key_register(struct lu_context_key *key)
         int result;
         int i;
 
+        LASSERT(key->lct_init != NULL);
+        LASSERT(key->lct_fini != NULL);
+        LASSERT(key->lct_tags != 0);
+
         result = -ENFILE;
         spin_lock(&lu_keys_guard);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
@@ -632,7 +776,7 @@ static int keys_init(struct lu_context *ctx)
                         struct lu_context_key *key;
 
                         key = lu_keys[i];
-                        if (key != NULL) {
+                        if (key != NULL && key->lct_tags & ctx->lc_tags) {
                                 void *value;
 
                                 LASSERT(key->lct_init != NULL);
@@ -656,9 +800,10 @@ static int keys_init(struct lu_context *ctx)
 /*
  * Initialize context data-structure. Create values for all keys.
  */
-int lu_context_init(struct lu_context *ctx)
+int lu_context_init(struct lu_context *ctx, __u32 tags)
 {
         memset(ctx, 0, sizeof *ctx);
+        ctx->lc_tags = tags;
         keys_init(ctx);
         return 0;
 }
@@ -686,5 +831,39 @@ EXPORT_SYMBOL(lu_context_enter);
  */
 void lu_context_exit(struct lu_context *ctx)
 {
+        int i;
+
+        if (ctx->lc_value != NULL) {
+                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
+                        if (ctx->lc_value[i] != NULL) {
+                                struct lu_context_key *key;
+
+                                key = lu_keys[i];
+                                LASSERT(key != NULL);
+                                if (key->lct_exit != NULL)
+                                        key->lct_exit(ctx,
+                                                      key, ctx->lc_value[i]);
+                        }
+                }
+        }
 }
 EXPORT_SYMBOL(lu_context_exit);
+
+/*
+ * Initialization of global lu_* data.
+ */
+int lu_global_init(void)
+{
+        int result;
+
+        result = lu_context_key_register(&lu_cdebug_key);
+        return result;
+}
+
+/*
+ * Dual to lu_global_init().
+ */
+void lu_global_fini(void)
+{
+        lu_context_key_degister(&lu_cdebug_key);
+}