LU-14752 obdclass: handle EBUSY returned for lu_object hashtable
fs/lustre-release.git: lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/obdclass/lu_object.c
32  *
33  * Lustre Object.
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  *
37  *   Author: Nikita Danilov <nikita.danilov@sun.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_CLASS
41
42 #include <linux/delay.h>
43 #include <linux/module.h>
44 #include <linux/list.h>
45 #include <linux/processor.h>
46 #include <linux/random.h>
47
48 #include <libcfs/libcfs.h>
49 #include <libcfs/linux/linux-mem.h>
50 #include <libcfs/linux/linux-hash.h>
51 #include <obd_class.h>
52 #include <obd_support.h>
53 #include <lustre_disk.h>
54 #include <lustre_fid.h>
55 #include <lu_object.h>
56 #include <lu_ref.h>
57
58 struct lu_site_bkt_data {
59         /**
60          * LRU list, updated on each access to object. Protected by
61          * lsb_waitq.lock.
62          *
63          * The "cold" end of the LRU is lsb_lru.next. Accessed objects are
64          * moved to lsb_lru.prev.
65          */
66         struct list_head                lsb_lru;
67         /**
68          * Wait-queue signaled when an object in this site is ultimately
69          * destroyed (lu_object_free()) or initialized (lu_object_start()).
70          * It is used by lu_object_find() to wait before re-trying when an
71          * object in the process of destruction is found in the hash table,
72          * or to wait for an object to be initialized by the allocator.
73          *
74          * \see htable_lookup().
75          */
76         wait_queue_head_t               lsb_waitq;
77 };
78
79 enum {
80         LU_CACHE_PERCENT_MAX     = 50,
81         LU_CACHE_PERCENT_DEFAULT = 20
82 };
83
84 #define LU_CACHE_NR_MAX_ADJUST          512
85 #define LU_CACHE_NR_UNLIMITED           -1
86 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
87 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
88 #define LU_CACHE_NR_ZFS_LIMIT           10240
89
90 #define LU_CACHE_NR_MIN                 4096
91 #define LU_CACHE_NR_MAX                 0x80000000UL
92
93 /**
94  * Max 256 buckets; we don't want too many buckets because:
95  * - they consume too much memory (currently max 16K)
96  * - they lead to unbalanced LRU lists
97  * With few CPUs there is little gain from extra buckets, so
98  * we treat this as a maximum in lu_site_init().
99  */
100 #define LU_SITE_BKT_BITS    8
101
102 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
103 module_param(lu_cache_percent, int, 0644);
104 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
105
106 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
107 module_param(lu_cache_nr, long, 0644);
108 MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
109
110 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
111 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
112
113 static u32 lu_fid_hash(const void *data, u32 len, u32 seed)
114 {
115         const struct lu_fid *fid = data;
116
117         seed = cfs_hash_32(seed ^ fid->f_oid, 32);
118         seed ^= cfs_hash_64(fid->f_seq, 32);
119         return seed;
120 }
121
122 static const struct rhashtable_params obj_hash_params = {
123         .key_len        = sizeof(struct lu_fid),
124         .key_offset     = offsetof(struct lu_object_header, loh_fid),
125         .head_offset    = offsetof(struct lu_object_header, loh_hash),
126         .hashfn         = lu_fid_hash,
127         .automatic_shrinking = true,
128 };
129
130 static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
131 {
132         return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
133                (s->ls_bkt_cnt - 1);
134 }
135
136 wait_queue_head_t *
137 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
138 {
139         struct lu_site_bkt_data *bkt;
140
141         bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
142         return &bkt->lsb_waitq;
143 }
144 EXPORT_SYMBOL(lu_site_wq_from_fid);
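
/*
 * Illustrative note (not part of the upstream code): object lookup goes
 * through the single ls_obj_hash rhashtable keyed by the full FID, while LRU
 * lists and wait queues are partitioned into ls_bkt_cnt buckets selected by
 * lu_bkt_hash().  A minimal sketch of how a waiter picks the right queue,
 * assuming an initialized site and a valid FID:
 *
 *	wait_queue_head_t *wq = lu_site_wq_from_fid(site, &fid);
 *
 *	... wait on *wq, e.g. with wait_event_idle(), while the thread freeing
 *	or initializing the object calls wake_up() on the same queue ...
 */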
145
146 /**
147  * Decrease the reference counter on an object. If the last reference is
148  * released, return the object to the cache, unless lu_object_is_dying(o)
149  * holds; in that case, free the object immediately.
150  */
151 void lu_object_put(const struct lu_env *env, struct lu_object *o)
152 {
153         struct lu_site_bkt_data *bkt;
154         struct lu_object_header *top = o->lo_header;
155         struct lu_site *site = o->lo_dev->ld_site;
156         struct lu_object *orig = o;
157         const struct lu_fid *fid = lu_object_fid(o);
158
159         /*
160          * Till we have full FIDs-on-OST implemented, anonymous objects are
161          * possible in OSP. Such an object isn't listed in the site, so we
162          * should not remove it from the site.
163          */
164         if (fid_is_zero(fid)) {
165                 LASSERT(list_empty(&top->loh_lru));
166                 if (!atomic_dec_and_test(&top->loh_ref))
167                         return;
168                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
169                         if (o->lo_ops->loo_object_release != NULL)
170                                 o->lo_ops->loo_object_release(env, o);
171                 }
172                 lu_object_free(env, orig);
173                 return;
174         }
175
176         bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
177         if (atomic_add_unless(&top->loh_ref, -1, 1)) {
178 still_active:
179                 /*
180                  * At this point the object reference is dropped and lock is
181                  * not taken, so lu_object should not be touched because it
182                  * can be freed by concurrent thread.
183                  *
184                  * Somebody may be waiting for this, currently only used for
185                  * cl_object, see cl_object_put_last().
186                  */
187                 wake_up(&bkt->lsb_waitq);
188
189                 return;
190         }
191
192         spin_lock(&bkt->lsb_waitq.lock);
193         if (!atomic_dec_and_test(&top->loh_ref)) {
194                 spin_unlock(&bkt->lsb_waitq.lock);
195                 goto still_active;
196         }
197
198         /*
199          * Refcount is zero, and cannot be incremented without taking the bkt
200          * lock, so object is stable.
201          */
202
203         /*
204          * When last reference is released, iterate over object layers, and
205          * notify them that object is no longer busy.
206          */
207         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
208                 if (o->lo_ops->loo_object_release != NULL)
209                         o->lo_ops->loo_object_release(env, o);
210         }
211
212         /*
213          * Don't use a local 'is_dying' here because it was read without the
214          * lock, but here we need the latest actual value, so check the
215          * lu_object directly.
216          */
217         if (!lu_object_is_dying(top) &&
218             (lu_object_exists(orig) || lu_object_is_cl(orig))) {
219                 LASSERT(list_empty(&top->loh_lru));
220                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
221                 spin_unlock(&bkt->lsb_waitq.lock);
222                 percpu_counter_inc(&site->ls_lru_len_counter);
223                 CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
224                        orig, top, bkt);
225                 return;
226         }
227
228         /*
229          * If object is dying (will not be cached) then remove it from hash
230          * table (it is already not on the LRU).
231          *
232          * This is done with bucket lock held.  As the only way to acquire first
233          * reference to previously unreferenced object is through hash-table
234          * lookup (lu_object_find()) which takes the lock for first reference,
235          * no race with concurrent object lookup is possible and we can safely
236          * destroy object below.
237          */
238         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
239                 rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
240                                        obj_hash_params);
241
242         spin_unlock(&bkt->lsb_waitq.lock);
243         /* Object was already removed from hash above, can kill it. */
244         lu_object_free(env, orig);
245 }
246 EXPORT_SYMBOL(lu_object_put);
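
/*
 * Usage sketch (illustrative only, not part of the upstream code): a typical
 * caller pairs a lookup with lu_object_put() once it is done with the object,
 * assuming env, dev and fid are already set up:
 *
 *	struct lu_object *o = lu_object_find(env, dev, fid, NULL);
 *
 *	if (!IS_ERR(o)) {
 *		... use the object ...
 *		lu_object_put(env, o);
 *	}
 */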
247
248 /**
249  * Put the object without keeping it in the cache. This is a temporary
250  * solution for multi-site objects whose layering is not constant.
251  */
252 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
253 {
254         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
255         return lu_object_put(env, o);
256 }
257 EXPORT_SYMBOL(lu_object_put_nocache);
258
259 /**
260  * Kill the object and take it out of LRU cache.
261  * Currently used by client code for layout change.
262  */
263 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
264 {
265         struct lu_object_header *top;
266
267         top = o->lo_header;
268         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
269         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
270                 struct lu_site *site = o->lo_dev->ld_site;
271                 struct rhashtable *obj_hash = &site->ls_obj_hash;
272                 struct lu_site_bkt_data *bkt;
273
274                 bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
275                 spin_lock(&bkt->lsb_waitq.lock);
276                 if (!list_empty(&top->loh_lru)) {
277                         list_del_init(&top->loh_lru);
278                         percpu_counter_dec(&site->ls_lru_len_counter);
279                 }
280                 spin_unlock(&bkt->lsb_waitq.lock);
281
282                 rhashtable_remove_fast(obj_hash, &top->loh_hash,
283                                        obj_hash_params);
284         }
285 }
286 EXPORT_SYMBOL(lu_object_unhash);
287
288 /**
289  * Allocate new object.
290  *
291  * This follows object creation protocol, described in the comment within
292  * struct lu_device_operations definition.
293  */
294 static struct lu_object *lu_object_alloc(const struct lu_env *env,
295                                          struct lu_device *dev,
296                                          const struct lu_fid *f)
297 {
298         struct lu_object *top;
299
300         /*
301          * Create top-level object slice. This will also create
302          * lu_object_header.
303          */
304         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
305         if (top == NULL)
306                 return ERR_PTR(-ENOMEM);
307         if (IS_ERR(top))
308                 return top;
309         /*
310          * This is the only place where object fid is assigned. It's constant
311          * after this point.
312          */
313         top->lo_header->loh_fid = *f;
314
315         return top;
316 }
317
318 /**
319  * Initialize object.
320  *
321  * This is called after object hash insertion to avoid returning an object with
322  * stale attributes.
323  */
324 static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
325                            struct lu_object *top,
326                            const struct lu_object_conf *conf)
327 {
328         struct lu_object *scan;
329         struct list_head *layers;
330         unsigned int init_mask = 0;
331         unsigned int init_flag;
332         int clean;
333         int result;
334
335         layers = &top->lo_header->loh_layers;
336
337         do {
338                 /*
339                  * Call ->loo_object_init() repeatedly, until no more new
340                  * object slices are created.
341                  */
342                 clean = 1;
343                 init_flag = 1;
344                 list_for_each_entry(scan, layers, lo_linkage) {
345                         if (init_mask & init_flag)
346                                 goto next;
347                         clean = 0;
348                         scan->lo_header = top->lo_header;
349                         result = scan->lo_ops->loo_object_init(env, scan, conf);
350                         if (result)
351                                 return result;
352
353                         init_mask |= init_flag;
354 next:
355                         init_flag <<= 1;
356                 }
357         } while (!clean);
358
359         list_for_each_entry_reverse(scan, layers, lo_linkage) {
360                 if (scan->lo_ops->loo_object_start != NULL) {
361                         result = scan->lo_ops->loo_object_start(env, scan);
362                         if (result)
363                                 return result;
364                 }
365         }
366
367         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
368
369         set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
370
371         return 0;
372 }
373
374 /**
375  * Free an object.
376  */
377 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
378 {
379         wait_queue_head_t *wq;
380         struct lu_site *site;
381         struct lu_object *scan;
382         struct list_head *layers;
383         LIST_HEAD(splice);
384
385         site = o->lo_dev->ld_site;
386         layers = &o->lo_header->loh_layers;
387         wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
388         /*
389          * First call ->loo_object_delete() method to release all resources.
390          */
391         list_for_each_entry_reverse(scan, layers, lo_linkage) {
392                 if (scan->lo_ops->loo_object_delete != NULL)
393                         scan->lo_ops->loo_object_delete(env, scan);
394         }
395
396         /*
397          * Then, splice object layers into stand-alone list, and call
398          * ->loo_object_free() on all layers to free memory. Splice is
399          * necessary, because lu_object_header is freed together with the
400          * top-level slice.
401          */
402         list_splice_init(layers, &splice);
403         while (!list_empty(&splice)) {
404                 /*
405                  * Free layers in bottom-to-top order, so that object header
406                  * lives as long as possible and ->loo_object_free() methods
407                  * can look at its contents.
408                  */
409                 o = container_of(splice.prev, struct lu_object, lo_linkage);
410                 list_del_init(&o->lo_linkage);
411                 LASSERT(o->lo_ops->loo_object_free != NULL);
412                 o->lo_ops->loo_object_free(env, o);
413         }
414
415         if (waitqueue_active(wq))
416                 wake_up(wq);
417 }
418
419 /**
420  * Free \a nr objects from the cold end of the site LRU list.
421  * If \a canblock is 0, then don't block waiting for another
422  * instance of lu_site_purge() to complete.
423  */
424 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
425                           int nr, int canblock)
426 {
427         struct lu_object_header *h;
428         struct lu_object_header *temp;
429         struct lu_site_bkt_data *bkt;
430         LIST_HEAD(dispose);
431         int                      did_sth;
432         unsigned int             start = 0;
433         int                      count;
434         int                      bnr;
435         unsigned int             i;
436
437         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
438                 RETURN(0);
439
440         /*
441          * Under LRU list lock, scan LRU list and move unreferenced objects to
442          * the dispose list, removing them from LRU and hash table.
443          */
444         if (nr != ~0)
445                 start = s->ls_purge_start;
446         bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
447 again:
448         /*
449          * It doesn't make sense to run purge threads in parallel; that can
450          * only cause trouble.  See LU-5331.
451          */
452         if (canblock != 0)
453                 mutex_lock(&s->ls_purge_mutex);
454         else if (mutex_trylock(&s->ls_purge_mutex) == 0)
455                 goto out;
456
457         did_sth = 0;
458         for (i = start; i < s->ls_bkt_cnt ; i++) {
459                 count = bnr;
460                 bkt = &s->ls_bkts[i];
461                 spin_lock(&bkt->lsb_waitq.lock);
462
463                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
464                         LASSERT(atomic_read(&h->loh_ref) == 0);
465
466                         LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
467
468                         set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
469                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
470                                                obj_hash_params);
471                         list_move(&h->loh_lru, &dispose);
472                         percpu_counter_dec(&s->ls_lru_len_counter);
473                         if (did_sth == 0)
474                                 did_sth = 1;
475
476                         if (nr != ~0 && --nr == 0)
477                                 break;
478
479                         if (count > 0 && --count == 0)
480                                 break;
481
482                 }
483                 spin_unlock(&bkt->lsb_waitq.lock);
484                 cond_resched();
485                 /*
486                  * Free everything on the dispose list. This is safe against
487                  * races due to the reasons described in lu_object_put().
488                  */
489                 while ((h = list_first_entry_or_null(&dispose,
490                                                      struct lu_object_header,
491                                                      loh_lru)) != NULL) {
492                         list_del_init(&h->loh_lru);
493                         lu_object_free(env, lu_object_top(h));
494                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
495                 }
496
497                 if (nr == 0)
498                         break;
499         }
500         mutex_unlock(&s->ls_purge_mutex);
501
502         if (nr != 0 && did_sth && start != 0) {
503                 start = 0; /* restart from the first bucket */
504                 goto again;
505         }
506         /* race on s->ls_purge_start, but nobody cares */
507         s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
508 out:
509         return nr;
510 }
511 EXPORT_SYMBOL(lu_site_purge_objects);
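
/*
 * Usage sketch (illustrative, not part of the upstream code): callers shrink
 * the cache by purging objects from the cold end of the LRU, for example:
 *
 *	lu_site_purge_objects(env, s, 128, 0);	(up to 128 objects, non-blocking)
 *	lu_site_purge(env, s, ~0);		(everything, may block)
 *
 * lu_site_purge() is assumed here to be the blocking wrapper used elsewhere
 * in this file, e.g. by lu_stack_fini().
 */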
512
513 /*
514  * Object printing.
515  *
516  * The code below has to jump through certain hoops to output an object
517  * description into the libcfs_debug_msg()-based log. The problem is that
518  * lu_object_print() composes the object description from strings that are
519  * parts of _lines_ of output (i.e., strings that are not terminated by a
520  * newline). This doesn't fit very well into the libcfs_debug_msg() interface,
521  * which assumes that each message supplied to it is a self-contained output
522  * line.
523  *
524  * To work around this, strings are collected in a temporary buffer
525  * (implemented as a value of the lu_global_key key) until a terminating
526  * newline character is detected.
527  */
528
529 enum {
530         /**
531          * Maximal line size.
532          *
533          * XXX overflow is not handled correctly.
534          */
535         LU_CDEBUG_LINE = 512
536 };
537
538 struct lu_cdebug_data {
539         /**
540          * Temporary buffer.
541          */
542         char lck_area[LU_CDEBUG_LINE];
543 };
544
545 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
546 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
547
548 /**
549  * Key, holding temporary buffer. This key is registered very early by
550  * lu_global_init().
551  */
552 static struct lu_context_key lu_global_key = {
553         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
554                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
555         .lct_init = lu_global_key_init,
556         .lct_fini = lu_global_key_fini
557 };
558
559 /**
560  * Printer function emitting messages through libcfs_debug_msg().
561  */
562 int lu_cdebug_printer(const struct lu_env *env,
563                       void *cookie, const char *format, ...)
564 {
565         struct libcfs_debug_msg_data *msgdata = cookie;
566         struct lu_cdebug_data        *key;
567         int used;
568         int complete;
569         va_list args;
570
571         va_start(args, format);
572
573         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
574         LASSERT(key != NULL);
575
576         used = strlen(key->lck_area);
577         complete = format[strlen(format) - 1] == '\n';
578         /*
579          * Append new chunk to the buffer.
580          */
581         vsnprintf(key->lck_area + used,
582                   ARRAY_SIZE(key->lck_area) - used, format, args);
583         if (complete) {
584                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
585                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
586                 key->lck_area[0] = 0;
587         }
588         va_end(args);
589         return 0;
590 }
591 EXPORT_SYMBOL(lu_cdebug_printer);
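
/*
 * Example (illustrative, not part of the upstream code): because output is
 * buffered until a newline is seen, a sequence of calls such as
 *
 *	lu_cdebug_printer(env, cookie, "header@%p", hdr);
 *	lu_cdebug_printer(env, cookie, " lru");
 *	lu_cdebug_printer(env, cookie, "\n");
 *
 * emits a single debug line "header@... lru" rather than three fragments.
 */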
592
593 /**
594  * Print object header.
595  */
596 void lu_object_header_print(const struct lu_env *env, void *cookie,
597                             lu_printer_t printer,
598                             const struct lu_object_header *hdr)
599 {
600         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
601                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
602                    PFID(&hdr->loh_fid),
603                    test_bit(LU_OBJECT_UNHASHED,
604                             &hdr->loh_flags) ? "" : " hash",
605                    list_empty(&hdr->loh_lru) ? "" : " lru",
606                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
607 }
608 EXPORT_SYMBOL(lu_object_header_print);
609
610 /**
611  * Print a human-readable representation of \a o to \a printer.
612  */
613 void lu_object_print(const struct lu_env *env, void *cookie,
614                      lu_printer_t printer, const struct lu_object *o)
615 {
616         static const char ruler[] = "........................................";
617         struct lu_object_header *top;
618         int depth = 4;
619
620         top = o->lo_header;
621         lu_object_header_print(env, cookie, printer, top);
622         (*printer)(env, cookie, "{\n");
623
624         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
625                 /*
626                  * print `.' \a depth times followed by type name and address
627                  */
628                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
629                            o->lo_dev->ld_type->ldt_name, o);
630
631                 if (o->lo_ops->loo_object_print != NULL)
632                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
633
634                 (*printer)(env, cookie, "\n");
635         }
636
637         (*printer)(env, cookie, "} header@%p\n", top);
638 }
639 EXPORT_SYMBOL(lu_object_print);
640
641 /**
642  * Check object consistency.
643  */
644 int lu_object_invariant(const struct lu_object *o)
645 {
646         struct lu_object_header *top;
647
648         top = o->lo_header;
649         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
650                 if (o->lo_ops->loo_object_invariant != NULL &&
651                     !o->lo_ops->loo_object_invariant(o))
652                         return 0;
653         }
654         return 1;
655 }
656
657 /*
658  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
659  * the calculation of the number of objects to reclaim is not covered by a
660  * lock, the number purged per call is capped by LU_CACHE_NR_MAX_ADJUST, so
661  * many concurrent threads will not accidentally purge the entire cache.
662  */
663 static void lu_object_limit(const struct lu_env *env,
664                             struct lu_device *dev)
665 {
666         u64 size, nr;
667
668         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
669                 return;
670
671         size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
672         nr = (u64)lu_cache_nr;
673         if (size <= nr)
674                 return;
675
676         lu_site_purge_objects(env, dev->ld_site,
677                               min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
678                               0);
679 }
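
/*
 * Worked example (illustrative): with lu_cache_nr set to LU_CACHE_NR_ZFS_LIMIT
 * (10240) and 11000 objects currently hashed, size - nr = 760 and the purge is
 * capped at min(760, LU_CACHE_NR_MAX_ADJUST = 512), so at most 512 objects are
 * reclaimed by this call; any excess is handled by subsequent calls.
 */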
680
681 static struct lu_object *htable_lookup(const struct lu_env *env,
682                                        struct lu_device *dev,
683                                        struct lu_site_bkt_data *bkt,
684                                        const struct lu_fid *f,
685                                        struct lu_object_header *new)
686 {
687         struct lu_site *s = dev->ld_site;
688         struct lu_object_header *h;
689
690 try_again:
691         rcu_read_lock();
692         if (new)
693                 h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
694                                                       &new->loh_hash,
695                                                       obj_hash_params);
696         else
697                 h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
698
699         if (IS_ERR_OR_NULL(h)) {
700                 /* Not found */
701                 if (!new)
702                         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
703                 rcu_read_unlock();
704                 if (PTR_ERR(h) == -ENOMEM) {
705                         msleep(20);
706                         goto try_again;
707                 }
708                 lu_object_limit(env, dev);
709                 if (PTR_ERR(h) == -E2BIG)
710                         goto try_again;
711
712                 return ERR_PTR(-ENOENT);
713         }
714
715         if (atomic_inc_not_zero(&h->loh_ref)) {
716                 rcu_read_unlock();
717                 return lu_object_top(h);
718         }
719
720         spin_lock(&bkt->lsb_waitq.lock);
721         if (lu_object_is_dying(h) ||
722             test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
723                 spin_unlock(&bkt->lsb_waitq.lock);
724                 rcu_read_unlock();
725                 if (new) {
726                         /*
727                          * Old object might have already been removed, or will
728                          * be soon.  We need to insert our new object, so
729                          * remove the old one just in case it is still there.
730                          */
731                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
732                                                obj_hash_params);
733                         goto try_again;
734                 }
735                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
736                 return ERR_PTR(-ENOENT);
737         }
738         /* Now protected by spinlock */
739         rcu_read_unlock();
740
741         if (!list_empty(&h->loh_lru)) {
742                 list_del_init(&h->loh_lru);
743                 percpu_counter_dec(&s->ls_lru_len_counter);
744         }
745         atomic_inc(&h->loh_ref);
746         spin_unlock(&bkt->lsb_waitq.lock);
747         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
748         return lu_object_top(h);
749 }
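
/*
 * Note on the retry logic above (descriptive comment, not upstream text):
 * rhashtable_lookup_get_insert_fast() can fail transiently, e.g. with -ENOMEM
 * when the table needs to grow but the allocation fails, or with -E2BIG when
 * growth beyond the configured limit would be required.  htable_lookup()
 * therefore sleeps briefly and retries on -ENOMEM, and trims the cache via
 * lu_object_limit() before retrying on -E2BIG, instead of propagating these
 * transient errors to the caller.
 */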
750
751 /**
752  * Search the cache for an object with the fid \a f. If such an object is
753  * found, return it. Otherwise, create a new object, insert it into the cache
754  * and return it. In any case, an additional reference is acquired on the returned object.
755  */
756 struct lu_object *lu_object_find(const struct lu_env *env,
757                                  struct lu_device *dev, const struct lu_fid *f,
758                                  const struct lu_object_conf *conf)
759 {
760         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
761 }
762 EXPORT_SYMBOL(lu_object_find);
763
764 /*
765  * Get a 'first' reference to an object that was found while looking through the
766  * hash table.
767  */
768 struct lu_object *lu_object_get_first(struct lu_object_header *h,
769                                       struct lu_device *dev)
770 {
771         struct lu_site *s = dev->ld_site;
772         struct lu_object *ret;
773
774         if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
775                 return NULL;
776
777         ret = lu_object_locate(h, dev->ld_type);
778         if (!ret)
779                 return ret;
780
781         if (!atomic_inc_not_zero(&h->loh_ref)) {
782                 struct lu_site_bkt_data *bkt;
783
784                 bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
785                 spin_lock(&bkt->lsb_waitq.lock);
786                 if (!lu_object_is_dying(h) &&
787                     !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
788                         atomic_inc(&h->loh_ref);
789                 else
790                         ret = NULL;
791                 spin_unlock(&bkt->lsb_waitq.lock);
792         }
793         return ret;
794 }
795 EXPORT_SYMBOL(lu_object_get_first);
796
797 /**
798  * Core logic of lu_object_find*() functions.
799  *
800  * Much like lu_object_find(), but the top-level device of the object is
801  * specifically \a dev rather than that of the site. This interface allows
802  * objects of different "stacking" to be created within the same site.
803  */
804 struct lu_object *lu_object_find_at(const struct lu_env *env,
805                                     struct lu_device *dev,
806                                     const struct lu_fid *f,
807                                     const struct lu_object_conf *conf)
808 {
809         struct lu_object *o;
810         struct lu_object *shadow;
811         struct lu_site *s;
812         struct lu_site_bkt_data *bkt;
813         struct rhashtable *hs;
814         int rc;
815
816         ENTRY;
817
818         /* The FID comes from disk or the network; a zero FID is meaningless, so
819          * return an error early to avoid an assertion in lu_object_put(). If a
820          * zero FID is wanted, it should be allocated via lu_object_anon().
821          */
822         if (fid_is_zero(f))
823                 RETURN(ERR_PTR(-EINVAL));
824
825         /*
826          * This uses standard index maintenance protocol:
827          *
828          *     - search index under lock, and return object if found;
829          *     - otherwise, unlock index, allocate new object;
830          *     - lock index and search again;
831          *     - if nothing is found (usual case), insert newly created
832          *       object into index;
833          *     - otherwise (race: other thread inserted object), free
834          *       object just allocated.
835          *     - unlock index;
836          *     - return object.
837          *
838          * For the "LOC_F_NEW" case, we are sure the object is newly created.
839          * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
840          * just allocate and insert directly.
841          *
842          */
843         s  = dev->ld_site;
844         hs = &s->ls_obj_hash;
845
846         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
847                 lu_site_purge(env, s, -1);
848
849         bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
850         if (!(conf && conf->loc_flags & LOC_F_NEW)) {
851                 o = htable_lookup(env, dev, bkt, f, NULL);
852
853                 if (!IS_ERR(o)) {
854                         if (likely(lu_object_is_inited(o->lo_header)))
855                                 RETURN(o);
856
857                         wait_event_idle(bkt->lsb_waitq,
858                                         lu_object_is_inited(o->lo_header) ||
859                                         lu_object_is_dying(o->lo_header));
860
861                         if (lu_object_is_dying(o->lo_header)) {
862                                 lu_object_put(env, o);
863
864                                 RETURN(ERR_PTR(-ENOENT));
865                         }
866
867                         RETURN(o);
868                 }
869
870                 if (PTR_ERR(o) != -ENOENT)
871                         RETURN(o);
872         }
873
874         /*
875          * Allocate a new object. NB: the object is left uninitialized here,
876          * because it could change between allocation and hash insertion;
877          * otherwise an object with stale attributes could be returned.
878          */
879         o = lu_object_alloc(env, dev, f);
880         if (IS_ERR(o))
881                 RETURN(o);
882
883         LASSERT(lu_fid_eq(lu_object_fid(o), f));
884
885         CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
886
887         if (conf && conf->loc_flags & LOC_F_NEW) {
888                 int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
889                                                     obj_hash_params);
890                 if (status)
891                         /* Strange error - go the slow way */
892                         shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
893                 else
894                         shadow = ERR_PTR(-ENOENT);
895         } else {
896                 shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
897         }
898         if (likely(PTR_ERR(shadow) == -ENOENT)) {
899                 /*
900                  * The new object has been successfully inserted.
901                  *
902                  * This may result in rather complicated operations, including
903                  * fld queries, inode loading, etc.
904                  */
905                 rc = lu_object_start(env, dev, o, conf);
906                 if (rc) {
907                         lu_object_put_nocache(env, o);
908                         RETURN(ERR_PTR(rc));
909                 }
910
911                 wake_up(&bkt->lsb_waitq);
912
913                 lu_object_limit(env, dev);
914
915                 RETURN(o);
916         }
917
918         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
919         lu_object_free(env, o);
920
921         if (!(conf && conf->loc_flags & LOC_F_NEW) &&
922             !IS_ERR(shadow) &&
923             !lu_object_is_inited(shadow->lo_header)) {
924                 wait_event_idle(bkt->lsb_waitq,
925                                 lu_object_is_inited(shadow->lo_header) ||
926                                 lu_object_is_dying(shadow->lo_header));
927
928                 if (lu_object_is_dying(shadow->lo_header)) {
929                         lu_object_put(env, shadow);
930
931                         RETURN(ERR_PTR(-ENOENT));
932                 }
933         }
934
935         RETURN(shadow);
936 }
937 EXPORT_SYMBOL(lu_object_find_at);
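
/*
 * Usage sketch (illustrative, not part of the upstream code): a caller that
 * knows the object cannot already exist may skip the initial lookup by
 * passing LOC_F_NEW:
 *
 *	struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
 *	struct lu_object *o = lu_object_find_at(env, dev, fid, &conf);
 *
 * In this mode the object is allocated and inserted directly; an insertion
 * failure is treated as an unexpected race and resolved through
 * htable_lookup().
 */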
938
939 /**
940  * Find object with given fid, and return its slice belonging to given device.
941  */
942 struct lu_object *lu_object_find_slice(const struct lu_env *env,
943                                        struct lu_device *dev,
944                                        const struct lu_fid *f,
945                                        const struct lu_object_conf *conf)
946 {
947         struct lu_object *top;
948         struct lu_object *obj;
949
950         top = lu_object_find(env, dev, f, conf);
951         if (IS_ERR(top))
952                 return top;
953
954         obj = lu_object_locate(top->lo_header, dev->ld_type);
955         if (unlikely(obj == NULL)) {
956                 lu_object_put(env, top);
957                 obj = ERR_PTR(-ENOENT);
958         }
959
960         return obj;
961 }
962 EXPORT_SYMBOL(lu_object_find_slice);
963
964 int lu_device_type_init(struct lu_device_type *ldt)
965 {
966         int result = 0;
967
968         atomic_set(&ldt->ldt_device_nr, 0);
969         if (ldt->ldt_ops->ldto_init)
970                 result = ldt->ldt_ops->ldto_init(ldt);
971
972         return result;
973 }
974 EXPORT_SYMBOL(lu_device_type_init);
975
976 void lu_device_type_fini(struct lu_device_type *ldt)
977 {
978         if (ldt->ldt_ops->ldto_fini)
979                 ldt->ldt_ops->ldto_fini(ldt);
980 }
981 EXPORT_SYMBOL(lu_device_type_fini);
982
983 /**
984  * Global list of all sites on this node
985  */
986 static LIST_HEAD(lu_sites);
987 static DECLARE_RWSEM(lu_sites_guard);
988
989 /**
990  * Global environment used by site shrinker.
991  */
992 static struct lu_env lu_shrink_env;
993
994 struct lu_site_print_arg {
995         struct lu_env   *lsp_env;
996         void            *lsp_cookie;
997         lu_printer_t     lsp_printer;
998 };
999
1000 static void
1001 lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
1002 {
1003         if (!list_empty(&h->loh_layers)) {
1004                 const struct lu_object *o;
1005
1006                 o = lu_object_top(h);
1007                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
1008                                 arg->lsp_printer, o);
1009         } else {
1010                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
1011                                        arg->lsp_printer, h);
1012         }
1013 }
1014
1015 /**
1016  * Print all objects in \a s.
1017  */
1018 void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
1019                    int msg_flag, lu_printer_t printer)
1020 {
1021         struct lu_site_print_arg arg = {
1022                 .lsp_env     = (struct lu_env *)env,
1023                 .lsp_printer = printer,
1024         };
1025         struct rhashtable_iter iter;
1026         struct lu_object_header *h;
1027         LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
1028
1029         if (!s || !atomic_read(ref))
1030                 return;
1031
1032         arg.lsp_cookie = (void *)&msgdata;
1033
1034         rhashtable_walk_enter(&s->ls_obj_hash, &iter);
1035         rhashtable_walk_start(&iter);
1036         while ((h = rhashtable_walk_next(&iter)) != NULL) {
1037                 if (IS_ERR(h))
1038                         continue;
1039                 lu_site_obj_print(h, &arg);
1040         }
1041         rhashtable_walk_stop(&iter);
1042         rhashtable_walk_exit(&iter);
1043 }
1044 EXPORT_SYMBOL(lu_site_print);
1045
1046 /**
1047  * Set the lu_object cache limit (lu_cache_nr) based on the backend type and available memory.
1048  */
1049 static void lu_htable_limits(struct lu_device *top)
1050 {
1051         unsigned long cache_size;
1052
1053         /*
1054          * For ZFS based OSDs the cache should be disabled by default.  This
1055          * allows the ZFS ARC maximum flexibility in determining what buffers
1056  * to cache.  If Lustre has objects or buffers which it wants to ensure
1057  * always stay cached, it must maintain a hold on them.
1058          */
1059         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
1060                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
1061                 return;
1062         }
1063
1064         /*
1065          * Calculate the object cache size, assuming that we want reasonable
1066          * performance when 20% of total memory is occupied by the cache of
1067          * lu_objects.
1068          *
1069          * The size of a lu_object is (arbitrarily) taken as 1K (together with its inode).
1070          */
1071         cache_size = cfs_totalram_pages();
1072
1073 #if BITS_PER_LONG == 32
1074         /* limit hashtable size for lowmem systems to low RAM */
1075         if (cache_size > 1 << (30 - PAGE_SHIFT))
1076                 cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
1077 #endif
1078
1079         /* clear off unreasonable cache setting. */
1080         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
1081                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
1082                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
1083                       LU_CACHE_PERCENT_DEFAULT);
1084
1085                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
1086         }
1087         cache_size = cache_size / 100 * lu_cache_percent *
1088                 (PAGE_SIZE / 1024);
1089
1090         lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
1091                               LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
1092 }
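
/*
 * Worked example (illustrative): on a node with 16 GiB of RAM and 4 KiB pages,
 * cfs_totalram_pages() is 4194304.  With the default lu_cache_percent of 20
 * this gives
 *
 *	cache_size = 4194304 / 100 * 20 * (4096 / 1024) = 3355440
 *
 * which lies within [LU_CACHE_NR_MIN, LU_CACHE_NR_MAX], so lu_cache_nr is set
 * to roughly 3.3 million objects, consistent with "20% of RAM at ~1K per
 * object" described above.
 */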
1093
1094 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1095 {
1096         spin_lock(&s->ls_ld_lock);
1097         if (list_empty(&d->ld_linkage))
1098                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1099         spin_unlock(&s->ls_ld_lock);
1100 }
1101 EXPORT_SYMBOL(lu_dev_add_linkage);
1102
1103 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1104 {
1105         spin_lock(&s->ls_ld_lock);
1106         list_del_init(&d->ld_linkage);
1107         spin_unlock(&s->ls_ld_lock);
1108 }
1109 EXPORT_SYMBOL(lu_dev_del_linkage);
1110
1111 /**
1112  * Initialize site \a s, with \a top as the top-level device.
1113  */
1114 int lu_site_init(struct lu_site *s, struct lu_device *top)
1115 {
1116         struct lu_site_bkt_data *bkt;
1117         unsigned int i;
1118         int rc;
1119         ENTRY;
1120
1121         memset(s, 0, sizeof *s);
1122         mutex_init(&s->ls_purge_mutex);
1123         lu_htable_limits(top);
1124
1125 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
1126         rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
1127 #else
1128         rc = percpu_counter_init(&s->ls_lru_len_counter, 0);
1129 #endif
1130         if (rc)
1131                 return -ENOMEM;
1132
1133         if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
1134                 CERROR("failed to create lu_site hash\n");
1135                 return -ENOMEM;
1136         }
1137
1138         s->ls_bkt_seed = prandom_u32();
1139         s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS,
1140                               2 * num_possible_cpus());
1141         s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
1142         OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1143         if (!s->ls_bkts) {
1144                 rhashtable_destroy(&s->ls_obj_hash);
1145                 s->ls_bkts = NULL;
1146                 return -ENOMEM;
1147         }
1148
1149         for (i = 0; i < s->ls_bkt_cnt; i++) {
1150                 bkt = &s->ls_bkts[i];
1151                 INIT_LIST_HEAD(&bkt->lsb_lru);
1152                 init_waitqueue_head(&bkt->lsb_waitq);
1153         }
1154
1155         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1156         if (s->ls_stats == NULL) {
1157                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1158                 s->ls_bkts = NULL;
1159                 rhashtable_destroy(&s->ls_obj_hash);
1160                 return -ENOMEM;
1161         }
1162
1163         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1164                              0, "created", "created");
1165         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1166                              0, "cache_hit", "cache_hit");
1167         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1168                              0, "cache_miss", "cache_miss");
1169         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1170                              0, "cache_race", "cache_race");
1171         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1172                              0, "cache_death_race", "cache_death_race");
1173         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1174                              0, "lru_purged", "lru_purged");
1175
1176         INIT_LIST_HEAD(&s->ls_linkage);
1177         s->ls_top_dev = top;
1178         top->ld_site = s;
1179         lu_device_get(top);
1180         lu_ref_add(&top->ld_reference, "site-top", s);
1181
1182         INIT_LIST_HEAD(&s->ls_ld_linkage);
1183         spin_lock_init(&s->ls_ld_lock);
1184
1185         lu_dev_add_linkage(s, top);
1186
1187         RETURN(0);
1188 }
1189 EXPORT_SYMBOL(lu_site_init);
1190
1191 /**
1192  * Finalize \a s and release its resources.
1193  */
1194 void lu_site_fini(struct lu_site *s)
1195 {
1196         down_write(&lu_sites_guard);
1197         list_del_init(&s->ls_linkage);
1198         up_write(&lu_sites_guard);
1199
1200         percpu_counter_destroy(&s->ls_lru_len_counter);
1201
1202         if (s->ls_bkts) {
1203                 rhashtable_destroy(&s->ls_obj_hash);
1204                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1205                 s->ls_bkts = NULL;
1206         }
1207
1208         if (s->ls_top_dev != NULL) {
1209                 s->ls_top_dev->ld_site = NULL;
1210                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1211                 lu_device_put(s->ls_top_dev);
1212                 s->ls_top_dev = NULL;
1213         }
1214
1215         if (s->ls_stats != NULL)
1216                 lprocfs_free_stats(&s->ls_stats);
1217 }
1218 EXPORT_SYMBOL(lu_site_fini);
1219
1220 /**
1221  * Called when initialization of stack for this site is completed.
1222  */
1223 int lu_site_init_finish(struct lu_site *s)
1224 {
1225         int result;
1226         down_write(&lu_sites_guard);
1227         result = lu_context_refill(&lu_shrink_env.le_ctx);
1228         if (result == 0)
1229                 list_add(&s->ls_linkage, &lu_sites);
1230         up_write(&lu_sites_guard);
1231         return result;
1232 }
1233 EXPORT_SYMBOL(lu_site_init_finish);
1234
1235 /**
1236  * Acquire additional reference on device \a d
1237  */
1238 void lu_device_get(struct lu_device *d)
1239 {
1240         atomic_inc(&d->ld_ref);
1241 }
1242 EXPORT_SYMBOL(lu_device_get);
1243
1244 /**
1245  * Release reference on device \a d.
1246  */
1247 void lu_device_put(struct lu_device *d)
1248 {
1249         LASSERT(atomic_read(&d->ld_ref) > 0);
1250         atomic_dec(&d->ld_ref);
1251 }
1252 EXPORT_SYMBOL(lu_device_put);
1253
1254 enum { /* Maximal number of tld slots. */
1255         LU_CONTEXT_KEY_NR = 40
1256 };
1257 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1258 static DECLARE_RWSEM(lu_key_initing);
1259
1260 /**
1261  * Initialize device \a d of type \a t.
1262  */
1263 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1264 {
1265         if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) {
1266                 down_write(&lu_key_initing);
1267                 if (t->ldt_ops->ldto_start &&
1268                     atomic_read(&t->ldt_device_nr) == 0)
1269                         t->ldt_ops->ldto_start(t);
1270                 atomic_inc(&t->ldt_device_nr);
1271                 up_write(&lu_key_initing);
1272         }
1273
1274         memset(d, 0, sizeof *d);
1275         d->ld_type = t;
1276         lu_ref_init(&d->ld_reference);
1277         INIT_LIST_HEAD(&d->ld_linkage);
1278
1279         return 0;
1280 }
1281 EXPORT_SYMBOL(lu_device_init);
1282
1283 /**
1284  * Finalize device \a d.
1285  */
1286 void lu_device_fini(struct lu_device *d)
1287 {
1288         struct lu_device_type *t = d->ld_type;
1289
1290         if (d->ld_obd != NULL) {
1291                 d->ld_obd->obd_lu_dev = NULL;
1292                 d->ld_obd = NULL;
1293         }
1294
1295         lu_ref_fini(&d->ld_reference);
1296         LASSERTF(atomic_read(&d->ld_ref) == 0,
1297                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1298         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1299
1300         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1301             t->ldt_ops->ldto_stop != NULL)
1302                 t->ldt_ops->ldto_stop(t);
1303 }
1304 EXPORT_SYMBOL(lu_device_fini);
1305
1306 /**
1307  * Initialize object \a o that is part of compound object \a h and was created
1308  * by device \a d.
1309  */
1310 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1311                    struct lu_device *d)
1312 {
1313         memset(o, 0, sizeof(*o));
1314         o->lo_header = h;
1315         o->lo_dev = d;
1316         lu_device_get(d);
1317         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1318         INIT_LIST_HEAD(&o->lo_linkage);
1319
1320         return 0;
1321 }
1322 EXPORT_SYMBOL(lu_object_init);
1323
1324 /**
1325  * Finalize object and release its resources.
1326  */
1327 void lu_object_fini(struct lu_object *o)
1328 {
1329         struct lu_device *dev = o->lo_dev;
1330
1331         LASSERT(list_empty(&o->lo_linkage));
1332
1333         if (dev != NULL) {
1334                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1335                               "lu_object", o);
1336                 lu_device_put(dev);
1337                 o->lo_dev = NULL;
1338         }
1339 }
1340 EXPORT_SYMBOL(lu_object_fini);
1341
1342 /**
1343  * Add object \a o as the first layer of compound object \a h.
1344  *
1345  * This is typically called by the ->ldo_object_alloc() method of top-level
1346  * device.
1347  */
1348 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1349 {
1350         list_move(&o->lo_linkage, &h->loh_layers);
1351 }
1352 EXPORT_SYMBOL(lu_object_add_top);
1353
1354 /**
1355  * Add object \a o as a layer of compound object, going after \a before.
1356  *
1357  * This is typically called by the ->ldo_object_alloc() method of \a
1358  * before->lo_dev.
1359  */
1360 void lu_object_add(struct lu_object *before, struct lu_object *o)
1361 {
1362         list_move(&o->lo_linkage, &before->lo_linkage);
1363 }
1364 EXPORT_SYMBOL(lu_object_add);
1365
1366 /**
1367  * Initialize compound object.
1368  */
1369 int lu_object_header_init(struct lu_object_header *h)
1370 {
1371         memset(h, 0, sizeof *h);
1372         atomic_set(&h->loh_ref, 1);
1373         INIT_LIST_HEAD(&h->loh_lru);
1374         INIT_LIST_HEAD(&h->loh_layers);
1375         lu_ref_init(&h->loh_reference);
1376         return 0;
1377 }
1378 EXPORT_SYMBOL(lu_object_header_init);
1379
1380 /**
1381  * Finalize compound object.
1382  */
1383 void lu_object_header_fini(struct lu_object_header *h)
1384 {
1385         LASSERT(list_empty(&h->loh_layers));
1386         LASSERT(list_empty(&h->loh_lru));
1387         lu_ref_fini(&h->loh_reference);
1388 }
1389 EXPORT_SYMBOL(lu_object_header_fini);
1390
1391 /**
1392  * Free lu_object_header with proper RCU handling
1393  */
1394 void lu_object_header_free(struct lu_object_header *h)
1395 {
1396         lu_object_header_fini(h);
1397         OBD_FREE_PRE(h, sizeof(*h), "kfreed");
1398         kfree_rcu(h, loh_rcu);
1399 }
1400 EXPORT_SYMBOL(lu_object_header_free);
1401
1402 /**
1403  * Given a compound object, find its slice, corresponding to the device type
1404  * \a dtype.
1405  */
1406 struct lu_object *lu_object_locate(struct lu_object_header *h,
1407                                    const struct lu_device_type *dtype)
1408 {
1409         struct lu_object *o;
1410
1411         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1412                 if (o->lo_dev->ld_type == dtype)
1413                         return o;
1414         }
1415         return NULL;
1416 }
1417 EXPORT_SYMBOL(lu_object_locate);
1418
1419 /**
1420  * Finalize and free devices in the device stack.
1421  *
1422  * Finalize device stack by purging object cache, and calling
1423  * lu_device_type_operations::ldto_device_fini() and
1424  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1425  */
1426 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1427 {
1428         struct lu_site   *site = top->ld_site;
1429         struct lu_device *scan;
1430         struct lu_device *next;
1431
1432         lu_site_purge(env, site, ~0);
1433         for (scan = top; scan != NULL; scan = next) {
1434                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1435                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1436                 lu_device_put(scan);
1437         }
1438
1439         /* purge again. */
1440         lu_site_purge(env, site, ~0);
1441
1442         for (scan = top; scan != NULL; scan = next) {
1443                 const struct lu_device_type *ldt = scan->ld_type;
1444
1445                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1446         }
1447 }
1448
1449 /**
1450  * Global counter incremented whenever a key is registered, unregistered,
1451  * revived or quiesced. This is used to avoid unnecessary calls to
1452  * lu_context_refill(). No locking is provided, as initialization and shutdown
1453  * are supposed to be externally serialized.
1454  */
1455 static atomic_t key_set_version = ATOMIC_INIT(0);
1456
1457 /**
1458  * Register new key.
1459  */
1460 int lu_context_key_register(struct lu_context_key *key)
1461 {
1462         int result;
1463         unsigned int i;
1464
1465         LASSERT(key->lct_init != NULL);
1466         LASSERT(key->lct_fini != NULL);
1467         LASSERT(key->lct_tags != 0);
1468         LASSERT(key->lct_owner != NULL);
1469
1470         result = -ENFILE;
1471         atomic_set(&key->lct_used, 1);
1472         lu_ref_init(&key->lct_reference);
1473         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1474                 if (lu_keys[i])
1475                         continue;
1476                 key->lct_index = i;
1477
1478                 if (strncmp("osd_", module_name(key->lct_owner), 4) == 0)
1479                         CFS_RACE_WAIT(OBD_FAIL_OBD_SETUP);
1480
1481                 if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
1482                         continue;
1483
1484                 result = 0;
1485                 atomic_inc(&key_set_version);
1486                 break;
1487         }
1488         if (result) {
1489                 lu_ref_fini(&key->lct_reference);
1490                 atomic_set(&key->lct_used, 0);
1491         }
1492         return result;
1493 }
1494 EXPORT_SYMBOL(lu_context_key_register);
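
/*
 * Declaration sketch (illustrative; the "foo" names are hypothetical, modelled
 * on lu_global_key above): a module that needs per-context storage typically
 * does
 *
 *	LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *	static struct lu_context_key foo_thread_key = {
 *		.lct_tags = LCT_MD_THREAD,
 *		.lct_init = foo_key_init,
 *		.lct_fini = foo_key_fini,
 *	};
 *
 * and then registers it with lu_context_key_register() (or the _many()
 * variant), after setting ->lct_owner, which the registration LASSERTs
 * to be non-NULL.
 */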
1495
1496 static void key_fini(struct lu_context *ctx, int index)
1497 {
1498         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1499                 struct lu_context_key *key;
1500
1501                 key = lu_keys[index];
1502                 LASSERT(key != NULL);
1503                 LASSERT(key->lct_fini != NULL);
1504                 LASSERT(atomic_read(&key->lct_used) > 0);
1505
1506                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1507                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1508                 if (atomic_dec_and_test(&key->lct_used))
1509                         wake_up_var(&key->lct_used);
1510
1511                 LASSERT(key->lct_owner != NULL);
1512                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1513                         LINVRNT(module_refcount(key->lct_owner) > 0);
1514                         module_put(key->lct_owner);
1515                 }
1516                 ctx->lc_value[index] = NULL;
1517         }
1518 }
1519
1520 /**
1521  * Deregister key.
1522  */
1523 void lu_context_key_degister(struct lu_context_key *key)
1524 {
1525         LASSERT(atomic_read(&key->lct_used) >= 1);
1526         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1527
1528         lu_context_key_quiesce(NULL, key);
1529
1530         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1531
1532         /*
1533          * Wait until all transient contexts referencing this key have
1534          * run the lu_context_key::lct_fini() method.
1535          */
1536         atomic_dec(&key->lct_used);
1537         wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
1538
1539         if (!WARN_ON(lu_keys[key->lct_index] == NULL))
1540                 lu_ref_fini(&key->lct_reference);
1541
1542         smp_store_release(&lu_keys[key->lct_index], NULL);
1543 }
1544 EXPORT_SYMBOL(lu_context_key_degister);
1545
1546 /**
1547  * Register a number of keys. This has to be called after all keys have been
1548  * initialized by a call to LU_CONTEXT_KEY_INIT().
1549  */
1550 int lu_context_key_register_many(struct lu_context_key *k, ...)
1551 {
1552         struct lu_context_key *key = k;
1553         va_list args;
1554         int result;
1555
1556         va_start(args, k);
1557         do {
1558                 result = lu_context_key_register(key);
1559                 if (result)
1560                         break;
1561                 key = va_arg(args, struct lu_context_key *);
1562         } while (key != NULL);
1563         va_end(args);
1564
1565         if (result != 0) {
1566                 va_start(args, k);
1567                 while (k != key) {
1568                         lu_context_key_degister(k);
1569                         k = va_arg(args, struct lu_context_key *);
1570                 }
1571                 va_end(args);
1572         }
1573
1574         return result;
1575 }
1576 EXPORT_SYMBOL(lu_context_key_register_many);
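/*
 * Editorial example only: registering and later degistering two hypothetical
 * keys (foo_key_a and foo_key_b, assumed to be set up as in the sketch above)
 * in one call.  The argument list must be NULL-terminated; on failure the keys
 * registered so far are degistered internally, so no unwinding is needed here.
 */
#if 0	/* example only */
static int __maybe_unused foo_keys_setup(void)
{
	return lu_context_key_register_many(&foo_key_a, &foo_key_b, NULL);
}

static void __maybe_unused foo_keys_cleanup(void)
{
	lu_context_key_degister_many(&foo_key_a, &foo_key_b, NULL);
}
#endif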
1577
1578 /**
1579  * De-register a number of keys. This is a dual to
1580  * lu_context_key_register_many().
1581  */
1582 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1583 {
1584         va_list args;
1585
1586         va_start(args, k);
1587         do {
1588                 lu_context_key_degister(k);
1589                 k = va_arg(args, struct lu_context_key *);
1590         } while (k != NULL);
1591         va_end(args);
1592 }
1593 EXPORT_SYMBOL(lu_context_key_degister_many);
1594
1595 /**
1596  * Revive a number of keys.
1597  */
1598 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1599 {
1600         va_list args;
1601
1602         va_start(args, k);
1603         do {
1604                 lu_context_key_revive(k);
1605                 k = va_arg(args, struct lu_context_key *);
1606         } while (k != NULL);
1607         va_end(args);
1608 }
1609 EXPORT_SYMBOL(lu_context_key_revive_many);
1610
1611 /**
1612  * Quiesce a number of keys.
1613  */
1614 void lu_context_key_quiesce_many(struct lu_device_type *t,
1615                                  struct lu_context_key *k, ...)
1616 {
1617         va_list args;
1618
1619         va_start(args, k);
1620         do {
1621                 lu_context_key_quiesce(t, k);
1622                 k = va_arg(args, struct lu_context_key *);
1623         } while (k != NULL);
1624         va_end(args);
1625 }
1626 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1627
1628 /**
1629  * Return value associated with key \a key in context \a ctx.
1630  */
1631 void *lu_context_key_get(const struct lu_context *ctx,
1632                          const struct lu_context_key *key)
1633 {
1634         LINVRNT(ctx->lc_state == LCS_ENTERED);
1635         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1636         LASSERT(lu_keys[key->lct_index] == key);
1637         return ctx->lc_value[key->lct_index];
1638 }
1639 EXPORT_SYMBOL(lu_context_key_get);
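/*
 * Editorial example only: the usual accessor pattern for fetching a key's
 * per-context value.  The context must currently be entered; foo_thread_key
 * and struct foo_thread_info are the hypothetical key and value type from the
 * registration sketch above.
 */
#if 0	/* example only */
static struct foo_thread_info *foo_env_info(const struct lu_env *env)
{
	struct foo_thread_info *info;

	info = lu_context_key_get(&env->le_ctx, &foo_thread_key);
	LASSERT(info != NULL);
	return info;
}
#endif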
1640
1641 /**
1642  * List of contexts created with LCT_REMEMBER; walked when a key is quiesced.
1643  */
1644 static LIST_HEAD(lu_context_remembered);
1645 static DEFINE_SPINLOCK(lu_context_remembered_guard);
1646
1647 /**
1648  * Destroy \a key in all remembered contexts. This is used to destroy key
1649  * values in "shared" contexts (like service threads), when the module owning
1650  * the key is about to be unloaded.
1651  */
1652 void lu_context_key_quiesce(struct lu_device_type *t,
1653                             struct lu_context_key *key)
1654 {
1655         struct lu_context *ctx;
1656
1657         if (key->lct_tags & LCT_QUIESCENT)
1658                 return;
1659         /*
1660          * The write-lock on lu_key_initing will ensure that any
1661          * keys_fill() which didn't see LCT_QUIESCENT will have
1662          * finished before we call key_fini().
1663          */
1664         down_write(&lu_key_initing);
1665         if (!(key->lct_tags & LCT_QUIESCENT)) {
1666                 if (t == NULL || atomic_read(&t->ldt_device_nr) == 0)
1667                         key->lct_tags |= LCT_QUIESCENT;
1668                 up_write(&lu_key_initing);
1669
1670                 spin_lock(&lu_context_remembered_guard);
1671                 list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
1672                         spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
1673                         key_fini(ctx, key->lct_index);
1674                 }
1675                 spin_unlock(&lu_context_remembered_guard);
1676
1677                 return;
1678         }
1679         up_write(&lu_key_initing);
1680 }
1681
1682 void lu_context_key_revive(struct lu_context_key *key)
1683 {
1684         key->lct_tags &= ~LCT_QUIESCENT;
1685         atomic_inc(&key_set_version);
1686 }
1687
1688 static void keys_fini(struct lu_context *ctx)
1689 {
1690         unsigned int i;
1691
1692         if (ctx->lc_value == NULL)
1693                 return;
1694
1695         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1696                 key_fini(ctx, i);
1697
1698         OBD_FREE_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1699         ctx->lc_value = NULL;
1700 }
1701
1702 static int keys_fill(struct lu_context *ctx)
1703 {
1704         unsigned int i;
1705         int rc = 0;
1706
1707         /*
1708          * A serialisation with lu_context_key_quiesce() is needed, to
1709          * ensure we see LCT_QUIESCENT and don't allocate a new value
1710          * after it freed one.  The rwsem provides this.  As down_read()
1711          * does optimistic spinning while the writer is active, this is
1712          * unlikely to ever sleep.
1713          */
1714         down_read(&lu_key_initing);
1715         ctx->lc_version = atomic_read(&key_set_version);
1716
1717         LINVRNT(ctx->lc_value);
1718         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1719                 struct lu_context_key *key;
1720
1721                 key = lu_keys[i];
1722                 if (!ctx->lc_value[i] && key &&
1723                     (key->lct_tags & ctx->lc_tags) &&
1724                     /*
1725                      * Don't create values for an LCT_QUIESCENT key, as this
1726                      * would pin the module owning the key.
1727                      */
1728                     !(key->lct_tags & LCT_QUIESCENT)) {
1729                         void *value;
1730
1731                         LINVRNT(key->lct_init != NULL);
1732                         LINVRNT(key->lct_index == i);
1733
1734                         LASSERT(key->lct_owner != NULL);
1735                         if (!(ctx->lc_tags & LCT_NOREF) &&
1736                             try_module_get(key->lct_owner) == 0) {
1737                                 /* module is unloading, skip this key */
1738                                 continue;
1739                         }
1740
1741                         value = key->lct_init(ctx, key);
1742                         if (unlikely(IS_ERR(value))) {
1743                                 rc = PTR_ERR(value);
1744                                 break;
1745                         }
1746
1747                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1748                         atomic_inc(&key->lct_used);
1749                         /*
1750                          * This is the only place in the code where an
1751                          * element of the ctx->lc_value[] array is set to a
1752                          * non-NULL value.
1753                          */
1754                         ctx->lc_value[i] = value;
1755                         if (key->lct_exit != NULL)
1756                                 ctx->lc_tags |= LCT_HAS_EXIT;
1757                 }
1758         }
1759
1760         up_read(&lu_key_initing);
1761         return rc;
1762 }
1763
1764 static int keys_init(struct lu_context *ctx)
1765 {
1766         OBD_ALLOC_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1767         if (likely(ctx->lc_value != NULL))
1768                 return keys_fill(ctx);
1769
1770         return -ENOMEM;
1771 }
1772
1773 /**
1774  * Initialize context data-structure. Create values for all keys.
1775  */
1776 int lu_context_init(struct lu_context *ctx, __u32 tags)
1777 {
1778         int     rc;
1779
1780         memset(ctx, 0, sizeof *ctx);
1781         ctx->lc_state = LCS_INITIALIZED;
1782         ctx->lc_tags = tags;
1783         if (tags & LCT_REMEMBER) {
1784                 spin_lock(&lu_context_remembered_guard);
1785                 list_add(&ctx->lc_remember, &lu_context_remembered);
1786                 spin_unlock(&lu_context_remembered_guard);
1787         } else {
1788                 INIT_LIST_HEAD(&ctx->lc_remember);
1789         }
1790
1791         rc = keys_init(ctx);
1792         if (rc != 0)
1793                 lu_context_fini(ctx);
1794
1795         return rc;
1796 }
1797 EXPORT_SYMBOL(lu_context_init);
1798
1799 /**
1800  * Finalize context data-structure. Destroy key values.
1801  */
1802 void lu_context_fini(struct lu_context *ctx)
1803 {
1804         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1805         ctx->lc_state = LCS_FINALIZED;
1806
1807         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1808                 LASSERT(list_empty(&ctx->lc_remember));
1809         } else {
1810                 /* could race with key degister */
1811                 spin_lock(&lu_context_remembered_guard);
1812                 list_del_init(&ctx->lc_remember);
1813                 spin_unlock(&lu_context_remembered_guard);
1814         }
1815         keys_fini(ctx);
1816 }
1817 EXPORT_SYMBOL(lu_context_fini);
1818
1819 /**
1820  * Called before entering context.
1821  */
1822 void lu_context_enter(struct lu_context *ctx)
1823 {
1824         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1825         ctx->lc_state = LCS_ENTERED;
1826 }
1827 EXPORT_SYMBOL(lu_context_enter);
1828
1829 /**
1830  * Called after exiting from \a ctx
1831  */
1832 void lu_context_exit(struct lu_context *ctx)
1833 {
1834         unsigned int i;
1835
1836         LINVRNT(ctx->lc_state == LCS_ENTERED);
1837         /*
1838          * Disable preemption to ensure we get a warning if
1839          * any lct_exit ever tries to sleep.  That would hurt
1840          * lu_context_key_quiesce() which spins waiting for us.
1841          * This also ensures we aren't preempted while the state
1842          * is LCS_LEAVING, as that too would cause problems for
1843          * lu_context_key_quiesce().
1844          */
1845         preempt_disable();
1846         /*
1847          * Ensure lu_context_key_quiesce() sees LCS_LEAVING
1848          * or we see LCT_QUIESCENT
1849          */
1850         smp_store_mb(ctx->lc_state, LCS_LEAVING);
1851         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
1852                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1853                         struct lu_context_key *key;
1854
1855                         key = lu_keys[i];
1856                         if (ctx->lc_value[i] &&
1857                             !(key->lct_tags & LCT_QUIESCENT) &&
1858                             key->lct_exit)
1859                                 key->lct_exit(ctx, key, ctx->lc_value[i]);
1860                 }
1861         }
1862
1863         smp_store_release(&ctx->lc_state, LCS_LEFT);
1864         preempt_enable();
1865 }
1866 EXPORT_SYMBOL(lu_context_exit);
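/*
 * Editorial example only: the usual life cycle of a private context.  The
 * context is initialized once, entered/exited around each period of use, and
 * finalized at the end.  The tag choice (LCT_CL_THREAD) is only an assumption
 * for the sketch.
 */
#if 0	/* example only */
static int __maybe_unused foo_do_work(void)
{
	struct lu_context ctx;
	int rc;

	rc = lu_context_init(&ctx, LCT_CL_THREAD);
	if (rc != 0)
		return rc;

	lu_context_enter(&ctx);
	/* key values can now be fetched with lu_context_key_get(&ctx, ...) */
	lu_context_exit(&ctx);

	lu_context_fini(&ctx);
	return 0;
}
#endif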
1867
1868 /**
1869  * Allocate values in the context for any missing keys that were registered
1870  * after the context was created. key_set_version only changes in the rare
1871  * cases when modules are loaded or removed.
1872  */
1873 int lu_context_refill(struct lu_context *ctx)
1874 {
1875         if (likely(ctx->lc_version == atomic_read(&key_set_version)))
1876                 return 0;
1877
1878         return keys_fill(ctx);
1879 }
1880
1881 /**
1882  * lu_context_tags_default/lu_session_tags_default are updated when new types
1883  * of obd devices are added. Currently this is only used on the client side,
1884  * specifically for the echo device client; for other stacks (like ptlrpc
1885  * threads) the contexts are predefined when the lu_device types are
1886  * registered, during the module probe phase.
1887  */
1888 u32 lu_context_tags_default = LCT_CL_THREAD;
1889 u32 lu_session_tags_default = LCT_SESSION;
1890
1891 void lu_context_tags_update(__u32 tags)
1892 {
1893         spin_lock(&lu_context_remembered_guard);
1894         lu_context_tags_default |= tags;
1895         atomic_inc(&key_set_version);
1896         spin_unlock(&lu_context_remembered_guard);
1897 }
1898 EXPORT_SYMBOL(lu_context_tags_update);
1899
1900 void lu_context_tags_clear(__u32 tags)
1901 {
1902         spin_lock(&lu_context_remembered_guard);
1903         lu_context_tags_default &= ~tags;
1904         atomic_inc(&key_set_version);
1905         spin_unlock(&lu_context_remembered_guard);
1906 }
1907 EXPORT_SYMBOL(lu_context_tags_clear);
1908
1909 void lu_session_tags_update(__u32 tags)
1910 {
1911         spin_lock(&lu_context_remembered_guard);
1912         lu_session_tags_default |= tags;
1913         atomic_inc(&key_set_version);
1914         spin_unlock(&lu_context_remembered_guard);
1915 }
1916 EXPORT_SYMBOL(lu_session_tags_update);
1917
1918 void lu_session_tags_clear(__u32 tags)
1919 {
1920         spin_lock(&lu_context_remembered_guard);
1921         lu_session_tags_default &= ~tags;
1922         atomic_inc(&key_set_version);
1923         spin_unlock(&lu_context_remembered_guard);
1924 }
1925 EXPORT_SYMBOL(lu_session_tags_clear);
1926
1927 int lu_env_init(struct lu_env *env, __u32 tags)
1928 {
1929         int result;
1930
1931         env->le_ses = NULL;
1932         result = lu_context_init(&env->le_ctx, tags);
1933         if (likely(result == 0))
1934                 lu_context_enter(&env->le_ctx);
1935         return result;
1936 }
1937 EXPORT_SYMBOL(lu_env_init);
1938
1939 void lu_env_fini(struct lu_env *env)
1940 {
1941         lu_context_exit(&env->le_ctx);
1942         lu_context_fini(&env->le_ctx);
1943         env->le_ses = NULL;
1944 }
1945 EXPORT_SYMBOL(lu_env_fini);
1946
1947 int lu_env_refill(struct lu_env *env)
1948 {
1949         int result;
1950
1951         result = lu_context_refill(&env->le_ctx);
1952         if (result == 0 && env->le_ses != NULL)
1953                 result = lu_context_refill(env->le_ses);
1954         return result;
1955 }
1956 EXPORT_SYMBOL(lu_env_refill);
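/*
 * Editorial example only: a short-lived environment.  lu_env_init() both
 * initializes and enters le_ctx, le_ses stays NULL here, and lu_env_refill()
 * picks up any keys registered after the environment was created.  The tag
 * choice is only an assumption for the sketch.
 */
#if 0	/* example only */
static int __maybe_unused foo_with_env(void)
{
	struct lu_env env;
	int rc;

	rc = lu_env_init(&env, LCT_CL_THREAD);
	if (rc != 0)
		return rc;

	rc = lu_env_refill(&env);
	if (rc == 0) {
		/* call code that needs a struct lu_env here */
	}

	lu_env_fini(&env);
	return rc;
}
#endif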
1957
1958 /**
1959  * Currently, this API is only used by the echo client. Because the echo
1960  * client and the normal Lustre client share the same cl_env cache, the echo
1961  * client needs to refresh the env context after it gets one from the cache,
1962  * especially when a normal client and an echo client co-exist on the same
1963  * node.
1964  */
1965 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1966                           __u32 stags)
1967 {
1968         int    result;
1969
1970         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1971                 env->le_ctx.lc_version = 0;
1972                 env->le_ctx.lc_tags |= ctags;
1973         }
1974
1975         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1976                 env->le_ses->lc_version = 0;
1977                 env->le_ses->lc_tags |= stags;
1978         }
1979
1980         result = lu_env_refill(env);
1981
1982         return result;
1983 }
1984 EXPORT_SYMBOL(lu_env_refill_by_tags);
1985
1986
1987 struct lu_env_item {
1988         struct task_struct *lei_task;   /* rhashtable key */
1989         struct rhash_head lei_linkage;
1990         struct lu_env *lei_env;
1991         struct rcu_head lei_rcu_head;
1992 };
1993
1994 static const struct rhashtable_params lu_env_rhash_params = {
1995         .key_len     = sizeof(struct task_struct *),
1996         .key_offset  = offsetof(struct lu_env_item, lei_task),
1997         .head_offset = offsetof(struct lu_env_item, lei_linkage),
1998 };
1999
2000 struct rhashtable lu_env_rhash;
2001
2002 struct lu_env_percpu {
2003         struct task_struct *lep_task;
2004         struct lu_env *lep_env ____cacheline_aligned_in_smp;
2005 };
2006
2007 static struct lu_env_percpu lu_env_percpu[NR_CPUS];
2008
2009 int lu_env_add_task(struct lu_env *env, struct task_struct *task)
2010 {
2011         struct lu_env_item *lei, *old;
2012
2013         LASSERT(env);
2014
2015         OBD_ALLOC_PTR(lei);
2016         if (!lei)
2017                 return -ENOMEM;
2018
2019         lei->lei_task = task;
2020         lei->lei_env = env;
2021
2022         old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
2023                                                 &lei->lei_linkage,
2024                                                 lu_env_rhash_params);
2025         LASSERT(!old);
2026
2027         return 0;
2028 }
2029 EXPORT_SYMBOL(lu_env_add_task);
2030
2031 int lu_env_add(struct lu_env *env)
2032 {
2033         return lu_env_add_task(env, current);
2034 }
2035 EXPORT_SYMBOL(lu_env_add);
2036
2037 static void lu_env_item_free(struct rcu_head *head)
2038 {
2039         struct lu_env_item *lei;
2040
2041         lei = container_of(head, struct lu_env_item, lei_rcu_head);
2042         OBD_FREE_PTR(lei);
2043 }
2044
2045 void lu_env_remove(struct lu_env *env)
2046 {
2047         struct lu_env_item *lei;
2048         const void *task = current;
2049         int i;
2050
2051         for_each_possible_cpu(i) {
2052                 if (lu_env_percpu[i].lep_env == env) {
2053                         LASSERT(lu_env_percpu[i].lep_task == task);
2054                         lu_env_percpu[i].lep_task = NULL;
2055                         lu_env_percpu[i].lep_env = NULL;
2056                 }
2057         }
2058
2059         /* The rcu read lock is not taken in this case since the key
2060          * used is the actual task_struct. This implies that each
2061          * object is only removed by the owning thread, so there
2062          * can never be a race on a particular object.
2063          */
2064         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
2065                                      lu_env_rhash_params);
2066         if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
2067                                           lu_env_rhash_params) == 0)
2068                 call_rcu(&lei->lei_rcu_head, lu_env_item_free);
2069 }
2070 EXPORT_SYMBOL(lu_env_remove);
2071
2072 struct lu_env *lu_env_find(void)
2073 {
2074         struct lu_env *env = NULL;
2075         struct lu_env_item *lei;
2076         const void *task = current;
2077         int i = get_cpu();
2078
2079         if (lu_env_percpu[i].lep_task == current) {
2080                 env = lu_env_percpu[i].lep_env;
2081                 put_cpu();
2082                 LASSERT(env);
2083                 return env;
2084         }
2085
2086         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
2087                                      lu_env_rhash_params);
2088         if (lei) {
2089                 env = lei->lei_env;
2090                 lu_env_percpu[i].lep_task = current;
2091                 lu_env_percpu[i].lep_env = env;
2092         }
2093         put_cpu();
2094
2095         return env;
2096 }
2097 EXPORT_SYMBOL(lu_env_find);
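/*
 * Editorial example only: publishing an environment so that code deeper in the
 * call chain, which has no lu_env argument, can recover it via lu_env_find().
 * foo_call_without_env_arg() is hypothetical; the point is the pairing of
 * lu_env_add() and lu_env_remove() around the call.
 */
#if 0	/* example only */
static int __maybe_unused foo_run(struct lu_env *env)
{
	int rc;

	rc = lu_env_add(env);
	if (rc != 0)
		return rc;

	foo_call_without_env_arg();	/* may internally use lu_env_find() */

	lu_env_remove(env);
	return 0;
}
#endif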
2098
2099 typedef struct lu_site_stats {
2100         unsigned        lss_populated;
2101         unsigned        lss_max_search;
2102         unsigned        lss_total;
2103         unsigned        lss_busy;
2104 } lu_site_stats_t;
2105
2106 static void lu_site_stats_get(const struct lu_site *s,
2107                               lu_site_stats_t *stats)
2108 {
2109         int cnt = atomic_read(&s->ls_obj_hash.nelems);
2110         /*
2111          * percpu_counter_sum_positive() won't accept a const pointer
2112          * as it does modify the struct by taking a spinlock
2113          */
2114         struct lu_site *s2 = (struct lu_site *)s;
2115
2116         stats->lss_busy += cnt -
2117                 percpu_counter_sum_positive(&s2->ls_lru_len_counter);
2118
2119         stats->lss_total += cnt;
2120         stats->lss_max_search = 0;
2121         stats->lss_populated = 0;
2122 }
2123
2124
2125 /*
2126  * lu_cache_shrink_count() returns an approximate number of cached objects
2127  * that can be freed by shrink_slab(). A counter, which tracks the
2128  * number of items in the site's lru, is maintained in a percpu_counter
2129  * for each site. The percpu values are incremented and decremented as
2130  * objects are added or removed from the lru. The percpu values are summed
2131  * and saved whenever a percpu value exceeds a threshold. Thus the saved,
2132  * summed value at any given time may not accurately reflect the current
2133  * lru length. But this value is sufficiently accurate for the needs of
2134  * a shrinker.
2135  *
2136  * Using a per cpu counter is a compromise solution to concurrent access:
2137  * lu_object_put() can update the counter without locking the site and
2138  * lu_cache_shrink_count can sum the counters without locking each
2139  * ls_obj_hash bucket.
2140  */
2141 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
2142                                            struct shrink_control *sc)
2143 {
2144         struct lu_site *s;
2145         struct lu_site *tmp;
2146         unsigned long cached = 0;
2147
2148         if (!(sc->gfp_mask & __GFP_FS))
2149                 return 0;
2150
2151         down_read(&lu_sites_guard);
2152         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
2153                 cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
2154         up_read(&lu_sites_guard);
2155
2156         cached = (cached / 100) * sysctl_vfs_cache_pressure;
2157         CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
2158                cached, sysctl_vfs_cache_pressure);
2159
2160         return cached;
2161 }
2162
2163 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
2164                                           struct shrink_control *sc)
2165 {
2166         struct lu_site *s;
2167         struct lu_site *tmp;
2168         unsigned long remain = sc->nr_to_scan;
2169         LIST_HEAD(splice);
2170
2171         if (!(sc->gfp_mask & __GFP_FS))
2172                 /* We must not take the lu_sites_guard lock when
2173                  * __GFP_FS is *not* set because of the deadlock
2174                  * possibility detailed above. Additionally,
2175                  * since we cannot determine the number of
2176                  * objects in the cache without taking this
2177                  * lock, we're in a particularly tough spot. As
2178                  * a result, we'll just lie and say our cache is
2179                  * empty. This _should_ be ok, as we can't
2180                  * reclaim objects when __GFP_FS is *not* set
2181                  * anyways.
2182                  */
2183                 return SHRINK_STOP;
2184
2185         down_write(&lu_sites_guard);
2186         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
2187                 remain = lu_site_purge(&lu_shrink_env, s, remain);
2188                 /*
2189                  * Move the just-shrunk site to the tail of the site list
2190                  * to ensure shrinking fairness.
2191                  */
2192                 list_move_tail(&s->ls_linkage, &splice);
2193         }
2194         list_splice(&splice, lu_sites.prev);
2195         up_write(&lu_sites_guard);
2196
2197         return sc->nr_to_scan - remain;
2198 }
2199
2200 #ifdef HAVE_SHRINKER_COUNT
2201 static struct shrinker lu_site_shrinker = {
2202         .count_objects  = lu_cache_shrink_count,
2203         .scan_objects   = lu_cache_shrink_scan,
2204         .seeks          = DEFAULT_SEEKS,
2205 };
2206
2207 #else
2208 /*
2209  * There exists a potential lock inversion deadlock scenario when using
2210  * Lustre on top of ZFS. This occurs between one of ZFS's
2211  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
2212  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2213  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2214  * lock. Obviously neither thread will wake and drop their respective hold
2215  * on their lock.
2216  *
2217  * To prevent this from happening we must ensure the lu_sites_guard lock is
2218  * not taken while down this code path. ZFS reliably does not set the
2219  * __GFP_FS bit in its code paths, so this can be used to determine if it
2220  * is safe to take the lu_sites_guard lock.
2221  *
2222  * Ideally we should accurately return the remaining number of cached
2223  * objects without taking the lu_sites_guard lock, but this is not
2224  * possible in the current implementation.
2225  */
2226 static int lu_cache_shrink(struct shrinker *shrinker,
2227                            struct shrink_control *sc)
2228 {
2229         int cached = 0;
2230
2231         CDEBUG(D_INODE, "Shrink %lu objects\n", sc->nr_to_scan);
2232
2233         if (sc->nr_to_scan != 0)
2234                 lu_cache_shrink_scan(shrinker, sc);
2235
2236         cached = lu_cache_shrink_count(shrinker, sc);
2237         return cached;
2238 }
2239
2240 static struct shrinker lu_site_shrinker = {
2241         .shrink  = lu_cache_shrink,
2242         .seeks   = DEFAULT_SEEKS,
2243 };
2244
2245 #endif /* HAVE_SHRINKER_COUNT */
2246
2247
2248 /*
2249  * Debugging stuff.
2250  */
2251
2252 /**
2253  * Environment to be used in debugger, contains all tags.
2254  */
2255 static struct lu_env lu_debugging_env;
2256
2257 /**
2258  * Debugging printer function using printk().
2259  */
2260 int lu_printk_printer(const struct lu_env *env,
2261                       void *unused, const char *format, ...)
2262 {
2263         va_list args;
2264
2265         va_start(args, format);
2266         vprintk(format, args);
2267         va_end(args);
2268         return 0;
2269 }
2270
2271 int lu_debugging_setup(void)
2272 {
2273         return lu_env_init(&lu_debugging_env, ~0);
2274 }
2275
2276 void lu_context_keys_dump(void)
2277 {
2278         unsigned int i;
2279
2280         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2281                 struct lu_context_key *key;
2282
2283                 key = lu_keys[i];
2284                 if (key != NULL) {
2285                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2286                                i, key, key->lct_tags,
2287                                key->lct_init, key->lct_fini, key->lct_exit,
2288                                key->lct_index, atomic_read(&key->lct_used),
2289                                key->lct_owner ? key->lct_owner->name : "",
2290                                key->lct_owner);
2291                         lu_ref_print(&key->lct_reference);
2292                 }
2293         }
2294 }
2295
2296 /**
2297  * Initialization of global lu_* data.
2298  */
2299 int lu_global_init(void)
2300 {
2301         int result;
2302
2303         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2304
2305         result = lu_ref_global_init();
2306         if (result != 0)
2307                 return result;
2308
2309         LU_CONTEXT_KEY_INIT(&lu_global_key);
2310         result = lu_context_key_register(&lu_global_key);
2311         if (result)
2312                 goto out_lu_ref;
2313
2314         /*
2315          * At this level, we don't know what tags are needed, so allocate them
2316          * conservatively. This should not be too bad, because this
2317          * environment is global.
2318          */
2319         down_write(&lu_sites_guard);
2320         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2321         up_write(&lu_sites_guard);
2322         if (result) {
2323                 lu_context_key_degister(&lu_global_key);
2324                 goto out_lu_ref;
2325         }
2326
2327         /*
2328          * Seeks estimation: 3 seeks to read a record from the OI, one to
2329          * read the inode, one for the EA. Unfortunately, setting such a high
2330          * value results in the lu_object/inode cache consuming all the memory.
2331          */
2332         result = register_shrinker(&lu_site_shrinker);
2333         if (result)
2334                 goto out_env;
2335
2336         result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
2337
2338         if (result)
2339                 goto out_shrinker;
2340
2341         return result;
2342
2343 out_shrinker:
2344         unregister_shrinker(&lu_site_shrinker);
2345 out_env:
2346         /* ordering here is explained in lu_global_fini() */
2347         lu_context_key_degister(&lu_global_key);
2348         down_write(&lu_sites_guard);
2349         lu_env_fini(&lu_shrink_env);
2350         up_write(&lu_sites_guard);
2351 out_lu_ref:
2352         lu_ref_global_fini();
2353         return result;
2354 }
2355
2356 /**
2357  * Dual to lu_global_init().
2358  */
2359 void lu_global_fini(void)
2360 {
2361         unregister_shrinker(&lu_site_shrinker);
2362
2363         lu_context_key_degister(&lu_global_key);
2364
2365         /*
2366          * Tear shrinker environment down _after_ de-registering
2367          * lu_global_key, because the latter has a value in the former.
2368          */
2369         down_write(&lu_sites_guard);
2370         lu_env_fini(&lu_shrink_env);
2371         up_write(&lu_sites_guard);
2372
2373         rhashtable_destroy(&lu_env_rhash);
2374
2375         lu_ref_global_fini();
2376 }
2377
2378 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2379 {
2380 #ifdef CONFIG_PROC_FS
2381         struct lprocfs_counter ret;
2382
2383         lprocfs_stats_collect(stats, idx, &ret);
2384         return (__u32)ret.lc_count;
2385 #else
2386         return 0;
2387 #endif
2388 }
2389
2390 /**
2391  * Output site statistical counters to a seq_file. Suitable for
2392  * lprocfs_rd_*()-style functions.
2393  */
2394 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2395 {
2396         const struct bucket_table *tbl;
2397         lu_site_stats_t stats;
2398         unsigned int chains;
2399
2400         memset(&stats, 0, sizeof(stats));
2401         lu_site_stats_get(s, &stats);
2402
2403         rcu_read_lock();
2404         tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
2405                                   &((struct lu_site *)s)->ls_obj_hash);
2406         chains = tbl->size;
2407         rcu_read_unlock();
2408         seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
2409                    stats.lss_busy,
2410                    stats.lss_total,
2411                    stats.lss_populated,
2412                    chains,
2413                    stats.lss_max_search,
2414                    ls_stats_read(s->ls_stats, LU_SS_CREATED),
2415                    ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2416                    ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2417                    ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2418                    ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2419                    ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2420         return 0;
2421 }
2422 EXPORT_SYMBOL(lu_site_stats_seq_print);
2423
2424 /**
2425  * Helper function to initialize a number of kmem slab caches at once.
2426  */
2427 int lu_kmem_init(struct lu_kmem_descr *caches)
2428 {
2429         int result;
2430         struct lu_kmem_descr *iter = caches;
2431
2432         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2433                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2434                                                      iter->ckd_size,
2435                                                      0, 0, NULL);
2436                 if (*iter->ckd_cache == NULL) {
2437                         result = -ENOMEM;
2438                         /* free all previously allocated caches */
2439                         lu_kmem_fini(caches);
2440                         break;
2441                 }
2442         }
2443         return result;
2444 }
2445 EXPORT_SYMBOL(lu_kmem_init);
2446
2447 /**
2448  * Helper function to finalize a number of kmem slab caches at once. Dual to
2449  * lu_kmem_init().
2450  */
2451 void lu_kmem_fini(struct lu_kmem_descr *caches)
2452 {
2453         for (; caches->ckd_cache != NULL; ++caches) {
2454                 if (*caches->ckd_cache != NULL) {
2455                         kmem_cache_destroy(*caches->ckd_cache);
2456                         *caches->ckd_cache = NULL;
2457                 }
2458         }
2459 }
2460 EXPORT_SYMBOL(lu_kmem_fini);
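/*
 * Editorial example only: a NULL-terminated lu_kmem_descr array as consumed by
 * lu_kmem_init()/lu_kmem_fini().  The cache pointer, name and object type
 * (foo_object_kmem, struct foo_object) are hypothetical.
 */
#if 0	/* example only */
static struct kmem_cache *foo_object_kmem;

static struct lu_kmem_descr foo_caches[] = {
	{
		.ckd_cache = &foo_object_kmem,
		.ckd_name  = "foo_object_kmem",
		.ckd_size  = sizeof(struct foo_object)
	},
	{
		.ckd_cache = NULL	/* terminator */
	}
};

static int __maybe_unused foo_caches_init(void)
{
	/* creates every cache in the array, or none on failure */
	return lu_kmem_init(foo_caches);
}

static void __maybe_unused foo_caches_fini(void)
{
	lu_kmem_fini(foo_caches);
}
#endif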
2461
2462 /**
2463  * Temporary solution to be able to assign fid in ->do_create()
2464  * till we have fully-functional OST fids
2465  */
2466 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2467                           const struct lu_fid *fid)
2468 {
2469         struct lu_site          *s = o->lo_dev->ld_site;
2470         struct lu_fid           *old = &o->lo_header->loh_fid;
2471         int rc;
2472
2473         LASSERT(fid_is_zero(old));
2474         *old = *fid;
2475 try_again:
2476         rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
2477                                            &o->lo_header->loh_hash,
2478                                            obj_hash_params);
2479         /* supposed to be unique */
2480         LASSERT(rc != -EEXIST);
2481         /* handle hash table resizing */
2482         if (rc == -ENOMEM || rc == -EBUSY) {
2483                 msleep(20);
2484                 goto try_again;
2485         }
2486         /* trim the hash if it is growing too big */
2487         lu_object_limit(env, o->lo_dev);
2488         if (rc == -E2BIG)
2489                 goto try_again;
2490
2491         LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
2492 }
2493 EXPORT_SYMBOL(lu_object_assign_fid);
2494
2495 /**
2496  * Allocate an object with a zero (not yet assigned) fid.
2497  * XXX: temporary solution to be able to assign fid in ->do_create()
2498  *      till we have fully-functional OST fids
2499  */
2500 struct lu_object *lu_object_anon(const struct lu_env *env,
2501                                  struct lu_device *dev,
2502                                  const struct lu_object_conf *conf)
2503 {
2504         struct lu_fid fid;
2505         struct lu_object *o;
2506         int rc;
2507
2508         fid_zero(&fid);
2509         o = lu_object_alloc(env, dev, &fid);
2510         if (!IS_ERR(o)) {
2511                 rc = lu_object_start(env, dev, o, conf);
2512                 if (rc) {
2513                         lu_object_free(env, o);
2514                         return ERR_PTR(rc);
2515                 }
2516         }
2517
2518         return o;
2519 }
2520 EXPORT_SYMBOL(lu_object_anon);
2521
2522 struct lu_buf LU_BUF_NULL = {
2523         .lb_buf = NULL,
2524         .lb_len = 0
2525 };
2526 EXPORT_SYMBOL(LU_BUF_NULL);
2527
2528 void lu_buf_free(struct lu_buf *buf)
2529 {
2530         LASSERT(buf);
2531         if (buf->lb_buf) {
2532                 LASSERT(buf->lb_len > 0);
2533                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2534                 buf->lb_buf = NULL;
2535                 buf->lb_len = 0;
2536         }
2537 }
2538 EXPORT_SYMBOL(lu_buf_free);
2539
2540 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2541 {
2542         LASSERT(buf);
2543         LASSERT(buf->lb_buf == NULL);
2544         LASSERT(buf->lb_len == 0);
2545         OBD_ALLOC_LARGE(buf->lb_buf, size);
2546         if (likely(buf->lb_buf))
2547                 buf->lb_len = size;
2548 }
2549 EXPORT_SYMBOL(lu_buf_alloc);
2550
2551 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2552 {
2553         lu_buf_free(buf);
2554         lu_buf_alloc(buf, size);
2555 }
2556 EXPORT_SYMBOL(lu_buf_realloc);
2557
2558 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2559 {
2560         if (buf->lb_buf == NULL && buf->lb_len == 0)
2561                 lu_buf_alloc(buf, len);
2562
2563         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2564                 lu_buf_realloc(buf, len);
2565
2566         return buf;
2567 }
2568 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2569
2570 /**
2571  * Increase the size of the \a buf.
2572  * Old data is preserved in the buffer; on error the old buffer remains
2573  * unchanged.
2574  * \retval 0 or -ENOMEM
2575  */
2576 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2577 {
2578         char *ptr;
2579
2580         if (len <= buf->lb_len)
2581                 return 0;
2582
2583         OBD_ALLOC_LARGE(ptr, len);
2584         if (ptr == NULL)
2585                 return -ENOMEM;
2586
2587         /* copy the old data over, then free the old buffer */
2588         if (buf->lb_buf != NULL) {
2589                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2590                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2591         }
2592
2593         buf->lb_buf = ptr;
2594         buf->lb_len = len;
2595         return 0;
2596 }
2597 EXPORT_SYMBOL(lu_buf_check_and_grow);
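/*
 * Editorial example only: typical lu_buf usage.  The buffer starts out as
 * LU_BUF_NULL, is sized on demand with lu_buf_check_and_alloc(), grown with
 * lu_buf_check_and_grow() when the existing contents must be preserved, and
 * released with lu_buf_free().  The sizes used here are arbitrary.
 */
#if 0	/* example only */
static int __maybe_unused foo_use_buf(void)
{
	struct lu_buf buf = LU_BUF_NULL;
	int rc;

	lu_buf_check_and_alloc(&buf, 4096);
	if (buf.lb_buf == NULL)
		return -ENOMEM;

	/* fill buf.lb_buf; later more room is needed but data must be kept */
	rc = lu_buf_check_and_grow(&buf, 8192);

	lu_buf_free(&buf);
	return rc;
}
#endif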