X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=366099065a887d0e72770ce28890fb4ba997d9bf;hp=2053bd3c031ea21e8fc8fe4c938da25d6ccaa6dd;hb=f625f670afbe954030ff81f0f8522137d6cdd335;hpb=7817e4c785d075aae76b635dcf799064590833b0

diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index 2053bd3..3660990 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -140,8 +140,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 		return;
 	}
 
-	LASSERT(bkt->lsb_busy > 0);
-	bkt->lsb_busy--;
 	/*
 	 * When last reference is released, iterate over object
 	 * layers, and notify them that object is no longer busy.
@@ -151,15 +149,17 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 			o->lo_ops->loo_object_release(env, o);
 	}
 
-	if (!lu_object_is_dying(top)) {
+	if (!lu_object_is_dying(top) &&
+	    (lu_object_exists(orig) || lu_object_is_cl(orig))) {
 		LASSERT(list_empty(&top->loh_lru));
 		list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+		bkt->lsb_lru_len++;
 		cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
 		return;
 	}
 
 	/*
-	 * If object is dying (will not be cached), removed it
+	 * If object is dying (will not be cached) then remove it
 	 * from hash table and LRU.
 	 *
 	 * This is done with hash table and LRU lists locked. As the only
@@ -206,7 +206,13 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
 		cfs_hash_bd_t bd;
 
 		cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
-		list_del_init(&top->loh_lru);
+		if (!list_empty(&top->loh_lru)) {
+			struct lu_site_bkt_data *bkt;
+
+			list_del_init(&top->loh_lru);
+			bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
+			bkt->lsb_lru_len--;
+		}
 		cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
 		cfs_hash_bd_unlock(obj_hash, &bd, 1);
 	}
@@ -382,6 +388,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
 			cfs_hash_bd_del_locked(s->ls_obj_hash,
 					       &bd2, &h->loh_hash);
 			list_move(&h->loh_lru, &dispose);
+			bkt->lsb_lru_len--;
 			if (did_sth == 0)
 				did_sth = 1;
 
@@ -565,7 +572,6 @@ int lu_object_invariant(const struct lu_object *o)
 	}
 	return 1;
 }
-EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
 				       cfs_hash_bd_t *bd,
@@ -595,7 +601,10 @@ static struct lu_object *htable_lookup(struct lu_site *s,
 	if (likely(!lu_object_is_dying(h))) {
 		cfs_hash_get(s->ls_obj_hash, hnode);
 		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-		list_del_init(&h->loh_lru);
+		if (!list_empty(&h->loh_lru)) {
+			list_del_init(&h->loh_lru);
+			bkt->lsb_lru_len--;
+		}
 		return lu_object_top(h);
 	}
 
@@ -615,31 +624,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
 	return ERR_PTR(-EAGAIN);
 }
 
-static struct lu_object *htable_lookup_nowait(struct lu_site *s,
-					      cfs_hash_bd_t *bd,
-					      const struct lu_fid *f)
-{
-	struct hlist_node *hnode;
-	struct lu_object_header *h;
-
-	/* cfs_hash_bd_peek_locked is a somehow "internal" function
-	 * of cfs_hash, it doesn't add refcount on object. */
-	hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-	if (hnode == NULL) {
-		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-		return ERR_PTR(-ENOENT);
-	}
-
-	h = container_of0(hnode, struct lu_object_header, loh_hash);
-	if (unlikely(lu_object_is_dying(h)))
-		return ERR_PTR(-ENOENT);
-
-	cfs_hash_get(s->ls_obj_hash, hnode);
-	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-	list_del_init(&h->loh_lru);
-	return lu_object_top(h);
-}
-
 /**
  * Search cache for an object with the fid \a f. If such object is found,
  * return it. Otherwise, create new object, insert it into cache and return
@@ -685,7 +669,6 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
 	struct lu_object *o;
 	cfs_hash_t *hs;
 	cfs_hash_bd_t bd;
-	struct lu_site_bkt_data *bkt;
 
 	o = lu_object_alloc(env, dev, f, conf);
 	if (unlikely(IS_ERR(o)))
@@ -693,9 +676,7 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
 
 	hs = dev->ld_site->ls_obj_hash;
 	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-	bkt = cfs_hash_bd_extra_get(hs, &bd);
 	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-	bkt->lsb_busy++;
 	cfs_hash_bd_unlock(hs, &bd, 1);
 
 	lu_object_limit(env, dev);
@@ -764,11 +745,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
 
 	shadow = htable_lookup(s, &bd, f, waiter, &version);
 	if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-		struct lu_site_bkt_data *bkt;
-
-		bkt = cfs_hash_bd_extra_get(hs, &bd);
 		cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-		bkt->lsb_busy++;
 		cfs_hash_bd_unlock(hs, &bd, 1);
 
 		lu_object_limit(env, dev);
@@ -815,30 +792,6 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find_at);
 
 /**
- * Try to find the object in cache without waiting for the dead object
- * to be released nor allocating object if no cached one was found.
- *
- * The found object will be set as LU_OBJECT_HEARD_BANSHEE for purging.
- */
-void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
-		     const struct lu_fid *f)
-{
-	struct lu_site *s = dev->ld_site;
-	cfs_hash_t *hs = s->ls_obj_hash;
-	cfs_hash_bd_t bd;
-	struct lu_object *o;
-
-	cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
-	o = htable_lookup_nowait(s, &bd, f);
-	cfs_hash_bd_unlock(hs, &bd, 1);
-	if (!IS_ERR(o)) {
-		set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
-		lu_object_put(env, o);
-	}
-}
-EXPORT_SYMBOL(lu_object_purge);
-
-/**
  * Find object with given fid, and return its slice belonging to given device.
  */
 struct lu_object *lu_object_find_slice(const struct lu_env *env,
@@ -1049,14 +1002,7 @@ static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
 	struct lu_object_header *h;
 
 	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-	if (atomic_add_return(1, &h->loh_ref) == 1) {
-		struct lu_site_bkt_data *bkt;
-		cfs_hash_bd_t bd;
-
-		cfs_hash_bd_get(hs, &h->loh_fid, &bd);
-		bkt = cfs_hash_bd_extra_get(hs, &bd);
-		bkt->lsb_busy++;
-	}
+	atomic_inc(&h->loh_ref);
 }
 
 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
@@ -1408,7 +1354,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
 		}
 	}
 }
-EXPORT_SYMBOL(lu_stack_fini);
 
 enum {
 	/**
@@ -1420,6 +1365,7 @@ enum {
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
 static DEFINE_SPINLOCK(lu_keys_guard);
+static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1496,6 +1442,19 @@ void lu_context_key_degister(struct lu_context_key *key)
 	++key_set_version;
 	spin_lock(&lu_keys_guard);
 	key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+
+	/**
+	 * Wait until all transient contexts referencing this key have
+	 * run lu_context_key::lct_fini() method.
+	 */
+	while (atomic_read(&key->lct_used) > 1) {
+		spin_unlock(&lu_keys_guard);
+		CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
+		       key->lct_owner ? key->lct_owner->name : "", key,
+		       atomic_read(&key->lct_used));
+		schedule();
+		spin_lock(&lu_keys_guard);
+	}
 	if (lu_keys[key->lct_index]) {
 		lu_keys[key->lct_index] = NULL;
 		lu_ref_fini(&key->lct_reference);
@@ -1622,11 +1581,27 @@ void lu_context_key_quiesce(struct lu_context_key *key)
 		 * XXX layering violation.
 		 */
 		cl_env_cache_purge(~0);
-		key->lct_tags |= LCT_QUIESCENT;
 		/*
 		 * XXX memory barrier has to go here.
 		 */
 		spin_lock(&lu_keys_guard);
+		key->lct_tags |= LCT_QUIESCENT;
+
+		/**
+		 * Wait until all lu_context_key::lct_init() methods
+		 * have completed.
+		 */
+		while (atomic_read(&lu_key_initing_cnt) > 0) {
+			spin_unlock(&lu_keys_guard);
+			CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
+			       " %p, %d (%d)\n",
+			       key->lct_owner ? key->lct_owner->name : "",
+			       key, atomic_read(&key->lct_used),
+			       atomic_read(&lu_key_initing_cnt));
+			schedule();
+			spin_lock(&lu_keys_guard);
+		}
+
 		list_for_each_entry(ctx, &lu_context_remembered,
 				    lc_remember)
 			key_fini(ctx, key->lct_index);
@@ -1634,14 +1609,12 @@ void lu_context_key_quiesce(struct lu_context_key *key)
 		++key_set_version;
 	}
 }
-EXPORT_SYMBOL(lu_context_key_quiesce);
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
 	key->lct_tags &= ~LCT_QUIESCENT;
 	++key_set_version;
 }
-EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
@@ -1661,6 +1634,19 @@ static int keys_fill(struct lu_context *ctx)
 {
 	unsigned int i;
 
+	/*
+	 * A serialisation with lu_context_key_quiesce() is needed, but some
+	 * "key->lct_init()" are calling kernel memory allocation routine and
+	 * can't be called while holding a spin_lock.
+	 * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
+	 * to ensure the start of the serialisation.
+	 * An atomic_t variable is still used, in order not to reacquire the
+	 * lock when decrementing the counter.
+	 */
+	spin_lock(&lu_keys_guard);
+	atomic_inc(&lu_key_initing_cnt);
+	spin_unlock(&lu_keys_guard);
+
 	LINVRNT(ctx->lc_value != NULL);
 	for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
 		struct lu_context_key *key;
@@ -1678,13 +1664,19 @@ static int keys_fill(struct lu_context *ctx)
 			LINVRNT(key->lct_init != NULL);
 			LINVRNT(key->lct_index == i);
 
-			value = key->lct_init(ctx, key);
-			if (unlikely(IS_ERR(value)))
-				return PTR_ERR(value);
-
-			LASSERT(key->lct_owner != NULL);
-			if (!(ctx->lc_tags & LCT_NOREF))
-				try_module_get(key->lct_owner);
+			if (!(ctx->lc_tags & LCT_NOREF) &&
+			    try_module_get(key->lct_owner) == 0) {
+				/* module is unloading, skip this key */
+				continue;
+			}
+
+			value = key->lct_init(ctx, key);
+			if (unlikely(IS_ERR(value))) {
+				atomic_dec(&lu_key_initing_cnt);
+				return PTR_ERR(value);
+			}
+
 			lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
 			atomic_inc(&key->lct_used);
 			/*
@@ -1698,6 +1690,7 @@ static int keys_fill(struct lu_context *ctx)
 		}
 		ctx->lc_version = key_set_version;
 	}
+	atomic_dec(&lu_key_initing_cnt);
 	return 0;
 }
 
@@ -1778,15 +1771,20 @@ void lu_context_exit(struct lu_context *ctx)
 	ctx->lc_state = LCS_LEFT;
 	if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
 		for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-			if (ctx->lc_value[i] != NULL) {
-				struct lu_context_key *key;
-
-				key = lu_keys[i];
-				LASSERT(key != NULL);
-				if (key->lct_exit != NULL)
-					key->lct_exit(ctx,
-						      key, ctx->lc_value[i]);
-			}
+			/* could race with key quiescency */
+			if (ctx->lc_tags & LCT_REMEMBER)
+				spin_lock(&lu_keys_guard);
+			if (ctx->lc_value[i] != NULL) {
+				struct lu_context_key *key;
+
+				key = lu_keys[i];
+				LASSERT(key != NULL);
+				if (key->lct_exit != NULL)
+					key->lct_exit(ctx,
+						      key, ctx->lc_value[i]);
+			}
+			if (ctx->lc_tags & LCT_REMEMBER)
+				spin_unlock(&lu_keys_guard);
 		}
 	}
 }
@@ -1801,7 +1799,6 @@ int lu_context_refill(struct lu_context *ctx)
 {
 	return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
 }
-EXPORT_SYMBOL(lu_context_refill);
 
 /**
  * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
@@ -1928,7 +1925,8 @@ static void lu_site_stats_get(cfs_hash_t *hs,
 		struct hlist_head *hhead;
 
 		cfs_hash_bd_lock(hs, &bd, 1);
-		stats->lss_busy += bkt->lsb_busy;
+		stats->lss_busy +=
+			cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
 		stats->lss_total += cfs_hash_bd_count_get(&bd);
 		stats->lss_max_search = max((int)stats->lss_max_search,
 					    cfs_hash_bd_depmax_get(&bd));
@@ -2023,7 +2021,7 @@ static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
  * is safe to take the lu_sites_guard lock.
 *
 * Ideally we should accurately return the remaining number of cached
- * objects without taking the lu_sites_guard lock, but this is not 
+ * objects without taking the lu_sites_guard lock, but this is not
 * possible in the current implementation.
 */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
@@ -2098,7 +2096,6 @@ void lu_context_keys_dump(void)
 		}
 	}
 }
-EXPORT_SYMBOL(lu_context_keys_dump);
 
 /**
  * Initialization of global lu_* data.
@@ -2228,7 +2225,6 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count)
 			ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
 			ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
 }
-EXPORT_SYMBOL(lu_site_stats_print);
 
 /**
  * Helper function to initialize a number of kmem slab caches at once.
@@ -2277,7 +2273,6 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
 	struct lu_site *s = o->lo_dev->ld_site;
 	struct lu_fid *old = &o->lo_header->loh_fid;
-	struct lu_site_bkt_data *bkt;
 	struct lu_object *shadow;
 	wait_queue_t waiter;
 	cfs_hash_t *hs;
@@ -2292,9 +2287,7 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 	/* supposed to be unique */
 	LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
 	*old = *fid;
-	bkt = cfs_hash_bd_extra_get(hs, &bd);
 	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-	bkt->lsb_busy++;
 	cfs_hash_bd_unlock(hs, &bd, 1);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
@@ -2393,4 +2386,3 @@ int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
 	buf->lb_len = len;
 	return 0;
 }
-EXPORT_SYMBOL(lu_buf_check_and_grow);
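
Note on the accounting change above: the patch drops the per-bucket lsb_busy counter and instead maintains only the bucket LRU length (lsb_lru_len), so lu_site_stats_get() derives the busy count as the number of objects hashed in the bucket minus the number sitting idle on that bucket's LRU. The following is a minimal userspace sketch of that bookkeeping, not Lustre code; the names bucket, obj_release(), obj_lookup_hit() and bucket_busy() are illustrative only.

/*
 * Simplified model of the per-bucket accounting used by the patch:
 * rather than bumping a "busy" counter on every get/put, track only
 * how many objects are parked on the bucket LRU and derive busy as
 * (objects hashed in the bucket) - (objects on the LRU).
 */
#include <assert.h>
#include <stdio.h>

struct bucket {
	unsigned int total;    /* objects currently hashed in this bucket */
	unsigned int lru_len;  /* objects sitting idle on the bucket LRU */
};

/* last reference dropped: object becomes cacheable and goes on the LRU */
static void obj_release(struct bucket *b)
{
	b->lru_len++;
}

/* cache hit: object leaves the LRU and becomes busy again */
static void obj_lookup_hit(struct bucket *b)
{
	assert(b->lru_len > 0);
	b->lru_len--;
}

/* what the stats code reports as "busy" for this bucket */
static unsigned int bucket_busy(const struct bucket *b)
{
	return b->total - b->lru_len;
}

int main(void)
{
	struct bucket b = { .total = 3, .lru_len = 0 }; /* 3 objects, all busy */

	obj_release(&b);                       /* one object idles on the LRU */
	printf("busy=%u\n", bucket_busy(&b));  /* prints 2 */
	obj_lookup_hit(&b);                    /* it is found and reused */
	printf("busy=%u\n", bucket_busy(&b));  /* prints 3 */
	return 0;
}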
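Note on the lu_key_initing_cnt changes: keys_fill() announces an in-flight initialisation by bumping the counter under lu_keys_guard before running any lct_init(), while lu_context_key_quiesce() sets LCT_QUIESCENT under the same lock and then repeatedly drops it and reschedules until the counter drains. Below is a simplified pthreads/C11-atomics model of that handshake; the names (keys_guard, key_initing_cnt, key_quiescent, filler, quiesce) are illustrative, and the mutex stands in for the kernel spinlock. It is a sketch of the pattern, not the kernel implementation.

/*
 * Userspace model of the filler/quiesce handshake introduced by the patch.
 * Build with: cc -pthread model.c
 */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

static pthread_mutex_t keys_guard = PTHREAD_MUTEX_INITIALIZER;
static atomic_int key_initing_cnt = 0;
static atomic_int key_quiescent = 0;	/* models LCT_QUIESCENT */

static void *filler(void *arg)
{
	/* announce an in-flight init while holding the guard, as keys_fill() does */
	pthread_mutex_lock(&keys_guard);
	atomic_fetch_add(&key_initing_cnt, 1);
	pthread_mutex_unlock(&keys_guard);

	if (!atomic_load(&key_quiescent)) {
		/* ... run lct_init()-style work here, possibly sleeping ... */
	}

	/* no lock needed on the way out, only the atomic decrement */
	atomic_fetch_sub(&key_initing_cnt, 1);
	return NULL;
}

static void quiesce(void)
{
	pthread_mutex_lock(&keys_guard);
	atomic_store(&key_quiescent, 1);
	/* wait for every initialiser that already started to finish */
	while (atomic_load(&key_initing_cnt) > 0) {
		pthread_mutex_unlock(&keys_guard);
		sched_yield();			/* stands in for schedule() */
		pthread_mutex_lock(&keys_guard);
	}
	/* safe to finalise per-context values here, guard still held */
	pthread_mutex_unlock(&keys_guard);
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, filler, NULL);
	quiesce();
	pthread_join(t, NULL);
	printf("quiesced with %d initialisers in flight\n",
	       atomic_load(&key_initing_cnt));
	return 0;
}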