/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);

/*
 * Decrease reference counter on object. If the last reference is released,
 * return the object to the cache, unless lu_object_is_dying(o) holds. In the
 * latter case, free the object immediately.
 */
void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *scan;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (-- top->loh_ref == 0) {
                /*
                 * When last reference is released, iterate over object
                 * layers, and notify them that object is no longer busy.
                 */
                list_for_each_entry(scan, &top->loh_layers, lo_linkage) {
                        if (scan->lo_ops->loo_object_release != NULL)
                                scan->lo_ops->loo_object_release(ctxt, scan);
                }
                -- site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If object is dying (will not be cached), remove it
                         * from hash table and LRU.
                         *
                         * This is done with hash table and LRU lists
                         * locked. As the only way to acquire first reference
                         * to previously unreferenced object is through
                         * hash-table lookup (lu_object_find()), or LRU
                         * scanning (lu_site_purge()), that are done under
                         * hash-table and LRU lock, no race with concurrent
                         * object lookup is possible and we can safely destroy
                         * object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * Object was removed from hash and lru above, can kill it.
                 */
                lu_object_free(ctxt, o);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
                                         struct lu_site *s,
                                         const struct lu_fid *f)
{
        struct lu_object *scan;
        struct lu_object *top;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        s->ls_total ++;
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        top->lo_header->loh_fid = *f;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan,
                                    &top->lo_header->loh_layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(ctxt, scan);
                        if (result != 0) {
                                lu_object_free(ctxt, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);
        s->ls_stats.s_created ++;
        RETURN(top);
}
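
#if 0
/*
 * Illustrative sketch (not part of this file, not compiled): how a top-level
 * device might implement ->ldo_object_alloc() to cooperate with the creation
 * protocol driven by lu_object_alloc() above.  The type "struct foo_object",
 * its fields and "foo_obj_ops" are hypothetical names used only for this
 * example; lu_object_header_init(), lu_object_init() and lu_object_add_top()
 * are defined later in this file.
 */
struct foo_object {
        struct lu_object_header fo_header;
        struct lu_object        fo_obj;
};

static struct lu_object *foo_object_alloc(const struct lu_context *ctxt,
                                          struct lu_device *d)
{
        struct foo_object *fo;

        OBD_ALLOC(fo, sizeof *fo);
        if (fo == NULL)
                return ERR_PTR(-ENOMEM);
        /* set up compound header and the top slice embedded in it */
        lu_object_header_init(&fo->fo_header);
        lu_object_init(&fo->fo_obj, &fo->fo_header, d);
        lu_object_add_top(&fo->fo_header, &fo->fo_obj);
        fo->fo_obj.lo_ops = &foo_obj_ops; /* hypothetical slice operations */
        return &fo->fo_obj;
}
#endif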

/*
 * Free object.
 */
static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(ctx, scan);
        }
        -- o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(ctx, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races due
         * to the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                 struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(ctx, lu_object_top(h));
                s->ls_stats.s_lru_purged ++;
        }
}
EXPORT_SYMBOL(lu_site_purge);

/*
 * Print human-readable representation of object @o to seq_file @f.
 */
int lu_object_print(const struct lu_context *ctx,
                    struct seq_file *f, const struct lu_object *o)
{
        static char ruler[] = "........................................";
        const struct lu_object *scan;
        int nob;
        int depth;

        nob = 0;
        scan = o;
        list_for_each_entry_continue(scan, &o->lo_linkage, lo_linkage) {
                depth = scan->lo_depth;
                if (depth <= o->lo_depth && scan != o)
                        break;
                LASSERT(scan->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                nob += seq_printf(f, "%*.*s", depth, depth, ruler);
                nob += scan->lo_ops->loo_object_print(ctx, f, scan);
                nob += seq_printf(f, "\n");
        }
        return nob;
}
EXPORT_SYMBOL(lu_object_print);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check ++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref ++ == 0)
                                ++ s->ls_busy;
                        /* ...and move to the hot end of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit ++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss ++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f)
{
        /* all objects with the same id and different versions will belong to
         * the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
}

/*
 * Search cache for an object with the fid @f. If such object is found, return
 * it. Otherwise, create new object, insert it into cache and return it. In
 * any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_context *ctxt,
                                 struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                return o;
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(ctxt, s, f);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++ s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race ++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(ctxt, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
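
#if 0
/*
 * Illustrative sketch (not compiled): typical caller-side use of the
 * lookup/reference protocol above.  "do_something_with()" and the way the
 * context and fid are obtained are hypothetical; only lu_object_find(),
 * lu_object_fid() and lu_object_put() come from this file.
 */
static int foo_use_object(const struct lu_context *ctxt,
                          struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object *o;
        int result;

        o = lu_object_find(ctxt, s, f);        /* takes a reference */
        if (IS_ERR(o))
                return PTR_ERR(o);
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
        result = do_something_with(o);
        lu_object_put(ctxt, o);                /* drops the reference */
        return result;
}
#endif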

enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};

/*
 * Initialize site @s, with @top as the top level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else {
                result = -ENOMEM;
        }

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);
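
#if 0
/*
 * Illustrative sketch (not compiled): life-cycle of a site.  The embedding
 * structure "struct foo_device" and the shrinking policy are hypothetical;
 * lu_site_init(), lu_site_purge() and lu_site_fini() are the functions
 * defined in this file.
 */
struct foo_device {
        struct lu_site   fd_site;
        struct lu_device fd_top;        /* top-level device of the stack */
};

static int foo_site_setup(struct foo_device *fd)
{
        return lu_site_init(&fd->fd_site, &fd->fd_top);
}

static void foo_site_shrink(const struct lu_context *ctx, struct foo_device *fd)
{
        /* e.g. under memory pressure, scan up to 128 LRU entries and free the
         * unreferenced ones */
        lu_site_purge(ctx, &fd->fd_site, 128);
}

static void foo_site_cleanup(const struct lu_context *ctx, struct foo_device *fd)
{
        /* all cached objects must be freed before finalizing the site */
        lu_site_purge(ctx, &fd->fd_site, ~0);
        lu_site_fini(&fd->fd_site);
}
#endif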

/*
 * Acquire additional reference on device @d
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        LASSERT(atomic_read(&d->ld_ref) == 0);
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as first layer of compound object @h
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);
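
#if 0
/*
 * Illustrative sketch (not compiled): how a middle-layer device might grow
 * the compound object from its ->loo_object_init() method, using
 * lu_object_add() above.  "bar_lower_device()" and the slice type are
 * hypothetical; the two-argument ->ldo_object_alloc() form matches the call
 * made by lu_object_alloc() earlier in this file, and the new slice's
 * lo_header is filled in by lu_object_alloc()'s loop on the next pass.
 */
static int bar_object_init(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_device *lower = bar_lower_device(o->lo_dev);
        struct lu_object *below;

        /* ask the next device down to allocate its slice... */
        below = lower->ld_ops->ldo_object_alloc(ctxt, lower);
        if (IS_ERR(below))
                return PTR_ERR(below);
        /* ...and link it right after this layer */
        lu_object_add(o, below);
        return 0;
}
#endif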

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice, corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);
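
#if 0
/*
 * Illustrative sketch (not compiled): picking one layer's private slice out
 * of a compound object.  "bar_device_type" and "struct bar_object" are
 * hypothetical; lu_object_locate() is defined above.
 */
struct bar_object {
        struct lu_object bo_obj;
};

static struct bar_object *bar_obj(struct lu_object_header *h)
{
        struct lu_object *slice;

        slice = lu_object_locate(h, &bar_device_type);
        return slice != NULL ?
                container_of(slice, struct bar_object, bo_obj) : NULL;
}
#endif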

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        key->lct_used = 1;
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(key->lct_used >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (key->lct_used > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
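
#if 0
/*
 * Illustrative sketch (not compiled): defining a per-context key.  The
 * "foo_thread_info" structure and the "foo_*" names are hypothetical; the
 * ->lct_init()/->lct_fini() signatures follow the way keys_init() and
 * keys_fini() below invoke them, and lu_context_key_register(),
 * lu_context_key_get() and lu_context_key_degister() are defined above.
 */
struct foo_thread_info {
        /* per-context scratch data for the "foo" module */
        struct lu_fid fti_fid;
};

static void *foo_key_init(const struct lu_context *ctx)
{
        struct foo_thread_info *info;

        OBD_ALLOC(info, sizeof *info);
        if (info == NULL)
                info = ERR_PTR(-ENOMEM);
        return info;
}

static void foo_key_fini(const struct lu_context *ctx, void *data)
{
        struct foo_thread_info *info = data;

        OBD_FREE(info, sizeof *info);
}

static struct lu_context_key foo_thread_key = {
        .lct_init = foo_key_init,
        .lct_fini = foo_key_fini
};

/* at module load:      lu_context_key_register(&foo_thread_key);
 * in request handling: info = lu_context_key_get(ctx, &foo_thread_key);
 * at module unload:    lu_context_key_degister(&foo_thread_key); */
#endif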

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(key->lct_used > 1);

                                key->lct_fini(ctx, ctx->lc_value[i]);
                                key->lct_used--;
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_init(struct lu_context *ctx)
{
        int i;
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        struct lu_context_key *key;

                        key = lu_keys[i];
                        if (key != NULL) {
                                void *value;

                                LASSERT(key->lct_init != NULL);
                                LASSERT(key->lct_index == i);

                                value = key->lct_init(ctx);
                                if (IS_ERR(value)) {
                                        keys_fini(ctx);
                                        return PTR_ERR(value);
                                }
                                key->lct_used++;
                                ctx->lc_value[i] = value;
                        }
                }
                result = 0;
        } else
                result = -ENOMEM;
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx)
{
        memset(ctx, 0, sizeof *ctx);
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx
 */
void lu_context_exit(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_exit);
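
#if 0
/*
 * Illustrative sketch (not compiled): life-cycle of a context as used by a
 * service thread.  "foo_handle_one_request()" is hypothetical; the
 * lu_context_* calls are the ones defined above.
 */
static int foo_thread_main(void *arg)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx);
        if (rc != 0)
                return rc;
        /* values for all registered keys are now reachable through
         * lu_context_key_get(&ctx, ...) */
        lu_context_enter(&ctx);
        rc = foo_handle_one_request(&ctx, arg);
        lu_context_exit(&ctx);
        lu_context_fini(&ctx);
        return rc;
}
#endif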