-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
#define DEBUG_SUBSYSTEM S_CLASS
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#include <libcfs/libcfs.h>
struct lu_site *site;
struct lu_object *orig;
cfs_hash_bd_t bd;
+ const struct lu_fid *fid;
top = o->lo_header;
site = o->lo_dev->ld_site;
orig = o;
+ /*
+ * Until fids-on-OST is fully implemented, anonymous objects are
+ * possible in OSP. Such an object is not listed in the site hash,
+ * so we should not remove it from the site.
+ */
+ fid = lu_object_fid(o);
+ if (fid_is_zero(fid)) {
+ LASSERT(top->loh_hash.next == NULL
+ && top->loh_hash.pprev == NULL);
+ LASSERT(cfs_list_empty(&top->loh_lru));
+ if (!cfs_atomic_dec_and_test(&top->loh_ref))
+ return;
+ cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+ if (o->lo_ops->loo_object_release != NULL)
+ o->lo_ops->loo_object_release(env, o);
+ }
+ lu_object_free(env, orig);
+ return;
+ }
+
cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
}
if (!lu_object_is_dying(top)) {
+ LASSERT(cfs_list_empty(&top->loh_lru));
+ cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru);
cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
return;
}
* and we can safely destroy object below.
*/
cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
- cfs_list_del_init(&top->loh_lru);
cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
/*
* Object was already removed from hash and lru above, can
EXPORT_SYMBOL(lu_object_put);
/**
+ * Put object and don't keep it in the cache. This is a temporary
+ * solution for multi-site objects whose layering is not constant.
+ */
+void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
+{
+ set_bit(LU_OBJECT_HEARD_BANSHEE,
+ &o->lo_header->loh_flags);
+ return lu_object_put(env, o);
+}
+EXPORT_SYMBOL(lu_object_put_nocache);
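/*
 * Usage sketch (illustrative, not part of this patch): a caller that
 * knows an object must not linger in the site cache drops its reference
 * through the nocache variant; "my_discard_object" is a placeholder.
 */
static void my_discard_object(const struct lu_env *env, struct lu_object *o)
{
        /* LU_OBJECT_HEARD_BANSHEE is set, so the final put frees the
         * object instead of parking it on the bucket LRU */
        lu_object_put_nocache(env, o);
}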
+
+/**
* Allocate new object.
*
* This follows object creation protocol, described in the comment within
bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
- /*
- * Objects are sorted in lru order, and "busy"
- * objects (ones with h->loh_ref > 0) naturally tend to
- * live near hot end that we scan last. Unfortunately,
- * sites usually have small (less then ten) number of
- * busy yet rarely accessed objects (some global
- * objects, accessed directly through pointers,
- * bypassing hash table).
- * Currently algorithm scans them over and over again.
- * Probably we should move busy objects out of LRU,
- * or we can live with that.
- */
- if (cfs_atomic_read(&h->loh_ref) > 0)
- continue;
+ LASSERT(cfs_atomic_read(&h->loh_ref) == 0);
cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
LASSERT(bd.bd_bucket == bd2.bd_bucket);
*
* XXX overflow is not handled correctly.
*/
- LU_CDEBUG_LINE = 256
+ LU_CDEBUG_LINE = 512
};
struct lu_cdebug_data {
* lu_global_init().
*/
struct lu_context_key lu_global_key = {
- .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+ .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
+ LCT_MG_THREAD | LCT_CL_THREAD,
.lct_init = lu_global_key_init,
.lct_fini = lu_global_key_fini
};
int lu_cdebug_printer(const struct lu_env *env,
void *cookie, const char *format, ...)
{
- struct lu_cdebug_print_info *info = cookie;
- struct lu_cdebug_data *key;
+ struct libcfs_debug_msg_data *msgdata = cookie;
+ struct lu_cdebug_data *key;
int used;
int complete;
va_list args;
vsnprintf(key->lck_area + used,
ARRAY_SIZE(key->lck_area) - used, format, args);
if (complete) {
- if (cfs_cdebug_show(info->lpi_mask, info->lpi_subsys))
- libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
- (char *)info->lpi_file, info->lpi_fn,
- info->lpi_line, "%s", key->lck_area);
+ if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
+ libcfs_debug_msg(msgdata, "%s", key->lck_area);
key->lck_area[0] = 0;
}
va_end(args);
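/*
 * Calling-convention sketch for the new cookie type (assumed usage,
 * mirroring the LIBCFS_DEBUG_MSG_DATA_DECL pattern used elsewhere in
 * libcfs): the printer now takes a struct libcfs_debug_msg_data.
 */
static void my_dump_site(const struct lu_env *env, struct lu_site *s)
{
        LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);

        /* each complete line accumulated by lu_cdebug_printer() is
         * emitted through libcfs_debug_msg() with this msgdata */
        lu_site_print(env, s, &msgdata, lu_cdebug_printer);
}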
h = container_of0(hnode, struct lu_object_header, loh_hash);
if (likely(!lu_object_is_dying(h))) {
lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+ cfs_list_del_init(&h->loh_lru);
return lu_object_top(h);
}
}
EXPORT_SYMBOL(lu_object_find);
+static struct lu_object *lu_object_new(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_fid *f,
+ const struct lu_object_conf *conf)
+{
+ struct lu_object *o;
+ cfs_hash_t *hs;
+ cfs_hash_bd_t bd;
+ struct lu_site_bkt_data *bkt;
+
+ o = lu_object_alloc(env, dev, f, conf);
+ if (unlikely(IS_ERR(o)))
+ return o;
+
+ hs = dev->ld_site->ls_obj_hash;
+ cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+ bkt = cfs_hash_bd_extra_get(hs, &bd);
+ cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+ bkt->lsb_busy++;
+ cfs_hash_bd_unlock(hs, &bd, 1);
+ return o;
+}
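/*
 * Caller-side sketch of the fast path (illustrative): a creator that
 * knows the FID cannot already be in the site passes LOC_F_NEW, so
 * lu_object_find() skips the initial hash lookup and goes straight to
 * lu_object_new().
 */
static struct lu_object *my_create_fresh(const struct lu_env *env,
                                         struct lu_device *dev,
                                         const struct lu_fid *f)
{
        struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };

        return lu_object_find(env, dev, f, &conf);
}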
+
/**
* Core logic of lu_object_find*() functions.
*/
* - unlock index;
* - return object.
*
+ * In the "LOC_F_NEW" case, we know the object is newly created and
+ * cannot already exist, so the lookup-alloc-lookup-insert dance is
+ * unnecessary; just allocate and insert directly.
+ *
* If dying object is found during index search, add @waiter to the
* site wait-queue and return ERR_PTR(-EAGAIN).
*/
+ if (conf != NULL && conf->loc_flags & LOC_F_NEW)
+ return lu_object_new(env, dev, f, conf);
+
s = dev->ld_site;
hs = s->ls_obj_hash;
cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
bkt = cfs_hash_bd_extra_get(hs, &bd);
cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
- cfs_list_add_tail(&o->lo_header->loh_lru, &bkt->lsb_lru);
bkt->lsb_busy++;
cfs_hash_bd_unlock(hs, &bd, 1);
return o;
* Global list of all sites on this node
*/
static CFS_LIST_HEAD(lu_sites);
-static CFS_DECLARE_MUTEX(lu_sites_guard);
+static DEFINE_MUTEX(lu_sites_guard);
/**
* Global environment used by site shrinker.
EXPORT_SYMBOL(lu_site_print);
enum {
- LU_CACHE_PERCENT = 20,
+ LU_CACHE_PERCENT_MAX = 50,
+ LU_CACHE_PERCENT_DEFAULT = 20
};
+static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
+ "Percentage of memory to be used as lu_object cache");
+
/**
* Return desired hash table order.
*/
cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4;
#endif
- cache_size = cache_size / 100 * LU_CACHE_PERCENT *
+ /* sanitize an unreasonable cache setting. */
+ if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
+ CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
+ " the range of (0, %u]. Will use default value: %u.\n",
+ lu_cache_percent, LU_CACHE_PERCENT_MAX,
+ LU_CACHE_PERCENT_DEFAULT);
+
+ lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+ }
+ cache_size = cache_size / 100 * lu_cache_percent *
(CFS_PAGE_SIZE / 1024);
for (bits = 1; (1 << bits) < cache_size; ++bits) {
return bits;
}
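/*
 * Worked example of the sizing above (sketch, default constants
 * assumed): with 4 GiB of RAM and 4 KiB pages there are 1 << 20 pages,
 * so with lu_cache_percent = 20 and integer division:
 *
 *   cache_size = (1 << 20) / 100 * 20 * (4096 / 1024) = 838800
 *
 * and the loop settles on bits = 20, since 1 << 20 >= 838800.
 */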
-static unsigned lu_obj_hop_hash(cfs_hash_t *hs, void *key, unsigned mask)
+static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
+ const void *key, unsigned mask)
{
struct lu_fid *fid = (struct lu_fid *)key;
- unsigned hash;
- hash = (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
- hash += fid_hash(fid, hs->hs_bkt_bits) << hs->hs_bkt_bits;
+ __u32 hash;
+
+ hash = fid_flatten32(fid);
+ hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
+ hash = cfs_hash_long(hash, hs->hs_bkt_bits);
+
+ /* give me another random factor */
+ hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
+
+ hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
+ hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
return hash & mask;
}
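/*
 * Standalone illustration of the bit layout produced above (toy code,
 * not part of the patch; the multiplicative mix stands in for
 * cfs_hash_long()): the low hs_bkt_bits select the bucket while the
 * mixed high bits spread objects within it, so FIDs sharing a sequence
 * no longer crowd a single bucket chain.
 */
static __u32 toy_fid_hash(__u32 flat, __u32 seq, __u32 oid,
                          unsigned bkt_bits, unsigned cur_bits)
{
        __u32 hash = flat;                              /* fid_flatten32() stand-in */

        hash += (hash >> 4) + (hash << 12);             /* mix oid and seq bits */
        hash = (hash * 2654435761U) >> (32 - bkt_bits); /* cfs_hash_long() stand-in */
        hash <<= cur_bits - bkt_bits;                   /* high bits: slot in bucket */
        hash |= (seq + oid) & ((1U << bkt_bits) - 1);   /* low bits: bucket index */
        return hash;
}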
return &h->loh_fid;
}
-static int lu_obj_hop_keycmp(void *key, cfs_hlist_node_t *hnode)
+static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
struct lu_object_header *h;
.hs_put_locked = lu_obj_hop_put_locked,
};
+void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
+{
+ spin_lock(&s->ls_ld_lock);
+ if (cfs_list_empty(&d->ld_linkage))
+ cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
+ spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_add_linkage);
+
+void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
+{
+ spin_lock(&s->ls_ld_lock);
+ cfs_list_del_init(&d->ld_linkage);
+ spin_unlock(&s->ls_ld_lock);
+}
+EXPORT_SYMBOL(lu_dev_del_linkage);
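/*
 * Pairing sketch (illustrative): a device links itself to the site when
 * it attaches and unlinks on teardown. The cfs_list_empty() check under
 * ls_ld_lock makes repeated lu_dev_add_linkage() calls idempotent.
 */
static void my_attach_device(struct lu_site *s, struct lu_device *d)
{
        lu_dev_add_linkage(s, d);       /* safe to call more than once */
}

static void my_detach_device(struct lu_site *s, struct lu_device *d)
{
        lu_dev_del_linkage(s, d);
}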
+
/**
* Initialize site \a s, with \a d as the top level device.
*/
#define LU_SITE_BITS_MIN 12
-#define LU_SITE_BITS_MAX 23
+#define LU_SITE_BITS_MAX 24
/**
- * total 128 buckets, we don't want too many buckets because:
+ * total 256 buckets; we don't want too many buckets because:
 * - they consume too much memory
 * - they lead to unbalanced per-bucket LRU lists
*/
-#define LU_SITE_BKT_BITS 7
+#define LU_SITE_BKT_BITS 8
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
struct lu_site_bkt_data *bkt;
cfs_hash_bd_t bd;
+ char name[16];
int bits;
int i;
ENTRY;
memset(s, 0, sizeof *s);
bits = lu_htable_order();
+ snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
bits >= LU_SITE_BITS_MIN; bits--) {
- s->ls_obj_hash = cfs_hash_create("lu_site", bits, bits,
+ s->ls_obj_hash = cfs_hash_create(name, bits, bits,
bits - LU_SITE_BKT_BITS,
sizeof(*bkt), 0, 0,
&lu_site_hash_ops,
lu_device_get(top);
lu_ref_add(&top->ld_reference, "site-top", s);
- RETURN(0);
+ CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
+ spin_lock_init(&s->ls_ld_lock);
+
+ lu_dev_add_linkage(s, top);
+
+ RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);
*/
void lu_site_fini(struct lu_site *s)
{
- cfs_down(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
cfs_list_del_init(&s->ls_linkage);
- cfs_up(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
if (s->ls_obj_hash != NULL) {
cfs_hash_putref(s->ls_obj_hash);
int lu_site_init_finish(struct lu_site *s)
{
int result;
- cfs_down(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
result = lu_context_refill(&lu_shrink_env.le_ctx);
if (result == 0)
cfs_list_add(&s->ls_linkage, &lu_sites);
- cfs_up(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
return result;
}
EXPORT_SYMBOL(lu_site_init_finish);
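/*
 * Two-phase setup sketch (caller side, names illustrative): a site is
 * first initialized against its top device, then registered on the
 * global lu_sites list once the shrinker context has been refilled.
 */
static int my_site_setup(struct lu_site *s, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(s, top);
        if (rc == 0)
                rc = lu_site_init_finish(s);
        return rc;
}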
cfs_atomic_set(&d->ld_ref, 0);
d->ld_type = t;
lu_ref_init(&d->ld_reference);
+ CFS_INIT_LIST_HEAD(&d->ld_linkage);
return 0;
}
EXPORT_SYMBOL(lu_device_init);
/* purge again. */
lu_site_purge(env, site, ~0);
- if (!cfs_hash_is_empty(site->ls_obj_hash)) {
- /*
- * Uh-oh, objects still exist.
- */
- static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
-
- lu_site_print(env, site, &cookie, lu_cdebug_printer);
- }
-
for (scan = top; scan != NULL; scan = next) {
const struct lu_device_type *ldt = scan->ld_type;
struct obd_type *type;
/**
* Maximal number of tld slots.
*/
- LU_CONTEXT_KEY_NR = 32
+ LU_CONTEXT_KEY_NR = 40
};
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(lu_keys_guard);
/**
* Global counter incremented whenever key is registered, unregistered,
LASSERT(key->lct_owner != NULL);
result = -ENFILE;
- cfs_spin_lock(&lu_keys_guard);
+ spin_lock(&lu_keys_guard);
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
if (lu_keys[i] == NULL) {
key->lct_index = i;
break;
}
}
- cfs_spin_unlock(&lu_keys_guard);
- return result;
+ spin_unlock(&lu_keys_guard);
+ return result;
}
EXPORT_SYMBOL(lu_context_key_register);
key->lct_fini(ctx, key, ctx->lc_value[index]);
lu_ref_del(&key->lct_reference, "ctx", ctx);
cfs_atomic_dec(&key->lct_used);
- LASSERT(key->lct_owner != NULL);
- if (!(ctx->lc_tags & LCT_NOREF)) {
- LASSERT(cfs_module_refcount(key->lct_owner) > 0);
- cfs_module_put(key->lct_owner);
- }
- ctx->lc_value[index] = NULL;
- }
+
+ LASSERT(key->lct_owner != NULL);
+ if ((ctx->lc_tags & LCT_NOREF) == 0) {
+ LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
+ cfs_module_put(key->lct_owner);
+ }
+ ctx->lc_value[index] = NULL;
+ }
}
/**
*/
void lu_context_key_degister(struct lu_context_key *key)
{
- LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
- LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
+ LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
+ LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
- lu_context_key_quiesce(key);
+ lu_context_key_quiesce(key);
- ++key_set_version;
- cfs_spin_lock(&lu_keys_guard);
- key_fini(&lu_shrink_env.le_ctx, key->lct_index);
- if (lu_keys[key->lct_index]) {
- lu_keys[key->lct_index] = NULL;
- lu_ref_fini(&key->lct_reference);
- }
- cfs_spin_unlock(&lu_keys_guard);
+ ++key_set_version;
+ spin_lock(&lu_keys_guard);
+ key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+ if (lu_keys[key->lct_index]) {
+ lu_keys[key->lct_index] = NULL;
+ lu_ref_fini(&key->lct_reference);
+ }
+ spin_unlock(&lu_keys_guard);
- LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
- "key has instances: %d\n",
- cfs_atomic_read(&key->lct_used));
+ LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
+ "key has instances: %d\n",
+ cfs_atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);
/*
* XXX memory barrier has to go here.
*/
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_for_each_entry(ctx, &lu_context_remembered,
- lc_remember)
- key_fini(ctx, key->lct_index);
- cfs_spin_unlock(&lu_keys_guard);
- ++key_set_version;
- }
+ spin_lock(&lu_keys_guard);
+ cfs_list_for_each_entry(ctx, &lu_context_remembered,
+ lc_remember)
+ key_fini(ctx, key->lct_index);
+ spin_unlock(&lu_keys_guard);
+ ++key_set_version;
+ }
}
EXPORT_SYMBOL(lu_context_key_quiesce);
static void keys_fini(struct lu_context *ctx)
{
- int i;
+ int i;
- cfs_spin_lock(&lu_keys_guard);
- if (ctx->lc_value != NULL) {
- for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
- key_fini(ctx, i);
- OBD_FREE(ctx->lc_value,
- ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
- ctx->lc_value = NULL;
- }
- cfs_spin_unlock(&lu_keys_guard);
+ if (ctx->lc_value == NULL)
+ return;
+
+ for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
+ key_fini(ctx, i);
+
+ OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+ ctx->lc_value = NULL;
}
static int keys_fill(struct lu_context *ctx)
{
int i;
+ LINVRNT(ctx->lc_value != NULL);
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
struct lu_context_key *key;
static int keys_init(struct lu_context *ctx)
{
- int result;
+ OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+ if (likely(ctx->lc_value != NULL))
+ return keys_fill(ctx);
- OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
- if (likely(ctx->lc_value != NULL))
- result = keys_fill(ctx);
- else
- result = -ENOMEM;
-
- if (result != 0)
- keys_fini(ctx);
- return result;
+ return -ENOMEM;
}
/**
*/
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
- memset(ctx, 0, sizeof *ctx);
- ctx->lc_state = LCS_INITIALIZED;
- ctx->lc_tags = tags;
- if (tags & LCT_REMEMBER) {
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
- cfs_spin_unlock(&lu_keys_guard);
- } else
- CFS_INIT_LIST_HEAD(&ctx->lc_remember);
- return keys_init(ctx);
+ int rc;
+
+ memset(ctx, 0, sizeof *ctx);
+ ctx->lc_state = LCS_INITIALIZED;
+ ctx->lc_tags = tags;
+ if (tags & LCT_REMEMBER) {
+ spin_lock(&lu_keys_guard);
+ cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
+ spin_unlock(&lu_keys_guard);
+ } else {
+ CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+ }
+
+ rc = keys_init(ctx);
+ if (rc != 0)
+ lu_context_fini(ctx);
+
+ return rc;
}
EXPORT_SYMBOL(lu_context_init);
*/
void lu_context_fini(struct lu_context *ctx)
{
- LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
- ctx->lc_state = LCS_FINALIZED;
- keys_fini(ctx);
- cfs_spin_lock(&lu_keys_guard);
- cfs_list_del_init(&ctx->lc_remember);
- cfs_spin_unlock(&lu_keys_guard);
+ LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+ ctx->lc_state = LCS_FINALIZED;
+
+ if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
+ LASSERT(cfs_list_empty(&ctx->lc_remember));
+ keys_fini(ctx);
+
+ } else { /* could race with key degister */
+ spin_lock(&lu_keys_guard);
+ keys_fini(ctx);
+ cfs_list_del_init(&ctx->lc_remember);
+ spin_unlock(&lu_keys_guard);
+ }
}
EXPORT_SYMBOL(lu_context_fini);
/**
* Allocate for context all missing keys that were registered after context
- * creation.
+ * creation. key_set_version is only changed in rare cases when modules
+ * are loaded and removed.
*/
int lu_context_refill(struct lu_context *ctx)
{
- LINVRNT(ctx->lc_value != NULL);
- return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
+ return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);
+/**
+ * lu_context_tags_default/lu_session_tags_default will be updated when new
+ * types of obd are added. Currently this is only used on the client side,
+ * specifically by the echo device client; for other stacks (like ptlrpc
+ * threads) the context tags are predefined when the lu_device type is
+ * registered, during the module probe phase.
+ */
+__u32 lu_context_tags_default = 0;
+__u32 lu_session_tags_default = 0;
+
+void lu_context_tags_update(__u32 tags)
+{
+ spin_lock(&lu_keys_guard);
+ lu_context_tags_default |= tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_context_tags_update);
+
+void lu_context_tags_clear(__u32 tags)
+{
+ spin_lock(&lu_keys_guard);
+ lu_context_tags_default &= ~tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_context_tags_clear);
+
+void lu_session_tags_update(__u32 tags)
+{
+ spin_lock(&lu_keys_guard);
+ lu_session_tags_default |= tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_session_tags_update);
+
+void lu_session_tags_clear(__u32 tags)
+{
+ spin_lock(&lu_keys_guard);
+ lu_session_tags_default &= ~tags;
+ key_set_version++;
+ spin_unlock(&lu_keys_guard);
+}
+EXPORT_SYMBOL(lu_session_tags_clear);
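/*
 * Balanced usage sketch (illustrative): an echo-client-style module
 * widens the default tag sets while loaded and clears them on unload;
 * each call bumps key_set_version so existing contexts refill lazily.
 */
static void my_echo_attach(void)
{
        lu_context_tags_update(LCT_DT_THREAD);
        lu_session_tags_update(LCT_SESSION);
}

static void my_echo_detach(void)
{
        lu_context_tags_clear(LCT_DT_THREAD);
        lu_session_tags_clear(LCT_SESSION);
}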
+
int lu_env_init(struct lu_env *env, __u32 tags)
{
int result;
}
EXPORT_SYMBOL(lu_env_refill);
+/**
+ * Currently, this API is only used by the echo client. Because the echo
+ * client and the normal Lustre client share the same cl_env cache, the
+ * echo client needs to refresh the env context after it gets one from the
+ * cache, especially when a normal client and an echo client co-exist on
+ * the same node.
+ */
+int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
+ __u32 stags)
+{
+ int result;
+
+ if ((env->le_ctx.lc_tags & ctags) != ctags) {
+ env->le_ctx.lc_version = 0;
+ env->le_ctx.lc_tags |= ctags;
+ }
+
+ if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
+ env->le_ses->lc_version = 0;
+ env->le_ses->lc_tags |= stags;
+ }
+
+ result = lu_env_refill(env);
+
+ return result;
+}
+EXPORT_SYMBOL(lu_env_refill_by_tags);
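/*
 * Sketch (illustrative): refreshing an env taken from the shared cl_env
 * cache so that any tags added via lu_context_tags_update() take effect
 * before the echo client uses it.
 */
static int my_env_prepare(struct lu_env *env)
{
        return lu_env_refill_by_tags(env, lu_context_tags_default,
                                     lu_session_tags_default);
}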
+
static struct cfs_shrinker *lu_site_shrinker = NULL;
typedef struct lu_site_stats{
}
#ifdef __KERNEL__
-static int lu_cache_shrink(int nr, unsigned int gfp_mask)
+
+/*
+ * There exists a potential lock inversion deadlock scenario when using
+ * Lustre on top of ZFS. This occurs between one of ZFS's
+ * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
+ * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
+ * while thread B will take the ht_lock and sleep on the lu_sites_guard
+ * lock. Obviously neither thread will wake and drop their respective hold
+ * on their lock.
+ *
+ * To prevent this from happening we must ensure the lu_sites_guard lock is
+ * not taken while down this code path. ZFS reliably does not set the
+ * __GFP_FS bit in its code paths, so this can be used to determine if it
+ * is safe to take the lu_sites_guard lock.
+ *
+ * Ideally we should accurately return the remaining number of cached
+ * objects without taking the lu_sites_guard lock, but this is not
+ * possible in the current implementation.
+ */
+static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
lu_site_stats_t stats;
struct lu_site *s;
struct lu_site *tmp;
int cached = 0;
- int remain = nr;
+ int remain = shrink_param(sc, nr_to_scan);
CFS_LIST_HEAD(splice);
- if (nr != 0) {
- if (!(gfp_mask & __GFP_FS))
+ if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
+ if (remain != 0)
return -1;
- CDEBUG(D_INODE, "Shrink %d objects\n", nr);
+ else
+ /* We must not take the lu_sites_guard lock when
+ * __GFP_FS is *not* set because of the deadlock
+ * possibility detailed above. Additionally,
+ * since we cannot determine the number of
+ * objects in the cache without taking this
+ * lock, we're in a particularly tough spot. As
+ * a result, we'll just lie and say our cache is
+ * empty. This _should_ be ok, as we can't
+ * reclaim objects when __GFP_FS is *not* set
+ * anyways.
+ */
+ return 0;
}
- cfs_down(&lu_sites_guard);
+ CDEBUG(D_INODE, "Shrink %d objects\n", remain);
+
+ mutex_lock(&lu_sites_guard);
cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
- if (nr != 0) {
+ if (shrink_param(sc, nr_to_scan) != 0) {
remain = lu_site_purge(&lu_shrink_env, s, remain);
/*
* Move just shrunk site to the tail of site list to
memset(&stats, 0, sizeof(stats));
lu_site_stats_get(s->ls_obj_hash, &stats, 0);
cached += stats.lss_total - stats.lss_busy;
- if (nr && remain <= 0)
+ if (shrink_param(sc, nr_to_scan) && remain <= 0)
break;
}
cfs_list_splice(&splice, lu_sites.prev);
- cfs_up(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
cached = (cached / 100) * sysctl_vfs_cache_pressure;
- if (nr == 0)
+ if (shrink_param(sc, nr_to_scan) == 0)
CDEBUG(D_INODE, "%d objects cached\n", cached);
return cached;
}
int llo_global_init(void);
void llo_global_fini(void);
+/* context key constructor/destructor: lu_ucred_key_init, lu_ucred_key_fini */
+LU_KEY_INIT_FINI(lu_ucred, struct lu_ucred);
+
+static struct lu_context_key lu_ucred_key = {
+ .lct_tags = LCT_SESSION,
+ .lct_init = lu_ucred_key_init,
+ .lct_fini = lu_ucred_key_fini
+};
+
+/**
+ * Get ucred key if session exists and ucred key is allocated on it.
+ * Return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred(const struct lu_env *env)
+{
+ if (!env->le_ses)
+ return NULL;
+ return lu_context_key_get(env->le_ses, &lu_ucred_key);
+}
+EXPORT_SYMBOL(lu_ucred);
+
+/**
+ * Get ucred key and check if it is properly initialized.
+ * Return NULL otherwise.
+ */
+struct lu_ucred *lu_ucred_check(const struct lu_env *env)
+{
+ struct lu_ucred *uc = lu_ucred(env);
+ if (uc && uc->uc_valid != UCRED_OLD && uc->uc_valid != UCRED_NEW)
+ return NULL;
+ return uc;
+}
+EXPORT_SYMBOL(lu_ucred_check);
+
+/**
+ * Get ucred key, which must exist and must be properly initialized.
+ * Assert otherwise.
+ */
+struct lu_ucred *lu_ucred_assert(const struct lu_env *env)
+{
+ struct lu_ucred *uc = lu_ucred_check(env);
+ LASSERT(uc != NULL);
+ return uc;
+}
+EXPORT_SYMBOL(lu_ucred_assert);
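/*
 * Consumer-pattern sketch (illustrative; assumes struct lu_ucred carries
 * a uc_uid field): use lu_ucred_check() on paths where credentials may
 * be absent or uninitialized, and reserve lu_ucred_assert() for request
 * paths that guarantee them.
 */
static int my_root_only_check(const struct lu_env *env)
{
        struct lu_ucred *uc = lu_ucred_check(env);

        if (uc == NULL)
                return -EINVAL; /* no valid credentials on this path */
        return uc->uc_uid == 0 ? 0 : -EPERM;
}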
+
/**
* Initialization of global lu_* data.
*/
{
int result;
- CDEBUG(D_CONSOLE, "Lustre LU module (%p).\n", &lu_keys);
+ CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
result = lu_ref_global_init();
if (result != 0)
result = lu_context_key_register(&lu_global_key);
if (result != 0)
return result;
+
+ LU_CONTEXT_KEY_INIT(&lu_ucred_key);
+ result = lu_context_key_register(&lu_ucred_key);
+ if (result != 0)
+ return result;
+
/*
* At this level, we don't know what tags are needed, so allocate them
* conservatively. This should not be too bad, because this
* environment is global.
*/
- cfs_down(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
- cfs_up(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
if (result != 0)
return result;
}
lu_context_key_degister(&lu_global_key);
+ lu_context_key_degister(&lu_ucred_key);
/*
* Tear shrinker environment down _after_ de-registering
* lu_global_key, because the latter has a value in the former.
*/
- cfs_down(&lu_sites_guard);
+ mutex_lock(&lu_sites_guard);
lu_env_fini(&lu_shrink_env);
- cfs_up(&lu_sites_guard);
+ mutex_unlock(&lu_sites_guard);
lu_ref_global_fini();
}
int lu_kmem_init(struct lu_kmem_descr *caches)
{
int result;
+ struct lu_kmem_descr *iter = caches;
- for (result = 0; caches->ckd_cache != NULL; ++caches) {
- *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name,
- caches->ckd_size,
- 0, 0);
- if (*caches->ckd_cache == NULL) {
+ for (result = 0; iter->ckd_cache != NULL; ++iter) {
+ *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name,
+ iter->ckd_size,
+ 0, 0);
+ if (*iter->ckd_cache == NULL) {
result = -ENOMEM;
+ /* free all previously allocated caches */
+ lu_kmem_fini(caches);
break;
}
}
}
}
EXPORT_SYMBOL(lu_kmem_fini);
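/*
 * Descriptor-table sketch as consumed by lu_kmem_init()/lu_kmem_fini()
 * (names illustrative): the array is scanned until the NULL ckd_cache
 * sentinel, and on mid-table failure lu_kmem_init() now unwinds the
 * caches it already created.
 */
static cfs_mem_cache_t *my_object_kmem;

static struct lu_kmem_descr my_caches[] = {
        {
                .ckd_cache = &my_object_kmem,
                .ckd_name  = "my_object_kmem",
                .ckd_size  = sizeof(struct lu_object_header)
        },
        {
                .ckd_cache = NULL
        }
};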
+
+/**
+ * Temporary solution to be able to assign the fid in ->do_create()
+ * until we have fully-functional OST fids.
+ */
+void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
+ const struct lu_fid *fid)
+{
+ struct lu_site *s = o->lo_dev->ld_site;
+ struct lu_fid *old = &o->lo_header->loh_fid;
+ struct lu_site_bkt_data *bkt;
+ struct lu_object *shadow;
+ cfs_waitlink_t waiter;
+ cfs_hash_t *hs;
+ cfs_hash_bd_t bd;
+ __u64 version = 0;
+
+ LASSERT(fid_is_zero(old));
+
+ hs = s->ls_obj_hash;
+ cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
+ shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+ /* supposed to be unique */
+ LASSERT(shadow == NULL);
+ *old = *fid;
+ bkt = cfs_hash_bd_extra_get(hs, &bd);
+ cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+ bkt->lsb_busy++;
+ cfs_hash_bd_unlock(hs, &bd, 1);
+}
+EXPORT_SYMBOL(lu_object_assign_fid);
+
+/**
+ * Allocate an object with a zero (not yet assigned) fid.
+ * XXX: temporary solution to be able to assign the fid in ->do_create()
+ * until we have fully-functional OST fids.
+ */
+struct lu_object *lu_object_anon(const struct lu_env *env,
+ struct lu_device *dev,
+ const struct lu_object_conf *conf)
+{
+ struct lu_fid fid;
+ struct lu_object *o;
+
+ fid_zero(&fid);
+ o = lu_object_alloc(env, dev, &fid, conf);
+
+ return o;
+}
+EXPORT_SYMBOL(lu_object_anon);
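/*
 * Intended flow sketch (OSP-style, names illustrative): create an
 * anonymous object first, allocate a real FID during ->do_create(),
 * then publish the object in the site hash via lu_object_assign_fid().
 */
static struct lu_object *my_create_on_ost(const struct lu_env *env,
                                          struct lu_device *dev,
                                          const struct lu_fid *newfid)
{
        struct lu_object *o = lu_object_anon(env, dev, NULL);

        if (IS_ERR(o))
                return o;
        /* ... ->do_create() work allocates *newfid here ... */
        lu_object_assign_fid(env, o, newfid);
        return o;
}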