X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Flu_object.c;h=f7bdbccd0ea34d21874847dbb085ab40cf5fcc75;hp=b50b9663175dfaa9c6971e5c2b9bbb93cc06c32b;hb=fbfd488a2f87ea43332ae16341887f68c0ffbde5;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index b50b9663..f7bdbcc 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -39,7 +39,7 @@ * These are the only exported functions, they provide some generic * infrastructure for managing object devices * - * Author: Nikita Danilov + * Author: Nikita Danilov */ #define DEBUG_SUBSYSTEM S_CLASS @@ -47,12 +47,15 @@ # define EXPORT_SYMTAB #endif -#include -#include -/* nr_free_pages() */ -#include +#include + +#ifdef __KERNEL__ +# include +#endif + /* hash_long() */ #include +#include #include #include #include @@ -63,7 +66,7 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o); -/* +/** * Decrease reference counter on object. If last reference is freed, return * object to the cache, unless lu_object_is_dying(o) holds. In the latter * case, free object immediately. @@ -120,15 +123,16 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o) } EXPORT_SYMBOL(lu_object_put); -/* +/** * Allocate new object. * * This follows object creation protocol, described in the comment within * struct lu_device_operations definition. */ static struct lu_object *lu_object_alloc(const struct lu_env *env, - struct lu_site *s, - const struct lu_fid *f) + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) { struct lu_object *scan; struct lu_object *top; @@ -141,8 +145,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, * Create top-level object slice. This will also create * lu_object_header. */ - top = s->ls_top_dev->ld_ops->ldo_object_alloc(env, - NULL, s->ls_top_dev); + top = dev->ld_ops->ldo_object_alloc(env, NULL, dev); if (top == NULL) RETURN(ERR_PTR(-ENOMEM)); /* @@ -163,7 +166,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, continue; clean = 0; scan->lo_header = top->lo_header; - result = scan->lo_ops->loo_object_init(env, scan); + result = scan->lo_ops->loo_object_init(env, scan, conf); if (result != 0) { lu_object_free(env, top); RETURN(ERR_PTR(result)); @@ -182,23 +185,26 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, } } - s->ls_stats.s_created ++; + dev->ld_site->ls_stats.s_created ++; RETURN(top); } -/* - * Free object. +/** + * Free an object. */ static void lu_object_free(const struct lu_env *env, struct lu_object *o) { - struct list_head splice; + struct list_head splice; struct lu_object *scan; + struct lu_site *site; + struct list_head *layers; + site = o->lo_dev->ld_site; + layers = &o->lo_header->loh_layers; /* * First call ->loo_object_delete() method to release all resources. */ - list_for_each_entry_reverse(scan, - &o->lo_header->loh_layers, lo_linkage) { + list_for_each_entry_reverse(scan, layers, lo_linkage) { if (scan->lo_ops->loo_object_delete != NULL) scan->lo_ops->loo_object_delete(env, scan); } @@ -210,17 +216,23 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o) * top-level slice. 
*/ CFS_INIT_LIST_HEAD(&splice); - list_splice_init(&o->lo_header->loh_layers, &splice); + list_splice_init(layers, &splice); while (!list_empty(&splice)) { - o = container_of0(splice.next, struct lu_object, lo_linkage); + /* + * Free layers in bottom-to-top order, so that object header + * lives as long as possible and ->loo_object_free() methods + * can look at its contents. + */ + o = container_of0(splice.prev, struct lu_object, lo_linkage); list_del_init(&o->lo_linkage); LASSERT(o->lo_ops->loo_object_free != NULL); o->lo_ops->loo_object_free(env, o); } + cfs_waitq_broadcast(&site->ls_marche_funebre); } -/* - * Free @nr objects from the cold end of the site LRU list. +/** + * Free \a nr objects from the cold end of the site LRU list. */ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr) { @@ -286,7 +298,7 @@ EXPORT_SYMBOL(lu_site_purge); */ enum { - /* + /** * Maximal line size. * * XXX overflow is not handled correctly. @@ -295,20 +307,16 @@ enum { }; struct lu_cdebug_data { - /* + /** * Temporary buffer. */ char lck_area[LU_CDEBUG_LINE]; - /* - * fid staging area used by dt_store_open(). - */ - struct lu_fid_pack lck_pack; }; /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data); -/* +/** * Key, holding temporary buffer. This key is registered very early by * lu_global_init(). */ @@ -318,7 +326,7 @@ struct lu_context_key lu_global_key = { .lct_fini = lu_global_key_fini }; -/* +/** * Printer function emitting messages through libcfs_debug_msg(). */ int lu_cdebug_printer(const struct lu_env *env, @@ -328,7 +336,7 @@ int lu_cdebug_printer(const struct lu_env *env, struct lu_cdebug_data *key; int used; int complete; - va_list args; + va_list args; va_start(args, format); @@ -343,9 +351,10 @@ int lu_cdebug_printer(const struct lu_env *env, vsnprintf(key->lck_area + used, ARRAY_SIZE(key->lck_area) - used, format, args); if (complete) { - libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask, - (char *)info->lpi_file, info->lpi_fn, - info->lpi_line, "%s", key->lck_area); + if (cdebug_show(info->lpi_mask, info->lpi_subsys)) + libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask, + (char *)info->lpi_file, info->lpi_fn, + info->lpi_line, "%s", key->lck_area); key->lck_area[0] = 0; } va_end(args); @@ -353,23 +362,24 @@ int lu_cdebug_printer(const struct lu_env *env, } EXPORT_SYMBOL(lu_cdebug_printer); -/* +/** * Print object header. */ -static void lu_object_header_print(const struct lu_env *env, - void *cookie, lu_printer_t printer, - const struct lu_object_header *hdr) +void lu_object_header_print(const struct lu_env *env, void *cookie, + lu_printer_t printer, + const struct lu_object_header *hdr) { (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]", hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref), PFID(&hdr->loh_fid), hlist_unhashed(&hdr->loh_hash) ? "" : " hash", - list_empty(&hdr->loh_lru) ? "" : " lru", + list_empty((struct list_head *)&hdr->loh_lru) ? "" : " lru", hdr->loh_attr & LOHA_EXISTS ? " exist":""); } +EXPORT_SYMBOL(lu_object_header_print); -/* - * Print human readable representation of the @o to the @printer. +/** + * Print human readable representation of the \a o to the \a printer. 
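+ *
+ * The output is bracketed: the header line is followed by "{", then one
+ * line per layer (indented by its depth, showing the device type name and
+ * the address of the slice), and finally a closing "} header@..." line.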
*/ void lu_object_print(const struct lu_env *env, void *cookie, lu_printer_t printer, const struct lu_object *o) @@ -380,21 +390,24 @@ void lu_object_print(const struct lu_env *env, void *cookie, top = o->lo_header; lu_object_header_print(env, cookie, printer, top); - (*printer)(env, cookie, "\n"); + (*printer)(env, cookie, "{ \n"); list_for_each_entry(o, &top->loh_layers, lo_linkage) { depth = o->lo_depth + 4; - LASSERT(o->lo_ops->loo_object_print != NULL); + /* - * print `.' @depth times. + * print `.' \a depth times followed by type name and address */ - (*printer)(env, cookie, "%*.*s", depth, depth, ruler); - o->lo_ops->loo_object_print(env, cookie, printer, o); + (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler, + o->lo_dev->ld_type->ldt_name, o); + if (o->lo_ops->loo_object_print != NULL) + o->lo_ops->loo_object_print(env, cookie, printer, o); (*printer)(env, cookie, "\n"); } + (*printer)(env, cookie, "} header@%p\n", top); } EXPORT_SYMBOL(lu_object_print); -/* +/** * Check object consistency. */ int lu_object_invariant(const struct lu_object *o) @@ -413,15 +426,29 @@ EXPORT_SYMBOL(lu_object_invariant); static struct lu_object *htable_lookup(struct lu_site *s, const struct hlist_head *bucket, - const struct lu_fid *f) + const struct lu_fid *f, + cfs_waitlink_t *waiter) { struct lu_object_header *h; struct hlist_node *scan; hlist_for_each_entry(h, scan, bucket, loh_hash) { s->ls_stats.s_cache_check ++; - if (likely(lu_fid_eq(&h->loh_fid, f) && - !lu_object_is_dying(h))) { + if (likely(lu_fid_eq(&h->loh_fid, f))) { + if (unlikely(lu_object_is_dying(h))) { + /* + * Lookup found an object being destroyed; + * this object cannot be returned (to assure + * that references to dying objects are + * eventually drained), and moreover, lookup + * has to wait until object is freed. + */ + cfs_waitlink_init(waiter); + cfs_waitq_add(&s->ls_marche_funebre, waiter); + set_current_state(CFS_TASK_UNINT); + s->ls_stats.s_cache_death_race ++; + return ERR_PTR(-EAGAIN); + } /* bump reference count... */ if (atomic_add_return(1, &h->loh_ref) == 1) ++ s->ls_busy; @@ -446,16 +473,31 @@ static __u32 fid_hash(const struct lu_fid *f, int bits) return hash_long(fid_flatten(f), bits); } -/* - * Search cache for an object with the fid @f. If such object is found, return - * it. Otherwise, create new object, insert it into cache and return it. In - * any case, additional reference is acquired on the returned object. +/** + * Search cache for an object with the fid \a f. If such object is found, + * return it. Otherwise, create new object, insert it into cache and return + * it. In any case, additional reference is acquired on the returned object. */ struct lu_object *lu_object_find(const struct lu_env *env, - struct lu_site *s, const struct lu_fid *f) + struct lu_device *dev, const struct lu_fid *f, + const struct lu_object_conf *conf) { - struct lu_object *o; - struct lu_object *shadow; + return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf); +} +EXPORT_SYMBOL(lu_object_find); + +/** + * Core logic of lu_object_find*() functions. + */ +static struct lu_object *lu_object_find_try(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf, + cfs_waitlink_t *waiter) +{ + struct lu_site *s; + struct lu_object *o; + struct lu_object *shadow; struct hlist_head *bucket; /* @@ -470,12 +512,16 @@ struct lu_object *lu_object_find(const struct lu_env *env, * object just allocated. * - unlock index; * - return object. 
+ * + * If dying object is found during index search, add @waiter to the + * site wait-queue and return ERR_PTR(-EAGAIN). */ + s = dev->ld_site; bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits); read_lock(&s->ls_guard); - o = htable_lookup(s, bucket, f); + o = htable_lookup(s, bucket, f, waiter); read_unlock(&s->ls_guard); if (o != NULL) @@ -485,14 +531,14 @@ struct lu_object *lu_object_find(const struct lu_env *env, * Allocate new object. This may result in rather complicated * operations, including fld queries, inode loading, etc. */ - o = lu_object_alloc(env, s, f); + o = lu_object_alloc(env, dev, f, conf); if (unlikely(IS_ERR(o))) return o; LASSERT(lu_fid_eq(lu_object_fid(o), f)); write_lock(&s->ls_guard); - shadow = htable_lookup(s, bucket, f); + shadow = htable_lookup(s, bucket, f, waiter); if (likely(shadow == NULL)) { hlist_add_head(&o->lo_header->loh_hash, bucket); list_add_tail(&o->lo_header->loh_lru, &s->ls_lru); @@ -507,21 +553,106 @@ struct lu_object *lu_object_find(const struct lu_env *env, lu_object_free(env, o); return shadow; } -EXPORT_SYMBOL(lu_object_find); -/* +/** + * Much like lu_object_find(), but top level device of object is specifically + * \a dev rather than top level device of the site. This interface allows + * objects of different "stacking" to be created within the same site. + */ +struct lu_object *lu_object_find_at(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) +{ + struct lu_object *obj; + cfs_waitlink_t wait; + + while (1) { + obj = lu_object_find_try(env, dev, f, conf, &wait); + if (obj == ERR_PTR(-EAGAIN)) { + /* + * lu_object_find_try() already added waiter into the + * wait queue. + */ + cfs_waitq_wait(&wait, CFS_TASK_UNINT); + cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait); + } else + break; + } + return obj; +} +EXPORT_SYMBOL(lu_object_find_at); + +/** + * Find object with given fid, and return its slice belonging to given device. + */ +struct lu_object *lu_object_find_slice(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) +{ + struct lu_object *top; + struct lu_object *obj; + + top = lu_object_find(env, dev, f, conf); + if (!IS_ERR(top)) { + obj = lu_object_locate(top->lo_header, dev->ld_type); + if (obj == NULL) + lu_object_put(env, top); + } else + obj = top; + return obj; +} +EXPORT_SYMBOL(lu_object_find_slice); + +/** + * Global list of all device types. + */ +static CFS_LIST_HEAD(lu_device_types); + +int lu_device_type_init(struct lu_device_type *ldt) +{ + int result; + + CFS_INIT_LIST_HEAD(&ldt->ldt_linkage); + result = ldt->ldt_ops->ldto_init(ldt); + if (result == 0) + list_add(&ldt->ldt_linkage, &lu_device_types); + return result; +} +EXPORT_SYMBOL(lu_device_type_init); + +void lu_device_type_fini(struct lu_device_type *ldt) +{ + list_del_init(&ldt->ldt_linkage); + ldt->ldt_ops->ldto_fini(ldt); +} +EXPORT_SYMBOL(lu_device_type_fini); + +void lu_types_stop(void) +{ + struct lu_device_type *ldt; + + list_for_each_entry(ldt, &lu_device_types, ldt_linkage) { + if (ldt->ldt_device_nr == 0) + ldt->ldt_ops->ldto_stop(ldt); + } +} +EXPORT_SYMBOL(lu_types_stop); + +/** * Global list of all sites on this node */ static CFS_LIST_HEAD(lu_sites); static DECLARE_MUTEX(lu_sites_guard); -/* +/** * Global environment used by site shrinker. */ static struct lu_env lu_shrink_env; -/* - * Print all objects in @s. +/** + * Print all objects in \a s. 
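+ *
+ * Walks every hash bucket of \a s and prints each resident object with
+ * lu_object_print(); intended for debugging.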
*/ void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie, lu_printer_t printer) @@ -552,7 +683,7 @@ enum { LU_CACHE_PERCENT = 20, }; -/* +/** * Return desired hash table order. */ static int lu_htable_order(void) @@ -584,8 +715,10 @@ static int lu_htable_order(void) return bits; } -/* - * Initialize site @s, with @d as the top level device. +static struct lock_class_key lu_site_guard_class; + +/** + * Initialize site \a s, with \a d as the top level device. */ int lu_site_init(struct lu_site *s, struct lu_device *top) { @@ -596,11 +729,14 @@ int lu_site_init(struct lu_site *s, struct lu_device *top) memset(s, 0, sizeof *s); rwlock_init(&s->ls_guard); + lockdep_set_class(&s->ls_guard, &lu_site_guard_class); CFS_INIT_LIST_HEAD(&s->ls_lru); CFS_INIT_LIST_HEAD(&s->ls_linkage); + cfs_waitq_init(&s->ls_marche_funebre); s->ls_top_dev = top; top->ld_site = s; lu_device_get(top); + lu_ref_add(&top->ld_reference, "site-top", s); for (bits = lu_htable_order(), size = 1 << bits; (s->ls_hash = @@ -623,8 +759,8 @@ int lu_site_init(struct lu_site *s, struct lu_device *top) } EXPORT_SYMBOL(lu_site_init); -/* - * Finalize @s and release its resources. +/** + * Finalize \a s and release its resources. */ void lu_site_fini(struct lu_site *s) { @@ -644,13 +780,14 @@ void lu_site_fini(struct lu_site *s) } if (s->ls_top_dev != NULL) { s->ls_top_dev->ld_site = NULL; + lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s); lu_device_put(s->ls_top_dev); s->ls_top_dev = NULL; } } EXPORT_SYMBOL(lu_site_fini); -/* +/** * Called when initialization of stack for this site is completed. */ int lu_site_init_finish(struct lu_site *s) @@ -665,8 +802,8 @@ int lu_site_init_finish(struct lu_site *s) } EXPORT_SYMBOL(lu_site_init_finish); -/* - * Acquire additional reference on device @d +/** + * Acquire additional reference on device \a d */ void lu_device_get(struct lu_device *d) { @@ -674,44 +811,56 @@ void lu_device_get(struct lu_device *d) } EXPORT_SYMBOL(lu_device_get); -/* - * Release reference on device @d. +/** + * Release reference on device \a d. */ void lu_device_put(struct lu_device *d) { + LASSERT(atomic_read(&d->ld_ref) > 0); atomic_dec(&d->ld_ref); } EXPORT_SYMBOL(lu_device_put); -/* - * Initialize device @d of type @t. +/** + * Initialize device \a d of type \a t. */ int lu_device_init(struct lu_device *d, struct lu_device_type *t) { + if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL) + t->ldt_ops->ldto_start(t); memset(d, 0, sizeof *d); atomic_set(&d->ld_ref, 0); d->ld_type = t; + lu_ref_init(&d->ld_reference); return 0; } EXPORT_SYMBOL(lu_device_init); -/* - * Finalize device @d. +/** + * Finalize device \a d. */ void lu_device_fini(struct lu_device *d) { - if (d->ld_obd != NULL) - /* finish lprocfs */ - lprocfs_obd_cleanup(d->ld_obd); + struct lu_device_type *t; + + t = d->ld_type; + if (d->ld_obd != NULL) { + d->ld_obd->obd_lu_dev = NULL; + d->ld_obd = NULL; + } + lu_ref_fini(&d->ld_reference); LASSERTF(atomic_read(&d->ld_ref) == 0, "Refcount is %u\n", atomic_read(&d->ld_ref)); + LASSERT(t->ldt_device_nr > 0); + if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL) + t->ldt_ops->ldto_stop(t); } EXPORT_SYMBOL(lu_device_fini); -/* - * Initialize object @o that is part of compound object @h and was created by - * device @d. +/** + * Initialize object \a o that is part of compound object \a h and was created + * by device \a d. 
*/ int lu_object_init(struct lu_object *o, struct lu_object_header *h, struct lu_device *d) @@ -720,27 +869,32 @@ int lu_object_init(struct lu_object *o, o->lo_header = h; o->lo_dev = d; lu_device_get(d); + o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o); CFS_INIT_LIST_HEAD(&o->lo_linkage); return 0; } EXPORT_SYMBOL(lu_object_init); -/* +/** * Finalize object and release its resources. */ void lu_object_fini(struct lu_object *o) { + struct lu_device *dev = o->lo_dev; + LASSERT(list_empty(&o->lo_linkage)); - if (o->lo_dev != NULL) { - lu_device_put(o->lo_dev); + if (dev != NULL) { + lu_ref_del_at(&dev->ld_reference, + o->lo_dev_ref , "lu_object", o); + lu_device_put(dev); o->lo_dev = NULL; } } EXPORT_SYMBOL(lu_object_fini); -/* - * Add object @o as first layer of compound object @h +/** + * Add object \a o as first layer of compound object \a h * * This is typically called by the ->ldo_object_alloc() method of top-level * device. @@ -751,11 +905,11 @@ void lu_object_add_top(struct lu_object_header *h, struct lu_object *o) } EXPORT_SYMBOL(lu_object_add_top); -/* - * Add object @o as a layer of compound object, going after @before.1 +/** + * Add object \a o as a layer of compound object, going after \a before. * - * This is typically called by the ->ldo_object_alloc() method of - * @before->lo_dev. + * This is typically called by the ->ldo_object_alloc() method of \a + * before->lo_dev. */ void lu_object_add(struct lu_object *before, struct lu_object *o) { @@ -763,7 +917,7 @@ void lu_object_add(struct lu_object *before, struct lu_object *o) } EXPORT_SYMBOL(lu_object_add); -/* +/** * Initialize compound object. */ int lu_object_header_init(struct lu_object_header *h) @@ -773,11 +927,12 @@ int lu_object_header_init(struct lu_object_header *h) INIT_HLIST_NODE(&h->loh_hash); CFS_INIT_LIST_HEAD(&h->loh_lru); CFS_INIT_LIST_HEAD(&h->loh_layers); + lu_ref_init(&h->loh_reference); return 0; } EXPORT_SYMBOL(lu_object_header_init); -/* +/** * Finalize compound object. */ void lu_object_header_fini(struct lu_object_header *h) @@ -785,15 +940,16 @@ void lu_object_header_fini(struct lu_object_header *h) LASSERT(list_empty(&h->loh_layers)); LASSERT(list_empty(&h->loh_lru)); LASSERT(hlist_unhashed(&h->loh_hash)); + lu_ref_fini(&h->loh_reference); } EXPORT_SYMBOL(lu_object_header_fini); -/* +/** * Given a compound object, find its slice, corresponding to the device type - * @dtype. + * \a dtype. */ struct lu_object *lu_object_locate(struct lu_object_header *h, - struct lu_device_type *dtype) + const struct lu_device_type *dtype) { struct lu_object *o; @@ -807,9 +963,9 @@ EXPORT_SYMBOL(lu_object_locate); -/* +/** * Finalize and free devices in the device stack. - * + * * Finalize device stack by purging object cache, and calling * lu_device_type_operations::ldto_device_fini() and * lu_device_type_operations::ldto_device_free() on all devices in the stack. 
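An aside on the allocation protocol driven by lu_object_alloc() above: a
non-top layer conventionally extends the compound object from its own
->loo_object_init() method, asking the device below it for its slice and
chaining that slice in with lu_object_add(). A minimal sketch under assumed
names (my_object_init(), my_dev() and md_child are hypothetical, not part of
this patch):

static int my_object_init(const struct lu_env *env, struct lu_object *o,
                          const struct lu_object_conf *conf)
{
        struct my_device *d = my_dev(o->lo_dev);
        struct lu_device *under = d->md_child; /* next device in the stack */
        struct lu_object *below;

        /* ask the lower device to allocate its slice of the object */
        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
        if (below == NULL)
                return -ENOMEM;
        /*
         * Chain the new slice after this one; lu_object_alloc() will spot
         * it on its next pass and call its ->loo_object_init() in turn.
         */
        lu_object_add(o, below);
        return 0;
}

lu_object_alloc() keeps re-scanning loh_layers until a pass finds no freshly
added, not-yet-initialized slice, so each layer only has to know about its
immediate neighbour.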
@@ -823,6 +979,7 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top) lu_site_purge(env, site, ~0); for (scan = top; scan != NULL; scan = next) { next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan); + lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init); lu_device_put(scan); } @@ -844,24 +1001,34 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top) next = ldt->ldt_ops->ldto_device_free(env, scan); type = ldt->ldt_obd_type; - type->typ_refcnt--; - class_put_type(type); + if (type != NULL) { + type->typ_refcnt--; + class_put_type(type); + } } } EXPORT_SYMBOL(lu_stack_fini); enum { - /* + /** * Maximal number of tld slots. */ - LU_CONTEXT_KEY_NR = 16 + LU_CONTEXT_KEY_NR = 32 }; static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, }; static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED; -/* +/** + * Global counter incremented whenever key is registered, unregistered, + * revived or quiesced. This is used to void unnecessary calls to + * lu_context_refill(). No locking is provided, as initialization and shutdown + * are supposed to be externally serialized. + */ +static unsigned key_set_version = 0; + +/** * Register new key. */ int lu_context_key_register(struct lu_context_key *key) @@ -881,7 +1048,9 @@ int lu_context_key_register(struct lu_context_key *key) key->lct_index = i; atomic_set(&key->lct_used, 1); lu_keys[i] = key; + lu_ref_init(&key->lct_reference); result = 0; + ++key_set_version; break; } } @@ -892,7 +1061,7 @@ EXPORT_SYMBOL(lu_context_key_register); static void key_fini(struct lu_context *ctx, int index) { - if (ctx->lc_value[index] != NULL) { + if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) { struct lu_context_key *key; key = lu_keys[index]; @@ -901,6 +1070,7 @@ static void key_fini(struct lu_context *ctx, int index) LASSERT(atomic_read(&key->lct_used) > 1); key->lct_fini(ctx, key, ctx->lc_value[index]); + lu_ref_del(&key->lct_reference, "ctx", ctx); atomic_dec(&key->lct_used); LASSERT(key->lct_owner != NULL); if (!(ctx->lc_tags & LCT_NOREF)) { @@ -911,40 +1081,169 @@ static void key_fini(struct lu_context *ctx, int index) } } -/* +/** * Deregister key. */ void lu_context_key_degister(struct lu_context_key *key) { LASSERT(atomic_read(&key->lct_used) >= 1); - LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); + LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); - key_fini(&lu_shrink_env.le_ctx, key->lct_index); + lu_context_key_quiesce(key); - if (atomic_read(&key->lct_used) > 1) - CERROR("key has instances.\n"); + ++key_set_version; spin_lock(&lu_keys_guard); - lu_keys[key->lct_index] = NULL; + key_fini(&lu_shrink_env.le_ctx, key->lct_index); + if (lu_keys[key->lct_index]) { + lu_keys[key->lct_index] = NULL; + lu_ref_fini(&key->lct_reference); + } spin_unlock(&lu_keys_guard); + + LASSERTF(atomic_read(&key->lct_used) == 1, "key has instances: %d\n", + atomic_read(&key->lct_used)); } EXPORT_SYMBOL(lu_context_key_degister); -/* - * Return value associated with key @key in context @ctx. +/** + * Register a number of keys. This has to be called after all keys have been + * initialized by a call to LU_CONTEXT_KEY_INIT(). + */ +int lu_context_key_register_many(struct lu_context_key *k, ...) 
+{ + struct lu_context_key *key = k; + va_list args; + int result; + + va_start(args, k); + do { + result = lu_context_key_register(key); + if (result) + break; + key = va_arg(args, struct lu_context_key *); + } while (key != NULL); + va_end(args); + + if (result != 0) { + va_start(args, k); + while (k != key) { + lu_context_key_degister(k); + k = va_arg(args, struct lu_context_key *); + } + va_end(args); + } + + return result; +} +EXPORT_SYMBOL(lu_context_key_register_many); + +/** + * De-register a number of keys. This is a dual to + * lu_context_key_register_many(). + */ +void lu_context_key_degister_many(struct lu_context_key *k, ...) +{ + va_list args; + + va_start(args, k); + do { + lu_context_key_degister(k); + k = va_arg(args, struct lu_context_key*); + } while (k != NULL); + va_end(args); +} +EXPORT_SYMBOL(lu_context_key_degister_many); + +/** + * Revive a number of keys. + */ +void lu_context_key_revive_many(struct lu_context_key *k, ...) +{ + va_list args; + + va_start(args, k); + do { + lu_context_key_revive(k); + k = va_arg(args, struct lu_context_key*); + } while (k != NULL); + va_end(args); +} +EXPORT_SYMBOL(lu_context_key_revive_many); + +/** + * Quiescent a number of keys. + */ +void lu_context_key_quiesce_many(struct lu_context_key *k, ...) +{ + va_list args; + + va_start(args, k); + do { + lu_context_key_quiesce(k); + k = va_arg(args, struct lu_context_key*); + } while (k != NULL); + va_end(args); +} +EXPORT_SYMBOL(lu_context_key_quiesce_many); + +/** + * Return value associated with key \a key in context \a ctx. */ void *lu_context_key_get(const struct lu_context *ctx, - struct lu_context_key *key) + const struct lu_context_key *key) { - LASSERT(ctx->lc_state == LCS_ENTERED); - LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); + LINVRNT(ctx->lc_state == LCS_ENTERED); + LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); + LASSERT(lu_keys[key->lct_index] == key); return ctx->lc_value[key->lct_index]; } EXPORT_SYMBOL(lu_context_key_get); +/** + * List of remembered contexts. XXX document me. + */ +static CFS_LIST_HEAD(lu_context_remembered); + +/** + * Destroy \a key in all remembered contexts. This is used to destroy key + * values in "shared" contexts (like service threads), when a module owning + * the key is about to be unloaded. + */ +void lu_context_key_quiesce(struct lu_context_key *key) +{ + struct lu_context *ctx; + extern unsigned cl_env_cache_purge(unsigned nr); + + if (!(key->lct_tags & LCT_QUIESCENT)) { + /* + * XXX layering violation. + */ + cl_env_cache_purge(~0); + key->lct_tags |= LCT_QUIESCENT; + /* + * XXX memory barrier has to go here. 
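+ * (the new lct_tags value must be visible to all CPUs before the existing
+ * key values are finalized below: keys_fill() tests LCT_QUIESCENT without
+ * taking lu_keys_guard)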
+ */ + spin_lock(&lu_keys_guard); + list_for_each_entry(ctx, &lu_context_remembered, lc_remember) + key_fini(ctx, key->lct_index); + spin_unlock(&lu_keys_guard); + ++key_set_version; + } +} +EXPORT_SYMBOL(lu_context_key_quiesce); + +void lu_context_key_revive(struct lu_context_key *key) +{ + key->lct_tags &= ~LCT_QUIESCENT; + ++key_set_version; +} +EXPORT_SYMBOL(lu_context_key_revive); + static void keys_fini(struct lu_context *ctx) { int i; + spin_lock(&lu_keys_guard); if (ctx->lc_value != NULL) { for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) key_fini(ctx, i); @@ -952,9 +1251,10 @@ static void keys_fini(struct lu_context *ctx) ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]); ctx->lc_value = NULL; } + spin_unlock(&lu_keys_guard); } -static int keys_fill(const struct lu_context *ctx) +static int keys_fill(struct lu_context *ctx) { int i; @@ -962,22 +1262,37 @@ static int keys_fill(const struct lu_context *ctx) struct lu_context_key *key; key = lu_keys[i]; - if (ctx->lc_value[i] == NULL && - key != NULL && key->lct_tags & ctx->lc_tags) { + if (ctx->lc_value[i] == NULL && key != NULL && + (key->lct_tags & ctx->lc_tags) && + /* + * Don't create values for a LCT_QUIESCENT key, as this + * will pin module owning a key. + */ + !(key->lct_tags & LCT_QUIESCENT)) { void *value; - LASSERT(key->lct_init != NULL); - LASSERT(key->lct_index == i); + LINVRNT(key->lct_init != NULL); + LINVRNT(key->lct_index == i); value = key->lct_init(ctx, key); if (unlikely(IS_ERR(value))) return PTR_ERR(value); + LASSERT(key->lct_owner != NULL); if (!(ctx->lc_tags & LCT_NOREF)) try_module_get(key->lct_owner); + lu_ref_add_atomic(&key->lct_reference, "ctx", ctx); atomic_inc(&key->lct_used); + /* + * This is the only place in the code, where an + * element of ctx->lc_value[] array is set to non-NULL + * value. + */ ctx->lc_value[i] = value; + if (key->lct_exit != NULL) + ctx->lc_tags |= LCT_HAS_EXIT; } + ctx->lc_version = key_set_version; } return 0; } @@ -997,7 +1312,7 @@ static int keys_init(struct lu_context *ctx) return result; } -/* +/** * Initialize context data-structure. Create values for all keys. */ int lu_context_init(struct lu_context *ctx, __u32 tags) @@ -1005,41 +1320,50 @@ int lu_context_init(struct lu_context *ctx, __u32 tags) memset(ctx, 0, sizeof *ctx); ctx->lc_state = LCS_INITIALIZED; ctx->lc_tags = tags; + if (tags & LCT_REMEMBER) { + spin_lock(&lu_keys_guard); + list_add(&ctx->lc_remember, &lu_context_remembered); + spin_unlock(&lu_keys_guard); + } else + CFS_INIT_LIST_HEAD(&ctx->lc_remember); return keys_init(ctx); } EXPORT_SYMBOL(lu_context_init); -/* +/** * Finalize context data-structure. Destroy key values. */ void lu_context_fini(struct lu_context *ctx) { - LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); + LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); ctx->lc_state = LCS_FINALIZED; keys_fini(ctx); + spin_lock(&lu_keys_guard); + list_del_init(&ctx->lc_remember); + spin_unlock(&lu_keys_guard); } EXPORT_SYMBOL(lu_context_fini); -/* +/** * Called before entering context. 
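+ *
+ * Values of registered keys are accessible through lu_context_key_get()
+ * only while the context is in the entered state.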
*/ void lu_context_enter(struct lu_context *ctx) { - LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); + LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT); ctx->lc_state = LCS_ENTERED; } EXPORT_SYMBOL(lu_context_enter); -/* - * Called after exiting from @ctx +/** + * Called after exiting from \a ctx */ void lu_context_exit(struct lu_context *ctx) { int i; - LASSERT(ctx->lc_state == LCS_ENTERED); + LINVRNT(ctx->lc_state == LCS_ENTERED); ctx->lc_state = LCS_LEFT; - if (ctx->lc_value != NULL) { + if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) { for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { if (ctx->lc_value[i] != NULL) { struct lu_context_key *key; @@ -1055,41 +1379,27 @@ void lu_context_exit(struct lu_context *ctx) } EXPORT_SYMBOL(lu_context_exit); -/* +/** * Allocate for context all missing keys that were registered after context * creation. */ -int lu_context_refill(const struct lu_context *ctx) +int lu_context_refill(struct lu_context *ctx) { - LASSERT(ctx->lc_value != NULL); - return keys_fill(ctx); + LINVRNT(ctx->lc_value != NULL); + return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx); } EXPORT_SYMBOL(lu_context_refill); -static int lu_env_setup(struct lu_env *env, struct lu_context *ses, - __u32 tags, int noref) +int lu_env_init(struct lu_env *env, __u32 tags) { int result; - LASSERT(ergo(!noref, !(tags & LCT_NOREF))); - - env->le_ses = ses; + env->le_ses = NULL; result = lu_context_init(&env->le_ctx, tags); if (likely(result == 0)) lu_context_enter(&env->le_ctx); return result; } - -static int lu_env_init_noref(struct lu_env *env, struct lu_context *ses, - __u32 tags) -{ - return lu_env_setup(env, ses, tags, 1); -} - -int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags) -{ - return lu_env_setup(env, ses, tags, 0); -} EXPORT_SYMBOL(lu_env_init); void lu_env_fini(struct lu_env *env) @@ -1100,6 +1410,20 @@ void lu_env_fini(struct lu_env *env) } EXPORT_SYMBOL(lu_env_fini); +int lu_env_refill(struct lu_env *env) +{ + int result; + + result = lu_context_refill(&env->le_ctx); + if (result == 0 && env->le_ses != NULL) + result = lu_context_refill(env->le_ses); + return result; +} +EXPORT_SYMBOL(lu_env_refill); + +static struct shrinker *lu_site_shrinker = NULL; + +#ifdef __KERNEL__ static int lu_cache_shrink(int nr, unsigned int gfp_mask) { struct lu_site *s; @@ -1108,8 +1432,11 @@ static int lu_cache_shrink(int nr, unsigned int gfp_mask) int remain = nr; CFS_LIST_HEAD(splice); - if (nr != 0 && !(gfp_mask & __GFP_FS)) - return -1; + if (nr != 0) { + if (!(gfp_mask & __GFP_FS)) + return -1; + CDEBUG(D_INODE, "Shrink %d objects\n", nr); + } down(&lu_sites_guard); list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) { @@ -1124,55 +1451,150 @@ static int lu_cache_shrink(int nr, unsigned int gfp_mask) read_lock(&s->ls_guard); cached += s->ls_total - s->ls_busy; read_unlock(&s->ls_guard); - if (remain <= 0) + if (nr && remain <= 0) break; } list_splice(&splice, lu_sites.prev); up(&lu_sites_guard); + + cached = (cached / 100) * sysctl_vfs_cache_pressure; + if (nr == 0) + CDEBUG(D_INODE, "%d objects cached\n", cached); return cached; } -static struct shrinker *lu_site_shrinker = NULL; - /* + * Debugging stuff. + */ + +/** + * Environment to be used in debugger, contains all tags. + */ +struct lu_env lu_debugging_env; + +/** + * Debugging printer function using printk(). + */ +int lu_printk_printer(const struct lu_env *env, + void *unused, const char *format, ...) 
+{ + va_list args; + + va_start(args, format); + vprintk(format, args); + va_end(args); + return 0; +} + +void lu_debugging_setup(void) +{ + lu_env_init(&lu_debugging_env, ~0); +} + +void lu_context_keys_dump(void) +{ + int i; + + for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { + struct lu_context_key *key; + + key = lu_keys[i]; + if (key != NULL) { + CERROR("[%i]: %p %x (%p,%p,%p) %i %i \"%s\"@%p\n", + i, key, key->lct_tags, + key->lct_init, key->lct_fini, key->lct_exit, + key->lct_index, atomic_read(&key->lct_used), + key->lct_owner ? key->lct_owner->name : "", + key->lct_owner); + lu_ref_print(&key->lct_reference); + } + } +} +EXPORT_SYMBOL(lu_context_keys_dump); +#else /* !__KERNEL__ */ +static int lu_cache_shrink(int nr, unsigned int gfp_mask) +{ + return 0; +} +#endif /* __KERNEL__ */ + +int cl_global_init(void); +void cl_global_fini(void); +int lu_ref_global_init(void); +void lu_ref_global_fini(void); + +int dt_global_init(void); +void dt_global_fini(void); + +int llo_global_init(void); +void llo_global_fini(void); + +/** * Initialization of global lu_* data. */ int lu_global_init(void) { int result; + CDEBUG(D_CONSOLE, "Lustre LU module (%p).\n", &lu_keys); + + result = lu_ref_global_init(); + if (result != 0) + return result; + LU_CONTEXT_KEY_INIT(&lu_global_key); result = lu_context_key_register(&lu_global_key); - if (result == 0) { - /* - * At this level, we don't know what tags are needed, so - * allocate them conservatively. This should not be too bad, - * because this environment is global. - */ - down(&lu_sites_guard); - result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER); - up(&lu_sites_guard); - if (result == 0) { - /* - * seeks estimation: 3 seeks to read a record from oi, - * one to read inode, one for ea. Unfortunately - * setting this high value results in lu_object/inode - * cache consuming all the memory. - */ - lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, - lu_cache_shrink); - if (result == 0) - result = lu_time_global_init(); - } - } + if (result != 0) + return result; + /* + * At this level, we don't know what tags are needed, so allocate them + * conservatively. This should not be too bad, because this + * environment is global. + */ + down(&lu_sites_guard); + result = lu_env_init(&lu_shrink_env, LCT_SHRINKER); + up(&lu_sites_guard); + if (result != 0) + return result; + + /* + * seeks estimation: 3 seeks to read a record from oi, one to read + * inode, one for ea. Unfortunately setting this high value results in + * lu_object/inode cache consuming all the memory. + */ + lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, lu_cache_shrink); + if (lu_site_shrinker == NULL) + return -ENOMEM; + + result = lu_time_global_init(); + if (result) + GOTO(out, result); + +#ifdef __KERNEL__ + result = dt_global_init(); + if (result) + GOTO(out, result); + + result = llo_global_init(); + if (result) + GOTO(out, result); +#endif + result = cl_global_init(); +out: + return result; } -/* +/** * Dual to lu_global_init(). 
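+ *
+ * Tears global state down in roughly the reverse order of its
+ * initialization: the cl/llo/dt modules first, then the site shrinker,
+ * the global context key and the shrinker environment, and finally the
+ * lu_ref machinery.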
*/ void lu_global_fini(void) { + cl_global_fini(); +#ifdef __KERNEL__ + llo_global_fini(); + dt_global_fini(); +#endif lu_time_global_fini(); if (lu_site_shrinker != NULL) { remove_shrinker(lu_site_shrinker); @@ -1188,6 +1610,8 @@ void lu_global_fini(void) down(&lu_sites_guard); lu_env_fini(&lu_shrink_env); up(&lu_sites_guard); + + lu_ref_global_fini(); } struct lu_buf LU_BUF_NULL = { @@ -1196,84 +1620,79 @@ struct lu_buf LU_BUF_NULL = { }; EXPORT_SYMBOL(LU_BUF_NULL); -/* - * XXX: Functions below logically belong to fid module, but they are used by - * dt_store_open(). Put them here until better place is found. +/** + * Output site statistical counters into a buffer. Suitable for + * lprocfs_rd_*()-style functions. */ - -void fid_pack(struct lu_fid_pack *pack, const struct lu_fid *fid, - struct lu_fid *befider) +int lu_site_stats_print(const struct lu_site *s, char *page, int count) { - int recsize; - __u64 seq; - __u32 oid; - - seq = fid_seq(fid); - oid = fid_oid(fid); - - /* - * Two cases: compact 6 bytes representation for a common case, and - * full 17 byte representation for "unusual" fid. - */ + int i; + int populated; /* - * Check that usual case is really usual. + * How many hash buckets are not-empty? Don't bother with locks: it's + * an estimation anyway. */ - CLASSERT(LUSTRE_SEQ_MAX_WIDTH < 0xffffull); - - if (fid_is_igif(fid) || - seq > 0xffffffull || oid > 0xffff || fid_ver(fid) != 0) { - fid_cpu_to_be(befider, fid); - recsize = sizeof *befider; - } else { - unsigned char *small_befider; - - small_befider = (char *)befider; - - small_befider[0] = seq >> 16; - small_befider[1] = seq >> 8; - small_befider[2] = seq; - - small_befider[3] = oid >> 8; - small_befider[4] = oid; - - recsize = 5; - } - memcpy(pack->fp_area, befider, recsize); - pack->fp_len = recsize + 1; + for (i = 0, populated = 0; i < s->ls_hash_size; i++) + populated += !hlist_empty(&s->ls_hash[i]); + + return snprintf(page, count, "%d %d %d/%d %d %d %d %d %d %d %d\n", + s->ls_total, + s->ls_busy, + populated, + s->ls_hash_size, + s->ls_stats.s_created, + s->ls_stats.s_cache_hit, + s->ls_stats.s_cache_miss, + s->ls_stats.s_cache_check, + s->ls_stats.s_cache_race, + s->ls_stats.s_cache_death_race, + s->ls_stats.s_lru_purged); } -EXPORT_SYMBOL(fid_pack); +EXPORT_SYMBOL(lu_site_stats_print); + +const char *lu_time_names[LU_TIME_NR] = { + [LU_TIME_FIND_LOOKUP] = "find_lookup", + [LU_TIME_FIND_ALLOC] = "find_alloc", + [LU_TIME_FIND_INSERT] = "find_insert" +}; +EXPORT_SYMBOL(lu_time_names); -int fid_unpack(const struct lu_fid_pack *pack, struct lu_fid *fid) +/** + * Helper function to initialize a number of kmem slab caches at once. 
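+ *
+ * The \a caches array must be terminated by an element whose ckd_cache
+ * pointer is NULL. On failure the caches created so far are left
+ * allocated; the caller is expected to unwind with lu_kmem_fini().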
+ */ +int lu_kmem_init(struct lu_kmem_descr *caches) { int result; - result = 0; - switch (pack->fp_len) { - case sizeof *fid + 1: - memcpy(fid, pack->fp_area, sizeof *fid); - fid_be_to_cpu(fid, fid); - break; - case 6: { - const unsigned char *area; - - area = pack->fp_area; - fid->f_seq = (area[0] << 16) | (area[1] << 8) | area[2]; - fid->f_oid = (area[3] << 8) | area[4]; - fid->f_ver = 0; - break; - } - default: - CERROR("Unexpected packed fid size: %d\n", pack->fp_len); - result = -EIO; + for (result = 0; caches->ckd_cache != NULL; ++caches) { + *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name, + caches->ckd_size, + 0, 0); + if (*caches->ckd_cache == NULL) { + result = -ENOMEM; + break; + } } return result; } -EXPORT_SYMBOL(fid_unpack); +EXPORT_SYMBOL(lu_kmem_init); -const char *lu_time_names[LU_TIME_NR] = { - [LU_TIME_FIND_LOOKUP] = "find_lookup", - [LU_TIME_FIND_ALLOC] = "find_alloc", - [LU_TIME_FIND_INSERT] = "find_insert" -}; -EXPORT_SYMBOL(lu_time_names); +/** + * Helper function to finalize a number of kmem slab cached at once. Dual to + * lu_kmem_init(). + */ +void lu_kmem_fini(struct lu_kmem_descr *caches) +{ + int rc; + + for (; caches->ckd_cache != NULL; ++caches) { + if (*caches->ckd_cache != NULL) { + rc = cfs_mem_cache_destroy(*caches->ckd_cache); + LASSERTF(rc == 0, "couldn't destroy %s slab\n", + caches->ckd_name); + *caches->ckd_cache = NULL; + } + } +} +EXPORT_SYMBOL(lu_kmem_fini);
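To show how the two helpers above are meant to be driven: callers keep a
statically initialized, NULL-terminated array of descriptors and pass the
same array to both functions. A minimal usage sketch, in which the foo_*
names and struct foo_object are illustrative only:

static cfs_mem_cache_t *foo_object_kmem;

static struct lu_kmem_descr foo_caches[] = {
        {
                .ckd_cache = &foo_object_kmem,
                .ckd_name  = "foo_object_kmem",
                .ckd_size  = sizeof(struct foo_object)
        },
        {
                .ckd_cache = NULL /* terminator */
        }
};

static int foo_mod_init(void)
{
        int rc;

        rc = lu_kmem_init(foo_caches);
        if (rc != 0)
                /* unwind the caches created before the failure */
                lu_kmem_fini(foo_caches);
        return rc;
}

static void foo_mod_exit(void)
{
        /* destroys the caches and resets the *ckd_cache pointers to NULL */
        lu_kmem_fini(foo_caches);
}

Because lu_kmem_fini() NULLs the cached pointers, the same descriptor array
can be reused across module reload, and it is also the natural cleanup path
when lu_kmem_init() fails part-way through, as above.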