X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=2679063311ed58b31ef7f3ef69e9e53195411834;hb=7bf1d7c6cb7d0a7231b3fdcb9e3d3ec3129fb427;hp=b3bc375ed6f062873b27679261b7f5c1fd04a68c;hpb=598996a12f5818190635e1f3a948c9e6a77297b5;p=fs%2Flustre-release.git

diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index b3bc375..2679063 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -28,9 +26,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -46,9 +43,6 @@
  */

 #define DEBUG_SUBSYSTEM S_CLASS
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif

 #include

@@ -64,8 +58,6 @@
 #include
 #include
 #include
-/* lu_time_global_{init,fini}() */
-#include

 static void lu_object_free(const struct lu_env *env, struct lu_object *o);

@@ -81,11 +73,32 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
         struct lu_site *site;
         struct lu_object *orig;
         cfs_hash_bd_t bd;
+        const struct lu_fid *fid;

         top = o->lo_header;
         site = o->lo_dev->ld_site;
         orig = o;

+        /*
+         * Until fids-on-OST is fully implemented, anonymous objects are
+         * possible in OSP. Such an object is not listed in the site, so
+         * we should not remove it from the site either.
+         */
+        fid = lu_object_fid(o);
+        if (fid_is_zero(fid)) {
+                LASSERT(top->loh_hash.next == NULL
+                        && top->loh_hash.pprev == NULL);
+                LASSERT(cfs_list_empty(&top->loh_lru));
+                if (!cfs_atomic_dec_and_test(&top->loh_ref))
+                        return;
+                cfs_list_for_each_entry_reverse(o, &top->loh_layers,
+                                                lo_linkage) {
+                        if (o->lo_ops->loo_object_release != NULL)
+                                o->lo_ops->loo_object_release(env, o);
+                }
+                lu_object_free(env, orig);
+                return;
+        }
+
         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);

@@ -130,7 +143,8 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
          * and LRU lock, no race with concurrent object lookup is possible
          * and we can safely destroy object below.
          */
-        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
+                cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
         /*
          * Object was already removed from hash and lru above, can
@@ -141,6 +155,39 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 EXPORT_SYMBOL(lu_object_put);

 /**
+ * Put object and don't keep it in cache. This is a temporary solution for
+ * multi-site objects whose layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+        set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
+        return lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
+
+/**
+ * Kill the object and take it out of the LRU cache.
+ * Currently used by client code for layout change.
+ */
+void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
+{
+        struct lu_object_header *top;
+
+        top = o->lo_header;
+        set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
+        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
+                cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
+                cfs_hash_bd_t bd;
+
+                cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+                cfs_list_del_init(&top->loh_lru);
+                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
+                cfs_hash_bd_unlock(obj_hash, &bd, 1);
+        }
+}
+EXPORT_SYMBOL(lu_object_unhash);
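For illustration, a minimal caller-side sketch of how the three entry points above are meant to be used; it assumes a referenced object obtained from an earlier lookup, and the demo_* helper names are hypothetical, not part of this patch:

/* Assumed: "o" is referenced, e.g. returned by a prior lookup. */
static void demo_put_cached(const struct lu_env *env, struct lu_object *o)
{
        /* Common case: drop the reference but keep the object in the
         * site hash and LRU so the next lookup is a cache hit. */
        lu_object_put(env, o);
}

static void demo_put_uncached(const struct lu_env *env, struct lu_object *o)
{
        /* Layering may change (multi-site object): mark the header
         * HEARD_BANSHEE so the final reference frees the object instead
         * of parking it in the LRU. */
        lu_object_put_nocache(env, o);
}

static void demo_layout_change(const struct lu_env *env, struct lu_object *o)
{
        /* Client layout change: unhash immediately; whoever drops the
         * last reference then frees the object. */
        lu_object_unhash(env, o);
        lu_object_put(env, o);
}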
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
@@ -158,18 +205,19 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         int result;
         ENTRY;

-        /*
-         * Create top-level object slice. This will also create
-         * lu_object_header.
-         */
-        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
-        if (top == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+        /*
+         * Create top-level object slice. This will also create
+         * lu_object_header.
+         */
+        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
+        if (top == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+        if (IS_ERR(top))
+                RETURN(top);
         /*
          * This is the only place where object fid is assigned. It's constant
          * after this point.
          */
-        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
         top->lo_header->loh_fid = *f;
         layers = &top->lo_header->loh_layers;
         do {
@@ -355,7 +403,7 @@ enum {
          *
          * XXX overflow is not handled correctly.
          */
-        LU_CDEBUG_LINE = 256
+        LU_CDEBUG_LINE = 512
 };

 struct lu_cdebug_data {
@@ -373,7 +421,8 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
  * lu_global_init().
  */
 struct lu_context_key lu_global_key = {
-        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
+                    LCT_MG_THREAD | LCT_CL_THREAD,
         .lct_init = lu_global_key_init,
         .lct_fini = lu_global_key_fini
 };
@@ -384,8 +433,8 @@ struct lu_context_key lu_global_key = {
 int lu_cdebug_printer(const struct lu_env *env,
                       void *cookie, const char *format, ...)
 {
-        struct lu_cdebug_print_info *info = cookie;
-        struct lu_cdebug_data *key;
+        struct libcfs_debug_msg_data *msgdata = cookie;
+        struct lu_cdebug_data *key;
         int used;
         int complete;
         va_list args;
@@ -403,10 +452,8 @@ int lu_cdebug_printer(const struct lu_env *env,
                   vsnprintf(key->lck_area + used,
                             ARRAY_SIZE(key->lck_area) - used, format, args);
         if (complete) {
-                if (cfs_cdebug_show(info->lpi_mask, info->lpi_subsys))
-                        libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
-                                         (char *)info->lpi_file, info->lpi_fn,
-                                         info->lpi_line, "%s", key->lck_area);
+                if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
+                        libcfs_debug_msg(msgdata, "%s", key->lck_area);
                 key->lck_area[0] = 0;
         }
         va_end(args);
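The hunk above only swaps the cookie type; the buffering idea is unchanged: printf-style fragments accumulate in a per-thread buffer and are emitted once a fragment ends in '\n'. A self-contained userspace sketch of that accumulate-until-newline pattern (plain libc; the demo_* names and the fixed 512-byte buffer mirror LU_CDEBUG_LINE but are illustrative only):

#include <stdarg.h>
#include <stdio.h>
#include <string.h>

static char demo_area[512];             /* stands in for lu_cdebug_data */

/* Assumes a non-empty format string, as lu_cdebug_printer() does. */
static int demo_printer(const char *format, ...)
{
        size_t used = strlen(demo_area);
        int complete;
        va_list args;

        va_start(args, format);
        complete = format[strlen(format) - 1] == '\n';
        vsnprintf(demo_area + used, sizeof(demo_area) - used, format, args);
        va_end(args);

        if (complete) {
                fputs(demo_area, stderr); /* libcfs_debug_msg() stands here */
                demo_area[0] = 0;
        }
        return 0;
}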
@@ -493,9 +540,9 @@ static struct lu_object *htable_lookup(struct lu_site *s,
         *version = ver;
         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
-        /* cfs_hash_bd_lookup_intent is a somehow "internal" function
-         * of cfs_hash, but we don't want refcount on object right now */
-        hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
+        /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
+         * of cfs_hash; it does not take a reference on the object. */
+        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
         if (hnode == NULL) {
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                 return NULL;
@@ -503,7 +550,9 @@ static struct lu_object *htable_lookup(struct lu_site *s,

         h = container_of0(hnode, struct lu_object_header, loh_hash);
         if (likely(!lu_object_is_dying(h))) {
+                cfs_hash_get(s->ls_obj_hash, hnode);
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+                cfs_list_del_init(&h->loh_lru);
                 return lu_object_top(h);
         }

@@ -512,7 +561,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
          * returned (to assure that references to dying objects are eventually
          * drained), and moreover, lookup has to wait until object is freed.
          */
-        cfs_atomic_dec(&h->loh_ref);

         cfs_waitlink_init(waiter);
         cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
@@ -600,8 +648,6 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
         hs = s->ls_obj_hash;
         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
         o = htable_lookup(s, &bd, f, waiter, &version);
-        if (o != NULL && !cfs_list_empty(&o->lo_header->loh_lru))
-                cfs_list_del_init(&o->lo_header->loh_lru);
         cfs_hash_bd_unlock(hs, &bd, 1);
         if (o != NULL)
                 return o;
@@ -693,20 +739,22 @@ static CFS_LIST_HEAD(lu_device_types);

 int lu_device_type_init(struct lu_device_type *ldt)
 {
-        int result;
+        int result = 0;

-        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
-        result = ldt->ldt_ops->ldto_init(ldt);
-        if (result == 0)
-                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
-        return result;
+        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
+        if (ldt->ldt_ops->ldto_init)
+                result = ldt->ldt_ops->ldto_init(ldt);
+        if (result == 0)
+                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
+        return result;
 }
 EXPORT_SYMBOL(lu_device_type_init);

 void lu_device_type_fini(struct lu_device_type *ldt)
 {
-        cfs_list_del_init(&ldt->ldt_linkage);
-        ldt->ldt_ops->ldto_fini(ldt);
+        cfs_list_del_init(&ldt->ldt_linkage);
+        if (ldt->ldt_ops->ldto_fini)
+                ldt->ldt_ops->ldto_fini(ldt);
 }
 EXPORT_SYMBOL(lu_device_type_fini);

@@ -714,10 +762,10 @@ void lu_types_stop(void)
 {
         struct lu_device_type *ldt;

-        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
-                if (ldt->ldt_device_nr == 0)
-                        ldt->ldt_ops->ldto_stop(ldt);
-        }
+        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
+                if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
+                        ldt->ldt_ops->ldto_stop(ldt);
+        }
 }
 EXPORT_SYMBOL(lu_types_stop);

@@ -725,7 +773,7 @@ EXPORT_SYMBOL(lu_types_stop);
  * Global list of all sites on this node
  */
 static CFS_LIST_HEAD(lu_sites);
-static CFS_DECLARE_MUTEX(lu_sites_guard);
+static DEFINE_MUTEX(lu_sites_guard);

 /**
  * Global environment used by site shrinker.
  */
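The NULL checks added to lu_device_type_init()/fini()/lu_types_stop() above make ldto_init, ldto_fini and ldto_stop optional. A hypothetical device type relying on that (the demo_* names are invented, and the exact lu_device_type field set and the LU_DEVICE_DT tag are assumptions based on how such types are declared elsewhere in this tree):

static struct lu_device_type_operations demo_type_ops = {
        /* no ldto_init/ldto_fini/ldto_stop: the functions above
         * now simply skip the missing hooks */
};

static struct lu_device_type demo_device_type = {
        .ldt_tags     = LU_DEVICE_DT,
        .ldt_name     = "demo",
        .ldt_ops      = &demo_type_ops,
        .ldt_ctx_tags = LCT_DT_THREAD,
};

Registering with lu_device_type_init(&demo_device_type) succeeds where it would previously have dereferenced a NULL ldto_init.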
@@ -829,10 +877,18 @@ static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
                                 const void *key, unsigned mask)
 {
         struct lu_fid *fid = (struct lu_fid *)key;
-        unsigned hash;
+        __u32 hash;
+
+        hash = fid_flatten32(fid);
+        hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
+        hash = cfs_hash_long(hash, hs->hs_bkt_bits);
+
+        /* give me another random factor */
+        hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
+
+        hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
+        hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);

-        hash = (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
-        hash += fid_hash(fid, hs->hs_bkt_bits) << hs->hs_bkt_bits;
         return hash & mask;
 }
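The new lu_obj_hop_hash() above splits the result in two: the low cur_bits - bkt_bits bits select the bucket, while the high bkt_bits bits position the object inside the bucket. A self-contained toy model of that bit layout (it uses its own multiplicative mixer in place of fid_flatten32()/cfs_hash_long() and is illustrative only, not the cfs_hash API):

#include <stdint.h>
#include <stdio.h>

/* hash_long()-style multiplicative mixer, reduced to "bits" bits. */
static uint32_t demo_mix(uint64_t val, unsigned bits)
{
        return (uint32_t)((val * 0x9e3779b97f4a7c15ULL) >> (64 - bits));
}

static uint32_t demo_hop_hash(uint32_t flat, unsigned cur_bits,
                              unsigned bkt_bits)
{
        uint32_t nbkt = 1U << (cur_bits - bkt_bits);
        uint32_t hash = flat;

        hash += (hash >> 4) + (hash << 12);   /* mix low and high halves */
        hash  = demo_mix(hash, bkt_bits);     /* in-bucket position */
        hash <<= cur_bits - bkt_bits;         /* move to the high bits */
        hash |= flat & (nbkt - 1);            /* bucket index in low bits */
        return hash & ((1U << cur_bits) - 1);
}

int main(void)
{
        /* a 12-bit table with 8 bucket bits, matching LU_SITE_BITS_MIN
         * and LU_SITE_BKT_BITS below */
        printf("%#x\n", (unsigned)demo_hop_hash(0xdeadbeef, 12, 8));
        return 0;
}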
@@ -886,31 +942,50 @@ cfs_hash_ops_t lu_site_hash_ops = {
         .hs_put_locked  = lu_obj_hop_put_locked,
 };

+void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
+{
+        spin_lock(&s->ls_ld_lock);
+        if (cfs_list_empty(&d->ld_linkage))
+                cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
+        spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_add_linkage);
+
+void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
+{
+        spin_lock(&s->ls_ld_lock);
+        cfs_list_del_init(&d->ld_linkage);
+        spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_del_linkage);
+
 /**
  * Initialize site \a s, with \a d as the top level device.
  */
 #define LU_SITE_BITS_MIN         12
-#define LU_SITE_BITS_MAX         23
+#define LU_SITE_BITS_MAX         24
 /**
- * total 128 buckets, we don't want too many buckets because:
+ * total 256 buckets, we don't want too many buckets because:
  * - consume too much memory
  * - avoid unbalanced LRU list
  */
-#define LU_SITE_BKT_BITS         7
+#define LU_SITE_BKT_BITS         8

 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
         struct lu_site_bkt_data *bkt;
         cfs_hash_bd_t bd;
+        char name[16];
         int bits;
         int i;
         ENTRY;

         memset(s, 0, sizeof *s);
         bits = lu_htable_order();
+        snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
              bits >= LU_SITE_BITS_MIN; bits--) {
-                s->ls_obj_hash = cfs_hash_create("lu_site", bits, bits,
+                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
                                                  bits - LU_SITE_BKT_BITS,
                                                  sizeof(*bkt), 0, 0,
                                                  &lu_site_hash_ops,
@@ -959,7 +1034,12 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
         lu_device_get(top);
         lu_ref_add(&top->ld_reference, "site-top", s);

-        RETURN(0);
+        CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
+        spin_lock_init(&s->ls_ld_lock);
+
+        lu_dev_add_linkage(s, top);
+
+        RETURN(0);
 }
 EXPORT_SYMBOL(lu_site_init);

@@ -968,9 +1048,9 @@ EXPORT_SYMBOL(lu_site_init);
  */
 void lu_site_fini(struct lu_site *s)
 {
-        cfs_down(&lu_sites_guard);
+        mutex_lock(&lu_sites_guard);
         cfs_list_del_init(&s->ls_linkage);
-        cfs_up(&lu_sites_guard);
+        mutex_unlock(&lu_sites_guard);

         if (s->ls_obj_hash != NULL) {
                 cfs_hash_putref(s->ls_obj_hash);
@@ -995,11 +1075,11 @@ EXPORT_SYMBOL(lu_site_fini);
 int lu_site_init_finish(struct lu_site *s)
 {
         int result;
-        cfs_down(&lu_sites_guard);
+        mutex_lock(&lu_sites_guard);
         result = lu_context_refill(&lu_shrink_env.le_ctx);
         if (result == 0)
                 cfs_list_add(&s->ls_linkage, &lu_sites);
-        cfs_up(&lu_sites_guard);
+        mutex_unlock(&lu_sites_guard);
         return result;
 }
 EXPORT_SYMBOL(lu_site_init_finish);

@@ -1034,6 +1114,7 @@ int lu_device_init(struct lu_device *d, struct lu_device_type *t)
         cfs_atomic_set(&d->ld_ref, 0);
         d->ld_type = t;
         lu_ref_init(&d->ld_reference);
+        CFS_INIT_LIST_HEAD(&d->ld_linkage);
         return 0;
 }
 EXPORT_SYMBOL(lu_device_init);
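A sketch of the resulting site life cycle for a hypothetical top-level device, using only functions visible in this file (the demo_* names are assumed):

static int demo_site_setup(struct lu_device *top, struct lu_site *s)
{
        int rc;

        rc = lu_site_init(s, top);   /* hash is now named lu_site_<type> */
        if (rc != 0)
                return rc;

        rc = lu_site_init_finish(s); /* link into lu_sites for the shrinker */
        if (rc != 0)
                lu_site_fini(s);
        return rc;
}

static void demo_site_cleanup(struct lu_site *s)
{
        lu_site_fini(s);             /* unlink from lu_sites, drop the hash */
}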
@@ -1188,15 +1269,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
         /* purge again. */
         lu_site_purge(env, site, ~0);

-        if (!cfs_hash_is_empty(site->ls_obj_hash)) {
-                /*
-                 * Uh-oh, objects still exist.
-                 */
-                static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
-
-                lu_site_print(env, site, &cookie, lu_cdebug_printer);
-        }
-
         for (scan = top; scan != NULL; scan = next) {
                 const struct lu_device_type *ldt = scan->ld_type;
                 struct obd_type *type;
@@ -1215,12 +1287,12 @@ enum {
         /**
          * Maximal number of tld slots.
          */
-        LU_CONTEXT_KEY_NR = 32
+        LU_CONTEXT_KEY_NR = 40
 };

 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

-static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(lu_keys_guard);

 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1244,7 +1316,7 @@ int lu_context_key_register(struct lu_context_key *key)
         LASSERT(key->lct_owner != NULL);

         result = -ENFILE;
-        cfs_spin_lock(&lu_keys_guard);
+        spin_lock(&lu_keys_guard);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                 if (lu_keys[i] == NULL) {
                         key->lct_index = i;
@@ -1256,8 +1328,8 @@ int lu_context_key_register(struct lu_context_key *key)
                         break;
                 }
         }
-        cfs_spin_unlock(&lu_keys_guard);
-        return result;
+        spin_unlock(&lu_keys_guard);
+        return result;
 }
 EXPORT_SYMBOL(lu_context_key_register);

@@ -1274,13 +1346,14 @@ static void key_fini(struct lu_context *ctx, int index)
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
                 cfs_atomic_dec(&key->lct_used);
-                LASSERT(key->lct_owner != NULL);
-                if (!(ctx->lc_tags & LCT_NOREF)) {
-                        LASSERT(cfs_module_refcount(key->lct_owner) > 0);
-                        cfs_module_put(key->lct_owner);
-                }
-                ctx->lc_value[index] = NULL;
-        }
+
+                LASSERT(key->lct_owner != NULL);
+                if ((ctx->lc_tags & LCT_NOREF) == 0) {
+                        LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
+                        cfs_module_put(key->lct_owner);
+                }
+                ctx->lc_value[index] = NULL;
+        }
 }

 /**
@@ -1288,23 +1361,23 @@ static void key_fini(struct lu_context *ctx, int index)
  */
 void lu_context_key_degister(struct lu_context_key *key)
 {
-        LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
-        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
+        LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
+        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

-        lu_context_key_quiesce(key);
+        lu_context_key_quiesce(key);

-        ++key_set_version;
-        cfs_spin_lock(&lu_keys_guard);
-        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
-        if (lu_keys[key->lct_index]) {
-                lu_keys[key->lct_index] = NULL;
-                lu_ref_fini(&key->lct_reference);
-        }
-        cfs_spin_unlock(&lu_keys_guard);
+        ++key_set_version;
+        spin_lock(&lu_keys_guard);
+        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+        if (lu_keys[key->lct_index]) {
+                lu_keys[key->lct_index] = NULL;
+                lu_ref_fini(&key->lct_reference);
+        }
+        spin_unlock(&lu_keys_guard);

-        LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
-                 "key has instances: %d\n",
-                 cfs_atomic_read(&key->lct_used));
+        LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
+                 "key has instances: %d\n",
+                 cfs_atomic_read(&key->lct_used));
 }
 EXPORT_SYMBOL(lu_context_key_degister);
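For reference, the pattern these register/degister paths serve, in the same style this file already uses for lu_global_key (and for lu_ucred_key later in this patch). The demo_* names are hypothetical:

struct demo_thread_info {
        char dti_scratch[128];
};

/* generates demo_key_init()/demo_key_fini(), as for lu_global_key */
LU_KEY_INIT_FINI(demo, struct demo_thread_info);

static struct lu_context_key demo_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = demo_key_init,
        .lct_fini = demo_key_fini
};

/* module init:  LU_CONTEXT_KEY_INIT(&demo_thread_key);
 *               rc = lu_context_key_register(&demo_thread_key);
 * module exit:  lu_context_key_degister(&demo_thread_key);
 * per request:  info = lu_context_key_get(&env->le_ctx, &demo_thread_key);
 */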
@@ -1426,13 +1499,13 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                 /*
                  * XXX memory barrier has to go here.
                  */
-                cfs_spin_lock(&lu_keys_guard);
-                cfs_list_for_each_entry(ctx, &lu_context_remembered,
-                                        lc_remember)
-                        key_fini(ctx, key->lct_index);
-                cfs_spin_unlock(&lu_keys_guard);
-                ++key_set_version;
-        }
+                spin_lock(&lu_keys_guard);
+                cfs_list_for_each_entry(ctx, &lu_context_remembered,
+                                        lc_remember)
+                        key_fini(ctx, key->lct_index);
+                spin_unlock(&lu_keys_guard);
+                ++key_set_version;
+        }
 }
 EXPORT_SYMBOL(lu_context_key_quiesce);

@@ -1445,23 +1518,23 @@ EXPORT_SYMBOL(lu_context_key_revive);

 static void keys_fini(struct lu_context *ctx)
 {
-        int i;
+        int i;

-        cfs_spin_lock(&lu_keys_guard);
-        if (ctx->lc_value != NULL) {
-                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
-                        key_fini(ctx, i);
-                OBD_FREE(ctx->lc_value,
-                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-                ctx->lc_value = NULL;
-        }
-        cfs_spin_unlock(&lu_keys_guard);
+        if (ctx->lc_value == NULL)
+                return;
+
+        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
+                key_fini(ctx, i);
+
+        OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+        ctx->lc_value = NULL;
 }

 static int keys_fill(struct lu_context *ctx)
 {
         int i;

+        LINVRNT(ctx->lc_value != NULL);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                 struct lu_context_key *key;

@@ -1503,17 +1576,11 @@ static int keys_fill(struct lu_context *ctx)

 static int keys_init(struct lu_context *ctx)
 {
-        int result;
-
-        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
-        if (likely(ctx->lc_value != NULL))
-                result = keys_fill(ctx);
-        else
-                result = -ENOMEM;
+        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+        if (likely(ctx->lc_value != NULL))
+                return keys_fill(ctx);

-        if (result != 0)
-                keys_fini(ctx);
-        return result;
+        return -ENOMEM;
 }

 /**
@@ -1521,16 +1588,24 @@ static int keys_init(struct lu_context *ctx)
  */
 int lu_context_init(struct lu_context *ctx, __u32 tags)
 {
-        memset(ctx, 0, sizeof *ctx);
-        ctx->lc_state = LCS_INITIALIZED;
-        ctx->lc_tags = tags;
-        if (tags & LCT_REMEMBER) {
-                cfs_spin_lock(&lu_keys_guard);
-                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
-                cfs_spin_unlock(&lu_keys_guard);
-        } else
-                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
-        return keys_init(ctx);
+        int rc;
+
+        memset(ctx, 0, sizeof *ctx);
+        ctx->lc_state = LCS_INITIALIZED;
+        ctx->lc_tags = tags;
+        if (tags & LCT_REMEMBER) {
+                spin_lock(&lu_keys_guard);
+                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
+                spin_unlock(&lu_keys_guard);
+        } else {
+                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+        }
+
+        rc = keys_init(ctx);
+        if (rc != 0)
+                lu_context_fini(ctx);
+
+        return rc;
 }
 EXPORT_SYMBOL(lu_context_init);

@@ -1539,12 +1614,19 @@ EXPORT_SYMBOL(lu_context_init);
  */
 void lu_context_fini(struct lu_context *ctx)
 {
-        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
-        ctx->lc_state = LCS_FINALIZED;
-        keys_fini(ctx);
-        cfs_spin_lock(&lu_keys_guard);
-        cfs_list_del_init(&ctx->lc_remember);
-        cfs_spin_unlock(&lu_keys_guard);
+        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+        ctx->lc_state = LCS_FINALIZED;
+
+        if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
+                LASSERT(cfs_list_empty(&ctx->lc_remember));
+                keys_fini(ctx);
+
+        } else { /* could race with key degister */
+                spin_lock(&lu_keys_guard);
+                keys_fini(ctx);
+                cfs_list_del_init(&ctx->lc_remember);
+                spin_unlock(&lu_keys_guard);
+        }
 }
 EXPORT_SYMBOL(lu_context_fini);
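A skeleton of the context life cycle around the functions changed above (a hypothetical service thread; lu_context_enter() is the existing counterpart of lu_context_exit() and is not part of this hunk):

static int demo_thread_body(void)
{
        struct lu_context ctx;
        int rc;

        /* LCT_REMEMBER links the context into lu_context_remembered so
         * lu_context_key_quiesce() can reach it. On failure, the new
         * lu_context_init() above already calls lu_context_fini(). */
        rc = lu_context_init(&ctx, LCT_MD_THREAD | LCT_REMEMBER);
        if (rc != 0)
                return rc;

        lu_context_enter(&ctx);
        /* ... serve one request, fetching slots via lu_context_key_get() ... */
        lu_context_exit(&ctx);

        lu_context_fini(&ctx);
        return 0;
}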
@@ -1585,15 +1667,61 @@ EXPORT_SYMBOL(lu_context_exit);

 /**
  * Allocate for context all missing keys that were registered after context
- * creation.
+ * creation. key_set_version is only changed in rare cases when modules
+ * are loaded and removed.
  */
 int lu_context_refill(struct lu_context *ctx)
 {
-        LINVRNT(ctx->lc_value != NULL);
-        return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
+        return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
 }
 EXPORT_SYMBOL(lu_context_refill);

+/**
+ * lu_ctx_tags/lu_ses_tags are updated when new obd types are added.
+ * Currently this is used only on the client side, specifically by the echo
+ * device client; for other stacks (such as ptlrpc threads) the context tags
+ * are predefined when the lu_device type is registered, during the module
+ * probe phase.
+ */
+__u32 lu_context_tags_default = 0;
+__u32 lu_session_tags_default = 0;
+
+void lu_context_tags_update(__u32 tags)
+{
+        spin_lock(&lu_keys_guard);
+        lu_context_tags_default |= tags;
+        key_set_version++;
+        spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_context_tags_update);
+
+void lu_context_tags_clear(__u32 tags)
+{
+        spin_lock(&lu_keys_guard);
+        lu_context_tags_default &= ~tags;
+        key_set_version++;
+        spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_context_tags_clear);
+
+void lu_session_tags_update(__u32 tags)
+{
+        spin_lock(&lu_keys_guard);
+        lu_session_tags_default |= tags;
+        key_set_version++;
+        spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_session_tags_update);
+
+void lu_session_tags_clear(__u32 tags)
+{
+        spin_lock(&lu_keys_guard);
+        lu_session_tags_default &= ~tags;
+        key_set_version++;
+        spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_session_tags_clear);
+
 int lu_env_init(struct lu_env *env, __u32 tags)
 {
         int result;
@@ -1625,6 +1753,34 @@ int lu_env_refill(struct lu_env *env)
 }
 EXPORT_SYMBOL(lu_env_refill);

+/**
+ * Currently this API is used only by the echo client, because the echo
+ * client and the normal Lustre client share the same cl_env cache. The echo
+ * client therefore needs to refresh the env context after it gets one from
+ * the cache, especially when a normal client and an echo client co-exist on
+ * the same node.
+ */
+int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
+                          __u32 stags)
+{
+        int result;
+
+        if ((env->le_ctx.lc_tags & ctags) != ctags) {
+                env->le_ctx.lc_version = 0;
+                env->le_ctx.lc_tags |= ctags;
+        }
+
+        if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
+                env->le_ses->lc_version = 0;
+                env->le_ses->lc_tags |= stags;
+        }
+
+        result = lu_env_refill(env);
+
+        return result;
+}
+EXPORT_SYMBOL(lu_env_refill_by_tags);
+
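A sketch of the intended echo-client-style call: an env taken from the shared cl_env cache may have been created with narrower tags, so widen them and refill before use (the demo_* name and the tag values chosen here are illustrative):

static int demo_env_fixup(struct lu_env *env)
{
        /* a no-op when both tag sets are already present */
        return lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
}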
 static struct cfs_shrinker *lu_site_shrinker = NULL;

 typedef struct lu_site_stats{
@@ -1664,6 +1820,24 @@ static void lu_site_stats_get(cfs_hash_t *hs,

 #ifdef __KERNEL__

+/*
+ * There exists a potential lock inversion deadlock scenario when using
+ * Lustre on top of ZFS. This occurs between one of ZFS's
+ * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
+ * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
+ * while thread B will take the ht_lock and sleep on the lu_sites_guard
+ * lock. Neither thread can then wake up and release its lock.
+ *
+ * To prevent this from happening we must ensure the lu_sites_guard lock
+ * is not taken while in this code path. ZFS reliably does not set the
+ * __GFP_FS bit in its code paths, so this can be used to determine if it
+ * is safe to take the lu_sites_guard lock.
+ *
+ * Ideally we should accurately return the remaining number of cached
+ * objects without taking the lu_sites_guard lock, but this is not
+ * possible in the current implementation.
+ */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 {
         lu_site_stats_t stats;
@@ -1673,13 +1847,27 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
         int remain = shrink_param(sc, nr_to_scan);
         CFS_LIST_HEAD(splice);

-        if (remain != 0) {
-                if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
+        if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
+                if (remain != 0)
                         return -1;
-                CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+                else
+                        /* We must not take the lu_sites_guard lock when
+                         * __GFP_FS is *not* set because of the deadlock
+                         * possibility detailed above. Additionally,
+                         * since we cannot determine the number of
+                         * objects in the cache without taking this
+                         * lock, we're in a particularly tough spot. As
+                         * a result, we'll just lie and say our cache is
+                         * empty. This _should_ be ok, as we can't
+                         * reclaim objects when __GFP_FS is *not* set
+                         * anyways.
+                         */
+                        return 0;
         }

-        cfs_down(&lu_sites_guard);
+        CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+
+        mutex_lock(&lu_sites_guard);
         cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                 if (shrink_param(sc, nr_to_scan) != 0) {
                         remain = lu_site_purge(&lu_shrink_env, s, remain);
@@ -1697,7 +1885,7 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
                         break;
         }
         cfs_list_splice(&splice, lu_sites.prev);
-        cfs_up(&lu_sites_guard);
+        mutex_unlock(&lu_sites_guard);

         cached = (cached / 100) * sysctl_vfs_cache_pressure;
         if (shrink_param(sc, nr_to_scan) == 0)
@@ -1728,9 +1916,9 @@ int lu_printk_printer(const struct lu_env *env,
         return 0;
 }

-void lu_debugging_setup(void)
+int lu_debugging_setup(void)
 {
-        lu_env_init(&lu_debugging_env, ~0);
+        return lu_env_init(&lu_debugging_env, ~0);
 }

 void lu_context_keys_dump(void)
@@ -1771,6 +1959,52 @@ void dt_global_fini(void);
 int llo_global_init(void);
 void llo_global_fini(void);

+/* context key constructor/destructor: lu_ucred_key_init, lu_ucred_key_fini */
+LU_KEY_INIT_FINI(lu_ucred, struct lu_ucred);
+
+static struct lu_context_key lu_ucred_key = {
+        .lct_tags = LCT_SESSION,
+        .lct_init = lu_ucred_key_init,
+        .lct_fini = lu_ucred_key_fini
+};
+
+/**
+ * Get ucred key if session exists and ucred key is allocated on it.
+ * Return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred(const struct lu_env *env)
+{
+        if (!env->le_ses)
+                return NULL;
+        return lu_context_key_get(env->le_ses, &lu_ucred_key);
+}
+EXPORT_SYMBOL(lu_ucred);
+
+/**
+ * Get ucred key and check if it is properly initialized.
+ * Return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred_check(const struct lu_env *env)
+{
+        struct lu_ucred *uc = lu_ucred(env);
+        if (uc && uc->uc_valid != UCRED_OLD && uc->uc_valid != UCRED_NEW)
+                return NULL;
+        return uc;
+}
+EXPORT_SYMBOL(lu_ucred_check);
+
+/**
+ * Get ucred key, which must exist and must be properly initialized.
+ * Assert otherwise.
+ */
+struct lu_ucred *lu_ucred_assert(const struct lu_env *env)
+{
+        struct lu_ucred *uc = lu_ucred_check(env);
+        LASSERT(uc != NULL);
+        return uc;
+}
+EXPORT_SYMBOL(lu_ucred_assert);
+
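A sketch of how the three accessors are meant to be graded in a server-side handler (the demo_handler name and the -EPROTO error choice are illustrative):

static int demo_handler(const struct lu_env *env)
{
        struct lu_ucred *uc;

        uc = lu_ucred_check(env);  /* NULL if absent or not initialized */
        if (uc == NULL)
                return -EPROTO;

        /* In paths where credentials are guaranteed to be set up,
         * lu_ucred_assert() enforces that invariant instead. */
        uc = lu_ucred_assert(env);
        (void)uc;
        return 0;
}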
 /**
  * Initialization of global lu_* data.
  */
@@ -1788,14 +2022,20 @@ int lu_global_init(void)
         result = lu_context_key_register(&lu_global_key);
         if (result != 0)
                 return result;
+
+        LU_CONTEXT_KEY_INIT(&lu_ucred_key);
+        result = lu_context_key_register(&lu_ucred_key);
+        if (result != 0)
+                return result;
+
         /*
          * At this level, we don't know what tags are needed, so allocate them
          * conservatively. This should not be too bad, because this
          * environment is global.
          */
-        cfs_down(&lu_sites_guard);
+        mutex_lock(&lu_sites_guard);
         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
-        cfs_up(&lu_sites_guard);
+        mutex_unlock(&lu_sites_guard);
         if (result != 0)
                 return result;

@@ -1808,21 +2048,16 @@ int lu_global_init(void)
         if (lu_site_shrinker == NULL)
                 return -ENOMEM;

-        result = lu_time_global_init();
-        if (result)
-                GOTO(out, result);
-
 #ifdef __KERNEL__
-        result = dt_global_init();
-        if (result)
-                GOTO(out, result);
+        result = dt_global_init();
+        if (result != 0)
+                return result;

-        result = llo_global_init();
-        if (result)
-                GOTO(out, result);
+        result = llo_global_init();
+        if (result != 0)
+                return result;
 #endif
         result = cl_global_init();
-out:

         return result;
 }
@@ -1837,21 +2072,21 @@ void lu_global_fini(void)
         llo_global_fini();
         dt_global_fini();
 #endif
-        lu_time_global_fini();
         if (lu_site_shrinker != NULL) {
                 cfs_remove_shrinker(lu_site_shrinker);
                 lu_site_shrinker = NULL;
         }

         lu_context_key_degister(&lu_global_key);
+        lu_context_key_degister(&lu_ucred_key);

         /*
          * Tear shrinker environment down _after_ de-registering
          * lu_global_key, because the latter has a value in the former.
          */
-        cfs_down(&lu_sites_guard);
+        mutex_lock(&lu_sites_guard);
         lu_env_fini(&lu_shrink_env);
-        cfs_up(&lu_sites_guard);
+        mutex_unlock(&lu_sites_guard);

         lu_ref_global_fini();
 }
@@ -1900,13 +2135,6 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count)
 }
 EXPORT_SYMBOL(lu_site_stats_print);

-const char *lu_time_names[LU_TIME_NR] = {
-        [LU_TIME_FIND_LOOKUP] = "find_lookup",
-        [LU_TIME_FIND_ALLOC]  = "find_alloc",
-        [LU_TIME_FIND_INSERT] = "find_insert"
-};
-EXPORT_SYMBOL(lu_time_names);
-
 /**
  * Helper function to initialize a number of kmem slab caches at once.
  */
@@ -1948,3 +2176,53 @@ void lu_kmem_fini(struct lu_kmem_descr *caches)
         }
 }
 EXPORT_SYMBOL(lu_kmem_fini);
+
+/**
+ * Temporary solution: allow the fid to be assigned in ->do_create()
+ * until we have fully functional OST fids.
+ */
+void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
+                          const struct lu_fid *fid)
+{
+        struct lu_site *s = o->lo_dev->ld_site;
+        struct lu_fid *old = &o->lo_header->loh_fid;
+        struct lu_site_bkt_data *bkt;
+        struct lu_object *shadow;
+        cfs_waitlink_t waiter;
+        cfs_hash_t *hs;
+        cfs_hash_bd_t bd;
+        __u64 version = 0;
+
+        LASSERT(fid_is_zero(old));
+
+        hs = s->ls_obj_hash;
+        cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
+        shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+        /* supposed to be unique */
+        LASSERT(shadow == NULL);
+        *old = *fid;
+        bkt = cfs_hash_bd_extra_get(hs, &bd);
+        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+        bkt->lsb_busy++;
+        cfs_hash_bd_unlock(hs, &bd, 1);
+}
+EXPORT_SYMBOL(lu_object_assign_fid);
+
+/**
+ * Allocate an object with a zero (not yet assigned) fid.
+ * XXX: temporary solution, to allow the fid to be assigned in ->do_create()
+ * until we have fully functional OST fids.
+ */
+struct lu_object *lu_object_anon(const struct lu_env *env,
+                                 struct lu_device *dev,
+                                 const struct lu_object_conf *conf)
+{
+        struct lu_fid fid;
+        struct lu_object *o;
+
+        fid_zero(&fid);
+        o = lu_object_alloc(env, dev, &fid, conf);
+
+        return o;
+}
+EXPORT_SYMBOL(lu_object_anon);
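Finally, a sketch of the ->do_create()-style flow the two helpers above enable: allocate anonymously, pick the fid during create, then hash the object (demo_create and its arguments are hypothetical):

static int demo_create(const struct lu_env *env, struct lu_device *dev,
                       const struct lu_object_conf *conf,
                       const struct lu_fid *chosen_fid)
{
        struct lu_object *o;

        o = lu_object_anon(env, dev, conf);       /* zero fid, not hashed */
        if (IS_ERR(o))
                return PTR_ERR(o);

        /* ... create the backend object and settle on chosen_fid ... */

        lu_object_assign_fid(env, o, chosen_fid); /* now hashed, cacheable */
        lu_object_put(env, o);
        return 0;
}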