/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */
30 #define DEBUG_SUBSYSTEM S_CLASS
32 # define EXPORT_SYMTAB
35 #include <linux/seq_file.h>
36 #include <linux/module.h>
37 #include <obd_support.h>
38 #include <lustre_disk.h>
39 #include <lu_object.h>
40 #include <libcfs/list.h>
42 static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);
45 * Decrease reference counter on object. If last reference is freed, return
46 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
47 * case, free object immediately.
49 void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
51 struct lu_object_header *top;
55 site = o->lo_dev->ld_site;
56 spin_lock(&site->ls_guard);
57 if (-- top->loh_ref == 0) {
59 * When last reference is released, iterate over object
60 * layers, and notify them that object is no longer busy.
62 list_for_each_entry(o, &top->loh_layers, lo_linkage) {
63 if (o->lo_ops->loo_object_release != NULL)
64 o->lo_ops->loo_object_release(ctxt, o);
67 if (lu_object_is_dying(top)) {
69 * If object is dying (will not be cached), removed it
70 * from hash table and LRU.
72 * This is done with hash table and LRU lists
73 * locked. As the only way to acquire first reference
74 * to previously unreferenced object is through
75 * hash-table lookup (lu_object_find()), or LRU
76 * scanning (lu_site_purge()), that are done under
77 * hash-table and LRU lock, no race with concurrent
78 * object lookup is possible and we can safely destroy
81 hlist_del_init(&top->loh_hash);
82 list_del_init(&top->loh_lru);
85 spin_unlock(&site->ls_guard);
86 if (lu_object_is_dying(top))
88 * Object was already removed from hash and lru above, can
91 lu_object_free(ctxt, o);
93 EXPORT_SYMBOL(lu_object_put);
96 * Allocate new object.
98 * This follows object creation protocol, described in the comment within
99 * struct lu_device_operations definition.
101 static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
103 const struct lu_fid *f)
105 struct lu_object *scan;
106 struct lu_object *top;
111 * Create top-level object slice. This will also create
114 top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
115 NULL, s->ls_top_dev);
120 * This is the only place where object fid is assigned. It's constant
123 top->lo_header->loh_fid = *f;
126 * Call ->loo_object_init() repeatedly, until no more new
127 * object slices are created.
130 list_for_each_entry(scan,
131 &top->lo_header->loh_layers, lo_linkage) {
132 if (scan->lo_flags & LU_OBJECT_ALLOCATED)
135 scan->lo_header = top->lo_header;
136 result = scan->lo_ops->loo_object_init(ctxt, scan);
138 lu_object_free(ctxt, top);
139 RETURN(ERR_PTR(result));
141 scan->lo_flags |= LU_OBJECT_ALLOCATED;
144 s->ls_stats.s_created ++;
151 static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
153 struct list_head splice;
154 struct lu_object *scan;
157 * First call ->loo_object_delete() method to release all resources.
159 list_for_each_entry_reverse(scan,
160 &o->lo_header->loh_layers, lo_linkage) {
161 if (scan->lo_ops->loo_object_delete != NULL)
162 scan->lo_ops->loo_object_delete(ctx, scan);
164 -- o->lo_dev->ld_site->ls_total;
166 * Then, splice object layers into stand-alone list, and call
167 * ->loo_object_free() on all layers to free memory. Splice is
168 * necessary, because lu_object_header is freed together with the
171 INIT_LIST_HEAD(&splice);
172 list_splice_init(&o->lo_header->loh_layers, &splice);
173 while (!list_empty(&splice)) {
174 o = container_of0(splice.next, struct lu_object, lo_linkage);
175 list_del_init(&o->lo_linkage);
176 LASSERT(o->lo_ops->loo_object_free != NULL);
177 o->lo_ops->loo_object_free(ctx, o);
182 * Free @nr objects from the cold end of the site LRU list.
184 void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
186 struct list_head dispose;
187 struct lu_object_header *h;
188 struct lu_object_header *temp;
190 INIT_LIST_HEAD(&dispose);
192 * Under LRU list lock, scan LRU list and move unreferenced objects to
193 * the dispose list, removing them from LRU and hash table.
195 spin_lock(&s->ls_guard);
196 list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
201 hlist_del_init(&h->loh_hash);
202 list_move(&h->loh_lru, &dispose);
204 spin_unlock(&s->ls_guard);
206 * Free everything on the dispose list. This is safe against races due
207 * to the reasons described in lu_object_put().
209 while (!list_empty(&dispose)) {
210 h = container_of0(dispose.next,
211 struct lu_object_header, loh_lru);
212 list_del_init(&h->loh_lru);
213 lu_object_free(ctx, lu_object_top(h));
214 s->ls_stats.s_lru_purged ++;
217 EXPORT_SYMBOL(lu_site_purge);
220 * Print human readable representation of the @o to the @f.
222 int lu_object_print(const struct lu_context *ctx,
223 struct seq_file *f, const struct lu_object *o)
225 static char ruler[] = "........................................";
226 const struct lu_object *scan;
232 list_for_each_entry_continue(scan, &o->lo_linkage, lo_linkage) {
233 depth = scan->lo_depth;
234 if (depth <= o->lo_depth && scan != o)
236 LASSERT(scan->lo_ops->loo_object_print != NULL);
238 * print `.' @depth times.
240 nob += seq_printf(f, "%*.*s", depth, depth, ruler);
241 nob += scan->lo_ops->loo_object_print(ctx, f, scan);
242 nob += seq_printf(f, "\n");
246 EXPORT_SYMBOL(lu_object_print);
249 static struct lu_object *htable_lookup(struct lu_site *s,
250 const struct hlist_head *bucket,
251 const struct lu_fid *f)
253 struct lu_object_header *h;
254 struct hlist_node *scan;
256 hlist_for_each_entry(h, scan, bucket, loh_hash) {
257 s->ls_stats.s_cache_check ++;
258 if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
259 /* bump reference count... */
260 if (h->loh_ref ++ == 0)
262 /* and move to the head of the LRU */
263 list_move_tail(&h->loh_lru, &s->ls_lru);
264 s->ls_stats.s_cache_hit ++;
265 return lu_object_top(h);
268 s->ls_stats.s_cache_miss ++;
272 static __u32 fid_hash(const struct lu_fid *f)
274 /* all objects with same id and different versions will belong to same
275 * collisions list. */
276 return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
280 * Search cache for an object with the fid @f. If such object is found, return
281 * it. Otherwise, create new object, insert it into cache and return it. In
282 * any case, additional reference is acquired on the returned object.
284 struct lu_object *lu_object_find(const struct lu_context *ctxt,
285 struct lu_site *s, const struct lu_fid *f)
288 struct lu_object *shadow;
289 struct hlist_head *bucket;
292 * This uses standard index maintenance protocol:
294 * - search index under lock, and return object if found;
295 * - otherwise, unlock index, allocate new object;
296 * - lock index and search again;
297 * - if nothing is found (usual case), insert newly created
299 * - otherwise (race: other thread inserted object), free
300 * object just allocated.
305 bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
306 spin_lock(&s->ls_guard);
307 o = htable_lookup(s, bucket, f);
309 spin_unlock(&s->ls_guard);
313 * Allocate new object. This may result in rather complicated
314 * operations, including fld queries, inode loading, etc.
316 o = lu_object_alloc(ctxt, s, f);
320 LASSERT(lu_fid_eq(lu_object_fid(o), f));
322 spin_lock(&s->ls_guard);
323 shadow = htable_lookup(s, bucket, f);
324 if (shadow == NULL) {
325 hlist_add_head(&o->lo_header->loh_hash, bucket);
326 list_add_tail(&s->ls_lru, &o->lo_header->loh_lru);
331 s->ls_stats.s_cache_race ++;
332 spin_unlock(&s->ls_guard);
334 lu_object_free(ctxt, o);
337 EXPORT_SYMBOL(lu_object_find);
/*
 * Geometry of the per-site object hash table: the table has
 * 2^LU_SITE_HTABLE_BITS buckets, and LU_SITE_HTABLE_MASK reduces a hash
 * value to a bucket index.
 */
enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};
346 * Initialize site @s, with @d as the top level device.
348 int lu_site_init(struct lu_site *s, struct lu_device *top)
353 memset(s, 0, sizeof *s);
354 spin_lock_init(&s->ls_guard);
355 CFS_INIT_LIST_HEAD(&s->ls_lru);
360 * XXX nikita: fixed size hash-table.
362 s->ls_hash_mask = LU_SITE_HTABLE_MASK;
363 OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
364 if (s->ls_hash != NULL) {
366 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
367 INIT_HLIST_HEAD(&s->ls_hash[i]);
375 EXPORT_SYMBOL(lu_site_init);
378 * Finalize @s and release its resources.
380 void lu_site_fini(struct lu_site *s)
382 LASSERT(list_empty(&s->ls_lru));
383 LASSERT(s->ls_total == 0);
384 LASSERT(s->ls_busy == 0);
386 if (s->ls_hash != NULL) {
388 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
389 LASSERT(hlist_empty(&s->ls_hash[i]));
391 LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
394 if (s->ls_top_dev != NULL) {
395 s->ls_top_dev->ld_site = NULL;
396 lu_device_put(s->ls_top_dev);
397 s->ls_top_dev = NULL;
400 EXPORT_SYMBOL(lu_site_fini);
403 * Acquire additional reference on device @d
405 void lu_device_get(struct lu_device *d)
407 atomic_inc(&d->ld_ref);
409 EXPORT_SYMBOL(lu_device_get);
412 * Release reference on device @d.
414 void lu_device_put(struct lu_device *d)
416 atomic_dec(&d->ld_ref);
418 EXPORT_SYMBOL(lu_device_put);
421 * Initialize device @d of type @t.
423 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
425 memset(d, 0, sizeof *d);
426 atomic_set(&d->ld_ref, 0);
430 EXPORT_SYMBOL(lu_device_init);
433 * Finalize device @d.
435 void lu_device_fini(struct lu_device *d)
437 LASSERT(atomic_read(&d->ld_ref) == 0);
439 EXPORT_SYMBOL(lu_device_fini);
442 * Initialize object @o that is part of compound object @h and was created by
445 int lu_object_init(struct lu_object *o,
446 struct lu_object_header *h, struct lu_device *d)
448 memset(o, 0, sizeof *o);
452 CFS_INIT_LIST_HEAD(&o->lo_linkage);
455 EXPORT_SYMBOL(lu_object_init);
458 * Finalize object and release its resources.
460 void lu_object_fini(struct lu_object *o)
462 LASSERT(list_empty(&o->lo_linkage));
464 if (o->lo_dev != NULL) {
465 lu_device_put(o->lo_dev);
469 EXPORT_SYMBOL(lu_object_fini);
472 * Add object @o as first layer of compound object @h
474 * This is typically called by the ->ldo_object_alloc() method of top-level
477 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
479 list_move(&o->lo_linkage, &h->loh_layers);
481 EXPORT_SYMBOL(lu_object_add_top);
484 * Add object @o as a layer of compound object, going after @before.1
486 * This is typically called by the ->ldo_object_alloc() method of
489 void lu_object_add(struct lu_object *before, struct lu_object *o)
491 list_move(&o->lo_linkage, &before->lo_linkage);
493 EXPORT_SYMBOL(lu_object_add);
496 * Initialize compound object.
498 int lu_object_header_init(struct lu_object_header *h)
500 memset(h, 0, sizeof *h);
502 INIT_HLIST_NODE(&h->loh_hash);
503 CFS_INIT_LIST_HEAD(&h->loh_lru);
504 CFS_INIT_LIST_HEAD(&h->loh_layers);
507 EXPORT_SYMBOL(lu_object_header_init);
510 * Finalize compound object.
512 void lu_object_header_fini(struct lu_object_header *h)
514 LASSERT(list_empty(&h->loh_layers));
515 LASSERT(list_empty(&h->loh_lru));
516 LASSERT(hlist_unhashed(&h->loh_hash));
518 EXPORT_SYMBOL(lu_object_header_fini);
521 * Given a compound object, find its slice, corresponding to the device type
524 struct lu_object *lu_object_locate(struct lu_object_header *h,
525 struct lu_device_type *dtype)
529 list_for_each_entry(o, &h->loh_layers, lo_linkage) {
530 if (o->lo_dev->ld_type == dtype)
535 EXPORT_SYMBOL(lu_object_locate);
enum {
        /*
         * Maximal number of tld slots, i.e. the size of the lu_keys[]
         * table and of every context's ->lc_value array.
         */
        LU_CONTEXT_KEY_NR = 16
};
544 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
546 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
551 int lu_context_key_register(struct lu_context_key *key)
557 spin_lock(&lu_keys_guard);
558 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
559 if (lu_keys[i] == NULL) {
567 spin_unlock(&lu_keys_guard);
570 EXPORT_SYMBOL(lu_context_key_register);
575 void lu_context_key_degister(struct lu_context_key *key)
577 LASSERT(key->lct_used >= 1);
578 LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
580 if (key->lct_used > 1)
581 CERROR("key has instances.\n");
582 spin_lock(&lu_keys_guard);
583 lu_keys[key->lct_index] = NULL;
584 spin_unlock(&lu_keys_guard);
586 EXPORT_SYMBOL(lu_context_key_degister);
589 * Return value associated with key @key in context @ctx.
591 void *lu_context_key_get(const struct lu_context *ctx,
592 struct lu_context_key *key)
594 LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
595 return ctx->lc_value[key->lct_index];
597 EXPORT_SYMBOL(lu_context_key_get);
599 static void keys_fini(struct lu_context *ctx)
603 if (ctx->lc_value != NULL) {
604 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
605 if (ctx->lc_value[i] != NULL) {
606 struct lu_context_key *key;
609 LASSERT(key != NULL);
610 LASSERT(key->lct_fini != NULL);
611 LASSERT(key->lct_used > 1);
613 key->lct_fini(ctx, key, ctx->lc_value[i]);
615 ctx->lc_value[i] = NULL;
618 OBD_FREE(ctx->lc_value,
619 ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
620 ctx->lc_value = NULL;
624 static int keys_init(struct lu_context *ctx)
629 OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
630 if (ctx->lc_value != NULL) {
631 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
632 struct lu_context_key *key;
638 LASSERT(key->lct_init != NULL);
639 LASSERT(key->lct_index == i);
641 value = key->lct_init(ctx, key);
644 return PTR_ERR(value);
647 ctx->lc_value[i] = value;
657 * Initialize context data-structure. Create values for all keys.
659 int lu_context_init(struct lu_context *ctx)
661 memset(ctx, 0, sizeof *ctx);
665 EXPORT_SYMBOL(lu_context_init);
/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);
/*
 * Called before entering context. Currently a no-op.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);
/*
 * Called after exiting from @ctx. Currently a no-op.
 */
void lu_context_exit(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_exit);