X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=9f9718089d5961ef0327df6232315d6c6f5a6537;hp=1be5a9d4c8e42f7fa7f5a78970e7d4d1ae89d6bd;hb=0098396983e1075668414aa5298a4990e61ffbda;hpb=e2cdf469b0224e631e7d86046a2de5d92e80b7ca diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 1be5a9d..9f97180 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -23,7 +23,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2016, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,8 +42,16 @@ #include #include +#ifdef HAVE_PROCESSOR_H +#include +#else +#include +#endif +#include + #include #include /* hash_long() */ +#include #include #include #include @@ -51,6 +59,27 @@ #include #include +struct lu_site_bkt_data { + /** + * LRU list, updated on each access to object. Protected by + * lsb_waitq.lock. + * + * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are + * moved to the lu_site::ls_lru.prev + */ + struct list_head lsb_lru; + /** + * Wait-queue signaled when an object in this site is ultimately + * destroyed (lu_object_free()) or initialized (lu_object_start()). + * It is used by lu_object_find() to wait before re-trying when + * object in the process of destruction is found in the hash table; + * or wait object to be initialized by the allocator. + * + * \see htable_lookup(). + */ + wait_queue_head_t lsb_waitq; +}; + enum { LU_CACHE_PERCENT_MAX = 50, LU_CACHE_PERCENT_DEFAULT = 20 @@ -67,9 +96,11 @@ enum { #define LU_SITE_BITS_MAX 24 #define LU_SITE_BITS_MAX_CL 19 /** - * total 256 buckets, we don't want too many buckets because: - * - consume too much memory + * Max 256 buckets, we don't want too many buckets because: + * - consume too much memory (currently max 16K) * - avoid unbalanced LRU list + * With few cpus there is little gain from extra buckets, so + * we treat this as a maximum in lu_site_init(). */ #define LU_SITE_BKT_BITS 8 @@ -85,6 +116,31 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache"); static void lu_object_free(const struct lu_env *env, struct lu_object *o); static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx); +static u32 lu_fid_hash(const void *data, u32 seed) +{ + const struct lu_fid *fid = data; + + seed = cfs_hash_32(seed ^ fid->f_oid, 32); + seed ^= cfs_hash_64(fid->f_seq, 32); + return seed; +} + +static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid) +{ + return lu_fid_hash(fid, s->ls_bkt_seed) & + (s->ls_bkt_cnt - 1); +} + +wait_queue_head_t * +lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid) +{ + struct lu_site_bkt_data *bkt; + + bkt = &site->ls_bkts[lu_bkt_hash(site, fid)]; + return &bkt->lsb_waitq; +} +EXPORT_SYMBOL(lu_site_wq_from_fid); + /** * Decrease reference counter on object. If last reference is freed, return * object to the cache, unless lu_object_is_dying(o) holds. 
In the latter @@ -92,23 +148,19 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx); */ void lu_object_put(const struct lu_env *env, struct lu_object *o) { - struct lu_site_bkt_data *bkt; - struct lu_object_header *top; - struct lu_site *site; - struct lu_object *orig; - struct cfs_hash_bd bd; - const struct lu_fid *fid; - - top = o->lo_header; - site = o->lo_dev->ld_site; - orig = o; + struct lu_site_bkt_data *bkt; + struct lu_object_header *top = o->lo_header; + struct lu_site *site = o->lo_dev->ld_site; + struct lu_object *orig = o; + struct cfs_hash_bd bd; + const struct lu_fid *fid = lu_object_fid(o); + bool is_dying; /* * till we have full fids-on-OST implemented anonymous objects * are possible in OSP. such an object isn't listed in the site * so we should not remove it from the site. */ - fid = lu_object_fid(o); if (fid_is_zero(fid)) { LASSERT(top->loh_hash.next == NULL && top->loh_hash.pprev == NULL); @@ -123,62 +175,70 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o) return; } - cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd); - bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd); + cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd); + is_dying = lu_object_is_dying(top); if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) { - if (lu_object_is_dying(top)) { - + /* at this point the object reference is dropped and lock is + * not taken, so lu_object should not be touched because it + * can be freed by concurrent thread. Use local variable for + * check. + */ + if (is_dying) { /* * somebody may be waiting for this, currently only * used for cl_object, see cl_object_put_last(). */ - wake_up_all(&bkt->lsb_marche_funebre); + bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)]; + wake_up_all(&bkt->lsb_waitq); } return; } - /* - * When last reference is released, iterate over object - * layers, and notify them that object is no longer busy. - */ + /* + * When last reference is released, iterate over object + * layers, and notify them that object is no longer busy. + */ list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) { - if (o->lo_ops->loo_object_release != NULL) - o->lo_ops->loo_object_release(env, o); - } + if (o->lo_ops->loo_object_release != NULL) + o->lo_ops->loo_object_release(env, o); + } + + bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)]; + spin_lock(&bkt->lsb_waitq.lock); + /* don't use local 'is_dying' here because if was taken without lock + * but here we need the latest actual value of it so check lu_object + * directly here. + */ if (!lu_object_is_dying(top) && (lu_object_exists(orig) || lu_object_is_cl(orig))) { LASSERT(list_empty(&top->loh_lru)); list_add_tail(&top->loh_lru, &bkt->lsb_lru); - bkt->lsb_lru_len++; + spin_unlock(&bkt->lsb_waitq.lock); percpu_counter_inc(&site->ls_lru_len_counter); - CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, " - "lru_len: %ld\n", - o, site->ls_obj_hash, bkt, bkt->lsb_lru_len); - cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); - return; - } + CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n", + orig, top, site->ls_obj_hash, bkt); + cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); + return; + } - /* + /* * If object is dying (will not be cached) then remove it - * from hash table and LRU. - * - * This is done with hash table and LRU lists locked. 
As the only - * way to acquire first reference to previously unreferenced - * object is through hash-table lookup (lu_object_find()), - * or LRU scanning (lu_site_purge()), that are done under hash-table - * and LRU lock, no race with concurrent object lookup is possible - * and we can safely destroy object below. - */ + * from hash table (it is already not on the LRU). + * + * This is done with hash table lists locked. As the only + * way to acquire first reference to previously unreferenced + * object is through hash-table lookup (lu_object_find()) + * which is done under hash-table, no race with concurrent + * object lookup is possible and we can safely destroy object below. + */ if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash); - cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); - /* - * Object was already removed from hash and lru above, can - * kill it. - */ - lu_object_free(env, orig); + spin_unlock(&bkt->lsb_waitq.lock); + cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); + /* Object was already removed from hash above, can kill it. */ + lu_object_free(env, orig); } EXPORT_SYMBOL(lu_object_put); @@ -212,9 +272,10 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o) if (!list_empty(&top->loh_lru)) { struct lu_site_bkt_data *bkt; + bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)]; + spin_lock(&bkt->lsb_waitq.lock); list_del_init(&top->loh_lru); - bkt = cfs_hash_bd_extra_get(obj_hash, &bd); - bkt->lsb_lru_len--; + spin_unlock(&bkt->lsb_waitq.lock); percpu_counter_dec(&site->ls_lru_len_counter); } cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash); @@ -231,17 +292,9 @@ EXPORT_SYMBOL(lu_object_unhash); */ static struct lu_object *lu_object_alloc(const struct lu_env *env, struct lu_device *dev, - const struct lu_fid *f, - const struct lu_object_conf *conf) + const struct lu_fid *f) { - struct lu_object *scan; struct lu_object *top; - struct list_head *layers; - unsigned int init_mask = 0; - unsigned int init_flag; - int clean; - int result; - ENTRY; /* * Create top-level object slice. This will also create @@ -249,15 +302,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, */ top = dev->ld_ops->ldo_object_alloc(env, NULL, dev); if (top == NULL) - RETURN(ERR_PTR(-ENOMEM)); + return ERR_PTR(-ENOMEM); if (IS_ERR(top)) - RETURN(top); - /* - * This is the only place where object fid is assigned. It's constant - * after this point. - */ - top->lo_header->loh_fid = *f; - layers = &top->lo_header->loh_layers; + return top; + /* + * This is the only place where object fid is assigned. It's constant + * after this point. + */ + top->lo_header->loh_fid = *f; + + return top; +} + +/** + * Initialize object. + * + * This is called after object hash insertion to avoid returning an object with + * stale attributes. 
+ */ +static int lu_object_start(const struct lu_env *env, struct lu_device *dev, + struct lu_object *top, + const struct lu_object_conf *conf) +{ + struct lu_object *scan; + struct list_head *layers; + unsigned int init_mask = 0; + unsigned int init_flag; + int clean; + int result; + + layers = &top->lo_header->loh_layers; do { /* @@ -272,10 +346,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, clean = 0; scan->lo_header = top->lo_header; result = scan->lo_ops->loo_object_init(env, scan, conf); - if (result != 0) { - lu_object_free(env, top); - RETURN(ERR_PTR(result)); - } + if (result) + return result; + init_mask |= init_flag; next: init_flag <<= 1; @@ -283,17 +356,18 @@ next: } while (!clean); list_for_each_entry_reverse(scan, layers, lo_linkage) { - if (scan->lo_ops->loo_object_start != NULL) { - result = scan->lo_ops->loo_object_start(env, scan); - if (result != 0) { - lu_object_free(env, top); - RETURN(ERR_PTR(result)); - } - } - } + if (scan->lo_ops->loo_object_start != NULL) { + result = scan->lo_ops->loo_object_start(env, scan); + if (result) + return result; + } + } + + lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED); - lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED); - RETURN(top); + set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags); + + return 0; } /** @@ -301,15 +375,15 @@ next: */ static void lu_object_free(const struct lu_env *env, struct lu_object *o) { - struct lu_site_bkt_data *bkt; - struct lu_site *site; - struct lu_object *scan; - struct list_head *layers; - struct list_head splice; - - site = o->lo_dev->ld_site; - layers = &o->lo_header->loh_layers; - bkt = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid); + wait_queue_head_t *wq; + struct lu_site *site; + struct lu_object *scan; + struct list_head *layers; + LIST_HEAD(splice); + + site = o->lo_dev->ld_site; + layers = &o->lo_header->loh_layers; + wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid); /* * First call ->loo_object_delete() method to release all resources. */ @@ -324,7 +398,6 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) * necessary, because lu_object_header is freed together with the * top-level slice. */ - INIT_LIST_HEAD(&splice); list_splice_init(layers, &splice); while (!list_empty(&splice)) { /* @@ -338,8 +411,8 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) o->lo_ops->loo_object_free(env, o); } - if (waitqueue_active(&bkt->lsb_marche_funebre)) - wake_up_all(&bkt->lsb_marche_funebre); + if (waitqueue_active(wq)) + wake_up_all(wq); } /** @@ -353,9 +426,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, struct lu_object_header *h; struct lu_object_header *temp; struct lu_site_bkt_data *bkt; - struct cfs_hash_bd bd; - struct cfs_hash_bd bd2; - struct list_head dispose; + LIST_HEAD(dispose); int did_sth; unsigned int start = 0; int count; @@ -365,14 +436,13 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU)) RETURN(0); - INIT_LIST_HEAD(&dispose); /* * Under LRU list lock, scan LRU list and move unreferenced objects to * the dispose list, removing them from LRU and hash table. */ if (nr != ~0) start = s->ls_purge_start; - bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1; + bnr = (nr == ~0) ? 
-1 : nr / s->ls_bkt_cnt + 1; again: /* * It doesn't make any sense to make purge threads parallel, that can @@ -384,23 +454,22 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, goto out; did_sth = 0; - cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { - if (i < start) - continue; + for (i = start; i < s->ls_bkt_cnt ; i++) { count = bnr; - cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1); - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + bkt = &s->ls_bkts[i]; + spin_lock(&bkt->lsb_waitq.lock); list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) { LASSERT(atomic_read(&h->loh_ref) == 0); - cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2); - LASSERT(bd.bd_bucket == bd2.bd_bucket); + LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i); - cfs_hash_bd_del_locked(s->ls_obj_hash, - &bd2, &h->loh_hash); + /* Cannot remove from hash under current spinlock, + * so set flag to stop object from being found + * by htable_lookup(). + */ + set_bit(LU_OBJECT_PURGING, &h->loh_flags); list_move(&h->loh_lru, &dispose); - bkt->lsb_lru_len--; percpu_counter_dec(&s->ls_lru_len_counter); if (did_sth == 0) did_sth = 1; @@ -412,15 +481,16 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, break; } - cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1); + spin_unlock(&bkt->lsb_waitq.lock); cond_resched(); /* * Free everything on the dispose list. This is safe against * races due to the reasons described in lu_object_put(). */ - while (!list_empty(&dispose)) { - h = container_of0(dispose.next, - struct lu_object_header, loh_lru); + while ((h = list_first_entry_or_null(&dispose, + struct lu_object_header, + loh_lru)) != NULL) { + cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash); list_del_init(&h->loh_lru); lu_object_free(env, lu_object_top(h)); lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED); @@ -436,8 +506,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, goto again; } /* race on s->ls_purge_start, but nobody cares */ - s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash); - + s->ls_purge_start = i & (s->ls_bkt_cnt - 1); out: return nr; } @@ -590,53 +659,49 @@ int lu_object_invariant(const struct lu_object *o) static struct lu_object *htable_lookup(struct lu_site *s, struct cfs_hash_bd *bd, const struct lu_fid *f, - wait_queue_t *waiter, __u64 *version) { - struct lu_site_bkt_data *bkt; struct lu_object_header *h; - struct hlist_node *hnode; - __u64 ver = cfs_hash_bd_version_get(bd); + struct hlist_node *hnode; + __u64 ver = cfs_hash_bd_version_get(bd); - if (*version == ver) + if (*version == ver) return ERR_PTR(-ENOENT); - *version = ver; - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd); + *version = ver; /* cfs_hash_bd_peek_locked is a somehow "internal" function * of cfs_hash, it doesn't add refcount on object. 
*/ hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f); - if (hnode == NULL) { - lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); + if (!hnode) { + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); return ERR_PTR(-ENOENT); - } + } - h = container_of0(hnode, struct lu_object_header, loh_hash); - if (likely(!lu_object_is_dying(h))) { - cfs_hash_get(s->ls_obj_hash, hnode); - lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT); - if (!list_empty(&h->loh_lru)) { - list_del_init(&h->loh_lru); - bkt->lsb_lru_len--; - percpu_counter_dec(&s->ls_lru_len_counter); + h = container_of0(hnode, struct lu_object_header, loh_hash); + if (!list_empty(&h->loh_lru)) { + struct lu_site_bkt_data *bkt; + + bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)]; + spin_lock(&bkt->lsb_waitq.lock); + /* Might have just been moved to the dispose list, in which + * case LU_OBJECT_PURGING will be set. In that case, + * delete it from the hash table immediately. + * When lu_site_purge_objects() tried, it will find it + * isn't there, which is harmless. + */ + if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) { + spin_unlock(&bkt->lsb_waitq.lock); + cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode); + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); + return ERR_PTR(-ENOENT); } - return lu_object_top(h); - } - - /* - * Lookup found an object being destroyed this object cannot be - * returned (to assure that references to dying objects are eventually - * drained), and moreover, lookup has to wait until object is freed. - */ - - if (likely(waiter != NULL)) { - init_waitqueue_entry(waiter, current); - add_wait_queue(&bkt->lsb_marche_funebre, waiter); - set_current_state(TASK_UNINTERRUPTIBLE); - lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE); + list_del_init(&h->loh_lru); + spin_unlock(&bkt->lsb_waitq.lock); + percpu_counter_dec(&s->ls_lru_len_counter); } - - return ERR_PTR(-EAGAIN); + cfs_hash_get(s->ls_obj_hash, hnode); + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT); + return lu_object_top(h); } /** @@ -676,133 +741,139 @@ static void lu_object_limit(const struct lu_env *env, MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0); } -static struct lu_object *lu_object_new(const struct lu_env *env, - struct lu_device *dev, - const struct lu_fid *f, - const struct lu_object_conf *conf) +/** + * Core logic of lu_object_find*() functions. + * + * Much like lu_object_find(), but top level device of object is specifically + * \a dev rather than top level device of the site. This interface allows + * objects of different "stacking" to be created within the same site. 
+ */ +struct lu_object *lu_object_find_at(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) { - struct lu_object *o; - struct cfs_hash *hs; - struct cfs_hash_bd bd; + struct lu_object *o; + struct lu_object *shadow; + struct lu_site *s; + struct cfs_hash *hs; + struct cfs_hash_bd bd; + struct lu_site_bkt_data *bkt; + __u64 version = 0; + int rc; - o = lu_object_alloc(env, dev, f, conf); - if (unlikely(IS_ERR(o))) - return o; + ENTRY; - hs = dev->ld_site->ls_obj_hash; - cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1); - cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); - cfs_hash_bd_unlock(hs, &bd, 1); + /* + * This uses standard index maintenance protocol: + * + * - search index under lock, and return object if found; + * - otherwise, unlock index, allocate new object; + * - lock index and search again; + * - if nothing is found (usual case), insert newly created + * object into index; + * - otherwise (race: other thread inserted object), free + * object just allocated. + * - unlock index; + * - return object. + * + * For "LOC_F_NEW" case, we are sure the object is new established. + * It is unnecessary to perform lookup-alloc-lookup-insert, instead, + * just alloc and insert directly. + * + */ + s = dev->ld_site; + hs = s->ls_obj_hash; - lu_object_limit(env, dev); + if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE))) + lu_site_purge(env, s, -1); - return o; -} + bkt = &s->ls_bkts[lu_bkt_hash(s, f)]; + cfs_hash_bd_get(hs, f, &bd); + if (!(conf && conf->loc_flags & LOC_F_NEW)) { + cfs_hash_bd_lock(hs, &bd, 1); + o = htable_lookup(s, &bd, f, &version); + cfs_hash_bd_unlock(hs, &bd, 1); -/** - * Core logic of lu_object_find*() functions. - */ -static struct lu_object *lu_object_find_try(const struct lu_env *env, - struct lu_device *dev, - const struct lu_fid *f, - const struct lu_object_conf *conf, - wait_queue_t *waiter) -{ - struct lu_object *o; - struct lu_object *shadow; - struct lu_site *s; - struct cfs_hash *hs; - struct cfs_hash_bd bd; - __u64 version = 0; + if (!IS_ERR(o)) { + if (likely(lu_object_is_inited(o->lo_header))) + RETURN(o); - /* - * This uses standard index maintenance protocol: - * - * - search index under lock, and return object if found; - * - otherwise, unlock index, allocate new object; - * - lock index and search again; - * - if nothing is found (usual case), insert newly created - * object into index; - * - otherwise (race: other thread inserted object), free - * object just allocated. - * - unlock index; - * - return object. - * - * For "LOC_F_NEW" case, we are sure the object is new established. - * It is unnecessary to perform lookup-alloc-lookup-insert, instead, - * just alloc and insert directly. - * - * If dying object is found during index search, add @waiter to the - * site wait-queue and return ERR_PTR(-EAGAIN). - */ - if (conf != NULL && conf->loc_flags & LOC_F_NEW) - return lu_object_new(env, dev, f, conf); + wait_event_idle(bkt->lsb_waitq, + lu_object_is_inited(o->lo_header) || + lu_object_is_dying(o->lo_header)); - s = dev->ld_site; - hs = s->ls_obj_hash; - cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1); - o = htable_lookup(s, &bd, f, waiter, &version); - cfs_hash_bd_unlock(hs, &bd, 1); - if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT) - return o; + if (lu_object_is_dying(o->lo_header)) { + lu_object_put(env, o); - /* - * Allocate new object. This may result in rather complicated - * operations, including fld queries, inode loading, etc. 
- */ - o = lu_object_alloc(env, dev, f, conf); - if (unlikely(IS_ERR(o))) - return o; + RETURN(ERR_PTR(-ENOENT)); + } - LASSERT(lu_fid_eq(lu_object_fid(o), f)); + RETURN(o); + } - cfs_hash_bd_lock(hs, &bd, 1); + if (PTR_ERR(o) != -ENOENT) + RETURN(o); + } - shadow = htable_lookup(s, &bd, f, waiter, &version); - if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) { - cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); - cfs_hash_bd_unlock(hs, &bd, 1); + /* + * Allocate new object, NB, object is unitialized in case object + * is changed between allocation and hash insertion, thus the object + * with stale attributes is returned. + */ + o = lu_object_alloc(env, dev, f); + if (IS_ERR(o)) + RETURN(o); - lu_object_limit(env, dev); + LASSERT(lu_fid_eq(lu_object_fid(o), f)); - return o; - } + CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE); - lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE); - cfs_hash_bd_unlock(hs, &bd, 1); - lu_object_free(env, o); - return shadow; -} + cfs_hash_bd_lock(hs, &bd, 1); -/** - * Much like lu_object_find(), but top level device of object is specifically - * \a dev rather than top level device of the site. This interface allows - * objects of different "stacking" to be created within the same site. - */ -struct lu_object *lu_object_find_at(const struct lu_env *env, - struct lu_device *dev, - const struct lu_fid *f, - const struct lu_object_conf *conf) -{ - struct lu_site_bkt_data *bkt; - struct lu_object *obj; - wait_queue_t wait; + if (conf && conf->loc_flags & LOC_F_NEW) + shadow = ERR_PTR(-ENOENT); + else + shadow = htable_lookup(s, &bd, f, &version); + if (likely(PTR_ERR(shadow) == -ENOENT)) { + cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); + cfs_hash_bd_unlock(hs, &bd, 1); - if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT) - return lu_object_find_try(env, dev, f, conf, NULL); - - while (1) { - obj = lu_object_find_try(env, dev, f, conf, &wait); - if (obj != ERR_PTR(-EAGAIN)) - return obj; /* - * lu_object_find_try() already added waiter into the - * wait queue. + * This may result in rather complicated operations, including + * fld queries, inode loading, etc. */ - schedule(); - bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f); - remove_wait_queue(&bkt->lsb_marche_funebre, &wait); + rc = lu_object_start(env, dev, o, conf); + if (rc) { + lu_object_put_nocache(env, o); + RETURN(ERR_PTR(rc)); + } + + wake_up_all(&bkt->lsb_waitq); + + lu_object_limit(env, dev); + + RETURN(o); + } + + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE); + cfs_hash_bd_unlock(hs, &bd, 1); + lu_object_free(env, o); + + if (!(conf && conf->loc_flags & LOC_F_NEW) && + !lu_object_is_inited(shadow->lo_header)) { + wait_event_idle(bkt->lsb_waitq, + lu_object_is_inited(shadow->lo_header) || + lu_object_is_dying(shadow->lo_header)); + + if (lu_object_is_dying(shadow->lo_header)) { + lu_object_put(env, shadow); + + RETURN(ERR_PTR(-ENOENT)); + } } + + RETURN(shadow); } EXPORT_SYMBOL(lu_object_find_at); @@ -935,7 +1006,7 @@ static unsigned long lu_htable_order(struct lu_device *top) * * Size of lu_object is (arbitrary) taken as 1K (together with inode). 
*/ - cache_size = totalram_pages; + cache_size = cfs_totalram_pages(); #if BITS_PER_LONG == 32 /* limit hashtable size for lowmem systems to low RAM */ @@ -1047,7 +1118,6 @@ EXPORT_SYMBOL(lu_dev_del_linkage); int lu_site_init(struct lu_site *s, struct lu_device *top) { struct lu_site_bkt_data *bkt; - struct cfs_hash_bd bd; char name[16]; unsigned long bits; unsigned int i; @@ -1070,7 +1140,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top) bits >= LU_SITE_BITS_MIN; bits--) { s->ls_obj_hash = cfs_hash_create(name, bits, bits, bits - LU_SITE_BKT_BITS, - sizeof(*bkt), 0, 0, + 0, 0, 0, &lu_site_hash_ops, CFS_HASH_SPIN_BKTLOCK | CFS_HASH_NO_ITEMREF | @@ -1086,16 +1156,30 @@ int lu_site_init(struct lu_site *s, struct lu_device *top) return -ENOMEM; } - cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + s->ls_bkt_seed = prandom_u32(); + s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS, + 2 * num_possible_cpus()); + s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt); + OBD_ALLOC_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt)); + if (!s->ls_bkts) { + cfs_hash_putref(s->ls_obj_hash); + s->ls_obj_hash = NULL; + s->ls_bkts = NULL; + return -ENOMEM; + } + + for (i = 0; i < s->ls_bkt_cnt; i++) { + bkt = &s->ls_bkts[i]; INIT_LIST_HEAD(&bkt->lsb_lru); - init_waitqueue_head(&bkt->lsb_marche_funebre); + init_waitqueue_head(&bkt->lsb_waitq); } s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0); if (s->ls_stats == NULL) { - cfs_hash_putref(s->ls_obj_hash); + OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt)); + cfs_hash_putref(s->ls_obj_hash); s->ls_obj_hash = NULL; + s->ls_bkts = NULL; return -ENOMEM; } @@ -1143,6 +1227,8 @@ void lu_site_fini(struct lu_site *s) s->ls_obj_hash = NULL; } + OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*s->ls_bkts)); + if (s->ls_top_dev != NULL) { s->ls_top_dev->ld_site = NULL; lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s); @@ -1359,14 +1445,8 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top) for (scan = top; scan != NULL; scan = next) { const struct lu_device_type *ldt = scan->ld_type; - struct obd_type *type; next = ldt->ldt_ops->ldto_device_free(env, scan); - type = ldt->ldt_obd_type; - if (type != NULL) { - type->typ_refcnt--; - class_put_type(type); - } } } @@ -1379,8 +1459,7 @@ enum { static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; -DEFINE_RWLOCK(lu_keys_guard); -static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0); +static DECLARE_RWSEM(lu_key_initing); /** * Global counter incremented whenever key is registered, unregistered, @@ -1388,7 +1467,7 @@ static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0); * lu_context_refill(). No locking is provided, as initialization and shutdown * are supposed to be externally serialized. */ -static unsigned key_set_version = 0; +static atomic_t key_set_version = ATOMIC_INIT(0); /** * Register new key. 
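Note (not part of the patch): the lu_site_init() hunk above replaces the per-bucket data embedded in cfs_hash with a separate ls_bkts array, sized to a power of two and indexed by lu_bkt_hash() using the random ls_bkt_seed. A minimal userspace sketch of that bucket-selection idea follows; all demo_* names are hypothetical and the 32-bit mixer is a stand-in for cfs_hash_32()/cfs_hash_64(), not the real functions.

/*
 * Illustrative sketch only: seeded hashing of a FID-like key into a
 * power-of-two bucket array, mirroring lu_fid_hash()/lu_bkt_hash().
 */
#include <stdint.h>
#include <stdio.h>

struct demo_fid {                       /* stands in for struct lu_fid */
        uint64_t f_seq;
        uint32_t f_oid;
};

static uint32_t demo_mix(uint32_t v)
{
        /* any decent 32-bit mixer works; golden-ratio multiply here */
        return v * 2654435761u;
}

static uint32_t demo_bkt_hash(const struct demo_fid *fid,
                              uint32_t seed, uint32_t bkt_cnt)
{
        uint32_t h = demo_mix(seed ^ fid->f_oid);

        h ^= demo_mix((uint32_t)(fid->f_seq ^ (fid->f_seq >> 32)));
        /* bkt_cnt is a power of two, so masking replaces a modulo */
        return h & (bkt_cnt - 1);
}

int main(void)
{
        uint32_t seed = 0x12345678;     /* ls_bkt_seed analogue */
        uint32_t bkt_cnt = 256;         /* roundup_pow_of_two() result */
        struct demo_fid fid = { .f_seq = 0x200000401ULL, .f_oid = 17 };

        printf("bucket = %u\n", demo_bkt_hash(&fid, seed, bkt_cnt));
        return 0;
}

Keeping ls_bkt_cnt a power of two lets lu_bkt_hash() compute the index with a mask instead of a modulo, which is why lu_site_init() rounds the count up with roundup_pow_of_two().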
@@ -1404,19 +1483,23 @@ int lu_context_key_register(struct lu_context_key *key) LASSERT(key->lct_owner != NULL); result = -ENFILE; - write_lock(&lu_keys_guard); + atomic_set(&key->lct_used, 1); + lu_ref_init(&key->lct_reference); for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { - if (lu_keys[i] == NULL) { - key->lct_index = i; - atomic_set(&key->lct_used, 1); - lu_keys[i] = key; - lu_ref_init(&key->lct_reference); - result = 0; - ++key_set_version; - break; - } + if (lu_keys[i]) + continue; + key->lct_index = i; + if (cmpxchg(&lu_keys[i], NULL, key) != NULL) + continue; + + result = 0; + atomic_inc(&key_set_version); + break; } - write_unlock(&lu_keys_guard); + if (result) { + lu_ref_fini(&key->lct_reference); + atomic_set(&key->lct_used, 0); + } return result; } EXPORT_SYMBOL(lu_context_key_register); @@ -1429,11 +1512,12 @@ static void key_fini(struct lu_context *ctx, int index) key = lu_keys[index]; LASSERT(key != NULL); LASSERT(key->lct_fini != NULL); - LASSERT(atomic_read(&key->lct_used) > 1); + LASSERT(atomic_read(&key->lct_used) > 0); key->lct_fini(ctx, key, ctx->lc_value[index]); lu_ref_del(&key->lct_reference, "ctx", ctx); - atomic_dec(&key->lct_used); + if (atomic_dec_and_test(&key->lct_used)) + wake_up_var(&key->lct_used); LASSERT(key->lct_owner != NULL); if ((ctx->lc_tags & LCT_NOREF) == 0) { @@ -1454,31 +1538,19 @@ void lu_context_key_degister(struct lu_context_key *key) lu_context_key_quiesce(key); - ++key_set_version; - write_lock(&lu_keys_guard); key_fini(&lu_shrink_env.le_ctx, key->lct_index); /** * Wait until all transient contexts referencing this key have * run lu_context_key::lct_fini() method. */ - while (atomic_read(&key->lct_used) > 1) { - write_unlock(&lu_keys_guard); - CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n", - key->lct_owner ? key->lct_owner->name : "", key, - atomic_read(&key->lct_used)); - schedule(); - write_lock(&lu_keys_guard); - } - if (lu_keys[key->lct_index]) { - lu_keys[key->lct_index] = NULL; + atomic_dec(&key->lct_used); + wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0); + + if (!WARN_ON(lu_keys[key->lct_index] == NULL)) lu_ref_fini(&key->lct_reference); - } - write_unlock(&lu_keys_guard); - LASSERTF(atomic_read(&key->lct_used) == 1, - "key has instances: %d\n", - atomic_read(&key->lct_used)); + smp_store_release(&lu_keys[key->lct_index], NULL); } EXPORT_SYMBOL(lu_context_key_degister); @@ -1580,6 +1652,7 @@ EXPORT_SYMBOL(lu_context_key_get); * List of remembered contexts. XXX document me. */ static LIST_HEAD(lu_context_remembered); +static DEFINE_SPINLOCK(lu_context_remembered_guard); /** * Destroy \a key in all remembered contexts. This is used to destroy key @@ -1592,38 +1665,28 @@ void lu_context_key_quiesce(struct lu_context_key *key) if (!(key->lct_tags & LCT_QUIESCENT)) { /* - * XXX memory barrier has to go here. + * The write-lock on lu_key_initing will ensure that any + * keys_fill() which didn't see LCT_QUIESCENT will have + * finished before we call key_fini(). */ - write_lock(&lu_keys_guard); + down_write(&lu_key_initing); key->lct_tags |= LCT_QUIESCENT; + up_write(&lu_key_initing); - /** - * Wait until all lu_context_key::lct_init() methods - * have completed. - */ - while (atomic_read(&lu_key_initing_cnt) > 0) { - write_unlock(&lu_keys_guard); - CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\"" - " %p, %d (%d)\n", - key->lct_owner ? 
key->lct_owner->name : "", - key, atomic_read(&key->lct_used), - atomic_read(&lu_key_initing_cnt)); - schedule(); - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); + list_for_each_entry(ctx, &lu_context_remembered, lc_remember) { + spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING); + key_fini(ctx, key->lct_index); } - list_for_each_entry(ctx, &lu_context_remembered, - lc_remember) - key_fini(ctx, key->lct_index); - write_unlock(&lu_keys_guard); - ++key_set_version; + spin_unlock(&lu_context_remembered_guard); } } void lu_context_key_revive(struct lu_context_key *key) { - key->lct_tags &= ~LCT_QUIESCENT; - ++key_set_version; + key->lct_tags &= ~LCT_QUIESCENT; + atomic_inc(&key_set_version); } static void keys_fini(struct lu_context *ctx) @@ -1643,36 +1706,34 @@ static void keys_fini(struct lu_context *ctx) static int keys_fill(struct lu_context *ctx) { unsigned int i; + int rc = 0; /* - * A serialisation with lu_context_key_quiesce() is needed, but some - * "key->lct_init()" are calling kernel memory allocation routine and - * can't be called while holding a spin_lock. - * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt" - * to ensure the start of the serialisation. - * An atomic_t variable is still used, in order not to reacquire the - * lock when decrementing the counter. + * A serialisation with lu_context_key_quiesce() is needed, to + * ensure we see LCT_QUIESCENT and don't allocate a new value + * after it freed one. The rwsem provides this. As down_read() + * does optimistic spinning while the writer is active, this is + * unlikely to ever sleep. */ - read_lock(&lu_keys_guard); - atomic_inc(&lu_key_initing_cnt); - read_unlock(&lu_keys_guard); - - LINVRNT(ctx->lc_value != NULL); - for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { - struct lu_context_key *key; - - key = lu_keys[i]; - if (ctx->lc_value[i] == NULL && key != NULL && - (key->lct_tags & ctx->lc_tags) && - /* - * Don't create values for a LCT_QUIESCENT key, as this - * will pin module owning a key. - */ - !(key->lct_tags & LCT_QUIESCENT)) { - void *value; - - LINVRNT(key->lct_init != NULL); - LINVRNT(key->lct_index == i); + down_read(&lu_key_initing); + ctx->lc_version = atomic_read(&key_set_version); + + LINVRNT(ctx->lc_value); + for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { + struct lu_context_key *key; + + key = lu_keys[i]; + if (!ctx->lc_value[i] && key && + (key->lct_tags & ctx->lc_tags) && + /* + * Don't create values for a LCT_QUIESCENT key, as this + * will pin module owning a key. + */ + !(key->lct_tags & LCT_QUIESCENT)) { + void *value; + + LINVRNT(key->lct_init != NULL); + LINVRNT(key->lct_index == i); LASSERT(key->lct_owner != NULL); if (!(ctx->lc_tags & LCT_NOREF) && @@ -1683,25 +1744,25 @@ static int keys_fill(struct lu_context *ctx) value = key->lct_init(ctx, key); if (unlikely(IS_ERR(value))) { - atomic_dec(&lu_key_initing_cnt); - return PTR_ERR(value); + rc = PTR_ERR(value); + break; } lu_ref_add_atomic(&key->lct_reference, "ctx", ctx); atomic_inc(&key->lct_used); - /* - * This is the only place in the code, where an - * element of ctx->lc_value[] array is set to non-NULL - * value. - */ - ctx->lc_value[i] = value; - if (key->lct_exit != NULL) - ctx->lc_tags |= LCT_HAS_EXIT; - } - ctx->lc_version = key_set_version; - } - atomic_dec(&lu_key_initing_cnt); - return 0; + /* + * This is the only place in the code, where an + * element of ctx->lc_value[] array is set to non-NULL + * value. 
+ */ + ctx->lc_value[i] = value; + if (key->lct_exit != NULL) + ctx->lc_tags |= LCT_HAS_EXIT; + } + } + + up_read(&lu_key_initing); + return rc; } static int keys_init(struct lu_context *ctx) @@ -1724,9 +1785,9 @@ int lu_context_init(struct lu_context *ctx, __u32 tags) ctx->lc_state = LCS_INITIALIZED; ctx->lc_tags = tags; if (tags & LCT_REMEMBER) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); list_add(&ctx->lc_remember, &lu_context_remembered); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } else { INIT_LIST_HEAD(&ctx->lc_remember); } @@ -1749,14 +1810,13 @@ void lu_context_fini(struct lu_context *ctx) if ((ctx->lc_tags & LCT_REMEMBER) == 0) { LASSERT(list_empty(&ctx->lc_remember)); - keys_fini(ctx); - - } else { /* could race with key degister */ - write_lock(&lu_keys_guard); - keys_fini(ctx); + } else { + /* could race with key degister */ + spin_lock(&lu_context_remembered_guard); list_del_init(&ctx->lc_remember); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } + keys_fini(ctx); } EXPORT_SYMBOL(lu_context_fini); @@ -1777,28 +1837,35 @@ void lu_context_exit(struct lu_context *ctx) { unsigned int i; - LINVRNT(ctx->lc_state == LCS_ENTERED); - ctx->lc_state = LCS_LEFT; - if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) { - /* could race with key quiescency */ - if (ctx->lc_tags & LCT_REMEMBER) - read_lock(&lu_keys_guard); - + LINVRNT(ctx->lc_state == LCS_ENTERED); + /* + * Disable preempt to ensure we get a warning if + * any lct_exit ever tries to sleep. That would hurt + * lu_context_key_quiesce() which spins waiting for us. + * This also ensure we aren't preempted while the state + * is LCS_LEAVING, as that too would cause problems for + * lu_context_key_quiesce(). + */ + preempt_disable(); + /* + * Ensure lu_context_key_quiesce() sees LCS_LEAVING + * or we see LCT_QUIESCENT + */ + smp_store_mb(ctx->lc_state, LCS_LEAVING); + if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) { for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { - if (ctx->lc_value[i] != NULL) { - struct lu_context_key *key; - - key = lu_keys[i]; - LASSERT(key != NULL); - if (key->lct_exit != NULL) - key->lct_exit(ctx, - key, ctx->lc_value[i]); - } - } + struct lu_context_key *key; - if (ctx->lc_tags & LCT_REMEMBER) - read_unlock(&lu_keys_guard); + key = lu_keys[i]; + if (ctx->lc_value[i] && + !(key->lct_tags & LCT_QUIESCENT) && + key->lct_exit) + key->lct_exit(ctx, key, ctx->lc_value[i]); + } } + + smp_store_release(&ctx->lc_state, LCS_LEFT); + preempt_enable(); } EXPORT_SYMBOL(lu_context_exit); @@ -1809,7 +1876,10 @@ EXPORT_SYMBOL(lu_context_exit); */ int lu_context_refill(struct lu_context *ctx) { - return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx); + if (likely(ctx->lc_version == atomic_read(&key_set_version))) + return 0; + + return keys_fill(ctx); } /** @@ -1819,42 +1889,42 @@ int lu_context_refill(struct lu_context *ctx) * predefined when the lu_device type are registered, during the module probe * phase. 
*/ -__u32 lu_context_tags_default = 0; -__u32 lu_session_tags_default = 0; +u32 lu_context_tags_default = LCT_CL_THREAD; +u32 lu_session_tags_default = LCT_SESSION; void lu_context_tags_update(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_context_tags_default |= tags; - key_set_version++; - write_unlock(&lu_keys_guard); + atomic_inc(&key_set_version); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_context_tags_update); void lu_context_tags_clear(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_context_tags_default &= ~tags; - key_set_version++; - write_unlock(&lu_keys_guard); + atomic_inc(&key_set_version); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_context_tags_clear); void lu_session_tags_update(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_session_tags_default |= tags; - key_set_version++; - write_unlock(&lu_keys_guard); + atomic_inc(&key_set_version); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_session_tags_update); void lu_session_tags_clear(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_session_tags_default &= ~tags; - key_set_version++; - write_unlock(&lu_keys_guard); + atomic_inc(&key_set_version); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_session_tags_clear); @@ -1917,6 +1987,113 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags, } EXPORT_SYMBOL(lu_env_refill_by_tags); + +struct lu_env_item { + struct task_struct *lei_task; /* rhashtable key */ + struct rhash_head lei_linkage; + struct lu_env *lei_env; + struct rcu_head lei_rcu_head; +}; + +static const struct rhashtable_params lu_env_rhash_params = { + .key_len = sizeof(struct task_struct *), + .key_offset = offsetof(struct lu_env_item, lei_task), + .head_offset = offsetof(struct lu_env_item, lei_linkage), + }; + +struct rhashtable lu_env_rhash; + +struct lu_env_percpu { + struct task_struct *lep_task; + struct lu_env *lep_env ____cacheline_aligned_in_smp; +}; + +static struct lu_env_percpu lu_env_percpu[NR_CPUS]; + +int lu_env_add(struct lu_env *env) +{ + struct lu_env_item *lei, *old; + + LASSERT(env); + + OBD_ALLOC_PTR(lei); + if (!lei) + return -ENOMEM; + + lei->lei_task = current; + lei->lei_env = env; + + old = rhashtable_lookup_get_insert_fast(&lu_env_rhash, + &lei->lei_linkage, + lu_env_rhash_params); + LASSERT(!old); + + return 0; +} +EXPORT_SYMBOL(lu_env_add); + +static void lu_env_item_free(struct rcu_head *head) +{ + struct lu_env_item *lei; + + lei = container_of(head, struct lu_env_item, lei_rcu_head); + OBD_FREE_PTR(lei); +} + +void lu_env_remove(struct lu_env *env) +{ + struct lu_env_item *lei; + const void *task = current; + int i; + + for_each_possible_cpu(i) { + if (lu_env_percpu[i].lep_env == env) { + LASSERT(lu_env_percpu[i].lep_task == task); + lu_env_percpu[i].lep_task = NULL; + lu_env_percpu[i].lep_env = NULL; + } + } + + /* The rcu_lock is not taking in this case since the key + * used is the actual task_struct. This implies that each + * object is only removed by the owning thread, so there + * can never be a race on a particular object. 
+ */ + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage, + lu_env_rhash_params) == 0) + call_rcu(&lei->lei_rcu_head, lu_env_item_free); +} +EXPORT_SYMBOL(lu_env_remove); + +struct lu_env *lu_env_find(void) +{ + struct lu_env *env = NULL; + struct lu_env_item *lei; + const void *task = current; + int i = get_cpu(); + + if (lu_env_percpu[i].lep_task == current) { + env = lu_env_percpu[i].lep_env; + put_cpu(); + LASSERT(env); + return env; + } + + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei) { + env = lei->lei_env; + lu_env_percpu[i].lep_task = current; + lu_env_percpu[i].lep_env = env; + } + put_cpu(); + + return env; +} +EXPORT_SYMBOL(lu_env_find); + static struct shrinker *lu_site_shrinker; typedef struct lu_site_stats{ @@ -1926,33 +2103,22 @@ typedef struct lu_site_stats{ unsigned lss_busy; } lu_site_stats_t; -static void lu_site_stats_get(struct cfs_hash *hs, - lu_site_stats_t *stats, int populated) +static void lu_site_stats_get(const struct lu_site *s, + lu_site_stats_t *stats) { - struct cfs_hash_bd bd; - unsigned int i; - - cfs_hash_for_each_bucket(hs, &bd, i) { - struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd); - struct hlist_head *hhead; - - cfs_hash_bd_lock(hs, &bd, 1); - stats->lss_busy += - cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len; - stats->lss_total += cfs_hash_bd_count_get(&bd); - stats->lss_max_search = max((int)stats->lss_max_search, - cfs_hash_bd_depmax_get(&bd)); - if (!populated) { - cfs_hash_bd_unlock(hs, &bd, 1); - continue; - } + int cnt = cfs_hash_size_get(s->ls_obj_hash); + /* + * percpu_counter_sum_positive() won't accept a const pointer + * as it does modify the struct by taking a spinlock + */ + struct lu_site *s2 = (struct lu_site *)s; - cfs_hash_bd_for_each_hlist(hs, &bd, hhead) { - if (!hlist_empty(hhead)) - stats->lss_populated++; - } - cfs_hash_bd_unlock(hs, &bd, 1); - } + stats->lss_busy += cnt - + percpu_counter_sum_positive(&s2->ls_lru_len_counter); + + stats->lss_total += cnt; + stats->lss_max_search = 0; + stats->lss_populated = 0; } @@ -2057,10 +2223,6 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) .nr_to_scan = shrink_param(sc, nr_to_scan), .gfp_mask = shrink_param(sc, gfp_mask) }; -#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) - struct shrinker* shrinker = NULL; -#endif - CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan); @@ -2127,7 +2289,7 @@ void lu_context_keys_dump(void) */ int lu_global_init(void) { - int result; + int result; DEF_SHRINKER_VAR(shvar, lu_cache_shrink, lu_cache_shrink_count, lu_cache_shrink_scan); @@ -2162,6 +2324,8 @@ int lu_global_init(void) if (lu_site_shrinker == NULL) return -ENOMEM; + result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params); + return result; } @@ -2185,6 +2349,8 @@ void lu_global_fini(void) lu_env_fini(&lu_shrink_env); up_write(&lu_sites_guard); + rhashtable_destroy(&lu_env_rhash); + lu_ref_global_fini(); } @@ -2209,7 +2375,7 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m) lu_site_stats_t stats; memset(&stats, 0, sizeof(stats)); - lu_site_stats_get(s->ls_obj_hash, &stats, 1); + lu_site_stats_get(s, &stats); seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n", stats.lss_busy, @@ -2284,10 +2450,10 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o, cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1); #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK { - 
__u64 version = 0; - wait_queue_t waiter; - struct lu_object *shadow; - shadow = htable_lookup(s, &bd, fid, &waiter, &version); + __u64 version = 0; + struct lu_object *shadow; + + shadow = htable_lookup(s, &bd, fid, &version); /* supposed to be unique */ LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT); } @@ -2307,11 +2473,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env, struct lu_device *dev, const struct lu_object_conf *conf) { - struct lu_fid fid; + struct lu_fid fid; struct lu_object *o; + int rc; fid_zero(&fid); - o = lu_object_alloc(env, dev, &fid, conf); + o = lu_object_alloc(env, dev, &fid); + if (!IS_ERR(o)) { + rc = lu_object_start(env, dev, o, conf); + if (rc) { + lu_object_free(env, o); + return ERR_PTR(rc); + } + } return o; }
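For reference, here is a minimal pthread-based sketch (again not part of the patch; all demo_* names are hypothetical) of the ordering that the LU_OBJECT_INITED bit and the per-bucket lsb_waitq enforce in lu_object_find_at(): the object is published in the hash table before lu_object_start() completes, so a concurrent lookup must wait until the object is either initialized or dying.

/*
 * Illustrative sketch only: allocate, publish, then initialize, with a
 * condition variable standing in for the per-bucket lsb_waitq.
 */
#include <pthread.h>
#include <stdio.h>

enum { DEMO_INITED = 1, DEMO_DYING = 2 };

struct demo_obj {
        unsigned long flags;            /* loh_flags analogue */
        int attr;                       /* payload filled by "start" */
};

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_waitq = PTHREAD_COND_INITIALIZER;  /* lsb_waitq analogue */
static struct demo_obj demo_slot;       /* one-entry "hash table" */

/* creator: publish first, initialize second, then wake waiters */
static void *demo_creator(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&demo_lock);
        demo_slot.flags = 0;            /* inserted but not yet initialized */
        pthread_mutex_unlock(&demo_lock);

        demo_slot.attr = 42;            /* the expensive lu_object_start() part */

        pthread_mutex_lock(&demo_lock);
        demo_slot.flags |= DEMO_INITED;
        pthread_cond_broadcast(&demo_waitq);  /* wake_up_all(&bkt->lsb_waitq) */
        pthread_mutex_unlock(&demo_lock);
        return NULL;
}

/* lookup: block until the object is usable or being torn down */
static void *demo_lookup(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&demo_lock);
        while (!(demo_slot.flags & (DEMO_INITED | DEMO_DYING)))
                pthread_cond_wait(&demo_waitq, &demo_lock);
        if (demo_slot.flags & DEMO_DYING)
                printf("lookup: object dying, caller would get -ENOENT\n");
        else
                printf("lookup: got initialized object, attr=%d\n",
                       demo_slot.attr);
        pthread_mutex_unlock(&demo_lock);
        return NULL;
}

int main(void)
{
        pthread_t c, l;

        pthread_create(&l, NULL, demo_lookup, NULL);
        pthread_create(&c, NULL, demo_creator, NULL);
        pthread_join(c, NULL);
        pthread_join(l, NULL);
        return 0;
}

The real lu_object_find_at() hunk above waits with wait_event_idle() on the bucket waitqueue instead of a condition variable, and when the dying case wins it drops its reference with lu_object_put() and returns ERR_PTR(-ENOENT).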