1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <libcfs/list.h>
61 /* lu_time_global_{init,fini}() */
62 #include <lu_time.h>
63
64 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
65
66 /**
67  * Decrease reference counter on object. If last reference is freed, return
68  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
69  * case, free object immediately.
70  */
71 void lu_object_put(const struct lu_env *env, struct lu_object *o)
72 {
73         struct lu_site_bkt_data *bkt;
74         struct lu_object_header *top;
75         struct lu_site          *site;
76         struct lu_object        *orig;
77         cfs_hash_bd_t            bd;
78
79         top  = o->lo_header;
80         site = o->lo_dev->ld_site;
81         orig = o;
82
83         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
84         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
85
86         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
87                 if (lu_object_is_dying(top)) {
88
89                         /*
90                          * Somebody may be waiting for this; currently this
91                          * is only used for cl_object, see cl_object_put_last().
92                          */
93                         cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
94                 }
95                 return;
96         }
97
98         LASSERT(bkt->lsb_busy > 0);
99         bkt->lsb_busy--;
100         /*
101          * When last reference is released, iterate over object
102          * layers, and notify them that object is no longer busy.
103          */
104         cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
105                 if (o->lo_ops->loo_object_release != NULL)
106                         o->lo_ops->loo_object_release(env, o);
107         }
108
109         if (!lu_object_is_dying(top)) {
110                 LASSERT(cfs_list_empty(&top->loh_lru));
111                 cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru);
112                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
113                 return;
114         }
115
116         /*
117          * If the object is dying (will not be cached), remove it
118          * from the hash table and LRU.
119          *
120          * This is done with the hash table and LRU lists locked. As the only
121          * ways to acquire the first reference to a previously unreferenced
122          * object are hash-table lookup (lu_object_find()) and LRU scanning
123          * (lu_site_purge()), both done under the hash-table and LRU locks,
124          * no race with a concurrent object lookup is possible
125          * and we can safely destroy the object below.
126          */
127         cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
128         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
129         /*
130          * The object was already removed from the hash table and LRU
131          * above, so it can be killed now.
132          */
133         lu_object_free(env, orig);
134 }
135 EXPORT_SYMBOL(lu_object_put);
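
/*
 * Usage sketch (illustrative, not part of the original source): a reference
 * obtained from lu_object_find() is dropped with lu_object_put() once the
 * caller is done with the object. The variables below are hypothetical.
 *
 *	struct lu_object *obj;
 *
 *	obj = lu_object_find(env, dev, fid, NULL);
 *	if (!IS_ERR(obj)) {
 *		... use the object ...
 *		lu_object_put(env, obj);
 *	}
 */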
136
137 /**
138  * Put object and don't keep it in cache. This is a temporary solution for
139  * multi-site objects whose layering is not constant.
140  */
141 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
142 {
143         cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
144                     &o->lo_header->loh_flags);
145         return lu_object_put(env, o);
146 }
147 EXPORT_SYMBOL(lu_object_put_nocache);
148
149 /**
150  * Allocate new object.
151  *
152  * This follows object creation protocol, described in the comment within
153  * struct lu_device_operations definition.
154  */
155 static struct lu_object *lu_object_alloc(const struct lu_env *env,
156                                          struct lu_device *dev,
157                                          const struct lu_fid *f,
158                                          const struct lu_object_conf *conf)
159 {
160         struct lu_object *scan;
161         struct lu_object *top;
162         cfs_list_t *layers;
163         int clean;
164         int result;
165         ENTRY;
166
167         /*
168          * Create top-level object slice. This will also create
169          * lu_object_header.
170          */
171         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
172         if (top == NULL)
173                 RETURN(ERR_PTR(-ENOMEM));
174         /*
175          * This is the only place where object fid is assigned. It's constant
176          * after this point.
177          */
178         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
179         top->lo_header->loh_fid = *f;
180         layers = &top->lo_header->loh_layers;
181         do {
182                 /*
183                  * Call ->loo_object_init() repeatedly, until no more new
184                  * object slices are created.
185                  */
186                 clean = 1;
187                 cfs_list_for_each_entry(scan, layers, lo_linkage) {
188                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
189                                 continue;
190                         clean = 0;
191                         scan->lo_header = top->lo_header;
192                         result = scan->lo_ops->loo_object_init(env, scan, conf);
193                         if (result != 0) {
194                                 lu_object_free(env, top);
195                                 RETURN(ERR_PTR(result));
196                         }
197                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
198                 }
199         } while (!clean);
200
201         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
202                 if (scan->lo_ops->loo_object_start != NULL) {
203                         result = scan->lo_ops->loo_object_start(env, scan);
204                         if (result != 0) {
205                                 lu_object_free(env, top);
206                                 RETURN(ERR_PTR(result));
207                         }
208                 }
209         }
210
211         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
212         RETURN(top);
213 }
214
215 /**
216  * Free an object.
217  */
218 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
219 {
220         struct lu_site_bkt_data *bkt;
221         struct lu_site          *site;
222         struct lu_object        *scan;
223         cfs_list_t              *layers;
224         cfs_list_t               splice;
225
226         site   = o->lo_dev->ld_site;
227         layers = &o->lo_header->loh_layers;
228         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
229         /*
230          * First call ->loo_object_delete() method to release all resources.
231          */
232         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
233                 if (scan->lo_ops->loo_object_delete != NULL)
234                         scan->lo_ops->loo_object_delete(env, scan);
235         }
236
237         /*
238          * Then, splice object layers into stand-alone list, and call
239          * ->loo_object_free() on all layers to free memory. Splice is
240          * necessary, because lu_object_header is freed together with the
241          * top-level slice.
242          */
243         CFS_INIT_LIST_HEAD(&splice);
244         cfs_list_splice_init(layers, &splice);
245         while (!cfs_list_empty(&splice)) {
246                 /*
247                  * Free layers in bottom-to-top order, so that object header
248                  * lives as long as possible and ->loo_object_free() methods
249                  * can look at its contents.
250                  */
251                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
252                 cfs_list_del_init(&o->lo_linkage);
253                 LASSERT(o->lo_ops->loo_object_free != NULL);
254                 o->lo_ops->loo_object_free(env, o);
255         }
256
257         if (cfs_waitq_active(&bkt->lsb_marche_funebre))
258                 cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
259 }
260
261 /**
262  * Free \a nr objects from the cold end of the site LRU list.
263  */
264 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
265 {
266         struct lu_object_header *h;
267         struct lu_object_header *temp;
268         struct lu_site_bkt_data *bkt;
269         cfs_hash_bd_t            bd;
270         cfs_hash_bd_t            bd2;
271         cfs_list_t               dispose;
272         int                      did_sth;
273         int                      start;
274         int                      count;
275         int                      bnr;
276         int                      i;
277
278         CFS_INIT_LIST_HEAD(&dispose);
279         /*
280          * Under LRU list lock, scan LRU list and move unreferenced objects to
281          * the dispose list, removing them from LRU and hash table.
282          */
283         start = s->ls_purge_start;
284         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
285  again:
286         did_sth = 0;
287         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
288                 if (i < start)
289                         continue;
290                 count = bnr;
291                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
292                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
293
294                 cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
295                         LASSERT(cfs_atomic_read(&h->loh_ref) == 0);
296
297                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
298                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
299
300                         cfs_hash_bd_del_locked(s->ls_obj_hash,
301                                                &bd2, &h->loh_hash);
302                         cfs_list_move(&h->loh_lru, &dispose);
303                         if (did_sth == 0)
304                                 did_sth = 1;
305
306                         if (nr != ~0 && --nr == 0)
307                                 break;
308
309                         if (count > 0 && --count == 0)
310                                 break;
311
312                 }
313                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
314                 cfs_cond_resched();
315                 /*
316                  * Free everything on the dispose list. This is safe against
317                  * races due to the reasons described in lu_object_put().
318                  */
319                 while (!cfs_list_empty(&dispose)) {
320                         h = container_of0(dispose.next,
321                                           struct lu_object_header, loh_lru);
322                         cfs_list_del_init(&h->loh_lru);
323                         lu_object_free(env, lu_object_top(h));
324                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
325                 }
326
327                 if (nr == 0)
328                         break;
329         }
330
331         if (nr != 0 && did_sth && start != 0) {
332                 start = 0; /* restart from the first bucket */
333                 goto again;
334         }
335         /* race on s->ls_purge_start, but nobody cares */
336         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
337
338         return nr;
339 }
340 EXPORT_SYMBOL(lu_site_purge);
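
/*
 * Usage sketch (illustrative, not part of the original source): passing ~0 as
 * \a nr purges the whole cache, as lu_stack_fini() below does; a bounded value
 * frees at most that many objects from the cold end of the LRU.
 *
 *	lu_site_purge(env, site, ~0);	drops every unreferenced object
 *	lu_site_purge(env, site, 128);	frees at most 128 cold objects
 */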
341
342 /*
343  * Object printing.
344  *
345  * The code below has to jump through some hoops to output an object
346  * description into a libcfs_debug_msg-based log. The problem is that
347  * lu_object_print() composes the description from strings that are parts
348  * of _lines_ of output (i.e., strings not terminated by a newline). This
349  * doesn't fit very well into the libcfs_debug_msg() interface, which
350  * assumes that each message supplied to it is a self-contained output line.
351  *
352  * To work around this, strings are collected in a temporary buffer
353  * (implemented as a value of the lu_global_key context key) until a
354  * terminating newline character is detected.
355  *
356  */
357
358 enum {
359         /**
360          * Maximal line size.
361          *
362          * XXX overflow is not handled correctly.
363          */
364         LU_CDEBUG_LINE = 512
365 };
366
367 struct lu_cdebug_data {
368         /**
369          * Temporary buffer.
370          */
371         char lck_area[LU_CDEBUG_LINE];
372 };
373
374 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
375 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
376
377 /**
378  * Key, holding temporary buffer. This key is registered very early by
379  * lu_global_init().
380  */
381 struct lu_context_key lu_global_key = {
382         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
383                     LCT_MG_THREAD | LCT_CL_THREAD,
384         .lct_init = lu_global_key_init,
385         .lct_fini = lu_global_key_fini
386 };
387
388 /**
389  * Printer function emitting messages through libcfs_debug_msg().
390  */
391 int lu_cdebug_printer(const struct lu_env *env,
392                       void *cookie, const char *format, ...)
393 {
394         struct libcfs_debug_msg_data *msgdata = cookie;
395         struct lu_cdebug_data        *key;
396         int used;
397         int complete;
398         va_list args;
399
400         va_start(args, format);
401
402         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
403         LASSERT(key != NULL);
404
405         used = strlen(key->lck_area);
406         complete = format[strlen(format) - 1] == '\n';
407         /*
408          * Append new chunk to the buffer.
409          */
410         vsnprintf(key->lck_area + used,
411                   ARRAY_SIZE(key->lck_area) - used, format, args);
412         if (complete) {
413                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
414                         libcfs_debug_msg(msgdata, "%s", key->lck_area);
415                 key->lck_area[0] = 0;
416         }
417         va_end(args);
418         return 0;
419 }
420 EXPORT_SYMBOL(lu_cdebug_printer);
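
/*
 * Usage sketch (illustrative, not part of the original source):
 * lu_cdebug_printer is passed as the lu_printer_t callback to
 * lu_object_print() below, with a libcfs_debug_msg_data as the cookie.
 * The LIBCFS_DEBUG_MSG_DATA_DECL() helper and the D_OTHER mask are assumed
 * to be available from libcfs.
 *
 *	LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
 *
 *	lu_object_print(env, &msgdata, lu_cdebug_printer, obj);
 */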
421
422 /**
423  * Print object header.
424  */
425 void lu_object_header_print(const struct lu_env *env, void *cookie,
426                             lu_printer_t printer,
427                             const struct lu_object_header *hdr)
428 {
429         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
430                    hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
431                    PFID(&hdr->loh_fid),
432                    cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
433                    cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? \
434                    "" : " lru",
435                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
436 }
437 EXPORT_SYMBOL(lu_object_header_print);
438
439 /**
440  * Print human readable representation of the \a o to the \a printer.
441  */
442 void lu_object_print(const struct lu_env *env, void *cookie,
443                      lu_printer_t printer, const struct lu_object *o)
444 {
445         static const char ruler[] = "........................................";
446         struct lu_object_header *top;
447         int depth;
448
449         top = o->lo_header;
450         lu_object_header_print(env, cookie, printer, top);
451         (*printer)(env, cookie, "{ \n");
452         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
453                 depth = o->lo_depth + 4;
454
455                 /*
456                  * print `.' \a depth times followed by type name and address
457                  */
458                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
459                            o->lo_dev->ld_type->ldt_name, o);
460                 if (o->lo_ops->loo_object_print != NULL)
461                         o->lo_ops->loo_object_print(env, cookie, printer, o);
462                 (*printer)(env, cookie, "\n");
463         }
464         (*printer)(env, cookie, "} header@%p\n", top);
465 }
466 EXPORT_SYMBOL(lu_object_print);
467
468 /**
469  * Check object consistency.
470  */
471 int lu_object_invariant(const struct lu_object *o)
472 {
473         struct lu_object_header *top;
474
475         top = o->lo_header;
476         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
477                 if (o->lo_ops->loo_object_invariant != NULL &&
478                     !o->lo_ops->loo_object_invariant(o))
479                         return 0;
480         }
481         return 1;
482 }
483 EXPORT_SYMBOL(lu_object_invariant);
484
485 static struct lu_object *htable_lookup(struct lu_site *s,
486                                        cfs_hash_bd_t *bd,
487                                        const struct lu_fid *f,
488                                        cfs_waitlink_t *waiter,
489                                        __u64 *version)
490 {
491         struct lu_site_bkt_data *bkt;
492         struct lu_object_header *h;
493         cfs_hlist_node_t        *hnode;
494         __u64  ver = cfs_hash_bd_version_get(bd);
495
496         if (*version == ver)
497                 return NULL;
498
499         *version = ver;
500         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
501         /* cfs_hash_bd_lookup_intent is a somewhat "internal" function
502          * of cfs_hash, but we don't want a refcount on the object right now */
503         hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
504         if (hnode == NULL) {
505                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
506                 return NULL;
507         }
508
509         h = container_of0(hnode, struct lu_object_header, loh_hash);
510         if (likely(!lu_object_is_dying(h))) {
511                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
512                 cfs_list_del_init(&h->loh_lru);
513                 return lu_object_top(h);
514         }
515
516         /*
517          * Lookup found an object being destroyed; this object cannot be
518          * returned (to ensure that references to dying objects are eventually
519          * drained). Moreover, the lookup has to wait until the object is freed.
520          */
521         cfs_atomic_dec(&h->loh_ref);
522
523         cfs_waitlink_init(waiter);
524         cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
525         cfs_set_current_state(CFS_TASK_UNINT);
526         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
527         return ERR_PTR(-EAGAIN);
528 }
529
530 /**
531  * Search cache for an object with the fid \a f. If such object is found,
532  * return it. Otherwise, create new object, insert it into cache and return
533  * it. In any case, additional reference is acquired on the returned object.
534  */
535 struct lu_object *lu_object_find(const struct lu_env *env,
536                                  struct lu_device *dev, const struct lu_fid *f,
537                                  const struct lu_object_conf *conf)
538 {
539         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
540 }
541 EXPORT_SYMBOL(lu_object_find);
542
543 static struct lu_object *lu_object_new(const struct lu_env *env,
544                                        struct lu_device *dev,
545                                        const struct lu_fid *f,
546                                        const struct lu_object_conf *conf)
547 {
548         struct lu_object        *o;
549         cfs_hash_t              *hs;
550         cfs_hash_bd_t            bd;
551         struct lu_site_bkt_data *bkt;
552
553         o = lu_object_alloc(env, dev, f, conf);
554         if (unlikely(IS_ERR(o)))
555                 return o;
556
557         hs = dev->ld_site->ls_obj_hash;
558         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
559         bkt = cfs_hash_bd_extra_get(hs, &bd);
560         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
561         bkt->lsb_busy++;
562         cfs_hash_bd_unlock(hs, &bd, 1);
563         return o;
564 }
565
566 /**
567  * Core logic of lu_object_find*() functions.
568  */
569 static struct lu_object *lu_object_find_try(const struct lu_env *env,
570                                             struct lu_device *dev,
571                                             const struct lu_fid *f,
572                                             const struct lu_object_conf *conf,
573                                             cfs_waitlink_t *waiter)
574 {
575         struct lu_object      *o;
576         struct lu_object      *shadow;
577         struct lu_site        *s;
578         cfs_hash_t            *hs;
579         cfs_hash_bd_t          bd;
580         __u64                  version = 0;
581
582         /*
583          * This uses standard index maintenance protocol:
584          *
585          *     - search index under lock, and return object if found;
586          *     - otherwise, unlock index, allocate new object;
587          *     - lock index and search again;
588          *     - if nothing is found (usual case), insert newly created
589          *       object into index;
590          *     - otherwise (race: other thread inserted object), free
591          *       object just allocated.
592          *     - unlock index;
593          *     - return object.
594          *
595          * For the "LOC_F_NEW" case, we are sure the object is newly created.
596          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
597          * we just alloc and insert directly.
598          *
599          * If a dying object is found during the index search, add @waiter to
600          * the site wait-queue and return ERR_PTR(-EAGAIN).
601          */
602         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
603                 return lu_object_new(env, dev, f, conf);
604
605         s  = dev->ld_site;
606         hs = s->ls_obj_hash;
607         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
608         o = htable_lookup(s, &bd, f, waiter, &version);
609         cfs_hash_bd_unlock(hs, &bd, 1);
610         if (o != NULL)
611                 return o;
612
613         /*
614          * Allocate new object. This may result in rather complicated
615          * operations, including fld queries, inode loading, etc.
616          */
617         o = lu_object_alloc(env, dev, f, conf);
618         if (unlikely(IS_ERR(o)))
619                 return o;
620
621         LASSERT(lu_fid_eq(lu_object_fid(o), f));
622
623         cfs_hash_bd_lock(hs, &bd, 1);
624
625         shadow = htable_lookup(s, &bd, f, waiter, &version);
626         if (likely(shadow == NULL)) {
627                 struct lu_site_bkt_data *bkt;
628
629                 bkt = cfs_hash_bd_extra_get(hs, &bd);
630                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
631                 bkt->lsb_busy++;
632                 cfs_hash_bd_unlock(hs, &bd, 1);
633                 return o;
634         }
635
636         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
637         cfs_hash_bd_unlock(hs, &bd, 1);
638         lu_object_free(env, o);
639         return shadow;
640 }
641
642 /**
643  * Much like lu_object_find(), but top level device of object is specifically
644  * \a dev rather than top level device of the site. This interface allows
645  * objects of different "stacking" to be created within the same site.
646  */
647 struct lu_object *lu_object_find_at(const struct lu_env *env,
648                                     struct lu_device *dev,
649                                     const struct lu_fid *f,
650                                     const struct lu_object_conf *conf)
651 {
652         struct lu_site_bkt_data *bkt;
653         struct lu_object        *obj;
654         cfs_waitlink_t           wait;
655
656         while (1) {
657                 obj = lu_object_find_try(env, dev, f, conf, &wait);
658                 if (obj != ERR_PTR(-EAGAIN))
659                         return obj;
660                 /*
661                  * lu_object_find_try() already added waiter into the
662                  * wait queue.
663                  */
664                 cfs_waitq_wait(&wait, CFS_TASK_UNINT);
665                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
666                 cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
667         }
668 }
669 EXPORT_SYMBOL(lu_object_find_at);
670
671 /**
672  * Find object with given fid, and return its slice belonging to given device.
673  */
674 struct lu_object *lu_object_find_slice(const struct lu_env *env,
675                                        struct lu_device *dev,
676                                        const struct lu_fid *f,
677                                        const struct lu_object_conf *conf)
678 {
679         struct lu_object *top;
680         struct lu_object *obj;
681
682         top = lu_object_find(env, dev, f, conf);
683         if (!IS_ERR(top)) {
684                 obj = lu_object_locate(top->lo_header, dev->ld_type);
685                 if (obj == NULL)
686                         lu_object_put(env, top);
687         } else
688                 obj = top;
689         return obj;
690 }
691 EXPORT_SYMBOL(lu_object_find_slice);
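
/*
 * Usage sketch (illustrative, not part of the original source): a layer can
 * look up the slice belonging to its own device; the reference is still
 * dropped with lu_object_put(), which operates on the whole compound object.
 * The my_dev variable is hypothetical.
 *
 *	struct lu_object *slice;
 *
 *	slice = lu_object_find_slice(env, my_dev, fid, NULL);
 *	if (!IS_ERR(slice)) {
 *		... use the slice ...
 *		lu_object_put(env, slice);
 *	}
 */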
692
693 /**
694  * Global list of all device types.
695  */
696 static CFS_LIST_HEAD(lu_device_types);
697
698 int lu_device_type_init(struct lu_device_type *ldt)
699 {
700         int result;
701
702         CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
703         result = ldt->ldt_ops->ldto_init(ldt);
704         if (result == 0)
705                 cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
706         return result;
707 }
708 EXPORT_SYMBOL(lu_device_type_init);
709
710 void lu_device_type_fini(struct lu_device_type *ldt)
711 {
712         cfs_list_del_init(&ldt->ldt_linkage);
713         ldt->ldt_ops->ldto_fini(ldt);
714 }
715 EXPORT_SYMBOL(lu_device_type_fini);
716
717 void lu_types_stop(void)
718 {
719         struct lu_device_type *ldt;
720
721         cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
722                 if (ldt->ldt_device_nr == 0)
723                         ldt->ldt_ops->ldto_stop(ldt);
724         }
725 }
726 EXPORT_SYMBOL(lu_types_stop);
727
728 /**
729  * Global list of all sites on this node
730  */
731 static CFS_LIST_HEAD(lu_sites);
732 static CFS_DEFINE_MUTEX(lu_sites_guard);
733
734 /**
735  * Global environment used by site shrinker.
736  */
737 static struct lu_env lu_shrink_env;
738
739 struct lu_site_print_arg {
740         struct lu_env   *lsp_env;
741         void            *lsp_cookie;
742         lu_printer_t     lsp_printer;
743 };
744
745 static int
746 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
747                   cfs_hlist_node_t *hnode, void *data)
748 {
749         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
750         struct lu_object_header  *h;
751
752         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
753         if (!cfs_list_empty(&h->loh_layers)) {
754                 const struct lu_object *o;
755
756                 o = lu_object_top(h);
757                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
758                                 arg->lsp_printer, o);
759         } else {
760                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
761                                        arg->lsp_printer, h);
762         }
763         return 0;
764 }
765
766 /**
767  * Print all objects in \a s.
768  */
769 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
770                    lu_printer_t printer)
771 {
772         struct lu_site_print_arg arg = {
773                 .lsp_env     = (struct lu_env *)env,
774                 .lsp_cookie  = cookie,
775                 .lsp_printer = printer,
776         };
777
778         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
779 }
780 EXPORT_SYMBOL(lu_site_print);
781
782 enum {
783         LU_CACHE_PERCENT_MAX     = 50,
784         LU_CACHE_PERCENT_DEFAULT = 20
785 };
786
787 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
788 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
789                 "Percentage of memory to be used as lu_object cache");
790
791 /**
792  * Return desired hash table order.
793  */
794 static int lu_htable_order(void)
795 {
796         unsigned long cache_size;
797         int bits;
798
799         /*
800          * Calculate hash table size, assuming that we want reasonable
801          * performance when 20% of total memory is occupied by cache of
802          * lu_objects.
803          *
804          * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
805          */
806         cache_size = cfs_num_physpages;
807
808 #if BITS_PER_LONG == 32
809         /* limit hashtable size for lowmem systems to low RAM */
810         if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
811                 cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4;
812 #endif
813
814         /* Reject an unreasonable cache setting. */
815         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
816                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
817                       " the range of (0, %u]. Will use default value: %u.\n",
818                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
819                       LU_CACHE_PERCENT_DEFAULT);
820
821                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
822         }
823         cache_size = cache_size / 100 * lu_cache_percent *
824                 (CFS_PAGE_SIZE / 1024);
825
826         for (bits = 1; (1 << bits) < cache_size; ++bits) {
827                 ;
828         }
829         return bits;
830 }
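
/*
 * Worked example (illustrative): on a node with 4 GB of RAM and 4 KB pages,
 * cfs_num_physpages is 1048576. With the default lu_cache_percent of 20,
 * cache_size = 1048576 / 100 * 20 * (4096 / 1024) = 838800 objects, and the
 * loop above picks bits = 20, since 2^19 < 838800 <= 2^20.
 */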
831
832 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
833                                 const void *key, unsigned mask)
834 {
835         struct lu_fid  *fid = (struct lu_fid *)key;
836         __u32           hash;
837
838         hash = fid_flatten32(fid);
839         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
840         hash = cfs_hash_long(hash, hs->hs_bkt_bits);
841
842         /* give me another random factor */
843         hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
844
845         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
846         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
847
848         return hash & mask;
849 }
850
851 static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
852 {
853         return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
854 }
855
856 static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
857 {
858         struct lu_object_header *h;
859
860         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
861         return &h->loh_fid;
862 }
863
864 static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
865 {
866         struct lu_object_header *h;
867
868         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
869         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
870 }
871
872 static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
873 {
874         struct lu_object_header *h;
875
876         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
877         if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
878                 struct lu_site_bkt_data *bkt;
879                 cfs_hash_bd_t            bd;
880
881                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
882                 bkt = cfs_hash_bd_extra_get(hs, &bd);
883                 bkt->lsb_busy++;
884         }
885 }
886
887 static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
888 {
889         LBUG(); /* we should never call this */
890 }
891
892 cfs_hash_ops_t lu_site_hash_ops = {
893         .hs_hash        = lu_obj_hop_hash,
894         .hs_key         = lu_obj_hop_key,
895         .hs_keycmp      = lu_obj_hop_keycmp,
896         .hs_object      = lu_obj_hop_object,
897         .hs_get         = lu_obj_hop_get,
898         .hs_put_locked  = lu_obj_hop_put_locked,
899 };
900
901 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
902 {
903         cfs_spin_lock(&s->ls_ld_lock);
904         if (cfs_list_empty(&d->ld_linkage))
905                 cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
906         cfs_spin_unlock(&s->ls_ld_lock);
907 }
908 EXPORT_SYMBOL(lu_dev_add_linkage);
909
910 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
911 {
912         cfs_spin_lock(&s->ls_ld_lock);
913         cfs_list_del_init(&d->ld_linkage);
914         cfs_spin_unlock(&s->ls_ld_lock);
915 }
916 EXPORT_SYMBOL(lu_dev_del_linkage);
917
918 /**
919  * Initialize site \a s, with \a d as the top level device.
920  */
921 #define LU_SITE_BITS_MIN    12
922 #define LU_SITE_BITS_MAX    24
923 /**
924  * 256 buckets in total; we don't want too many buckets because:
925  * - they consume too much memory
926  * - the per-bucket LRU lists become unbalanced
927  */
928 #define LU_SITE_BKT_BITS    8
929
930 int lu_site_init(struct lu_site *s, struct lu_device *top)
931 {
932         struct lu_site_bkt_data *bkt;
933         cfs_hash_bd_t bd;
934         char name[16];
935         int bits;
936         int i;
937         ENTRY;
938
939         memset(s, 0, sizeof *s);
940         bits = lu_htable_order();
941         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
942         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
943              bits >= LU_SITE_BITS_MIN; bits--) {
944                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
945                                                  bits - LU_SITE_BKT_BITS,
946                                                  sizeof(*bkt), 0, 0,
947                                                  &lu_site_hash_ops,
948                                                  CFS_HASH_SPIN_BKTLOCK |
949                                                  CFS_HASH_NO_ITEMREF |
950                                                  CFS_HASH_DEPTH |
951                                                  CFS_HASH_ASSERT_EMPTY);
952                 if (s->ls_obj_hash != NULL)
953                         break;
954         }
955
956         if (s->ls_obj_hash == NULL) {
957                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
958                 return -ENOMEM;
959         }
960
961         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
962                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
963                 CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
964                 cfs_waitq_init(&bkt->lsb_marche_funebre);
965         }
966
967         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
968         if (s->ls_stats == NULL) {
969                 cfs_hash_putref(s->ls_obj_hash);
970                 s->ls_obj_hash = NULL;
971                 return -ENOMEM;
972         }
973
974         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
975                              0, "created", "created");
976         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
977                              0, "cache_hit", "cache_hit");
978         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
979                              0, "cache_miss", "cache_miss");
980         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
981                              0, "cache_race", "cache_race");
982         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
983                              0, "cache_death_race", "cache_death_race");
984         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
985                              0, "lru_purged", "lru_purged");
986
987         CFS_INIT_LIST_HEAD(&s->ls_linkage);
988         s->ls_top_dev = top;
989         top->ld_site = s;
990         lu_device_get(top);
991         lu_ref_add(&top->ld_reference, "site-top", s);
992
993         CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
994         cfs_spin_lock_init(&s->ls_ld_lock);
995
996         lu_dev_add_linkage(s, top);
997
998         RETURN(0);
999 }
1000 EXPORT_SYMBOL(lu_site_init);
1001
1002 /**
1003  * Finalize \a s and release its resources.
1004  */
1005 void lu_site_fini(struct lu_site *s)
1006 {
1007         cfs_mutex_lock(&lu_sites_guard);
1008         cfs_list_del_init(&s->ls_linkage);
1009         cfs_mutex_unlock(&lu_sites_guard);
1010
1011         if (s->ls_obj_hash != NULL) {
1012                 cfs_hash_putref(s->ls_obj_hash);
1013                 s->ls_obj_hash = NULL;
1014         }
1015
1016         if (s->ls_top_dev != NULL) {
1017                 s->ls_top_dev->ld_site = NULL;
1018                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1019                 lu_device_put(s->ls_top_dev);
1020                 s->ls_top_dev = NULL;
1021         }
1022
1023         if (s->ls_stats != NULL)
1024                 lprocfs_free_stats(&s->ls_stats);
1025 }
1026 EXPORT_SYMBOL(lu_site_fini);
1027
1028 /**
1029  * Called when initialization of stack for this site is completed.
1030  */
1031 int lu_site_init_finish(struct lu_site *s)
1032 {
1033         int result;
1034         cfs_mutex_lock(&lu_sites_guard);
1035         result = lu_context_refill(&lu_shrink_env.le_ctx);
1036         if (result == 0)
1037                 cfs_list_add(&s->ls_linkage, &lu_sites);
1038         cfs_mutex_unlock(&lu_sites_guard);
1039         return result;
1040 }
1041 EXPORT_SYMBOL(lu_site_init_finish);
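
/*
 * Usage sketch (illustrative, not part of the original source): a stack sets
 * up its site roughly as follows; top_dev is a hypothetical, already
 * initialized top-level device.
 *
 *	struct lu_site *site;
 *	int rc;
 *
 *	OBD_ALLOC_PTR(site);
 *	rc = lu_site_init(site, top_dev);
 *	if (rc == 0)
 *		rc = lu_site_init_finish(site);
 *	...
 *	lu_site_fini(site);
 *	OBD_FREE_PTR(site);
 */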
1042
1043 /**
1044  * Acquire additional reference on device \a d
1045  */
1046 void lu_device_get(struct lu_device *d)
1047 {
1048         cfs_atomic_inc(&d->ld_ref);
1049 }
1050 EXPORT_SYMBOL(lu_device_get);
1051
1052 /**
1053  * Release reference on device \a d.
1054  */
1055 void lu_device_put(struct lu_device *d)
1056 {
1057         LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
1058         cfs_atomic_dec(&d->ld_ref);
1059 }
1060 EXPORT_SYMBOL(lu_device_put);
1061
1062 /**
1063  * Initialize device \a d of type \a t.
1064  */
1065 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1066 {
1067         if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
1068                 t->ldt_ops->ldto_start(t);
1069         memset(d, 0, sizeof *d);
1070         cfs_atomic_set(&d->ld_ref, 0);
1071         d->ld_type = t;
1072         lu_ref_init(&d->ld_reference);
1073         CFS_INIT_LIST_HEAD(&d->ld_linkage);
1074         return 0;
1075 }
1076 EXPORT_SYMBOL(lu_device_init);
1077
1078 /**
1079  * Finalize device \a d.
1080  */
1081 void lu_device_fini(struct lu_device *d)
1082 {
1083         struct lu_device_type *t;
1084
1085         t = d->ld_type;
1086         if (d->ld_obd != NULL) {
1087                 d->ld_obd->obd_lu_dev = NULL;
1088                 d->ld_obd = NULL;
1089         }
1090
1091         lu_ref_fini(&d->ld_reference);
1092         LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
1093                  "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
1094         LASSERT(t->ldt_device_nr > 0);
1095         if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
1096                 t->ldt_ops->ldto_stop(t);
1097 }
1098 EXPORT_SYMBOL(lu_device_fini);
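
/*
 * Usage sketch (illustrative, not part of the original source): a device
 * implementation pairs these calls in its allocation and free paths, typically
 * from its ldto_device_alloc()/ldto_device_free() methods. The my_dev
 * structure and its embedded md_lu_dev field are hypothetical.
 *
 *	rc = lu_device_init(&my_dev->md_lu_dev, &my_device_type);
 *	...
 *	lu_device_fini(&my_dev->md_lu_dev);
 */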
1099
1100 /**
1101  * Initialize object \a o that is part of compound object \a h and was created
1102  * by device \a d.
1103  */
1104 int lu_object_init(struct lu_object *o,
1105                    struct lu_object_header *h, struct lu_device *d)
1106 {
1107         memset(o, 0, sizeof *o);
1108         o->lo_header = h;
1109         o->lo_dev    = d;
1110         lu_device_get(d);
1111         o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
1112         CFS_INIT_LIST_HEAD(&o->lo_linkage);
1113         return 0;
1114 }
1115 EXPORT_SYMBOL(lu_object_init);
1116
1117 /**
1118  * Finalize object and release its resources.
1119  */
1120 void lu_object_fini(struct lu_object *o)
1121 {
1122         struct lu_device *dev = o->lo_dev;
1123
1124         LASSERT(cfs_list_empty(&o->lo_linkage));
1125
1126         if (dev != NULL) {
1127                 lu_ref_del_at(&dev->ld_reference,
1128                               o->lo_dev_ref , "lu_object", o);
1129                 lu_device_put(dev);
1130                 o->lo_dev = NULL;
1131         }
1132 }
1133 EXPORT_SYMBOL(lu_object_fini);
1134
1135 /**
1136  * Add object \a o as first layer of compound object \a h
1137  *
1138  * This is typically called by the ->ldo_object_alloc() method of top-level
1139  * device.
1140  */
1141 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1142 {
1143         cfs_list_move(&o->lo_linkage, &h->loh_layers);
1144 }
1145 EXPORT_SYMBOL(lu_object_add_top);
1146
1147 /**
1148  * Add object \a o as a layer of compound object, going after \a before.
1149  *
1150  * This is typically called by the ->ldo_object_alloc() method of \a
1151  * before->lo_dev.
1152  */
1153 void lu_object_add(struct lu_object *before, struct lu_object *o)
1154 {
1155         cfs_list_move(&o->lo_linkage, &before->lo_linkage);
1156 }
1157 EXPORT_SYMBOL(lu_object_add);
1158
1159 /**
1160  * Initialize compound object.
1161  */
1162 int lu_object_header_init(struct lu_object_header *h)
1163 {
1164         memset(h, 0, sizeof *h);
1165         cfs_atomic_set(&h->loh_ref, 1);
1166         CFS_INIT_HLIST_NODE(&h->loh_hash);
1167         CFS_INIT_LIST_HEAD(&h->loh_lru);
1168         CFS_INIT_LIST_HEAD(&h->loh_layers);
1169         lu_ref_init(&h->loh_reference);
1170         return 0;
1171 }
1172 EXPORT_SYMBOL(lu_object_header_init);
1173
1174 /**
1175  * Finalize compound object.
1176  */
1177 void lu_object_header_fini(struct lu_object_header *h)
1178 {
1179         LASSERT(cfs_list_empty(&h->loh_layers));
1180         LASSERT(cfs_list_empty(&h->loh_lru));
1181         LASSERT(cfs_hlist_unhashed(&h->loh_hash));
1182         lu_ref_fini(&h->loh_reference);
1183 }
1184 EXPORT_SYMBOL(lu_object_header_fini);
1185
1186 /**
1187  * Given a compound object, find its slice, corresponding to the device type
1188  * \a dtype.
1189  */
1190 struct lu_object *lu_object_locate(struct lu_object_header *h,
1191                                    const struct lu_device_type *dtype)
1192 {
1193         struct lu_object *o;
1194
1195         cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1196                 if (o->lo_dev->ld_type == dtype)
1197                         return o;
1198         }
1199         return NULL;
1200 }
1201 EXPORT_SYMBOL(lu_object_locate);
1202
1203
1204
1205 /**
1206  * Finalize and free devices in the device stack.
1207  *
1208  * Finalize device stack by purging object cache, and calling
1209  * lu_device_type_operations::ldto_device_fini() and
1210  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1211  */
1212 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1213 {
1214         struct lu_site   *site = top->ld_site;
1215         struct lu_device *scan;
1216         struct lu_device *next;
1217
1218         lu_site_purge(env, site, ~0);
1219         for (scan = top; scan != NULL; scan = next) {
1220                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1221                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1222                 lu_device_put(scan);
1223         }
1224
1225         /* purge again. */
1226         lu_site_purge(env, site, ~0);
1227
1228         for (scan = top; scan != NULL; scan = next) {
1229                 const struct lu_device_type *ldt = scan->ld_type;
1230                 struct obd_type             *type;
1231
1232                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1233                 type = ldt->ldt_obd_type;
1234                 if (type != NULL) {
1235                         type->typ_refcnt--;
1236                         class_put_type(type);
1237                 }
1238         }
1239 }
1240 EXPORT_SYMBOL(lu_stack_fini);
1241
1242 enum {
1243         /**
1244          * Maximal number of tld slots.
1245          */
1246         LU_CONTEXT_KEY_NR = 32
1247 };
1248
1249 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1250
1251 static DEFINE_SPINLOCK(lu_keys_guard);
1252
1253 /**
1254  * Global counter incremented whenever key is registered, unregistered,
1255  * revived or quiesced. This is used to avoid unnecessary calls to
1256  * lu_context_refill(). No locking is provided, as initialization and shutdown
1257  * are supposed to be externally serialized.
1258  */
1259 static unsigned key_set_version = 0;
1260
1261 /**
1262  * Register new key.
1263  */
1264 int lu_context_key_register(struct lu_context_key *key)
1265 {
1266         int result;
1267         int i;
1268
1269         LASSERT(key->lct_init != NULL);
1270         LASSERT(key->lct_fini != NULL);
1271         LASSERT(key->lct_tags != 0);
1272         LASSERT(key->lct_owner != NULL);
1273
1274         result = -ENFILE;
1275         cfs_spin_lock(&lu_keys_guard);
1276         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1277                 if (lu_keys[i] == NULL) {
1278                         key->lct_index = i;
1279                         cfs_atomic_set(&key->lct_used, 1);
1280                         lu_keys[i] = key;
1281                         lu_ref_init(&key->lct_reference);
1282                         result = 0;
1283                         ++key_set_version;
1284                         break;
1285                 }
1286         }
1287         cfs_spin_unlock(&lu_keys_guard);
1288         return result;
1289 }
1290 EXPORT_SYMBOL(lu_context_key_register);
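
/*
 * Usage sketch (illustrative, not part of the original source): a module
 * defines its per-context data and key much like lu_global_key above,
 * registers the key at module init time, and retrieves the value through
 * lu_context_key_get(). The foo_* names are hypothetical.
 *
 *	LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *	struct lu_context_key foo_thread_key = {
 *		.lct_tags = LCT_MD_THREAD,
 *		.lct_init = foo_key_init,
 *		.lct_fini = foo_key_fini
 *	};
 *
 *	LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *	rc = lu_context_key_register(&foo_thread_key);
 *	...
 *	info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *	...
 *	lu_context_key_degister(&foo_thread_key);
 */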
1291
1292 static void key_fini(struct lu_context *ctx, int index)
1293 {
1294         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1295                 struct lu_context_key *key;
1296
1297                 key = lu_keys[index];
1298                 LASSERT(key != NULL);
1299                 LASSERT(key->lct_fini != NULL);
1300                 LASSERT(cfs_atomic_read(&key->lct_used) > 1);
1301
1302                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1303                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1304                 cfs_atomic_dec(&key->lct_used);
1305
1306                 LASSERT(key->lct_owner != NULL);
1307                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1308                         LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
1309                         cfs_module_put(key->lct_owner);
1310                 }
1311                 ctx->lc_value[index] = NULL;
1312         }
1313 }
1314
1315 /**
1316  * Deregister key.
1317  */
1318 void lu_context_key_degister(struct lu_context_key *key)
1319 {
1320         LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
1321         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1322
1323         lu_context_key_quiesce(key);
1324
1325         ++key_set_version;
1326         cfs_spin_lock(&lu_keys_guard);
1327         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1328         if (lu_keys[key->lct_index]) {
1329                 lu_keys[key->lct_index] = NULL;
1330                 lu_ref_fini(&key->lct_reference);
1331         }
1332         cfs_spin_unlock(&lu_keys_guard);
1333
1334         LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
1335                  "key has instances: %d\n",
1336                  cfs_atomic_read(&key->lct_used));
1337 }
1338 EXPORT_SYMBOL(lu_context_key_degister);
1339
1340 /**
1341  * Register a number of keys. This has to be called after all keys have been
1342  * initialized by a call to LU_CONTEXT_KEY_INIT().
1343  */
1344 int lu_context_key_register_many(struct lu_context_key *k, ...)
1345 {
1346         struct lu_context_key *key = k;
1347         va_list args;
1348         int result;
1349
1350         va_start(args, k);
1351         do {
1352                 result = lu_context_key_register(key);
1353                 if (result)
1354                         break;
1355                 key = va_arg(args, struct lu_context_key *);
1356         } while (key != NULL);
1357         va_end(args);
1358
1359         if (result != 0) {
1360                 va_start(args, k);
1361                 while (k != key) {
1362                         lu_context_key_degister(k);
1363                         k = va_arg(args, struct lu_context_key *);
1364                 }
1365                 va_end(args);
1366         }
1367
1368         return result;
1369 }
1370 EXPORT_SYMBOL(lu_context_key_register_many);
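
/*
 * Usage sketch (illustrative, not part of the original source): the variadic
 * helpers take a NULL-terminated list of keys, so several keys can be
 * registered and later deregistered in one call; the key names are
 * hypothetical.
 *
 *	rc = lu_context_key_register_many(&foo_thread_key,
 *					  &bar_thread_key, NULL);
 *	...
 *	lu_context_key_degister_many(&foo_thread_key, &bar_thread_key, NULL);
 */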
1371
1372 /**
1373  * De-register a number of keys. This is a dual to
1374  * lu_context_key_register_many().
1375  */
1376 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1377 {
1378         va_list args;
1379
1380         va_start(args, k);
1381         do {
1382                 lu_context_key_degister(k);
1383                 k = va_arg(args, struct lu_context_key*);
1384         } while (k != NULL);
1385         va_end(args);
1386 }
1387 EXPORT_SYMBOL(lu_context_key_degister_many);
1388
1389 /**
1390  * Revive a number of keys.
1391  */
1392 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1393 {
1394         va_list args;
1395
1396         va_start(args, k);
1397         do {
1398                 lu_context_key_revive(k);
1399                 k = va_arg(args, struct lu_context_key*);
1400         } while (k != NULL);
1401         va_end(args);
1402 }
1403 EXPORT_SYMBOL(lu_context_key_revive_many);
1404
1405 /**
1406  * Quiesce a number of keys.
1407  */
1408 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1409 {
1410         va_list args;
1411
1412         va_start(args, k);
1413         do {
1414                 lu_context_key_quiesce(k);
1415                 k = va_arg(args, struct lu_context_key*);
1416         } while (k != NULL);
1417         va_end(args);
1418 }
1419 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1420
1421 /**
1422  * Return value associated with key \a key in context \a ctx.
1423  */
1424 void *lu_context_key_get(const struct lu_context *ctx,
1425                          const struct lu_context_key *key)
1426 {
1427         LINVRNT(ctx->lc_state == LCS_ENTERED);
1428         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1429         LASSERT(lu_keys[key->lct_index] == key);
1430         return ctx->lc_value[key->lct_index];
1431 }
1432 EXPORT_SYMBOL(lu_context_key_get);
1433
1434 /**
1435  * List of remembered contexts. XXX document me.
1436  */
1437 static CFS_LIST_HEAD(lu_context_remembered);
1438
1439 /**
1440  * Destroy \a key in all remembered contexts. This is used to destroy key
1441  * values in "shared" contexts (like service threads), when a module owning
1442  * the key is about to be unloaded.
1443  */
1444 void lu_context_key_quiesce(struct lu_context_key *key)
1445 {
1446         struct lu_context *ctx;
1447         extern unsigned cl_env_cache_purge(unsigned nr);
1448
1449         if (!(key->lct_tags & LCT_QUIESCENT)) {
1450                 /*
1451                  * XXX layering violation.
1452                  */
1453                 cl_env_cache_purge(~0);
1454                 key->lct_tags |= LCT_QUIESCENT;
1455                 /*
1456                  * XXX memory barrier has to go here.
1457                  */
1458                 cfs_spin_lock(&lu_keys_guard);
1459                 cfs_list_for_each_entry(ctx, &lu_context_remembered,
1460                                         lc_remember)
1461                         key_fini(ctx, key->lct_index);
1462                 cfs_spin_unlock(&lu_keys_guard);
1463                 ++key_set_version;
1464         }
1465 }
1466 EXPORT_SYMBOL(lu_context_key_quiesce);
1467
1468 void lu_context_key_revive(struct lu_context_key *key)
1469 {
1470         key->lct_tags &= ~LCT_QUIESCENT;
1471         ++key_set_version;
1472 }
1473 EXPORT_SYMBOL(lu_context_key_revive);
1474
1475 static void keys_fini(struct lu_context *ctx)
1476 {
1477         int     i;
1478
1479         if (ctx->lc_value == NULL)
1480                 return;
1481
1482         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1483                 key_fini(ctx, i);
1484
1485         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1486         ctx->lc_value = NULL;
1487 }
1488
1489 static int keys_fill(struct lu_context *ctx)
1490 {
1491         int i;
1492
1493         LINVRNT(ctx->lc_value != NULL);
1494         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1495                 struct lu_context_key *key;
1496
1497                 key = lu_keys[i];
1498                 if (ctx->lc_value[i] == NULL && key != NULL &&
1499                     (key->lct_tags & ctx->lc_tags) &&
1500                     /*
1501                      * Don't create values for a LCT_QUIESCENT key, as this
1502                      * would pin the module owning the key.
1503                      */
1504                     !(key->lct_tags & LCT_QUIESCENT)) {
1505                         void *value;
1506
1507                         LINVRNT(key->lct_init != NULL);
1508                         LINVRNT(key->lct_index == i);
1509
1510                         value = key->lct_init(ctx, key);
1511                         if (unlikely(IS_ERR(value)))
1512                                 return PTR_ERR(value);
1513
1514                         LASSERT(key->lct_owner != NULL);
1515                         if (!(ctx->lc_tags & LCT_NOREF))
1516                                 cfs_try_module_get(key->lct_owner);
1517                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1518                         cfs_atomic_inc(&key->lct_used);
1519                         /*
1520                          * This is the only place in the code where an
1521                          * element of the ctx->lc_value[] array is set to a
1522                          * non-NULL value.
1523                          */
1524                         ctx->lc_value[i] = value;
1525                         if (key->lct_exit != NULL)
1526                                 ctx->lc_tags |= LCT_HAS_EXIT;
1527                 }
1528                 ctx->lc_version = key_set_version;
1529         }
1530         return 0;
1531 }
1532
1533 static int keys_init(struct lu_context *ctx)
1534 {
1535         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1536         if (likely(ctx->lc_value != NULL))
1537                 return keys_fill(ctx);
1538
1539         return -ENOMEM;
1540 }
1541
1542 /**
1543  * Initialize context data-structure. Create values for all keys.
1544  */
1545 int lu_context_init(struct lu_context *ctx, __u32 tags)
1546 {
1547         int     rc;
1548
1549         memset(ctx, 0, sizeof *ctx);
1550         ctx->lc_state = LCS_INITIALIZED;
1551         ctx->lc_tags = tags;
1552         if (tags & LCT_REMEMBER) {
1553                 cfs_spin_lock(&lu_keys_guard);
1554                 cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
1555                 cfs_spin_unlock(&lu_keys_guard);
1556         } else {
1557                 CFS_INIT_LIST_HEAD(&ctx->lc_remember);
1558         }
1559
1560         rc = keys_init(ctx);
1561         if (rc != 0)
1562                 lu_context_fini(ctx);
1563
1564         return rc;
1565 }
1566 EXPORT_SYMBOL(lu_context_init);
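/*
 * A minimal lifecycle sketch for lu_context_init()/lu_context_fini(),
 * assuming a hypothetical service thread; the tag set shown is only an
 * example, real callers pick tags matching the keys they need.
 */
#if 0
static int foo_thread_main(void *arg)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx, LCT_DT_THREAD | LCT_REMEMBER);
        if (rc != 0)
                return rc;

        /* enter/exit the context around each request, see the sketch below */

        lu_context_fini(&ctx);
        return 0;
}
#endif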
1567
1568 /**
1569  * Finalize context data-structure. Destroy key values.
1570  */
1571 void lu_context_fini(struct lu_context *ctx)
1572 {
1573         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1574         ctx->lc_state = LCS_FINALIZED;
1575
1576         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1577                 LASSERT(cfs_list_empty(&ctx->lc_remember));
1578                 keys_fini(ctx);
1579
1580         } else { /* could race with key degister */
1581                 cfs_spin_lock(&lu_keys_guard);
1582                 keys_fini(ctx);
1583                 cfs_list_del_init(&ctx->lc_remember);
1584                 cfs_spin_unlock(&lu_keys_guard);
1585         }
1586 }
1587 EXPORT_SYMBOL(lu_context_fini);
1588
1589 /**
1590  * Called before entering context.
1591  */
1592 void lu_context_enter(struct lu_context *ctx)
1593 {
1594         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1595         ctx->lc_state = LCS_ENTERED;
1596 }
1597 EXPORT_SYMBOL(lu_context_enter);
1598
1599 /**
1600  * Called after exiting from \a ctx
1601  */
1602 void lu_context_exit(struct lu_context *ctx)
1603 {
1604         int i;
1605
1606         LINVRNT(ctx->lc_state == LCS_ENTERED);
1607         ctx->lc_state = LCS_LEFT;
1608         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1609                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1610                         if (ctx->lc_value[i] != NULL) {
1611                                 struct lu_context_key *key;
1612
1613                                 key = lu_keys[i];
1614                                 LASSERT(key != NULL);
1615                                 if (key->lct_exit != NULL)
1616                                         key->lct_exit(ctx,
1617                                                       key, ctx->lc_value[i]);
1618                         }
1619                 }
1620         }
1621 }
1622 EXPORT_SYMBOL(lu_context_exit);
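/*
 * A minimal sketch of bracketing work with lu_context_enter()/
 * lu_context_exit() and looking up per-context data, reusing the
 * hypothetical foo_thread_key and foo_thread_info from the sketch above.
 */
#if 0
static void foo_handle_one_request(struct lu_context *ctx)
{
        struct foo_thread_info *info;

        lu_context_enter(ctx);
        info = lu_context_key_get(ctx, &foo_thread_key);
        if (info != NULL) {
                /* use the per-thread scratch data for this request */
        }
        lu_context_exit(ctx);
}
#endif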
1623
1624 /**
1625  * Allocate values in a context for all missing keys that were registered
1626  * after the context was created. key_set_version only changes in the rare
1627  * cases when modules are loaded or removed.
1628  */
1629 int lu_context_refill(struct lu_context *ctx)
1630 {
1631         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1632 }
1633 EXPORT_SYMBOL(lu_context_refill);
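/*
 * A minimal sketch of refreshing a long-lived context before reuse, so that
 * keys registered after the context was created get values allocated; the
 * calling function is hypothetical.
 */
#if 0
static int foo_reuse_context(struct lu_context *ctx)
{
        int rc;

        rc = lu_context_refill(ctx);
        if (rc != 0)
                return rc;

        lu_context_enter(ctx);
        /* ... do the work ... */
        lu_context_exit(ctx);
        return 0;
}
#endif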
1634
1635 /**
1636  * lu_ctx_tags/lu_ses_tags are updated when new types of obd devices are
1637  * added. Currently this is only used on the client side, specifically by the
1638  * echo device client; for other stacks (like ptlrpc threads), the contexts
1639  * are predefined when the lu_device types are registered, during the module
1640  * probe phase.
1641  */
1642 __u32 lu_context_tags_default = 0;
1643 __u32 lu_session_tags_default = 0;
1644
1645 void lu_context_tags_update(__u32 tags)
1646 {
1647         cfs_spin_lock(&lu_keys_guard);
1648         lu_context_tags_default |= tags;
1649         key_set_version++;
1650         cfs_spin_unlock(&lu_keys_guard);
1651 }
1652 EXPORT_SYMBOL(lu_context_tags_update);
1653
1654 void lu_context_tags_clear(__u32 tags)
1655 {
1656         cfs_spin_lock(&lu_keys_guard);
1657         lu_context_tags_default &= ~tags;
1658         key_set_version++;
1659         cfs_spin_unlock(&lu_keys_guard);
1660 }
1661 EXPORT_SYMBOL(lu_context_tags_clear);
1662
1663 void lu_session_tags_update(__u32 tags)
1664 {
1665         cfs_spin_lock(&lu_keys_guard);
1666         lu_session_tags_default |= tags;
1667         key_set_version++;
1668         cfs_spin_unlock(&lu_keys_guard);
1669 }
1670 EXPORT_SYMBOL(lu_session_tags_update);
1671
1672 void lu_session_tags_clear(__u32 tags)
1673 {
1674         cfs_spin_lock(&lu_keys_guard);
1675         lu_session_tags_default &= ~tags;
1676         key_set_version++;
1677         cfs_spin_unlock(&lu_keys_guard);
1678 }
1679 EXPORT_SYMBOL(lu_session_tags_clear);
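/*
 * A minimal sketch of how a client-side obd type could widen and later
 * restore the default tag sets; the "foo" attach/detach hooks and the exact
 * tags are illustrative assumptions.
 */
#if 0
static void foo_client_attach(void)
{
        lu_context_tags_update(LCT_CL_THREAD);
        lu_session_tags_update(LCT_SESSION);
}

static void foo_client_detach(void)
{
        lu_context_tags_clear(LCT_CL_THREAD);
        lu_session_tags_clear(LCT_SESSION);
}
#endif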
1680
1681 int lu_env_init(struct lu_env *env, __u32 tags)
1682 {
1683         int result;
1684
1685         env->le_ses = NULL;
1686         result = lu_context_init(&env->le_ctx, tags);
1687         if (likely(result == 0))
1688                 lu_context_enter(&env->le_ctx);
1689         return result;
1690 }
1691 EXPORT_SYMBOL(lu_env_init);
1692
1693 void lu_env_fini(struct lu_env *env)
1694 {
1695         lu_context_exit(&env->le_ctx);
1696         lu_context_fini(&env->le_ctx);
1697         env->le_ses = NULL;
1698 }
1699 EXPORT_SYMBOL(lu_env_fini);
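/*
 * A minimal sketch of the lu_env_init()/lu_env_fini() pairing, assuming a
 * hypothetical caller that only needs the main context (le_ses stays NULL).
 */
#if 0
static int foo_do_work(void)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc != 0)
                return rc;

        /* env.le_ctx is initialized and entered at this point; do the work */

        lu_env_fini(&env);
        return rc;
}
#endif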
1700
1701 int lu_env_refill(struct lu_env *env)
1702 {
1703         int result;
1704
1705         result = lu_context_refill(&env->le_ctx);
1706         if (result == 0 && env->le_ses != NULL)
1707                 result = lu_context_refill(env->le_ses);
1708         return result;
1709 }
1710 EXPORT_SYMBOL(lu_env_refill);
1711
1712 /**
1713  * Currently, this API is only used by the echo client. Because the echo
1714  * client and the normal lustre client share the same cl_env cache, the
1715  * echo client needs to refresh the env context after it gets one from the
1716  * cache, especially when the normal client and the echo client co-exist
1717  * on the same client node.
1718  */
1719 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1720                           __u32 stags)
1721 {
1722         int    result;
1723
1724         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1725                 env->le_ctx.lc_version = 0;
1726                 env->le_ctx.lc_tags |= ctags;
1727         }
1728
1729         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1730                 env->le_ses->lc_version = 0;
1731                 env->le_ses->lc_tags |= stags;
1732         }
1733
1734         result = lu_env_refill(env);
1735
1736         return result;
1737 }
1738 EXPORT_SYMBOL(lu_env_refill_by_tags);
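/*
 * A minimal sketch of refreshing an env taken from the shared cl_env cache
 * with extra tags, in the spirit of the echo client use described above; the
 * calling function and the tag values are illustrative assumptions.
 */
#if 0
static int foo_refresh_cached_env(struct lu_env *env)
{
        /* make sure the cached env also carries the tags this user needs */
        return lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
}
#endif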
1739
1740 static struct cfs_shrinker *lu_site_shrinker = NULL;
1741
1742 typedef struct lu_site_stats {
1743         unsigned        lss_populated;
1744         unsigned        lss_max_search;
1745         unsigned        lss_total;
1746         unsigned        lss_busy;
1747 } lu_site_stats_t;
1748
1749 static void lu_site_stats_get(cfs_hash_t *hs,
1750                               lu_site_stats_t *stats, int populated)
1751 {
1752         cfs_hash_bd_t bd;
1753         int           i;
1754
1755         cfs_hash_for_each_bucket(hs, &bd, i) {
1756                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1757                 cfs_hlist_head_t        *hhead;
1758
1759                 cfs_hash_bd_lock(hs, &bd, 1);
1760                 stats->lss_busy  += bkt->lsb_busy;
1761                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1762                 stats->lss_max_search = max((int)stats->lss_max_search,
1763                                             cfs_hash_bd_depmax_get(&bd));
1764                 if (!populated) {
1765                         cfs_hash_bd_unlock(hs, &bd, 1);
1766                         continue;
1767                 }
1768
1769                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1770                         if (!cfs_hlist_empty(hhead))
1771                                 stats->lss_populated++;
1772                 }
1773                 cfs_hash_bd_unlock(hs, &bd, 1);
1774         }
1775 }
1776
1777 #ifdef __KERNEL__
1778
1779 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
1780 {
1781         lu_site_stats_t stats;
1782         struct lu_site *s;
1783         struct lu_site *tmp;
1784         int cached = 0;
1785         int remain = shrink_param(sc, nr_to_scan);
1786         CFS_LIST_HEAD(splice);
1787
1788         if (remain != 0) {
1789                 if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
1790                         return -1;
1791                 CDEBUG(D_INODE, "Shrink %d objects\n", remain);
1792         }
1793
1794         cfs_mutex_lock(&lu_sites_guard);
1795         cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1796                 if (shrink_param(sc, nr_to_scan) != 0) {
1797                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1798                         /*
1799                          * Move the just-shrunk site to the tail of the site
1800                          * list to ensure shrinking fairness.
1801                          */
1802                         cfs_list_move_tail(&s->ls_linkage, &splice);
1803                 }
1804
1805                 memset(&stats, 0, sizeof(stats));
1806                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1807                 cached += stats.lss_total - stats.lss_busy;
1808                 if (shrink_param(sc, nr_to_scan) && remain <= 0)
1809                         break;
1810         }
1811         cfs_list_splice(&splice, lu_sites.prev);
1812         cfs_mutex_unlock(&lu_sites_guard);
1813
1814         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1815         if (shrink_param(sc, nr_to_scan) == 0)
1816                 CDEBUG(D_INODE, "%d objects cached\n", cached);
1817         return cached;
1818 }
1819
1820 /*
1821  * Debugging stuff.
1822  */
1823
1824 /**
1825  * Environment to be used in debugger, contains all tags.
1826  */
1827 struct lu_env lu_debugging_env;
1828
1829 /**
1830  * Debugging printer function using printk().
1831  */
1832 int lu_printk_printer(const struct lu_env *env,
1833                       void *unused, const char *format, ...)
1834 {
1835         va_list args;
1836
1837         va_start(args, format);
1838         vprintk(format, args);
1839         va_end(args);
1840         return 0;
1841 }
1842
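/*
 * A minimal sketch of using lu_printk_printer() as the printer callback when
 * dumping an object from a debugging path; the wrapper function is a
 * hypothetical name, and the env and object are assumed to come from the
 * surrounding caller.
 */
#if 0
static void foo_dump_object(const struct lu_env *env, struct lu_object *o)
{
        /* the cookie argument is unused by lu_printk_printer() */
        lu_object_print(env, NULL, lu_printk_printer, o);
}
#endif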
1843 void lu_debugging_setup(void)
1844 {
1845         lu_env_init(&lu_debugging_env, ~0);
1846 }
1847
1848 void lu_context_keys_dump(void)
1849 {
1850         int i;
1851
1852         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1853                 struct lu_context_key *key;
1854
1855                 key = lu_keys[i];
1856                 if (key != NULL) {
1857                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
1858                                i, key, key->lct_tags,
1859                                key->lct_init, key->lct_fini, key->lct_exit,
1860                                key->lct_index, cfs_atomic_read(&key->lct_used),
1861                                key->lct_owner ? key->lct_owner->name : "",
1862                                key->lct_owner);
1863                         lu_ref_print(&key->lct_reference);
1864                 }
1865         }
1866 }
1867 EXPORT_SYMBOL(lu_context_keys_dump);
1868 #else  /* !__KERNEL__ */
1869 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1870 {
1871         return 0;
1872 }
1873 #endif /* __KERNEL__ */
1874
1875 int  cl_global_init(void);
1876 void cl_global_fini(void);
1877 int  lu_ref_global_init(void);
1878 void lu_ref_global_fini(void);
1879
1880 int dt_global_init(void);
1881 void dt_global_fini(void);
1882
1883 int llo_global_init(void);
1884 void llo_global_fini(void);
1885
1886 /**
1887  * Initialization of global lu_* data.
1888  */
1889 int lu_global_init(void)
1890 {
1891         int result;
1892
1893         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1894
1895         result = lu_ref_global_init();
1896         if (result != 0)
1897                 return result;
1898
1899         LU_CONTEXT_KEY_INIT(&lu_global_key);
1900         result = lu_context_key_register(&lu_global_key);
1901         if (result != 0)
1902                 return result;
1903         /*
1904          * At this level, we don't know what tags are needed, so allocate them
1905          * conservatively. This should not be too bad, because this
1906          * environment is global.
1907          */
1908         cfs_mutex_lock(&lu_sites_guard);
1909         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1910         cfs_mutex_unlock(&lu_sites_guard);
1911         if (result != 0)
1912                 return result;
1913
1914         /*
1915          * seeks estimation: 3 seeks to read a record from the OI, one to read
1916          * the inode, one for the EA. Unfortunately, setting such a high value
1917          * results in the lu_object/inode cache consuming all the memory.
1918          */
1919         lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
1920         if (lu_site_shrinker == NULL)
1921                 return -ENOMEM;
1922
1923         result = lu_time_global_init();
1924         if (result)
1925                 GOTO(out, result);
1926
1927 #ifdef __KERNEL__
1928         result = dt_global_init();
1929         if (result)
1930                 GOTO(out, result);
1931
1932         result = llo_global_init();
1933         if (result)
1934                 GOTO(out, result);
1935 #endif
1936         result = cl_global_init();
1937 out:
1938
1939         return result;
1940 }
1941
1942 /**
1943  * Dual to lu_global_init().
1944  */
1945 void lu_global_fini(void)
1946 {
1947         cl_global_fini();
1948 #ifdef __KERNEL__
1949         llo_global_fini();
1950         dt_global_fini();
1951 #endif
1952         lu_time_global_fini();
1953         if (lu_site_shrinker != NULL) {
1954                 cfs_remove_shrinker(lu_site_shrinker);
1955                 lu_site_shrinker = NULL;
1956         }
1957
1958         lu_context_key_degister(&lu_global_key);
1959
1960         /*
1961          * Tear shrinker environment down _after_ de-registering
1962          * lu_global_key, because the latter has a value in the former.
1963          */
1964         cfs_mutex_lock(&lu_sites_guard);
1965         lu_env_fini(&lu_shrink_env);
1966         cfs_mutex_unlock(&lu_sites_guard);
1967
1968         lu_ref_global_fini();
1969 }
1970
1971 struct lu_buf LU_BUF_NULL = {
1972         .lb_buf = NULL,
1973         .lb_len = 0
1974 };
1975 EXPORT_SYMBOL(LU_BUF_NULL);
1976
1977 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1978 {
1979 #ifdef LPROCFS
1980         struct lprocfs_counter ret;
1981
1982         lprocfs_stats_collect(stats, idx, &ret);
1983         return (__u32)ret.lc_count;
1984 #else
1985         return 0;
1986 #endif
1987 }
1988
1989 /**
1990  * Output site statistical counters into a buffer. Suitable for
1991  * lprocfs_rd_*()-style functions.
1992  */
1993 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
1994 {
1995         lu_site_stats_t stats;
1996
1997         memset(&stats, 0, sizeof(stats));
1998         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1999
2000         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2001                         stats.lss_busy,
2002                         stats.lss_total,
2003                         stats.lss_populated,
2004                         CFS_HASH_NHLIST(s->ls_obj_hash),
2005                         stats.lss_max_search,
2006                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2007                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2008                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2009                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2010                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2011                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2012 }
2013 EXPORT_SYMBOL(lu_site_stats_print);
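/*
 * A minimal sketch of wiring lu_site_stats_print() into an lprocfs_rd_*()-
 * style read handler; the handler name and the way the lu_site is recovered
 * from the callback data pointer are illustrative assumptions.
 */
#if 0
static int foo_rd_site_stats(char *page, char **start, off_t off,
                             int count, int *eof, void *data)
{
        struct lu_site *s = data;

        *eof = 1;
        return lu_site_stats_print(s, page, count);
}
#endif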
2014
2015 const char *lu_time_names[LU_TIME_NR] = {
2016         [LU_TIME_FIND_LOOKUP] = "find_lookup",
2017         [LU_TIME_FIND_ALLOC]  = "find_alloc",
2018         [LU_TIME_FIND_INSERT] = "find_insert"
2019 };
2020 EXPORT_SYMBOL(lu_time_names);
2021
2022 /**
2023  * Helper function to initialize a number of kmem slab caches at once.
2024  */
2025 int lu_kmem_init(struct lu_kmem_descr *caches)
2026 {
2027         int result;
2028         struct lu_kmem_descr *iter = caches;
2029
2030         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2031                 *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name,
2032                                                         iter->ckd_size,
2033                                                         0, 0);
2034                 if (*iter->ckd_cache == NULL) {
2035                         result = -ENOMEM;
2036                         /* free all previously allocated caches */
2037                         lu_kmem_fini(caches);
2038                         break;
2039                 }
2040         }
2041         return result;
2042 }
2043 EXPORT_SYMBOL(lu_kmem_init);
2044
2045 /**
2046  * Helper function to finalize a number of kmem slab caches at once. Dual to
2047  * lu_kmem_init().
2048  */
2049 void lu_kmem_fini(struct lu_kmem_descr *caches)
2050 {
2051         int rc;
2052
2053         for (; caches->ckd_cache != NULL; ++caches) {
2054                 if (*caches->ckd_cache != NULL) {
2055                         rc = cfs_mem_cache_destroy(*caches->ckd_cache);
2056                         LASSERTF(rc == 0, "couldn't destroy %s slab\n",
2057                                  caches->ckd_name);
2058                         *caches->ckd_cache = NULL;
2059                 }
2060         }
2061 }
2062 EXPORT_SYMBOL(lu_kmem_fini);
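/*
 * A minimal sketch of the descriptor-array pattern consumed by
 * lu_kmem_init()/lu_kmem_fini(): the array is walked until an entry with a
 * NULL ckd_cache is found, so it must be NULL-terminated.  The "foo" cache,
 * object structure and module hooks are illustrative only.
 */
#if 0
struct foo_object {
        struct lu_object foo_obj;
};

static cfs_mem_cache_t *foo_object_kmem;

static struct lu_kmem_descr foo_caches[] = {
        {
                .ckd_cache = &foo_object_kmem,
                .ckd_name  = "foo_object_kmem",
                .ckd_size  = sizeof(struct foo_object)
        },
        {
                .ckd_cache = NULL
        }
};

static int foo_module_init(void)
{
        return lu_kmem_init(foo_caches);
}

static void foo_module_exit(void)
{
        lu_kmem_fini(foo_caches);
}
#endif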