1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/lu_object.c
37  *
38  * Lustre Object.
39  * These are the only exported functions; they provide some generic
40  * infrastructure for managing object devices.
41  *
42  *   Author: Nikita Danilov <nikita.danilov@sun.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46
47 #include <libcfs/libcfs.h>
48 #include <linux/module.h>
49 #include <libcfs/libcfs_hash.h> /* hash_long() */
50 #include <obd_class.h>
51 #include <obd_support.h>
52 #include <lustre_disk.h>
53 #include <lustre_fid.h>
54 #include <lu_object.h>
55 #include <lu_ref.h>
56 #include <libcfs/list.h>
57
58 enum {
59         LU_CACHE_PERCENT_MAX     = 50,
60         LU_CACHE_PERCENT_DEFAULT = 20
61 };
62
63 #define LU_CACHE_NR_MAX_ADJUST          128
64 #define LU_CACHE_NR_UNLIMITED           -1
65 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
66 #define LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
67 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
68 #define LU_CACHE_NR_ZFS_LIMIT           10240
69
70 #define LU_SITE_BITS_MIN    12
71 #define LU_SITE_BITS_MAX    24
72 /**
73  * 256 buckets in total; we don't want too many buckets because they:
74  * - consume too much memory
75  * - lead to unbalanced LRU lists
76  */
77 #define LU_SITE_BKT_BITS    8
78
79
80 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
81 CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
82                 "Percentage of memory to be used as lu_object cache");
83
84 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
85 CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
86                 "Maximum number of objects in lu_object cache");
87
88 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
89 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
90
91 /**
92  * Decrease the reference counter on an object. If the last reference is
93  * released, return the object to the cache, unless lu_object_is_dying(o)
94  * holds; in that case, free the object immediately.
95  */
96 void lu_object_put(const struct lu_env *env, struct lu_object *o)
97 {
98         struct lu_site_bkt_data *bkt;
99         struct lu_object_header *top;
100         struct lu_site          *site;
101         struct lu_object        *orig;
102         struct cfs_hash_bd            bd;
103         const struct lu_fid     *fid;
104
105         top  = o->lo_header;
106         site = o->lo_dev->ld_site;
107         orig = o;
108
109         /*
110          * Until we have full FIDs-on-OST implemented, anonymous objects
111          * are possible in OSP. Such an object isn't listed in the site,
112          * so we should not remove it from the site.
113          */
114         fid = lu_object_fid(o);
115         if (fid_is_zero(fid)) {
116                 LASSERT(top->loh_hash.next == NULL
117                         && top->loh_hash.pprev == NULL);
118                 LASSERT(list_empty(&top->loh_lru));
119                 if (!atomic_dec_and_test(&top->loh_ref))
120                         return;
121                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
122                         if (o->lo_ops->loo_object_release != NULL)
123                                 o->lo_ops->loo_object_release(env, o);
124                 }
125                 lu_object_free(env, orig);
126                 return;
127         }
128
129         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
130         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
131
132         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
133                 if (lu_object_is_dying(top)) {
134
135                         /*
136                          * Somebody may be waiting for this; currently this is
137                          * only used for cl_object, see cl_object_put_last().
138                          */
139                         wake_up_all(&bkt->lsb_marche_funebre);
140                 }
141                 return;
142         }
143
144         /*
145          * When the last reference is released, iterate over the object
146          * layers and notify them that the object is no longer busy.
147          */
148         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
149                 if (o->lo_ops->loo_object_release != NULL)
150                         o->lo_ops->loo_object_release(env, o);
151         }
152
153         if (!lu_object_is_dying(top) &&
154             (lu_object_exists(orig) || lu_object_is_cl(orig))) {
155                 LASSERT(list_empty(&top->loh_lru));
156                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
157                 bkt->lsb_lru_len++;
158                 lprocfs_counter_incr(site->ls_stats, LU_SS_LRU_LEN);
159                 CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
160                        "lru_len: %ld\n",
161                        o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
162                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
163                 return;
164         }
165
166         /*
167          * If the object is dying (will not be cached), remove it
168          * from the hash table and LRU.
169          *
170          * This is done with the hash table and LRU lists locked. As the only
171          * ways to acquire the first reference to a previously unreferenced
172          * object are hash-table lookup (lu_object_find())
173          * and LRU scanning (lu_site_purge()), both done under the hash-table
174          * and LRU locks, no race with a concurrent object lookup is possible
175          * and we can safely destroy the object below.
176          */
177         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
178                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
179         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
180         /*
181          * The object was already removed from the hash and LRU above,
182          * so we can free it.
183          */
184         lu_object_free(env, orig);
185 }
186 EXPORT_SYMBOL(lu_object_put);
187
188 /**
189  * Put an object and don't keep it in the cache. This is a temporary
190  * solution for multi-site objects whose layering is not constant.
191  */
192 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
193 {
194         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
195         return lu_object_put(env, o);
196 }
197 EXPORT_SYMBOL(lu_object_put_nocache);
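
/*
 * Usage sketch (illustrative, not part of the original file): choosing
 * between the two put variants. With lu_object_put() the last reference
 * normally parks the object on the per-bucket LRU; lu_object_put_nocache()
 * sets LU_OBJECT_HEARD_BANSHEE first, so the final put frees the object
 * instead of caching it. The helper name below is hypothetical.
 */
static void example_object_release(const struct lu_env *env,
                                   struct lu_object *o, bool keep_cached)
{
        if (keep_cached)
                lu_object_put(env, o);
        else
                lu_object_put_nocache(env, o);
}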
198
199 /**
200  * Kill the object and take it out of LRU cache.
201  * Currently used by client code for layout change.
202  */
203 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
204 {
205         struct lu_object_header *top;
206
207         top = o->lo_header;
208         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
209         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
210                 struct lu_site *site = o->lo_dev->ld_site;
211                 struct cfs_hash *obj_hash = site->ls_obj_hash;
212                 struct cfs_hash_bd bd;
213
214                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
215                 if (!list_empty(&top->loh_lru)) {
216                         struct lu_site_bkt_data *bkt;
217
218                         list_del_init(&top->loh_lru);
219                         bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
220                         bkt->lsb_lru_len--;
221                         lprocfs_counter_decr(site->ls_stats, LU_SS_LRU_LEN);
222                 }
223                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
224                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
225         }
226 }
227 EXPORT_SYMBOL(lu_object_unhash);
228
229 /**
230  * Allocate a new object.
231  *
232  * This follows the object creation protocol described in the comment within
233  * the struct lu_device_operations definition.
234  */
235 static struct lu_object *lu_object_alloc(const struct lu_env *env,
236                                          struct lu_device *dev,
237                                          const struct lu_fid *f,
238                                          const struct lu_object_conf *conf)
239 {
240         struct lu_object *scan;
241         struct lu_object *top;
242         struct list_head *layers;
243         unsigned int init_mask = 0;
244         unsigned int init_flag;
245         int clean;
246         int result;
247         ENTRY;
248
249         /*
250          * Create top-level object slice. This will also create
251          * lu_object_header.
252          */
253         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
254         if (top == NULL)
255                 RETURN(ERR_PTR(-ENOMEM));
256         if (IS_ERR(top))
257                 RETURN(top);
258         /*
259          * This is the only place where object fid is assigned. It's constant
260          * after this point.
261          */
262         top->lo_header->loh_fid = *f;
263         layers = &top->lo_header->loh_layers;
264
265         do {
266                 /*
267                  * Call ->loo_object_init() repeatedly, until no more new
268                  * object slices are created.
269                  */
270                 clean = 1;
271                 init_flag = 1;
272                 list_for_each_entry(scan, layers, lo_linkage) {
273                         if (init_mask & init_flag)
274                                 goto next;
275                         clean = 0;
276                         scan->lo_header = top->lo_header;
277                         result = scan->lo_ops->loo_object_init(env, scan, conf);
278                         if (result != 0) {
279                                 lu_object_free(env, top);
280                                 RETURN(ERR_PTR(result));
281                         }
282                         init_mask |= init_flag;
283 next:
284                         init_flag <<= 1;
285                 }
286         } while (!clean);
287
288         list_for_each_entry_reverse(scan, layers, lo_linkage) {
289                 if (scan->lo_ops->loo_object_start != NULL) {
290                         result = scan->lo_ops->loo_object_start(env, scan);
291                         if (result != 0) {
292                                 lu_object_free(env, top);
293                                 RETURN(ERR_PTR(result));
294                         }
295                 }
296         }
297
298         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
299         RETURN(top);
300 }
301
302 /**
303  * Free an object.
304  */
305 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
306 {
307         struct lu_site_bkt_data *bkt;
308         struct lu_site          *site;
309         struct lu_object        *scan;
310         struct list_head        *layers;
311         struct list_head         splice;
312
313         site   = o->lo_dev->ld_site;
314         layers = &o->lo_header->loh_layers;
315         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
316         /*
317          * First call ->loo_object_delete() method to release all resources.
318          */
319         list_for_each_entry_reverse(scan, layers, lo_linkage) {
320                 if (scan->lo_ops->loo_object_delete != NULL)
321                         scan->lo_ops->loo_object_delete(env, scan);
322         }
323
324         /*
325          * Then, splice the object layers into a stand-alone list and call
326          * ->loo_object_free() on all layers to free memory. The splice is
327          * necessary because lu_object_header is freed together with the
328          * top-level slice.
329          */
330         INIT_LIST_HEAD(&splice);
331         list_splice_init(layers, &splice);
332         while (!list_empty(&splice)) {
333                 /*
334                  * Free layers in bottom-to-top order, so that object header
335                  * lives as long as possible and ->loo_object_free() methods
336                  * can look at its contents.
337                  */
338                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
339                 list_del_init(&o->lo_linkage);
340                 LASSERT(o->lo_ops->loo_object_free != NULL);
341                 o->lo_ops->loo_object_free(env, o);
342         }
343
344         if (waitqueue_active(&bkt->lsb_marche_funebre))
345                 wake_up_all(&bkt->lsb_marche_funebre);
346 }
347
348 /**
349  * Free \a nr objects from the cold end of the site LRU list.
350  */
351 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
352 {
353         struct lu_object_header *h;
354         struct lu_object_header *temp;
355         struct lu_site_bkt_data *bkt;
356         struct cfs_hash_bd            bd;
357         struct cfs_hash_bd            bd2;
358         struct list_head         dispose;
359         int                      did_sth;
360         unsigned int             start = 0;
361         int                      count;
362         int                      bnr;
363         unsigned int             i;
364
365         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
366                 RETURN(0);
367
368         INIT_LIST_HEAD(&dispose);
369         /*
370          * Under LRU list lock, scan LRU list and move unreferenced objects to
371          * the dispose list, removing them from LRU and hash table.
372          */
373         if (nr != ~0)
374                 start = s->ls_purge_start;
375         bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
376  again:
377         /*
378          * It doesn't make any sense to run purge threads in parallel; that
379          * can only bring us trouble. See LU-5331.
380          */
381         mutex_lock(&s->ls_purge_mutex);
382         did_sth = 0;
383         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
384                 if (i < start)
385                         continue;
386                 count = bnr;
387                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
388                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
389
390                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
391                         LASSERT(atomic_read(&h->loh_ref) == 0);
392
393                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
394                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
395
396                         cfs_hash_bd_del_locked(s->ls_obj_hash,
397                                                &bd2, &h->loh_hash);
398                         list_move(&h->loh_lru, &dispose);
399                         bkt->lsb_lru_len--;
400                         lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
401                         if (did_sth == 0)
402                                 did_sth = 1;
403
404                         if (nr != ~0 && --nr == 0)
405                                 break;
406
407                         if (count > 0 && --count == 0)
408                                 break;
409
410                 }
411                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
412                 cond_resched();
413                 /*
414                  * Free everything on the dispose list. This is safe against
415                  * races due to the reasons described in lu_object_put().
416                  */
417                 while (!list_empty(&dispose)) {
418                         h = container_of0(dispose.next,
419                                           struct lu_object_header, loh_lru);
420                         list_del_init(&h->loh_lru);
421                         lu_object_free(env, lu_object_top(h));
422                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
423                 }
424
425                 if (nr == 0)
426                         break;
427         }
428         mutex_unlock(&s->ls_purge_mutex);
429
430         if (nr != 0 && did_sth && start != 0) {
431                 start = 0; /* restart from the first bucket */
432                 goto again;
433         }
434         /* race on s->ls_purge_start, but nobody cares */
435         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
436
437         return nr;
438 }
439 EXPORT_SYMBOL(lu_site_purge);
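
/*
 * Usage sketch (illustrative, not part of the original file): callers pass
 * the number of cold objects to reclaim, or ~0 to drain the whole cache,
 * which is what lu_stack_fini() below does during teardown. A bounded purge
 * is how lu_object_limit() enforces lu_cache_nr.
 */
static void example_site_purge(const struct lu_env *env, struct lu_site *s)
{
        /* Drop up to LU_CACHE_NR_MAX_ADJUST cold objects from the LRU. */
        lu_site_purge(env, s, LU_CACHE_NR_MAX_ADJUST);

        /* Purge everything, e.g. before the site goes away. */
        lu_site_purge(env, s, ~0);
}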
440
441 /*
442  * Object printing.
443  *
444  * The code below has to jump through certain hoops to output an object
445  * description into the libcfs_debug_msg()-based log. The problem is that
446  * lu_object_print() composes the object description from strings that are
447  * parts of _lines_ of output (i.e., strings that are not terminated by a
448  * newline). This doesn't fit very well into the libcfs_debug_msg() interface,
449  * which assumes that each message supplied to it is a self-contained line.
450  *
451  * To work around this, strings are collected in a temporary buffer
452  * (implemented as a value of the lu_global_key key) until a terminating
453  * newline character is detected.
454  *
455  */
456
457 enum {
458         /**
459          * Maximal line size.
460          *
461          * XXX overflow is not handled correctly.
462          */
463         LU_CDEBUG_LINE = 512
464 };
465
466 struct lu_cdebug_data {
467         /**
468          * Temporary buffer.
469          */
470         char lck_area[LU_CDEBUG_LINE];
471 };
472
473 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
474 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
475
476 /**
477  * Key, holding temporary buffer. This key is registered very early by
478  * lu_global_init().
479  */
480 static struct lu_context_key lu_global_key = {
481         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
482                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
483         .lct_init = lu_global_key_init,
484         .lct_fini = lu_global_key_fini
485 };
486
487 /**
488  * Printer function emitting messages through libcfs_debug_msg().
489  */
490 int lu_cdebug_printer(const struct lu_env *env,
491                       void *cookie, const char *format, ...)
492 {
493         struct libcfs_debug_msg_data *msgdata = cookie;
494         struct lu_cdebug_data        *key;
495         int used;
496         int complete;
497         va_list args;
498
499         va_start(args, format);
500
501         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
502         LASSERT(key != NULL);
503
504         used = strlen(key->lck_area);
505         complete = format[strlen(format) - 1] == '\n';
506         /*
507          * Append new chunk to the buffer.
508          */
509         vsnprintf(key->lck_area + used,
510                   ARRAY_SIZE(key->lck_area) - used, format, args);
511         if (complete) {
512                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
513                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
514                 key->lck_area[0] = 0;
515         }
516         va_end(args);
517         return 0;
518 }
519 EXPORT_SYMBOL(lu_cdebug_printer);
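
/*
 * Sketch (illustrative, not part of the original file) of the line-assembly
 * technique used by lu_cdebug_printer(): partial chunks are appended to a
 * per-context buffer and only flushed once a chunk ends in '\n'. The buffer
 * and flush callback below are hypothetical stand-ins for lck_area and
 * libcfs_debug_msg().
 */
static void example_line_printer(char *buf, size_t buf_size,
                                 void (*flush)(const char *line),
                                 const char *chunk)
{
        size_t len = strlen(chunk);

        /* Append the new chunk to whatever is already buffered. */
        strlcat(buf, chunk, buf_size);

        /* A chunk ending in '\n' completes the line: emit it and reset. */
        if (len > 0 && chunk[len - 1] == '\n') {
                flush(buf);
                buf[0] = '\0';
        }
}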
520
521 /**
522  * Print object header.
523  */
524 void lu_object_header_print(const struct lu_env *env, void *cookie,
525                             lu_printer_t printer,
526                             const struct lu_object_header *hdr)
527 {
528         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
529                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
530                    PFID(&hdr->loh_fid),
531                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
532                    list_empty((struct list_head *)&hdr->loh_lru) ? \
533                    "" : " lru",
534                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
535 }
536 EXPORT_SYMBOL(lu_object_header_print);
537
538 /**
539  * Print a human-readable representation of \a o to \a printer.
540  */
541 void lu_object_print(const struct lu_env *env, void *cookie,
542                      lu_printer_t printer, const struct lu_object *o)
543 {
544         static const char ruler[] = "........................................";
545         struct lu_object_header *top;
546         int depth = 4;
547
548         top = o->lo_header;
549         lu_object_header_print(env, cookie, printer, top);
550         (*printer)(env, cookie, "{\n");
551
552         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
553                 /*
554                  * print `.' \a depth times followed by type name and address
555                  */
556                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
557                            o->lo_dev->ld_type->ldt_name, o);
558
559                 if (o->lo_ops->loo_object_print != NULL)
560                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
561
562                 (*printer)(env, cookie, "\n");
563         }
564
565         (*printer)(env, cookie, "} header@%p\n", top);
566 }
567 EXPORT_SYMBOL(lu_object_print);
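
/*
 * Usage sketch (illustrative, not part of the original file): dumping an
 * object into the debug log through lu_cdebug_printer(). The
 * LIBCFS_DEBUG_MSG_DATA_DECL() line follows the usual libcfs pattern for
 * building the msgdata cookie; treat its exact arguments as an assumption.
 */
static void example_object_print(const struct lu_env *env,
                                 const struct lu_object *o)
{
        LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_INFO, NULL);

        lu_object_print(env, &msgdata, lu_cdebug_printer, o);
}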
568
569 /**
570  * Check object consistency.
571  */
572 int lu_object_invariant(const struct lu_object *o)
573 {
574         struct lu_object_header *top;
575
576         top = o->lo_header;
577         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
578                 if (o->lo_ops->loo_object_invariant != NULL &&
579                     !o->lo_ops->loo_object_invariant(o))
580                         return 0;
581         }
582         return 1;
583 }
584
585 static struct lu_object *htable_lookup(struct lu_site *s,
586                                        struct cfs_hash_bd *bd,
587                                        const struct lu_fid *f,
588                                        wait_queue_t *waiter,
589                                        __u64 *version)
590 {
591         struct lu_site_bkt_data *bkt;
592         struct lu_object_header *h;
593         struct hlist_node       *hnode;
594         __u64  ver = cfs_hash_bd_version_get(bd);
595
596         if (*version == ver)
597                 return ERR_PTR(-ENOENT);
598
599         *version = ver;
600         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
601         /* cfs_hash_bd_peek_locked() is somewhat of an "internal" function
602          * of cfs_hash; it doesn't take a refcount on the object. */
603         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
604         if (hnode == NULL) {
605                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
606                 return ERR_PTR(-ENOENT);
607         }
608
609         h = container_of0(hnode, struct lu_object_header, loh_hash);
610         if (likely(!lu_object_is_dying(h))) {
611                 cfs_hash_get(s->ls_obj_hash, hnode);
612                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
613                 if (!list_empty(&h->loh_lru)) {
614                         list_del_init(&h->loh_lru);
615                         bkt->lsb_lru_len--;
616                         lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
617                 }
618                 return lu_object_top(h);
619         }
620
621         /*
622          * Lookup found an object being destroyed; this object cannot be
623          * returned (to ensure that references to dying objects are eventually
624          * drained), and moreover, the lookup has to wait until the object is freed.
625          */
626
627         if (likely(waiter != NULL)) {
628                 init_waitqueue_entry(waiter, current);
629                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
630                 set_current_state(TASK_UNINTERRUPTIBLE);
631                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
632         }
633
634         return ERR_PTR(-EAGAIN);
635 }
636
637 /**
638  * Search the cache for an object with the fid \a f. If such an object is
639  * found, return it. Otherwise, create a new object, insert it into the cache
640  * and return it. In any case, an additional reference is acquired on the returned object.
641  */
642 struct lu_object *lu_object_find(const struct lu_env *env,
643                                  struct lu_device *dev, const struct lu_fid *f,
644                                  const struct lu_object_conf *conf)
645 {
646         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
647 }
648 EXPORT_SYMBOL(lu_object_find);
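
/*
 * Usage sketch (illustrative, not part of the original file): the usual
 * lookup/release pairing. lu_object_find() returns a referenced top-level
 * object or an ERR_PTR(); every successful lookup must be balanced by
 * lu_object_put(), which either caches the object on the LRU or frees it.
 */
static int example_object_find(const struct lu_env *env,
                               struct lu_device *dev,
                               const struct lu_fid *fid)
{
        struct lu_object *o;

        o = lu_object_find(env, dev, fid, NULL);
        if (IS_ERR(o))
                return PTR_ERR(o);

        /* ... use the object stack rooted at 'o' ... */

        lu_object_put(env, o);
        return 0;
}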
649
650 /*
651  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
652  * the calculation of the number of objects to reclaim is not covered by
653  * a lock, the number purged per call is capped by LU_CACHE_NR_MAX_ADJUST.
654  * This ensures that many concurrent threads will not accidentally purge
655  * the entire cache.
656  */
657 static void lu_object_limit(const struct lu_env *env,
658                             struct lu_device *dev)
659 {
660         __u64 size, nr;
661
662         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
663                 return;
664
665         size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
666         nr = (__u64)lu_cache_nr;
667         if (size > nr)
668                 lu_site_purge(env, dev->ld_site,
669                               MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
670
671         return;
672 }
673
674 static struct lu_object *lu_object_new(const struct lu_env *env,
675                                        struct lu_device *dev,
676                                        const struct lu_fid *f,
677                                        const struct lu_object_conf *conf)
678 {
679         struct lu_object        *o;
680         struct cfs_hash              *hs;
681         struct cfs_hash_bd            bd;
682
683         o = lu_object_alloc(env, dev, f, conf);
684         if (unlikely(IS_ERR(o)))
685                 return o;
686
687         hs = dev->ld_site->ls_obj_hash;
688         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
689         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
690         cfs_hash_bd_unlock(hs, &bd, 1);
691
692         lu_object_limit(env, dev);
693
694         return o;
695 }
696
697 /**
698  * Core logic of lu_object_find*() functions.
699  */
700 static struct lu_object *lu_object_find_try(const struct lu_env *env,
701                                             struct lu_device *dev,
702                                             const struct lu_fid *f,
703                                             const struct lu_object_conf *conf,
704                                             wait_queue_t *waiter)
705 {
706         struct lu_object      *o;
707         struct lu_object      *shadow;
708         struct lu_site        *s;
709         struct cfs_hash            *hs;
710         struct cfs_hash_bd          bd;
711         __u64                  version = 0;
712
713         /*
714          * This uses standard index maintenance protocol:
715          *
716          *     - search index under lock, and return object if found;
717          *     - otherwise, unlock index, allocate new object;
718          *     - lock index and search again;
719          *     - if nothing is found (usual case), insert newly created
720          *       object into index;
721          *     - otherwise (race: other thread inserted object), free
722          *       object just allocated.
723          *     - unlock index;
724          *     - return object.
725          *
726          * In the "LOC_F_NEW" case, we know the object is newly established,
727          * so it is unnecessary to perform lookup-alloc-lookup-insert;
728          * just allocate and insert directly.
729          *
730          * If a dying object is found during the index search, add \a waiter
731          * to the site wait-queue and return ERR_PTR(-EAGAIN).
732          */
733         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
734                 return lu_object_new(env, dev, f, conf);
735
736         s  = dev->ld_site;
737         hs = s->ls_obj_hash;
738         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
739         o = htable_lookup(s, &bd, f, waiter, &version);
740         cfs_hash_bd_unlock(hs, &bd, 1);
741         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
742                 return o;
743
744         /*
745          * Allocate new object. This may result in rather complicated
746          * operations, including fld queries, inode loading, etc.
747          */
748         o = lu_object_alloc(env, dev, f, conf);
749         if (unlikely(IS_ERR(o)))
750                 return o;
751
752         LASSERT(lu_fid_eq(lu_object_fid(o), f));
753
754         cfs_hash_bd_lock(hs, &bd, 1);
755
756         shadow = htable_lookup(s, &bd, f, waiter, &version);
757         if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
758                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
759                 cfs_hash_bd_unlock(hs, &bd, 1);
760
761                 lu_object_limit(env, dev);
762
763                 return o;
764         }
765
766         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
767         cfs_hash_bd_unlock(hs, &bd, 1);
768         lu_object_free(env, o);
769         return shadow;
770 }
771
772 /**
773  * Much like lu_object_find(), but the top-level device of the object is
774  * specifically \a dev rather than the top-level device of the site. This
775  * interface allows objects with different "stacking" to be created within the same site.
776  */
777 struct lu_object *lu_object_find_at(const struct lu_env *env,
778                                     struct lu_device *dev,
779                                     const struct lu_fid *f,
780                                     const struct lu_object_conf *conf)
781 {
782         struct lu_site_bkt_data *bkt;
783         struct lu_object        *obj;
784         wait_queue_t           wait;
785
786         if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT)
787                 return lu_object_find_try(env, dev, f, conf, NULL);
788
789         while (1) {
790                 obj = lu_object_find_try(env, dev, f, conf, &wait);
791                 if (obj != ERR_PTR(-EAGAIN))
792                         return obj;
793                 /*
794                  * lu_object_find_try() already added waiter into the
795                  * wait queue.
796                  */
797                 schedule();
798                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
799                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
800         }
801 }
802 EXPORT_SYMBOL(lu_object_find_at);
803
804 /**
805  * Find the object with the given fid and return its slice belonging to the given device.
806  */
807 struct lu_object *lu_object_find_slice(const struct lu_env *env,
808                                        struct lu_device *dev,
809                                        const struct lu_fid *f,
810                                        const struct lu_object_conf *conf)
811 {
812         struct lu_object *top;
813         struct lu_object *obj;
814
815         top = lu_object_find(env, dev, f, conf);
816         if (IS_ERR(top))
817                 return top;
818
819         obj = lu_object_locate(top->lo_header, dev->ld_type);
820         if (unlikely(obj == NULL)) {
821                 lu_object_put(env, top);
822                 obj = ERR_PTR(-ENOENT);
823         }
824
825         return obj;
826 }
827 EXPORT_SYMBOL(lu_object_find_slice);
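
/*
 * Usage sketch (illustrative, not part of the original file): looking up
 * the slice that belongs to a particular device. lu_object_find_slice()
 * combines lu_object_find() with lu_object_locate() and returns
 * ERR_PTR(-ENOENT) when the stack has no slice of the requested device
 * type. lu_object_put() accepts any slice, since it works on lo_header.
 */
static int example_find_slice(const struct lu_env *env,
                              struct lu_device *dev,
                              const struct lu_fid *fid)
{
        struct lu_object *slice;

        slice = lu_object_find_slice(env, dev, fid, NULL);
        if (IS_ERR(slice))
                return PTR_ERR(slice);

        /* ... operate on the slice belonging to 'dev' ... */

        lu_object_put(env, slice);
        return 0;
}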
828
829 /**
830  * Global list of all device types.
831  */
832 static struct list_head lu_device_types;
833
834 int lu_device_type_init(struct lu_device_type *ldt)
835 {
836         int result = 0;
837
838         atomic_set(&ldt->ldt_device_nr, 0);
839         INIT_LIST_HEAD(&ldt->ldt_linkage);
840         if (ldt->ldt_ops->ldto_init)
841                 result = ldt->ldt_ops->ldto_init(ldt);
842
843         if (result == 0) {
844                 spin_lock(&obd_types_lock);
845                 list_add(&ldt->ldt_linkage, &lu_device_types);
846                 spin_unlock(&obd_types_lock);
847         }
848
849         return result;
850 }
851 EXPORT_SYMBOL(lu_device_type_init);
852
853 void lu_device_type_fini(struct lu_device_type *ldt)
854 {
855         spin_lock(&obd_types_lock);
856         list_del_init(&ldt->ldt_linkage);
857         spin_unlock(&obd_types_lock);
858         if (ldt->ldt_ops->ldto_fini)
859                 ldt->ldt_ops->ldto_fini(ldt);
860 }
861 EXPORT_SYMBOL(lu_device_type_fini);
862
863 /**
864  * Global list of all sites on this node
865  */
866 static struct list_head lu_sites;
867 static DEFINE_MUTEX(lu_sites_guard);
868
869 /**
870  * Global environment used by site shrinker.
871  */
872 static struct lu_env lu_shrink_env;
873
874 struct lu_site_print_arg {
875         struct lu_env   *lsp_env;
876         void            *lsp_cookie;
877         lu_printer_t     lsp_printer;
878 };
879
880 static int
881 lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
882                   struct hlist_node *hnode, void *data)
883 {
884         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
885         struct lu_object_header  *h;
886
887         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
888         if (!list_empty(&h->loh_layers)) {
889                 const struct lu_object *o;
890
891                 o = lu_object_top(h);
892                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
893                                 arg->lsp_printer, o);
894         } else {
895                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
896                                        arg->lsp_printer, h);
897         }
898         return 0;
899 }
900
901 /**
902  * Print all objects in \a s.
903  */
904 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
905                    lu_printer_t printer)
906 {
907         struct lu_site_print_arg arg = {
908                 .lsp_env     = (struct lu_env *)env,
909                 .lsp_cookie  = cookie,
910                 .lsp_printer = printer,
911         };
912
913         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
914 }
915 EXPORT_SYMBOL(lu_site_print);
916
917 /**
918  * Return desired hash table order.
919  */
920 static unsigned long lu_htable_order(struct lu_device *top)
921 {
922         unsigned long cache_size;
923         unsigned long bits;
924
925         /*
926          * For ZFS based OSDs the cache should be disabled by default.  This
927          * allows the ZFS ARC maximum flexibility in determining what buffers
928  * to cache.  If Lustre has objects or buffers that it wants to ensure
929  * always stay cached, it must maintain a hold on them.
930          */
931         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
932                 lu_cache_percent = 1;
933                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
934                 return LU_SITE_BITS_MIN;
935         }
936
937         /*
938          * Calculate hash table size, assuming that we want reasonable
939          * performance when 20% of total memory is occupied by cache of
940          * lu_objects.
941          *
942  * The size of an lu_object is (arbitrarily) taken as 1K (together with the inode).
943          */
944         cache_size = totalram_pages;
945
946 #if BITS_PER_LONG == 32
947         /* limit hashtable size for lowmem systems to low RAM */
948         if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
949                 cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
950 #endif
951
952         /* Reject unreasonable cache settings. */
953         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
954                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
955                       " the range of (0, %u]. Will use default value: %u.\n",
956                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
957                       LU_CACHE_PERCENT_DEFAULT);
958
959                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
960         }
961         cache_size = cache_size / 100 * lu_cache_percent *
962                 (PAGE_CACHE_SIZE / 1024);
963
964         for (bits = 1; (1 << bits) < cache_size; ++bits) {
965                 ;
966         }
967         return bits;
968 }
969
970 static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
971                                 const void *key, unsigned mask)
972 {
973         struct lu_fid  *fid = (struct lu_fid *)key;
974         __u32           hash;
975
976         hash = fid_flatten32(fid);
977         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
978         hash = hash_long(hash, hs->hs_bkt_bits);
979
980         /* give me another random factor */
981         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
982
983         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
984         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
985
986         return hash & mask;
987 }
988
989 static void *lu_obj_hop_object(struct hlist_node *hnode)
990 {
991         return hlist_entry(hnode, struct lu_object_header, loh_hash);
992 }
993
994 static void *lu_obj_hop_key(struct hlist_node *hnode)
995 {
996         struct lu_object_header *h;
997
998         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
999         return &h->loh_fid;
1000 }
1001
1002 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
1003 {
1004         struct lu_object_header *h;
1005
1006         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1007         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
1008 }
1009
1010 static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1011 {
1012         struct lu_object_header *h;
1013
1014         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
1015         atomic_inc(&h->loh_ref);
1016 }
1017
1018 static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
1019 {
1020         LBUG(); /* we should never call it */
1021 }
1022
1023 static struct cfs_hash_ops lu_site_hash_ops = {
1024         .hs_hash        = lu_obj_hop_hash,
1025         .hs_key         = lu_obj_hop_key,
1026         .hs_keycmp      = lu_obj_hop_keycmp,
1027         .hs_object      = lu_obj_hop_object,
1028         .hs_get         = lu_obj_hop_get,
1029         .hs_put_locked  = lu_obj_hop_put_locked,
1030 };
1031
1032 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1033 {
1034         spin_lock(&s->ls_ld_lock);
1035         if (list_empty(&d->ld_linkage))
1036                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1037         spin_unlock(&s->ls_ld_lock);
1038 }
1039 EXPORT_SYMBOL(lu_dev_add_linkage);
1040
1041 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1042 {
1043         spin_lock(&s->ls_ld_lock);
1044         list_del_init(&d->ld_linkage);
1045         spin_unlock(&s->ls_ld_lock);
1046 }
1047 EXPORT_SYMBOL(lu_dev_del_linkage);
1048
1049 /**
1050  * Initialize site \a s, with \a top as the top-level device.
1051  */
1052 int lu_site_init(struct lu_site *s, struct lu_device *top)
1053 {
1054         struct lu_site_bkt_data *bkt;
1055         struct cfs_hash_bd bd;
1056         char name[16];
1057         unsigned long bits;
1058         unsigned int i;
1059         ENTRY;
1060
1061         memset(s, 0, sizeof *s);
1062         mutex_init(&s->ls_purge_mutex);
1063         bits = lu_htable_order(top);
1064         snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
1065         for (bits = clamp_t(typeof(bits), bits,
1066                             LU_SITE_BITS_MIN, LU_SITE_BITS_MAX);
1067              bits >= LU_SITE_BITS_MIN; bits--) {
1068                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
1069                                                  bits - LU_SITE_BKT_BITS,
1070                                                  sizeof(*bkt), 0, 0,
1071                                                  &lu_site_hash_ops,
1072                                                  CFS_HASH_SPIN_BKTLOCK |
1073                                                  CFS_HASH_NO_ITEMREF |
1074                                                  CFS_HASH_DEPTH |
1075                                                  CFS_HASH_ASSERT_EMPTY |
1076                                                  CFS_HASH_COUNTER);
1077                 if (s->ls_obj_hash != NULL)
1078                         break;
1079         }
1080
1081         if (s->ls_obj_hash == NULL) {
1082                 CERROR("failed to create lu_site hash with bits: %lu\n", bits);
1083                 return -ENOMEM;
1084         }
1085
1086         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1087                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1088                 INIT_LIST_HEAD(&bkt->lsb_lru);
1089                 init_waitqueue_head(&bkt->lsb_marche_funebre);
1090         }
1091
1092         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1093         if (s->ls_stats == NULL) {
1094                 cfs_hash_putref(s->ls_obj_hash);
1095                 s->ls_obj_hash = NULL;
1096                 return -ENOMEM;
1097         }
1098
1099         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1100                              0, "created", "created");
1101         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1102                              0, "cache_hit", "cache_hit");
1103         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1104                              0, "cache_miss", "cache_miss");
1105         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1106                              0, "cache_race", "cache_race");
1107         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1108                              0, "cache_death_race", "cache_death_race");
1109         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1110                              0, "lru_purged", "lru_purged");
1111         /*
1112          * Unlike other counters, lru_len can be decremented, so we
1113          * need lc_sum instead of just lc_count.
1114          */
1115         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_LEN,
1116                              LPROCFS_CNTR_AVGMINMAX, "lru_len", "lru_len");
1117
1118         INIT_LIST_HEAD(&s->ls_linkage);
1119         s->ls_top_dev = top;
1120         top->ld_site = s;
1121         lu_device_get(top);
1122         lu_ref_add(&top->ld_reference, "site-top", s);
1123
1124         INIT_LIST_HEAD(&s->ls_ld_linkage);
1125         spin_lock_init(&s->ls_ld_lock);
1126
1127         lu_dev_add_linkage(s, top);
1128
1129         RETURN(0);
1130 }
1131 EXPORT_SYMBOL(lu_site_init);
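
/*
 * Usage sketch (illustrative, not part of the original file): the expected
 * lu_site life cycle for a device stack rooted at "top". lu_site_init()
 * builds the hash table and stats, lu_site_init_finish() publishes the site
 * on the global list once the stack is assembled, and lu_site_fini()
 * releases everything again.
 */
static int example_site_lifecycle(struct lu_site *s, struct lu_device *top)
{
        int rc;

        rc = lu_site_init(s, top);
        if (rc != 0)
                return rc;

        rc = lu_site_init_finish(s);
        if (rc != 0) {
                lu_site_fini(s);
                return rc;
        }

        /* ... the site is now usable by lu_object_find() and friends ... */
        return 0;
}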
1132
1133 /**
1134  * Finalize \a s and release its resources.
1135  */
1136 void lu_site_fini(struct lu_site *s)
1137 {
1138         mutex_lock(&lu_sites_guard);
1139         list_del_init(&s->ls_linkage);
1140         mutex_unlock(&lu_sites_guard);
1141
1142         if (s->ls_obj_hash != NULL) {
1143                 cfs_hash_putref(s->ls_obj_hash);
1144                 s->ls_obj_hash = NULL;
1145         }
1146
1147         if (s->ls_top_dev != NULL) {
1148                 s->ls_top_dev->ld_site = NULL;
1149                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1150                 lu_device_put(s->ls_top_dev);
1151                 s->ls_top_dev = NULL;
1152         }
1153
1154         if (s->ls_stats != NULL)
1155                 lprocfs_free_stats(&s->ls_stats);
1156 }
1157 EXPORT_SYMBOL(lu_site_fini);
1158
1159 /**
1160  * Called when initialization of stack for this site is completed.
1161  */
1162 int lu_site_init_finish(struct lu_site *s)
1163 {
1164         int result;
1165         mutex_lock(&lu_sites_guard);
1166         result = lu_context_refill(&lu_shrink_env.le_ctx);
1167         if (result == 0)
1168                 list_add(&s->ls_linkage, &lu_sites);
1169         mutex_unlock(&lu_sites_guard);
1170         return result;
1171 }
1172 EXPORT_SYMBOL(lu_site_init_finish);
1173
1174 /**
1175  * Acquire additional reference on device \a d
1176  */
1177 void lu_device_get(struct lu_device *d)
1178 {
1179         atomic_inc(&d->ld_ref);
1180 }
1181 EXPORT_SYMBOL(lu_device_get);
1182
1183 /**
1184  * Release reference on device \a d.
1185  */
1186 void lu_device_put(struct lu_device *d)
1187 {
1188         LASSERT(atomic_read(&d->ld_ref) > 0);
1189         atomic_dec(&d->ld_ref);
1190 }
1191 EXPORT_SYMBOL(lu_device_put);
1192
1193 /**
1194  * Initialize device \a d of type \a t.
1195  */
1196 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1197 {
1198         if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
1199             t->ldt_ops->ldto_start != NULL)
1200                 t->ldt_ops->ldto_start(t);
1201
1202         memset(d, 0, sizeof *d);
1203         d->ld_type = t;
1204         lu_ref_init(&d->ld_reference);
1205         INIT_LIST_HEAD(&d->ld_linkage);
1206
1207         return 0;
1208 }
1209 EXPORT_SYMBOL(lu_device_init);
1210
1211 /**
1212  * Finalize device \a d.
1213  */
1214 void lu_device_fini(struct lu_device *d)
1215 {
1216         struct lu_device_type *t = d->ld_type;
1217
1218         if (d->ld_obd != NULL) {
1219                 d->ld_obd->obd_lu_dev = NULL;
1220                 d->ld_obd = NULL;
1221         }
1222
1223         lu_ref_fini(&d->ld_reference);
1224         LASSERTF(atomic_read(&d->ld_ref) == 0,
1225                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1226         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1227
1228         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1229             t->ldt_ops->ldto_stop != NULL)
1230                 t->ldt_ops->ldto_stop(t);
1231 }
1232 EXPORT_SYMBOL(lu_device_fini);
1233
1234 /**
1235  * Initialize object \a o that is part of compound object \a h and was created
1236  * by device \a d.
1237  */
1238 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1239                    struct lu_device *d)
1240 {
1241         memset(o, 0, sizeof(*o));
1242         o->lo_header = h;
1243         o->lo_dev = d;
1244         lu_device_get(d);
1245         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1246         INIT_LIST_HEAD(&o->lo_linkage);
1247
1248         return 0;
1249 }
1250 EXPORT_SYMBOL(lu_object_init);
1251
1252 /**
1253  * Finalize object and release its resources.
1254  */
1255 void lu_object_fini(struct lu_object *o)
1256 {
1257         struct lu_device *dev = o->lo_dev;
1258
1259         LASSERT(list_empty(&o->lo_linkage));
1260
1261         if (dev != NULL) {
1262                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1263                               "lu_object", o);
1264                 lu_device_put(dev);
1265                 o->lo_dev = NULL;
1266         }
1267 }
1268 EXPORT_SYMBOL(lu_object_fini);
1269
1270 /**
1271  * Add object \a o as first layer of compound object \a h
1272  *
1273  * This is typically called by the ->ldo_object_alloc() method of top-level
1274  * device.
1275  */
1276 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1277 {
1278         list_move(&o->lo_linkage, &h->loh_layers);
1279 }
1280 EXPORT_SYMBOL(lu_object_add_top);
1281
1282 /**
1283  * Add object \a o as a layer of compound object, going after \a before.
1284  *
1285  * This is typically called by the ->ldo_object_alloc() method of \a
1286  * before->lo_dev.
1287  */
1288 void lu_object_add(struct lu_object *before, struct lu_object *o)
1289 {
1290         list_move(&o->lo_linkage, &before->lo_linkage);
1291 }
1292 EXPORT_SYMBOL(lu_object_add);
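
/*
 * Sketch (illustrative, not part of the original file) of how a top-level
 * ->ldo_object_alloc() method typically uses these helpers: allocate the
 * header together with the first slice, initialize both, and attach the
 * slice with lu_object_add_top(); lower layers later attach their slices
 * with lu_object_add() from ->loo_object_init(). "struct example_object"
 * and its operations are hypothetical.
 */
struct example_object {
        struct lu_object_header eo_header;
        struct lu_object        eo_obj;
};

static struct lu_object *example_object_alloc(const struct lu_env *env,
                                              const struct lu_object_header *h,
                                              struct lu_device *dev)
{
        struct example_object *eo;

        OBD_ALLOC_PTR(eo);
        if (eo == NULL)
                return ERR_PTR(-ENOMEM);

        lu_object_header_init(&eo->eo_header);
        lu_object_init(&eo->eo_obj, &eo->eo_header, dev);
        lu_object_add_top(&eo->eo_header, &eo->eo_obj);
        /* a real allocator would also set eo->eo_obj.lo_ops here */
        return &eo->eo_obj;
}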
1293
1294 /**
1295  * Initialize compound object.
1296  */
1297 int lu_object_header_init(struct lu_object_header *h)
1298 {
1299         memset(h, 0, sizeof *h);
1300         atomic_set(&h->loh_ref, 1);
1301         INIT_HLIST_NODE(&h->loh_hash);
1302         INIT_LIST_HEAD(&h->loh_lru);
1303         INIT_LIST_HEAD(&h->loh_layers);
1304         lu_ref_init(&h->loh_reference);
1305         return 0;
1306 }
1307 EXPORT_SYMBOL(lu_object_header_init);
1308
1309 /**
1310  * Finalize compound object.
1311  */
1312 void lu_object_header_fini(struct lu_object_header *h)
1313 {
1314         LASSERT(list_empty(&h->loh_layers));
1315         LASSERT(list_empty(&h->loh_lru));
1316         LASSERT(hlist_unhashed(&h->loh_hash));
1317         lu_ref_fini(&h->loh_reference);
1318 }
1319 EXPORT_SYMBOL(lu_object_header_fini);
1320
1321 /**
1322  * Given a compound object, find its slice corresponding to the device type
1323  * \a dtype.
1324  */
1325 struct lu_object *lu_object_locate(struct lu_object_header *h,
1326                                    const struct lu_device_type *dtype)
1327 {
1328         struct lu_object *o;
1329
1330         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1331                 if (o->lo_dev->ld_type == dtype)
1332                         return o;
1333         }
1334         return NULL;
1335 }
1336 EXPORT_SYMBOL(lu_object_locate);
1337
1338 /**
1339  * Finalize and free devices in the device stack.
1340  *
1341  * Finalize the device stack by purging the object cache and calling
1342  * lu_device_type_operations::ldto_device_fini() and
1343  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1344  */
1345 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1346 {
1347         struct lu_site   *site = top->ld_site;
1348         struct lu_device *scan;
1349         struct lu_device *next;
1350
1351         lu_site_purge(env, site, ~0);
1352         for (scan = top; scan != NULL; scan = next) {
1353                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1354                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1355                 lu_device_put(scan);
1356         }
1357
1358         /* purge again. */
1359         lu_site_purge(env, site, ~0);
1360
1361         for (scan = top; scan != NULL; scan = next) {
1362                 const struct lu_device_type *ldt = scan->ld_type;
1363                 struct obd_type             *type;
1364
1365                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1366                 type = ldt->ldt_obd_type;
1367                 if (type != NULL) {
1368                         type->typ_refcnt--;
1369                         class_put_type(type);
1370                 }
1371         }
1372 }
1373
1374 enum {
1375         /**
1376          * Maximal number of tld slots.
1377          */
1378         LU_CONTEXT_KEY_NR = 40
1379 };
1380
1381 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1382
1383 DEFINE_RWLOCK(lu_keys_guard);
1384 static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
1385
1386 /**
1387  * Global counter incremented whenever key is registered, unregistered,
1388  * revived or quiesced. This is used to avoid unnecessary calls to
1389  * lu_context_refill(). No locking is provided, as initialization and shutdown
1390  * are supposed to be externally serialized.
1391  */
1392 static unsigned key_set_version = 0;
1393
1394 /**
1395  * Register new key.
1396  */
1397 int lu_context_key_register(struct lu_context_key *key)
1398 {
1399         int result;
1400         unsigned int i;
1401
1402         LASSERT(key->lct_init != NULL);
1403         LASSERT(key->lct_fini != NULL);
1404         LASSERT(key->lct_tags != 0);
1405         LASSERT(key->lct_owner != NULL);
1406
1407         result = -ENFILE;
1408         write_lock(&lu_keys_guard);
1409         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1410                 if (lu_keys[i] == NULL) {
1411                         key->lct_index = i;
1412                         atomic_set(&key->lct_used, 1);
1413                         lu_keys[i] = key;
1414                         lu_ref_init(&key->lct_reference);
1415                         result = 0;
1416                         ++key_set_version;
1417                         break;
1418                 }
1419         }
1420         write_unlock(&lu_keys_guard);
1421         return result;
1422 }
1423 EXPORT_SYMBOL(lu_context_key_register);
1424
1425 static void key_fini(struct lu_context *ctx, int index)
1426 {
1427         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1428                 struct lu_context_key *key;
1429
1430                 key = lu_keys[index];
1431                 LASSERT(key != NULL);
1432                 LASSERT(key->lct_fini != NULL);
1433                 LASSERT(atomic_read(&key->lct_used) > 1);
1434
1435                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1436                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1437                 atomic_dec(&key->lct_used);
1438
1439                 LASSERT(key->lct_owner != NULL);
1440                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1441                         LINVRNT(module_refcount(key->lct_owner) > 0);
1442                         module_put(key->lct_owner);
1443                 }
1444                 ctx->lc_value[index] = NULL;
1445         }
1446 }
1447
1448 /**
1449  * Deregister key.
1450  */
1451 void lu_context_key_degister(struct lu_context_key *key)
1452 {
1453         LASSERT(atomic_read(&key->lct_used) >= 1);
1454         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1455
1456         lu_context_key_quiesce(key);
1457
1458         ++key_set_version;
1459         write_lock(&lu_keys_guard);
1460         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1461
1462         /**
1463          * Wait until all transient contexts referencing this key have
1464          * run lu_context_key::lct_fini() method.
1465          */
1466         while (atomic_read(&key->lct_used) > 1) {
1467                 write_unlock(&lu_keys_guard);
1468                 CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
1469                        key->lct_owner ? key->lct_owner->name : "", key,
1470                        atomic_read(&key->lct_used));
1471                 schedule();
1472                 write_lock(&lu_keys_guard);
1473         }
1474         if (lu_keys[key->lct_index]) {
1475                 lu_keys[key->lct_index] = NULL;
1476                 lu_ref_fini(&key->lct_reference);
1477         }
1478         write_unlock(&lu_keys_guard);
1479
1480         LASSERTF(atomic_read(&key->lct_used) == 1,
1481                  "key has instances: %d\n",
1482                  atomic_read(&key->lct_used));
1483 }
1484 EXPORT_SYMBOL(lu_context_key_degister);
1485
1486 /**
1487  * Register a number of keys. This has to be called after all keys have been
1488  * initialized by a call to LU_CONTEXT_KEY_INIT().
1489  */
1490 int lu_context_key_register_many(struct lu_context_key *k, ...)
1491 {
1492         struct lu_context_key *key = k;
1493         va_list args;
1494         int result;
1495
1496         va_start(args, k);
1497         do {
1498                 result = lu_context_key_register(key);
1499                 if (result)
1500                         break;
1501                 key = va_arg(args, struct lu_context_key *);
1502         } while (key != NULL);
1503         va_end(args);
1504
1505         if (result != 0) {
1506                 va_start(args, k);
1507                 while (k != key) {
1508                         lu_context_key_degister(k);
1509                         k = va_arg(args, struct lu_context_key *);
1510                 }
1511                 va_end(args);
1512         }
1513
1514         return result;
1515 }
1516 EXPORT_SYMBOL(lu_context_key_register_many);
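
/*
 * Usage sketch (the key names are hypothetical): the argument list must be
 * NULL-terminated, and a failed registration automatically de-registers the
 * keys already registered by this call:
 *
 *        rc = lu_context_key_register_many(&foo_thread_key,
 *                                          &bar_thread_key, NULL);
 *        if (rc != 0)
 *                return rc;
 *        ...
 *        lu_context_key_degister_many(&foo_thread_key, &bar_thread_key, NULL);
 */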
1517
1518 /**
1519  * De-register a number of keys. This is a dual to
1520  * lu_context_key_register_many().
1521  */
1522 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1523 {
1524         va_list args;
1525
1526         va_start(args, k);
1527         do {
1528                 lu_context_key_degister(k);
1529                 k = va_arg(args, struct lu_context_key*);
1530         } while (k != NULL);
1531         va_end(args);
1532 }
1533 EXPORT_SYMBOL(lu_context_key_degister_many);
1534
1535 /**
1536  * Revive a number of keys.
1537  */
1538 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1539 {
1540         va_list args;
1541
1542         va_start(args, k);
1543         do {
1544                 lu_context_key_revive(k);
1545                 k = va_arg(args, struct lu_context_key*);
1546         } while (k != NULL);
1547         va_end(args);
1548 }
1549 EXPORT_SYMBOL(lu_context_key_revive_many);
1550
1551 /**
1552  * Quiesce a number of keys.
1553  */
1554 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1555 {
1556         va_list args;
1557
1558         va_start(args, k);
1559         do {
1560                 lu_context_key_quiesce(k);
1561                 k = va_arg(args, struct lu_context_key*);
1562         } while (k != NULL);
1563         va_end(args);
1564 }
1565 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1566
1567 /**
1568  * Return value associated with key \a key in context \a ctx.
1569  */
1570 void *lu_context_key_get(const struct lu_context *ctx,
1571                          const struct lu_context_key *key)
1572 {
1573         LINVRNT(ctx->lc_state == LCS_ENTERED);
1574         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1575         LASSERT(lu_keys[key->lct_index] == key);
1576         return ctx->lc_value[key->lct_index];
1577 }
1578 EXPORT_SYMBOL(lu_context_key_get);
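
/*
 * Typical lookup sketch (type and key names are hypothetical): a key value
 * is normally fetched from the context embedded in the current environment:
 *
 *        struct foo_thread_info *info;
 *
 *        info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *        LASSERT(info != NULL);
 */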
1579
1580 /**
1581  * List of contexts created with LCT_REMEMBER, used by lu_context_key_quiesce().
1582  */
1583 static struct list_head lu_context_remembered;
1584
1585 /**
1586  * Destroy \a key in all remembered contexts. This is used to destroy key
1587  * values in "shared" contexts (like service threads), when a module owning
1588  * the key is about to be unloaded.
1589  */
1590 void lu_context_key_quiesce(struct lu_context_key *key)
1591 {
1592         struct lu_context *ctx;
1593         extern unsigned cl_env_cache_purge(unsigned nr);
1594
1595         if (!(key->lct_tags & LCT_QUIESCENT)) {
1596                 /*
1597                  * XXX layering violation.
1598                  */
1599                 cl_env_cache_purge(~0);
1600                 /*
1601                  * XXX memory barrier has to go here.
1602                  */
1603                 write_lock(&lu_keys_guard);
1604                 key->lct_tags |= LCT_QUIESCENT;
1605
1606                 /*
1607                  * Wait until all lu_context_key::lct_init() methods
1608                  * have completed.
1609                  */
1610                 while (atomic_read(&lu_key_initing_cnt) > 0) {
1611                         write_unlock(&lu_keys_guard);
1612                         CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
1613                                " %p, %d (%d)\n",
1614                                key->lct_owner ? key->lct_owner->name : "",
1615                                key, atomic_read(&key->lct_used),
1616                                atomic_read(&lu_key_initing_cnt));
1617                         schedule();
1618                         write_lock(&lu_keys_guard);
1619                 }
1620
1621                 list_for_each_entry(ctx, &lu_context_remembered,
1622                                     lc_remember)
1623                         key_fini(ctx, key->lct_index);
1624                 write_unlock(&lu_keys_guard);
1625                 ++key_set_version;
1626         }
1627 }
1628
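/**
 * Revive \a key after lu_context_key_quiesce(): clearing LCT_QUIESCENT lets
 * keys_fill() create values for the key again, and bumping key_set_version
 * makes lu_context_refill() pick up the change in existing contexts.
 */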
1629 void lu_context_key_revive(struct lu_context_key *key)
1630 {
1631         key->lct_tags &= ~LCT_QUIESCENT;
1632         ++key_set_version;
1633 }
1634
1635 static void keys_fini(struct lu_context *ctx)
1636 {
1637         unsigned int i;
1638
1639         if (ctx->lc_value == NULL)
1640                 return;
1641
1642         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1643                 key_fini(ctx, i);
1644
1645         OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1646         ctx->lc_value = NULL;
1647 }
1648
1649 static int keys_fill(struct lu_context *ctx)
1650 {
1651         unsigned int i;
1652
1653         /*
1654          * Serialisation with lu_context_key_quiesce() is needed, but some
1655          * "key->lct_init()" methods call kernel memory allocation routines
1656          * and therefore cannot run while a spin_lock is held.
1657          * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
1658          * to mark the start of the serialised section.
1659          * An atomic_t counter is still used so that the lock does not have
1660          * to be reacquired when the counter is decremented.
1661          */
1662         read_lock(&lu_keys_guard);
1663         atomic_inc(&lu_key_initing_cnt);
1664         read_unlock(&lu_keys_guard);
1665
1666         LINVRNT(ctx->lc_value != NULL);
1667         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1668                 struct lu_context_key *key;
1669
1670                 key = lu_keys[i];
1671                 if (ctx->lc_value[i] == NULL && key != NULL &&
1672                     (key->lct_tags & ctx->lc_tags) &&
1673                     /*
1674                      * Don't create values for a LCT_QUIESCENT key, as this
1675                      * will pin module owning a key.
1676                      */
1677                     !(key->lct_tags & LCT_QUIESCENT)) {
1678                         void *value;
1679
1680                         LINVRNT(key->lct_init != NULL);
1681                         LINVRNT(key->lct_index == i);
1682
1683                         LASSERT(key->lct_owner != NULL);
1684                         if (!(ctx->lc_tags & LCT_NOREF) &&
1685                             try_module_get(key->lct_owner) == 0) {
1686                                 /* module is unloading, skip this key */
1687                                 continue;
1688                         }
1689
1690                         value = key->lct_init(ctx, key);
1691                         if (unlikely(IS_ERR(value))) {
1692                                 atomic_dec(&lu_key_initing_cnt);
1693                                 return PTR_ERR(value);
1694                         }
1695
1696                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1697                         atomic_inc(&key->lct_used);
1698                         /*
1699                          * This is the only place in the code, where an
1700                          * element of ctx->lc_value[] array is set to non-NULL
1701                          * value.
1702                          */
1703                         ctx->lc_value[i] = value;
1704                         if (key->lct_exit != NULL)
1705                                 ctx->lc_tags |= LCT_HAS_EXIT;
1706                 }
1707                 ctx->lc_version = key_set_version;
1708         }
1709         atomic_dec(&lu_key_initing_cnt);
1710         return 0;
1711 }
1712
1713 static int keys_init(struct lu_context *ctx)
1714 {
1715         OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1716         if (likely(ctx->lc_value != NULL))
1717                 return keys_fill(ctx);
1718
1719         return -ENOMEM;
1720 }
1721
1722 /**
1723  * Initialize context data-structure. Create values for all keys.
1724  */
1725 int lu_context_init(struct lu_context *ctx, __u32 tags)
1726 {
1727         int     rc;
1728
1729         memset(ctx, 0, sizeof *ctx);
1730         ctx->lc_state = LCS_INITIALIZED;
1731         ctx->lc_tags = tags;
1732         if (tags & LCT_REMEMBER) {
1733                 write_lock(&lu_keys_guard);
1734                 list_add(&ctx->lc_remember, &lu_context_remembered);
1735                 write_unlock(&lu_keys_guard);
1736         } else {
1737                 INIT_LIST_HEAD(&ctx->lc_remember);
1738         }
1739
1740         rc = keys_init(ctx);
1741         if (rc != 0)
1742                 lu_context_fini(ctx);
1743
1744         return rc;
1745 }
1746 EXPORT_SYMBOL(lu_context_init);
1747
1748 /**
1749  * Finalize context data-structure. Destroy key values.
1750  */
1751 void lu_context_fini(struct lu_context *ctx)
1752 {
1753         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1754         ctx->lc_state = LCS_FINALIZED;
1755
1756         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1757                 LASSERT(list_empty(&ctx->lc_remember));
1758                 keys_fini(ctx);
1759
1760         } else { /* could race with key degister */
1761                 write_lock(&lu_keys_guard);
1762                 keys_fini(ctx);
1763                 list_del_init(&ctx->lc_remember);
1764                 write_unlock(&lu_keys_guard);
1765         }
1766 }
1767 EXPORT_SYMBOL(lu_context_fini);
1768
1769 /**
1770  * Called before entering context.
1771  */
1772 void lu_context_enter(struct lu_context *ctx)
1773 {
1774         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1775         ctx->lc_state = LCS_ENTERED;
1776 }
1777 EXPORT_SYMBOL(lu_context_enter);
1778
1779 /**
1780  * Called after exiting from \a ctx
1781  */
1782 void lu_context_exit(struct lu_context *ctx)
1783 {
1784         unsigned int i;
1785
1786         LINVRNT(ctx->lc_state == LCS_ENTERED);
1787         ctx->lc_state = LCS_LEFT;
1788         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1789                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1790                         /* could race with key quiescency */
1791                         if (ctx->lc_tags & LCT_REMEMBER)
1792                                 read_lock(&lu_keys_guard);
1793                         if (ctx->lc_value[i] != NULL) {
1794                                 struct lu_context_key *key;
1795
1796                                 key = lu_keys[i];
1797                                 LASSERT(key != NULL);
1798                                 if (key->lct_exit != NULL)
1799                                         key->lct_exit(ctx,
1800                                                       key, ctx->lc_value[i]);
1801                         }
1802                         if (ctx->lc_tags & LCT_REMEMBER)
1803                                 read_unlock(&lu_keys_guard);
1804                 }
1805         }
1806 }
1807 EXPORT_SYMBOL(lu_context_exit);
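
/*
 * Lifecycle sketch (the tag value is illustrative): a context is initialized
 * once, entered around each period of use, and finalized when no longer
 * needed:
 *
 *        struct lu_context ctx;
 *        int rc;
 *
 *        rc = lu_context_init(&ctx, LCT_REMEMBER);
 *        if (rc != 0)
 *                return rc;
 *        lu_context_enter(&ctx);
 *        ...
 *        lu_context_exit(&ctx);
 *        lu_context_fini(&ctx);
 */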
1808
1809 /**
1810  * Allocate values in a context for all missing keys that were registered
1811  * after the context was created. key_set_version only changes in the rare
1812  * cases when modules are loaded or removed.
1813  */
1814 int lu_context_refill(struct lu_context *ctx)
1815 {
1816         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1817 }
1818
1819 /**
1820  * lu_ctx_tags/lu_ses_tags will be updated if new types of obd device
1821  * are added. Currently this is only used on the client side, specifically
1822  * for the echo device client; for other stacks (such as ptlrpc threads),
1823  * contexts are predefined when the lu_device type is registered, during
1824  * the module probe phase.
1825  */
1826 __u32 lu_context_tags_default = 0;
1827 __u32 lu_session_tags_default = 0;
1828
1829 void lu_context_tags_update(__u32 tags)
1830 {
1831         write_lock(&lu_keys_guard);
1832         lu_context_tags_default |= tags;
1833         key_set_version++;
1834         write_unlock(&lu_keys_guard);
1835 }
1836 EXPORT_SYMBOL(lu_context_tags_update);
1837
1838 void lu_context_tags_clear(__u32 tags)
1839 {
1840         write_lock(&lu_keys_guard);
1841         lu_context_tags_default &= ~tags;
1842         key_set_version++;
1843         write_unlock(&lu_keys_guard);
1844 }
1845 EXPORT_SYMBOL(lu_context_tags_clear);
1846
1847 void lu_session_tags_update(__u32 tags)
1848 {
1849         write_lock(&lu_keys_guard);
1850         lu_session_tags_default |= tags;
1851         key_set_version++;
1852         write_unlock(&lu_keys_guard);
1853 }
1854 EXPORT_SYMBOL(lu_session_tags_update);
1855
1856 void lu_session_tags_clear(__u32 tags)
1857 {
1858         write_lock(&lu_keys_guard);
1859         lu_session_tags_default &= ~tags;
1860         key_set_version++;
1861         write_unlock(&lu_keys_guard);
1862 }
1863 EXPORT_SYMBOL(lu_session_tags_clear);
1864
1865 int lu_env_init(struct lu_env *env, __u32 tags)
1866 {
1867         int result;
1868
1869         env->le_ses = NULL;
1870         result = lu_context_init(&env->le_ctx, tags);
1871         if (likely(result == 0))
1872                 lu_context_enter(&env->le_ctx);
1873         return result;
1874 }
1875 EXPORT_SYMBOL(lu_env_init);
1876
1877 void lu_env_fini(struct lu_env *env)
1878 {
1879         lu_context_exit(&env->le_ctx);
1880         lu_context_fini(&env->le_ctx);
1881         env->le_ses = NULL;
1882 }
1883 EXPORT_SYMBOL(lu_env_fini);
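
/*
 * Sketch of the usual lu_env pattern (the tag value is illustrative); note
 * that lu_env_init() already enters the embedded context on success:
 *
 *        struct lu_env env;
 *        int rc;
 *
 *        rc = lu_env_init(&env, LCT_REMEMBER);
 *        if (rc != 0)
 *                return rc;
 *        ...
 *        lu_env_fini(&env);
 */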
1884
1885 int lu_env_refill(struct lu_env *env)
1886 {
1887         int result;
1888
1889         result = lu_context_refill(&env->le_ctx);
1890         if (result == 0 && env->le_ses != NULL)
1891                 result = lu_context_refill(env->le_ses);
1892         return result;
1893 }
1894 EXPORT_SYMBOL(lu_env_refill);
1895
1896 /**
1897  * Currently, this API is only used by the echo client. Because the echo
1898  * client and the normal Lustre client share the same cl_env cache, the
1899  * echo client needs to refresh the env context after it gets one from the
1900  * cache, especially when the normal client and the echo client co-exist
1901  * on the same client.
1902  */
1903 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1904                           __u32 stags)
1905 {
1906         int    result;
1907
1908         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1909                 env->le_ctx.lc_version = 0;
1910                 env->le_ctx.lc_tags |= ctags;
1911         }
1912
1913         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1914                 env->le_ses->lc_version = 0;
1915                 env->le_ses->lc_tags |= stags;
1916         }
1917
1918         result = lu_env_refill(env);
1919
1920         return result;
1921 }
1922 EXPORT_SYMBOL(lu_env_refill_by_tags);
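
/*
 * Illustrative call (the tag values are only examples): a caller that needs
 * extra context/session tags on an env taken from the shared cache would do
 * something like
 *
 *        rc = lu_env_refill_by_tags(env, LCT_REMEMBER, LCT_SESSION);
 */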
1923
1924 static struct shrinker *lu_site_shrinker;
1925
1926 typedef struct lu_site_stats {
1927         unsigned        lss_populated;
1928         unsigned        lss_max_search;
1929         unsigned        lss_total;
1930         unsigned        lss_busy;
1931 } lu_site_stats_t;
1932
1933 static void lu_site_stats_get(struct cfs_hash *hs,
1934                               lu_site_stats_t *stats, int populated)
1935 {
1936         struct cfs_hash_bd bd;
1937         unsigned int  i;
1938
1939         cfs_hash_for_each_bucket(hs, &bd, i) {
1940                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1941                 struct hlist_head       *hhead;
1942
1943                 cfs_hash_bd_lock(hs, &bd, 1);
1944                 stats->lss_busy  +=
1945                         cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
1946                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1947                 stats->lss_max_search = max((int)stats->lss_max_search,
1948                                             cfs_hash_bd_depmax_get(&bd));
1949                 if (!populated) {
1950                         cfs_hash_bd_unlock(hs, &bd, 1);
1951                         continue;
1952                 }
1953
1954                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1955                         if (!hlist_empty(hhead))
1956                                 stats->lss_populated++;
1957                 }
1958                 cfs_hash_bd_unlock(hs, &bd, 1);
1959         }
1960 }
1961
1962
1963 /*
1964  * lu_cache_shrink_count returns the number of cached objects that are
1965  * candidates to be freed by shrink_slab(). A counter, which tracks
1966  * the number of items in the site's lru, is maintained in the per cpu
1967  * stats of each site. The counter is incremented when an object is added
1968  * to a site's lru and decremented when one is removed. The number of
1969  * free-able objects is the sum of all per cpu counters for all sites.
1970  *
1971  * Using a per cpu counter is a compromise solution to concurrent access:
1972  * lu_object_put() can update the counter without locking the site and
1973  * lu_cache_shrink_count can sum the counters without locking each
1974  * ls_obj_hash bucket.
1975  */
1976 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1977                                            struct shrink_control *sc)
1978 {
1979         struct lu_site *s;
1980         struct lu_site *tmp;
1981         unsigned long cached = 0;
1982
1983         if (!(sc->gfp_mask & __GFP_FS))
1984                 return 0;
1985
1986         mutex_lock(&lu_sites_guard);
1987         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1988                 cached += ls_stats_read(s->ls_stats, LU_SS_LRU_LEN);
1989         }
1990         mutex_unlock(&lu_sites_guard);
1991
1992         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1993         CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
1994                cached, sysctl_vfs_cache_pressure);
1995
1996         return cached;
1997 }
1998
1999 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
2000                                           struct shrink_control *sc)
2001 {
2002         struct lu_site *s;
2003         struct lu_site *tmp;
2004         unsigned long remain = sc->nr_to_scan;
2005         LIST_HEAD(splice);
2006
2007         if (!(sc->gfp_mask & __GFP_FS))
2008                 /* We must not take the lu_sites_guard lock when
2009                  * __GFP_FS is *not* set because of the deadlock
2010                  * possibility detailed in the comment above
2011                  * lu_cache_shrink(). Additionally, since we cannot determine
2012                  * the number of objects in the cache without taking this
2013                  * lock, we're in a particularly tough spot. As
2014                  * a result, we'll just lie and say our cache is
2015                  * empty. This _should_ be ok, as we can't
2016                  * reclaim objects when __GFP_FS is *not* set
2017                  * anyways.
2018                  */
2019                 return SHRINK_STOP;
2020
2021         mutex_lock(&lu_sites_guard);
2022         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
2023                 remain = lu_site_purge(&lu_shrink_env, s, remain);
2024                 /*
2025                  * Move the just-shrunk site to the tail of the site list to
2026                  * ensure shrinking fairness.
2027                  */
2028                 list_move_tail(&s->ls_linkage, &splice);
2029         }
2030         list_splice(&splice, lu_sites.prev);
2031         mutex_unlock(&lu_sites_guard);
2032
2033         return sc->nr_to_scan - remain;
2034 }
2035
2036 #ifndef HAVE_SHRINKER_COUNT
2037 /*
2038  * There exists a potential lock inversion deadlock scenario when using
2039  * Lustre on top of ZFS. This occurs between one of ZFS's
2040  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2041  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2042  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2043  * lock. Obviously neither thread will wake and drop their respective hold
2044  * lock. Obviously neither thread will wake and drop its respective hold
2045  * on its lock.
2046  * To prevent this from happening we must ensure the lu_sites_guard lock is
2047  * not taken while down this code path. ZFS reliably does not set the
2048  * __GFP_FS bit in its code paths, so this can be used to determine if it
2049  * is safe to take the lu_sites_guard lock.
2050  *
2051  * Ideally we should accurately return the remaining number of cached
2052  * objects without taking the lu_sites_guard lock, but this is not
2053  * possible in the current implementation.
2054  */
2055 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2056 {
2057         int cached = 0;
2058         struct shrink_control scv = {
2059                  .nr_to_scan = shrink_param(sc, nr_to_scan),
2060                  .gfp_mask   = shrink_param(sc, gfp_mask)
2061         };
2062 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2063         struct shrinker* shrinker = NULL;
2064 #endif
2065
2066
2067         CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
2068
2069         if (scv.nr_to_scan != 0)
2070                 lu_cache_shrink_scan(shrinker, &scv);
2071
2072         cached = lu_cache_shrink_count(shrinker, &scv);
2073         return cached;
2074 }
2075
2076 #endif /* HAVE_SHRINKER_COUNT */
2077
2078
2079 /*
2080  * Debugging stuff.
2081  */
2082
2083 /**
2084  * Environment to be used in debugger, contains all tags.
2085  */
2086 static struct lu_env lu_debugging_env;
2087
2088 /**
2089  * Debugging printer function using printk().
2090  */
2091 int lu_printk_printer(const struct lu_env *env,
2092                       void *unused, const char *format, ...)
2093 {
2094         va_list args;
2095
2096         va_start(args, format);
2097         vprintk(format, args);
2098         va_end(args);
2099         return 0;
2100 }
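
/*
 * Sketch (assuming lu_object_print(), defined earlier in this file, with its
 * (env, cookie, printer, object) argument order): dump an object via printk:
 *
 *        lu_object_print(env, NULL, lu_printk_printer, o);
 */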
2101
2102 int lu_debugging_setup(void)
2103 {
2104         return lu_env_init(&lu_debugging_env, ~0);
2105 }
2106
2107 void lu_context_keys_dump(void)
2108 {
2109         unsigned int i;
2110
2111         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2112                 struct lu_context_key *key;
2113
2114                 key = lu_keys[i];
2115                 if (key != NULL) {
2116                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2117                                i, key, key->lct_tags,
2118                                key->lct_init, key->lct_fini, key->lct_exit,
2119                                key->lct_index, atomic_read(&key->lct_used),
2120                                key->lct_owner ? key->lct_owner->name : "",
2121                                key->lct_owner);
2122                         lu_ref_print(&key->lct_reference);
2123                 }
2124         }
2125 }
2126
2127 /**
2128  * Initialization of global lu_* data.
2129  */
2130 int lu_global_init(void)
2131 {
2132         int result;
2133         DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
2134                          lu_cache_shrink_count, lu_cache_shrink_scan);
2135
2136         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2137
2138         INIT_LIST_HEAD(&lu_device_types);
2139         INIT_LIST_HEAD(&lu_context_remembered);
2140         INIT_LIST_HEAD(&lu_sites);
2141
2142         result = lu_ref_global_init();
2143         if (result != 0)
2144                 return result;
2145
2146         LU_CONTEXT_KEY_INIT(&lu_global_key);
2147         result = lu_context_key_register(&lu_global_key);
2148         if (result != 0)
2149                 return result;
2150
2151         /*
2152          * At this level, we don't know what tags are needed, so allocate them
2153          * conservatively. This should not be too bad, because this
2154          * environment is global.
2155          */
2156         mutex_lock(&lu_sites_guard);
2157         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2158         mutex_unlock(&lu_sites_guard);
2159         if (result != 0)
2160                 return result;
2161
2162         /*
2163          * seeks estimation: 3 seeks to read a record from oi, one to read
2164  * inode, one for ea. Unfortunately, setting such a high value results in
2165          * lu_object/inode cache consuming all the memory.
2166          */
2167         lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, &shvar);
2168         if (lu_site_shrinker == NULL)
2169                 return -ENOMEM;
2170
2171         return result;
2172 }
2173
2174 /**
2175  * Dual to lu_global_init().
2176  */
2177 void lu_global_fini(void)
2178 {
2179         if (lu_site_shrinker != NULL) {
2180                 remove_shrinker(lu_site_shrinker);
2181                 lu_site_shrinker = NULL;
2182         }
2183
2184         lu_context_key_degister(&lu_global_key);
2185
2186         /*
2187          * Tear shrinker environment down _after_ de-registering
2188          * lu_global_key, because the latter has a value in the former.
2189          */
2190         mutex_lock(&lu_sites_guard);
2191         lu_env_fini(&lu_shrink_env);
2192         mutex_unlock(&lu_sites_guard);
2193
2194         lu_ref_global_fini();
2195 }
2196
2197 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2198 {
2199 #ifdef CONFIG_PROC_FS
2200         struct lprocfs_counter ret;
2201
2202         lprocfs_stats_collect(stats, idx, &ret);
2203         if (idx == LU_SS_LRU_LEN)
2204                 /*
2205                  * protect against counter on cpu A being decremented
2206                  * before counter is incremented on cpu B; unlikely
2207                  */
2208                 return (__u32)((ret.lc_sum > 0) ? ret.lc_sum : 0);
2209         else
2210                 return (__u32)ret.lc_count;
2211 #else
2212         return 0;
2213 #endif
2214 }
2215
2216 /**
2217  * Output site statistical counters to the given seq_file. Suitable for
2218  * seq_file-based lprocfs show functions.
2219  */
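/*
 * The columns printed below are, in order: busy/total objects,
 * populated hash buckets/total hash buckets, maximum hash chain depth,
 * followed by the created, cache_hit, cache_miss, cache_race,
 * cache_death_race, lru_purged and lru_len counters.
 */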
2220 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2221 {
2222         lu_site_stats_t stats;
2223
2224         memset(&stats, 0, sizeof(stats));
2225         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2226
2227         return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d %d\n",
2228                           stats.lss_busy,
2229                           stats.lss_total,
2230                           stats.lss_populated,
2231                           CFS_HASH_NHLIST(s->ls_obj_hash),
2232                           stats.lss_max_search,
2233                           ls_stats_read(s->ls_stats, LU_SS_CREATED),
2234                           ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2235                           ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2236                           ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2237                           ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2238                           ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED),
2239                           ls_stats_read(s->ls_stats, LU_SS_LRU_LEN));
2240 }
2241 EXPORT_SYMBOL(lu_site_stats_seq_print);
2242
2243 /**
2244  * Helper function to initialize a number of kmem slab caches at once.
2245  */
2246 int lu_kmem_init(struct lu_kmem_descr *caches)
2247 {
2248         int result;
2249         struct lu_kmem_descr *iter = caches;
2250
2251         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2252                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2253                                                      iter->ckd_size,
2254                                                      0, 0, NULL);
2255                 if (*iter->ckd_cache == NULL) {
2256                         result = -ENOMEM;
2257                         /* free all previously allocated caches */
2258                         lu_kmem_fini(caches);
2259                         break;
2260                 }
2261         }
2262         return result;
2263 }
2264 EXPORT_SYMBOL(lu_kmem_init);
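
/*
 * Usage sketch (the cache and type names are hypothetical): the descriptor
 * array must be terminated by an entry whose ->ckd_cache is NULL:
 *
 *        static struct kmem_cache *foo_object_kmem;
 *        static struct lu_kmem_descr foo_caches[] = {
 *                {
 *                        .ckd_cache = &foo_object_kmem,
 *                        .ckd_name  = "foo_object_kmem",
 *                        .ckd_size  = sizeof(struct foo_object)
 *                },
 *                {
 *                        .ckd_cache = NULL
 *                }
 *        };
 *
 *        rc = lu_kmem_init(foo_caches);
 *        ...
 *        lu_kmem_fini(foo_caches);
 */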
2265
2266 /**
2267  * Helper function to finalize a number of kmem slab caches at once. Dual to
2268  * lu_kmem_init().
2269  */
2270 void lu_kmem_fini(struct lu_kmem_descr *caches)
2271 {
2272         for (; caches->ckd_cache != NULL; ++caches) {
2273                 if (*caches->ckd_cache != NULL) {
2274                         kmem_cache_destroy(*caches->ckd_cache);
2275                         *caches->ckd_cache = NULL;
2276                 }
2277         }
2278 }
2279 EXPORT_SYMBOL(lu_kmem_fini);
2280
2281 /**
2282  * Temporary solution to be able to assign fid in ->do_create()
2283  * till we have fully-functional OST fids
2284  */
2285 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2286                           const struct lu_fid *fid)
2287 {
2288         struct lu_site          *s = o->lo_dev->ld_site;
2289         struct lu_fid           *old = &o->lo_header->loh_fid;
2290         struct lu_object        *shadow;
2291         wait_queue_t             waiter;
2292         struct cfs_hash         *hs;
2293         struct cfs_hash_bd       bd;
2294         __u64                    version = 0;
2295
2296         LASSERT(fid_is_zero(old));
2297
2298         hs = s->ls_obj_hash;
2299         cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2300         shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2301         /* supposed to be unique */
2302         LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2303         *old = *fid;
2304         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2305         cfs_hash_bd_unlock(hs, &bd, 1);
2306 }
2307 EXPORT_SYMBOL(lu_object_assign_fid);
2308
2309 /**
2310  * Allocate an object with a zero (not yet assigned) fid.
2311  * XXX: temporary solution to be able to assign fid in ->do_create()
2312  *      till we have fully-functional OST fids
2313  */
2314 struct lu_object *lu_object_anon(const struct lu_env *env,
2315                                  struct lu_device *dev,
2316                                  const struct lu_object_conf *conf)
2317 {
2318         struct lu_fid     fid;
2319         struct lu_object *o;
2320
2321         fid_zero(&fid);
2322         o = lu_object_alloc(env, dev, &fid, conf);
2323
2324         return o;
2325 }
2326 EXPORT_SYMBOL(lu_object_anon);
2327
2328 struct lu_buf LU_BUF_NULL = {
2329         .lb_buf = NULL,
2330         .lb_len = 0
2331 };
2332 EXPORT_SYMBOL(LU_BUF_NULL);
2333
2334 void lu_buf_free(struct lu_buf *buf)
2335 {
2336         LASSERT(buf);
2337         if (buf->lb_buf) {
2338                 LASSERT(buf->lb_len > 0);
2339                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2340                 buf->lb_buf = NULL;
2341                 buf->lb_len = 0;
2342         }
2343 }
2344 EXPORT_SYMBOL(lu_buf_free);
2345
2346 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2347 {
2348         LASSERT(buf);
2349         LASSERT(buf->lb_buf == NULL);
2350         LASSERT(buf->lb_len == 0);
2351         OBD_ALLOC_LARGE(buf->lb_buf, size);
2352         if (likely(buf->lb_buf))
2353                 buf->lb_len = size;
2354 }
2355 EXPORT_SYMBOL(lu_buf_alloc);
2356
2357 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2358 {
2359         lu_buf_free(buf);
2360         lu_buf_alloc(buf, size);
2361 }
2362 EXPORT_SYMBOL(lu_buf_realloc);
2363
2364 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2365 {
2366         if (buf->lb_buf == NULL && buf->lb_len == 0)
2367                 lu_buf_alloc(buf, len);
2368
2369         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2370                 lu_buf_realloc(buf, len);
2371
2372         return buf;
2373 }
2374 EXPORT_SYMBOL(lu_buf_check_and_alloc);
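
/*
 * Usage sketch (the size is illustrative): a buffer typically starts out as
 * LU_BUF_NULL and is released with lu_buf_free() when no longer needed:
 *
 *        struct lu_buf buf = LU_BUF_NULL;
 *
 *        lu_buf_alloc(&buf, 4096);
 *        if (buf.lb_buf == NULL)
 *                return -ENOMEM;
 *        ...
 *        lu_buf_free(&buf);
 */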
2375
2376 /**
2377  * Increase the size of \a buf.
2378  * Preserves the old data in the buffer.
2379  * The old buffer remains unchanged on error.
2380  * \retval 0 or -ENOMEM
2381  */
2382 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2383 {
2384         char *ptr;
2385
2386         if (len <= buf->lb_len)
2387                 return 0;
2388
2389         OBD_ALLOC_LARGE(ptr, len);
2390         if (ptr == NULL)
2391                 return -ENOMEM;
2392
2393         /* Copy the old contents over, then free the old buffer. */
2394         if (buf->lb_buf != NULL) {
2395                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2396                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2397         }
2398
2399         buf->lb_buf = ptr;
2400         buf->lb_len = len;
2401         return 0;
2402 }
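
/*
 * Usage sketch (the increment is illustrative): grow a buffer while keeping
 * its current contents, e.g. before appending more data:
 *
 *        rc = lu_buf_check_and_grow(&buf, buf.lb_len + 1024);
 *        if (rc != 0)
 *                return rc;
 */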