LU-5422 obdclass: Fix null pointer derefs in lu_cache_shrink()
fs/lustre-release.git: lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <lu_ref.h>
61 #include <libcfs/list.h>
62
63 enum {
64         LU_CACHE_PERCENT_MAX     = 50,
65         LU_CACHE_PERCENT_DEFAULT = 20
66 };
67
68 #define LU_CACHE_NR_MAX_ADJUST          128
69 #define LU_CACHE_NR_UNLIMITED           -1
70 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
71 #define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
72 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
73 #define LU_CACHE_NR_ZFS_LIMIT           10240
74
75 #define LU_SITE_BITS_MIN    12
76 #define LU_SITE_BITS_MAX    24
77 /**
78  * 256 buckets in total; we don't want too many buckets because they:
79  * - consume too much memory
80  * - lead to unbalanced LRU lists
81  */
82 #define LU_SITE_BKT_BITS    8
83
84
85 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
86 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
87                 "Percentage of memory to be used as lu_object cache");
88
89 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
90 CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
91                 "Maximum number of objects in lu_object cache");
92
93 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
94
95 /**
96  * Decrease reference counter on object. If last reference is freed, return
97  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
98  * case, free object immediately.
99  */
100 void lu_object_put(const struct lu_env *env, struct lu_object *o)
101 {
102         struct lu_site_bkt_data *bkt;
103         struct lu_object_header *top;
104         struct lu_site          *site;
105         struct lu_object        *orig;
106         cfs_hash_bd_t            bd;
107         const struct lu_fid     *fid;
108
109         top  = o->lo_header;
110         site = o->lo_dev->ld_site;
111         orig = o;
112
113         /*
114          * Until we have FIDs on OST fully implemented, anonymous objects
115          * are possible in OSP. Such an object is not listed in the site
116          * hash, so we should not remove it from the site.
117          */
118         fid = lu_object_fid(o);
119         if (fid_is_zero(fid)) {
120                 LASSERT(top->loh_hash.next == NULL
121                         && top->loh_hash.pprev == NULL);
122                 LASSERT(list_empty(&top->loh_lru));
123                 if (!atomic_dec_and_test(&top->loh_ref))
124                         return;
125                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
126                         if (o->lo_ops->loo_object_release != NULL)
127                                 o->lo_ops->loo_object_release(env, o);
128                 }
129                 lu_object_free(env, orig);
130                 return;
131         }
132
133         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
134         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
135
136         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
137                 if (lu_object_is_dying(top)) {
138
139                         /*
140                          * Somebody may be waiting for this; currently it is
141                          * only used for cl_object, see cl_object_put_last().
142                          */
143                         wake_up_all(&bkt->lsb_marche_funebre);
144                 }
145                 return;
146         }
147
148         LASSERT(bkt->lsb_busy > 0);
149         bkt->lsb_busy--;
150         /*
151          * When last reference is released, iterate over object
152          * layers, and notify them that object is no longer busy.
153          */
154         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
155                 if (o->lo_ops->loo_object_release != NULL)
156                         o->lo_ops->loo_object_release(env, o);
157         }
158
159         if (!lu_object_is_dying(top)) {
160                 LASSERT(list_empty(&top->loh_lru));
161                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
162                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
163                 return;
164         }
165
166         /*
167          * If the object is dying (will not be cached), remove it
168          * from the hash table and LRU.
169          *
170          * This is done with the hash table and LRU lists locked. As the only
171          * way to acquire the first reference to a previously unreferenced
172          * object is through hash-table lookup (lu_object_find())
173          * or LRU scanning (lu_site_purge()), both of which are done under the
174          * hash-table and LRU lock, no race with a concurrent lookup is
175          * possible and we can safely destroy the object below.
176          */
177         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
178                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
179         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
180         /*
181          * Object was already removed from hash and lru above, can
182          * kill it.
183          */
184         lu_object_free(env, orig);
185 }
186 EXPORT_SYMBOL(lu_object_put);
187
188 /**
189  * Put the object without keeping it in cache. This is a temporary solution
190  * for multi-site objects whose layering is not constant.
191  */
192 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
193 {
194         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
195         return lu_object_put(env, o);
196 }
197 EXPORT_SYMBOL(lu_object_put_nocache);
198
199 /**
200  * Kill the object and take it out of LRU cache.
201  * Currently used by client code for layout change.
202  */
203 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
204 {
205         struct lu_object_header *top;
206
207         top = o->lo_header;
208         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
209         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
210                 cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
211                 cfs_hash_bd_t bd;
212
213                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
214                 list_del_init(&top->loh_lru);
215                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
216                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
217         }
218 }
219 EXPORT_SYMBOL(lu_object_unhash);
220
221 /**
222  * Allocate new object.
223  *
224  * This follows the object creation protocol described in the comment within
225  * the struct lu_device_operations definition.
226  */
227 static struct lu_object *lu_object_alloc(const struct lu_env *env,
228                                          struct lu_device *dev,
229                                          const struct lu_fid *f,
230                                          const struct lu_object_conf *conf)
231 {
232         struct lu_object *scan;
233         struct lu_object *top;
234         struct list_head *layers;
235         unsigned int init_mask = 0;
236         unsigned int init_flag;
237         int clean;
238         int result;
239         ENTRY;
240
241         /*
242          * Create top-level object slice. This will also create
243          * lu_object_header.
244          */
245         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
246         if (top == NULL)
247                 RETURN(ERR_PTR(-ENOMEM));
248         if (IS_ERR(top))
249                 RETURN(top);
250         /*
251          * This is the only place where object fid is assigned. It's constant
252          * after this point.
253          */
254         top->lo_header->loh_fid = *f;
255         layers = &top->lo_header->loh_layers;
256
257         do {
258                 /*
259                  * Call ->loo_object_init() repeatedly, until no more new
260                  * object slices are created.
261                  */
262                 clean = 1;
263                 init_flag = 1;
264                 list_for_each_entry(scan, layers, lo_linkage) {
265                         if (init_mask & init_flag)
266                                 goto next;
267                         clean = 0;
268                         scan->lo_header = top->lo_header;
269                         result = scan->lo_ops->loo_object_init(env, scan, conf);
270                         if (result != 0) {
271                                 lu_object_free(env, top);
272                                 RETURN(ERR_PTR(result));
273                         }
274                         init_mask |= init_flag;
275 next:
276                         init_flag <<= 1;
277                 }
278         } while (!clean);
279
280         list_for_each_entry_reverse(scan, layers, lo_linkage) {
281                 if (scan->lo_ops->loo_object_start != NULL) {
282                         result = scan->lo_ops->loo_object_start(env, scan);
283                         if (result != 0) {
284                                 lu_object_free(env, top);
285                                 RETURN(ERR_PTR(result));
286                         }
287                 }
288         }
289
290         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
291         RETURN(top);
292 }
293
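
/*
 * A minimal sketch (not taken from any specific layer) of how a middle-layer
 * ->loo_object_init() method typically grows the stack, which is what makes
 * the do/while loop in lu_object_alloc() above run another pass.  The names
 * example_add_lower_slice and "child" are illustrative; the child device
 * pointer is normally kept in the layer's private device structure.
 */
static inline int example_add_lower_slice(const struct lu_env *env,
                                          struct lu_object *o,
                                          struct lu_device *child)
{
        struct lu_object *below;

        /* allocate the next slice, sharing the existing object header */
        below = child->ld_ops->ldo_object_alloc(env, o->lo_header, child);
        if (below == NULL)
                return -ENOMEM;

        /* link the new slice after this one; lu_object_alloc() will
         * initialize it on the next iteration */
        lu_object_add(o, below);
        return 0;
}
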
294 /**
295  * Free an object.
296  */
297 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
298 {
299         struct lu_site_bkt_data *bkt;
300         struct lu_site          *site;
301         struct lu_object        *scan;
302         struct list_head        *layers;
303         struct list_head         splice;
304
305         site   = o->lo_dev->ld_site;
306         layers = &o->lo_header->loh_layers;
307         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
308         /*
309          * First call ->loo_object_delete() method to release all resources.
310          */
311         list_for_each_entry_reverse(scan, layers, lo_linkage) {
312                 if (scan->lo_ops->loo_object_delete != NULL)
313                         scan->lo_ops->loo_object_delete(env, scan);
314         }
315
316         /*
317          * Then, splice object layers into stand-alone list, and call
318          * ->loo_object_free() on all layers to free memory. Splice is
319          * necessary, because lu_object_header is freed together with the
320          * top-level slice.
321          */
322         INIT_LIST_HEAD(&splice);
323         list_splice_init(layers, &splice);
324         while (!list_empty(&splice)) {
325                 /*
326                  * Free layers in bottom-to-top order, so that object header
327                  * lives as long as possible and ->loo_object_free() methods
328                  * can look at its contents.
329                  */
330                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
331                 list_del_init(&o->lo_linkage);
332                 LASSERT(o->lo_ops->loo_object_free != NULL);
333                 o->lo_ops->loo_object_free(env, o);
334         }
335
336         if (waitqueue_active(&bkt->lsb_marche_funebre))
337                 wake_up_all(&bkt->lsb_marche_funebre);
338 }
339
340 /**
341  * Free \a nr objects from the cold end of the site LRU list.
342  */
343 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
344 {
345         struct lu_object_header *h;
346         struct lu_object_header *temp;
347         struct lu_site_bkt_data *bkt;
348         cfs_hash_bd_t            bd;
349         cfs_hash_bd_t            bd2;
350         struct list_head         dispose;
351         int                      did_sth;
352         int                      start;
353         int                      count;
354         int                      bnr;
355         int                      i;
356
357         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
358                 RETURN(0);
359
360         INIT_LIST_HEAD(&dispose);
361         /*
362          * Under LRU list lock, scan LRU list and move unreferenced objects to
363          * the dispose list, removing them from LRU and hash table.
364          */
365         start = s->ls_purge_start;
366         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
367  again:
368         /*
369          * It doesn't make any sense to run purge threads in parallel; that
370          * can only bring trouble. See LU-5331.
371          */
372         mutex_lock(&s->ls_purge_mutex);
373         did_sth = 0;
374         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
375                 if (i < start)
376                         continue;
377                 count = bnr;
378                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
379                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
380
381                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
382                         LASSERT(atomic_read(&h->loh_ref) == 0);
383
384                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
385                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
386
387                         cfs_hash_bd_del_locked(s->ls_obj_hash,
388                                                &bd2, &h->loh_hash);
389                         list_move(&h->loh_lru, &dispose);
390                         if (did_sth == 0)
391                                 did_sth = 1;
392
393                         if (nr != ~0 && --nr == 0)
394                                 break;
395
396                         if (count > 0 && --count == 0)
397                                 break;
398
399                 }
400                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
401                 cond_resched();
402                 /*
403                  * Free everything on the dispose list. This is safe against
404                  * races due to the reasons described in lu_object_put().
405                  */
406                 while (!list_empty(&dispose)) {
407                         h = container_of0(dispose.next,
408                                           struct lu_object_header, loh_lru);
409                         list_del_init(&h->loh_lru);
410                         lu_object_free(env, lu_object_top(h));
411                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
412                 }
413
414                 if (nr == 0)
415                         break;
416         }
417         mutex_unlock(&s->ls_purge_mutex);
418
419         if (nr != 0 && did_sth && start != 0) {
420                 start = 0; /* restart from the first bucket */
421                 goto again;
422         }
423         /* race on s->ls_purge_start, but nobody cares */
424         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
425
426         return nr;
427 }
428 EXPORT_SYMBOL(lu_site_purge);
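
/*
 * A small usage sketch, illustrative only: callers either trim a bounded
 * number of cold objects (memory pressure) or pass ~0 to drain everything
 * unreferenced, as lu_stack_fini() does further below.  example_trim_site is
 * a hypothetical helper, not part of the Lustre API.
 */
static inline void example_trim_site(const struct lu_env *env,
                                     struct lu_site *s, int drain)
{
        if (drain)
                lu_site_purge(env, s, ~0);      /* free all unreferenced objects */
        else
                lu_site_purge(env, s, 128);     /* free up to 128 cold objects */
}
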
429
430 /*
431  * Object printing.
432  *
433  * The code below has to jump through certain hoops to output an object
434  * description into the libcfs_debug_msg()-based log. The problem is that
435  * lu_object_print() composes the description from strings that are parts of
436  * _lines_ of output (i.e., strings that are not terminated by a newline).
437  * This doesn't fit very well into the libcfs_debug_msg() interface, which
438  * assumes that each message supplied to it is a self-contained output line.
439  *
440  * To work around this, strings are collected in a temporary buffer
441  * (implemented as a value of the lu_global_key key) until a terminating
442  * newline character is detected.
443  *
444  */
445
446 enum {
447         /**
448          * Maximal line size.
449          *
450          * XXX overflow is not handled correctly.
451          */
452         LU_CDEBUG_LINE = 512
453 };
454
455 struct lu_cdebug_data {
456         /**
457          * Temporary buffer.
458          */
459         char lck_area[LU_CDEBUG_LINE];
460 };
461
462 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
463 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
464
465 /**
466  * Key, holding temporary buffer. This key is registered very early by
467  * lu_global_init().
468  */
469 struct lu_context_key lu_global_key = {
470         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
471                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
472         .lct_init = lu_global_key_init,
473         .lct_fini = lu_global_key_fini
474 };
475
476 /**
477  * Printer function emitting messages through libcfs_debug_msg().
478  */
479 int lu_cdebug_printer(const struct lu_env *env,
480                       void *cookie, const char *format, ...)
481 {
482         struct libcfs_debug_msg_data *msgdata = cookie;
483         struct lu_cdebug_data        *key;
484         int used;
485         int complete;
486         va_list args;
487
488         va_start(args, format);
489
490         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
491         LASSERT(key != NULL);
492
493         used = strlen(key->lck_area);
494         complete = format[strlen(format) - 1] == '\n';
495         /*
496          * Append new chunk to the buffer.
497          */
498         vsnprintf(key->lck_area + used,
499                   ARRAY_SIZE(key->lck_area) - used, format, args);
500         if (complete) {
501                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
502                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
503                 key->lck_area[0] = 0;
504         }
505         va_end(args);
506         return 0;
507 }
508 EXPORT_SYMBOL(lu_cdebug_printer);
509
510 /**
511  * Print object header.
512  */
513 void lu_object_header_print(const struct lu_env *env, void *cookie,
514                             lu_printer_t printer,
515                             const struct lu_object_header *hdr)
516 {
517         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
518                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
519                    PFID(&hdr->loh_fid),
520                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
521                    list_empty((struct list_head *)&hdr->loh_lru) ? \
522                    "" : " lru",
523                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
524 }
525 EXPORT_SYMBOL(lu_object_header_print);
526
527 /**
528  * Print human readable representation of the \a o to the \a printer.
529  */
530 void lu_object_print(const struct lu_env *env, void *cookie,
531                      lu_printer_t printer, const struct lu_object *o)
532 {
533         static const char ruler[] = "........................................";
534         struct lu_object_header *top;
535         int depth = 4;
536
537         top = o->lo_header;
538         lu_object_header_print(env, cookie, printer, top);
539         (*printer)(env, cookie, "{\n");
540
541         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
542                 /*
543                  * print `.' \a depth times followed by type name and address
544                  */
545                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
546                            o->lo_dev->ld_type->ldt_name, o);
547
548                 if (o->lo_ops->loo_object_print != NULL)
549                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
550
551                 (*printer)(env, cookie, "\n");
552         }
553
554         (*printer)(env, cookie, "} header@%p\n", top);
555 }
556 EXPORT_SYMBOL(lu_object_print);
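
/*
 * Sketch of an alternative printer callback, assuming the usual lu_printer_t
 * signature (env, cookie, format, ...) used by lu_object_print() above: it
 * appends formatted chunks to a caller-supplied, NUL-terminated buffer passed
 * as the cookie instead of going through the debug log.  struct example_buf
 * and example_buf_printer are illustrative names only.
 */
struct example_buf {
        char    *eb_area;
        size_t   eb_size;
};

static int example_buf_printer(const struct lu_env *env, void *cookie,
                               const char *format, ...)
{
        struct example_buf *buf = cookie;
        size_t used = strlen(buf->eb_area);
        va_list args;

        va_start(args, format);
        vsnprintf(buf->eb_area + used, buf->eb_size - used, format, args);
        va_end(args);
        return 0;
}

/* Such a printer can be handed to lu_object_print() or lu_site_print(),
 * with a pointer to the buffer passed as the cookie. */
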
557
558 /**
559  * Check object consistency.
560  */
561 int lu_object_invariant(const struct lu_object *o)
562 {
563         struct lu_object_header *top;
564
565         top = o->lo_header;
566         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
567                 if (o->lo_ops->loo_object_invariant != NULL &&
568                     !o->lo_ops->loo_object_invariant(o))
569                         return 0;
570         }
571         return 1;
572 }
573 EXPORT_SYMBOL(lu_object_invariant);
574
575 static struct lu_object *htable_lookup(struct lu_site *s,
576                                        cfs_hash_bd_t *bd,
577                                        const struct lu_fid *f,
578                                        wait_queue_t *waiter,
579                                        __u64 *version)
580 {
581         struct lu_site_bkt_data *bkt;
582         struct lu_object_header *h;
583         struct hlist_node       *hnode;
584         __u64  ver = cfs_hash_bd_version_get(bd);
585
586         if (*version == ver)
587                 return ERR_PTR(-ENOENT);
588
589         *version = ver;
590         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
591         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
592          * of cfs_hash; it does not add a refcount on the object. */
593         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
594         if (hnode == NULL) {
595                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
596                 return ERR_PTR(-ENOENT);
597         }
598
599         h = container_of0(hnode, struct lu_object_header, loh_hash);
600         if (likely(!lu_object_is_dying(h))) {
601                 cfs_hash_get(s->ls_obj_hash, hnode);
602                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
603                 list_del_init(&h->loh_lru);
604                 return lu_object_top(h);
605         }
606
607         /*
608          * Lookup found an object being destroyed; this object cannot be
609          * returned (to ensure that references to dying objects are eventually
610          * drained), and moreover, the lookup has to wait until the object is freed.
611          */
612
613         init_waitqueue_entry_current(waiter);
614         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
615         set_current_state(TASK_UNINTERRUPTIBLE);
616         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
617         return ERR_PTR(-EAGAIN);
618 }
619
620 static struct lu_object *htable_lookup_nowait(struct lu_site *s,
621                                               cfs_hash_bd_t *bd,
622                                               const struct lu_fid *f)
623 {
624         struct hlist_node       *hnode;
625         struct lu_object_header *h;
626
627         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
628          * of cfs_hash; it does not add a refcount on the object. */
629         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
630         if (hnode == NULL) {
631                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
632                 return ERR_PTR(-ENOENT);
633         }
634
635         h = container_of0(hnode, struct lu_object_header, loh_hash);
636         if (unlikely(lu_object_is_dying(h)))
637                 return ERR_PTR(-ENOENT);
638
639         cfs_hash_get(s->ls_obj_hash, hnode);
640         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
641         list_del_init(&h->loh_lru);
642         return lu_object_top(h);
643 }
644
645 /**
646  * Search the cache for an object with the fid \a f. If such an object is
647  * found, return it. Otherwise, create a new object, insert it into the cache
648  * and return it. In any case, an additional reference is acquired on the object.
649  */
650 struct lu_object *lu_object_find(const struct lu_env *env,
651                                  struct lu_device *dev, const struct lu_fid *f,
652                                  const struct lu_object_conf *conf)
653 {
654         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
655 }
656 EXPORT_SYMBOL(lu_object_find);
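
/*
 * A minimal lookup/release sketch under common assumptions (an entered lu_env
 * and an already-composed fid); example_touch_object is a hypothetical helper.
 * The extra reference returned by lu_object_find() must always be dropped
 * with lu_object_put().
 */
static inline int example_touch_object(const struct lu_env *env,
                                       struct lu_device *dev,
                                       const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, dev, fid, NULL);
        if (IS_ERR(o))
                return PTR_ERR(o);

        /* ... work with the object layers here ... */

        lu_object_put(env, o);
        return 0;
}
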
657
658 /*
659  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
660  * the calculation for the number of objects to reclaim is not covered by
661  * a lock, the maximum purged at once is capped at LU_CACHE_NR_MAX_ADJUST.
662  * This ensures that many concurrent threads will not accidentally purge
663  * the entire cache.
664  */
665 static void lu_object_limit(const struct lu_env *env,
666                             struct lu_device *dev)
667 {
668         __u64 size, nr;
669
670         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
671                 return;
672
673         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
674         nr = (__u64)lu_cache_nr;
675         if (size > nr)
676                 lu_site_purge(env, dev->ld_site,
677                               MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
678
679         return;
680 }
681
682 static struct lu_object *lu_object_new(const struct lu_env *env,
683                                        struct lu_device *dev,
684                                        const struct lu_fid *f,
685                                        const struct lu_object_conf *conf)
686 {
687         struct lu_object        *o;
688         cfs_hash_t              *hs;
689         cfs_hash_bd_t            bd;
690         struct lu_site_bkt_data *bkt;
691
692         o = lu_object_alloc(env, dev, f, conf);
693         if (unlikely(IS_ERR(o)))
694                 return o;
695
696         hs = dev->ld_site->ls_obj_hash;
697         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
698         bkt = cfs_hash_bd_extra_get(hs, &bd);
699         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
700         bkt->lsb_busy++;
701         cfs_hash_bd_unlock(hs, &bd, 1);
702
703         lu_object_limit(env, dev);
704
705         return o;
706 }
707
708 /**
709  * Core logic of lu_object_find*() functions.
710  */
711 static struct lu_object *lu_object_find_try(const struct lu_env *env,
712                                             struct lu_device *dev,
713                                             const struct lu_fid *f,
714                                             const struct lu_object_conf *conf,
715                                             wait_queue_t *waiter)
716 {
717         struct lu_object      *o;
718         struct lu_object      *shadow;
719         struct lu_site        *s;
720         cfs_hash_t            *hs;
721         cfs_hash_bd_t          bd;
722         __u64                  version = 0;
723
724         /*
725          * This uses standard index maintenance protocol:
726          *
727          *     - search index under lock, and return object if found;
728          *     - otherwise, unlock index, allocate new object;
729          *     - lock index and search again;
730          *     - if nothing is found (usual case), insert newly created
731          *       object into index;
732          *     - otherwise (race: other thread inserted object), free
733          *       object just allocated.
734          *     - unlock index;
735          *     - return object.
736          *
737          * For the "LOC_F_NEW" case, we know the object is newly created.
738          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
739          * just allocate and insert directly.
740          *
741          * If a dying object is found during the index search, add @waiter to
742          * the site wait-queue and return ERR_PTR(-EAGAIN).
743          */
744         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
745                 return lu_object_new(env, dev, f, conf);
746
747         s  = dev->ld_site;
748         hs = s->ls_obj_hash;
749         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
750         o = htable_lookup(s, &bd, f, waiter, &version);
751         cfs_hash_bd_unlock(hs, &bd, 1);
752         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
753                 return o;
754
755         /*
756          * Allocate new object. This may result in rather complicated
757          * operations, including fld queries, inode loading, etc.
758          */
759         o = lu_object_alloc(env, dev, f, conf);
760         if (unlikely(IS_ERR(o)))
761                 return o;
762
763         LASSERT(lu_fid_eq(lu_object_fid(o), f));
764
765         cfs_hash_bd_lock(hs, &bd, 1);
766
767         shadow = htable_lookup(s, &bd, f, waiter, &version);
768         if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
769                 struct lu_site_bkt_data *bkt;
770
771                 bkt = cfs_hash_bd_extra_get(hs, &bd);
772                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
773                 bkt->lsb_busy++;
774                 cfs_hash_bd_unlock(hs, &bd, 1);
775
776                 lu_object_limit(env, dev);
777
778                 return o;
779         }
780
781         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
782         cfs_hash_bd_unlock(hs, &bd, 1);
783         lu_object_free(env, o);
784         return shadow;
785 }
786
787 /**
788  * Much like lu_object_find(), but top level device of object is specifically
789  * \a dev rather than top level device of the site. This interface allows
790  * objects of different "stacking" to be created within the same site.
791  */
792 struct lu_object *lu_object_find_at(const struct lu_env *env,
793                                     struct lu_device *dev,
794                                     const struct lu_fid *f,
795                                     const struct lu_object_conf *conf)
796 {
797         struct lu_site_bkt_data *bkt;
798         struct lu_object        *obj;
799         wait_queue_t           wait;
800
801         while (1) {
802                 obj = lu_object_find_try(env, dev, f, conf, &wait);
803                 if (obj != ERR_PTR(-EAGAIN))
804                         return obj;
805                 /*
806                  * lu_object_find_try() already added waiter into the
807                  * wait queue.
808                  */
809                 waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
810                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
811                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
812         }
813 }
814 EXPORT_SYMBOL(lu_object_find_at);
815
816 /**
817  * Try to find the object in the cache without waiting for a dead object
818  * to be released and without allocating a new object if no cached one is found.
819  *
820  * The found object will be marked LU_OBJECT_HEARD_BANSHEE for purging.
821  */
822 void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
823                      const struct lu_fid *f)
824 {
825         struct lu_site          *s  = dev->ld_site;
826         cfs_hash_t              *hs = s->ls_obj_hash;
827         cfs_hash_bd_t            bd;
828         struct lu_object        *o;
829
830         cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
831         o = htable_lookup_nowait(s, &bd, f);
832         cfs_hash_bd_unlock(hs, &bd, 1);
833         if (!IS_ERR(o)) {
834                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
835                 lu_object_put(env, o);
836         }
837 }
838 EXPORT_SYMBOL(lu_object_purge);
839
840 /**
841  * Find object with given fid, and return its slice belonging to given device.
842  */
843 struct lu_object *lu_object_find_slice(const struct lu_env *env,
844                                        struct lu_device *dev,
845                                        const struct lu_fid *f,
846                                        const struct lu_object_conf *conf)
847 {
848         struct lu_object *top;
849         struct lu_object *obj;
850
851         top = lu_object_find(env, dev, f, conf);
852         if (!IS_ERR(top)) {
853                 obj = lu_object_locate(top->lo_header, dev->ld_type);
854                 if (obj == NULL)
855                         lu_object_put(env, top);
856         } else
857                 obj = top;
858         return obj;
859 }
860 EXPORT_SYMBOL(lu_object_find_slice);
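
/*
 * Usage sketch for lu_object_find_slice(), illustrative only: callers must
 * distinguish an error pointer (lookup or allocation failed) from a NULL
 * return (the object exists but has no slice for this device type; the
 * reference has already been dropped in that case).
 */
static inline int example_use_slice(const struct lu_env *env,
                                    struct lu_device *dev,
                                    const struct lu_fid *fid)
{
        struct lu_object *slice;

        slice = lu_object_find_slice(env, dev, fid, NULL);
        if (IS_ERR(slice))
                return PTR_ERR(slice);
        if (slice == NULL)
                return -ENOENT;

        /* ... use the slice belonging to "dev" ... */

        lu_object_put(env, slice);      /* releases the whole compound object */
        return 0;
}
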
861
862 /**
863  * Global list of all device types.
864  */
865 static struct list_head lu_device_types;
866
867 int lu_device_type_init(struct lu_device_type *ldt)
868 {
869         int result = 0;
870
871         atomic_set(&ldt->ldt_device_nr, 0);
872         INIT_LIST_HEAD(&ldt->ldt_linkage);
873         if (ldt->ldt_ops->ldto_init)
874                 result = ldt->ldt_ops->ldto_init(ldt);
875
876         if (result == 0) {
877                 spin_lock(&obd_types_lock);
878                 list_add(&ldt->ldt_linkage, &lu_device_types);
879                 spin_unlock(&obd_types_lock);
880         }
881
882         return result;
883 }
884 EXPORT_SYMBOL(lu_device_type_init);
885
886 void lu_device_type_fini(struct lu_device_type *ldt)
887 {
888         spin_lock(&obd_types_lock);
889         list_del_init(&ldt->ldt_linkage);
890         spin_unlock(&obd_types_lock);
891         if (ldt->ldt_ops->ldto_fini)
892                 ldt->ldt_ops->ldto_fini(ldt);
893 }
894 EXPORT_SYMBOL(lu_device_type_fini);
895
896 /**
897  * Global list of all sites on this node
898  */
899 static struct list_head lu_sites;
900 static DEFINE_MUTEX(lu_sites_guard);
901
902 /**
903  * Global environment used by site shrinker.
904  */
905 static struct lu_env lu_shrink_env;
906
907 struct lu_site_print_arg {
908         struct lu_env   *lsp_env;
909         void            *lsp_cookie;
910         lu_printer_t     lsp_printer;
911 };
912
913 static int
914 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
915                   struct hlist_node *hnode, void *data)
916 {
917         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
918         struct lu_object_header  *h;
919
920         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
921         if (!list_empty(&h->loh_layers)) {
922                 const struct lu_object *o;
923
924                 o = lu_object_top(h);
925                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
926                                 arg->lsp_printer, o);
927         } else {
928                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
929                                        arg->lsp_printer, h);
930         }
931         return 0;
932 }
933
934 /**
935  * Print all objects in \a s.
936  */
937 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
938                    lu_printer_t printer)
939 {
940         struct lu_site_print_arg arg = {
941                 .lsp_env     = (struct lu_env *)env,
942                 .lsp_cookie  = cookie,
943                 .lsp_printer = printer,
944         };
945
946         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
947 }
948 EXPORT_SYMBOL(lu_site_print);
949
950 /**
951  * Return desired hash table order.
952  */
953 static int lu_htable_order(struct lu_device *top)
954 {
955         unsigned long cache_size;
956         int bits;
957
958         /*
959          * For ZFS based OSDs the cache should be disabled by default.  This
960          * allows the ZFS ARC maximum flexibility in determining what buffers
961          * to cache.  If Lustre has objects or buffers which it wants to ensure
962          * always stay cached, it must maintain a hold on them.
963          */
964         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
965                 lu_cache_percent = 1;
966                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
967                 return LU_SITE_BITS_MIN;
968         }
969
970         /*
971          * Calculate hash table size, assuming that we want reasonable
972          * performance when 20% of total memory is occupied by cache of
973          * lu_objects.
974          *
975          * The size of a lu_object is (arbitrarily) taken as 1K (together with inode).
976          */
977         cache_size = totalram_pages;
978
979 #if BITS_PER_LONG == 32
980         /* limit hashtable size for lowmem systems to low RAM */
981         if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
982                 cache_size = (1 << (30 - PAGE_CACHE_SHIFT)) * 3 / 4;
983 #endif
984
985         /* clear off unreasonable cache setting. */
986         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
987                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
988                       " the range of (0, %u]. Will use default value: %u.\n",
989                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
990                       LU_CACHE_PERCENT_DEFAULT);
991
992                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
993         }
994         cache_size = cache_size / 100 * lu_cache_percent *
995                 (PAGE_CACHE_SIZE / 1024);
996
997         for (bits = 1; (1 << bits) < cache_size; ++bits) {
998                 ;
999         }
1000         return bits;
1001 }
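
/*
 * A worked example of the calculation above, using hypothetical numbers: a
 * node with 4 GiB of RAM and 4 KiB pages has totalram_pages = 2^20, so with
 * the default lu_cache_percent of 20 the loop yields 20 bits, which
 * lu_site_init() then clamps into [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX].
 */
static inline int example_htable_order_4g(void)
{
        unsigned long cache_size = 1048576;     /* 4 GiB / 4 KiB = 2^20 pages */
        int bits;

        cache_size = cache_size / 100 * 20 * (4096 / 1024);     /* 838800 */
        for (bits = 1; (1UL << bits) < cache_size; ++bits)
                ;
        return bits;                            /* 20 for these numbers */
}
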
1002
1003 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
1004                                 const void *key, unsigned mask)
1005 {
1006         struct lu_fid  *fid = (struct lu_fid *)key;
1007         __u32           hash;
1008
1009         hash = fid_flatten32(fid);
1010         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
1011         hash = hash_long(hash, hs->hs_bkt_bits);
1012
1013         /* give me another random factor */
1014         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
1015
1016         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
1017         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
1018
1019         return hash & mask;
1020 }
1021
1022 static void *lu_obj_hop_object(struct hlist_node *hnode)
1023 {
1024         return hlist_entry(hnode, struct lu_object_header, loh_hash);
1025 }
1026
1027 static void *lu_obj_hop_key(struct hlist_node *hnode)
1028 {
1029         struct lu_object_header *h;
1030
1031         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1032         return &h->loh_fid;
1033 }
1034
1035 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
1036 {
1037         struct lu_object_header *h;
1038
1039         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1040         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
1041 }
1042
1043 static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
1044 {
1045         struct lu_object_header *h;
1046
1047         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1048         if (atomic_add_return(1, &h->loh_ref) == 1) {
1049                 struct lu_site_bkt_data *bkt;
1050                 cfs_hash_bd_t            bd;
1051
1052                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
1053                 bkt = cfs_hash_bd_extra_get(hs, &bd);
1054                 bkt->lsb_busy++;
1055         }
1056 }
1057
1058 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
1059 {
1060         LBUG(); /* we should never call this */
1061 }
1062
1063 cfs_hash_ops_t lu_site_hash_ops = {
1064         .hs_hash        = lu_obj_hop_hash,
1065         .hs_key         = lu_obj_hop_key,
1066         .hs_keycmp      = lu_obj_hop_keycmp,
1067         .hs_object      = lu_obj_hop_object,
1068         .hs_get         = lu_obj_hop_get,
1069         .hs_put_locked  = lu_obj_hop_put_locked,
1070 };
1071
1072 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1073 {
1074         spin_lock(&s->ls_ld_lock);
1075         if (list_empty(&d->ld_linkage))
1076                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1077         spin_unlock(&s->ls_ld_lock);
1078 }
1079 EXPORT_SYMBOL(lu_dev_add_linkage);
1080
1081 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1082 {
1083         spin_lock(&s->ls_ld_lock);
1084         list_del_init(&d->ld_linkage);
1085         spin_unlock(&s->ls_ld_lock);
1086 }
1087 EXPORT_SYMBOL(lu_dev_del_linkage);
1088
1089 /**
1090  * Initialize site \a s, with \a top as the top-level device.
1091  */
1092 int lu_site_init(struct lu_site *s, struct lu_device *top)
1093 {
1094         struct lu_site_bkt_data *bkt;
1095         cfs_hash_bd_t bd;
1096         char name[16];
1097         int bits;
1098         int i;
1099         ENTRY;
1100
1101         memset(s, 0, sizeof *s);
1102         mutex_init(&s->ls_purge_mutex);
1103         bits = lu_htable_order(top);
1104         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
1105         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
1106              bits >= LU_SITE_BITS_MIN; bits--) {
1107                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1108                                                  bits - LU_SITE_BKT_BITS,
1109                                                  sizeof(*bkt), 0, 0,
1110                                                  &lu_site_hash_ops,
1111                                                  CFS_HASH_SPIN_BKTLOCK |
1112                                                  CFS_HASH_NO_ITEMREF |
1113                                                  CFS_HASH_DEPTH |
1114                                                  CFS_HASH_ASSERT_EMPTY |
1115                                                  CFS_HASH_COUNTER);
1116                 if (s->ls_obj_hash != NULL)
1117                         break;
1118         }
1119
1120         if (s->ls_obj_hash == NULL) {
1121                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
1122                 return -ENOMEM;
1123         }
1124
1125         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1126                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1127                 INIT_LIST_HEAD(&bkt->lsb_lru);
1128                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1129         }
1130
1131         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1132         if (s->ls_stats == NULL) {
1133                 cfs_hash_putref(s->ls_obj_hash);
1134                 s->ls_obj_hash = NULL;
1135                 return -ENOMEM;
1136         }
1137
1138         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1139                              0, "created", "created");
1140         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1141                              0, "cache_hit", "cache_hit");
1142         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1143                              0, "cache_miss", "cache_miss");
1144         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1145                              0, "cache_race", "cache_race");
1146         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1147                              0, "cache_death_race", "cache_death_race");
1148         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1149                              0, "lru_purged", "lru_purged");
1150
1151         INIT_LIST_HEAD(&s->ls_linkage);
1152         s->ls_top_dev = top;
1153         top->ld_site = s;
1154         lu_device_get(top);
1155         lu_ref_add(&top->ld_reference, "site-top", s);
1156
1157         INIT_LIST_HEAD(&s->ls_ld_linkage);
1158         spin_lock_init(&s->ls_ld_lock);
1159
1160         lu_dev_add_linkage(s, top);
1161
1162         RETURN(0);
1163 }
1164 EXPORT_SYMBOL(lu_site_init);
1165
1166 /**
1167  * Finalize \a s and release its resources.
1168  */
1169 void lu_site_fini(struct lu_site *s)
1170 {
1171         mutex_lock(&lu_sites_guard);
1172         list_del_init(&s->ls_linkage);
1173         mutex_unlock(&lu_sites_guard);
1174
1175         if (s->ls_obj_hash != NULL) {
1176                 cfs_hash_putref(s->ls_obj_hash);
1177                 s->ls_obj_hash = NULL;
1178         }
1179
1180         if (s->ls_top_dev != NULL) {
1181                 s->ls_top_dev->ld_site = NULL;
1182                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1183                 lu_device_put(s->ls_top_dev);
1184                 s->ls_top_dev = NULL;
1185         }
1186
1187         if (s->ls_stats != NULL)
1188                 lprocfs_free_stats(&s->ls_stats);
1189 }
1190 EXPORT_SYMBOL(lu_site_fini);
1191
1192 /**
1193  * Called when initialization of stack for this site is completed.
1194  */
1195 int lu_site_init_finish(struct lu_site *s)
1196 {
1197         int result;
1198         mutex_lock(&lu_sites_guard);
1199         result = lu_context_refill(&lu_shrink_env.le_ctx);
1200         if (result == 0)
1201                 list_add(&s->ls_linkage, &lu_sites);
1202         mutex_unlock(&lu_sites_guard);
1203         return result;
1204 }
1205 EXPORT_SYMBOL(lu_site_init_finish);
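
/*
 * Lifecycle sketch, illustrative only: a device stack initializes its site
 * against the top device, then registers it on the global list once the stack
 * is assembled; lu_site_fini() undoes the setup on failure or at shutdown.
 */
static inline int example_site_setup(struct lu_site *s, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(s, top);      /* hash table, LRU buckets, stats */
        if (rc != 0)
                return rc;

        rc = lu_site_init_finish(s);    /* add the site to the lu_sites list */
        if (rc != 0)
                lu_site_fini(s);
        return rc;
}
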
1206
1207 /**
1208  * Acquire additional reference on device \a d
1209  */
1210 void lu_device_get(struct lu_device *d)
1211 {
1212         atomic_inc(&d->ld_ref);
1213 }
1214 EXPORT_SYMBOL(lu_device_get);
1215
1216 /**
1217  * Release reference on device \a d.
1218  */
1219 void lu_device_put(struct lu_device *d)
1220 {
1221         LASSERT(atomic_read(&d->ld_ref) > 0);
1222         atomic_dec(&d->ld_ref);
1223 }
1224 EXPORT_SYMBOL(lu_device_put);
1225
1226 /**
1227  * Initialize device \a d of type \a t.
1228  */
1229 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1230 {
1231         if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
1232             t->ldt_ops->ldto_start != NULL)
1233                 t->ldt_ops->ldto_start(t);
1234
1235         memset(d, 0, sizeof *d);
1236         d->ld_type = t;
1237         lu_ref_init(&d->ld_reference);
1238         INIT_LIST_HEAD(&d->ld_linkage);
1239
1240         return 0;
1241 }
1242 EXPORT_SYMBOL(lu_device_init);
1243
1244 /**
1245  * Finalize device \a d.
1246  */
1247 void lu_device_fini(struct lu_device *d)
1248 {
1249         struct lu_device_type *t = d->ld_type;
1250
1251         if (d->ld_obd != NULL) {
1252                 d->ld_obd->obd_lu_dev = NULL;
1253                 d->ld_obd = NULL;
1254         }
1255
1256         lu_ref_fini(&d->ld_reference);
1257         LASSERTF(atomic_read(&d->ld_ref) == 0,
1258                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1259         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1260
1261         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1262             t->ldt_ops->ldto_stop != NULL)
1263                 t->ldt_ops->ldto_stop(t);
1264 }
1265 EXPORT_SYMBOL(lu_device_fini);
1266
1267 /**
1268  * Initialize object \a o that is part of compound object \a h and was created
1269  * by device \a d.
1270  */
1271 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1272                    struct lu_device *d)
1273 {
1274         memset(o, 0, sizeof(*o));
1275         o->lo_header = h;
1276         o->lo_dev = d;
1277         lu_device_get(d);
1278         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1279         INIT_LIST_HEAD(&o->lo_linkage);
1280
1281         return 0;
1282 }
1283 EXPORT_SYMBOL(lu_object_init);
1284
1285 /**
1286  * Finalize object and release its resources.
1287  */
1288 void lu_object_fini(struct lu_object *o)
1289 {
1290         struct lu_device *dev = o->lo_dev;
1291
1292         LASSERT(list_empty(&o->lo_linkage));
1293
1294         if (dev != NULL) {
1295                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1296                               "lu_object", o);
1297                 lu_device_put(dev);
1298                 o->lo_dev = NULL;
1299         }
1300 }
1301 EXPORT_SYMBOL(lu_object_fini);
1302
1303 /**
1304  * Add object \a o as first layer of compound object \a h
1305  *
1306  * This is typically called by the ->ldo_object_alloc() method of top-level
1307  * device.
1308  */
1309 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1310 {
1311         list_move(&o->lo_linkage, &h->loh_layers);
1312 }
1313 EXPORT_SYMBOL(lu_object_add_top);
1314
1315 /**
1316  * Add object \a o as a layer of compound object, going after \a before.
1317  *
1318  * This is typically called by the ->ldo_object_alloc() method of \a
1319  * before->lo_dev.
1320  */
1321 void lu_object_add(struct lu_object *before, struct lu_object *o)
1322 {
1323         list_move(&o->lo_linkage, &before->lo_linkage);
1324 }
1325 EXPORT_SYMBOL(lu_object_add);
1326
1327 /**
1328  * Initialize compound object.
1329  */
1330 int lu_object_header_init(struct lu_object_header *h)
1331 {
1332         memset(h, 0, sizeof *h);
1333         atomic_set(&h->loh_ref, 1);
1334         INIT_HLIST_NODE(&h->loh_hash);
1335         INIT_LIST_HEAD(&h->loh_lru);
1336         INIT_LIST_HEAD(&h->loh_layers);
1337         lu_ref_init(&h->loh_reference);
1338         return 0;
1339 }
1340 EXPORT_SYMBOL(lu_object_header_init);
1341
1342 /**
1343  * Finalize compound object.
1344  */
1345 void lu_object_header_fini(struct lu_object_header *h)
1346 {
1347         LASSERT(list_empty(&h->loh_layers));
1348         LASSERT(list_empty(&h->loh_lru));
1349         LASSERT(hlist_unhashed(&h->loh_hash));
1350         lu_ref_fini(&h->loh_reference);
1351 }
1352 EXPORT_SYMBOL(lu_object_header_fini);
1353
1354 /**
1355  * Given a compound object, find its slice, corresponding to the device type
1356  * \a dtype.
1357  */
1358 struct lu_object *lu_object_locate(struct lu_object_header *h,
1359                                    const struct lu_device_type *dtype)
1360 {
1361         struct lu_object *o;
1362
1363         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1364                 if (o->lo_dev->ld_type == dtype)
1365                         return o;
1366         }
1367         return NULL;
1368 }
1369 EXPORT_SYMBOL(lu_object_locate);
1370
1371 /**
1372  * Finalize and free devices in the device stack.
1373  *
1374  * Finalize device stack by purging object cache, and calling
1375  * lu_device_type_operations::ldto_device_fini() and
1376  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1377  */
1378 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1379 {
1380         struct lu_site   *site = top->ld_site;
1381         struct lu_device *scan;
1382         struct lu_device *next;
1383
1384         lu_site_purge(env, site, ~0);
1385         for (scan = top; scan != NULL; scan = next) {
1386                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1387                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1388                 lu_device_put(scan);
1389         }
1390
1391         /* purge again. */
1392         lu_site_purge(env, site, ~0);
1393
1394         for (scan = top; scan != NULL; scan = next) {
1395                 const struct lu_device_type *ldt = scan->ld_type;
1396                 struct obd_type             *type;
1397
1398                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1399                 type = ldt->ldt_obd_type;
1400                 if (type != NULL) {
1401                         type->typ_refcnt--;
1402                         class_put_type(type);
1403                 }
1404         }
1405 }
1406 EXPORT_SYMBOL(lu_stack_fini);
1407
1408 enum {
1409         /**
1410          * Maximal number of tld slots.
1411          */
1412         LU_CONTEXT_KEY_NR = 40
1413 };
1414
1415 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1416
1417 static DEFINE_SPINLOCK(lu_keys_guard);
1418
1419 /**
1420  * Global counter incremented whenever key is registered, unregistered,
1421  * revived or quiesced. This is used to void unnecessary calls to
1422  * lu_context_refill(). No locking is provided, as initialization and shutdown
1423  * are supposed to be externally serialized.
1424  */
1425 static unsigned key_set_version = 0;
1426
1427 /**
1428  * Register new key.
1429  */
1430 int lu_context_key_register(struct lu_context_key *key)
1431 {
1432         int result;
1433         int i;
1434
1435         LASSERT(key->lct_init != NULL);
1436         LASSERT(key->lct_fini != NULL);
1437         LASSERT(key->lct_tags != 0);
1438         LASSERT(key->lct_owner != NULL);
1439
1440         result = -ENFILE;
1441         spin_lock(&lu_keys_guard);
1442         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1443                 if (lu_keys[i] == NULL) {
1444                         key->lct_index = i;
1445                         atomic_set(&key->lct_used, 1);
1446                         lu_keys[i] = key;
1447                         lu_ref_init(&key->lct_reference);
1448                         result = 0;
1449                         ++key_set_version;
1450                         break;
1451                 }
1452         }
1453         spin_unlock(&lu_keys_guard);
1454         return result;
1455 }
1456 EXPORT_SYMBOL(lu_context_key_register);
1457
1458 static void key_fini(struct lu_context *ctx, int index)
1459 {
1460         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1461                 struct lu_context_key *key;
1462
1463                 key = lu_keys[index];
1464                 LASSERT(key != NULL);
1465                 LASSERT(key->lct_fini != NULL);
1466                 LASSERT(atomic_read(&key->lct_used) > 1);
1467
1468                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1469                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1470                 atomic_dec(&key->lct_used);
1471
1472                 LASSERT(key->lct_owner != NULL);
1473                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1474                         LINVRNT(module_refcount(key->lct_owner) > 0);
1475                         module_put(key->lct_owner);
1476                 }
1477                 ctx->lc_value[index] = NULL;
1478         }
1479 }
1480
1481 /**
1482  * Deregister key.
1483  */
1484 void lu_context_key_degister(struct lu_context_key *key)
1485 {
1486         LASSERT(atomic_read(&key->lct_used) >= 1);
1487         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1488
1489         lu_context_key_quiesce(key);
1490
1491         ++key_set_version;
1492         spin_lock(&lu_keys_guard);
1493         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1494         if (lu_keys[key->lct_index]) {
1495                 lu_keys[key->lct_index] = NULL;
1496                 lu_ref_fini(&key->lct_reference);
1497         }
1498         spin_unlock(&lu_keys_guard);
1499
1500         LASSERTF(atomic_read(&key->lct_used) == 1,
1501                  "key has instances: %d\n",
1502                  atomic_read(&key->lct_used));
1503 }
1504 EXPORT_SYMBOL(lu_context_key_degister);
1505
1506 /**
1507  * Register a number of keys. This has to be called after all keys have been
1508  * initialized by a call to LU_CONTEXT_KEY_INIT().
1509  */
1510 int lu_context_key_register_many(struct lu_context_key *k, ...)
1511 {
1512         struct lu_context_key *key = k;
1513         va_list args;
1514         int result;
1515
1516         va_start(args, k);
1517         do {
1518                 result = lu_context_key_register(key);
1519                 if (result)
1520                         break;
1521                 key = va_arg(args, struct lu_context_key *);
1522         } while (key != NULL);
1523         va_end(args);
1524
1525         if (result != 0) {
1526                 va_start(args, k);
1527                 while (k != key) {
1528                         lu_context_key_degister(k);
1529                         k = va_arg(args, struct lu_context_key *);
1530                 }
1531                 va_end(args);
1532         }
1533
1534         return result;
1535 }
1536 EXPORT_SYMBOL(lu_context_key_register_many);
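
/*
 * Editorial sketch (not part of the original file): registering several keys
 * in one call.  The argument list must be NULL-terminated; on failure the
 * helper above degisters every key it already registered.  The "foo"/"bar"
 * keys are hypothetical and the block is compiled out.
 */
#if 0
        rc = lu_context_key_register_many(&foo_thread_key,
                                          &bar_thread_key,
                                          NULL);
        if (rc != 0)
                return rc;
#endif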
1537
1538 /**
1539  * De-register a number of keys. This is a dual to
1540  * lu_context_key_register_many().
1541  */
1542 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1543 {
1544         va_list args;
1545
1546         va_start(args, k);
1547         do {
1548                 lu_context_key_degister(k);
1549                 k = va_arg(args, struct lu_context_key*);
1550         } while (k != NULL);
1551         va_end(args);
1552 }
1553 EXPORT_SYMBOL(lu_context_key_degister_many);
1554
1555 /**
1556  * Revive a number of keys.
1557  */
1558 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1559 {
1560         va_list args;
1561
1562         va_start(args, k);
1563         do {
1564                 lu_context_key_revive(k);
1565                 k = va_arg(args, struct lu_context_key*);
1566         } while (k != NULL);
1567         va_end(args);
1568 }
1569 EXPORT_SYMBOL(lu_context_key_revive_many);
1570
1571 /**
1572  * Quiesce a number of keys.
1573  */
1574 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1575 {
1576         va_list args;
1577
1578         va_start(args, k);
1579         do {
1580                 lu_context_key_quiesce(k);
1581                 k = va_arg(args, struct lu_context_key*);
1582         } while (k != NULL);
1583         va_end(args);
1584 }
1585 EXPORT_SYMBOL(lu_context_key_quiesce_many);
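
/*
 * Editorial sketch (not part of the original file): a module that is about
 * to stop its last device quiesces its keys so no new values pin it, and
 * revives them if the device type is started again.  The "foo" keys are
 * hypothetical and the block is compiled out.
 */
#if 0
        /* on last device stop */
        lu_context_key_quiesce_many(&foo_thread_key, &foo_session_key, NULL);

        /* on next device start */
        lu_context_key_revive_many(&foo_thread_key, &foo_session_key, NULL);
#endif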
1586
1587 /**
1588  * Return value associated with key \a key in context \a ctx.
1589  */
1590 void *lu_context_key_get(const struct lu_context *ctx,
1591                          const struct lu_context_key *key)
1592 {
1593         LINVRNT(ctx->lc_state == LCS_ENTERED);
1594         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1595         LASSERT(lu_keys[key->lct_index] == key);
1596         return ctx->lc_value[key->lct_index];
1597 }
1598 EXPORT_SYMBOL(lu_context_key_get);
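
/*
 * Editorial sketch (not part of the original file): the usual pattern is a
 * small accessor wrapping lu_context_key_get(), returning the value created
 * by the key's lct_init() for the current context.  "foo" names are
 * hypothetical and the block is compiled out.
 */
#if 0
static struct foo_thread_info *foo_env_info(const struct lu_env *env)
{
        struct foo_thread_info *info;

        info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
        LASSERT(info != NULL);
        return info;
}
#endif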
1599
1600 /**
1601  * List of contexts initialized with LCT_REMEMBER, protected by lu_keys_guard.
1602  */
1603 static struct list_head lu_context_remembered;
1604
1605 /**
1606  * Destroy \a key in all remembered contexts. This is used to destroy key
1607  * values in "shared" contexts (like service threads), when a module owning
1608  * the key is about to be unloaded.
1609  */
1610 void lu_context_key_quiesce(struct lu_context_key *key)
1611 {
1612         struct lu_context *ctx;
1613         extern unsigned cl_env_cache_purge(unsigned nr);
1614
1615         if (!(key->lct_tags & LCT_QUIESCENT)) {
1616                 /*
1617                  * XXX layering violation.
1618                  */
1619                 cl_env_cache_purge(~0);
1620                 key->lct_tags |= LCT_QUIESCENT;
1621                 /*
1622                  * XXX memory barrier has to go here.
1623                  */
1624                 spin_lock(&lu_keys_guard);
1625                 list_for_each_entry(ctx, &lu_context_remembered,
1626                                     lc_remember)
1627                         key_fini(ctx, key->lct_index);
1628                 spin_unlock(&lu_keys_guard);
1629                 ++key_set_version;
1630         }
1631 }
1632 EXPORT_SYMBOL(lu_context_key_quiesce);
1633
1634 void lu_context_key_revive(struct lu_context_key *key)
1635 {
1636         key->lct_tags &= ~LCT_QUIESCENT;
1637         ++key_set_version;
1638 }
1639 EXPORT_SYMBOL(lu_context_key_revive);
1640
1641 static void keys_fini(struct lu_context *ctx)
1642 {
1643         int     i;
1644
1645         if (ctx->lc_value == NULL)
1646                 return;
1647
1648         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1649                 key_fini(ctx, i);
1650
1651         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1652         ctx->lc_value = NULL;
1653 }
1654
1655 static int keys_fill(struct lu_context *ctx)
1656 {
1657         int i;
1658
1659         LINVRNT(ctx->lc_value != NULL);
1660         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1661                 struct lu_context_key *key;
1662
1663                 key = lu_keys[i];
1664                 if (ctx->lc_value[i] == NULL && key != NULL &&
1665                     (key->lct_tags & ctx->lc_tags) &&
1666                     /*
1667                      * Don't create values for an LCT_QUIESCENT key, as this
1668                      * would pin the module owning the key.
1669                      */
1670                     !(key->lct_tags & LCT_QUIESCENT)) {
1671                         void *value;
1672
1673                         LINVRNT(key->lct_init != NULL);
1674                         LINVRNT(key->lct_index == i);
1675
1676                         value = key->lct_init(ctx, key);
1677                         if (unlikely(IS_ERR(value)))
1678                                 return PTR_ERR(value);
1679
1680                         LASSERT(key->lct_owner != NULL);
1681                         if (!(ctx->lc_tags & LCT_NOREF))
1682                                 try_module_get(key->lct_owner);
1683                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1684                         atomic_inc(&key->lct_used);
1685                         /*
1686                          * This is the only place in the code where an
1687                          * element of the ctx->lc_value[] array is set to a
1688                          * non-NULL value.
1689                          */
1690                         ctx->lc_value[i] = value;
1691                         if (key->lct_exit != NULL)
1692                                 ctx->lc_tags |= LCT_HAS_EXIT;
1693                 }
1694                 ctx->lc_version = key_set_version;
1695         }
1696         return 0;
1697 }
1698
1699 static int keys_init(struct lu_context *ctx)
1700 {
1701         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1702         if (likely(ctx->lc_value != NULL))
1703                 return keys_fill(ctx);
1704
1705         return -ENOMEM;
1706 }
1707
1708 /**
1709  * Initialize context data-structure. Create values for all keys.
1710  */
1711 int lu_context_init(struct lu_context *ctx, __u32 tags)
1712 {
1713         int     rc;
1714
1715         memset(ctx, 0, sizeof *ctx);
1716         ctx->lc_state = LCS_INITIALIZED;
1717         ctx->lc_tags = tags;
1718         if (tags & LCT_REMEMBER) {
1719                 spin_lock(&lu_keys_guard);
1720                 list_add(&ctx->lc_remember, &lu_context_remembered);
1721                 spin_unlock(&lu_keys_guard);
1722         } else {
1723                 INIT_LIST_HEAD(&ctx->lc_remember);
1724         }
1725
1726         rc = keys_init(ctx);
1727         if (rc != 0)
1728                 lu_context_fini(ctx);
1729
1730         return rc;
1731 }
1732 EXPORT_SYMBOL(lu_context_init);
1733
1734 /**
1735  * Finalize context data-structure. Destroy key values.
1736  */
1737 void lu_context_fini(struct lu_context *ctx)
1738 {
1739         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1740         ctx->lc_state = LCS_FINALIZED;
1741
1742         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1743                 LASSERT(list_empty(&ctx->lc_remember));
1744                 keys_fini(ctx);
1745
1746         } else { /* could race with key degister */
1747                 spin_lock(&lu_keys_guard);
1748                 keys_fini(ctx);
1749                 list_del_init(&ctx->lc_remember);
1750                 spin_unlock(&lu_keys_guard);
1751         }
1752 }
1753 EXPORT_SYMBOL(lu_context_fini);
1754
1755 /**
1756  * Called before entering context.
1757  */
1758 void lu_context_enter(struct lu_context *ctx)
1759 {
1760         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1761         ctx->lc_state = LCS_ENTERED;
1762 }
1763 EXPORT_SYMBOL(lu_context_enter);
1764
1765 /**
1766  * Called after exiting from \a ctx
1767  */
1768 void lu_context_exit(struct lu_context *ctx)
1769 {
1770         int i;
1771
1772         LINVRNT(ctx->lc_state == LCS_ENTERED);
1773         ctx->lc_state = LCS_LEFT;
1774         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1775                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1776                         if (ctx->lc_value[i] != NULL) {
1777                                 struct lu_context_key *key;
1778
1779                                 key = lu_keys[i];
1780                                 LASSERT(key != NULL);
1781                                 if (key->lct_exit != NULL)
1782                                         key->lct_exit(ctx,
1783                                                       key, ctx->lc_value[i]);
1784                         }
1785                 }
1786         }
1787 }
1788 EXPORT_SYMBOL(lu_context_exit);
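
/*
 * Editorial sketch (not part of the original file): the expected context
 * life cycle for a long-lived service thread.  The context is initialized
 * once, each request is bracketed by enter/exit, and the context is torn
 * down when the thread stops.  All "foo" calls are hypothetical; the block
 * is compiled out.
 */
#if 0
static int foo_thread_main(void *arg)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx, LCT_MD_THREAD | LCT_REMEMBER);
        if (rc != 0)
                return rc;

        while (!foo_thread_should_stop(arg)) {
                lu_context_enter(&ctx);
                /* per-thread key values are valid only while entered */
                foo_handle_one_request(&ctx, arg);
                lu_context_exit(&ctx);
        }

        lu_context_fini(&ctx);
        return 0;
}
#endif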
1789
1790 /**
1791  * Allocate values in \a ctx for all missing keys that were registered after
1792  * the context was created. key_set_version only changes in the rare cases
1793  * when modules are loaded or removed.
1794  */
1795 int lu_context_refill(struct lu_context *ctx)
1796 {
1797         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1798 }
1799 EXPORT_SYMBOL(lu_context_refill);
1800
1801 /**
1802  * lu_context_tags_default/lu_session_tags_default are updated when new obd
1803  * types are added. Currently this is only used on the client side,
1804  * specifically for the echo device client; for other stacks (such as ptlrpc
1805  * threads) the context tags are predefined when the lu_device type is
1806  * registered, during the module probe phase.
1807  */
1808 __u32 lu_context_tags_default = 0;
1809 __u32 lu_session_tags_default = 0;
1810
1811 void lu_context_tags_update(__u32 tags)
1812 {
1813         spin_lock(&lu_keys_guard);
1814         lu_context_tags_default |= tags;
1815         key_set_version++;
1816         spin_unlock(&lu_keys_guard);
1817 }
1818 EXPORT_SYMBOL(lu_context_tags_update);
1819
1820 void lu_context_tags_clear(__u32 tags)
1821 {
1822         spin_lock(&lu_keys_guard);
1823         lu_context_tags_default &= ~tags;
1824         key_set_version++;
1825         spin_unlock(&lu_keys_guard);
1826 }
1827 EXPORT_SYMBOL(lu_context_tags_clear);
1828
1829 void lu_session_tags_update(__u32 tags)
1830 {
1831         spin_lock(&lu_keys_guard);
1832         lu_session_tags_default |= tags;
1833         key_set_version++;
1834         spin_unlock(&lu_keys_guard);
1835 }
1836 EXPORT_SYMBOL(lu_session_tags_update);
1837
1838 void lu_session_tags_clear(__u32 tags)
1839 {
1840         spin_lock(&lu_keys_guard);
1841         lu_session_tags_default &= ~tags;
1842         key_set_version++;
1843         spin_unlock(&lu_keys_guard);
1844 }
1845 EXPORT_SYMBOL(lu_session_tags_clear);
1846
1847 int lu_env_init(struct lu_env *env, __u32 tags)
1848 {
1849         int result;
1850
1851         env->le_ses = NULL;
1852         result = lu_context_init(&env->le_ctx, tags);
1853         if (likely(result == 0))
1854                 lu_context_enter(&env->le_ctx);
1855         return result;
1856 }
1857 EXPORT_SYMBOL(lu_env_init);
1858
1859 void lu_env_fini(struct lu_env *env)
1860 {
1861         lu_context_exit(&env->le_ctx);
1862         lu_context_fini(&env->le_ctx);
1863         env->le_ses = NULL;
1864 }
1865 EXPORT_SYMBOL(lu_env_fini);
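
/*
 * Editorial sketch (not part of the original file): a short-lived user pairs
 * lu_env_init() with lu_env_fini(); le_ses stays NULL unless a session
 * context is attached separately.  The tag choice and the "foo" call are
 * hypothetical; the block is compiled out.
 */
#if 0
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc != 0)
                return rc;

        rc = foo_do_work(&env);

        lu_env_fini(&env);
        return rc;
#endif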
1866
1867 int lu_env_refill(struct lu_env *env)
1868 {
1869         int result;
1870
1871         result = lu_context_refill(&env->le_ctx);
1872         if (result == 0 && env->le_ses != NULL)
1873                 result = lu_context_refill(env->le_ses);
1874         return result;
1875 }
1876 EXPORT_SYMBOL(lu_env_refill);
1877
1878 /**
1879  * Currently this API is only used by the echo client. Because the echo
1880  * client and the normal Lustre client share the same cl_env cache, the echo
1881  * client needs to refresh the environment's tags after it gets one from the
1882  * cache, especially when a normal client and an echo client co-exist on the
1883  * same node.
1884  */
1885 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1886                           __u32 stags)
1887 {
1888         int    result;
1889
1890         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1891                 env->le_ctx.lc_version = 0;
1892                 env->le_ctx.lc_tags |= ctags;
1893         }
1894
1895         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1896                 env->le_ses->lc_version = 0;
1897                 env->le_ses->lc_tags |= stags;
1898         }
1899
1900         result = lu_env_refill(env);
1901
1902         return result;
1903 }
1904 EXPORT_SYMBOL(lu_env_refill_by_tags);
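
/*
 * Editorial sketch (not part of the original file): after taking a cached
 * environment, a caller that may need additional tags (as the echo client
 * does) widens them and refills in one step.  The tag values shown are
 * hypothetical; the block is compiled out.
 */
#if 0
        rc = lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
        if (rc != 0)
                return rc;
#endif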
1905
1906 static struct shrinker *lu_site_shrinker;
1907
1908 typedef struct lu_site_stats{
1909         unsigned        lss_populated;
1910         unsigned        lss_max_search;
1911         unsigned        lss_total;
1912         unsigned        lss_busy;
1913 } lu_site_stats_t;
1914
1915 static void lu_site_stats_get(cfs_hash_t *hs,
1916                               lu_site_stats_t *stats, int populated)
1917 {
1918         cfs_hash_bd_t bd;
1919         int           i;
1920
1921         cfs_hash_for_each_bucket(hs, &bd, i) {
1922                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1923                 struct hlist_head       *hhead;
1924
1925                 cfs_hash_bd_lock(hs, &bd, 1);
1926                 stats->lss_busy  += bkt->lsb_busy;
1927                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1928                 stats->lss_max_search = max((int)stats->lss_max_search,
1929                                             cfs_hash_bd_depmax_get(&bd));
1930                 if (!populated) {
1931                         cfs_hash_bd_unlock(hs, &bd, 1);
1932                         continue;
1933                 }
1934
1935                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1936                         if (!hlist_empty(hhead))
1937                                 stats->lss_populated++;
1938                 }
1939                 cfs_hash_bd_unlock(hs, &bd, 1);
1940         }
1941 }
1942
1943 #ifdef __KERNEL__
1944
1945 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1946                                            struct shrink_control *sc)
1947 {
1948         lu_site_stats_t stats;
1949         struct lu_site *s;
1950         struct lu_site *tmp;
1951         unsigned long cached = 0;
1952
1953         if (!(sc->gfp_mask & __GFP_FS))
1954                 return 0;
1955
1956         mutex_lock(&lu_sites_guard);
1957         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1958                 memset(&stats, 0, sizeof(stats));
1959                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1960                 cached += stats.lss_total - stats.lss_busy;
1961         }
1962         mutex_unlock(&lu_sites_guard);
1963
1964         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1965         CDEBUG(D_INODE, "%ld objects cached\n", cached);
1966         return cached;
1967 }
1968
1969 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1970                                           struct shrink_control *sc)
1971 {
1972         struct lu_site *s;
1973         struct lu_site *tmp;
1974         unsigned long remain = sc->nr_to_scan;
1975         LIST_HEAD(splice);
1976
1977         if (!(sc->gfp_mask & __GFP_FS))
1978                 /* We must not take the lu_sites_guard lock when
1979                  * __GFP_FS is *not* set because of the deadlock
1980                  * possibility detailed above. Additionally,
1981                  * since we cannot determine the number of
1982                  * objects in the cache without taking this
1983                  * lock, we're in a particularly tough spot. As
1984                  * a result, we'll just lie and say our cache is
1985                  * empty. This _should_ be ok, as we can't
1986                  * reclaim objects when __GFP_FS is *not* set
1987                  * anyways.
1988                  */
1989                 return SHRINK_STOP;
1990
1991         mutex_lock(&lu_sites_guard);
1992         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1993                 remain = lu_site_purge(&lu_shrink_env, s, remain);
1994                 /*
1995                  * Move the just-shrunk site to the tail of the site list
1996                  * to ensure shrinking fairness.
1997                  */
1998                 list_move_tail(&s->ls_linkage, &splice);
1999         }
2000         list_splice(&splice, lu_sites.prev);
2001         mutex_unlock(&lu_sites_guard);
2002
2003         return sc->nr_to_scan - remain;
2004 }
2005
2006 #ifndef HAVE_SHRINKER_COUNT
2007 /*
2008  * There exists a potential lock inversion deadlock scenario when using
2009  * Lustre on top of ZFS. This occurs between one of ZFS's
2010  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2011  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2012  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2013  * lock. Obviously neither thread will wake and drop their respective hold
2014  * on their lock.
2015  *
2016  * To prevent this from happening we must ensure the lu_sites_guard lock is
2017  * not taken while down this code path. ZFS reliably does not set the
2018  * __GFP_FS bit in its code paths, so this can be used to determine if it
2019  * is safe to take the lu_sites_guard lock.
2020  *
2021  * Ideally we should accurately return the remaining number of cached
2022  * objects without taking the lu_sites_guard lock, but this is not
2023  * possible in the current implementation.
2024  */
2025 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2026 {
2027         int cached = 0;
2028         struct shrink_control scv = {
2029                  .nr_to_scan = shrink_param(sc, nr_to_scan),
2030                  .gfp_mask   = shrink_param(sc, gfp_mask)
2031         };
2032 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2033         struct shrinker* shrinker = NULL;
2034 #endif
2035
2036
2037         CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
2038
2039         lu_cache_shrink_scan(shrinker, &scv);
2040
2041         cached = lu_cache_shrink_count(shrinker, &scv);
2042         if (scv.nr_to_scan == 0)
2043                 CDEBUG(D_INODE, "%d objects cached\n", cached);
2044         return cached;
2045 }
2046
2047 #endif /* HAVE_SHRINKER_COUNT */
2048
2049
2050 /*
2051  * Debugging stuff.
2052  */
2053
2054 /**
2055  * Environment to be used in debugger, contains all tags.
2056  */
2057 struct lu_env lu_debugging_env;
2058
2059 /**
2060  * Debugging printer function using printk().
2061  */
2062 int lu_printk_printer(const struct lu_env *env,
2063                       void *unused, const char *format, ...)
2064 {
2065         va_list args;
2066
2067         va_start(args, format);
2068         vprintk(format, args);
2069         va_end(args);
2070         return 0;
2071 }
2072
2073 int lu_debugging_setup(void)
2074 {
2075         return lu_env_init(&lu_debugging_env, ~0);
2076 }
2077
2078 void lu_context_keys_dump(void)
2079 {
2080         int i;
2081
2082         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2083                 struct lu_context_key *key;
2084
2085                 key = lu_keys[i];
2086                 if (key != NULL) {
2087                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2088                                i, key, key->lct_tags,
2089                                key->lct_init, key->lct_fini, key->lct_exit,
2090                                key->lct_index, atomic_read(&key->lct_used),
2091                                key->lct_owner ? key->lct_owner->name : "",
2092                                key->lct_owner);
2093                         lu_ref_print(&key->lct_reference);
2094                 }
2095         }
2096 }
2097 EXPORT_SYMBOL(lu_context_keys_dump);
2098 #endif /* __KERNEL__ */
2099
2100 /**
2101  * Initialization of global lu_* data.
2102  */
2103 int lu_global_init(void)
2104 {
2105         int result;
2106         DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
2107                          lu_cache_shrink_count, lu_cache_shrink_scan);
2108
2109         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2110
2111         INIT_LIST_HEAD(&lu_device_types);
2112         INIT_LIST_HEAD(&lu_context_remembered);
2113         INIT_LIST_HEAD(&lu_sites);
2114
2115         result = lu_ref_global_init();
2116         if (result != 0)
2117                 return result;
2118
2119         LU_CONTEXT_KEY_INIT(&lu_global_key);
2120         result = lu_context_key_register(&lu_global_key);
2121         if (result != 0)
2122                 return result;
2123
2124         /*
2125          * At this level, we don't know what tags are needed, so allocate them
2126          * conservatively. This should not be too bad, because this
2127          * environment is global.
2128          */
2129         mutex_lock(&lu_sites_guard);
2130         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2131         mutex_unlock(&lu_sites_guard);
2132         if (result != 0)
2133                 return result;
2134
2135         /*
2136          * Seek estimation: three seeks to read a record from the OI, one to
2137          * read the inode, one for the EA. Unfortunately, setting such a high
2138          * value results in the lu_object/inode cache consuming all the memory.
2139          */
2140         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, &shvar);
2141         if (lu_site_shrinker == NULL)
2142                 return -ENOMEM;
2143
2144         return result;
2145 }
2146
2147 /**
2148  * Dual to lu_global_init().
2149  */
2150 void lu_global_fini(void)
2151 {
2152         if (lu_site_shrinker != NULL) {
2153                 remove_shrinker(lu_site_shrinker);
2154                 lu_site_shrinker = NULL;
2155         }
2156
2157         lu_context_key_degister(&lu_global_key);
2158
2159         /*
2160          * Tear shrinker environment down _after_ de-registering
2161          * lu_global_key, because the latter has a value in the former.
2162          */
2163         mutex_lock(&lu_sites_guard);
2164         lu_env_fini(&lu_shrink_env);
2165         mutex_unlock(&lu_sites_guard);
2166
2167         lu_ref_global_fini();
2168 }
2169
2170 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2171 {
2172 #ifdef LPROCFS
2173         struct lprocfs_counter ret;
2174
2175         lprocfs_stats_collect(stats, idx, &ret);
2176         return (__u32)ret.lc_count;
2177 #else
2178         return 0;
2179 #endif
2180 }
2181
2182 /**
2183  * Output site statistical counters into a buffer. Suitable for
2184  * lprocfs_rd_*()-style functions.
2185  */
2186 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2187 {
2188         lu_site_stats_t stats;
2189
2190         memset(&stats, 0, sizeof(stats));
2191         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2192
2193         return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2194                           stats.lss_busy,
2195                           stats.lss_total,
2196                           stats.lss_populated,
2197                           CFS_HASH_NHLIST(s->ls_obj_hash),
2198                           stats.lss_max_search,
2199                           ls_stats_read(s->ls_stats, LU_SS_CREATED),
2200                           ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2201                           ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2202                           ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2203                           ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2204                           ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2205 }
2206 EXPORT_SYMBOL(lu_site_stats_seq_print);
2207
2208 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
2209 {
2210         lu_site_stats_t stats;
2211
2212         memset(&stats, 0, sizeof(stats));
2213         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2214
2215         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2216                         stats.lss_busy,
2217                         stats.lss_total,
2218                         stats.lss_populated,
2219                         CFS_HASH_NHLIST(s->ls_obj_hash),
2220                         stats.lss_max_search,
2221                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2222                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2223                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2224                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2225                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2226                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2227 }
2228 EXPORT_SYMBOL(lu_site_stats_print);
2229
2230 /**
2231  * Helper function to initialize a number of kmem slab caches at once.
2232  */
2233 int lu_kmem_init(struct lu_kmem_descr *caches)
2234 {
2235         int result;
2236         struct lu_kmem_descr *iter = caches;
2237
2238         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2239                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2240                                                      iter->ckd_size,
2241                                                      0, 0, NULL);
2242                 if (*iter->ckd_cache == NULL) {
2243                         result = -ENOMEM;
2244                         /* free all previously allocated caches */
2245                         lu_kmem_fini(caches);
2246                         break;
2247                 }
2248         }
2249         return result;
2250 }
2251 EXPORT_SYMBOL(lu_kmem_init);
2252
2253 /**
2254  * Helper function to finalize a number of kmem slab caches at once. Dual to
2255  * lu_kmem_init().
2256  */
2257 void lu_kmem_fini(struct lu_kmem_descr *caches)
2258 {
2259         for (; caches->ckd_cache != NULL; ++caches) {
2260                 if (*caches->ckd_cache != NULL) {
2261                         kmem_cache_destroy(*caches->ckd_cache);
2262                         *caches->ckd_cache = NULL;
2263                 }
2264         }
2265 }
2266 EXPORT_SYMBOL(lu_kmem_fini);
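
/*
 * Editorial sketch (not part of the original file): callers describe their
 * slab caches in a lu_kmem_descr array terminated by a NULL ckd_cache entry
 * and create/destroy them in one call each.  The "foo" cache is hypothetical
 * and the block is compiled out.
 */
#if 0
static struct kmem_cache *foo_object_kmem;

static struct lu_kmem_descr foo_caches[] = {
        {
                .ckd_cache = &foo_object_kmem,
                .ckd_name  = "foo_object_kmem",
                .ckd_size  = sizeof(struct foo_object)
        },
        {
                .ckd_cache = NULL
        }
};

static int __init foo_mod_init(void)
{
        return lu_kmem_init(foo_caches);
}

static void __exit foo_mod_exit(void)
{
        lu_kmem_fini(foo_caches);
}
#endif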
2267
2268 /**
2269  * Temporary solution to be able to assign fid in ->do_create()
2270  * till we have fully-functional OST fids
2271  */
2272 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2273                           const struct lu_fid *fid)
2274 {
2275         struct lu_site          *s = o->lo_dev->ld_site;
2276         struct lu_fid           *old = &o->lo_header->loh_fid;
2277         struct lu_site_bkt_data *bkt;
2278         struct lu_object        *shadow;
2279         wait_queue_t             waiter;
2280         cfs_hash_t              *hs;
2281         cfs_hash_bd_t            bd;
2282         __u64                    version = 0;
2283
2284         LASSERT(fid_is_zero(old));
2285
2286         hs = s->ls_obj_hash;
2287         cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2288         shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2289         /* supposed to be unique */
2290         LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2291         *old = *fid;
2292         bkt = cfs_hash_bd_extra_get(hs, &bd);
2293         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2294         bkt->lsb_busy++;
2295         cfs_hash_bd_unlock(hs, &bd, 1);
2296 }
2297 EXPORT_SYMBOL(lu_object_assign_fid);
2298
2299 /**
2300  * Allocate an object with a zero (not yet assigned) fid.
2301  * XXX: temporary solution to be able to assign fid in ->do_create()
2302  *      till we have fully-functional OST fids
2303  */
2304 struct lu_object *lu_object_anon(const struct lu_env *env,
2305                                  struct lu_device *dev,
2306                                  const struct lu_object_conf *conf)
2307 {
2308         struct lu_fid     fid;
2309         struct lu_object *o;
2310
2311         fid_zero(&fid);
2312         o = lu_object_alloc(env, dev, &fid, conf);
2313
2314         return o;
2315 }
2316 EXPORT_SYMBOL(lu_object_anon);
2317
2318 struct lu_buf LU_BUF_NULL = {
2319         .lb_buf = NULL,
2320         .lb_len = 0
2321 };
2322 EXPORT_SYMBOL(LU_BUF_NULL);
2323
2324 void lu_buf_free(struct lu_buf *buf)
2325 {
2326         LASSERT(buf);
2327         if (buf->lb_buf) {
2328                 LASSERT(buf->lb_len > 0);
2329                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2330                 buf->lb_buf = NULL;
2331                 buf->lb_len = 0;
2332         }
2333 }
2334 EXPORT_SYMBOL(lu_buf_free);
2335
2336 void lu_buf_alloc(struct lu_buf *buf, int size)
2337 {
2338         LASSERT(buf);
2339         LASSERT(buf->lb_buf == NULL);
2340         LASSERT(buf->lb_len == 0);
2341         OBD_ALLOC_LARGE(buf->lb_buf, size);
2342         if (likely(buf->lb_buf))
2343                 buf->lb_len = size;
2344 }
2345 EXPORT_SYMBOL(lu_buf_alloc);
2346
2347 void lu_buf_realloc(struct lu_buf *buf, int size)
2348 {
2349         lu_buf_free(buf);
2350         lu_buf_alloc(buf, size);
2351 }
2352 EXPORT_SYMBOL(lu_buf_realloc);
2353
2354 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
2355 {
2356         if (buf->lb_buf == NULL && buf->lb_len == 0)
2357                 lu_buf_alloc(buf, len);
2358
2359         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2360                 lu_buf_realloc(buf, len);
2361
2362         return buf;
2363 }
2364 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2365
2366 /**
2367  * Increase the size of \a buf.
2368  * Old data in the buffer is preserved.
2369  * The old buffer remains unchanged on error.
2370  * \retval 0 or -ENOMEM
2371  */
2372 int lu_buf_check_and_grow(struct lu_buf *buf, int len)
2373 {
2374         char *ptr;
2375
2376         if (len <= buf->lb_len)
2377                 return 0;
2378
2379         OBD_ALLOC_LARGE(ptr, len);
2380         if (ptr == NULL)
2381                 return -ENOMEM;
2382
2383         /* Free the old buf */
2384         if (buf->lb_buf != NULL) {
2385                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2386                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2387         }
2388
2389         buf->lb_buf = ptr;
2390         buf->lb_len = len;
2391         return 0;
2392 }
2393 EXPORT_SYMBOL(lu_buf_check_and_grow);
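
/*
 * Editorial sketch (not part of the original file): a lu_buf starts out
 * empty, is allocated on first use, and can later be grown while keeping
 * its contents; lu_buf_free() releases the storage.  The sizes and "foo"
 * calls are hypothetical; the block is compiled out.
 */
#if 0
        struct lu_buf buf = LU_BUF_NULL;
        int rc;

        lu_buf_check_and_alloc(&buf, 4096);
        if (buf.lb_buf == NULL)
                return -ENOMEM;

        foo_fill_buffer(&buf);

        rc = lu_buf_check_and_grow(&buf, 8192);  /* preserves existing data */
        if (rc == 0)
                foo_use_buffer(&buf);

        lu_buf_free(&buf);
        return rc;
#endif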
2394