/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/lu_object.c
 *
 * Lustre Object.
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <libcfs/libcfs.h>

#ifdef __KERNEL__
# include <linux/module.h>
#endif

/* hash_long() */
#include <libcfs/libcfs_hash.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
/* lu_time_global_{init,fini}() */
#include <lu_time.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/**
 * Decrease the reference counter on an object. If this is the last reference
 * and lu_object_is_dying(o) does not hold, return the object to the cache;
 * otherwise, free the object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        cfs_hash_bd_t            bd;

        top  = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;

        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);

        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
                if (lu_object_is_dying(top)) {
                        /*
                         * Somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
                        cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
                }
                return;
        }

        LASSERT(bkt->lsb_busy > 0);
        bkt->lsb_busy--;
        /*
         * When the last reference is released, iterate over the object
         * layers, and notify them that the object is no longer busy.
         */
        cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }

        if (!lu_object_is_dying(top)) {
                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                return;
        }

        /*
         * If the object is dying (i.e., it will not be cached), remove it
         * from the hash table and the LRU.
         *
         * This is done with the hash table and LRU lists locked. As the only
         * ways to acquire a first reference to a previously unreferenced
         * object are hash-table lookup (lu_object_find()) and LRU scanning
         * (lu_site_purge()), both done under the hash-table and LRU locks,
         * no race with a concurrent object lookup is possible and we can
         * safely destroy the object below.
         */
        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
        cfs_list_del_init(&top->loh_lru);
        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
        /*
         * The object was already removed from the hash and LRU above, so we
         * can now free it.
         */
        lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

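/*
 * Example (an illustrative sketch, not from the original source): the usual
 * reference cycle pairs lu_object_find(), defined below, with
 * lu_object_put(). "env", "mydev" and "fid" are hypothetical here.
 *
 *      struct lu_object *o;
 *
 *      o = lu_object_find(env, mydev, fid, NULL);
 *      if (!IS_ERR(o)) {
 *              ... use the object ...
 *              lu_object_put(env, o);
 *      }
 *
 * The last lu_object_put() either parks the object in the site cache or, if
 * the object is dying, frees it as described above.
 */
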
/**
 * Allocate a new object.
 *
 * This follows the object creation protocol described in the comment within
 * the struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
                                         const struct lu_fid *f,
                                         const struct lu_object_conf *conf)
{
        struct lu_object *scan;
        struct lu_object *top;
        cfs_list_t *layers;
        int clean;
        int result;
        ENTRY;

        /*
         * Create the top-level object slice. This will also create
         * the lu_object_header.
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        /*
         * This is the only place where the object fid is assigned. It is
         * constant after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                cfs_list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
        RETURN(top);
}

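/*
 * Sketch (hypothetical, for illustration only) of the kind of top-level
 * ->ldo_object_alloc() method that lu_object_alloc() above relies on: it
 * allocates the compound header together with the first slice and registers
 * the slice with lu_object_add_top(). All "my"-prefixed names are invented.
 *
 *      static struct lu_object *mydev_object_alloc(const struct lu_env *env,
 *                                                  const struct lu_object_header *h,
 *                                                  struct lu_device *d)
 *      {
 *              struct my_object *mo;
 *
 *              OBD_ALLOC_PTR(mo);
 *              if (mo == NULL)
 *                      return NULL;
 *              lu_object_header_init(&mo->mo_header);
 *              lu_object_init(&mo->mo_obj, &mo->mo_header, d);
 *              lu_object_add_top(&mo->mo_header, &mo->mo_obj);
 *              mo->mo_obj.lo_ops = &my_object_ops;
 *              return &mo->mo_obj;
 *      }
 */
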
/**
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site_bkt_data *bkt;
        struct lu_site          *site;
        struct lu_object        *scan;
        cfs_list_t              *layers;
        cfs_list_t               splice;

        site   = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
        /*
         * First, call the ->loo_object_delete() method on all layers to
         * release their resources.
         */
        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then, splice the object layers into a stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. The splice is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        CFS_INIT_LIST_HEAD(&splice);
        cfs_list_splice_init(layers, &splice);
        while (!cfs_list_empty(&splice)) {
                /*
                 * Free layers in bottom-to-top order, so that the object
                 * header lives as long as possible and ->loo_object_free()
                 * methods can look at its contents.
                 */
                o = container_of0(splice.prev, struct lu_object, lo_linkage);
                cfs_list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }

        if (cfs_waitq_active(&bkt->lsb_marche_funebre))
                cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
}

/**
 * Free \a nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct lu_object_header *h;
        struct lu_object_header *temp;
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t            bd;
        cfs_hash_bd_t            bd2;
        cfs_list_t               dispose;
        int                      did_sth;
        int                      start;
        int                      count;
        int                      bnr;
        int                      i;

        CFS_INIT_LIST_HEAD(&dispose);
        /*
         * Under the LRU list lock, scan the LRU list and move unreferenced
         * objects to the dispose list, removing them from the LRU and the
         * hash table.
         */
        start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
 again:
        did_sth = 0;
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                if (i < start)
                        continue;
                count = bnr;
                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);

                cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        /*
                         * Objects are sorted in LRU order, and "busy"
                         * objects (ones with h->loh_ref > 0) naturally tend
                         * to live near the hot end that we scan last.
                         * Unfortunately, sites usually have a small (fewer
                         * than ten) number of busy yet rarely accessed
                         * objects (some global objects, accessed directly
                         * through pointers, bypassing the hash table).
                         * Currently the algorithm scans them over and over
                         * again. Probably we should move busy objects out of
                         * the LRU, or we can live with that.
                         */
                        if (cfs_atomic_read(&h->loh_ref) > 0)
                                continue;

                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
                        LASSERT(bd.bd_bucket == bd2.bd_bucket);

                        cfs_hash_bd_del_locked(s->ls_obj_hash,
                                               &bd2, &h->loh_hash);
                        cfs_list_move(&h->loh_lru, &dispose);
                        if (did_sth == 0)
                                did_sth = 1;

                        if (nr != ~0 && --nr == 0)
                                break;

                        if (count > 0 && --count == 0)
                                break;
                }
                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
                cfs_cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
                while (!cfs_list_empty(&dispose)) {
                        h = container_of0(dispose.next,
                                          struct lu_object_header, loh_lru);
                        cfs_list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }

                if (nr == 0)
                        break;
        }

        if (nr != 0 && did_sth && start != 0) {
                start = 0; /* restart from the first bucket */
                goto again;
        }
        /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);

        return nr;
}
EXPORT_SYMBOL(lu_site_purge);

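/*
 * Example (illustrative sketch): callers pass ~0 to drain the whole cache, as
 * lu_stack_fini() does below, or a positive count to trim it. "env" and
 * "site" are hypothetical here.
 *
 *      lu_site_purge(env, site, ~0);    - free every unreferenced object
 *      lu_site_purge(env, site, 128);   - free up to 128 cold objects
 *
 * The return value is the remainder of the requested \a nr that could not be
 * freed.
 */
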
/*
 * Object printing.
 *
 * The code below has to jump through certain hoops to output an object
 * description into a libcfs_debug_msg()-based log. The problem is that
 * lu_object_print() composes the object description from strings that are
 * parts of _lines_ of output (i.e., strings that are not terminated by a
 * newline). This doesn't fit very well into the libcfs_debug_msg()
 * interface, which assumes that each message supplied to it is a
 * self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_global_key key), until a terminating
 * newline character is detected.
 */

enum {
        /**
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 512
};

struct lu_cdebug_data {
        /**
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/**
 * Key holding the temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};

/**
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_global_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append the new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                if (cfs_cdebug_show(info->lpi_mask, info->lpi_subsys))
                        libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                         (char *)info->lpi_file, info->lpi_fn,
                                         info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

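/*
 * Example (illustrative sketch): lu_cdebug_printer() is typically passed as
 * the lu_printer_t argument of the printing helpers below, with a
 * DECLARE_LU_CDEBUG_PRINT_INFO cookie (see lu_stack_fini() for an in-tree
 * use). "env" and "obj" are hypothetical here.
 *
 *      static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);
 *
 *      lu_object_print(env, &cookie, lu_cdebug_printer, obj);
 */
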
/**
 * Print the object header.
 */
void lu_object_header_print(const struct lu_env *env, void *cookie,
                            lu_printer_t printer,
                            const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
                   cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
}
EXPORT_SYMBOL(lu_object_header_print);

/**
 * Print a human readable representation of \a o to the \a printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "{\n");
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;

                /*
                 * Print `.' \a depth times followed by the type name and
                 * address.
                 */
                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
                           o->lo_dev->ld_type->ldt_name, o);
                if (o->lo_ops->loo_object_print != NULL)
                        o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
        (*printer)(env, cookie, "} header@%p\n", top);
}
EXPORT_SYMBOL(lu_object_print);

/**
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       cfs_hash_bd_t *bd,
                                       const struct lu_fid *f,
                                       cfs_waitlink_t *waiter,
                                       __u64 *version)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        cfs_hlist_node_t        *hnode;
        __u64  ver = cfs_hash_bd_version_get(bd);

        if (*version == ver)
                return NULL;

        *version = ver;
        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_lookup_intent is a somewhat "internal" function
         * of cfs_hash, but we don't want a refcount on the object right now */
        hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
        if (hnode == NULL) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return NULL;
        }

        h = container_of0(hnode, struct lu_object_header, loh_hash);
        if (likely(!lu_object_is_dying(h))) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
                return lu_object_top(h);
        }

        /*
         * Lookup found an object being destroyed; this object cannot be
         * returned (to assure that references to dying objects are eventually
         * drained), and moreover, the lookup has to wait until the object is
         * freed.
         */
        cfs_atomic_dec(&h->loh_ref);

        cfs_waitlink_init(waiter);
        cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
        cfs_set_current_state(CFS_TASK_UNINT);
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
        return ERR_PTR(-EAGAIN);
}

/**
 * Search the cache for an object with the fid \a f. If such an object is
 * found, return it. Otherwise, create a new object, insert it into the cache
 * and return it. In any case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_device *dev, const struct lu_fid *f,
                                 const struct lu_object_conf *conf)
{
        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
}
EXPORT_SYMBOL(lu_object_find);

/**
 * Core logic of the lu_object_find*() functions.
 */
static struct lu_object *lu_object_find_try(const struct lu_env *env,
                                            struct lu_device *dev,
                                            const struct lu_fid *f,
                                            const struct lu_object_conf *conf,
                                            cfs_waitlink_t *waiter)
{
        struct lu_object      *o;
        struct lu_object      *shadow;
        struct lu_site        *s;
        cfs_hash_t            *hs;
        cfs_hash_bd_t          bd;
        __u64                  version = 0;

        /*
         * This uses the standard index maintenance protocol:
         *
         *     - search the index under lock, and return the object if found;
         *     - otherwise, unlock the index and allocate a new object;
         *     - lock the index and search again;
         *     - if nothing is found (usual case), insert the newly created
         *       object into the index;
         *     - otherwise (race: another thread inserted the object), free
         *       the object just allocated;
         *     - unlock the index;
         *     - return the object.
         *
         * If a dying object is found during the index search, add \a waiter
         * to the site wait-queue and return ERR_PTR(-EAGAIN).
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
        o = htable_lookup(s, &bd, f, waiter, &version);
        cfs_hash_bd_unlock(hs, &bd, 1);
        if (o != NULL)
                return o;

        /*
         * Allocate a new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, dev, f, conf);
        if (unlikely(IS_ERR(o)))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        cfs_hash_bd_lock(hs, &bd, 1);

        shadow = htable_lookup(s, &bd, f, waiter, &version);
        if (likely(shadow == NULL)) {
                struct lu_site_bkt_data *bkt;

                bkt = cfs_hash_bd_extra_get(hs, &bd);
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_list_add_tail(&o->lo_header->loh_lru, &bkt->lsb_lru);
                bkt->lsb_busy++;
                cfs_hash_bd_unlock(hs, &bd, 1);
                return o;
        }

        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
        return shadow;
}

/**
 * Much like lu_object_find(), but the top-level device of the object is
 * specifically \a dev rather than the top-level device of the site. This
 * interface allows objects with different "stackings" to be created within
 * the same site.
 */
struct lu_object *lu_object_find_at(const struct lu_env *env,
                                    struct lu_device *dev,
                                    const struct lu_fid *f,
                                    const struct lu_object_conf *conf)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object        *obj;
        cfs_waitlink_t           wait;

        while (1) {
                obj = lu_object_find_try(env, dev, f, conf, &wait);
                if (obj != ERR_PTR(-EAGAIN))
                        return obj;
                /*
                 * lu_object_find_try() already added the waiter to the
                 * wait queue.
                 */
                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
                cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
        }
}
EXPORT_SYMBOL(lu_object_find_at);

/**
 * Find the object with the given fid, and return its slice belonging to the
 * given device.
 */
struct lu_object *lu_object_find_slice(const struct lu_env *env,
                                       struct lu_device *dev,
                                       const struct lu_fid *f,
                                       const struct lu_object_conf *conf)
{
        struct lu_object *top;
        struct lu_object *obj;

        top = lu_object_find(env, dev, f, conf);
        if (!IS_ERR(top)) {
                obj = lu_object_locate(top->lo_header, dev->ld_type);
                if (obj == NULL)
                        lu_object_put(env, top);
        } else
                obj = top;
        return obj;
}
EXPORT_SYMBOL(lu_object_find_slice);

/**
 * Global list of all device types.
 */
static CFS_LIST_HEAD(lu_device_types);

int lu_device_type_init(struct lu_device_type *ldt)
{
        int result;

        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
        result = ldt->ldt_ops->ldto_init(ldt);
        if (result == 0)
                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
        return result;
}
EXPORT_SYMBOL(lu_device_type_init);

void lu_device_type_fini(struct lu_device_type *ldt)
{
        cfs_list_del_init(&ldt->ldt_linkage);
        ldt->ldt_ops->ldto_fini(ldt);
}
EXPORT_SYMBOL(lu_device_type_fini);

void lu_types_stop(void)
{
        struct lu_device_type *ldt;

        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
                if (ldt->ldt_device_nr == 0)
                        ldt->ldt_ops->ldto_stop(ldt);
        }
}
EXPORT_SYMBOL(lu_types_stop);

/**
 * Global list of all sites on this node.
 */
static CFS_LIST_HEAD(lu_sites);
static CFS_DECLARE_MUTEX(lu_sites_guard);

/**
 * Global environment used by the site shrinker.
 */
static struct lu_env lu_shrink_env;

struct lu_site_print_arg {
        struct lu_env   *lsp_env;
        void            *lsp_cookie;
        lu_printer_t     lsp_printer;
};

static int
lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                  cfs_hlist_node_t *hnode, void *data)
{
        struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
        struct lu_object_header  *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!cfs_list_empty(&h->loh_layers)) {
                const struct lu_object *o;

                o = lu_object_top(h);
                lu_object_print(arg->lsp_env, arg->lsp_cookie,
                                arg->lsp_printer, o);
        } else {
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
        return 0;
}

/**
 * Print all objects in \a s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        struct lu_site_print_arg arg = {
                .lsp_env     = (struct lu_env *)env,
                .lsp_cookie  = cookie,
                .lsp_printer = printer,
        };

        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
}
EXPORT_SYMBOL(lu_site_print);

enum {
        LU_CACHE_PERCENT = 20,
};

/**
 * Return the desired hash table order.
 */
static int lu_htable_order(void)
{
        unsigned long cache_size;
        int bits;

        /*
         * Calculate the hash table size, assuming that we want reasonable
         * performance when 20% of total memory is occupied by the cache of
         * lu_objects.
         *
         * The size of an lu_object is (arbitrarily) taken as 1K (together
         * with the inode).
         */
        cache_size = cfs_num_physpages;

#if BITS_PER_LONG == 32
        /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
                cache_size = (1 << (30 - CFS_PAGE_SHIFT)) * 3 / 4;
#endif

        cache_size = cache_size / 100 * LU_CACHE_PERCENT *
                (CFS_PAGE_SIZE / 1024);

        for (bits = 1; (1 << bits) < cache_size; ++bits) {
                ;
        }
        return bits;
}

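/*
 * Worked example (an illustration, assuming 4KB pages): on a node with 4GB
 * of RAM, cfs_num_physpages is 2^20. Then
 *
 *      cache_size = 2^20 / 100 * 20 * (4096 / 1024) ~= 838800 objects,
 *
 * and the loop above picks bits = 20, since 2^19 < 838800 <= 2^20. The
 * caller (lu_site_init() below) clamps the result to the range
 * [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX].
 */
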
static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
                                const void *key, unsigned mask)
{
        struct lu_fid  *fid = (struct lu_fid *)key;
        __u32           hash;

        hash = fid_flatten32(fid);
        hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
        hash = cfs_hash_long(hash, hs->hs_bkt_bits);

        /* give me another random factor */
        hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);

        hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
        hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);

        return hash & mask;
}

static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
}

static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return &h->loh_fid;
}

static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
}

static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
                struct lu_site_bkt_data *bkt;
                cfs_hash_bd_t            bd;

                cfs_hash_bd_get(hs, &h->loh_fid, &bd);
                bkt = cfs_hash_bd_extra_get(hs, &bd);
                bkt->lsb_busy++;
        }
}

static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        LBUG(); /* we should never call this */
}

cfs_hash_ops_t lu_site_hash_ops = {
        .hs_hash        = lu_obj_hop_hash,
        .hs_key         = lu_obj_hop_key,
        .hs_keycmp      = lu_obj_hop_keycmp,
        .hs_object      = lu_obj_hop_object,
        .hs_get         = lu_obj_hop_get,
        .hs_put_locked  = lu_obj_hop_put_locked,
};

/**
 * Initialize site \a s, with \a top as the top-level device.
 */
#define LU_SITE_BITS_MIN    12
#define LU_SITE_BITS_MAX    24
/**
 * 256 buckets in total; we don't want too many buckets because they:
 * - consume too much memory;
 * - lead to unbalanced LRU lists.
 */
#define LU_SITE_BKT_BITS    8

int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t bd;
        char name[16];
        int bits;
        int i;
        ENTRY;

        memset(s, 0, sizeof *s);
        bits = lu_htable_order();
        snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
        for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
                                                 bits - LU_SITE_BKT_BITS,
                                                 sizeof(*bkt), 0, 0,
                                                 &lu_site_hash_ops,
                                                 CFS_HASH_SPIN_BKTLOCK |
                                                 CFS_HASH_NO_ITEMREF |
                                                 CFS_HASH_DEPTH |
                                                 CFS_HASH_ASSERT_EMPTY);
                if (s->ls_obj_hash != NULL)
                        break;
        }

        if (s->ls_obj_hash == NULL) {
                CERROR("failed to create lu_site hash with bits: %d\n", bits);
                return -ENOMEM;
        }

        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
                cfs_waitq_init(&bkt->lsb_marche_funebre);
        }

        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        if (s->ls_stats == NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
                return -ENOMEM;
        }

        lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
                             0, "created", "created");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
                             0, "cache_hit", "cache_hit");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
                             0, "cache_miss", "cache_miss");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
                             0, "cache_race", "cache_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
                             0, "cache_death_race", "cache_death_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
                             0, "lru_purged", "lru_purged");

        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        lu_ref_add(&top->ld_reference, "site-top", s);

        RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);

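/*
 * Example (illustrative sketch): typical site lifecycle from a server setup
 * path. "site" and "topdev" are hypothetical.
 *
 *      rc = lu_site_init(site, topdev);
 *      if (rc == 0) {
 *              rc = lu_site_init_finish(site);
 *              if (rc != 0)
 *                      lu_site_fini(site);
 *      }
 *      ...
 *      lu_site_fini(site);     on teardown
 */
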
/**
 * Finalize \a s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        cfs_down(&lu_sites_guard);
        cfs_list_del_init(&s->ls_linkage);
        cfs_up(&lu_sites_guard);

        if (s->ls_obj_hash != NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
        }

        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }

        if (s->ls_stats != NULL)
                lprocfs_free_stats(&s->ls_stats);
}
EXPORT_SYMBOL(lu_site_fini);

/**
 * Called when initialization of the stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;

        cfs_down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                cfs_list_add(&s->ls_linkage, &lu_sites);
        cfs_up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/**
 * Acquire an additional reference on device \a d.
 */
void lu_device_get(struct lu_device *d)
{
        cfs_atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/**
 * Release a reference on device \a d.
 */
void lu_device_put(struct lu_device *d)
{
        LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
        cfs_atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/**
 * Initialize device \a d of type \a t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
                t->ldt_ops->ldto_start(t);
        memset(d, 0, sizeof *d);
        cfs_atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        lu_ref_init(&d->ld_reference);
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/**
 * Finalize device \a d.
 */
void lu_device_fini(struct lu_device *d)
{
        struct lu_device_type *t;

        t = d->ld_type;
        if (d->ld_obd != NULL) {
                d->ld_obd->obd_lu_dev = NULL;
                d->ld_obd = NULL;
        }

        lu_ref_fini(&d->ld_reference);
        LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
        LASSERT(t->ldt_device_nr > 0);
        if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
                t->ldt_ops->ldto_stop(t);
}
EXPORT_SYMBOL(lu_device_fini);

/**
 * Initialize object \a o that is part of compound object \a h and was created
 * by device \a d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/**
 * Finalize the object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        struct lu_device *dev = o->lo_dev;

        LASSERT(cfs_list_empty(&o->lo_linkage));

        if (dev != NULL) {
                lu_ref_del_at(&dev->ld_reference,
                              o->lo_dev_ref, "lu_object", o);
                lu_device_put(dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/**
 * Add object \a o as the first layer of compound object \a h.
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/**
 * Add object \a o as a layer of a compound object, going after \a before.
 *
 * This is typically called by the ->ldo_object_alloc() method of \a
 * before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/**
 * Initialize a compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        cfs_atomic_set(&h->loh_ref, 1);
        CFS_INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        lu_ref_init(&h->loh_reference);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/**
 * Finalize a compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(cfs_list_empty(&h->loh_layers));
        LASSERT(cfs_list_empty(&h->loh_lru));
        LASSERT(cfs_hlist_unhashed(&h->loh_hash));
        lu_ref_fini(&h->loh_reference);
}
EXPORT_SYMBOL(lu_object_header_fini);

/**
 * Given a compound object, find its slice corresponding to the device type
 * \a dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   const struct lu_device_type *dtype)
{
        struct lu_object *o;

        cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

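/*
 * Example (illustrative sketch): lu_object_locate() is how a layer retrieves
 * its own slice of a compound object, as lu_object_find_slice() does above.
 * "top" is a hypothetical object returned by lu_object_find(), and
 * "mydev_type" a hypothetical lu_device_type.
 *
 *      struct lu_object *slice;
 *
 *      slice = lu_object_locate(top->lo_header, &mydev_type);
 *      if (slice == NULL)
 *              ... this device contributed no layer to the object ...
 */
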
/**
 * Finalize and free devices in the device stack.
 *
 * Finalize the device stack by purging the object cache, and calling
 * lu_device_type_operations::ldto_device_fini() and
 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
 */
void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
{
        struct lu_site   *site = top->ld_site;
        struct lu_device *scan;
        struct lu_device *next;

        lu_site_purge(env, site, ~0);
        for (scan = top; scan != NULL; scan = next) {
                next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
                lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
                lu_device_put(scan);
        }

        /* purge again. */
        lu_site_purge(env, site, ~0);

        if (!cfs_hash_is_empty(site->ls_obj_hash)) {
                /*
                 * Uh-oh, objects still exist.
                 */
                static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);

                lu_site_print(env, site, &cookie, lu_cdebug_printer);
        }

        for (scan = top; scan != NULL; scan = next) {
                const struct lu_device_type *ldt = scan->ld_type;
                struct obd_type             *type;

                next = ldt->ldt_ops->ldto_device_free(env, scan);
                type = ldt->ldt_obd_type;
                if (type != NULL) {
                        type->typ_refcnt--;
                        class_put_type(type);
                }
        }
}
EXPORT_SYMBOL(lu_stack_fini);

enum {
        /**
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 32
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;

/**
 * Global counter incremented whenever a key is registered, unregistered,
 * revived or quiesced. This is used to avoid unnecessary calls to
 * lu_context_refill(). No locking is provided, as initialization and shutdown
 * are supposed to be externally serialized.
 */
static unsigned key_set_version = 0;

/**
 * Register a new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);
        LASSERT(key->lct_owner != NULL);

        result = -ENFILE;
        cfs_spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        cfs_atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        lu_ref_init(&key->lct_reference);
                        result = 0;
                        ++key_set_version;
                        break;
                }
        }
        cfs_spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);

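/*
 * Example (illustrative sketch): defining and registering a context key for
 * a module's per-thread state, following the lu_global_key pattern above.
 * The "my"-prefixed names are invented; LU_KEY_INIT_FINI() generates
 * my_thread_key_init()/my_thread_key_fini() for the given type.
 *
 *      struct my_thread_info {
 *              ...
 *      };
 *
 *      LU_KEY_INIT_FINI(my_thread, struct my_thread_info);
 *
 *      static struct lu_context_key my_thread_key = {
 *              .lct_tags = LCT_MD_THREAD,
 *              .lct_init = my_thread_key_init,
 *              .lct_fini = my_thread_key_fini
 *      };
 *
 *      LU_CONTEXT_KEY_INIT(&my_thread_key);
 *      rc = lu_context_key_register(&my_thread_key);
 *
 * Per-thread values are then retrieved with lu_context_key_get(), and the
 * key is torn down with lu_context_key_degister() on module unload.
 */
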
static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(cfs_atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                lu_ref_del(&key->lct_reference, "ctx", ctx);
                cfs_atomic_dec(&key->lct_used);
                LASSERT(key->lct_owner != NULL);
                if (!(ctx->lc_tags & LCT_NOREF)) {
                        LASSERT(cfs_module_refcount(key->lct_owner) > 0);
                        cfs_module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
}

/**
 * Deregister a key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        lu_context_key_quiesce(key);

        ++key_set_version;
        cfs_spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
        }
        cfs_spin_unlock(&lu_keys_guard);

        LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
                 "key has instances: %d\n",
                 cfs_atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);

/**
 * Register a number of keys. This has to be called after all keys have been
 * initialized by a call to LU_CONTEXT_KEY_INIT().
 */
int lu_context_key_register_many(struct lu_context_key *k, ...)
{
        struct lu_context_key *key = k;
        va_list args;
        int result;

        va_start(args, k);
        do {
                result = lu_context_key_register(key);
                if (result)
                        break;
                key = va_arg(args, struct lu_context_key *);
        } while (key != NULL);
        va_end(args);

        if (result != 0) {
                va_start(args, k);
                while (k != key) {
                        lu_context_key_degister(k);
                        k = va_arg(args, struct lu_context_key *);
                }
                va_end(args);
        }

        return result;
}
EXPORT_SYMBOL(lu_context_key_register_many);

/**
 * Deregister a number of keys. This is the dual of
 * lu_context_key_register_many().
 */
void lu_context_key_degister_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_degister(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_degister_many);

/**
 * Revive a number of keys.
 */
void lu_context_key_revive_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_revive(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_revive_many);

/**
 * Quiesce a number of keys.
 */
void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_quiesce(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_quiesce_many);

/**
 * Return the value associated with key \a key in context \a ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         const struct lu_context_key *key)
{
        LINVRNT(ctx->lc_state == LCS_ENTERED);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        LASSERT(lu_keys[key->lct_index] == key);
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

/**
 * List of remembered contexts. XXX document me.
 */
static CFS_LIST_HEAD(lu_context_remembered);

/**
 * Destroy \a key in all remembered contexts. This is used to destroy key
 * values in "shared" contexts (like service threads), when a module owning
 * the key is about to be unloaded.
 */
void lu_context_key_quiesce(struct lu_context_key *key)
{
        struct lu_context *ctx;
        extern unsigned cl_env_cache_purge(unsigned nr);

        if (!(key->lct_tags & LCT_QUIESCENT)) {
                /*
                 * XXX layering violation.
                 */
                cl_env_cache_purge(~0);
                key->lct_tags |= LCT_QUIESCENT;
                /*
                 * XXX memory barrier has to go here.
                 */
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_for_each_entry(ctx, &lu_context_remembered,
                                        lc_remember)
                        key_fini(ctx, key->lct_index);
                cfs_spin_unlock(&lu_keys_guard);
                ++key_set_version;
        }
}
EXPORT_SYMBOL(lu_context_key_quiesce);

void lu_context_key_revive(struct lu_context_key *key)
{
        key->lct_tags &= ~LCT_QUIESCENT;
        ++key_set_version;
}
EXPORT_SYMBOL(lu_context_key_revive);

static void keys_fini(struct lu_context *ctx)
{
        int i;

        cfs_spin_lock(&lu_keys_guard);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
        cfs_spin_unlock(&lu_keys_guard);
}

static int keys_fill(struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL && key != NULL &&
                    (key->lct_tags & ctx->lc_tags) &&
                    /*
                     * Don't create values for a LCT_QUIESCENT key, as this
                     * would pin the module owning the key.
                     */
                    !(key->lct_tags & LCT_QUIESCENT)) {
                        void *value;

                        LINVRNT(key->lct_init != NULL);
                        LINVRNT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value)))
                                return PTR_ERR(value);

                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF))
                                cfs_try_module_get(key->lct_owner);
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        cfs_atomic_inc(&key->lct_used);
                        /*
                         * This is the only place in the code where an
                         * element of the ctx->lc_value[] array is set to a
                         * non-NULL value.
                         */
                        ctx->lc_value[i] = value;
                        if (key->lct_exit != NULL)
                                ctx->lc_tags |= LCT_HAS_EXIT;
                }
                ctx->lc_version = key_set_version;
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (likely(ctx->lc_value != NULL))
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/**
 * Initialize the context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
                cfs_spin_unlock(&lu_keys_guard);
        } else
                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);

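/*
 * Example (illustrative sketch): direct lu_context usage, as a service
 * thread might do. Per-key values are only accessible between
 * lu_context_enter() and lu_context_exit(). "my_thread_key" is the
 * hypothetical key from the registration example above.
 *
 *      struct lu_context ctx;
 *      struct my_thread_info *info;
 *
 *      rc = lu_context_init(&ctx, LCT_MD_THREAD | LCT_REMEMBER);
 *      if (rc == 0) {
 *              lu_context_enter(&ctx);
 *              info = lu_context_key_get(&ctx, &my_thread_key);
 *              ...
 *              lu_context_exit(&ctx);
 *              lu_context_fini(&ctx);
 *      }
 */
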
/**
 * Finalize the context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_FINALIZED;
        keys_fini(ctx);
        cfs_spin_lock(&lu_keys_guard);
        cfs_list_del_init(&ctx->lc_remember);
        cfs_spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_fini);

/**
 * Called before entering the context.
 */
void lu_context_enter(struct lu_context *ctx)
{
        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
        ctx->lc_state = LCS_ENTERED;
}
EXPORT_SYMBOL(lu_context_enter);

/**
 * Called after exiting from \a ctx.
 */
void lu_context_exit(struct lu_context *ctx)
{
        int i;

        LINVRNT(ctx->lc_state == LCS_ENTERED);
        ctx->lc_state = LCS_LEFT;
        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;

                                key = lu_keys[i];
                                LASSERT(key != NULL);
                                if (key->lct_exit != NULL)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
                }
        }
}
EXPORT_SYMBOL(lu_context_exit);

/**
 * Allocate values in \a ctx for any keys that were registered after the
 * context was created.
 */
int lu_context_refill(struct lu_context *ctx)
{
        LINVRNT(ctx->lc_value != NULL);
        return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);
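
/*
 * Illustrative sketch (the key and context names are assumptions): a key
 * registered after a long-lived context was initialized has no value in
 * that context until the context is refilled:
 * \code
 *      LU_CONTEXT_KEY_INIT(&new_key);
 *      rc = lu_context_key_register(&new_key);
 *      ...
 *      rc = lu_context_refill(&long_lived_ctx);
 *      value = lu_context_key_get(&long_lived_ctx, &new_key);
 * \endcode
 */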

int lu_env_init(struct lu_env *env, __u32 tags)
{
        int result;

        env->le_ses = NULL;
        result = lu_context_init(&env->le_ctx, tags);
        if (likely(result == 0))
                lu_context_enter(&env->le_ctx);
        return result;
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
        lu_context_exit(&env->le_ctx);
        lu_context_fini(&env->le_ctx);
        env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);
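
/*
 * A minimal lu_env usage sketch (illustrative only; the LCT_LOCAL tag is
 * one possible choice, picked by the caller): lu_env_init() initializes
 * and enters the embedded le_ctx, and lu_env_fini() exits and finalizes
 * it:
 * \code
 *      struct lu_env env;
 *      int rc;
 *
 *      rc = lu_env_init(&env, LCT_LOCAL);
 *      if (rc == 0) {
 *              ... pass &env to lu_object_find()/lu_object_put() ...
 *              lu_env_fini(&env);
 *      }
 * \endcode
 */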

int lu_env_refill(struct lu_env *env)
{
        int result;

        result = lu_context_refill(&env->le_ctx);
        if (result == 0 && env->le_ses != NULL)
                result = lu_context_refill(env->le_ses);
        return result;
}
EXPORT_SYMBOL(lu_env_refill);

static struct cfs_shrinker *lu_site_shrinker = NULL;

typedef struct lu_site_stats {
        unsigned        lss_populated;
        unsigned        lss_max_search;
        unsigned        lss_total;
        unsigned        lss_busy;
} lu_site_stats_t;

static void lu_site_stats_get(cfs_hash_t *hs,
                              lu_site_stats_t *stats, int populated)
{
        cfs_hash_bd_t bd;
        int           i;

        cfs_hash_for_each_bucket(hs, &bd, i) {
                struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
                cfs_hlist_head_t        *hhead;

                cfs_hash_bd_lock(hs, &bd, 1);
                stats->lss_busy  += bkt->lsb_busy;
                stats->lss_total += cfs_hash_bd_count_get(&bd);
                stats->lss_max_search = max((int)stats->lss_max_search,
                                            cfs_hash_bd_depmax_get(&bd));
                if (!populated) {
                        cfs_hash_bd_unlock(hs, &bd, 1);
                        continue;
                }

                cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
                        if (!cfs_hlist_empty(hhead))
                                stats->lss_populated++;
                }
                cfs_hash_bd_unlock(hs, &bd, 1);
        }
}

#ifdef __KERNEL__
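/*
 * Shrinker callback. Per the standard Linux shrinker contract (not
 * specific to this file): a nr_to_scan of zero means the VM is only
 * querying the cache size, and a negative return value when __GFP_FS is
 * not set tells the VM that nothing can be freed in this allocation
 * context.
 */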
static int lu_cache_shrink(SHRINKER_FIRST_ARG int nr_to_scan,
                           unsigned int gfp_mask)
{
        lu_site_stats_t stats;
        struct lu_site *s;
        struct lu_site *tmp;
        int cached = 0;
        int remain = nr_to_scan;
        CFS_LIST_HEAD(splice);

        if (nr_to_scan != 0) {
                if (!(gfp_mask & __GFP_FS))
                        return -1;
                CDEBUG(D_INODE, "Shrink %d objects\n", nr_to_scan);
        }

        cfs_down(&lu_sites_guard);
        cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                if (nr_to_scan != 0) {
                        remain = lu_site_purge(&lu_shrink_env, s, remain);
                        /*
                         * Move the just-shrunk site to the tail of the site
                         * list to ensure shrinking fairness.
                         */
                        cfs_list_move_tail(&s->ls_linkage, &splice);
                }

                memset(&stats, 0, sizeof(stats));
                lu_site_stats_get(s->ls_obj_hash, &stats, 0);
                cached += stats.lss_total - stats.lss_busy;
                if (nr_to_scan && remain <= 0)
                        break;
        }
        cfs_list_splice(&splice, lu_sites.prev);
        cfs_up(&lu_sites_guard);

        cached = (cached / 100) * sysctl_vfs_cache_pressure;
        if (nr_to_scan == 0)
                CDEBUG(D_INODE, "%d objects cached\n", cached);
        return cached;
}

/*
 * Debugging helpers.
 */

/**
 * Environment to be used from a debugger; it contains all tags.
 */
struct lu_env lu_debugging_env;

/**
 * Debugging printer function using printk().
 */
int lu_printk_printer(const struct lu_env *env,
                      void *unused, const char *format, ...)
{
        va_list args;

        va_start(args, format);
        vprintk(format, args);
        va_end(args);
        return 0;
}

void lu_debugging_setup(void)
{
        lu_env_init(&lu_debugging_env, ~0);
}

void lu_context_keys_dump(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (key != NULL) {
                        CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
                               i, key, key->lct_tags,
                               key->lct_init, key->lct_fini, key->lct_exit,
                               key->lct_index, cfs_atomic_read(&key->lct_used),
                               key->lct_owner ? key->lct_owner->name : "",
                               key->lct_owner);
                        lu_ref_print(&key->lct_reference);
                }
        }
}
EXPORT_SYMBOL(lu_context_keys_dump);
#else  /* !__KERNEL__ */
static int lu_cache_shrink(int nr, unsigned int gfp_mask)
{
        return 0;
}
#endif /* __KERNEL__ */

int  cl_global_init(void);
void cl_global_fini(void);
int  lu_ref_global_init(void);
void lu_ref_global_fini(void);

int dt_global_init(void);
void dt_global_fini(void);

int llo_global_init(void);
void llo_global_fini(void);

/**
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
        int result;

        CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);

        result = lu_ref_global_init();
        if (result != 0)
                return result;

        LU_CONTEXT_KEY_INIT(&lu_global_key);
        result = lu_context_key_register(&lu_global_key);
        if (result != 0)
                return result;
        /*
         * At this level we do not know what tags are needed, so the
         * environment is created conservatively. This should not be too
         * costly, because this environment is global.
         */
        cfs_down(&lu_sites_guard);
        result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
        cfs_up(&lu_sites_guard);
        if (result != 0)
                return result;

        /*
         * Seek estimate: three seeks to read a record from the OI, one to
         * read the inode, and one for the EA. Unfortunately, setting this
         * value high results in the lu_object/inode cache consuming all
         * the memory.
         */
        lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
        if (lu_site_shrinker == NULL)
                return -ENOMEM;

        result = lu_time_global_init();
        if (result)
                GOTO(out, result);

#ifdef __KERNEL__
        result = dt_global_init();
        if (result)
                GOTO(out, result);

        result = llo_global_init();
        if (result)
                GOTO(out, result);
#endif
        result = cl_global_init();
out:
        return result;
}

/**
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
        cl_global_fini();
#ifdef __KERNEL__
        llo_global_fini();
        dt_global_fini();
#endif
        lu_time_global_fini();
        if (lu_site_shrinker != NULL) {
                cfs_remove_shrinker(lu_site_shrinker);
                lu_site_shrinker = NULL;
        }

        lu_context_key_degister(&lu_global_key);

        /*
         * Tear shrinker environment down _after_ de-registering
         * lu_global_key, because the latter has a value in the former.
         */
        cfs_down(&lu_sites_guard);
        lu_env_fini(&lu_shrink_env);
        cfs_up(&lu_sites_guard);

        lu_ref_global_fini();
}

struct lu_buf LU_BUF_NULL = {
        .lb_buf = NULL,
        .lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);

static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
{
#ifdef LPROCFS
        struct lprocfs_counter ret;

        lprocfs_stats_collect(stats, idx, &ret);
        return (__u32)ret.lc_count;
#else
        return 0;
#endif
}

/**
 * Output site statistical counters into a buffer. Suitable for
 * lprocfs_rd_*()-style functions.
 */
int lu_site_stats_print(const struct lu_site *s, char *page, int count)
{
        lu_site_stats_t stats;

        memset(&stats, 0, sizeof(stats));
        lu_site_stats_get(s->ls_obj_hash, &stats, 1);

        return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                        stats.lss_busy,
                        stats.lss_total,
                        stats.lss_populated,
                        CFS_HASH_NHLIST(s->ls_obj_hash),
                        stats.lss_max_search,
                        ls_stats_read(s->ls_stats, LU_SS_CREATED),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
                        ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
                        ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
}
EXPORT_SYMBOL(lu_site_stats_print);
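
/*
 * For reference, the fields printed above are, in order: busy/total
 * objects, populated/total hash chains, maximal search depth, and the
 * created, cache_hit, cache_miss, cache_race, cache_death_race and
 * lru_purged counters.
 */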

const char *lu_time_names[LU_TIME_NR] = {
        [LU_TIME_FIND_LOOKUP] = "find_lookup",
        [LU_TIME_FIND_ALLOC]  = "find_alloc",
        [LU_TIME_FIND_INSERT] = "find_insert"
};
EXPORT_SYMBOL(lu_time_names);

/**
 * Helper function to initialize a number of kmem slab caches at once.
 */
int lu_kmem_init(struct lu_kmem_descr *caches)
{
        int result;

        for (result = 0; caches->ckd_cache != NULL; ++caches) {
                *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name,
                                                          caches->ckd_size,
                                                          0, 0);
                if (*caches->ckd_cache == NULL) {
                        result = -ENOMEM;
                        break;
                }
        }
        return result;
}
EXPORT_SYMBOL(lu_kmem_init);
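
/*
 * A minimal usage sketch (the foo_* names are illustrative assumptions):
 * callers define a NULL-terminated descriptor array and pair
 * lu_kmem_init() with lu_kmem_fini():
 * \code
 *      static cfs_mem_cache_t *foo_object_kmem;
 *
 *      static struct lu_kmem_descr foo_caches[] = {
 *              {
 *                      .ckd_cache = &foo_object_kmem,
 *                      .ckd_name  = "foo_object_kmem",
 *                      .ckd_size  = sizeof(struct foo_object)
 *              },
 *              {
 *                      .ckd_cache = NULL
 *              }
 *      };
 *
 *      rc = lu_kmem_init(foo_caches);
 *      ...
 *      lu_kmem_fini(foo_caches);
 * \endcode
 */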

/**
 * Helper function to finalize a number of kmem slab caches at once. Dual to
 * lu_kmem_init().
 */
void lu_kmem_fini(struct lu_kmem_descr *caches)
{
        int rc;

        for (; caches->ckd_cache != NULL; ++caches) {
                if (*caches->ckd_cache != NULL) {
                        rc = cfs_mem_cache_destroy(*caches->ckd_cache);
                        LASSERTF(rc == 0, "couldn't destroy %s slab\n",
                                 caches->ckd_name);
                        *caches->ckd_cache = NULL;
                }
        }
}
EXPORT_SYMBOL(lu_kmem_fini);