Whamcloud - gitweb
LU-3963 obdclass: convert to linux list api
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions, they provide some generic
40  * infrastructure for managing object devices
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <lu_ref.h>
61 #include <libcfs/list.h>
62
/**
 * Bounds for the lu_cache_percent module parameter (percentage of memory
 * to be used as lu_object cache).
 */
enum {
        /** upper bound for the percentage */
        LU_CACHE_PERCENT_MAX     = 50,
        /** initial value of lu_cache_percent */
        LU_CACHE_PERCENT_DEFAULT = 20
};

/** Upper bound on the number of objects one lu_object_limit() call purges */
#define LU_CACHE_NR_MAX_ADJUST          128
/** Value of lu_cache_nr that makes lu_object_limit() a no-op */
#define LU_CACHE_NR_UNLIMITED           -1
/** Initial value of lu_cache_nr: no limit on the number of cached objects */
#define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
/* NOTE(review): presumably the object-count limit for ldiskfs; not used in
 * the part of the file visible here. */
#define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
/** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
#define LU_CACHE_NR_ZFS_LIMIT           10240

/* NOTE(review): presumably the minimum/maximum number of hash bits for the
 * site object hash; confirm against the site initialisation code. */
#define LU_SITE_BITS_MIN    12
#define LU_SITE_BITS_MAX    24
/**
 * total 256 buckets (2^LU_SITE_BKT_BITS), we don't want too many buckets
 * because:
 * - consume too much memory
 * - avoid unbalanced LRU list
 */
#define LU_SITE_BKT_BITS    8
83
84
/*
 * Module parameter: percentage of memory to be used as lu_object cache.
 *
 * NOTE(review): the variable is declared unsigned int but registered with
 * type "int" below -- confirm CFS_MODULE_PARM tolerates the mismatch.
 */
static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
                "Percentage of memory to be used as lu_object cache");

/*
 * Module parameter: maximum number of objects in lu_object cache.
 * LU_CACHE_NR_UNLIMITED (the default) disables the check performed by
 * lu_object_limit().
 */
static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
                "Maximum number of objects in lu_object cache");

/* Forward declaration: lu_object_put() and lu_object_alloc() call it before
 * its definition. */
static void lu_object_free(const struct lu_env *env, struct lu_object *o);
94
95 /**
96  * Decrease reference counter on object. If last reference is freed, return
97  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
98  * case, free object immediately.
99  */
100 void lu_object_put(const struct lu_env *env, struct lu_object *o)
101 {
102         struct lu_site_bkt_data *bkt;
103         struct lu_object_header *top;
104         struct lu_site          *site;
105         struct lu_object        *orig;
106         cfs_hash_bd_t            bd;
107         const struct lu_fid     *fid;
108
109         top  = o->lo_header;
110         site = o->lo_dev->ld_site;
111         orig = o;
112
113         /*
114          * till we have full fids-on-OST implemented anonymous objects
115          * are possible in OSP. such an object isn't listed in the site
116          * so we should not remove it from the site.
117          */
118         fid = lu_object_fid(o);
119         if (fid_is_zero(fid)) {
120                 LASSERT(top->loh_hash.next == NULL
121                         && top->loh_hash.pprev == NULL);
122                 LASSERT(list_empty(&top->loh_lru));
123                 if (!atomic_dec_and_test(&top->loh_ref))
124                         return;
125                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
126                         if (o->lo_ops->loo_object_release != NULL)
127                                 o->lo_ops->loo_object_release(env, o);
128                 }
129                 lu_object_free(env, orig);
130                 return;
131         }
132
133         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
134         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
135
136         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
137                 if (lu_object_is_dying(top)) {
138
139                         /*
140                          * somebody may be waiting for this, currently only
141                          * used for cl_object, see cl_object_put_last().
142                          */
143                         wake_up_all(&bkt->lsb_marche_funebre);
144                 }
145                 return;
146         }
147
148         LASSERT(bkt->lsb_busy > 0);
149         bkt->lsb_busy--;
150         /*
151          * When last reference is released, iterate over object
152          * layers, and notify them that object is no longer busy.
153          */
154         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
155                 if (o->lo_ops->loo_object_release != NULL)
156                         o->lo_ops->loo_object_release(env, o);
157         }
158
159         if (!lu_object_is_dying(top)) {
160                 LASSERT(list_empty(&top->loh_lru));
161                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
162                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
163                 return;
164         }
165
166         /*
167          * If object is dying (will not be cached), removed it
168          * from hash table and LRU.
169          *
170          * This is done with hash table and LRU lists locked. As the only
171          * way to acquire first reference to previously unreferenced
172          * object is through hash-table lookup (lu_object_find()),
173          * or LRU scanning (lu_site_purge()), that are done under hash-table
174          * and LRU lock, no race with concurrent object lookup is possible
175          * and we can safely destroy object below.
176          */
177         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
178                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
179         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
180         /*
181          * Object was already removed from hash and lru above, can
182          * kill it.
183          */
184         lu_object_free(env, orig);
185 }
186 EXPORT_SYMBOL(lu_object_put);
187
188 /**
189  * Put object and don't keep in cache. This is temporary solution for
190  * multi-site objects when its layering is not constant.
191  */
192 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
193 {
194         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
195         return lu_object_put(env, o);
196 }
197 EXPORT_SYMBOL(lu_object_put_nocache);
198
199 /**
200  * Kill the object and take it out of LRU cache.
201  * Currently used by client code for layout change.
202  */
203 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
204 {
205         struct lu_object_header *top;
206
207         top = o->lo_header;
208         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
209         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
210                 cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
211                 cfs_hash_bd_t bd;
212
213                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
214                 list_del_init(&top->loh_lru);
215                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
216                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
217         }
218 }
219 EXPORT_SYMBOL(lu_object_unhash);
220
221 /**
222  * Allocate new object.
223  *
224  * This follows object creation protocol, described in the comment within
225  * struct lu_device_operations definition.
226  */
227 static struct lu_object *lu_object_alloc(const struct lu_env *env,
228                                          struct lu_device *dev,
229                                          const struct lu_fid *f,
230                                          const struct lu_object_conf *conf)
231 {
232         struct lu_object *scan;
233         struct lu_object *top;
234         struct list_head *layers;
235         unsigned int init_mask = 0;
236         unsigned int init_flag;
237         int clean;
238         int result;
239         ENTRY;
240
241         /*
242          * Create top-level object slice. This will also create
243          * lu_object_header.
244          */
245         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
246         if (top == NULL)
247                 RETURN(ERR_PTR(-ENOMEM));
248         if (IS_ERR(top))
249                 RETURN(top);
250         /*
251          * This is the only place where object fid is assigned. It's constant
252          * after this point.
253          */
254         top->lo_header->loh_fid = *f;
255         layers = &top->lo_header->loh_layers;
256
257         do {
258                 /*
259                  * Call ->loo_object_init() repeatedly, until no more new
260                  * object slices are created.
261                  */
262                 clean = 1;
263                 init_flag = 1;
264                 list_for_each_entry(scan, layers, lo_linkage) {
265                         if (init_mask & init_flag)
266                                 goto next;
267                         clean = 0;
268                         scan->lo_header = top->lo_header;
269                         result = scan->lo_ops->loo_object_init(env, scan, conf);
270                         if (result != 0) {
271                                 lu_object_free(env, top);
272                                 RETURN(ERR_PTR(result));
273                         }
274                         init_mask |= init_flag;
275 next:
276                         init_flag <<= 1;
277                 }
278         } while (!clean);
279
280         list_for_each_entry_reverse(scan, layers, lo_linkage) {
281                 if (scan->lo_ops->loo_object_start != NULL) {
282                         result = scan->lo_ops->loo_object_start(env, scan);
283                         if (result != 0) {
284                                 lu_object_free(env, top);
285                                 RETURN(ERR_PTR(result));
286                         }
287                 }
288         }
289
290         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
291         RETURN(top);
292 }
293
294 /**
295  * Free an object.
296  */
297 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
298 {
299         struct lu_site_bkt_data *bkt;
300         struct lu_site          *site;
301         struct lu_object        *scan;
302         struct list_head        *layers;
303         struct list_head         splice;
304
305         site   = o->lo_dev->ld_site;
306         layers = &o->lo_header->loh_layers;
307         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
308         /*
309          * First call ->loo_object_delete() method to release all resources.
310          */
311         list_for_each_entry_reverse(scan, layers, lo_linkage) {
312                 if (scan->lo_ops->loo_object_delete != NULL)
313                         scan->lo_ops->loo_object_delete(env, scan);
314         }
315
316         /*
317          * Then, splice object layers into stand-alone list, and call
318          * ->loo_object_free() on all layers to free memory. Splice is
319          * necessary, because lu_object_header is freed together with the
320          * top-level slice.
321          */
322         INIT_LIST_HEAD(&splice);
323         list_splice_init(layers, &splice);
324         while (!list_empty(&splice)) {
325                 /*
326                  * Free layers in bottom-to-top order, so that object header
327                  * lives as long as possible and ->loo_object_free() methods
328                  * can look at its contents.
329                  */
330                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
331                 list_del_init(&o->lo_linkage);
332                 LASSERT(o->lo_ops->loo_object_free != NULL);
333                 o->lo_ops->loo_object_free(env, o);
334         }
335
336         if (waitqueue_active(&bkt->lsb_marche_funebre))
337                 wake_up_all(&bkt->lsb_marche_funebre);
338 }
339
/**
 * Free \a nr objects from the cold end of the site LRU list.
 *
 * Buckets are scanned starting at s->ls_purge_start. Each bucket gives up
 * at most (nr / number-of-buckets + 1) objects per pass; \a nr == ~0 means
 * "no limit", both overall and per bucket. If the quota is not exhausted,
 * something was purged, and the scan did not begin at bucket 0, one more
 * pass is made from the first bucket.
 *
 * \retval the part of \a nr that was not consumed (\a nr itself is returned
 *         unchanged when it was ~0); 0 if OBD_FAIL_OBD_NO_LRU is set
 *
 * NOTE(review): a call with \a nr == 0 is not a no-op. If the first
 * scanned bucket has a non-empty LRU, one object is purged, --nr yields
 * -1 (== ~0) and the remaining buckets are then scanned in "no overall
 * limit" mode with a per-bucket quota of 1. Confirm whether any caller can
 * pass 0.
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
        struct lu_object_header *h;
        struct lu_object_header *temp;
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t            bd;
        cfs_hash_bd_t            bd2;
        struct list_head         dispose;
        int                      did_sth;
        int                      start;
        int                      count;
        int                      bnr;
        int                      i;

        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);

        INIT_LIST_HEAD(&dispose);
        /*
         * Under LRU list lock, scan LRU list and move unreferenced objects to
         * the dispose list, removing them from LRU and hash table.
         */
        start = s->ls_purge_start;
        /* per-bucket quota; -1 disables the "count > 0" check below */
        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
 again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
         * only bring troubles to us. See LU-5331.
         */
        mutex_lock(&s->ls_purge_mutex);
        did_sth = 0;
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                /* skip the buckets before the starting point of this pass */
                if (i < start)
                        continue;
                count = bnr;
                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);

                list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        /* objects on the LRU hold no references */
                        LASSERT(atomic_read(&h->loh_ref) == 0);

                        /* the object must hash to the bucket being scanned */
                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
                        LASSERT(bd.bd_bucket == bd2.bd_bucket);

                        cfs_hash_bd_del_locked(s->ls_obj_hash,
                                               &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
                        if (did_sth == 0)
                                did_sth = 1;

                        /* overall quota exhausted */
                        if (nr != ~0 && --nr == 0)
                                break;

                        /* per-bucket quota exhausted */
                        if (count > 0 && --count == 0)
                                break;

                }
                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
                cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
                while (!list_empty(&dispose)) {
                        h = container_of0(dispose.next,
                                          struct lu_object_header, loh_lru);
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }

                if (nr == 0)
                        break;
        }
        mutex_unlock(&s->ls_purge_mutex);

        if (nr != 0 && did_sth && start != 0) {
                start = 0; /* restart from the first bucket */
                goto again;
        }
        /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);

        return nr;
}
EXPORT_SYMBOL(lu_site_purge);
429
430 /*
431  * Object printing.
432  *
433  * Code below has to jump through certain hoops to output object description
434  * into libcfs_debug_msg-based log. The problem is that lu_object_print()
435  * composes object description from strings that are parts of _lines_ of
436  * output (i.e., strings that are not terminated by newline). This doesn't fit
437  * very well into libcfs_debug_msg() interface that assumes that each message
438  * supplied to it is a self-contained output line.
439  *
440  * To work around this, strings are collected in a temporary buffer
441  * (implemented as a value of lu_cdebug_key key), until terminating newline
442  * character is detected.
443  *
444  */
445
enum {
        /**
         * Maximal line size.
         *
         * XXX overflow is not handled correctly: lu_cdebug_printer() formats
         * with vsnprintf() into whatever room is left in the buffer and
         * does not check for truncation.
         */
        LU_CDEBUG_LINE = 512
};

/**
 * Value stored under lu_global_key: the buffer in which
 * lu_cdebug_printer() accumulates one line of output.
 */
struct lu_cdebug_data {
        /**
         * Temporary buffer, holding the NUL-terminated partial line
         * collected so far.
         */
        char lck_area[LU_CDEBUG_LINE];
};
461
/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/**
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 *
 * The value is a struct lu_cdebug_data; lu_cdebug_printer() looks it up in
 * the environment's context to assemble output lines.
 */
struct lu_context_key lu_global_key = {
        /* context tags for which the key is used */
        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
                    LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
        .lct_init = lu_global_key_init,
        .lct_fini = lu_global_key_fini
};
475
476 /**
477  * Printer function emitting messages through libcfs_debug_msg().
478  */
479 int lu_cdebug_printer(const struct lu_env *env,
480                       void *cookie, const char *format, ...)
481 {
482         struct libcfs_debug_msg_data *msgdata = cookie;
483         struct lu_cdebug_data        *key;
484         int used;
485         int complete;
486         va_list args;
487
488         va_start(args, format);
489
490         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
491         LASSERT(key != NULL);
492
493         used = strlen(key->lck_area);
494         complete = format[strlen(format) - 1] == '\n';
495         /*
496          * Append new chunk to the buffer.
497          */
498         vsnprintf(key->lck_area + used,
499                   ARRAY_SIZE(key->lck_area) - used, format, args);
500         if (complete) {
501                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
502                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
503                 key->lck_area[0] = 0;
504         }
505         va_end(args);
506         return 0;
507 }
508 EXPORT_SYMBOL(lu_cdebug_printer);
509
510 /**
511  * Print object header.
512  */
513 void lu_object_header_print(const struct lu_env *env, void *cookie,
514                             lu_printer_t printer,
515                             const struct lu_object_header *hdr)
516 {
517         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
518                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
519                    PFID(&hdr->loh_fid),
520                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
521                    list_empty((struct list_head *)&hdr->loh_lru) ? \
522                    "" : " lru",
523                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
524 }
525 EXPORT_SYMBOL(lu_object_header_print);
526
527 /**
528  * Print human readable representation of the \a o to the \a printer.
529  */
530 void lu_object_print(const struct lu_env *env, void *cookie,
531                      lu_printer_t printer, const struct lu_object *o)
532 {
533         static const char ruler[] = "........................................";
534         struct lu_object_header *top;
535         int depth = 4;
536
537         top = o->lo_header;
538         lu_object_header_print(env, cookie, printer, top);
539         (*printer)(env, cookie, "{\n");
540
541         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
542                 /*
543                  * print `.' \a depth times followed by type name and address
544                  */
545                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
546                            o->lo_dev->ld_type->ldt_name, o);
547
548                 if (o->lo_ops->loo_object_print != NULL)
549                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
550
551                 (*printer)(env, cookie, "\n");
552         }
553
554         (*printer)(env, cookie, "} header@%p\n", top);
555 }
556 EXPORT_SYMBOL(lu_object_print);
557
558 /**
559  * Check object consistency.
560  */
561 int lu_object_invariant(const struct lu_object *o)
562 {
563         struct lu_object_header *top;
564
565         top = o->lo_header;
566         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
567                 if (o->lo_ops->loo_object_invariant != NULL &&
568                     !o->lo_ops->loo_object_invariant(o))
569                         return 0;
570         }
571         return 1;
572 }
573 EXPORT_SYMBOL(lu_object_invariant);
574
575 static struct lu_object *htable_lookup(struct lu_site *s,
576                                        cfs_hash_bd_t *bd,
577                                        const struct lu_fid *f,
578                                        wait_queue_t *waiter,
579                                        __u64 *version)
580 {
581         struct lu_site_bkt_data *bkt;
582         struct lu_object_header *h;
583         struct hlist_node       *hnode;
584         __u64  ver = cfs_hash_bd_version_get(bd);
585
586         if (*version == ver)
587                 return ERR_PTR(-ENOENT);
588
589         *version = ver;
590         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
591         /* cfs_hash_bd_peek_locked is a somehow "internal" function
592          * of cfs_hash, it doesn't add refcount on object. */
593         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
594         if (hnode == NULL) {
595                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
596                 return ERR_PTR(-ENOENT);
597         }
598
599         h = container_of0(hnode, struct lu_object_header, loh_hash);
600         if (likely(!lu_object_is_dying(h))) {
601                 cfs_hash_get(s->ls_obj_hash, hnode);
602                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
603                 list_del_init(&h->loh_lru);
604                 return lu_object_top(h);
605         }
606
607         /*
608          * Lookup found an object being destroyed this object cannot be
609          * returned (to assure that references to dying objects are eventually
610          * drained), and moreover, lookup has to wait until object is freed.
611          */
612
613         init_waitqueue_entry_current(waiter);
614         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
615         set_current_state(TASK_UNINTERRUPTIBLE);
616         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
617         return ERR_PTR(-EAGAIN);
618 }
619
620 static struct lu_object *htable_lookup_nowait(struct lu_site *s,
621                                               cfs_hash_bd_t *bd,
622                                               const struct lu_fid *f)
623 {
624         struct hlist_node       *hnode;
625         struct lu_object_header *h;
626
627         /* cfs_hash_bd_peek_locked is a somehow "internal" function
628          * of cfs_hash, it doesn't add refcount on object. */
629         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
630         if (hnode == NULL) {
631                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
632                 return ERR_PTR(-ENOENT);
633         }
634
635         h = container_of0(hnode, struct lu_object_header, loh_hash);
636         if (unlikely(lu_object_is_dying(h)))
637                 return ERR_PTR(-ENOENT);
638
639         cfs_hash_get(s->ls_obj_hash, hnode);
640         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
641         list_del_init(&h->loh_lru);
642         return lu_object_top(h);
643 }
644
645 /**
646  * Search cache for an object with the fid \a f. If such object is found,
647  * return it. Otherwise, create new object, insert it into cache and return
648  * it. In any case, additional reference is acquired on the returned object.
649  */
650 struct lu_object *lu_object_find(const struct lu_env *env,
651                                  struct lu_device *dev, const struct lu_fid *f,
652                                  const struct lu_object_conf *conf)
653 {
654         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
655 }
656 EXPORT_SYMBOL(lu_object_find);
657
658 /*
659  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
660  * the calculation for the number of objects to reclaim is not covered by
661  * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST.
662  * This ensures that many concurrent threads will not accidentally purge
663  * the entire cache.
664  */
665 static void lu_object_limit(const struct lu_env *env,
666                             struct lu_device *dev)
667 {
668         __u64 size, nr;
669
670         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
671                 return;
672
673         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
674         nr = (__u64)lu_cache_nr;
675         if (size > nr)
676                 lu_site_purge(env, dev->ld_site,
677                               MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
678
679         return;
680 }
681
682 static struct lu_object *lu_object_new(const struct lu_env *env,
683                                        struct lu_device *dev,
684                                        const struct lu_fid *f,
685                                        const struct lu_object_conf *conf)
686 {
687         struct lu_object        *o;
688         cfs_hash_t              *hs;
689         cfs_hash_bd_t            bd;
690         struct lu_site_bkt_data *bkt;
691
692         o = lu_object_alloc(env, dev, f, conf);
693         if (unlikely(IS_ERR(o)))
694                 return o;
695
696         hs = dev->ld_site->ls_obj_hash;
697         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
698         bkt = cfs_hash_bd_extra_get(hs, &bd);
699         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
700         bkt->lsb_busy++;
701         cfs_hash_bd_unlock(hs, &bd, 1);
702
703         lu_object_limit(env, dev);
704
705         return o;
706 }
707
708 /**
709  * Core logic of lu_object_find*() functions.
710  */
711 static struct lu_object *lu_object_find_try(const struct lu_env *env,
712                                             struct lu_device *dev,
713                                             const struct lu_fid *f,
714                                             const struct lu_object_conf *conf,
715                                             wait_queue_t *waiter)
716 {
717         struct lu_object      *o;
718         struct lu_object      *shadow;
719         struct lu_site        *s;
720         cfs_hash_t            *hs;
721         cfs_hash_bd_t          bd;
722         __u64                  version = 0;
723
724         /*
725          * This uses standard index maintenance protocol:
726          *
727          *     - search index under lock, and return object if found;
728          *     - otherwise, unlock index, allocate new object;
729          *     - lock index and search again;
730          *     - if nothing is found (usual case), insert newly created
731          *       object into index;
732          *     - otherwise (race: other thread inserted object), free
733          *       object just allocated.
734          *     - unlock index;
735          *     - return object.
736          *
737          * For "LOC_F_NEW" case, we are sure the object is new established.
738          * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
739          * just alloc and insert directly.
740          *
741          * If dying object is found during index search, add @waiter to the
742          * site wait-queue and return ERR_PTR(-EAGAIN).
743          */
744         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
745                 return lu_object_new(env, dev, f, conf);
746
747         s  = dev->ld_site;
748         hs = s->ls_obj_hash;
749         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
750         o = htable_lookup(s, &bd, f, waiter, &version);
751         cfs_hash_bd_unlock(hs, &bd, 1);
752         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
753                 return o;
754
755         /*
756          * Allocate new object. This may result in rather complicated
757          * operations, including fld queries, inode loading, etc.
758          */
759         o = lu_object_alloc(env, dev, f, conf);
760         if (unlikely(IS_ERR(o)))
761                 return o;
762
763         LASSERT(lu_fid_eq(lu_object_fid(o), f));
764
765         cfs_hash_bd_lock(hs, &bd, 1);
766
767         shadow = htable_lookup(s, &bd, f, waiter, &version);
768         if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
769                 struct lu_site_bkt_data *bkt;
770
771                 bkt = cfs_hash_bd_extra_get(hs, &bd);
772                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
773                 bkt->lsb_busy++;
774                 cfs_hash_bd_unlock(hs, &bd, 1);
775
776                 lu_object_limit(env, dev);
777
778                 return o;
779         }
780
781         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
782         cfs_hash_bd_unlock(hs, &bd, 1);
783         lu_object_free(env, o);
784         return shadow;
785 }
786
787 /**
788  * Much like lu_object_find(), but top level device of object is specifically
789  * \a dev rather than top level device of the site. This interface allows
790  * objects of different "stacking" to be created within the same site.
791  */
792 struct lu_object *lu_object_find_at(const struct lu_env *env,
793                                     struct lu_device *dev,
794                                     const struct lu_fid *f,
795                                     const struct lu_object_conf *conf)
796 {
797         struct lu_site_bkt_data *bkt;
798         struct lu_object        *obj;
799         wait_queue_t           wait;
800
801         while (1) {
802                 obj = lu_object_find_try(env, dev, f, conf, &wait);
803                 if (obj != ERR_PTR(-EAGAIN))
804                         return obj;
805                 /*
806                  * lu_object_find_try() already added waiter into the
807                  * wait queue.
808                  */
809                 waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
810                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
811                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
812         }
813 }
814 EXPORT_SYMBOL(lu_object_find_at);
815
816 /**
817  * Try to find the object in cache without waiting for the dead object
818  * to be released nor allocating object if no cached one was found.
819  *
820  * The found object will be set as LU_OBJECT_HEARD_BANSHEE for purging.
821  */
822 void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
823                      const struct lu_fid *f)
824 {
825         struct lu_site          *s  = dev->ld_site;
826         cfs_hash_t              *hs = s->ls_obj_hash;
827         cfs_hash_bd_t            bd;
828         struct lu_object        *o;
829
830         cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
831         o = htable_lookup_nowait(s, &bd, f);
832         cfs_hash_bd_unlock(hs, &bd, 1);
833         if (!IS_ERR(o)) {
834                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
835                 lu_object_put(env, o);
836         }
837 }
838 EXPORT_SYMBOL(lu_object_purge);
839
840 /**
841  * Find object with given fid, and return its slice belonging to given device.
842  */
843 struct lu_object *lu_object_find_slice(const struct lu_env *env,
844                                        struct lu_device *dev,
845                                        const struct lu_fid *f,
846                                        const struct lu_object_conf *conf)
847 {
848         struct lu_object *top;
849         struct lu_object *obj;
850
851         top = lu_object_find(env, dev, f, conf);
852         if (!IS_ERR(top)) {
853                 obj = lu_object_locate(top->lo_header, dev->ld_type);
854                 if (obj == NULL)
855                         lu_object_put(env, top);
856         } else
857                 obj = top;
858         return obj;
859 }
860 EXPORT_SYMBOL(lu_object_find_slice);
861
/**
 * Global list of all device types.
 *
 * Device types are linked through lu_device_type::ldt_linkage by
 * lu_device_type_init() and unlinked by lu_device_type_fini(), both under
 * obd_types_lock.
 *
 * NOTE(review): the list head is not statically initialized here; it is
 * presumably set up with INIT_LIST_HEAD() during global initialization
 * elsewhere in this file -- confirm.
 */
static struct list_head lu_device_types;
866
867 int lu_device_type_init(struct lu_device_type *ldt)
868 {
869         int result = 0;
870
871         atomic_set(&ldt->ldt_device_nr, 0);
872         INIT_LIST_HEAD(&ldt->ldt_linkage);
873         if (ldt->ldt_ops->ldto_init)
874                 result = ldt->ldt_ops->ldto_init(ldt);
875
876         if (result == 0) {
877                 spin_lock(&obd_types_lock);
878                 list_add(&ldt->ldt_linkage, &lu_device_types);
879                 spin_unlock(&obd_types_lock);
880         }
881
882         return result;
883 }
884 EXPORT_SYMBOL(lu_device_type_init);
885
886 void lu_device_type_fini(struct lu_device_type *ldt)
887 {
888         spin_lock(&obd_types_lock);
889         list_del_init(&ldt->ldt_linkage);
890         spin_unlock(&obd_types_lock);
891         if (ldt->ldt_ops->ldto_fini)
892                 ldt->ldt_ops->ldto_fini(ldt);
893 }
894 EXPORT_SYMBOL(lu_device_type_fini);
895
/**
 * Global list of all sites on this node.
 *
 * Sites are linked through lu_site::ls_linkage by lu_site_init_finish() and
 * unlinked by lu_site_fini(); both do so while holding lu_sites_guard.
 * The list head itself is initialized from lu_site_init().
 */
static struct list_head lu_sites;
static DEFINE_MUTEX(lu_sites_guard);

/**
 * Global environment used by site shrinker.
 *
 * Its context is refilled by lu_site_init_finish() and has key values
 * finalized by lu_context_key_degister().
 */
static struct lu_env lu_shrink_env;
906
/**
 * Argument bundle handed by lu_site_print() to the per-object hash iterator
 * callback lu_site_obj_print().
 */
struct lu_site_print_arg {
        /* environment forwarded to the object/header print helpers */
        struct lu_env   *lsp_env;
        /* opaque caller cookie forwarded to the print helpers */
        void            *lsp_cookie;
        /* printer callback forwarded to the print helpers */
        lu_printer_t     lsp_printer;
};
912
913 static int
914 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
915                   struct hlist_node *hnode, void *data)
916 {
917         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
918         struct lu_object_header  *h;
919
920         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
921         if (!list_empty(&h->loh_layers)) {
922                 const struct lu_object *o;
923
924                 o = lu_object_top(h);
925                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
926                                 arg->lsp_printer, o);
927         } else {
928                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
929                                        arg->lsp_printer, h);
930         }
931         return 0;
932 }
933
934 /**
935  * Print all objects in \a s.
936  */
937 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
938                    lu_printer_t printer)
939 {
940         struct lu_site_print_arg arg = {
941                 .lsp_env     = (struct lu_env *)env,
942                 .lsp_cookie  = cookie,
943                 .lsp_printer = printer,
944         };
945
946         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
947 }
948 EXPORT_SYMBOL(lu_site_print);
949
950 /**
951  * Return desired hash table order.
952  */
953 static int lu_htable_order(struct lu_device *top)
954 {
955         unsigned long cache_size;
956         int bits;
957
958         /*
959          * For ZFS based OSDs the cache should be disabled by default.  This
960          * allows the ZFS ARC maximum flexibility in determining what buffers
961          * to cache.  If Lustre has objects or buffer which it wants to ensure
962          * always stay cached it must maintain a hold on them.
963          */
964         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
965                 lu_cache_percent = 1;
966                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
967                 return LU_SITE_BITS_MIN;
968         }
969
970         /*
971          * Calculate hash table size, assuming that we want reasonable
972          * performance when 20% of total memory is occupied by cache of
973          * lu_objects.
974          *
975          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
976          */
977         cache_size = totalram_pages;
978
979 #if BITS_PER_LONG == 32
980         /* limit hashtable size for lowmem systems to low RAM */
981         if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
982                 cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
983 #endif
984
985         /* clear off unreasonable cache setting. */
986         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
987                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
988                       " the range of (0, %u]. Will use default value: %u.\n",
989                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
990                       LU_CACHE_PERCENT_DEFAULT);
991
992                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
993         }
994         cache_size = cache_size / 100 * lu_cache_percent *
995                 (PAGE_CACHE_SIZE / 1024);
996
997         for (bits = 1; (1 << bits) < cache_size; ++bits) {
998                 ;
999         }
1000         return bits;
1001 }
1002
1003 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
1004                                 const void *key, unsigned mask)
1005 {
1006         struct lu_fid  *fid = (struct lu_fid *)key;
1007         __u32           hash;
1008
1009         hash = fid_flatten32(fid);
1010         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
1011         hash = hash_long(hash, hs->hs_bkt_bits);
1012
1013         /* give me another random factor */
1014         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
1015
1016         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
1017         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
1018
1019         return hash & mask;
1020 }
1021
1022 static void *lu_obj_hop_object(struct hlist_node *hnode)
1023 {
1024         return hlist_entry(hnode, struct lu_object_header, loh_hash);
1025 }
1026
1027 static void *lu_obj_hop_key(struct hlist_node *hnode)
1028 {
1029         struct lu_object_header *h;
1030
1031         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1032         return &h->loh_fid;
1033 }
1034
1035 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
1036 {
1037         struct lu_object_header *h;
1038
1039         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1040         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
1041 }
1042
1043 static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
1044 {
1045         struct lu_object_header *h;
1046
1047         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1048         if (atomic_add_return(1, &h->loh_ref) == 1) {
1049                 struct lu_site_bkt_data *bkt;
1050                 cfs_hash_bd_t            bd;
1051
1052                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
1053                 bkt = cfs_hash_bd_extra_get(hs, &bd);
1054                 bkt->lsb_busy++;
1055         }
1056 }
1057
1058 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
1059 {
1060         LBUG(); /* we should never called it */
1061 }
1062
1063 cfs_hash_ops_t lu_site_hash_ops = {
1064         .hs_hash        = lu_obj_hop_hash,
1065         .hs_key         = lu_obj_hop_key,
1066         .hs_keycmp      = lu_obj_hop_keycmp,
1067         .hs_object      = lu_obj_hop_object,
1068         .hs_get         = lu_obj_hop_get,
1069         .hs_put_locked  = lu_obj_hop_put_locked,
1070 };
1071
1072 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1073 {
1074         spin_lock(&s->ls_ld_lock);
1075         if (list_empty(&d->ld_linkage))
1076                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1077         spin_unlock(&s->ls_ld_lock);
1078 }
1079 EXPORT_SYMBOL(lu_dev_add_linkage);
1080
1081 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1082 {
1083         spin_lock(&s->ls_ld_lock);
1084         list_del_init(&d->ld_linkage);
1085         spin_unlock(&s->ls_ld_lock);
1086 }
1087 EXPORT_SYMBOL(lu_dev_del_linkage);
1088
1089 /**
1090   * Initialize site \a s, with \a d as the top level device.
1091   */
1092 int lu_site_init(struct lu_site *s, struct lu_device *top)
1093 {
1094         struct lu_site_bkt_data *bkt;
1095         cfs_hash_bd_t bd;
1096         char name[16];
1097         int bits;
1098         int i;
1099         ENTRY;
1100
1101         INIT_LIST_HEAD(&lu_sites);
1102
1103         memset(s, 0, sizeof *s);
1104         mutex_init(&s->ls_purge_mutex);
1105         bits = lu_htable_order(top);
1106         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
1107         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
1108              bits >= LU_SITE_BITS_MIN; bits--) {
1109                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1110                                                  bits - LU_SITE_BKT_BITS,
1111                                                  sizeof(*bkt), 0, 0,
1112                                                  &lu_site_hash_ops,
1113                                                  CFS_HASH_SPIN_BKTLOCK |
1114                                                  CFS_HASH_NO_ITEMREF |
1115                                                  CFS_HASH_DEPTH |
1116                                                  CFS_HASH_ASSERT_EMPTY |
1117                                                  CFS_HASH_COUNTER);
1118                 if (s->ls_obj_hash != NULL)
1119                         break;
1120         }
1121
1122         if (s->ls_obj_hash == NULL) {
1123                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
1124                 return -ENOMEM;
1125         }
1126
1127         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1128                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1129                 INIT_LIST_HEAD(&bkt->lsb_lru);
1130                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1131         }
1132
1133         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1134         if (s->ls_stats == NULL) {
1135                 cfs_hash_putref(s->ls_obj_hash);
1136                 s->ls_obj_hash = NULL;
1137                 return -ENOMEM;
1138         }
1139
1140         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1141                              0, "created", "created");
1142         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1143                              0, "cache_hit", "cache_hit");
1144         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1145                              0, "cache_miss", "cache_miss");
1146         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1147                              0, "cache_race", "cache_race");
1148         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1149                              0, "cache_death_race", "cache_death_race");
1150         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1151                              0, "lru_purged", "lru_purged");
1152
1153         INIT_LIST_HEAD(&s->ls_linkage);
1154         s->ls_top_dev = top;
1155         top->ld_site = s;
1156         lu_device_get(top);
1157         lu_ref_add(&top->ld_reference, "site-top", s);
1158
1159         INIT_LIST_HEAD(&s->ls_ld_linkage);
1160         spin_lock_init(&s->ls_ld_lock);
1161
1162         lu_dev_add_linkage(s, top);
1163
1164         RETURN(0);
1165 }
1166 EXPORT_SYMBOL(lu_site_init);
1167
1168 /**
1169  * Finalize \a s and release its resources.
1170  */
1171 void lu_site_fini(struct lu_site *s)
1172 {
1173         mutex_lock(&lu_sites_guard);
1174         list_del_init(&s->ls_linkage);
1175         mutex_unlock(&lu_sites_guard);
1176
1177         if (s->ls_obj_hash != NULL) {
1178                 cfs_hash_putref(s->ls_obj_hash);
1179                 s->ls_obj_hash = NULL;
1180         }
1181
1182         if (s->ls_top_dev != NULL) {
1183                 s->ls_top_dev->ld_site = NULL;
1184                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1185                 lu_device_put(s->ls_top_dev);
1186                 s->ls_top_dev = NULL;
1187         }
1188
1189         if (s->ls_stats != NULL)
1190                 lprocfs_free_stats(&s->ls_stats);
1191 }
1192 EXPORT_SYMBOL(lu_site_fini);
1193
1194 /**
1195  * Called when initialization of stack for this site is completed.
1196  */
1197 int lu_site_init_finish(struct lu_site *s)
1198 {
1199         int result;
1200         mutex_lock(&lu_sites_guard);
1201         result = lu_context_refill(&lu_shrink_env.le_ctx);
1202         if (result == 0)
1203                 list_add(&s->ls_linkage, &lu_sites);
1204         mutex_unlock(&lu_sites_guard);
1205         return result;
1206 }
1207 EXPORT_SYMBOL(lu_site_init_finish);
1208
1209 /**
1210  * Acquire additional reference on device \a d
1211  */
1212 void lu_device_get(struct lu_device *d)
1213 {
1214         atomic_inc(&d->ld_ref);
1215 }
1216 EXPORT_SYMBOL(lu_device_get);
1217
1218 /**
1219  * Release reference on device \a d.
1220  */
1221 void lu_device_put(struct lu_device *d)
1222 {
1223         LASSERT(atomic_read(&d->ld_ref) > 0);
1224         atomic_dec(&d->ld_ref);
1225 }
1226 EXPORT_SYMBOL(lu_device_put);
1227
1228 /**
1229  * Initialize device \a d of type \a t.
1230  */
1231 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1232 {
1233         if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
1234             t->ldt_ops->ldto_start != NULL)
1235                 t->ldt_ops->ldto_start(t);
1236
1237         memset(d, 0, sizeof *d);
1238         d->ld_type = t;
1239         lu_ref_init(&d->ld_reference);
1240         INIT_LIST_HEAD(&d->ld_linkage);
1241
1242         return 0;
1243 }
1244 EXPORT_SYMBOL(lu_device_init);
1245
1246 /**
1247  * Finalize device \a d.
1248  */
1249 void lu_device_fini(struct lu_device *d)
1250 {
1251         struct lu_device_type *t = d->ld_type;
1252
1253         if (d->ld_obd != NULL) {
1254                 d->ld_obd->obd_lu_dev = NULL;
1255                 d->ld_obd = NULL;
1256         }
1257
1258         lu_ref_fini(&d->ld_reference);
1259         LASSERTF(atomic_read(&d->ld_ref) == 0,
1260                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1261         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1262
1263         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1264             t->ldt_ops->ldto_stop != NULL)
1265                 t->ldt_ops->ldto_stop(t);
1266 }
1267 EXPORT_SYMBOL(lu_device_fini);
1268
1269 /**
1270  * Initialize object \a o that is part of compound object \a h and was created
1271  * by device \a d.
1272  */
1273 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1274                    struct lu_device *d)
1275 {
1276         memset(o, 0, sizeof(*o));
1277         o->lo_header = h;
1278         o->lo_dev = d;
1279         lu_device_get(d);
1280         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1281         INIT_LIST_HEAD(&o->lo_linkage);
1282
1283         return 0;
1284 }
1285 EXPORT_SYMBOL(lu_object_init);
1286
1287 /**
1288  * Finalize object and release its resources.
1289  */
1290 void lu_object_fini(struct lu_object *o)
1291 {
1292         struct lu_device *dev = o->lo_dev;
1293
1294         LASSERT(list_empty(&o->lo_linkage));
1295
1296         if (dev != NULL) {
1297                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1298                               "lu_object", o);
1299                 lu_device_put(dev);
1300                 o->lo_dev = NULL;
1301         }
1302 }
1303 EXPORT_SYMBOL(lu_object_fini);
1304
1305 /**
1306  * Add object \a o as first layer of compound object \a h
1307  *
1308  * This is typically called by the ->ldo_object_alloc() method of top-level
1309  * device.
1310  */
1311 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1312 {
1313         list_move(&o->lo_linkage, &h->loh_layers);
1314 }
1315 EXPORT_SYMBOL(lu_object_add_top);
1316
1317 /**
1318  * Add object \a o as a layer of compound object, going after \a before.
1319  *
1320  * This is typically called by the ->ldo_object_alloc() method of \a
1321  * before->lo_dev.
1322  */
1323 void lu_object_add(struct lu_object *before, struct lu_object *o)
1324 {
1325         list_move(&o->lo_linkage, &before->lo_linkage);
1326 }
1327 EXPORT_SYMBOL(lu_object_add);
1328
1329 /**
1330  * Initialize compound object.
1331  */
1332 int lu_object_header_init(struct lu_object_header *h)
1333 {
1334         memset(h, 0, sizeof *h);
1335         atomic_set(&h->loh_ref, 1);
1336         INIT_HLIST_NODE(&h->loh_hash);
1337         INIT_LIST_HEAD(&h->loh_lru);
1338         INIT_LIST_HEAD(&h->loh_layers);
1339         lu_ref_init(&h->loh_reference);
1340         return 0;
1341 }
1342 EXPORT_SYMBOL(lu_object_header_init);
1343
1344 /**
1345  * Finalize compound object.
1346  */
1347 void lu_object_header_fini(struct lu_object_header *h)
1348 {
1349         LASSERT(list_empty(&h->loh_layers));
1350         LASSERT(list_empty(&h->loh_lru));
1351         LASSERT(hlist_unhashed(&h->loh_hash));
1352         lu_ref_fini(&h->loh_reference);
1353 }
1354 EXPORT_SYMBOL(lu_object_header_fini);
1355
1356 /**
1357  * Given a compound object, find its slice, corresponding to the device type
1358  * \a dtype.
1359  */
1360 struct lu_object *lu_object_locate(struct lu_object_header *h,
1361                                    const struct lu_device_type *dtype)
1362 {
1363         struct lu_object *o;
1364
1365         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1366                 if (o->lo_dev->ld_type == dtype)
1367                         return o;
1368         }
1369         return NULL;
1370 }
1371 EXPORT_SYMBOL(lu_object_locate);
1372
1373 /**
1374  * Finalize and free devices in the device stack.
1375  *
1376  * Finalize device stack by purging object cache, and calling
1377  * lu_device_type_operations::ldto_device_fini() and
1378  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1379  */
1380 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1381 {
1382         struct lu_site   *site = top->ld_site;
1383         struct lu_device *scan;
1384         struct lu_device *next;
1385
1386         lu_site_purge(env, site, ~0);
1387         for (scan = top; scan != NULL; scan = next) {
1388                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1389                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1390                 lu_device_put(scan);
1391         }
1392
1393         /* purge again. */
1394         lu_site_purge(env, site, ~0);
1395
1396         for (scan = top; scan != NULL; scan = next) {
1397                 const struct lu_device_type *ldt = scan->ld_type;
1398                 struct obd_type             *type;
1399
1400                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1401                 type = ldt->ldt_obd_type;
1402                 if (type != NULL) {
1403                         type->typ_refcnt--;
1404                         class_put_type(type);
1405                 }
1406         }
1407 }
1408 EXPORT_SYMBOL(lu_stack_fini);
1409
enum {
        /**
         * Maximal number of tld slots, i.e. the size of the lu_keys[]
         * table of registered context keys.
         */
        LU_CONTEXT_KEY_NR = 40
};
1416
/* Table of registered context keys, indexed by lu_context_key::lct_index. */
static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

/*
 * Spin-lock taken around updates of lu_keys[] and around walks of the list
 * of remembered contexts.
 */
static DEFINE_SPINLOCK(lu_keys_guard);

/**
 * Global counter incremented whenever key is registered, unregistered,
 * revived or quiesced. This is used to avoid unnecessary calls to
 * lu_context_refill(). No locking is provided, as initialization and shutdown
 * are supposed to be externally serialized.
 */
static unsigned key_set_version = 0;
1428
1429 /**
1430  * Register new key.
1431  */
1432 int lu_context_key_register(struct lu_context_key *key)
1433 {
1434         int result;
1435         int i;
1436
1437         LASSERT(key->lct_init != NULL);
1438         LASSERT(key->lct_fini != NULL);
1439         LASSERT(key->lct_tags != 0);
1440         LASSERT(key->lct_owner != NULL);
1441
1442         result = -ENFILE;
1443         spin_lock(&lu_keys_guard);
1444         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1445                 if (lu_keys[i] == NULL) {
1446                         key->lct_index = i;
1447                         atomic_set(&key->lct_used, 1);
1448                         lu_keys[i] = key;
1449                         lu_ref_init(&key->lct_reference);
1450                         result = 0;
1451                         ++key_set_version;
1452                         break;
1453                 }
1454         }
1455         spin_unlock(&lu_keys_guard);
1456         return result;
1457 }
1458 EXPORT_SYMBOL(lu_context_key_register);
1459
1460 static void key_fini(struct lu_context *ctx, int index)
1461 {
1462         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1463                 struct lu_context_key *key;
1464
1465                 key = lu_keys[index];
1466                 LASSERT(key != NULL);
1467                 LASSERT(key->lct_fini != NULL);
1468                 LASSERT(atomic_read(&key->lct_used) > 1);
1469
1470                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1471                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1472                 atomic_dec(&key->lct_used);
1473
1474                 LASSERT(key->lct_owner != NULL);
1475                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1476                         LINVRNT(module_refcount(key->lct_owner) > 0);
1477                         module_put(key->lct_owner);
1478                 }
1479                 ctx->lc_value[index] = NULL;
1480         }
1481 }
1482
1483 /**
1484  * Deregister key.
1485  */
1486 void lu_context_key_degister(struct lu_context_key *key)
1487 {
1488         LASSERT(atomic_read(&key->lct_used) >= 1);
1489         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1490
1491         lu_context_key_quiesce(key);
1492
1493         ++key_set_version;
1494         spin_lock(&lu_keys_guard);
1495         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1496         if (lu_keys[key->lct_index]) {
1497                 lu_keys[key->lct_index] = NULL;
1498                 lu_ref_fini(&key->lct_reference);
1499         }
1500         spin_unlock(&lu_keys_guard);
1501
1502         LASSERTF(atomic_read(&key->lct_used) == 1,
1503                  "key has instances: %d\n",
1504                  atomic_read(&key->lct_used));
1505 }
1506 EXPORT_SYMBOL(lu_context_key_degister);
1507
1508 /**
1509  * Register a number of keys. This has to be called after all keys have been
1510  * initialized by a call to LU_CONTEXT_KEY_INIT().
1511  */
1512 int lu_context_key_register_many(struct lu_context_key *k, ...)
1513 {
1514         struct lu_context_key *key = k;
1515         va_list args;
1516         int result;
1517
1518         va_start(args, k);
1519         do {
1520                 result = lu_context_key_register(key);
1521                 if (result)
1522                         break;
1523                 key = va_arg(args, struct lu_context_key *);
1524         } while (key != NULL);
1525         va_end(args);
1526
1527         if (result != 0) {
1528                 va_start(args, k);
1529                 while (k != key) {
1530                         lu_context_key_degister(k);
1531                         k = va_arg(args, struct lu_context_key *);
1532                 }
1533                 va_end(args);
1534         }
1535
1536         return result;
1537 }
1538 EXPORT_SYMBOL(lu_context_key_register_many);
1539
1540 /**
1541  * De-register a number of keys. This is a dual to
1542  * lu_context_key_register_many().
1543  */
1544 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1545 {
1546         va_list args;
1547
1548         va_start(args, k);
1549         do {
1550                 lu_context_key_degister(k);
1551                 k = va_arg(args, struct lu_context_key*);
1552         } while (k != NULL);
1553         va_end(args);
1554 }
1555 EXPORT_SYMBOL(lu_context_key_degister_many);
1556
1557 /**
1558  * Revive a number of keys.
1559  */
1560 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1561 {
1562         va_list args;
1563
1564         va_start(args, k);
1565         do {
1566                 lu_context_key_revive(k);
1567                 k = va_arg(args, struct lu_context_key*);
1568         } while (k != NULL);
1569         va_end(args);
1570 }
1571 EXPORT_SYMBOL(lu_context_key_revive_many);
1572
1573 /**
1574  * Quiescent a number of keys.
1575  */
1576 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1577 {
1578         va_list args;
1579
1580         va_start(args, k);
1581         do {
1582                 lu_context_key_quiesce(k);
1583                 k = va_arg(args, struct lu_context_key*);
1584         } while (k != NULL);
1585         va_end(args);
1586 }
1587 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1588
1589 /**
1590  * Return value associated with key \a key in context \a ctx.
1591  */
1592 void *lu_context_key_get(const struct lu_context *ctx,
1593                          const struct lu_context_key *key)
1594 {
1595         LINVRNT(ctx->lc_state == LCS_ENTERED);
1596         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1597         LASSERT(lu_keys[key->lct_index] == key);
1598         return ctx->lc_value[key->lct_index];
1599 }
1600 EXPORT_SYMBOL(lu_context_key_get);
1601
/**
 * List of remembered contexts.
 *
 * Contexts initialized with the LCT_REMEMBER tag are linked here through
 * lu_context::lc_remember by lu_context_init() and unlinked by
 * lu_context_fini(). lu_context_key_quiesce() walks the list to destroy the
 * values of a key in all of them. All three take lu_keys_guard while
 * touching the list.
 *
 * NOTE(review): the list head is not statically initialized here; it is
 * presumably set up with INIT_LIST_HEAD() during global initialization
 * elsewhere in this file -- confirm.
 */
static struct list_head lu_context_remembered;
1606
1607 /**
1608  * Destroy \a key in all remembered contexts. This is used to destroy key
1609  * values in "shared" contexts (like service threads), when a module owning
1610  * the key is about to be unloaded.
1611  */
1612 void lu_context_key_quiesce(struct lu_context_key *key)
1613 {
1614         struct lu_context *ctx;
1615         extern unsigned cl_env_cache_purge(unsigned nr);
1616
1617         if (!(key->lct_tags & LCT_QUIESCENT)) {
1618                 /*
1619                  * XXX layering violation.
1620                  */
1621                 cl_env_cache_purge(~0);
1622                 key->lct_tags |= LCT_QUIESCENT;
1623                 /*
1624                  * XXX memory barrier has to go here.
1625                  */
1626                 spin_lock(&lu_keys_guard);
1627                 list_for_each_entry(ctx, &lu_context_remembered,
1628                                     lc_remember)
1629                         key_fini(ctx, key->lct_index);
1630                 spin_unlock(&lu_keys_guard);
1631                 ++key_set_version;
1632         }
1633 }
1634 EXPORT_SYMBOL(lu_context_key_quiesce);
1635
1636 void lu_context_key_revive(struct lu_context_key *key)
1637 {
1638         key->lct_tags &= ~LCT_QUIESCENT;
1639         ++key_set_version;
1640 }
1641 EXPORT_SYMBOL(lu_context_key_revive);
1642
1643 static void keys_fini(struct lu_context *ctx)
1644 {
1645         int     i;
1646
1647         if (ctx->lc_value == NULL)
1648                 return;
1649
1650         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1651                 key_fini(ctx, i);
1652
1653         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1654         ctx->lc_value = NULL;
1655 }
1656
1657 static int keys_fill(struct lu_context *ctx)
1658 {
1659         int i;
1660
1661         LINVRNT(ctx->lc_value != NULL);
1662         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1663                 struct lu_context_key *key;
1664
1665                 key = lu_keys[i];
1666                 if (ctx->lc_value[i] == NULL && key != NULL &&
1667                     (key->lct_tags & ctx->lc_tags) &&
1668                     /*
1669                      * Don't create values for a LCT_QUIESCENT key, as this
1670                      * will pin module owning a key.
1671                      */
1672                     !(key->lct_tags & LCT_QUIESCENT)) {
1673                         void *value;
1674
1675                         LINVRNT(key->lct_init != NULL);
1676                         LINVRNT(key->lct_index == i);
1677
1678                         value = key->lct_init(ctx, key);
1679                         if (unlikely(IS_ERR(value)))
1680                                 return PTR_ERR(value);
1681
1682                         LASSERT(key->lct_owner != NULL);
1683                         if (!(ctx->lc_tags & LCT_NOREF))
1684                                 try_module_get(key->lct_owner);
1685                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1686                         atomic_inc(&key->lct_used);
1687                         /*
1688                          * This is the only place in the code, where an
1689                          * element of ctx->lc_value[] array is set to non-NULL
1690                          * value.
1691                          */
1692                         ctx->lc_value[i] = value;
1693                         if (key->lct_exit != NULL)
1694                                 ctx->lc_tags |= LCT_HAS_EXIT;
1695                 }
1696                 ctx->lc_version = key_set_version;
1697         }
1698         return 0;
1699 }
1700
1701 static int keys_init(struct lu_context *ctx)
1702 {
1703         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1704         if (likely(ctx->lc_value != NULL))
1705                 return keys_fill(ctx);
1706
1707         return -ENOMEM;
1708 }
1709
1710 /**
1711  * Initialize context data-structure. Create values for all keys.
1712  */
1713 int lu_context_init(struct lu_context *ctx, __u32 tags)
1714 {
1715         int     rc;
1716
1717         memset(ctx, 0, sizeof *ctx);
1718         ctx->lc_state = LCS_INITIALIZED;
1719         ctx->lc_tags = tags;
1720         if (tags & LCT_REMEMBER) {
1721                 spin_lock(&lu_keys_guard);
1722                 list_add(&ctx->lc_remember, &lu_context_remembered);
1723                 spin_unlock(&lu_keys_guard);
1724         } else {
1725                 INIT_LIST_HEAD(&ctx->lc_remember);
1726         }
1727
1728         rc = keys_init(ctx);
1729         if (rc != 0)
1730                 lu_context_fini(ctx);
1731
1732         return rc;
1733 }
1734 EXPORT_SYMBOL(lu_context_init);
1735
1736 /**
1737  * Finalize context data-structure. Destroy key values.
1738  */
1739 void lu_context_fini(struct lu_context *ctx)
1740 {
1741         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1742         ctx->lc_state = LCS_FINALIZED;
1743
1744         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1745                 LASSERT(list_empty(&ctx->lc_remember));
1746                 keys_fini(ctx);
1747
1748         } else { /* could race with key degister */
1749                 spin_lock(&lu_keys_guard);
1750                 keys_fini(ctx);
1751                 list_del_init(&ctx->lc_remember);
1752                 spin_unlock(&lu_keys_guard);
1753         }
1754 }
1755 EXPORT_SYMBOL(lu_context_fini);
1756
1757 /**
1758  * Called before entering context.
1759  */
1760 void lu_context_enter(struct lu_context *ctx)
1761 {
1762         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1763         ctx->lc_state = LCS_ENTERED;
1764 }
1765 EXPORT_SYMBOL(lu_context_enter);
1766
1767 /**
1768  * Called after exiting from \a ctx
1769  */
1770 void lu_context_exit(struct lu_context *ctx)
1771 {
1772         int i;
1773
1774         LINVRNT(ctx->lc_state == LCS_ENTERED);
1775         ctx->lc_state = LCS_LEFT;
1776         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1777                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1778                         if (ctx->lc_value[i] != NULL) {
1779                                 struct lu_context_key *key;
1780
1781                                 key = lu_keys[i];
1782                                 LASSERT(key != NULL);
1783                                 if (key->lct_exit != NULL)
1784                                         key->lct_exit(ctx,
1785                                                       key, ctx->lc_value[i]);
1786                         }
1787                 }
1788         }
1789 }
1790 EXPORT_SYMBOL(lu_context_exit);
1791
1792 /**
1793  * Allocate for context all missing keys that were registered after context
1794  * creation. key_set_version is only changed in rare cases when modules
1795  * are loaded and removed.
1796  */
1797 int lu_context_refill(struct lu_context *ctx)
1798 {
1799         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1800 }
1801 EXPORT_SYMBOL(lu_context_refill);
1802
1803 /**
1804  * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
1805  * obd being added. Currently, this is only used on client side, specifically
1806  * for echo device client, for other stack (like ptlrpc threads), context are
1807  * predefined when the lu_device type are registered, during the module probe
1808  * phase.
1809  */
1810 __u32 lu_context_tags_default = 0;
1811 __u32 lu_session_tags_default = 0;
1812
1813 void lu_context_tags_update(__u32 tags)
1814 {
1815         spin_lock(&lu_keys_guard);
1816         lu_context_tags_default |= tags;
1817         key_set_version++;
1818         spin_unlock(&lu_keys_guard);
1819 }
1820 EXPORT_SYMBOL(lu_context_tags_update);
1821
1822 void lu_context_tags_clear(__u32 tags)
1823 {
1824         spin_lock(&lu_keys_guard);
1825         lu_context_tags_default &= ~tags;
1826         key_set_version++;
1827         spin_unlock(&lu_keys_guard);
1828 }
1829 EXPORT_SYMBOL(lu_context_tags_clear);
1830
1831 void lu_session_tags_update(__u32 tags)
1832 {
1833         spin_lock(&lu_keys_guard);
1834         lu_session_tags_default |= tags;
1835         key_set_version++;
1836         spin_unlock(&lu_keys_guard);
1837 }
1838 EXPORT_SYMBOL(lu_session_tags_update);
1839
1840 void lu_session_tags_clear(__u32 tags)
1841 {
1842         spin_lock(&lu_keys_guard);
1843         lu_session_tags_default &= ~tags;
1844         key_set_version++;
1845         spin_unlock(&lu_keys_guard);
1846 }
1847 EXPORT_SYMBOL(lu_session_tags_clear);
1848
1849 int lu_env_init(struct lu_env *env, __u32 tags)
1850 {
1851         int result;
1852
1853         env->le_ses = NULL;
1854         result = lu_context_init(&env->le_ctx, tags);
1855         if (likely(result == 0))
1856                 lu_context_enter(&env->le_ctx);
1857         return result;
1858 }
1859 EXPORT_SYMBOL(lu_env_init);
1860
1861 void lu_env_fini(struct lu_env *env)
1862 {
1863         lu_context_exit(&env->le_ctx);
1864         lu_context_fini(&env->le_ctx);
1865         env->le_ses = NULL;
1866 }
1867 EXPORT_SYMBOL(lu_env_fini);
1868
1869 int lu_env_refill(struct lu_env *env)
1870 {
1871         int result;
1872
1873         result = lu_context_refill(&env->le_ctx);
1874         if (result == 0 && env->le_ses != NULL)
1875                 result = lu_context_refill(env->le_ses);
1876         return result;
1877 }
1878 EXPORT_SYMBOL(lu_env_refill);
1879
1880 /**
1881  * Currently, this API will only be used by echo client.
1882  * Because echo client and normal lustre client will share
1883  * same cl_env cache. So echo client needs to refresh
1884  * the env context after it get one from the cache, especially
1885  * when normal client and echo client co-exist in the same client.
1886  */
1887 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1888                           __u32 stags)
1889 {
1890         int    result;
1891
1892         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1893                 env->le_ctx.lc_version = 0;
1894                 env->le_ctx.lc_tags |= ctags;
1895         }
1896
1897         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1898                 env->le_ses->lc_version = 0;
1899                 env->le_ses->lc_tags |= stags;
1900         }
1901
1902         result = lu_env_refill(env);
1903
1904         return result;
1905 }
1906 EXPORT_SYMBOL(lu_env_refill_by_tags);
1907
/* Shrinker handle; set in lu_global_init(), cleared in lu_global_fini(). */
static struct shrinker *lu_site_shrinker;

/* Counters accumulated over all hash buckets by lu_site_stats_get(). */
typedef struct lu_site_stats {
	/* number of non-empty hash chains */
	unsigned int	lss_populated;
	/* largest per-bucket maximum depth seen */
	unsigned int	lss_max_search;
	/* sum of per-bucket item counts */
	unsigned int	lss_total;
	/* sum of per-bucket lsb_busy counters */
	unsigned int	lss_busy;
} lu_site_stats_t;
1916
1917 static void lu_site_stats_get(cfs_hash_t *hs,
1918                               lu_site_stats_t *stats, int populated)
1919 {
1920         cfs_hash_bd_t bd;
1921         int           i;
1922
1923         cfs_hash_for_each_bucket(hs, &bd, i) {
1924                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1925                 struct hlist_head       *hhead;
1926
1927                 cfs_hash_bd_lock(hs, &bd, 1);
1928                 stats->lss_busy  += bkt->lsb_busy;
1929                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1930                 stats->lss_max_search = max((int)stats->lss_max_search,
1931                                             cfs_hash_bd_depmax_get(&bd));
1932                 if (!populated) {
1933                         cfs_hash_bd_unlock(hs, &bd, 1);
1934                         continue;
1935                 }
1936
1937                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1938                         if (!hlist_empty(hhead))
1939                                 stats->lss_populated++;
1940                 }
1941                 cfs_hash_bd_unlock(hs, &bd, 1);
1942         }
1943 }
1944
1945 #ifdef __KERNEL__
1946
1947 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1948                                            struct shrink_control *sc)
1949 {
1950         lu_site_stats_t stats;
1951         struct lu_site *s;
1952         struct lu_site *tmp;
1953         unsigned long cached = 0;
1954
1955         if (!(sc->gfp_mask & __GFP_FS))
1956                 return 0;
1957
1958         mutex_lock(&lu_sites_guard);
1959         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1960                 memset(&stats, 0, sizeof(stats));
1961                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1962                 cached += stats.lss_total - stats.lss_busy;
1963         }
1964         mutex_unlock(&lu_sites_guard);
1965
1966         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1967         CDEBUG(D_INODE, "%ld objects cached\n", cached);
1968         return cached;
1969 }
1970
1971 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1972                                           struct shrink_control *sc)
1973 {
1974         struct lu_site *s;
1975         struct lu_site *tmp;
1976         unsigned long remain = sc->nr_to_scan;
1977         LIST_HEAD(splice);
1978
1979         if (!(sc->gfp_mask & __GFP_FS))
1980                 /* We must not take the lu_sites_guard lock when
1981                  * __GFP_FS is *not* set because of the deadlock
1982                  * possibility detailed above. Additionally,
1983                  * since we cannot determine the number of
1984                  * objects in the cache without taking this
1985                  * lock, we're in a particularly tough spot. As
1986                  * a result, we'll just lie and say our cache is
1987                  * empty. This _should_ be ok, as we can't
1988                  * reclaim objects when __GFP_FS is *not* set
1989                  * anyways.
1990                  */
1991                 return SHRINK_STOP;
1992
1993         mutex_lock(&lu_sites_guard);
1994         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1995                 remain = lu_site_purge(&lu_shrink_env, s, remain);
1996                 /*
1997                  * Move just shrunk site to the tail of site list to
1998                  * assure shrinking fairness.
1999                  */
2000                 list_move_tail(&s->ls_linkage, &splice);
2001         }
2002         list_splice(&splice, lu_sites.prev);
2003         mutex_unlock(&lu_sites_guard);
2004
2005         return sc->nr_to_scan - remain;
2006 }
2007
2008 #ifndef HAVE_SHRINKER_COUNT
2009 /*
2010  * There exists a potential lock inversion deadlock scenario when using
2011  * Lustre on top of ZFS. This occurs between one of ZFS's
2012  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2013  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2014  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2015  * lock. Obviously neither thread will wake and drop their respective hold
2016  * on their lock.
2017  *
2018  * To prevent this from happening we must ensure the lu_sites_guard lock is
2019  * not taken while down this code path. ZFS reliably does not set the
2020  * __GFP_FS bit in its code paths, so this can be used to determine if it
2021  * is safe to take the lu_sites_guard lock.
2022  *
2023  * Ideally we should accurately return the remaining number of cached
2024  * objects without taking the  lu_sites_guard lock, but this is not
2025  * possible in the current implementation.
2026  */
2027 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2028 {
2029         int cached = 0;
2030         struct shrink_control scv = {
2031                  .nr_to_scan = shrink_param(sc, nr_to_scan),
2032                  .gfp_mask   = shrink_param(sc, gfp_mask)
2033         };
2034 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2035         struct shrinker* shrinker = NULL;
2036 #endif
2037
2038
2039         CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
2040
2041         lu_cache_shrink_scan(shrinker, &scv);
2042
2043         cached = lu_cache_shrink_count(shrinker, &scv);
2044         if (scv.nr_to_scan == 0)
2045                 CDEBUG(D_INODE, "%d objects cached\n", cached);
2046         return cached;
2047 }
2048
2049 #endif /* HAVE_SHRINKER_COUNT */
2050
2051
2052 /*
2053  * Debugging stuff.
2054  */
2055
2056 /**
2057  * Environment to be used in debugger, contains all tags.
2058  */
2059 struct lu_env lu_debugging_env;
2060
/**
 * Debugging printer function that forwards \a format and its arguments to
 * vprintk(). \a env and \a unused are ignored.
 *
 * \retval 0 always
 */
int lu_printk_printer(const struct lu_env *env,
		      void *unused, const char *format, ...)
{
	va_list ap;

	va_start(ap, format);
	vprintk(format, ap);
	va_end(ap);

	return 0;
}
2074
2075 int lu_debugging_setup(void)
2076 {
2077         return lu_env_init(&lu_debugging_env, ~0);
2078 }
2079
2080 void lu_context_keys_dump(void)
2081 {
2082         int i;
2083
2084         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2085                 struct lu_context_key *key;
2086
2087                 key = lu_keys[i];
2088                 if (key != NULL) {
2089                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2090                                i, key, key->lct_tags,
2091                                key->lct_init, key->lct_fini, key->lct_exit,
2092                                key->lct_index, atomic_read(&key->lct_used),
2093                                key->lct_owner ? key->lct_owner->name : "",
2094                                key->lct_owner);
2095                         lu_ref_print(&key->lct_reference);
2096                 }
2097         }
2098 }
2099 EXPORT_SYMBOL(lu_context_keys_dump);
2100 #endif /* __KERNEL__ */
2101
2102 /**
2103  * Initialization of global lu_* data.
2104  */
2105 int lu_global_init(void)
2106 {
2107         int result;
2108         DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
2109                          lu_cache_shrink_count, lu_cache_shrink_scan);
2110
2111         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2112
2113         INIT_LIST_HEAD(&lu_device_types);
2114         INIT_LIST_HEAD(&lu_context_remembered);
2115
2116         result = lu_ref_global_init();
2117         if (result != 0)
2118                 return result;
2119
2120         LU_CONTEXT_KEY_INIT(&lu_global_key);
2121         result = lu_context_key_register(&lu_global_key);
2122         if (result != 0)
2123                 return result;
2124
2125         /*
2126          * At this level, we don't know what tags are needed, so allocate them
2127          * conservatively. This should not be too bad, because this
2128          * environment is global.
2129          */
2130         mutex_lock(&lu_sites_guard);
2131         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2132         mutex_unlock(&lu_sites_guard);
2133         if (result != 0)
2134                 return result;
2135
2136         /*
2137          * seeks estimation: 3 seeks to read a record from oi, one to read
2138          * inode, one for ea. Unfortunately setting this high value results in
2139          * lu_object/inode cache consuming all the memory.
2140          */
2141         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, &shvar);
2142         if (lu_site_shrinker == NULL)
2143                 return -ENOMEM;
2144
2145         return result;
2146 }
2147
2148 /**
2149  * Dual to lu_global_init().
2150  */
2151 void lu_global_fini(void)
2152 {
2153         if (lu_site_shrinker != NULL) {
2154                 remove_shrinker(lu_site_shrinker);
2155                 lu_site_shrinker = NULL;
2156         }
2157
2158         lu_context_key_degister(&lu_global_key);
2159
2160         /*
2161          * Tear shrinker environment down _after_ de-registering
2162          * lu_global_key, because the latter has a value in the former.
2163          */
2164         mutex_lock(&lu_sites_guard);
2165         lu_env_fini(&lu_shrink_env);
2166         mutex_unlock(&lu_sites_guard);
2167
2168         lu_ref_global_fini();
2169 }
2170
2171 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2172 {
2173 #ifdef LPROCFS
2174         struct lprocfs_counter ret;
2175
2176         lprocfs_stats_collect(stats, idx, &ret);
2177         return (__u32)ret.lc_count;
2178 #else
2179         return 0;
2180 #endif
2181 }
2182
2183 /**
2184  * Output site statistical counters into a buffer. Suitable for
2185  * lprocfs_rd_*()-style functions.
2186  */
2187 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2188 {
2189         lu_site_stats_t stats;
2190
2191         memset(&stats, 0, sizeof(stats));
2192         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2193
2194         return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2195                           stats.lss_busy,
2196                           stats.lss_total,
2197                           stats.lss_populated,
2198                           CFS_HASH_NHLIST(s->ls_obj_hash),
2199                           stats.lss_max_search,
2200                           ls_stats_read(s->ls_stats, LU_SS_CREATED),
2201                           ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2202                           ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2203                           ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2204                           ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2205                           ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2206 }
2207 EXPORT_SYMBOL(lu_site_stats_seq_print);
2208
2209 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
2210 {
2211         lu_site_stats_t stats;
2212
2213         memset(&stats, 0, sizeof(stats));
2214         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2215
2216         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2217                         stats.lss_busy,
2218                         stats.lss_total,
2219                         stats.lss_populated,
2220                         CFS_HASH_NHLIST(s->ls_obj_hash),
2221                         stats.lss_max_search,
2222                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2223                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2224                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2225                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2226                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2227                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2228 }
2229 EXPORT_SYMBOL(lu_site_stats_print);
2230
2231 /**
2232  * Helper function to initialize a number of kmem slab caches at once.
2233  */
2234 int lu_kmem_init(struct lu_kmem_descr *caches)
2235 {
2236         int result;
2237         struct lu_kmem_descr *iter = caches;
2238
2239         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2240                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2241                                                      iter->ckd_size,
2242                                                      0, 0, NULL);
2243                 if (*iter->ckd_cache == NULL) {
2244                         result = -ENOMEM;
2245                         /* free all previously allocated caches */
2246                         lu_kmem_fini(caches);
2247                         break;
2248                 }
2249         }
2250         return result;
2251 }
2252 EXPORT_SYMBOL(lu_kmem_init);
2253
2254 /**
2255  * Helper function to finalize a number of kmem slab cached at once. Dual to
2256  * lu_kmem_init().
2257  */
2258 void lu_kmem_fini(struct lu_kmem_descr *caches)
2259 {
2260         for (; caches->ckd_cache != NULL; ++caches) {
2261                 if (*caches->ckd_cache != NULL) {
2262                         kmem_cache_destroy(*caches->ckd_cache);
2263                         *caches->ckd_cache = NULL;
2264                 }
2265         }
2266 }
2267 EXPORT_SYMBOL(lu_kmem_fini);
2268
2269 /**
2270  * Temporary solution to be able to assign fid in ->do_create()
2271  * till we have fully-functional OST fids
2272  */
2273 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2274                           const struct lu_fid *fid)
2275 {
2276         struct lu_site          *s = o->lo_dev->ld_site;
2277         struct lu_fid           *old = &o->lo_header->loh_fid;
2278         struct lu_site_bkt_data *bkt;
2279         struct lu_object        *shadow;
2280         wait_queue_t             waiter;
2281         cfs_hash_t              *hs;
2282         cfs_hash_bd_t            bd;
2283         __u64                    version = 0;
2284
2285         LASSERT(fid_is_zero(old));
2286
2287         hs = s->ls_obj_hash;
2288         cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2289         shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2290         /* supposed to be unique */
2291         LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2292         *old = *fid;
2293         bkt = cfs_hash_bd_extra_get(hs, &bd);
2294         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2295         bkt->lsb_busy++;
2296         cfs_hash_bd_unlock(hs, &bd, 1);
2297 }
2298 EXPORT_SYMBOL(lu_object_assign_fid);
2299
2300 /**
2301  * allocates object with 0 (non-assiged) fid
2302  * XXX: temporary solution to be able to assign fid in ->do_create()
2303  *      till we have fully-functional OST fids
2304  */
2305 struct lu_object *lu_object_anon(const struct lu_env *env,
2306                                  struct lu_device *dev,
2307                                  const struct lu_object_conf *conf)
2308 {
2309         struct lu_fid     fid;
2310         struct lu_object *o;
2311
2312         fid_zero(&fid);
2313         o = lu_object_alloc(env, dev, &fid, conf);
2314
2315         return o;
2316 }
2317 EXPORT_SYMBOL(lu_object_anon);
2318
2319 struct lu_buf LU_BUF_NULL = {
2320         .lb_buf = NULL,
2321         .lb_len = 0
2322 };
2323 EXPORT_SYMBOL(LU_BUF_NULL);
2324
2325 void lu_buf_free(struct lu_buf *buf)
2326 {
2327         LASSERT(buf);
2328         if (buf->lb_buf) {
2329                 LASSERT(buf->lb_len > 0);
2330                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2331                 buf->lb_buf = NULL;
2332                 buf->lb_len = 0;
2333         }
2334 }
2335 EXPORT_SYMBOL(lu_buf_free);
2336
2337 void lu_buf_alloc(struct lu_buf *buf, int size)
2338 {
2339         LASSERT(buf);
2340         LASSERT(buf->lb_buf == NULL);
2341         LASSERT(buf->lb_len == 0);
2342         OBD_ALLOC_LARGE(buf->lb_buf, size);
2343         if (likely(buf->lb_buf))
2344                 buf->lb_len = size;
2345 }
2346 EXPORT_SYMBOL(lu_buf_alloc);
2347
/**
 * Replace the memory of \a buf with a fresh allocation of \a size bytes.
 * The old memory is freed before the new one is allocated, so the previous
 * contents are not carried over.
 */
void lu_buf_realloc(struct lu_buf *buf, int size)
{
	/* Drop the old memory first, then attach the new one. */
	lu_buf_free(buf);
	lu_buf_alloc(buf, size);
}
EXPORT_SYMBOL(lu_buf_realloc);
2354
2355 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
2356 {
2357         if (buf->lb_buf == NULL && buf->lb_len == 0)
2358                 lu_buf_alloc(buf, len);
2359
2360         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2361                 lu_buf_realloc(buf, len);
2362
2363         return buf;
2364 }
2365 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2366
2367 /**
2368  * Increase the size of the \a buf.
2369  * preserves old data in buffer
2370  * old buffer remains unchanged on error
2371  * \retval 0 or -ENOMEM
2372  */
2373 int lu_buf_check_and_grow(struct lu_buf *buf, int len)
2374 {
2375         char *ptr;
2376
2377         if (len <= buf->lb_len)
2378                 return 0;
2379
2380         OBD_ALLOC_LARGE(ptr, len);
2381         if (ptr == NULL)
2382                 return -ENOMEM;
2383
2384         /* Free the old buf */
2385         if (buf->lb_buf != NULL) {
2386                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2387                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2388         }
2389
2390         buf->lb_buf = ptr;
2391         buf->lb_len = len;
2392         return 0;
2393 }
2394 EXPORT_SYMBOL(lu_buf_check_and_grow);
2395