LU-5164 osd: Limit lu_object cache
fs/lustre-release.git: lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48
49 #ifdef __KERNEL__
50 # include <linux/module.h>
51 #endif
52
53 /* hash_long() */
54 #include <libcfs/libcfs_hash.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59 #include <lu_object.h>
60 #include <lu_ref.h>
61 #include <libcfs/list.h>
62
63 enum {
64         LU_CACHE_PERCENT_MAX     = 50,
65         LU_CACHE_PERCENT_DEFAULT = 20
66 };
67
68 #define LU_CACHE_NR_MAX_ADJUST          128
69 #define LU_CACHE_NR_UNLIMITED           -1
70 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
71 #define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
72 #define LU_CACHE_NR_ZFS_LIMIT           256
73
74 #define LU_SITE_BITS_MIN    12
75 #define LU_SITE_BITS_MAX    24
76 /**
77  * 256 buckets in total; we don't want too many buckets because:
78  * - they consume too much memory
79  * - we want to avoid unbalanced LRU lists
80  */
81 #define LU_SITE_BKT_BITS    8
82
83
84 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
85 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
86                 "Percentage of memory to be used as lu_object cache");
87
88 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
89 CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
90                 "Maximum number of objects in lu_object cache");
91
92 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
93
94 /**
95  * Decrease reference counter on object. If last reference is freed, return
96  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
97  * case, free object immediately.
98  */
99 void lu_object_put(const struct lu_env *env, struct lu_object *o)
100 {
101         struct lu_site_bkt_data *bkt;
102         struct lu_object_header *top;
103         struct lu_site          *site;
104         struct lu_object        *orig;
105         cfs_hash_bd_t            bd;
106         const struct lu_fid     *fid;
107
108         top  = o->lo_header;
109         site = o->lo_dev->ld_site;
110         orig = o;
111
112         /*
113          * Till we have full FIDs-on-OST implemented, anonymous objects
114          * are possible in OSP. Such an object isn't listed in the site,
115          * so we should not remove it from the site.
116          */
117         fid = lu_object_fid(o);
118         if (fid_is_zero(fid)) {
119                 LASSERT(top->loh_hash.next == NULL
120                         && top->loh_hash.pprev == NULL);
121                 LASSERT(cfs_list_empty(&top->loh_lru));
122                 if (!cfs_atomic_dec_and_test(&top->loh_ref))
123                         return;
124                 cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
125                         if (o->lo_ops->loo_object_release != NULL)
126                                 o->lo_ops->loo_object_release(env, o);
127                 }
128                 lu_object_free(env, orig);
129                 return;
130         }
131
132         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
133         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
134
135         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
136                 if (lu_object_is_dying(top)) {
137
138                         /*
139                          * somebody may be waiting for this, currently only
140                          * used for cl_object, see cl_object_put_last().
141                          */
142                         wake_up_all(&bkt->lsb_marche_funebre);
143                 }
144                 return;
145         }
146
147         LASSERT(bkt->lsb_busy > 0);
148         bkt->lsb_busy--;
149         /*
150          * When last reference is released, iterate over object
151          * layers, and notify them that object is no longer busy.
152          */
153         cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
154                 if (o->lo_ops->loo_object_release != NULL)
155                         o->lo_ops->loo_object_release(env, o);
156         }
157
158         if (!lu_object_is_dying(top)) {
159                 LASSERT(cfs_list_empty(&top->loh_lru));
160                 cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru);
161                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
162                 return;
163         }
164
165         /*
166          * If the object is dying (will not be cached), remove it
167          * from the hash table and LRU.
168          *
169          * This is done with the hash table and LRU lists locked. As the
170          * only ways to acquire the first reference to a previously
171          * unreferenced object are hash-table lookup (lu_object_find())
172          * and LRU scanning (lu_site_purge()), both done under the hash-table
173          * and LRU lock, no race with a concurrent object lookup is possible
174          * and we can safely destroy the object below.
175          */
176         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
177                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
178         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
179         /*
180          * The object was already removed from the hash and LRU above,
181          * so we can kill it now.
182          */
183         lu_object_free(env, orig);
184 }
185 EXPORT_SYMBOL(lu_object_put);
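
/*
 * Usage sketch (illustrative only; "env", "dev" and "fid" are assumed to be
 * set up by the caller): a reference is obtained with lu_object_find() and
 * dropped with lu_object_put().  On the last put the object either goes
 * back onto the per-bucket LRU or, if it is dying, is freed immediately.
 *
 *      struct lu_object *o = lu_object_find(env, dev, fid, NULL);
 *
 *      if (IS_ERR(o))
 *              return PTR_ERR(o);
 *      ... use the object ...
 *      lu_object_put(env, o);
 */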
186
187 /**
188  * Put an object and don't keep it in the cache. This is a temporary solution
189  * for multi-site objects whose layering is not constant.
190  */
191 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
192 {
193         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
194         return lu_object_put(env, o);
195 }
196 EXPORT_SYMBOL(lu_object_put_nocache);
197
198 /**
199  * Kill the object and take it out of LRU cache.
200  * Currently used by client code for layout change.
201  */
202 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
203 {
204         struct lu_object_header *top;
205
206         top = o->lo_header;
207         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
208         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
209                 cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
210                 cfs_hash_bd_t bd;
211
212                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
213                 cfs_list_del_init(&top->loh_lru);
214                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
215                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
216         }
217 }
218 EXPORT_SYMBOL(lu_object_unhash);
219
220 /**
221  * Allocate new object.
222  *
223  * This follows object creation protocol, described in the comment within
224  * struct lu_device_operations definition.
225  */
226 static struct lu_object *lu_object_alloc(const struct lu_env *env,
227                                          struct lu_device *dev,
228                                          const struct lu_fid *f,
229                                          const struct lu_object_conf *conf)
230 {
231         struct lu_object *scan;
232         struct lu_object *top;
233         cfs_list_t *layers;
234         unsigned int init_mask = 0;
235         unsigned int init_flag;
236         int clean;
237         int result;
238         ENTRY;
239
240         /*
241          * Create top-level object slice. This will also create
242          * lu_object_header.
243          */
244         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
245         if (top == NULL)
246                 RETURN(ERR_PTR(-ENOMEM));
247         if (IS_ERR(top))
248                 RETURN(top);
249         /*
250          * This is the only place where object fid is assigned. It's constant
251          * after this point.
252          */
253         top->lo_header->loh_fid = *f;
254         layers = &top->lo_header->loh_layers;
255
256         do {
257                 /*
258                  * Call ->loo_object_init() repeatedly, until no more new
259                  * object slices are created.
260                  */
261                 clean = 1;
262                 init_flag = 1;
263                 cfs_list_for_each_entry(scan, layers, lo_linkage) {
264                         if (init_mask & init_flag)
265                                 goto next;
266                         clean = 0;
267                         scan->lo_header = top->lo_header;
268                         result = scan->lo_ops->loo_object_init(env, scan, conf);
269                         if (result != 0) {
270                                 lu_object_free(env, top);
271                                 RETURN(ERR_PTR(result));
272                         }
273                         init_mask |= init_flag;
274 next:
275                         init_flag <<= 1;
276                 }
277         } while (!clean);
278
279         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
280                 if (scan->lo_ops->loo_object_start != NULL) {
281                         result = scan->lo_ops->loo_object_start(env, scan);
282                         if (result != 0) {
283                                 lu_object_free(env, top);
284                                 RETURN(ERR_PTR(result));
285                         }
286                 }
287         }
288
289         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
290         RETURN(top);
291 }
292
293 /**
294  * Free an object.
295  */
296 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
297 {
298         struct lu_site_bkt_data *bkt;
299         struct lu_site          *site;
300         struct lu_object        *scan;
301         cfs_list_t              *layers;
302         cfs_list_t               splice;
303
304         site   = o->lo_dev->ld_site;
305         layers = &o->lo_header->loh_layers;
306         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
307         /*
308          * First call ->loo_object_delete() method to release all resources.
309          */
310         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
311                 if (scan->lo_ops->loo_object_delete != NULL)
312                         scan->lo_ops->loo_object_delete(env, scan);
313         }
314
315         /*
316          * Then, splice object layers into stand-alone list, and call
317          * ->loo_object_free() on all layers to free memory. Splice is
318          * necessary, because lu_object_header is freed together with the
319          * top-level slice.
320          */
321         CFS_INIT_LIST_HEAD(&splice);
322         cfs_list_splice_init(layers, &splice);
323         while (!cfs_list_empty(&splice)) {
324                 /*
325                  * Free layers in bottom-to-top order, so that object header
326                  * lives as long as possible and ->loo_object_free() methods
327                  * can look at its contents.
328                  */
329                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
330                 cfs_list_del_init(&o->lo_linkage);
331                 LASSERT(o->lo_ops->loo_object_free != NULL);
332                 o->lo_ops->loo_object_free(env, o);
333         }
334
335         if (waitqueue_active(&bkt->lsb_marche_funebre))
336                 wake_up_all(&bkt->lsb_marche_funebre);
337 }
338
339 /**
340  * Free \a nr objects from the cold end of the site LRU list.
341  */
342 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
343 {
344         struct lu_object_header *h;
345         struct lu_object_header *temp;
346         struct lu_site_bkt_data *bkt;
347         cfs_hash_bd_t            bd;
348         cfs_hash_bd_t            bd2;
349         cfs_list_t               dispose;
350         int                      did_sth;
351         int                      start;
352         int                      count;
353         int                      bnr;
354         int                      i;
355
356         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
357                 RETURN(0);
358
359         CFS_INIT_LIST_HEAD(&dispose);
360         /*
361          * Under LRU list lock, scan LRU list and move unreferenced objects to
362          * the dispose list, removing them from LRU and hash table.
363          */
364         start = s->ls_purge_start;
365         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
366  again:
367         did_sth = 0;
368         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
369                 if (i < start)
370                         continue;
371                 count = bnr;
372                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
373                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
374
375                 cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
376                         LASSERT(cfs_atomic_read(&h->loh_ref) == 0);
377
378                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
379                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
380
381                         cfs_hash_bd_del_locked(s->ls_obj_hash,
382                                                &bd2, &h->loh_hash);
383                         cfs_list_move(&h->loh_lru, &dispose);
384                         if (did_sth == 0)
385                                 did_sth = 1;
386
387                         if (nr != ~0 && --nr == 0)
388                                 break;
389
390                         if (count > 0 && --count == 0)
391                                 break;
392
393                 }
394                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
395                 cond_resched();
396                 /*
397                  * Free everything on the dispose list. This is safe against
398                  * races due to the reasons described in lu_object_put().
399                  */
400                 while (!cfs_list_empty(&dispose)) {
401                         h = container_of0(dispose.next,
402                                           struct lu_object_header, loh_lru);
403                         cfs_list_del_init(&h->loh_lru);
404                         lu_object_free(env, lu_object_top(h));
405                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
406                 }
407
408                 if (nr == 0)
409                         break;
410         }
411
412         if (nr != 0 && did_sth && start != 0) {
413                 start = 0; /* restart from the first bucket */
414                 goto again;
415         }
416         /* race on s->ls_purge_start, but nobody cares */
417         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
418
419         return nr;
420 }
421 EXPORT_SYMBOL(lu_site_purge);
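
/*
 * Usage sketch (illustrative only): the \a nr argument is the number of
 * idle objects to reclaim, so a caller can either trim the cache partially
 * or drain it completely, as lu_stack_fini() below does with nr == ~0:
 *
 *      lu_site_purge(env, site, 128);   free up to 128 unreferenced objects
 *      lu_site_purge(env, site, ~0);    purge every unreferenced object
 */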
422
423 /*
424  * Object printing.
425  *
426  * The code below has to jump through certain hoops to output an object
427  * description into a libcfs_debug_msg-based log. The problem is that
428  * lu_object_print() composes the object description from strings that are
429  * parts of _lines_ of output (i.e., strings not terminated by a newline).
430  * This doesn't fit very well into the libcfs_debug_msg() interface, which
431  * assumes that each message supplied to it is a self-contained output line.
432  *
433  * To work around this, strings are collected in a temporary buffer
434  * (implemented as a value of the lu_global_key context key), until a
435  * terminating newline character is detected.
436  *
437  */
438
439 enum {
440         /**
441          * Maximal line size.
442          *
443          * XXX overflow is not handled correctly.
444          */
445         LU_CDEBUG_LINE = 512
446 };
447
448 struct lu_cdebug_data {
449         /**
450          * Temporary buffer.
451          */
452         char lck_area[LU_CDEBUG_LINE];
453 };
454
455 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
456 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
457
458 /**
459  * Key, holding temporary buffer. This key is registered very early by
460  * lu_global_init().
461  */
462 struct lu_context_key lu_global_key = {
463         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
464                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
465         .lct_init = lu_global_key_init,
466         .lct_fini = lu_global_key_fini
467 };
468
469 /**
470  * Printer function emitting messages through libcfs_debug_msg().
471  */
472 int lu_cdebug_printer(const struct lu_env *env,
473                       void *cookie, const char *format, ...)
474 {
475         struct libcfs_debug_msg_data *msgdata = cookie;
476         struct lu_cdebug_data        *key;
477         int used;
478         int complete;
479         va_list args;
480
481         va_start(args, format);
482
483         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
484         LASSERT(key != NULL);
485
486         used = strlen(key->lck_area);
487         complete = format[strlen(format) - 1] == '\n';
488         /*
489          * Append new chunk to the buffer.
490          */
491         vsnprintf(key->lck_area + used,
492                   ARRAY_SIZE(key->lck_area) - used, format, args);
493         if (complete) {
494                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
495                         libcfs_debug_msg(msgdata, "%s", key->lck_area);
496                 key->lck_area[0] = 0;
497         }
498         va_end(args);
499         return 0;
500 }
501 EXPORT_SYMBOL(lu_cdebug_printer);
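
/*
 * Usage sketch (illustrative only): lu_cdebug_printer() is meant to be
 * passed as the lu_printer_t argument of lu_object_print(), with a
 * libcfs_debug_msg_data cookie describing where the output goes, along
 * these lines (the msgdata setup mirrors what the debugging macros built
 * on top of this printer do):
 *
 *      LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
 *
 *      lu_object_print(env, &msgdata, lu_cdebug_printer, o);
 */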
502
503 /**
504  * Print object header.
505  */
506 void lu_object_header_print(const struct lu_env *env, void *cookie,
507                             lu_printer_t printer,
508                             const struct lu_object_header *hdr)
509 {
510         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
511                    hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
512                    PFID(&hdr->loh_fid),
513                    cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
514                    cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? \
515                    "" : " lru",
516                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
517 }
518 EXPORT_SYMBOL(lu_object_header_print);
519
520 /**
521  * Print a human-readable representation of \a o to \a printer.
522  */
523 void lu_object_print(const struct lu_env *env, void *cookie,
524                      lu_printer_t printer, const struct lu_object *o)
525 {
526         static const char ruler[] = "........................................";
527         struct lu_object_header *top;
528         int depth = 4;
529
530         top = o->lo_header;
531         lu_object_header_print(env, cookie, printer, top);
532         (*printer)(env, cookie, "{\n");
533
534         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
535                 /*
536                  * print `.' \a depth times followed by type name and address
537                  */
538                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
539                            o->lo_dev->ld_type->ldt_name, o);
540
541                 if (o->lo_ops->loo_object_print != NULL)
542                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
543
544                 (*printer)(env, cookie, "\n");
545         }
546
547         (*printer)(env, cookie, "} header@%p\n", top);
548 }
549 EXPORT_SYMBOL(lu_object_print);
550
551 /**
552  * Check object consistency.
553  */
554 int lu_object_invariant(const struct lu_object *o)
555 {
556         struct lu_object_header *top;
557
558         top = o->lo_header;
559         cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
560                 if (o->lo_ops->loo_object_invariant != NULL &&
561                     !o->lo_ops->loo_object_invariant(o))
562                         return 0;
563         }
564         return 1;
565 }
566 EXPORT_SYMBOL(lu_object_invariant);
567
568 static struct lu_object *htable_lookup(struct lu_site *s,
569                                        cfs_hash_bd_t *bd,
570                                        const struct lu_fid *f,
571                                        wait_queue_t *waiter,
572                                        __u64 *version)
573 {
574         struct lu_site_bkt_data *bkt;
575         struct lu_object_header *h;
576         cfs_hlist_node_t        *hnode;
577         __u64  ver = cfs_hash_bd_version_get(bd);
578
579         if (*version == ver)
580                 return ERR_PTR(-ENOENT);
581
582         *version = ver;
583         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
584         /* cfs_hash_bd_peek_locked is a somewhat "internal" function
585          * of cfs_hash; it doesn't add a refcount on the object. */
586         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
587         if (hnode == NULL) {
588                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
589                 return ERR_PTR(-ENOENT);
590         }
591
592         h = container_of0(hnode, struct lu_object_header, loh_hash);
593         if (likely(!lu_object_is_dying(h))) {
594                 cfs_hash_get(s->ls_obj_hash, hnode);
595                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
596                 cfs_list_del_init(&h->loh_lru);
597                 return lu_object_top(h);
598         }
599
600         /*
601          * Lookup found an object being destroyed; this object cannot be
602          * returned (to ensure that references to dying objects are eventually
603          * drained), and moreover, lookup has to wait until the object is freed.
604          */
605
606         init_waitqueue_entry_current(waiter);
607         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
608         set_current_state(TASK_UNINTERRUPTIBLE);
609         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
610         return ERR_PTR(-EAGAIN);
611 }
612
613 static struct lu_object *htable_lookup_nowait(struct lu_site *s,
614                                               cfs_hash_bd_t *bd,
615                                               const struct lu_fid *f)
616 {
617         cfs_hlist_node_t        *hnode;
618         struct lu_object_header *h;
619
620         /* cfs_hash_bd_peek_locked is a somewhat "internal" function
621          * of cfs_hash; it doesn't add a refcount on the object. */
622         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
623         if (hnode == NULL) {
624                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
625                 return ERR_PTR(-ENOENT);
626         }
627
628         h = container_of0(hnode, struct lu_object_header, loh_hash);
629         if (unlikely(lu_object_is_dying(h)))
630                 return ERR_PTR(-ENOENT);
631
632         cfs_hash_get(s->ls_obj_hash, hnode);
633         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
634         cfs_list_del_init(&h->loh_lru);
635         return lu_object_top(h);
636 }
637
638 /**
639  * Search the cache for an object with the fid \a f. If such an object is
640  * found, return it. Otherwise, create a new object, insert it into the cache
641  * and return it. In any case, an additional reference is acquired on the returned object.
642  */
643 struct lu_object *lu_object_find(const struct lu_env *env,
644                                  struct lu_device *dev, const struct lu_fid *f,
645                                  const struct lu_object_conf *conf)
646 {
647         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
648 }
649 EXPORT_SYMBOL(lu_object_find);
650
651 /*
652  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
653  * the calculation of the number of objects to reclaim is not covered by
654  * a lock, the number of objects purged in a single call is capped by
655  * LU_CACHE_NR_MAX_ADJUST. This ensures that many concurrent threads will
656  * not accidentally purge the entire cache.
657  */
658 static void lu_object_limit(const struct lu_env *env,
659                             struct lu_device *dev)
660 {
661         __u64 size, nr;
662
663         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
664                 return;
665
666         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
667         nr = (__u64)lu_cache_nr;
668         if (size > nr)
669                 lu_site_purge(env, dev->ld_site,
670                               MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
671
672         return;
673 }
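
/*
 * Worked example (illustrative only): with lu_cache_nr == 256 and 1000
 * objects currently hashed, size - nr == 744, so one pass purges
 * MIN(744, LU_CACHE_NR_MAX_ADJUST) == 128 objects; repeated calls from
 * concurrent allocations then trim the cache back gradually instead of one
 * thread purging everything at once.
 */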
674
675 static struct lu_object *lu_object_new(const struct lu_env *env,
676                                        struct lu_device *dev,
677                                        const struct lu_fid *f,
678                                        const struct lu_object_conf *conf)
679 {
680         struct lu_object        *o;
681         cfs_hash_t              *hs;
682         cfs_hash_bd_t            bd;
683         struct lu_site_bkt_data *bkt;
684
685         o = lu_object_alloc(env, dev, f, conf);
686         if (unlikely(IS_ERR(o)))
687                 return o;
688
689         hs = dev->ld_site->ls_obj_hash;
690         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
691         bkt = cfs_hash_bd_extra_get(hs, &bd);
692         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
693         bkt->lsb_busy++;
694         cfs_hash_bd_unlock(hs, &bd, 1);
695
696         lu_object_limit(env, dev);
697
698         return o;
699 }
700
701 /**
702  * Core logic of lu_object_find*() functions.
703  */
704 static struct lu_object *lu_object_find_try(const struct lu_env *env,
705                                             struct lu_device *dev,
706                                             const struct lu_fid *f,
707                                             const struct lu_object_conf *conf,
708                                             wait_queue_t *waiter)
709 {
710         struct lu_object      *o;
711         struct lu_object      *shadow;
712         struct lu_site        *s;
713         cfs_hash_t            *hs;
714         cfs_hash_bd_t          bd;
715         __u64                  version = 0;
716
717         /*
718          * This uses standard index maintenance protocol:
719          *
720          *     - search index under lock, and return object if found;
721          *     - otherwise, unlock index, allocate new object;
722          *     - lock index and search again;
723          *     - if nothing is found (usual case), insert newly created
724          *       object into index;
725          *     - otherwise (race: other thread inserted object), free
726          *       object just allocated.
727          *     - unlock index;
728          *     - return object.
729          *
730          * For "LOC_F_NEW" case, we are sure the object is new established.
731          * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
732          * just alloc and insert directly.
733          *
734          * If dying object is found during index search, add @waiter to the
735          * site wait-queue and return ERR_PTR(-EAGAIN).
736          */
737         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
738                 return lu_object_new(env, dev, f, conf);
739
740         s  = dev->ld_site;
741         hs = s->ls_obj_hash;
742         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
743         o = htable_lookup(s, &bd, f, waiter, &version);
744         cfs_hash_bd_unlock(hs, &bd, 1);
745         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
746                 return o;
747
748         /*
749          * Allocate new object. This may result in rather complicated
750          * operations, including fld queries, inode loading, etc.
751          */
752         o = lu_object_alloc(env, dev, f, conf);
753         if (unlikely(IS_ERR(o)))
754                 return o;
755
756         LASSERT(lu_fid_eq(lu_object_fid(o), f));
757
758         cfs_hash_bd_lock(hs, &bd, 1);
759
760         shadow = htable_lookup(s, &bd, f, waiter, &version);
761         if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
762                 struct lu_site_bkt_data *bkt;
763
764                 bkt = cfs_hash_bd_extra_get(hs, &bd);
765                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
766                 bkt->lsb_busy++;
767                 cfs_hash_bd_unlock(hs, &bd, 1);
768
769                 lu_object_limit(env, dev);
770
771                 return o;
772         }
773
774         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
775         cfs_hash_bd_unlock(hs, &bd, 1);
776         lu_object_free(env, o);
777         return shadow;
778 }
779
780 /**
781  * Much like lu_object_find(), but top level device of object is specifically
782  * \a dev rather than top level device of the site. This interface allows
783  * objects of different "stacking" to be created within the same site.
784  */
785 struct lu_object *lu_object_find_at(const struct lu_env *env,
786                                     struct lu_device *dev,
787                                     const struct lu_fid *f,
788                                     const struct lu_object_conf *conf)
789 {
790         struct lu_site_bkt_data *bkt;
791         struct lu_object        *obj;
792         wait_queue_t           wait;
793
794         while (1) {
795                 obj = lu_object_find_try(env, dev, f, conf, &wait);
796                 if (obj != ERR_PTR(-EAGAIN))
797                         return obj;
798                 /*
799                  * lu_object_find_try() already added waiter into the
800                  * wait queue.
801                  */
802                 waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
803                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
804                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
805         }
806 }
807 EXPORT_SYMBOL(lu_object_find_at);
808
809 /**
810  * Try to find the object in the cache without waiting for a dead object
811  * to be released and without allocating an object if no cached one is found.
812  *
813  * A found object will be marked LU_OBJECT_HEARD_BANSHEE for purging.
814  */
815 void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
816                      const struct lu_fid *f)
817 {
818         struct lu_site          *s  = dev->ld_site;
819         cfs_hash_t              *hs = s->ls_obj_hash;
820         cfs_hash_bd_t            bd;
821         struct lu_object        *o;
822
823         cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
824         o = htable_lookup_nowait(s, &bd, f);
825         cfs_hash_bd_unlock(hs, &bd, 1);
826         if (!IS_ERR(o)) {
827                 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
828                 lu_object_put(env, o);
829         }
830 }
831 EXPORT_SYMBOL(lu_object_purge);
832
833 /**
834  * Find object with given fid, and return its slice belonging to given device.
835  */
836 struct lu_object *lu_object_find_slice(const struct lu_env *env,
837                                        struct lu_device *dev,
838                                        const struct lu_fid *f,
839                                        const struct lu_object_conf *conf)
840 {
841         struct lu_object *top;
842         struct lu_object *obj;
843
844         top = lu_object_find(env, dev, f, conf);
845         if (!IS_ERR(top)) {
846                 obj = lu_object_locate(top->lo_header, dev->ld_type);
847                 if (obj == NULL)
848                         lu_object_put(env, top);
849         } else
850                 obj = top;
851         return obj;
852 }
853 EXPORT_SYMBOL(lu_object_find_slice);
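
/*
 * Usage sketch (illustrative only; "env", "mdd_dev" and "fid" are assumed,
 * hypothetical names): a layer that needs its own slice rather than the
 * top-level one can call lu_object_find_slice(); a NULL return means the
 * device contributed no slice to the object.
 *
 *      struct lu_object *slice;
 *
 *      slice = lu_object_find_slice(env, mdd_dev, fid, NULL);
 *      if (IS_ERR(slice) || slice == NULL)
 *              return slice ? PTR_ERR(slice) : -ENOENT;
 *      ... operate on the slice belonging to mdd_dev ...
 *      lu_object_put(env, slice);
 */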
854
855 /**
856  * Global list of all device types.
857  */
858 static CFS_LIST_HEAD(lu_device_types);
859
860 int lu_device_type_init(struct lu_device_type *ldt)
861 {
862         int result = 0;
863
864         CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
865         if (ldt->ldt_ops->ldto_init)
866                 result = ldt->ldt_ops->ldto_init(ldt);
867         if (result == 0)
868                 cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
869         return result;
870 }
871 EXPORT_SYMBOL(lu_device_type_init);
872
873 void lu_device_type_fini(struct lu_device_type *ldt)
874 {
875         cfs_list_del_init(&ldt->ldt_linkage);
876         if (ldt->ldt_ops->ldto_fini)
877                 ldt->ldt_ops->ldto_fini(ldt);
878 }
879 EXPORT_SYMBOL(lu_device_type_fini);
880
881 void lu_types_stop(void)
882 {
883         struct lu_device_type *ldt;
884
885         cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
886                 if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
887                         ldt->ldt_ops->ldto_stop(ldt);
888         }
889 }
890 EXPORT_SYMBOL(lu_types_stop);
891
892 /**
893  * Global list of all sites on this node
894  */
895 static CFS_LIST_HEAD(lu_sites);
896 static DEFINE_MUTEX(lu_sites_guard);
897
898 /**
899  * Global environment used by site shrinker.
900  */
901 static struct lu_env lu_shrink_env;
902
903 struct lu_site_print_arg {
904         struct lu_env   *lsp_env;
905         void            *lsp_cookie;
906         lu_printer_t     lsp_printer;
907 };
908
909 static int
910 lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
911                   cfs_hlist_node_t *hnode, void *data)
912 {
913         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
914         struct lu_object_header  *h;
915
916         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
917         if (!cfs_list_empty(&h->loh_layers)) {
918                 const struct lu_object *o;
919
920                 o = lu_object_top(h);
921                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
922                                 arg->lsp_printer, o);
923         } else {
924                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
925                                        arg->lsp_printer, h);
926         }
927         return 0;
928 }
929
930 /**
931  * Print all objects in \a s.
932  */
933 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
934                    lu_printer_t printer)
935 {
936         struct lu_site_print_arg arg = {
937                 .lsp_env     = (struct lu_env *)env,
938                 .lsp_cookie  = cookie,
939                 .lsp_printer = printer,
940         };
941
942         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
943 }
944 EXPORT_SYMBOL(lu_site_print);
945
946 /**
947  * Return desired hash table order.
948  */
949 static int lu_htable_order(struct lu_device *top)
950 {
951         unsigned long cache_size;
952         int bits;
953
954         /*
955          * For ZFS based OSDs the cache should be disabled by default.  This
956          * allows the ZFS ARC maximum flexibility in determining what buffers
957          * to cache.  If Lustre has objects or buffer which it wants to ensure
958          * always stay cached it must maintain a hold on them.
959          */
960         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
961                 lu_cache_percent = 1;
962                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
963                 return LU_SITE_BITS_MIN;
964         }
965
966         /*
967          * Calculate hash table size, assuming that we want reasonable
968          * performance when 20% of total memory is occupied by cache of
969          * lu_objects.
970          *
971          * The size of an lu_object is (arbitrarily) taken as 1K (with its inode).
972          */
973         cache_size = totalram_pages;
974
975 #if BITS_PER_LONG == 32
976         /* limit hashtable size for lowmem systems to low RAM */
977         if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
978                 cache_size = (1 << (30 - PAGE_CACHE_SHIFT)) * 3 / 4;
979 #endif
980
981         /* clear off unreasonable cache setting. */
982         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
983                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
984                       " the range of (0, %u]. Will use default value: %u.\n",
985                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
986                       LU_CACHE_PERCENT_DEFAULT);
987
988                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
989         }
990         cache_size = cache_size / 100 * lu_cache_percent *
991                 (PAGE_CACHE_SIZE / 1024);
992
993         for (bits = 1; (1 << bits) < cache_size; ++bits) {
994                 ;
995         }
996         return bits;
997 }
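
/*
 * Worked example (illustrative only): on a 16 GiB node with 4 KiB pages,
 * totalram_pages is 4194304; with the default lu_cache_percent of 20 the
 * loop above sizes the table for 4194304 / 100 * 20 * 4 = 3355440 cached
 * objects, i.e. bits == 22, which lu_site_init() then clamps into the
 * [LU_SITE_BITS_MIN, LU_SITE_BITS_MAX] range.
 */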
998
999 static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
1000                                 const void *key, unsigned mask)
1001 {
1002         struct lu_fid  *fid = (struct lu_fid *)key;
1003         __u32           hash;
1004
1005         hash = fid_flatten32(fid);
1006         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
1007         hash = cfs_hash_long(hash, hs->hs_bkt_bits);
1008
1009         /* give me another random factor */
1010         hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
1011
1012         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
1013         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
1014
1015         return hash & mask;
1016 }
1017
1018 static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
1019 {
1020         return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
1021 }
1022
1023 static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
1024 {
1025         struct lu_object_header *h;
1026
1027         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
1028         return &h->loh_fid;
1029 }
1030
1031 static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
1032 {
1033         struct lu_object_header *h;
1034
1035         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
1036         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
1037 }
1038
1039 static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
1040 {
1041         struct lu_object_header *h;
1042
1043         h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
1044         if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
1045                 struct lu_site_bkt_data *bkt;
1046                 cfs_hash_bd_t            bd;
1047
1048                 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
1049                 bkt = cfs_hash_bd_extra_get(hs, &bd);
1050                 bkt->lsb_busy++;
1051         }
1052 }
1053
1054 static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
1055 {
1056         LBUG(); /* we should never call it */
1057 }
1058
1059 cfs_hash_ops_t lu_site_hash_ops = {
1060         .hs_hash        = lu_obj_hop_hash,
1061         .hs_key         = lu_obj_hop_key,
1062         .hs_keycmp      = lu_obj_hop_keycmp,
1063         .hs_object      = lu_obj_hop_object,
1064         .hs_get         = lu_obj_hop_get,
1065         .hs_put_locked  = lu_obj_hop_put_locked,
1066 };
1067
1068 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1069 {
1070         spin_lock(&s->ls_ld_lock);
1071         if (cfs_list_empty(&d->ld_linkage))
1072                 cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
1073         spin_unlock(&s->ls_ld_lock);
1074 }
1075 EXPORT_SYMBOL(lu_dev_add_linkage);
1076
1077 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1078 {
1079         spin_lock(&s->ls_ld_lock);
1080         cfs_list_del_init(&d->ld_linkage);
1081         spin_unlock(&s->ls_ld_lock);
1082 }
1083 EXPORT_SYMBOL(lu_dev_del_linkage);
1084
1085 /**
1086  * Initialize site \a s, with \a top as the top-level device.
1087  */
1088 int lu_site_init(struct lu_site *s, struct lu_device *top)
1089 {
1090         struct lu_site_bkt_data *bkt;
1091         cfs_hash_bd_t bd;
1092         char name[16];
1093         int bits;
1094         int i;
1095         ENTRY;
1096
1097         memset(s, 0, sizeof *s);
1098         bits = lu_htable_order(top);
1099         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
1100         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
1101              bits >= LU_SITE_BITS_MIN; bits--) {
1102                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1103                                                  bits - LU_SITE_BKT_BITS,
1104                                                  sizeof(*bkt), 0, 0,
1105                                                  &lu_site_hash_ops,
1106                                                  CFS_HASH_SPIN_BKTLOCK |
1107                                                  CFS_HASH_NO_ITEMREF |
1108                                                  CFS_HASH_DEPTH |
1109                                                  CFS_HASH_ASSERT_EMPTY |
1110                                                  CFS_HASH_COUNTER);
1111                 if (s->ls_obj_hash != NULL)
1112                         break;
1113         }
1114
1115         if (s->ls_obj_hash == NULL) {
1116                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
1117                 return -ENOMEM;
1118         }
1119
1120         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1121                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1122                 CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
1123                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1124         }
1125
1126         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1127         if (s->ls_stats == NULL) {
1128                 cfs_hash_putref(s->ls_obj_hash);
1129                 s->ls_obj_hash = NULL;
1130                 return -ENOMEM;
1131         }
1132
1133         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1134                              0, "created", "created");
1135         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1136                              0, "cache_hit", "cache_hit");
1137         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1138                              0, "cache_miss", "cache_miss");
1139         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1140                              0, "cache_race", "cache_race");
1141         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1142                              0, "cache_death_race", "cache_death_race");
1143         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1144                              0, "lru_purged", "lru_purged");
1145
1146         CFS_INIT_LIST_HEAD(&s->ls_linkage);
1147         s->ls_top_dev = top;
1148         top->ld_site = s;
1149         lu_device_get(top);
1150         lu_ref_add(&top->ld_reference, "site-top", s);
1151
1152         CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
1153         spin_lock_init(&s->ls_ld_lock);
1154
1155         lu_dev_add_linkage(s, top);
1156
1157         RETURN(0);
1158 }
1159 EXPORT_SYMBOL(lu_site_init);
1160
1161 /**
1162  * Finalize \a s and release its resources.
1163  */
1164 void lu_site_fini(struct lu_site *s)
1165 {
1166         mutex_lock(&lu_sites_guard);
1167         cfs_list_del_init(&s->ls_linkage);
1168         mutex_unlock(&lu_sites_guard);
1169
1170         if (s->ls_obj_hash != NULL) {
1171                 cfs_hash_putref(s->ls_obj_hash);
1172                 s->ls_obj_hash = NULL;
1173         }
1174
1175         if (s->ls_top_dev != NULL) {
1176                 s->ls_top_dev->ld_site = NULL;
1177                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1178                 lu_device_put(s->ls_top_dev);
1179                 s->ls_top_dev = NULL;
1180         }
1181
1182         if (s->ls_stats != NULL)
1183                 lprocfs_free_stats(&s->ls_stats);
1184 }
1185 EXPORT_SYMBOL(lu_site_fini);
1186
1187 /**
1188  * Called when initialization of stack for this site is completed.
1189  */
1190 int lu_site_init_finish(struct lu_site *s)
1191 {
1192         int result;
1193         mutex_lock(&lu_sites_guard);
1194         result = lu_context_refill(&lu_shrink_env.le_ctx);
1195         if (result == 0)
1196                 cfs_list_add(&s->ls_linkage, &lu_sites);
1197         mutex_unlock(&lu_sites_guard);
1198         return result;
1199 }
1200 EXPORT_SYMBOL(lu_site_init_finish);
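
/*
 * Usage sketch (illustrative only; "site" and "top" are assumed to be
 * provided by the caller): a stack typically initializes its site against
 * the top-level device, registers it once the stack is ready, and tears it
 * down with lu_site_fini():
 *
 *      rc = lu_site_init(site, top);
 *      if (rc == 0)
 *              rc = lu_site_init_finish(site);
 *      ...
 *      lu_site_fini(site);
 */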
1201
1202 /**
1203  * Acquire additional reference on device \a d
1204  */
1205 void lu_device_get(struct lu_device *d)
1206 {
1207         cfs_atomic_inc(&d->ld_ref);
1208 }
1209 EXPORT_SYMBOL(lu_device_get);
1210
1211 /**
1212  * Release reference on device \a d.
1213  */
1214 void lu_device_put(struct lu_device *d)
1215 {
1216         LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
1217         cfs_atomic_dec(&d->ld_ref);
1218 }
1219 EXPORT_SYMBOL(lu_device_put);
1220
1221 /**
1222  * Initialize device \a d of type \a t.
1223  */
1224 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1225 {
1226         if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
1227                 t->ldt_ops->ldto_start(t);
1228         memset(d, 0, sizeof *d);
1229         cfs_atomic_set(&d->ld_ref, 0);
1230         d->ld_type = t;
1231         lu_ref_init(&d->ld_reference);
1232         CFS_INIT_LIST_HEAD(&d->ld_linkage);
1233         return 0;
1234 }
1235 EXPORT_SYMBOL(lu_device_init);
1236
1237 /**
1238  * Finalize device \a d.
1239  */
1240 void lu_device_fini(struct lu_device *d)
1241 {
1242         struct lu_device_type *t;
1243
1244         t = d->ld_type;
1245         if (d->ld_obd != NULL) {
1246                 d->ld_obd->obd_lu_dev = NULL;
1247                 d->ld_obd = NULL;
1248         }
1249
1250         lu_ref_fini(&d->ld_reference);
1251         LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
1252                  "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
1253         LASSERT(t->ldt_device_nr > 0);
1254         if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
1255                 t->ldt_ops->ldto_stop(t);
1256 }
1257 EXPORT_SYMBOL(lu_device_fini);
1258
1259 /**
1260  * Initialize object \a o that is part of compound object \a h and was created
1261  * by device \a d.
1262  */
1263 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1264                    struct lu_device *d)
1265 {
1266         memset(o, 0, sizeof(*o));
1267         o->lo_header = h;
1268         o->lo_dev = d;
1269         lu_device_get(d);
1270         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1271         CFS_INIT_LIST_HEAD(&o->lo_linkage);
1272
1273         return 0;
1274 }
1275 EXPORT_SYMBOL(lu_object_init);
1276
1277 /**
1278  * Finalize object and release its resources.
1279  */
1280 void lu_object_fini(struct lu_object *o)
1281 {
1282         struct lu_device *dev = o->lo_dev;
1283
1284         LASSERT(cfs_list_empty(&o->lo_linkage));
1285
1286         if (dev != NULL) {
1287                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1288                               "lu_object", o);
1289                 lu_device_put(dev);
1290                 o->lo_dev = NULL;
1291         }
1292 }
1293 EXPORT_SYMBOL(lu_object_fini);
1294
1295 /**
1296  * Add object \a o as first layer of compound object \a h
1297  *
1298  * This is typically called by the ->ldo_object_alloc() method of top-level
1299  * device.
1300  */
1301 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1302 {
1303         cfs_list_move(&o->lo_linkage, &h->loh_layers);
1304 }
1305 EXPORT_SYMBOL(lu_object_add_top);
1306
1307 /**
1308  * Add object \a o as a layer of compound object, going after \a before.
1309  *
1310  * This is typically called by the ->ldo_object_alloc() method of \a
1311  * before->lo_dev.
1312  */
1313 void lu_object_add(struct lu_object *before, struct lu_object *o)
1314 {
1315         cfs_list_move(&o->lo_linkage, &before->lo_linkage);
1316 }
1317 EXPORT_SYMBOL(lu_object_add);
1318
1319 /**
1320  * Initialize compound object.
1321  */
1322 int lu_object_header_init(struct lu_object_header *h)
1323 {
1324         memset(h, 0, sizeof *h);
1325         cfs_atomic_set(&h->loh_ref, 1);
1326         CFS_INIT_HLIST_NODE(&h->loh_hash);
1327         CFS_INIT_LIST_HEAD(&h->loh_lru);
1328         CFS_INIT_LIST_HEAD(&h->loh_layers);
1329         lu_ref_init(&h->loh_reference);
1330         return 0;
1331 }
1332 EXPORT_SYMBOL(lu_object_header_init);
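
/*
 * Allocation sketch (illustrative only; the "foo" names are hypothetical):
 * a top-level device's ->ldo_object_alloc() method typically embeds the
 * header in its own object, initializes both, and links the slice in with
 * lu_object_add_top() (lower layers use lu_object_add() instead):
 *
 *      struct foo_object *fo;
 *
 *      OBD_ALLOC_PTR(fo);
 *      if (fo == NULL)
 *              return NULL;
 *      lu_object_header_init(&fo->fo_header);
 *      lu_object_init(&fo->fo_obj, &fo->fo_header, dev);
 *      lu_object_add_top(&fo->fo_header, &fo->fo_obj);
 *      fo->fo_obj.lo_ops = &foo_obj_ops;
 *      return &fo->fo_obj;
 */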
1333
1334 /**
1335  * Finalize compound object.
1336  */
1337 void lu_object_header_fini(struct lu_object_header *h)
1338 {
1339         LASSERT(cfs_list_empty(&h->loh_layers));
1340         LASSERT(cfs_list_empty(&h->loh_lru));
1341         LASSERT(cfs_hlist_unhashed(&h->loh_hash));
1342         lu_ref_fini(&h->loh_reference);
1343 }
1344 EXPORT_SYMBOL(lu_object_header_fini);
1345
1346 /**
1347  * Given a compound object, find its slice, corresponding to the device type
1348  * \a dtype.
1349  */
1350 struct lu_object *lu_object_locate(struct lu_object_header *h,
1351                                    const struct lu_device_type *dtype)
1352 {
1353         struct lu_object *o;
1354
1355         cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1356                 if (o->lo_dev->ld_type == dtype)
1357                         return o;
1358         }
1359         return NULL;
1360 }
1361 EXPORT_SYMBOL(lu_object_locate);
1362
1363
1364
1365 /**
1366  * Finalize and free devices in the device stack.
1367  *
1368  * Finalize device stack by purging object cache, and calling
1369  * lu_device_type_operations::ldto_device_fini() and
1370  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1371  */
1372 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1373 {
1374         struct lu_site   *site = top->ld_site;
1375         struct lu_device *scan;
1376         struct lu_device *next;
1377
1378         lu_site_purge(env, site, ~0);
1379         for (scan = top; scan != NULL; scan = next) {
1380                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1381                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1382                 lu_device_put(scan);
1383         }
1384
1385         /* purge again. */
1386         lu_site_purge(env, site, ~0);
1387
1388         for (scan = top; scan != NULL; scan = next) {
1389                 const struct lu_device_type *ldt = scan->ld_type;
1390                 struct obd_type             *type;
1391
1392                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1393                 type = ldt->ldt_obd_type;
1394                 if (type != NULL) {
1395                         type->typ_refcnt--;
1396                         class_put_type(type);
1397                 }
1398         }
1399 }
1400 EXPORT_SYMBOL(lu_stack_fini);
1401
1402 enum {
1403         /**
1404          * Maximal number of tld slots.
1405          */
1406         LU_CONTEXT_KEY_NR = 40
1407 };
1408
1409 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1410
1411 static DEFINE_SPINLOCK(lu_keys_guard);
1412
1413 /**
1414  * Global counter incremented whenever key is registered, unregistered,
1415  * revived or quiesced. This is used to avoid unnecessary calls to
1416  * lu_context_refill(). No locking is provided, as initialization and shutdown
1417  * are supposed to be externally serialized.
1418  */
1419 static unsigned key_set_version = 0;
1420
1421 /**
1422  * Register new key.
1423  */
1424 int lu_context_key_register(struct lu_context_key *key)
1425 {
1426         int result;
1427         int i;
1428
1429         LASSERT(key->lct_init != NULL);
1430         LASSERT(key->lct_fini != NULL);
1431         LASSERT(key->lct_tags != 0);
1432         LASSERT(key->lct_owner != NULL);
1433
1434         result = -ENFILE;
1435         spin_lock(&lu_keys_guard);
1436         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1437                 if (lu_keys[i] == NULL) {
1438                         key->lct_index = i;
1439                         cfs_atomic_set(&key->lct_used, 1);
1440                         lu_keys[i] = key;
1441                         lu_ref_init(&key->lct_reference);
1442                         result = 0;
1443                         ++key_set_version;
1444                         break;
1445                 }
1446         }
1447         spin_unlock(&lu_keys_guard);
1448         return result;
1449 }
1450 EXPORT_SYMBOL(lu_context_key_register);
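
/*
 * Usage sketch (illustrative only; the "foo" names are hypothetical): a
 * module declares its per-context data, generates the constructor and
 * destructor with LU_KEY_INIT_FINI(), registers the key once at module
 * load, and later fetches the value with lu_context_key_get(), in the same
 * way lu_global_key is handled earlier in this file:
 *
 *      struct foo_thread_info { ... };
 *      LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *      struct lu_context_key foo_thread_key = {
 *              .lct_tags  = LCT_MD_THREAD,
 *              .lct_init  = foo_key_init,
 *              .lct_fini  = foo_key_fini,
 *              .lct_owner = THIS_MODULE,
 *      };
 *
 *      rc = lu_context_key_register(&foo_thread_key);
 *      ...
 *      info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 */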
1451
1452 static void key_fini(struct lu_context *ctx, int index)
1453 {
1454         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1455                 struct lu_context_key *key;
1456
1457                 key = lu_keys[index];
1458                 LASSERT(key != NULL);
1459                 LASSERT(key->lct_fini != NULL);
1460                 LASSERT(cfs_atomic_read(&key->lct_used) > 1);
1461
1462                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1463                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1464                 cfs_atomic_dec(&key->lct_used);
1465
1466                 LASSERT(key->lct_owner != NULL);
1467                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1468                         LINVRNT(module_refcount(key->lct_owner) > 0);
1469                         module_put(key->lct_owner);
1470                 }
1471                 ctx->lc_value[index] = NULL;
1472         }
1473 }
1474
1475 /**
1476  * Deregister key.
1477  */
1478 void lu_context_key_degister(struct lu_context_key *key)
1479 {
1480         LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
1481         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1482
1483         lu_context_key_quiesce(key);
1484
1485         ++key_set_version;
1486         spin_lock(&lu_keys_guard);
1487         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1488         if (lu_keys[key->lct_index]) {
1489                 lu_keys[key->lct_index] = NULL;
1490                 lu_ref_fini(&key->lct_reference);
1491         }
1492         spin_unlock(&lu_keys_guard);
1493
1494         LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
1495                  "key has instances: %d\n",
1496                  cfs_atomic_read(&key->lct_used));
1497 }
1498 EXPORT_SYMBOL(lu_context_key_degister);
1499
1500 /**
1501  * Register a number of keys. This has to be called after all keys have been
1502  * initialized by a call to LU_CONTEXT_KEY_INIT().
1503  */
1504 int lu_context_key_register_many(struct lu_context_key *k, ...)
1505 {
1506         struct lu_context_key *key = k;
1507         va_list args;
1508         int result;
1509
1510         va_start(args, k);
1511         do {
1512                 result = lu_context_key_register(key);
1513                 if (result)
1514                         break;
1515                 key = va_arg(args, struct lu_context_key *);
1516         } while (key != NULL);
1517         va_end(args);
1518
1519         if (result != 0) {
1520                 va_start(args, k);
1521                 while (k != key) {
1522                         lu_context_key_degister(k);
1523                         k = va_arg(args, struct lu_context_key *);
1524                 }
1525                 va_end(args);
1526         }
1527
1528         return result;
1529 }
1530 EXPORT_SYMBOL(lu_context_key_register_many);
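/*
 * Illustrative call (hypothetical keys from the sketch above): the argument
 * list must be NULL-terminated; on failure, the keys registered so far are
 * rolled back by this function before it returns the error.
 *
 *	rc = lu_context_key_register_many(&foo_thread_key, &bar_thread_key,
 *					  NULL);
 */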
1531
1532 /**
1533  * De-register a number of keys. This is a dual to
1534  * lu_context_key_register_many().
1535  */
1536 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1537 {
1538         va_list args;
1539
1540         va_start(args, k);
1541         do {
1542                 lu_context_key_degister(k);
1543                 k = va_arg(args, struct lu_context_key*);
1544         } while (k != NULL);
1545         va_end(args);
1546 }
1547 EXPORT_SYMBOL(lu_context_key_degister_many);
1548
1549 /**
1550  * Revive a number of keys.
1551  */
1552 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1553 {
1554         va_list args;
1555
1556         va_start(args, k);
1557         do {
1558                 lu_context_key_revive(k);
1559                 k = va_arg(args, struct lu_context_key*);
1560         } while (k != NULL);
1561         va_end(args);
1562 }
1563 EXPORT_SYMBOL(lu_context_key_revive_many);
1564
1565 /**
1566  * Quiesce a number of keys.
1567  */
1568 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1569 {
1570         va_list args;
1571
1572         va_start(args, k);
1573         do {
1574                 lu_context_key_quiesce(k);
1575                 k = va_arg(args, struct lu_context_key*);
1576         } while (k != NULL);
1577         va_end(args);
1578 }
1579 EXPORT_SYMBOL(lu_context_key_quiesce_many);
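/*
 * One plausible pattern (a sketch, not a prescription): a device type
 * quiesces its keys when its last instance is cleaned up, so that cached
 * "remembered" contexts stop pinning the module, and revives them when a new
 * instance is set up.
 *
 *	lu_context_key_quiesce_many(&foo_thread_key, NULL);
 *	...
 *	lu_context_key_revive_many(&foo_thread_key, NULL);
 */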
1580
1581 /**
1582  * Return value associated with key \a key in context \a ctx.
1583  */
1584 void *lu_context_key_get(const struct lu_context *ctx,
1585                          const struct lu_context_key *key)
1586 {
1587         LINVRNT(ctx->lc_state == LCS_ENTERED);
1588         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1589         LASSERT(lu_keys[key->lct_index] == key);
1590         return ctx->lc_value[key->lct_index];
1591 }
1592 EXPORT_SYMBOL(lu_context_key_get);
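/*
 * Typical access pattern (hypothetical key/type from the sketch above), used
 * in a function that has a struct lu_env *env with the context entered:
 *
 *	struct foo_thread_info *info;
 *
 *	info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *	LASSERT(info != NULL);
 */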
1593
1594 /**
1595  * List of contexts initialized with LCT_REMEMBER; used to destroy their key values when a key is quiesced or deregistered.
1596  */
1597 static CFS_LIST_HEAD(lu_context_remembered);
1598
1599 /**
1600  * Destroy \a key in all remembered contexts. This is used to destroy key
1601  * values in "shared" contexts (like service threads), when a module owning
1602  * the key is about to be unloaded.
1603  */
1604 void lu_context_key_quiesce(struct lu_context_key *key)
1605 {
1606         struct lu_context *ctx;
1607
1608         if (!(key->lct_tags & LCT_QUIESCENT)) {
1609                 /*
1610                  * XXX layering violation.
1611                  */
1612                 key->lct_tags |= LCT_QUIESCENT;
1613                 /*
1614                  * XXX memory barrier has to go here.
1615                  */
1616                 spin_lock(&lu_keys_guard);
1617                 cfs_list_for_each_entry(ctx, &lu_context_remembered,
1618                                         lc_remember)
1619                         key_fini(ctx, key->lct_index);
1620                 spin_unlock(&lu_keys_guard);
1621                 ++key_set_version;
1622         }
1623 }
1624 EXPORT_SYMBOL(lu_context_key_quiesce);
1625
1626 void lu_context_key_revive(struct lu_context_key *key)
1627 {
1628         key->lct_tags &= ~LCT_QUIESCENT;
1629         ++key_set_version;
1630 }
1631 EXPORT_SYMBOL(lu_context_key_revive);
1632
1633 static void keys_fini(struct lu_context *ctx)
1634 {
1635         int     i;
1636
1637         if (ctx->lc_value == NULL)
1638                 return;
1639
1640         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1641                 key_fini(ctx, i);
1642
1643         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1644         ctx->lc_value = NULL;
1645 }
1646
1647 static int keys_fill(struct lu_context *ctx)
1648 {
1649         int i;
1650
1651         LINVRNT(ctx->lc_value != NULL);
1652         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1653                 struct lu_context_key *key;
1654
1655                 key = lu_keys[i];
1656                 if (ctx->lc_value[i] == NULL && key != NULL &&
1657                     (key->lct_tags & ctx->lc_tags) &&
1658                     /*
1659                      * Don't create values for a LCT_QUIESCENT key, as this
1660                      * will pin module owning a key.
1661                      * would pin the module owning the key.
1662                     !(key->lct_tags & LCT_QUIESCENT)) {
1663                         void *value;
1664
1665                         LINVRNT(key->lct_init != NULL);
1666                         LINVRNT(key->lct_index == i);
1667
1668                         value = key->lct_init(ctx, key);
1669                         if (unlikely(IS_ERR(value)))
1670                                 return PTR_ERR(value);
1671
1672                         LASSERT(key->lct_owner != NULL);
1673                         if (!(ctx->lc_tags & LCT_NOREF))
1674                                 try_module_get(key->lct_owner);
1675                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1676                         cfs_atomic_inc(&key->lct_used);
1677                         /*
1678                          * This is the only place in the code where an
1679                          * element of the ctx->lc_value[] array is set to a
1680                          * non-NULL value.
1681                          */
1682                         ctx->lc_value[i] = value;
1683                         if (key->lct_exit != NULL)
1684                                 ctx->lc_tags |= LCT_HAS_EXIT;
1685                 }
1686                 ctx->lc_version = key_set_version;
1687         }
1688         return 0;
1689 }
1690
1691 static int keys_init(struct lu_context *ctx)
1692 {
1693         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1694         if (likely(ctx->lc_value != NULL))
1695                 return keys_fill(ctx);
1696
1697         return -ENOMEM;
1698 }
1699
1700 /**
1701  * Initialize context data-structure. Create values for all keys.
1702  */
1703 int lu_context_init(struct lu_context *ctx, __u32 tags)
1704 {
1705         int     rc;
1706
1707         memset(ctx, 0, sizeof *ctx);
1708         ctx->lc_state = LCS_INITIALIZED;
1709         ctx->lc_tags = tags;
1710         if (tags & LCT_REMEMBER) {
1711                 spin_lock(&lu_keys_guard);
1712                 cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
1713                 spin_unlock(&lu_keys_guard);
1714         } else {
1715                 CFS_INIT_LIST_HEAD(&ctx->lc_remember);
1716         }
1717
1718         rc = keys_init(ctx);
1719         if (rc != 0)
1720                 lu_context_fini(ctx);
1721
1722         return rc;
1723 }
1724 EXPORT_SYMBOL(lu_context_init);
1725
1726 /**
1727  * Finalize context data-structure. Destroy key values.
1728  */
1729 void lu_context_fini(struct lu_context *ctx)
1730 {
1731         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1732         ctx->lc_state = LCS_FINALIZED;
1733
1734         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1735                 LASSERT(cfs_list_empty(&ctx->lc_remember));
1736                 keys_fini(ctx);
1737
1738         } else { /* could race with key degister */
1739                 spin_lock(&lu_keys_guard);
1740                 keys_fini(ctx);
1741                 cfs_list_del_init(&ctx->lc_remember);
1742                 spin_unlock(&lu_keys_guard);
1743         }
1744 }
1745 EXPORT_SYMBOL(lu_context_fini);
1746
1747 /**
1748  * Called before entering context.
1749  */
1750 void lu_context_enter(struct lu_context *ctx)
1751 {
1752         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1753         ctx->lc_state = LCS_ENTERED;
1754 }
1755 EXPORT_SYMBOL(lu_context_enter);
1756
1757 /**
1758  * Called after exiting from \a ctx.
1759  */
1760 void lu_context_exit(struct lu_context *ctx)
1761 {
1762         int i;
1763
1764         LINVRNT(ctx->lc_state == LCS_ENTERED);
1765         ctx->lc_state = LCS_LEFT;
1766         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1767                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1768                         if (ctx->lc_value[i] != NULL) {
1769                                 struct lu_context_key *key;
1770
1771                                 key = lu_keys[i];
1772                                 LASSERT(key != NULL);
1773                                 if (key->lct_exit != NULL)
1774                                         key->lct_exit(ctx,
1775                                                       key, ctx->lc_value[i]);
1776                         }
1777                 }
1778         }
1779 }
1780 EXPORT_SYMBOL(lu_context_exit);
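/*
 * Sketch of the context life cycle (tag choice is illustrative; LCT_REMEMBER
 * links the context into lu_context_remembered above, LCT_NOREF skips module
 * reference counting):
 *
 *	struct lu_context ctx;
 *
 *	rc = lu_context_init(&ctx, LCT_DT_THREAD | LCT_REMEMBER | LCT_NOREF);
 *	if (rc == 0) {
 *		lu_context_enter(&ctx);
 *		... lu_context_key_get(&ctx, ...) ...
 *		lu_context_exit(&ctx);
 *		lu_context_fini(&ctx);
 *	}
 */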
1781
1782 /**
1783  * Allocate, for \a ctx, values for all missing keys that were registered after
1784  * the context was created. key_set_version only changes in the rare cases when
1785  * modules are loaded or removed.
1786  */
1787 int lu_context_refill(struct lu_context *ctx)
1788 {
1789         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1790 }
1791 EXPORT_SYMBOL(lu_context_refill);
1792
1793 /**
1794  * lu_context_tags_default/lu_session_tags_default are updated when new obd
1795  * types are added. Currently this is only used on the client side, specifically
1796  * by the echo device client; for other stacks (like ptlrpc threads) the
1797  * context tags are predefined when the lu_device type is registered, during
1798  * the module probe phase.
1799  */
1800 __u32 lu_context_tags_default = 0;
1801 __u32 lu_session_tags_default = 0;
1802
1803 void lu_context_tags_update(__u32 tags)
1804 {
1805         spin_lock(&lu_keys_guard);
1806         lu_context_tags_default |= tags;
1807         key_set_version++;
1808         spin_unlock(&lu_keys_guard);
1809 }
1810 EXPORT_SYMBOL(lu_context_tags_update);
1811
1812 void lu_context_tags_clear(__u32 tags)
1813 {
1814         spin_lock(&lu_keys_guard);
1815         lu_context_tags_default &= ~tags;
1816         key_set_version++;
1817         spin_unlock(&lu_keys_guard);
1818 }
1819 EXPORT_SYMBOL(lu_context_tags_clear);
1820
1821 void lu_session_tags_update(__u32 tags)
1822 {
1823         spin_lock(&lu_keys_guard);
1824         lu_session_tags_default |= tags;
1825         key_set_version++;
1826         spin_unlock(&lu_keys_guard);
1827 }
1828 EXPORT_SYMBOL(lu_session_tags_update);
1829
1830 void lu_session_tags_clear(__u32 tags)
1831 {
1832         spin_lock(&lu_keys_guard);
1833         lu_session_tags_default &= ~tags;
1834         key_set_version++;
1835         spin_unlock(&lu_keys_guard);
1836 }
1837 EXPORT_SYMBOL(lu_session_tags_clear);
1838
1839 int lu_env_init(struct lu_env *env, __u32 tags)
1840 {
1841         int result;
1842
1843         env->le_ses = NULL;
1844         result = lu_context_init(&env->le_ctx, tags);
1845         if (likely(result == 0))
1846                 lu_context_enter(&env->le_ctx);
1847         return result;
1848 }
1849 EXPORT_SYMBOL(lu_env_init);
1850
1851 void lu_env_fini(struct lu_env *env)
1852 {
1853         lu_context_exit(&env->le_ctx);
1854         lu_context_fini(&env->le_ctx);
1855         env->le_ses = NULL;
1856 }
1857 EXPORT_SYMBOL(lu_env_fini);
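/*
 * Common lu_env pattern (a sketch; the tag is illustrative): lu_env_init()
 * both initializes and enters le_ctx, so key values are available right away,
 * and lu_env_fini() exits and finalizes it.
 *
 *	struct lu_env env;
 *
 *	rc = lu_env_init(&env, LCT_DT_THREAD);
 *	if (rc == 0) {
 *		... use &env ...
 *		lu_env_fini(&env);
 *	}
 */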
1858
1859 int lu_env_refill(struct lu_env *env)
1860 {
1861         int result;
1862
1863         result = lu_context_refill(&env->le_ctx);
1864         if (result == 0 && env->le_ses != NULL)
1865                 result = lu_context_refill(env->le_ses);
1866         return result;
1867 }
1868 EXPORT_SYMBOL(lu_env_refill);
1869
1870 /**
1871  * Currently, this API is only used by the echo client.
1872  * Because the echo client and the normal Lustre client share the
1873  * same cl_env cache, the echo client needs to refresh the env
1874  * context after it gets one from the cache, especially when the
1875  * normal client and the echo client co-exist on the same node.
1876  */
1877 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1878                           __u32 stags)
1879 {
1880         int    result;
1881
1882         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1883                 env->le_ctx.lc_version = 0;
1884                 env->le_ctx.lc_tags |= ctags;
1885         }
1886
1887         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1888                 env->le_ses->lc_version = 0;
1889                 env->le_ses->lc_tags |= stags;
1890         }
1891
1892         result = lu_env_refill(env);
1893
1894         return result;
1895 }
1896 EXPORT_SYMBOL(lu_env_refill_by_tags);
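/*
 * Hedged example (tag names are illustrative): a caller that got a cached env
 * but needs additional context/session tags widens them here; clearing
 * lc_version forces lu_env_refill() to create the missing key values.
 *
 *	rc = lu_env_refill_by_tags(env, LCT_DT_THREAD, LCT_SESSION);
 */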
1897
1898 static struct shrinker *lu_site_shrinker;
1899
1900 typedef struct lu_site_stats {
1901         unsigned        lss_populated;
1902         unsigned        lss_max_search;
1903         unsigned        lss_total;
1904         unsigned        lss_busy;
1905 } lu_site_stats_t;
1906
1907 static void lu_site_stats_get(cfs_hash_t *hs,
1908                               lu_site_stats_t *stats, int populated)
1909 {
1910         cfs_hash_bd_t bd;
1911         int           i;
1912
1913         cfs_hash_for_each_bucket(hs, &bd, i) {
1914                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1915                 cfs_hlist_head_t        *hhead;
1916
1917                 cfs_hash_bd_lock(hs, &bd, 1);
1918                 stats->lss_busy  += bkt->lsb_busy;
1919                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1920                 stats->lss_max_search = max((int)stats->lss_max_search,
1921                                             cfs_hash_bd_depmax_get(&bd));
1922                 if (!populated) {
1923                         cfs_hash_bd_unlock(hs, &bd, 1);
1924                         continue;
1925                 }
1926
1927                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1928                         if (!cfs_hlist_empty(hhead))
1929                                 stats->lss_populated++;
1930                 }
1931                 cfs_hash_bd_unlock(hs, &bd, 1);
1932         }
1933 }
1934
1935 #ifdef __KERNEL__
1936
1937 /*
1938  * There exists a potential lock inversion deadlock scenario when using
1939  * Lustre on top of ZFS. This occurs between one of ZFS's
1940  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
1941  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
1942  * while thread B will take the ht_lock and sleep on the lu_sites_guard
1943  * lock. Obviously neither thread will wake and drop their respective hold
1944  * on their lock.
1945  *
1946  * To prevent this from happening we must ensure the lu_sites_guard lock is
1947  * not taken anywhere along this code path. ZFS reliably does not set the
1948  * __GFP_FS bit in its code paths, so this can be used to determine if it
1949  * is safe to take the lu_sites_guard lock.
1950  *
1951  * Ideally we should accurately return the remaining number of cached
1952  * objects without taking the lu_sites_guard lock, but this is not
1953  * possible in the current implementation.
1954  */
1955 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
1956 {
1957         lu_site_stats_t stats;
1958         struct lu_site *s;
1959         struct lu_site *tmp;
1960         int cached = 0;
1961         int remain = shrink_param(sc, nr_to_scan);
1962         CFS_LIST_HEAD(splice);
1963
1964         if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
1965                 if (remain != 0)
1966                         return -1;
1967                 else
1968                         /* We must not take the lu_sites_guard lock when
1969                          * __GFP_FS is *not* set because of the deadlock
1970                          * possibility detailed above. Additionally,
1971                          * since we cannot determine the number of
1972                          * objects in the cache without taking this
1973                          * lock, we're in a particularly tough spot. As
1974                          * a result, we'll just lie and say our cache is
1975                          * empty. This _should_ be ok, as we can't
1976                          * reclaim objects when __GFP_FS is *not* set
1977                          * anyway.
1978                          */
1979                         return 0;
1980         }
1981
1982         CDEBUG(D_INODE, "Shrink %d objects\n", remain);
1983
1984         mutex_lock(&lu_sites_guard);
1985         cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1986                 if (shrink_param(sc, nr_to_scan) != 0) {
1987                         remain = lu_site_purge(&lu_shrink_env, s, remain);
1988                         /*
1989                          * Move the just-shrunk site to the tail of the site
1990                          * list to ensure shrinking fairness.
1991                          */
1992                         cfs_list_move_tail(&s->ls_linkage, &splice);
1993                 }
1994
1995                 memset(&stats, 0, sizeof(stats));
1996                 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1997                 cached += stats.lss_total - stats.lss_busy;
1998                 if (shrink_param(sc, nr_to_scan) && remain <= 0)
1999                         break;
2000         }
2001         cfs_list_splice(&splice, lu_sites.prev);
2002         mutex_unlock(&lu_sites_guard);
2003
2004         cached = (cached / 100) * sysctl_vfs_cache_pressure;
2005         if (shrink_param(sc, nr_to_scan) == 0)
2006                 CDEBUG(D_INODE, "%d objects cached\n", cached);
2007         return cached;
2008 }
2009
2010 /*
2011  * Debugging stuff.
2012  */
2013
2014 /**
2015  * Environment to be used in debugger, contains all tags.
2016  */
2017 struct lu_env lu_debugging_env;
2018
2019 /**
2020  * Debugging printer function using printk().
2021  */
2022 int lu_printk_printer(const struct lu_env *env,
2023                       void *unused, const char *format, ...)
2024 {
2025         va_list args;
2026
2027         va_start(args, format);
2028         vprintk(format, args);
2029         va_end(args);
2030         return 0;
2031 }
2032
2033 int lu_debugging_setup(void)
2034 {
2035         return lu_env_init(&lu_debugging_env, ~0);
2036 }
2037
2038 void lu_context_keys_dump(void)
2039 {
2040         int i;
2041
2042         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2043                 struct lu_context_key *key;
2044
2045                 key = lu_keys[i];
2046                 if (key != NULL) {
2047                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2048                                i, key, key->lct_tags,
2049                                key->lct_init, key->lct_fini, key->lct_exit,
2050                                key->lct_index, cfs_atomic_read(&key->lct_used),
2051                                key->lct_owner ? key->lct_owner->name : "",
2052                                key->lct_owner);
2053                         lu_ref_print(&key->lct_reference);
2054                 }
2055         }
2056 }
2057 EXPORT_SYMBOL(lu_context_keys_dump);
2058 #else  /* !__KERNEL__ */
2059 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
2060 {
2061         return 0;
2062 }
2063 #endif /* __KERNEL__ */
2064
2065 /**
2066  * Initialization of global lu_* data.
2067  */
2068 int lu_global_init(void)
2069 {
2070         int result;
2071
2072         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2073
2074         result = lu_ref_global_init();
2075         if (result != 0)
2076                 return result;
2077
2078         LU_CONTEXT_KEY_INIT(&lu_global_key);
2079         result = lu_context_key_register(&lu_global_key);
2080         if (result != 0)
2081                 return result;
2082
2083         /*
2084          * At this level, we don't know what tags are needed, so allocate them
2085          * conservatively. This should not be too bad, because this
2086          * environment is global.
2087          */
2088         mutex_lock(&lu_sites_guard);
2089         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2090         mutex_unlock(&lu_sites_guard);
2091         if (result != 0)
2092                 return result;
2093
2094         /*
2095          * Seek estimation: 3 seeks to read a record from the OI, one to read
2096          * the inode, one for the EA. Unfortunately, setting such a high value
2097          * results in the lu_object/inode cache consuming all the memory.
2098          */
2099         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, lu_cache_shrink);
2100         if (lu_site_shrinker == NULL)
2101                 return -ENOMEM;
2102
2103         return result;
2104 }
2105
2106 /**
2107  * Dual to lu_global_init().
2108  */
2109 void lu_global_fini(void)
2110 {
2111         if (lu_site_shrinker != NULL) {
2112                 remove_shrinker(lu_site_shrinker);
2113                 lu_site_shrinker = NULL;
2114         }
2115
2116         lu_context_key_degister(&lu_global_key);
2117
2118         /*
2119          * Tear shrinker environment down _after_ de-registering
2120          * lu_global_key, because the latter has a value in the former.
2121          */
2122         mutex_lock(&lu_sites_guard);
2123         lu_env_fini(&lu_shrink_env);
2124         mutex_unlock(&lu_sites_guard);
2125
2126         lu_ref_global_fini();
2127 }
2128
2129 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2130 {
2131 #ifdef LPROCFS
2132         struct lprocfs_counter ret;
2133
2134         lprocfs_stats_collect(stats, idx, &ret);
2135         return (__u32)ret.lc_count;
2136 #else
2137         return 0;
2138 #endif
2139 }
2140
2141 /**
2142  * Output site statistical counters into a buffer. Suitable for
2143  * lprocfs_rd_*()-style functions.
2144  */
2145 int lu_site_stats_print(const struct lu_site *s, char *page, int count)
2146 {
2147         lu_site_stats_t stats;
2148
2149         memset(&stats, 0, sizeof(stats));
2150         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2151
2152         return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2153                         stats.lss_busy,
2154                         stats.lss_total,
2155                         stats.lss_populated,
2156                         CFS_HASH_NHLIST(s->ls_obj_hash),
2157                         stats.lss_max_search,
2158                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
2159                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2160                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2161                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2162                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2163                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2164 }
2165 EXPORT_SYMBOL(lu_site_stats_print);
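/*
 * For reference, the counters above are printed in this order:
 *   busy/total populated/hash-lists max_search created cache_hit cache_miss
 *   cache_race cache_death_race lru_purged
 * which matches the format string in lu_site_stats_print().
 */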
2166
2167 /**
2168  * Helper function to initialize a number of kmem slab caches at once.
2169  */
2170 int lu_kmem_init(struct lu_kmem_descr *caches)
2171 {
2172         int result;
2173         struct lu_kmem_descr *iter = caches;
2174
2175         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2176                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2177                                                      iter->ckd_size,
2178                                                      0, 0, NULL);
2179                 if (*iter->ckd_cache == NULL) {
2180                         result = -ENOMEM;
2181                         /* free all previously allocated caches */
2182                         lu_kmem_fini(caches);
2183                         break;
2184                 }
2185         }
2186         return result;
2187 }
2188 EXPORT_SYMBOL(lu_kmem_init);
2189
2190 /**
2191  * Helper function to finalize a number of kmem slab caches at once. Dual to
2192  * lu_kmem_init().
2193  */
2194 void lu_kmem_fini(struct lu_kmem_descr *caches)
2195 {
2196         for (; caches->ckd_cache != NULL; ++caches) {
2197                 if (*caches->ckd_cache != NULL) {
2198                         kmem_cache_destroy(*caches->ckd_cache);
2199                         *caches->ckd_cache = NULL;
2200                 }
2201         }
2202 }
2203 EXPORT_SYMBOL(lu_kmem_fini);
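/*
 * Usage sketch for the two helpers above (names are hypothetical): the
 * descriptor array is terminated by an entry whose ckd_cache is NULL, and the
 * same array is handed to lu_kmem_fini() on cleanup.
 *
 *	static struct kmem_cache *foo_object_kmem;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_object_kmem,
 *			.ckd_name  = "foo_object_kmem",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 *	...
 *	lu_kmem_fini(foo_caches);
 */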
2204
2205 /**
2206  * Temporary solution to be able to assign a fid in ->do_create()
2207  * until we have fully-functional OST fids.
2208  */
2209 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2210                           const struct lu_fid *fid)
2211 {
2212         struct lu_site          *s = o->lo_dev->ld_site;
2213         struct lu_fid           *old = &o->lo_header->loh_fid;
2214         struct lu_site_bkt_data *bkt;
2215         struct lu_object        *shadow;
2216         wait_queue_t             waiter;
2217         cfs_hash_t              *hs;
2218         cfs_hash_bd_t            bd;
2219         __u64                    version = 0;
2220
2221         LASSERT(fid_is_zero(old));
2222
2223         hs = s->ls_obj_hash;
2224         cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2225         shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2226         /* supposed to be unique */
2227         LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2228         *old = *fid;
2229         bkt = cfs_hash_bd_extra_get(hs, &bd);
2230         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2231         bkt->lsb_busy++;
2232         cfs_hash_bd_unlock(hs, &bd, 1);
2233 }
2234 EXPORT_SYMBOL(lu_object_assign_fid);
2235
2236 /**
2237  * Allocates an object with a zero (non-assigned) fid.
2238  * XXX: temporary solution to be able to assign a fid in ->do_create()
2239  *      until we have fully-functional OST fids.
2240  */
2241 struct lu_object *lu_object_anon(const struct lu_env *env,
2242                                  struct lu_device *dev,
2243                                  const struct lu_object_conf *conf)
2244 {
2245         struct lu_fid     fid;
2246         struct lu_object *o;
2247
2248         fid_zero(&fid);
2249         o = lu_object_alloc(env, dev, &fid, conf);
2250
2251         return o;
2252 }
2253 EXPORT_SYMBOL(lu_object_anon);
2254
2255 struct lu_buf LU_BUF_NULL = {
2256         .lb_buf = NULL,
2257         .lb_len = 0
2258 };
2259 EXPORT_SYMBOL(LU_BUF_NULL);
2260
2261 void lu_buf_free(struct lu_buf *buf)
2262 {
2263         LASSERT(buf);
2264         if (buf->lb_buf) {
2265                 LASSERT(buf->lb_len > 0);
2266                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2267                 buf->lb_buf = NULL;
2268                 buf->lb_len = 0;
2269         }
2270 }
2271 EXPORT_SYMBOL(lu_buf_free);
2272
2273 void lu_buf_alloc(struct lu_buf *buf, int size)
2274 {
2275         LASSERT(buf);
2276         LASSERT(buf->lb_buf == NULL);
2277         LASSERT(buf->lb_len == 0);
2278         OBD_ALLOC_LARGE(buf->lb_buf, size);
2279         if (likely(buf->lb_buf))
2280                 buf->lb_len = size;
2281 }
2282 EXPORT_SYMBOL(lu_buf_alloc);
2283
2284 void lu_buf_realloc(struct lu_buf *buf, int size)
2285 {
2286         lu_buf_free(buf);
2287         lu_buf_alloc(buf, size);
2288 }
2289 EXPORT_SYMBOL(lu_buf_realloc);
2290
2291 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
2292 {
2293         if (buf->lb_buf == NULL && buf->lb_len == 0)
2294                 lu_buf_alloc(buf, len);
2295
2296         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2297                 lu_buf_realloc(buf, len);
2298
2299         return buf;
2300 }
2301 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2302
2303 /**
2304  * Increase the size of \a buf.
2305  * Preserves the old data in the buffer; the old buffer remains unchanged on
2306  * error.
2307  * \retval 0 or -ENOMEM
2308  */
2309 int lu_buf_check_and_grow(struct lu_buf *buf, int len)
2310 {
2311         char *ptr;
2312
2313         if (len <= buf->lb_len)
2314                 return 0;
2315
2316         OBD_ALLOC_LARGE(ptr, len);
2317         if (ptr == NULL)
2318                 return -ENOMEM;
2319
2320         /* Copy old data over, then free the old buffer */
2321         if (buf->lb_buf != NULL) {
2322                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2323                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2324         }
2325
2326         buf->lb_buf = ptr;
2327         buf->lb_len = len;
2328         return 0;
2329 }
2330 EXPORT_SYMBOL(lu_buf_check_and_grow);
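/*
 * Minimal lu_buf usage sketch: start from an empty buffer, let the helpers
 * size it on demand, and free it explicitly when done. lu_buf_check_and_grow()
 * preserves existing data, while lu_buf_realloc() does not.
 *
 *	struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *	int rc;
 *
 *	lu_buf_check_and_alloc(&buf, 4096);
 *	if (buf.lb_buf == NULL)
 *		return -ENOMEM;
 *	...
 *	rc = lu_buf_check_and_grow(&buf, 8192);
 *	...
 *	lu_buf_free(&buf);
 */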
2331