/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);

/*
 * Decrease reference counter on object. If last reference is freed, return
 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
 * case, free object immediately.
 */
void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_object        *orig;
        struct lu_site          *site;
        int                      kill_it;

        top = o->lo_header;
        /*
         * The loop below re-uses @o as its cursor, so remember the object
         * originally passed in for the final lu_object_free() call.
         */
        orig = o;
        site = o->lo_dev->ld_site;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(ctxt, o);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If object is dying (will not be cached), remove it
                         * from hash table and LRU.
                         *
                         * This is done with hash table and LRU lists
                         * locked. As the only way to acquire first reference
                         * to previously unreferenced object is through
                         * hash-table lookup (lu_object_find()), or LRU
                         * scanning (lu_site_purge()), which are done under
                         * hash-table and LRU lock, no race with concurrent
                         * object lookup is possible and we can safely destroy
                         * object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was already removed from hash and lru above, can
                 * kill it.
                 */
                lu_object_free(ctxt, orig);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        s->ls_total ++;
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        top->lo_header->loh_fid = *f;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan,
                                    &top->lo_header->loh_layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(ctxt, scan);
                        if (result != 0) {
                                lu_object_free(ctxt, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);
        s->ls_stats.s_created ++;
        RETURN(top);
}

/*
 * Free object.
 */
static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(ctx, scan);
        }
        -- o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(ctx, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                 struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(ctx, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
}
EXPORT_SYMBOL(lu_site_purge);
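
/*
 * Illustrative sketch, not part of this file: how a caller under memory
 * pressure might trim the coldest unreferenced objects from a site.  The
 * function name and the batch size of 128 are arbitrary examples; "ctx" and
 * "site" are assumed to be set up by the caller.
 *
 *      static void example_site_trim(const struct lu_context *ctx,
 *                                    struct lu_site *site)
 *      {
 *              lu_site_purge(ctx, site, 128);
 *      }
 */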

/*
 * Print a human-readable representation of object @o to seq_file @f.
 */
int lu_object_print(const struct lu_context *ctx,
                    struct seq_file *f, const struct lu_object *o)
{
        static char ruler[] = "........................................";
        struct lu_object_header *top;
        int nob;
        int depth;

        nob = 0;
        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                nob += seq_printf(f, "%*.*s", depth, depth, ruler);
                nob += o->lo_ops->loo_object_print(ctx, f, o);
                nob += seq_printf(f, "\n");
        }
        return nob;
}
EXPORT_SYMBOL(lu_object_print);


static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* and move to the hot end (tail) of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong to
         * the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_SEQ_WIDTH + fid_oid(f);
}
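
/*
 * Worked example (illustrative only; assumes LUSTRE_SEQ_WIDTH is the number
 * of object ids per sequence): a fid with seq == 3 and oid == 7 hashes to
 * (3 - 1) * LUSTRE_SEQ_WIDTH + 7, and lu_object_find() below masks that
 * value with s->ls_hash_mask to select a hash bucket.
 */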

/*
 * Search cache for an object with the fid @f. If such object is found, return
 * it. Otherwise, create new object, insert it into cache and return it. In
 * any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_context *ctxt,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);

        spin_unlock(&s->ls_guard);
        if (o != NULL)
                return o;
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(ctxt, s, f);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(ctxt, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
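
/*
 * Usage sketch, illustrative only and not part of this file: a typical
 * lookup/release cycle against a site.  The function name is hypothetical;
 * "ctxt", "site" and "fid" are assumed to be provided by the caller.
 *
 *      static int example_lookup(const struct lu_context *ctxt,
 *                                struct lu_site *site,
 *                                const struct lu_fid *fid)
 *      {
 *              struct lu_object *o;
 *
 *              o = lu_object_find(ctxt, site, fid);
 *              if (IS_ERR(o))
 *                      return PTR_ERR(o);
 *              ... use the object; the reference keeps it pinned ...
 *              lu_object_put(ctxt, o);
 *              return 0;
 *      }
 */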

enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else {
                result = -ENOMEM;
        }

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);
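
/*
 * Illustrative sketch, not part of this file: the expected site life cycle.
 * The function name is hypothetical; "ctx" and "top" (an already initialized
 * top-level device) are assumed to be supplied by the caller, and ~0 is used
 * as an arbitrarily large purge count.
 *
 *      static int example_site_cycle(const struct lu_context *ctx,
 *                                    struct lu_device *top)
 *      {
 *              struct lu_site site;
 *              int rc;
 *
 *              rc = lu_site_init(&site, top);
 *              if (rc != 0)
 *                      return rc;
 *              ... lu_object_find()/lu_object_put() against &site ...
 *              lu_site_purge(ctx, &site, ~0);
 *              lu_site_fini(&site);
 *              return 0;
 *      }
 */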

/*
 * Acquire additional reference on device @d.
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        LASSERT(atomic_read(&d->ld_ref) == 0);
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);
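
/*
 * Illustrative sketch, not part of this file: how a middle-layer
 * ->loo_object_init() might allocate the next slice from the device below
 * and stack it after its own one with lu_object_add().  All "mydev_*" names
 * and the way the lower device is obtained are hypothetical.
 *
 *      static int mydev_object_init(const struct lu_context *ctx,
 *                                   struct lu_object *o)
 *      {
 *              struct lu_device *lower = mydev_lower_device(o->lo_dev);
 *              struct lu_object *below;
 *
 *              below = lower->ld_ops->ldo_object_alloc(ctx, o->lo_header,
 *                                                      lower);
 *              if (IS_ERR(below))
 *                      return PTR_ERR(below);
 *              lu_object_add(o, below);
 *              return 0;
 *      }
 */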

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        key->lct_used = 1;
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(key->lct_used >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (key->lct_used > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
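
/*
 * Illustrative sketch, not part of this file: a module-private key with
 * per-context data.  "struct mydev_thread_info" and all "mydev_*" names are
 * hypothetical, and the method signatures are inferred from how keys_init()
 * and keys_fini() below invoke ->lct_init() and ->lct_fini().
 *
 *      static void *mydev_key_init(const struct lu_context *ctx,
 *                                  struct lu_context_key *key)
 *      {
 *              struct mydev_thread_info *info;
 *
 *              OBD_ALLOC(info, sizeof *info);
 *              return info != NULL ? (void *)info : ERR_PTR(-ENOMEM);
 *      }
 *
 *      static void mydev_key_fini(const struct lu_context *ctx,
 *                                 struct lu_context_key *key, void *data)
 *      {
 *              struct mydev_thread_info *info = data;
 *
 *              OBD_FREE(info, sizeof *info);
 *      }
 *
 *      static struct lu_context_key mydev_key = {
 *              .lct_init = mydev_key_init,
 *              .lct_fini = mydev_key_fini
 *      };
 *
 * Once lu_context_key_register(&mydev_key) succeeds, every context set up by
 * lu_context_init() carries a private value reachable through
 * lu_context_key_get(ctx, &mydev_key).
 */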

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(key->lct_used > 1);

                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                key->lct_used--;
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_init(struct lu_context *ctx)
{
        int i;
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        struct lu_context_key *key;

                        key = lu_keys[i];
                        if (key != NULL) {
                                void *value;

                                LASSERT(key->lct_init != NULL);
                                LASSERT(key->lct_index == i);

                                value = key->lct_init(ctx, key);
                                if (IS_ERR(value)) {
                                        keys_fini(ctx);
                                        return PTR_ERR(value);
                                }
                                key->lct_used++;
                                ctx->lc_value[i] = value;
                        }
                }
                result = 0;
        } else
                result = -ENOMEM;
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx)
{
        memset(ctx, 0, sizeof *ctx);
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);
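
/*
 * Illustrative sketch, not part of this file: the expected per-thread
 * context life cycle around a piece of work.  The enter/exit pair is kept
 * even though both calls are currently empty.
 *
 *      struct lu_context ctx;
 *      int rc;
 *
 *      rc = lu_context_init(&ctx);
 *      if (rc == 0) {
 *              lu_context_enter(&ctx);
 *              ... work that calls lu_context_key_get(&ctx, ...) ...
 *              lu_context_exit(&ctx);
 *              lu_context_fini(&ctx);
 *      }
 */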

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_exit);