X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=9d6746d076e1735b74f73d86bdfa15b8a4df6c05;hp=4fd6f0df649a97e64c0b48a77e7d4f0dcc975c3c;hb=8a5b8dbda960b155f669c13602504f1233a84c7e;hpb=26a5dff967c99245eac01f00288e707513c58a98 diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 4fd6f0d..9d6746d 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,8 +24,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,9 +43,6 @@ */ #define DEBUG_SUBSYSTEM S_CLASS -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #include @@ -73,53 +70,67 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o); */ void lu_object_put(const struct lu_env *env, struct lu_object *o) { + struct lu_site_bkt_data *bkt; struct lu_object_header *top; struct lu_site *site; struct lu_object *orig; - int kill_it; + cfs_hash_bd_t bd; - top = o->lo_header; + top = o->lo_header; site = o->lo_dev->ld_site; orig = o; - kill_it = 0; - write_lock(&site->ls_guard); - if (atomic_dec_and_test(&top->loh_ref)) { - /* - * When last reference is released, iterate over object - * layers, and notify them that object is no longer busy. - */ - list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) { - if (o->lo_ops->loo_object_release != NULL) - o->lo_ops->loo_object_release(env, o); - } - -- site->ls_busy; + + cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd); + bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd); + + if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) { if (lu_object_is_dying(top)) { + /* - * If object is dying (will not be cached), removed it - * from hash table and LRU. - * - * This is done with hash table and LRU lists - * locked. As the only way to acquire first reference - * to previously unreferenced object is through - * hash-table lookup (lu_object_find()), or LRU - * scanning (lu_site_purge()), that are done under - * hash-table and LRU lock, no race with concurrent - * object lookup is possible and we can safely destroy - * object below. + * somebody may be waiting for this, currently only + * used for cl_object, see cl_object_put_last(). */ - hlist_del_init(&top->loh_hash); - list_del_init(&top->loh_lru); - -- site->ls_total; - kill_it = 1; + cfs_waitq_broadcast(&bkt->lsb_marche_funebre); } + return; } - write_unlock(&site->ls_guard); - if (kill_it) - /* - * Object was already removed from hash and lru above, can - * kill it. - */ - lu_object_free(env, orig); + + LASSERT(bkt->lsb_busy > 0); + bkt->lsb_busy--; + /* + * When last reference is released, iterate over object + * layers, and notify them that object is no longer busy. + */ + cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) { + if (o->lo_ops->loo_object_release != NULL) + o->lo_ops->loo_object_release(env, o); + } + + if (!lu_object_is_dying(top)) { + LASSERT(cfs_list_empty(&top->loh_lru)); + cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru); + cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); + return; + } + + /* + * If object is dying (will not be cached), removed it + * from hash table and LRU. + * + * This is done with hash table and LRU lists locked. As the only + * way to acquire first reference to previously unreferenced + * object is through hash-table lookup (lu_object_find()), + * or LRU scanning (lu_site_purge()), that are done under hash-table + * and LRU lock, no race with concurrent object lookup is possible + * and we can safely destroy object below. + */ + cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash); + cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1); + /* + * Object was already removed from hash and lru above, can + * kill it. + */ + lu_object_free(env, orig); } EXPORT_SYMBOL(lu_object_put); @@ -136,7 +147,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, { struct lu_object *scan; struct lu_object *top; - struct list_head *layers; + cfs_list_t *layers; int clean; int result; ENTRY; @@ -153,7 +164,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, * after this point. */ LASSERT(fid_is_igif(f) || fid_ver(f) == 0); - top->lo_header->loh_fid = *f; + top->lo_header->loh_fid = *f; layers = &top->lo_header->loh_layers; do { /* @@ -161,7 +172,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, * object slices are created. */ clean = 1; - list_for_each_entry(scan, layers, lo_linkage) { + cfs_list_for_each_entry(scan, layers, lo_linkage) { if (scan->lo_flags & LU_OBJECT_ALLOCATED) continue; clean = 0; @@ -175,7 +186,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, } } while (!clean); - list_for_each_entry_reverse(scan, layers, lo_linkage) { + cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) { if (scan->lo_ops->loo_object_start != NULL) { result = scan->lo_ops->loo_object_start(env, scan); if (result != 0) { @@ -185,7 +196,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, } } - dev->ld_site->ls_stats.s_created ++; + lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED); RETURN(top); } @@ -194,17 +205,19 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, */ static void lu_object_free(const struct lu_env *env, struct lu_object *o) { - struct list_head splice; - struct lu_object *scan; - struct lu_site *site; - struct list_head *layers; + struct lu_site_bkt_data *bkt; + struct lu_site *site; + struct lu_object *scan; + cfs_list_t *layers; + cfs_list_t splice; site = o->lo_dev->ld_site; layers = &o->lo_header->loh_layers; + bkt = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid); /* * First call ->loo_object_delete() method to release all resources. */ - list_for_each_entry_reverse(scan, layers, lo_linkage) { + cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) { if (scan->lo_ops->loo_object_delete != NULL) scan->lo_ops->loo_object_delete(env, scan); } @@ -216,19 +229,21 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) * top-level slice. */ CFS_INIT_LIST_HEAD(&splice); - list_splice_init(layers, &splice); - while (!list_empty(&splice)) { + cfs_list_splice_init(layers, &splice); + while (!cfs_list_empty(&splice)) { /* * Free layers in bottom-to-top order, so that object header * lives as long as possible and ->loo_object_free() methods * can look at its contents. */ o = container_of0(splice.prev, struct lu_object, lo_linkage); - list_del_init(&o->lo_linkage); + cfs_list_del_init(&o->lo_linkage); LASSERT(o->lo_ops->loo_object_free != NULL); o->lo_ops->loo_object_free(env, o); } - cfs_waitq_broadcast(&site->ls_marche_funebre); + + if (cfs_waitq_active(&bkt->lsb_marche_funebre)) + cfs_waitq_broadcast(&bkt->lsb_marche_funebre); } /** @@ -236,47 +251,78 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) */ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr) { - struct list_head dispose; struct lu_object_header *h; struct lu_object_header *temp; + struct lu_site_bkt_data *bkt; + cfs_hash_bd_t bd; + cfs_hash_bd_t bd2; + cfs_list_t dispose; + int did_sth; + int start; + int count; + int bnr; + int i; CFS_INIT_LIST_HEAD(&dispose); /* * Under LRU list lock, scan LRU list and move unreferenced objects to * the dispose list, removing them from LRU and hash table. */ - write_lock(&s->ls_guard); - list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) { + start = s->ls_purge_start; + bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1; + again: + did_sth = 0; + cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { + if (i < start) + continue; + count = bnr; + cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1); + bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + + cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) { + LASSERT(cfs_atomic_read(&h->loh_ref) == 0); + + cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2); + LASSERT(bd.bd_bucket == bd2.bd_bucket); + + cfs_hash_bd_del_locked(s->ls_obj_hash, + &bd2, &h->loh_hash); + cfs_list_move(&h->loh_lru, &dispose); + if (did_sth == 0) + did_sth = 1; + + if (nr != ~0 && --nr == 0) + break; + + if (count > 0 && --count == 0) + break; + + } + cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1); + cfs_cond_resched(); /* - * Objects are sorted in lru order, and "busy" objects (ones - * with h->loh_ref > 0) naturally tend to live near hot end - * that we scan last. Unfortunately, sites usually have small - * (less then ten) number of busy yet rarely accessed objects - * (some global objects, accessed directly through pointers, - * bypassing hash table). Currently algorithm scans them over - * and over again. Probably we should move busy objects out of - * LRU, or we can live with that. + * Free everything on the dispose list. This is safe against + * races due to the reasons described in lu_object_put(). */ - if (nr-- == 0) + while (!cfs_list_empty(&dispose)) { + h = container_of0(dispose.next, + struct lu_object_header, loh_lru); + cfs_list_del_init(&h->loh_lru); + lu_object_free(env, lu_object_top(h)); + lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED); + } + + if (nr == 0) break; - if (atomic_read(&h->loh_ref) > 0) - continue; - hlist_del_init(&h->loh_hash); - list_move(&h->loh_lru, &dispose); - s->ls_total --; } - write_unlock(&s->ls_guard); - /* - * Free everything on the dispose list. This is safe against races due - * to the reasons described in lu_object_put(). - */ - while (!list_empty(&dispose)) { - h = container_of0(dispose.next, - struct lu_object_header, loh_lru); - list_del_init(&h->loh_lru); - lu_object_free(env, lu_object_top(h)); - s->ls_stats.s_lru_purged ++; + + if (nr != 0 && did_sth && start != 0) { + start = 0; /* restart from the first bucket */ + goto again; } + /* race on s->ls_purge_start, but nobody cares */ + s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash); + return nr; } EXPORT_SYMBOL(lu_site_purge); @@ -303,7 +349,7 @@ enum { * * XXX overflow is not handled correctly. */ - LU_CDEBUG_LINE = 256 + LU_CDEBUG_LINE = 512 }; struct lu_cdebug_data { @@ -321,7 +367,8 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data); * lu_global_init(). */ struct lu_context_key lu_global_key = { - .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD, + .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | + LCT_MG_THREAD | LCT_CL_THREAD, .lct_init = lu_global_key_init, .lct_fini = lu_global_key_fini }; @@ -332,8 +379,8 @@ struct lu_context_key lu_global_key = { int lu_cdebug_printer(const struct lu_env *env, void *cookie, const char *format, ...) { - struct lu_cdebug_print_info *info = cookie; - struct lu_cdebug_data *key; + struct libcfs_debug_msg_data *msgdata = cookie; + struct lu_cdebug_data *key; int used; int complete; va_list args; @@ -351,10 +398,8 @@ int lu_cdebug_printer(const struct lu_env *env, vsnprintf(key->lck_area + used, ARRAY_SIZE(key->lck_area) - used, format, args); if (complete) { - if (cdebug_show(info->lpi_mask, info->lpi_subsys)) - libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask, - (char *)info->lpi_file, info->lpi_fn, - info->lpi_line, "%s", key->lck_area); + if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys)) + libcfs_debug_msg(msgdata, "%s", key->lck_area); key->lck_area[0] = 0; } va_end(args); @@ -370,10 +415,11 @@ void lu_object_header_print(const struct lu_env *env, void *cookie, const struct lu_object_header *hdr) { (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]", - hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref), + hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref), PFID(&hdr->loh_fid), - hlist_unhashed(&hdr->loh_hash) ? "" : " hash", - list_empty((struct list_head *)&hdr->loh_lru) ? "" : " lru", + cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash", + cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? \ + "" : " lru", hdr->loh_attr & LOHA_EXISTS ? " exist":""); } EXPORT_SYMBOL(lu_object_header_print); @@ -391,7 +437,7 @@ void lu_object_print(const struct lu_env *env, void *cookie, top = o->lo_header; lu_object_header_print(env, cookie, printer, top); (*printer)(env, cookie, "{ \n"); - list_for_each_entry(o, &top->loh_layers, lo_linkage) { + cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) { depth = o->lo_depth + 4; /* @@ -415,7 +461,7 @@ int lu_object_invariant(const struct lu_object *o) struct lu_object_header *top; top = o->lo_header; - list_for_each_entry(o, &top->loh_layers, lo_linkage) { + cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) { if (o->lo_ops->loo_object_invariant != NULL && !o->lo_ops->loo_object_invariant(o)) return 0; @@ -425,52 +471,48 @@ int lu_object_invariant(const struct lu_object *o) EXPORT_SYMBOL(lu_object_invariant); static struct lu_object *htable_lookup(struct lu_site *s, - const struct hlist_head *bucket, + cfs_hash_bd_t *bd, const struct lu_fid *f, - cfs_waitlink_t *waiter) + cfs_waitlink_t *waiter, + __u64 *version) { + struct lu_site_bkt_data *bkt; struct lu_object_header *h; - struct hlist_node *scan; - - hlist_for_each_entry(h, scan, bucket, loh_hash) { - s->ls_stats.s_cache_check ++; - if (likely(lu_fid_eq(&h->loh_fid, f))) { - if (unlikely(lu_object_is_dying(h))) { - /* - * Lookup found an object being destroyed; - * this object cannot be returned (to assure - * that references to dying objects are - * eventually drained), and moreover, lookup - * has to wait until object is freed. - */ - cfs_waitlink_init(waiter); - cfs_waitq_add(&s->ls_marche_funebre, waiter); - set_current_state(CFS_TASK_UNINT); - s->ls_stats.s_cache_death_race ++; - return ERR_PTR(-EAGAIN); - } - /* bump reference count... */ - if (atomic_add_return(1, &h->loh_ref) == 1) - ++ s->ls_busy; - /* and move to the head of the LRU */ - /* - * XXX temporary disable this to measure effects of - * read-write locking. - */ - /* list_move_tail(&h->loh_lru, &s->ls_lru); */ - s->ls_stats.s_cache_hit ++; - return lu_object_top(h); - } + cfs_hlist_node_t *hnode; + __u64 ver = cfs_hash_bd_version_get(bd); + + if (*version == ver) + return NULL; + + *version = ver; + bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd); + /* cfs_hash_bd_lookup_intent is a somehow "internal" function + * of cfs_hash, but we don't want refcount on object right now */ + hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f); + if (hnode == NULL) { + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS); + return NULL; } - s->ls_stats.s_cache_miss ++; - return NULL; -} -static __u32 fid_hash(const struct lu_fid *f, int bits) -{ - /* all objects with same id and different versions will belong to same - * collisions list. */ - return hash_long(fid_flatten(f), bits); + h = container_of0(hnode, struct lu_object_header, loh_hash); + if (likely(!lu_object_is_dying(h))) { + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT); + cfs_list_del_init(&h->loh_lru); + return lu_object_top(h); + } + + /* + * Lookup found an object being destroyed this object cannot be + * returned (to assure that references to dying objects are eventually + * drained), and moreover, lookup has to wait until object is freed. + */ + cfs_atomic_dec(&h->loh_ref); + + cfs_waitlink_init(waiter); + cfs_waitq_add(&bkt->lsb_marche_funebre, waiter); + cfs_set_current_state(CFS_TASK_UNINT); + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE); + return ERR_PTR(-EAGAIN); } /** @@ -486,6 +528,29 @@ struct lu_object *lu_object_find(const struct lu_env *env, } EXPORT_SYMBOL(lu_object_find); +static struct lu_object *lu_object_new(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) +{ + struct lu_object *o; + cfs_hash_t *hs; + cfs_hash_bd_t bd; + struct lu_site_bkt_data *bkt; + + o = lu_object_alloc(env, dev, f, conf); + if (unlikely(IS_ERR(o))) + return o; + + hs = dev->ld_site->ls_obj_hash; + cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1); + bkt = cfs_hash_bd_extra_get(hs, &bd); + cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); + bkt->lsb_busy++; + cfs_hash_bd_unlock(hs, &bd, 1); + return o; +} + /** * Core logic of lu_object_find*() functions. */ @@ -495,10 +560,12 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env, const struct lu_object_conf *conf, cfs_waitlink_t *waiter) { - struct lu_site *s; - struct lu_object *o; - struct lu_object *shadow; - struct hlist_head *bucket; + struct lu_object *o; + struct lu_object *shadow; + struct lu_site *s; + cfs_hash_t *hs; + cfs_hash_bd_t bd; + __u64 version = 0; /* * This uses standard index maintenance protocol: @@ -513,17 +580,21 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env, * - unlock index; * - return object. * + * For "LOC_F_NEW" case, we are sure the object is new established. + * It is unnecessary to perform lookup-alloc-lookup-insert, instead, + * just alloc and insert directly. + * * If dying object is found during index search, add @waiter to the * site wait-queue and return ERR_PTR(-EAGAIN). */ - - s = dev->ld_site; - bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits); - - read_lock(&s->ls_guard); - o = htable_lookup(s, bucket, f, waiter); - read_unlock(&s->ls_guard); - + if (conf != NULL && conf->loc_flags & LOC_F_NEW) + return lu_object_new(env, dev, f, conf); + + s = dev->ld_site; + hs = s->ls_obj_hash; + cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1); + o = htable_lookup(s, &bd, f, waiter, &version); + cfs_hash_bd_unlock(hs, &bd, 1); if (o != NULL) return o; @@ -537,20 +608,22 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env, LASSERT(lu_fid_eq(lu_object_fid(o), f)); - write_lock(&s->ls_guard); - shadow = htable_lookup(s, bucket, f, waiter); + cfs_hash_bd_lock(hs, &bd, 1); + + shadow = htable_lookup(s, &bd, f, waiter, &version); if (likely(shadow == NULL)) { - hlist_add_head(&o->lo_header->loh_hash, bucket); - list_add_tail(&o->lo_header->loh_lru, &s->ls_lru); - ++ s->ls_busy; - ++ s->ls_total; - shadow = o; - o = NULL; - } else - s->ls_stats.s_cache_race ++; - write_unlock(&s->ls_guard); - if (o != NULL) - lu_object_free(env, o); + struct lu_site_bkt_data *bkt; + + bkt = cfs_hash_bd_extra_get(hs, &bd); + cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); + bkt->lsb_busy++; + cfs_hash_bd_unlock(hs, &bd, 1); + return o; + } + + lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE); + cfs_hash_bd_unlock(hs, &bd, 1); + lu_object_free(env, o); return shadow; } @@ -564,22 +637,22 @@ struct lu_object *lu_object_find_at(const struct lu_env *env, const struct lu_fid *f, const struct lu_object_conf *conf) { - struct lu_object *obj; - cfs_waitlink_t wait; + struct lu_site_bkt_data *bkt; + struct lu_object *obj; + cfs_waitlink_t wait; while (1) { obj = lu_object_find_try(env, dev, f, conf, &wait); - if (obj == ERR_PTR(-EAGAIN)) { - /* - * lu_object_find_try() already added waiter into the - * wait queue. - */ - cfs_waitq_wait(&wait, CFS_TASK_UNINT); - cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait); - } else - break; + if (obj != ERR_PTR(-EAGAIN)) + return obj; + /* + * lu_object_find_try() already added waiter into the + * wait queue. + */ + cfs_waitq_wait(&wait, CFS_TASK_UNINT); + bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f); + cfs_waitq_del(&bkt->lsb_marche_funebre, &wait); } - return obj; } EXPORT_SYMBOL(lu_object_find_at); @@ -617,14 +690,14 @@ int lu_device_type_init(struct lu_device_type *ldt) CFS_INIT_LIST_HEAD(&ldt->ldt_linkage); result = ldt->ldt_ops->ldto_init(ldt); if (result == 0) - list_add(&ldt->ldt_linkage, &lu_device_types); + cfs_list_add(&ldt->ldt_linkage, &lu_device_types); return result; } EXPORT_SYMBOL(lu_device_type_init); void lu_device_type_fini(struct lu_device_type *ldt) { - list_del_init(&ldt->ldt_linkage); + cfs_list_del_init(&ldt->ldt_linkage); ldt->ldt_ops->ldto_fini(ldt); } EXPORT_SYMBOL(lu_device_type_fini); @@ -633,7 +706,7 @@ void lu_types_stop(void) { struct lu_device_type *ldt; - list_for_each_entry(ldt, &lu_device_types, ldt_linkage) { + cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) { if (ldt->ldt_device_nr == 0) ldt->ldt_ops->ldto_stop(ldt); } @@ -644,45 +717,65 @@ EXPORT_SYMBOL(lu_types_stop); * Global list of all sites on this node */ static CFS_LIST_HEAD(lu_sites); -static DECLARE_MUTEX(lu_sites_guard); +static CFS_DEFINE_MUTEX(lu_sites_guard); /** * Global environment used by site shrinker. */ static struct lu_env lu_shrink_env; +struct lu_site_print_arg { + struct lu_env *lsp_env; + void *lsp_cookie; + lu_printer_t lsp_printer; +}; + +static int +lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *data) +{ + struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data; + struct lu_object_header *h; + + h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash); + if (!cfs_list_empty(&h->loh_layers)) { + const struct lu_object *o; + + o = lu_object_top(h); + lu_object_print(arg->lsp_env, arg->lsp_cookie, + arg->lsp_printer, o); + } else { + lu_object_header_print(arg->lsp_env, arg->lsp_cookie, + arg->lsp_printer, h); + } + return 0; +} + /** * Print all objects in \a s. */ void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie, lu_printer_t printer) { - int i; - - for (i = 0; i < s->ls_hash_size; ++i) { - struct lu_object_header *h; - struct hlist_node *scan; - - read_lock(&s->ls_guard); - hlist_for_each_entry(h, scan, &s->ls_hash[i], loh_hash) { + struct lu_site_print_arg arg = { + .lsp_env = (struct lu_env *)env, + .lsp_cookie = cookie, + .lsp_printer = printer, + }; - if (!list_empty(&h->loh_layers)) { - const struct lu_object *obj; - - obj = lu_object_top(h); - lu_object_print(env, cookie, printer, obj); - } else - lu_object_header_print(env, cookie, printer, h); - } - read_unlock(&s->ls_guard); - } + cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg); } EXPORT_SYMBOL(lu_site_print); enum { - LU_CACHE_PERCENT = 20, + LU_CACHE_PERCENT_MAX = 50, + LU_CACHE_PERCENT_DEFAULT = 20 }; +static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT; +CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644, + "Percentage of memory to be used as lu_object cache"); + /** * Return desired hash table order. */ @@ -698,7 +791,7 @@ static int lu_htable_order(void) * * Size of lu_object is (arbitrary) taken as 1K (together with inode). */ - cache_size = num_physpages; + cache_size = cfs_num_physpages; #if BITS_PER_LONG == 32 /* limit hashtable size for lowmem systems to low RAM */ @@ -706,7 +799,16 @@ static int lu_htable_order(void) cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4; #endif - cache_size = cache_size / 100 * LU_CACHE_PERCENT * + /* clear off unreasonable cache setting. */ + if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) { + CWARN("obdclass: invalid lu_cache_percent: %u, it must be in" + " the range of (0, %u]. Will use default value: %u.\n", + lu_cache_percent, LU_CACHE_PERCENT_MAX, + LU_CACHE_PERCENT_DEFAULT); + + lu_cache_percent = LU_CACHE_PERCENT_DEFAULT; + } + cache_size = cache_size / 100 * lu_cache_percent * (CFS_PAGE_SIZE / 1024); for (bits = 1; (1 << bits) < cache_size; ++bits) { @@ -715,45 +817,156 @@ static int lu_htable_order(void) return bits; } -static struct lock_class_key lu_site_guard_class; +static unsigned lu_obj_hop_hash(cfs_hash_t *hs, + const void *key, unsigned mask) +{ + struct lu_fid *fid = (struct lu_fid *)key; + __u32 hash; + + hash = fid_flatten32(fid); + hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */ + hash = cfs_hash_long(hash, hs->hs_bkt_bits); + + /* give me another random factor */ + hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3); + + hash <<= hs->hs_cur_bits - hs->hs_bkt_bits; + hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1); + + return hash & mask; +} + +static void *lu_obj_hop_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash); +} + +static void *lu_obj_hop_key(cfs_hlist_node_t *hnode) +{ + struct lu_object_header *h; + + h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash); + return &h->loh_fid; +} + +static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct lu_object_header *h; + + h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash); + return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key); +} + +static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct lu_object_header *h; + + h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash); + if (cfs_atomic_add_return(1, &h->loh_ref) == 1) { + struct lu_site_bkt_data *bkt; + cfs_hash_bd_t bd; + + cfs_hash_bd_get(hs, &h->loh_fid, &bd); + bkt = cfs_hash_bd_extra_get(hs, &bd); + bkt->lsb_busy++; + } +} + +static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + LBUG(); /* we should never called it */ +} + +cfs_hash_ops_t lu_site_hash_ops = { + .hs_hash = lu_obj_hop_hash, + .hs_key = lu_obj_hop_key, + .hs_keycmp = lu_obj_hop_keycmp, + .hs_object = lu_obj_hop_object, + .hs_get = lu_obj_hop_get, + .hs_put_locked = lu_obj_hop_put_locked, +}; /** * Initialize site \a s, with \a d as the top level device. */ +#define LU_SITE_BITS_MIN 12 +#define LU_SITE_BITS_MAX 24 +/** + * total 256 buckets, we don't want too many buckets because: + * - consume too much memory + * - avoid unbalanced LRU list + */ +#define LU_SITE_BKT_BITS 8 + int lu_site_init(struct lu_site *s, struct lu_device *top) { + struct lu_site_bkt_data *bkt; + cfs_hash_bd_t bd; + char name[16]; int bits; - int size; int i; ENTRY; memset(s, 0, sizeof *s); - rwlock_init(&s->ls_guard); - lockdep_set_class(&s->ls_guard, &lu_site_guard_class); - CFS_INIT_LIST_HEAD(&s->ls_lru); + bits = lu_htable_order(); + snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name); + for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX); + bits >= LU_SITE_BITS_MIN; bits--) { + s->ls_obj_hash = cfs_hash_create(name, bits, bits, + bits - LU_SITE_BKT_BITS, + sizeof(*bkt), 0, 0, + &lu_site_hash_ops, + CFS_HASH_SPIN_BKTLOCK | + CFS_HASH_NO_ITEMREF | + CFS_HASH_DEPTH | + CFS_HASH_ASSERT_EMPTY); + if (s->ls_obj_hash != NULL) + break; + } + + if (s->ls_obj_hash == NULL) { + CERROR("failed to create lu_site hash with bits: %d\n", bits); + return -ENOMEM; + } + + cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) { + bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd); + CFS_INIT_LIST_HEAD(&bkt->lsb_lru); + cfs_waitq_init(&bkt->lsb_marche_funebre); + } + + s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0); + if (s->ls_stats == NULL) { + cfs_hash_putref(s->ls_obj_hash); + s->ls_obj_hash = NULL; + return -ENOMEM; + } + + lprocfs_counter_init(s->ls_stats, LU_SS_CREATED, + 0, "created", "created"); + lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT, + 0, "cache_hit", "cache_hit"); + lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS, + 0, "cache_miss", "cache_miss"); + lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE, + 0, "cache_race", "cache_race"); + lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE, + 0, "cache_death_race", "cache_death_race"); + lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED, + 0, "lru_purged", "lru_purged"); + CFS_INIT_LIST_HEAD(&s->ls_linkage); - cfs_waitq_init(&s->ls_marche_funebre); s->ls_top_dev = top; top->ld_site = s; lu_device_get(top); lu_ref_add(&top->ld_reference, "site-top", s); - for (bits = lu_htable_order(), size = 1 << bits; - (s->ls_hash = - cfs_alloc_large(size * sizeof s->ls_hash[0])) == NULL; - --bits, size >>= 1) { - /* - * Scale hash table down, until allocation succeeds. - */ - ; - } - - s->ls_hash_size = size; - s->ls_hash_bits = bits; - s->ls_hash_mask = size - 1; + CFS_INIT_LIST_HEAD(&s->ls_ld_linkage); + cfs_spin_lock_init(&s->ls_ld_lock); - for (i = 0; i < size; i++) - INIT_HLIST_HEAD(&s->ls_hash[i]); + cfs_spin_lock(&s->ls_ld_lock); + cfs_list_add(&top->ld_linkage, &s->ls_ld_linkage); + cfs_spin_unlock(&s->ls_ld_lock); RETURN(0); } @@ -764,26 +977,24 @@ EXPORT_SYMBOL(lu_site_init); */ void lu_site_fini(struct lu_site *s) { - LASSERT(list_empty(&s->ls_lru)); - LASSERT(s->ls_total == 0); + cfs_mutex_lock(&lu_sites_guard); + cfs_list_del_init(&s->ls_linkage); + cfs_mutex_unlock(&lu_sites_guard); - down(&lu_sites_guard); - list_del_init(&s->ls_linkage); - up(&lu_sites_guard); - - if (s->ls_hash != NULL) { - int i; - for (i = 0; i < s->ls_hash_size; i++) - LASSERT(hlist_empty(&s->ls_hash[i])); - cfs_free_large(s->ls_hash); - s->ls_hash = NULL; + if (s->ls_obj_hash != NULL) { + cfs_hash_putref(s->ls_obj_hash); + s->ls_obj_hash = NULL; } + if (s->ls_top_dev != NULL) { s->ls_top_dev->ld_site = NULL; lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s); lu_device_put(s->ls_top_dev); s->ls_top_dev = NULL; } + + if (s->ls_stats != NULL) + lprocfs_free_stats(&s->ls_stats); } EXPORT_SYMBOL(lu_site_fini); @@ -793,11 +1004,11 @@ EXPORT_SYMBOL(lu_site_fini); int lu_site_init_finish(struct lu_site *s) { int result; - down(&lu_sites_guard); + cfs_mutex_lock(&lu_sites_guard); result = lu_context_refill(&lu_shrink_env.le_ctx); if (result == 0) - list_add(&s->ls_linkage, &lu_sites); - up(&lu_sites_guard); + cfs_list_add(&s->ls_linkage, &lu_sites); + cfs_mutex_unlock(&lu_sites_guard); return result; } EXPORT_SYMBOL(lu_site_init_finish); @@ -807,7 +1018,7 @@ EXPORT_SYMBOL(lu_site_init_finish); */ void lu_device_get(struct lu_device *d) { - atomic_inc(&d->ld_ref); + cfs_atomic_inc(&d->ld_ref); } EXPORT_SYMBOL(lu_device_get); @@ -816,8 +1027,8 @@ EXPORT_SYMBOL(lu_device_get); */ void lu_device_put(struct lu_device *d) { - LASSERT(atomic_read(&d->ld_ref) > 0); - atomic_dec(&d->ld_ref); + LASSERT(cfs_atomic_read(&d->ld_ref) > 0); + cfs_atomic_dec(&d->ld_ref); } EXPORT_SYMBOL(lu_device_put); @@ -829,9 +1040,10 @@ int lu_device_init(struct lu_device *d, struct lu_device_type *t) if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL) t->ldt_ops->ldto_start(t); memset(d, 0, sizeof *d); - atomic_set(&d->ld_ref, 0); + cfs_atomic_set(&d->ld_ref, 0); d->ld_type = t; lu_ref_init(&d->ld_reference); + CFS_INIT_LIST_HEAD(&d->ld_linkage); return 0; } EXPORT_SYMBOL(lu_device_init); @@ -850,8 +1062,8 @@ void lu_device_fini(struct lu_device *d) } lu_ref_fini(&d->ld_reference); - LASSERTF(atomic_read(&d->ld_ref) == 0, - "Refcount is %u\n", atomic_read(&d->ld_ref)); + LASSERTF(cfs_atomic_read(&d->ld_ref) == 0, + "Refcount is %u\n", cfs_atomic_read(&d->ld_ref)); LASSERT(t->ldt_device_nr > 0); if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL) t->ldt_ops->ldto_stop(t); @@ -882,7 +1094,7 @@ void lu_object_fini(struct lu_object *o) { struct lu_device *dev = o->lo_dev; - LASSERT(list_empty(&o->lo_linkage)); + LASSERT(cfs_list_empty(&o->lo_linkage)); if (dev != NULL) { lu_ref_del_at(&dev->ld_reference, @@ -901,7 +1113,7 @@ EXPORT_SYMBOL(lu_object_fini); */ void lu_object_add_top(struct lu_object_header *h, struct lu_object *o) { - list_move(&o->lo_linkage, &h->loh_layers); + cfs_list_move(&o->lo_linkage, &h->loh_layers); } EXPORT_SYMBOL(lu_object_add_top); @@ -913,7 +1125,7 @@ EXPORT_SYMBOL(lu_object_add_top); */ void lu_object_add(struct lu_object *before, struct lu_object *o) { - list_move(&o->lo_linkage, &before->lo_linkage); + cfs_list_move(&o->lo_linkage, &before->lo_linkage); } EXPORT_SYMBOL(lu_object_add); @@ -923,8 +1135,8 @@ EXPORT_SYMBOL(lu_object_add); int lu_object_header_init(struct lu_object_header *h) { memset(h, 0, sizeof *h); - atomic_set(&h->loh_ref, 1); - INIT_HLIST_NODE(&h->loh_hash); + cfs_atomic_set(&h->loh_ref, 1); + CFS_INIT_HLIST_NODE(&h->loh_hash); CFS_INIT_LIST_HEAD(&h->loh_lru); CFS_INIT_LIST_HEAD(&h->loh_layers); lu_ref_init(&h->loh_reference); @@ -937,9 +1149,9 @@ EXPORT_SYMBOL(lu_object_header_init); */ void lu_object_header_fini(struct lu_object_header *h) { - LASSERT(list_empty(&h->loh_layers)); - LASSERT(list_empty(&h->loh_lru)); - LASSERT(hlist_unhashed(&h->loh_hash)); + LASSERT(cfs_list_empty(&h->loh_layers)); + LASSERT(cfs_list_empty(&h->loh_lru)); + LASSERT(cfs_hlist_unhashed(&h->loh_hash)); lu_ref_fini(&h->loh_reference); } EXPORT_SYMBOL(lu_object_header_fini); @@ -953,7 +1165,7 @@ struct lu_object *lu_object_locate(struct lu_object_header *h, { struct lu_object *o; - list_for_each_entry(o, &h->loh_layers, lo_linkage) { + cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) { if (o->lo_dev->ld_type == dtype) return o; } @@ -986,13 +1198,13 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top) /* purge again. */ lu_site_purge(env, site, ~0); - if (!list_empty(&site->ls_lru) || site->ls_total != 0) { + if (!cfs_hash_is_empty(site->ls_obj_hash)) { /* * Uh-oh, objects still exist. */ - static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR); + LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL); - lu_site_print(env, site, &cookie, lu_cdebug_printer); + lu_site_print(env, site, &msgdata, lu_cdebug_printer); } for (scan = top; scan != NULL; scan = next) { @@ -1018,7 +1230,7 @@ enum { static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; -static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED; +static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED; /** * Global counter incremented whenever key is registered, unregistered, @@ -1042,11 +1254,11 @@ int lu_context_key_register(struct lu_context_key *key) LASSERT(key->lct_owner != NULL); result = -ENFILE; - spin_lock(&lu_keys_guard); + cfs_spin_lock(&lu_keys_guard); for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { if (lu_keys[i] == NULL) { key->lct_index = i; - atomic_set(&key->lct_used, 1); + cfs_atomic_set(&key->lct_used, 1); lu_keys[i] = key; lu_ref_init(&key->lct_reference); result = 0; @@ -1054,7 +1266,7 @@ int lu_context_key_register(struct lu_context_key *key) break; } } - spin_unlock(&lu_keys_guard); + cfs_spin_unlock(&lu_keys_guard); return result; } EXPORT_SYMBOL(lu_context_key_register); @@ -1067,18 +1279,19 @@ static void key_fini(struct lu_context *ctx, int index) key = lu_keys[index]; LASSERT(key != NULL); LASSERT(key->lct_fini != NULL); - LASSERT(atomic_read(&key->lct_used) > 1); + LASSERT(cfs_atomic_read(&key->lct_used) > 1); key->lct_fini(ctx, key, ctx->lc_value[index]); lu_ref_del(&key->lct_reference, "ctx", ctx); - atomic_dec(&key->lct_used); - LASSERT(key->lct_owner != NULL); - if (!(ctx->lc_tags & LCT_NOREF)) { - LASSERT(module_refcount(key->lct_owner) > 0); - module_put(key->lct_owner); - } - ctx->lc_value[index] = NULL; - } + cfs_atomic_dec(&key->lct_used); + + LASSERT(key->lct_owner != NULL); + if ((ctx->lc_tags & LCT_NOREF) == 0) { + LINVRNT(cfs_module_refcount(key->lct_owner) > 0); + cfs_module_put(key->lct_owner); + } + ctx->lc_value[index] = NULL; + } } /** @@ -1086,19 +1299,23 @@ static void key_fini(struct lu_context *ctx, int index) */ void lu_context_key_degister(struct lu_context_key *key) { - LASSERT(atomic_read(&key->lct_used) >= 1); + LASSERT(cfs_atomic_read(&key->lct_used) >= 1); LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); lu_context_key_quiesce(key); ++key_set_version; + cfs_spin_lock(&lu_keys_guard); key_fini(&lu_shrink_env.le_ctx, key->lct_index); + if (lu_keys[key->lct_index]) { + lu_keys[key->lct_index] = NULL; + lu_ref_fini(&key->lct_reference); + } + cfs_spin_unlock(&lu_keys_guard); - if (atomic_read(&key->lct_used) > 1) - CERROR("key has instances.\n"); - spin_lock(&lu_keys_guard); - lu_keys[key->lct_index] = NULL; - spin_unlock(&lu_keys_guard); + LASSERTF(cfs_atomic_read(&key->lct_used) == 1, + "key has instances: %d\n", + cfs_atomic_read(&key->lct_used)); } EXPORT_SYMBOL(lu_context_key_degister); @@ -1220,10 +1437,11 @@ void lu_context_key_quiesce(struct lu_context_key *key) /* * XXX memory barrier has to go here. */ - spin_lock(&lu_keys_guard); - list_for_each_entry(ctx, &lu_context_remembered, lc_remember) + cfs_spin_lock(&lu_keys_guard); + cfs_list_for_each_entry(ctx, &lu_context_remembered, + lc_remember) key_fini(ctx, key->lct_index); - spin_unlock(&lu_keys_guard); + cfs_spin_unlock(&lu_keys_guard); ++key_set_version; } } @@ -1238,21 +1456,23 @@ EXPORT_SYMBOL(lu_context_key_revive); static void keys_fini(struct lu_context *ctx) { - int i; + int i; - if (ctx->lc_value != NULL) { - for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) - key_fini(ctx, i); - OBD_FREE(ctx->lc_value, - ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); - ctx->lc_value = NULL; - } + if (ctx->lc_value == NULL) + return; + + for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) + key_fini(ctx, i); + + OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); + ctx->lc_value = NULL; } static int keys_fill(struct lu_context *ctx) { int i; + LINVRNT(ctx->lc_value != NULL); for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { struct lu_context_key *key; @@ -1275,9 +1495,9 @@ static int keys_fill(struct lu_context *ctx) LASSERT(key->lct_owner != NULL); if (!(ctx->lc_tags & LCT_NOREF)) - try_module_get(key->lct_owner); + cfs_try_module_get(key->lct_owner); lu_ref_add_atomic(&key->lct_reference, "ctx", ctx); - atomic_inc(&key->lct_used); + cfs_atomic_inc(&key->lct_used); /* * This is the only place in the code, where an * element of ctx->lc_value[] array is set to non-NULL @@ -1294,17 +1514,11 @@ static int keys_fill(struct lu_context *ctx) static int keys_init(struct lu_context *ctx) { - int result; - - OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); - if (likely(ctx->lc_value != NULL)) - result = keys_fill(ctx); - else - result = -ENOMEM; + OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); + if (likely(ctx->lc_value != NULL)) + return keys_fill(ctx); - if (result != 0) - keys_fini(ctx); - return result; + return -ENOMEM; } /** @@ -1312,16 +1526,24 @@ static int keys_init(struct lu_context *ctx) */ int lu_context_init(struct lu_context *ctx, __u32 tags) { + int rc; + memset(ctx, 0, sizeof *ctx); ctx->lc_state = LCS_INITIALIZED; ctx->lc_tags = tags; if (tags & LCT_REMEMBER) { - spin_lock(&lu_keys_guard); - list_add(&ctx->lc_remember, &lu_context_remembered); - spin_unlock(&lu_keys_guard); - } else - CFS_INIT_LIST_HEAD(&ctx->lc_remember); - return keys_init(ctx); + cfs_spin_lock(&lu_keys_guard); + cfs_list_add(&ctx->lc_remember, &lu_context_remembered); + cfs_spin_unlock(&lu_keys_guard); + } else { + CFS_INIT_LIST_HEAD(&ctx->lc_remember); + } + + rc = keys_init(ctx); + if (rc != 0) + lu_context_fini(ctx); + + return rc; } EXPORT_SYMBOL(lu_context_init); @@ -1330,12 +1552,19 @@ EXPORT_SYMBOL(lu_context_init); */ void lu_context_fini(struct lu_context *ctx) { - LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); - ctx->lc_state = LCS_FINALIZED; - keys_fini(ctx); - spin_lock(&lu_keys_guard); - list_del_init(&ctx->lc_remember); - spin_unlock(&lu_keys_guard); + LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); + ctx->lc_state = LCS_FINALIZED; + + if ((ctx->lc_tags & LCT_REMEMBER) == 0) { + LASSERT(cfs_list_empty(&ctx->lc_remember)); + keys_fini(ctx); + + } else { /* could race with key degister */ + cfs_spin_lock(&lu_keys_guard); + keys_fini(ctx); + cfs_list_del_init(&ctx->lc_remember); + cfs_spin_unlock(&lu_keys_guard); + } } EXPORT_SYMBOL(lu_context_fini); @@ -1376,15 +1605,61 @@ EXPORT_SYMBOL(lu_context_exit); /** * Allocate for context all missing keys that were registered after context - * creation. + * creation. key_set_version is only changed in rare cases when modules + * are loaded and removed. */ int lu_context_refill(struct lu_context *ctx) { - LINVRNT(ctx->lc_value != NULL); - return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx); + return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx); } EXPORT_SYMBOL(lu_context_refill); +/** + * lu_ctx_tags/lu_ses_tags will be updated if there are new types of + * obd being added. Currently, this is only used on client side, specifically + * for echo device client, for other stack (like ptlrpc threads), context are + * predefined when the lu_device type are registered, during the module probe + * phase. + */ +__u32 lu_context_tags_default = 0; +__u32 lu_session_tags_default = 0; + +void lu_context_tags_update(__u32 tags) +{ + cfs_spin_lock(&lu_keys_guard); + lu_context_tags_default |= tags; + key_set_version ++; + cfs_spin_unlock(&lu_keys_guard); +} +EXPORT_SYMBOL(lu_context_tags_update); + +void lu_context_tags_clear(__u32 tags) +{ + cfs_spin_lock(&lu_keys_guard); + lu_context_tags_default &= ~tags; + key_set_version ++; + cfs_spin_unlock(&lu_keys_guard); +} +EXPORT_SYMBOL(lu_context_tags_clear); + +void lu_session_tags_update(__u32 tags) +{ + cfs_spin_lock(&lu_keys_guard); + lu_session_tags_default |= tags; + key_set_version ++; + cfs_spin_unlock(&lu_keys_guard); +} +EXPORT_SYMBOL(lu_session_tags_update); + +void lu_session_tags_clear(__u32 tags) +{ + cfs_spin_lock(&lu_keys_guard); + lu_session_tags_default &= ~tags; + key_set_version ++; + cfs_spin_unlock(&lu_keys_guard); +} +EXPORT_SYMBOL(lu_session_tags_clear); + int lu_env_init(struct lu_env *env, __u32 tags) { int result; @@ -1416,38 +1691,111 @@ int lu_env_refill(struct lu_env *env) } EXPORT_SYMBOL(lu_env_refill); -static struct shrinker *lu_site_shrinker = NULL; +/** + * Currently, this API will only be used by echo client. + * Because echo client and normal lustre client will share + * same cl_env cache. So echo client needs to refresh + * the env context after it get one from the cache, especially + * when normal client and echo client co-exist in the same client. + */ +int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags, + __u32 stags) +{ + int result; + + if ((env->le_ctx.lc_tags & ctags) != ctags) { + env->le_ctx.lc_version = 0; + env->le_ctx.lc_tags |= ctags; + } + + if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) { + env->le_ses->lc_version = 0; + env->le_ses->lc_tags |= stags; + } + + result = lu_env_refill(env); + + return result; +} +EXPORT_SYMBOL(lu_env_refill_by_tags); + +static struct cfs_shrinker *lu_site_shrinker = NULL; + +typedef struct lu_site_stats{ + unsigned lss_populated; + unsigned lss_max_search; + unsigned lss_total; + unsigned lss_busy; +} lu_site_stats_t; + +static void lu_site_stats_get(cfs_hash_t *hs, + lu_site_stats_t *stats, int populated) +{ + cfs_hash_bd_t bd; + int i; + + cfs_hash_for_each_bucket(hs, &bd, i) { + struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd); + cfs_hlist_head_t *hhead; + + cfs_hash_bd_lock(hs, &bd, 1); + stats->lss_busy += bkt->lsb_busy; + stats->lss_total += cfs_hash_bd_count_get(&bd); + stats->lss_max_search = max((int)stats->lss_max_search, + cfs_hash_bd_depmax_get(&bd)); + if (!populated) { + cfs_hash_bd_unlock(hs, &bd, 1); + continue; + } + + cfs_hash_bd_for_each_hlist(hs, &bd, hhead) { + if (!cfs_hlist_empty(hhead)) + stats->lss_populated++; + } + cfs_hash_bd_unlock(hs, &bd, 1); + } +} #ifdef __KERNEL__ -static int lu_cache_shrink(int nr, unsigned int gfp_mask) + +static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) { + lu_site_stats_t stats; struct lu_site *s; struct lu_site *tmp; int cached = 0; - int remain = nr; + int remain = shrink_param(sc, nr_to_scan); CFS_LIST_HEAD(splice); - if (nr != 0 && !(gfp_mask & __GFP_FS)) - return -1; + if (remain != 0) { + if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) + return -1; + CDEBUG(D_INODE, "Shrink %d objects\n", remain); + } - down(&lu_sites_guard); - list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) { - if (nr != 0) { + cfs_mutex_lock(&lu_sites_guard); + cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) { + if (shrink_param(sc, nr_to_scan) != 0) { remain = lu_site_purge(&lu_shrink_env, s, remain); /* * Move just shrunk site to the tail of site list to * assure shrinking fairness. */ - list_move_tail(&s->ls_linkage, &splice); + cfs_list_move_tail(&s->ls_linkage, &splice); } - read_lock(&s->ls_guard); - cached += s->ls_total - s->ls_busy; - read_unlock(&s->ls_guard); - if (remain <= 0) + + memset(&stats, 0, sizeof(stats)); + lu_site_stats_get(s->ls_obj_hash, &stats, 0); + cached += stats.lss_total - stats.lss_busy; + if (shrink_param(sc, nr_to_scan) && remain <= 0) break; } - list_splice(&splice, lu_sites.prev); - up(&lu_sites_guard); + cfs_list_splice(&splice, lu_sites.prev); + cfs_mutex_unlock(&lu_sites_guard); + + cached = (cached / 100) * sysctl_vfs_cache_pressure; + if (shrink_param(sc, nr_to_scan) == 0) + CDEBUG(D_INODE, "%d objects cached\n", cached); return cached; } @@ -1464,7 +1812,7 @@ struct lu_env lu_debugging_env; * Debugging printer function using printk(). */ int lu_printk_printer(const struct lu_env *env, - void *_, const char *format, ...) + void *unused, const char *format, ...) { va_list args; @@ -1488,10 +1836,10 @@ void lu_context_keys_dump(void) key = lu_keys[i]; if (key != NULL) { - CERROR("[%i]: %p %x (%p,%p,%p) %i %i \"%s\"@%p\n", + CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n", i, key, key->lct_tags, key->lct_init, key->lct_fini, key->lct_exit, - key->lct_index, atomic_read(&key->lct_used), + key->lct_index, cfs_atomic_read(&key->lct_used), key->lct_owner ? key->lct_owner->name : "", key->lct_owner); lu_ref_print(&key->lct_reference); @@ -1524,7 +1872,11 @@ int lu_global_init(void) { int result; - CDEBUG(D_CONSOLE, "Lustre LU module (%p).\n", &lu_keys); + CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys); + + result = lu_ref_global_init(); + if (result != 0) + return result; LU_CONTEXT_KEY_INIT(&lu_global_key); result = lu_context_key_register(&lu_global_key); @@ -1535,21 +1887,18 @@ int lu_global_init(void) * conservatively. This should not be too bad, because this * environment is global. */ - down(&lu_sites_guard); + cfs_mutex_lock(&lu_sites_guard); result = lu_env_init(&lu_shrink_env, LCT_SHRINKER); - up(&lu_sites_guard); + cfs_mutex_unlock(&lu_sites_guard); if (result != 0) return result; - result = lu_ref_global_init(); - if (result != 0) - return result; /* * seeks estimation: 3 seeks to read a record from oi, one to read * inode, one for ea. Unfortunately setting this high value results in * lu_object/inode cache consuming all the memory. */ - lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, lu_cache_shrink); + lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink); if (lu_site_shrinker == NULL) return -ENOMEM; @@ -1584,7 +1933,7 @@ void lu_global_fini(void) #endif lu_time_global_fini(); if (lu_site_shrinker != NULL) { - remove_shrinker(lu_site_shrinker); + cfs_remove_shrinker(lu_site_shrinker); lu_site_shrinker = NULL; } @@ -1594,9 +1943,9 @@ void lu_global_fini(void) * Tear shrinker environment down _after_ de-registering * lu_global_key, because the latter has a value in the former. */ - down(&lu_sites_guard); + cfs_mutex_lock(&lu_sites_guard); lu_env_fini(&lu_shrink_env); - up(&lu_sites_guard); + cfs_mutex_unlock(&lu_sites_guard); lu_ref_global_fini(); } @@ -1607,119 +1956,43 @@ struct lu_buf LU_BUF_NULL = { }; EXPORT_SYMBOL(LU_BUF_NULL); -/** - * Output site statistical counters into a buffer. Suitable for - * lprocfs_rd_*()-style functions. - */ -int lu_site_stats_print(const struct lu_site *s, char *page, int count) +static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx) { - int i; - int populated; +#ifdef LPROCFS + struct lprocfs_counter ret; - /* - * How many hash buckets are not-empty? Don't bother with locks: it's - * an estimation anyway. - */ - for (i = 0, populated = 0; i < s->ls_hash_size; i++) - populated += !hlist_empty(&s->ls_hash[i]); - - return snprintf(page, count, "%d %d %d/%d %d %d %d %d %d %d %d\n", - s->ls_total, - s->ls_busy, - populated, - s->ls_hash_size, - s->ls_stats.s_created, - s->ls_stats.s_cache_hit, - s->ls_stats.s_cache_miss, - s->ls_stats.s_cache_check, - s->ls_stats.s_cache_race, - s->ls_stats.s_cache_death_race, - s->ls_stats.s_lru_purged); + lprocfs_stats_collect(stats, idx, &ret); + return (__u32)ret.lc_count; +#else + return 0; +#endif } -EXPORT_SYMBOL(lu_site_stats_print); -#ifdef __KERNEL__ -/* - * XXX: Functions below logically belong to the fid module, but they are used - * by dt_store_open(). Put them here until better place is found. +/** + * Output site statistical counters into a buffer. Suitable for + * lprocfs_rd_*()-style functions. */ - -void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid, - struct lu_fid *befider) +int lu_site_stats_print(const struct lu_site *s, char *page, int count) { - int recsize; - __u64 seq; - __u32 oid; - - seq = fid_seq(fid); - oid = fid_oid(fid); - - /* - * Two cases: compact 6 bytes representation for a common case, and - * full 17 byte representation for "unusual" fid. - */ + lu_site_stats_t stats; - /* - * Check that usual case is really usual. - */ - CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull); + memset(&stats, 0, sizeof(stats)); + lu_site_stats_get(s->ls_obj_hash, &stats, 1); - /* fid can be packed in six bytes (first byte as length of packed fid, - * three bytes of seq and two bytes of oid). - * this reduces IO overhead specially for OSD Object Index. */ - - if (seq < FID_SEQ_START || - seq > (0xffffffull + FID_SEQ_START) || - oid > 0xffff || fid_ver(fid) != 0) { - fid_cpu_to_be(befider, fid); - recsize = sizeof *befider; - } else { - unsigned char *small_befider; - - small_befider = (unsigned char *)befider; - - small_befider[0] = seq >> 16; - small_befider[1] = seq >> 8; - small_befider[2] = seq; - - small_befider[3] = oid >> 8; - small_befider[4] = oid; - - recsize = 5; - } - memcpy(pack->fp_area, befider, recsize); - pack->fp_len = recsize + 1; -} -EXPORT_SYMBOL(fid_pack); - -int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid) -{ - int result; - - result = 0; - switch (pack->fp_len) { - case sizeof *fid + 1: - memcpy(fid, pack->fp_area, sizeof *fid); - fid_be_to_cpu(fid, fid); - break; - case 6: { - const unsigned char *area; - - area = (unsigned char *)pack->fp_area; - fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2]; - fid->f_seq += FID_SEQ_START; - fid->f_oid = (area[3] << 8) | area[4]; - fid->f_ver = 0; - break; - } - default: - CERROR("Unexpected packed fid size: %d\n", pack->fp_len); - result = -EIO; - } - return result; + return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n", + stats.lss_busy, + stats.lss_total, + stats.lss_populated, + CFS_HASH_NHLIST(s->ls_obj_hash), + stats.lss_max_search, + ls_stats_read(s->ls_stats, LU_SS_CREATED), + ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT), + ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS), + ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE), + ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE), + ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED)); } -EXPORT_SYMBOL(fid_unpack); -#endif /* #ifdef __KERNEL__ */ +EXPORT_SYMBOL(lu_site_stats_print); const char *lu_time_names[LU_TIME_NR] = { [LU_TIME_FIND_LOOKUP] = "find_lookup", @@ -1734,13 +2007,16 @@ EXPORT_SYMBOL(lu_time_names); int lu_kmem_init(struct lu_kmem_descr *caches) { int result; + struct lu_kmem_descr *iter = caches; - for (result = 0; caches->ckd_cache != NULL; ++caches) { - *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name, - caches->ckd_size, - 0, 0); - if (*caches->ckd_cache == NULL) { + for (result = 0; iter->ckd_cache != NULL; ++iter) { + *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name, + iter->ckd_size, + 0, 0); + if (*iter->ckd_cache == NULL) { result = -ENOMEM; + /* free all previously allocated caches */ + lu_kmem_fini(caches); break; } }