X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=dce91d5613b96d3607b7e3fade073a3303001dc9;hp=63729fe4ac009f2d596cc80494d03ff70b069bc4;hb=979f5e1db041dc49585b97c4915a6bc3e58435da;hpb=99bb9f91f5c5ca6a380b22efa04a3c00c8f520ca diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 63729fe..dce91d5 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -40,10 +40,13 @@ #define DEBUG_SUBSYSTEM S_CLASS +#include #include #include +#include +#include + #include -#include /* hash_long() */ #include #include #include @@ -55,22 +58,22 @@ struct lu_site_bkt_data { /** * LRU list, updated on each access to object. Protected by - * bucket lock of lu_site::ls_obj_hash. + * lsb_waitq.lock. * * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are - * moved to the lu_site::ls_lru.prev (this is due to the non-existence - * of list_for_each_entry_safe_reverse()). + * moved to the lu_site::ls_lru.prev */ struct list_head lsb_lru; /** * Wait-queue signaled when an object in this site is ultimately - * destroyed (lu_object_free()). It is used by lu_object_find() to - * wait before re-trying when object in the process of destruction is - * found in the hash table. + * destroyed (lu_object_free()) or initialized (lu_object_start()). + * It is used by lu_object_find() to wait before re-trying when + * object in the process of destruction is found in the hash table; + * or wait object to be initialized by the allocator. * * \see htable_lookup(). */ - wait_queue_head_t lsb_marche_funebre; + wait_queue_head_t lsb_waitq; }; enum { @@ -81,21 +84,21 @@ enum { #define LU_CACHE_NR_MAX_ADJUST 512 #define LU_CACHE_NR_UNLIMITED -1 #define LU_CACHE_NR_DEFAULT LU_CACHE_NR_UNLIMITED -#define LU_CACHE_NR_LDISKFS_LIMIT LU_CACHE_NR_UNLIMITED /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */ #define LU_CACHE_NR_ZFS_LIMIT 10240 -#define LU_SITE_BITS_MIN 12 -#define LU_SITE_BITS_MAX 24 -#define LU_SITE_BITS_MAX_CL 19 +#define LU_CACHE_NR_MIN 4096 +#define LU_CACHE_NR_MAX 0x80000000UL + /** - * total 256 buckets, we don't want too many buckets because: - * - consume too much memory + * Max 256 buckets, we don't want too many buckets because: + * - consume too much memory (currently max 16K) * - avoid unbalanced LRU list + * With few cpus there is little gain from extra buckets, so + * we treat this as a maximum in lu_site_init(). */ #define LU_SITE_BKT_BITS 8 - static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT; module_param(lu_cache_percent, int, 0644); MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache"); @@ -107,15 +110,36 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache"); static void lu_object_free(const struct lu_env *env, struct lu_object *o); static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx); +static u32 lu_fid_hash(const void *data, u32 len, u32 seed) +{ + const struct lu_fid *fid = data; + + seed = cfs_hash_32(seed ^ fid->f_oid, 32); + seed ^= cfs_hash_64(fid->f_seq, 32); + return seed; +} + +static const struct rhashtable_params obj_hash_params = { + .key_len = sizeof(struct lu_fid), + .key_offset = offsetof(struct lu_object_header, loh_fid), + .head_offset = offsetof(struct lu_object_header, loh_hash), + .hashfn = lu_fid_hash, + .automatic_shrinking = true, +}; + +static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid) +{ + return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) & + (s->ls_bkt_cnt - 1); +} + wait_queue_head_t * lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid) { - struct cfs_hash_bd bd; struct lu_site_bkt_data *bkt; - cfs_hash_bd_get(site->ls_obj_hash, fid, &bd); - bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd); - return &bkt->lsb_marche_funebre; + bkt = &site->ls_bkts[lu_bkt_hash(site, fid)]; + return &bkt->lsb_waitq; } EXPORT_SYMBOL(lu_site_wq_from_fid); @@ -127,25 +151,17 @@ EXPORT_SYMBOL(lu_site_wq_from_fid); void lu_object_put(const struct lu_env *env, struct lu_object *o) { struct lu_site_bkt_data *bkt; - struct lu_object_header *top; - struct lu_site *site; - struct lu_object *orig; - struct cfs_hash_bd bd; - const struct lu_fid *fid; - - top = o->lo_header; - site = o->lo_dev->ld_site; - orig = o; + struct lu_object_header *top = o->lo_header; + struct lu_site *site = o->lo_dev->ld_site; + struct lu_object *orig = o; + const struct lu_fid *fid = lu_object_fid(o); /* * till we have full fids-on-OST implemented anonymous objects * are possible in OSP. such an object isn't listed in the site * so we should not remove it from the site. */ - fid = lu_object_fid(o); if (fid_is_zero(fid)) { - LASSERT(top->loh_hash.next == NULL - && top->loh_hash.pprev == NULL); LASSERT(list_empty(&top->loh_lru)); if (!atomic_dec_and_test(&top->loh_ref)) return; @@ -157,58 +173,74 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o) return; } - cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd); - bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd); + bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)]; + if (atomic_add_unless(&top->loh_ref, -1, 1)) { +still_active: + /* + * At this point the object reference is dropped and lock is + * not taken, so lu_object should not be touched because it + * can be freed by concurrent thread. + * + * Somebody may be waiting for this, currently only used for + * cl_object, see cl_object_put_last(). + */ + wake_up(&bkt->lsb_waitq); - if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) { - if (lu_object_is_dying(top)) { - /* - * somebody may be waiting for this, currently only - * used for cl_object, see cl_object_put_last(). - */ - wake_up_all(&bkt->lsb_marche_funebre); - } return; } + spin_lock(&bkt->lsb_waitq.lock); + if (!atomic_dec_and_test(&top->loh_ref)) { + spin_unlock(&bkt->lsb_waitq.lock); + goto still_active; + } + + /* + * Refcount is zero, and cannot be incremented without taking the bkt + * lock, so object is stable. + */ + /* - * When last reference is released, iterate over object - * layers, and notify them that object is no longer busy. + * When last reference is released, iterate over object layers, and + * notify them that object is no longer busy. */ list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) { if (o->lo_ops->loo_object_release != NULL) o->lo_ops->loo_object_release(env, o); } + /* + * Don't use local 'is_dying' here because if was taken without lock but + * here we need the latest actual value of it so check lu_object + * directly here. + */ if (!lu_object_is_dying(top) && (lu_object_exists(orig) || lu_object_is_cl(orig))) { LASSERT(list_empty(&top->loh_lru)); list_add_tail(&top->loh_lru, &bkt->lsb_lru); + spin_unlock(&bkt->lsb_waitq.lock); percpu_counter_inc(&site->ls_lru_len_counter); - CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n", - orig, top, site->ls_obj_hash, bkt); - cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); + CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n", + orig, top, bkt); return; } /* - * If object is dying (will not be cached) then remove it - * from hash table and LRU. + * If object is dying (will not be cached) then remove it from hash + * table (it is already not on the LRU). * - * This is done with hash table and LRU lists locked. As the only - * way to acquire first reference to previously unreferenced - * object is through hash-table lookup (lu_object_find()), - * or LRU scanning (lu_site_purge()), that are done under hash-table - * and LRU lock, no race with concurrent object lookup is possible - * and we can safely destroy object below. + * This is done with bucket lock held. As the only way to acquire first + * reference to previously unreferenced object is through hash-table + * lookup (lu_object_find()) which takes the lock for first reference, + * no race with concurrent object lookup is possible and we can safely + * destroy object below. */ if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) - cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash); - cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); - /* - * Object was already removed from hash and lru above, can - * kill it. - */ + rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash, + obj_hash_params); + + spin_unlock(&bkt->lsb_waitq.lock); + /* Object was already removed from hash above, can kill it. */ lu_object_free(env, orig); } EXPORT_SYMBOL(lu_object_put); @@ -236,19 +268,19 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o) set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags); if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) { struct lu_site *site = o->lo_dev->ld_site; - struct cfs_hash *obj_hash = site->ls_obj_hash; - struct cfs_hash_bd bd; + struct rhashtable *obj_hash = &site->ls_obj_hash; + struct lu_site_bkt_data *bkt; - cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1); + bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)]; + spin_lock(&bkt->lsb_waitq.lock); if (!list_empty(&top->loh_lru)) { - struct lu_site_bkt_data *bkt; - list_del_init(&top->loh_lru); - bkt = cfs_hash_bd_extra_get(obj_hash, &bd); percpu_counter_dec(&site->ls_lru_len_counter); } - cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash); - cfs_hash_bd_unlock(obj_hash, &bd, 1); + spin_unlock(&bkt->lsb_waitq.lock); + + rhashtable_remove_fast(obj_hash, &top->loh_hash, + obj_hash_params); } } EXPORT_SYMBOL(lu_object_unhash); @@ -261,17 +293,9 @@ EXPORT_SYMBOL(lu_object_unhash); */ static struct lu_object *lu_object_alloc(const struct lu_env *env, struct lu_device *dev, - const struct lu_fid *f, - const struct lu_object_conf *conf) + const struct lu_fid *f) { - struct lu_object *scan; struct lu_object *top; - struct list_head *layers; - unsigned int init_mask = 0; - unsigned int init_flag; - int clean; - int result; - ENTRY; /* * Create top-level object slice. This will also create @@ -279,15 +303,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, */ top = dev->ld_ops->ldo_object_alloc(env, NULL, dev); if (top == NULL) - RETURN(ERR_PTR(-ENOMEM)); + return ERR_PTR(-ENOMEM); if (IS_ERR(top)) - RETURN(top); - /* - * This is the only place where object fid is assigned. It's constant - * after this point. - */ - top->lo_header->loh_fid = *f; - layers = &top->lo_header->loh_layers; + return top; + /* + * This is the only place where object fid is assigned. It's constant + * after this point. + */ + top->lo_header->loh_fid = *f; + + return top; +} + +/** + * Initialize object. + * + * This is called after object hash insertion to avoid returning an object with + * stale attributes. + */ +static int lu_object_start(const struct lu_env *env, struct lu_device *dev, + struct lu_object *top, + const struct lu_object_conf *conf) +{ + struct lu_object *scan; + struct list_head *layers; + unsigned int init_mask = 0; + unsigned int init_flag; + int clean; + int result; + + layers = &top->lo_header->loh_layers; do { /* @@ -302,10 +347,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, clean = 0; scan->lo_header = top->lo_header; result = scan->lo_ops->loo_object_init(env, scan, conf); - if (result != 0) { - lu_object_free(env, top); - RETURN(ERR_PTR(result)); - } + if (result) + return result; + init_mask |= init_flag; next: init_flag <<= 1; @@ -313,17 +357,18 @@ next: } while (!clean); list_for_each_entry_reverse(scan, layers, lo_linkage) { - if (scan->lo_ops->loo_object_start != NULL) { - result = scan->lo_ops->loo_object_start(env, scan); - if (result != 0) { - lu_object_free(env, top); - RETURN(ERR_PTR(result)); - } - } - } + if (scan->lo_ops->loo_object_start != NULL) { + result = scan->lo_ops->loo_object_start(env, scan); + if (result) + return result; + } + } + + lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED); - lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED); - RETURN(top); + set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags); + + return 0; } /** @@ -332,10 +377,10 @@ next: static void lu_object_free(const struct lu_env *env, struct lu_object *o) { wait_queue_head_t *wq; - struct lu_site *site; - struct lu_object *scan; - struct list_head *layers; - struct list_head splice; + struct lu_site *site; + struct lu_object *scan; + struct list_head *layers; + LIST_HEAD(splice); site = o->lo_dev->ld_site; layers = &o->lo_header->loh_layers; @@ -354,7 +399,6 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) * necessary, because lu_object_header is freed together with the * top-level slice. */ - INIT_LIST_HEAD(&splice); list_splice_init(layers, &splice); while (!list_empty(&splice)) { /* @@ -362,7 +406,7 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) * lives as long as possible and ->loo_object_free() methods * can look at its contents. */ - o = container_of0(splice.prev, struct lu_object, lo_linkage); + o = container_of(splice.prev, struct lu_object, lo_linkage); list_del_init(&o->lo_linkage); LASSERT(o->lo_ops->loo_object_free != NULL); o->lo_ops->loo_object_free(env, o); @@ -380,95 +424,89 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s, int nr, int canblock) { - struct lu_object_header *h; - struct lu_object_header *temp; - struct lu_site_bkt_data *bkt; - struct cfs_hash_bd bd; - struct cfs_hash_bd bd2; - struct list_head dispose; + struct lu_object_header *h; + struct lu_object_header *temp; + struct lu_site_bkt_data *bkt; + LIST_HEAD(dispose); int did_sth; unsigned int start = 0; - int count; - int bnr; + int count; + int bnr; unsigned int i; if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU)) RETURN(0); - INIT_LIST_HEAD(&dispose); - /* - * Under LRU list lock, scan LRU list and move unreferenced objects to - * the dispose list, removing them from LRU and hash table. - */ + /* + * Under LRU list lock, scan LRU list and move unreferenced objects to + * the dispose list, removing them from LRU and hash table. + */ if (nr != ~0) start = s->ls_purge_start; - bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1; - again: + bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1; +again: /* * It doesn't make any sense to make purge threads parallel, that can - * only bring troubles to us. See LU-5331. + * only bring troubles to us. See LU-5331. */ if (canblock != 0) mutex_lock(&s->ls_purge_mutex); else if (mutex_trylock(&s->ls_purge_mutex) == 0) goto out; - did_sth = 0; - cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { - if (i < start) - continue; - count = bnr; - cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1); - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + did_sth = 0; + for (i = start; i < s->ls_bkt_cnt ; i++) { + count = bnr; + bkt = &s->ls_bkts[i]; + spin_lock(&bkt->lsb_waitq.lock); list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) { LASSERT(atomic_read(&h->loh_ref) == 0); - cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2); - LASSERT(bd.bd_bucket == bd2.bd_bucket); + LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i); - cfs_hash_bd_del_locked(s->ls_obj_hash, - &bd2, &h->loh_hash); + set_bit(LU_OBJECT_UNHASHED, &h->loh_flags); + rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash, + obj_hash_params); list_move(&h->loh_lru, &dispose); percpu_counter_dec(&s->ls_lru_len_counter); - if (did_sth == 0) - did_sth = 1; + if (did_sth == 0) + did_sth = 1; - if (nr != ~0 && --nr == 0) - break; + if (nr != ~0 && --nr == 0) + break; - if (count > 0 && --count == 0) - break; + if (count > 0 && --count == 0) + break; } - cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1); + spin_unlock(&bkt->lsb_waitq.lock); cond_resched(); /* * Free everything on the dispose list. This is safe against * races due to the reasons described in lu_object_put(). */ - while (!list_empty(&dispose)) { - h = container_of0(dispose.next, - struct lu_object_header, loh_lru); + while ((h = list_first_entry_or_null(&dispose, + struct lu_object_header, + loh_lru)) != NULL) { list_del_init(&h->loh_lru); lu_object_free(env, lu_object_top(h)); lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED); } - if (nr == 0) - break; - } + if (nr == 0) + break; + } mutex_unlock(&s->ls_purge_mutex); - if (nr != 0 && did_sth && start != 0) { - start = 0; /* restart from the first bucket */ - goto again; - } - /* race on s->ls_purge_start, but nobody cares */ - s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash); - + if (nr != 0 && did_sth && start != 0) { + start = 0; /* restart from the first bucket */ + goto again; + } + /* race on s->ls_purge_start, but nobody cares */ + s->ls_purge_start = i & (s->ls_bkt_cnt - 1); out: - return nr; + return nr; } EXPORT_SYMBOL(lu_site_purge_objects); @@ -562,9 +600,9 @@ void lu_object_header_print(const struct lu_env *env, void *cookie, (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]", hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref), PFID(&hdr->loh_fid), - hlist_unhashed(&hdr->loh_hash) ? "" : " hash", - list_empty((struct list_head *)&hdr->loh_lru) ? \ - "" : " lru", + test_bit(LU_OBJECT_UNHASHED, + &hdr->loh_flags) ? "" : " hash", + list_empty(&hdr->loh_lru) ? "" : " lru", hdr->loh_attr & LOHA_EXISTS ? " exist" : ""); } EXPORT_SYMBOL(lu_object_header_print); @@ -616,36 +654,97 @@ int lu_object_invariant(const struct lu_object *o) return 1; } -static struct lu_object *htable_lookup(struct lu_site *s, - struct cfs_hash_bd *bd, +/* + * Limit the lu_object cache to a maximum of lu_cache_nr objects. Because the + * calculation for the number of objects to reclaim is not covered by a lock the + * maximum number of objects is capped by LU_CACHE_MAX_ADJUST. This ensures + * that many concurrent threads will not accidentally purge the entire cache. + */ +static void lu_object_limit(const struct lu_env *env, + struct lu_device *dev) +{ + u64 size, nr; + + if (lu_cache_nr == LU_CACHE_NR_UNLIMITED) + return; + + size = atomic_read(&dev->ld_site->ls_obj_hash.nelems); + nr = (u64)lu_cache_nr; + if (size <= nr) + return; + + lu_site_purge_objects(env, dev->ld_site, + min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST), + 0); +} + +static struct lu_object *htable_lookup(const struct lu_env *env, + struct lu_device *dev, + struct lu_site_bkt_data *bkt, const struct lu_fid *f, - __u64 *version) + struct lu_object_header *new) { - struct lu_site_bkt_data *bkt; + struct lu_site *s = dev->ld_site; struct lu_object_header *h; - struct hlist_node *hnode; - __u64 ver = cfs_hash_bd_version_get(bd); - if (*version == ver) +try_again: + rcu_read_lock(); + if (new) + h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash, + &new->loh_hash, + obj_hash_params); + else + h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params); + + if (IS_ERR_OR_NULL(h)) { + /* Not found */ + if (!new) + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); + rcu_read_unlock(); + if (PTR_ERR(h) == -ENOMEM) { + msleep(20); + goto try_again; + } + lu_object_limit(env, dev); + if (PTR_ERR(h) == -E2BIG) + goto try_again; + return ERR_PTR(-ENOENT); + } + + if (atomic_inc_not_zero(&h->loh_ref)) { + rcu_read_unlock(); + return lu_object_top(h); + } - *version = ver; - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd); - /* cfs_hash_bd_peek_locked is a somehow "internal" function - * of cfs_hash, it doesn't add refcount on object. */ - hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f); - if (!hnode) { + spin_lock(&bkt->lsb_waitq.lock); + if (lu_object_is_dying(h) || + test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) { + spin_unlock(&bkt->lsb_waitq.lock); + rcu_read_unlock(); + if (new) { + /* + * Old object might have already been removed, or will + * be soon. We need to insert our new object, so + * remove the old one just in case it is still there. + */ + rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash, + obj_hash_params); + goto try_again; + } lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); return ERR_PTR(-ENOENT); } + /* Now protected by spinlock */ + rcu_read_unlock(); - h = container_of0(hnode, struct lu_object_header, loh_hash); - cfs_hash_get(s->ls_obj_hash, hnode); - lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT); if (!list_empty(&h->loh_lru)) { list_del_init(&h->loh_lru); percpu_counter_dec(&s->ls_lru_len_counter); } + atomic_inc(&h->loh_ref); + spin_unlock(&bkt->lsb_waitq.lock); + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT); return lu_object_top(h); } @@ -663,28 +762,37 @@ struct lu_object *lu_object_find(const struct lu_env *env, EXPORT_SYMBOL(lu_object_find); /* - * Limit the lu_object cache to a maximum of lu_cache_nr objects. Because - * the calculation for the number of objects to reclaim is not covered by - * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST. - * This ensures that many concurrent threads will not accidentally purge - * the entire cache. + * Get a 'first' reference to an object that was found while looking through the + * hash table. */ -static void lu_object_limit(const struct lu_env *env, - struct lu_device *dev) +struct lu_object *lu_object_get_first(struct lu_object_header *h, + struct lu_device *dev) { - __u64 size, nr; + struct lu_site *s = dev->ld_site; + struct lu_object *ret; - if (lu_cache_nr == LU_CACHE_NR_UNLIMITED) - return; + if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h)) + return NULL; - size = cfs_hash_size_get(dev->ld_site->ls_obj_hash); - nr = (__u64)lu_cache_nr; - if (size <= nr) - return; + ret = lu_object_locate(h, dev->ld_type); + if (!ret) + return ret; - lu_site_purge_objects(env, dev->ld_site, - MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0); + if (!atomic_inc_not_zero(&h->loh_ref)) { + struct lu_site_bkt_data *bkt; + + bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)]; + spin_lock(&bkt->lsb_waitq.lock); + if (!lu_object_is_dying(h) && + !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) + atomic_inc(&h->loh_ref); + else + ret = NULL; + spin_unlock(&bkt->lsb_waitq.lock); + } + return ret; } +EXPORT_SYMBOL(lu_object_get_first); /** * Core logic of lu_object_find*() functions. @@ -701,9 +809,11 @@ struct lu_object *lu_object_find_at(const struct lu_env *env, struct lu_object *o; struct lu_object *shadow; struct lu_site *s; - struct cfs_hash *hs; - struct cfs_hash_bd bd; - __u64 version = 0; + struct lu_site_bkt_data *bkt; + struct rhashtable *hs; + int rc; + + ENTRY; /* * This uses standard index maintenance protocol: @@ -724,45 +834,98 @@ struct lu_object *lu_object_find_at(const struct lu_env *env, * */ s = dev->ld_site; - hs = s->ls_obj_hash; - cfs_hash_bd_get(hs, f, &bd); + hs = &s->ls_obj_hash; + + if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE))) + lu_site_purge(env, s, -1); + + bkt = &s->ls_bkts[lu_bkt_hash(s, f)]; if (!(conf && conf->loc_flags & LOC_F_NEW)) { - cfs_hash_bd_lock(hs, &bd, 1); - o = htable_lookup(s, &bd, f, &version); - cfs_hash_bd_unlock(hs, &bd, 1); + o = htable_lookup(env, dev, bkt, f, NULL); - if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT) - return o; + if (!IS_ERR(o)) { + if (likely(lu_object_is_inited(o->lo_header))) + RETURN(o); + + wait_event_idle(bkt->lsb_waitq, + lu_object_is_inited(o->lo_header) || + lu_object_is_dying(o->lo_header)); + + if (lu_object_is_dying(o->lo_header)) { + lu_object_put(env, o); + + RETURN(ERR_PTR(-ENOENT)); + } + + RETURN(o); + } + + if (PTR_ERR(o) != -ENOENT) + RETURN(o); } + /* - * Allocate new object. This may result in rather complicated - * operations, including fld queries, inode loading, etc. + * Allocate new object, NB, object is unitialized in case object + * is changed between allocation and hash insertion, thus the object + * with stale attributes is returned. */ - o = lu_object_alloc(env, dev, f, conf); + o = lu_object_alloc(env, dev, f); if (IS_ERR(o)) - return o; + RETURN(o); LASSERT(lu_fid_eq(lu_object_fid(o), f)); - cfs_hash_bd_lock(hs, &bd, 1); + CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE); - if (conf && conf->loc_flags & LOC_F_NEW) - shadow = ERR_PTR(-ENOENT); - else - shadow = htable_lookup(s, &bd, f, &version); + if (conf && conf->loc_flags & LOC_F_NEW) { + int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash, + obj_hash_params); + if (status) + /* Strange error - go the slow way */ + shadow = htable_lookup(env, dev, bkt, f, o->lo_header); + else + shadow = ERR_PTR(-ENOENT); + } else { + shadow = htable_lookup(env, dev, bkt, f, o->lo_header); + } if (likely(PTR_ERR(shadow) == -ENOENT)) { - cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); - cfs_hash_bd_unlock(hs, &bd, 1); + /* + * The new object has been successfully inserted. + * + * This may result in rather complicated operations, including + * fld queries, inode loading, etc. + */ + rc = lu_object_start(env, dev, o, conf); + if (rc) { + lu_object_put_nocache(env, o); + RETURN(ERR_PTR(rc)); + } + + wake_up(&bkt->lsb_waitq); lu_object_limit(env, dev); - return o; + RETURN(o); } lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE); - cfs_hash_bd_unlock(hs, &bd, 1); lu_object_free(env, o); - return shadow; + + if (!(conf && conf->loc_flags & LOC_F_NEW) && + !IS_ERR(shadow) && + !lu_object_is_inited(shadow->lo_header)) { + wait_event_idle(bkt->lsb_waitq, + lu_object_is_inited(shadow->lo_header) || + lu_object_is_dying(shadow->lo_header)); + + if (lu_object_is_dying(shadow->lo_header)) { + lu_object_put(env, shadow); + + RETURN(ERR_PTR(-ENOENT)); + } + } + + RETURN(shadow); } EXPORT_SYMBOL(lu_object_find_at); @@ -827,14 +990,9 @@ struct lu_site_print_arg { lu_printer_t lsp_printer; }; -static int -lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd, - struct hlist_node *hnode, void *data) +static void +lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg) { - struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data; - struct lu_object_header *h; - - h = hlist_entry(hnode, struct lu_object_header, loh_hash); if (!list_empty(&h->loh_layers)) { const struct lu_object *o; @@ -845,33 +1003,45 @@ lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd, lu_object_header_print(arg->lsp_env, arg->lsp_cookie, arg->lsp_printer, h); } - return 0; } /** * Print all objects in \a s. */ -void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie, - lu_printer_t printer) +void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref, + int msg_flag, lu_printer_t printer) { - struct lu_site_print_arg arg = { - .lsp_env = (struct lu_env *)env, - .lsp_cookie = cookie, - .lsp_printer = printer, - }; + struct lu_site_print_arg arg = { + .lsp_env = (struct lu_env *)env, + .lsp_printer = printer, + }; + struct rhashtable_iter iter; + struct lu_object_header *h; + LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL); - cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg); + if (!s || !atomic_read(ref)) + return; + + arg.lsp_cookie = (void *)&msgdata; + + rhashtable_walk_enter(&s->ls_obj_hash, &iter); + rhashtable_walk_start(&iter); + while ((h = rhashtable_walk_next(&iter)) != NULL) { + if (IS_ERR(h)) + continue; + lu_site_obj_print(h, &arg); + } + rhashtable_walk_stop(&iter); + rhashtable_walk_exit(&iter); } EXPORT_SYMBOL(lu_site_print); /** * Return desired hash table order. */ -static unsigned long lu_htable_order(struct lu_device *top) +static void lu_htable_limits(struct lu_device *top) { unsigned long cache_size; - unsigned long bits; - unsigned long bits_max = LU_SITE_BITS_MAX; /* * For ZFS based OSDs the cache should be disabled by default. This @@ -880,110 +1050,40 @@ static unsigned long lu_htable_order(struct lu_device *top) * always stay cached it must maintain a hold on them. */ if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) { - lu_cache_percent = 1; lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT; - return LU_SITE_BITS_MIN; + return; } - if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0) - bits_max = LU_SITE_BITS_MAX_CL; - - /* - * Calculate hash table size, assuming that we want reasonable - * performance when 20% of total memory is occupied by cache of - * lu_objects. - * - * Size of lu_object is (arbitrary) taken as 1K (together with inode). - */ - cache_size = totalram_pages; + /* + * Calculate hash table size, assuming that we want reasonable + * performance when 20% of total memory is occupied by cache of + * lu_objects. + * + * Size of lu_object is (arbitrary) taken as 1K (together with inode). + */ + cache_size = cfs_totalram_pages(); #if BITS_PER_LONG == 32 - /* limit hashtable size for lowmem systems to low RAM */ + /* limit hashtable size for lowmem systems to low RAM */ if (cache_size > 1 << (30 - PAGE_SHIFT)) cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4; #endif - /* clear off unreasonable cache setting. */ - if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) { - CWARN("obdclass: invalid lu_cache_percent: %u, it must be in" - " the range of (0, %u]. Will use default value: %u.\n", - lu_cache_percent, LU_CACHE_PERCENT_MAX, - LU_CACHE_PERCENT_DEFAULT); + /* clear off unreasonable cache setting. */ + if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) { + CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n", + lu_cache_percent, LU_CACHE_PERCENT_MAX, + LU_CACHE_PERCENT_DEFAULT); - lu_cache_percent = LU_CACHE_PERCENT_DEFAULT; - } - cache_size = cache_size / 100 * lu_cache_percent * + lu_cache_percent = LU_CACHE_PERCENT_DEFAULT; + } + cache_size = cache_size / 100 * lu_cache_percent * (PAGE_SIZE / 1024); - for (bits = 1; (1 << bits) < cache_size; ++bits) { - ; - } - - return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max); -} - -static unsigned lu_obj_hop_hash(struct cfs_hash *hs, - const void *key, unsigned mask) -{ - struct lu_fid *fid = (struct lu_fid *)key; - __u32 hash; - - hash = fid_flatten32(fid); - hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */ - hash = hash_long(hash, hs->hs_bkt_bits); - - /* give me another random factor */ - hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3); - - hash <<= hs->hs_cur_bits - hs->hs_bkt_bits; - hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1); - - return hash & mask; -} - -static void *lu_obj_hop_object(struct hlist_node *hnode) -{ - return hlist_entry(hnode, struct lu_object_header, loh_hash); -} - -static void *lu_obj_hop_key(struct hlist_node *hnode) -{ - struct lu_object_header *h; - - h = hlist_entry(hnode, struct lu_object_header, loh_hash); - return &h->loh_fid; -} - -static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode) -{ - struct lu_object_header *h; - - h = hlist_entry(hnode, struct lu_object_header, loh_hash); - return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key); -} - -static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode) -{ - struct lu_object_header *h; - - h = hlist_entry(hnode, struct lu_object_header, loh_hash); - atomic_inc(&h->loh_ref); -} - -static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode) -{ - LBUG(); /* we should never called it */ + lu_cache_nr = clamp_t(typeof(cache_size), cache_size, + LU_CACHE_NR_MIN, LU_CACHE_NR_MAX); } -static struct cfs_hash_ops lu_site_hash_ops = { - .hs_hash = lu_obj_hop_hash, - .hs_key = lu_obj_hop_key, - .hs_keycmp = lu_obj_hop_keycmp, - .hs_object = lu_obj_hop_object, - .hs_get = lu_obj_hop_get, - .hs_put_locked = lu_obj_hop_put_locked, -}; - void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d) { spin_lock(&s->ls_ld_lock); @@ -1007,15 +1107,13 @@ EXPORT_SYMBOL(lu_dev_del_linkage); int lu_site_init(struct lu_site *s, struct lu_device *top) { struct lu_site_bkt_data *bkt; - struct cfs_hash_bd bd; - char name[16]; - unsigned long bits; unsigned int i; int rc; ENTRY; memset(s, 0, sizeof *s); mutex_init(&s->ls_purge_mutex); + lu_htable_limits(top); #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS); @@ -1025,39 +1123,35 @@ int lu_site_init(struct lu_site *s, struct lu_device *top) if (rc) return -ENOMEM; - snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name); - for (bits = lu_htable_order(top); - bits >= LU_SITE_BITS_MIN; bits--) { - s->ls_obj_hash = cfs_hash_create(name, bits, bits, - bits - LU_SITE_BKT_BITS, - sizeof(*bkt), 0, 0, - &lu_site_hash_ops, - CFS_HASH_SPIN_BKTLOCK | - CFS_HASH_NO_ITEMREF | - CFS_HASH_DEPTH | - CFS_HASH_ASSERT_EMPTY | - CFS_HASH_COUNTER); - if (s->ls_obj_hash != NULL) - break; + if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) { + CERROR("failed to create lu_site hash\n"); + return -ENOMEM; } - if (s->ls_obj_hash == NULL) { - CERROR("failed to create lu_site hash with bits: %lu\n", bits); + s->ls_bkt_seed = prandom_u32(); + s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS, + 2 * num_possible_cpus()); + s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt); + OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt); + if (!s->ls_bkts) { + rhashtable_destroy(&s->ls_obj_hash); + s->ls_bkts = NULL; return -ENOMEM; } - cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { - bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + for (i = 0; i < s->ls_bkt_cnt; i++) { + bkt = &s->ls_bkts[i]; INIT_LIST_HEAD(&bkt->lsb_lru); - init_waitqueue_head(&bkt->lsb_marche_funebre); + init_waitqueue_head(&bkt->lsb_waitq); } - s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0); - if (s->ls_stats == NULL) { - cfs_hash_putref(s->ls_obj_hash); - s->ls_obj_hash = NULL; - return -ENOMEM; - } + s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0); + if (s->ls_stats == NULL) { + OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt); + s->ls_bkts = NULL; + rhashtable_destroy(&s->ls_obj_hash); + return -ENOMEM; + } lprocfs_counter_init(s->ls_stats, LU_SS_CREATED, 0, "created", "created"); @@ -1098,10 +1192,11 @@ void lu_site_fini(struct lu_site *s) percpu_counter_destroy(&s->ls_lru_len_counter); - if (s->ls_obj_hash != NULL) { - cfs_hash_putref(s->ls_obj_hash); - s->ls_obj_hash = NULL; - } + if (s->ls_bkts) { + rhashtable_destroy(&s->ls_obj_hash); + OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt); + s->ls_bkts = NULL; + } if (s->ls_top_dev != NULL) { s->ls_top_dev->ld_site = NULL; @@ -1149,14 +1244,25 @@ void lu_device_put(struct lu_device *d) } EXPORT_SYMBOL(lu_device_put); +enum { /* Maximal number of tld slots. */ + LU_CONTEXT_KEY_NR = 40 +}; +static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; +static DECLARE_RWSEM(lu_key_initing); + /** * Initialize device \a d of type \a t. */ int lu_device_init(struct lu_device *d, struct lu_device_type *t) { - if (atomic_inc_return(&t->ldt_device_nr) == 1 && - t->ldt_ops->ldto_start != NULL) - t->ldt_ops->ldto_start(t); + if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) { + down_write(&lu_key_initing); + if (t->ldt_ops->ldto_start && + atomic_read(&t->ldt_device_nr) == 0) + t->ldt_ops->ldto_start(t); + atomic_inc(&t->ldt_device_nr); + up_write(&lu_key_initing); + } memset(d, 0, sizeof *d); d->ld_type = t; @@ -1257,7 +1363,6 @@ int lu_object_header_init(struct lu_object_header *h) { memset(h, 0, sizeof *h); atomic_set(&h->loh_ref, 1); - INIT_HLIST_NODE(&h->loh_hash); INIT_LIST_HEAD(&h->loh_lru); INIT_LIST_HEAD(&h->loh_layers); lu_ref_init(&h->loh_reference); @@ -1272,7 +1377,6 @@ void lu_object_header_fini(struct lu_object_header *h) { LASSERT(list_empty(&h->loh_layers)); LASSERT(list_empty(&h->loh_lru)); - LASSERT(hlist_unhashed(&h->loh_hash)); lu_ref_fini(&h->loh_reference); } EXPORT_SYMBOL(lu_object_header_fini); @@ -1319,29 +1423,11 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top) for (scan = top; scan != NULL; scan = next) { const struct lu_device_type *ldt = scan->ld_type; - struct obd_type *type; next = ldt->ldt_ops->ldto_device_free(env, scan); - type = ldt->ldt_obd_type; - if (type != NULL) { - type->typ_refcnt--; - class_put_type(type); - } } } -enum { - /** - * Maximal number of tld slots. - */ - LU_CONTEXT_KEY_NR = 40 -}; - -static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; - -DEFINE_RWLOCK(lu_keys_guard); -static DECLARE_RWSEM(lu_key_initing); - /** * Global counter incremented whenever key is registered, unregistered, * revived or quiesced. This is used to void unnecessary calls to @@ -1364,19 +1450,27 @@ int lu_context_key_register(struct lu_context_key *key) LASSERT(key->lct_owner != NULL); result = -ENFILE; - write_lock(&lu_keys_guard); + atomic_set(&key->lct_used, 1); + lu_ref_init(&key->lct_reference); for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { - if (lu_keys[i] == NULL) { - key->lct_index = i; - atomic_set(&key->lct_used, 1); - lu_keys[i] = key; - lu_ref_init(&key->lct_reference); - result = 0; - atomic_inc(&key_set_version); - break; - } + if (lu_keys[i]) + continue; + key->lct_index = i; + + if (strncmp("osd_", module_name(key->lct_owner), 4) == 0) + CFS_RACE_WAIT(OBD_FAIL_OBD_SETUP); + + if (cmpxchg(&lu_keys[i], NULL, key) != NULL) + continue; + + result = 0; + atomic_inc(&key_set_version); + break; } - write_unlock(&lu_keys_guard); + if (result) { + lu_ref_fini(&key->lct_reference); + atomic_set(&key->lct_used, 0); + } return result; } EXPORT_SYMBOL(lu_context_key_register); @@ -1389,11 +1483,12 @@ static void key_fini(struct lu_context *ctx, int index) key = lu_keys[index]; LASSERT(key != NULL); LASSERT(key->lct_fini != NULL); - LASSERT(atomic_read(&key->lct_used) > 1); + LASSERT(atomic_read(&key->lct_used) > 0); key->lct_fini(ctx, key, ctx->lc_value[index]); lu_ref_del(&key->lct_reference, "ctx", ctx); - atomic_dec(&key->lct_used); + if (atomic_dec_and_test(&key->lct_used)) + wake_up_var(&key->lct_used); LASSERT(key->lct_owner != NULL); if ((ctx->lc_tags & LCT_NOREF) == 0) { @@ -1412,32 +1507,21 @@ void lu_context_key_degister(struct lu_context_key *key) LASSERT(atomic_read(&key->lct_used) >= 1); LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); - lu_context_key_quiesce(key); + lu_context_key_quiesce(NULL, key); - write_lock(&lu_keys_guard); key_fini(&lu_shrink_env.le_ctx, key->lct_index); /** * Wait until all transient contexts referencing this key have * run lu_context_key::lct_fini() method. */ - while (atomic_read(&key->lct_used) > 1) { - write_unlock(&lu_keys_guard); - CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n", - key->lct_owner ? key->lct_owner->name : "", key, - atomic_read(&key->lct_used)); - schedule(); - write_lock(&lu_keys_guard); - } - if (lu_keys[key->lct_index]) { - lu_keys[key->lct_index] = NULL; + atomic_dec(&key->lct_used); + wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0); + + if (!WARN_ON(lu_keys[key->lct_index] == NULL)) lu_ref_fini(&key->lct_reference); - } - write_unlock(&lu_keys_guard); - LASSERTF(atomic_read(&key->lct_used) == 1, - "key has instances: %d\n", - atomic_read(&key->lct_used)); + smp_store_release(&lu_keys[key->lct_index], NULL); } EXPORT_SYMBOL(lu_context_key_degister); @@ -1509,16 +1593,17 @@ EXPORT_SYMBOL(lu_context_key_revive_many); /** * Quiescent a number of keys. */ -void lu_context_key_quiesce_many(struct lu_context_key *k, ...) +void lu_context_key_quiesce_many(struct lu_device_type *t, + struct lu_context_key *k, ...) { - va_list args; + va_list args; - va_start(args, k); - do { - lu_context_key_quiesce(k); - k = va_arg(args, struct lu_context_key*); - } while (k != NULL); - va_end(args); + va_start(args, k); + do { + lu_context_key_quiesce(t, k); + k = va_arg(args, struct lu_context_key*); + } while (k != NULL); + va_end(args); } EXPORT_SYMBOL(lu_context_key_quiesce_many); @@ -1539,31 +1624,41 @@ EXPORT_SYMBOL(lu_context_key_get); * List of remembered contexts. XXX document me. */ static LIST_HEAD(lu_context_remembered); +static DEFINE_SPINLOCK(lu_context_remembered_guard); /** * Destroy \a key in all remembered contexts. This is used to destroy key * values in "shared" contexts (like service threads), when a module owning * the key is about to be unloaded. */ -void lu_context_key_quiesce(struct lu_context_key *key) +void lu_context_key_quiesce(struct lu_device_type *t, + struct lu_context_key *key) { struct lu_context *ctx; + if (key->lct_tags & LCT_QUIESCENT) + return; + /* + * The write-lock on lu_key_initing will ensure that any + * keys_fill() which didn't see LCT_QUIESCENT will have + * finished before we call key_fini(). + */ + down_write(&lu_key_initing); if (!(key->lct_tags & LCT_QUIESCENT)) { - /* - * The write-lock on lu_key_initing will ensure that any - * keys_fill() which didn't see LCT_QUIESCENT will have - * finished before we call key_fini(). - */ - down_write(&lu_key_initing); - key->lct_tags |= LCT_QUIESCENT; + if (t == NULL || atomic_read(&t->ldt_device_nr) == 0) + key->lct_tags |= LCT_QUIESCENT; up_write(&lu_key_initing); - write_lock(&lu_keys_guard); - list_for_each_entry(ctx, &lu_context_remembered, lc_remember) + spin_lock(&lu_context_remembered_guard); + list_for_each_entry(ctx, &lu_context_remembered, lc_remember) { + spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING); key_fini(ctx, key->lct_index); - write_unlock(&lu_keys_guard); + } + spin_unlock(&lu_context_remembered_guard); + + return; } + up_write(&lu_key_initing); } void lu_context_key_revive(struct lu_context_key *key) @@ -1582,7 +1677,7 @@ static void keys_fini(struct lu_context *ctx) for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) key_fini(ctx, i); - OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); + OBD_FREE_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys)); ctx->lc_value = NULL; } @@ -1650,7 +1745,7 @@ static int keys_fill(struct lu_context *ctx) static int keys_init(struct lu_context *ctx) { - OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); + OBD_ALLOC_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys)); if (likely(ctx->lc_value != NULL)) return keys_fill(ctx); @@ -1668,9 +1763,9 @@ int lu_context_init(struct lu_context *ctx, __u32 tags) ctx->lc_state = LCS_INITIALIZED; ctx->lc_tags = tags; if (tags & LCT_REMEMBER) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); list_add(&ctx->lc_remember, &lu_context_remembered); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } else { INIT_LIST_HEAD(&ctx->lc_remember); } @@ -1693,14 +1788,13 @@ void lu_context_fini(struct lu_context *ctx) if ((ctx->lc_tags & LCT_REMEMBER) == 0) { LASSERT(list_empty(&ctx->lc_remember)); - keys_fini(ctx); - - } else { /* could race with key degister */ - write_lock(&lu_keys_guard); - keys_fini(ctx); + } else { + /* could race with key degister */ + spin_lock(&lu_context_remembered_guard); list_del_init(&ctx->lc_remember); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } + keys_fini(ctx); } EXPORT_SYMBOL(lu_context_fini); @@ -1721,28 +1815,35 @@ void lu_context_exit(struct lu_context *ctx) { unsigned int i; - LINVRNT(ctx->lc_state == LCS_ENTERED); - ctx->lc_state = LCS_LEFT; - if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) { - /* could race with key quiescency */ - if (ctx->lc_tags & LCT_REMEMBER) - read_lock(&lu_keys_guard); - + LINVRNT(ctx->lc_state == LCS_ENTERED); + /* + * Disable preempt to ensure we get a warning if + * any lct_exit ever tries to sleep. That would hurt + * lu_context_key_quiesce() which spins waiting for us. + * This also ensure we aren't preempted while the state + * is LCS_LEAVING, as that too would cause problems for + * lu_context_key_quiesce(). + */ + preempt_disable(); + /* + * Ensure lu_context_key_quiesce() sees LCS_LEAVING + * or we see LCT_QUIESCENT + */ + smp_store_mb(ctx->lc_state, LCS_LEAVING); + if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) { for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { - if (ctx->lc_value[i] != NULL) { - struct lu_context_key *key; - - key = lu_keys[i]; - LASSERT(key != NULL); - if (key->lct_exit != NULL) - key->lct_exit(ctx, - key, ctx->lc_value[i]); - } - } + struct lu_context_key *key; - if (ctx->lc_tags & LCT_REMEMBER) - read_unlock(&lu_keys_guard); + key = lu_keys[i]; + if (ctx->lc_value[i] && + !(key->lct_tags & LCT_QUIESCENT) && + key->lct_exit) + key->lct_exit(ctx, key, ctx->lc_value[i]); + } } + + smp_store_release(&ctx->lc_state, LCS_LEFT); + preempt_enable(); } EXPORT_SYMBOL(lu_context_exit); @@ -1766,46 +1867,44 @@ int lu_context_refill(struct lu_context *ctx) * predefined when the lu_device type are registered, during the module probe * phase. */ -u32 lu_context_tags_default; -u32 lu_session_tags_default; +u32 lu_context_tags_default = LCT_CL_THREAD; +u32 lu_session_tags_default = LCT_SESSION; -#ifdef HAVE_SERVER_SUPPORT void lu_context_tags_update(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_context_tags_default |= tags; atomic_inc(&key_set_version); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_context_tags_update); void lu_context_tags_clear(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_context_tags_default &= ~tags; atomic_inc(&key_set_version); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_context_tags_clear); void lu_session_tags_update(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_session_tags_default |= tags; atomic_inc(&key_set_version); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_session_tags_update); void lu_session_tags_clear(__u32 tags) { - write_lock(&lu_keys_guard); + spin_lock(&lu_context_remembered_guard); lu_session_tags_default &= ~tags; atomic_inc(&key_set_version); - write_unlock(&lu_keys_guard); + spin_unlock(&lu_context_remembered_guard); } EXPORT_SYMBOL(lu_session_tags_clear); -#endif /* HAVE_SERVER_SUPPORT */ int lu_env_init(struct lu_env *env, __u32 tags) { @@ -1866,6 +1965,119 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags, } EXPORT_SYMBOL(lu_env_refill_by_tags); + +struct lu_env_item { + struct task_struct *lei_task; /* rhashtable key */ + struct rhash_head lei_linkage; + struct lu_env *lei_env; + struct rcu_head lei_rcu_head; +}; + +static const struct rhashtable_params lu_env_rhash_params = { + .key_len = sizeof(struct task_struct *), + .key_offset = offsetof(struct lu_env_item, lei_task), + .head_offset = offsetof(struct lu_env_item, lei_linkage), + }; + +struct rhashtable lu_env_rhash; + +struct lu_env_percpu { + struct task_struct *lep_task; + struct lu_env *lep_env ____cacheline_aligned_in_smp; +}; + +static struct lu_env_percpu lu_env_percpu[NR_CPUS]; + +int lu_env_add_task(struct lu_env *env, struct task_struct *task) +{ + struct lu_env_item *lei, *old; + + LASSERT(env); + + OBD_ALLOC_PTR(lei); + if (!lei) + return -ENOMEM; + + lei->lei_task = task; + lei->lei_env = env; + + old = rhashtable_lookup_get_insert_fast(&lu_env_rhash, + &lei->lei_linkage, + lu_env_rhash_params); + LASSERT(!old); + + return 0; +} +EXPORT_SYMBOL(lu_env_add_task); + +int lu_env_add(struct lu_env *env) +{ + return lu_env_add_task(env, current); +} +EXPORT_SYMBOL(lu_env_add); + +static void lu_env_item_free(struct rcu_head *head) +{ + struct lu_env_item *lei; + + lei = container_of(head, struct lu_env_item, lei_rcu_head); + OBD_FREE_PTR(lei); +} + +void lu_env_remove(struct lu_env *env) +{ + struct lu_env_item *lei; + const void *task = current; + int i; + + for_each_possible_cpu(i) { + if (lu_env_percpu[i].lep_env == env) { + LASSERT(lu_env_percpu[i].lep_task == task); + lu_env_percpu[i].lep_task = NULL; + lu_env_percpu[i].lep_env = NULL; + } + } + + /* The rcu_lock is not taking in this case since the key + * used is the actual task_struct. This implies that each + * object is only removed by the owning thread, so there + * can never be a race on a particular object. + */ + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage, + lu_env_rhash_params) == 0) + call_rcu(&lei->lei_rcu_head, lu_env_item_free); +} +EXPORT_SYMBOL(lu_env_remove); + +struct lu_env *lu_env_find(void) +{ + struct lu_env *env = NULL; + struct lu_env_item *lei; + const void *task = current; + int i = get_cpu(); + + if (lu_env_percpu[i].lep_task == current) { + env = lu_env_percpu[i].lep_env; + put_cpu(); + LASSERT(env); + return env; + } + + lei = rhashtable_lookup_fast(&lu_env_rhash, &task, + lu_env_rhash_params); + if (lei) { + env = lei->lei_env; + lu_env_percpu[i].lep_task = current; + lu_env_percpu[i].lep_env = env; + } + put_cpu(); + + return env; +} +EXPORT_SYMBOL(lu_env_find); + static struct shrinker *lu_site_shrinker; typedef struct lu_site_stats{ @@ -1876,37 +2088,21 @@ typedef struct lu_site_stats{ } lu_site_stats_t; static void lu_site_stats_get(const struct lu_site *s, - lu_site_stats_t *stats, int populated) + lu_site_stats_t *stats) { - struct cfs_hash *hs = s->ls_obj_hash; - struct cfs_hash_bd bd; - unsigned int i; + int cnt = atomic_read(&s->ls_obj_hash.nelems); /* * percpu_counter_sum_positive() won't accept a const pointer * as it does modify the struct by taking a spinlock */ struct lu_site *s2 = (struct lu_site *)s; - stats->lss_busy += cfs_hash_size_get(hs) - + stats->lss_busy += cnt - percpu_counter_sum_positive(&s2->ls_lru_len_counter); - cfs_hash_for_each_bucket(hs, &bd, i) { - struct hlist_head *hhead; - - cfs_hash_bd_lock(hs, &bd, 1); - stats->lss_total += cfs_hash_bd_count_get(&bd); - stats->lss_max_search = max((int)stats->lss_max_search, - cfs_hash_bd_depmax_get(&bd)); - if (!populated) { - cfs_hash_bd_unlock(hs, &bd, 1); - continue; - } - cfs_hash_bd_for_each_hlist(hs, &bd, hhead) { - if (!hlist_empty(hhead)) - stats->lss_populated++; - } - cfs_hash_bd_unlock(hs, &bd, 1); - } + stats->lss_total += cnt; + stats->lss_max_search = 0; + stats->lss_populated = 0; } @@ -2011,10 +2207,6 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) .nr_to_scan = shrink_param(sc, nr_to_scan), .gfp_mask = shrink_param(sc, gfp_mask) }; -#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) - struct shrinker* shrinker = NULL; -#endif - CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan); @@ -2081,7 +2273,7 @@ void lu_context_keys_dump(void) */ int lu_global_init(void) { - int result; + int result; DEF_SHRINKER_VAR(shvar, lu_cache_shrink, lu_cache_shrink_count, lu_cache_shrink_scan); @@ -2116,6 +2308,8 @@ int lu_global_init(void) if (lu_site_shrinker == NULL) return -ENOMEM; + result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params); + return result; } @@ -2139,6 +2333,8 @@ void lu_global_fini(void) lu_env_fini(&lu_shrink_env); up_write(&lu_sites_guard); + rhashtable_destroy(&lu_env_rhash); + lu_ref_global_fini(); } @@ -2160,16 +2356,23 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx) */ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m) { + const struct bucket_table *tbl; lu_site_stats_t stats; + unsigned int chains; memset(&stats, 0, sizeof(stats)); - lu_site_stats_get(s, &stats, 1); - - seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n", + lu_site_stats_get(s, &stats); + + rcu_read_lock(); + tbl = rht_dereference_rcu(s->ls_obj_hash.tbl, + &((struct lu_site *)s)->ls_obj_hash); + chains = tbl->size; + rcu_read_unlock(); + seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n", stats.lss_busy, stats.lss_total, stats.lss_populated, - CFS_HASH_NHLIST(s->ls_obj_hash), + chains, stats.lss_max_search, ls_stats_read(s->ls_stats, LU_SS_CREATED), ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT), @@ -2228,27 +2431,27 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o, { struct lu_site *s = o->lo_dev->ld_site; struct lu_fid *old = &o->lo_header->loh_fid; - struct cfs_hash *hs; - struct cfs_hash_bd bd; + int rc; LASSERT(fid_is_zero(old)); - + *old = *fid; +try_again: + rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash, + &o->lo_header->loh_hash, + obj_hash_params); /* supposed to be unique */ - hs = s->ls_obj_hash; - cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1); -#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK - { - __u64 version = 0; - struct lu_object *shadow; - - shadow = htable_lookup(s, &bd, fid, &version); - /* supposed to be unique */ - LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT); + LASSERT(rc != -EEXIST); + /* handle hash table resizing */ + if (rc == -ENOMEM) { + msleep(20); + goto try_again; } -#endif - *old = *fid; - cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); - cfs_hash_bd_unlock(hs, &bd, 1); + /* trim the hash if its growing to big */ + lu_object_limit(env, o->lo_dev); + if (rc == -E2BIG) + goto try_again; + + LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc); } EXPORT_SYMBOL(lu_object_assign_fid); @@ -2261,11 +2464,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env, struct lu_device *dev, const struct lu_object_conf *conf) { - struct lu_fid fid; + struct lu_fid fid; struct lu_object *o; + int rc; fid_zero(&fid); - o = lu_object_alloc(env, dev, &fid, conf); + o = lu_object_alloc(env, dev, &fid); + if (!IS_ERR(o)) { + rc = lu_object_start(env, dev, o, conf); + if (rc) { + lu_object_free(env, o); + return ERR_PTR(rc); + } + } return o; }