Whamcloud - gitweb
Use cdebug_show() in CDEBUG-style macros defined outside of libcfs.
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 1a36993..35d76cf 100644 (file)
@@ -351,6 +351,7 @@ int lu_cdebug_printer(const struct lu_env *env,
         vsnprintf(key->lck_area + used,
                   ARRAY_SIZE(key->lck_area) - used, format, args);
         if (complete) {
+                if (cdebug_show(info->lpi_mask, info->lpi_subsys))
                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                  (char *)info->lpi_file, info->lpi_fn,
                                  info->lpi_line, "%s", key->lck_area);
@@ -361,23 +362,24 @@ int lu_cdebug_printer(const struct lu_env *env,
 }
 EXPORT_SYMBOL(lu_cdebug_printer);
 
-/*
+/**
  * Print object header.
  */
-static void lu_object_header_print(const struct lu_env *env,
-                                   void *cookie, lu_printer_t printer,
+void lu_object_header_print(const struct lu_env *env, void *cookie,
+                            lu_printer_t printer,
                                    const struct lu_object_header *hdr)
 {
         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                    PFID(&hdr->loh_fid),
                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
-                   list_empty(&hdr->loh_lru) ? "" : " lru",
+                   list_empty((struct list_head *)&hdr->loh_lru) ? "" : " lru",
                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
 }
+EXPORT_SYMBOL(lu_object_header_print);
 
-/*
- * Print human readable representation of the @o to the @printer.
+/**
+ * Print human readable representation of the \a o to the \a printer.
  */
 void lu_object_print(const struct lu_env *env, void *cookie,
                      lu_printer_t printer, const struct lu_object *o)
@@ -388,21 +390,24 @@ void lu_object_print(const struct lu_env *env, void *cookie,
 
         top = o->lo_header;
         lu_object_header_print(env, cookie, printer, top);
-        (*printer)(env, cookie, "\n");
+        (*printer)(env, cookie, "\n");
         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                 depth = o->lo_depth + 4;
-                LASSERT(o->lo_ops->loo_object_print != NULL);
+
                 /*
-                 * print `.' @depth times.
+                 * print `.' \a depth times followed by type name and address
                  */
-                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
+                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
+                           o->lo_dev->ld_type->ldt_name, o);
+                if (o->lo_ops->loo_object_print != NULL)
                 o->lo_ops->loo_object_print(env, cookie, printer, o);
                 (*printer)(env, cookie, "\n");
         }
+        (*printer)(env, cookie, "} header@%p\n", top);
 }
 EXPORT_SYMBOL(lu_object_print);
 
-/*
+/**
  * Check object consistency.
  */
 int lu_object_invariant(const struct lu_object *o)
@@ -421,15 +426,29 @@ EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                        const struct hlist_head *bucket,
-                                       const struct lu_fid *f)
+                                       const struct lu_fid *f,
+                                       cfs_waitlink_t *waiter)
 {
         struct lu_object_header *h;
         struct hlist_node *scan;
 
         hlist_for_each_entry(h, scan, bucket, loh_hash) {
                 s->ls_stats.s_cache_check ++;
-                if (likely(lu_fid_eq(&h->loh_fid, f) &&
-                           !lu_object_is_dying(h))) {
+                if (likely(lu_fid_eq(&h->loh_fid, f))) {
+                        if (unlikely(lu_object_is_dying(h))) {
+                                /*
+                                 * Lookup found an object being destroyed;
+                                 * this object cannot be returned (to assure
+                                 * that references to dying objects are
+                                 * eventually drained), and moreover, lookup
+                                 * has to wait until object is freed.
+                                 */
+                                cfs_waitlink_init(waiter);
+                                cfs_waitq_add(&s->ls_marche_funebre, waiter);
+                                set_current_state(CFS_TASK_UNINT);
+                                s->ls_stats.s_cache_death_race ++;
+                                return ERR_PTR(-EAGAIN);
+                        }
                         /* bump reference count... */
                         if (atomic_add_return(1, &h->loh_ref) == 1)
                                 ++ s->ls_busy;
@@ -454,14 +473,29 @@ static __u32 fid_hash(const struct lu_fid *f, int bits)
         return hash_long(fid_flatten(f), bits);
 }
 
-/*
- * Search cache for an object with the fid @f. If such object is found, return
- * it. Otherwise, create new object, insert it into cache and return it. In
- * any case, additional reference is acquired on the returned object.
+/**
+ * Search cache for an object with the fid \a f. If such object is found,
+ * return it. Otherwise, create new object, insert it into cache and return
+ * it. In any case, additional reference is acquired on the returned object.
  */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f)
+                                 struct lu_device *dev, const struct lu_fid *f,
+                                 const struct lu_object_conf *conf)
 {
+        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
+}
+EXPORT_SYMBOL(lu_object_find);
+
+/**
+ * Core logic of lu_object_find*() functions.
+ */
+static struct lu_object *lu_object_find_try(const struct lu_env *env,
+                                            struct lu_device *dev,
+                                            const struct lu_fid *f,
+                                            const struct lu_object_conf *conf,
+                                            cfs_waitlink_t *waiter)
+{
+        struct lu_site    *s;
         struct lu_object     *o;
         struct lu_object     *shadow;
         struct hlist_head *bucket;
@@ -478,12 +512,16 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          *       object just allocated.
          *     - unlock index;
          *     - return object.
+         *
+         * If dying object is found during index search, add @waiter to the
+         * site wait-queue and return ERR_PTR(-EAGAIN).
          */
 
+        s = dev->ld_site;
         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
 
         read_lock(&s->ls_guard);
-        o = htable_lookup(s, bucket, f);
+        o = htable_lookup(s, bucket, f, waiter);
         read_unlock(&s->ls_guard);
 
         if (o != NULL)
@@ -493,14 +531,14 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          * Allocate new object. This may result in rather complicated
          * operations, including fld queries, inode loading, etc.
          */
-        o = lu_object_alloc(env, s, f);
+        o = lu_object_alloc(env, dev, f, conf);
         if (unlikely(IS_ERR(o)))
                 return o;
 
         LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
         write_lock(&s->ls_guard);
-        shadow = htable_lookup(s, bucket, f);
+        shadow = htable_lookup(s, bucket, f, waiter);
         if (likely(shadow == NULL)) {
                 hlist_add_head(&o->lo_header->loh_hash, bucket);
                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
@@ -515,9 +553,58 @@ struct lu_object *lu_object_find(const struct lu_env *env,
                 lu_object_free(env, o);
         return shadow;
 }
-EXPORT_SYMBOL(lu_object_find);
 
-/*
+/**
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
+ */
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                    struct lu_device *dev,
+                                    const struct lu_fid *f,
+                                    const struct lu_object_conf *conf)
+{
+        struct lu_object *obj;
+        cfs_waitlink_t    wait;
+
+        while (1) {
+                obj = lu_object_find_try(env, dev, f, conf, &wait);
+                if (obj == ERR_PTR(-EAGAIN)) {
+                        /*
+                         * lu_object_find_try() already added waiter into the
+                         * wait queue.
+                         */
+                        cfs_waitq_wait(&wait, CFS_TASK_UNINT);
+                        cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait);
+                } else
+                        break;
+        }
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_at);
+
+/**
+ * Find object with given fid, and return its slice belonging to given device.
+ */
+struct lu_object *lu_object_find_slice(const struct lu_env *env,
+                                       struct lu_device *dev,
+                                       const struct lu_fid *f,
+                                       const struct lu_object_conf *conf)
+{
+        struct lu_object *top;
+        struct lu_object *obj;
+
+        top = lu_object_find(env, dev, f, conf);
+        if (!IS_ERR(top)) {
+                obj = lu_object_locate(top->lo_header, dev->ld_type);
+                if (obj == NULL)
+                        lu_object_put(env, top);
+        } else
+                obj = top;
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_slice);
+
 /**
  * Global list of all device types.
  */
@@ -1448,9 +1535,40 @@ struct lu_buf LU_BUF_NULL = {
 };
 EXPORT_SYMBOL(LU_BUF_NULL);
 
+/**
+ * Output site statistical counters into a buffer. Suitable for
+ * lprocfs_rd_*()-style functions.
+ */
+int lu_site_stats_print(const struct lu_site *s, char *page, int count)
+{
+        int i;
+        int populated;
+
+        /*
+         * How many hash buckets are not-empty? Don't bother with locks: it's
+         * an estimation anyway.
+         */
+        for (i = 0, populated = 0; i < s->ls_hash_size; i++)
+                populated += !hlist_empty(&s->ls_hash[i]);
+
+        return snprintf(page, count, "%d %d %d/%d %d %d %d %d %d %d %d\n",
+                        s->ls_total,
+                        s->ls_busy,
+                        populated,
+                        s->ls_hash_size,
+                        s->ls_stats.s_created,
+                        s->ls_stats.s_cache_hit,
+                        s->ls_stats.s_cache_miss,
+                        s->ls_stats.s_cache_check,
+                        s->ls_stats.s_cache_race,
+                        s->ls_stats.s_cache_death_race,
+                        s->ls_stats.s_lru_purged);
+}
+EXPORT_SYMBOL(lu_site_stats_print);
+
 /*
- * XXX: Functions below logically belong to fid module, but they are used by
- * dt_store_open(). Put them here until better place is found.
+ * XXX: Functions below logically belong to the fid module, but they are used
+ * by dt_store_open(). Put them here until better place is found.
  */
 
 void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid,