X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fobdclass%2Flu_object.c;h=74c1cc9c3308174b03da121e39f812866bcc3c21;hb=51d6a9e73384ed7ceff88022eb1c73a503896d52;hp=ca6e73d2b0e16c604e0ac27cce5646e23da6d94d;hpb=e3a7c58aebafce40323db54bf6056029e5af4a70;p=fs%2Flustre-release.git

diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index ca6e73d..74c1cc9 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -43,9 +43,6 @@
  */
 
 #define DEBUG_SUBSYSTEM S_CLASS
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 
 #include <libcfs/libcfs.h>
 
@@ -78,11 +75,32 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        struct lu_site_bkt_data *bkt;
        struct lu_site *site;
        struct lu_object *orig;
        cfs_hash_bd_t bd;
+       const struct lu_fid *fid;
 
        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
 
+       /*
+        * Until fids-on-OST are fully implemented, anonymous objects
+        * are possible in OSP. Such an object is not listed in the
+        * site hash, so we should not remove it from the site.
+        */
+       fid = lu_object_fid(o);
+       if (fid_is_zero(fid)) {
+               LASSERT(top->loh_hash.next == NULL
+                       && top->loh_hash.pprev == NULL);
+               LASSERT(cfs_list_empty(&top->loh_lru));
+               if (!cfs_atomic_dec_and_test(&top->loh_ref))
+                       return;
+               cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+                       if (o->lo_ops->loo_object_release != NULL)
+                               o->lo_ops->loo_object_release(env, o);
+               }
+               lu_object_free(env, orig);
+               return;
+       }
+
        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
@@ -138,6 +156,18 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 EXPORT_SYMBOL(lu_object_put);
 
 /**
+ * Put an object and do not keep it in cache. This is a temporary
+ * solution for multi-site objects whose layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+       cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
+                   &o->lo_header->loh_flags);
+       return lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
@@ -889,6 +919,23 @@ cfs_hash_ops_t lu_site_hash_ops = {
        .hs_put_locked  = lu_obj_hop_put_locked,
 };
 
+void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
+{
+       cfs_spin_lock(&s->ls_ld_lock);
+       if (cfs_list_empty(&d->ld_linkage))
+               cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
+       cfs_spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_add_linkage);
+
+void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
+{
+       cfs_spin_lock(&s->ls_ld_lock);
+       cfs_list_del_init(&d->ld_linkage);
+       cfs_spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_del_linkage);
+
 /**
  * Initialize site \a s, with \a d as the top level device.
  */
@@ -967,9 +1014,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
        cfs_spin_lock_init(&s->ls_ld_lock);
 
-       cfs_spin_lock(&s->ls_ld_lock);
-       cfs_list_add(&top->ld_linkage, &s->ls_ld_linkage);
-       cfs_spin_unlock(&s->ls_ld_lock);
+       lu_dev_add_linkage(s, top);
 
        RETURN(0);
 }
@@ -1201,15 +1246,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
        /* purge again. */
        lu_site_purge(env, site, ~0);
 
-       if (!cfs_hash_is_empty(site->ls_obj_hash)) {
-               /*
-                * Uh-oh, objects still exist.
-                */
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-
-               lu_site_print(env, site, &msgdata, lu_cdebug_printer);
-       }
-
        for (scan = top; scan != NULL; scan = next) {
                const struct lu_device_type *ldt = scan->ld_type;
                struct obd_type *type;
@@ -1228,12 +1264,12 @@ enum {
        /**
         * Maximal number of tld slots.
         */
-       LU_CONTEXT_KEY_NR = 32
+       LU_CONTEXT_KEY_NR = 40
 };
 
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
-static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(lu_keys_guard);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1287,13 +1323,14 @@ static void key_fini(struct lu_context *ctx, int index)
                key->lct_fini(ctx, key, ctx->lc_value[index]);
                lu_ref_del(&key->lct_reference, "ctx", ctx);
                cfs_atomic_dec(&key->lct_used);
-               LASSERT(key->lct_owner != NULL);
-               if (!(ctx->lc_tags & LCT_NOREF)) {
-                       LASSERT(cfs_module_refcount(key->lct_owner) > 0);
-                       cfs_module_put(key->lct_owner);
-               }
-               ctx->lc_value[index] = NULL;
-       }
+
+               LASSERT(key->lct_owner != NULL);
+               if ((ctx->lc_tags & LCT_NOREF) == 0) {
+                       LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
+                       cfs_module_put(key->lct_owner);
+               }
+               ctx->lc_value[index] = NULL;
+       }
 }
 
 /**
@@ -1458,17 +1495,16 @@ EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
-       int i;
+       int i;
 
-       cfs_spin_lock(&lu_keys_guard);
-       if (ctx->lc_value != NULL) {
-               for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
-                       key_fini(ctx, i);
-               OBD_FREE(ctx->lc_value,
-                        ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-               ctx->lc_value = NULL;
-       }
-       cfs_spin_unlock(&lu_keys_guard);
+       if (ctx->lc_value == NULL)
+               return;
+
+       for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
+               key_fini(ctx, i);
+
+       OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       ctx->lc_value = NULL;
 }
 
 static int keys_fill(struct lu_context *ctx)
@@ -1517,17 +1553,11 @@ static int keys_fill(struct lu_context *ctx)
 
 static int keys_init(struct lu_context *ctx)
 {
-       int result;
+       OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       if (likely(ctx->lc_value != NULL))
+               return keys_fill(ctx);
 
-       OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-       if (likely(ctx->lc_value != NULL))
-               result = keys_fill(ctx);
-       else
-               result = -ENOMEM;
-
-       if (result != 0)
-               keys_fini(ctx);
-       return result;
+       return -ENOMEM;
 }
 
 /**
@@ -1535,6 +1565,8 @@ static int keys_init(struct lu_context *ctx)
  */
 int lu_context_init(struct lu_context *ctx, __u32 tags)
 {
+       int rc;
+
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
@@ -1542,9 +1574,15 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
                cfs_spin_unlock(&lu_keys_guard);
-       } else
-               CFS_INIT_LIST_HEAD(&ctx->lc_remember);
-       return keys_init(ctx);
+       } else {
+               CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+       }
+
+       rc = keys_init(ctx);
+       if (rc != 0)
+               lu_context_fini(ctx);
+
+       return rc;
 }
 EXPORT_SYMBOL(lu_context_init);
 
@@ -1553,12 +1591,19 @@ EXPORT_SYMBOL(lu_context_init);
  */
 void lu_context_fini(struct lu_context *ctx)
 {
-       LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
-       ctx->lc_state = LCS_FINALIZED;
-       keys_fini(ctx);
-       cfs_spin_lock(&lu_keys_guard);
-       cfs_list_del_init(&ctx->lc_remember);
-       cfs_spin_unlock(&lu_keys_guard);
+       LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+       ctx->lc_state = LCS_FINALIZED;
+
+       if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
+               LASSERT(cfs_list_empty(&ctx->lc_remember));
+               keys_fini(ctx);
+
+       } else { /* could race with key deregister */
+               cfs_spin_lock(&lu_keys_guard);
+               keys_fini(ctx);
+               cfs_list_del_init(&ctx->lc_remember);
+               cfs_spin_unlock(&lu_keys_guard);
+       }
 }
 EXPORT_SYMBOL(lu_context_fini);
 
@@ -1752,6 +1797,24 @@ static void lu_site_stats_get(cfs_hash_t *hs,
 
 #ifdef __KERNEL__
 
+/*
+ * There exists a potential lock inversion deadlock scenario when using
+ * Lustre on top of ZFS. This occurs between one of ZFS's
+ * buf_hash_table.ht_lock's and Lustre's lu_sites_guard lock. Essentially,
+ * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
+ * while thread B will take the ht_lock and sleep on the lu_sites_guard
+ * lock. Neither thread can then wake up and release the lock the
+ * other is waiting on.
+ *
+ * To prevent this, we must ensure the lu_sites_guard lock is never
+ * taken while down this code path. ZFS reliably does not set the
+ * __GFP_FS bit in its code paths, so this can be used to determine if it
+ * is safe to take the lu_sites_guard lock.
+ *
+ * Ideally we should accurately return the remaining number of cached
+ * objects without taking the lu_sites_guard lock, but this is not
+ * possible in the current implementation.
+ */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 {
        lu_site_stats_t stats;
@@ -1761,12 +1824,26 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
        int remain = shrink_param(sc, nr_to_scan);
        CFS_LIST_HEAD(splice);
 
-       if (remain != 0) {
-               if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
+       if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
+               if (remain != 0)
                        return -1;
-               CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+               else
+                       /* We must not take the lu_sites_guard lock when
+                        * __GFP_FS is *not* set because of the deadlock
+                        * possibility detailed above. Additionally,
+                        * since we cannot determine the number of
+                        * objects in the cache without taking this
+                        * lock, we're in a particularly tough spot. As
+                        * a result, we'll just lie and say our cache is
+                        * empty. This _should_ be ok, as we can't
+                        * reclaim objects when __GFP_FS is *not* set
+                        * anyway.
+ */ + return 0; } + CDEBUG(D_INODE, "Shrink %d objects\n", remain); + cfs_mutex_lock(&lu_sites_guard); cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) { if (shrink_param(sc, nr_to_scan) != 0) { @@ -2036,3 +2113,53 @@ void lu_kmem_fini(struct lu_kmem_descr *caches) } } EXPORT_SYMBOL(lu_kmem_fini); + +/** + * Temporary solution to be able to assign fid in ->do_create() + * till we have fully-functional OST fids + */ +void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o, + const struct lu_fid *fid) +{ + struct lu_site *s = o->lo_dev->ld_site; + struct lu_fid *old = &o->lo_header->loh_fid; + struct lu_site_bkt_data *bkt; + struct lu_object *shadow; + cfs_waitlink_t waiter; + cfs_hash_t *hs; + cfs_hash_bd_t bd; + __u64 version = 0; + + LASSERT(fid_is_zero(old)); + + hs = s->ls_obj_hash; + cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1); + shadow = htable_lookup(s, &bd, fid, &waiter, &version); + /* supposed to be unique */ + LASSERT(shadow == NULL); + *old = *fid; + bkt = cfs_hash_bd_extra_get(hs, &bd); + cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); + bkt->lsb_busy++; + cfs_hash_bd_unlock(hs, &bd, 1); +} +EXPORT_SYMBOL(lu_object_assign_fid); + +/** + * allocates object with 0 (non-assiged) fid + * XXX: temporary solution to be able to assign fid in ->do_create() + * till we have fully-functional OST fids + */ +struct lu_object *lu_object_anon(const struct lu_env *env, + struct lu_device *dev, + const struct lu_object_conf *conf) +{ + struct lu_fid fid; + struct lu_object *o; + + fid_zero(&fid); + o = lu_object_alloc(env, dev, &fid, conf); + + return o; +} +EXPORT_SYMBOL(lu_object_anon);