/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Object.
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <linux/seq_file.h>
#include <linux/module.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);
/*
 * Decrease the reference counter on an object. If the last reference is
 * released, return the object to the cache, unless lu_object_is_dying(o)
 * holds. In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        int                      kill_it;

        top = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;
        kill_it = 0;
        spin_lock(&site->ls_guard);
        if (--top->loh_ref == 0) {
                /*
                 * When the last reference is released, iterate over object
                 * layers and notify them that the object is no longer busy.
                 */
                list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
                --site->ls_busy;
                if (lu_object_is_dying(top)) {
                        /*
                         * If the object is dying (will not be cached), remove
                         * it from the hash table and the LRU.
                         *
                         * This is done with the hash table and LRU lists
                         * locked. As the only way to acquire the first
                         * reference to a previously unreferenced object is
                         * through hash-table lookup (lu_object_find()) or LRU
                         * scanning (lu_site_purge()), both done under the
                         * hash-table and LRU lock, no race with a concurrent
                         * object lookup is possible and we can safely destroy
                         * the object below.
                         */
                        hlist_del_init(&top->loh_hash);
                        list_del_init(&top->loh_lru);
                        kill_it = 1;
                }
        }
        spin_unlock(&site->ls_guard);
        if (kill_it)
                /*
                 * The object was already removed from the hash table and LRU
                 * above, so it can be killed.
                 */
                lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

/*
 * Allocate new object.
 *
 * This follows the object creation protocol described in the comment within
 * the struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *f,
                                         const struct lustre_capa *capa)
{
        struct lu_object *scan;
        struct lu_object *top;
        struct list_head *layers;
        int clean;
        int result;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = s->ls_top_dev->ld_ops->ldo_object_alloc(env,
                                                      NULL, s->ls_top_dev);
        if (IS_ERR(top))
                RETURN(top);
        s->ls_total++;
        /*
         * This is the only place where the object fid is assigned. It is
         * constant after this point.
         */
        top->lo_header->loh_fid  = *f;
        if (capa == BYPASS_CAPA)
                lu_object_bypass_capa(top);
        else
                top->lo_header->loh_capa = *capa;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        s->ls_stats.s_created++;
        RETURN(top);
}

/*
 * Free object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct list_head splice;
        struct lu_object *scan;

        /*
         * First call the ->loo_object_delete() method to release all
         * resources.
         */
        list_for_each_entry_reverse(scan,
                                    &o->lo_header->loh_layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }
        --o->lo_dev->ld_site->ls_total;
        /*
         * Then, splice object layers into a stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. The splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        INIT_LIST_HEAD(&splice);
        list_splice_init(&o->lo_header->loh_layers, &splice);
        while (!list_empty(&splice)) {
                o = container_of0(splice.next, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }
}

/*
 * Free @nr objects from the cold end of the site LRU list.
 */
void lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct list_head         dispose;
        struct lu_object_header *h;
        struct lu_object_header *temp;

        INIT_LIST_HEAD(&dispose);
        /*
         * Under the LRU list lock, scan the LRU list and move unreferenced
         * objects to the dispose list, removing them from the LRU and hash
         * table.
         */
        spin_lock(&s->ls_guard);
        list_for_each_entry_safe(h, temp, &s->ls_lru, loh_lru) {
                if (nr-- == 0)
                        break;
                if (h->loh_ref > 0)
                        continue;
                hlist_del_init(&h->loh_hash);
                list_move(&h->loh_lru, &dispose);
        }
        spin_unlock(&s->ls_guard);
        /*
         * Free everything on the dispose list. This is safe against races
         * for the reasons described in lu_object_put().
         */
        while (!list_empty(&dispose)) {
                h = container_of0(dispose.next,
                                  struct lu_object_header, loh_lru);
                list_del_init(&h->loh_lru);
                lu_object_free(env, lu_object_top(h));
                s->ls_stats.s_lru_purged++;
        }
}
EXPORT_SYMBOL(lu_site_purge);
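
/*
 * Illustrative usage sketch (the call sites shown here are hypothetical;
 * only lu_site_purge() itself is defined above): a memory-pressure or
 * shutdown path would drain the cold end of the LRU in bulk, e.g.
 *
 *     lu_site_purge(env, site, 128);   // free up to 128 unreferenced objects
 *     lu_site_purge(env, site, ~0);    // attempt to drain the entire LRU
 *
 * Objects with loh_ref > 0 are skipped, so busy objects survive the purge.
 */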

/*
 * Object printing.
 *
 * Code below has to jump through certain loops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into the libcfs_debug_msg() interface, which assumes that each
 * message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_cdebug_key key), until a terminating
 * newline character is detected.
 */
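
/*
 * A minimal sketch of the buffering behaviour described above, assuming a
 * printer/cookie pair set up the way lu_object_print() sets them up below
 * (the field names and values are illustrative only):
 *
 *     (*printer)(env, cookie, "header@%p", hdr);  // buffered, nothing logged
 *     (*printer)(env, cookie, "[%d]", ref);       // appended to the buffer
 *     (*printer)(env, cookie, "\n");              // newline seen: one complete
 *                                                 // line goes to libcfs_debug_msg()
 */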

enum {
        /*
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /*
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

static void *lu_cdebug_key_init(const struct lu_context *ctx,
                                struct lu_context_key *key)
{
        struct lu_cdebug_data *value;

        OBD_ALLOC_PTR(value);
        if (value == NULL)
                value = ERR_PTR(-ENOMEM);
        return value;
}

static void lu_cdebug_key_fini(const struct lu_context *ctx,
                               struct lu_context_key *key, void *data)
{
        struct lu_cdebug_data *value = data;
        OBD_FREE_PTR(value);
}

/*
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
static struct lu_context_key lu_cdebug_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_cdebug_key_init,
        .lct_fini = lu_cdebug_key_fini
};

/*
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_cdebug_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                 (char *)info->lpi_file, info->lpi_fn,
                                 info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/*
 * Print object header.
 */
static void lu_object_header_print(const struct lu_env *env,
                                   void *cookie, lu_printer_t printer,
                                   const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s]",
                   hdr, hdr->loh_flags, hdr->loh_ref, PFID(&hdr->loh_fid),
                   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   list_empty(&hdr->loh_lru) ? "" : " lru");
}

/*
 * Print a human-readable representation of @o to @printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "\n");
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;
                LASSERT(o->lo_ops->loo_object_print != NULL);
                /*
                 * print `.' @depth times.
                 */
                (*printer)(env, cookie, "%*.*s", depth, depth, ruler);
                o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
}
EXPORT_SYMBOL(lu_object_print);

/*
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct hlist_head *bucket,
                                       const struct lu_fid *f)
{
        struct lu_object_header *h;
        struct hlist_node *scan;

        hlist_for_each_entry(h, scan, bucket, loh_hash) {
                s->ls_stats.s_cache_check++;
                if (lu_fid_eq(&h->loh_fid, f) && !lu_object_is_dying(h)) {
                        /* bump reference count... */
                        if (h->loh_ref++ == 0)
                                ++s->ls_busy;
                        /* and move to the hot end of the LRU */
                        list_move_tail(&h->loh_lru, &s->ls_lru);
                        s->ls_stats.s_cache_hit++;
                        return lu_object_top(h);
                }
        }
        s->ls_stats.s_cache_miss++;
        return NULL;
}

static __u32 fid_hash(const struct lu_fid *f)
{
        /* All objects with the same id and different versions will belong to
         * the same collision list. */
        return (fid_seq(f) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(f);
}
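
/*
 * Worked example of the mapping above (the numbers are illustrative): a fid
 * with fid_seq() == 3 and fid_oid() == 5 hashes to
 * (3 - 1) * LUSTRE_SEQ_MAX_WIDTH + 5, and lu_object_find() below masks the
 * result with ls_hash_mask to pick the bucket, so every version of that
 * object ends up on the same collision chain.
 */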

/*
 * Search the cache for an object with the fid @f. If such an object is found,
 * return it. Otherwise, create a new object, insert it into the cache and
 * return it. In any case, an additional reference is acquired on the returned
 * object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_site *s, const struct lu_fid *f,
                                 struct lustre_capa *capa)
{
        struct lu_object  *o;
        struct lu_object  *shadow;
        struct hlist_head *bucket;
        int                rc;

        /*
         * This uses the standard index maintenance protocol:
         *
         *     - search the index under lock, and return the object if found;
         *     - otherwise, unlock the index and allocate a new object;
         *     - lock the index and search again;
         *     - if nothing is found (usual case), insert the newly created
         *       object into the index;
         *     - otherwise (race: another thread inserted the object), free
         *       the object just allocated;
         *     - unlock the index;
         *     - return the object.
         */

        bucket = s->ls_hash + (fid_hash(f) & s->ls_hash_mask);
        spin_lock(&s->ls_guard);
        o = htable_lookup(s, bucket, f);

        spin_unlock(&s->ls_guard);
        if (o != NULL) {
                if (capa == BYPASS_CAPA) {
                        o->lo_header->loh_capa_bypass = 1;
                } else {
                        rc = lu_object_auth(env, o, capa,
                                            CAPA_OPC_INDEX_LOOKUP);
                        if (rc)
                                return ERR_PTR(rc);
                        if (capa)
                                o->lo_header->loh_capa = *capa;
                }
                return o;
        }

        /*
         * Allocate a new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, s, f, capa);
        if (IS_ERR(o))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        spin_lock(&s->ls_guard);
        shadow = htable_lookup(s, bucket, f);
        if (shadow == NULL) {
                hlist_add_head(&o->lo_header->loh_hash, bucket);
                list_add_tail(&o->lo_header->loh_lru, &s->ls_lru);
                ++s->ls_busy;
                shadow = o;
                o = NULL;
        } else
                s->ls_stats.s_cache_race++;
        spin_unlock(&s->ls_guard);
        if (o != NULL)
                lu_object_free(env, o);
        return shadow;
}
EXPORT_SYMBOL(lu_object_find);
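
/*
 * Illustrative lookup/release sketch, assuming the caller already holds an
 * initialized lu_env and lu_site; the device type passed to
 * lu_object_locate() is hypothetical:
 *
 *     struct lu_object *o;
 *
 *     o = lu_object_find(env, site, fid, BYPASS_CAPA);
 *     if (!IS_ERR(o)) {
 *             struct lu_object *slice;
 *
 *             slice = lu_object_locate(o->lo_header, &my_device_type);
 *             // ... operate on the slice ...
 *             lu_object_put(env, o);    // drop the reference taken by find
 *     }
 */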

int lu_object_auth(const struct lu_env *env, const struct lu_object *o,
                   struct lustre_capa *capa, __u64 opc)
{
        struct lu_object_header *top = o->lo_header;
        int rc;

        list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_auth) {
                        rc = o->lo_ops->loo_object_auth(env, o, capa, opc);
                        if (rc)
                                return rc;
                }
        }

        return 0;
}
EXPORT_SYMBOL(lu_object_auth);

enum {
        LU_SITE_HTABLE_BITS = 8,
        LU_SITE_HTABLE_SIZE = (1 << LU_SITE_HTABLE_BITS),
        LU_SITE_HTABLE_MASK = LU_SITE_HTABLE_SIZE - 1
};

/*
 * Initialize site @s, with @top as the top-level device.
 */
int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        int result;
        ENTRY;

        memset(s, 0, sizeof *s);
        spin_lock_init(&s->ls_guard);
        CFS_INIT_LIST_HEAD(&s->ls_lru);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        /*
         * XXX nikita: fixed size hash-table.
         */
        s->ls_hash_mask = LU_SITE_HTABLE_MASK;
        OBD_ALLOC(s->ls_hash, LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        INIT_HLIST_HEAD(&s->ls_hash[i]);
                result = 0;
        } else {
                result = -ENOMEM;
        }

        RETURN(result);
}
EXPORT_SYMBOL(lu_site_init);

/*
 * Finalize @s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        LASSERT(list_empty(&s->ls_lru));
        LASSERT(s->ls_total == 0);
        LASSERT(s->ls_busy == 0);

        if (s->ls_hash != NULL) {
                int i;
                for (i = 0; i < LU_SITE_HTABLE_SIZE; i++)
                        LASSERT(hlist_empty(&s->ls_hash[i]));
                OBD_FREE(s->ls_hash,
                         LU_SITE_HTABLE_SIZE * sizeof s->ls_hash[0]);
                s->ls_hash = NULL;
        }
        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_site_fini);

/*
 * Acquire additional reference on device @d
 */
void lu_device_get(struct lu_device *d)
{
        atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/*
 * Release reference on device @d.
 */
void lu_device_put(struct lu_device *d)
{
        atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/*
 * Initialize device @d of type @t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        memset(d, 0, sizeof *d);
        atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/*
 * Finalize device @d.
 */
void lu_device_fini(struct lu_device *d)
{
        if (d->ld_obd != NULL)
                /* finish lprocfs */
                lprocfs_obd_cleanup(d->ld_obd);

        LASSERTF(atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", atomic_read(&d->ld_ref));
}
EXPORT_SYMBOL(lu_device_fini);

/*
 * Initialize object @o that is part of compound object @h and was created by
 * device @d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/*
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        LASSERT(list_empty(&o->lo_linkage));

        if (o->lo_dev != NULL) {
                lu_device_put(o->lo_dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/*
 * Add object @o as the first layer of compound object @h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/*
 * Add object @o as a layer of a compound object, going after @before.
 *
 * This is typically called by the ->ldo_object_alloc() method of
 * @before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);
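
/*
 * Sketch of how a layered ->ldo_object_alloc() implementation might stitch
 * slices together with the two helpers above (the device and object names
 * here are hypothetical):
 *
 *     // in the top-level device: create the header and the first slice
 *     lu_object_header_init(&obj->mo_header);
 *     lu_object_init(&obj->mo_obj, &obj->mo_header, dev);
 *     lu_object_add_top(&obj->mo_header, &obj->mo_obj);
 *
 *     // in a lower device, invoked with @before pointing at the upper slice;
 *     // lu_object_alloc() fills in lo_header during its init loop
 *     lu_object_init(&child->co_obj, NULL, child_dev);
 *     lu_object_add(before, &child->co_obj);
 */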

/*
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        h->loh_ref = 1;
        INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/*
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
        LASSERT(hlist_unhashed(&h->loh_hash));
}
EXPORT_SYMBOL(lu_object_header_fini);

/*
 * Given a compound object, find its slice corresponding to the device type
 * @dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   struct lu_device_type *dtype)
{
        struct lu_object *o;

        list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

enum {
        /*
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 16
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;

/*
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);

        result = -ENFILE;
        spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        result = 0;
                        break;
                }
        }
        spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

/*
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        if (atomic_read(&key->lct_used) > 1)
                CERROR("key has instances.\n");
        spin_lock(&lu_keys_guard);
        lu_keys[key->lct_index] = NULL;
        spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_key_degister);

/*
 * Return the value associated with key @key in context @ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         struct lu_context_key *key)
{
        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);
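
/*
 * Minimal key-definition sketch, following the same pattern as lu_cdebug_key
 * above; the key, callbacks and variables named here are hypothetical:
 *
 *     static struct lu_context_key my_thread_key = {
 *             .lct_tags = LCT_MD_THREAD,
 *             .lct_init = my_key_init,   // allocates the per-context value
 *             .lct_fini = my_key_fini    // frees it
 *     };
 *
 *     // once, during module setup
 *     rc = lu_context_key_register(&my_thread_key);
 *
 *     // later, inside any context initialized with a matching tag
 *     info = lu_context_key_get(&env->le_ctx, &my_thread_key);
 */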

static void keys_fini(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                LASSERT(key->lct_fini != NULL);
                                LASSERT(atomic_read(&key->lct_used) > 1);

                                key->lct_fini(ctx, key, ctx->lc_value[i]);
                                atomic_dec(&key->lct_used);
                                ctx->lc_value[i] = NULL;
                        }
                }
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
}

static int keys_fill(const struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL &&
                    key != NULL && key->lct_tags & ctx->lc_tags) {
                        void *value;

                        LASSERT(key->lct_init != NULL);
                        LASSERT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (IS_ERR(value))
                                return PTR_ERR(value);
                        atomic_inc(&key->lct_used);
                        ctx->lc_value[i] = value;
                }
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (ctx->lc_value != NULL)
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/*
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_tags = tags;
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/*
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        keys_fini(ctx);
}
EXPORT_SYMBOL(lu_context_fini);

/*
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
}
EXPORT_SYMBOL(lu_context_enter);

/*
 * Called after exiting from @ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

/*
 * Allocate for the context all missing keys that were registered after the
 * context was created.
 */
int lu_context_refill(const struct lu_context *ctx)
{
        LASSERT(ctx->lc_value != NULL);
        return keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);

int lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags)
{
        int result;

        env->le_ses = ses;
        result = lu_context_init(&env->le_ctx, tags);
        if (result == 0)
                lu_context_enter(&env->le_ctx);
        return result;
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);
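
/*
 * Typical environment lifecycle, as a hedged sketch (the tag value and the
 * work done inside the environment are illustrative):
 *
 *     struct lu_env env;
 *     int rc;
 *
 *     rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
 *     if (rc == 0) {
 *             // ... perform object operations that take &env ...
 *             lu_env_fini(&env);
 *     }
 */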

/*
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        result = lu_context_key_register(&lu_cdebug_key);
        return result;
}

/*
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        lu_context_key_degister(&lu_cdebug_key);
}

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);