Fix a race between lu_object_find() finding an object and its concurrent
finalization. This race is (most likely) not possible on the server, but
might happen on the client.

author     nikita <nikita>    Sat, 18 Oct 2008 17:22:46 +0000 (17:22 +0000)
committer  nikita <nikita>    Sat, 18 Oct 2008 17:22:46 +0000 (17:22 +0000)

b=16450
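
For context, the interleaving this closes looks roughly as follows; the
thread labels and the exact trigger are illustrative, while the behaviour of
the old lookup path is taken from the diff below.

        releasing thread                      looking-up thread
        ----------------                      -----------------
        drops the last reference; the
        header is marked dying but still
        sits in the hash table while
        finalization proceeds
                                              lu_object_find() runs
                                              htable_lookup(); the old code
                                              skipped dying headers, so the
                                              lookup counts as a cache miss
                                              a second object with the same
                                              fid is allocated and inserted
                                              next to the dying one
        lu_object_free() finishes tearing
        down the first object, racing with
        users of its newly created twin

With the patch, the lookup instead parks on the new ls_marche_funebre
wait-queue until the dying object is actually freed, and then retries.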

lustre/ChangeLog
lustre/include/lu_object.h
lustre/obdclass/lu_object.c

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 8f7debb..a74808a 100644
@@ -1627,6 +1627,13 @@ Description: Introduce lu_kmem_descr.
 Details    : lu_kmem_descr and its companion interface allow to create
             and destroy a number of kmem caches at once.
 
+Severity   : normal
+Bugzilla   : 16450
+Description: Fix lu_object finalization race.
+Details    : Fix a race between lu_object_find() finding an object and its
+            concurrent finalization. This race is (most likely) not possible
+            on the server, but might happen on the client.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h
index 64384b7..e848fac 100644
@@ -631,15 +631,25 @@ struct lu_site {
 
         /*
          * Client Seq Manager
+        /**
+         * Wait-queue signaled when an object in this site is ultimately
+         * destroyed (lu_object_free()). It is used by lu_object_find() to
+         * wait before re-trying when object in the process of destruction is
+         * found in the hash table.
+         *
+         * If having a single wait-queue turns out to be a problem, a
+         * wait-queue per hash-table bucket can be easily implemented.
+         *
+         * \see htable_lookup().
          */
-        struct lu_client_seq *ls_client_seq;
+        cfs_waitq_t           ls_marche_funebre;
 
-        /* statistical counters. Protected by nothing, races are accepted. */
+        /** statistical counters. Protected by nothing, races are accepted. */
         struct {
                 __u32 s_created;
                 __u32 s_cache_hit;
                 __u32 s_cache_miss;
-                /*
+                /**
                  * Number of hash-table entry checks made.
                  *
                  *       ->s_cache_check / (->s_cache_miss + ->s_cache_hit)
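
The hunk above only declares the new member; the wake-up side is not part of
this excerpt. Going by the comment ("signaled when an object in this site is
ultimately destroyed"), the free path presumably ends with something like the
sketch below, where the use of cfs_waitq_broadcast() and its placement are
assumptions and "site" stands for the object's struct lu_site:

        /* at the end of lu_object_free(), once the object has been unhashed
         * and all of its slices released */
        cfs_waitq_broadcast(&site->ls_marche_funebre);

Waking every waiter is harmless here: lu_object_find_at() (added in the next
file) simply re-runs the lookup and goes back to sleep if its fid is still
attached to a dying object.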
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index 1a36993..8ecd85c 100644
@@ -421,15 +421,29 @@ EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                        const struct hlist_head *bucket,
-                                       const struct lu_fid *f)
+                                       const struct lu_fid *f,
+                                       cfs_waitlink_t *waiter)
 {
         struct lu_object_header *h;
         struct hlist_node *scan;
 
         hlist_for_each_entry(h, scan, bucket, loh_hash) {
                 s->ls_stats.s_cache_check ++;
-                if (likely(lu_fid_eq(&h->loh_fid, f) &&
-                           !lu_object_is_dying(h))) {
+                if (likely(lu_fid_eq(&h->loh_fid, f))) {
+                        if (unlikely(lu_object_is_dying(h))) {
+                                /*
+                                 * Lookup found an object being destroyed;
+                                 * this object cannot be returned (to assure
+                                 * that references to dying objects are
+                                 * eventually drained), and moreover, lookup
+                                 * has to wait until object is freed.
+                                 */
+                                cfs_waitlink_init(waiter);
+                                cfs_waitq_add(&s->ls_marche_funebre, waiter);
+                                set_current_state(CFS_TASK_UNINT);
+                                s->ls_stats.s_cache_death_race ++;
+                                return ERR_PTR(-EAGAIN);
+                        }
                         /* bump reference count... */
                         if (atomic_add_return(1, &h->loh_ref) == 1)
                                 ++ s->ls_busy;
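
A note on the contract this hunk creates: by the time htable_lookup() returns
ERR_PTR(-EAGAIN), the waiter is already linked onto ls_marche_funebre and the
task state has been set, all while the caller still holds the hash lock, so a
wake-up from the free path cannot slip in unnoticed before the caller goes to
sleep (the locking of lu_object_free() itself is not shown in this excerpt,
so that last step is inferred). The caller is then expected to sleep, unlink
itself and retry, in a loop of the following shape; lu_object_find_at()
further down is the real implementation:

        while (1) {
                o = lu_object_find_try(env, dev, f, conf, &wait);
                if (o != ERR_PTR(-EAGAIN))
                        break;
                /* htable_lookup() already queued &wait on the site's queue */
                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
                cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait);
        }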
@@ -454,14 +468,29 @@ static __u32 fid_hash(const struct lu_fid *f, int bits)
         return hash_long(fid_flatten(f), bits);
 }
 
-/*
- * Search cache for an object with the fid @f. If such object is found, return
- * it. Otherwise, create new object, insert it into cache and return it. In
- * any case, additional reference is acquired on the returned object.
+/**
+ * Search cache for an object with the fid \a f. If such object is found,
+ * return it. Otherwise, create new object, insert it into cache and return
+ * it. In any case, additional reference is acquired on the returned object.
  */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f)
+                                 struct lu_device *dev, const struct lu_fid *f,
+                                 const struct lu_object_conf *conf)
 {
+        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
+}
+EXPORT_SYMBOL(lu_object_find);
+
+/**
+ * Core logic of lu_object_find*() functions.
+ */
+static struct lu_object *lu_object_find_try(const struct lu_env *env,
+                                            struct lu_device *dev,
+                                            const struct lu_fid *f,
+                                            const struct lu_object_conf *conf,
+                                            cfs_waitlink_t *waiter)
+{
+        struct lu_site    *s;
         struct lu_object     *o;
         struct lu_object     *shadow;
         struct hlist_head *bucket;
@@ -478,12 +507,16 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          *       object just allocated.
          *     - unlock index;
          *     - return object.
+         *
+         * If dying object is found during index search, add @waiter to the
+         * site wait-queue and return ERR_PTR(-EAGAIN).
          */
 
+        s = dev->ld_site;
         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
 
         read_lock(&s->ls_guard);
-        o = htable_lookup(s, bucket, f);
+        o = htable_lookup(s, bucket, f, waiter);
         read_unlock(&s->ls_guard);
 
         if (o != NULL)
@@ -493,14 +526,14 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          * Allocate new object. This may result in rather complicated
          * operations, including fld queries, inode loading, etc.
          */
-        o = lu_object_alloc(env, s, f);
+        o = lu_object_alloc(env, dev, f, conf);
         if (unlikely(IS_ERR(o)))
                 return o;
 
         LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
         write_lock(&s->ls_guard);
-        shadow = htable_lookup(s, bucket, f);
+        shadow = htable_lookup(s, bucket, f, waiter);
         if (likely(shadow == NULL)) {
                 hlist_add_head(&o->lo_header->loh_hash, bucket);
                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
@@ -515,9 +548,58 @@ struct lu_object *lu_object_find(const struct lu_env *env,
                 lu_object_free(env, o);
         return shadow;
 }
-EXPORT_SYMBOL(lu_object_find);
 
-/*
+/**
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
+ */
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                    struct lu_device *dev,
+                                    const struct lu_fid *f,
+                                    const struct lu_object_conf *conf)
+{
+        struct lu_object *obj;
+        cfs_waitlink_t    wait;
+
+        while (1) {
+                obj = lu_object_find_try(env, dev, f, conf, &wait);
+                if (obj == ERR_PTR(-EAGAIN)) {
+                        /*
+                         * lu_object_find_try() already added waiter into the
+                         * wait queue.
+                         */
+                        cfs_waitq_wait(&wait, CFS_TASK_UNINT);
+                        cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait);
+                } else
+                        break;
+        }
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_at);
+
+/**
+ * Find object with given fid, and return its slice belonging to given device.
+ */
+struct lu_object *lu_object_find_slice(const struct lu_env *env,
+                                       struct lu_device *dev,
+                                       const struct lu_fid *f,
+                                       const struct lu_object_conf *conf)
+{
+        struct lu_object *top;
+        struct lu_object *obj;
+
+        top = lu_object_find(env, dev, f, conf);
+        if (!IS_ERR(top)) {
+                obj = lu_object_locate(top->lo_header, dev->ld_type);
+                if (obj == NULL)
+                        lu_object_put(env, top);
+        } else
+                obj = top;
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_slice);
+
 /**
  * Global list of all device types.
  */
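
As a closing illustration, a hypothetical caller of the new interface could
look like the sketch below. The function name, the NULL conf argument and the
error conventions are made up for this sketch; lu_object_find_slice() and its
NULL-on-missing-slice behaviour come from the hunk above, and releasing the
result through lu_object_put() on the returned slice assumes, as elsewhere in
this file, that the put operates on the shared object header.

        static int demo_locate(const struct lu_env *env, struct lu_device *dev,
                               const struct lu_fid *fid)
        {
                struct lu_object *o;

                /* may block: if an object with this fid is still being
                 * finalized, lu_object_find_at() parks on ls_marche_funebre
                 * and retries the lookup */
                o = lu_object_find_slice(env, dev, fid, NULL);
                if (o == NULL)
                        return -ENOENT;         /* no slice for dev->ld_type */
                if (IS_ERR(o))
                        return PTR_ERR(o);

                /* ... use the slice belonging to dev ... */

                lu_object_put(env, o);
                return 0;
        }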