#include <linux/module.h>
#include <linux/list.h>
+#ifdef HAVE_PROCESSOR_H
+#include <linux/processor.h>
+#else
+#include <libcfs/linux/processor.h>
+#endif
+
#include <libcfs/libcfs.h>
#include <libcfs/libcfs_hash.h> /* hash_long() */
+#include <libcfs/linux/linux-mem.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lu_object.h>
#include <lu_ref.h>
+struct lu_site_bkt_data {
+ /**
+ * LRU list, updated on each access to object. Protected by
+ * bucket lock of lu_site::ls_obj_hash.
+ *
+ * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
+ * moved to the lu_site::ls_lru.prev (this is due to the non-existence
+ * of list_for_each_entry_safe_reverse()).
+ */
+ struct list_head lsb_lru;
+ /**
+ * Wait-queue signaled when an object in this site is ultimately
+ * destroyed (lu_object_free()). It is used by lu_object_find() to
+ * wait before re-trying when object in the process of destruction is
+ * found in the hash table.
+ *
+ * \see htable_lookup().
+ */
+ wait_queue_head_t lsb_marche_funebre;
+};
+
enum {
LU_CACHE_PERCENT_MAX = 50,
LU_CACHE_PERCENT_DEFAULT = 20
static void lu_object_free(const struct lu_env *env, struct lu_object *o);
static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
+wait_queue_head_t *
+lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
+{
+ struct cfs_hash_bd bd;
+ struct lu_site_bkt_data *bkt;
+
+ cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
+ bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+ return &bkt->lsb_marche_funebre;
+}
+EXPORT_SYMBOL(lu_site_wq_from_fid);
+
/**
* Decrease reference counter on object. If last reference is freed, return
* object to the cache, unless lu_object_is_dying(o) holds. In the latter
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
struct lu_site_bkt_data *bkt;
- struct lu_object_header *top;
- struct lu_site *site;
- struct lu_object *orig;
+ struct lu_object_header *top = o->lo_header;
+ struct lu_site *site = o->lo_dev->ld_site;
+ struct lu_object *orig = o;
struct cfs_hash_bd bd;
- const struct lu_fid *fid;
-
- top = o->lo_header;
- site = o->lo_dev->ld_site;
- orig = o;
+ const struct lu_fid *fid = lu_object_fid(o);
+ bool is_dying;
/*
* till we have full fids-on-OST implemented anonymous objects
* are possible in OSP. such an object isn't listed in the site
* so we should not remove it from the site.
*/
- fid = lu_object_fid(o);
if (fid_is_zero(fid)) {
LASSERT(top->loh_hash.next == NULL
&& top->loh_hash.pprev == NULL);
cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+ is_dying = lu_object_is_dying(top);
if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
- if (lu_object_is_dying(top)) {
+ /* at this point the object reference is dropped and lock is
+ * not taken, so lu_object should not be touched because it
+ * can be freed by concurrent thread. Use local variable for
+ * check.
+ */
+ if (is_dying) {
/*
* somebody may be waiting for this, currently only
* used for cl_object, see cl_object_put_last().
o->lo_ops->loo_object_release(env, o);
}
+ /* don't use local 'is_dying' here because if was taken without lock
+ * but here we need the latest actual value of it so check lu_object
+ * directly here.
+ */
if (!lu_object_is_dying(top) &&
(lu_object_exists(orig) || lu_object_is_cl(orig))) {
LASSERT(list_empty(&top->loh_lru));
list_add_tail(&top->loh_lru, &bkt->lsb_lru);
- bkt->lsb_lru_len++;
percpu_counter_inc(&site->ls_lru_len_counter);
- CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
- "lru_len: %ld\n",
- o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
+ CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
+ orig, top, site->ls_obj_hash, bkt);
cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
return;
}
list_del_init(&top->loh_lru);
bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
- bkt->lsb_lru_len--;
percpu_counter_dec(&site->ls_lru_len_counter);
}
cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
*/
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
- struct lu_site_bkt_data *bkt;
+ wait_queue_head_t *wq;
struct lu_site *site;
struct lu_object *scan;
struct list_head *layers;
struct list_head splice;
- site = o->lo_dev->ld_site;
- layers = &o->lo_header->loh_layers;
- bkt = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
+ site = o->lo_dev->ld_site;
+ layers = &o->lo_header->loh_layers;
+ wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
/*
* First call ->loo_object_delete() method to release all resources.
*/
o->lo_ops->loo_object_free(env, o);
}
- if (waitqueue_active(&bkt->lsb_marche_funebre))
- wake_up_all(&bkt->lsb_marche_funebre);
+ if (waitqueue_active(wq))
+ wake_up_all(wq);
}
/**
cfs_hash_bd_del_locked(s->ls_obj_hash,
&bd2, &h->loh_hash);
list_move(&h->loh_lru, &dispose);
- bkt->lsb_lru_len--;
percpu_counter_dec(&s->ls_lru_len_counter);
if (did_sth == 0)
did_sth = 1;
lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
if (!list_empty(&h->loh_lru)) {
list_del_init(&h->loh_lru);
- bkt->lsb_lru_len--;
percpu_counter_dec(&s->ls_lru_len_counter);
}
return lu_object_top(h);
MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0);
}
-static struct lu_object *lu_object_new(const struct lu_env *env,
- struct lu_device *dev,
- const struct lu_fid *f,
- const struct lu_object_conf *conf)
-{
- struct lu_object *o;
- struct cfs_hash *hs;
- struct cfs_hash_bd bd;
-
- o = lu_object_alloc(env, dev, f, conf);
- if (unlikely(IS_ERR(o)))
- return o;
-
- hs = dev->ld_site->ls_obj_hash;
- cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
- cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
- cfs_hash_bd_unlock(hs, &bd, 1);
-
- lu_object_limit(env, dev);
-
- return o;
-}
-
/**
* Core logic of lu_object_find*() functions.
*
* It is unnecessary to perform lookup-alloc-lookup-insert, instead,
* just alloc and insert directly.
*
- * If dying object is found during index search, add @waiter to the
- * site wait-queue and return ERR_PTR(-EAGAIN).
*/
- if (conf && conf->loc_flags & LOC_F_NEW)
- return lu_object_new(env, dev, f, conf);
-
s = dev->ld_site;
hs = s->ls_obj_hash;
- cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
- o = htable_lookup(s, &bd, f, &version);
- cfs_hash_bd_unlock(hs, &bd, 1);
- if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
- return o;
+ cfs_hash_bd_get(hs, f, &bd);
+ if (!(conf && conf->loc_flags & LOC_F_NEW)) {
+ cfs_hash_bd_lock(hs, &bd, 1);
+ o = htable_lookup(s, &bd, f, &version);
+ cfs_hash_bd_unlock(hs, &bd, 1);
+ if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
+ return o;
+ }
/*
* Allocate new object. This may result in rather complicated
* operations, including fld queries, inode loading, etc.
*/
o = lu_object_alloc(env, dev, f, conf);
- if (unlikely(IS_ERR(o)))
+ if (IS_ERR(o))
return o;
LASSERT(lu_fid_eq(lu_object_fid(o), f));
cfs_hash_bd_lock(hs, &bd, 1);
- shadow = htable_lookup(s, &bd, f, &version);
- if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
+ if (conf && conf->loc_flags & LOC_F_NEW)
+ shadow = ERR_PTR(-ENOENT);
+ else
+ shadow = htable_lookup(s, &bd, f, &version);
+ if (likely(PTR_ERR(shadow) == -ENOENT)) {
cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
cfs_hash_bd_unlock(hs, &bd, 1);
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-DEFINE_RWLOCK(lu_keys_guard);
-static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
+static DECLARE_RWSEM(lu_key_initing);
/**
* Global counter incremented whenever key is registered, unregistered,
* lu_context_refill(). No locking is provided, as initialization and shutdown
* are supposed to be externally serialized.
*/
-static unsigned key_set_version = 0;
+static atomic_t key_set_version = ATOMIC_INIT(0);
/**
* Register new key.
LASSERT(key->lct_owner != NULL);
result = -ENFILE;
- write_lock(&lu_keys_guard);
+ atomic_set(&key->lct_used, 1);
+ lu_ref_init(&key->lct_reference);
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
- if (lu_keys[i] == NULL) {
- key->lct_index = i;
- atomic_set(&key->lct_used, 1);
- lu_keys[i] = key;
- lu_ref_init(&key->lct_reference);
- result = 0;
- ++key_set_version;
- break;
- }
+ if (lu_keys[i])
+ continue;
+ key->lct_index = i;
+ if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
+ continue;
+
+ result = 0;
+ atomic_inc(&key_set_version);
+ break;
}
- write_unlock(&lu_keys_guard);
+ if (result) {
+ lu_ref_fini(&key->lct_reference);
+ atomic_set(&key->lct_used, 0);
+ }
return result;
}
EXPORT_SYMBOL(lu_context_key_register);
key = lu_keys[index];
LASSERT(key != NULL);
LASSERT(key->lct_fini != NULL);
- LASSERT(atomic_read(&key->lct_used) > 1);
+ LASSERT(atomic_read(&key->lct_used) > 0);
key->lct_fini(ctx, key, ctx->lc_value[index]);
lu_ref_del(&key->lct_reference, "ctx", ctx);
- atomic_dec(&key->lct_used);
+ if (atomic_dec_and_test(&key->lct_used))
+ wake_up_var(&key->lct_used);
LASSERT(key->lct_owner != NULL);
if ((ctx->lc_tags & LCT_NOREF) == 0) {
lu_context_key_quiesce(key);
- ++key_set_version;
- write_lock(&lu_keys_guard);
key_fini(&lu_shrink_env.le_ctx, key->lct_index);
/**
* Wait until all transient contexts referencing this key have
* run lu_context_key::lct_fini() method.
*/
- while (atomic_read(&key->lct_used) > 1) {
- write_unlock(&lu_keys_guard);
- CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
- key->lct_owner ? key->lct_owner->name : "", key,
- atomic_read(&key->lct_used));
- schedule();
- write_lock(&lu_keys_guard);
- }
- if (lu_keys[key->lct_index]) {
- lu_keys[key->lct_index] = NULL;
+ atomic_dec(&key->lct_used);
+ wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
+
+ if (!WARN_ON(lu_keys[key->lct_index] == NULL))
lu_ref_fini(&key->lct_reference);
- }
- write_unlock(&lu_keys_guard);
- LASSERTF(atomic_read(&key->lct_used) == 1,
- "key has instances: %d\n",
- atomic_read(&key->lct_used));
+ smp_store_release(&lu_keys[key->lct_index], NULL);
}
EXPORT_SYMBOL(lu_context_key_degister);
* List of remembered contexts. XXX document me.
*/
static LIST_HEAD(lu_context_remembered);
+static DEFINE_SPINLOCK(lu_context_remembered_guard);
/**
* Destroy \a key in all remembered contexts. This is used to destroy key
if (!(key->lct_tags & LCT_QUIESCENT)) {
/*
- * XXX memory barrier has to go here.
+ * The write-lock on lu_key_initing will ensure that any
+ * keys_fill() which didn't see LCT_QUIESCENT will have
+ * finished before we call key_fini().
*/
- write_lock(&lu_keys_guard);
+ down_write(&lu_key_initing);
key->lct_tags |= LCT_QUIESCENT;
+ up_write(&lu_key_initing);
- /**
- * Wait until all lu_context_key::lct_init() methods
- * have completed.
- */
- while (atomic_read(&lu_key_initing_cnt) > 0) {
- write_unlock(&lu_keys_guard);
- CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
- " %p, %d (%d)\n",
- key->lct_owner ? key->lct_owner->name : "",
- key, atomic_read(&key->lct_used),
- atomic_read(&lu_key_initing_cnt));
- schedule();
- write_lock(&lu_keys_guard);
- }
-
- list_for_each_entry(ctx, &lu_context_remembered,
- lc_remember)
+ spin_lock(&lu_context_remembered_guard);
+ list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
+ spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
key_fini(ctx, key->lct_index);
+ }
- ++key_set_version;
- write_unlock(&lu_keys_guard);
+ spin_unlock(&lu_context_remembered_guard);
}
}
void lu_context_key_revive(struct lu_context_key *key)
{
- write_lock(&lu_keys_guard);
key->lct_tags &= ~LCT_QUIESCENT;
- ++key_set_version;
- write_unlock(&lu_keys_guard);
+ atomic_inc(&key_set_version);
}
static void keys_fini(struct lu_context *ctx)
static int keys_fill(struct lu_context *ctx)
{
unsigned int i;
- unsigned pre_version;
+ int rc = 0;
/*
- * A serialisation with lu_context_key_quiesce() is needed, but some
- * "key->lct_init()" are calling kernel memory allocation routine and
- * can't be called while holding a spin_lock.
- * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
- * to ensure the start of the serialisation.
- * An atomic_t variable is still used, in order not to reacquire the
- * lock when decrementing the counter.
+ * A serialisation with lu_context_key_quiesce() is needed, to
+ * ensure we see LCT_QUIESCENT and don't allocate a new value
+ * after it freed one. The rwsem provides this. As down_read()
+ * does optimistic spinning while the writer is active, this is
+ * unlikely to ever sleep.
*/
- read_lock(&lu_keys_guard);
- atomic_inc(&lu_key_initing_cnt);
- pre_version = key_set_version;
- read_unlock(&lu_keys_guard);
+ down_read(&lu_key_initing);
+ ctx->lc_version = atomic_read(&key_set_version);
-refill:
- LINVRNT(ctx->lc_value != NULL);
+ LINVRNT(ctx->lc_value);
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
struct lu_context_key *key;
key = lu_keys[i];
- if (ctx->lc_value[i] == NULL && key != NULL &&
+ if (!ctx->lc_value[i] && key &&
(key->lct_tags & ctx->lc_tags) &&
/*
* Don't create values for a LCT_QUIESCENT key, as this
value = key->lct_init(ctx, key);
if (unlikely(IS_ERR(value))) {
- atomic_dec(&lu_key_initing_cnt);
- return PTR_ERR(value);
+ rc = PTR_ERR(value);
+ break;
}
lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
}
}
- read_lock(&lu_keys_guard);
- if (pre_version != key_set_version) {
- pre_version = key_set_version;
- read_unlock(&lu_keys_guard);
- goto refill;
- }
-
- ctx->lc_version = key_set_version;
-
- atomic_dec(&lu_key_initing_cnt);
- read_unlock(&lu_keys_guard);
- return 0;
+ up_read(&lu_key_initing);
+ return rc;
}
static int keys_init(struct lu_context *ctx)
ctx->lc_state = LCS_INITIALIZED;
ctx->lc_tags = tags;
if (tags & LCT_REMEMBER) {
- write_lock(&lu_keys_guard);
+ spin_lock(&lu_context_remembered_guard);
list_add(&ctx->lc_remember, &lu_context_remembered);
- write_unlock(&lu_keys_guard);
+ spin_unlock(&lu_context_remembered_guard);
} else {
INIT_LIST_HEAD(&ctx->lc_remember);
}
if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
LASSERT(list_empty(&ctx->lc_remember));
- keys_fini(ctx);
-
- } else { /* could race with key degister */
- write_lock(&lu_keys_guard);
- keys_fini(ctx);
+ } else {
+ /* could race with key degister */
+ spin_lock(&lu_context_remembered_guard);
list_del_init(&ctx->lc_remember);
- write_unlock(&lu_keys_guard);
+ spin_unlock(&lu_context_remembered_guard);
}
+ keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);
{
unsigned int i;
- LINVRNT(ctx->lc_state == LCS_ENTERED);
- ctx->lc_state = LCS_LEFT;
- if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
- /* could race with key quiescency */
- if (ctx->lc_tags & LCT_REMEMBER)
- read_lock(&lu_keys_guard);
-
+ LINVRNT(ctx->lc_state == LCS_ENTERED);
+ /*
+ * Disable preempt to ensure we get a warning if
+ * any lct_exit ever tries to sleep. That would hurt
+ * lu_context_key_quiesce() which spins waiting for us.
+ * This also ensure we aren't preempted while the state
+ * is LCS_LEAVING, as that too would cause problems for
+ * lu_context_key_quiesce().
+ */
+ preempt_disable();
+ /*
+ * Ensure lu_context_key_quiesce() sees LCS_LEAVING
+ * or we see LCT_QUIESCENT
+ */
+ smp_store_mb(ctx->lc_state, LCS_LEAVING);
+ if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
- if (ctx->lc_value[i] != NULL) {
- struct lu_context_key *key;
-
- key = lu_keys[i];
- LASSERT(key != NULL);
- if (key->lct_exit != NULL)
- key->lct_exit(ctx,
- key, ctx->lc_value[i]);
- }
- }
+ struct lu_context_key *key;
- if (ctx->lc_tags & LCT_REMEMBER)
- read_unlock(&lu_keys_guard);
+ key = lu_keys[i];
+ if (ctx->lc_value[i] &&
+ !(key->lct_tags & LCT_QUIESCENT) &&
+ key->lct_exit)
+ key->lct_exit(ctx, key, ctx->lc_value[i]);
+ }
}
+
+ smp_store_release(&ctx->lc_state, LCS_LEFT);
+ preempt_enable();
}
EXPORT_SYMBOL(lu_context_exit);
*/
int lu_context_refill(struct lu_context *ctx)
{
- return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
+ if (likely(ctx->lc_version == atomic_read(&key_set_version)))
+ return 0;
+
+ return keys_fill(ctx);
}
/**
* predefined when the lu_device type are registered, during the module probe
* phase.
*/
-__u32 lu_context_tags_default = 0;
-__u32 lu_session_tags_default = 0;
+u32 lu_context_tags_default;
+u32 lu_session_tags_default;
+#ifdef HAVE_SERVER_SUPPORT
void lu_context_tags_update(__u32 tags)
{
- write_lock(&lu_keys_guard);
+ spin_lock(&lu_context_remembered_guard);
lu_context_tags_default |= tags;
- key_set_version++;
- write_unlock(&lu_keys_guard);
+ atomic_inc(&key_set_version);
+ spin_unlock(&lu_context_remembered_guard);
}
EXPORT_SYMBOL(lu_context_tags_update);
void lu_context_tags_clear(__u32 tags)
{
- write_lock(&lu_keys_guard);
+ spin_lock(&lu_context_remembered_guard);
lu_context_tags_default &= ~tags;
- key_set_version++;
- write_unlock(&lu_keys_guard);
+ atomic_inc(&key_set_version);
+ spin_unlock(&lu_context_remembered_guard);
}
EXPORT_SYMBOL(lu_context_tags_clear);
void lu_session_tags_update(__u32 tags)
{
- write_lock(&lu_keys_guard);
+ spin_lock(&lu_context_remembered_guard);
lu_session_tags_default |= tags;
- key_set_version++;
- write_unlock(&lu_keys_guard);
+ atomic_inc(&key_set_version);
+ spin_unlock(&lu_context_remembered_guard);
}
EXPORT_SYMBOL(lu_session_tags_update);
void lu_session_tags_clear(__u32 tags)
{
- write_lock(&lu_keys_guard);
+ spin_lock(&lu_context_remembered_guard);
lu_session_tags_default &= ~tags;
- key_set_version++;
- write_unlock(&lu_keys_guard);
+ atomic_inc(&key_set_version);
+ spin_unlock(&lu_context_remembered_guard);
}
EXPORT_SYMBOL(lu_session_tags_clear);
+#endif /* HAVE_SERVER_SUPPORT */
int lu_env_init(struct lu_env *env, __u32 tags)
{
}
EXPORT_SYMBOL(lu_env_refill_by_tags);
+
+struct lu_env_item {
+ struct task_struct *lei_task; /* rhashtable key */
+ struct rhash_head lei_linkage;
+ struct lu_env *lei_env;
+};
+
+static const struct rhashtable_params lu_env_rhash_params = {
+ .key_len = sizeof(struct task_struct *),
+ .key_offset = offsetof(struct lu_env_item, lei_task),
+ .head_offset = offsetof(struct lu_env_item, lei_linkage),
+ };
+
+struct rhashtable lu_env_rhash;
+
+struct lu_env_percpu {
+ struct task_struct *lep_task;
+ struct lu_env *lep_env ____cacheline_aligned_in_smp;
+};
+
+static struct lu_env_percpu lu_env_percpu[NR_CPUS];
+
+int lu_env_add(struct lu_env *env)
+{
+ struct lu_env_item *lei, *old;
+
+ LASSERT(env);
+
+ OBD_ALLOC_PTR(lei);
+ if (!lei)
+ return -ENOMEM;
+
+ lei->lei_task = current;
+ lei->lei_env = env;
+
+ old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
+ &lei->lei_linkage,
+ lu_env_rhash_params);
+ LASSERT(!old);
+
+ return 0;
+}
+EXPORT_SYMBOL(lu_env_add);
+
+void lu_env_remove(struct lu_env *env)
+{
+ struct lu_env_item *lei;
+ const void *task = current;
+ int i;
+
+ for_each_possible_cpu(i) {
+ if (lu_env_percpu[i].lep_env == env) {
+ LASSERT(lu_env_percpu[i].lep_task == task);
+ lu_env_percpu[i].lep_task = NULL;
+ lu_env_percpu[i].lep_env = NULL;
+ }
+ }
+
+ rcu_read_lock();
+ lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+ lu_env_rhash_params);
+ if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
+ lu_env_rhash_params) == 0)
+ OBD_FREE_PTR(lei);
+ rcu_read_unlock();
+}
+EXPORT_SYMBOL(lu_env_remove);
+
+struct lu_env *lu_env_find(void)
+{
+ struct lu_env *env = NULL;
+ struct lu_env_item *lei;
+ const void *task = current;
+ int i = get_cpu();
+
+ if (lu_env_percpu[i].lep_task == current) {
+ env = lu_env_percpu[i].lep_env;
+ put_cpu();
+ LASSERT(env);
+ return env;
+ }
+
+ lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+ lu_env_rhash_params);
+ if (lei) {
+ env = lei->lei_env;
+ lu_env_percpu[i].lep_task = current;
+ lu_env_percpu[i].lep_env = env;
+ }
+ put_cpu();
+
+ return env;
+}
+EXPORT_SYMBOL(lu_env_find);
+
static struct shrinker *lu_site_shrinker;
typedef struct lu_site_stats{
unsigned lss_busy;
} lu_site_stats_t;
-static void lu_site_stats_get(struct cfs_hash *hs,
+static void lu_site_stats_get(const struct lu_site *s,
lu_site_stats_t *stats, int populated)
{
+ struct cfs_hash *hs = s->ls_obj_hash;
struct cfs_hash_bd bd;
- unsigned int i;
+ unsigned int i;
+ /*
+ * percpu_counter_sum_positive() won't accept a const pointer
+ * as it does modify the struct by taking a spinlock
+ */
+ struct lu_site *s2 = (struct lu_site *)s;
+ stats->lss_busy += cfs_hash_size_get(hs) -
+ percpu_counter_sum_positive(&s2->ls_lru_len_counter);
cfs_hash_for_each_bucket(hs, &bd, i) {
- struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
- struct hlist_head *hhead;
+ struct hlist_head *hhead;
cfs_hash_bd_lock(hs, &bd, 1);
- stats->lss_busy +=
- cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
stats->lss_total += cfs_hash_bd_count_get(&bd);
stats->lss_max_search = max((int)stats->lss_max_search,
cfs_hash_bd_depmax_get(&bd));
*/
int lu_global_init(void)
{
- int result;
+ int result;
DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
lu_cache_shrink_count, lu_cache_shrink_scan);
if (lu_site_shrinker == NULL)
return -ENOMEM;
+ result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
+
return result;
}
lu_env_fini(&lu_shrink_env);
up_write(&lu_sites_guard);
+ rhashtable_destroy(&lu_env_rhash);
+
lu_ref_global_fini();
}
lu_site_stats_t stats;
memset(&stats, 0, sizeof(stats));
- lu_site_stats_get(s->ls_obj_hash, &stats, 1);
+ lu_site_stats_get(s, &stats, 1);
seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
stats.lss_busy,