/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/lu_object.c
 *
 * Lustre Object.
 * These are the only exported functions; they provide generic
 * infrastructure for managing object devices.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <libcfs/libcfs.h>

#ifdef __KERNEL__
# include <linux/module.h>
#endif

/* hash_long() */
#include <libcfs/libcfs_hash.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
/* lu_time_global_{init,fini}() */
#include <lu_time.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/**
 * Decrease reference counter on object. If last reference is freed, return
 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
 * case, free object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        cfs_hash_bd_t            bd;

        top  = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;

        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
                if (lu_object_is_dying(top)) {
                        struct lu_site_bkt_data *bkt;

                        /*
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
                        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
                        cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
                }
                return;
        }

        /*
         * When last reference is released, iterate over object
         * layers, and notify them that object is no longer busy.
         */
        cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }

        if (!lu_object_is_dying(top)) {
                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                return;
        }

        /*
         * If the object is dying (will not be cached), remove it
         * from the hash table and LRU.
         *
         * This is done with the hash table and LRU lists locked. As the only
         * way to acquire the first reference to a previously unreferenced
         * object is through hash-table lookup (lu_object_find()) or LRU
         * scanning (lu_site_purge()), both of which are done under the
         * hash-table and LRU locks, no race with a concurrent lookup is
         * possible and we can safely destroy the object below.
         */
        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
        cfs_list_del_init(&top->loh_lru);
        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
        /*
         * Object was already removed from hash and lru above, can
         * kill it.
         */
        lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

/**
 * Allocate new object.
 *
 * This follows object creation protocol, described in the comment within
 * struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
                                         const struct lu_fid *f,
                                         const struct lu_object_conf *conf)
{
        struct lu_object *scan;
        struct lu_object *top;
        cfs_list_t *layers;
        int clean;
        int result;
        ENTRY;

        /*
         * Create top-level object slice. This will also create
         * lu_object_header.
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        /*
         * This is the only place where object fid is assigned. It's constant
         * after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                cfs_list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
        RETURN(top);
}

/**
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site_bkt_data *bkt;
        struct lu_site          *site;
        struct lu_object        *scan;
        cfs_list_t              *layers;
        cfs_list_t               splice;

        site   = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
        /*
         * First call ->loo_object_delete() method to release all resources.
         */
        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then, splice object layers into stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        CFS_INIT_LIST_HEAD(&splice);
        cfs_list_splice_init(layers, &splice);
        while (!cfs_list_empty(&splice)) {
                /*
                 * Free layers in bottom-to-top order, so that object header
                 * lives as long as possible and ->loo_object_free() methods
                 * can look at its contents.
                 */
                o = container_of0(splice.prev, struct lu_object, lo_linkage);
                cfs_list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }

        if (cfs_waitq_active(&bkt->lsb_marche_funebre))
                cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
}

/**
 * Free \a nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct lu_object_header *h;
        struct lu_object_header *temp;
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t            bd;
        cfs_hash_bd_t            bd2;
        cfs_list_t               dispose;
        int                      did_sth;
        int                      start;
        int                      count;
        int                      bnr;
        int                      i;

        CFS_INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
 again:
        did_sth = 0;
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                if (i < start)
                        continue;
                count = bnr;
                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);

                cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        /*
                         * Objects are sorted in LRU order, and "busy"
                         * objects (ones with h->loh_ref > 0) naturally tend
                         * to live near the hot end that we scan last.
                         * Unfortunately, sites usually have a small (fewer
                         * than ten) number of busy yet rarely accessed
                         * objects (some global objects, accessed directly
                         * through pointers, bypassing the hash table).
                         * Currently the algorithm scans them over and over
                         * again. We should probably move busy objects out of
                         * the LRU, or we can live with that.
                         */
                        if (cfs_atomic_read(&h->loh_ref) > 0)
                                continue;

                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
                        LASSERT(bd.bd_bucket == bd2.bd_bucket);

                        cfs_hash_bd_del_locked(s->ls_obj_hash,
                                               &bd2, &h->loh_hash);
                        cfs_list_move(&h->loh_lru, &dispose);
                        if (did_sth == 0)
                                did_sth = 1;

                        if (nr != ~0 && --nr == 0)
                                break;

                        if (count > 0 && --count == 0)
                                break;
                }
                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
                cfs_cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
                while (!cfs_list_empty(&dispose)) {
                        h = container_of0(dispose.next,
                                          struct lu_object_header, loh_lru);
                        cfs_list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }

                if (nr == 0)
                        break;
        }

        if (nr != 0 && did_sth && start != 0) {
                start = 0; /* restart from the first bucket */
                goto again;
        }
        /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);

        return nr;
}
EXPORT_SYMBOL(lu_site_purge);

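/*
 * Usage sketch (illustrative, not compiled): the two common ways this is
 * called. "env" and "s" stand for a caller-owned lu_env and lu_site; the
 * function name is hypothetical.
 */
#if 0
static void lu_example_trim(const struct lu_env *env, struct lu_site *s)
{
        lu_site_purge(env, s, ~0);      /* drain every idle object, as
                                         * lu_stack_fini() does below */
        lu_site_purge(env, s, 128);     /* free at most ~128 cold objects,
                                         * shrinker-style */
}
#endif
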
/*
 * Object printing.
 *
 * Code below has to jump through certain hoops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into the libcfs_debug_msg() interface, which assumes that each
 * message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_global_key key), until a terminating
 * newline character is detected.
 */

enum {
        /**
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /**
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/**
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};

/**
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_global_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                if (cfs_cdebug_show(info->lpi_mask, info->lpi_subsys))
                        libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                         (char *)info->lpi_file, info->lpi_fn,
                                         info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

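/*
 * Usage sketch (illustrative, not compiled): dumping an object through the
 * debug log, following the same pattern lu_stack_fini() uses below. The
 * function name is hypothetical.
 */
#if 0
static void lu_example_dump(const struct lu_env *env,
                            const struct lu_object *o)
{
        static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);

        lu_object_print(env, &cookie, lu_cdebug_printer, o);
}
#endif
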
/**
 * Print object header.
 */
void lu_object_header_print(const struct lu_env *env, void *cookie,
                            lu_printer_t printer,
                            const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
                   cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
}
EXPORT_SYMBOL(lu_object_header_print);

/**
 * Print human readable representation of the \a o to the \a printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "{ \n");
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;

                /*
                 * print `.' \a depth times followed by type name and address
                 */
                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
                           o->lo_dev->ld_type->ldt_name, o);
                if (o->lo_ops->loo_object_print != NULL)
                        o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
        (*printer)(env, cookie, "} header@%p\n", top);
}
EXPORT_SYMBOL(lu_object_print);

/**
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

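/*
 * Callers normally wrap lu_object_invariant() in an assertion, e.g.
 *
 *      LINVRNT(lu_object_invariant(o));
 *
 * so that the per-layer checks are compiled out of non-debug builds.
 */
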
static struct lu_object *htable_lookup(struct lu_site *s,
                                       cfs_hash_bd_t *bd,
                                       const struct lu_fid *f,
                                       cfs_waitlink_t *waiter,
                                       __u64 *version)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        cfs_hlist_node_t        *hnode;
        __u64  ver = cfs_hash_bd_version_get(bd);

        if (*version == ver)
                return NULL;

        *version = ver;
        /* cfs_hash_bd_lookup_intent is a somewhat "internal" function
         * of cfs_hash, but we don't want a refcount on the object right now */
        hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
        if (hnode == NULL) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return NULL;
        }

        h = container_of0(hnode, struct lu_object_header, loh_hash);
        if (likely(!lu_object_is_dying(h))) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
                return lu_object_top(h);
        }

        /*
         * Lookup found an object being destroyed; this object cannot be
         * returned (to ensure that references to dying objects are eventually
         * drained), and, moreover, the lookup has to wait until the object is
         * freed.
         */
        cfs_atomic_dec(&h->loh_ref);

        cfs_waitlink_init(waiter);
        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
        cfs_set_current_state(CFS_TASK_UNINT);
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
        return ERR_PTR(-EAGAIN);
}

/**
 * Search cache for an object with the fid \a f. If such object is found,
 * return it. Otherwise, create new object, insert it into cache and return
 * it. In any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_device *dev, const struct lu_fid *f,
                                 const struct lu_object_conf *conf)
{
        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
}
EXPORT_SYMBOL(lu_object_find);

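/*
 * Usage sketch (illustrative, not compiled): the canonical
 * lookup/use/release cycle. "dev" and "fid" are caller-owned, a NULL
 * configuration is assumed to be acceptable to the layers involved, and
 * the function name is hypothetical.
 */
#if 0
static void lu_example_lookup(const struct lu_env *env,
                              struct lu_device *dev,
                              const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, dev, fid, NULL);
        if (!IS_ERR(o)) {
                /* a reference is held here; use the object, then drop it */
                lu_object_put(env, o);
        }
}
#endif
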
/**
 * Core logic of lu_object_find*() functions.
 */
static struct lu_object *lu_object_find_try(const struct lu_env *env,
                                            struct lu_device *dev,
                                            const struct lu_fid *f,
                                            const struct lu_object_conf *conf,
                                            cfs_waitlink_t *waiter)
{
        struct lu_object      *o;
        struct lu_object      *shadow;
        struct lu_site        *s;
        cfs_hash_t            *hs;
        cfs_hash_bd_t          bd;
        __u64                  version = 0;

        /*
         * This uses standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         *
         * If dying object is found during index search, add @waiter to the
         * site wait-queue and return ERR_PTR(-EAGAIN).
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
        o = htable_lookup(s, &bd, f, waiter, &version);
        cfs_hash_bd_unlock(hs, &bd, 1);
        if (o != NULL)
                return o;

        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, dev, f, conf);
        if (unlikely(IS_ERR(o)))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        cfs_hash_bd_lock(hs, &bd, 1);

        shadow = htable_lookup(s, &bd, f, waiter, &version);
        if (likely(shadow == NULL)) {
                struct lu_site_bkt_data *bkt;

                bkt = cfs_hash_bd_extra_get(hs, &bd);
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_list_add_tail(&o->lo_header->loh_lru, &bkt->lsb_lru);
                cfs_hash_bd_unlock(hs, &bd, 1);
                return o;
        }

        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
        return shadow;
}

/**
 * Much like lu_object_find(), but top level device of object is specifically
 * \a dev rather than top level device of the site. This interface allows
 * objects of different "stacking" to be created within the same site.
 */
struct lu_object *lu_object_find_at(const struct lu_env *env,
                                    struct lu_device *dev,
                                    const struct lu_fid *f,
                                    const struct lu_object_conf *conf)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object        *obj;
        cfs_waitlink_t           wait;

        while (1) {
                obj = lu_object_find_try(env, dev, f, conf, &wait);
                if (obj != ERR_PTR(-EAGAIN))
                        return obj;
                /*
                 * lu_object_find_try() already added waiter into the
                 * wait queue.
                 */
                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
                cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
        }
}
EXPORT_SYMBOL(lu_object_find_at);

/**
 * Find object with given fid, and return its slice belonging to given device.
 */
struct lu_object *lu_object_find_slice(const struct lu_env *env,
                                       struct lu_device *dev,
                                       const struct lu_fid *f,
                                       const struct lu_object_conf *conf)
{
        struct lu_object *top;
        struct lu_object *obj;

        top = lu_object_find(env, dev, f, conf);
        if (!IS_ERR(top)) {
                obj = lu_object_locate(top->lo_header, dev->ld_type);
                if (obj == NULL)
                        lu_object_put(env, top);
        } else
                obj = top;
        return obj;
}
EXPORT_SYMBOL(lu_object_find_slice);

/**
 * Global list of all device types.
 */
static CFS_LIST_HEAD(lu_device_types);

int lu_device_type_init(struct lu_device_type *ldt)
{
        int result;

        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
        result = ldt->ldt_ops->ldto_init(ldt);
        if (result == 0)
                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
        return result;
}
EXPORT_SYMBOL(lu_device_type_init);

void lu_device_type_fini(struct lu_device_type *ldt)
{
        cfs_list_del_init(&ldt->ldt_linkage);
        ldt->ldt_ops->ldto_fini(ldt);
}
EXPORT_SYMBOL(lu_device_type_fini);

void lu_types_stop(void)
{
        struct lu_device_type *ldt;

        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
                if (ldt->ldt_device_nr == 0)
                        ldt->ldt_ops->ldto_stop(ldt);
        }
}
EXPORT_SYMBOL(lu_types_stop);

/**
 * Global list of all sites on this node
 */
static CFS_LIST_HEAD(lu_sites);
static CFS_DECLARE_MUTEX(lu_sites_guard);

/**
 * Global environment used by site shrinker.
 */
static struct lu_env lu_shrink_env;

struct lu_site_print_arg {
        struct lu_env   *lsp_env;
        void            *lsp_cookie;
        lu_printer_t     lsp_printer;
};

static int
lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                  cfs_hlist_node_t *hnode, void *data)
{
        struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
        struct lu_object_header  *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!cfs_list_empty(&h->loh_layers)) {
                const struct lu_object *o;

                o = lu_object_top(h);
                lu_object_print(arg->lsp_env, arg->lsp_cookie,
                                arg->lsp_printer, o);
        } else {
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
        return 0;
}

/**
 * Print all objects in \a s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        struct lu_site_print_arg arg = {
                .lsp_env     = (struct lu_env *)env,
                .lsp_cookie  = cookie,
                .lsp_printer = printer,
        };

        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
}
EXPORT_SYMBOL(lu_site_print);

enum {
        LU_CACHE_PERCENT   = 20,
};

/**
 * Return desired hash table order.
 */
static int lu_htable_order(void)
{
        unsigned long cache_size;
        int bits;

        /*
         * Calculate hash table size, assuming that we want reasonable
         * performance when 20% of total memory is occupied by cache of
         * lu_objects.
         *
         * Size of lu_object is (arbitrary) taken as 1K (together with inode).
         */
        cache_size = cfs_num_physpages;

#if BITS_PER_LONG == 32
        /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
                cache_size = (1 << (30 - CFS_PAGE_SHIFT)) * 3 / 4;
#endif

        cache_size = cache_size / 100 * LU_CACHE_PERCENT *
                (CFS_PAGE_SIZE / 1024);

        for (bits = 1; (1 << bits) < cache_size; ++bits) {
                ;
        }
        return bits;
}

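/*
 * Worked example for lu_htable_order() (assuming 4KB pages and 1GB of RAM):
 * cfs_num_physpages is 2^18, so cache_size becomes
 * 262144 / 100 * 20 * (4096 / 1024) = 209680 cacheable 1KB objects, and the
 * loop above returns bits = 18, i.e. a 2^18-entry hash table.
 */
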
static unsigned lu_obj_hop_hash(cfs_hash_t *hs, void *key, unsigned mask)
{
        struct lu_fid  *fid = (struct lu_fid *)key;
        unsigned        hash;

        hash = (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
        hash += fid_hash(fid, hs->hs_bkt_bits) << hs->hs_bkt_bits;
        return hash & mask;
}

static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
}

static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return &h->loh_fid;
}

static int lu_obj_hop_keycmp(void *key, cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
}

static void *lu_obj_hop_get(cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        cfs_atomic_inc(&h->loh_ref);
        return h;
}

static void *lu_obj_hop_put_locked(cfs_hlist_node_t *hnode)
{
        LBUG(); /* we should never call this */
        return NULL;
}

cfs_hash_ops_t lu_site_hash_ops = {
        .hs_hash        = lu_obj_hop_hash,
        .hs_key         = lu_obj_hop_key,
        .hs_keycmp      = lu_obj_hop_keycmp,
        .hs_object      = lu_obj_hop_object,
        .hs_get         = lu_obj_hop_get,
        .hs_put_locked  = lu_obj_hop_put_locked,
};

/**
 * Initialize site \a s, with \a top as the top-level device.
 */
#define LU_SITE_BITS_MIN    10
#define LU_SITE_BITS_MAX    23

int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t bd;
        int bits;
        int i;
        ENTRY;

        memset(s, 0, sizeof *s);
        bits = lu_htable_order();
        for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create("lu_site", bits,
                                                 bits, bits - LU_SITE_BITS_MIN,
                                                 sizeof(*bkt), 0, 0,
                                                 &lu_site_hash_ops,
                                                 CFS_HASH_SPIN_BKTLOCK |
                                                 CFS_HASH_NO_ITEMREF |
                                                 CFS_HASH_DEPTH |
                                                 CFS_HASH_ASSERT_EMPTY);
                if (s->ls_obj_hash != NULL)
                        break;
        }

        if (s->ls_obj_hash == NULL) {
                CERROR("failed to create lu_site hash with bits: %d\n", bits);
                return -ENOMEM;
        }

        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
                cfs_waitq_init(&bkt->lsb_marche_funebre);
        }

        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        if (s->ls_stats == NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
                return -ENOMEM;
        }

        lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
                             0, "created", "created");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
                             0, "cache_hit", "cache_hit");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
                             0, "cache_miss", "cache_miss");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
                             0, "cache_race", "cache_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
                             0, "cache_death_race", "cache_death_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
                             0, "lru_purged", "lru_purged");

        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        lu_ref_add(&top->ld_reference, "site-top", s);

        RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);

/**
 * Finalize \a s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        cfs_down(&lu_sites_guard);
        cfs_list_del_init(&s->ls_linkage);
        cfs_up(&lu_sites_guard);

        if (s->ls_obj_hash != NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
        }

        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }

        if (s->ls_stats != NULL)
                lprocfs_free_stats(&s->ls_stats);
}
EXPORT_SYMBOL(lu_site_fini);

/**
 * Called when initialization of stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;

        cfs_down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                cfs_list_add(&s->ls_linkage, &lu_sites);
        cfs_up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

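/*
 * Usage sketch (illustrative, not compiled): bringing a site up and tearing
 * it down. "top" is the stack's already-allocated top-level device; the
 * function name is hypothetical.
 */
#if 0
static int lu_example_site_setup(struct lu_site *s, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(s, top);
        if (rc != 0)
                return rc;
        rc = lu_site_init_finish(s);
        if (rc != 0)
                lu_site_fini(s);
        return rc;
}
#endif
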
/**
 * Acquire additional reference on device \a d
 */
void lu_device_get(struct lu_device *d)
{
        cfs_atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/**
 * Release reference on device \a d.
 */
void lu_device_put(struct lu_device *d)
{
        LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
        cfs_atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/**
 * Initialize device \a d of type \a t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
                t->ldt_ops->ldto_start(t);
        memset(d, 0, sizeof *d);
        cfs_atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        lu_ref_init(&d->ld_reference);
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/**
 * Finalize device \a d.
 */
void lu_device_fini(struct lu_device *d)
{
        struct lu_device_type *t;

        t = d->ld_type;
        if (d->ld_obd != NULL) {
                d->ld_obd->obd_lu_dev = NULL;
                d->ld_obd = NULL;
        }

        lu_ref_fini(&d->ld_reference);
        LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
        LASSERT(t->ldt_device_nr > 0);
        if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
                t->ldt_ops->ldto_stop(t);
}
EXPORT_SYMBOL(lu_device_fini);

/**
 * Initialize object \a o that is part of compound object \a h and was created
 * by device \a d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/**
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        struct lu_device *dev = o->lo_dev;

        LASSERT(cfs_list_empty(&o->lo_linkage));

        if (dev != NULL) {
                lu_ref_del_at(&dev->ld_reference,
                              o->lo_dev_ref, "lu_object", o);
                lu_device_put(dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/**
 * Add object \a o as first layer of compound object \a h
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/**
 * Add object \a o as a layer of compound object, going after \a before.
 *
 * This is typically called by the ->ldo_object_alloc() method of \a
 * before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

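/*
 * Usage sketch (illustrative, not compiled): how allocation methods
 * typically compose a layered object. "h", "o", "below" and "d" are
 * hypothetical; only the lu_object_*() calls are real, and the full
 * protocol is described in the struct lu_device_operations comment.
 */
#if 0
        /* top-level device's ->ldo_object_alloc(): set up the header and
         * link the first slice */
        lu_object_header_init(h);
        lu_object_init(o, h, d);
        lu_object_add_top(h, o);

        /* a lower layer's slice is linked after the slice above it */
        lu_object_add(o, below);
#endif
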
/**
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        cfs_atomic_set(&h->loh_ref, 1);
        CFS_INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        lu_ref_init(&h->loh_reference);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/**
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(cfs_list_empty(&h->loh_layers));
        LASSERT(cfs_list_empty(&h->loh_lru));
        LASSERT(cfs_hlist_unhashed(&h->loh_hash));
        lu_ref_fini(&h->loh_reference);
}
EXPORT_SYMBOL(lu_object_header_fini);

/**
 * Given a compound object, find its slice, corresponding to the device type
 * \a dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   const struct lu_device_type *dtype)
{
        struct lu_object *o;

        cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

/**
 * Finalize and free devices in the device stack.
 *
 * Finalize device stack by purging object cache, and calling
 * lu_device_type_operations::ldto_device_fini() and
 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
 */
void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
{
        struct lu_site   *site = top->ld_site;
        struct lu_device *scan;
        struct lu_device *next;

        lu_site_purge(env, site, ~0);
        for (scan = top; scan != NULL; scan = next) {
                next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
                lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
                lu_device_put(scan);
        }

        /* purge again. */
        lu_site_purge(env, site, ~0);

        if (!cfs_hash_is_empty(site->ls_obj_hash)) {
                /*
                 * Uh-oh, objects still exist.
                 */
                static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);

                lu_site_print(env, site, &cookie, lu_cdebug_printer);
        }

        for (scan = top; scan != NULL; scan = next) {
                const struct lu_device_type *ldt = scan->ld_type;
                struct obd_type             *type;

                next = ldt->ldt_ops->ldto_device_free(env, scan);
                type = ldt->ldt_obd_type;
                if (type != NULL) {
                        type->typ_refcnt--;
                        class_put_type(type);
                }
        }
}
EXPORT_SYMBOL(lu_stack_fini);

enum {
        /**
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 32
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;

/**
 * Global counter incremented whenever a key is registered, unregistered,
 * revived or quiesced. This is used to avoid unnecessary calls to
 * lu_context_refill(). No locking is provided, as initialization and shutdown
 * are supposed to be externally serialized.
 */
static unsigned key_set_version = 0;

/**
 * Register new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);
        LASSERT(key->lct_owner != NULL);

        result = -ENFILE;
        cfs_spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        cfs_atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        lu_ref_init(&key->lct_reference);
                        result = 0;
                        ++key_set_version;
                        break;
                }
        }
        cfs_spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

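/*
 * Usage sketch (illustrative, not compiled): declaring and registering a
 * key, following the lu_global_key pattern above. "foo" and struct
 * foo_thread_info are hypothetical; note that lct_owner must also be set
 * (cf. the LASSERT above), which the kernel-side key-declaration macros
 * normally take care of.
 */
#if 0
LU_KEY_INIT_FINI(foo, struct foo_thread_info);

static struct lu_context_key foo_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = foo_key_init,
        .lct_fini = foo_key_fini
};

        /* at module load: */
        result = lu_context_key_register(&foo_thread_key);
#endif
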
static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(cfs_atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                lu_ref_del(&key->lct_reference, "ctx", ctx);
                cfs_atomic_dec(&key->lct_used);
                LASSERT(key->lct_owner != NULL);
                if (!(ctx->lc_tags & LCT_NOREF)) {
                        LASSERT(cfs_module_refcount(key->lct_owner) > 0);
                        cfs_module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
}

/**
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        lu_context_key_quiesce(key);

        ++key_set_version;
        cfs_spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
        }
        cfs_spin_unlock(&lu_keys_guard);

        LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
                 "key has instances: %d\n",
                 cfs_atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);

/**
 * Register a number of keys. This has to be called after all keys have been
 * initialized by a call to LU_CONTEXT_KEY_INIT().
 */
int lu_context_key_register_many(struct lu_context_key *k, ...)
{
        struct lu_context_key *key = k;
        va_list args;
        int result;

        va_start(args, k);
        do {
                result = lu_context_key_register(key);
                if (result)
                        break;
                key = va_arg(args, struct lu_context_key *);
        } while (key != NULL);
        va_end(args);

        if (result != 0) {
                va_start(args, k);
                while (k != key) {
                        lu_context_key_degister(k);
                        k = va_arg(args, struct lu_context_key *);
                }
                va_end(args);
        }

        return result;
}
EXPORT_SYMBOL(lu_context_key_register_many);

/**
 * De-register a number of keys. This is a dual to
 * lu_context_key_register_many().
 */
void lu_context_key_degister_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_degister(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_degister_many);

/**
 * Revive a number of keys.
 */
void lu_context_key_revive_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_revive(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_revive_many);

/**
 * Quiesce a number of keys.
 */
void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_quiesce(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_quiesce_many);

/**
 * Return value associated with key \a key in context \a ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         const struct lu_context_key *key)
{
        LINVRNT(ctx->lc_state == LCS_ENTERED);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        LASSERT(lu_keys[key->lct_index] == key);
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

/**
 * List of remembered contexts. XXX document me.
 */
static CFS_LIST_HEAD(lu_context_remembered);

/**
 * Destroy \a key in all remembered contexts. This is used to destroy key
 * values in "shared" contexts (like service threads), when a module owning
 * the key is about to be unloaded.
 */
void lu_context_key_quiesce(struct lu_context_key *key)
{
        struct lu_context *ctx;
        extern unsigned cl_env_cache_purge(unsigned nr);

        if (!(key->lct_tags & LCT_QUIESCENT)) {
                /*
                 * XXX layering violation.
                 */
                cl_env_cache_purge(~0);
                key->lct_tags |= LCT_QUIESCENT;
                /*
                 * XXX memory barrier has to go here.
                 */
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_for_each_entry(ctx, &lu_context_remembered,
                                        lc_remember)
                        key_fini(ctx, key->lct_index);
                cfs_spin_unlock(&lu_keys_guard);
                ++key_set_version;
        }
}
EXPORT_SYMBOL(lu_context_key_quiesce);

void lu_context_key_revive(struct lu_context_key *key)
{
        key->lct_tags &= ~LCT_QUIESCENT;
        ++key_set_version;
}
EXPORT_SYMBOL(lu_context_key_revive);

static void keys_fini(struct lu_context *ctx)
{
        int i;

        cfs_spin_lock(&lu_keys_guard);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
        cfs_spin_unlock(&lu_keys_guard);
}

static int keys_fill(struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL && key != NULL &&
                    (key->lct_tags & ctx->lc_tags) &&
                    /*
                     * Don't create values for a LCT_QUIESCENT key, as this
                     * will pin module owning a key.
                     */
                    !(key->lct_tags & LCT_QUIESCENT)) {
                        void *value;

                        LINVRNT(key->lct_init != NULL);
                        LINVRNT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value)))
                                return PTR_ERR(value);

                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF))
                                cfs_try_module_get(key->lct_owner);
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        cfs_atomic_inc(&key->lct_used);
                        /*
                         * This is the only place in the code, where an
                         * element of ctx->lc_value[] array is set to non-NULL
                         * value.
                         */
                        ctx->lc_value[i] = value;
                        if (key->lct_exit != NULL)
                                ctx->lc_tags |= LCT_HAS_EXIT;
                }
                ctx->lc_version = key_set_version;
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (likely(ctx->lc_value != NULL))
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/**
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
                cfs_spin_unlock(&lu_keys_guard);
        } else
                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

/**
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_FINALIZED;
        keys_fini(ctx);
        cfs_spin_lock(&lu_keys_guard);
        cfs_list_del_init(&ctx->lc_remember);
        cfs_spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_fini);

/**
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_ENTERED;
}
EXPORT_SYMBOL(lu_context_enter);

/**
 * Called after exiting from \a ctx
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        LINVRNT(ctx->lc_state == LCS_ENTERED);
        ctx->lc_state = LCS_LEFT;
        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

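/*
 * Usage sketch (illustrative, not compiled): the full life-cycle of a
 * context as a service thread would drive it. The function name and tag
 * choice are hypothetical.
 */
#if 0
static int lu_example_thread(void)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx, LCT_MD_THREAD);
        if (rc != 0)
                return rc;
        lu_context_enter(&ctx);
        /* lu_context_key_get(&ctx, &some_key) is valid between
         * enter and exit */
        lu_context_exit(&ctx);
        lu_context_fini(&ctx);
        return 0;
}
#endif
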
1534 /**
1535  * Allocate for context all missing keys that were registered after context
1536  * creation.
1537  */
1538 int lu_context_refill(struct lu_context *ctx)
1539 {
1540         LINVRNT(ctx->lc_value != NULL);
1541         return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
1542 }
1543 EXPORT_SYMBOL(lu_context_refill);

int lu_env_init(struct lu_env *env, __u32 tags)
{
        int result;

        env->le_ses = NULL;
        result = lu_context_init(&env->le_ctx, tags);
        if (likely(result == 0))
                lu_context_enter(&env->le_ctx);
        return result;
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);

int lu_env_refill(struct lu_env *env)
{
        int result;

        result = lu_context_refill(&env->le_ctx);
        if (result == 0 && env->le_ses != NULL)
                result = lu_context_refill(env->le_ses);
        return result;
}
EXPORT_SYMBOL(lu_env_refill);
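
/*
 * Illustrative pairing of the lu_env helpers above; a sketch under the
 * assumption that LCT_LOCAL is an appropriate tag set for the caller:
 *
 *      struct lu_env env;
 *      int rc;
 *
 *      rc = lu_env_init(&env, LCT_LOCAL);
 *      if (rc == 0) {
 *              rc = lu_env_refill(&env);
 *              ... use env.le_ctx, e.g. with lu_context_key_get() ...
 *              lu_env_fini(&env);
 *      }
 */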

static struct cfs_shrinker *lu_site_shrinker = NULL;

struct lu_site_stats_result {
        unsigned        lss_populated;
        unsigned        lss_max_search;
        unsigned        lss_total;
        unsigned        lss_busy;
        cfs_hash_bd_t   lss_bd;
};

static int lu_site_stats_get(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                             cfs_hlist_node_t *hnode, void *data)
{
        struct lu_site_stats_result    *sa = data;
        struct lu_object_header        *h;

        sa->lss_total++;
        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (cfs_atomic_read(&h->loh_ref) > 0)
                sa->lss_busy++;

        if (sa->lss_bd.bd_bucket == NULL ||
            cfs_hash_bd_compare(&sa->lss_bd, bd) != 0) {
                if (sa->lss_max_search < cfs_hash_bd_depmax_get(bd))
                        sa->lss_max_search = cfs_hash_bd_depmax_get(bd);
                sa->lss_populated++;
                sa->lss_bd = *bd;
        }
        return 0;
}

#ifdef __KERNEL__
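/*
 * Shrinker callback, following the classic Linux shrinker contract as
 * wrapped by cfs_set_shrinker() below: a call with nr == 0 only queries
 * the number of freeable objects, nr > 0 asks for that many objects to
 * be purged, and -1 declines the request when the caller's allocation
 * context cannot safely re-enter the filesystem (no __GFP_FS).
 */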
static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        struct lu_site_stats_result stats;
        struct lu_site *s;
        struct lu_site *tmp;
        int cached = 0;
        int remain = nr;
        CFS_LIST_HEAD(splice);

        if (nr != 0) {
                if (!(gfp_mask & __GFP_FS))
                        return -1;
                CDEBUG(D_INODE, "Shrink %d objects\n", nr);
        }

        cfs_down(&lu_sites_guard);
        cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                if (nr != 0) {
                        remain = lu_site_purge(&lu_shrink_env, s, remain);
                        /*
                         * Move the just-shrunk site to the tail of the site
                         * list to ensure shrinking fairness.
                         */
                        cfs_list_move_tail(&s->ls_linkage, &splice);
                }

                memset(&stats, 0, sizeof(stats));
                cfs_hash_for_each(s->ls_obj_hash, lu_site_stats_get, &stats);
                cached += stats.lss_total - stats.lss_busy;
                if (nr && remain <= 0)
                        break;
        }
        cfs_list_splice(&splice, lu_sites.prev);
        cfs_up(&lu_sites_guard);

        cached = (cached / 100) * sysctl_vfs_cache_pressure;
        if (nr == 0)
                CDEBUG(D_INODE, "%d objects cached\n", cached);
        return cached;
}

/*
 * Debugging stuff.
 */

/**
 * Environment to be used in debugger, contains all tags.
 */
struct lu_env lu_debugging_env;

/**
 * Debugging printer function using printk().
 */
int lu_printk_printer(const struct lu_env *env,
                      void *unused, const char *format, ...)
{
        va_list args;

        va_start(args, format);
        vprintk(format, args);
        va_end(args);
        return 0;
}

void lu_debugging_setup(void)
{
        lu_env_init(&lu_debugging_env, ~0);
}

void lu_context_keys_dump(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (key != NULL) {
                        CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
                               i, key, key->lct_tags,
                               key->lct_init, key->lct_fini, key->lct_exit,
                               key->lct_index, cfs_atomic_read(&key->lct_used),
                               key->lct_owner ? key->lct_owner->name : "",
                               key->lct_owner);
                        lu_ref_print(&key->lct_reference);
                }
        }
}
EXPORT_SYMBOL(lu_context_keys_dump);
#else  /* !__KERNEL__ */
static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        return 0;
}
#endif /* __KERNEL__ */

int  cl_global_init(void);
void cl_global_fini(void);
int  lu_ref_global_init(void);
void lu_ref_global_fini(void);

int dt_global_init(void);
void dt_global_fini(void);

int llo_global_init(void);
void llo_global_fini(void);

/**
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        CDEBUG(D_CONSOLE, "Lustre LU module (%p).\n", &lu_keys);

        result = lu_ref_global_init();
        if (result != 0)
                return result;

        LU_CONTEXT_KEY_INIT(&lu_global_key);
        result = lu_context_key_register(&lu_global_key);
        if (result != 0)
                return result;
        /*
         * At this level we do not know which tags are needed, so the
         * environment is initialized conservatively. This should not be
         * too costly, because this environment is global.
         */
        cfs_down(&lu_sites_guard);
        result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
        cfs_up(&lu_sites_guard);
        if (result != 0)
                return result;

        /*
         * Seek estimation: three seeks to read a record from the OI, one to
         * read the inode, one for the EA. Unfortunately, setting this value
         * high results in the lu_object/inode cache consuming all the memory.
         */
        lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
        if (lu_site_shrinker == NULL)
                return -ENOMEM;

        result = lu_time_global_init();
        if (result)
                GOTO(out, result);

#ifdef __KERNEL__
        result = dt_global_init();
        if (result)
                GOTO(out, result);

        result = llo_global_init();
        if (result)
                GOTO(out, result);
#endif
        result = cl_global_init();
out:
        return result;
}

/**
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        cl_global_fini();
#ifdef __KERNEL__
        llo_global_fini();
        dt_global_fini();
#endif
        lu_time_global_fini();
        if (lu_site_shrinker != NULL) {
                cfs_remove_shrinker(lu_site_shrinker);
                lu_site_shrinker = NULL;
        }

        lu_context_key_degister(&lu_global_key);

        /*
         * Tear shrinker environment down _after_ de-registering
         * lu_global_key, because the latter has a value in the former.
         */
        cfs_down(&lu_sites_guard);
        lu_env_fini(&lu_shrink_env);
        cfs_up(&lu_sites_guard);

        lu_ref_global_fini();
}
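
/*
 * Sketch of the intended pairing, assuming a module init/exit in the
 * surrounding obdclass module; the function names here are placeholders,
 * not the actual callers:
 *
 *      static int __init init_obdclass(void)
 *      {
 *              ...
 *              return lu_global_init();
 *      }
 *
 *      static void __exit cleanup_obdclass(void)
 *      {
 *              lu_global_fini();
 *              ...
 *      }
 */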

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);
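
/*
 * LU_BUF_NULL serves as the canonical "no buffer" value. A sketched use,
 * with the surrounding logic invented for illustration:
 *
 *      struct lu_buf buf = LU_BUF_NULL;
 *
 *      if (buf.lb_buf == NULL)
 *              ... attach backing memory, set lb_buf and lb_len ...
 */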

static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
{
#ifdef LPROCFS
        struct lprocfs_counter ret;

        lprocfs_stats_collect(stats, idx, &ret);
        return (__u32)ret.lc_count;
#else
        return 0;
#endif
}

/**
 * Output site statistical counters into a buffer. Suitable for
 * lprocfs_rd_*()-style functions.
 */
int lu_site_stats_print(const struct lu_site *s, char *page, int count)
{
        struct lu_site_stats_result stats;

        memset(&stats, 0, sizeof(stats));
        cfs_hash_for_each(s->ls_obj_hash, lu_site_stats_get, &stats);

        return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                        stats.lss_busy,
                        stats.lss_total,
                        stats.lss_populated,
                        CFS_HASH_NHLIST(s->ls_obj_hash),
                        stats.lss_max_search,
                        ls_stats_read(s->ls_stats, LU_SS_CREATED),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
                        ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
}
EXPORT_SYMBOL(lu_site_stats_print);
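
/*
 * For reference, the counters printed above appear in this order:
 * busy/total objects, populated/total hash chains, maximum chain depth
 * searched, then created, cache_hit, cache_miss, cache_race,
 * cache_death_race and lru_purged.
 */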

const char *lu_time_names[LU_TIME_NR] = {
        [LU_TIME_FIND_LOOKUP] = "find_lookup",
        [LU_TIME_FIND_ALLOC]  = "find_alloc",
        [LU_TIME_FIND_INSERT] = "find_insert"
};
EXPORT_SYMBOL(lu_time_names);

/**
 * Helper function to initialize a number of kmem slab caches at once.
 */
int lu_kmem_init(struct lu_kmem_descr *caches)
{
        int result;

        for (result = 0; caches->ckd_cache != NULL; ++caches) {
                *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name,
                                                          caches->ckd_size,
                                                          0, 0);
                if (*caches->ckd_cache == NULL) {
                        result = -ENOMEM;
                        break;
                }
        }
        return result;
}
EXPORT_SYMBOL(lu_kmem_init);

/**
 * Helper function to finalize a number of kmem slab caches at once. Dual to
 * lu_kmem_init().
 */
void lu_kmem_fini(struct lu_kmem_descr *caches)
{
        int rc;

        for (; caches->ckd_cache != NULL; ++caches) {
                if (*caches->ckd_cache != NULL) {
                        rc = cfs_mem_cache_destroy(*caches->ckd_cache);
                        LASSERTF(rc == 0, "couldn't destroy %s slab\n",
                                 caches->ckd_name);
                        *caches->ckd_cache = NULL;
                }
        }
}
EXPORT_SYMBOL(lu_kmem_fini);
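
/*
 * A sketched descriptor table for the two helpers above; the cache name
 * and the foo_object type are invented for illustration. The array is
 * terminated by an entry with a NULL ckd_cache:
 *
 *      static cfs_mem_cache_t *foo_object_kmem;
 *
 *      static struct lu_kmem_descr foo_caches[] = {
 *              {
 *                      .ckd_cache = &foo_object_kmem,
 *                      .ckd_name  = "foo_object_kmem",
 *                      .ckd_size  = sizeof(struct foo_object)
 *              },
 *              {
 *                      .ckd_cache = NULL
 *              }
 *      };
 *
 *      rc = lu_kmem_init(foo_caches);
 *      ...
 *      lu_kmem_fini(foo_caches);
 */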