1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
 39  * These are the only exported functions; they provide some generic
 40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <libcfs/list.h>
61 /* lu_time_global_{init,fini}() */
62 #include <lu_time.h>
63
64 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
65
66 /**
 67  * Decrease reference counter on object. If the last reference is dropped,
 68  * return the object to the cache, unless lu_object_is_dying(o) holds. In
 69  * the latter case, free the object immediately.
70  */
71 void lu_object_put(const struct lu_env *env, struct lu_object *o)
72 {
73         struct lu_site_bkt_data *bkt;
74         struct lu_object_header *top;
75         struct lu_site          *site;
76         struct lu_object        *orig;
77         cfs_hash_bd_t            bd;
78
79         top  = o->lo_header;
80         site = o->lo_dev->ld_site;
81         orig = o;
82
83         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
84         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
85
86         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
87                 if (lu_object_is_dying(top)) {
88
89                         /*
90                          * somebody may be waiting for this, currently only
91                          * used for cl_object, see cl_object_put_last().
92                          */
93                         cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
94                 }
95                 return;
96         }
97
98         LASSERT(bkt->lsb_busy > 0);
99         bkt->lsb_busy--;
100         /*
101          * When last reference is released, iterate over object
102          * layers, and notify them that object is no longer busy.
103          */
104         cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
105                 if (o->lo_ops->loo_object_release != NULL)
106                         o->lo_ops->loo_object_release(env, o);
107         }
108
109         if (!lu_object_is_dying(top)) {
110                 LASSERT(cfs_list_empty(&top->loh_lru));
111                 cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru);
112                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
113                 return;
114         }
115
116         /*
 117          * If object is dying (will not be cached), remove it
 118          * from hash table and LRU.
119          *
120          * This is done with hash table and LRU lists locked. As the only
121          * way to acquire first reference to previously unreferenced
122          * object is through hash-table lookup (lu_object_find()),
 123          * or LRU scanning (lu_site_purge()), both done under hash-table
124          * and LRU lock, no race with concurrent object lookup is possible
125          * and we can safely destroy object below.
126          */
127         cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
128         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
129         /*
 130          * Object was already removed from hash and LRU above; it can
 131          * be freed now.
132          */
133         lu_object_free(env, orig);
134 }
135 EXPORT_SYMBOL(lu_object_put);
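/*
 * For illustration, a rough sketch of the usual reference pattern; "env",
 * "dev" and "fid" are assumed to be supplied by the caller:
 *
 *      struct lu_object *o;
 *
 *      o = lu_object_find(env, dev, fid, NULL);
 *      if (!IS_ERR(o)) {
 *              ... use the object ...
 *              lu_object_put(env, o);
 *      }
 */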
136
137 /**
138  * Allocate new object.
139  *
140  * This follows object creation protocol, described in the comment within
141  * struct lu_device_operations definition.
142  */
143 static struct lu_object *lu_object_alloc(const struct lu_env *env,
144                                          struct lu_device *dev,
145                                          const struct lu_fid *f,
146                                          const struct lu_object_conf *conf)
147 {
148         struct lu_object *scan;
149         struct lu_object *top;
150         cfs_list_t *layers;
151         int clean;
152         int result;
153         ENTRY;
154
155         /*
156          * Create top-level object slice. This will also create
157          * lu_object_header.
158          */
159         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
160         if (top == NULL)
161                 RETURN(ERR_PTR(-ENOMEM));
162         /*
163          * This is the only place where object fid is assigned. It's constant
164          * after this point.
165          */
166         LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
167         top->lo_header->loh_fid = *f;
168         layers = &top->lo_header->loh_layers;
169         do {
170                 /*
171                  * Call ->loo_object_init() repeatedly, until no more new
172                  * object slices are created.
173                  */
174                 clean = 1;
175                 cfs_list_for_each_entry(scan, layers, lo_linkage) {
176                         if (scan->lo_flags & LU_OBJECT_ALLOCATED)
177                                 continue;
178                         clean = 0;
179                         scan->lo_header = top->lo_header;
180                         result = scan->lo_ops->loo_object_init(env, scan, conf);
181                         if (result != 0) {
182                                 lu_object_free(env, top);
183                                 RETURN(ERR_PTR(result));
184                         }
185                         scan->lo_flags |= LU_OBJECT_ALLOCATED;
186                 }
187         } while (!clean);
188
189         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
190                 if (scan->lo_ops->loo_object_start != NULL) {
191                         result = scan->lo_ops->loo_object_start(env, scan);
192                         if (result != 0) {
193                                 lu_object_free(env, top);
194                                 RETURN(ERR_PTR(result));
195                         }
196                 }
197         }
198
199         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
200         RETURN(top);
201 }
202
203 /**
204  * Free an object.
205  */
206 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
207 {
208         struct lu_site_bkt_data *bkt;
209         struct lu_site          *site;
210         struct lu_object        *scan;
211         cfs_list_t              *layers;
212         cfs_list_t               splice;
213
214         site   = o->lo_dev->ld_site;
215         layers = &o->lo_header->loh_layers;
216         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
217         /*
218          * First call ->loo_object_delete() method to release all resources.
219          */
220         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
221                 if (scan->lo_ops->loo_object_delete != NULL)
222                         scan->lo_ops->loo_object_delete(env, scan);
223         }
224
225         /*
226          * Then, splice object layers into stand-alone list, and call
227          * ->loo_object_free() on all layers to free memory. Splice is
228          * necessary, because lu_object_header is freed together with the
229          * top-level slice.
230          */
231         CFS_INIT_LIST_HEAD(&splice);
232         cfs_list_splice_init(layers, &splice);
233         while (!cfs_list_empty(&splice)) {
234                 /*
235                  * Free layers in bottom-to-top order, so that object header
236                  * lives as long as possible and ->loo_object_free() methods
237                  * can look at its contents.
238                  */
239                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
240                 cfs_list_del_init(&o->lo_linkage);
241                 LASSERT(o->lo_ops->loo_object_free != NULL);
242                 o->lo_ops->loo_object_free(env, o);
243         }
244
245         if (cfs_waitq_active(&bkt->lsb_marche_funebre))
246                 cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
247 }
248
249 /**
250  * Free \a nr objects from the cold end of the site LRU list.
251  */
252 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
253 {
254         struct lu_object_header *h;
255         struct lu_object_header *temp;
256         struct lu_site_bkt_data *bkt;
257         cfs_hash_bd_t            bd;
258         cfs_hash_bd_t            bd2;
259         cfs_list_t               dispose;
260         int                      did_sth;
261         int                      start;
262         int                      count;
263         int                      bnr;
264         int                      i;
265
266         CFS_INIT_LIST_HEAD(&dispose);
267         /*
268          * Under LRU list lock, scan LRU list and move unreferenced objects to
269          * the dispose list, removing them from LRU and hash table.
270          */
271         start = s->ls_purge_start;
272         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
273  again:
274         did_sth = 0;
275         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
276                 if (i < start)
277                         continue;
278                 count = bnr;
279                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
280                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
281
282                 cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
283                         LASSERT(cfs_atomic_read(&h->loh_ref) == 0);
284
285                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
286                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
287
288                         cfs_hash_bd_del_locked(s->ls_obj_hash,
289                                                &bd2, &h->loh_hash);
290                         cfs_list_move(&h->loh_lru, &dispose);
291                         if (did_sth == 0)
292                                 did_sth = 1;
293
294                         if (nr != ~0 && --nr == 0)
295                                 break;
296
297                         if (count > 0 && --count == 0)
298                                 break;
299
300                 }
301                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
302                 cfs_cond_resched();
303                 /*
304                  * Free everything on the dispose list. This is safe against
305                  * races due to the reasons described in lu_object_put().
306                  */
307                 while (!cfs_list_empty(&dispose)) {
308                         h = container_of0(dispose.next,
309                                           struct lu_object_header, loh_lru);
310                         cfs_list_del_init(&h->loh_lru);
311                         lu_object_free(env, lu_object_top(h));
312                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
313                 }
314
315                 if (nr == 0)
316                         break;
317         }
318
319         if (nr != 0 && did_sth && start != 0) {
320                 start = 0; /* restart from the first bucket */
321                 goto again;
322         }
323         /* race on s->ls_purge_start, but nobody cares */
324         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
325
326         return nr;
327 }
328 EXPORT_SYMBOL(lu_site_purge);
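/*
 * For illustration, typical uses of lu_site_purge(); "env" and "site" are
 * assumed to be valid. Passing ~0 asks for a scan of the whole LRU, as
 * lu_stack_fini() later in this file does when tearing a device stack down:
 *
 *      lu_site_purge(env, site, 128);  (free up to 128 cold objects)
 *      lu_site_purge(env, site, ~0);   (drain every unreferenced object)
 */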
329
330 /*
331  * Object printing.
332  *
 333  * Code below has to jump through certain hoops to output object description
334  * into libcfs_debug_msg-based log. The problem is that lu_object_print()
335  * composes object description from strings that are parts of _lines_ of
336  * output (i.e., strings that are not terminated by newline). This doesn't fit
337  * very well into libcfs_debug_msg() interface that assumes that each message
338  * supplied to it is a self-contained output line.
339  *
340  * To work around this, strings are collected in a temporary buffer
341  * (implemented as a value of lu_cdebug_key key), until terminating newline
342  * character is detected.
343  *
344  */
345
346 enum {
347         /**
348          * Maximal line size.
349          *
350          * XXX overflow is not handled correctly.
351          */
352         LU_CDEBUG_LINE = 512
353 };
354
355 struct lu_cdebug_data {
356         /**
357          * Temporary buffer.
358          */
359         char lck_area[LU_CDEBUG_LINE];
360 };
361
362 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
363 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
364
365 /**
366  * Key, holding temporary buffer. This key is registered very early by
367  * lu_global_init().
368  */
369 struct lu_context_key lu_global_key = {
370         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
371                     LCT_MG_THREAD | LCT_CL_THREAD,
372         .lct_init = lu_global_key_init,
373         .lct_fini = lu_global_key_fini
374 };
375
376 /**
377  * Printer function emitting messages through libcfs_debug_msg().
378  */
379 int lu_cdebug_printer(const struct lu_env *env,
380                       void *cookie, const char *format, ...)
381 {
382         struct libcfs_debug_msg_data *msgdata = cookie;
383         struct lu_cdebug_data        *key;
384         int used;
385         int complete;
386         va_list args;
387
388         va_start(args, format);
389
390         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
391         LASSERT(key != NULL);
392
393         used = strlen(key->lck_area);
394         complete = format[strlen(format) - 1] == '\n';
395         /*
396          * Append new chunk to the buffer.
397          */
398         vsnprintf(key->lck_area + used,
399                   ARRAY_SIZE(key->lck_area) - used, format, args);
400         if (complete) {
401                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
402                         libcfs_debug_msg(msgdata, "%s", key->lck_area);
403                 key->lck_area[0] = 0;
404         }
405         va_end(args);
406         return 0;
407 }
408 EXPORT_SYMBOL(lu_cdebug_printer);
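/*
 * For illustration, lu_cdebug_printer() is meant to be passed as the
 * lu_printer_t callback together with a libcfs_debug_msg_data cookie, as
 * lu_stack_fini() later in this file does via lu_site_print(). A rough
 * sketch, with the D_INFO mask chosen arbitrarily for the example:
 *
 *      LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_INFO, NULL);
 *
 *      lu_object_print(env, &msgdata, lu_cdebug_printer, o);
 */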
409
410 /**
411  * Print object header.
412  */
413 void lu_object_header_print(const struct lu_env *env, void *cookie,
414                             lu_printer_t printer,
415                             const struct lu_object_header *hdr)
416 {
417         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
418                    hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
419                    PFID(&hdr->loh_fid),
420                    cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
421                    cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? \
422                    "" : " lru",
423                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
424 }
425 EXPORT_SYMBOL(lu_object_header_print);
426
427 /**
 428  * Print human-readable representation of \a o to the \a printer.
429  */
430 void lu_object_print(const struct lu_env *env, void *cookie,
431                      lu_printer_t printer, const struct lu_object *o)
432 {
433         static const char ruler[] = "........................................";
434         struct lu_object_header *top;
435         int depth;
436
437         top = o->lo_header;
438         lu_object_header_print(env, cookie, printer, top);
439         (*printer)(env, cookie, "{ \n");
440         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
441                 depth = o->lo_depth + 4;
442
443                 /*
444                  * print `.' \a depth times followed by type name and address
445                  */
446                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
447                            o->lo_dev->ld_type->ldt_name, o);
448                 if (o->lo_ops->loo_object_print != NULL)
449                         o->lo_ops->loo_object_print(env, cookie, printer, o);
450                 (*printer)(env, cookie, "\n");
451         }
452         (*printer)(env, cookie, "} header@%p\n", top);
453 }
454 EXPORT_SYMBOL(lu_object_print);
455
456 /**
457  * Check object consistency.
458  */
459 int lu_object_invariant(const struct lu_object *o)
460 {
461         struct lu_object_header *top;
462
463         top = o->lo_header;
464         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
465                 if (o->lo_ops->loo_object_invariant != NULL &&
466                     !o->lo_ops->loo_object_invariant(o))
467                         return 0;
468         }
469         return 1;
470 }
471 EXPORT_SYMBOL(lu_object_invariant);
472
473 static struct lu_object *htable_lookup(struct lu_site *s,
474                                        cfs_hash_bd_t *bd,
475                                        const struct lu_fid *f,
476                                        cfs_waitlink_t *waiter,
477                                        __u64 *version)
478 {
479         struct lu_site_bkt_data *bkt;
480         struct lu_object_header *h;
481         cfs_hlist_node_t        *hnode;
482         __u64  ver = cfs_hash_bd_version_get(bd);
483
484         if (*version == ver)
485                 return NULL;
486
487         *version = ver;
488         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
 489         /* cfs_hash_bd_lookup_intent is a somewhat "internal" function
 490          * of cfs_hash, but we don't want a refcount on the object right now */
491         hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
492         if (hnode == NULL) {
493                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
494                 return NULL;
495         }
496
497         h = container_of0(hnode, struct lu_object_header, loh_hash);
498         if (likely(!lu_object_is_dying(h))) {
499                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
500                 cfs_list_del_init(&h->loh_lru);
501                 return lu_object_top(h);
502         }
503
504         /*
 505          * Lookup found an object being destroyed; this object cannot be
 506          * returned (to ensure that references to dying objects are eventually
 507          * drained), and moreover, lookup has to wait until the object is freed.
508          */
509         cfs_atomic_dec(&h->loh_ref);
510
511         cfs_waitlink_init(waiter);
512         cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
513         cfs_set_current_state(CFS_TASK_UNINT);
514         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
515         return ERR_PTR(-EAGAIN);
516 }
517
518 /**
519  * Search cache for an object with the fid \a f. If such object is found,
520  * return it. Otherwise, create new object, insert it into cache and return
521  * it. In any case, additional reference is acquired on the returned object.
522  */
523 struct lu_object *lu_object_find(const struct lu_env *env,
524                                  struct lu_device *dev, const struct lu_fid *f,
525                                  const struct lu_object_conf *conf)
526 {
527         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
528 }
529 EXPORT_SYMBOL(lu_object_find);
530
531 static struct lu_object *lu_object_new(const struct lu_env *env,
532                                        struct lu_device *dev,
533                                        const struct lu_fid *f,
534                                        const struct lu_object_conf *conf)
535 {
536         struct lu_object        *o;
537         cfs_hash_t              *hs;
538         cfs_hash_bd_t            bd;
539         struct lu_site_bkt_data *bkt;
540
541         o = lu_object_alloc(env, dev, f, conf);
542         if (unlikely(IS_ERR(o)))
543                 return o;
544
545         hs = dev->ld_site->ls_obj_hash;
546         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
547         bkt = cfs_hash_bd_extra_get(hs, &bd);
548         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
549         bkt->lsb_busy++;
550         cfs_hash_bd_unlock(hs, &bd, 1);
551         return o;
552 }
553
554 /**
555  * Core logic of lu_object_find*() functions.
556  */
557 static struct lu_object *lu_object_find_try(const struct lu_env *env,
558                                             struct lu_device *dev,
559                                             const struct lu_fid *f,
560                                             const struct lu_object_conf *conf,
561                                             cfs_waitlink_t *waiter)
562 {
563         struct lu_object      *o;
564         struct lu_object      *shadow;
565         struct lu_site        *s;
566         cfs_hash_t            *hs;
567         cfs_hash_bd_t          bd;
568         __u64                  version = 0;
569
570         /*
571          * This uses standard index maintenance protocol:
572          *
573          *     - search index under lock, and return object if found;
574          *     - otherwise, unlock index, allocate new object;
575          *     - lock index and search again;
576          *     - if nothing is found (usual case), insert newly created
577          *       object into index;
578          *     - otherwise (race: other thread inserted object), free
579          *       object just allocated.
580          *     - unlock index;
581          *     - return object.
582          *
 583          * In the "LOC_F_NEW" case, we are sure the object is newly established.
 584          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
 585          * just allocate and insert directly.
586          *
587          * If dying object is found during index search, add @waiter to the
588          * site wait-queue and return ERR_PTR(-EAGAIN).
589          */
590         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
591                 return lu_object_new(env, dev, f, conf);
592
593         s  = dev->ld_site;
594         hs = s->ls_obj_hash;
595         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
596         o = htable_lookup(s, &bd, f, waiter, &version);
597         cfs_hash_bd_unlock(hs, &bd, 1);
598         if (o != NULL)
599                 return o;
600
601         /*
602          * Allocate new object. This may result in rather complicated
603          * operations, including fld queries, inode loading, etc.
604          */
605         o = lu_object_alloc(env, dev, f, conf);
606         if (unlikely(IS_ERR(o)))
607                 return o;
608
609         LASSERT(lu_fid_eq(lu_object_fid(o), f));
610
611         cfs_hash_bd_lock(hs, &bd, 1);
612
613         shadow = htable_lookup(s, &bd, f, waiter, &version);
614         if (likely(shadow == NULL)) {
615                 struct lu_site_bkt_data *bkt;
616
617                 bkt = cfs_hash_bd_extra_get(hs, &bd);
618                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
619                 bkt->lsb_busy++;
620                 cfs_hash_bd_unlock(hs, &bd, 1);
621                 return o;
622         }
623
624         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
625         cfs_hash_bd_unlock(hs, &bd, 1);
626         lu_object_free(env, o);
627         return shadow;
628 }
629
630 /**
 631  * Much like lu_object_find(), but the top-level device of the object is
 632  * specifically \a dev rather than that of the site. This interface allows
 633  * objects with different "stacking" to be created within the same site.
634  */
635 struct lu_object *lu_object_find_at(const struct lu_env *env,
636                                     struct lu_device *dev,
637                                     const struct lu_fid *f,
638                                     const struct lu_object_conf *conf)
639 {
640         struct lu_site_bkt_data *bkt;
641         struct lu_object        *obj;
642         cfs_waitlink_t           wait;
643
644         while (1) {
645                 obj = lu_object_find_try(env, dev, f, conf, &wait);
646                 if (obj != ERR_PTR(-EAGAIN))
647                         return obj;
648                 /*
649                  * lu_object_find_try() already added waiter into the
650                  * wait queue.
651                  */
652                 cfs_waitq_wait(&wait, CFS_TASK_UNINT);
653                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
654                 cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
655         }
656 }
657 EXPORT_SYMBOL(lu_object_find_at);
658
659 /**
660  * Find object with given fid, and return its slice belonging to given device.
661  */
662 struct lu_object *lu_object_find_slice(const struct lu_env *env,
663                                        struct lu_device *dev,
664                                        const struct lu_fid *f,
665                                        const struct lu_object_conf *conf)
666 {
667         struct lu_object *top;
668         struct lu_object *obj;
669
670         top = lu_object_find(env, dev, f, conf);
671         if (!IS_ERR(top)) {
672                 obj = lu_object_locate(top->lo_header, dev->ld_type);
673                 if (obj == NULL)
674                         lu_object_put(env, top);
675         } else
676                 obj = top;
677         return obj;
678 }
679 EXPORT_SYMBOL(lu_object_find_slice);
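/*
 * For illustration, a rough sketch of a caller interested only in its own
 * layer; "env", "dev" and "fid" are assumed to come from the caller. Note
 * that a NULL return (fid found, but no slice for \a dev) is possible in
 * addition to ERR_PTR() values:
 *
 *      struct lu_object *slice;
 *
 *      slice = lu_object_find_slice(env, dev, fid, NULL);
 *      if (IS_ERR(slice) || slice == NULL)
 *              return ...;        (handle the error)
 *      ... work with the slice belonging to \a dev ...
 *      lu_object_put(env, slice);
 */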
680
681 /**
682  * Global list of all device types.
683  */
684 static CFS_LIST_HEAD(lu_device_types);
685
686 int lu_device_type_init(struct lu_device_type *ldt)
687 {
688         int result;
689
690         CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
691         result = ldt->ldt_ops->ldto_init(ldt);
692         if (result == 0)
693                 cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
694         return result;
695 }
696 EXPORT_SYMBOL(lu_device_type_init);
697
698 void lu_device_type_fini(struct lu_device_type *ldt)
699 {
700         cfs_list_del_init(&ldt->ldt_linkage);
701         ldt->ldt_ops->ldto_fini(ldt);
702 }
703 EXPORT_SYMBOL(lu_device_type_fini);
704
705 void lu_types_stop(void)
706 {
707         struct lu_device_type *ldt;
708
709         cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
710                 if (ldt->ldt_device_nr == 0)
711                         ldt->ldt_ops->ldto_stop(ldt);
712         }
713 }
714 EXPORT_SYMBOL(lu_types_stop);
715
716 /**
717  * Global list of all sites on this node
718  */
719 static CFS_LIST_HEAD(lu_sites);
720 static CFS_DEFINE_MUTEX(lu_sites_guard);
721
722 /**
723  * Global environment used by site shrinker.
724  */
725 static struct lu_env lu_shrink_env;
726
727 struct lu_site_print_arg {
728         struct lu_env   *lsp_env;
729         void            *lsp_cookie;
730         lu_printer_t     lsp_printer;
731 };
732
733 static int
734 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
735                   cfs_hlist_node_t *hnode, void *data)
736 {
737         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
738         struct lu_object_header  *h;
739
740         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
741         if (!cfs_list_empty(&h->loh_layers)) {
742                 const struct lu_object *o;
743
744                 o = lu_object_top(h);
745                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
746                                 arg->lsp_printer, o);
747         } else {
748                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
749                                        arg->lsp_printer, h);
750         }
751         return 0;
752 }
753
754 /**
755  * Print all objects in \a s.
756  */
757 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
758                    lu_printer_t printer)
759 {
760         struct lu_site_print_arg arg = {
761                 .lsp_env     = (struct lu_env *)env,
762                 .lsp_cookie  = cookie,
763                 .lsp_printer = printer,
764         };
765
766         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
767 }
768 EXPORT_SYMBOL(lu_site_print);
769
770 enum {
771         LU_CACHE_PERCENT_MAX     = 50,
772         LU_CACHE_PERCENT_DEFAULT = 20
773 };
774
775 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
776 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
777                 "Percentage of memory to be used as lu_object cache");
778
779 /**
780  * Return desired hash table order.
781  */
782 static int lu_htable_order(void)
783 {
784         unsigned long cache_size;
785         int bits;
786
787         /*
788          * Calculate hash table size, assuming that we want reasonable
789          * performance when 20% of total memory is occupied by cache of
790          * lu_objects.
791          *
792          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
793          */
794         cache_size = cfs_num_physpages;
795
796 #if BITS_PER_LONG == 32
797         /* limit hashtable size for lowmem systems to low RAM */
798         if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
799                 cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4;
800 #endif
801
 802         /* reject an unreasonable cache setting. */
803         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
804                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
805                       " the range of (0, %u]. Will use default value: %u.\n",
806                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
807                       LU_CACHE_PERCENT_DEFAULT);
808
809                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
810         }
811         cache_size = cache_size / 100 * lu_cache_percent *
812                 (CFS_PAGE_SIZE / 1024);
813
814         for (bits = 1; (1 << bits) < cache_size; ++bits) {
815                 ;
816         }
817         return bits;
818 }
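/*
 * A worked example of the calculation above, assuming a 64-bit node (so the
 * lowmem clamp is skipped) with 4GB of RAM, 4K pages and the default
 * lu_cache_percent of 20:
 *
 *      cfs_num_physpages = 1048576
 *      cache_size = 1048576 / 100 * 20 * (4096 / 1024) = 838800
 *      bits = 20, the smallest value with (1 << bits) >= 838800
 *
 * lu_site_init() below then clamps the result to the range
 * [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX].
 */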
819
820 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
821                                 const void *key, unsigned mask)
822 {
823         struct lu_fid  *fid = (struct lu_fid *)key;
824         __u32           hash;
825
826         hash = fid_flatten32(fid);
827         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
828         hash = cfs_hash_long(hash, hs->hs_bkt_bits);
829
830         /* give me another random factor */
831         hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
832
833         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
834         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
835
836         return hash & mask;
837 }
838
839 static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
840 {
841         return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
842 }
843
844 static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
845 {
846         struct lu_object_header *h;
847
848         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
849         return &h->loh_fid;
850 }
851
852 static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
853 {
854         struct lu_object_header *h;
855
856         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
857         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
858 }
859
860 static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
861 {
862         struct lu_object_header *h;
863
864         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
865         if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
866                 struct lu_site_bkt_data *bkt;
867                 cfs_hash_bd_t            bd;
868
869                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
870                 bkt = cfs_hash_bd_extra_get(hs, &bd);
871                 bkt->lsb_busy++;
872         }
873 }
874
875 static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
876 {
 877         LBUG(); /* we should never be called here */
878 }
879
880 cfs_hash_ops_t lu_site_hash_ops = {
881         .hs_hash        = lu_obj_hop_hash,
882         .hs_key         = lu_obj_hop_key,
883         .hs_keycmp      = lu_obj_hop_keycmp,
884         .hs_object      = lu_obj_hop_object,
885         .hs_get         = lu_obj_hop_get,
886         .hs_put_locked  = lu_obj_hop_put_locked,
887 };
888
889 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
890 {
891         cfs_spin_lock(&s->ls_ld_lock);
892         if (cfs_list_empty(&d->ld_linkage))
893                 cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
894         cfs_spin_unlock(&s->ls_ld_lock);
895 }
896 EXPORT_SYMBOL(lu_dev_add_linkage);
897
898 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
899 {
900         cfs_spin_lock(&s->ls_ld_lock);
901         cfs_list_del_init(&d->ld_linkage);
902         cfs_spin_unlock(&s->ls_ld_lock);
903 }
904 EXPORT_SYMBOL(lu_dev_del_linkage);
905
906 /**
 907  * Initialize site \a s, with \a top as the top level device.
908  */
909 #define LU_SITE_BITS_MIN    12
910 #define LU_SITE_BITS_MAX    24
911 /**
 912  * 256 buckets in total; we don't want too many buckets because they:
 913  * - consume too much memory
 914  * - lead to unbalanced LRU lists
915  */
916 #define LU_SITE_BKT_BITS    8
917
918 int lu_site_init(struct lu_site *s, struct lu_device *top)
919 {
920         struct lu_site_bkt_data *bkt;
921         cfs_hash_bd_t bd;
922         char name[16];
923         int bits;
924         int i;
925         ENTRY;
926
927         memset(s, 0, sizeof *s);
928         bits = lu_htable_order();
929         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
930         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
931              bits >= LU_SITE_BITS_MIN; bits--) {
932                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
933                                                  bits - LU_SITE_BKT_BITS,
934                                                  sizeof(*bkt), 0, 0,
935                                                  &lu_site_hash_ops,
936                                                  CFS_HASH_SPIN_BKTLOCK |
937                                                  CFS_HASH_NO_ITEMREF |
938                                                  CFS_HASH_DEPTH |
939                                                  CFS_HASH_ASSERT_EMPTY);
940                 if (s->ls_obj_hash != NULL)
941                         break;
942         }
943
944         if (s->ls_obj_hash == NULL) {
945                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
946                 return -ENOMEM;
947         }
948
949         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
950                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
951                 CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
952                 cfs_waitq_init(&bkt->lsb_marche_funebre);
953         }
954
955         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
956         if (s->ls_stats == NULL) {
957                 cfs_hash_putref(s->ls_obj_hash);
958                 s->ls_obj_hash = NULL;
959                 return -ENOMEM;
960         }
961
962         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
963                              0, "created", "created");
964         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
965                              0, "cache_hit", "cache_hit");
966         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
967                              0, "cache_miss", "cache_miss");
968         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
969                              0, "cache_race", "cache_race");
970         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
971                              0, "cache_death_race", "cache_death_race");
972         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
973                              0, "lru_purged", "lru_purged");
974
975         CFS_INIT_LIST_HEAD(&s->ls_linkage);
976         s->ls_top_dev = top;
977         top->ld_site = s;
978         lu_device_get(top);
979         lu_ref_add(&top->ld_reference, "site-top", s);
980
981         CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
982         cfs_spin_lock_init(&s->ls_ld_lock);
983
984         lu_dev_add_linkage(s, top);
985
986         RETURN(0);
987 }
988 EXPORT_SYMBOL(lu_site_init);
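/*
 * For illustration, a rough lifecycle sketch; "env" and "top" are assumed,
 * and OBD_ALLOC_PTR()/OBD_FREE_PTR() from obd_support.h are assumed for the
 * allocation:
 *
 *      struct lu_site *s;
 *      int rc;
 *
 *      OBD_ALLOC_PTR(s);
 *      if (s == NULL)
 *              return -ENOMEM;
 *      rc = lu_site_init(s, top);
 *      if (rc == 0)
 *              rc = lu_site_init_finish(s);
 *      ...
 *      lu_site_fini(s);
 *      OBD_FREE_PTR(s);
 */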
989
990 /**
991  * Finalize \a s and release its resources.
992  */
993 void lu_site_fini(struct lu_site *s)
994 {
995         cfs_mutex_lock(&lu_sites_guard);
996         cfs_list_del_init(&s->ls_linkage);
997         cfs_mutex_unlock(&lu_sites_guard);
998
999         if (s->ls_obj_hash != NULL) {
1000                 cfs_hash_putref(s->ls_obj_hash);
1001                 s->ls_obj_hash = NULL;
1002         }
1003
1004         if (s->ls_top_dev != NULL) {
1005                 s->ls_top_dev->ld_site = NULL;
1006                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1007                 lu_device_put(s->ls_top_dev);
1008                 s->ls_top_dev = NULL;
1009         }
1010
1011         if (s->ls_stats != NULL)
1012                 lprocfs_free_stats(&s->ls_stats);
1013 }
1014 EXPORT_SYMBOL(lu_site_fini);
1015
1016 /**
1017  * Called when initialization of stack for this site is completed.
1018  */
1019 int lu_site_init_finish(struct lu_site *s)
1020 {
1021         int result;
1022         cfs_mutex_lock(&lu_sites_guard);
1023         result = lu_context_refill(&lu_shrink_env.le_ctx);
1024         if (result == 0)
1025                 cfs_list_add(&s->ls_linkage, &lu_sites);
1026         cfs_mutex_unlock(&lu_sites_guard);
1027         return result;
1028 }
1029 EXPORT_SYMBOL(lu_site_init_finish);
1030
1031 /**
1032  * Acquire additional reference on device \a d
1033  */
1034 void lu_device_get(struct lu_device *d)
1035 {
1036         cfs_atomic_inc(&d->ld_ref);
1037 }
1038 EXPORT_SYMBOL(lu_device_get);
1039
1040 /**
1041  * Release reference on device \a d.
1042  */
1043 void lu_device_put(struct lu_device *d)
1044 {
1045         LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
1046         cfs_atomic_dec(&d->ld_ref);
1047 }
1048 EXPORT_SYMBOL(lu_device_put);
1049
1050 /**
1051  * Initialize device \a d of type \a t.
1052  */
1053 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1054 {
1055         if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
1056                 t->ldt_ops->ldto_start(t);
1057         memset(d, 0, sizeof *d);
1058         cfs_atomic_set(&d->ld_ref, 0);
1059         d->ld_type = t;
1060         lu_ref_init(&d->ld_reference);
1061         CFS_INIT_LIST_HEAD(&d->ld_linkage);
1062         return 0;
1063 }
1064 EXPORT_SYMBOL(lu_device_init);
1065
1066 /**
1067  * Finalize device \a d.
1068  */
1069 void lu_device_fini(struct lu_device *d)
1070 {
1071         struct lu_device_type *t;
1072
1073         t = d->ld_type;
1074         if (d->ld_obd != NULL) {
1075                 d->ld_obd->obd_lu_dev = NULL;
1076                 d->ld_obd = NULL;
1077         }
1078
1079         lu_ref_fini(&d->ld_reference);
1080         LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
1081                  "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
1082         LASSERT(t->ldt_device_nr > 0);
1083         if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
1084                 t->ldt_ops->ldto_stop(t);
1085 }
1086 EXPORT_SYMBOL(lu_device_fini);
1087
1088 /**
1089  * Initialize object \a o that is part of compound object \a h and was created
1090  * by device \a d.
1091  */
1092 int lu_object_init(struct lu_object *o,
1093                    struct lu_object_header *h, struct lu_device *d)
1094 {
1095         memset(o, 0, sizeof *o);
1096         o->lo_header = h;
1097         o->lo_dev    = d;
1098         lu_device_get(d);
1099         o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
1100         CFS_INIT_LIST_HEAD(&o->lo_linkage);
1101         return 0;
1102 }
1103 EXPORT_SYMBOL(lu_object_init);
1104
1105 /**
1106  * Finalize object and release its resources.
1107  */
1108 void lu_object_fini(struct lu_object *o)
1109 {
1110         struct lu_device *dev = o->lo_dev;
1111
1112         LASSERT(cfs_list_empty(&o->lo_linkage));
1113
1114         if (dev != NULL) {
1115                 lu_ref_del_at(&dev->ld_reference,
1116                               o->lo_dev_ref , "lu_object", o);
1117                 lu_device_put(dev);
1118                 o->lo_dev = NULL;
1119         }
1120 }
1121 EXPORT_SYMBOL(lu_object_fini);
1122
1123 /**
1124  * Add object \a o as first layer of compound object \a h
1125  *
1126  * This is typically called by the ->ldo_object_alloc() method of top-level
1127  * device.
1128  */
1129 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1130 {
1131         cfs_list_move(&o->lo_linkage, &h->loh_layers);
1132 }
1133 EXPORT_SYMBOL(lu_object_add_top);
1134
1135 /**
1136  * Add object \a o as a layer of compound object, going after \a before.
1137  *
1138  * This is typically called by the ->ldo_object_alloc() method of \a
1139  * before->lo_dev.
1140  */
1141 void lu_object_add(struct lu_object *before, struct lu_object *o)
1142 {
1143         cfs_list_move(&o->lo_linkage, &before->lo_linkage);
1144 }
1145 EXPORT_SYMBOL(lu_object_add);
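/*
 * For illustration, a rough sketch of how the two helpers above are used
 * from ->ldo_object_alloc() methods; "obj", "below" and "dev" are
 * hypothetical names:
 *
 *      (top-level device: allocate header plus first slice)
 *      lu_object_header_init(&obj->mo_header);
 *      lu_object_init(&obj->mo_obj, &obj->mo_header, dev);
 *      lu_object_add_top(&obj->mo_header, &obj->mo_obj);
 *
 *      (lower device: allocate only its slice and chain it after the slice
 *       "below" of the layer above)
 *      lu_object_init(&obj->mo_obj, below->lo_header, dev);
 *      lu_object_add(below, &obj->mo_obj);
 */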
1146
1147 /**
1148  * Initialize compound object.
1149  */
1150 int lu_object_header_init(struct lu_object_header *h)
1151 {
1152         memset(h, 0, sizeof *h);
1153         cfs_atomic_set(&h->loh_ref, 1);
1154         CFS_INIT_HLIST_NODE(&h->loh_hash);
1155         CFS_INIT_LIST_HEAD(&h->loh_lru);
1156         CFS_INIT_LIST_HEAD(&h->loh_layers);
1157         lu_ref_init(&h->loh_reference);
1158         return 0;
1159 }
1160 EXPORT_SYMBOL(lu_object_header_init);
1161
1162 /**
1163  * Finalize compound object.
1164  */
1165 void lu_object_header_fini(struct lu_object_header *h)
1166 {
1167         LASSERT(cfs_list_empty(&h->loh_layers));
1168         LASSERT(cfs_list_empty(&h->loh_lru));
1169         LASSERT(cfs_hlist_unhashed(&h->loh_hash));
1170         lu_ref_fini(&h->loh_reference);
1171 }
1172 EXPORT_SYMBOL(lu_object_header_fini);
1173
1174 /**
1175  * Given a compound object, find its slice, corresponding to the device type
1176  * \a dtype.
1177  */
1178 struct lu_object *lu_object_locate(struct lu_object_header *h,
1179                                    const struct lu_device_type *dtype)
1180 {
1181         struct lu_object *o;
1182
1183         cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1184                 if (o->lo_dev->ld_type == dtype)
1185                         return o;
1186         }
1187         return NULL;
1188 }
1189 EXPORT_SYMBOL(lu_object_locate);
1190
1191
1192
1193 /**
1194  * Finalize and free devices in the device stack.
1195  *
1196  * Finalize device stack by purging object cache, and calling
1197  * lu_device_type_operations::ldto_device_fini() and
1198  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1199  */
1200 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1201 {
1202         struct lu_site   *site = top->ld_site;
1203         struct lu_device *scan;
1204         struct lu_device *next;
1205
1206         lu_site_purge(env, site, ~0);
1207         for (scan = top; scan != NULL; scan = next) {
1208                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1209                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1210                 lu_device_put(scan);
1211         }
1212
1213         /* purge again. */
1214         lu_site_purge(env, site, ~0);
1215
1216         if (!cfs_hash_is_empty(site->ls_obj_hash)) {
1217                 /*
1218                  * Uh-oh, objects still exist.
1219                  */
1220                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1221
1222                 lu_site_print(env, site, &msgdata, lu_cdebug_printer);
1223         }
1224
1225         for (scan = top; scan != NULL; scan = next) {
1226                 const struct lu_device_type *ldt = scan->ld_type;
1227                 struct obd_type             *type;
1228
1229                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1230                 type = ldt->ldt_obd_type;
1231                 if (type != NULL) {
1232                         type->typ_refcnt--;
1233                         class_put_type(type);
1234                 }
1235         }
1236 }
1237 EXPORT_SYMBOL(lu_stack_fini);
1238
1239 enum {
1240         /**
1241          * Maximal number of tld slots.
1242          */
1243         LU_CONTEXT_KEY_NR = 32
1244 };
1245
1246 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1247
1248 static DEFINE_SPINLOCK(lu_keys_guard);
1249
1250 /**
1251  * Global counter incremented whenever key is registered, unregistered,
 1252  * revived or quiesced. This is used to avoid unnecessary calls to
1253  * lu_context_refill(). No locking is provided, as initialization and shutdown
1254  * are supposed to be externally serialized.
1255  */
1256 static unsigned key_set_version = 0;
1257
1258 /**
1259  * Register new key.
1260  */
1261 int lu_context_key_register(struct lu_context_key *key)
1262 {
1263         int result;
1264         int i;
1265
1266         LASSERT(key->lct_init != NULL);
1267         LASSERT(key->lct_fini != NULL);
1268         LASSERT(key->lct_tags != 0);
1269         LASSERT(key->lct_owner != NULL);
1270
1271         result = -ENFILE;
1272         cfs_spin_lock(&lu_keys_guard);
1273         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1274                 if (lu_keys[i] == NULL) {
1275                         key->lct_index = i;
1276                         cfs_atomic_set(&key->lct_used, 1);
1277                         lu_keys[i] = key;
1278                         lu_ref_init(&key->lct_reference);
1279                         result = 0;
1280                         ++key_set_version;
1281                         break;
1282                 }
1283         }
1284         cfs_spin_unlock(&lu_keys_guard);
1285         return result;
1286 }
1287 EXPORT_SYMBOL(lu_context_key_register);
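/*
 * For illustration, a rough sketch of declaring and registering a key;
 * "my" and "struct my_thread_info" are hypothetical. LU_KEY_INIT_FINI()
 * (used above for lu_global_key) generates the init/fini callbacks, and
 * key->lct_owner is assumed to have been set (e.g. via LU_CONTEXT_KEY_INIT(),
 * see lu_context_key_register_many() below) before registration:
 *
 *      LU_KEY_INIT_FINI(my, struct my_thread_info);
 *
 *      struct lu_context_key my_thread_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = my_key_init,
 *              .lct_fini = my_key_fini
 *      };
 *
 *      rc = lu_context_key_register(&my_thread_key);
 */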
1288
1289 static void key_fini(struct lu_context *ctx, int index)
1290 {
1291         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1292                 struct lu_context_key *key;
1293
1294                 key = lu_keys[index];
1295                 LASSERT(key != NULL);
1296                 LASSERT(key->lct_fini != NULL);
1297                 LASSERT(cfs_atomic_read(&key->lct_used) > 1);
1298
1299                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1300                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1301                 cfs_atomic_dec(&key->lct_used);
1302
1303                 LASSERT(key->lct_owner != NULL);
1304                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1305                         LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
1306                         cfs_module_put(key->lct_owner);
1307                 }
1308                 ctx->lc_value[index] = NULL;
1309         }
1310 }
1311
1312 /**
1313  * Deregister key.
1314  */
1315 void lu_context_key_degister(struct lu_context_key *key)
1316 {
1317         LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
1318         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1319
1320         lu_context_key_quiesce(key);
1321
1322         ++key_set_version;
1323         cfs_spin_lock(&lu_keys_guard);
1324         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1325         if (lu_keys[key->lct_index]) {
1326                 lu_keys[key->lct_index] = NULL;
1327                 lu_ref_fini(&key->lct_reference);
1328         }
1329         cfs_spin_unlock(&lu_keys_guard);
1330
1331         LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
1332                  "key has instances: %d\n",
1333                  cfs_atomic_read(&key->lct_used));
1334 }
1335 EXPORT_SYMBOL(lu_context_key_degister);
1336
1337 /**
1338  * Register a number of keys. This has to be called after all keys have been
1339  * initialized by a call to LU_CONTEXT_KEY_INIT().
1340  */
1341 int lu_context_key_register_many(struct lu_context_key *k, ...)
1342 {
1343         struct lu_context_key *key = k;
1344         va_list args;
1345         int result;
1346
1347         va_start(args, k);
1348         do {
1349                 result = lu_context_key_register(key);
1350                 if (result)
1351                         break;
1352                 key = va_arg(args, struct lu_context_key *);
1353         } while (key != NULL);
1354         va_end(args);
1355
1356         if (result != 0) {
1357                 va_start(args, k);
1358                 while (k != key) {
1359                         lu_context_key_degister(k);
1360                         k = va_arg(args, struct lu_context_key *);
1361                 }
1362                 va_end(args);
1363         }
1364
1365         return result;
1366 }
1367 EXPORT_SYMBOL(lu_context_key_register_many);
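/*
 * For illustration, the variadic list is NULL-terminated and registration
 * is rolled back on failure; "foo_key" and "bar_key" are hypothetical keys:
 *
 *      rc = lu_context_key_register_many(&foo_key, &bar_key, NULL);
 *      ...
 *      lu_context_key_degister_many(&foo_key, &bar_key, NULL);
 */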
1368
1369 /**
1370  * De-register a number of keys. This is a dual to
1371  * lu_context_key_register_many().
1372  */
1373 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1374 {
1375         va_list args;
1376
1377         va_start(args, k);
1378         do {
1379                 lu_context_key_degister(k);
1380                 k = va_arg(args, struct lu_context_key*);
1381         } while (k != NULL);
1382         va_end(args);
1383 }
1384 EXPORT_SYMBOL(lu_context_key_degister_many);
1385
1386 /**
1387  * Revive a number of keys.
1388  */
1389 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1390 {
1391         va_list args;
1392
1393         va_start(args, k);
1394         do {
1395                 lu_context_key_revive(k);
1396                 k = va_arg(args, struct lu_context_key*);
1397         } while (k != NULL);
1398         va_end(args);
1399 }
1400 EXPORT_SYMBOL(lu_context_key_revive_many);
1401
1402 /**
 1403  * Quiesce a number of keys.
1404  */
1405 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1406 {
1407         va_list args;
1408
1409         va_start(args, k);
1410         do {
1411                 lu_context_key_quiesce(k);
1412                 k = va_arg(args, struct lu_context_key*);
1413         } while (k != NULL);
1414         va_end(args);
1415 }
1416 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1417
1418 /**
1419  * Return value associated with key \a key in context \a ctx.
1420  */
1421 void *lu_context_key_get(const struct lu_context *ctx,
1422                          const struct lu_context_key *key)
1423 {
1424         LINVRNT(ctx->lc_state == LCS_ENTERED);
1425         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1426         LASSERT(lu_keys[key->lct_index] == key);
1427         return ctx->lc_value[key->lct_index];
1428 }
1429 EXPORT_SYMBOL(lu_context_key_get);
1430
1431 /**
1432  * List of remembered contexts. XXX document me.
1433  */
1434 static CFS_LIST_HEAD(lu_context_remembered);
1435
1436 /**
1437  * Destroy \a key in all remembered contexts. This is used to destroy key
1438  * values in "shared" contexts (like service threads), when a module owning
1439  * the key is about to be unloaded.
1440  */
1441 void lu_context_key_quiesce(struct lu_context_key *key)
1442 {
1443         struct lu_context *ctx;
1444         extern unsigned cl_env_cache_purge(unsigned nr);
1445
1446         if (!(key->lct_tags & LCT_QUIESCENT)) {
1447                 /*
1448                  * XXX layering violation.
1449                  */
1450                 cl_env_cache_purge(~0);
1451                 key->lct_tags |= LCT_QUIESCENT;
1452                 /*
1453                  * XXX memory barrier has to go here.
1454                  */
1455                 cfs_spin_lock(&lu_keys_guard);
1456                 cfs_list_for_each_entry(ctx, &lu_context_remembered,
1457                                         lc_remember)
1458                         key_fini(ctx, key->lct_index);
1459                 cfs_spin_unlock(&lu_keys_guard);
1460                 ++key_set_version;
1461         }
1462 }
1463 EXPORT_SYMBOL(lu_context_key_quiesce);
1464
1465 void lu_context_key_revive(struct lu_context_key *key)
1466 {
1467         key->lct_tags &= ~LCT_QUIESCENT;
1468         ++key_set_version;
1469 }
1470 EXPORT_SYMBOL(lu_context_key_revive);
1471
1472 static void keys_fini(struct lu_context *ctx)
1473 {
1474         int     i;
1475
1476         if (ctx->lc_value == NULL)
1477                 return;
1478
1479         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1480                 key_fini(ctx, i);
1481
1482         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1483         ctx->lc_value = NULL;
1484 }
1485
1486 static int keys_fill(struct lu_context *ctx)
1487 {
1488         int i;
1489
1490         LINVRNT(ctx->lc_value != NULL);
1491         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1492                 struct lu_context_key *key;
1493
1494                 key = lu_keys[i];
1495                 if (ctx->lc_value[i] == NULL && key != NULL &&
1496                     (key->lct_tags & ctx->lc_tags) &&
1497                     /*
1498                      * Don't create values for a LCT_QUIESCENT key, as this
 1499                      * would pin the module owning the key.
1500                      */
1501                     !(key->lct_tags & LCT_QUIESCENT)) {
1502                         void *value;
1503
1504                         LINVRNT(key->lct_init != NULL);
1505                         LINVRNT(key->lct_index == i);
1506
1507                         value = key->lct_init(ctx, key);
1508                         if (unlikely(IS_ERR(value)))
1509                                 return PTR_ERR(value);
1510
1511                         LASSERT(key->lct_owner != NULL);
1512                         if (!(ctx->lc_tags & LCT_NOREF))
1513                                 cfs_try_module_get(key->lct_owner);
1514                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1515                         cfs_atomic_inc(&key->lct_used);
1516                         /*
 1517                          * This is the only place in the code where an
1518                          * element of ctx->lc_value[] array is set to non-NULL
1519                          * value.
1520                          */
1521                         ctx->lc_value[i] = value;
1522                         if (key->lct_exit != NULL)
1523                                 ctx->lc_tags |= LCT_HAS_EXIT;
1524                 }
1525                 ctx->lc_version = key_set_version;
1526         }
1527         return 0;
1528 }
1529
1530 static int keys_init(struct lu_context *ctx)
1531 {
1532         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1533         if (likely(ctx->lc_value != NULL))
1534                 return keys_fill(ctx);
1535
1536         return -ENOMEM;
1537 }
1538
1539 /**
1540  * Initialize context data-structure. Create values for all keys.
1541  */
1542 int lu_context_init(struct lu_context *ctx, __u32 tags)
1543 {
1544         int     rc;
1545
1546         memset(ctx, 0, sizeof *ctx);
1547         ctx->lc_state = LCS_INITIALIZED;
1548         ctx->lc_tags = tags;
1549         if (tags & LCT_REMEMBER) {
1550                 cfs_spin_lock(&lu_keys_guard);
1551                 cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
1552                 cfs_spin_unlock(&lu_keys_guard);
1553         } else {
1554                 CFS_INIT_LIST_HEAD(&ctx->lc_remember);
1555         }
1556
1557         rc = keys_init(ctx);
1558         if (rc != 0)
1559                 lu_context_fini(ctx);
1560
1561         return rc;
1562 }
1563 EXPORT_SYMBOL(lu_context_init);
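/*
 * Illustrative sketch (not part of the original code): the usual life cycle
 * of a context is init -> enter -> ... -> exit -> fini. The tag used below
 * is only an example:
 *
 *     struct lu_context ctx;
 *     int rc;
 *
 *     rc = lu_context_init(&ctx, LCT_LOCAL);
 *     if (rc != 0)
 *             return rc;
 *     lu_context_enter(&ctx);
 *     ... use per-context key values here ...
 *     lu_context_exit(&ctx);
 *     lu_context_fini(&ctx);
 */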
1564
1565 /**
1566  * Finalize a context data structure. Destroy key values.
1567  */
1568 void lu_context_fini(struct lu_context *ctx)
1569 {
1570         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1571         ctx->lc_state = LCS_FINALIZED;
1572
1573         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1574                 LASSERT(cfs_list_empty(&ctx->lc_remember));
1575                 keys_fini(ctx);
1576
1577                 } else { /* could race with lu_context_key_degister() */
1578                 cfs_spin_lock(&lu_keys_guard);
1579                 keys_fini(ctx);
1580                 cfs_list_del_init(&ctx->lc_remember);
1581                 cfs_spin_unlock(&lu_keys_guard);
1582         }
1583 }
1584 EXPORT_SYMBOL(lu_context_fini);
1585
1586 /**
1587  * Called before entering context.
1588  */
1589 void lu_context_enter(struct lu_context *ctx)
1590 {
1591         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1592         ctx->lc_state = LCS_ENTERED;
1593 }
1594 EXPORT_SYMBOL(lu_context_enter);
1595
1596 /**
1597  * Called after exiting from \a ctx
1598  */
1599 void lu_context_exit(struct lu_context *ctx)
1600 {
1601         int i;
1602
1603         LINVRNT(ctx->lc_state == LCS_ENTERED);
1604         ctx->lc_state = LCS_LEFT;
1605         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1606                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1607                         if (ctx->lc_value[i] != NULL) {
1608                                 struct lu_context_key *key;
1609
1610                                 key = lu_keys[i];
1611                                 LASSERT(key != NULL);
1612                                 if (key->lct_exit != NULL)
1613                                         key->lct_exit(ctx,
1614                                                       key, ctx->lc_value[i]);
1615                         }
1616                 }
1617         }
1618 }
1619 EXPORT_SYMBOL(lu_context_exit);
1620
1621 /**
1622  * Allocate values for all keys that were registered after the context was
1623  * created. key_set_version only changes in the rare cases when modules are
1624  * loaded or removed.
1625  */
1626 int lu_context_refill(struct lu_context *ctx)
1627 {
1628         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1629 }
1630 EXPORT_SYMBOL(lu_context_refill);
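/*
 * Illustrative sketch (not part of the original code): a long-lived context
 * picks up values for keys registered after it was created. The key name is
 * hypothetical:
 *
 *     rc = lu_context_key_register(&bar_thread_key);
 *     ...
 *     rc = lu_context_refill(&ctx);   (cheap no-op unless key_set_version
 *                                      has changed)
 */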
1631
1632 /**
1633  * lu_context_tags_default/lu_session_tags_default are updated when new
1634  * types of obd device are added. Currently this is only used on the client
1635  * side, specifically for the echo device client; for other stacks (like
1636  * ptlrpc threads) the contexts are predefined when the lu_device type is
1637  * registered, during the module probe phase.
1638  */
1639 __u32 lu_context_tags_default = 0;
1640 __u32 lu_session_tags_default = 0;
1641
1642 void lu_context_tags_update(__u32 tags)
1643 {
1644         cfs_spin_lock(&lu_keys_guard);
1645         lu_context_tags_default |= tags;
1646         key_set_version++;
1647         cfs_spin_unlock(&lu_keys_guard);
1648 }
1649 EXPORT_SYMBOL(lu_context_tags_update);
1650
1651 void lu_context_tags_clear(__u32 tags)
1652 {
1653         cfs_spin_lock(&lu_keys_guard);
1654         lu_context_tags_default &= ~tags;
1655         key_set_version++;
1656         cfs_spin_unlock(&lu_keys_guard);
1657 }
1658 EXPORT_SYMBOL(lu_context_tags_clear);
1659
1660 void lu_session_tags_update(__u32 tags)
1661 {
1662         cfs_spin_lock(&lu_keys_guard);
1663         lu_session_tags_default |= tags;
1664         key_set_version++;
1665         cfs_spin_unlock(&lu_keys_guard);
1666 }
1667 EXPORT_SYMBOL(lu_session_tags_update);
1668
1669 void lu_session_tags_clear(__u32 tags)
1670 {
1671         cfs_spin_lock(&lu_keys_guard);
1672         lu_session_tags_default &= ~tags;
1673         key_set_version++;
1674         cfs_spin_unlock(&lu_keys_guard);
1675 }
1676 EXPORT_SYMBOL(lu_session_tags_clear);
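/*
 * Illustrative sketch (not part of the original code): a client-side module
 * such as the echo client can extend the default tags while it is loaded and
 * drop them again on unload; the tags chosen here are only examples:
 *
 *     lu_context_tags_update(LCT_DT_THREAD);
 *     lu_session_tags_update(LCT_SESSION);
 *     ...
 *     lu_session_tags_clear(LCT_SESSION);
 *     lu_context_tags_clear(LCT_DT_THREAD);
 */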
1677
1678 int lu_env_init(struct lu_env *env, __u32 tags)
1679 {
1680         int result;
1681
1682         env->le_ses = NULL;
1683         result = lu_context_init(&env->le_ctx, tags);
1684         if (likely(result == 0))
1685                 lu_context_enter(&env->le_ctx);
1686         return result;
1687 }
1688 EXPORT_SYMBOL(lu_env_init);
1689
1690 void lu_env_fini(struct lu_env *env)
1691 {
1692         lu_context_exit(&env->le_ctx);
1693         lu_context_fini(&env->le_ctx);
1694         env->le_ses = NULL;
1695 }
1696 EXPORT_SYMBOL(lu_env_fini);
1697
1698 int lu_env_refill(struct lu_env *env)
1699 {
1700         int result;
1701
1702         result = lu_context_refill(&env->le_ctx);
1703         if (result == 0 && env->le_ses != NULL)
1704                 result = lu_context_refill(env->le_ses);
1705         return result;
1706 }
1707 EXPORT_SYMBOL(lu_env_refill);
1708
1709 /**
1710  * Currently, this API is only used by the echo client. Because the echo
1711  * client and the normal Lustre client share the same cl_env cache, the
1712  * echo client needs to refresh the env context after it gets one from the
1713  * cache, especially when the normal client and the echo client co-exist
1714  * on the same client node.
1715  */
1716 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1717                           __u32 stags)
1718 {
1719         int    result;
1720
1721         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1722                 env->le_ctx.lc_version = 0;
1723                 env->le_ctx.lc_tags |= ctags;
1724         }
1725
1726         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1727                 env->le_ses->lc_version = 0;
1728                 env->le_ses->lc_tags |= stags;
1729         }
1730
1731         result = lu_env_refill(env);
1732
1733         return result;
1734 }
1735 EXPORT_SYMBOL(lu_env_refill_by_tags);
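/*
 * Illustrative sketch (not part of the original code): after obtaining an env
 * from the shared cl_env cache, the echo client widens it with the tags it
 * needs; the tags used here are only an example:
 *
 *     rc = lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
 *     if (rc != 0)
 *             return rc;
 */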
1736
1737 static struct cfs_shrinker *lu_site_shrinker = NULL;
1738
1739 typedef struct lu_site_stats {
1740         unsigned        lss_populated;
1741         unsigned        lss_max_search;
1742         unsigned        lss_total;
1743         unsigned        lss_busy;
1744 } lu_site_stats_t;
1745
1746 static void lu_site_stats_get(cfs_hash_t *hs,
1747                               lu_site_stats_t *stats, int populated)
1748 {
1749         cfs_hash_bd_t bd;
1750         int           i;
1751
1752         cfs_hash_for_each_bucket(hs, &bd, i) {
1753                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1754                 cfs_hlist_head_t        *hhead;
1755
1756                 cfs_hash_bd_lock(hs, &bd, 1);
1757                 stats->lss_busy  += bkt->lsb_busy;
1758                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1759                 stats->lss_max_search = max((int)stats->lss_max_search,
1760                                             cfs_hash_bd_depmax_get(&bd));
1761                 if (!populated) {
1762                         cfs_hash_bd_unlock(hs, &bd, 1);
1763                         continue;
1764                 }
1765
1766                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1767                         if (!cfs_hlist_empty(hhead))
1768                                 stats->lss_populated++;
1769                 }
1770                 cfs_hash_bd_unlock(hs, &bd, 1);
1771         }
1772 }
1773
1774 #ifdef __KERNEL__
1775
1776 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
1777 {
1778         lu_site_stats_t stats;
1779         struct lu_site *s;
1780         struct lu_site *tmp;
1781         int cached = 0;
1782         int remain = shrink_param(sc, nr_to_scan);
1783         CFS_LIST_HEAD(splice);
1784
1785         if (remain != 0) {
1786                 if (!(shrink_param(sc, gfp_mask) & __GFP_FS))
1787                         return -1;
1788                 CDEBUG(D_INODE, "Shrink %d objects\n", remain);
1789         }
1790
1791         cfs_mutex_lock(&lu_sites_guard);
1792         cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1793                 if (shrink_param(sc, nr_to_scan) != 0) {
1794                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1795                         /*
1796                          * Move the just-shrunk site to the tail of the
1797                          * site list to ensure shrinking fairness.
1798                          */
1799                         cfs_list_move_tail(&s->ls_linkage, &splice);
1800                 }
1801
1802                 memset(&stats, 0, sizeof(stats));
1803                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1804                 cached += stats.lss_total - stats.lss_busy;
1805                 if (shrink_param(sc, nr_to_scan) && remain <= 0)
1806                         break;
1807         }
1808         cfs_list_splice(&splice, lu_sites.prev);
1809         cfs_mutex_unlock(&lu_sites_guard);
1810
1811         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1812         if (shrink_param(sc, nr_to_scan) == 0)
1813                 CDEBUG(D_INODE, "%d objects cached\n", cached);
1814         return cached;
1815 }
1816
1817 /*
1818  * Debugging stuff.
1819  */
1820
1821 /**
1822  * Environment to be used in debugger, contains all tags.
1823  */
1824 struct lu_env lu_debugging_env;
1825
1826 /**
1827  * Debugging printer function using printk().
1828  */
1829 int lu_printk_printer(const struct lu_env *env,
1830                       void *unused, const char *format, ...)
1831 {
1832         va_list args;
1833
1834         va_start(args, format);
1835         vprintk(format, args);
1836         va_end(args);
1837         return 0;
1838 }
1839
1840 void lu_debugging_setup(void)
1841 {
1842         lu_env_init(&lu_debugging_env, ~0);
1843 }
1844
1845 void lu_context_keys_dump(void)
1846 {
1847         int i;
1848
1849         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1850                 struct lu_context_key *key;
1851
1852                 key = lu_keys[i];
1853                 if (key != NULL) {
1854                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
1855                                i, key, key->lct_tags,
1856                                key->lct_init, key->lct_fini, key->lct_exit,
1857                                key->lct_index, cfs_atomic_read(&key->lct_used),
1858                                key->lct_owner ? key->lct_owner->name : "",
1859                                key->lct_owner);
1860                         lu_ref_print(&key->lct_reference);
1861                 }
1862         }
1863 }
1864 EXPORT_SYMBOL(lu_context_keys_dump);
1865 #else  /* !__KERNEL__ */
1866 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1867 {
1868         return 0;
1869 }
1870 #endif /* __KERNEL__ */
1871
1872 int  cl_global_init(void);
1873 void cl_global_fini(void);
1874 int  lu_ref_global_init(void);
1875 void lu_ref_global_fini(void);
1876
1877 int dt_global_init(void);
1878 void dt_global_fini(void);
1879
1880 int llo_global_init(void);
1881 void llo_global_fini(void);
1882
1883 /**
1884  * Initialization of global lu_* data.
1885  */
1886 int lu_global_init(void)
1887 {
1888         int result;
1889
1890         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1891
1892         result = lu_ref_global_init();
1893         if (result != 0)
1894                 return result;
1895
1896         LU_CONTEXT_KEY_INIT(&lu_global_key);
1897         result = lu_context_key_register(&lu_global_key);
1898         if (result != 0)
1899                 return result;
1900         /*
1901          * At this level, we don't know what tags are needed, so allocate them
1902          * conservatively. This should not be too bad, because this
1903          * environment is global.
1904          */
1905         cfs_mutex_lock(&lu_sites_guard);
1906         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1907         cfs_mutex_unlock(&lu_sites_guard);
1908         if (result != 0)
1909                 return result;
1910
1911         /*
1912          * Seeks estimation: 3 seeks to read a record from the OI, one to
1913          * read the inode, one for the EA. Unfortunately, setting such a high
1914          * value results in the lu_object/inode cache consuming all the memory.
1915          */
1916         lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
1917         if (lu_site_shrinker == NULL)
1918                 return -ENOMEM;
1919
1920         result = lu_time_global_init();
1921         if (result)
1922                 GOTO(out, result);
1923
1924 #ifdef __KERNEL__
1925         result = dt_global_init();
1926         if (result)
1927                 GOTO(out, result);
1928
1929         result = llo_global_init();
1930         if (result)
1931                 GOTO(out, result);
1932 #endif
1933         result = cl_global_init();
1934 out:
1935
1936         return result;
1937 }
1938
1939 /**
1940  * Dual to lu_global_init().
1941  */
1942 void lu_global_fini(void)
1943 {
1944         cl_global_fini();
1945 #ifdef __KERNEL__
1946         llo_global_fini();
1947         dt_global_fini();
1948 #endif
1949         lu_time_global_fini();
1950         if (lu_site_shrinker != NULL) {
1951                 cfs_remove_shrinker(lu_site_shrinker);
1952                 lu_site_shrinker = NULL;
1953         }
1954
1955         lu_context_key_degister(&lu_global_key);
1956
1957         /*
1958          * Tear shrinker environment down _after_ de-registering
1959          * lu_global_key, because the latter has a value in the former.
1960          */
1961         cfs_mutex_lock(&lu_sites_guard);
1962         lu_env_fini(&lu_shrink_env);
1963         cfs_mutex_unlock(&lu_sites_guard);
1964
1965         lu_ref_global_fini();
1966 }
1967
1968 struct lu_buf LU_BUF_NULL = {
1969         .lb_buf = NULL,
1970         .lb_len = 0
1971 };
1972 EXPORT_SYMBOL(LU_BUF_NULL);
1973
1974 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1975 {
1976 #ifdef LPROCFS
1977         struct lprocfs_counter ret;
1978
1979         lprocfs_stats_collect(stats, idx, &ret);
1980         return (__u32)ret.lc_count;
1981 #else
1982         return 0;
1983 #endif
1984 }
1985
1986 /**
1987  * Output site statistical counters into a buffer. Suitable for
1988  * lprocfs_rd_*()-style functions.
1989  */
1990 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
1991 {
1992         lu_site_stats_t stats;
1993
1994         memset(&stats, 0, sizeof(stats));
1995         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1996
1997         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
1998                         stats.lss_busy,
1999                         stats.lss_total,
2000                         stats.lss_populated,
2001                         CFS_HASH_NHLIST(s->ls_obj_hash),
2002                         stats.lss_max_search,
2003                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2004                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2005                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2006                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2007                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2008                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2009 }
2010 EXPORT_SYMBOL(lu_site_stats_print);
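/*
 * Illustrative sketch (not part of the original code): a typical
 * lprocfs_rd_*()-style handler simply forwards to lu_site_stats_print();
 * the handler name and the way the lu_site is reached are hypothetical:
 *
 *     static int foo_rd_site_stats(char *page, char **start, off_t off,
 *                                  int count, int *eof, void *data)
 *     {
 *             struct lu_site *site = data;
 *
 *             *eof = 1;
 *             return lu_site_stats_print(site, page, count);
 *     }
 */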
2011
2012 const char *lu_time_names[LU_TIME_NR] = {
2013         [LU_TIME_FIND_LOOKUP] = "find_lookup",
2014         [LU_TIME_FIND_ALLOC]  = "find_alloc",
2015         [LU_TIME_FIND_INSERT] = "find_insert"
2016 };
2017 EXPORT_SYMBOL(lu_time_names);
2018
2019 /**
2020  * Helper function to initialize a number of kmem slab caches at once.
2021  */
2022 int lu_kmem_init(struct lu_kmem_descr *caches)
2023 {
2024         int result;
2025         struct lu_kmem_descr *iter = caches;
2026
2027         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2028                 *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name,
2029                                                         iter->ckd_size,
2030                                                         0, 0);
2031                 if (*iter->ckd_cache == NULL) {
2032                         result = -ENOMEM;
2033                         /* free all previously allocated caches */
2034                         lu_kmem_fini(caches);
2035                         break;
2036                 }
2037         }
2038         return result;
2039 }
2040 EXPORT_SYMBOL(lu_kmem_init);
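/*
 * Illustrative sketch (not part of the original code): callers describe their
 * caches in an array terminated by a NULL ckd_cache entry and pair
 * lu_kmem_init() with lu_kmem_fini(); the cache, name and type below are
 * hypothetical:
 *
 *     static cfs_mem_cache_t *foo_object_kmem;
 *
 *     static struct lu_kmem_descr foo_caches[] = {
 *             {
 *                     .ckd_cache = &foo_object_kmem,
 *                     .ckd_name  = "foo_object_kmem",
 *                     .ckd_size  = sizeof(struct foo_object)
 *             },
 *             {
 *                     .ckd_cache = NULL
 *             }
 *     };
 *
 *     rc = lu_kmem_init(foo_caches);          (in module init)
 *     ...
 *     lu_kmem_fini(foo_caches);               (in module exit)
 */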
2041
2042 /**
2043  * Helper function to finalize a number of kmem slab caches at once. Dual to
2044  * lu_kmem_init().
2045  */
2046 void lu_kmem_fini(struct lu_kmem_descr *caches)
2047 {
2048         int rc;
2049
2050         for (; caches->ckd_cache != NULL; ++caches) {
2051                 if (*caches->ckd_cache != NULL) {
2052                         rc = cfs_mem_cache_destroy(*caches->ckd_cache);
2053                         LASSERTF(rc == 0, "couldn't destroy %s slab\n",
2054                                  caches->ckd_name);
2055                         *caches->ckd_cache = NULL;
2056                 }
2057         }
2058 }
2059 EXPORT_SYMBOL(lu_kmem_fini);