lu_object_find(): fix site->ls_busy accounting bug, found by Huang Hua
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  *
26  * These are the only exported functions; they provide generic
27  * infrastructure for managing object devices.
28  */
29
30 #define DEBUG_SUBSYSTEM S_CLASS
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #include <linux/seq_file.h>
36 #include <linux/module.h>
37 #include <obd_support.h>
38 #include <lustre_disk.h>
39 #include <lu_object.h>
40 #include <libcfs/list.h>
41
42 static void lu_object_free(const struct lu_context *ctx, struct lu_object *o);
43
44 /*
45  * Decrease reference counter on object. When the last reference is
46  * dropped, return the object to the cache, unless lu_object_is_dying(o)
47  * holds; in that case, free the object immediately.
48  */
49 void lu_object_put(const struct lu_context *ctxt, struct lu_object *o)
50 {
51         struct lu_object_header *top;
52         struct lu_site          *site;
53
54         top = o->lo_header;
55         site = o->lo_dev->ld_site;
56         spin_lock(&site->ls_guard);
57         if (-- top->loh_ref == 0) {
58                 /*
59                  * When last reference is released, iterate over object
60                  * layers, and notify them that object is no longer busy.
61                  */
62                 list_for_each_entry(o, &top->loh_layers, lo_linkage) {
63                         if (o->lo_ops->loo_object_release != NULL)
64                                 o->lo_ops->loo_object_release(ctxt, o);
65                 }
66                 -- site->ls_busy;
67                 if (lu_object_is_dying(top)) {
68                         /*
69                          * If object is dying (will not be cached), remove it
70                          * from hash table and LRU.
71                          *
72                          * This is done with hash table and LRU lists
73                          * locked. As the only way to acquire first reference
74                          * to previously unreferenced object is through
75                          * hash-table lookup (lu_object_find()), or LRU
76                          * scanning (lu_site_purge()), that are done under
77                          * hash-table and LRU lock, no race with concurrent
78                          * object lookup is possible and we can safely destroy
79                          * object below.
80                          */
81                         hlist_del_init(&top->loh_hash);
82                         list_del_init(&top->loh_lru);
83                 }
84         }
85         spin_unlock(&site->ls_guard);
86         if (lu_object_is_dying(top))
87                 /*
88                  * Object was already removed from hash and LRU above; free
89                  * it through the top slice (`o' may be a stale loop cursor here).
90                  */
91                 lu_object_free(ctxt, lu_object_top(top));
92 }
93 EXPORT_SYMBOL(lu_object_put);
94
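
/*
 * A minimal usage sketch (not part of the original file): callers typically
 * obtain a reference through lu_object_find() below and drop it again with
 * lu_object_put(). The context, site and fid are assumed to be set up
 * elsewhere; my_lookup_example() is a hypothetical name.
 */
static int my_lookup_example(const struct lu_context *ctxt,
                             struct lu_site *s, const struct lu_fid *f)
{
        struct lu_object *o;

        o = lu_object_find(ctxt, s, f);         /* takes a reference */
        if (IS_ERR(o))
                return PTR_ERR(o);
        /* ... use the object ... */
        lu_object_put(ctxt, o);                 /* releases it again */
        return 0;
}
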
95 /*
96  * Allocate new object.
97  *
98  * This follows the object creation protocol described in the comment on the
99  * struct lu_device_operations definition.
100  */
101 static struct lu_object *lu_object_alloc(const struct lu_context *ctxt,
102                                          struct lu_site *s,
103                                          const struct lu_fid *f)
104 {
105         struct lu_object *scan;
106         struct lu_object *top;
107         int clean;
108         int result;
109
110         /*
111          * Create top-level object slice. This will also create
112          * lu_object_header.
113          */
114         top = s->ls_top_dev->ld_ops->ldo_object_alloc(ctxt,
115                                                       NULL, s->ls_top_dev);
116         if (IS_ERR(top))
117                 RETURN(top);
118         s->ls_total ++;
119         /*
120          * This is the only place where object fid is assigned. It's constant
121          * after this point.
122          */
123         top->lo_header->loh_fid = *f;
124         do {
125                 /*
126                  * Call ->loo_object_init() repeatedly, until no more new
127                  * object slices are created.
128                  */
129                 clean = 1;
130                 list_for_each_entry(scan,
131                                     &top->lo_header->loh_layers, lo_linkage) {
132                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
133                                 continue;
134                         clean = 0;
135                         scan->lo_header = top->lo_header;
136                         result = scan->lo_ops->loo_object_init(ctxt, scan);
137                         if (result != 0) {
138                                 lu_object_free(ctxt, top);
139                                 RETURN(ERR_PTR(result));
140                         }
141                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
142                 }
143         } while (!clean);
144         s->ls_stats.s_created ++;
145         RETURN(top);
146 }
147
148 /*
149  * Free object.
150  */
151 static void lu_object_free(const struct lu_context *ctx, struct lu_object *o)
152 {
153         struct list_head splice;
154         struct lu_object *scan;
155
156         /*
157          * First call ->loo_object_delete() method to release all resources.
158          */
159         list_for_each_entry_reverse(scan,
160                                     &o->lo_header->loh_layers, lo_linkage) {
161                 if (scan->lo_ops->loo_object_delete != NULL)
162                         scan->lo_ops->loo_object_delete(ctx, scan);
163         }
164         -- o->lo_dev->ld_site->ls_total;
165         /*
166          * Then, splice object layers into stand-alone list, and call
167          * ->loo_object_free() on all layers to free memory. Splice is
168          * necessary, because lu_object_header is freed together with the
169          * top-level slice.
170          */
171         INIT_LIST_HEAD(&splice);
172         list_splice_init(&o->lo_header->loh_layers, &splice);
173         while (!list_empty(&splice)) {
174                 o = container_of0(splice.next, struct lu_object, lo_linkage);
175                 list_del_init(&o->lo_linkage);
176                 LASSERT(o->lo_ops->loo_object_free != NULL);
177                 o->lo_ops->loo_object_free(ctx, o);
178         }
179 }
180
181 /*
182  * Free @nr objects from the cold end of the site LRU list.
183  */
184 void lu_site_purge(const struct lu_context *ctx, struct lu_site *s, int nr)
185 {
186         struct list_head         dispose;
187         struct lu_object_header *h;
188         struct lu_object_header *temp;
189
190         INIT_LIST_HEAD(&dispose);
191         /*
192          * Under LRU list lock, scan LRU list and move unreferenced objects to
193          * the dispose list, removing them from LRU and hash table.
194          */
195         spin_lock(&s->ls_guard);
196         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
197                 if (nr-- == 0)
198                         break;
199                 if (h->loh_ref > 0)
200                         continue;
201                 hlist_del_init(&h->loh_hash);
202                 list_move(&h->loh_lru, &dispose);
203         }
204         spin_unlock(&s->ls_guard);
205         /*
206          * Free everything on the dispose list. This is safe against races due
207          * to the reasons described in lu_object_put().
208          */
209         while (!list_empty(&dispose)) {
210                 h = container_of0(dispose.next,
211                                  struct lu_object_header, loh_lru);
212                 list_del_init(&h->loh_lru);
213                 lu_object_free(ctx, lu_object_top(h));
214                 s->ls_stats.s_lru_purged ++;
215         }
216 }
217 EXPORT_SYMBOL(lu_site_purge);
218
219 /*
220  * Print a human-readable representation of @o to @f.
221  */
222 int lu_object_print(const struct lu_context *ctx,
223                     struct seq_file *f, const struct lu_object *o)
224 {
225         static char ruler[] = "........................................";
226         const struct lu_object *scan;
227         int nob;
228         int depth;
229
230         nob = 0;
231         scan = o;
232         list_for_each_entry_continue(scan, &o->lo_linkage, lo_linkage) {
233                 depth = scan->lo_depth;
234                 if (depth <= o->lo_depth && scan != o)
235                         break;
236                 LASSERT(scan->lo_ops->loo_object_print != NULL);
237                 /*
238                  * print `.' @depth times.
239                  */
240                 nob += seq_printf(f, "%*.*s", depth, depth, ruler);
241                 nob += scan->lo_ops->loo_object_print(ctx, f, scan);
242                 nob += seq_printf(f, "\n");
243         }
244         return nob;
245 }
246 EXPORT_SYMBOL(lu_object_print);
247
248
249 static struct lu_object *htable_lookup(struct lu_site *s,
250                                        const struct hlist_head *bucket,
251                                        const struct lu_fid *f)
252 {
253         struct lu_object_header *h;
254         struct hlist_node *scan;
255
256         hlist_for_each_entry(h, scan, bucket, loh_hash) {
257                 s->ls_stats.s_cache_check ++;
258                 if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
259                         /* bump reference count... */
260                         if (h->loh_ref ++ == 0)
261                                 ++ s->ls_busy;
262                         /* and move to the head of the LRU */
263                         list_move_tail(&h->loh_lru, &s->ls_lru);
264                         s->ls_stats.s_cache_hit ++;
265                         return lu_object_top(h);
266                 }
267         }
268         s->ls_stats.s_cache_miss ++;
269         return NULL;
270 }
271
272 static __u32 fid_hash(const struct lu_fid *f)
273 {
274         /* All objects with the same id and different versions belong to
275          * the same collision list. */
276         return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
277 }
278
279 /*
280  * Search cache for an object with the fid @f. If such object is found, return
281  * it. Otherwise, create new object, insert it into cache and return it. In
282  * any case, additional reference is acquired on the returned object.
283  */
284 struct lu_object *lu_object_find(const struct lu_context *ctxt,
285                                  struct lu_site *s, const struct lu_fid *f)
286 {
287         struct lu_object  *o;
288         struct lu_object  *shadow;
289         struct hlist_head *bucket;
290
291         /*
292          * This uses standard index maintenance protocol:
293          *
294          *     - search index under lock, and return object if found;
295          *     - otherwise, unlock index, allocate new object;
296          *     - lock index and search again;
297          *     - if nothing is found (usual case), insert newly created
298          *       object into index;
299          *     - otherwise (race: other thread inserted object), free
300          *       object just allocated.
301          *     - unlock index;
302          *     - return object.
303          */
304
305         bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
306         spin_lock(&s->ls_guard);
307         o = htable_lookup(s, bucket, f);
308
309         spin_unlock(&s->ls_guard);
310         if (o != NULL)
311                 return o;
312         /*
313          * Allocate new object. This may result in rather complicated
314          * operations, including fld queries, inode loading, etc.
315          */
316         o = lu_object_alloc(ctxt, s, f);
317         if (IS_ERR(o))
318                 return o;
319
320         LASSERT(lu_fid_eq(lu_object_fid(o), f));
321
322         spin_lock(&s->ls_guard);
323         shadow = htable_lookup(s, bucket, f);
324         if (shadow == NULL) {
325                 hlist_add_head(&o->lo_header->loh_hash, bucket);
326                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
327                 ++ s->ls_busy;
328                 shadow = o;
329                 o = NULL;
330         } else
331                 s->ls_stats.s_cache_race ++;
332         spin_unlock(&s->ls_guard);
333         if (o != NULL)
334                 lu_object_free(ctxt, o);
335         return shadow;
336 }
337 EXPORT_SYMBOL(lu_object_find);
338
339 enum {
340         LU_SITE_HTABLE_BITS = 8,
341         LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
342         LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
343 };
344
345 /*
346  * Initialize site @s, with @top as the top-level device.
347  */
348 int lu_site_init(struct lu_site *s, struct lu_device *top)
349 {
350         int result;
351         ENTRY;
352
353         memset(s, 0, sizeof *s);
354         spin_lock_init(&s->ls_guard);
355         CFS_INIT_LIST_HEAD(&s->ls_lru);
356         s->ls_top_dev = top;
357         top->ld_site = s;
358         lu_device_get(top);
359         /*
360          * XXX nikita: fixed size hash-table.
361          */
362         s->ls_hash_mask = LU_SITE_HTABLE_MASK;
363         OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
364         if (s->ls_hash != NULL) {
365                 int i;
366                 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
367                         INIT_HLIST_HEAD(&s->ls_hash[i]);
368                 result = 0;
369         } else {
370                 result = -ENOMEM;
371         }
372
373         RETURN(result);
374 }
375 EXPORT_SYMBOL(lu_site_init);
376
377 /*
378  * Finalize @s and release its resources.
379  */
380 void lu_site_fini(struct lu_site *s)
381 {
382         LASSERT(list_empty(&s->ls_lru));
383         LASSERT(s->ls_total == 0);
384         LASSERT(s->ls_busy == 0);
385
386         if (s->ls_hash != NULL) {
387                 int i;
388                 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
389                         LASSERT(hlist_empty(&s->ls_hash[i]));
390                 OBD_FREE(s->ls_hash,
391                          LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
392                 s->ls_hash = NULL;
393         }
394         if (s->ls_top_dev != NULL) {
395                 s->ls_top_dev->ld_site = NULL;
396                 lu_device_put(s->ls_top_dev);
397                 s->ls_top_dev = NULL;
398         }
399 }
400 EXPORT_SYMBOL(lu_site_fini);
401
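
/*
 * Lifecycle sketch (illustration only, not from the original file): a site is
 * initialized over the stack's top-level device, populated through
 * lu_object_find(), trimmed with lu_site_purge(), and torn down with
 * lu_site_fini(). my_site_start()/my_site_stop() are hypothetical names.
 */
static int my_site_start(struct lu_site *site, struct lu_device *top)
{
        return lu_site_init(site, top);
}

static void my_site_stop(const struct lu_context *ctxt, struct lu_site *site)
{
        /* drop every unreferenced cached object; a large @nr scans the whole LRU */
        lu_site_purge(ctxt, site, ~0);
        lu_site_fini(site);
}
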
402 /*
403  * Acquire additional reference on device @d
404  */
405 void lu_device_get(struct lu_device *d)
406 {
407         atomic_inc(&d->ld_ref);
408 }
409 EXPORT_SYMBOL(lu_device_get);
410
411 /*
412  * Release reference on device @d.
413  */
414 void lu_device_put(struct lu_device *d)
415 {
416         atomic_dec(&d->ld_ref);
417 }
418 EXPORT_SYMBOL(lu_device_put);
419
420 /*
421  * Initialize device @d of type @t.
422  */
423 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
424 {
425         memset(d, 0, sizeof *d);
426         atomic_set(&d->ld_ref, 0);
427         d->ld_type = t;
428         return 0;
429 }
430 EXPORT_SYMBOL(lu_device_init);
431
432 /*
433  * Finalize device @d.
434  */
435 void lu_device_fini(struct lu_device *d)
436 {
437         LASSERT(atomic_read(&d->ld_ref) == 0);
438 }
439 EXPORT_SYMBOL(lu_device_fini);
440
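
/*
 * Sketch of the intended embedding pattern (an illustration, not code from
 * this file): a concrete device wraps struct lu_device and forwards to
 * lu_device_init()/lu_device_fini() from its own setup and cleanup paths.
 * "my_device" and "my_device_type" are hypothetical names.
 */
struct my_device {
        struct lu_device md_lu_dev;
};

extern struct lu_device_type my_device_type;    /* hypothetical */

static int my_device_setup(struct my_device *m)
{
        return lu_device_init(&m->md_lu_dev, &my_device_type);
}

static void my_device_cleanup(struct my_device *m)
{
        lu_device_fini(&m->md_lu_dev);
}
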
441 /*
442  * Initialize object @o that is part of compound object @h and was created by
443  * device @d.
444  */
445 int lu_object_init(struct lu_object *o,
446                    struct lu_object_header *h, struct lu_device *d)
447 {
448         memset(o, 0, sizeof *o);
449         o->lo_header = h;
450         o->lo_dev    = d;
451         lu_device_get(d);
452         CFS_INIT_LIST_HEAD(&o->lo_linkage);
453         return 0;
454 }
455 EXPORT_SYMBOL(lu_object_init);
456
457 /*
458  * Finalize object and release its resources.
459  */
460 void lu_object_fini(struct lu_object *o)
461 {
462         LASSERT(list_empty(&o->lo_linkage));
463
464         if (o->lo_dev != NULL) {
465                 lu_device_put(o->lo_dev);
466                 o->lo_dev = NULL;
467         }
468 }
469 EXPORT_SYMBOL(lu_object_fini);
470
471 /*
472  * Add object @o as first layer of compound object @h
473  *
474  * This is typically called by the ->ldo_object_alloc() method of top-level
475  * device.
476  */
477 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
478 {
479         list_move(&o->lo_linkage, &h->loh_layers);
480 }
481 EXPORT_SYMBOL(lu_object_add_top);
482
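
/*
 * Illustrative sketch (an assumption, not code from this file) of the shape
 * of a top-level device's ->ldo_object_alloc() method: it creates the
 * lu_object_header together with the first slice and registers that slice
 * with lu_object_add_top(). "my_top_object", "my_obj_ops" and
 * "my_top_object_alloc" are hypothetical names.
 */
struct my_top_object {
        struct lu_object_header mto_header;
        struct lu_object        mto_obj;
};

static struct lu_object_operations my_obj_ops;  /* hypothetical slice methods */

static struct lu_object *my_top_object_alloc(const struct lu_context *ctxt,
                                             struct lu_object_header *hdr,
                                             struct lu_device *d)
{
        struct my_top_object *mto;

        /* the top-level device is called with hdr == NULL, see lu_object_alloc() */
        OBD_ALLOC(mto, sizeof *mto);
        if (mto == NULL)
                return ERR_PTR(-ENOMEM);
        lu_object_header_init(&mto->mto_header);
        lu_object_init(&mto->mto_obj, &mto->mto_header, d);
        lu_object_add_top(&mto->mto_header, &mto->mto_obj);
        mto->mto_obj.lo_ops = &my_obj_ops;
        return &mto->mto_obj;
}
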
483 /*
484  * Add object @o as a layer of a compound object, going after @before.
485  *
486  * This is typically called by the ->ldo_object_alloc() method of
487  * @before->lo_dev.
488  */
489 void lu_object_add(struct lu_object *before, struct lu_object *o)
490 {
491         list_move(&o->lo_linkage, &before->lo_linkage);
492 }
493 EXPORT_SYMBOL(lu_object_add);
494
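
/*
 * Companion sketch to the one above (again an assumption, not from this
 * file): a non-top slice's ->loo_object_init() usually asks the device below
 * for its slice and links it in with lu_object_add(), which is what drives
 * the allocation loop in lu_object_alloc(). "my_lower_device()" and
 * "my_slice_init" are hypothetical.
 */
static struct lu_device *my_lower_device(struct lu_device *d);  /* hypothetical */

static int my_slice_init(const struct lu_context *ctxt, struct lu_object *o)
{
        struct lu_device *under;
        struct lu_object *below;

        under = my_lower_device(o->lo_dev);
        if (under == NULL)
                return 0;       /* bottom of the stack, nothing to add */
        below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
        if (IS_ERR(below))
                return PTR_ERR(below);
        lu_object_add(o, below);
        return 0;
}
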
495 /*
496  * Initialize compound object.
497  */
498 int lu_object_header_init(struct lu_object_header *h)
499 {
500         memset(h, 0, sizeof *h);
501         h->loh_ref = 1;
502         INIT_HLIST_NODE(&h->loh_hash);
503         CFS_INIT_LIST_HEAD(&h->loh_lru);
504         CFS_INIT_LIST_HEAD(&h->loh_layers);
505         return 0;
506 }
507 EXPORT_SYMBOL(lu_object_header_init);
508
509 /*
510  * Finalize compound object.
511  */
512 void lu_object_header_fini(struct lu_object_header *h)
513 {
514         LASSERT(list_empty(&h->loh_layers));
515         LASSERT(list_empty(&h->loh_lru));
516         LASSERT(hlist_unhashed(&h->loh_hash));
517 }
518 EXPORT_SYMBOL(lu_object_header_fini);
519
520 /*
521  * Given a compound object, find its slice corresponding to the device type
522  * @dtype.
523  */
524 struct lu_object *lu_object_locate(struct lu_object_header *h,
525                                    struct lu_device_type *dtype)
526 {
527         struct lu_object *o;
528
529         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
530                 if (o->lo_dev->ld_type == dtype)
531                         return o;
532         }
533         return NULL;
534 }
535 EXPORT_SYMBOL(lu_object_locate);
536
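
/*
 * Usage sketch (illustration only): after lu_object_find() a layer typically
 * narrows the compound object down to its own slice with lu_object_locate().
 * "my_device_type" and "my_find_my_slice" are hypothetical names.
 */
extern struct lu_device_type my_device_type;    /* hypothetical */

static struct lu_object *my_find_my_slice(const struct lu_context *ctxt,
                                          struct lu_site *s,
                                          const struct lu_fid *f)
{
        struct lu_object *top;
        struct lu_object *slice;

        top = lu_object_find(ctxt, s, f);
        if (IS_ERR(top))
                return top;
        slice = lu_object_locate(top->lo_header, &my_device_type);
        if (slice == NULL) {
                /* no slice of this type in the stack */
                lu_object_put(ctxt, top);
                return ERR_PTR(-ENOENT);
        }
        return slice;
}
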
537 enum {
538         /*
539          * Maximal number of tld slots.
540          */
541         LU_CONTEXT_KEY_NR = 16
542 };
543
544 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
545
546 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
547
548 /*
549  * Register new key.
550  */
551 int lu_context_key_register(struct lu_context_key *key)
552 {
553         int result;
554         int i;
555
556         result = -ENFILE;
557         spin_lock(&lu_keys_guard);
558         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
559                 if (lu_keys[i] == NULL) {
560                         key->lct_index = i;
561                         key->lct_used = 1;
562                         lu_keys[i] = key;
563                         result = 0;
564                         break;
565                 }
566         }
567         spin_unlock(&lu_keys_guard);
568         return result;
569 }
570 EXPORT_SYMBOL(lu_context_key_register);
571
572 /*
573  * Deregister key.
574  */
575 void lu_context_key_degister(struct lu_context_key *key)
576 {
577         LASSERT(key->lct_used >= 1);
578         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
579
580         if (key->lct_used > 1)
581                 CERROR("key has instances.\n");
582         spin_lock(&lu_keys_guard);
583         lu_keys[key->lct_index] = NULL;
584         spin_unlock(&lu_keys_guard);
585 }
586 EXPORT_SYMBOL(lu_context_key_degister);
587
588 /*
589  * Return value associated with key @key in context @ctx.
590  */
591 void *lu_context_key_get(const struct lu_context *ctx,
592                          struct lu_context_key *key)
593 {
594         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
595         return ctx->lc_value[key->lct_index];
596 }
597 EXPORT_SYMBOL(lu_context_key_get);
598
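
/*
 * Sketch of how a context key is meant to be used (an illustration under
 * assumed names, not code from this file): a module registers the key once
 * with lu_context_key_register() and drops it with lu_context_key_degister();
 * each lu_context_init() below then allocates a value for every registered
 * key through ->lct_init(), and code running inside that context fetches it
 * with lu_context_key_get(). "my_thread_info" and "my_thread_key" are
 * hypothetical.
 */
struct my_thread_info {
        char mti_scratch[128];
};

static void *my_key_init(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        struct my_thread_info *info;

        OBD_ALLOC(info, sizeof *info);
        if (info == NULL)
                info = ERR_PTR(-ENOMEM);
        return info;
}

static void my_key_fini(const struct lu_context *ctx,
                        struct lu_context_key *key, void *data)
{
        OBD_FREE(data, sizeof(struct my_thread_info));
}

static struct lu_context_key my_thread_key = {
        .lct_init = my_key_init,
        .lct_fini = my_key_fini
};

static struct my_thread_info *my_info_get(const struct lu_context *ctx)
{
        /* valid only after lu_context_key_register(&my_thread_key) succeeded */
        return lu_context_key_get(ctx, &my_thread_key);
}
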
599 static void keys_fini(struct lu_context *ctx)
600 {
601         int i;
602
603         if (ctx->lc_value != NULL) {
604                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
605                         if (ctx->lc_value[i] != NULL) {
606                                 struct lu_context_key *key;
607
608                                 key = lu_keys[i];
609                                 LASSERT(key != NULL);
610                                 LASSERT(key->lct_fini != NULL);
611                                 LASSERT(key->lct_used > 1);
612
613                                 key->lct_fini(ctx, key, ctx->lc_value[i]);
614                                 key->lct_used--;
615                                 ctx->lc_value[i] = NULL;
616                         }
617                 }
618                 OBD_FREE(ctx->lc_value,
619                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
620                 ctx->lc_value = NULL;
621         }
622 }
623
624 static int keys_init(struct lu_context *ctx)
625 {
626         int i;
627         int result;
628
629         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
630         if (ctx->lc_value != NULL) {
631                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
632                         struct lu_context_key *key;
633
634                         key = lu_keys[i];
635                         if (key != NULL) {
636                                 void *value;
637
638                                 LASSERT(key->lct_init != NULL);
639                                 LASSERT(key->lct_index == i);
640
641                                 value = key->lct_init(ctx, key);
642                                 if (IS_ERR(value)) {
643                                         keys_fini(ctx);
644                                         return PTR_ERR(value);
645                                 }
646                                 key->lct_used++;
647                                 ctx->lc_value[i] = value;
648                         }
649                 }
650                 result = 0;
651         } else
652                 result = -ENOMEM;
653         return result;
654 }
655
656 /*
657  * Initialize context data-structure. Create values for all keys.
658  */
659 int lu_context_init(struct lu_context *ctx)
660 {
661         memset(ctx, 0, sizeof *ctx);
662         /* propagate allocation failures from keys_init() to the caller */
663         return keys_init(ctx);
664 }
665 EXPORT_SYMBOL(lu_context_init);
666
667 /*
668  * Finalize context data-structure. Destroy key values.
669  */
670 void lu_context_fini(struct lu_context *ctx)
671 {
672         keys_fini(ctx);
673 }
674 EXPORT_SYMBOL(lu_context_fini);
675
676 /*
677  * Called before entering context.
678  */
679 void lu_context_enter(struct lu_context *ctx)
680 {
681 }
682 EXPORT_SYMBOL(lu_context_enter);
683
684 /*
685  * Called after exiting from @ctx
686  */
687 void lu_context_exit(struct lu_context *ctx)
688 {
689 }
690 EXPORT_SYMBOL(lu_context_exit);
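
/*
 * Closing lifecycle sketch (illustration only): a service thread initializes
 * its context once, brackets each piece of work with enter/exit, and
 * finalizes the context before exiting. "my_thread_main" is a hypothetical
 * name.
 */
static int my_thread_main(void)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx);
        if (rc != 0)
                return rc;
        lu_context_enter(&ctx);
        /* ... do work that calls lu_context_key_get(&ctx, ...) ... */
        lu_context_exit(&ctx);
        lu_context_fini(&ctx);
        return 0;
}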