1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/lu_object.c
32  *
33  * Lustre Object.
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  *
37  *   Author: Nikita Danilov <nikita.danilov@sun.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_CLASS
41
42 #include <linux/delay.h>
43 #include <linux/module.h>
44 #include <linux/list.h>
45 #include <linux/processor.h>
46 #include <linux/random.h>
47
48 #include <libcfs/libcfs.h>
49 #include <libcfs/linux/linux-mem.h>
50 #include <libcfs/linux/linux-hash.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lustre_disk.h>
54 #include <lustre_fid.h>
55 #include <lu_object.h>
56 #include <lu_ref.h>
57
58 struct lu_site_bkt_data {
59         /**
60          * LRU list, updated on each access to object. Protected by
61          * lsb_waitq.lock.
62          *
63          * The "cold" end of the LRU is lu_site::ls_lru.next. Accessed
64          * objects are moved to lu_site::ls_lru.prev.
65          */
66         struct list_head                lsb_lru;
67         /**
68          * Wait-queue signaled when an object in this site is ultimately
69          * destroyed (lu_object_free()) or initialized (lu_object_start()).
70          * It is used by lu_object_find() to wait before retrying when an
71          * object in the process of destruction is found in the hash table,
72          * or to wait for an object to be initialized by the allocator.
73          *
74          * \see htable_lookup().
75          */
76         wait_queue_head_t               lsb_waitq;
77 };
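
/*
 * A minimal sketch (not a helper in this file; site, fid and h stand for a
 * struct lu_site *, a FID and a cached lu_object_header the caller holds) of
 * the locking discipline the fields above imply: the per-bucket LRU is only
 * touched under lsb_waitq.lock, exactly as lu_object_put() and
 * lu_object_unhash() do below.
 *
 *	struct lu_site_bkt_data *bkt;
 *
 *	bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
 *	spin_lock(&bkt->lsb_waitq.lock);
 *	if (!list_empty(&h->loh_lru)) {
 *		list_del_init(&h->loh_lru);
 *		percpu_counter_dec(&site->ls_lru_len_counter);
 *	}
 *	spin_unlock(&bkt->lsb_waitq.lock);
 */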
78
79 enum {
80         LU_CACHE_PERCENT_MAX     = 50,
81         LU_CACHE_PERCENT_DEFAULT = 20
82 };
83
84 #define LU_CACHE_NR_MAX_ADJUST          512
85 #define LU_CACHE_NR_UNLIMITED           -1
86 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
87 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
88 #define LU_CACHE_NR_ZFS_LIMIT           10240
89
90 #define LU_CACHE_NR_MIN                 4096
91 #define LU_CACHE_NR_MAX                 0x80000000UL
92
93 /**
94  * At most 256 buckets; we don't want too many buckets because they:
95  * - consume too much memory (currently max 16K)
96  * - lead to an unbalanced LRU list
97  * With few CPUs there is little gain from extra buckets, so
98  * we treat this as a maximum in lu_site_init().
99  */
100 #define LU_SITE_BKT_BITS    8
101
102 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
103 module_param(lu_cache_percent, int, 0644);
104 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
105
106 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
107 module_param(lu_cache_nr, long, 0644);
108 MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
109
110 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
111 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
112
113 u32 lu_fid_hash(const void *data, u32 len, u32 seed)
114 {
115         const struct lu_fid *fid = data;
116
117         seed = cfs_hash_32(seed ^ fid->f_oid, 32);
118         seed ^= cfs_hash_64(fid->f_seq, 32);
119         return seed;
120 }
121 EXPORT_SYMBOL(lu_fid_hash);
122
123 static const struct rhashtable_params obj_hash_params = {
124         .key_len        = sizeof(struct lu_fid),
125         .key_offset     = offsetof(struct lu_object_header, loh_fid),
126         .head_offset    = offsetof(struct lu_object_header, loh_hash),
127         .hashfn         = lu_fid_hash,
128         .automatic_shrinking = true,
129 };
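
/*
 * With these parameters the object hash is keyed directly by the FID embedded
 * at lu_object_header::loh_fid, chained through loh_hash, and hashed with
 * lu_fid_hash() above; automatic_shrinking lets the table shrink again as
 * objects are purged.  htable_lookup() below relies on exactly this layout.
 */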
130
131 static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
132 {
133         return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
134                (s->ls_bkt_cnt - 1);
135 }
136
137 wait_queue_head_t *
138 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
139 {
140         struct lu_site_bkt_data *bkt;
141
142         bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
143         return &bkt->lsb_waitq;
144 }
145 EXPORT_SYMBOL(lu_site_wq_from_fid);
146
147 /**
148  * Decrease reference counter on object. If last reference is freed, return
149  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
150  * case, free object immediately.
151  */
152 void lu_object_put(const struct lu_env *env, struct lu_object *o)
153 {
154         struct lu_site_bkt_data *bkt;
155         struct lu_object_header *top = o->lo_header;
156         struct lu_site *site = o->lo_dev->ld_site;
157         struct lu_object *orig = o;
158         const struct lu_fid *fid = lu_object_fid(o);
159
160         LASSERTF(atomic_read(&top->loh_ref) > 0, "o %p\n", o);
161         /*
162          * Until full FIDs-on-OST support is implemented, anonymous objects
163          * are possible in OSP. Such an object isn't listed in the site,
164          * so we should not remove it from the site.
165          */
166         if (fid_is_zero(fid)) {
167                 LASSERT(list_empty(&top->loh_lru));
168                 if (!atomic_dec_and_test(&top->loh_ref))
169                         return;
170                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
171                         if (o->lo_ops->loo_object_release != NULL)
172                                 o->lo_ops->loo_object_release(env, o);
173                 }
174                 lu_object_free(env, orig);
175                 return;
176         }
177
178         bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
179         if (atomic_add_unless(&top->loh_ref, -1, 1)) {
180 still_active:
181                 /*
182                  * At this point the object reference is dropped and lock is
183                  * not taken, so lu_object should not be touched because it
184                  * can be freed by concurrent thread.
185                  *
186                  * Somebody may be waiting for this, currently only used for
187                  * cl_object, see cl_object_put_last().
188                  */
189                 wake_up(&bkt->lsb_waitq);
190
191                 return;
192         }
193
194         spin_lock(&bkt->lsb_waitq.lock);
195         if (!atomic_dec_and_test(&top->loh_ref)) {
196                 spin_unlock(&bkt->lsb_waitq.lock);
197                 goto still_active;
198         }
199
200         /*
201          * Refcount is zero, and cannot be incremented without taking the bkt
202          * lock, so object is stable.
203          */
204
205         /*
206          * When last reference is released, iterate over object layers, and
207          * notify them that object is no longer busy.
208          */
209         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
210                 if (o->lo_ops->loo_object_release != NULL)
211                         o->lo_ops->loo_object_release(env, o);
212         }
213
214         /*
215          * Don't use a local 'is_dying' here, because it would have been taken
216          * without the lock; here we need its latest actual value, so check the
217          * lu_object directly.
218          */
219         if (!lu_object_is_dying(top) &&
220             (lu_object_exists(orig) || lu_object_is_cl(orig))) {
221                 LASSERT(list_empty(&top->loh_lru));
222                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
223                 spin_unlock(&bkt->lsb_waitq.lock);
224                 percpu_counter_inc(&site->ls_lru_len_counter);
225                 CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
226                        orig, top, bkt);
227                 return;
228         }
229
230         /*
231          * If object is dying (will not be cached) then remove it from hash
232          * table (it is already not on the LRU).
233          *
234          * This is done with the bucket lock held.  As the only way to acquire
235          * the first reference to a previously unreferenced object is through a
236          * hash-table lookup (lu_object_find()), which takes the lock for that
237          * first reference, no race with a concurrent object lookup is possible
238          * and we can safely destroy the object below.
239          */
240         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
241                 rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
242                                        obj_hash_params);
243
244         spin_unlock(&bkt->lsb_waitq.lock);
245         /* Object was already removed from hash above, can kill it. */
246         lu_object_free(env, orig);
247 }
248 EXPORT_SYMBOL(lu_object_put);
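
/*
 * A minimal usage sketch (illustrative only; env, dev and fid are assumed to
 * be set up by the caller) of the reference life cycle implemented by
 * lu_object_find() and lu_object_put(): the last put returns the object to
 * the per-bucket LRU unless it is dying, in which case it is freed at once.
 *
 *	struct lu_object *o;
 *
 *	o = lu_object_find(env, dev, fid, NULL);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 *	... use the object ...
 *	lu_object_put(env, o);
 */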
249
250 /**
251  * Put an object without keeping it in the cache. This is a temporary
252  * solution for multi-site objects whose layering is not constant.
253  */
254 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
255 {
256         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
257         return lu_object_put(env, o);
258 }
259 EXPORT_SYMBOL(lu_object_put_nocache);
260
261 /**
262  * Kill the object and take it out of LRU cache.
263  * Currently used by client code for layout change.
264  */
265 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
266 {
267         struct lu_object_header *top;
268
269         top = o->lo_header;
270         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
271         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
272                 struct lu_site *site = o->lo_dev->ld_site;
273                 struct rhashtable *obj_hash = &site->ls_obj_hash;
274                 struct lu_site_bkt_data *bkt;
275
276                 bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
277                 spin_lock(&bkt->lsb_waitq.lock);
278                 if (!list_empty(&top->loh_lru)) {
279                         list_del_init(&top->loh_lru);
280                         percpu_counter_dec(&site->ls_lru_len_counter);
281                 }
282                 spin_unlock(&bkt->lsb_waitq.lock);
283
284                 rhashtable_remove_fast(obj_hash, &top->loh_hash,
285                                        obj_hash_params);
286         }
287 }
288 EXPORT_SYMBOL(lu_object_unhash);
289
290 /**
291  * Allocate new object.
292  *
293  * This follows object creation protocol, described in the comment within
294  * struct lu_device_operations definition.
295  */
296 static struct lu_object *lu_object_alloc(const struct lu_env *env,
297                                          struct lu_device *dev,
298                                          const struct lu_fid *f)
299 {
300         struct lu_object *top;
301
302         /*
303          * Create top-level object slice. This will also create
304          * lu_object_header.
305          */
306         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
307         if (top == NULL)
308                 return ERR_PTR(-ENOMEM);
309         if (IS_ERR(top))
310                 return top;
311         /* The only place where obj fid is assigned. It's constant after this */
312         top->lo_header->loh_fid = *f;
313
314         return top;
315 }
316
317 /**
318  * Initialize object.
319  *
320  * This is called after object hash insertion to avoid returning an object with
321  * stale attributes.
322  */
323 static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
324                            struct lu_object *top,
325                            const struct lu_object_conf *conf)
326 {
327         struct lu_object *scan;
328         struct list_head *layers;
329         unsigned int init_mask = 0;
330         unsigned int init_flag;
331         int clean;
332         int result;
333
334         layers = &top->lo_header->loh_layers;
335
336         do {
337                 /*
338                  * Call ->loo_object_init() repeatedly, until no more new
339                  * object slices are created.
340                  */
341                 clean = 1;
342                 init_flag = 1;
343                 list_for_each_entry(scan, layers, lo_linkage) {
344                         if (init_mask & init_flag)
345                                 goto next;
346                         clean = 0;
347                         scan->lo_header = top->lo_header;
348                         result = scan->lo_ops->loo_object_init(env, scan, conf);
349                         if (result)
350                                 return result;
351
352                         init_mask |= init_flag;
353 next:
354                         init_flag <<= 1;
355                 }
356         } while (!clean);
357
358         list_for_each_entry_reverse(scan, layers, lo_linkage) {
359                 if (scan->lo_ops->loo_object_start != NULL) {
360                         result = scan->lo_ops->loo_object_start(env, scan);
361                         if (result)
362                                 return result;
363                 }
364         }
365
366         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
367
368         set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
369
370         return 0;
371 }
372
373 /* Free an object. */
374 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
375 {
376         wait_queue_head_t *wq;
377         struct lu_site *site;
378         struct lu_object *scan;
379         struct list_head *layers;
380         LIST_HEAD(splice);
381
382         site = o->lo_dev->ld_site;
383         layers = &o->lo_header->loh_layers;
384         wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
385         /* First call ->loo_object_delete() method to release all resources. */
386         list_for_each_entry_reverse(scan, layers, lo_linkage) {
387                 if (scan->lo_ops->loo_object_delete != NULL)
388                         scan->lo_ops->loo_object_delete(env, scan);
389         }
390
391         /*
392          * Then, splice object layers into stand-alone list, and call
393          * ->loo_object_free() on all layers to free memory. Splice is
394          * necessary, because lu_object_header is freed together with the
395          * top-level slice.
396          */
397         list_splice_init(layers, &splice);
398         while (!list_empty(&splice)) {
399                 /*
400                  * Free layers in bottom-to-top order, so that object header
401                  * lives as long as possible and ->loo_object_free() methods
402                  * can look at its contents.
403                  */
404                 o = container_of(splice.prev, struct lu_object, lo_linkage);
405                 list_del_init(&o->lo_linkage);
406                 LASSERT(o->lo_ops->loo_object_free != NULL);
407                 o->lo_ops->loo_object_free(env, o);
408         }
409
410         if (waitqueue_active(wq))
411                 wake_up(wq);
412 }
413
414 /**
415  * Free \a nr objects from the cold end of the site LRU list.
416  * If canblock is 0, don't block waiting for another
417  * instance of lu_site_purge() to complete.
418  */
419 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
420                           int nr, int canblock)
421 {
422         struct lu_object_header *h;
423         struct lu_object_header *temp;
424         struct lu_site_bkt_data *bkt;
425         LIST_HEAD(dispose);
426         int                      did_sth;
427         unsigned int             start = 0;
428         int                      count;
429         int                      bnr;
430         unsigned int             i;
431
432         if (CFS_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
433                 RETURN(0);
434
435         /*
436          * Under LRU list lock, scan LRU list and move unreferenced objects to
437          * the dispose list, removing them from LRU and hash table.
438          */
439         if (nr != ~0)
440                 start = s->ls_purge_start;
441         bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
442 again:
443         /*
444          * It doesn't make any sense to run purge threads in parallel; that can
445          * only bring trouble.  See LU-5331.
446          */
447         if (canblock != 0)
448                 mutex_lock(&s->ls_purge_mutex);
449         else if (mutex_trylock(&s->ls_purge_mutex) == 0)
450                 goto out;
451
452         did_sth = 0;
453         for (i = start; i < s->ls_bkt_cnt ; i++) {
454                 count = bnr;
455                 bkt = &s->ls_bkts[i];
456                 spin_lock(&bkt->lsb_waitq.lock);
457
458                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
459                         LASSERT(atomic_read(&h->loh_ref) == 0);
460
461                         LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
462
463                         set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
464                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
465                                                obj_hash_params);
466                         list_move(&h->loh_lru, &dispose);
467                         percpu_counter_dec(&s->ls_lru_len_counter);
468                         if (did_sth == 0)
469                                 did_sth = 1;
470
471                         if (nr != ~0 && --nr == 0)
472                                 break;
473
474                         if (count > 0 && --count == 0)
475                                 break;
476
477                 }
478                 spin_unlock(&bkt->lsb_waitq.lock);
479                 cond_resched();
480                 /*
481                  * Free everything on the dispose list. This is safe against
482                  * races due to the reasons described in lu_object_put().
483                  */
484                 while ((h = list_first_entry_or_null(&dispose,
485                                                      struct lu_object_header,
486                                                      loh_lru)) != NULL) {
487                         list_del_init(&h->loh_lru);
488                         lu_object_free(env, lu_object_top(h));
489                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
490                 }
491
492                 if (nr == 0)
493                         break;
494         }
495         mutex_unlock(&s->ls_purge_mutex);
496
497         if (nr != 0 && did_sth && start != 0) {
498                 start = 0; /* restart from the first bucket */
499                 goto again;
500         }
501         /* race on s->ls_purge_start, but nobody cares */
502         s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
503 out:
504         return nr;
505 }
506 EXPORT_SYMBOL(lu_site_purge_objects);
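
/*
 * The lu_site_purge() calls seen later in this file (lu_object_find_at(),
 * lu_stack_fini()) are assumed to be the blocking form of this function,
 * i.e. roughly:
 *
 *	lu_site_purge_objects(env, s, nr, 1);
 *
 * whereas lu_object_limit() below passes canblock = 0 so that cache trimming
 * never waits on a purge that is already running elsewhere.
 */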
507
508 /*
509  * Object printing.
510  *
511  * The code below has to jump through certain hoops to output an object
512  * description into a libcfs_debug_msg-based log. The problem is that
513  * lu_object_print() composes the object description from strings that
514  * are parts of _lines_ of output (i.e., strings not terminated by a
515  * newline). This doesn't fit very well into the libcfs_debug_msg()
516  * interface, which assumes that each message supplied to it is a
517  * self-contained output line.
518  *
519  * To work around this, strings are collected in a temporary buffer
520  * (implemented as a value of lu_global_key) until a terminating newline is detected.
521  *
522  */
523
524 enum {
525         /**
526          * Maximal line size.
527          *
528          * XXX overflow is not handled correctly.
529          */
530         LU_CDEBUG_LINE = 512
531 };
532
533 struct lu_cdebug_data {
534         /* Temporary buffer */
535         char lck_area[LU_CDEBUG_LINE];
536 };
537
538 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
539 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
540
541 /**
542  * Key, holding temporary buffer. This key is registered very early by
543  * lu_global_init().
544  */
545 static struct lu_context_key lu_global_key = {
546         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
547                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
548         .lct_init = lu_global_key_init,
549         .lct_fini = lu_global_key_fini
550 };
551
552 /* Printer function emitting messages through libcfs_debug_msg(). */
553 int lu_cdebug_printer(const struct lu_env *env,
554                       void *cookie, const char *format, ...)
555 {
556         struct libcfs_debug_msg_data *msgdata = cookie;
557         struct lu_cdebug_data        *key;
558         int used;
559         int complete;
560         va_list args;
561
562         va_start(args, format);
563
564         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
565         LASSERT(key != NULL);
566
567         used = strlen(key->lck_area);
568         complete = format[strlen(format) - 1] == '\n';
569         /* Append new chunk to the buffer. */
570         vsnprintf(key->lck_area + used,
571                   ARRAY_SIZE(key->lck_area) - used, format, args);
572         if (complete) {
573                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
574                         libcfs_debug_msg(msgdata, "%s", key->lck_area);
575                 key->lck_area[0] = 0;
576         }
577         va_end(args);
578         return 0;
579 }
580 EXPORT_SYMBOL(lu_cdebug_printer);
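
/*
 * A sketch of how this printer is typically wired up (mirroring what
 * lu_site_print() does further down): the cookie is a libcfs message
 * descriptor, and lu_object_print() feeds it line fragments that are only
 * flushed once a terminating newline arrives.
 *
 *	LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_INFO, NULL);
 *
 *	lu_object_print(env, &msgdata, lu_cdebug_printer, o);
 */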
581
582 /* Print object header. */
583 void lu_object_header_print(const struct lu_env *env, void *cookie,
584                             lu_printer_t printer,
585                             const struct lu_object_header *hdr)
586 {
587         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
588                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
589                    PFID(&hdr->loh_fid),
590                    test_bit(LU_OBJECT_UNHASHED,
591                             &hdr->loh_flags) ? "" : " hash",
592                    list_empty(&hdr->loh_lru) ? "" : " lru",
593                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
594 }
595 EXPORT_SYMBOL(lu_object_header_print);
596
597 /* Print a human-readable representation of \a o via \a printer. */
598 void lu_object_print(const struct lu_env *env, void *cookie,
599                      lu_printer_t printer, const struct lu_object *o)
600 {
601         static const char ruler[] = "........................................";
602         struct lu_object_header *top;
603         int depth = 4;
604
605         top = o->lo_header;
606         lu_object_header_print(env, cookie, printer, top);
607         (*printer)(env, cookie, "{\n");
608
609         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
610                 /* print '.' \a depth times followed by type name and address */
611                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
612                            o->lo_dev->ld_type->ldt_name, o);
613
614                 if (o->lo_ops->loo_object_print != NULL)
615                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
616
617                 (*printer)(env, cookie, "\n");
618         }
619
620         (*printer)(env, cookie, "} header@%p\n", top);
621 }
622 EXPORT_SYMBOL(lu_object_print);
623
624 /* Check object consistency. */
625 int lu_object_invariant(const struct lu_object *o)
626 {
627         struct lu_object_header *top;
628
629         top = o->lo_header;
630         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
631                 if (o->lo_ops->loo_object_invariant != NULL &&
632                     !o->lo_ops->loo_object_invariant(o))
633                         return 0;
634         }
635         return 1;
636 }
637
638 /*
639  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because the
640  * calculation for the number of objects to reclaim is not covered by a lock,
641  * the amount purged per call is capped by LU_CACHE_NR_MAX_ADJUST.  This ensures
642  * that many concurrent threads will not accidentally purge the entire cache.
643  */
644 static void lu_object_limit(const struct lu_env *env,
645                             struct lu_device *dev)
646 {
647         u64 size, nr;
648
649         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
650                 return;
651
652         size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
653         nr = (u64)lu_cache_nr;
654         if (size <= nr)
655                 return;
656
657         lu_site_purge_objects(env, dev->ld_site,
658                               min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
659                               0);
660 }
661
662 static struct lu_object *htable_lookup(const struct lu_env *env,
663                                        struct lu_device *dev,
664                                        struct lu_site_bkt_data *bkt,
665                                        const struct lu_fid *f,
666                                        struct lu_object_header *new)
667 {
668         struct lu_site *s = dev->ld_site;
669         struct lu_object_header *h;
670
671 try_again:
672         rcu_read_lock();
673         if (new)
674                 h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
675                                                       &new->loh_hash,
676                                                       obj_hash_params);
677         else
678                 h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
679
680         if (IS_ERR_OR_NULL(h)) {
681                 /* Not found */
682                 if (!new)
683                         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
684                 rcu_read_unlock();
685                 if (PTR_ERR(h) == -ENOMEM) {
686                         msleep(20);
687                         goto try_again;
688                 }
689                 lu_object_limit(env, dev);
690                 if (PTR_ERR(h) == -E2BIG)
691                         goto try_again;
692
693                 return ERR_PTR(-ENOENT);
694         }
695
696         if (atomic_inc_not_zero(&h->loh_ref)) {
697                 rcu_read_unlock();
698                 return lu_object_top(h);
699         }
700
701         spin_lock(&bkt->lsb_waitq.lock);
702         if (lu_object_is_dying(h) ||
703             test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
704                 spin_unlock(&bkt->lsb_waitq.lock);
705                 rcu_read_unlock();
706                 if (new) {
707                         /*
708                          * Old object might have already been removed, or will
709                          * be soon.  We need to insert our new object, so
710                          * remove the old one just in case it is still there.
711                          */
712                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
713                                                obj_hash_params);
714                         goto try_again;
715                 }
716                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
717                 return ERR_PTR(-ENOENT);
718         }
719         /* Now protected by spinlock */
720         rcu_read_unlock();
721
722         if (!list_empty(&h->loh_lru)) {
723                 list_del_init(&h->loh_lru);
724                 percpu_counter_dec(&s->ls_lru_len_counter);
725         }
726         atomic_inc(&h->loh_ref);
727         spin_unlock(&bkt->lsb_waitq.lock);
728         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
729         return lu_object_top(h);
730 }
731
732 /**
733  * Search cache for an object with the fid \a f. If such object is found,
734  * return it. Otherwise, create new object, insert it into cache and return
735  * it. In any case, additional reference is acquired on the returned object.
736  */
737 struct lu_object *lu_object_find(const struct lu_env *env,
738                                  struct lu_device *dev, const struct lu_fid *f,
739                                  const struct lu_object_conf *conf)
740 {
741         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
742 }
743 EXPORT_SYMBOL(lu_object_find);
744
745 /* Get a 'first' ref to an obj that was found looking through the hash table */
746 struct lu_object *lu_object_get_first(struct lu_object_header *h,
747                                       struct lu_device *dev)
748 {
749         struct lu_site *s = dev->ld_site;
750         struct lu_object *ret;
751
752         if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
753                 return NULL;
754
755         ret = lu_object_locate(h, dev->ld_type);
756         if (!ret)
757                 return ret;
758
759         if (!atomic_inc_not_zero(&h->loh_ref)) {
760                 struct lu_site_bkt_data *bkt;
761
762                 bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
763                 spin_lock(&bkt->lsb_waitq.lock);
764                 if (!lu_object_is_dying(h) &&
765                     !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
766                         atomic_inc(&h->loh_ref);
767                 else
768                         ret = NULL;
769                 spin_unlock(&bkt->lsb_waitq.lock);
770         }
771         return ret;
772 }
773 EXPORT_SYMBOL(lu_object_get_first);
774
775 /**
776  * Core logic of lu_object_find*() functions.
777  *
778  * Much like lu_object_find(), but the top level device of the object is
779  * specifically \a dev rather than that of the site. This interface allows
780  * objects of different "stacking" to be created within the same site.
781  */
782 struct lu_object *lu_object_find_at(const struct lu_env *env,
783                                     struct lu_device *dev,
784                                     const struct lu_fid *f,
785                                     const struct lu_object_conf *conf)
786 {
787         struct lu_object *o;
788         struct lu_object *shadow;
789         struct lu_site *s;
790         struct lu_site_bkt_data *bkt;
791         struct rhashtable *hs;
792         int rc;
793
794         ENTRY;
795
796         /* The FID comes from disk or the network; a zero FID is meaningless, so
797          * return an error early to avoid an assertion in lu_object_put(). If a
798          * zero FID is wanted, it should be allocated via lu_object_anon().
799          */
800         if (fid_is_zero(f))
801                 RETURN(ERR_PTR(-EINVAL));
802
803         /*
804          * This uses standard index maintenance protocol:
805          *
806          *     - search index under lock, and return object if found;
807          *     - otherwise, unlock index, allocate new object;
808          *     - lock index and search again;
809          *     - if nothing is found (usual case), insert newly created
810          *       object into index;
811          *     - otherwise (race: other thread inserted object), free
812          *       object just allocated.
813          *     - unlock index;
814          *     - return object.
815          *
816          * For the "LOC_F_NEW" case, we are sure the object is newly established.
817          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
818          * just alloc and insert directly.
819          *
820          */
821         s  = dev->ld_site;
822         hs = &s->ls_obj_hash;
823
824         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
825                 lu_site_purge(env, s, -1);
826
827         bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
828         if (!(conf && conf->loc_flags & LOC_F_NEW)) {
829                 o = htable_lookup(env, dev, bkt, f, NULL);
830
831                 if (!IS_ERR(o)) {
832                         if (likely(lu_object_is_inited(o->lo_header)))
833                                 RETURN(o);
834
835                         wait_event_idle(bkt->lsb_waitq,
836                                         lu_object_is_inited(o->lo_header) ||
837                                         lu_object_is_dying(o->lo_header));
838
839                         if (lu_object_is_dying(o->lo_header)) {
840                                 lu_object_put(env, o);
841
842                                 RETURN(ERR_PTR(-ENOENT));
843                         }
844
845                         RETURN(o);
846                 }
847
848                 if (PTR_ERR(o) != -ENOENT)
849                         RETURN(o);
850         }
851
852         /*
853          * Allocate a new object. NB: it is left uninitialized here, so that if
854          * the object changes between allocation and hash insertion, an object
855          * with stale attributes is not returned.
856          */
857         o = lu_object_alloc(env, dev, f);
858         if (IS_ERR(o))
859                 RETURN(o);
860
861         LASSERT(lu_fid_eq(lu_object_fid(o), f));
862
863         CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
864
865         if (conf && conf->loc_flags & LOC_F_NEW) {
866                 int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
867                                                     obj_hash_params);
868                 if (status)
869                         /* Strange error - go the slow way */
870                         shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
871                 else
872                         shadow = ERR_PTR(-ENOENT);
873         } else {
874                 shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
875         }
876         if (likely(PTR_ERR(shadow) == -ENOENT)) {
877                 /*
878                  * The new object has been successfully inserted.
879                  *
880                  * This may result in rather complicated operations, including
881                  * fld queries, inode loading, etc.
882                  */
883                 rc = lu_object_start(env, dev, o, conf);
884                 if (rc) {
885                         lu_object_put_nocache(env, o);
886                         RETURN(ERR_PTR(rc));
887                 }
888
889                 wake_up(&bkt->lsb_waitq);
890
891                 lu_object_limit(env, dev);
892
893                 RETURN(o);
894         }
895
896         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
897         lu_object_free(env, o);
898
899         if (!(conf && conf->loc_flags & LOC_F_NEW) &&
900             !IS_ERR(shadow) &&
901             !lu_object_is_inited(shadow->lo_header)) {
902                 wait_event_idle(bkt->lsb_waitq,
903                                 lu_object_is_inited(shadow->lo_header) ||
904                                 lu_object_is_dying(shadow->lo_header));
905
906                 if (lu_object_is_dying(shadow->lo_header)) {
907                         lu_object_put(env, shadow);
908
909                         RETURN(ERR_PTR(-ENOENT));
910                 }
911         }
912
913         RETURN(shadow);
914 }
915 EXPORT_SYMBOL(lu_object_find_at);
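
/*
 * A sketch of the LOC_F_NEW fast path described above: when the caller knows
 * the object cannot already be cached (for example it is just creating it),
 * passing a conf with LOC_F_NEW set skips the initial lookup and inserts the
 * freshly allocated object directly.  The initializer assumes only loc_flags
 * matters for this purpose.
 *
 *	struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
 *	struct lu_object *o;
 *
 *	o = lu_object_find_at(env, dev, fid, &conf);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 */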
916
917 /* Find object with given fid, return its slice belonging to given device. */
918 struct lu_object *lu_object_find_slice(const struct lu_env *env,
919                                        struct lu_device *dev,
920                                        const struct lu_fid *f,
921                                        const struct lu_object_conf *conf)
922 {
923         struct lu_object *top;
924         struct lu_object *obj;
925
926         top = lu_object_find(env, dev, f, conf);
927         if (IS_ERR(top))
928                 return top;
929
930         obj = lu_object_locate(top->lo_header, dev->ld_type);
931         if (unlikely(obj == NULL)) {
932                 lu_object_put(env, top);
933                 obj = ERR_PTR(-ENOENT);
934         }
935
936         return obj;
937 }
938 EXPORT_SYMBOL(lu_object_find_slice);
939
940 int lu_device_type_init(struct lu_device_type *ldt)
941 {
942         int result = 0;
943
944         atomic_set(&ldt->ldt_device_nr, 0);
945         if (ldt->ldt_ops->ldto_init)
946                 result = ldt->ldt_ops->ldto_init(ldt);
947
948         return result;
949 }
950 EXPORT_SYMBOL(lu_device_type_init);
951
952 void lu_device_type_fini(struct lu_device_type *ldt)
953 {
954         if (ldt->ldt_ops->ldto_fini)
955                 ldt->ldt_ops->ldto_fini(ldt);
956 }
957 EXPORT_SYMBOL(lu_device_type_fini);
958
959 /* Global list of all sites on this node */
960 static LIST_HEAD(lu_sites);
961 static DECLARE_RWSEM(lu_sites_guard);
962
963 /* Global environment used by site shrinker. */
964 static struct lu_env lu_shrink_env;
965
966 struct lu_site_print_arg {
967         struct lu_env   *lsp_env;
968         void            *lsp_cookie;
969         lu_printer_t     lsp_printer;
970 };
971
972 static void
973 lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
974 {
975         if (!list_empty(&h->loh_layers)) {
976                 const struct lu_object *o;
977
978                 o = lu_object_top(h);
979                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
980                                 arg->lsp_printer, o);
981         } else {
982                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
983                                        arg->lsp_printer, h);
984         }
985 }
986
987 /* Print all objects in \a s. */
988 void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
989                    int msg_flag, lu_printer_t printer)
990 {
991         struct lu_site_print_arg arg = {
992                 .lsp_env     = (struct lu_env *)env,
993                 .lsp_printer = printer,
994         };
995         struct rhashtable_iter iter;
996         struct lu_object_header *h;
997
998         LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
999
1000         if (!s || !atomic_read(ref))
1001                 return;
1002
1003         arg.lsp_cookie = (void *)&msgdata;
1004
1005         rhashtable_walk_enter(&s->ls_obj_hash, &iter);
1006         rhashtable_walk_start(&iter);
1007         while ((h = rhashtable_walk_next(&iter)) != NULL) {
1008                 if (IS_ERR(h))
1009                         continue;
1010                 lu_site_obj_print(h, &arg);
1011         }
1012         rhashtable_walk_stop(&iter);
1013         rhashtable_walk_exit(&iter);
1014 }
1015 EXPORT_SYMBOL(lu_site_print);
1016
1017 /* Set the lu_object cache limits based on backend type and available memory. */
1018 static void lu_htable_limits(struct lu_device *top)
1019 {
1020         unsigned long cache_size;
1021
1022         /*
1023          * For ZFS based OSDs the cache should be disabled by default.  This
1024          * allows the ZFS ARC maximum flexibility in determining what buffers
1025  * to cache.  If Lustre has objects or buffers which it wants to ensure
1026  * always stay cached, it must maintain a hold on them.
1027          */
1028         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
1029                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
1030                 return;
1031         }
1032
1033         /*
1034          * Calculate hash table size, assuming that we want reasonable
1035          * performance when 20% of total memory is occupied by cache of
1036          * lu_objects.
1037          *
1038          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
1039          */
1040         cache_size = cfs_totalram_pages();
1041
1042 #if BITS_PER_LONG == 32
1043         /* limit hashtable size for lowmem systems to low RAM */
1044         if (cache_size > 1 << (30 - PAGE_SHIFT))
1045                 cache_size = (1 << (30 - PAGE_SHIFT)) * 3 / 4;
1046 #endif
1047
1048         /* clear off unreasonable cache setting. */
1049         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
1050                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
1051                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
1052                       LU_CACHE_PERCENT_DEFAULT);
1053
1054                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
1055         }
1056         cache_size = cache_size / 100 * lu_cache_percent *
1057                 (PAGE_SIZE / 1024);
1058
1059         lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
1060                               LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
1061 }
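
/*
 * A worked example of the sizing above (hypothetical node: PAGE_SIZE of 4KiB,
 * 64GiB of RAM, the default lu_cache_percent of 20):
 *
 *	cache_size = 16777216 pages
 *	16777216 / 100 * 20 * (4096 / 1024) = 13421760
 *
 * so lu_cache_nr ends up around 13.4 million objects, well inside the
 * [LU_CACHE_NR_MIN, LU_CACHE_NR_MAX] clamp; at the assumed ~1KiB per object
 * (together with its inode) that is roughly 20% of RAM, as intended.
 */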
1062
1063 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1064 {
1065         spin_lock(&s->ls_ld_lock);
1066         if (list_empty(&d->ld_linkage))
1067                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1068         spin_unlock(&s->ls_ld_lock);
1069 }
1070 EXPORT_SYMBOL(lu_dev_add_linkage);
1071
1072 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1073 {
1074         spin_lock(&s->ls_ld_lock);
1075         list_del_init(&d->ld_linkage);
1076         spin_unlock(&s->ls_ld_lock);
1077 }
1078 EXPORT_SYMBOL(lu_dev_del_linkage);
1079
1080 /* Initialize site \a s, with \a d as the top level device.  */
1081 int lu_site_init(struct lu_site *s, struct lu_device *top)
1082 {
1083         struct lu_site_bkt_data *bkt;
1084         unsigned int i;
1085         int rc;
1086
1087         ENTRY;
1088
1089         memset(s, 0, sizeof(*s));
1090         mutex_init(&s->ls_purge_mutex);
1091         lu_htable_limits(top);
1092
1093 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
1094         rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
1095 #else
1096         rc = percpu_counter_init(&s->ls_lru_len_counter, 0);
1097 #endif
1098         if (rc)
1099                 return -ENOMEM;
1100
1101         if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
1102                 CERROR("failed to create lu_site hash\n");
1103                 return -ENOMEM;
1104         }
1105
1106         s->ls_bkt_seed = get_random_u32();
1107         s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS,
1108                               2 * num_possible_cpus());
1109         s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
1110         OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1111         if (!s->ls_bkts) {
1112                 rhashtable_destroy(&s->ls_obj_hash);
1113                 s->ls_bkts = NULL;
1114                 return -ENOMEM;
1115         }
1116
1117         for (i = 0; i < s->ls_bkt_cnt; i++) {
1118                 bkt = &s->ls_bkts[i];
1119                 INIT_LIST_HEAD(&bkt->lsb_lru);
1120                 init_waitqueue_head(&bkt->lsb_waitq);
1121         }
1122
1123         s->ls_stats = lprocfs_stats_alloc(LU_SS_LAST_STAT, 0);
1124         if (s->ls_stats == NULL) {
1125                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1126                 s->ls_bkts = NULL;
1127                 rhashtable_destroy(&s->ls_obj_hash);
1128                 return -ENOMEM;
1129         }
1130
1131         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED, 0, "created");
1132         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT, 0, "cache_hit");
1133         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS, 0, "cache_miss");
1134         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE, 0, "cache_race");
1135         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1136                              0, "cache_death_race");
1137         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED, 0, "lru_purged");
1138
1139         INIT_LIST_HEAD(&s->ls_linkage);
1140         s->ls_top_dev = top;
1141         top->ld_site = s;
1142         lu_device_get(top);
1143         lu_ref_add(&top->ld_reference, "site-top", s);
1144
1145         INIT_LIST_HEAD(&s->ls_ld_linkage);
1146         spin_lock_init(&s->ls_ld_lock);
1147
1148         lu_dev_add_linkage(s, top);
1149
1150         RETURN(0);
1151 }
1152 EXPORT_SYMBOL(lu_site_init);
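
/*
 * A minimal sketch of the expected site life cycle (error handling trimmed;
 * "top" is whatever top-level lu_device the caller has already set up):
 *
 *	rc = lu_site_init(site, top);
 *	if (rc == 0)
 *		rc = lu_site_init_finish(site);
 *	...
 *	lu_site_purge(env, site, ~0);
 *	lu_site_fini(site);
 *
 * lu_site_init_finish() hooks the site into the global list used by the
 * shrinker, and the purge before lu_site_fini() drops any cached objects.
 */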
1153
1154 /* Finalize \a s and release its resources. */
1155 void lu_site_fini(struct lu_site *s)
1156 {
1157         down_write(&lu_sites_guard);
1158         list_del_init(&s->ls_linkage);
1159         up_write(&lu_sites_guard);
1160
1161         percpu_counter_destroy(&s->ls_lru_len_counter);
1162
1163         if (s->ls_bkts) {
1164                 rhashtable_destroy(&s->ls_obj_hash);
1165                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1166                 s->ls_bkts = NULL;
1167         }
1168
1169         if (s->ls_top_dev != NULL) {
1170                 s->ls_top_dev->ld_site = NULL;
1171                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1172                 lu_device_put(s->ls_top_dev);
1173                 s->ls_top_dev = NULL;
1174         }
1175
1176         if (s->ls_stats != NULL)
1177                 lprocfs_stats_free(&s->ls_stats);
1178 }
1179 EXPORT_SYMBOL(lu_site_fini);
1180
1181 /* Called when initialization of stack for this site is completed. */
1182 int lu_site_init_finish(struct lu_site *s)
1183 {
1184         int result;
1185
1186         down_write(&lu_sites_guard);
1187         result = lu_context_refill(&lu_shrink_env.le_ctx);
1188         if (result == 0)
1189                 list_add(&s->ls_linkage, &lu_sites);
1190         up_write(&lu_sites_guard);
1191
1192         return result;
1193 }
1194 EXPORT_SYMBOL(lu_site_init_finish);
1195
1196 /* Acquire additional reference on device \a d */
1197 void lu_device_get(struct lu_device *d)
1198 {
1199         atomic_inc(&d->ld_ref);
1200 }
1201 EXPORT_SYMBOL(lu_device_get);
1202
1203 /* Release reference on device \a d. */
1204 void lu_device_put(struct lu_device *d)
1205 {
1206         LASSERT(atomic_read(&d->ld_ref) > 0);
1207         atomic_dec(&d->ld_ref);
1208 }
1209 EXPORT_SYMBOL(lu_device_put);
1210
1211 enum { /* Maximal number of tld slots. */
1212         LU_CONTEXT_KEY_NR = 40
1213 };
1214 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1215 static DECLARE_RWSEM(lu_key_initing);
1216
1217 /* Initialize device \a d of type \a t. */
1218 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1219 {
1220         if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) {
1221                 down_write(&lu_key_initing);
1222                 if (t->ldt_ops->ldto_start &&
1223                     atomic_read(&t->ldt_device_nr) == 0)
1224                         t->ldt_ops->ldto_start(t);
1225                 atomic_inc(&t->ldt_device_nr);
1226                 up_write(&lu_key_initing);
1227         }
1228
1229         memset(d, 0, sizeof(*d));
1230         d->ld_type = t;
1231         lu_ref_init(&d->ld_reference);
1232         INIT_LIST_HEAD(&d->ld_linkage);
1233
1234         return 0;
1235 }
1236 EXPORT_SYMBOL(lu_device_init);
1237
1238 /* Finalize device \a d. */
1239 void lu_device_fini(struct lu_device *d)
1240 {
1241         struct lu_device_type *t = d->ld_type;
1242
1243         if (d->ld_obd != NULL) {
1244                 d->ld_obd->obd_lu_dev = NULL;
1245                 d->ld_obd = NULL;
1246         }
1247
1248         lu_ref_fini(&d->ld_reference);
1249         LASSERTF(atomic_read(&d->ld_ref) == 0,
1250                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1251         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1252
1253         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1254             t->ldt_ops->ldto_stop != NULL)
1255                 t->ldt_ops->ldto_stop(t);
1256 }
1257 EXPORT_SYMBOL(lu_device_fini);
1258
1259 /* Initialize obj o that is part of compound obj h and was created by dev d */
1260 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1261                    struct lu_device *d)
1262 {
1263         memset(o, 0, sizeof(*o));
1264         o->lo_header = h;
1265         o->lo_dev = d;
1266         lu_device_get(d);
1267         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1268         INIT_LIST_HEAD(&o->lo_linkage);
1269
1270         return 0;
1271 }
1272 EXPORT_SYMBOL(lu_object_init);
1273
1274 /* Finalize object and release its resources. */
1275 void lu_object_fini(struct lu_object *o)
1276 {
1277         struct lu_device *dev = o->lo_dev;
1278
1279         LASSERT(list_empty(&o->lo_linkage));
1280
1281         if (dev != NULL) {
1282                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1283                               "lu_object", o);
1284                 lu_device_put(dev);
1285                 o->lo_dev = NULL;
1286         }
1287 }
1288 EXPORT_SYMBOL(lu_object_fini);
1289
1290 /**
1291  * Add object \a o as first layer of compound object \a h
1292  *
1293  * This is typically called by the ->ldo_object_alloc() method of top-level
1294  * device.
1295  */
1296 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1297 {
1298         list_move(&o->lo_linkage, &h->loh_layers);
1299 }
1300 EXPORT_SYMBOL(lu_object_add_top);
1301
1302 /**
1303  * Add object \a o as a layer of compound object, going after \a before.
1304  *
1305  * This is typically called by the ->ldo_object_alloc() method of \a
1306  * before->lo_dev.
1307  */
1308 void lu_object_add(struct lu_object *before, struct lu_object *o)
1309 {
1310         list_move(&o->lo_linkage, &before->lo_linkage);
1311 }
1312 EXPORT_SYMBOL(lu_object_add);
1313
1314 /* Initialize compound object. */
1315 int lu_object_header_init(struct lu_object_header *h)
1316 {
1317         memset(h, 0, sizeof(*h));
1318         atomic_set(&h->loh_ref, 1);
1319         INIT_LIST_HEAD(&h->loh_lru);
1320         INIT_LIST_HEAD(&h->loh_layers);
1321         lu_ref_init(&h->loh_reference);
1322         return 0;
1323 }
1324 EXPORT_SYMBOL(lu_object_header_init);
1325
1326 /* Finalize compound object. */
1327 void lu_object_header_fini(struct lu_object_header *h)
1328 {
1329         LASSERT(list_empty(&h->loh_layers));
1330         LASSERT(list_empty(&h->loh_lru));
1331         lu_ref_fini(&h->loh_reference);
1332 }
1333 EXPORT_SYMBOL(lu_object_header_fini);
1334
1335 /* Free lu_object_header with proper RCU handling */
1336 void lu_object_header_free(struct lu_object_header *h)
1337 {
1338         lu_object_header_fini(h);
1339         OBD_FREE_PRE(h, sizeof(*h), "kfreed");
1340         kfree_rcu(h, loh_rcu);
1341 }
1342 EXPORT_SYMBOL(lu_object_header_free);
1343
1344 /* For compound obj, find its slice, corresponding to the device type dtype  */
1345 struct lu_object *lu_object_locate(struct lu_object_header *h,
1346                                    const struct lu_device_type *dtype)
1347 {
1348         struct lu_object *o;
1349
1350         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1351                 if (o->lo_dev->ld_type == dtype)
1352                         return o;
1353         }
1354         return NULL;
1355 }
1356 EXPORT_SYMBOL(lu_object_locate);
1357
1358 /**
1359  * Finalize and free devices in the device stack.
1360  *
1361  * Finalize device stack by purging object cache, and calling
1362  * lu_device_type_operations::ldto_device_fini() and
1363  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1364  */
1365 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1366 {
1367         struct lu_site   *site = top->ld_site;
1368         struct lu_device *scan;
1369         struct lu_device *next;
1370
1371         lu_site_purge(env, site, ~0);
1372         for (scan = top; scan != NULL; scan = next) {
1373                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1374                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1375                 lu_device_put(scan);
1376         }
1377
1378         /* purge again. */
1379         lu_site_purge(env, site, ~0);
1380
1381         for (scan = top; scan != NULL; scan = next) {
1382                 const struct lu_device_type *ldt = scan->ld_type;
1383
1384                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1385         }
1386 }
1387
1388 /**
1389  * Global counter incremented whenever key is registered, unregistered,
1390  * revived or quiesced. This is used to avoid unnecessary calls to
1391  * lu_context_refill(). No locking is provided, as initialization and shutdown
1392  * are supposed to be externally serialized.
1393  */
1394 static atomic_t key_set_version = ATOMIC_INIT(0);
1395
1396 /* Register new key. */
1397 int lu_context_key_register(struct lu_context_key *key)
1398 {
1399         int result;
1400         unsigned int i;
1401
1402         LASSERT(key->lct_init != NULL);
1403         LASSERT(key->lct_fini != NULL);
1404         LASSERT(key->lct_tags != 0);
1405         LASSERT(key->lct_owner != NULL);
1406
1407         result = -ENFILE;
1408         atomic_set(&key->lct_used, 1);
1409         lu_ref_init(&key->lct_reference);
1410         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1411                 if (lu_keys[i])
1412                         continue;
1413                 key->lct_index = i;
1414
1415                 if (strncmp("osd_", module_name(key->lct_owner), 4) == 0)
1416                         CFS_RACE_WAIT(OBD_FAIL_OBD_SETUP);
1417
1418                 if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
1419                         continue;
1420
1421                 result = 0;
1422                 atomic_inc(&key_set_version);
1423                 break;
1424         }
1425         if (result) {
1426                 lu_ref_fini(&key->lct_reference);
1427                 atomic_set(&key->lct_used, 0);
1428         }
1429         return result;
1430 }
1431 EXPORT_SYMBOL(lu_context_key_register);
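
/*
 * A sketch of the usual key definition pattern (mirroring lu_global_key
 * above; "foo" and struct foo_thread_info are illustrative names only).
 * LU_KEY_INIT_FINI() generates foo_key_init()/foo_key_fini(), and
 * LU_CONTEXT_KEY_INIT() is assumed to fill in lct_owner, which the LASSERTs
 * in this function require.
 *
 *	LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *	static struct lu_context_key foo_thread_key = {
 *		.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
 *		.lct_init = foo_key_init,
 *		.lct_fini = foo_key_fini,
 *	};
 *
 * Once lu_context_key_register(&foo_thread_key) succeeds, per-context data is
 * fetched with lu_context_key_get(&env->le_ctx, &foo_thread_key).
 */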
1432
1433 static void key_fini(struct lu_context *ctx, int index)
1434 {
1435         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1436                 struct lu_context_key *key;
1437
1438                 key = lu_keys[index];
1439                 LASSERT(key != NULL);
1440                 LASSERT(key->lct_fini != NULL);
1441                 LASSERT(atomic_read(&key->lct_used) > 0);
1442
1443                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1444                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1445                 if (atomic_dec_and_test(&key->lct_used))
1446                         wake_up_var(&key->lct_used);
1447
1448                 LASSERT(key->lct_owner != NULL);
1449                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1450                         LINVRNT(module_refcount(key->lct_owner) > 0);
1451                         module_put(key->lct_owner);
1452                 }
1453                 ctx->lc_value[index] = NULL;
1454         }
1455 }
1456
1457 /* Deregister key. */
1458 void lu_context_key_degister(struct lu_context_key *key)
1459 {
1460         LASSERT(atomic_read(&key->lct_used) >= 1);
1461         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1462
1463         lu_context_key_quiesce(NULL, key);
1464
1465         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1466
1467         /**
1468          * Wait until all transient contexts referencing this key have
1469          * run lu_context_key::lct_fini() method.
1470          */
1471         atomic_dec(&key->lct_used);
1472         wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
1473
1474         if (!WARN_ON(lu_keys[key->lct_index] == NULL))
1475                 lu_ref_fini(&key->lct_reference);
1476
1477         smp_store_release(&lu_keys[key->lct_index], NULL); /* release key */
1478 }
1479 EXPORT_SYMBOL(lu_context_key_degister);
1480
1481 /**
1482  * Register a number of keys. This has to be called after all keys have been
1483  * initialized by a call to LU_CONTEXT_KEY_INIT().
1484  */
1485 int lu_context_key_register_many(struct lu_context_key *k, ...)
1486 {
1487         struct lu_context_key *key = k;
1488         va_list args;
1489         int result;
1490
1491         va_start(args, k);
1492         do {
1493                 result = lu_context_key_register(key);
1494                 if (result)
1495                         break;
1496                 key = va_arg(args, struct lu_context_key *);
1497         } while (key != NULL);
1498         va_end(args);
1499
1500         if (result != 0) {
1501                 va_start(args, k);
1502                 while (k != key) {
1503                         lu_context_key_degister(k);
1504                         k = va_arg(args, struct lu_context_key *);
1505                 }
1506                 va_end(args);
1507         }
1508
1509         return result;
1510 }
1511 EXPORT_SYMBOL(lu_context_key_register_many);
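/*
 * Illustrative call (hypothetical keys): the argument list must be
 * NULL-terminated; on failure every key registered so far is degistered
 * again before the error is returned.
 *
 *	rc = lu_context_key_register_many(&foo_thread_key,
 *					  &foo_session_key, NULL);
 */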
1512
1513 /**
1514  * De-register a number of keys. This is a dual to
1515  * lu_context_key_register_many().
1516  */
1517 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1518 {
1519         va_list args;
1520
1521         va_start(args, k);
1522         do {
1523                 lu_context_key_degister(k);
1524                 k = va_arg(args, struct lu_context_key *);
1525         } while (k != NULL);
1526         va_end(args);
1527 }
1528 EXPORT_SYMBOL(lu_context_key_degister_many);
1529
1530 /* Revive a number of keys. */
1531 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1532 {
1533         va_list args;
1534
1535         va_start(args, k);
1536         do {
1537                 lu_context_key_revive(k);
1538                 k = va_arg(args, struct lu_context_key *);
1539         } while (k != NULL);
1540         va_end(args);
1541 }
1542 EXPORT_SYMBOL(lu_context_key_revive_many);
1543
1544 /* Quiesce a number of keys. */
1545 void lu_context_key_quiesce_many(struct lu_device_type *t,
1546                                  struct lu_context_key *k, ...)
1547 {
1548         va_list args;
1549
1550         va_start(args, k);
1551         do {
1552                 lu_context_key_quiesce(t, k);
1553                 k = va_arg(args, struct lu_context_key *);
1554         } while (k != NULL);
1555         va_end(args);
1556 }
1557 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1558
1559 /* Return value associated with key \a key in context \a ctx. */
1560 void *lu_context_key_get(const struct lu_context *ctx,
1561                          const struct lu_context_key *key)
1562 {
1563         LINVRNT(ctx->lc_state == LCS_ENTERED);
1564         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1565         LASSERT(lu_keys[key->lct_index] == key);
1566         return ctx->lc_value[key->lct_index];
1567 }
1568 EXPORT_SYMBOL(lu_context_key_get);
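/*
 * Usage sketch (illustrative only): modules usually wrap lu_context_key_get()
 * in a small accessor for their per-thread data; foo_thread_info and
 * foo_thread_key are the hypothetical names from the sketch above.
 *
 *	static struct foo_thread_info *foo_env_info(const struct lu_env *env)
 *	{
 *		struct foo_thread_info *info;
 *
 *		info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
 *		LASSERT(info != NULL);
 *		return info;
 *	}
 */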
1569
1570 /**
1571  * List of remembered (LCT_REMEMBER) contexts; see lu_context_key_quiesce().
1572  */
1573 static LIST_HEAD(lu_context_remembered);
1574 static DEFINE_SPINLOCK(lu_context_remembered_guard);
1575
1576 /**
1577  * Destroy \a key in all remembered contexts. This is used to destroy key
1578  * values in "shared" contexts (like service threads), when a module owning
1579  * the key is about to be unloaded.
1580  */
1581 void lu_context_key_quiesce(struct lu_device_type *t,
1582                             struct lu_context_key *key)
1583 {
1584         struct lu_context *ctx;
1585
1586         if (key->lct_tags & LCT_QUIESCENT)
1587                 return;
1588         /*
1589          * The write-lock on lu_key_initing will ensure that any
1590          * keys_fill() which didn't see LCT_QUIESCENT will have
1591          * finished before we call key_fini().
1592          */
1593         down_write(&lu_key_initing);
1594         if (!(key->lct_tags & LCT_QUIESCENT)) {
1595                 if (t == NULL || atomic_read(&t->ldt_device_nr) == 0)
1596                         key->lct_tags |= LCT_QUIESCENT;
1597                 up_write(&lu_key_initing);
1598
1599                 spin_lock(&lu_context_remembered_guard);
1600                 list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
1601                         spin_until_cond(READ_ONCE(ctx->lc_state) !=
1602                                         LCS_LEAVING);
1603                         key_fini(ctx, key->lct_index);
1604                 }
1605                 spin_unlock(&lu_context_remembered_guard);
1606
1607                 return;
1608         }
1609         up_write(&lu_key_initing);
1610 }
1611
1612 void lu_context_key_revive(struct lu_context_key *key)
1613 {
1614         key->lct_tags &= ~LCT_QUIESCENT;
1615         atomic_inc(&key_set_version);
1616 }
1617
1618 static void keys_fini(struct lu_context *ctx)
1619 {
1620         unsigned int i;
1621
1622         if (ctx->lc_value == NULL)
1623                 return;
1624
1625         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1626                 key_fini(ctx, i);
1627
1628         OBD_FREE_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1629         ctx->lc_value = NULL;
1630 }
1631
1632 static int keys_fill(struct lu_context *ctx)
1633 {
1634         unsigned int i;
1635         int rc = 0;
1636
1637         /*
1638          * A serialisation with lu_context_key_quiesce() is needed, to
1639          * ensure we see LCT_QUIESCENT and don't allocate a new value
1640          * after it freed one.  The rwsem provides this.  As down_read()
1641          * does optimistic spinning while the writer is active, this is
1642          * unlikely to ever sleep.
1643          */
1644         down_read(&lu_key_initing);
1645         ctx->lc_version = atomic_read(&key_set_version);
1646
1647         LINVRNT(ctx->lc_value);
1648         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1649                 struct lu_context_key *key;
1650
1651                 key = lu_keys[i];
1652                 if (!ctx->lc_value[i] && key &&
1653                     (key->lct_tags & ctx->lc_tags) &&
1654                     /*
1655                      * Don't create values for a LCT_QUIESCENT key, as this
1656                      * would pin the module owning the key.
1657                      */
1658                     !(key->lct_tags & LCT_QUIESCENT)) {
1659                         void *value;
1660
1661                         LINVRNT(key->lct_init != NULL);
1662                         LINVRNT(key->lct_index == i);
1663
1664                         LASSERT(key->lct_owner != NULL);
1665                         if (!(ctx->lc_tags & LCT_NOREF) &&
1666                             try_module_get(key->lct_owner) == 0) {
1667                                 /* module is unloading, skip this key */
1668                                 continue;
1669                         }
1670
1671                         value = key->lct_init(ctx, key);
1672                         if (unlikely(IS_ERR(value))) {
1673                                 rc = PTR_ERR(value);
1674                                 break;
1675                         }
1676
1677                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1678                         atomic_inc(&key->lct_used);
1679                         /*
1680                          * This is the only place in the code where an element
1681                          * of the ctx->lc_value[] array is set to a non-NULL
1682                          * value.
1683                          */
1684                         ctx->lc_value[i] = value;
1685                         if (key->lct_exit != NULL)
1686                                 ctx->lc_tags |= LCT_HAS_EXIT;
1687                 }
1688         }
1689
1690         up_read(&lu_key_initing);
1691         return rc;
1692 }
1693
1694 static int keys_init(struct lu_context *ctx)
1695 {
1696         OBD_ALLOC_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1697         if (likely(ctx->lc_value != NULL))
1698                 return keys_fill(ctx);
1699
1700         return -ENOMEM;
1701 }
1702
1703 /* Initialize context data-structure. Create values for all keys. */
1704 int lu_context_init(struct lu_context *ctx, __u32 tags)
1705 {
1706         int     rc;
1707
1708         memset(ctx, 0, sizeof(*ctx));
1709         ctx->lc_state = LCS_INITIALIZED;
1710         ctx->lc_tags = tags;
1711         if (tags & LCT_REMEMBER) {
1712                 spin_lock(&lu_context_remembered_guard);
1713                 list_add(&ctx->lc_remember, &lu_context_remembered);
1714                 spin_unlock(&lu_context_remembered_guard);
1715         } else {
1716                 INIT_LIST_HEAD(&ctx->lc_remember);
1717         }
1718
1719         rc = keys_init(ctx);
1720         if (rc != 0)
1721                 lu_context_fini(ctx);
1722
1723         return rc;
1724 }
1725 EXPORT_SYMBOL(lu_context_init);
1726
1727 /* Finalize context data-structure. Destroy key values. */
1728 void lu_context_fini(struct lu_context *ctx)
1729 {
1730         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1731         ctx->lc_state = LCS_FINALIZED;
1732
1733         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1734                 LASSERT(list_empty(&ctx->lc_remember));
1735         } else {
1736                 /* could race with key degister */
1737                 spin_lock(&lu_context_remembered_guard);
1738                 list_del_init(&ctx->lc_remember);
1739                 spin_unlock(&lu_context_remembered_guard);
1740         }
1741         keys_fini(ctx);
1742 }
1743 EXPORT_SYMBOL(lu_context_fini);
1744
1745 /* Called before entering context. */
1746 void lu_context_enter(struct lu_context *ctx)
1747 {
1748         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1749         ctx->lc_state = LCS_ENTERED;
1750 }
1751 EXPORT_SYMBOL(lu_context_enter);
1752
1753 /* Called after exiting from \a ctx. */
1754 void lu_context_exit(struct lu_context *ctx)
1755 {
1756         unsigned int i;
1757
1758         LINVRNT(ctx->lc_state == LCS_ENTERED);
1759         /*
1760          * Disable preempt to ensure we get a warning if
1761          * any lct_exit ever tries to sleep.  That would hurt
1762          * lu_context_key_quiesce() which spins waiting for us.
1763          * This also ensures we aren't preempted while the state
1764          * is LCS_LEAVING, as that too would cause problems for
1765          * lu_context_key_quiesce().
1766          */
1767         preempt_disable();
1768         /*
1769          * Ensure lu_context_key_quiesce() sees LCS_LEAVING
1770          * or we see LCT_QUIESCENT
1771          */
1772         smp_store_mb(ctx->lc_state, LCS_LEAVING);
1773         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
1774                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1775                         struct lu_context_key *key;
1776
1777                         key = lu_keys[i];
1778                         if (ctx->lc_value[i] &&
1779                             !(key->lct_tags & LCT_QUIESCENT) &&
1780                             key->lct_exit)
1781                                 key->lct_exit(ctx, key, ctx->lc_value[i]);
1782                 }
1783         }
1784
1785         smp_store_release(&ctx->lc_state, LCS_LEFT); /* release ownership  */
1786         preempt_enable();
1787 }
1788 EXPORT_SYMBOL(lu_context_exit);
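/*
 * Lifecycle sketch (illustrative only): a context is initialized once,
 * entered before each use and exited afterwards, and finalized when it is
 * no longer needed.  The tag value is an assumption for the example.
 *
 *	struct lu_context ctx;
 *
 *	rc = lu_context_init(&ctx, LCT_CL_THREAD);
 *	if (rc == 0) {
 *		lu_context_enter(&ctx);
 *		... use lu_context_key_get(&ctx, ...) ...
 *		lu_context_exit(&ctx);
 *		lu_context_fini(&ctx);
 *	}
 */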
1789
1790 /**
1791  * Allocate values in the context for all missing keys that were registered
1792  * after the context was created. key_set_version only changes in the rare
1793  * cases when modules are loaded or removed.
1794  */
1795 int lu_context_refill(struct lu_context *ctx)
1796 {
1797         if (likely(ctx->lc_version == atomic_read(&key_set_version)))
1798                 return 0;
1799
1800         return keys_fill(ctx);
1801 }
1802
1803 /**
1804  * lu_context_tags_default/lu_session_tags_default are updated when new
1805  * types of obd devices are added. Currently this is only used on the client
1806  * side, specifically by the echo device client; for other stacks (like
1807  * ptlrpc threads) the contexts are predefined when the lu_device types are
1808  * registered, during the module probe phase.
1809  */
1810 u32 lu_context_tags_default = LCT_CL_THREAD;
1811 u32 lu_session_tags_default = LCT_SESSION;
1812
1813 void lu_context_tags_update(__u32 tags)
1814 {
1815         spin_lock(&lu_context_remembered_guard);
1816         lu_context_tags_default |= tags;
1817         atomic_inc(&key_set_version);
1818         spin_unlock(&lu_context_remembered_guard);
1819 }
1820 EXPORT_SYMBOL(lu_context_tags_update);
1821
1822 void lu_context_tags_clear(__u32 tags)
1823 {
1824         spin_lock(&lu_context_remembered_guard);
1825         lu_context_tags_default &= ~tags;
1826         atomic_inc(&key_set_version);
1827         spin_unlock(&lu_context_remembered_guard);
1828 }
1829 EXPORT_SYMBOL(lu_context_tags_clear);
1830
1831 void lu_session_tags_update(__u32 tags)
1832 {
1833         spin_lock(&lu_context_remembered_guard);
1834         lu_session_tags_default |= tags;
1835         atomic_inc(&key_set_version);
1836         spin_unlock(&lu_context_remembered_guard);
1837 }
1838 EXPORT_SYMBOL(lu_session_tags_update);
1839
1840 void lu_session_tags_clear(__u32 tags)
1841 {
1842         spin_lock(&lu_context_remembered_guard);
1843         lu_session_tags_default &= ~tags;
1844         atomic_inc(&key_set_version);
1845         spin_unlock(&lu_context_remembered_guard);
1846 }
1847 EXPORT_SYMBOL(lu_session_tags_clear);
1848
1849 int lu_env_init(struct lu_env *env, __u32 tags)
1850 {
1851         int result;
1852
1853         env->le_ses = NULL;
1854         result = lu_context_init(&env->le_ctx, tags);
1855         if (likely(result == 0))
1856                 lu_context_enter(&env->le_ctx);
1857         return result;
1858 }
1859 EXPORT_SYMBOL(lu_env_init);
1860
1861 void lu_env_fini(struct lu_env *env)
1862 {
1863         lu_context_exit(&env->le_ctx);
1864         lu_context_fini(&env->le_ctx);
1865         env->le_ses = NULL;
1866 }
1867 EXPORT_SYMBOL(lu_env_fini);
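/*
 * Typical lu_env lifetime (sketch, illustrative only): lu_env_init() both
 * initializes and enters the embedded context, so a caller only needs to
 * finalize with lu_env_fini() when done.
 *
 *	struct lu_env env;
 *
 *	rc = lu_env_init(&env, LCT_CL_THREAD);
 *	if (rc == 0) {
 *		... use &env ...
 *		lu_env_fini(&env);
 *	}
 */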
1868
1869 int lu_env_refill(struct lu_env *env)
1870 {
1871         int result;
1872
1873         result = lu_context_refill(&env->le_ctx);
1874         if (result == 0 && env->le_ses != NULL)
1875                 result = lu_context_refill(env->le_ses);
1876         return result;
1877 }
1878 EXPORT_SYMBOL(lu_env_refill);
1879
1880 /**
1881  * Currently this API is only used by the echo client, because the echo
1882  * client and the normal Lustre client share the same cl_env cache. The
1883  * echo client therefore needs to refresh the env context after getting
1884  * one from the cache, especially when a normal client and an echo client
1885  * co-exist on the same node.
1886  */
1887 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1888                           __u32 stags)
1889 {
1890         int    result;
1891
1892         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1893                 env->le_ctx.lc_version = 0;
1894                 env->le_ctx.lc_tags |= ctags;
1895         }
1896
1897         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1898                 env->le_ses->lc_version = 0;
1899                 env->le_ses->lc_tags |= stags;
1900         }
1901
1902         result = lu_env_refill(env);
1903
1904         return result;
1905 }
1906 EXPORT_SYMBOL(lu_env_refill_by_tags);
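/*
 * Illustrative call (the tag values are assumptions for the example): the
 * echo client widens the tag sets of an env it took from the cl_env cache
 * before reusing it:
 *
 *	rc = lu_env_refill_by_tags(env, LCT_CL_THREAD, LCT_SESSION);
 */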
1907
1908
1909 struct lu_env_item {
1910         struct task_struct *lei_task;   /* rhashtable key */
1911         struct rhash_head lei_linkage;
1912         struct lu_env *lei_env;
1913         struct rcu_head lei_rcu_head;
1914 };
1915
1916 static const struct rhashtable_params lu_env_rhash_params = {
1917         .key_len     = sizeof(struct task_struct *),
1918         .key_offset  = offsetof(struct lu_env_item, lei_task),
1919         .head_offset = offsetof(struct lu_env_item, lei_linkage),
1920 };
1921
1922 struct rhashtable lu_env_rhash;
1923
1924 struct lu_env_percpu {
1925         struct task_struct *lep_task;
1926         struct lu_env *lep_env ____cacheline_aligned_in_smp;
1927 };
1928
1929 static struct lu_env_percpu lu_env_percpu[NR_CPUS];
1930
1931 int lu_env_add_task(struct lu_env *env, struct task_struct *task)
1932 {
1933         struct lu_env_item *lei, *old;
1934
1935         LASSERT(env);
1936
1937         OBD_ALLOC_PTR(lei);
1938         if (!lei)
1939                 return -ENOMEM;
1940
1941         lei->lei_task = task;
1942         lei->lei_env = env;
1943
1944         old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
1945                                                 &lei->lei_linkage,
1946                                                 lu_env_rhash_params);
1947         LASSERT(!old);
1948
1949         return 0;
1950 }
1951 EXPORT_SYMBOL(lu_env_add_task);
1952
1953 int lu_env_add(struct lu_env *env)
1954 {
1955         return lu_env_add_task(env, current);
1956 }
1957 EXPORT_SYMBOL(lu_env_add);
1958
1959 static void lu_env_item_free(struct rcu_head *head)
1960 {
1961         struct lu_env_item *lei;
1962
1963         lei = container_of(head, struct lu_env_item, lei_rcu_head);
1964         OBD_FREE_PTR(lei);
1965 }
1966
1967 void lu_env_remove(struct lu_env *env)
1968 {
1969         struct lu_env_item *lei;
1970         const void *task = current;
1971         int i;
1972
1973         for_each_possible_cpu(i) {
1974                 if (lu_env_percpu[i].lep_env == env) {
1975                         LASSERT(lu_env_percpu[i].lep_task == task);
1976                         lu_env_percpu[i].lep_task = NULL;
1977                         lu_env_percpu[i].lep_env = NULL;
1978                 }
1979         }
1980
1981         /* The RCU read lock is not taken in this case since the key
1982          * used is the actual task_struct. This implies that each
1983          * object is only removed by the owning thread, so there
1984          * can never be a race on a particular object.
1985          */
1986         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
1987                                      lu_env_rhash_params);
1988         if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
1989                                           lu_env_rhash_params) == 0)
1990                 call_rcu(&lei->lei_rcu_head, lu_env_item_free);
1991 }
1992 EXPORT_SYMBOL(lu_env_remove);
1993
1994 struct lu_env *lu_env_find(void)
1995 {
1996         struct lu_env *env = NULL;
1997         struct lu_env_item *lei;
1998         const void *task = current;
1999         int i = get_cpu();
2000
2001         if (lu_env_percpu[i].lep_task == current) {
2002                 env = lu_env_percpu[i].lep_env;
2003                 put_cpu();
2004                 LASSERT(env);
2005                 return env;
2006         }
2007
2008         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
2009                                      lu_env_rhash_params);
2010         if (lei) {
2011                 env = lei->lei_env;
2012                 lu_env_percpu[i].lep_task = current;
2013                 lu_env_percpu[i].lep_env = env;
2014         }
2015         put_cpu();
2016
2017         return env;
2018 }
2019 EXPORT_SYMBOL(lu_env_find);
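/*
 * Usage sketch (illustrative only): a thread that wants its environment to
 * be discoverable via lu_env_find() publishes it for the duration of the
 * work and removes it again before the env goes away.
 *
 *	rc = lu_env_add(env);
 *	if (rc == 0) {
 *		... code that may call lu_env_find() on this task ...
 *		lu_env_remove(env);
 *	}
 */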
2020
2021 typedef struct lu_site_stats {
2022         unsigned int lss_populated;
2023         unsigned int lss_max_search;
2024         unsigned int lss_total;
2025         unsigned int lss_busy;
2026 } lu_site_stats_t;
2027
2028 static void lu_site_stats_get(const struct lu_site *s,
2029                               lu_site_stats_t *stats)
2030 {
2031         int cnt = atomic_read(&s->ls_obj_hash.nelems);
2032         /*
2033          * percpu_counter_sum_positive() won't accept a const pointer
2034          * as it does modify the struct by taking a spinlock
2035          */
2036         struct lu_site *s2 = (struct lu_site *)s;
2037
2038         stats->lss_busy += cnt -
2039                 percpu_counter_sum_positive(&s2->ls_lru_len_counter);
2040
2041         stats->lss_total += cnt;
2042         stats->lss_max_search = 0;
2043         stats->lss_populated = 0;
2044 }
2045
2046
2047 /*
2048  * lu_cache_shrink_count() returns an approximate number of cached objects
2049  * that can be freed by shrink_slab(). A counter, which tracks the
2050  * number of items in the site's lru, is maintained in a percpu_counter
2051  * for each site. The percpu values are incremented and decremented as
2052  * objects are added or removed from the lru. The percpu values are summed
2053  * and saved whenever a percpu value exceeds a threshold. Thus the saved,
2054  * summed value at any given time may not accurately reflect the current
2055  * lru length. But this value is sufficiently accurate for the needs of
2056  * a shrinker.
2057  *
2058  * Using a per cpu counter is a compromise solution to concurrent access:
2059  * lu_object_put() can update the counter without locking the site and
2060  * lu_cache_shrink_count can sum the counters without locking each
2061  * ls_obj_hash bucket.
2062  */
2063 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
2064                                            struct shrink_control *sc)
2065 {
2066         struct lu_site *s;
2067         struct lu_site *tmp;
2068         unsigned long cached = 0;
2069
2070         if (!(sc->gfp_mask & __GFP_FS))
2071                 return 0;
2072
2073         if (!down_read_trylock(&lu_sites_guard))
2074                 return 0;
2075         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
2076                 cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
2077         up_read(&lu_sites_guard);
2078
2079         cached = (cached / 100) * sysctl_vfs_cache_pressure;
2080         CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
2081                cached, sysctl_vfs_cache_pressure);
2082
2083         return cached;
2084 }
2085
2086 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
2087                                           struct shrink_control *sc)
2088 {
2089         struct lu_site *s;
2090         struct lu_site *tmp;
2091         unsigned long remain = sc->nr_to_scan;
2092         LIST_HEAD(splice);
2093
2094         if (!(sc->gfp_mask & __GFP_FS))
2095                 /* We must not take the lu_sites_guard lock when
2096                  * __GFP_FS is *not* set, because of the lock
2097                  * inversion deadlock described in the comment
2098                  * before lu_cache_shrink() below. Additionally,
2099                  * since we cannot determine the number of objects
2100                  * in the cache without taking this lock, we are in
2101                  * a particularly tough spot. As a result, we just
2102                  * return SHRINK_STOP. This _should_ be OK, as we
2103                  * cannot reclaim objects when __GFP_FS is *not*
2104                  * set anyway.
2105                  */
2106                 return SHRINK_STOP;
2107
2108         if (!down_write_trylock(&lu_sites_guard))
2109                 return SHRINK_STOP;
2110
2111         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
2112                 remain = lu_site_purge(&lu_shrink_env, s, remain);
2113                 /*
2114                  * Move just shrunk site to the tail of site list to
2115                  * assure shrinking fairness.
2116                  */
2117                 list_move_tail(&s->ls_linkage, &splice);
2118         }
2119         list_splice(&splice, lu_sites.prev);
2120         up_write(&lu_sites_guard);
2121
2122         return sc->nr_to_scan - remain;
2123 }
2124
2125 #ifdef HAVE_SHRINKER_COUNT
2126 static struct ll_shrinker_ops lu_site_sh_ops = {
2127         .count_objects  = lu_cache_shrink_count,
2128         .scan_objects   = lu_cache_shrink_scan,
2129         .seeks          = DEFAULT_SEEKS,
2130 };
2131
2132 #else
2133 /*
2134  * There exists a potential lock inversion deadlock scenario when using
2135  * Lustre on top of ZFS. This occurs between one of ZFS's
2136  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2137  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2138  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2139  * lock. Obviously neither thread will wake and drop their respective hold
2140  * on their lock.
2141  *
2142  * To prevent this from happening we must ensure the lu_sites_guard lock is
2143  * not taken while down this code path. ZFS reliably does not set the
2144  * __GFP_FS bit in its code paths, so this can be used to determine if it
2145  * is safe to take the lu_sites_guard lock.
2146  *
2147  * Ideally we should accurately return the remaining number of cached
2148  * objects without taking the lu_sites_guard lock, but this is not
2149  * possible in the current implementation.
2150  */
2151 static int lu_cache_shrink(struct shrinker *shrinker,
2152                            struct shrink_control *sc)
2153 {
2154         int cached = 0;
2155
2156         CDEBUG(D_INODE, "Shrink %lu objects\n", sc->nr_to_scan);
2157
2158         if (sc->nr_to_scan != 0)
2159                 lu_cache_shrink_scan(shrinker, sc);
2160
2161         cached = lu_cache_shrink_count(shrinker, sc);
2162         return cached;
2163 }
2164
2165 static struct ll_shrinker_ops lu_site_sh_ops = {
2166         .shrink  = lu_cache_shrink,
2167         .seeks   = DEFAULT_SEEKS,
2168 };
2169
2170 #endif /* HAVE_SHRINKER_COUNT */
2171
2172 static struct shrinker *lu_site_shrinker;
2173
2174 /* Initialization of global lu_* data. */
2175 int lu_global_init(void)
2176 {
2177         int result;
2178
2179         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2180
2181         result = lu_ref_global_init();
2182         if (result != 0)
2183                 return result;
2184
2185         LU_CONTEXT_KEY_INIT(&lu_global_key);
2186         result = lu_context_key_register(&lu_global_key);
2187         if (result)
2188                 goto out_lu_ref;
2189
2190         /*
2191          * At this level, we don't know what tags are needed, so allocate them
2192          * conservatively. This should not be too bad, because this
2193          * environment is global.
2194          */
2195         down_write(&lu_sites_guard);
2196         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2197         up_write(&lu_sites_guard);
2198         if (result) {
2199                 lu_context_key_degister(&lu_global_key);
2200                 goto out_lu_ref;
2201         }
2202
2203         /*
2204          * Seeks estimation: 3 seeks to read a record from the OI, one to read
2205          * the inode, one for the EA. Unfortunately, setting such a high value
2206          * results in the lu_object/inode cache consuming all the memory.
2207          */
2208         lu_site_shrinker = ll_shrinker_create(&lu_site_sh_ops, 0, "lu_site");
2209         if (IS_ERR(lu_site_shrinker)) {
2210                 result = PTR_ERR(lu_site_shrinker);
2211                 goto out_env;
2212         }
2213
2214         result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
2215
2216         if (result)
2217                 goto out_shrinker;
2218
2219         return result;
2220
2221 out_shrinker:
2222         shrinker_free(lu_site_shrinker);
2223 out_env:
2224         /* ordering here is explained in lu_global_fini() */
2225         lu_context_key_degister(&lu_global_key);
2226         down_write(&lu_sites_guard);
2227         lu_env_fini(&lu_shrink_env);
2228         up_write(&lu_sites_guard);
2229 out_lu_ref:
2230         lu_ref_global_fini();
2231         return result;
2232 }
2233
2234 /* Dual to lu_global_init(). */
2235 void lu_global_fini(void)
2236 {
2237         shrinker_free(lu_site_shrinker);
2238
2239         lu_context_key_degister(&lu_global_key);
2240
2241         /*
2242          * Tear shrinker environment down _after_ de-registering
2243          * lu_global_key, because the latter has a value in the former.
2244          */
2245         down_write(&lu_sites_guard);
2246         lu_env_fini(&lu_shrink_env);
2247         up_write(&lu_sites_guard);
2248
2249         rhashtable_destroy(&lu_env_rhash);
2250
2251         lu_ref_global_fini();
2252 }
2253
2254 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2255 {
2256 #ifdef CONFIG_PROC_FS
2257         struct lprocfs_counter ret;
2258
2259         lprocfs_stats_collect(stats, idx, &ret);
2260         return (__u32)ret.lc_count;
2261 #else
2262         return 0;
2263 #endif
2264 }
2265
2266 /**
2267  * Output site statistical counters into a buffer. Suitable for
2268  * lprocfs_rd_*()-style functions.
2269  */
2270 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2271 {
2272         const struct bucket_table *tbl;
2273         lu_site_stats_t stats;
2274         unsigned int chains;
2275
2276         memset(&stats, 0, sizeof(stats));
2277         lu_site_stats_get(s, &stats);
2278
2279         rcu_read_lock();
2280         tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
2281                                   &((struct lu_site *)s)->ls_obj_hash);
2282         chains = tbl->size;
2283         rcu_read_unlock();
2284         seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
2285                    stats.lss_busy,
2286                    stats.lss_total,
2287                    stats.lss_populated,
2288                    chains,
2289                    stats.lss_max_search,
2290                    ls_stats_read(s->ls_stats, LU_SS_CREATED),
2291                    ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2292                    ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2293                    ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2294                    ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2295                    ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2296         return 0;
2297 }
2298 EXPORT_SYMBOL(lu_site_stats_seq_print);
2299
2300 /* Helper function to initialize a number of kmem slab caches at once. */
2301 int lu_kmem_init(struct lu_kmem_descr *caches)
2302 {
2303         int result;
2304         struct lu_kmem_descr *iter = caches;
2305
2306         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2307                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2308                                                      iter->ckd_size,
2309                                                      0, 0, NULL);
2310                 if (*iter->ckd_cache == NULL) {
2311                         result = -ENOMEM;
2312                         /* free all previously allocated caches */
2313                         lu_kmem_fini(caches);
2314                         break;
2315                 }
2316         }
2317         return result;
2318 }
2319 EXPORT_SYMBOL(lu_kmem_init);
2320
2321 /**
2322  * Helper function to finalize a number of kmem slab caches at once. Dual to
2323  * lu_kmem_init().
2324  */
2325 void lu_kmem_fini(struct lu_kmem_descr *caches)
2326 {
2327         for (; caches->ckd_cache != NULL; ++caches) {
2328                 if (*caches->ckd_cache != NULL) {
2329                         kmem_cache_destroy(*caches->ckd_cache);
2330                         *caches->ckd_cache = NULL;
2331                 }
2332         }
2333 }
2334 EXPORT_SYMBOL(lu_kmem_fini);
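/*
 * Usage sketch (illustrative only): callers describe their caches in a
 * NULL-terminated array of lu_kmem_descr and init/fini them in one call.
 * foo_object_kmem and struct foo_object are hypothetical names.
 *
 *	static struct kmem_cache *foo_object_kmem;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_object_kmem,
 *			.ckd_name  = "foo_object_kmem",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 *	...
 *	lu_kmem_fini(foo_caches);
 */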
2335
2336 /**
2337  * Temporary solution to be able to assign a fid in ->do_create()
2338  * until we have fully-functional OST fids.
2339  */
2340 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2341                           const struct lu_fid *fid)
2342 {
2343         struct lu_site          *s = o->lo_dev->ld_site;
2344         struct lu_fid           *old = &o->lo_header->loh_fid;
2345         int rc;
2346
2347         LASSERT(fid_is_zero(old));
2348         *old = *fid;
2349 try_again:
2350         rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
2351                                            &o->lo_header->loh_hash,
2352                                            obj_hash_params);
2353         /* supposed to be unique */
2354         LASSERT(rc != -EEXIST);
2355         /* handle hash table resizing */
2356         if (rc == -ENOMEM || rc == -EBUSY) {
2357                 msleep(20);
2358                 goto try_again;
2359         }
2360         /* trim the hash if it is growing too big */
2361         lu_object_limit(env, o->lo_dev);
2362         if (rc == -E2BIG)
2363                 goto try_again;
2364
2365         LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
2366 }
2367 EXPORT_SYMBOL(lu_object_assign_fid);
2368
2369 /**
2370  * Allocate an object with a zero (non-assigned) fid.
2371  * XXX: temporary solution to be able to assign a fid in ->do_create()
2372  *      until we have fully-functional OST fids.
2373  */
2374 struct lu_object *lu_object_anon(const struct lu_env *env,
2375                                  struct lu_device *dev,
2376                                  const struct lu_object_conf *conf)
2377 {
2378         struct lu_fid fid;
2379         struct lu_object *o;
2380         int rc;
2381
2382         fid_zero(&fid);
2383         o = lu_object_alloc(env, dev, &fid);
2384         if (!IS_ERR(o)) {
2385                 rc = lu_object_start(env, dev, o, conf);
2386                 if (rc) {
2387                         lu_object_free(env, o);
2388                         return ERR_PTR(rc);
2389                 }
2390         }
2391
2392         return o;
2393 }
2394 EXPORT_SYMBOL(lu_object_anon);
2395
2396 struct lu_buf LU_BUF_NULL = {
2397         .lb_buf = NULL,
2398         .lb_len = 0
2399 };
2400 EXPORT_SYMBOL(LU_BUF_NULL);
2401
2402 void lu_buf_free(struct lu_buf *buf)
2403 {
2404         LASSERT(buf);
2405         if (buf->lb_buf) {
2406                 LASSERT(buf->lb_len > 0);
2407                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2408                 buf->lb_buf = NULL;
2409                 buf->lb_len = 0;
2410         }
2411 }
2412 EXPORT_SYMBOL(lu_buf_free);
2413
2414 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2415 {
2416         LASSERT(buf);
2417         LASSERT(buf->lb_buf == NULL);
2418         LASSERT(buf->lb_len == 0);
2419         OBD_ALLOC_LARGE(buf->lb_buf, size);
2420         if (likely(buf->lb_buf))
2421                 buf->lb_len = size;
2422 }
2423 EXPORT_SYMBOL(lu_buf_alloc);
2424
2425 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2426 {
2427         lu_buf_free(buf);
2428         lu_buf_alloc(buf, size);
2429 }
2430 EXPORT_SYMBOL(lu_buf_realloc);
2431
2432 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2433 {
2434         if (buf->lb_buf == NULL && buf->lb_len == 0)
2435                 lu_buf_alloc(buf, len);
2436
2437         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2438                 lu_buf_realloc(buf, len);
2439
2440         return buf;
2441 }
2442 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2443
2444 /**
2445  * Increase the size of the \a buf.
2446  * Preserves the old data in the buffer; the old buffer remains
2447  * unchanged on error.
2448  * \retval 0 or -ENOMEM
2449  */
2450 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2451 {
2452         char *ptr;
2453
2454         if (len <= buf->lb_len)
2455                 return 0;
2456
2457         OBD_ALLOC_LARGE(ptr, len);
2458         if (ptr == NULL)
2459                 return -ENOMEM;
2460
2461         /* Copy the old data over, then free the old buffer */
2462         if (buf->lb_buf != NULL) {
2463                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2464                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2465         }
2466
2467         buf->lb_buf = ptr;
2468         buf->lb_len = len;
2469         return 0;
2470 }
2471 EXPORT_SYMBOL(lu_buf_check_and_grow);
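/*
 * lu_buf usage sketch (illustrative only): start with an empty buffer,
 * grow it on demand, free it once at the end.  The sizes are arbitrary.
 *
 *	struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *
 *	lu_buf_check_and_alloc(&buf, PAGE_SIZE);
 *	if (buf.lb_buf != NULL) {
 *		... fill buf.lb_buf ...
 *		rc = lu_buf_check_and_grow(&buf, 2 * PAGE_SIZE);
 *		...
 *	}
 *	lu_buf_free(&buf);
 */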