lustre/obdclass/lu_object.c (fs/lustre-release.git, branch b_new_cmd)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Object.
5  *
6  *  Copyright (C) 2006 Cluster File Systems, Inc.
7  *   Author: Nikita Danilov <nikita@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  *
27  * These are the only exported functions; they provide generic
28  * infrastructure for managing object devices.
29  */
30
31 #define DEBUG_SUBSYSTEM S_CLASS
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35
36 #include <linux/seq_file.h>
37 #include <linux/module.h>
38 #include <obd_support.h>
39 #include <lustre_disk.h>
40 #include <lustre_fid.h>
41 #include <lu_object.h>
42 #include <libcfs/list.h>
43
44 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
45
46 /*
47  * Decrease reference counter on object. If last reference is freed, return
48  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
49  * case, free object immediately.
50  */
51 void lu_object_put(const struct lu_env *env, struct lu_object *o)
52 {
53         struct lu_object_header *top;
54         struct lu_site          *site;
55         struct lu_object        *orig;
56         int                      kill_it;
57
58         top = o->lo_header;
59         site = o->lo_dev->ld_site;
60         orig = o;
61         kill_it = 0;
62         spin_lock(&site->ls_guard);
63         if (-- top->loh_ref == 0) {
64                 /*
65                  * When last reference is released, iterate over object
66                  * layers, and notify them that object is no longer busy.
67                  */
68                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
69                         if (o->lo_ops->loo_object_release != NULL)
70                                 o->lo_ops->loo_object_release(env, o);
71                 }
72                 -- site->ls_busy;
73                 if (lu_object_is_dying(top)) {
74                         /*
75                          * If object is dying (will not be cached), remove it
76                          * from hash table and LRU.
77                          *
78                          * This is done with hash table and LRU lists
79                          * locked. As the only way to acquire first reference
80                          * to previously unreferenced object is through
81                          * hash-table lookup (lu_object_find()), or LRU
82                          * scanning (lu_site_purge()), that are done under
83                          * hash-table and LRU lock, no race with concurrent
84                          * object lookup is possible and we can safely destroy
85                          * object below.
86                          */
87                         hlist_del_init(&top->loh_hash);
88                         list_del_init(&top->loh_lru);
89                         kill_it = 1;
90                 }
91         }
92         spin_unlock(&site->ls_guard);
93         if (kill_it)
94                 /*
95                  * Object was already removed from hash and lru above, can
96                  * kill it.
97                  */
98                 lu_object_free(env, orig);
99 }
100 EXPORT_SYMBOL(lu_object_put);
101
102 /*
103  * Allocate new object.
104  *
105  * This follows object creation protocol, described in the comment within
106  * struct lu_device_operations definition.
107  */
108 static struct lu_object *lu_object_alloc(const struct lu_env *env,
109                                          struct lu_site *s,
110                                          const struct lu_fid *f,
111                                          const struct lustre_capa *capa)
112 {
113         struct lu_object *scan;
114         struct lu_object *top;
115         struct list_head *layers;
116         int clean;
117         int result;
118
119         /*
120          * Create top-level object slice. This will also create
121          * lu_object_header.
122          */
123         top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
124                                                       NULL, s->ls_top_dev);
125         if (IS_ERR(top))
126                 RETURN(top);
127         s->ls_total ++;
128         /*
129          * This is the only place where object fid is assigned. It's constant
130          * after this point.
131          */
132         top->lo_header->loh_fid  = *f;
133         if (capa == BYPASS_CAPA)
134                 lu_object_bypass_capa(top);
135         else if (capa)
136                 top->lo_header->loh_capa = *capa;
137
138         layers = &top->lo_header->loh_layers;
139         do {
140                 /*
141                  * Call ->loo_object_init() repeatedly, until no more new
142                  * object slices are created.
143                  */
144                 clean = 1;
145                 list_for_each_entry(scan, layers, lo_linkage) {
146                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
147                                 continue;
148                         clean = 0;
149                         scan->lo_header = top->lo_header;
150                         result = scan->lo_ops->loo_object_init(env, scan);
151                         if (result != 0) {
152                                 lu_object_free(env, top);
153                                 RETURN(ERR_PTR(result));
154                         }
155                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
156                 }
157         } while (!clean);
158
159         list_for_each_entry_reverse(scan, layers, lo_linkage) {
160                 if (scan->lo_ops->loo_object_start != NULL) {
161                         result = scan->lo_ops->loo_object_start(env, scan);
162                         if (result != 0) {
163                                 lu_object_free(env, top);
164                                 RETURN(ERR_PTR(result));
165                         }
166                 }
167         }
168
169         s->ls_stats.s_created ++;
170         RETURN(top);
171 }
172
173 /*
174  * Free object.
175  */
176 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
177 {
178         struct list_head splice;
179         struct lu_object *scan;
180
181         /*
182          * First call ->loo_object_delete() method to release all resources.
183          */
184         list_for_each_entry_reverse(scan,
185                                     &o->lo_header->loh_layers, lo_linkage) {
186                 if (scan->lo_ops->loo_object_delete != NULL)
187                         scan->lo_ops->loo_object_delete(env, scan);
188         }
189         -- o->lo_dev->ld_site->ls_total;
190         /*
191          * Then, splice object layers into stand-alone list, and call
192          * ->loo_object_free() on all layers to free memory. Splice is
193          * necessary, because lu_object_header is freed together with the
194          * top-level slice.
195          */
196         INIT_LIST_HEAD(&splice);
197         list_splice_init(&o->lo_header->loh_layers, &splice);
198         while (!list_empty(&splice)) {
199                 o = container_of0(splice.next, struct lu_object, lo_linkage);
200                 list_del_init(&o->lo_linkage);
201                 LASSERT(o->lo_ops->loo_object_free != NULL);
202                 o->lo_ops->loo_object_free(env, o);
203         }
204 }
205
206 /*
207  * Scan up to @nr objects at the cold end of the site LRU list and free the
208  * unreferenced ones.
208  */
209 void lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
210 {
211         struct list_head         dispose;
212         struct lu_object_header *h;
213         struct lu_object_header *temp;
214
215         INIT_LIST_HEAD(&dispose);
216         /*
217          * Under LRU list lock, scan LRU list and move unreferenced objects to
218          * the dispose list, removing them from LRU and hash table.
219          */
220         spin_lock(&s->ls_guard);
221         list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
222                 if (nr-- == 0)
223                         break;
224                 if (h->loh_ref > 0)
225                         continue;
226                 hlist_del_init(&h->loh_hash);
227                 list_move(&h->loh_lru, &dispose);
228         }
229         spin_unlock(&s->ls_guard);
230         /*
231          * Free everything on the dispose list. This is safe against races for
232          * the reasons described in lu_object_put().
233          */
234         while (!list_empty(&dispose)) {
235                 h = container_of0(dispose.next,
236                                  struct lu_object_header, loh_lru);
237                 list_del_init(&h->loh_lru);
238                 lu_object_free(env, lu_object_top(h));
239                 s->ls_stats.s_lru_purged ++;
240         }
241 }
242 EXPORT_SYMBOL(lu_site_purge);
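
/*
 * Illustrative sketch (not part of the original code): a shrinking or
 * shutdown path would typically drain the cache by asking lu_site_purge()
 * to scan a very large number of entries. Passing ~0 as @nr and the
 * variable names "env" and "site" are assumptions made for the example.
 *
 *      lu_site_purge(env, site, ~0);
 */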
243
244 /*
245  * Object printing.
246  *
247  * The code below has to jump through some hoops to output an object
248  * description into the libcfs_debug_msg()-based log. The problem is that
249  * lu_object_print() composes the description from strings that are parts of
250  * _lines_ of output (i.e., strings that are not terminated by a newline).
251  * This doesn't fit well into the libcfs_debug_msg() interface, which assumes
252  * that each message supplied to it is a self-contained output line.
253  *
254  * To work around this, strings are collected in a temporary buffer
255  * (implemented as a value of lu_cdebug_key key), until terminating newline
256  * character is detected.
257  *
258  */
259
260 enum {
261         /*
262          * Maximal line size.
263          *
264          * XXX overflow is not handled correctly.
265          */
266         LU_CDEBUG_LINE = 256
267 };
268
269 struct lu_cdebug_data {
270         /*
271          * Temporary buffer.
272          */
273         char lck_area[LU_CDEBUG_LINE];
274 };
275
276 static void *lu_cdebug_key_init(const struct lu_context *ctx,
277                                 struct lu_context_key *key)
278 {
279         struct lu_cdebug_data *value;
280
281         OBD_ALLOC_PTR(value);
282         if (value == NULL)
283                 value = ERR_PTR(-ENOMEM);
284         return value;
285 }
286
287 static void lu_cdebug_key_fini(const struct lu_context *ctx,
288                                struct lu_context_key *key, void *data)
289 {
290         struct lu_cdebug_data *value = data;
291         OBD_FREE_PTR(value);
292 }
293
294 /*
295  * Key, holding temporary buffer. This key is registered very early by
296  * lu_global_init().
297  */
298 static struct lu_context_key lu_cdebug_key = {
299         .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
300         .lct_init = lu_cdebug_key_init,
301         .lct_fini = lu_cdebug_key_fini
302 };
303
304 /*
305  * Printer function emitting messages through libcfs_debug_msg().
306  */
307 int lu_cdebug_printer(const struct lu_env *env,
308                       void *cookie, const char *format, ...)
309 {
310         struct lu_cdebug_print_info *info = cookie;
311         struct lu_cdebug_data       *key;
312         int used;
313         int complete;
314         va_list args;
315
316         va_start(args, format);
317
318         key = lu_context_key_get(&env->le_ctx, &lu_cdebug_key);
319         LASSERT(key != NULL);
320
321         used = strlen(key->lck_area);
322         complete = format[strlen(format) - 1] == '\n';
323         /*
324          * Append new chunk to the buffer.
325          */
326         vsnprintf(key->lck_area + used,
327                   ARRAY_SIZE(key->lck_area) - used, format, args);
328         if (complete) {
329                 libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
330                                  (char *)info->lpi_file, info->lpi_fn,
331                                  info->lpi_line, "%s", key->lck_area);
332                 key->lck_area[0] = 0;
333         }
334         va_end(args);
335         return 0;
336 }
337 EXPORT_SYMBOL(lu_cdebug_printer);
338
339 /*
340  * Print object header.
341  */
342 static void lu_object_header_print(const struct lu_env *env,
343                                    void *cookie, lu_printer_t printer,
344                                    const struct lu_object_header *hdr)
345 {
346         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
347                    hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
348                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
349                    list_empty(&hdr->loh_lru) ? "" : " lru");
350 }
351
352 /*
353  * Print a human-readable representation of @o through @printer.
354  */
355 void lu_object_print(const struct lu_env *env, void *cookie,
356                      lu_printer_t printer, const struct lu_object *o)
357 {
358         static const char ruler[] = "........................................";
359         struct lu_object_header *top;
360         int depth;
361
362         top = o->lo_header;
363         lu_object_header_print(env, cookie, printer, top);
364         (*printer)(env, cookie, "\n");
365         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
366                 depth = o->lo_depth + 4;
367                 LASSERT(o->lo_ops->loo_object_print != NULL);
368                 /*
369                  * print `.' @depth times.
370                  */
371                 (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
372                 o->lo_ops->loo_object_print(env, cookie, printer, o);
373                 (*printer)(env, cookie, "\n");
374         }
375 }
376 EXPORT_SYMBOL(lu_object_print);
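
/*
 * Illustrative sketch (an assumption, not original code): routing
 * lu_object_print() output into the debug log through lu_cdebug_printer().
 * The lpi_* fields are the ones consumed by lu_cdebug_printer() above; the
 * D_INFO mask and the local variable names are assumptions for the example.
 *
 *      struct lu_cdebug_print_info info = {
 *              .lpi_subsys = DEBUG_SUBSYSTEM,
 *              .lpi_mask   = D_INFO,
 *              .lpi_file   = __FILE__,
 *              .lpi_fn     = __FUNCTION__,
 *              .lpi_line   = __LINE__
 *      };
 *
 *      lu_object_print(env, &info, lu_cdebug_printer, o);
 */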
377
378 /*
379  * Check object consistency.
380  */
381 int lu_object_invariant(const struct lu_object *o)
382 {
383         struct lu_object_header *top;
384
385         top = o->lo_header;
386         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
387                 if (o->lo_ops->loo_object_invariant != NULL &&
388                     !o->lo_ops->loo_object_invariant(o))
389                         return 0;
390         }
391         return 1;
392 }
393 EXPORT_SYMBOL(lu_object_invariant);
394
395 static struct lu_object *htable_lookup(struct lu_site *s,
396                                        const struct hlist_head *bucket,
397                                        const struct lu_fid *f)
398 {
399         struct lu_object_header *h;
400         struct hlist_node *scan;
401
402         hlist_for_each_entry(h, scan, bucket, loh_hash) {
403                 s->ls_stats.s_cache_check ++;
404                 if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
405                         /* bump reference count... */
406                         if (h->loh_ref ++ == 0)
407                                 ++ s->ls_busy;
408                         /* and move to the hot end of the LRU list */
409                         list_move_tail(&h->loh_lru, &s->ls_lru);
410                         s->ls_stats.s_cache_hit ++;
411                         return lu_object_top(h);
412                 }
413         }
414         s->ls_stats.s_cache_miss ++;
415         return NULL;
416 }
417
418 static __u32 fid_hash(const struct lu_fid *f)
419 {
420         /* All objects with the same id and different versions will belong
421          * to the same collision list. */
422         return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
423 }
424
425 /*
426  * Search the cache for an object with fid @f. If such an object is found, return
427  * it. Otherwise, create a new object, insert it into the cache and return it. In
428  * either case, an additional reference is acquired on the returned object.
429  */
430 struct lu_object *lu_object_find(const struct lu_env *env,
431                                  struct lu_site *s, const struct lu_fid *f,
432                                  struct lustre_capa *capa)
433 {
434         struct lu_object  *o;
435         struct lu_object  *shadow;
436         struct hlist_head *bucket;
437         int                rc;
438
439         /*
440          * This uses standard index maintenance protocol:
441          *
442          *     - search index under lock, and return object if found;
443          *     - otherwise, unlock index, allocate new object;
444          *     - lock index and search again;
445          *     - if nothing is found (usual case), insert newly created
446          *       object into index;
447          *     - otherwise (race: other thread inserted object), free
448          *       object just allocated;
449          *     - unlock index;
450          *     - return object.
451          */
452
453         bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
454         spin_lock(&s->ls_guard);
455         o = htable_lookup(s, bucket, f);
456
457         spin_unlock(&s->ls_guard);
458         if (o != NULL) {
459                 if (capa == BYPASS_CAPA) {
460                         o->lo_header->loh_capa_bypass = 1;
461                 } else {
462                         rc = lu_object_auth(env, o, capa,
463                                             CAPA_OPC_INDEX_LOOKUP);
464                         if (rc)
465                                 return ERR_PTR(rc);
466                         if (capa)
467                                 o->lo_header->loh_capa = *capa;
468                 }
469                 return o;
470         }
471
472         /*
473          * Allocate new object. This may result in rather complicated
474          * operations, including fld queries, inode loading, etc.
475          */
476         o = lu_object_alloc(env, s, f, capa);
477         if (IS_ERR(o))
478                 return o;
479
480         LASSERT(lu_fid_eq(lu_object_fid(o), f));
481
482         spin_lock(&s->ls_guard);
483         shadow = htable_lookup(s, bucket, f);
484         if (shadow == NULL) {
485                 hlist_add_head(&o->lo_header->loh_hash, bucket);
486                 list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
487                 ++ s->ls_busy;
488                 shadow = o;
489                 o = NULL;
490         } else
491                 s->ls_stats.s_cache_race ++;
492         spin_unlock(&s->ls_guard);
493         if (o != NULL)
494                 lu_object_free(env, o);
495         return shadow;
496 }
497 EXPORT_SYMBOL(lu_object_find);
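
/*
 * Illustrative sketch (an assumption, not original code): the usual
 * lookup/release cycle seen from a caller. BYPASS_CAPA is used only to keep
 * the example short; "site" and "fid" stand for whatever the caller has at
 * hand.
 *
 *      struct lu_object *o;
 *
 *      o = lu_object_find(env, site, fid, BYPASS_CAPA);
 *      if (IS_ERR(o))
 *              return PTR_ERR(o);
 *      ... use the object ...
 *      lu_object_put(env, o);
 */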
498
499 int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
500                    struct lustre_capa *capa, __u64 opc)
501 {
502         struct lu_object_header *top = o->lo_header;
503         int rc;
504
505         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
506                 if (o->lo_ops->loo_object_auth) {
507                         rc = o->lo_ops->loo_object_auth(env, o, capa, opc);
508                         if (rc)
509                                 return rc;
510                 }
511         }
512
513         return 0;
514 }
515 EXPORT_SYMBOL(lu_object_auth);
516
517 enum {
518         LU_SITE_HTABLE_BITS = 8,
519         LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
520         LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
521 };
522
523 /*
524  * Initialize site @s, with @top as the top-level device.
525  */
526 int lu_site_init(struct lu_site *s, struct lu_device *top)
527 {
528         int result;
529         ENTRY;
530
531         memset(s, 0, sizeof *s);
532         spin_lock_init(&s->ls_guard);
533         CFS_INIT_LIST_HEAD(&s->ls_lru);
534         s->ls_top_dev = top;
535         top->ld_site = s;
536         lu_device_get(top);
537         /*
538          * XXX nikita: fixed size hash-table.
539          */
540         s->ls_hash_mask = LU_SITE_HTABLE_MASK;
541         OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
542         if (s->ls_hash != NULL) {
543                 int i;
544                 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
545                         INIT_HLIST_HEAD(&s->ls_hash[i]);
546                 result = 0;
547         } else {
548                 result = -ENOMEM;
549         }
550
551         RETURN(result);
552 }
553 EXPORT_SYMBOL(lu_site_init);
554
555 /*
556  * Finalize @s and release its resources.
557  */
558 void lu_site_fini(struct lu_site *s)
559 {
560         LASSERT(list_empty(&s->ls_lru));
561         LASSERT(s->ls_total == 0);
562         LASSERT(s->ls_busy == 0);
563
564         if (s->ls_hash != NULL) {
565                 int i;
566                 for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
567                         LASSERT(hlist_empty(&s->ls_hash[i]));
568                 OBD_FREE(s->ls_hash,
569                          LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
570                 s->ls_hash = NULL;
571         }
572         if (s->ls_top_dev != NULL) {
573                 s->ls_top_dev->ld_site = NULL;
574                 lu_device_put(s->ls_top_dev);
575                 s->ls_top_dev = NULL;
576         }
577 }
578 EXPORT_SYMBOL(lu_site_fini);
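
/*
 * Illustrative sketch (an assumption, not original code): the site
 * life-cycle implied by lu_site_init()/lu_site_fini(). "top" stands for the
 * top-level device of the stack; purging before finalization is shown so
 * that the LASSERTs in lu_site_fini() hold for cached, unreferenced objects.
 *
 *      rc = lu_site_init(site, top);
 *      if (rc != 0)
 *              return rc;
 *      ... objects come and go through lu_object_find()/lu_object_put() ...
 *      lu_site_purge(env, site, ~0);
 *      lu_site_fini(site);
 */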
579
580 /*
581  * Acquire an additional reference on device @d.
582  */
583 void lu_device_get(struct lu_device *d)
584 {
585         atomic_inc(&d->ld_ref);
586 }
587 EXPORT_SYMBOL(lu_device_get);
588
589 /*
590  * Release reference on device @d.
591  */
592 void lu_device_put(struct lu_device *d)
593 {
594         atomic_dec(&d->ld_ref);
595 }
596 EXPORT_SYMBOL(lu_device_put);
597
598 /*
599  * Initialize device @d of type @t.
600  */
601 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
602 {
603         memset(d, 0, sizeof *d);
604         atomic_set(&d->ld_ref, 0);
605         d->ld_type = t;
606         return 0;
607 }
608 EXPORT_SYMBOL(lu_device_init);
609
610 /*
611  * Finalize device @d.
612  */
613 void lu_device_fini(struct lu_device *d)
614 {
615         if (d->ld_obd != NULL)
616                 /* finish lprocfs */
617                 lprocfs_obd_cleanup(d->ld_obd);
618
619         LASSERTF(atomic_read(&d->ld_ref) == 0,
620                  "Refcount is %u\n", atomic_read(&d->ld_ref));
621 }
622 EXPORT_SYMBOL(lu_device_fini);
623
624 /*
625  * Initialize object @o that is part of compound object @h and was created by
626  * device @d.
627  */
628 int lu_object_init(struct lu_object *o,
629                    struct lu_object_header *h, struct lu_device *d)
630 {
631         memset(o, 0, sizeof *o);
632         o->lo_header = h;
633         o->lo_dev    = d;
634         lu_device_get(d);
635         CFS_INIT_LIST_HEAD(&o->lo_linkage);
636         return 0;
637 }
638 EXPORT_SYMBOL(lu_object_init);
639
640 /*
641  * Finalize object and release its resources.
642  */
643 void lu_object_fini(struct lu_object *o)
644 {
645         LASSERT(list_empty(&o->lo_linkage));
646
647         if (o->lo_dev != NULL) {
648                 lu_device_put(o->lo_dev);
649                 o->lo_dev = NULL;
650         }
651 }
652 EXPORT_SYMBOL(lu_object_fini);
653
654 /*
655  * Add object @o as the first layer of compound object @h.
656  *
657  * This is typically called by the ->ldo_object_alloc() method of the
658  * top-level device.
659  */
660 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
661 {
662         list_move(&o->lo_linkage, &h->loh_layers);
663 }
664 EXPORT_SYMBOL(lu_object_add_top);
665
666 /*
667  * Add object @o as a layer of compound object, going after @before.
668  *
669  * This is typically called by the ->ldo_object_alloc() method of
670  * @before->lo_dev.
671  */
672 void lu_object_add(struct lu_object *before, struct lu_object *o)
673 {
674         list_move(&o->lo_linkage, &before->lo_linkage);
675 }
676 EXPORT_SYMBOL(lu_object_add);
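
/*
 * Illustrative sketch (an assumption, not original code): how a middle
 * layer's ->loo_object_init() typically grows the compound object downward,
 * using the same ->ldo_object_alloc() signature that lu_object_alloc() uses
 * for the top slice. "below" and "child" are illustrative names.
 *
 *      child = below->ld_ops->ldo_object_alloc(env, o->lo_header, below);
 *      if (IS_ERR(child))
 *              return PTR_ERR(child);
 *      lu_object_add(o, child);
 */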
677
678 /*
679  * Initialize compound object.
680  */
681 int lu_object_header_init(struct lu_object_header *h)
682 {
683         memset(h, 0, sizeof *h);
684         h->loh_ref = 1;
685         INIT_HLIST_NODE(&h->loh_hash);
686         CFS_INIT_LIST_HEAD(&h->loh_lru);
687         CFS_INIT_LIST_HEAD(&h->loh_layers);
688         return 0;
689 }
690 EXPORT_SYMBOL(lu_object_header_init);
691
692 /*
693  * Finalize compound object.
694  */
695 void lu_object_header_fini(struct lu_object_header *h)
696 {
697         LASSERT(list_empty(&h->loh_layers));
698         LASSERT(list_empty(&h->loh_lru));
699         LASSERT(hlist_unhashed(&h->loh_hash));
700 }
701 EXPORT_SYMBOL(lu_object_header_fini);
702
703 /*
704  * Given a compound object, find its slice corresponding to the device type
705  * @dtype.
706  */
707 struct lu_object *lu_object_locate(struct lu_object_header *h,
708                                    struct lu_device_type *dtype)
709 {
710         struct lu_object *o;
711
712         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
713                 if (o->lo_dev->ld_type == dtype)
714                         return o;
715         }
716         return NULL;
717 }
718 EXPORT_SYMBOL(lu_object_locate);
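
/*
 * Illustrative sketch (an assumption, not original code): a layer that knows
 * its own device type can recover its slice from a compound object.
 * "mdd_device_type" is a hypothetical device type used only for the example.
 *
 *      struct lu_object *slice;
 *
 *      slice = lu_object_locate(o->lo_header, &mdd_device_type);
 *      if (slice == NULL)
 *              return -ENOENT;
 */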
719
720 enum {
721         /*
722          * Maximal number of tld slots.
723          */
724         LU_CONTEXT_KEY_NR = 16
725 };
726
727 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
728
729 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
730
731 /*
732  * Register new key.
733  */
734 int lu_context_key_register(struct lu_context_key *key)
735 {
736         int result;
737         int i;
738
739         LASSERT(key->lct_init != NULL);
740         LASSERT(key->lct_fini != NULL);
741         LASSERT(key->lct_tags != 0);
742
743         result = -ENFILE;
744         spin_lock(&lu_keys_guard);
745         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
746                 if (lu_keys[i] == NULL) {
747                         key->lct_index = i;
748                         atomic_set(&key->lct_used, 1);
749                         lu_keys[i] = key;
750                         result = 0;
751                         break;
752                 }
753         }
754         spin_unlock(&lu_keys_guard);
755         return result;
756 }
757 EXPORT_SYMBOL(lu_context_key_register);
758
759 /*
760  * Deregister key.
761  */
762 void lu_context_key_degister(struct lu_context_key *key)
763 {
764         LASSERT(atomic_read(&key->lct_used) >= 1);
765         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
766
767         if (atomic_read(&key->lct_used) > 1)
768                 CERROR("key has instances.\n");
769         spin_lock(&lu_keys_guard);
770         lu_keys[key->lct_index] = NULL;
771         spin_unlock(&lu_keys_guard);
772 }
773 EXPORT_SYMBOL(lu_context_key_degister);
774
775 /*
776  * Return value associated with key @key in context @ctx.
777  */
778 void *lu_context_key_get(const struct lu_context *ctx,
779                          struct lu_context_key *key)
780 {
781         LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
782         return ctx->lc_value[key->lct_index];
783 }
784 EXPORT_SYMBOL(lu_context_key_get);
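
/*
 * Illustrative sketch (an assumption, not original code): the key
 * registration and lookup pattern, mirroring how lu_cdebug_key is used in
 * this file. my_key, my_key_init and my_key_fini are placeholder names.
 *
 *      static struct lu_context_key my_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = my_key_init,
 *              .lct_fini = my_key_fini
 *      };
 *
 *      rc = lu_context_key_register(&my_key);
 *      ...
 *      data = lu_context_key_get(&env->le_ctx, &my_key);
 *      ...
 *      lu_context_key_degister(&my_key);
 */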
785
786 static void keys_fini(struct lu_context *ctx)
787 {
788         int i;
789
790         if (ctx->lc_value != NULL) {
791                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
792                         if (ctx->lc_value[i] != NULL) {
793                                 struct lu_context_key *key;
794
795                                 key = lu_keys[i];
796                                 LASSERT(key != NULL);
797                                 LASSERT(key->lct_fini != NULL);
798                                 LASSERT(atomic_read(&key->lct_used) > 1);
799
800                                 key->lct_fini(ctx, key, ctx->lc_value[i]);
801                                 atomic_dec(&key->lct_used);
802                                 ctx->lc_value[i] = NULL;
803                         }
804                 }
805                 OBD_FREE(ctx->lc_value,
806                          ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
807                 ctx->lc_value = NULL;
808         }
809 }
810
811 static int keys_fill(const struct lu_context *ctx)
812 {
813         int i;
814
815         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
816                 struct lu_context_key *key;
817
818                 key = lu_keys[i];
819                 if (ctx->lc_value[i] == NULL &&
820                     key != NULL && key->lct_tags & ctx->lc_tags) {
821                         void *value;
822
823                         LASSERT(key->lct_init != NULL);
824                         LASSERT(key->lct_index == i);
825
826                         value = key->lct_init(ctx, key);
827                         if (IS_ERR(value))
828                                 return PTR_ERR(value);
829                         atomic_inc(&key->lct_used);
830                         ctx->lc_value[i] = value;
831                 }
832         }
833         return 0;
834 }
835
836 static int keys_init(struct lu_context *ctx)
837 {
838         int result;
839
840         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
841         if (ctx->lc_value != NULL)
842                 result = keys_fill(ctx);
843         else
844                 result = -ENOMEM;
845
846         if (result != 0)
847                 keys_fini(ctx);
848         return result;
849 }
850
851 /*
852  * Initialize context data-structure. Create values for all keys.
853  */
854 int lu_context_init(struct lu_context *ctx, __u32 tags)
855 {
856         memset(ctx, 0, sizeof *ctx);
857         ctx->lc_tags = tags;
858         return keys_init(ctx);
859 }
860 EXPORT_SYMBOL(lu_context_init);
861
862 /*
863  * Finalize context data-structure. Destroy key values.
864  */
865 void lu_context_fini(struct lu_context *ctx)
866 {
867         keys_fini(ctx);
868 }
869 EXPORT_SYMBOL(lu_context_fini);
870
871 /*
872  * Called before entering context.
873  */
874 void lu_context_enter(struct lu_context *ctx)
875 {
876 }
877 EXPORT_SYMBOL(lu_context_enter);
878
879 /*
880  * Called after exiting from @ctx.
881  */
882 void lu_context_exit(struct lu_context *ctx)
883 {
884         int i;
885
886         if (ctx->lc_value != NULL) {
887                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
888                         if (ctx->lc_value[i] != NULL) {
889                                 struct lu_context_key *key;
890
891                                 key = lu_keys[i];
892                                 LASSERT(key != NULL);
893                                 if (key->lct_exit != NULL)
894                                         key->lct_exit(ctx,
895                                                       key, ctx->lc_value[i]);
896                         }
897                 }
898         }
899 }
900 EXPORT_SYMBOL(lu_context_exit);
901
902 /*
903  * Allocate, for the context, values of all missing keys that were
904  * registered after the context was created.
905  */
906 int lu_context_refill(const struct lu_context *ctx)
907 {
908         LASSERT(ctx->lc_value != NULL);
909         return keys_fill(ctx);
910 }
911 EXPORT_SYMBOL(lu_context_refill);
912
913 int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
914 {
915         int result;
916
917         env->le_ses = ses;
918         result = lu_context_init(&env->le_ctx, tags);
919         if (result == 0)
920                 lu_context_enter(&env->le_ctx);
921         return result;
922 }
923 EXPORT_SYMBOL(lu_env_init);
924
925 void lu_env_fini(struct lu_env *env)
926 {
927         lu_context_exit(&env->le_ctx);
928         lu_context_fini(&env->le_ctx);
929         env->le_ses = NULL;
930 }
931 EXPORT_SYMBOL(lu_env_fini);
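
/*
 * Illustrative sketch (an assumption, not original code): a server thread
 * would typically bracket its work with lu_env_init()/lu_env_fini().
 * Passing NULL for the session context and using LCT_MD_THREAD are
 * assumptions made for the example.
 *
 *      struct lu_env env;
 *      int rc;
 *
 *      rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
 *      if (rc != 0)
 *              return rc;
 *      ... do work that needs &env ...
 *      lu_env_fini(&env);
 */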
932
933 /*
934  * Initialization of global lu_* data.
935  */
936 int lu_global_init(void)
937 {
938         int result;
939
940         result = lu_context_key_register(&lu_cdebug_key);
941         return result;
942 }
943
944 /*
945  * Dual to lu_global_init().
946  */
947 void lu_global_fini(void)
948 {
949         lu_context_key_degister(&lu_cdebug_key);
950 }
951
952 struct lu_buf LU_BUF_NULL = {
953         .lb_buf = NULL,
954         .lb_len = 0
955 };
956 EXPORT_SYMBOL(LU_BUF_NULL);