static struct lu_object *htable_lookup(struct lu_site *s,
const struct hlist_head *bucket,
- const struct lu_fid *f)
+ const struct lu_fid *f,
+ cfs_waitlink_t *waiter)
{
struct lu_object_header *h;
struct hlist_node *scan;
hlist_for_each_entry(h, scan, bucket, loh_hash) {
s->ls_stats.s_cache_check ++;
- if (likely(lu_fid_eq(&h->loh_fid, f) &&
- !lu_object_is_dying(h))) {
+ if (likely(lu_fid_eq(&h->loh_fid, f))) {
+ if (unlikely(lu_object_is_dying(h))) {
+ /*
+ * Lookup found an object being destroyed;
+ * this object cannot be returned (to assure
+ * that references to dying objects are
+ * eventually drained), and moreover, lookup
+ * has to wait until object is freed.
+ */
+ cfs_waitlink_init(waiter);
+ cfs_waitq_add(&s->ls_marche_funebre, waiter);
+ set_current_state(CFS_TASK_UNINT);
+ s->ls_stats.s_cache_death_race ++;
+ return ERR_PTR(-EAGAIN);
+ }
/* bump reference count... */
if (atomic_add_return(1, &h->loh_ref) == 1)
++ s->ls_busy;
return hash_long(fid_flatten(f), bits);
}
-/*
- * Search cache for an object with the fid @f. If such object is found, return
- * it. Otherwise, create new object, insert it into cache and return it. In
- * any case, additional reference is acquired on the returned object.
+/**
+ * Search cache for an object with the fid \a f. If such object is found,
+ * return it. Otherwise, create new object, insert it into cache and return
+ * it. In any case, additional reference is acquired on the returned object.
*/
struct lu_object *lu_object_find(const struct lu_env *env,
- struct lu_site *s, const struct lu_fid *f)
+ struct lu_device *dev, const struct lu_fid *f,
+ const struct lu_object_conf *conf)
{
+ return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
+}
+EXPORT_SYMBOL(lu_object_find);
+
+/**
+ * Core logic of lu_object_find*() functions.
+ */
+static struct lu_object *lu_object_find_try(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_fid *f,
+ const struct lu_object_conf *conf,
+ cfs_waitlink_t *waiter)
+{
+ struct lu_site *s;
struct lu_object *o;
struct lu_object *shadow;
struct hlist_head *bucket;
* object just allocated.
* - unlock index;
* - return object.
+ *
+ * If dying object is found during index search, add @waiter to the
+ * site wait-queue and return ERR_PTR(-EAGAIN).
*/
+ s = dev->ld_site;
bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits);
read_lock(&s->ls_guard);
- o = htable_lookup(s, bucket, f);
+ o = htable_lookup(s, bucket, f, waiter);
read_unlock(&s->ls_guard);
if (o != NULL)
* Allocate new object. This may result in rather complicated
* operations, including fld queries, inode loading, etc.
*/
- o = lu_object_alloc(env, s, f);
+ o = lu_object_alloc(env, dev, f, conf);
if (unlikely(IS_ERR(o)))
return o;
LASSERT(lu_fid_eq(lu_object_fid(o), f));
write_lock(&s->ls_guard);
- shadow = htable_lookup(s, bucket, f);
+ shadow = htable_lookup(s, bucket, f, waiter);
if (likely(shadow == NULL)) {
hlist_add_head(&o->lo_header->loh_hash, bucket);
list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
lu_object_free(env, o);
return shadow;
}
-EXPORT_SYMBOL(lu_object_find);
-/*
+/**
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
+ */
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_fid *f,
+ const struct lu_object_conf *conf)
+{
+ struct lu_object *obj;
+ cfs_waitlink_t wait;
+
+ while (1) {
+ obj = lu_object_find_try(env, dev, f, conf, &wait);
+ if (obj == ERR_PTR(-EAGAIN)) {
+ /*
+ * lu_object_find_try() already added waiter into the
+ * wait queue.
+ */
+ cfs_waitq_wait(&wait, CFS_TASK_UNINT);
+ cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait);
+ } else
+ break;
+ }
+ return obj;
+}
+EXPORT_SYMBOL(lu_object_find_at);
+
+/**
+ * Find object with given fid, and return its slice belonging to given device.
+ */
+struct lu_object *lu_object_find_slice(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_fid *f,
+ const struct lu_object_conf *conf)
+{
+ struct lu_object *top;
+ struct lu_object *obj;
+
+ top = lu_object_find(env, dev, f, conf);
+ if (!IS_ERR(top)) {
+ obj = lu_object_locate(top->lo_header, dev->ld_type);
+ if (obj == NULL)
+ lu_object_put(env, top);
+ } else
+ obj = top;
+ return obj;
+}
+EXPORT_SYMBOL(lu_object_find_slice);
+
/**
* Global list of all device types.
*/