Whamcloud - gitweb
Fix a race between lu_object_find() finding an object and its concurrent
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 1a36993..8ecd85c 100644 (file)
@@ -421,15 +421,29 @@ EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                        const struct hlist_head *bucket,
-                                       const struct lu_fid *f)
+                                       const struct lu_fid *f,
+                                       cfs_waitlink_t *waiter)
 {
         struct lu_object_header *h;
         struct hlist_node *scan;
 
         hlist_for_each_entry(h, scan, bucket, loh_hash) {
                 s->ls_stats.s_cache_check ++;
-                if (likely(lu_fid_eq(&h->loh_fid, f) &&
-                           !lu_object_is_dying(h))) {
+                if (likely(lu_fid_eq(&h->loh_fid, f))) {
+                        if (unlikely(lu_object_is_dying(h))) {
+                                /*
+                                 * Lookup found an object being destroyed;
+                                 * this object cannot be returned (to assure
+                                 * that references to dying objects are
+                                 * eventually drained), and moreover, lookup
+                                 * has to wait until object is freed.
+                                 */
+                                cfs_waitlink_init(waiter);
+                                cfs_waitq_add(&s->ls_marche_funebre, waiter);
+                                set_current_state(CFS_TASK_UNINT);
+                                s->ls_stats.s_cache_death_race ++;
+                                return ERR_PTR(-EAGAIN);
+                        }
                         /* bump reference count... */
                         if (atomic_add_return(1, &h->loh_ref) == 1)
                                 ++ s->ls_busy;
@@ -454,14 +468,29 @@ static __u32 fid_hash(const struct lu_fid *f, int bits)
         return hash_long(fid_flatten(f), bits);
 }
 
-/*
- * Search cache for an object with the fid @f. If such object is found, return
- * it. Otherwise, create new object, insert it into cache and return it. In
- * any case, additional reference is acquired on the returned object.
+/**
+ * Search cache for an object with the fid \a f. If such object is found,
+ * return it. Otherwise, create new object, insert it into cache and return
+ * it. In any case, additional reference is acquired on the returned object.
  */
 struct lu_object *lu_object_find(const struct lu_env *env,
-                                 struct lu_site *s, const struct lu_fid *f)
+                                 struct lu_device *dev, const struct lu_fid *f,
+                                 const struct lu_object_conf *conf)
 {
+        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
+}
+EXPORT_SYMBOL(lu_object_find);
+
+/**
+ * Core logic of lu_object_find*() functions.
+ */
+static struct lu_object *lu_object_find_try(const struct lu_env *env,
+                                            struct lu_device *dev,
+                                            const struct lu_fid *f,
+                                            const struct lu_object_conf *conf,
+                                            cfs_waitlink_t *waiter)
+{
+        struct lu_site    *s;
         struct lu_object     *o;
         struct lu_object     *shadow;
         struct hlist_head *bucket;
@@ -478,12 +507,16 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          *       object just allocated.
          *     - unlock index;
          *     - return object.
+         *
+         * If dying object is found during index search, add @waiter to the
+         * site wait-queue and return ERR_PTR(-EAGAIN).
          */
 
+        s = dev->ld_site;
         bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
 
         read_lock(&s->ls_guard);
-        o = htable_lookup(s, bucket, f);
+        o = htable_lookup(s, bucket, f, waiter);
         read_unlock(&s->ls_guard);
 
         if (o != NULL)
@@ -493,14 +526,14 @@ struct lu_object *lu_object_find(const struct lu_env *env,
          * Allocate new object. This may result in rather complicated
          * operations, including fld queries, inode loading, etc.
          */
-        o = lu_object_alloc(env, s, f);
+        o = lu_object_alloc(env, dev, f, conf);
         if (unlikely(IS_ERR(o)))
                 return o;
 
         LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
         write_lock(&s->ls_guard);
-        shadow = htable_lookup(s, bucket, f);
+        shadow = htable_lookup(s, bucket, f, waiter);
         if (likely(shadow == NULL)) {
                 hlist_add_head(&o->lo_header->loh_hash, bucket);
                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
@@ -515,9 +548,58 @@ struct lu_object *lu_object_find(const struct lu_env *env,
                 lu_object_free(env, o);
         return shadow;
 }
-EXPORT_SYMBOL(lu_object_find);
 
-/*
+/**
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
+ */
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                    struct lu_device *dev,
+                                    const struct lu_fid *f,
+                                    const struct lu_object_conf *conf)
+{
+        struct lu_object *obj;
+        cfs_waitlink_t    wait;
+
+        while (1) {
+                obj = lu_object_find_try(env, dev, f, conf, &wait);
+                if (obj == ERR_PTR(-EAGAIN)) {
+                        /*
+                         * lu_object_find_try() already added waiter into the
+                         * wait queue.
+                         */
+                        cfs_waitq_wait(&wait, CFS_TASK_UNINT);
+                        cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait);
+                } else
+                        break;
+        }
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_at);
+
+/**
+ * Find object with given fid, and return its slice belonging to given device.
+ */
+struct lu_object *lu_object_find_slice(const struct lu_env *env,
+                                       struct lu_device *dev,
+                                       const struct lu_fid *f,
+                                       const struct lu_object_conf *conf)
+{
+        struct lu_object *top;
+        struct lu_object *obj;
+
+        top = lu_object_find(env, dev, f, conf);
+        if (!IS_ERR(top)) {
+                obj = lu_object_locate(top->lo_header, dev->ld_type);
+                if (obj == NULL)
+                        lu_object_put(env, top);
+        } else
+                obj = top;
+        return obj;
+}
+EXPORT_SYMBOL(lu_object_find_slice);
+
 /**
  * Global list of all device types.
  */