1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions, they provide some generic
40  * infrastructure for managing object devices
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <lu_ref.h>
61 #include <libcfs/list.h>
62
63 enum {
64         LU_CACHE_PERCENT_MAX     = 50,
65         LU_CACHE_PERCENT_DEFAULT = 20
66 };
67
68 #define LU_CACHE_NR_MAX_ADJUST          128
69 #define LU_CACHE_NR_UNLIMITED           -1
70 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
71 #define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
72 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
73 #define LU_CACHE_NR_ZFS_LIMIT           10240
74
75 #define LU_SITE_BITS_MIN    12
76 #define LU_SITE_BITS_MAX    24
77 /**
78  * 256 buckets in total; we don't want too many buckets because:
79  * - they consume too much memory
80  * - too many buckets lead to sparse, unbalanced LRU lists
81  */
82 #define LU_SITE_BKT_BITS    8
83
84
85 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
86 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
87                 "Percentage of memory to be used as lu_object cache");
88
89 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
90 CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
91                 "Maximum number of objects in lu_object cache");
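/*
 * Illustrative sketch (not part of the original source): both tunables are
 * declared with mode 0644, so assuming this file is built into the obdclass
 * module they can be set at load time or adjusted at runtime, roughly:
 *
 *   # persistent, e.g. in /etc/modprobe.d/lustre.conf
 *   options obdclass lu_cache_percent=10 lu_cache_nr=65536
 *
 *   # runtime adjustment
 *   echo 65536 > /sys/module/obdclass/parameters/lu_cache_nr
 */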
92
93 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
94
95 /**
96  * Decrease the reference counter on an object. If the last reference is
97  * released, return the object to the cache, unless lu_object_is_dying(o)
98  * holds. In the latter case, free the object immediately.
99  */
100 void lu_object_put(const struct lu_env *env, struct lu_object *o)
101 {
102         struct lu_site_bkt_data *bkt;
103         struct lu_object_header *top;
104         struct lu_site          *site;
105         struct lu_object        *orig;
106         cfs_hash_bd_t            bd;
107         const struct lu_fid     *fid;
108
109         top  = o->lo_header;
110         site = o->lo_dev->ld_site;
111         orig = o;
112
113         /*
114          * Till FIDs on OST are fully implemented, anonymous objects are
115          * possible in OSP. Such an object isn't listed in the site, so
116          * we should not remove it from the site.
117          */
118         fid = lu_object_fid(o);
119         if (fid_is_zero(fid)) {
120                 LASSERT(top->loh_hash.next == NULL
121                         && top->loh_hash.pprev == NULL);
122                 LASSERT(list_empty(&top->loh_lru));
123                 if (!atomic_dec_and_test(&top->loh_ref))
124                         return;
125                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
126                         if (o->lo_ops->loo_object_release != NULL)
127                                 o->lo_ops->loo_object_release(env, o);
128                 }
129                 lu_object_free(env, orig);
130                 return;
131         }
132
133         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
134         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
135
136         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
137                 if (lu_object_is_dying(top)) {
138
139                         /*
140                          * somebody may be waiting for this, currently only
141                          * used for cl_object, see cl_object_put_last().
142                          */
143                         wake_up_all(&bkt->lsb_marche_funebre);
144                 }
145                 return;
146         }
147
148         LASSERT(bkt->lsb_busy > 0);
149         bkt->lsb_busy--;
150         /*
151          * When the last reference is released, iterate over the object's
152          * layers and notify them that the object is no longer busy.
153          */
154         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
155                 if (o->lo_ops->loo_object_release != NULL)
156                         o->lo_ops->loo_object_release(env, o);
157         }
158
159         if (!lu_object_is_dying(top)) {
160                 LASSERT(list_empty(&top->loh_lru));
161                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
162                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
163                 return;
164         }
165
166         /*
167          * If the object is dying (will not be cached), remove it
168          * from the hash table and LRU.
169          *
170          * This is done with hash table and LRU lists locked. As the only
171          * way to acquire first reference to previously unreferenced
172          * object is through hash-table lookup (lu_object_find()),
173          * or LRU scanning (lu_site_purge()), that are done under hash-table
174          * and LRU lock, no race with concurrent object lookup is possible
175          * and we can safely destroy object below.
176          */
177         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
178                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
179         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
180         /*
181          * The object was already removed from the hash table and LRU
182          * above, so it can be killed.
183          */
184         lu_object_free(env, orig);
185 }
186 EXPORT_SYMBOL(lu_object_put);
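/*
 * Illustrative usage sketch (not from the original source): the usual caller
 * pattern pairs a lookup, which takes a reference, with lu_object_put() to
 * drop it:
 *
 *   struct lu_object *o;
 *
 *   o = lu_object_find(env, dev, fid, conf);
 *   if (IS_ERR(o))
 *           return PTR_ERR(o);
 *   ... use the object ...
 *   lu_object_put(env, o);   (may return the object to the LRU cache)
 */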
187
188 /**
189  * Put an object and do not keep it in the cache. This is a temporary
190  * solution for multi-site objects whose layering is not constant.
191  */
192 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
193 {
194         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
195         return lu_object_put(env, o);
196 }
197 EXPORT_SYMBOL(lu_object_put_nocache);
198
199 /**
200  * Kill the object and take it out of LRU cache.
201  * Currently used by client code for layout change.
202  */
203 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
204 {
205         struct lu_object_header *top;
206
207         top = o->lo_header;
208         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
209         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
210                 cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
211                 cfs_hash_bd_t bd;
212
213                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
214                 list_del_init(&top->loh_lru);
215                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
216                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
217         }
218 }
219 EXPORT_SYMBOL(lu_object_unhash);
220
221 /**
222  * Allocate new object.
223  *
224  * This follows object creation protocol, described in the comment within
225  * struct lu_device_operations definition.
226  */
227 static struct lu_object *lu_object_alloc(const struct lu_env *env,
228                                          struct lu_device *dev,
229                                          const struct lu_fid *f,
230                                          const struct lu_object_conf *conf)
231 {
232         struct lu_object *scan;
233         struct lu_object *top;
234         struct list_head *layers;
235         unsigned int init_mask = 0;
236         unsigned int init_flag;
237         int clean;
238         int result;
239         ENTRY;
240
241         /*
242          * Create top-level object slice. This will also create
243          * lu_object_header.
244          */
245         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
246         if (top == NULL)
247                 RETURN(ERR_PTR(-ENOMEM));
248         if (IS_ERR(top))
249                 RETURN(top);
250         /*
251          * This is the only place where object fid is assigned. It's constant
252          * after this point.
253          */
254         top->lo_header->loh_fid = *f;
255         layers = &top->lo_header->loh_layers;
256
257         do {
258                 /*
259                  * Call ->loo_object_init() repeatedly, until no more new
260                  * object slices are created.
261                  */
262                 clean = 1;
263                 init_flag = 1;
264                 list_for_each_entry(scan, layers, lo_linkage) {
265                         if (init_mask & init_flag)
266                                 goto next;
267                         clean = 0;
268                         scan->lo_header = top->lo_header;
269                         result = scan->lo_ops->loo_object_init(env, scan, conf);
270                         if (result != 0) {
271                                 lu_object_free(env, top);
272                                 RETURN(ERR_PTR(result));
273                         }
274                         init_mask |= init_flag;
275 next:
276                         init_flag <<= 1;
277                 }
278         } while (!clean);
279
280         list_for_each_entry_reverse(scan, layers, lo_linkage) {
281                 if (scan->lo_ops->loo_object_start != NULL) {
282                         result = scan->lo_ops->loo_object_start(env, scan);
283                         if (result != 0) {
284                                 lu_object_free(env, top);
285                                 RETURN(ERR_PTR(result));
286                         }
287                 }
288         }
289
290         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
291         RETURN(top);
292 }
293
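/*
 * Illustrative sketch (assumption, not from the original source): a middle
 * layer's ->loo_object_init() typically asks the device below it to allocate
 * the next slice and links it in, which is what makes the do/while loop in
 * lu_object_alloc() above iterate until no new slices appear:
 *
 *   static int mylayer_object_init(const struct lu_env *env,
 *                                  struct lu_object *o,
 *                                  const struct lu_object_conf *conf)
 *   {
 *           struct lu_device *next = ...;  (the device below this one)
 *           struct lu_object *below;
 *
 *           below = next->ld_ops->ldo_object_alloc(env, o->lo_header, next);
 *           if (below == NULL)
 *                   return -ENOMEM;
 *           lu_object_add(o, below);
 *           return 0;
 *   }
 *
 * mylayer_object_init() is a hypothetical name used only for illustration.
 */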
294 /**
295  * Free an object.
296  */
297 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
298 {
299         struct lu_site_bkt_data *bkt;
300         struct lu_site          *site;
301         struct lu_object        *scan;
302         struct list_head        *layers;
303         struct list_head         splice;
304
305         site   = o->lo_dev->ld_site;
306         layers = &o->lo_header->loh_layers;
307         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
308         /*
309          * First call ->loo_object_delete() method to release all resources.
310          */
311         list_for_each_entry_reverse(scan, layers, lo_linkage) {
312                 if (scan->lo_ops->loo_object_delete != NULL)
313                         scan->lo_ops->loo_object_delete(env, scan);
314         }
315
316         /*
317          * Then, splice object layers into stand-alone list, and call
318          * ->loo_object_free() on all layers to free memory. Splice is
319          * necessary, because lu_object_header is freed together with the
320          * top-level slice.
321          */
322         INIT_LIST_HEAD(&splice);
323         list_splice_init(layers, &splice);
324         while (!list_empty(&splice)) {
325                 /*
326                  * Free layers in bottom-to-top order, so that object header
327                  * lives as long as possible and ->loo_object_free() methods
328                  * can look at its contents.
329                  */
330                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
331                 list_del_init(&o->lo_linkage);
332                 LASSERT(o->lo_ops->loo_object_free != NULL);
333                 o->lo_ops->loo_object_free(env, o);
334         }
335
336         if (waitqueue_active(&bkt->lsb_marche_funebre))
337                 wake_up_all(&bkt->lsb_marche_funebre);
338 }
339
340 /**
341  * Free \a nr objects from the cold end of the site LRU list.
342  */
343 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
344 {
345         struct lu_object_header *h;
346         struct lu_object_header *temp;
347         struct lu_site_bkt_data *bkt;
348         cfs_hash_bd_t            bd;
349         cfs_hash_bd_t            bd2;
350         struct list_head         dispose;
351         int                      did_sth;
352         int                      start;
353         int                      count;
354         int                      bnr;
355         unsigned int             i;
356
357         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
358                 RETURN(0);
359
360         INIT_LIST_HEAD(&dispose);
361         /*
362          * Under LRU list lock, scan LRU list and move unreferenced objects to
363          * the dispose list, removing them from LRU and hash table.
364          */
365         start = s->ls_purge_start;
366         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
367  again:
368         /*
369          * It doesn't make any sense to run purge threads in parallel; that
370          * can only bring trouble. See LU-5331.
371          */
372         mutex_lock(&s->ls_purge_mutex);
373         did_sth = 0;
374         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
375                 if (i < start)
376                         continue;
377                 count = bnr;
378                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
379                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
380
381                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
382                         LASSERT(atomic_read(&h->loh_ref) == 0);
383
384                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
385                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
386
387                         cfs_hash_bd_del_locked(s->ls_obj_hash,
388                                                &bd2, &h->loh_hash);
389                         list_move(&h->loh_lru, &dispose);
390                         if (did_sth == 0)
391                                 did_sth = 1;
392
393                         if (nr != ~0 && --nr == 0)
394                                 break;
395
396                         if (count > 0 && --count == 0)
397                                 break;
398
399                 }
400                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
401                 cond_resched();
402                 /*
403                  * Free everything on the dispose list. This is safe against
404                  * races due to the reasons described in lu_object_put().
405                  */
406                 while (!list_empty(&dispose)) {
407                         h = container_of0(dispose.next,
408                                           struct lu_object_header, loh_lru);
409                         list_del_init(&h->loh_lru);
410                         lu_object_free(env, lu_object_top(h));
411                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
412                 }
413
414                 if (nr == 0)
415                         break;
416         }
417         mutex_unlock(&s->ls_purge_mutex);
418
419         if (nr != 0 && did_sth && start != 0) {
420                 start = 0; /* restart from the first bucket */
421                 goto again;
422         }
423         /* race on s->ls_purge_start, but nobody cares */
424         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
425
426         return nr;
427 }
428 EXPORT_SYMBOL(lu_site_purge);
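/*
 * Illustrative usage sketch (not from the original source): callers either
 * drain the whole cache or trim it by a bounded amount, for example:
 *
 *   lu_site_purge(env, site, ~0);   (drop every unreferenced object, as
 *                                    lu_stack_fini() does)
 *
 *   lu_site_purge(env, site, 128);  (free at most 128 objects from the cold
 *                                    end of the LRU, cf. lu_object_limit())
 */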
429
430 /*
431  * Object printing.
432  *
433  * The code below has to jump through certain hoops to output an object
434  * description into the libcfs_debug_msg()-based log. The problem is that
435  * lu_object_print() composes an object description from strings that are
436  * parts of _lines_ of output (i.e., strings that are not terminated by a
437  * newline). This doesn't fit very well into the libcfs_debug_msg()
438  * interface, which assumes that each message supplied to it is a
439  * self-contained output line.
440  *
441  * To work around this, strings are collected in a temporary buffer (the
442  * value of lu_global_key, a struct lu_cdebug_data) until a terminating
443  * newline character is detected.
444  */
445
446 enum {
447         /**
448          * Maximal line size.
449          *
450          * XXX overflow is not handled correctly.
451          */
452         LU_CDEBUG_LINE = 512
453 };
454
455 struct lu_cdebug_data {
456         /**
457          * Temporary buffer.
458          */
459         char lck_area[LU_CDEBUG_LINE];
460 };
461
462 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
463 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
464
465 /**
466  * Key, holding temporary buffer. This key is registered very early by
467  * lu_global_init().
468  */
469 struct lu_context_key lu_global_key = {
470         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
471                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
472         .lct_init = lu_global_key_init,
473         .lct_fini = lu_global_key_fini
474 };
475
476 /**
477  * Printer function emitting messages through libcfs_debug_msg().
478  */
479 int lu_cdebug_printer(const struct lu_env *env,
480                       void *cookie, const char *format, ...)
481 {
482         struct libcfs_debug_msg_data *msgdata = cookie;
483         struct lu_cdebug_data        *key;
484         int used;
485         int complete;
486         va_list args;
487
488         va_start(args, format);
489
490         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
491         LASSERT(key != NULL);
492
493         used = strlen(key->lck_area);
494         complete = format[strlen(format) - 1] == '\n';
495         /*
496          * Append new chunk to the buffer.
497          */
498         vsnprintf(key->lck_area + used,
499                   ARRAY_SIZE(key->lck_area) - used, format, args);
500         if (complete) {
501                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
502                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
503                 key->lck_area[0] = 0;
504         }
505         va_end(args);
506         return 0;
507 }
508 EXPORT_SYMBOL(lu_cdebug_printer);
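/*
 * Illustrative usage sketch (assumption, not part of the original source):
 * a caller typically passes lu_cdebug_printer() to lu_object_print() along
 * with a libcfs message-data cookie, roughly:
 *
 *   LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_INFO, NULL);
 *
 *   lu_object_print(env, &msgdata, lu_cdebug_printer, o);
 *
 * Chunks handed to the printer accumulate in lck_area until a chunk ending
 * in '\n' flushes the whole line through libcfs_debug_msg().
 */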
509
510 /**
511  * Print object header.
512  */
513 void lu_object_header_print(const struct lu_env *env, void *cookie,
514                             lu_printer_t printer,
515                             const struct lu_object_header *hdr)
516 {
517         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
518                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
519                    PFID(&hdr->loh_fid),
520                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
521                    list_empty((struct list_head *)&hdr->loh_lru) ?
522                    "" : " lru",
523                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
524 }
525 EXPORT_SYMBOL(lu_object_header_print);
526
527 /**
528  * Print human readable representation of the \a o to the \a printer.
529  */
530 void lu_object_print(const struct lu_env *env, void *cookie,
531                      lu_printer_t printer, const struct lu_object *o)
532 {
533         static const char ruler[] = "........................................";
534         struct lu_object_header *top;
535         int depth = 4;
536
537         top = o->lo_header;
538         lu_object_header_print(env, cookie, printer, top);
539         (*printer)(env, cookie, "{\n");
540
541         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
542                 /*
543                  * print `.' \a depth times followed by type name and address
544                  */
545                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
546                            o->lo_dev->ld_type->ldt_name, o);
547
548                 if (o->lo_ops->loo_object_print != NULL)
549                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
550
551                 (*printer)(env, cookie, "\n");
552         }
553
554         (*printer)(env, cookie, "} header@%p\n", top);
555 }
556 EXPORT_SYMBOL(lu_object_print);
557
558 /**
559  * Check object consistency.
560  */
561 int lu_object_invariant(const struct lu_object *o)
562 {
563         struct lu_object_header *top;
564
565         top = o->lo_header;
566         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
567                 if (o->lo_ops->loo_object_invariant != NULL &&
568                     !o->lo_ops->loo_object_invariant(o))
569                         return 0;
570         }
571         return 1;
572 }
573 EXPORT_SYMBOL(lu_object_invariant);
574
575 static struct lu_object *htable_lookup(struct lu_site *s,
576                                        cfs_hash_bd_t *bd,
577                                        const struct lu_fid *f,
578                                        wait_queue_t *waiter,
579                                        __u64 *version)
580 {
581         struct lu_site_bkt_data *bkt;
582         struct lu_object_header *h;
583         struct hlist_node       *hnode;
584         __u64  ver = cfs_hash_bd_version_get(bd);
585
586         if (*version == ver)
587                 return ERR_PTR(-ENOENT);
588
589         *version = ver;
590         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
591         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
592          * of cfs_hash; it does not take a reference on the object. */
593         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
594         if (hnode == NULL) {
595                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
596                 return ERR_PTR(-ENOENT);
597         }
598
599         h = container_of0(hnode, struct lu_object_header, loh_hash);
600         if (likely(!lu_object_is_dying(h))) {
601                 cfs_hash_get(s->ls_obj_hash, hnode);
602                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
603                 list_del_init(&h->loh_lru);
604                 return lu_object_top(h);
605         }
606
607         /*
608          * Lookup found an object being destroyed; this object cannot be
609          * returned (to ensure that references to dying objects are eventually
610          * drained), and moreover, the lookup has to wait until the object is freed.
611          */
612
613         init_waitqueue_entry_current(waiter);
614         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
615         set_current_state(TASK_UNINTERRUPTIBLE);
616         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
617         return ERR_PTR(-EAGAIN);
618 }
619
620 static struct lu_object *htable_lookup_nowait(struct lu_site *s,
621                                               cfs_hash_bd_t *bd,
622                                               const struct lu_fid *f)
623 {
624         struct hlist_node       *hnode;
625         struct lu_object_header *h;
626
627         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
628          * of cfs_hash; it does not take a reference on the object. */
629         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
630         if (hnode == NULL) {
631                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
632                 return ERR_PTR(-ENOENT);
633         }
634
635         h = container_of0(hnode, struct lu_object_header, loh_hash);
636         if (unlikely(lu_object_is_dying(h)))
637                 return ERR_PTR(-ENOENT);
638
639         cfs_hash_get(s->ls_obj_hash, hnode);
640         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
641         list_del_init(&h->loh_lru);
642         return lu_object_top(h);
643 }
644
645 /**
646  * Search the cache for an object with the fid \a f. If such an object is
647  * found, return it. Otherwise, create a new object, insert it into the cache,
648  * and return it. Either way, an additional reference is acquired on the returned object.
649  */
650 struct lu_object *lu_object_find(const struct lu_env *env,
651                                  struct lu_device *dev, const struct lu_fid *f,
652                                  const struct lu_object_conf *conf)
653 {
654         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
655 }
656 EXPORT_SYMBOL(lu_object_find);
657
658 /*
659  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
660  * the calculation of the number of objects to reclaim is not covered by
661  * a lock, the number purged per call is capped at LU_CACHE_NR_MAX_ADJUST.
662  * This ensures that many concurrent threads will not accidentally purge
663  * the entire cache.
664  */
665 static void lu_object_limit(const struct lu_env *env,
666                             struct lu_device *dev)
667 {
668         __u64 size, nr;
669
670         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
671                 return;
672
673         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
674         nr = (__u64)lu_cache_nr;
675         if (size > nr)
676                 lu_site_purge(env, dev->ld_site,
677                               MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
678
679         return;
680 }
681
682 static struct lu_object *lu_object_new(const struct lu_env *env,
683                                        struct lu_device *dev,
684                                        const struct lu_fid *f,
685                                        const struct lu_object_conf *conf)
686 {
687         struct lu_object        *o;
688         cfs_hash_t              *hs;
689         cfs_hash_bd_t            bd;
690         struct lu_site_bkt_data *bkt;
691
692         o = lu_object_alloc(env, dev, f, conf);
693         if (unlikely(IS_ERR(o)))
694                 return o;
695
696         hs = dev->ld_site->ls_obj_hash;
697         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
698         bkt = cfs_hash_bd_extra_get(hs, &bd);
699         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
700         bkt->lsb_busy++;
701         cfs_hash_bd_unlock(hs, &bd, 1);
702
703         lu_object_limit(env, dev);
704
705         return o;
706 }
707
708 /**
709  * Core logic of lu_object_find*() functions.
710  */
711 static struct lu_object *lu_object_find_try(const struct lu_env *env,
712                                             struct lu_device *dev,
713                                             const struct lu_fid *f,
714                                             const struct lu_object_conf *conf,
715                                             wait_queue_t *waiter)
716 {
717         struct lu_object      *o;
718         struct lu_object      *shadow;
719         struct lu_site        *s;
720         cfs_hash_t            *hs;
721         cfs_hash_bd_t          bd;
722         __u64                  version = 0;
723
724         /*
725          * This uses standard index maintenance protocol:
726          *
727          *     - search index under lock, and return object if found;
728          *     - otherwise, unlock index, allocate new object;
729          *     - lock index and search again;
730          *     - if nothing is found (usual case), insert newly created
731          *       object into index;
732          *     - otherwise (race: other thread inserted object), free
733          *       object just allocated.
734          *     - unlock index;
735          *     - return object.
736          *
737          * For the "LOC_F_NEW" case, we are sure the object is newly created.
738          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
739          * just alloc and insert directly.
740          *
741          * If dying object is found during index search, add @waiter to the
742          * site wait-queue and return ERR_PTR(-EAGAIN).
743          */
744         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
745                 return lu_object_new(env, dev, f, conf);
746
747         s  = dev->ld_site;
748         hs = s->ls_obj_hash;
749         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
750         o = htable_lookup(s, &bd, f, waiter, &version);
751         cfs_hash_bd_unlock(hs, &bd, 1);
752         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
753                 return o;
754
755         /*
756          * Allocate new object. This may result in rather complicated
757          * operations, including fld queries, inode loading, etc.
758          */
759         o = lu_object_alloc(env, dev, f, conf);
760         if (unlikely(IS_ERR(o)))
761                 return o;
762
763         LASSERT(lu_fid_eq(lu_object_fid(o), f));
764
765         cfs_hash_bd_lock(hs, &bd, 1);
766
767         shadow = htable_lookup(s, &bd, f, waiter, &version);
768         if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
769                 struct lu_site_bkt_data *bkt;
770
771                 bkt = cfs_hash_bd_extra_get(hs, &bd);
772                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
773                 bkt->lsb_busy++;
774                 cfs_hash_bd_unlock(hs, &bd, 1);
775
776                 lu_object_limit(env, dev);
777
778                 return o;
779         }
780
781         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
782         cfs_hash_bd_unlock(hs, &bd, 1);
783         lu_object_free(env, o);
784         return shadow;
785 }
786
787 /**
788  * Much like lu_object_find(), but the top-level device of the object is
789  * specifically \a dev rather than the top-level device of the site. This
790  * allows objects of different "stacking" to be created within the same site.
791  */
792 struct lu_object *lu_object_find_at(const struct lu_env *env,
793                                     struct lu_device *dev,
794                                     const struct lu_fid *f,
795                                     const struct lu_object_conf *conf)
796 {
797         struct lu_site_bkt_data *bkt;
798         struct lu_object        *obj;
799         wait_queue_t           wait;
800
801         while (1) {
802                 obj = lu_object_find_try(env, dev, f, conf, &wait);
803                 if (obj != ERR_PTR(-EAGAIN))
804                         return obj;
805                 /*
806                  * lu_object_find_try() already added waiter into the
807                  * wait queue.
808                  */
809                 waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
810                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
811                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
812         }
813 }
814 EXPORT_SYMBOL(lu_object_find_at);
815
816 /**
817  * Try to find the object in the cache without waiting for a dead object
818  * to be released, and without allocating a new object if no cached one is found.
819  *
820  * A found object is marked LU_OBJECT_HEARD_BANSHEE so that it will be purged.
821  */
822 void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
823                      const struct lu_fid *f)
824 {
825         struct lu_site          *s  = dev->ld_site;
826         cfs_hash_t              *hs = s->ls_obj_hash;
827         cfs_hash_bd_t            bd;
828         struct lu_object        *o;
829
830         cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
831         o = htable_lookup_nowait(s, &bd, f);
832         cfs_hash_bd_unlock(hs, &bd, 1);
833         if (!IS_ERR(o)) {
834                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
835                 lu_object_put(env, o);
836         }
837 }
838 EXPORT_SYMBOL(lu_object_purge);
839
840 /**
841  * Find object with given fid, and return its slice belonging to given device.
842  */
843 struct lu_object *lu_object_find_slice(const struct lu_env *env,
844                                        struct lu_device *dev,
845                                        const struct lu_fid *f,
846                                        const struct lu_object_conf *conf)
847 {
848         struct lu_object *top;
849         struct lu_object *obj;
850
851         top = lu_object_find(env, dev, f, conf);
852         if (!IS_ERR(top)) {
853                 obj = lu_object_locate(top->lo_header, dev->ld_type);
854                 if (obj == NULL)
855                         lu_object_put(env, top);
856         } else
857                 obj = top;
858         return obj;
859 }
860 EXPORT_SYMBOL(lu_object_find_slice);
861
862 /**
863  * Global list of all device types.
864  */
865 static struct list_head lu_device_types;
866
867 int lu_device_type_init(struct lu_device_type *ldt)
868 {
869         int result = 0;
870
871         atomic_set(&ldt->ldt_device_nr, 0);
872         INIT_LIST_HEAD(&ldt->ldt_linkage);
873         if (ldt->ldt_ops->ldto_init)
874                 result = ldt->ldt_ops->ldto_init(ldt);
875
876         if (result == 0) {
877                 spin_lock(&obd_types_lock);
878                 list_add(&ldt->ldt_linkage, &lu_device_types);
879                 spin_unlock(&obd_types_lock);
880         }
881
882         return result;
883 }
884 EXPORT_SYMBOL(lu_device_type_init);
885
886 void lu_device_type_fini(struct lu_device_type *ldt)
887 {
888         spin_lock(&obd_types_lock);
889         list_del_init(&ldt->ldt_linkage);
890         spin_unlock(&obd_types_lock);
891         if (ldt->ldt_ops->ldto_fini)
892                 ldt->ldt_ops->ldto_fini(ldt);
893 }
894 EXPORT_SYMBOL(lu_device_type_fini);
895
896 /**
897  * Global list of all sites on this node
898  */
899 static struct list_head lu_sites;
900 static DEFINE_MUTEX(lu_sites_guard);
901
902 /**
903  * Global environment used by site shrinker.
904  */
905 static struct lu_env lu_shrink_env;
906
907 struct lu_site_print_arg {
908         struct lu_env   *lsp_env;
909         void            *lsp_cookie;
910         lu_printer_t     lsp_printer;
911 };
912
913 static int
914 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
915                   struct hlist_node *hnode, void *data)
916 {
917         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
918         struct lu_object_header  *h;
919
920         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
921         if (!list_empty(&h->loh_layers)) {
922                 const struct lu_object *o;
923
924                 o = lu_object_top(h);
925                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
926                                 arg->lsp_printer, o);
927         } else {
928                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
929                                        arg->lsp_printer, h);
930         }
931         return 0;
932 }
933
934 /**
935  * Print all objects in \a s.
936  */
937 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
938                    lu_printer_t printer)
939 {
940         struct lu_site_print_arg arg = {
941                 .lsp_env     = (struct lu_env *)env,
942                 .lsp_cookie  = cookie,
943                 .lsp_printer = printer,
944         };
945
946         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
947 }
948 EXPORT_SYMBOL(lu_site_print);
949
950 /**
951  * Return desired hash table order.
952  */
953 static unsigned int lu_htable_order(struct lu_device *top)
954 {
955         unsigned long cache_size;
956         unsigned int  bits;
957
958         /*
959          * For ZFS based OSDs the cache should be disabled by default.  This
960          * allows the ZFS ARC maximum flexibility in determining what buffers
961          * to cache.  If Lustre has objects or buffers which it wants to
962          * ensure always stay cached, it must maintain a hold on them.
963          */
964         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
965                 lu_cache_percent = 1;
966                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
967                 return LU_SITE_BITS_MIN;
968         }
969
970         /*
971          * Calculate hash table size, assuming that we want reasonable
972          * performance when 20% of total memory is occupied by the cache of
973          * lu_objects.
974          *
975          * The size of an lu_object is (arbitrarily) taken as 1K (inode included).
976          */
977         cache_size = totalram_pages;
978
979 #if BITS_PER_LONG == 32
980         /* limit hashtable size for lowmem systems to 3/4 of 1GiB of pages */
981         if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
982                 cache_size = (1 << (30 - PAGE_CACHE_SHIFT)) * 3 / 4;
983 #endif
984
985         /* clear off unreasonable cache setting. */
986         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
987                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
988                       " the range of (0, %u]. Will use default value: %u.\n",
989                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
990                       LU_CACHE_PERCENT_DEFAULT);
991
992                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
993         }
994         cache_size = cache_size / 100 * lu_cache_percent *
995                 (PAGE_CACHE_SIZE / 1024);
996
997         for (bits = 1; (1 << bits) < cache_size; ++bits) {
998                 ;
999         }
1000         return bits;
1001 }
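/*
 * Worked example (illustrative, assuming 4 KiB pages and 16 GiB of RAM):
 *
 *   totalram_pages = 16 GiB / 4 KiB         = 4194304 pages
 *   cache_size     = 4194304 / 100 * 20 * 4 = 3355440 1K-sized objects
 *   bits           = smallest b with 2^b >= 3355440 = 22
 *
 * lu_site_init() then clamps the result to [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX],
 * i.e. [12, 24], so the site hash would be created with 22 bits here.
 */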
1002
1003 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
1004                                 const void *key, unsigned mask)
1005 {
1006         struct lu_fid  *fid = (struct lu_fid *)key;
1007         __u32           hash;
1008
1009         hash = fid_flatten32(fid);
1010         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
1011         hash = hash_long(hash, hs->hs_bkt_bits);
1012
1013         /* give me another random factor */
1014         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
1015
1016         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
1017         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
1018
1019         return hash & mask;
1020 }
1021
1022 static void *lu_obj_hop_object(struct hlist_node *hnode)
1023 {
1024         return hlist_entry(hnode, struct lu_object_header, loh_hash);
1025 }
1026
1027 static void *lu_obj_hop_key(struct hlist_node *hnode)
1028 {
1029         struct lu_object_header *h;
1030
1031         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1032         return &h->loh_fid;
1033 }
1034
1035 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
1036 {
1037         struct lu_object_header *h;
1038
1039         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1040         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
1041 }
1042
1043 static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
1044 {
1045         struct lu_object_header *h;
1046
1047         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1048         if (atomic_add_return(1, &h->loh_ref) == 1) {
1049                 struct lu_site_bkt_data *bkt;
1050                 cfs_hash_bd_t            bd;
1051
1052                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
1053                 bkt = cfs_hash_bd_extra_get(hs, &bd);
1054                 bkt->lsb_busy++;
1055         }
1056 }
1057
1058 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
1059 {
1060         LBUG(); /* this should never be called */
1061 }
1062
1063 cfs_hash_ops_t lu_site_hash_ops = {
1064         .hs_hash        = lu_obj_hop_hash,
1065         .hs_key         = lu_obj_hop_key,
1066         .hs_keycmp      = lu_obj_hop_keycmp,
1067         .hs_object      = lu_obj_hop_object,
1068         .hs_get         = lu_obj_hop_get,
1069         .hs_put_locked  = lu_obj_hop_put_locked,
1070 };
1071
1072 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1073 {
1074         spin_lock(&s->ls_ld_lock);
1075         if (list_empty(&d->ld_linkage))
1076                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1077         spin_unlock(&s->ls_ld_lock);
1078 }
1079 EXPORT_SYMBOL(lu_dev_add_linkage);
1080
1081 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1082 {
1083         spin_lock(&s->ls_ld_lock);
1084         list_del_init(&d->ld_linkage);
1085         spin_unlock(&s->ls_ld_lock);
1086 }
1087 EXPORT_SYMBOL(lu_dev_del_linkage);
1088
1089 /**
1090  * Initialize site \a s, with \a top as the top-level device.
1091  */
1092 int lu_site_init(struct lu_site *s, struct lu_device *top)
1093 {
1094         struct lu_site_bkt_data *bkt;
1095         cfs_hash_bd_t bd;
1096         char name[16];
1097         unsigned int bits;
1098         unsigned int i;
1099         ENTRY;
1100
1101         memset(s, 0, sizeof *s);
1102         mutex_init(&s->ls_purge_mutex);
1103         bits = lu_htable_order(top);
1104         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
1105         for (bits = clamp_t(typeof(bits), bits,
1106                             LU_SITE_BITS_MIN, LU_SITE_BITS_MAX);
1107              bits >= LU_SITE_BITS_MIN; bits--) {
1108                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1109                                                  bits - LU_SITE_BKT_BITS,
1110                                                  sizeof(*bkt), 0, 0,
1111                                                  &lu_site_hash_ops,
1112                                                  CFS_HASH_SPIN_BKTLOCK |
1113                                                  CFS_HASH_NO_ITEMREF |
1114                                                  CFS_HASH_DEPTH |
1115                                                  CFS_HASH_ASSERT_EMPTY |
1116                                                  CFS_HASH_COUNTER);
1117                 if (s->ls_obj_hash != NULL)
1118                         break;
1119         }
1120
1121         if (s->ls_obj_hash == NULL) {
1122                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
1123                 return -ENOMEM;
1124         }
1125
1126         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1127                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1128                 INIT_LIST_HEAD(&bkt->lsb_lru);
1129                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1130         }
1131
1132         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1133         if (s->ls_stats == NULL) {
1134                 cfs_hash_putref(s->ls_obj_hash);
1135                 s->ls_obj_hash = NULL;
1136                 return -ENOMEM;
1137         }
1138
1139         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1140                              0, "created", "created");
1141         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1142                              0, "cache_hit", "cache_hit");
1143         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1144                              0, "cache_miss", "cache_miss");
1145         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1146                              0, "cache_race", "cache_race");
1147         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1148                              0, "cache_death_race", "cache_death_race");
1149         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1150                              0, "lru_purged", "lru_purged");
1151
1152         INIT_LIST_HEAD(&s->ls_linkage);
1153         s->ls_top_dev = top;
1154         top->ld_site = s;
1155         lu_device_get(top);
1156         lu_ref_add(&top->ld_reference, "site-top", s);
1157
1158         INIT_LIST_HEAD(&s->ls_ld_linkage);
1159         spin_lock_init(&s->ls_ld_lock);
1160
1161         lu_dev_add_linkage(s, top);
1162
1163         RETURN(0);
1164 }
1165 EXPORT_SYMBOL(lu_site_init);
1166
1167 /**
1168  * Finalize \a s and release its resources.
1169  */
1170 void lu_site_fini(struct lu_site *s)
1171 {
1172         mutex_lock(&lu_sites_guard);
1173         list_del_init(&s->ls_linkage);
1174         mutex_unlock(&lu_sites_guard);
1175
1176         if (s->ls_obj_hash != NULL) {
1177                 cfs_hash_putref(s->ls_obj_hash);
1178                 s->ls_obj_hash = NULL;
1179         }
1180
1181         if (s->ls_top_dev != NULL) {
1182                 s->ls_top_dev->ld_site = NULL;
1183                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1184                 lu_device_put(s->ls_top_dev);
1185                 s->ls_top_dev = NULL;
1186         }
1187
1188         if (s->ls_stats != NULL)
1189                 lprocfs_free_stats(&s->ls_stats);
1190 }
1191 EXPORT_SYMBOL(lu_site_fini);
1192
1193 /**
1194  * Called when initialization of stack for this site is completed.
1195  */
1196 int lu_site_init_finish(struct lu_site *s)
1197 {
1198         int result;
1199         mutex_lock(&lu_sites_guard);
1200         result = lu_context_refill(&lu_shrink_env.le_ctx);
1201         if (result == 0)
1202                 list_add(&s->ls_linkage, &lu_sites);
1203         mutex_unlock(&lu_sites_guard);
1204         return result;
1205 }
1206 EXPORT_SYMBOL(lu_site_init_finish);
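/*
 * Illustrative lifecycle sketch (not from the original source): a device
 * stack owner would typically bring a site up and down along these lines:
 *
 *   rc = lu_site_init(site, top_dev);        (create hash, LRU buckets, stats)
 *   if (rc == 0)
 *           rc = lu_site_init_finish(site);  (add to the global lu_sites list
 *                                             once the stack is ready)
 *   ...
 *   lu_site_fini(site);                      (unlink and release everything)
 */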
1207
1208 /**
1209  * Acquire additional reference on device \a d
1210  */
1211 void lu_device_get(struct lu_device *d)
1212 {
1213         atomic_inc(&d->ld_ref);
1214 }
1215 EXPORT_SYMBOL(lu_device_get);
1216
1217 /**
1218  * Release reference on device \a d.
1219  */
1220 void lu_device_put(struct lu_device *d)
1221 {
1222         LASSERT(atomic_read(&d->ld_ref) > 0);
1223         atomic_dec(&d->ld_ref);
1224 }
1225 EXPORT_SYMBOL(lu_device_put);
1226
1227 /**
1228  * Initialize device \a d of type \a t.
1229  */
1230 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1231 {
1232         if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
1233             t->ldt_ops->ldto_start != NULL)
1234                 t->ldt_ops->ldto_start(t);
1235
1236         memset(d, 0, sizeof *d);
1237         d->ld_type = t;
1238         lu_ref_init(&d->ld_reference);
1239         INIT_LIST_HEAD(&d->ld_linkage);
1240
1241         return 0;
1242 }
1243 EXPORT_SYMBOL(lu_device_init);
1244
1245 /**
1246  * Finalize device \a d.
1247  */
1248 void lu_device_fini(struct lu_device *d)
1249 {
1250         struct lu_device_type *t = d->ld_type;
1251
1252         if (d->ld_obd != NULL) {
1253                 d->ld_obd->obd_lu_dev = NULL;
1254                 d->ld_obd = NULL;
1255         }
1256
1257         lu_ref_fini(&d->ld_reference);
1258         LASSERTF(atomic_read(&d->ld_ref) == 0,
1259                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1260         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1261
1262         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1263             t->ldt_ops->ldto_stop != NULL)
1264                 t->ldt_ops->ldto_stop(t);
1265 }
1266 EXPORT_SYMBOL(lu_device_fini);
1267
1268 /**
1269  * Initialize object \a o that is part of compound object \a h and was created
1270  * by device \a d.
1271  */
1272 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1273                    struct lu_device *d)
1274 {
1275         memset(o, 0, sizeof(*o));
1276         o->lo_header = h;
1277         o->lo_dev = d;
1278         lu_device_get(d);
1279         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1280         INIT_LIST_HEAD(&o->lo_linkage);
1281
1282         return 0;
1283 }
1284 EXPORT_SYMBOL(lu_object_init);
1285
1286 /**
1287  * Finalize object and release its resources.
1288  */
1289 void lu_object_fini(struct lu_object *o)
1290 {
1291         struct lu_device *dev = o->lo_dev;
1292
1293         LASSERT(list_empty(&o->lo_linkage));
1294
1295         if (dev != NULL) {
1296                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1297                               "lu_object", o);
1298                 lu_device_put(dev);
1299                 o->lo_dev = NULL;
1300         }
1301 }
1302 EXPORT_SYMBOL(lu_object_fini);
1303
1304 /**
1305  * Add object \a o as first layer of compound object \a h
1306  *
1307  * This is typically called by the ->ldo_object_alloc() method of top-level
1308  * device.
1309  */
1310 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1311 {
1312         list_move(&o->lo_linkage, &h->loh_layers);
1313 }
1314 EXPORT_SYMBOL(lu_object_add_top);
1315
1316 /**
1317  * Add object \a o as a layer of compound object, going after \a before.
1318  *
1319  * This is typically called by the ->ldo_object_alloc() method of \a
1320  * before->lo_dev.
1321  */
1322 void lu_object_add(struct lu_object *before, struct lu_object *o)
1323 {
1324         list_move(&o->lo_linkage, &before->lo_linkage);
1325 }
1326 EXPORT_SYMBOL(lu_object_add);
1327
1328 /**
1329  * Initialize compound object.
1330  */
1331 int lu_object_header_init(struct lu_object_header *h)
1332 {
1333         memset(h, 0, sizeof *h);
1334         atomic_set(&h->loh_ref, 1);
1335         INIT_HLIST_NODE(&h->loh_hash);
1336         INIT_LIST_HEAD(&h->loh_lru);
1337         INIT_LIST_HEAD(&h->loh_layers);
1338         lu_ref_init(&h->loh_reference);
1339         return 0;
1340 }
1341 EXPORT_SYMBOL(lu_object_header_init);
1342
1343 /**
1344  * Finalize compound object.
1345  */
1346 void lu_object_header_fini(struct lu_object_header *h)
1347 {
1348         LASSERT(list_empty(&h->loh_layers));
1349         LASSERT(list_empty(&h->loh_lru));
1350         LASSERT(hlist_unhashed(&h->loh_hash));
1351         lu_ref_fini(&h->loh_reference);
1352 }
1353 EXPORT_SYMBOL(lu_object_header_fini);
1354
1355 /**
1356  * Given a compound object, find its slice, corresponding to the device type
1357  * \a dtype.
1358  */
1359 struct lu_object *lu_object_locate(struct lu_object_header *h,
1360                                    const struct lu_device_type *dtype)
1361 {
1362         struct lu_object *o;
1363
1364         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1365                 if (o->lo_dev->ld_type == dtype)
1366                         return o;
1367         }
1368         return NULL;
1369 }
1370 EXPORT_SYMBOL(lu_object_locate);
1371
1372 /**
1373  * Finalize and free devices in the device stack.
1374  *
1375  * Finalize device stack by purging object cache, and calling
1376  * lu_device_type_operations::ldto_device_fini() and
1377  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1378  */
1379 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1380 {
1381         struct lu_site   *site = top->ld_site;
1382         struct lu_device *scan;
1383         struct lu_device *next;
1384
1385         lu_site_purge(env, site, ~0);
1386         for (scan = top; scan != NULL; scan = next) {
1387                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1388                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1389                 lu_device_put(scan);
1390         }
1391
1392         /* purge again. */
1393         lu_site_purge(env, site, ~0);
1394
1395         for (scan = top; scan != NULL; scan = next) {
1396                 const struct lu_device_type *ldt = scan->ld_type;
1397                 struct obd_type             *type;
1398
1399                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1400                 type = ldt->ldt_obd_type;
1401                 if (type != NULL) {
1402                         type->typ_refcnt--;
1403                         class_put_type(type);
1404                 }
1405         }
1406 }
1407 EXPORT_SYMBOL(lu_stack_fini);
1408
1409 enum {
1410         /**
1411          * Maximal number of tld slots.
1412          */
1413         LU_CONTEXT_KEY_NR = 40
1414 };
1415
1416 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1417
1418 static DEFINE_SPINLOCK(lu_keys_guard);
1419
1420 /**
1421  * Global counter incremented whenever key is registered, unregistered,
1422  * revived or quiesced. This is used to avoid unnecessary calls to
1423  * lu_context_refill(). No locking is provided, as initialization and shutdown
1424  * are supposed to be externally serialized.
1425  */
1426 static unsigned key_set_version = 0;
1427
1428 /**
1429  * Register new key.
1430  */
1431 int lu_context_key_register(struct lu_context_key *key)
1432 {
1433         int result;
1434         unsigned int i;
1435
1436         LASSERT(key->lct_init != NULL);
1437         LASSERT(key->lct_fini != NULL);
1438         LASSERT(key->lct_tags != 0);
1439         LASSERT(key->lct_owner != NULL);
1440
1441         result = -ENFILE;
1442         spin_lock(&lu_keys_guard);
1443         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1444                 if (lu_keys[i] == NULL) {
1445                         key->lct_index = i;
1446                         atomic_set(&key->lct_used, 1);
1447                         lu_keys[i] = key;
1448                         lu_ref_init(&key->lct_reference);
1449                         result = 0;
1450                         ++key_set_version;
1451                         break;
1452                 }
1453         }
1454         spin_unlock(&lu_keys_guard);
1455         return result;
1456 }
1457 EXPORT_SYMBOL(lu_context_key_register);
1458
1459 static void key_fini(struct lu_context *ctx, int index)
1460 {
1461         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1462                 struct lu_context_key *key;
1463
1464                 key = lu_keys[index];
1465                 LASSERT(key != NULL);
1466                 LASSERT(key->lct_fini != NULL);
1467                 LASSERT(atomic_read(&key->lct_used) > 1);
1468
1469                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1470                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1471                 atomic_dec(&key->lct_used);
1472
1473                 LASSERT(key->lct_owner != NULL);
1474                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1475                         LINVRNT(module_refcount(key->lct_owner) > 0);
1476                         module_put(key->lct_owner);
1477                 }
1478                 ctx->lc_value[index] = NULL;
1479         }
1480 }
1481
1482 /**
1483  * Deregister key.
1484  */
1485 void lu_context_key_degister(struct lu_context_key *key)
1486 {
1487         LASSERT(atomic_read(&key->lct_used) >= 1);
1488         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1489
1490         lu_context_key_quiesce(key);
1491
1492         ++key_set_version;
1493         spin_lock(&lu_keys_guard);
1494         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1495         if (lu_keys[key->lct_index]) {
1496                 lu_keys[key->lct_index] = NULL;
1497                 lu_ref_fini(&key->lct_reference);
1498         }
1499         spin_unlock(&lu_keys_guard);
1500
1501         LASSERTF(atomic_read(&key->lct_used) == 1,
1502                  "key has instances: %d\n",
1503                  atomic_read(&key->lct_used));
1504 }
1505 EXPORT_SYMBOL(lu_context_key_degister);
1506
1507 /**
1508  * Register a number of keys. This has to be called after all keys have been
1509  * initialized by a call to LU_CONTEXT_KEY_INIT().
1510  */
1511 int lu_context_key_register_many(struct lu_context_key *k, ...)
1512 {
1513         struct lu_context_key *key = k;
1514         va_list args;
1515         int result;
1516
1517         va_start(args, k);
1518         do {
1519                 result = lu_context_key_register(key);
1520                 if (result)
1521                         break;
1522                 key = va_arg(args, struct lu_context_key *);
1523         } while (key != NULL);
1524         va_end(args);
1525
1526         if (result != 0) {
1527                 va_start(args, k);
1528                 while (k != key) {
1529                         lu_context_key_degister(k);
1530                         k = va_arg(args, struct lu_context_key *);
1531                 }
1532                 va_end(args);
1533         }
1534
1535         return result;
1536 }
1537 EXPORT_SYMBOL(lu_context_key_register_many);
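
/*
 * A usage sketch (the "foo"/"bar" keys are hypothetical): the argument list
 * must be NULL-terminated; on failure the keys registered so far are
 * de-registered again before the error is returned.
 *
 *	LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *	LU_CONTEXT_KEY_INIT(&bar_session_key);
 *	rc = lu_context_key_register_many(&foo_thread_key,
 *					  &bar_session_key, NULL);
 *	if (rc != 0)
 *		return rc;
 */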
1538
1539 /**
1540  * De-register a number of keys. This is the dual of
1541  * lu_context_key_register_many().
1542  */
1543 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1544 {
1545         va_list args;
1546
1547         va_start(args, k);
1548         do {
1549                 lu_context_key_degister(k);
1550                 k = va_arg(args, struct lu_context_key*);
1551         } while (k != NULL);
1552         va_end(args);
1553 }
1554 EXPORT_SYMBOL(lu_context_key_degister_many);
1555
1556 /**
1557  * Revive a number of keys.
1558  */
1559 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1560 {
1561         va_list args;
1562
1563         va_start(args, k);
1564         do {
1565                 lu_context_key_revive(k);
1566                 k = va_arg(args, struct lu_context_key*);
1567         } while (k != NULL);
1568         va_end(args);
1569 }
1570 EXPORT_SYMBOL(lu_context_key_revive_many);
1571
1572 /**
1573  * Quiesce a number of keys.
1574  */
1575 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1576 {
1577         va_list args;
1578
1579         va_start(args, k);
1580         do {
1581                 lu_context_key_quiesce(k);
1582                 k = va_arg(args, struct lu_context_key*);
1583         } while (k != NULL);
1584         va_end(args);
1585 }
1586 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1587
1588 /**
1589  * Return value associated with key \a key in context \a ctx.
1590  */
1591 void *lu_context_key_get(const struct lu_context *ctx,
1592                          const struct lu_context_key *key)
1593 {
1594         LINVRNT(ctx->lc_state == LCS_ENTERED);
1595         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1596         LASSERT(lu_keys[key->lct_index] == key);
1597         return ctx->lc_value[key->lct_index];
1598 }
1599 EXPORT_SYMBOL(lu_context_key_get);
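
/*
 * A usage sketch (the "foo" names are hypothetical): the value created by
 * the key's lct_init() callback is looked up from the context of the
 * current environment.
 *
 *	struct foo_thread_info *info;
 *
 *	info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *	LASSERT(info != NULL);
 */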
1600
1601 /**
1602  * List of remembered contexts (created with LCT_REMEMBER); protected by lu_keys_guard.
1603  */
1604 static struct list_head lu_context_remembered;
1605
1606 /**
1607  * Destroy \a key in all remembered contexts. This is used to destroy key
1608  * values in "shared" contexts (like service threads), when a module owning
1609  * the key is about to be unloaded.
1610  */
1611 void lu_context_key_quiesce(struct lu_context_key *key)
1612 {
1613         struct lu_context *ctx;
1614         extern unsigned cl_env_cache_purge(unsigned nr);
1615
1616         if (!(key->lct_tags & LCT_QUIESCENT)) {
1617                 /*
1618                  * XXX layering violation.
1619                  */
1620                 cl_env_cache_purge(~0);
1621                 key->lct_tags |= LCT_QUIESCENT;
1622                 /*
1623                  * XXX memory barrier has to go here.
1624                  */
1625                 spin_lock(&lu_keys_guard);
1626                 list_for_each_entry(ctx, &lu_context_remembered,
1627                                     lc_remember)
1628                         key_fini(ctx, key->lct_index);
1629                 spin_unlock(&lu_keys_guard);
1630                 ++key_set_version;
1631         }
1632 }
1633 EXPORT_SYMBOL(lu_context_key_quiesce);
1634
1635 void lu_context_key_revive(struct lu_context_key *key)
1636 {
1637         key->lct_tags &= ~LCT_QUIESCENT;
1638         ++key_set_version;
1639 }
1640 EXPORT_SYMBOL(lu_context_key_revive);
1641
1642 static void keys_fini(struct lu_context *ctx)
1643 {
1644         unsigned int i;
1645
1646         if (ctx->lc_value == NULL)
1647                 return;
1648
1649         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1650                 key_fini(ctx, i);
1651
1652         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1653         ctx->lc_value = NULL;
1654 }
1655
1656 static int keys_fill(struct lu_context *ctx)
1657 {
1658         unsigned int i;
1659
1660         LINVRNT(ctx->lc_value != NULL);
1661         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1662                 struct lu_context_key *key;
1663
1664                 key = lu_keys[i];
1665                 if (ctx->lc_value[i] == NULL && key != NULL &&
1666                     (key->lct_tags & ctx->lc_tags) &&
1667                     /*
1668                      * Don't create values for a LCT_QUIESCENT key, as this
1669                      * will pin module owning a key.
1670                      */
1671                     !(key->lct_tags & LCT_QUIESCENT)) {
1672                         void *value;
1673
1674                         LINVRNT(key->lct_init != NULL);
1675                         LINVRNT(key->lct_index == i);
1676
1677                         value = key->lct_init(ctx, key);
1678                         if (unlikely(IS_ERR(value)))
1679                                 return PTR_ERR(value);
1680
1681                         LASSERT(key->lct_owner != NULL);
1682                         if (!(ctx->lc_tags & LCT_NOREF))
1683                                 try_module_get(key->lct_owner);
1684                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1685                         atomic_inc(&key->lct_used);
1686                         /*
1687                          * This is the only place in the code where an
1688                          * element of the ctx->lc_value[] array is set to a
1689                          * non-NULL value.
1690                          */
1691                         ctx->lc_value[i] = value;
1692                         if (key->lct_exit != NULL)
1693                                 ctx->lc_tags |= LCT_HAS_EXIT;
1694                 }
1695                 ctx->lc_version = key_set_version;
1696         }
1697         return 0;
1698 }
1699
1700 static int keys_init(struct lu_context *ctx)
1701 {
1702         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1703         if (likely(ctx->lc_value != NULL))
1704                 return keys_fill(ctx);
1705
1706         return -ENOMEM;
1707 }
1708
1709 /**
1710  * Initialize context data-structure. Create values for all keys.
1711  */
1712 int lu_context_init(struct lu_context *ctx, __u32 tags)
1713 {
1714         int     rc;
1715
1716         memset(ctx, 0, sizeof *ctx);
1717         ctx->lc_state = LCS_INITIALIZED;
1718         ctx->lc_tags = tags;
1719         if (tags & LCT_REMEMBER) {
1720                 spin_lock(&lu_keys_guard);
1721                 list_add(&ctx->lc_remember, &lu_context_remembered);
1722                 spin_unlock(&lu_keys_guard);
1723         } else {
1724                 INIT_LIST_HEAD(&ctx->lc_remember);
1725         }
1726
1727         rc = keys_init(ctx);
1728         if (rc != 0)
1729                 lu_context_fini(ctx);
1730
1731         return rc;
1732 }
1733 EXPORT_SYMBOL(lu_context_init);
1734
1735 /**
1736  * Finalize context data-structure. Destroy key values.
1737  */
1738 void lu_context_fini(struct lu_context *ctx)
1739 {
1740         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1741         ctx->lc_state = LCS_FINALIZED;
1742
1743         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1744                 LASSERT(list_empty(&ctx->lc_remember));
1745                 keys_fini(ctx);
1746
1747         } else { /* could race with key degister */
1748                 spin_lock(&lu_keys_guard);
1749                 keys_fini(ctx);
1750                 list_del_init(&ctx->lc_remember);
1751                 spin_unlock(&lu_keys_guard);
1752         }
1753 }
1754 EXPORT_SYMBOL(lu_context_fini);
1755
1756 /**
1757  * Called before entering context.
1758  */
1759 void lu_context_enter(struct lu_context *ctx)
1760 {
1761         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1762         ctx->lc_state = LCS_ENTERED;
1763 }
1764 EXPORT_SYMBOL(lu_context_enter);
1765
1766 /**
1767  * Called after exiting from \a ctx
1768  */
1769 void lu_context_exit(struct lu_context *ctx)
1770 {
1771         unsigned int i;
1772
1773         LINVRNT(ctx->lc_state == LCS_ENTERED);
1774         ctx->lc_state = LCS_LEFT;
1775         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1776                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1777                         if (ctx->lc_value[i] != NULL) {
1778                                 struct lu_context_key *key;
1779
1780                                 key = lu_keys[i];
1781                                 LASSERT(key != NULL);
1782                                 if (key->lct_exit != NULL)
1783                                         key->lct_exit(ctx,
1784                                                       key, ctx->lc_value[i]);
1785                         }
1786                 }
1787         }
1788 }
1789 EXPORT_SYMBOL(lu_context_exit);
1790
1791 /**
1792  * Allocate, for \a ctx, values for all missing keys that were registered
1793  * after the context was created. key_set_version only changes in the rare
1794  * cases when modules are loaded or removed.
1795  */
1796 int lu_context_refill(struct lu_context *ctx)
1797 {
1798         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1799 }
1800 EXPORT_SYMBOL(lu_context_refill);
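
/*
 * A minimal lifecycle sketch (illustrative only; the LCT_DT_THREAD tag is
 * chosen just for the example): a context is initialized once, entered and
 * exited around each use, and finalized when no longer needed.
 *
 *	struct lu_context ctx;
 *	int rc;
 *
 *	rc = lu_context_init(&ctx, LCT_DT_THREAD);
 *	if (rc == 0) {
 *		lu_context_enter(&ctx);
 *		... use lu_context_key_get(&ctx, ...) ...
 *		lu_context_exit(&ctx);
 *		lu_context_fini(&ctx);
 *	}
 *
 * For a long-lived context, lu_context_refill() picks up values for keys
 * that were registered after the context was created.
 */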
1801
1802 /**
1803  * lu_context_tags_default/lu_session_tags_default are updated when new types
1804  * of obd devices are added. Currently this is only used on the client side,
1805  * specifically by the echo device client; for other stacks (such as ptlrpc
1806  * service threads) the context tags are predefined when the lu_device type is
1807  * registered, during the module probe phase.
1808  */
1809 __u32 lu_context_tags_default = 0;
1810 __u32 lu_session_tags_default = 0;
1811
1812 void lu_context_tags_update(__u32 tags)
1813 {
1814         spin_lock(&lu_keys_guard);
1815         lu_context_tags_default |= tags;
1816         key_set_version++;
1817         spin_unlock(&lu_keys_guard);
1818 }
1819 EXPORT_SYMBOL(lu_context_tags_update);
1820
1821 void lu_context_tags_clear(__u32 tags)
1822 {
1823         spin_lock(&lu_keys_guard);
1824         lu_context_tags_default &= ~tags;
1825         key_set_version++;
1826         spin_unlock(&lu_keys_guard);
1827 }
1828 EXPORT_SYMBOL(lu_context_tags_clear);
1829
1830 void lu_session_tags_update(__u32 tags)
1831 {
1832         spin_lock(&lu_keys_guard);
1833         lu_session_tags_default |= tags;
1834         key_set_version++;
1835         spin_unlock(&lu_keys_guard);
1836 }
1837 EXPORT_SYMBOL(lu_session_tags_update);
1838
1839 void lu_session_tags_clear(__u32 tags)
1840 {
1841         spin_lock(&lu_keys_guard);
1842         lu_session_tags_default &= ~tags;
1843         key_set_version++;
1844         spin_unlock(&lu_keys_guard);
1845 }
1846 EXPORT_SYMBOL(lu_session_tags_clear);
1847
1848 int lu_env_init(struct lu_env *env, __u32 tags)
1849 {
1850         int result;
1851
1852         env->le_ses = NULL;
1853         result = lu_context_init(&env->le_ctx, tags);
1854         if (likely(result == 0))
1855                 lu_context_enter(&env->le_ctx);
1856         return result;
1857 }
1858 EXPORT_SYMBOL(lu_env_init);
1859
1860 void lu_env_fini(struct lu_env *env)
1861 {
1862         lu_context_exit(&env->le_ctx);
1863         lu_context_fini(&env->le_ctx);
1864         env->le_ses = NULL;
1865 }
1866 EXPORT_SYMBOL(lu_env_fini);
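
/*
 * A usage sketch (illustrative; LCT_LOCAL is just one possible tag set):
 * callers typically keep a struct lu_env on the stack for the duration of a
 * single operation and pass it down the object stack.
 *
 *	struct lu_env env;
 *	int rc;
 *
 *	rc = lu_env_init(&env, LCT_LOCAL);
 *	if (rc != 0)
 *		return rc;
 *	... pass &env to lu_object_find() and friends ...
 *	lu_env_fini(&env);
 */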
1867
1868 int lu_env_refill(struct lu_env *env)
1869 {
1870         int result;
1871
1872         result = lu_context_refill(&env->le_ctx);
1873         if (result == 0 && env->le_ses != NULL)
1874                 result = lu_context_refill(env->le_ses);
1875         return result;
1876 }
1877 EXPORT_SYMBOL(lu_env_refill);
1878
1879 /**
1880  * Currently, this API is only used by the echo client.
1881  * Because the echo client and the normal Lustre client share the
1882  * same cl_env cache, the echo client needs to refresh the env
1883  * context after it gets one from the cache, especially when a
1884  * normal client and an echo client co-exist on the same node.
1885  */
1886 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1887                           __u32 stags)
1888 {
1889         int    result;
1890
1891         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1892                 env->le_ctx.lc_version = 0;
1893                 env->le_ctx.lc_tags |= ctags;
1894         }
1895
1896         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1897                 env->le_ses->lc_version = 0;
1898                 env->le_ses->lc_tags |= stags;
1899         }
1900
1901         result = lu_env_refill(env);
1902
1903         return result;
1904 }
1905 EXPORT_SYMBOL(lu_env_refill_by_tags);
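
/*
 * A usage sketch (the tags are illustrative): before reusing a cached
 * environment, the echo client widens its tag sets, e.g.:
 *
 *	rc = lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
 */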
1906
1907 static struct shrinker *lu_site_shrinker;
1908
1909 typedef struct lu_site_stats {
1910         unsigned        lss_populated;
1911         unsigned        lss_max_search;
1912         unsigned        lss_total;
1913         unsigned        lss_busy;
1914 } lu_site_stats_t;
1915
1916 static void lu_site_stats_get(cfs_hash_t *hs,
1917                               lu_site_stats_t *stats, int populated)
1918 {
1919         cfs_hash_bd_t bd;
1920         unsigned int  i;
1921
1922         cfs_hash_for_each_bucket(hs, &bd, i) {
1923                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1924                 struct hlist_head       *hhead;
1925
1926                 cfs_hash_bd_lock(hs, &bd, 1);
1927                 stats->lss_busy  += bkt->lsb_busy;
1928                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1929                 stats->lss_max_search = max((int)stats->lss_max_search,
1930                                             cfs_hash_bd_depmax_get(&bd));
1931                 if (!populated) {
1932                         cfs_hash_bd_unlock(hs, &bd, 1);
1933                         continue;
1934                 }
1935
1936                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1937                         if (!hlist_empty(hhead))
1938                                 stats->lss_populated++;
1939                 }
1940                 cfs_hash_bd_unlock(hs, &bd, 1);
1941         }
1942 }
1943
1944 #ifdef __KERNEL__
1945
1946 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1947                                            struct shrink_control *sc)
1948 {
1949         lu_site_stats_t stats;
1950         struct lu_site *s;
1951         struct lu_site *tmp;
1952         unsigned long cached = 0;
1953
1954         if (!(sc->gfp_mask & __GFP_FS))
1955                 return 0;
1956
1957         mutex_lock(&lu_sites_guard);
1958         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1959                 memset(&stats, 0, sizeof(stats));
1960                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1961                 cached += stats.lss_total - stats.lss_busy;
1962         }
1963         mutex_unlock(&lu_sites_guard);
1964
1965         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1966         CDEBUG(D_INODE, "%ld objects cached\n", cached);
1967         return cached;
1968 }
1969
1970 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1971                                           struct shrink_control *sc)
1972 {
1973         struct lu_site *s;
1974         struct lu_site *tmp;
1975         unsigned long remain = sc->nr_to_scan;
1976         LIST_HEAD(splice);
1977
1978         if (!(sc->gfp_mask & __GFP_FS))
1979                 /* We must not take the lu_sites_guard lock when
1980                  * __GFP_FS is *not* set because of the deadlock
1981                  * possibility detailed below. Additionally,
1982                  * since we cannot determine the number of
1983                  * objects in the cache without taking this
1984                  * lock, we're in a particularly tough spot. As
1985                  * a result, we'll just lie and say our cache is
1986                  * empty. This _should_ be ok, as we can't
1987                  * reclaim objects when __GFP_FS is *not* set
1988                  * anyways.
1989                  */
1990                 return SHRINK_STOP;
1991
1992         mutex_lock(&lu_sites_guard);
1993         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1994                 remain = lu_site_purge(&lu_shrink_env, s, remain);
1995                 /*
1996                  * Move just shrunk site to the tail of site list to
1997                  * assure shrinking fairness.
1998                  */
1999                 list_move_tail(&s->ls_linkage, &splice);
2000         }
2001         list_splice(&splice, lu_sites.prev);
2002         mutex_unlock(&lu_sites_guard);
2003
2004         return sc->nr_to_scan - remain;
2005 }
2006
2007 #ifndef HAVE_SHRINKER_COUNT
2008 /*
2009  * There exists a potential lock inversion deadlock scenario when using
2010  * Lustre on top of ZFS. This occurs between one of ZFS's
2011  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2012  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2013  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2014  * lock. Obviously neither thread will wake and drop their respective hold
2015  * on their lock.
2016  *
2017  * To prevent this from happening we must ensure the lu_sites_guard lock is
2018  * not taken while in this code path. ZFS reliably does not set the
2019  * __GFP_FS bit in its code paths, so this can be used to determine if it
2020  * is safe to take the lu_sites_guard lock.
2021  *
2022  * Ideally we should accurately return the remaining number of cached
2023  * objects without taking the lu_sites_guard lock, but this is not
2024  * possible in the current implementation.
2025  */
2026 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2027 {
2028         int cached = 0;
2029         struct shrink_control scv = {
2030                  .nr_to_scan = shrink_param(sc, nr_to_scan),
2031                  .gfp_mask   = shrink_param(sc, gfp_mask)
2032         };
2033 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2034         struct shrinker *shrinker = NULL;
2035 #endif
2036
2037
2038         CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
2039
2040         lu_cache_shrink_scan(shrinker, &scv);
2041
2042         cached = lu_cache_shrink_count(shrinker, &scv);
2043         if (scv.nr_to_scan == 0)
2044                 CDEBUG(D_INODE, "%d objects cached\n", cached);
2045         return cached;
2046 }
2047
2048 #endif /* HAVE_SHRINKER_COUNT */
2049
2050
2051 /*
2052  * Debugging stuff.
2053  */
2054
2055 /**
2056  * Environment to be used in debugger, contains all tags.
2057  */
2058 struct lu_env lu_debugging_env;
2059
2060 /**
2061  * Debugging printer function using printk().
2062  */
2063 int lu_printk_printer(const struct lu_env *env,
2064                       void *unused, const char *format, ...)
2065 {
2066         va_list args;
2067
2068         va_start(args, format);
2069         vprintk(format, args);
2070         va_end(args);
2071         return 0;
2072 }
2073
2074 int lu_debugging_setup(void)
2075 {
2076         return lu_env_init(&lu_debugging_env, ~0);
2077 }
2078
2079 void lu_context_keys_dump(void)
2080 {
2081         unsigned int i;
2082
2083         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2084                 struct lu_context_key *key;
2085
2086                 key = lu_keys[i];
2087                 if (key != NULL) {
2088                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2089                                i, key, key->lct_tags,
2090                                key->lct_init, key->lct_fini, key->lct_exit,
2091                                key->lct_index, atomic_read(&key->lct_used),
2092                                key->lct_owner ? key->lct_owner->name : "",
2093                                key->lct_owner);
2094                         lu_ref_print(&key->lct_reference);
2095                 }
2096         }
2097 }
2098 EXPORT_SYMBOL(lu_context_keys_dump);
2099 #endif /* __KERNEL__ */
2100
2101 /**
2102  * Initialization of global lu_* data.
2103  */
2104 int lu_global_init(void)
2105 {
2106         int result;
2107         DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
2108                          lu_cache_shrink_count, lu_cache_shrink_scan);
2109
2110         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2111
2112         INIT_LIST_HEAD(&lu_device_types);
2113         INIT_LIST_HEAD(&lu_context_remembered);
2114         INIT_LIST_HEAD(&lu_sites);
2115
2116         result = lu_ref_global_init();
2117         if (result != 0)
2118                 return result;
2119
2120         LU_CONTEXT_KEY_INIT(&lu_global_key);
2121         result = lu_context_key_register(&lu_global_key);
2122         if (result != 0)
2123                 return result;
2124
2125         /*
2126          * At this level, we don't know what tags are needed, so allocate them
2127          * conservatively. This should not be too bad, because this
2128          * environment is global.
2129          */
2130         mutex_lock(&lu_sites_guard);
2131         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2132         mutex_unlock(&lu_sites_guard);
2133         if (result != 0)
2134                 return result;
2135
2136         /*
2137          * Seeks estimation: 3 seeks to read a record from the OI, one to read
2138          * the inode, one for the EA. Unfortunately setting such a high value
2139          * results in the lu_object/inode cache consuming all the memory.
2140          */
2141         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, &shvar);
2142         if (lu_site_shrinker == NULL)
2143                 return -ENOMEM;
2144
2145         return result;
2146 }
2147
2148 /**
2149  * Dual to lu_global_init().
2150  */
2151 void lu_global_fini(void)
2152 {
2153         if (lu_site_shrinker != NULL) {
2154                 remove_shrinker(lu_site_shrinker);
2155                 lu_site_shrinker = NULL;
2156         }
2157
2158         lu_context_key_degister(&lu_global_key);
2159
2160         /*
2161          * Tear shrinker environment down _after_ de-registering
2162          * lu_global_key, because the latter has a value in the former.
2163          */
2164         mutex_lock(&lu_sites_guard);
2165         lu_env_fini(&lu_shrink_env);
2166         mutex_unlock(&lu_sites_guard);
2167
2168         lu_ref_global_fini();
2169 }
2170
2171 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2172 {
2173 #ifdef LPROCFS
2174         struct lprocfs_counter ret;
2175
2176         lprocfs_stats_collect(stats, idx, &ret);
2177         return (__u32)ret.lc_count;
2178 #else
2179         return 0;
2180 #endif
2181 }
2182
2183 /**
2184  * Output site statistical counters into a seq_file (or a plain buffer in
2185  * the lu_site_stats_print() variant below). Suitable for lprocfs handlers.
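 *
 * The output line consists of, in order: lss_busy/lss_total,
 * lss_populated/CFS_HASH_NHLIST, lss_max_search, followed by the
 * LU_SS_CREATED, LU_SS_CACHE_HIT, LU_SS_CACHE_MISS, LU_SS_CACHE_RACE,
 * LU_SS_CACHE_DEATH_RACE and LU_SS_LRU_PURGED counters.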
2186  */
2187 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2188 {
2189         lu_site_stats_t stats;
2190
2191         memset(&stats, 0, sizeof(stats));
2192         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2193
2194         return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2195                           stats.lss_busy,
2196                           stats.lss_total,
2197                           stats.lss_populated,
2198                           CFS_HASH_NHLIST(s->ls_obj_hash),
2199                           stats.lss_max_search,
2200                           ls_stats_read(s->ls_stats, LU_SS_CREATED),
2201                           ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2202                           ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2203                           ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2204                           ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2205                           ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2206 }
2207 EXPORT_SYMBOL(lu_site_stats_seq_print);
2208
2209 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
2210 {
2211         lu_site_stats_t stats;
2212
2213         memset(&stats, 0, sizeof(stats));
2214         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2215
2216         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2217                         stats.lss_busy,
2218                         stats.lss_total,
2219                         stats.lss_populated,
2220                         CFS_HASH_NHLIST(s->ls_obj_hash),
2221                         stats.lss_max_search,
2222                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2223                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2224                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2225                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2226                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2227                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2228 }
2229 EXPORT_SYMBOL(lu_site_stats_print);
2230
2231 /**
2232  * Helper function to initialize a number of kmem slab caches at once.
2233  */
2234 int lu_kmem_init(struct lu_kmem_descr *caches)
2235 {
2236         int result;
2237         struct lu_kmem_descr *iter = caches;
2238
2239         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2240                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2241                                                      iter->ckd_size,
2242                                                      0, 0, NULL);
2243                 if (*iter->ckd_cache == NULL) {
2244                         result = -ENOMEM;
2245                         /* free all previously allocated caches */
2246                         lu_kmem_fini(caches);
2247                         break;
2248                 }
2249         }
2250         return result;
2251 }
2252 EXPORT_SYMBOL(lu_kmem_init);
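
/*
 * A usage sketch (the "foo" cache is hypothetical): the descriptor array is
 * terminated by an entry with a NULL ckd_cache, and lu_kmem_fini() is the
 * matching cleanup call.
 *
 *	static struct kmem_cache *foo_object_kmem;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_object_kmem,
 *			.ckd_name  = "foo_object_kmem",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 */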
2253
2254 /**
2255  * Helper function to finalize a number of kmem slab caches at once. Dual to
2256  * lu_kmem_init().
2257  */
2258 void lu_kmem_fini(struct lu_kmem_descr *caches)
2259 {
2260         for (; caches->ckd_cache != NULL; ++caches) {
2261                 if (*caches->ckd_cache != NULL) {
2262                         kmem_cache_destroy(*caches->ckd_cache);
2263                         *caches->ckd_cache = NULL;
2264                 }
2265         }
2266 }
2267 EXPORT_SYMBOL(lu_kmem_fini);
2268
2269 /**
2270  * Temporary solution to be able to assign a FID in ->do_create()
2271  * until we have fully-functional OST FIDs.
2272  */
2273 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2274                           const struct lu_fid *fid)
2275 {
2276         struct lu_site          *s = o->lo_dev->ld_site;
2277         struct lu_fid           *old = &o->lo_header->loh_fid;
2278         struct lu_site_bkt_data *bkt;
2279         struct lu_object        *shadow;
2280         wait_queue_t             waiter;
2281         cfs_hash_t              *hs;
2282         cfs_hash_bd_t            bd;
2283         __u64                    version = 0;
2284
2285         LASSERT(fid_is_zero(old));
2286
2287         hs = s->ls_obj_hash;
2288         cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2289         shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2290         /* supposed to be unique */
2291         LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2292         *old = *fid;
2293         bkt = cfs_hash_bd_extra_get(hs, &bd);
2294         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2295         bkt->lsb_busy++;
2296         cfs_hash_bd_unlock(hs, &bd, 1);
2297 }
2298 EXPORT_SYMBOL(lu_object_assign_fid);
2299
2300 /**
2301  * Allocate an object with a zero (not yet assigned) FID.
2302  * XXX: temporary solution to be able to assign a FID in ->do_create()
2303  *      until we have fully-functional OST FIDs.
2304  */
2305 struct lu_object *lu_object_anon(const struct lu_env *env,
2306                                  struct lu_device *dev,
2307                                  const struct lu_object_conf *conf)
2308 {
2309         struct lu_fid     fid;
2310         struct lu_object *o;
2311
2312         fid_zero(&fid);
2313         o = lu_object_alloc(env, dev, &fid, conf);
2314
2315         return o;
2316 }
2317 EXPORT_SYMBOL(lu_object_anon);
2318
2319 struct lu_buf LU_BUF_NULL = {
2320         .lb_buf = NULL,
2321         .lb_len = 0
2322 };
2323 EXPORT_SYMBOL(LU_BUF_NULL);
2324
2325 void lu_buf_free(struct lu_buf *buf)
2326 {
2327         LASSERT(buf);
2328         if (buf->lb_buf) {
2329                 LASSERT(buf->lb_len > 0);
2330                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2331                 buf->lb_buf = NULL;
2332                 buf->lb_len = 0;
2333         }
2334 }
2335 EXPORT_SYMBOL(lu_buf_free);
2336
2337 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2338 {
2339         LASSERT(buf);
2340         LASSERT(buf->lb_buf == NULL);
2341         LASSERT(buf->lb_len == 0);
2342         OBD_ALLOC_LARGE(buf->lb_buf, size);
2343         if (likely(buf->lb_buf))
2344                 buf->lb_len = size;
2345 }
2346 EXPORT_SYMBOL(lu_buf_alloc);
2347
2348 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2349 {
2350         lu_buf_free(buf);
2351         lu_buf_alloc(buf, size);
2352 }
2353 EXPORT_SYMBOL(lu_buf_realloc);
2354
2355 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2356 {
2357         if (buf->lb_buf == NULL && buf->lb_len == 0)
2358                 lu_buf_alloc(buf, len);
2359
2360         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2361                 lu_buf_realloc(buf, len);
2362
2363         return buf;
2364 }
2365 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2366
2367 /**
2368  * Increase the size of \a buf.
2369  * The old data is preserved in the new buffer;
2370  * on error the original buffer is left unchanged.
2371  * \retval 0 or -ENOMEM
2372  */
2373 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2374 {
2375         char *ptr;
2376
2377         if (len <= buf->lb_len)
2378                 return 0;
2379
2380         OBD_ALLOC_LARGE(ptr, len);
2381         if (ptr == NULL)
2382                 return -ENOMEM;
2383
2384         /* Free the old buf */
2385         if (buf->lb_buf != NULL) {
2386                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2387                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2388         }
2389
2390         buf->lb_buf = ptr;
2391         buf->lb_len = len;
2392         return 0;
2393 }
2394 EXPORT_SYMBOL(lu_buf_check_and_grow);
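
/*
 * A usage sketch ("record" and "record_size" are hypothetical): a caller
 * grows a reusable buffer before copying a variable-sized record into it,
 * and releases it with lu_buf_free() when done.
 *
 *	struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *
 *	if (lu_buf_check_and_grow(&buf, record_size) == 0) {
 *		memcpy(buf.lb_buf, record, record_size);
 *		...
 *		lu_buf_free(&buf);
 *	}
 */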
2395