/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 */
/*
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */
#define DEBUG_SUBSYSTEM S_CLASS

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
static void lu_object_free(const struct lu_env *env, struct lu_object *o);
/*
 * Decrease reference counter on object. If last reference is freed, return
 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
 * case, free object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site          *site;
        struct lu_object_header *top;
        struct lu_object        *orig;
        int                      kill_it;

        top  = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If object is dying (will not be cached), remove it
                         * from the hash table and LRU.
                         *
                         * This is done with hash table and LRU lists
                         * locked. As the only way to acquire first reference
                         * to previously unreferenced object is through
                         * hash-table lookup (lu_object_find()) or LRU
                         * scanning (lu_site_purge()), both done under the
                         * hash-table and LRU lock, no race with concurrent
                         * object lookup is possible and we can safely destroy
                         * the object being released.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from hash and LRU above and
                 * can be freed safely.
                 */
                lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);
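
/*
 * Usage sketch (illustrative only; the surrounding error handling is an
 * assumption, not code from this file). References taken by lu_object_find()
 * are dropped with lu_object_put(); the object stays cached unless it was
 * marked dying:
 *
 *      struct lu_object *o;
 *
 *      o = lu_object_find(env, site, fid, BYPASS_CAPA);
 *      if (IS_ERR(o))
 *              return PTR_ERR(o);
 *      ... use the object ...
 *      lu_object_put(env, o);
 */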
/*
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *f,
                                         const struct lustre_capa *capa)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;
        ENTRY;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
                                                      NULL, s->ls_top_dev);
        if (top == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        top->lo_header->loh_fid = *f;
        if (capa == BYPASS_CAPA)
                lu_object_bypass_capa(top);
        else
                top->lo_header->loh_capa = *capa;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created ++;
        RETURN(top);
}
/*
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct list_head  splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }
        -- o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }
}
/*
 * Free @nr objects from the cold end of the site LRU list.
 */
void lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                  struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(env, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
}
EXPORT_SYMBOL(lu_site_purge);
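
/*
 * Usage sketch (illustrative): because the scan stops after @nr objects and
 * skips busy ones, a caller can trim the cache incrementally, or pass a huge
 * count to drop every unreferenced object, e.g. before lu_site_fini():
 *
 *      lu_site_purge(env, site, 128);      trim up to 128 cold objects
 *      lu_site_purge(env, site, ~0);       empty the cache entirely
 */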
/*
 * Code below has to jump through certain loops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into libcfs_debug_msg() interface that assumes that each message
 * supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of lu_cdebug_key key), until terminating newline
 * character is detected.
 *
 * XXX overflow is not handled correctly.
 */

enum {
        /*
         * Maximal line size.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};
static void *lu_cdebug_key_init(const struct lu_context *ctx,
                                struct lu_context_key *key)
{
        struct lu_cdebug_data *value;

        OBD_ALLOC_PTR(value);
        if (value == NULL)
                value = ERR_PTR(-ENOMEM);
        return value;
}

static void lu_cdebug_key_fini(const struct lu_context *ctx,
                               struct lu_context_key *key, void *data)
{
        struct lu_cdebug_data *value = data;

        OBD_FREE_PTR(value);
}
/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
static struct lu_context_key lu_cdebug_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_cdebug_key_init,
        .lct_fini = lu_cdebug_key_fini
};
/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_cdebug_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                 (char *)info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);
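
/*
 * The printer may be handed partial lines; nothing is emitted until a chunk
 * terminated by '\n' arrives. A sketch (illustrative; assumes @info is a
 * filled-in struct lu_cdebug_print_info):
 *
 *      lu_cdebug_printer(env, &info, "header@%p[", hdr);    buffered
 *      lu_cdebug_printer(env, &info, "%d", ref);            buffered
 *      lu_cdebug_printer(env, &info, "]\n");                line flushed
 */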
/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_env *env,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru");
}
/*
 * Print human readable representation of the @o to the @printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);
/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);
static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node       *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* ...and move to the hot end of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}
static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong to
         * the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
}
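
/*
 * Worked example (symbolic, with W == LUSTRE_SEQ_MAX_WIDTH): a fid with
 * seq == 1, oid == 5 hashes to 5, while seq == 2, oid == 5 hashes to W + 5.
 * Fids differing only in version hash identically and therefore share a
 * collision list, as noted above.
 */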
/*
 * Search cache for an object with the fid @f. If such object is found, return
 * it. Otherwise, create new object, insert it into cache and return it. In
 * any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_site *s, const struct lu_fid *f,
                                 struct lustre_capa *capa)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;
        int rc;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         */
        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);
        spin_unlock(&s->ls_guard);
        if (o != NULL) {
                if (capa == BYPASS_CAPA) {
                        o->lo_header->loh_capa_bypass = 1;
                } else {
                        rc = lu_object_auth(env, o, capa,
                                            CAPA_OPC_INDEX_LOOKUP);
                        if (rc)
                                return ERR_PTR(rc);
                        o->lo_header->loh_capa = *capa;
                }
                return o;
        }

        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, s, f, capa);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(env, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
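
/*
 * Typical lookup sequence (a sketch; the device type variable and error
 * handling are illustrative assumptions):
 *
 *      struct lu_object *top;
 *      struct lu_object *slice;
 *
 *      top = lu_object_find(env, site, fid, capa);
 *      if (IS_ERR(top))
 *              return PTR_ERR(top);
 *      slice = lu_object_locate(top->lo_header, &foo_device_type);
 *      ... operate on the slice ...
 *      lu_object_put(env, top);
 */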
int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
                   struct lustre_capa *capa, __u64 opc)
{
        struct lu_object_header *top = o->lo_header;
        int rc;

        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_auth) {
                        rc = o->lo_ops->loo_object_auth(env, o, capa, opc);
                        if (rc)
                                return rc;
                }
        }
        return 0;
}
EXPORT_SYMBOL(lu_object_auth);
enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};
/*
 * Initialize site @s, with @top as the top level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;

                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else
                result = -ENOMEM;

        return result;
}
EXPORT_SYMBOL(lu_site_init);
/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;

                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);
/*
 * Acquire additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);
/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);
/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        if (d->ld_obd != NULL)
                /* finish lprocfs */
                lprocfs_obd_cleanup(d->ld_obd);

        LASSERTF(atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", atomic_read(&d->ld_ref));
}
EXPORT_SYMBOL(lu_device_fini);
/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);
/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);
/*
 * Add object @o as first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);
/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of the device
 * below @before.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);
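
/*
 * A sketch of how a layer's ->ldo_object_alloc() method might create a slice
 * to be linked into the compound object (every "foo_" name is illustrative):
 *
 *      static struct lu_object *foo_object_alloc(const struct lu_env *env,
 *                                                const struct lu_object_header *h,
 *                                                struct lu_device *d)
 *      {
 *              struct foo_object *fo;
 *
 *              OBD_ALLOC_PTR(fo);
 *              if (fo == NULL)
 *                      return NULL;
 *              lu_object_init(&fo->fo_obj, NULL, d);
 *              return &fo->fo_obj;
 *      }
 *
 * The top-level device attaches its slice with lu_object_add_top(), and each
 * lower slice is then chained after its upper neighbour with lu_object_add().
 */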
/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);
/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);
/*
 * Given a compound object, find its slice, corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);
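
/*
 * For example (illustrative), a mid-layer that needs its own slice of a
 * compound object it received can fetch it by device type:
 *
 *      struct lu_object *slice;
 *
 *      slice = lu_object_locate(o->lo_header, &foo_device_type);
 *      if (slice == NULL)
 *              return -ENOENT;
 */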
enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);
/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (atomic_read(&key->lct_used) > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);
/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
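
/*
 * A module-private key follows the same pattern as lu_cdebug_key above (all
 * "foo_" names below are illustrative assumptions):
 *
 *      static struct lu_context_key foo_thread_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = foo_key_init,
 *              .lct_fini = foo_key_fini
 *      };
 *
 *      rc = lu_context_key_register(&foo_thread_key);
 *      ...
 *      info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *      ...
 *      lu_context_key_degister(&foo_thread_key);
 */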
static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(atomic_read(&key->lct_used) > 1);

                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                atomic_dec(&key->lct_used);
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}
static int keys_fill(const struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL &&
                    key != NULL && key->lct_tags & ctx->lc_tags) {
                        void *value;

                        LASSERT(key->lct_init != NULL);
                        LASSERT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (IS_ERR(value))
                                return PTR_ERR(value);
                        atomic_inc(&key->lct_used);
                        ctx->lc_value[i] = value;
                }
        }
        return 0;
}
static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL)
                result = keys_fill(ctx);
        else
                result = -ENOMEM;
        return result;
}
/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);
/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);
/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);
/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);
/*
 * Allocate for context all missing keys that were registered after context
 * creation.
 */
int lu_context_refill(const struct lu_context *ctx)
{
        LASSERT(ctx->lc_value != NULL);
        return keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);
int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
{
        int result;

        env->le_ses = ses;
        result = lu_context_init(&env->le_ctx, tags);
        if (result == 0)
                lu_context_enter(&env->le_ctx);
        return result;
}
EXPORT_SYMBOL(lu_env_init);
void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);
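
/*
 * Environment lifecycle sketch (illustrative; the tags depend on the caller):
 *
 *      struct lu_env env;
 *      int rc;
 *
 *      rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
 *      if (rc == 0) {
 *              ... run operations that need an env ...
 *              lu_env_fini(&env);
 *      }
 */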
/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        result = lu_context_key_register(&lu_cdebug_key);
        return result;
}
/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_context_key_degister(&lu_cdebug_key);
}
struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);