From fa14bdf6b648d1d4023a4fa88789059d185f4a07 Mon Sep 17 00:00:00 2001
From: Lai Siyao
Date: Tue, 23 May 2017 15:56:06 +0800
Subject: [PATCH] LU-9049 obdclass: change object lookup to no wait mode

Currently we set LU_OBJECT_HEARD_BANSHEE on an object when we want
to remove it from the cache, but this may lead to deadlock: when
another process looks up such an object, it has to wait until the
object is released (which happens at the last refcount put), while
that process may already hold an LDLM lock.

Now that the current code can handle dying objects correctly, we can
just return such an object from lookup, and the deadlock described
above is avoided.

One more change is needed: objects created in OUT do not have
dt_body_ops set for the LOD layer, because originally it was set in
lod_create(). Set dt_body_ops in lod_object_init() instead, so that
objects created in OUT are no different from those created on the
MDT. To achieve this, the functions in lod_body_ops must check the
file type internally to avoid misuse.

Signed-off-by: Lai Siyao
Change-Id: Ia31ab5f09f9bf80a9aa8fd7e7b60348b02400b25
Reviewed-on: https://review.whamcloud.com/26965
Tested-by: Jenkins
Tested-by: Maloo
Reviewed-by: Alex Zhuravlev
Tested-by: Cliff White
Reviewed-by: Fan Yong
Reviewed-by: Oleg Drokin
---
 lustre/include/lu_object.h       |   6 +-
 lustre/lfsck/lfsck_internal.h    |  28 +---
 lustre/lfsck/lfsck_layout.c      |   9 +-
 lustre/lfsck/lfsck_striped_dir.c |   4 +-
 lustre/lod/lod_internal.h        |   1 +
 lustre/lod/lod_object.c          |  52 ++-----
 lustre/obdclass/lu_object.c      | 311 +++++++++++++++++----------------------
 lustre/target/out_handler.c      |  10 --
 8 files changed, 154 insertions(+), 267 deletions(-)

diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h
index 8c10fdb..ab41873 100644
--- a/lustre/include/lu_object.h
+++ b/lustre/include/lu_object.h
@@ -166,10 +166,6 @@ typedef enum {
 	/* This is a new object to be allocated, or the file
 	 * corresponding to the object does not exists. */
 	LOC_F_NEW	= 0x00000001,
-
-	/* When find a dying object, just return -EAGAIN at once instead of
-	 * blocking the thread. */
-	LOC_F_NOWAIT	= 0x00000002,
 } loc_flags_t;
 
 /**
@@ -711,7 +707,7 @@ static inline void lu_object_get(struct lu_object *o)
 }
 
 /**
- * Return true of object will not be cached after last reference to it is
+ * Return true if object will not be cached after last reference to it is
  * released.
  */
 static inline int lu_object_is_dying(const struct lu_object_header *h)
diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h
index f497b13..3ab2472 100644
--- a/lustre/lfsck/lfsck_internal.h
+++ b/lustre/lfsck/lfsck_internal.h
@@ -1223,9 +1223,7 @@ static inline umode_t lfsck_object_type(const struct dt_object *obj)
 
 static inline int lfsck_is_dead_obj(const struct dt_object *obj)
 {
-	struct lu_object_header *loh = obj->do_lu.lo_header;
-
-	return !!test_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
+	return lu_object_is_dying(obj->do_lu.lo_header);
 }
 
 static inline struct dt_object *lfsck_object_get(struct dt_object *obj)
@@ -1262,16 +1260,6 @@ lfsck_object_find_by_dev_new(const struct lu_env *env, struct dt_device *dev,
 }
 
 static inline struct dt_object *
-lfsck_object_find_by_dev_nowait(const struct lu_env *env, struct dt_device *dev,
-				const struct lu_fid *fid)
-{
-	struct lu_object_conf *conf = &lfsck_env_info(env)->lti_conf;
-
-	conf->loc_flags = LOC_F_NOWAIT;
-	return lu2dt(lu_object_find_slice(env, dt2lu_dev(dev), fid, conf));
-}
-
-static inline struct dt_object *
 lfsck_object_find_by_dev(const struct lu_env *env, struct dt_device *dev,
 			 const struct lu_fid *fid)
 {
@@ -1321,20 +1309,6 @@ lfsck_object_find_bottom(const struct lu_env *env, struct lfsck_instance *lfsck,
 }
 
 static inline struct dt_object *
-lfsck_object_find_bottom_nowait(const struct lu_env *env,
-				struct lfsck_instance *lfsck,
-				const struct lu_fid *fid)
-{
-	struct dt_device *dev;
-
-	dev = lfsck_find_dev_by_fid(env, lfsck, fid);
-	if (IS_ERR(dev))
-		return (struct dt_object *)dev;
-
-	return lfsck_object_find_by_dev_nowait(env, dev, fid);
-}
-
-static inline struct dt_object *
 lfsck_object_find_bottom_new(const struct lu_env *env,
 			     struct lfsck_instance *lfsck,
 			     const struct lu_fid *fid)
diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c
index 06390d8..5a00031 100644
--- a/lustre/lfsck/lfsck_layout.c
+++ b/lustre/lfsck/lfsck_layout.c
@@ -5292,13 +5292,8 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
 	 * cause the parent object cannot be purged, then cause the
 	 * child object cannot be purged also. So the LFSCK thread
 	 * will fall into deadlock.
-	 *
-	 * We introduce non-blocked version lu_object_find() to allow
-	 * the LFSCK thread to return failure immediately (instead of
-	 * wait) when it finds dying (child) object, then the LFSCK
-	 * thread can check whether the parent object is dying or not.
-	 * So avoid above deadlock. LU-5395 */
-	cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
+	 */
+	cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
 	if (IS_ERR(cobj)) {
 		if (lfsck_is_dead_obj(parent)) {
 			lfsck_tgt_put(tgt);
diff --git a/lustre/lfsck/lfsck_striped_dir.c b/lustre/lfsck/lfsck_striped_dir.c
index 857953a..0872896 100644
--- a/lustre/lfsck/lfsck_striped_dir.c
+++ b/lustre/lfsck/lfsck_striped_dir.c
@@ -1932,7 +1932,7 @@ int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
 	cname = lfsck_name_get_const(env, info->lti_tmpbuf, len);
 	memcpy(lnr->lnr_name, info->lti_tmpbuf, len);
 
-	obj = lfsck_object_find_bottom_nowait(env, lfsck, cfid);
+	obj = lfsck_object_find_bottom(env, lfsck, cfid);
 	if (IS_ERR(obj)) {
 		if (dir == NULL) {
 			dir = lfsck_assistant_object_load(env, lfsck,
@@ -2309,7 +2309,7 @@ int lfsck_namespace_handle_striped_master(const struct lu_env *env,
 		dev = ltd->ltd_tgt;
 	}
 
-	obj = lfsck_object_find_by_dev_nowait(env, dev, &lnr->lnr_fid);
+	obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
 	if (IS_ERR(obj)) {
 		if (lfsck_is_dead_obj(dir))
 			RETURN(0);
diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h
index 09baa1a..2970d93 100644
--- a/lustre/lod/lod_internal.h
+++ b/lustre/lod/lod_internal.h
@@ -662,6 +662,7 @@ void lod_procfs_fini(struct lod_device *lod);
 /* lod_object.c */
 extern struct dt_object_operations lod_obj_ops;
 extern struct lu_object_operations lod_lu_obj_ops;
+
 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
 			struct lu_buf *buf, bool resize);
 int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt,
diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c
index f4a86a9..bc1ac0e 100644
--- a/lustre/lod/lod_object.c
+++ b/lustre/lod/lod_object.c
@@ -58,9 +58,6 @@
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
-static const struct dt_body_operations lod_body_lnk_ops;
-static const struct dt_body_operations lod_body_ops;
-
 /**
  * Implementation of dt_index_operations::dio_lookup
 *
@@ -4212,11 +4209,6 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt,
 	if (rc != 0)
 		GOTO(out, rc);
 
-	if (dof->dof_type == DFT_SYM)
-		dt->do_body_ops = &lod_body_lnk_ops;
-	else if (dof->dof_type == DFT_REGULAR)
-		dt->do_body_ops = &lod_body_ops;
-
 	/*
 	 * it's lod_ah_init() that has decided the object will be striped
 	 */
@@ -5024,6 +5016,9 @@ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
 			struct lu_buf *buf, loff_t *pos)
 {
 	struct dt_object *next = dt_object_child(dt);
+
+	LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) ||
+		S_ISLNK(dt->do_lu.lo_header->loh_attr));
 	return next->do_body_ops->dbo_read(env, next, buf, pos);
 }
 
@@ -5050,6 +5045,8 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
 			 const struct lu_buf *buf, loff_t *pos,
 			 struct thandle *th, int iq)
 {
+	LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) ||
+		S_ISLNK(dt->do_lu.lo_header->loh_attr));
 	return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq);
 }
 
@@ -5068,16 +5065,17 @@ static int lod_punch(const struct lu_env *env, struct dt_object *dt,
 	if (dt_object_remote(dt))
 		return -ENOTSUPP;
 
+	LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
 	return lod_sub_punch(env, dt_object_child(dt), start, end, th);
 }
 
-static const struct dt_body_operations lod_body_lnk_ops = {
-	.dbo_read	= lod_read,
-	.dbo_declare_write = lod_declare_write,
-	.dbo_write	= lod_write
-};
-
-static const struct dt_body_operations lod_body_ops = {
+/*
+ * different type of files use the same body_ops because object may be created
+ * in OUT, where there is no chance to set correct body_ops for each type, so
+ * body_ops themselves will check file type inside, see lod_read/write/punch
+ * for details.
+ */
+const struct dt_body_operations lod_body_ops = {
 	.dbo_read	= lod_read,
 	.dbo_declare_write = lod_declare_write,
 	.dbo_write	= lod_write,
@@ -5154,6 +5152,8 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
 	if (unlikely(cobj == NULL))
 		RETURN(-ENOMEM);
 
+	lu2lod_obj(lo)->ldo_obj.do_body_ops = &lod_body_ops;
+
 	lu_object_add(lo, cobj);
 
 	RETURN(0);
@@ -5213,27 +5213,6 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
 }
 
 /**
- * Implementation of lu_object_operations::loo_object_start.
- *
- * \see lu_object_operations::loo_object_start() in the API description
- * for details.
- */
-static int lod_object_start(const struct lu_env *env, struct lu_object *o)
-{
-	if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) {
-		lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
-	} else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) ||
-		   fid_is_local_file(lu_object_fid(o))) {
-		/* Note: some local file (like last rcvd) is created
-		 * through bottom layer (OSD), so the object initialization
-		 * comes to lod, it does not set loh_attr yet, so
-		 * set do_body_ops for local file anyway */
-		lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops;
-	}
-	return 0;
-}
-
-/**
  * Implementation of lu_object_operations::loo_object_free.
 *
 * \see lu_object_operations::loo_object_free() in the API description
@@ -5277,7 +5256,6 @@ static int lod_object_print(const struct lu_env *env, void *cookie,
 
 struct lu_object_operations lod_lu_obj_ops = {
 	.loo_object_init	= lod_object_init,
-	.loo_object_start	= lod_object_start,
 	.loo_object_free	= lod_object_free,
 	.loo_object_release	= lod_object_release,
 	.loo_object_print	= lod_object_print,
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index cc89029..296564f 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -92,16 +92,16 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
  */
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_object_header *top;
-        struct lu_site          *site;
-        struct lu_object        *orig;
-        struct cfs_hash_bd       bd;
-        const struct lu_fid     *fid;
+	struct lu_site_bkt_data *bkt;
+	struct lu_object_header *top;
+	struct lu_site *site;
+	struct lu_object *orig;
+	struct cfs_hash_bd bd;
+	const struct lu_fid *fid;
 
-        top  = o->lo_header;
-        site = o->lo_dev->ld_site;
-        orig = o;
+	top = o->lo_header;
+	site = o->lo_dev->ld_site;
+	orig = o;
 
 	/*
 	 * till we have full fids-on-OST implemented anonymous objects
@@ -123,12 +123,11 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 		return;
 	}
 
-        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+	cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
+	bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
 	if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
 		if (lu_object_is_dying(top)) {
-
 			/*
 			 * somebody may be waiting for this, currently only
 			 * used for cl_object, see cl_object_put_last().
 			 */
 			wake_up_all(&bkt->lsb_marche_funebre);
 			return;
 		}
@@ -138,14 +137,14 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 		return;
 	}
 
-        /*
-         * When last reference is released, iterate over object
-         * layers, and notify them that object is no longer busy.
-         */
+	/*
+	 * When last reference is released, iterate over object
+	 * layers, and notify them that object is no longer busy.
+	 */
 	list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
-                if (o->lo_ops->loo_object_release != NULL)
-                        o->lo_ops->loo_object_release(env, o);
-        }
+		if (o->lo_ops->loo_object_release != NULL)
+			o->lo_ops->loo_object_release(env, o);
+	}
 
 	if (!lu_object_is_dying(top) &&
 	    (lu_object_exists(orig) || lu_object_is_cl(orig))) {
 		LASSERT(list_empty(&top->loh_lru));
 		list_add_tail(&top->loh_lru, &bkt->lsb_lru);
 		bkt->lsb_lru_len++;
 		percpu_counter_inc(&site->ls_lru_len_counter);
@@ -156,29 +155,29 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 		CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
 		       "lru_len: %ld\n",
 		       o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
-                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-                return;
-        }
+		cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+		return;
+	}
 
-        /*
+	/*
 	 * If object is dying (will not be cached) then remove it
-        * from hash table and LRU.
-        *
-        * This is done with hash table and LRU lists locked. As the only
-        * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find()),
-        * or LRU scanning (lu_site_purge()), that are done under hash-table
-        * and LRU lock, no race with concurrent object lookup is possible
-        * and we can safely destroy object below.
-        */
+	 * from hash table and LRU.
+	 *
+	 * This is done with hash table and LRU lists locked. As the only
+	 * way to acquire first reference to previously unreferenced
+	 * object is through hash-table lookup (lu_object_find()),
+	 * or LRU scanning (lu_site_purge()), that are done under hash-table
+	 * and LRU lock, no race with concurrent object lookup is possible
+	 * and we can safely destroy object below.
+	 */
 	if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
 		cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
-        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-        /*
-         * Object was already removed from hash and lru above, can
-         * kill it.
-         */
-        lu_object_free(env, orig);
+	cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+	/*
+	 * Object was already removed from hash and lru above, can
+	 * kill it.
+	 */
+	lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
 
@@ -590,53 +589,35 @@ int lu_object_invariant(const struct lu_object *o)
 static struct lu_object *htable_lookup(struct lu_site *s,
 				       struct cfs_hash_bd *bd,
 				       const struct lu_fid *f,
-				       wait_queue_t *waiter,
 				       __u64 *version)
 {
 	struct lu_site_bkt_data *bkt;
 	struct lu_object_header *h;
-        struct hlist_node       *hnode;
-        __u64  ver = cfs_hash_bd_version_get(bd);
+	struct hlist_node *hnode;
+	__u64 ver = cfs_hash_bd_version_get(bd);
 
-        if (*version == ver)
+	if (*version == ver)
 		return ERR_PTR(-ENOENT);
 
-        *version = ver;
-        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
+	*version = ver;
+	bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
 	/* cfs_hash_bd_peek_locked is a somehow "internal" function
 	 * of cfs_hash, it doesn't add refcount on object. */
 	hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-	if (hnode == NULL) {
-		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+	if (!hnode) {
+		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
 		return ERR_PTR(-ENOENT);
-	}
-
-	h = container_of0(hnode, struct lu_object_header, loh_hash);
-	if (likely(!lu_object_is_dying(h))) {
-		cfs_hash_get(s->ls_obj_hash, hnode);
-		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-		if (!list_empty(&h->loh_lru)) {
-			list_del_init(&h->loh_lru);
-			bkt->lsb_lru_len--;
-			percpu_counter_dec(&s->ls_lru_len_counter);
-		}
-		return lu_object_top(h);
-	}
-
-	/*
-	 * Lookup found an object being destroyed this object cannot be
-	 * returned (to assure that references to dying objects are eventually
-	 * drained), and moreover, lookup has to wait until object is freed.
-	 */
-
-	if (likely(waiter != NULL)) {
-		init_waitqueue_entry(waiter, current);
-		add_wait_queue(&bkt->lsb_marche_funebre, waiter);
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
 	}
 
-	return ERR_PTR(-EAGAIN);
+	h = container_of0(hnode, struct lu_object_header, loh_hash);
+	cfs_hash_get(s->ls_obj_hash, hnode);
+	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+	if (!list_empty(&h->loh_lru)) {
+		list_del_init(&h->loh_lru);
+		bkt->lsb_lru_len--;
+		percpu_counter_dec(&s->ls_lru_len_counter);
+	}
+	return lu_object_top(h);
 }
 
 /**
@@ -677,132 +658,104 @@ static void lu_object_limit(const struct lu_env *env,
 }
 
 static struct lu_object *lu_object_new(const struct lu_env *env,
-                                       struct lu_device *dev,
-                                       const struct lu_fid *f,
-                                       const struct lu_object_conf *conf)
+				       struct lu_device *dev,
+				       const struct lu_fid *f,
+				       const struct lu_object_conf *conf)
 {
-        struct lu_object        *o;
-        struct cfs_hash         *hs;
-        struct cfs_hash_bd       bd;
+	struct lu_object *o;
+	struct cfs_hash *hs;
+	struct cfs_hash_bd bd;
 
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+	o = lu_object_alloc(env, dev, f, conf);
+	if (unlikely(IS_ERR(o)))
+		return o;
 
-        hs = dev->ld_site->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+	hs = dev->ld_site->ls_obj_hash;
+	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+	cfs_hash_bd_unlock(hs, &bd, 1);
 
 	lu_object_limit(env, dev);
 
-        return o;
+	return o;
 }
 
 /**
  * Core logic of lu_object_find*() functions.
+ *
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
  */
-static struct lu_object *lu_object_find_try(const struct lu_env *env,
-                                            struct lu_device *dev,
-                                            const struct lu_fid *f,
-                                            const struct lu_object_conf *conf,
-                                            wait_queue_t *waiter)
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+				    struct lu_device *dev,
+				    const struct lu_fid *f,
+				    const struct lu_object_conf *conf)
 {
-        struct lu_object *o;
-        struct lu_object *shadow;
-        struct lu_site *s;
-        struct cfs_hash *hs;
-        struct cfs_hash_bd bd;
-        __u64 version = 0;
+	struct lu_object *o;
+	struct lu_object *shadow;
+	struct lu_site *s;
+	struct cfs_hash *hs;
+	struct cfs_hash_bd bd;
+	__u64 version = 0;
 
-        /*
-         * This uses standard index maintenance protocol:
-         *
-         *     - search index under lock, and return object if found;
-         *     - otherwise, unlock index, allocate new object;
-         *     - lock index and search again;
-         *     - if nothing is found (usual case), insert newly created
-         *       object into index;
-         *     - otherwise (race: other thread inserted object), free
-         *       object just allocated.
-         *     - unlock index;
-         *     - return object.
-         *
-         * For "LOC_F_NEW" case, we are sure the object is new established.
-         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
-         * just alloc and insert directly.
-         *
-         * If dying object is found during index search, add @waiter to the
-         * site wait-queue and return ERR_PTR(-EAGAIN).
-         */
-        if (conf != NULL && conf->loc_flags & LOC_F_NEW)
-                return lu_object_new(env, dev, f, conf);
-
-        s  = dev->ld_site;
-        hs = s->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        o = htable_lookup(s, &bd, f, waiter, &version);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+	/*
+	 * This uses standard index maintenance protocol:
+	 *
+	 *     - search index under lock, and return object if found;
+	 *     - otherwise, unlock index, allocate new object;
+	 *     - lock index and search again;
+	 *     - if nothing is found (usual case), insert newly created
+	 *       object into index;
+	 *     - otherwise (race: other thread inserted object), free
+	 *       object just allocated.
+	 *     - unlock index;
+	 *     - return object.
+	 *
+	 * For "LOC_F_NEW" case, we are sure the object is new established.
+	 * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
+	 * just alloc and insert directly.
+	 *
+	 * If dying object is found during index search, add @waiter to the
+	 * site wait-queue and return ERR_PTR(-EAGAIN).
+	 */
+	if (conf && conf->loc_flags & LOC_F_NEW)
+		return lu_object_new(env, dev, f, conf);
+
+	s = dev->ld_site;
+	hs = s->ls_obj_hash;
+	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+	o = htable_lookup(s, &bd, f, &version);
+	cfs_hash_bd_unlock(hs, &bd, 1);
 	if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                return o;
+		return o;
 
-        /*
-         * Allocate new object. This may result in rather complicated
-         * operations, including fld queries, inode loading, etc.
-         */
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+	/*
+	 * Allocate new object. This may result in rather complicated
+	 * operations, including fld queries, inode loading, etc.
+	 */
+	o = lu_object_alloc(env, dev, f, conf);
+	if (unlikely(IS_ERR(o)))
+		return o;
 
-        LASSERT(lu_fid_eq(lu_object_fid(o), f));
+	LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
-        cfs_hash_bd_lock(hs, &bd, 1);
+	cfs_hash_bd_lock(hs, &bd, 1);
 
-        shadow = htable_lookup(s, &bd, f, waiter, &version);
+	shadow = htable_lookup(s, &bd, f, &version);
 	if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                cfs_hash_bd_unlock(hs, &bd, 1);
+		cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+		cfs_hash_bd_unlock(hs, &bd, 1);
 
 		lu_object_limit(env, dev);
 
-                return o;
-        }
-
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        lu_object_free(env, o);
-        return shadow;
-}
-
-/**
- * Much like lu_object_find(), but top level device of object is specifically
- * \a dev rather than top level device of the site. This interface allows
- * objects of different "stacking" to be created within the same site.
- */
-struct lu_object *lu_object_find_at(const struct lu_env *env,
-                                    struct lu_device *dev,
-                                    const struct lu_fid *f,
-                                    const struct lu_object_conf *conf)
-{
-        struct lu_site_bkt_data *bkt;
-        struct lu_object *obj;
-        wait_queue_t wait;
-
-        if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT)
-                return lu_object_find_try(env, dev, f, conf, NULL);
-
-        while (1) {
-                obj = lu_object_find_try(env, dev, f, conf, &wait);
-                if (obj != ERR_PTR(-EAGAIN))
-                        return obj;
-                /*
-                 * lu_object_find_try() already added waiter into the
-                 * wait queue.
-                 */
-                schedule();
-                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
-                remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
+		return o;
 	}
+
+	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
+	cfs_hash_bd_unlock(hs, &bd, 1);
+	lu_object_free(env, o);
+	return shadow;
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -2298,10 +2251,10 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 	cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
 	{
-                __u64 version = 0;
-                wait_queue_t waiter;
-                struct lu_object *shadow;
-                shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+		__u64 version = 0;
+		struct lu_object *shadow;
+
+		shadow = htable_lookup(s, &bd, fid, &version);
 		/* supposed to be unique */
 		LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
 	}
diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c
index 6ad753e..8e761c1 100644
--- a/lustre/target/out_handler.c
+++ b/lustre/target/out_handler.c
@@ -809,16 +809,6 @@ static int out_trans_stop(const struct lu_env *env,
 	rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
 	for (i = 0; i < ta->ta_argno; i++) {
 		if (ta->ta_args[i]->object != NULL) {
-			struct dt_object *obj = ta->ta_args[i]->object;
-
-			/* If the object is being created during this
-			 * transaction, we need to remove them from the
-			 * cache immediately, because a few layers are
-			 * missing in OUT handler, i.e. the object might
-			 * not be initialized in all layers */
-			if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
-				set_bit(LU_OBJECT_HEARD_BANSHEE,
-					&obj->do_lu.lo_header->loh_flags);
 			dt_object_put(env, ta->ta_args[i]->object);
 			ta->ta_args[i]->object = NULL;
 		}
-- 
1.8.3.1
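
To make the behavioural change easier to see outside the kernel, the sketch
below contrasts the old blocking lookup with the new no-wait lookup. It is a
minimal user-space analogue, not Lustre code: struct obj, obj_lookup_wait()
and obj_lookup_nowait() are hypothetical stand-ins for struct
lu_object_header and the old/new htable_lookup() paths, and a pthread
mutex/condvar stands in for the cfs_hash bucket lock and the
lsb_marche_funebre wait queue.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for struct lu_object_header: a refcounted,
 * cached object that may be dying (HEARD_BANSHEE set, in Lustre terms). */
struct obj {
	pthread_mutex_t lock;
	pthread_cond_t marche_funebre;	/* signalled at the final put */
	int refcount;
	bool dying;
};

/* Old scheme: a lookup that hits a dying object sleeps until the object
 * is freed.  If this thread already holds a lock (say, an LDLM lock)
 * that the freeing thread needs, neither side can make progress. */
struct obj *obj_lookup_wait(struct obj *o)
{
	pthread_mutex_lock(&o->lock);
	while (o->dying)
		pthread_cond_wait(&o->marche_funebre, &o->lock);
	o->refcount++;
	pthread_mutex_unlock(&o->lock);
	return o;
}

/* New scheme: take a reference and return the object even when it is
 * dying; the caller checks that state itself, so lookup never sleeps
 * and the deadlock above cannot form. */
struct obj *obj_lookup_nowait(struct obj *o)
{
	pthread_mutex_lock(&o->lock);
	o->refcount++;			/* reference dying objects too */
	pthread_mutex_unlock(&o->lock);
	return o;
}

int main(void)
{
	struct obj o = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.marche_funebre = PTHREAD_COND_INITIALIZER,
		.refcount = 1,
		.dying = true,		/* simulate a half-removed object */
	};
	struct obj *found;

	/* obj_lookup_wait(&o) would block forever here: nobody is left
	 * to drop the last reference and signal marche_funebre. */
	found = obj_lookup_nowait(&o);
	printf("got %s object, refcount %d\n",
	       found->dying ? "dying" : "live", found->refcount);
	return 0;
}

Built with gcc -pthread, this prints "got dying object, refcount 2": the
lookup returns immediately even though the object is dying, which is the
property the patch relies on — callers such as lfsck_is_dead_obj() now test
lu_object_is_dying() on the returned object instead of blocking inside
lookup.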