/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/lu_object.c
 *
 * Lustre Object.
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#include <libcfs/libcfs.h>

#ifdef __KERNEL__
# include <linux/module.h>
#endif

/* hash_long() */
#include <libcfs/libcfs_hash.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
#include <lu_object.h>
#include <libcfs/list.h>
/* lu_time_global_{init,fini}() */
#include <lu_time.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/**
 * Decrease the reference counter on an object. If the last reference is
 * freed, return the object to the cache, unless lu_object_is_dying(o) holds.
 * In the latter case, free the object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *top;
        struct lu_site          *site;
        struct lu_object        *orig;
        cfs_hash_bd_t            bd;

        top  = o->lo_header;
        site = o->lo_dev->ld_site;
        orig = o;

        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);

        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
                if (lu_object_is_dying(top)) {

                        /*
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
                        cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
                }
                return;
        }

        LASSERT(bkt->lsb_busy > 0);
        bkt->lsb_busy--;
        /*
         * When the last reference is released, iterate over object
         * layers, and notify them that the object is no longer busy.
         */
        cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }

        if (!lu_object_is_dying(top)) {
                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                return;
        }

        /*
         * If the object is dying (will not be cached), remove it
         * from the hash table and the LRU.
         *
         * This is done with the hash table and LRU lists locked. As the only
         * way to acquire the first reference to a previously unreferenced
         * object is through hash-table lookup (lu_object_find()) or LRU
         * scanning (lu_site_purge()), both of which are done under the
         * hash-table and LRU lock, no race with a concurrent object lookup
         * is possible and we can safely destroy the object below.
         */
        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
        cfs_list_del_init(&top->loh_lru);
        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
        /*
         * The object was already removed from the hash and LRU above,
         * so it can be killed now.
         */
        lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);
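
/*
 * Usage sketch (illustrative only; the caller below is hypothetical):
 * a lookup takes a reference on the returned object, and that reference
 * is dropped through lu_object_put() once the caller is done.
 */
#if 0
static int example_lookup_and_release(const struct lu_env *env,
                                      struct lu_device *dev,
                                      const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, dev, fid, NULL);
        if (IS_ERR(o))
                return PTR_ERR(o);
        /* ... use the object; its fid is constant while the ref is held ... */
        lu_object_put(env, o); /* the last put may free a dying object */
        return 0;
}
#endif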

/**
 * Allocate a new object.
 *
 * This follows the object creation protocol described in the comment
 * within the struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
                                         const struct lu_fid *f,
                                         const struct lu_object_conf *conf)
{
        struct lu_object *scan;
        struct lu_object *top;
        cfs_list_t *layers;
        int clean;
        int result;
        ENTRY;

        /*
         * Create the top-level object slice. This will also create
         * lu_object_header.
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        /*
         * This is the only place where the object fid is assigned. It is
         * constant after this point.
         */
        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
        top->lo_header->loh_fid = *f;
        layers = &top->lo_header->loh_layers;
        do {
                /*
                 * Call ->loo_object_init() repeatedly, until no more new
                 * object slices are created.
                 */
                clean = 1;
                cfs_list_for_each_entry(scan, layers, lo_linkage) {
                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
                                continue;
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
                }
        } while (!clean);

        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_start != NULL) {
                        result = scan->lo_ops->loo_object_start(env, scan);
                        if (result != 0) {
                                lu_object_free(env, top);
                                RETURN(ERR_PTR(result));
                        }
                }
        }

        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
        RETURN(top);
}
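
/*
 * Sketch of what drives the allocation loop above (hypothetical middle
 * layer; example_next_device() is assumed, not part of this file): a
 * slice's ->loo_object_init() typically allocates the slice below it and
 * links it in with lu_object_add(), so lu_object_alloc() keeps iterating
 * until no pass creates a new slice.
 */
#if 0
static int example_object_init(const struct lu_env *env, struct lu_object *o,
                               const struct lu_object_conf *conf)
{
        struct lu_device *next = example_next_device(o->lo_dev);
        struct lu_object *below;

        below = next->ld_ops->ldo_object_alloc(env, o->lo_header, next);
        if (below == NULL)
                return -ENOMEM;
        lu_object_add(o, below); /* new slice: the loop runs one more pass */
        return 0;
}
#endif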

/**
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
        struct lu_site_bkt_data *bkt;
        struct lu_site          *site;
        struct lu_object        *scan;
        cfs_list_t              *layers;
        cfs_list_t               splice;

        site   = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
        /*
         * First call the ->loo_object_delete() method to release all
         * resources.
         */
        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                if (scan->lo_ops->loo_object_delete != NULL)
                        scan->lo_ops->loo_object_delete(env, scan);
        }

        /*
         * Then, splice object layers into a stand-alone list, and call
         * ->loo_object_free() on all layers to free memory. Splicing is
         * necessary, because lu_object_header is freed together with the
         * top-level slice.
         */
        CFS_INIT_LIST_HEAD(&splice);
        cfs_list_splice_init(layers, &splice);
        while (!cfs_list_empty(&splice)) {
                /*
                 * Free layers in bottom-to-top order, so that the object
                 * header lives as long as possible and ->loo_object_free()
                 * methods can look at its contents.
                 */
                o = container_of0(splice.prev, struct lu_object, lo_linkage);
                cfs_list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
        }

        if (cfs_waitq_active(&bkt->lsb_marche_funebre))
                cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
}

/**
 * Free \a nr objects from the cold end of the site LRU list.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct lu_object_header *h;
        struct lu_object_header *temp;
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t            bd;
        cfs_hash_bd_t            bd2;
        cfs_list_t               dispose;
        int                      did_sth;
        int                      start;
        int                      count;
        int                      bnr;
        int                      i;

        CFS_INIT_LIST_HEAD(&dispose);
        /*
         * Under the LRU list lock, scan the LRU list and move unreferenced
         * objects to the dispose list, removing them from LRU and hash table.
         */
        start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
 again:
        did_sth = 0;
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                if (i < start)
                        continue;
                count = bnr;
                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);

                cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        /*
                         * Objects are sorted in LRU order, and "busy"
                         * objects (ones with h->loh_ref > 0) naturally tend to
                         * live near the hot end that we scan last.
                         * Unfortunately, sites usually have a small (fewer
                         * than ten) number of busy yet rarely accessed objects
                         * (some global objects, accessed directly through
                         * pointers, bypassing the hash table). The current
                         * algorithm scans them over and over again. Probably
                         * we should move busy objects out of the LRU, or we
                         * can live with that.
                         */
                        if (cfs_atomic_read(&h->loh_ref) > 0)
                                continue;

                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
                        LASSERT(bd.bd_bucket == bd2.bd_bucket);

                        cfs_hash_bd_del_locked(s->ls_obj_hash,
                                               &bd2, &h->loh_hash);
                        cfs_list_move(&h->loh_lru, &dispose);
                        if (did_sth == 0)
                                did_sth = 1;

                        if (nr != ~0 && --nr == 0)
                                break;

                        if (count > 0 && --count == 0)
                                break;

                }
                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
                cfs_cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
                while (!cfs_list_empty(&dispose)) {
                        h = container_of0(dispose.next,
                                          struct lu_object_header, loh_lru);
                        cfs_list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }

                if (nr == 0)
                        break;
        }

        if (nr != 0 && did_sth && start != 0) {
                start = 0; /* restart from the first bucket */
                goto again;
        }
        /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);

        return nr;
}
EXPORT_SYMBOL(lu_site_purge);
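
/*
 * Usage sketch (illustrative): passing ~0 asks lu_site_purge() to scan
 * every bucket and free all unreferenced cached objects; lu_stack_fini()
 * below uses exactly this to drain a site before tearing the stack down.
 */
#if 0
static void example_drain_site(const struct lu_env *env, struct lu_site *s)
{
        lu_site_purge(env, s, ~0);
}
#endif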

/*
 * Object printing.
 *
 * The code below has to jump through certain hoops to output an object
 * description into a libcfs_debug_msg-based log. The problem is that
 * lu_object_print() composes the object description from strings that are
 * parts of _lines_ of output (i.e., strings that are not terminated by a
 * newline). This doesn't fit very well into the libcfs_debug_msg()
 * interface, which assumes that each message supplied to it is a
 * self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of the lu_cdebug_key key), until a terminating
 * newline character is detected.
 */

enum {
        /**
         * Maximal line size.
         *
         * XXX overflow is not handled correctly.
         */
        LU_CDEBUG_LINE = 256
};

struct lu_cdebug_data {
        /**
         * Temporary buffer.
         */
        char lck_area[LU_CDEBUG_LINE];
};

/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/**
 * Key holding the temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};

/**
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
                      void *cookie, const char *format, ...)
{
        struct lu_cdebug_print_info *info = cookie;
        struct lu_cdebug_data       *key;
        int used;
        int complete;
        va_list args;

        va_start(args, format);

        key = lu_context_key_get(&env->le_ctx, &lu_global_key);
        LASSERT(key != NULL);

        used = strlen(key->lck_area);
        complete = format[strlen(format) - 1] == '\n';
        /*
         * Append the new chunk to the buffer.
         */
        vsnprintf(key->lck_area + used,
                  ARRAY_SIZE(key->lck_area) - used, format, args);
        if (complete) {
                if (cfs_cdebug_show(info->lpi_mask, info->lpi_subsys))
                        libcfs_debug_msg(NULL, info->lpi_subsys, info->lpi_mask,
                                         (char *)info->lpi_file, info->lpi_fn,
                                         info->lpi_line, "%s", key->lck_area);
                key->lck_area[0] = 0;
        }
        va_end(args);
        return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);
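
/*
 * Usage sketch (illustrative; the D_INFO mask choice is an assumption):
 * the cookie is a lu_cdebug_print_info describing where the message
 * originates; chunks accumulate in the per-context buffer until a chunk
 * ending in '\n' flushes the whole line. See the D_ERROR variant used in
 * lu_stack_fini() below for a real in-tree use.
 */
#if 0
static void example_dump_object(const struct lu_env *env,
                                const struct lu_object *o)
{
        static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_INFO);

        lu_object_print(env, &cookie, lu_cdebug_printer, o);
}
#endif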

/**
 * Print the object header.
 */
void lu_object_header_print(const struct lu_env *env, void *cookie,
                            lu_printer_t printer,
                            const struct lu_object_header *hdr)
{
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
                   cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
                   cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
}
EXPORT_SYMBOL(lu_object_header_print);

/**
 * Print a human readable representation of \a o to \a printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
                     lu_printer_t printer, const struct lu_object *o)
{
        static const char ruler[] = "........................................";
        struct lu_object_header *top;
        int depth;

        top = o->lo_header;
        lu_object_header_print(env, cookie, printer, top);
        (*printer)(env, cookie, "{ \n");
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                depth = o->lo_depth + 4;

                /*
                 * print `.' \a depth times followed by the type name and
                 * address
                 */
                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
                           o->lo_dev->ld_type->ldt_name, o);
                if (o->lo_ops->loo_object_print != NULL)
                        o->lo_ops->loo_object_print(env, cookie, printer, o);
                (*printer)(env, cookie, "\n");
        }
        (*printer)(env, cookie, "} header@%p\n", top);
}
EXPORT_SYMBOL(lu_object_print);

/**
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
        struct lu_object_header *top;

        top = o->lo_header;
        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_invariant != NULL &&
                    !o->lo_ops->loo_object_invariant(o))
                        return 0;
        }
        return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
                                       cfs_hash_bd_t *bd,
                                       const struct lu_fid *f,
                                       cfs_waitlink_t *waiter,
                                       __u64 *version)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        cfs_hlist_node_t        *hnode;
        __u64  ver = cfs_hash_bd_version_get(bd);

        if (*version == ver)
                return NULL;

        *version = ver;
        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_lookup_intent is a somewhat "internal" function
         * of cfs_hash, but we don't want a refcount on the object right now */
        hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
        if (hnode == NULL) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return NULL;
        }

        h = container_of0(hnode, struct lu_object_header, loh_hash);
        if (likely(!lu_object_is_dying(h))) {
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
                return lu_object_top(h);
        }

        /*
         * Lookup found an object being destroyed; this object cannot be
         * returned (to assure that references to dying objects are eventually
         * drained), and moreover, the lookup has to wait until the object is
         * freed.
         */
        cfs_atomic_dec(&h->loh_ref);

        cfs_waitlink_init(waiter);
        cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
        cfs_set_current_state(CFS_TASK_UNINT);
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
        return ERR_PTR(-EAGAIN);
}

/**
 * Search the cache for an object with the fid \a f. If such an object is
 * found, return it. Otherwise, create a new object, insert it into the cache
 * and return it. In any case, an additional reference is acquired on the
 * returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
                                 struct lu_device *dev, const struct lu_fid *f,
                                 const struct lu_object_conf *conf)
{
        return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
}
EXPORT_SYMBOL(lu_object_find);

/**
 * Core logic of the lu_object_find*() functions.
 */
static struct lu_object *lu_object_find_try(const struct lu_env *env,
                                            struct lu_device *dev,
                                            const struct lu_fid *f,
                                            const struct lu_object_conf *conf,
                                            cfs_waitlink_t *waiter)
{
        struct lu_object      *o;
        struct lu_object      *shadow;
        struct lu_site        *s;
        cfs_hash_t            *hs;
        cfs_hash_bd_t          bd;
        __u64                  version = 0;

        /*
         * This uses the standard index maintenance protocol:
         *
         *     - search index under lock, and return object if found;
         *     - otherwise, unlock index, allocate new object;
         *     - lock index and search again;
         *     - if nothing is found (usual case), insert newly created
         *       object into index;
         *     - otherwise (race: other thread inserted object), free
         *       object just allocated.
         *     - unlock index;
         *     - return object.
         *
         * If a dying object is found during index search, add \a waiter to
         * the site wait-queue and return ERR_PTR(-EAGAIN).
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
        o = htable_lookup(s, &bd, f, waiter, &version);
        cfs_hash_bd_unlock(hs, &bd, 1);
        if (o != NULL)
                return o;

        /*
         * Allocate a new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, dev, f, conf);
        if (unlikely(IS_ERR(o)))
                return o;

        LASSERT(lu_fid_eq(lu_object_fid(o), f));

        cfs_hash_bd_lock(hs, &bd, 1);

        shadow = htable_lookup(s, &bd, f, waiter, &version);
        if (likely(shadow == NULL)) {
                struct lu_site_bkt_data *bkt;

                bkt = cfs_hash_bd_extra_get(hs, &bd);
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_list_add_tail(&o->lo_header->loh_lru, &bkt->lsb_lru);
                bkt->lsb_busy++;
                cfs_hash_bd_unlock(hs, &bd, 1);
                return o;
        }

        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
        return shadow;
}

/**
 * Much like lu_object_find(), but the top level device of the object is
 * specifically \a dev rather than the top level device of the site. This
 * interface allows objects with different "stacking" to be created within
 * the same site.
 */
struct lu_object *lu_object_find_at(const struct lu_env *env,
                                    struct lu_device *dev,
                                    const struct lu_fid *f,
                                    const struct lu_object_conf *conf)
{
        struct lu_site_bkt_data *bkt;
        struct lu_object        *obj;
        cfs_waitlink_t           wait;

        while (1) {
                obj = lu_object_find_try(env, dev, f, conf, &wait);
                if (obj != ERR_PTR(-EAGAIN))
                        return obj;
                /*
                 * lu_object_find_try() already added the waiter into the
                 * wait queue.
                 */
                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
                cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
        }
}
EXPORT_SYMBOL(lu_object_find_at);

/**
 * Find the object with the given fid, and return its slice belonging to the
 * given device.
 */
struct lu_object *lu_object_find_slice(const struct lu_env *env,
                                       struct lu_device *dev,
                                       const struct lu_fid *f,
                                       const struct lu_object_conf *conf)
{
        struct lu_object *top;
        struct lu_object *obj;

        top = lu_object_find(env, dev, f, conf);
        if (!IS_ERR(top)) {
                obj = lu_object_locate(top->lo_header, dev->ld_type);
                if (obj == NULL)
                        lu_object_put(env, top);
        } else
                obj = top;
        return obj;
}
EXPORT_SYMBOL(lu_object_find_slice);

/**
 * Global list of all device types.
 */
static CFS_LIST_HEAD(lu_device_types);

int lu_device_type_init(struct lu_device_type *ldt)
{
        int result;

        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
        result = ldt->ldt_ops->ldto_init(ldt);
        if (result == 0)
                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
        return result;
}
EXPORT_SYMBOL(lu_device_type_init);

void lu_device_type_fini(struct lu_device_type *ldt)
{
        cfs_list_del_init(&ldt->ldt_linkage);
        ldt->ldt_ops->ldto_fini(ldt);
}
EXPORT_SYMBOL(lu_device_type_fini);
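
/*
 * Usage sketch (hypothetical module): a layer registers its lu_device_type
 * once at module load and unregisters it on unload; ldt_ops and the other
 * fields are assumed to be filled in elsewhere.
 */
#if 0
static struct lu_device_type example_device_type;

static int __init example_module_init(void)
{
        return lu_device_type_init(&example_device_type);
}

static void __exit example_module_exit(void)
{
        lu_device_type_fini(&example_device_type);
}
#endif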

void lu_types_stop(void)
{
        struct lu_device_type *ldt;

        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
                if (ldt->ldt_device_nr == 0)
                        ldt->ldt_ops->ldto_stop(ldt);
        }
}
EXPORT_SYMBOL(lu_types_stop);

/**
 * Global list of all sites on this node
 */
static CFS_LIST_HEAD(lu_sites);
static CFS_DECLARE_MUTEX(lu_sites_guard);

/**
 * Global environment used by the site shrinker.
 */
static struct lu_env lu_shrink_env;

struct lu_site_print_arg {
        struct lu_env   *lsp_env;
        void            *lsp_cookie;
        lu_printer_t     lsp_printer;
};

static int
lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                  cfs_hlist_node_t *hnode, void *data)
{
        struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
        struct lu_object_header  *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!cfs_list_empty(&h->loh_layers)) {
                const struct lu_object *o;

                o = lu_object_top(h);
                lu_object_print(arg->lsp_env, arg->lsp_cookie,
                                arg->lsp_printer, o);
        } else {
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
        return 0;
}

/**
 * Print all objects in \a s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
                   lu_printer_t printer)
{
        struct lu_site_print_arg arg = {
                .lsp_env     = (struct lu_env *)env,
                .lsp_cookie  = cookie,
                .lsp_printer = printer,
        };

        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
}
EXPORT_SYMBOL(lu_site_print);

enum {
        LU_CACHE_PERCENT_MAX     = 50,
        LU_CACHE_PERCENT_DEFAULT = 20
};

static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
                "Percentage of memory to be used as lu_object cache");

/**
 * Return the desired hash table order.
 */
static int lu_htable_order(void)
{
        unsigned long cache_size;
        int bits;

        /*
         * Calculate hash table size, assuming that we want reasonable
         * performance when 20% of total memory is occupied by the cache of
         * lu_objects.
         *
         * The size of an lu_object is (arbitrarily) taken as 1K (together
         * with the inode).
         */
        cache_size = cfs_num_physpages;

#if BITS_PER_LONG == 32
        /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
                cache_size = (1 << (30 - CFS_PAGE_SHIFT)) * 3 / 4;
#endif

        /* reject unreasonable cache settings. */
        if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
                CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
                      " the range of (0, %u]. Will use default value: %u.\n",
                      lu_cache_percent, LU_CACHE_PERCENT_MAX,
                      LU_CACHE_PERCENT_DEFAULT);

                lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
        }
        cache_size = cache_size / 100 * lu_cache_percent *
                (CFS_PAGE_SIZE / 1024);

        for (bits = 1; (1 << bits) < cache_size; ++bits) {
                ;
        }
        return bits;
}
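
/*
 * Worked example (assuming 4GB of RAM and 4KB pages): cfs_num_physpages
 * is 2^20, so with the default lu_cache_percent of 20 the budget is
 * 2^20 / 100 * 20 * (4096 / 1024) = 838800 one-kilobyte objects, and the
 * loop above picks bits = 20, since 2^19 < 838800 <= 2^20.
 */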

static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
                                const void *key, unsigned mask)
{
        struct lu_fid  *fid = (struct lu_fid *)key;
        unsigned        hash;

        hash = (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
        hash += fid_hash(fid, hs->hs_bkt_bits) << hs->hs_bkt_bits;
        return hash & mask;
}

static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
}

static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return &h->loh_fid;
}

static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
}

static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct lu_object_header *h;

        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
                struct lu_site_bkt_data *bkt;
                cfs_hash_bd_t            bd;

                cfs_hash_bd_get(hs, &h->loh_fid, &bd);
                bkt = cfs_hash_bd_extra_get(hs, &bd);
                bkt->lsb_busy++;
        }
}

static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        LBUG(); /* we should never call this */
}

cfs_hash_ops_t lu_site_hash_ops = {
        .hs_hash        = lu_obj_hop_hash,
        .hs_key         = lu_obj_hop_key,
        .hs_keycmp      = lu_obj_hop_keycmp,
        .hs_object      = lu_obj_hop_object,
        .hs_get         = lu_obj_hop_get,
        .hs_put_locked  = lu_obj_hop_put_locked,
};

/**
 * Initialize site \a s, with \a d as the top level device.
 */
#define LU_SITE_BITS_MIN    12
#define LU_SITE_BITS_MAX    23
/**
 * 128 buckets in total; we don't want too many buckets because:
 * - they consume too much memory
 * - too many buckets make the LRU lists unbalanced
 */
#define LU_SITE_BKT_BITS    7

int lu_site_init(struct lu_site *s, struct lu_device *top)
{
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t bd;
        int bits;
        int i;
        ENTRY;

        memset(s, 0, sizeof *s);
        bits = lu_htable_order();
        for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create("lu_site", bits, bits,
                                                 bits - LU_SITE_BKT_BITS,
                                                 sizeof(*bkt), 0, 0,
                                                 &lu_site_hash_ops,
                                                 CFS_HASH_SPIN_BKTLOCK |
                                                 CFS_HASH_NO_ITEMREF |
                                                 CFS_HASH_DEPTH |
                                                 CFS_HASH_ASSERT_EMPTY);
                if (s->ls_obj_hash != NULL)
                        break;
        }

        if (s->ls_obj_hash == NULL) {
                CERROR("failed to create lu_site hash with bits: %d\n", bits);
                return -ENOMEM;
        }

        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
                cfs_waitq_init(&bkt->lsb_marche_funebre);
        }

        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        if (s->ls_stats == NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
                return -ENOMEM;
        }

        lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
                             0, "created", "created");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
                             0, "cache_hit", "cache_hit");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
                             0, "cache_miss", "cache_miss");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
                             0, "cache_race", "cache_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
                             0, "cache_death_race", "cache_death_race");
        lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
                             0, "lru_purged", "lru_purged");

        CFS_INIT_LIST_HEAD(&s->ls_linkage);
        s->ls_top_dev = top;
        top->ld_site = s;
        lu_device_get(top);
        lu_ref_add(&top->ld_reference, "site-top", s);

        RETURN(0);
}
EXPORT_SYMBOL(lu_site_init);
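
/*
 * Setup sketch (illustrative): a site is initialized against its top
 * device and, once the whole stack is assembled, published with
 * lu_site_init_finish(); on failure lu_site_fini() releases what
 * lu_site_init() took.
 */
#if 0
static int example_site_setup(struct lu_site *s, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(s, top);
        if (rc != 0)
                return rc;
        rc = lu_site_init_finish(s);
        if (rc != 0)
                lu_site_fini(s);
        return rc;
}
#endif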

/**
 * Finalize \a s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
        cfs_down(&lu_sites_guard);
        cfs_list_del_init(&s->ls_linkage);
        cfs_up(&lu_sites_guard);

        if (s->ls_obj_hash != NULL) {
                cfs_hash_putref(s->ls_obj_hash);
                s->ls_obj_hash = NULL;
        }

        if (s->ls_top_dev != NULL) {
                s->ls_top_dev->ld_site = NULL;
                lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
                lu_device_put(s->ls_top_dev);
                s->ls_top_dev = NULL;
        }

        if (s->ls_stats != NULL)
                lprocfs_free_stats(&s->ls_stats);
}
EXPORT_SYMBOL(lu_site_fini);

/**
 * Called when initialization of the stack for this site is completed.
 */
int lu_site_init_finish(struct lu_site *s)
{
        int result;
        cfs_down(&lu_sites_guard);
        result = lu_context_refill(&lu_shrink_env.le_ctx);
        if (result == 0)
                cfs_list_add(&s->ls_linkage, &lu_sites);
        cfs_up(&lu_sites_guard);
        return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/**
 * Acquire an additional reference on device \a d
 */
void lu_device_get(struct lu_device *d)
{
        cfs_atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/**
 * Release a reference on device \a d.
 */
void lu_device_put(struct lu_device *d)
{
        LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
        cfs_atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/**
 * Initialize device \a d of type \a t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
        if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
                t->ldt_ops->ldto_start(t);
        memset(d, 0, sizeof *d);
        cfs_atomic_set(&d->ld_ref, 0);
        d->ld_type = t;
        lu_ref_init(&d->ld_reference);
        return 0;
}
EXPORT_SYMBOL(lu_device_init);

/**
 * Finalize device \a d.
 */
void lu_device_fini(struct lu_device *d)
{
        struct lu_device_type *t;

        t = d->ld_type;
        if (d->ld_obd != NULL) {
                d->ld_obd->obd_lu_dev = NULL;
                d->ld_obd = NULL;
        }

        lu_ref_fini(&d->ld_reference);
        LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
                 "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
        LASSERT(t->ldt_device_nr > 0);
        if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
                t->ldt_ops->ldto_stop(t);
}
EXPORT_SYMBOL(lu_device_fini);

/**
 * Initialize object \a o that is part of compound object \a h and was created
 * by device \a d.
 */
int lu_object_init(struct lu_object *o,
                   struct lu_object_header *h, struct lu_device *d)
{
        memset(o, 0, sizeof *o);
        o->lo_header = h;
        o->lo_dev    = d;
        lu_device_get(d);
        o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
        CFS_INIT_LIST_HEAD(&o->lo_linkage);
        return 0;
}
EXPORT_SYMBOL(lu_object_init);

/**
 * Finalize the object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
        struct lu_device *dev = o->lo_dev;

        LASSERT(cfs_list_empty(&o->lo_linkage));

        if (dev != NULL) {
                lu_ref_del_at(&dev->ld_reference,
                              o->lo_dev_ref, "lu_object", o);
                lu_device_put(dev);
                o->lo_dev = NULL;
        }
}
EXPORT_SYMBOL(lu_object_fini);

/**
 * Add object \a o as the first layer of compound object \a h
 *
 * This is typically called by the ->ldo_object_alloc() method of the
 * top-level device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/**
 * Add object \a o as a layer of compound object, going after \a before.
 *
 * This is typically called by the ->ldo_object_alloc() method of \a
 * before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
        cfs_list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/**
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
        memset(h, 0, sizeof *h);
        cfs_atomic_set(&h->loh_ref, 1);
        CFS_INIT_HLIST_NODE(&h->loh_hash);
        CFS_INIT_LIST_HEAD(&h->loh_lru);
        CFS_INIT_LIST_HEAD(&h->loh_layers);
        lu_ref_init(&h->loh_reference);
        return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/**
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
        LASSERT(cfs_list_empty(&h->loh_layers));
        LASSERT(cfs_list_empty(&h->loh_lru));
        LASSERT(cfs_hlist_unhashed(&h->loh_hash));
        lu_ref_fini(&h->loh_reference);
}
EXPORT_SYMBOL(lu_object_header_fini);

/**
 * Given a compound object, find its slice corresponding to the device type
 * \a dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
                                   const struct lu_device_type *dtype)
{
        struct lu_object *o;

        cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
                if (o->lo_dev->ld_type == dtype)
                        return o;
        }
        return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

/**
 * Finalize and free devices in the device stack.
 *
 * Finalize the device stack by purging the object cache, and calling
 * lu_device_type_operations::ldto_device_fini() and
 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
 */
void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
{
        struct lu_site   *site = top->ld_site;
        struct lu_device *scan;
        struct lu_device *next;

        lu_site_purge(env, site, ~0);
        for (scan = top; scan != NULL; scan = next) {
                next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
                lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
                lu_device_put(scan);
        }

        /* purge again. */
        lu_site_purge(env, site, ~0);

        if (!cfs_hash_is_empty(site->ls_obj_hash)) {
                /*
                 * Uh-oh, objects still exist.
                 */
                static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);

                lu_site_print(env, site, &cookie, lu_cdebug_printer);
        }

        for (scan = top; scan != NULL; scan = next) {
                const struct lu_device_type *ldt = scan->ld_type;
                struct obd_type             *type;

                next = ldt->ldt_ops->ldto_device_free(env, scan);
                type = ldt->ldt_obd_type;
                if (type != NULL) {
                        type->typ_refcnt--;
                        class_put_type(type);
                }
        }
}
EXPORT_SYMBOL(lu_stack_fini);

enum {
        /**
         * Maximal number of tld slots.
         */
        LU_CONTEXT_KEY_NR = 32
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static cfs_spinlock_t lu_keys_guard = CFS_SPIN_LOCK_UNLOCKED;

/**
 * Global counter incremented whenever a key is registered, unregistered,
 * revived or quiesced. This is used to avoid unnecessary calls to
 * lu_context_refill(). No locking is provided, as initialization and shutdown
 * are supposed to be externally serialized.
 */
static unsigned key_set_version = 0;

/**
 * Register a new key.
 */
int lu_context_key_register(struct lu_context_key *key)
{
        int result;
        int i;

        LASSERT(key->lct_init != NULL);
        LASSERT(key->lct_fini != NULL);
        LASSERT(key->lct_tags != 0);
        LASSERT(key->lct_owner != NULL);

        result = -ENFILE;
        cfs_spin_lock(&lu_keys_guard);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                if (lu_keys[i] == NULL) {
                        key->lct_index = i;
                        cfs_atomic_set(&key->lct_used, 1);
                        lu_keys[i] = key;
                        lu_ref_init(&key->lct_reference);
                        result = 0;
                        ++key_set_version;
                        break;
                }
        }
        cfs_spin_unlock(&lu_keys_guard);
        return result;
}
EXPORT_SYMBOL(lu_context_key_register);
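
/*
 * Definition sketch (hypothetical key, following the lu_global_key pattern
 * above): LU_KEY_INIT_FINI() generates the example_key_init() and
 * example_key_fini() constructor/destructor pair. LU_CONTEXT_KEY_INIT() is
 * assumed to set ->lct_owner before registration, as the LASSERT above
 * requires.
 */
#if 0
struct example_thread_info {
        char eti_scratch[64];
};

LU_KEY_INIT_FINI(example, struct example_thread_info);

static struct lu_context_key example_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = example_key_init,
        .lct_fini = example_key_fini
};

/* at module init, after LU_CONTEXT_KEY_INIT() has set ->lct_owner:
 * lu_context_key_register(&example_thread_key); */
#endif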

static void key_fini(struct lu_context *ctx, int index)
{
        if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
                struct lu_context_key *key;

                key = lu_keys[index];
                LASSERT(key != NULL);
                LASSERT(key->lct_fini != NULL);
                LASSERT(cfs_atomic_read(&key->lct_used) > 1);

                key->lct_fini(ctx, key, ctx->lc_value[index]);
                lu_ref_del(&key->lct_reference, "ctx", ctx);
                cfs_atomic_dec(&key->lct_used);
                LASSERT(key->lct_owner != NULL);
                if (!(ctx->lc_tags & LCT_NOREF)) {
                        LASSERT(cfs_module_refcount(key->lct_owner) > 0);
                        cfs_module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
}

/**
 * Deregister a key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
        LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

        lu_context_key_quiesce(key);

        ++key_set_version;
        cfs_spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
        }
        cfs_spin_unlock(&lu_keys_guard);

        LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
                 "key has instances: %d\n",
                 cfs_atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);

/**
 * Register a number of keys. This has to be called after all keys have been
 * initialized by a call to LU_CONTEXT_KEY_INIT().
 */
int lu_context_key_register_many(struct lu_context_key *k, ...)
{
        struct lu_context_key *key = k;
        va_list args;
        int result;

        va_start(args, k);
        do {
                result = lu_context_key_register(key);
                if (result)
                        break;
                key = va_arg(args, struct lu_context_key *);
        } while (key != NULL);
        va_end(args);

        if (result != 0) {
                va_start(args, k);
                while (k != key) {
                        lu_context_key_degister(k);
                        k = va_arg(args, struct lu_context_key *);
                }
                va_end(args);
        }

        return result;
}
EXPORT_SYMBOL(lu_context_key_register_many);

/**
 * De-register a number of keys. This is the dual of
 * lu_context_key_register_many().
 */
void lu_context_key_degister_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_degister(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_degister_many);

/**
 * Revive a number of keys.
 */
void lu_context_key_revive_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_revive(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_revive_many);

/**
 * Quiesce a number of keys.
 */
void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
{
        va_list args;

        va_start(args, k);
        do {
                lu_context_key_quiesce(k);
                k = va_arg(args, struct lu_context_key *);
        } while (k != NULL);
        va_end(args);
}
EXPORT_SYMBOL(lu_context_key_quiesce_many);

/**
 * Return the value associated with key \a key in context \a ctx.
 */
void *lu_context_key_get(const struct lu_context *ctx,
                         const struct lu_context_key *key)
{
        LINVRNT(ctx->lc_state == LCS_ENTERED);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
        LASSERT(lu_keys[key->lct_index] == key);
        return ctx->lc_value[key->lct_index];
}
EXPORT_SYMBOL(lu_context_key_get);

/**
 * List of remembered contexts. XXX document me.
 */
static CFS_LIST_HEAD(lu_context_remembered);

/**
 * Destroy \a key in all remembered contexts. This is used to destroy key
 * values in "shared" contexts (like service threads), when a module owning
 * the key is about to be unloaded.
 */
void lu_context_key_quiesce(struct lu_context_key *key)
{
        struct lu_context *ctx;
        extern unsigned cl_env_cache_purge(unsigned nr);

        if (!(key->lct_tags & LCT_QUIESCENT)) {
                /*
                 * XXX layering violation.
                 */
                cl_env_cache_purge(~0);
                key->lct_tags |= LCT_QUIESCENT;
                /*
                 * XXX memory barrier has to go here.
                 */
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_for_each_entry(ctx, &lu_context_remembered,
                                        lc_remember)
                        key_fini(ctx, key->lct_index);
                cfs_spin_unlock(&lu_keys_guard);
                ++key_set_version;
        }
}
EXPORT_SYMBOL(lu_context_key_quiesce);

void lu_context_key_revive(struct lu_context_key *key)
{
        key->lct_tags &= ~LCT_QUIESCENT;
        ++key_set_version;
}
EXPORT_SYMBOL(lu_context_key_revive);

static void keys_fini(struct lu_context *ctx)
{
        int i;

        cfs_spin_lock(&lu_keys_guard);
        if (ctx->lc_value != NULL) {
                for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                        key_fini(ctx, i);
                OBD_FREE(ctx->lc_value,
                         ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
                ctx->lc_value = NULL;
        }
        cfs_spin_unlock(&lu_keys_guard);
}

static int keys_fill(struct lu_context *ctx)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;

                key = lu_keys[i];
                if (ctx->lc_value[i] == NULL && key != NULL &&
                    (key->lct_tags & ctx->lc_tags) &&
                    /*
                     * Don't create values for a LCT_QUIESCENT key, as this
                     * will pin the module owning the key.
                     */
                    !(key->lct_tags & LCT_QUIESCENT)) {
                        void *value;

                        LINVRNT(key->lct_init != NULL);
                        LINVRNT(key->lct_index == i);

                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value)))
                                return PTR_ERR(value);

                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF))
                                cfs_try_module_get(key->lct_owner);
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        cfs_atomic_inc(&key->lct_used);
                        /*
                         * This is the only place in the code where an
                         * element of the ctx->lc_value[] array is set to a
                         * non-NULL value.
                         */
                        ctx->lc_value[i] = value;
                        if (key->lct_exit != NULL)
                                ctx->lc_tags |= LCT_HAS_EXIT;
                }
                ctx->lc_version = key_set_version;
        }
        return 0;
}

static int keys_init(struct lu_context *ctx)
{
        int result;

        OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
        if (likely(ctx->lc_value != NULL))
                result = keys_fill(ctx);
        else
                result = -ENOMEM;

        if (result != 0)
                keys_fini(ctx);
        return result;
}

/**
 * Initialize the context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
        memset(ctx, 0, sizeof *ctx);
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
                cfs_spin_lock(&lu_keys_guard);
                cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
                cfs_spin_unlock(&lu_keys_guard);
        } else
                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
        return keys_init(ctx);
}
EXPORT_SYMBOL(lu_context_init);
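
/*
 * Lifecycle sketch (illustrative): a context is initialized once,
 * bracketed by enter/exit around each period of use (lu_context_key_get()
 * is only valid between the two), and finalized when its thread goes away.
 */
#if 0
static int example_context_cycle(void)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx, LCT_MD_THREAD);
        if (rc != 0)
                return rc;
        lu_context_enter(&ctx);
        /* ... lu_context_key_get(&ctx, &some_key) ... */
        lu_context_exit(&ctx);
        lu_context_fini(&ctx);
        return 0;
}
#endif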
1514
1515 /**
1516  * Finalize context data-structure. Destroy key values.
1517  */
1518 void lu_context_fini(struct lu_context *ctx)
1519 {
1520         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1521         ctx->lc_state = LCS_FINALIZED;
1522         keys_fini(ctx);
1523         cfs_spin_lock(&lu_keys_guard);
1524         cfs_list_del_init(&ctx->lc_remember);
1525         cfs_spin_unlock(&lu_keys_guard);
1526 }
1527 EXPORT_SYMBOL(lu_context_fini);
1528
1529 /**
1530  * Called before entering context.
1531  */
1532 void lu_context_enter(struct lu_context *ctx)
1533 {
1534         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1535         ctx->lc_state = LCS_ENTERED;
1536 }
1537 EXPORT_SYMBOL(lu_context_enter);
1538
1539 /**
1540  * Called after exiting from \a ctx
1541  */
1542 void lu_context_exit(struct lu_context *ctx)
1543 {
1544         int i;
1545
1546         LINVRNT(ctx->lc_state == LCS_ENTERED);
1547         ctx->lc_state = LCS_LEFT;
1548         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1549                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1550                         if (ctx->lc_value[i] != NULL) {
1551                                 struct lu_context_key *key;
1552
1553                                 key = lu_keys[i];
1554                                 LASSERT(key != NULL);
1555                                 if (key->lct_exit != NULL)
1556                                         key->lct_exit(ctx,
1557                                                       key, ctx->lc_value[i]);
1558                         }
1559                 }
1560         }
1561 }
1562 EXPORT_SYMBOL(lu_context_exit);
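
/*
 * Usage sketch (hypothetical caller, not part of this file): the
 * LINVRNT()s above enforce the lu_context state machine
 * INITIALIZED -> (ENTERED -> LEFT)* -> FINALIZED:
 *
 *     struct lu_context ctx;
 *     int rc;
 *
 *     rc = lu_context_init(&ctx, LCT_MD_THREAD);
 *     if (rc == 0) {
 *             lu_context_enter(&ctx);
 *             // ... lu_context_key_get(&ctx, &some_key) ...
 *             lu_context_exit(&ctx);
 *             lu_context_fini(&ctx);
 *     }
 */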
1563
1564 /**
1565  * Allocate values in \a ctx for all missing keys that were registered
1566  * after the context was created.
1567  */
1568 int lu_context_refill(struct lu_context *ctx)
1569 {
1570         LINVRNT(ctx->lc_value != NULL);
1571         return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
1572 }
1573 EXPORT_SYMBOL(lu_context_refill);
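
/*
 * Note: key_set_version is bumped elsewhere in this file when keys are
 * registered or degistered, and keys_fill() records it in
 * ctx->lc_version, so the comparison above makes refill a cheap no-op
 * in the common case where the key set has not changed.
 */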
1574
1575 int lu_env_init(struct lu_env *env, __u32 tags)
1576 {
1577         int result;
1578
1579         env->le_ses = NULL;
1580         result = lu_context_init(&env->le_ctx, tags);
1581         if (likely(result == 0))
1582                 lu_context_enter(&env->le_ctx);
1583         return result;
1584 }
1585 EXPORT_SYMBOL(lu_env_init);
1586
1587 void lu_env_fini(struct lu_env *env)
1588 {
1589         lu_context_exit(&env->le_ctx);
1590         lu_context_fini(&env->le_ctx);
1591         env->le_ses = NULL;
1592 }
1593 EXPORT_SYMBOL(lu_env_fini);
1594
1595 int lu_env_refill(struct lu_env *env)
1596 {
1597         int result;
1598
1599         result = lu_context_refill(&env->le_ctx);
1600         if (result == 0 && env->le_ses != NULL)
1601                 result = lu_context_refill(env->le_ses);
1602         return result;
1603 }
1604 EXPORT_SYMBOL(lu_env_refill);
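
/*
 * Usage sketch (hypothetical caller, not part of this file): a lu_env
 * pairs a context with an optional session; lu_env_init() above also
 * enters the context, so a caller only needs lu_env_fini() to leave
 * and finalize it:
 *
 *     struct lu_env env;
 *     int rc;
 *
 *     rc = lu_env_init(&env, LCT_LOCAL);
 *     if (rc == 0) {
 *             // ... pass &env to lu_object_find() and friends ...
 *             lu_env_fini(&env);
 *     }
 */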
1605
1606 static struct cfs_shrinker *lu_site_shrinker = NULL;
1607
1608 typedef struct lu_site_stats {
1609         unsigned        lss_populated;
1610         unsigned        lss_max_search;
1611         unsigned        lss_total;
1612         unsigned        lss_busy;
1613 } lu_site_stats_t;
1614
1615 static void lu_site_stats_get(cfs_hash_t *hs,
1616                               lu_site_stats_t *stats, int populated)
1617 {
1618         cfs_hash_bd_t bd;
1619         int           i;
1620
1621         cfs_hash_for_each_bucket(hs, &bd, i) {
1622                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1623                 cfs_hlist_head_t        *hhead;
1624
1625                 cfs_hash_bd_lock(hs, &bd, 1);
1626                 stats->lss_busy  += bkt->lsb_busy;
1627                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1628                 stats->lss_max_search = max((int)stats->lss_max_search,
1629                                             cfs_hash_bd_depmax_get(&bd));
1630                 if (!populated) {
1631                         cfs_hash_bd_unlock(hs, &bd, 1);
1632                         continue;
1633                 }
1634
1635                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1636                         if (!cfs_hlist_empty(hhead))
1637                                 stats->lss_populated++;
1638                 }
1639                 cfs_hash_bd_unlock(hs, &bd, 1);
1640         }
1641 }
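
/*
 * Note: the totals above are accumulated per hash bucket under the
 * bucket lock; counting populated (non-empty) hash chains is made
 * optional because it requires walking every chain head in the
 * bucket, which is the comparatively expensive part of the scan.
 */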
1642
1643 #ifdef __KERNEL__
1644 static int lu_cache_shrink(SHRINKER_FIRST_ARG int nr_to_scan,
1645                            unsigned int gfp_mask)
1646 {
1647         lu_site_stats_t stats;
1648         struct lu_site *s;
1649         struct lu_site *tmp;
1650         int cached = 0;
1651         int remain = nr_to_scan;
1652         CFS_LIST_HEAD(splice);
1653
1654         if (nr_to_scan != 0) {
1655                 if (!(gfp_mask & __GFP_FS))
1656                         return -1;
1657                 CDEBUG(D_INODE, "Shrink %d objects\n", nr_to_scan);
1658         }
1659
1660         cfs_down(&lu_sites_guard);
1661         cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1662                 if (nr_to_scan != 0) {
1663                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1664                         /*
1665                          * Move the just-shrunk site to the tail of the site
1666                          * list to ensure shrinking fairness.
1667                          */
1668                         cfs_list_move_tail(&s->ls_linkage, &splice);
1669                 }
1670
1671                 memset(&stats, 0, sizeof(stats));
1672                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1673                 cached += stats.lss_total - stats.lss_busy;
1674                 if (nr_to_scan && remain <= 0)
1675                         break;
1676         }
1677         cfs_list_splice(&splice, lu_sites.prev);
1678         cfs_up(&lu_sites_guard);
1679
1680         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1681         if (nr_to_scan == 0)
1682                 CDEBUG(D_INODE, "%d objects cached\n", cached);
1683         return cached;
1684 }
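
/*
 * Worked example of the return value above (figures hypothetical):
 * with 20000 cacheable objects of which 5000 are busy, cached is
 * 15000; at the default sysctl_vfs_cache_pressure of 100 the shrinker
 * reports (15000 / 100) * 100 = 15000 freeable objects to the VM,
 * while a pressure of 50 would halve that to 7500.
 */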
1685
1686 /*
1687  * Debugging stuff.
1688  */
1689
1690 /**
1691  * Environment to be used in debugger, contains all tags.
1692  */
1693 struct lu_env lu_debugging_env;
1694
1695 /**
1696  * Debugging printer function using printk().
1697  */
1698 int lu_printk_printer(const struct lu_env *env,
1699                       void *unused, const char *format, ...)
1700 {
1701         va_list args;
1702
1703         va_start(args, format);
1704         vprintk(format, args);
1705         va_end(args);
1706         return 0;
1707 }
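
/*
 * Sketch (assumes lu_object_print() declared in lu_object.h and a
 * hypothetical object "o"): lu_printk_printer() matches the
 * lu_printer_t signature, so object descriptions can be routed to the
 * console:
 *
 *     lu_object_print(&lu_debugging_env, NULL, lu_printk_printer, o);
 */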
1708
1709 void lu_debugging_setup(void)
1710 {
1711         lu_env_init(&lu_debugging_env, ~0);
1712 }
1713
1714 void lu_context_keys_dump(void)
1715 {
1716         int i;
1717
1718         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1719                 struct lu_context_key *key;
1720
1721                 key = lu_keys[i];
1722                 if (key != NULL) {
1723                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
1724                                i, key, key->lct_tags,
1725                                key->lct_init, key->lct_fini, key->lct_exit,
1726                                key->lct_index, cfs_atomic_read(&key->lct_used),
1727                                key->lct_owner ? key->lct_owner->name : "",
1728                                key->lct_owner);
1729                         lu_ref_print(&key->lct_reference);
1730                 }
1731         }
1732 }
1733 EXPORT_SYMBOL(lu_context_keys_dump);
1734 #else  /* !__KERNEL__ */
1735 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
1736 {
1737         return 0;
1738 }
1739 #endif /* __KERNEL__ */
1740
1741 int  cl_global_init(void);
1742 void cl_global_fini(void);
1743 int  lu_ref_global_init(void);
1744 void lu_ref_global_fini(void);
1745
1746 int dt_global_init(void);
1747 void dt_global_fini(void);
1748
1749 int llo_global_init(void);
1750 void llo_global_fini(void);
1751
1752 /**
1753  * Initialization of global lu_* data.
1754  */
1755 int lu_global_init(void)
1756 {
1757         int result;
1758
1759         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1760
1761         result = lu_ref_global_init();
1762         if (result != 0)
1763                 return result;
1764
1765         LU_CONTEXT_KEY_INIT(&lu_global_key);
1766         result = lu_context_key_register(&lu_global_key);
1767         if (result != 0)
1768                 return result;
1769         /*
1770          * At this level, we don't know what tags are needed, so allocate them
1771          * conservatively. This should not be too bad, because this
1772          * environment is global.
1773          */
1774         cfs_down(&lu_sites_guard);
1775         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1776         cfs_up(&lu_sites_guard);
1777         if (result != 0)
1778                 return result;
1779
1780         /*
1781          * Seek estimation: three seeks to read a record from the OI, one to
1782          * read the inode, one for the EA. Unfortunately, setting such a high
1783          * value results in the lu_object/inode cache consuming all memory.
1784          */
1785         lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
1786         if (lu_site_shrinker == NULL)
1787                 return -ENOMEM;
1788
1789         result = lu_time_global_init();
1790         if (result)
1791                 GOTO(out, result);
1792
1793 #ifdef __KERNEL__
1794         result = dt_global_init();
1795         if (result)
1796                 GOTO(out, result);
1797
1798         result = llo_global_init();
1799         if (result)
1800                 GOTO(out, result);
1801 #endif
1802         result = cl_global_init();
1803 out:
1804
1805         return result;
1806 }
1807
1808 /**
1809  * Dual to lu_global_init().
1810  */
1811 void lu_global_fini(void)
1812 {
1813         cl_global_fini();
1814 #ifdef __KERNEL__
1815         llo_global_fini();
1816         dt_global_fini();
1817 #endif
1818         lu_time_global_fini();
1819         if (lu_site_shrinker != NULL) {
1820                 cfs_remove_shrinker(lu_site_shrinker);
1821                 lu_site_shrinker = NULL;
1822         }
1823
1824         lu_context_key_degister(&lu_global_key);
1825
1826         /*
1827          * Tear shrinker environment down _after_ de-registering
1828          * lu_global_key, because the latter has a value in the former.
1829          */
1830         cfs_down(&lu_sites_guard);
1831         lu_env_fini(&lu_shrink_env);
1832         cfs_up(&lu_sites_guard);
1833
1834         lu_ref_global_fini();
1835 }
1836
1837 struct lu_buf LU_BUF_NULL = {
1838         .lb_buf = NULL,
1839         .lb_len = 0
1840 };
1841 EXPORT_SYMBOL(LU_BUF_NULL);
1842
1843 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1844 {
1845 #ifdef LPROCFS
1846         struct lprocfs_counter ret;
1847
1848         lprocfs_stats_collect(stats, idx, &ret);
1849         return (__u32)ret.lc_count;
1850 #else
1851         return 0;
1852 #endif
1853 }
1854
1855 /**
1856  * Output site statistical counters into a buffer. Suitable for
1857  * lprocfs_rd_*()-style functions.
1858  */
1859 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
1860 {
1861         lu_site_stats_t stats;
1862
1863         memset(&stats, 0, sizeof(stats));
1864         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1865
1866         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
1867                         stats.lss_busy,
1868                         stats.lss_total,
1869                         stats.lss_populated,
1870                         CFS_HASH_NHLIST(s->ls_obj_hash),
1871                         stats.lss_max_search,
1872                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
1873                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
1874                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
1875                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
1876                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
1877                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
1878 }
1879 EXPORT_SYMBOL(lu_site_stats_print);
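
/*
 * For reference, the eleven fields printed above are, in order:
 * busy/total objects, populated/total hash chains
 * (CFS_HASH_NHLIST()), maximum hash lookup depth, and the created,
 * cache_hit, cache_miss, cache_race, cache_death_race and lru_purged
 * counters.
 */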
1880
1881 const char *lu_time_names[LU_TIME_NR] = {
1882         [LU_TIME_FIND_LOOKUP] = "find_lookup",
1883         [LU_TIME_FIND_ALLOC]  = "find_alloc",
1884         [LU_TIME_FIND_INSERT] = "find_insert"
1885 };
1886 EXPORT_SYMBOL(lu_time_names);
1887
1888 /**
1889  * Helper function to initialize a number of kmem slab caches at once.
1890  */
1891 int lu_kmem_init(struct lu_kmem_descr *caches)
1892 {
1893         int result;
1894
1895         for (result = 0; caches->ckd_cache != NULL; ++caches) {
1896                 *caches->ckd_cache = cfs_mem_cache_create(caches->ckd_name,
1897                                                           caches->ckd_size,
1898                                                           0, 0);
1899                 if (*caches->ckd_cache == NULL) {
1900                         result = -ENOMEM;
1901                         break;
1902                 }
1903         }
1904         return result;
1905 }
1906 EXPORT_SYMBOL(lu_kmem_init);
1907
1908 /**
1909  * Helper function to finalize a number of kmem slab caches at once. Dual to
1910  * lu_kmem_init().
1911  */
1912 void lu_kmem_fini(struct lu_kmem_descr *caches)
1913 {
1914         int rc;
1915
1916         for (; caches->ckd_cache != NULL; ++caches) {
1917                 if (*caches->ckd_cache != NULL) {
1918                         rc = cfs_mem_cache_destroy(*caches->ckd_cache);
1919                         LASSERTF(rc == 0, "couldn't destroy %s slab\n",
1920                                  caches->ckd_name);
1921                         *caches->ckd_cache = NULL;
1922                 }
1923         }
1924 }
1925 EXPORT_SYMBOL(lu_kmem_fini);
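
/*
 * Usage sketch (hypothetical "foo" names, not part of this file): the
 * descriptor array consumed by lu_kmem_init() and lu_kmem_fini()
 * above is terminated by an entry with a NULL ckd_cache pointer:
 *
 *     static cfs_mem_cache_t *foo_object_kmem;
 *
 *     static struct lu_kmem_descr foo_caches[] = {
 *             {
 *                     .ckd_cache = &foo_object_kmem,
 *                     .ckd_name  = "foo_object_kmem",
 *                     .ckd_size  = sizeof(struct foo_object)
 *             },
 *             {
 *                     .ckd_cache = NULL
 *             }
 *     };
 *
 * so a module calls lu_kmem_init(foo_caches) at load time and
 * lu_kmem_fini(foo_caches) on unload.
 */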