struct list_head lsb_lru;
/**
* Wait-queue signaled when an object in this site is ultimately
- * destroyed (lu_object_free()). It is used by lu_object_find() to
- * wait before re-trying when object in the process of destruction is
- * found in the hash table.
+ * destroyed (lu_object_free()) or initialized (lu_object_start()).
+ * It is used by lu_object_find() to wait before re-trying when
+ * object in the process of destruction is found in the hash table;
+ * or to wait for the object to be initialized by the allocator.
*
* \see htable_lookup().
*/
- wait_queue_head_t lsb_marche_funebre;
+ wait_queue_head_t lsb_waitq;
};
enum {
cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
- return &bkt->lsb_marche_funebre;
+ return &bkt->lsb_waitq;
}
EXPORT_SYMBOL(lu_site_wq_from_fid);
* somebody may be waiting for this, currently only
* used for cl_object, see cl_object_put_last().
*/
- wake_up_all(&bkt->lsb_marche_funebre);
+ wake_up_all(&bkt->lsb_waitq);
}
return;
}
*/
static struct lu_object *lu_object_alloc(const struct lu_env *env,
struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
+ const struct lu_fid *f)
{
- struct lu_object *scan;
struct lu_object *top;
- struct list_head *layers;
- unsigned int init_mask = 0;
- unsigned int init_flag;
- int clean;
- int result;
- ENTRY;
/*
* Create top-level object slice. This will also create
*/
top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
if (top == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ return ERR_PTR(-ENOMEM);
if (IS_ERR(top))
- RETURN(top);
- /*
- * This is the only place where object fid is assigned. It's constant
- * after this point.
- */
- top->lo_header->loh_fid = *f;
- layers = &top->lo_header->loh_layers;
+ return top;
+ /*
+ * This is the only place where object fid is assigned. It's constant
+ * after this point.
+ */
+ top->lo_header->loh_fid = *f;
+
+ return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+ struct lu_object *top,
+ const struct lu_object_conf *conf)
+{
+ struct lu_object *scan;
+ struct list_head *layers;
+ unsigned int init_mask = 0;
+ unsigned int init_flag;
+ int clean;
+ int result;
+
+ layers = &top->lo_header->loh_layers;
do {
/*
clean = 0;
scan->lo_header = top->lo_header;
result = scan->lo_ops->loo_object_init(env, scan, conf);
- if (result != 0) {
- lu_object_free(env, top);
- RETURN(ERR_PTR(result));
- }
+ if (result)
+ return result;
+
init_mask |= init_flag;
next:
init_flag <<= 1;
} while (!clean);
list_for_each_entry_reverse(scan, layers, lo_linkage) {
- if (scan->lo_ops->loo_object_start != NULL) {
- result = scan->lo_ops->loo_object_start(env, scan);
- if (result != 0) {
- lu_object_free(env, top);
- RETURN(ERR_PTR(result));
- }
- }
- }
+ if (scan->lo_ops->loo_object_start != NULL) {
+ result = scan->lo_ops->loo_object_start(env, scan);
+ if (result)
+ return result;
+ }
+ }
+
+ lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
- lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
- RETURN(top);
+ set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+ return 0;
}
/**
const struct lu_fid *f,
__u64 *version)
{
- struct lu_site_bkt_data *bkt;
struct lu_object_header *h;
struct hlist_node *hnode;
__u64 ver = cfs_hash_bd_version_get(bd);
return ERR_PTR(-ENOENT);
*version = ver;
- bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
/* cfs_hash_bd_peek_locked is a somehow "internal" function
* of cfs_hash, it doesn't add refcount on object. */
hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
struct lu_site *s;
struct cfs_hash *hs;
struct cfs_hash_bd bd;
+ struct lu_site_bkt_data *bkt;
+ struct l_wait_info lwi = { 0 };
__u64 version = 0;
+ int rc;
+
+ ENTRY;
/*
* This uses standard index maintenance protocol:
*/
s = dev->ld_site;
hs = s->ls_obj_hash;
+
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+ lu_site_purge(env, s, -1);
+
cfs_hash_bd_get(hs, f, &bd);
+ bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
if (!(conf && conf->loc_flags & LOC_F_NEW)) {
cfs_hash_bd_lock(hs, &bd, 1);
o = htable_lookup(s, &bd, f, &version);
cfs_hash_bd_unlock(hs, &bd, 1);
- if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
- return o;
+ if (!IS_ERR(o)) {
+ if (likely(lu_object_is_inited(o->lo_header)))
+ RETURN(o);
+
+ l_wait_event(bkt->lsb_waitq,
+ lu_object_is_inited(o->lo_header) ||
+ lu_object_is_dying(o->lo_header), &lwi);
+
+ if (lu_object_is_dying(o->lo_header)) {
+ lu_object_put(env, o);
+
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
+ RETURN(o);
+ }
+
+ if (PTR_ERR(o) != -ENOENT)
+ RETURN(o);
}
+
/*
- * Allocate new object. This may result in rather complicated
- * operations, including fld queries, inode loading, etc.
+	 * Allocate new object; NB, the object is left uninitialized here,
+	 * since if the object changed between allocation and hash insertion,
+	 * an object with stale attributes would be returned.
*/
- o = lu_object_alloc(env, dev, f, conf);
+ o = lu_object_alloc(env, dev, f);
if (IS_ERR(o))
- return o;
+ RETURN(o);
LASSERT(lu_fid_eq(lu_object_fid(o), f));
+ CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
cfs_hash_bd_lock(hs, &bd, 1);
if (conf && conf->loc_flags & LOC_F_NEW)
cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
cfs_hash_bd_unlock(hs, &bd, 1);
+ /*
+ * This may result in rather complicated operations, including
+ * fld queries, inode loading, etc.
+ */
+ rc = lu_object_start(env, dev, o, conf);
+ if (rc) {
+ lu_object_put_nocache(env, o);
+ RETURN(ERR_PTR(rc));
+ }
+
+ wake_up_all(&bkt->lsb_waitq);
+
lu_object_limit(env, dev);
- return o;
+ RETURN(o);
}
lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
cfs_hash_bd_unlock(hs, &bd, 1);
lu_object_free(env, o);
- return shadow;
+
+ if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+ !lu_object_is_inited(shadow->lo_header)) {
+ l_wait_event(bkt->lsb_waitq,
+ lu_object_is_inited(shadow->lo_header) ||
+ lu_object_is_dying(shadow->lo_header), &lwi);
+
+ if (lu_object_is_dying(shadow->lo_header)) {
+ lu_object_put(env, shadow);
+
+ RETURN(ERR_PTR(-ENOENT));
+ }
+ }
+
+ RETURN(shadow);
}
EXPORT_SYMBOL(lu_object_find_at);
cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
INIT_LIST_HEAD(&bkt->lsb_lru);
- init_waitqueue_head(&bkt->lsb_marche_funebre);
+ init_waitqueue_head(&bkt->lsb_waitq);
}
s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
struct lu_device *dev,
const struct lu_object_conf *conf)
{
- struct lu_fid fid;
+ struct lu_fid fid;
struct lu_object *o;
+ int rc;
fid_zero(&fid);
- o = lu_object_alloc(env, dev, &fid, conf);
+ o = lu_object_alloc(env, dev, &fid);
+ if (!IS_ERR(o)) {
+ rc = lu_object_start(env, dev, o, conf);
+ if (rc) {
+ lu_object_free(env, o);
+ return ERR_PTR(rc);
+ }
+ }
return o;
}