LU-14352 various: only use wake_up_all() on exclusive waitqs
[fs/lustre-release.git] lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/lu_object.c
33  *
34  * Lustre Object.
35  * These are the only exported functions, they provide some generic
36  * infrastructure for managing object devices
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42
43 #include <linux/delay.h>
44 #include <linux/module.h>
45 #include <linux/list.h>
46 #include <linux/processor.h>
47 #include <linux/random.h>
48
49 #include <libcfs/libcfs.h>
50 #include <libcfs/linux/linux-mem.h>
51 #include <libcfs/linux/linux-hash.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lustre_disk.h>
55 #include <lustre_fid.h>
56 #include <lu_object.h>
57 #include <lu_ref.h>
58
59 struct lu_site_bkt_data {
60         /**
61          * LRU list, updated on each access to object. Protected by
62          * lsb_waitq.lock.
63          *
64          * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
65          * moved to the lu_site::ls_lru.prev
66          */
67         struct list_head                lsb_lru;
68         /**
69          * Wait-queue signaled when an object in this site is ultimately
70          * destroyed (lu_object_free()) or initialized (lu_object_start()).
71          * It is used by lu_object_find() to wait before re-trying when an
72          * object in the process of destruction is found in the hash table,
73          * or to wait for an object to be initialized by the allocator.
74          *
75          * \see htable_lookup().
76          */
77         wait_queue_head_t               lsb_waitq;
78 };
79
80 enum {
81         LU_CACHE_PERCENT_MAX     = 50,
82         LU_CACHE_PERCENT_DEFAULT = 20
83 };
84
85 #define LU_CACHE_NR_MAX_ADJUST          512
86 #define LU_CACHE_NR_UNLIMITED           -1
87 #define LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
88 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
89 #define LU_CACHE_NR_ZFS_LIMIT           10240
90
91 #define LU_CACHE_NR_MIN                 4096
92 #define LU_CACHE_NR_MAX                 0x80000000UL
93
94 /**
95  * Max 256 buckets; we don't want too many buckets because they:
96  * - consume too much memory (currently max 16K)
97  * - lead to unbalanced LRU lists
98  * With few CPUs there is little gain from extra buckets, so
99  * we treat this as a maximum in lu_site_init().
100  */
101 #define LU_SITE_BKT_BITS    8
102
103 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
104 module_param(lu_cache_percent, int, 0644);
105 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
106
107 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
108 module_param(lu_cache_nr, long, 0644);
109 MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
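
/*
 * Both cache knobs above are writable (0644) module parameters.  A minimal
 * tuning sketch, assuming the usual sysfs layout for parameters of the
 * obdclass module (the paths and values below are illustrative, not taken
 * from this file):
 *
 *	# cap the cache at 1M objects instead of "unlimited"
 *	echo 1048576 > /sys/module/obdclass/parameters/lu_cache_nr
 *
 *	# allow the cache to use up to 30% of RAM when sizing limits
 *	options obdclass lu_cache_percent=30
 */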
110
111 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
112 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
113
114 static u32 lu_fid_hash(const void *data, u32 len, u32 seed)
115 {
116         const struct lu_fid *fid = data;
117
118         seed = cfs_hash_32(seed ^ fid->f_oid, 32);
119         seed ^= cfs_hash_64(fid->f_seq, 32);
120         return seed;
121 }
122
123 static const struct rhashtable_params obj_hash_params = {
124         .key_len        = sizeof(struct lu_fid),
125         .key_offset     = offsetof(struct lu_object_header, loh_fid),
126         .head_offset    = offsetof(struct lu_object_header, loh_hash),
127         .hashfn         = lu_fid_hash,
128         .automatic_shrinking = true,
129 };
130
131 static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
132 {
133         return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
134                (s->ls_bkt_cnt - 1);
135 }
136
137 wait_queue_head_t *
138 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
139 {
140         struct lu_site_bkt_data *bkt;
141
142         bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
143         return &bkt->lsb_waitq;
144 }
145 EXPORT_SYMBOL(lu_site_wq_from_fid);
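
/*
 * A minimal usage sketch (illustrative only; "site" and "h" are assumed to
 * come from the caller): a thread that already holds a header can wait on
 * the per-FID wait queue for the object to become initialized or start
 * dying, mirroring the wait done in lu_object_find_at() below.
 *
 *	wait_queue_head_t *wq = lu_site_wq_from_fid(site, &h->loh_fid);
 *
 *	wait_event_idle(*wq, lu_object_is_inited(h) ||
 *			     lu_object_is_dying(h));
 */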
146
147 /**
148  * Decrease the reference counter on an object. If the last reference is
149  * dropped, return the object to the cache, unless lu_object_is_dying(o)
150  * holds. In the latter case, free the object immediately.
151  */
152 void lu_object_put(const struct lu_env *env, struct lu_object *o)
153 {
154         struct lu_site_bkt_data *bkt;
155         struct lu_object_header *top = o->lo_header;
156         struct lu_site *site = o->lo_dev->ld_site;
157         struct lu_object *orig = o;
158         const struct lu_fid *fid = lu_object_fid(o);
159
160         /*
161          * Until full fids-on-OST is implemented, anonymous objects
162          * are possible in OSP. Such an object isn't listed in the site,
163          * so we should not remove it from the site.
164          */
165         if (fid_is_zero(fid)) {
166                 LASSERT(list_empty(&top->loh_lru));
167                 if (!atomic_dec_and_test(&top->loh_ref))
168                         return;
169                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
170                         if (o->lo_ops->loo_object_release != NULL)
171                                 o->lo_ops->loo_object_release(env, o);
172                 }
173                 lu_object_free(env, orig);
174                 return;
175         }
176
177         bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
178         if (atomic_add_unless(&top->loh_ref, -1, 1)) {
179 still_active:
180                 /*
181                  * At this point the object reference is dropped and lock is
182                  * not taken, so lu_object should not be touched because it
183                  * can be freed by concurrent thread.
184                  *
185                  * Somebody may be waiting for this, currently only used for
186                  * cl_object, see cl_object_put_last().
187                  */
188                 wake_up(&bkt->lsb_waitq);
189
190                 return;
191         }
192
193         spin_lock(&bkt->lsb_waitq.lock);
194         if (!atomic_dec_and_test(&top->loh_ref)) {
195                 spin_unlock(&bkt->lsb_waitq.lock);
196                 goto still_active;
197         }
198
199         /*
200          * Refcount is zero, and cannot be incremented without taking the bkt
201          * lock, so object is stable.
202          */
203
204         /*
205          * When last reference is released, iterate over object layers, and
206          * notify them that object is no longer busy.
207          */
208         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
209                 if (o->lo_ops->loo_object_release != NULL)
210                         o->lo_ops->loo_object_release(env, o);
211         }
212
213         /*
214          * Don't use a local 'is_dying' here, because it would have been taken
215          * without the lock; we need the latest actual value, so check the
216          * lu_object directly here.
217          */
218         if (!lu_object_is_dying(top) &&
219             (lu_object_exists(orig) || lu_object_is_cl(orig))) {
220                 LASSERT(list_empty(&top->loh_lru));
221                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
222                 spin_unlock(&bkt->lsb_waitq.lock);
223                 percpu_counter_inc(&site->ls_lru_len_counter);
224                 CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
225                        orig, top, bkt);
226                 return;
227         }
228
229         /*
230          * If object is dying (will not be cached) then remove it from hash
231          * table (it is already not on the LRU).
232          *
233          * This is done with the bucket lock held.  As the only way to acquire
234          * the first reference to a previously unreferenced object is through a
235          * hash-table lookup (lu_object_find()), which takes the lock for that
236          * first reference, no race with a concurrent object lookup is possible
237          * and we can safely destroy the object below.
238          */
239         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
240                 rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
241                                        obj_hash_params);
242
243         spin_unlock(&bkt->lsb_waitq.lock);
244         /* Object was already removed from hash above, can kill it. */
245         lu_object_free(env, orig);
246 }
247 EXPORT_SYMBOL(lu_object_put);
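
/*
 * A minimal usage sketch (illustrative only; "env", "dev" and "fid" are
 * assumed to come from the caller's context).  References obtained from
 * lu_object_find() are dropped with lu_object_put(); the object then goes
 * back onto the site LRU unless it is dying:
 *
 *	struct lu_object *o = lu_object_find(env, dev, fid, NULL);
 *
 *	if (!IS_ERR(o)) {
 *		... use the object ...
 *		lu_object_put(env, o);
 *	}
 */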
248
249 /**
250  * Put an object and don't keep it in cache. This is a temporary solution for
251  * multi-site objects whose layering is not constant.
252  */
253 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
254 {
255         set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
256         return lu_object_put(env, o);
257 }
258 EXPORT_SYMBOL(lu_object_put_nocache);
259
260 /**
261  * Kill the object and take it out of LRU cache.
262  * Currently used by client code for layout change.
263  */
264 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
265 {
266         struct lu_object_header *top;
267
268         top = o->lo_header;
269         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
270         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
271                 struct lu_site *site = o->lo_dev->ld_site;
272                 struct rhashtable *obj_hash = &site->ls_obj_hash;
273                 struct lu_site_bkt_data *bkt;
274
275                 bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
276                 spin_lock(&bkt->lsb_waitq.lock);
277                 if (!list_empty(&top->loh_lru)) {
278                         list_del_init(&top->loh_lru);
279                         percpu_counter_dec(&site->ls_lru_len_counter);
280                 }
281                 spin_unlock(&bkt->lsb_waitq.lock);
282
283                 rhashtable_remove_fast(obj_hash, &top->loh_hash,
284                                        obj_hash_params);
285         }
286 }
287 EXPORT_SYMBOL(lu_object_unhash);
288
289 /**
290  * Allocate new object.
291  *
292  * This follows object creation protocol, described in the comment within
293  * struct lu_device_operations definition.
294  */
295 static struct lu_object *lu_object_alloc(const struct lu_env *env,
296                                          struct lu_device *dev,
297                                          const struct lu_fid *f)
298 {
299         struct lu_object *top;
300
301         /*
302          * Create top-level object slice. This will also create
303          * lu_object_header.
304          */
305         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
306         if (top == NULL)
307                 return ERR_PTR(-ENOMEM);
308         if (IS_ERR(top))
309                 return top;
310         /*
311          * This is the only place where object fid is assigned. It's constant
312          * after this point.
313          */
314         top->lo_header->loh_fid = *f;
315
316         return top;
317 }
318
319 /**
320  * Initialize object.
321  *
322  * This is called after object hash insertion to avoid returning an object with
323  * stale attributes.
324  */
325 static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
326                            struct lu_object *top,
327                            const struct lu_object_conf *conf)
328 {
329         struct lu_object *scan;
330         struct list_head *layers;
331         unsigned int init_mask = 0;
332         unsigned int init_flag;
333         int clean;
334         int result;
335
336         layers = &top->lo_header->loh_layers;
337
338         do {
339                 /*
340                  * Call ->loo_object_init() repeatedly, until no more new
341                  * object slices are created.
342                  */
343                 clean = 1;
344                 init_flag = 1;
345                 list_for_each_entry(scan, layers, lo_linkage) {
346                         if (init_mask & init_flag)
347                                 goto next;
348                         clean = 0;
349                         scan->lo_header = top->lo_header;
350                         result = scan->lo_ops->loo_object_init(env, scan, conf);
351                         if (result)
352                                 return result;
353
354                         init_mask |= init_flag;
355 next:
356                         init_flag <<= 1;
357                 }
358         } while (!clean);
359
360         list_for_each_entry_reverse(scan, layers, lo_linkage) {
361                 if (scan->lo_ops->loo_object_start != NULL) {
362                         result = scan->lo_ops->loo_object_start(env, scan);
363                         if (result)
364                                 return result;
365                 }
366         }
367
368         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
369
370         set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
371
372         return 0;
373 }
374
375 /**
376  * Free an object.
377  */
378 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
379 {
380         wait_queue_head_t *wq;
381         struct lu_site *site;
382         struct lu_object *scan;
383         struct list_head *layers;
384         LIST_HEAD(splice);
385
386         site = o->lo_dev->ld_site;
387         layers = &o->lo_header->loh_layers;
388         wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
389         /*
390          * First call ->loo_object_delete() method to release all resources.
391          */
392         list_for_each_entry_reverse(scan, layers, lo_linkage) {
393                 if (scan->lo_ops->loo_object_delete != NULL)
394                         scan->lo_ops->loo_object_delete(env, scan);
395         }
396
397         /*
398          * Then, splice object layers into stand-alone list, and call
399          * ->loo_object_free() on all layers to free memory. Splice is
400          * necessary, because lu_object_header is freed together with the
401          * top-level slice.
402          */
403         list_splice_init(layers, &splice);
404         while (!list_empty(&splice)) {
405                 /*
406                  * Free layers in bottom-to-top order, so that object header
407                  * lives as long as possible and ->loo_object_free() methods
408                  * can look at its contents.
409                  */
410                 o = container_of(splice.prev, struct lu_object, lo_linkage);
411                 list_del_init(&o->lo_linkage);
412                 LASSERT(o->lo_ops->loo_object_free != NULL);
413                 o->lo_ops->loo_object_free(env, o);
414         }
415
416         if (waitqueue_active(wq))
417                 wake_up(wq);
418 }
419
420 /**
421  * Free \a nr objects from the cold end of the site LRU list.
422  * If \a canblock is 0, don't block waiting for another
423  * instance of lu_site_purge() to complete.
424  */
425 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
426                           int nr, int canblock)
427 {
428         struct lu_object_header *h;
429         struct lu_object_header *temp;
430         struct lu_site_bkt_data *bkt;
431         LIST_HEAD(dispose);
432         int                      did_sth;
433         unsigned int             start = 0;
434         int                      count;
435         int                      bnr;
436         unsigned int             i;
437
438         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
439                 RETURN(0);
440
441         /*
442          * Under LRU list lock, scan LRU list and move unreferenced objects to
443          * the dispose list, removing them from LRU and hash table.
444          */
445         if (nr != ~0)
446                 start = s->ls_purge_start;
447         bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
448 again:
449         /*
450          * It doesn't make any sense to run purge threads in parallel; that
451          * can only bring trouble.  See LU-5331.
452          */
453         if (canblock != 0)
454                 mutex_lock(&s->ls_purge_mutex);
455         else if (mutex_trylock(&s->ls_purge_mutex) == 0)
456                 goto out;
457
458         did_sth = 0;
459         for (i = start; i < s->ls_bkt_cnt ; i++) {
460                 count = bnr;
461                 bkt = &s->ls_bkts[i];
462                 spin_lock(&bkt->lsb_waitq.lock);
463
464                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
465                         LASSERT(atomic_read(&h->loh_ref) == 0);
466
467                         LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
468
469                         set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
470                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
471                                                obj_hash_params);
472                         list_move(&h->loh_lru, &dispose);
473                         percpu_counter_dec(&s->ls_lru_len_counter);
474                         if (did_sth == 0)
475                                 did_sth = 1;
476
477                         if (nr != ~0 && --nr == 0)
478                                 break;
479
480                         if (count > 0 && --count == 0)
481                                 break;
482
483                 }
484                 spin_unlock(&bkt->lsb_waitq.lock);
485                 cond_resched();
486                 /*
487                  * Free everything on the dispose list. This is safe against
488                  * races due to the reasons described in lu_object_put().
489                  */
490                 while ((h = list_first_entry_or_null(&dispose,
491                                                      struct lu_object_header,
492                                                      loh_lru)) != NULL) {
493                         list_del_init(&h->loh_lru);
494                         lu_object_free(env, lu_object_top(h));
495                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
496                 }
497
498                 if (nr == 0)
499                         break;
500         }
501         mutex_unlock(&s->ls_purge_mutex);
502
503         if (nr != 0 && did_sth && start != 0) {
504                 start = 0; /* restart from the first bucket */
505                 goto again;
506         }
507         /* race on s->ls_purge_start, but nobody cares */
508         s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
509 out:
510         return nr;
511 }
512 EXPORT_SYMBOL(lu_site_purge_objects);
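
/*
 * A minimal usage sketch (illustrative only).  Passing nr == ~0 purges the
 * whole site, as lu_stack_fini() does below; a positive nr frees at most
 * that many objects, and canblock == 0 turns the call into a best-effort
 * trylock of the purge mutex (useful from shrinker-like paths):
 *
 *	lu_site_purge_objects(env, site, ~0, 1);	// drain everything
 *	lu_site_purge_objects(env, site, 128, 0);	// opportunistic trim
 */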
513
514 /*
515  * Object printing.
516  *
517  * The code below has to jump through certain hoops to output an object
518  * description into the libcfs_debug_msg-based log. The problem is that
519  * lu_object_print() composes the description from strings that are parts
520  * of _lines_ of output (i.e., strings not terminated by a newline). This
521  * doesn't fit very well into the libcfs_debug_msg() interface, which
522  * assumes that each message supplied to it is a self-contained output line.
523  *
524  * To work around this, strings are collected in a temporary buffer
525  * (implemented as a value of the lu_global_key key) until a terminating
526  * newline character is detected.
527  *
528  */
529
530 enum {
531         /**
532          * Maximal line size.
533          *
534          * XXX overflow is not handled correctly.
535          */
536         LU_CDEBUG_LINE = 512
537 };
538
539 struct lu_cdebug_data {
540         /**
541          * Temporary buffer.
542          */
543         char lck_area[LU_CDEBUG_LINE];
544 };
545
546 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
547 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
548
549 /**
550  * Key, holding temporary buffer. This key is registered very early by
551  * lu_global_init().
552  */
553 static struct lu_context_key lu_global_key = {
554         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
555                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
556         .lct_init = lu_global_key_init,
557         .lct_fini = lu_global_key_fini
558 };
559
560 /**
561  * Printer function emitting messages through libcfs_debug_msg().
562  */
563 int lu_cdebug_printer(const struct lu_env *env,
564                       void *cookie, const char *format, ...)
565 {
566         struct libcfs_debug_msg_data *msgdata = cookie;
567         struct lu_cdebug_data        *key;
568         int used;
569         int complete;
570         va_list args;
571
572         va_start(args, format);
573
574         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
575         LASSERT(key != NULL);
576
577         used = strlen(key->lck_area);
578         complete = format[strlen(format) - 1] == '\n';
579         /*
580          * Append new chunk to the buffer.
581          */
582         vsnprintf(key->lck_area + used,
583                   ARRAY_SIZE(key->lck_area) - used, format, args);
584         if (complete) {
585                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
586                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
587                 key->lck_area[0] = 0;
588         }
589         va_end(args);
590         return 0;
591 }
592 EXPORT_SYMBOL(lu_cdebug_printer);
593
594 /**
595  * Print object header.
596  */
597 void lu_object_header_print(const struct lu_env *env, void *cookie,
598                             lu_printer_t printer,
599                             const struct lu_object_header *hdr)
600 {
601         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
602                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
603                    PFID(&hdr->loh_fid),
604                    test_bit(LU_OBJECT_UNHASHED,
605                             &hdr->loh_flags) ? "" : " hash",
606                    list_empty(&hdr->loh_lru) ? "" : " lru",
607                    hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
608 }
609 EXPORT_SYMBOL(lu_object_header_print);
610
611 /**
612  * Print a human-readable representation of \a o to \a printer.
613  */
614 void lu_object_print(const struct lu_env *env, void *cookie,
615                      lu_printer_t printer, const struct lu_object *o)
616 {
617         static const char ruler[] = "........................................";
618         struct lu_object_header *top;
619         int depth = 4;
620
621         top = o->lo_header;
622         lu_object_header_print(env, cookie, printer, top);
623         (*printer)(env, cookie, "{\n");
624
625         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
626                 /*
627                  * print `.' \a depth times followed by type name and address
628                  */
629                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
630                            o->lo_dev->ld_type->ldt_name, o);
631
632                 if (o->lo_ops->loo_object_print != NULL)
633                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
634
635                 (*printer)(env, cookie, "\n");
636         }
637
638         (*printer)(env, cookie, "} header@%p\n", top);
639 }
640 EXPORT_SYMBOL(lu_object_print);
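
/*
 * A minimal usage sketch (illustrative only), following the same pattern as
 * lu_site_print() below: a libcfs debug-message descriptor is used as the
 * printer cookie, and lu_cdebug_printer() accumulates partial lines until a
 * newline completes the message.  "env" and "o" are assumed to come from the
 * caller:
 *
 *	LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_INFO, NULL);
 *
 *	lu_object_print(env, &msgdata, lu_cdebug_printer, o);
 */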
641
642 /**
643  * Check object consistency.
644  */
645 int lu_object_invariant(const struct lu_object *o)
646 {
647         struct lu_object_header *top;
648
649         top = o->lo_header;
650         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
651                 if (o->lo_ops->loo_object_invariant != NULL &&
652                     !o->lo_ops->loo_object_invariant(o))
653                         return 0;
654         }
655         return 1;
656 }
657
658 /*
659  * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because the
660  * calculation of the number of objects to reclaim is not covered by a lock,
661  * the number purged per call is capped by LU_CACHE_NR_MAX_ADJUST.  This ensures
662  * that many concurrent threads will not accidentally purge the entire cache.
663  */
664 static void lu_object_limit(const struct lu_env *env,
665                             struct lu_device *dev)
666 {
667         u64 size, nr;
668
669         if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
670                 return;
671
672         size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
673         nr = (u64)lu_cache_nr;
674         if (size <= nr)
675                 return;
676
677         lu_site_purge_objects(env, dev->ld_site,
678                               min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
679                               0);
680 }
681
682 static struct lu_object *htable_lookup(const struct lu_env *env,
683                                        struct lu_device *dev,
684                                        struct lu_site_bkt_data *bkt,
685                                        const struct lu_fid *f,
686                                        struct lu_object_header *new)
687 {
688         struct lu_site *s = dev->ld_site;
689         struct lu_object_header *h;
690
691 try_again:
692         rcu_read_lock();
693         if (new)
694                 h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
695                                                       &new->loh_hash,
696                                                       obj_hash_params);
697         else
698                 h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
699
700         if (IS_ERR_OR_NULL(h)) {
701                 /* Not found */
702                 if (!new)
703                         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
704                 rcu_read_unlock();
705                 if (PTR_ERR(h) == -ENOMEM) {
706                         msleep(20);
707                         goto try_again;
708                 }
709                 lu_object_limit(env, dev);
710                 if (PTR_ERR(h) == -E2BIG)
711                         goto try_again;
712
713                 return ERR_PTR(-ENOENT);
714         }
715
716         if (atomic_inc_not_zero(&h->loh_ref)) {
717                 rcu_read_unlock();
718                 return lu_object_top(h);
719         }
720
721         spin_lock(&bkt->lsb_waitq.lock);
722         if (lu_object_is_dying(h) ||
723             test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
724                 spin_unlock(&bkt->lsb_waitq.lock);
725                 rcu_read_unlock();
726                 if (new) {
727                         /*
728                          * Old object might have already been removed, or will
729                          * be soon.  We need to insert our new object, so
730                          * remove the old one just in case it is still there.
731                          */
732                         rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
733                                                obj_hash_params);
734                         goto try_again;
735                 }
736                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
737                 return ERR_PTR(-ENOENT);
738         }
739         /* Now protected by spinlock */
740         rcu_read_unlock();
741
742         if (!list_empty(&h->loh_lru)) {
743                 list_del_init(&h->loh_lru);
744                 percpu_counter_dec(&s->ls_lru_len_counter);
745         }
746         atomic_inc(&h->loh_ref);
747         spin_unlock(&bkt->lsb_waitq.lock);
748         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
749         return lu_object_top(h);
750 }
751
752 /**
753  * Search the cache for an object with the fid \a f. If such an object is
754  * found, return it. Otherwise, create a new object, insert it into the cache
755  * and return it. In any case, an additional reference is acquired on it.
756  */
757 struct lu_object *lu_object_find(const struct lu_env *env,
758                                  struct lu_device *dev, const struct lu_fid *f,
759                                  const struct lu_object_conf *conf)
760 {
761         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
762 }
763 EXPORT_SYMBOL(lu_object_find);
764
765 /*
766  * Get a 'first' reference to an object that was found while looking through the
767  * hash table.
768  */
769 struct lu_object *lu_object_get_first(struct lu_object_header *h,
770                                       struct lu_device *dev)
771 {
772         struct lu_site *s = dev->ld_site;
773         struct lu_object *ret;
774
775         if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
776                 return NULL;
777
778         ret = lu_object_locate(h, dev->ld_type);
779         if (!ret)
780                 return ret;
781
782         if (!atomic_inc_not_zero(&h->loh_ref)) {
783                 struct lu_site_bkt_data *bkt;
784
785                 bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
786                 spin_lock(&bkt->lsb_waitq.lock);
787                 if (!lu_object_is_dying(h) &&
788                     !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
789                         atomic_inc(&h->loh_ref);
790                 else
791                         ret = NULL;
792                 spin_unlock(&bkt->lsb_waitq.lock);
793         }
794         return ret;
795 }
796 EXPORT_SYMBOL(lu_object_get_first);
797
798 /**
799  * Core logic of lu_object_find*() functions.
800  *
801  * Much like lu_object_find(), but the top-level device of the object is
802  * specifically \a dev rather than the top-level device of the site. This
803  * allows objects with different "stacking" to be created within the same site.
804  */
805 struct lu_object *lu_object_find_at(const struct lu_env *env,
806                                     struct lu_device *dev,
807                                     const struct lu_fid *f,
808                                     const struct lu_object_conf *conf)
809 {
810         struct lu_object *o;
811         struct lu_object *shadow;
812         struct lu_site *s;
813         struct lu_site_bkt_data *bkt;
814         struct rhashtable *hs;
815         int rc;
816
817         ENTRY;
818
819         /* The FID comes from disk or the network; a zero FID is meaningless, so
820          * return an error early to avoid an assertion in lu_object_put(). If a
821          * zero-FID object is wanted, it should be allocated via lu_object_anon().
822          */
823         if (fid_is_zero(f))
824                 RETURN(ERR_PTR(-EINVAL));
825
826         /*
827          * This uses standard index maintenance protocol:
828          *
829          *     - search index under lock, and return object if found;
830          *     - otherwise, unlock index, allocate new object;
831          *     - lock index and search again;
832          *     - if nothing is found (usual case), insert newly created
833          *       object into index;
834          *     - otherwise (race: other thread inserted object), free
835          *       object just allocated.
836          *     - unlock index;
837          *     - return object.
838          *
839          * For "LOC_F_NEW" case, we are sure the object is new established.
840          * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
841          * just alloc and insert directly.
842          *
843          */
844         s  = dev->ld_site;
845         hs = &s->ls_obj_hash;
846
847         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
848                 lu_site_purge(env, s, -1);
849
850         bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
851         if (!(conf && conf->loc_flags & LOC_F_NEW)) {
852                 o = htable_lookup(env, dev, bkt, f, NULL);
853
854                 if (!IS_ERR(o)) {
855                         if (likely(lu_object_is_inited(o->lo_header)))
856                                 RETURN(o);
857
858                         wait_event_idle(bkt->lsb_waitq,
859                                         lu_object_is_inited(o->lo_header) ||
860                                         lu_object_is_dying(o->lo_header));
861
862                         if (lu_object_is_dying(o->lo_header)) {
863                                 lu_object_put(env, o);
864
865                                 RETURN(ERR_PTR(-ENOENT));
866                         }
867
868                         RETURN(o);
869                 }
870
871                 if (PTR_ERR(o) != -ENOENT)
872                         RETURN(o);
873         }
874
875         /*
876          * Allocate a new object. NB: the object is left uninitialized, because
877          * if the object changed between allocation and hash insertion, an object
878          * with stale attributes would otherwise be returned.
879          */
880         o = lu_object_alloc(env, dev, f);
881         if (IS_ERR(o))
882                 RETURN(o);
883
884         LASSERT(lu_fid_eq(lu_object_fid(o), f));
885
886         CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
887
888         if (conf && conf->loc_flags & LOC_F_NEW) {
889                 int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
890                                                     obj_hash_params);
891                 if (status)
892                         /* Strange error - go the slow way */
893                         shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
894                 else
895                         shadow = ERR_PTR(-ENOENT);
896         } else {
897                 shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
898         }
899         if (likely(PTR_ERR(shadow) == -ENOENT)) {
900                 /*
901                  * The new object has been successfully inserted.
902                  *
903                  * This may result in rather complicated operations, including
904                  * fld queries, inode loading, etc.
905                  */
906                 rc = lu_object_start(env, dev, o, conf);
907                 if (rc) {
908                         lu_object_put_nocache(env, o);
909                         RETURN(ERR_PTR(rc));
910                 }
911
912                 wake_up(&bkt->lsb_waitq);
913
914                 lu_object_limit(env, dev);
915
916                 RETURN(o);
917         }
918
919         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
920         lu_object_free(env, o);
921
922         if (!(conf && conf->loc_flags & LOC_F_NEW) &&
923             !IS_ERR(shadow) &&
924             !lu_object_is_inited(shadow->lo_header)) {
925                 wait_event_idle(bkt->lsb_waitq,
926                                 lu_object_is_inited(shadow->lo_header) ||
927                                 lu_object_is_dying(shadow->lo_header));
928
929                 if (lu_object_is_dying(shadow->lo_header)) {
930                         lu_object_put(env, shadow);
931
932                         RETURN(ERR_PTR(-ENOENT));
933                 }
934         }
935
936         RETURN(shadow);
937 }
938 EXPORT_SYMBOL(lu_object_find_at);
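
/*
 * A minimal sketch of the LOC_F_NEW fast path described above (illustrative
 * only; "env", "dev" and "fid" are assumed to come from the caller).  The
 * caller asserts the object cannot already be cached, so the
 * lookup-alloc-lookup-insert dance is skipped:
 *
 *	struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
 *	struct lu_object *o;
 *
 *	o = lu_object_find_at(env, dev, fid, &conf);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 */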
939
940 /**
941  * Find object with given fid, and return its slice belonging to given device.
942  */
943 struct lu_object *lu_object_find_slice(const struct lu_env *env,
944                                        struct lu_device *dev,
945                                        const struct lu_fid *f,
946                                        const struct lu_object_conf *conf)
947 {
948         struct lu_object *top;
949         struct lu_object *obj;
950
951         top = lu_object_find(env, dev, f, conf);
952         if (IS_ERR(top))
953                 return top;
954
955         obj = lu_object_locate(top->lo_header, dev->ld_type);
956         if (unlikely(obj == NULL)) {
957                 lu_object_put(env, top);
958                 obj = ERR_PTR(-ENOENT);
959         }
960
961         return obj;
962 }
963 EXPORT_SYMBOL(lu_object_find_slice);
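
/*
 * A minimal usage sketch (illustrative only; "env", "dev" and "fid" come from
 * the caller): look up an object by FID, get the slice belonging to the
 * calling device, and release it with lu_object_put() when done.
 *
 *	struct lu_object *slice;
 *
 *	slice = lu_object_find_slice(env, dev, fid, NULL);
 *	if (IS_ERR(slice))
 *		return PTR_ERR(slice);
 *	... use the slice ...
 *	lu_object_put(env, slice);
 */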
964
965 int lu_device_type_init(struct lu_device_type *ldt)
966 {
967         int result = 0;
968
969         atomic_set(&ldt->ldt_device_nr, 0);
970         if (ldt->ldt_ops->ldto_init)
971                 result = ldt->ldt_ops->ldto_init(ldt);
972
973         return result;
974 }
975 EXPORT_SYMBOL(lu_device_type_init);
976
977 void lu_device_type_fini(struct lu_device_type *ldt)
978 {
979         if (ldt->ldt_ops->ldto_fini)
980                 ldt->ldt_ops->ldto_fini(ldt);
981 }
982 EXPORT_SYMBOL(lu_device_type_fini);
983
984 /**
985  * Global list of all sites on this node
986  */
987 static LIST_HEAD(lu_sites);
988 static DECLARE_RWSEM(lu_sites_guard);
989
990 /**
991  * Global environment used by site shrinker.
992  */
993 static struct lu_env lu_shrink_env;
994
995 struct lu_site_print_arg {
996         struct lu_env   *lsp_env;
997         void            *lsp_cookie;
998         lu_printer_t     lsp_printer;
999 };
1000
1001 static void
1002 lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
1003 {
1004         if (!list_empty(&h->loh_layers)) {
1005                 const struct lu_object *o;
1006
1007                 o = lu_object_top(h);
1008                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
1009                                 arg->lsp_printer, o);
1010         } else {
1011                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
1012                                        arg->lsp_printer, h);
1013         }
1014 }
1015
1016 /**
1017  * Print all objects in \a s.
1018  */
1019 void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
1020                    int msg_flag, lu_printer_t printer)
1021 {
1022         struct lu_site_print_arg arg = {
1023                 .lsp_env     = (struct lu_env *)env,
1024                 .lsp_printer = printer,
1025         };
1026         struct rhashtable_iter iter;
1027         struct lu_object_header *h;
1028         LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
1029
1030         if (!s || !atomic_read(ref))
1031                 return;
1032
1033         arg.lsp_cookie = (void *)&msgdata;
1034
1035         rhashtable_walk_enter(&s->ls_obj_hash, &iter);
1036         rhashtable_walk_start(&iter);
1037         while ((h = rhashtable_walk_next(&iter)) != NULL) {
1038                 if (IS_ERR(h))
1039                         continue;
1040                 lu_site_obj_print(h, &arg);
1041         }
1042         rhashtable_walk_stop(&iter);
1043         rhashtable_walk_exit(&iter);
1044 }
1045 EXPORT_SYMBOL(lu_site_print);
1046
1047 /**
1048  * Set the lu_object cache limit (lu_cache_nr) based on backend type and memory.
1049  */
1050 static void lu_htable_limits(struct lu_device *top)
1051 {
1052         unsigned long cache_size;
1053
1054         /*
1055          * For ZFS based OSDs the cache should be disabled by default.  This
1056          * allows the ZFS ARC maximum flexibility in determining what buffers
1057          * to cache.  If Lustre has objects or buffers which it wants to ensure
1058          * always stay cached, it must maintain a hold on them.
1059          */
1060         if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
1061                 lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
1062                 return;
1063         }
1064
1065         /*
1066          * Calculate the object cache limit, assuming that we want reasonable
1067          * performance when lu_cache_percent (20% by default) of total memory
1068          * is occupied by the cache of lu_objects.
1069          *
1070          * The size of an lu_object is (arbitrarily) taken as 1K (with its inode).
1071          */
1072         cache_size = cfs_totalram_pages();
1073
1074 #if BITS_PER_LONG == 32
1075         /* limit hashtable size for lowmem systems to low RAM */
1076         if (cache_size > 1 << (30 - PAGE_SHIFT))
1077                 cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
1078 #endif
1079
1080         /* clear off unreasonable cache setting. */
1081         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
1082                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
1083                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
1084                       LU_CACHE_PERCENT_DEFAULT);
1085
1086                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
1087         }
1088         cache_size = cache_size / 100 * lu_cache_percent *
1089                 (PAGE_SIZE / 1024);
1090
1091         lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
1092                               LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
1093 }
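
/*
 * Worked example (illustrative numbers only): on a 16 GiB node with 4 KiB
 * pages, cfs_totalram_pages() is about 4194304.  With the default
 * lu_cache_percent = 20:
 *
 *	4194304 / 100 * 20 * (4096 / 1024) = 3355440 objects
 *
 * i.e. roughly 3.2 GiB of cache at the assumed ~1K per object, which is the
 * intended 20% of RAM.  The result is then clamped to
 * [LU_CACHE_NR_MIN, LU_CACHE_NR_MAX].
 */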
1094
1095 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
1096 {
1097         spin_lock(&s->ls_ld_lock);
1098         if (list_empty(&d->ld_linkage))
1099                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
1100         spin_unlock(&s->ls_ld_lock);
1101 }
1102 EXPORT_SYMBOL(lu_dev_add_linkage);
1103
1104 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
1105 {
1106         spin_lock(&s->ls_ld_lock);
1107         list_del_init(&d->ld_linkage);
1108         spin_unlock(&s->ls_ld_lock);
1109 }
1110 EXPORT_SYMBOL(lu_dev_del_linkage);
1111
1112 /**
1113  * Initialize site \a s, with \a top as the top level device.
1114  */
1115 int lu_site_init(struct lu_site *s, struct lu_device *top)
1116 {
1117         struct lu_site_bkt_data *bkt;
1118         unsigned int i;
1119         int rc;
1120         ENTRY;
1121
1122         memset(s, 0, sizeof *s);
1123         mutex_init(&s->ls_purge_mutex);
1124         lu_htable_limits(top);
1125
1126 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
1127         rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
1128 #else
1129         rc = percpu_counter_init(&s->ls_lru_len_counter, 0);
1130 #endif
1131         if (rc)
1132                 return -ENOMEM;
1133
1134         if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
1135                 CERROR("failed to create lu_site hash\n");
1136                 return -ENOMEM;
1137         }
1138
1139         s->ls_bkt_seed = prandom_u32();
1140         s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS,
1141                               2 * num_possible_cpus());
1142         s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
1143         OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1144         if (!s->ls_bkts) {
1145                 rhashtable_destroy(&s->ls_obj_hash);
1146                 s->ls_bkts = NULL;
1147                 return -ENOMEM;
1148         }
1149
1150         for (i = 0; i < s->ls_bkt_cnt; i++) {
1151                 bkt = &s->ls_bkts[i];
1152                 INIT_LIST_HEAD(&bkt->lsb_lru);
1153                 init_waitqueue_head(&bkt->lsb_waitq);
1154         }
1155
1156         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1157         if (s->ls_stats == NULL) {
1158                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1159                 s->ls_bkts = NULL;
1160                 rhashtable_destroy(&s->ls_obj_hash);
1161                 return -ENOMEM;
1162         }
1163
1164         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1165                              0, "created", "created");
1166         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1167                              0, "cache_hit", "cache_hit");
1168         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1169                              0, "cache_miss", "cache_miss");
1170         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1171                              0, "cache_race", "cache_race");
1172         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1173                              0, "cache_death_race", "cache_death_race");
1174         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1175                              0, "lru_purged", "lru_purged");
1176
1177         INIT_LIST_HEAD(&s->ls_linkage);
1178         s->ls_top_dev = top;
1179         top->ld_site = s;
1180         lu_device_get(top);
1181         lu_ref_add(&top->ld_reference, "site-top", s);
1182
1183         INIT_LIST_HEAD(&s->ls_ld_linkage);
1184         spin_lock_init(&s->ls_ld_lock);
1185
1186         lu_dev_add_linkage(s, top);
1187
1188         RETURN(0);
1189 }
1190 EXPORT_SYMBOL(lu_site_init);
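
/*
 * A minimal site lifecycle sketch (illustrative only; "site" and "top_dev"
 * are assumed to be set up by the caller, typically while building a device
 * stack):
 *
 *	rc = lu_site_init(site, top_dev);
 *	if (rc)
 *		return rc;
 *
 *	rc = lu_site_init_finish(site);	// once the stack is fully set up
 *	if (rc) {
 *		lu_site_fini(site);
 *		return rc;
 *	}
 *	...
 *	lu_site_fini(site);		// on teardown
 */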
1191
1192 /**
1193  * Finalize \a s and release its resources.
1194  */
1195 void lu_site_fini(struct lu_site *s)
1196 {
1197         down_write(&lu_sites_guard);
1198         list_del_init(&s->ls_linkage);
1199         up_write(&lu_sites_guard);
1200
1201         percpu_counter_destroy(&s->ls_lru_len_counter);
1202
1203         if (s->ls_bkts) {
1204                 rhashtable_destroy(&s->ls_obj_hash);
1205                 OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
1206                 s->ls_bkts = NULL;
1207         }
1208
1209         if (s->ls_top_dev != NULL) {
1210                 s->ls_top_dev->ld_site = NULL;
1211                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1212                 lu_device_put(s->ls_top_dev);
1213                 s->ls_top_dev = NULL;
1214         }
1215
1216         if (s->ls_stats != NULL)
1217                 lprocfs_free_stats(&s->ls_stats);
1218 }
1219 EXPORT_SYMBOL(lu_site_fini);
1220
1221 /**
1222  * Called when initialization of stack for this site is completed.
1223  */
1224 int lu_site_init_finish(struct lu_site *s)
1225 {
1226         int result;
1227         down_write(&lu_sites_guard);
1228         result = lu_context_refill(&lu_shrink_env.le_ctx);
1229         if (result == 0)
1230                 list_add(&s->ls_linkage, &lu_sites);
1231         up_write(&lu_sites_guard);
1232         return result;
1233 }
1234 EXPORT_SYMBOL(lu_site_init_finish);
1235
1236 /**
1237  * Acquire additional reference on device \a d
1238  */
1239 void lu_device_get(struct lu_device *d)
1240 {
1241         atomic_inc(&d->ld_ref);
1242 }
1243 EXPORT_SYMBOL(lu_device_get);
1244
1245 /**
1246  * Release reference on device \a d.
1247  */
1248 void lu_device_put(struct lu_device *d)
1249 {
1250         LASSERT(atomic_read(&d->ld_ref) > 0);
1251         atomic_dec(&d->ld_ref);
1252 }
1253 EXPORT_SYMBOL(lu_device_put);
1254
1255 enum { /* Maximal number of tld slots. */
1256         LU_CONTEXT_KEY_NR = 40
1257 };
1258 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1259 static DECLARE_RWSEM(lu_key_initing);
1260
1261 /**
1262  * Initialize device \a d of type \a t.
1263  */
1264 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1265 {
1266         if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) {
1267                 down_write(&lu_key_initing);
1268                 if (t->ldt_ops->ldto_start &&
1269                     atomic_read(&t->ldt_device_nr) == 0)
1270                         t->ldt_ops->ldto_start(t);
1271                 atomic_inc(&t->ldt_device_nr);
1272                 up_write(&lu_key_initing);
1273         }
1274
1275         memset(d, 0, sizeof *d);
1276         d->ld_type = t;
1277         lu_ref_init(&d->ld_reference);
1278         INIT_LIST_HEAD(&d->ld_linkage);
1279
1280         return 0;
1281 }
1282 EXPORT_SYMBOL(lu_device_init);
1283
1284 /**
1285  * Finalize device \a d.
1286  */
1287 void lu_device_fini(struct lu_device *d)
1288 {
1289         struct lu_device_type *t = d->ld_type;
1290
1291         if (d->ld_obd != NULL) {
1292                 d->ld_obd->obd_lu_dev = NULL;
1293                 d->ld_obd = NULL;
1294         }
1295
1296         lu_ref_fini(&d->ld_reference);
1297         LASSERTF(atomic_read(&d->ld_ref) == 0,
1298                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1299         LASSERT(atomic_read(&t->ldt_device_nr) > 0);
1300
1301         if (atomic_dec_and_test(&t->ldt_device_nr) &&
1302             t->ldt_ops->ldto_stop != NULL)
1303                 t->ldt_ops->ldto_stop(t);
1304 }
1305 EXPORT_SYMBOL(lu_device_fini);
1306
1307 /**
1308  * Initialize object \a o that is part of compound object \a h and was created
1309  * by device \a d.
1310  */
1311 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1312                    struct lu_device *d)
1313 {
1314         memset(o, 0, sizeof(*o));
1315         o->lo_header = h;
1316         o->lo_dev = d;
1317         lu_device_get(d);
1318         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1319         INIT_LIST_HEAD(&o->lo_linkage);
1320
1321         return 0;
1322 }
1323 EXPORT_SYMBOL(lu_object_init);
1324
1325 /**
1326  * Finalize object and release its resources.
1327  */
1328 void lu_object_fini(struct lu_object *o)
1329 {
1330         struct lu_device *dev = o->lo_dev;
1331
1332         LASSERT(list_empty(&o->lo_linkage));
1333
1334         if (dev != NULL) {
1335                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1336                               "lu_object", o);
1337                 lu_device_put(dev);
1338                 o->lo_dev = NULL;
1339         }
1340 }
1341 EXPORT_SYMBOL(lu_object_fini);
1342
1343 /**
1344  * Add object \a o as first layer of compound object \a h
1345  *
1346  * This is typically called by the ->ldo_object_alloc() method of top-level
1347  * device.
1348  */
1349 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1350 {
1351         list_move(&o->lo_linkage, &h->loh_layers);
1352 }
1353 EXPORT_SYMBOL(lu_object_add_top);
1354
1355 /**
1356  * Add object \a o as a layer of compound object, going after \a before.
1357  *
1358  * This is typically called by the ->ldo_object_alloc() method of \a
1359  * before->lo_dev.
1360  */
1361 void lu_object_add(struct lu_object *before, struct lu_object *o)
1362 {
1363         list_move(&o->lo_linkage, &before->lo_linkage);
1364 }
1365 EXPORT_SYMBOL(lu_object_add);
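
/*
 * A minimal sketch of how the two helpers above are typically used from an
 * ->ldo_object_alloc() method of a top-level device (illustrative only;
 * "my_object", "my_object_ops" and the mo_* fields are hypothetical):
 *
 *	static struct lu_object *my_object_alloc(const struct lu_env *env,
 *						 const struct lu_object_header *h,
 *						 struct lu_device *dev)
 *	{
 *		struct my_object *mo;
 *
 *		OBD_ALLOC_PTR(mo);
 *		if (mo == NULL)
 *			return NULL;
 *
 *		lu_object_header_init(&mo->mo_header);
 *		lu_object_init(&mo->mo_obj, &mo->mo_header, dev);
 *		lu_object_add_top(&mo->mo_header, &mo->mo_obj);
 *		mo->mo_obj.lo_ops = &my_object_ops;
 *		return &mo->mo_obj;
 *	}
 */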
1366
1367 /**
1368  * Initialize compound object.
1369  */
1370 int lu_object_header_init(struct lu_object_header *h)
1371 {
1372         memset(h, 0, sizeof *h);
1373         atomic_set(&h->loh_ref, 1);
1374         INIT_LIST_HEAD(&h->loh_lru);
1375         INIT_LIST_HEAD(&h->loh_layers);
1376         lu_ref_init(&h->loh_reference);
1377         return 0;
1378 }
1379 EXPORT_SYMBOL(lu_object_header_init);
1380
1381 /**
1382  * Finalize compound object.
1383  */
1384 void lu_object_header_fini(struct lu_object_header *h)
1385 {
1386         LASSERT(list_empty(&h->loh_layers));
1387         LASSERT(list_empty(&h->loh_lru));
1388         lu_ref_fini(&h->loh_reference);
1389 }
1390 EXPORT_SYMBOL(lu_object_header_fini);
1391
1392 /**
1393  * Free lu_object_header with proper RCU handling
1394  */
1395 void lu_object_header_free(struct lu_object_header *h)
1396 {
1397         lu_object_header_fini(h);
1398         OBD_FREE_PRE(h, sizeof(*h), "kfreed");
1399         kfree_rcu(h, loh_rcu);
1400 }
1401 EXPORT_SYMBOL(lu_object_header_free);
1402
1403 /**
1404  * Given a compound object, find its slice, corresponding to the device type
1405  * \a dtype.
1406  */
1407 struct lu_object *lu_object_locate(struct lu_object_header *h,
1408                                    const struct lu_device_type *dtype)
1409 {
1410         struct lu_object *o;
1411
1412         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1413                 if (o->lo_dev->ld_type == dtype)
1414                         return o;
1415         }
1416         return NULL;
1417 }
1418 EXPORT_SYMBOL(lu_object_locate);
1419
1420 /**
1421  * Finalize and free devices in the device stack.
1422  *
1423  * Finalize device stack by purging object cache, and calling
1424  * lu_device_type_operations::ldto_device_fini() and
1425  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1426  */
1427 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1428 {
1429         struct lu_site   *site = top->ld_site;
1430         struct lu_device *scan;
1431         struct lu_device *next;
1432
1433         lu_site_purge(env, site, ~0);
1434         for (scan = top; scan != NULL; scan = next) {
1435                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1436                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1437                 lu_device_put(scan);
1438         }
1439
1440         /* purge again. */
1441         lu_site_purge(env, site, ~0);
1442
1443         for (scan = top; scan != NULL; scan = next) {
1444                 const struct lu_device_type *ldt = scan->ld_type;
1445
1446                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1447         }
1448 }
1449
1450 /**
1451  * Global counter incremented whenever a key is registered, unregistered,
1452  * revived or quiesced. This is used to avoid unnecessary calls to
1453  * lu_context_refill(). No locking is provided, as initialization and shutdown
1454  * are supposed to be externally serialized.
1455  */
1456 static atomic_t key_set_version = ATOMIC_INIT(0);
1457
1458 /**
1459  * Register new key.
1460  */
1461 int lu_context_key_register(struct lu_context_key *key)
1462 {
1463         int result;
1464         unsigned int i;
1465
1466         LASSERT(key->lct_init != NULL);
1467         LASSERT(key->lct_fini != NULL);
1468         LASSERT(key->lct_tags != 0);
1469         LASSERT(key->lct_owner != NULL);
1470
1471         result = -ENFILE;
1472         atomic_set(&key->lct_used, 1);
1473         lu_ref_init(&key->lct_reference);
1474         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1475                 if (lu_keys[i])
1476                         continue;
1477                 key->lct_index = i;
1478
1479                 if (strncmp("osd_", module_name(key->lct_owner), 4) == 0)
1480                         CFS_RACE_WAIT(OBD_FAIL_OBD_SETUP);
1481
1482                 if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
1483                         continue;
1484
1485                 result = 0;
1486                 atomic_inc(&key_set_version);
1487                 break;
1488         }
1489         if (result) {
1490                 lu_ref_fini(&key->lct_reference);
1491                 atomic_set(&key->lct_used, 0);
1492         }
1493         return result;
1494 }
1495 EXPORT_SYMBOL(lu_context_key_register);
1496
1497 static void key_fini(struct lu_context *ctx, int index)
1498 {
1499         if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1500                 struct lu_context_key *key;
1501
1502                 key = lu_keys[index];
1503                 LASSERT(key != NULL);
1504                 LASSERT(key->lct_fini != NULL);
1505                 LASSERT(atomic_read(&key->lct_used) > 0);
1506
1507                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1508                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1509                 if (atomic_dec_and_test(&key->lct_used))
1510                         wake_up_var(&key->lct_used);
1511
1512                 LASSERT(key->lct_owner != NULL);
1513                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1514                         LINVRNT(module_refcount(key->lct_owner) > 0);
1515                         module_put(key->lct_owner);
1516                 }
1517                 ctx->lc_value[index] = NULL;
1518         }
1519 }
1520
1521 /**
1522  * Deregister key.
1523  */
1524 void lu_context_key_degister(struct lu_context_key *key)
1525 {
1526         LASSERT(atomic_read(&key->lct_used) >= 1);
1527         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1528
1529         lu_context_key_quiesce(NULL, key);
1530
1531         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1532
1533         /*
1534          * Wait until all transient contexts referencing this key have
1535          * run the lu_context_key::lct_fini() method.
1536          */
1537         atomic_dec(&key->lct_used);
1538         wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
1539
1540         if (!WARN_ON(lu_keys[key->lct_index] == NULL))
1541                 lu_ref_fini(&key->lct_reference);
1542
1543         smp_store_release(&lu_keys[key->lct_index], NULL);
1544 }
1545 EXPORT_SYMBOL(lu_context_key_degister);
1546
1547 /**
1548  * Register a number of keys. This has to be called after all keys have been
1549  * initialized by a call to LU_CONTEXT_KEY_INIT().
1550  */
1551 int lu_context_key_register_many(struct lu_context_key *k, ...)
1552 {
1553         struct lu_context_key *key = k;
1554         va_list args;
1555         int result;
1556
1557         va_start(args, k);
1558         do {
1559                 result = lu_context_key_register(key);
1560                 if (result)
1561                         break;
1562                 key = va_arg(args, struct lu_context_key *);
1563         } while (key != NULL);
1564         va_end(args);
1565
1566         if (result != 0) {
1567                 va_start(args, k);
1568                 while (k != key) {
1569                         lu_context_key_degister(k);
1570                         k = va_arg(args, struct lu_context_key *);
1571                 }
1572                 va_end(args);
1573         }
1574
1575         return result;
1576 }
1577 EXPORT_SYMBOL(lu_context_key_register_many);
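/*
 * Usage sketch (illustrative only): a module typically defines its keys
 * statically, initializes them with LU_CONTEXT_KEY_INIT() and registers
 * them in one call on module load, deregistering them again on unload.
 * The names foo_thread_key, foo_key_init and foo_key_fini below are
 * hypothetical placeholders for a real module's key and callbacks, and
 * the tags are examples only.
 *
 *	static struct lu_context_key foo_thread_key = {
 *		.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
 *		.lct_init = foo_key_init,
 *		.lct_fini = foo_key_fini,
 *	};
 *
 *	static int __init foo_mod_init(void)
 *	{
 *		LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *		return lu_context_key_register_many(&foo_thread_key, NULL);
 *	}
 *
 *	static void __exit foo_mod_exit(void)
 *	{
 *		lu_context_key_degister_many(&foo_thread_key, NULL);
 *	}
 */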
1578
1579 /**
1580  * De-register a number of keys. This is a dual to
1581  * lu_context_key_register_many().
1582  */
1583 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1584 {
1585         va_list args;
1586
1587         va_start(args, k);
1588         do {
1589                 lu_context_key_degister(k);
1590                 k = va_arg(args, struct lu_context_key *);
1591         } while (k != NULL);
1592         va_end(args);
1593 }
1594 EXPORT_SYMBOL(lu_context_key_degister_many);
1595
1596 /**
1597  * Revive a number of keys.
1598  */
1599 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1600 {
1601         va_list args;
1602
1603         va_start(args, k);
1604         do {
1605                 lu_context_key_revive(k);
1606                 k = va_arg(args, struct lu_context_key *);
1607         } while (k != NULL);
1608         va_end(args);
1609 }
1610 EXPORT_SYMBOL(lu_context_key_revive_many);
1611
1612 /**
1613  * Quiescent a number of keys.
1614  */
1615 void lu_context_key_quiesce_many(struct lu_device_type *t,
1616                                  struct lu_context_key *k, ...)
1617 {
1618         va_list args;
1619
1620         va_start(args, k);
1621         do {
1622                 lu_context_key_quiesce(t, k);
1623                 k = va_arg(args, struct lu_context_key *);
1624         } while (k != NULL);
1625         va_end(args);
1626 }
1627 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1628
1629 /**
1630  * Return the value associated with key \a key in context \a ctx.
1631  */
1632 void *lu_context_key_get(const struct lu_context *ctx,
1633                          const struct lu_context_key *key)
1634 {
1635         LINVRNT(ctx->lc_state == LCS_ENTERED);
1636         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1637         LASSERT(lu_keys[key->lct_index] == key);
1638         return ctx->lc_value[key->lct_index];
1639 }
1640 EXPORT_SYMBOL(lu_context_key_get);
1641
1642 /**
1643  * List of remembered contexts. XXX document me.
1644  */
1645 static LIST_HEAD(lu_context_remembered);
1646 static DEFINE_SPINLOCK(lu_context_remembered_guard);
1647
1648 /**
1649  * Destroy \a key in all remembered contexts. This is used to destroy key
1650  * values in "shared" contexts (like service threads), when a module owning
1651  * the key is about to be unloaded.
1652  */
1653 void lu_context_key_quiesce(struct lu_device_type *t,
1654                             struct lu_context_key *key)
1655 {
1656         struct lu_context *ctx;
1657
1658         if (key->lct_tags & LCT_QUIESCENT)
1659                 return;
1660         /*
1661          * The write-lock on lu_key_initing will ensure that any
1662          * keys_fill() which didn't see LCT_QUIESCENT will have
1663          * finished before we call key_fini().
1664          */
1665         down_write(&lu_key_initing);
1666         if (!(key->lct_tags & LCT_QUIESCENT)) {
1667                 if (t == NULL || atomic_read(&t->ldt_device_nr) == 0)
1668                         key->lct_tags |= LCT_QUIESCENT;
1669                 up_write(&lu_key_initing);
1670
1671                 spin_lock(&lu_context_remembered_guard);
1672                 list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
1673                         spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
1674                         key_fini(ctx, key->lct_index);
1675                 }
1676                 spin_unlock(&lu_context_remembered_guard);
1677
1678                 return;
1679         }
1680         up_write(&lu_key_initing);
1681 }
1682
1683 void lu_context_key_revive(struct lu_context_key *key)
1684 {
1685         key->lct_tags &= ~LCT_QUIESCENT;
1686         atomic_inc(&key_set_version);
1687 }
1688
1689 static void keys_fini(struct lu_context *ctx)
1690 {
1691         unsigned int i;
1692
1693         if (ctx->lc_value == NULL)
1694                 return;
1695
1696         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1697                 key_fini(ctx, i);
1698
1699         OBD_FREE_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1700         ctx->lc_value = NULL;
1701 }
1702
1703 static int keys_fill(struct lu_context *ctx)
1704 {
1705         unsigned int i;
1706         int rc = 0;
1707
1708         /*
1709          * Serialisation with lu_context_key_quiesce() is needed to
1710          * ensure we see LCT_QUIESCENT and don't allocate a new value
1711          * after it freed one.  The rwsem provides this.  As down_read()
1712          * does optimistic spinning while the writer is active, this is
1713          * unlikely to ever sleep.
1714          */
1715         down_read(&lu_key_initing);
1716         ctx->lc_version = atomic_read(&key_set_version);
1717
1718         LINVRNT(ctx->lc_value);
1719         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1720                 struct lu_context_key *key;
1721
1722                 key = lu_keys[i];
1723                 if (!ctx->lc_value[i] && key &&
1724                     (key->lct_tags & ctx->lc_tags) &&
1725                     /*
1726                      * Don't create values for a LCT_QUIESCENT key, as this
1727                      * will pin the module owning the key.
1728                      */
1729                     !(key->lct_tags & LCT_QUIESCENT)) {
1730                         void *value;
1731
1732                         LINVRNT(key->lct_init != NULL);
1733                         LINVRNT(key->lct_index == i);
1734
1735                         LASSERT(key->lct_owner != NULL);
1736                         if (!(ctx->lc_tags & LCT_NOREF) &&
1737                             try_module_get(key->lct_owner) == 0) {
1738                                 /* module is unloading, skip this key */
1739                                 continue;
1740                         }
1741
1742                         value = key->lct_init(ctx, key);
1743                         if (unlikely(IS_ERR(value))) {
1744                                 rc = PTR_ERR(value);
1745                                 break;
1746                         }
1747
1748                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1749                         atomic_inc(&key->lct_used);
1750                         /*
1751                          * This is the only place in the code where an element
1752                          * of the ctx->lc_value[] array is set to a non-NULL
1753                          * value.
1754                          */
1755                         ctx->lc_value[i] = value;
1756                         if (key->lct_exit != NULL)
1757                                 ctx->lc_tags |= LCT_HAS_EXIT;
1758                 }
1759         }
1760
1761         up_read(&lu_key_initing);
1762         return rc;
1763 }
1764
1765 static int keys_init(struct lu_context *ctx)
1766 {
1767         OBD_ALLOC_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
1768         if (likely(ctx->lc_value != NULL))
1769                 return keys_fill(ctx);
1770
1771         return -ENOMEM;
1772 }
1773
1774 /**
1775  * Initialize context data-structure. Create values for all keys.
1776  */
1777 int lu_context_init(struct lu_context *ctx, __u32 tags)
1778 {
1779         int     rc;
1780
1781         memset(ctx, 0, sizeof *ctx);
1782         ctx->lc_state = LCS_INITIALIZED;
1783         ctx->lc_tags = tags;
1784         if (tags & LCT_REMEMBER) {
1785                 spin_lock(&lu_context_remembered_guard);
1786                 list_add(&ctx->lc_remember, &lu_context_remembered);
1787                 spin_unlock(&lu_context_remembered_guard);
1788         } else {
1789                 INIT_LIST_HEAD(&ctx->lc_remember);
1790         }
1791
1792         rc = keys_init(ctx);
1793         if (rc != 0)
1794                 lu_context_fini(ctx);
1795
1796         return rc;
1797 }
1798 EXPORT_SYMBOL(lu_context_init);
1799
1800 /**
1801  * Finalize context data-structure. Destroy key values.
1802  */
1803 void lu_context_fini(struct lu_context *ctx)
1804 {
1805         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1806         ctx->lc_state = LCS_FINALIZED;
1807
1808         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1809                 LASSERT(list_empty(&ctx->lc_remember));
1810         } else {
1811                 /* could race with key degister */
1812                 spin_lock(&lu_context_remembered_guard);
1813                 list_del_init(&ctx->lc_remember);
1814                 spin_unlock(&lu_context_remembered_guard);
1815         }
1816         keys_fini(ctx);
1817 }
1818 EXPORT_SYMBOL(lu_context_fini);
1819
1820 /**
1821  * Called before entering context.
1822  */
1823 void lu_context_enter(struct lu_context *ctx)
1824 {
1825         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1826         ctx->lc_state = LCS_ENTERED;
1827 }
1828 EXPORT_SYMBOL(lu_context_enter);
1829
1830 /**
1831  * Called after exiting from \a ctx
1832  */
1833 void lu_context_exit(struct lu_context *ctx)
1834 {
1835         unsigned int i;
1836
1837         LINVRNT(ctx->lc_state == LCS_ENTERED);
1838         /*
1839          * Disable preemption to ensure we get a warning if
1840          * any lct_exit ever tries to sleep.  That would hurt
1841          * lu_context_key_quiesce() which spins waiting for us.
1842          * This also ensures we aren't preempted while the state
1843          * is LCS_LEAVING, as that too would cause problems for
1844          * lu_context_key_quiesce().
1845          */
1846         preempt_disable();
1847         /*
1848          * Ensure lu_context_key_quiesce() sees LCS_LEAVING
1849          * or we see LCT_QUIESCENT
1850          */
1851         smp_store_mb(ctx->lc_state, LCS_LEAVING);
1852         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
1853                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1854                         struct lu_context_key *key;
1855
1856                         key = lu_keys[i];
1857                         if (ctx->lc_value[i] &&
1858                             !(key->lct_tags & LCT_QUIESCENT) &&
1859                             key->lct_exit)
1860                                 key->lct_exit(ctx, key, ctx->lc_value[i]);
1861                 }
1862         }
1863
1864         smp_store_release(&ctx->lc_state, LCS_LEFT);
1865         preempt_enable();
1866 }
1867 EXPORT_SYMBOL(lu_context_exit);
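/*
 * Lifecycle sketch (illustrative only): a context is initialized once,
 * entered and exited around each piece of work, and finalized when its
 * owner stops.  foo_thread_key and struct foo_thread_info are hypothetical
 * placeholders for a key registered elsewhere, and the tags are examples.
 *
 *	struct lu_context ctx;
 *	struct foo_thread_info *info;
 *	int rc;
 *
 *	rc = lu_context_init(&ctx, LCT_MD_THREAD | LCT_REMEMBER);
 *	if (rc != 0)
 *		return rc;
 *	lu_context_enter(&ctx);
 *	info = lu_context_key_get(&ctx, &foo_thread_key);
 *	...
 *	lu_context_exit(&ctx);
 *	lu_context_fini(&ctx);
 */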
1868
1869 /**
1870  * Allocate, for the context, all missing keys that were registered after the
1871  * context was created. key_set_version only changes in the rare cases when
1872  * modules are loaded or removed.
1873  */
1874 int lu_context_refill(struct lu_context *ctx)
1875 {
1876         if (likely(ctx->lc_version == atomic_read(&key_set_version)))
1877                 return 0;
1878
1879         return keys_fill(ctx);
1880 }
1881
1882 /**
1883  * lu_context_tags_default/lu_session_tags_default are updated when new types
1884  * of obd devices are added. Currently this is only used on the client side,
1885  * specifically by the echo device client; for other stacks (like ptlrpc
1886  * threads) the contexts are predefined when the lu_device types are
1887  * registered, during the module probe phase.
1888  */
1889 u32 lu_context_tags_default = LCT_CL_THREAD;
1890 u32 lu_session_tags_default = LCT_SESSION;
1891
1892 void lu_context_tags_update(__u32 tags)
1893 {
1894         spin_lock(&lu_context_remembered_guard);
1895         lu_context_tags_default |= tags;
1896         atomic_inc(&key_set_version);
1897         spin_unlock(&lu_context_remembered_guard);
1898 }
1899 EXPORT_SYMBOL(lu_context_tags_update);
1900
1901 void lu_context_tags_clear(__u32 tags)
1902 {
1903         spin_lock(&lu_context_remembered_guard);
1904         lu_context_tags_default &= ~tags;
1905         atomic_inc(&key_set_version);
1906         spin_unlock(&lu_context_remembered_guard);
1907 }
1908 EXPORT_SYMBOL(lu_context_tags_clear);
1909
1910 void lu_session_tags_update(__u32 tags)
1911 {
1912         spin_lock(&lu_context_remembered_guard);
1913         lu_session_tags_default |= tags;
1914         atomic_inc(&key_set_version);
1915         spin_unlock(&lu_context_remembered_guard);
1916 }
1917 EXPORT_SYMBOL(lu_session_tags_update);
1918
1919 void lu_session_tags_clear(__u32 tags)
1920 {
1921         spin_lock(&lu_context_remembered_guard);
1922         lu_session_tags_default &= ~tags;
1923         atomic_inc(&key_set_version);
1924         spin_unlock(&lu_context_remembered_guard);
1925 }
1926 EXPORT_SYMBOL(lu_session_tags_clear);
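/*
 * Sketch of how these helpers are meant to be paired (illustrative only,
 * the tags shown are placeholders): a client stack that needs extra tags,
 * such as the echo client, adds them while its device exists and clears
 * them again on cleanup so that later contexts are not created with
 * stale tags.
 *
 *	lu_context_tags_update(LCT_DT_THREAD);
 *	lu_session_tags_update(LCT_SESSION);
 *	...
 *	lu_context_tags_clear(LCT_DT_THREAD);
 *	lu_session_tags_clear(LCT_SESSION);
 */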
1927
1928 int lu_env_init(struct lu_env *env, __u32 tags)
1929 {
1930         int result;
1931
1932         env->le_ses = NULL;
1933         result = lu_context_init(&env->le_ctx, tags);
1934         if (likely(result == 0))
1935                 lu_context_enter(&env->le_ctx);
1936         return result;
1937 }
1938 EXPORT_SYMBOL(lu_env_init);
1939
1940 void lu_env_fini(struct lu_env *env)
1941 {
1942         lu_context_exit(&env->le_ctx);
1943         lu_context_fini(&env->le_ctx);
1944         env->le_ses = NULL;
1945 }
1946 EXPORT_SYMBOL(lu_env_fini);
1947
1948 int lu_env_refill(struct lu_env *env)
1949 {
1950         int result;
1951
1952         result = lu_context_refill(&env->le_ctx);
1953         if (result == 0 && env->le_ses != NULL)
1954                 result = lu_context_refill(env->le_ses);
1955         return result;
1956 }
1957 EXPORT_SYMBOL(lu_env_refill);
1958
1959 /**
1960  * Currently, this API is only used by the echo client. Because the echo
1961  * client and the normal Lustre client share the same cl_env cache, the
1962  * echo client needs to refresh the env context after it gets one from
1963  * the cache, especially when the normal client and the echo client
1964  * co-exist on the same client.
1965  */
1966 int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1967                           __u32 stags)
1968 {
1969         int    result;
1970
1971         if ((env->le_ctx.lc_tags & ctags) != ctags) {
1972                 env->le_ctx.lc_version = 0;
1973                 env->le_ctx.lc_tags |= ctags;
1974         }
1975
1976         if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1977                 env->le_ses->lc_version = 0;
1978                 env->le_ses->lc_tags |= stags;
1979         }
1980
1981         result = lu_env_refill(env);
1982
1983         return result;
1984 }
1985 EXPORT_SYMBOL(lu_env_refill_by_tags);
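/*
 * Usage sketch (illustrative only): an env created with one set of tags
 * can be widened with lu_env_refill_by_tags() before being reused, as the
 * echo client does with a cached cl_env.  The tags below are examples,
 * not the ones any particular caller is required to use.
 *
 *	struct lu_env env;
 *	int rc;
 *
 *	rc = lu_env_init(&env, LCT_DT_THREAD);
 *	if (rc != 0)
 *		return rc;
 *	rc = lu_env_refill_by_tags(&env, LCT_MD_THREAD, 0);
 *	...
 *	lu_env_fini(&env);
 */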
1986
1987
1988 struct lu_env_item {
1989         struct task_struct *lei_task;   /* rhashtable key */
1990         struct rhash_head lei_linkage;
1991         struct lu_env *lei_env;
1992         struct rcu_head lei_rcu_head;
1993 };
1994
1995 static const struct rhashtable_params lu_env_rhash_params = {
1996         .key_len     = sizeof(struct task_struct *),
1997         .key_offset  = offsetof(struct lu_env_item, lei_task),
1998         .head_offset = offsetof(struct lu_env_item, lei_linkage),
1999 };
2000
2001 struct rhashtable lu_env_rhash;
2002
2003 struct lu_env_percpu {
2004         struct task_struct *lep_task;
2005         struct lu_env *lep_env ____cacheline_aligned_in_smp;
2006 };
2007
2008 static struct lu_env_percpu lu_env_percpu[NR_CPUS];
2009
2010 int lu_env_add_task(struct lu_env *env, struct task_struct *task)
2011 {
2012         struct lu_env_item *lei, *old;
2013
2014         LASSERT(env);
2015
2016         OBD_ALLOC_PTR(lei);
2017         if (!lei)
2018                 return -ENOMEM;
2019
2020         lei->lei_task = task;
2021         lei->lei_env = env;
2022
2023         old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
2024                                                 &lei->lei_linkage,
2025                                                 lu_env_rhash_params);
2026         LASSERT(!old);
2027
2028         return 0;
2029 }
2030 EXPORT_SYMBOL(lu_env_add_task);
2031
2032 int lu_env_add(struct lu_env *env)
2033 {
2034         return lu_env_add_task(env, current);
2035 }
2036 EXPORT_SYMBOL(lu_env_add);
2037
2038 static void lu_env_item_free(struct rcu_head *head)
2039 {
2040         struct lu_env_item *lei;
2041
2042         lei = container_of(head, struct lu_env_item, lei_rcu_head);
2043         OBD_FREE_PTR(lei);
2044 }
2045
2046 void lu_env_remove(struct lu_env *env)
2047 {
2048         struct lu_env_item *lei;
2049         const void *task = current;
2050         int i;
2051
2052         for_each_possible_cpu(i) {
2053                 if (lu_env_percpu[i].lep_env == env) {
2054                         LASSERT(lu_env_percpu[i].lep_task == task);
2055                         lu_env_percpu[i].lep_task = NULL;
2056                         lu_env_percpu[i].lep_env = NULL;
2057                 }
2058         }
2059
2060         /* The rcu read lock is not taken in this case since the key
2061          * used is the actual task_struct. This implies that each
2062          * object is only removed by the owning thread, so there
2063          * can never be a race on a particular object.
2064          */
2065         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
2066                                      lu_env_rhash_params);
2067         if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
2068                                           lu_env_rhash_params) == 0)
2069                 call_rcu(&lei->lei_rcu_head, lu_env_item_free);
2070 }
2071 EXPORT_SYMBOL(lu_env_remove);
2072
2073 struct lu_env *lu_env_find(void)
2074 {
2075         struct lu_env *env = NULL;
2076         struct lu_env_item *lei;
2077         const void *task = current;
2078         int i = get_cpu();
2079
2080         if (lu_env_percpu[i].lep_task == current) {
2081                 env = lu_env_percpu[i].lep_env;
2082                 put_cpu();
2083                 LASSERT(env);
2084                 return env;
2085         }
2086
2087         lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
2088                                      lu_env_rhash_params);
2089         if (lei) {
2090                 env = lei->lei_env;
2091                 lu_env_percpu[i].lep_task = current;
2092                 lu_env_percpu[i].lep_env = env;
2093         }
2094         put_cpu();
2095
2096         return env;
2097 }
2098 EXPORT_SYMBOL(lu_env_find);
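/*
 * Usage sketch (illustrative only): a thread can publish its env so that
 * deeper code, which only has "current", can look it up without passing
 * the pointer through every call chain.  The env must be removed by the
 * same thread before it is finalized.
 *
 *	struct lu_env env;
 *	int rc;
 *
 *	rc = lu_env_init(&env, LCT_DT_THREAD);
 *	if (rc != 0)
 *		return rc;
 *	rc = lu_env_add(&env);
 *	if (rc == 0) {
 *		... lu_env_find() now returns &env on this thread ...
 *		lu_env_remove(&env);
 *	}
 *	lu_env_fini(&env);
 */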
2099
2100 typedef struct lu_site_stats {
2101         unsigned        lss_populated;
2102         unsigned        lss_max_search;
2103         unsigned        lss_total;
2104         unsigned        lss_busy;
2105 } lu_site_stats_t;
2106
2107 static void lu_site_stats_get(const struct lu_site *s,
2108                               lu_site_stats_t *stats)
2109 {
2110         int cnt = atomic_read(&s->ls_obj_hash.nelems);
2111         /*
2112          * percpu_counter_sum_positive() won't accept a const pointer
2113          * as it does modify the struct by taking a spinlock
2114          */
2115         struct lu_site *s2 = (struct lu_site *)s;
2116
2117         stats->lss_busy += cnt -
2118                 percpu_counter_sum_positive(&s2->ls_lru_len_counter);
2119
2120         stats->lss_total += cnt;
2121         stats->lss_max_search = 0;
2122         stats->lss_populated = 0;
2123 }
2124
2125
2126 /*
2127  * lu_cache_shrink_count() returns an approximate number of cached objects
2128  * that can be freed by shrink_slab(). A counter, which tracks the
2129  * number of items in the site's lru, is maintained in a percpu_counter
2130  * for each site. The percpu values are incremented and decremented as
2131  * objects are added or removed from the lru. The percpu values are summed
2132  * and saved whenever a percpu value exceeds a threshold. Thus the saved,
2133  * summed value at any given time may not accurately reflect the current
2134  * lru length. But this value is sufficiently accurate for the needs of
2135  * a shrinker.
2136  *
2137  * Using a per cpu counter is a compromise solution to concurrent access:
2138  * lu_object_put() can update the counter without locking the site and
2139  * lu_cache_shrink_count can sum the counters without locking each
2140  * ls_obj_hash bucket.
2141  */
2142 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
2143                                            struct shrink_control *sc)
2144 {
2145         struct lu_site *s;
2146         struct lu_site *tmp;
2147         unsigned long cached = 0;
2148
2149         if (!(sc->gfp_mask & __GFP_FS))
2150                 return 0;
2151
2152         down_read(&lu_sites_guard);
2153         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
2154                 cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
2155         up_read(&lu_sites_guard);
2156
2157         cached = (cached / 100) * sysctl_vfs_cache_pressure;
2158         CDEBUG(D_INODE, "%lu objects cached, cache pressure %d\n",
2159                cached, sysctl_vfs_cache_pressure);
2160
2161         return cached;
2162 }
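/*
 * Minimal sketch of the counting pattern described above (not the actual
 * lu_object_put() code, and "site" is just some struct lu_site *): LRU
 * insertions and removals adjust the per-CPU counter cheaply, while
 * readers choose between the cheap approximate read and the exact but
 * more expensive sum.
 *
 *	s64 approx, exact;
 *
 *	percpu_counter_inc(&site->ls_lru_len_counter);
 *	percpu_counter_dec(&site->ls_lru_len_counter);
 *	approx = percpu_counter_read_positive(&site->ls_lru_len_counter);
 *	exact = percpu_counter_sum_positive(&site->ls_lru_len_counter);
 */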
2163
2164 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
2165                                           struct shrink_control *sc)
2166 {
2167         struct lu_site *s;
2168         struct lu_site *tmp;
2169         unsigned long remain = sc->nr_to_scan;
2170         LIST_HEAD(splice);
2171
2172         if (!(sc->gfp_mask & __GFP_FS))
2173                 /* We must not take the lu_sites_guard lock when
2174                  * __GFP_FS is *not* set because of the deadlock
2175                  * possibility detailed above. Additionally,
2176                  * since we cannot determine the number of
2177                  * objects in the cache without taking this
2178                  * lock, we're in a particularly tough spot. As
2179                  * a result, we'll just lie and say our cache is
2180                  * empty. This _should_ be ok, as we can't
2181                  * reclaim objects when __GFP_FS is *not* set
2182                  * anyways.
2183                  */
2184                 return SHRINK_STOP;
2185
2186         down_write(&lu_sites_guard);
2187         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
2188                 remain = lu_site_purge(&lu_shrink_env, s, remain);
2189                 /*
2190                  * Move the just-shrunk site to the tail of the site list
2191                  * to ensure shrinking fairness.
2192                  */
2193                 list_move_tail(&s->ls_linkage, &splice);
2194         }
2195         list_splice(&splice, lu_sites.prev);
2196         up_write(&lu_sites_guard);
2197
2198         return sc->nr_to_scan - remain;
2199 }
2200
2201 #ifdef HAVE_SHRINKER_COUNT
2202 static struct shrinker lu_site_shrinker = {
2203         .count_objects  = lu_cache_shrink_count,
2204         .scan_objects   = lu_cache_shrink_scan,
2205         .seeks          = DEFAULT_SEEKS,
2206 };
2207
2208 #else
2209 /*
2210  * There exists a potential lock inversion deadlock scenario when using
2211  * Lustre on top of ZFS. This occurs between one of ZFS's
2212  * buf_hash_table.ht_lock locks and Lustre's lu_sites_guard lock. Essentially,
2213  * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
2214  * while thread B will take the ht_lock and sleep on the lu_sites_guard
2215  * lock. Obviously neither thread will wake and drop their respective hold
2216  * on their lock.
2217  *
2218  * To prevent this from happening we must ensure the lu_sites_guard lock is
2219  * not taken while down this code path. ZFS reliably does not set the
2220  * __GFP_FS bit in its code paths, so this can be used to determine if it
2221  * is safe to take the lu_sites_guard lock.
2222  *
2223  * Ideally we should accurately return the remaining number of cached
2224  * objects without taking the lu_sites_guard lock, but this is not
2225  * possible in the current implementation.
2226  */
2227 static int lu_cache_shrink(struct shrinker *shrinker,
2228                            struct shrink_control *sc)
2229 {
2230         int cached = 0;
2231
2232         CDEBUG(D_INODE, "Shrink %lu objects\n", sc->nr_to_scan);
2233
2234         if (sc->nr_to_scan != 0)
2235                 lu_cache_shrink_scan(shrinker, sc);
2236
2237         cached = lu_cache_shrink_count(shrinker, sc);
2238         return cached;
2239 }
2240
2241 static struct shrinker lu_site_shrinker = {
2242         .shrink  = lu_cache_shrink,
2243         .seeks   = DEFAULT_SEEKS,
2244 };
2245
2246 #endif /* HAVE_SHRINKER_COUNT */
2247
2248
2249 /*
2250  * Debugging stuff.
2251  */
2252
2253 /**
2254  * Environment to be used in debugger, contains all tags.
2255  */
2256 static struct lu_env lu_debugging_env;
2257
2258 /**
2259  * Debugging printer function using printk().
2260  */
2261 int lu_printk_printer(const struct lu_env *env,
2262                       void *unused, const char *format, ...)
2263 {
2264         va_list args;
2265
2266         va_start(args, format);
2267         vprintk(format, args);
2268         va_end(args);
2269         return 0;
2270 }
2271
2272 int lu_debugging_setup(void)
2273 {
2274         return lu_env_init(&lu_debugging_env, ~0);
2275 }
2276
2277 void lu_context_keys_dump(void)
2278 {
2279         unsigned int i;
2280
2281         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
2282                 struct lu_context_key *key;
2283
2284                 key = lu_keys[i];
2285                 if (key != NULL) {
2286                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
2287                                i, key, key->lct_tags,
2288                                key->lct_init, key->lct_fini, key->lct_exit,
2289                                key->lct_index, atomic_read(&key->lct_used),
2290                                key->lct_owner ? key->lct_owner->name : "",
2291                                key->lct_owner);
2292                         lu_ref_print(&key->lct_reference);
2293                 }
2294         }
2295 }
2296
2297 /**
2298  * Initialization of global lu_* data.
2299  */
2300 int lu_global_init(void)
2301 {
2302         int result;
2303
2304         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
2305
2306         result = lu_ref_global_init();
2307         if (result != 0)
2308                 return result;
2309
2310         LU_CONTEXT_KEY_INIT(&lu_global_key);
2311         result = lu_context_key_register(&lu_global_key);
2312         if (result)
2313                 goto out_lu_ref;
2314
2315         /*
2316          * At this level, we don't know what tags are needed, so allocate them
2317          * conservatively. This should not be too bad, because this
2318          * environment is global.
2319          */
2320         down_write(&lu_sites_guard);
2321         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
2322         up_write(&lu_sites_guard);
2323         if (result) {
2324                 lu_context_key_degister(&lu_global_key);
2325                 goto out_lu_ref;
2326         }
2327
2328         /*
2329          * Seeks estimation: 3 seeks to read a record from the OI, one to read
2330          * the inode, one for the EA. Unfortunately setting this high value
2331          * results in the lu_object/inode cache consuming all the memory.
2332          */
2333         result = register_shrinker(&lu_site_shrinker);
2334         if (result)
2335                 goto out_env;
2336
2337         result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
2338
2339         if (result)
2340                 goto out_shrinker;
2341
2342         return result;
2343
2344 out_shrinker:
2345         unregister_shrinker(&lu_site_shrinker);
2346 out_env:
2347         /* ordering here is explained in lu_global_fini() */
2348         lu_context_key_degister(&lu_global_key);
2349         down_write(&lu_sites_guard);
2350         lu_env_fini(&lu_shrink_env);
2351         up_write(&lu_sites_guard);
2352 out_lu_ref:
2353         lu_ref_global_fini();
2354         return result;
2355 }
2356
2357 /**
2358  * Dual to lu_global_init().
2359  */
2360 void lu_global_fini(void)
2361 {
2362         unregister_shrinker(&lu_site_shrinker);
2363
2364         lu_context_key_degister(&lu_global_key);
2365
2366         /*
2367          * Tear shrinker environment down _after_ de-registering
2368          * lu_global_key, because the latter has a value in the former.
2369          */
2370         down_write(&lu_sites_guard);
2371         lu_env_fini(&lu_shrink_env);
2372         up_write(&lu_sites_guard);
2373
2374         rhashtable_destroy(&lu_env_rhash);
2375
2376         lu_ref_global_fini();
2377 }
2378
2379 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
2380 {
2381 #ifdef CONFIG_PROC_FS
2382         struct lprocfs_counter ret;
2383
2384         lprocfs_stats_collect(stats, idx, &ret);
2385         return (__u32)ret.lc_count;
2386 #else
2387         return 0;
2388 #endif
2389 }
2390
2391 /**
2392  * Output site statistical counters into a buffer. Suitable for
2393  * lprocfs_rd_*()-style functions.
2394  */
2395 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
2396 {
2397         const struct bucket_table *tbl;
2398         lu_site_stats_t stats;
2399         unsigned int chains;
2400
2401         memset(&stats, 0, sizeof(stats));
2402         lu_site_stats_get(s, &stats);
2403
2404         rcu_read_lock();
2405         tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
2406                                   &((struct lu_site *)s)->ls_obj_hash);
2407         chains = tbl->size;
2408         rcu_read_unlock();
2409         seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
2410                    stats.lss_busy,
2411                    stats.lss_total,
2412                    stats.lss_populated,
2413                    chains,
2414                    stats.lss_max_search,
2415                    ls_stats_read(s->ls_stats, LU_SS_CREATED),
2416                    ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2417                    ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2418                    ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2419                    ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2420                    ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2421         return 0;
2422 }
2423 EXPORT_SYMBOL(lu_site_stats_seq_print);
2424
2425 /**
2426  * Helper function to initialize a number of kmem slab caches at once.
2427  */
2428 int lu_kmem_init(struct lu_kmem_descr *caches)
2429 {
2430         int result;
2431         struct lu_kmem_descr *iter = caches;
2432
2433         for (result = 0; iter->ckd_cache != NULL; ++iter) {
2434                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2435                                                      iter->ckd_size,
2436                                                      0, 0, NULL);
2437                 if (*iter->ckd_cache == NULL) {
2438                         result = -ENOMEM;
2439                         /* free all previously allocated caches */
2440                         lu_kmem_fini(caches);
2441                         break;
2442                 }
2443         }
2444         return result;
2445 }
2446 EXPORT_SYMBOL(lu_kmem_init);
2447
2448 /**
2449  * Helper function to finalize a number of kmem slab caches at once. Dual to
2450  * lu_kmem_init().
2451  */
2452 void lu_kmem_fini(struct lu_kmem_descr *caches)
2453 {
2454         for (; caches->ckd_cache != NULL; ++caches) {
2455                 if (*caches->ckd_cache != NULL) {
2456                         kmem_cache_destroy(*caches->ckd_cache);
2457                         *caches->ckd_cache = NULL;
2458                 }
2459         }
2460 }
2461 EXPORT_SYMBOL(lu_kmem_fini);
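/*
 * Usage sketch (illustrative only): callers describe their caches in an
 * array terminated by an entry with a NULL ckd_cache and hand it to
 * lu_kmem_init()/lu_kmem_fini().  foo_object_kmem and struct foo_object
 * are hypothetical.
 *
 *	static struct kmem_cache *foo_object_kmem;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_object_kmem,
 *			.ckd_name  = "foo_object_kmem",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 *	...
 *	lu_kmem_fini(foo_caches);
 */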
2462
2463 /**
2464  * Temporary solution to be able to assign a fid in ->do_create()
2465  * until we have fully-functional OST FIDs.
2466  */
2467 void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2468                           const struct lu_fid *fid)
2469 {
2470         struct lu_site          *s = o->lo_dev->ld_site;
2471         struct lu_fid           *old = &o->lo_header->loh_fid;
2472         int rc;
2473
2474         LASSERT(fid_is_zero(old));
2475         *old = *fid;
2476 try_again:
2477         rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
2478                                            &o->lo_header->loh_hash,
2479                                            obj_hash_params);
2480         /* supposed to be unique */
2481         LASSERT(rc != -EEXIST);
2482         /* handle hash table resizing */
2483         if (rc == -ENOMEM) {
2484                 msleep(20);
2485                 goto try_again;
2486         }
2487         /* trim the hash if it is growing too big */
2488         lu_object_limit(env, o->lo_dev);
2489         if (rc == -E2BIG)
2490                 goto try_again;
2491
2492         LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
2493 }
2494 EXPORT_SYMBOL(lu_object_assign_fid);
2495
2496 /**
2497  * Allocate an object with a zero (non-assigned) fid.
2498  * XXX: temporary solution to be able to assign a fid in ->do_create()
2499  *      until we have fully-functional OST FIDs.
2500  */
2501 struct lu_object *lu_object_anon(const struct lu_env *env,
2502                                  struct lu_device *dev,
2503                                  const struct lu_object_conf *conf)
2504 {
2505         struct lu_fid fid;
2506         struct lu_object *o;
2507         int rc;
2508
2509         fid_zero(&fid);
2510         o = lu_object_alloc(env, dev, &fid);
2511         if (!IS_ERR(o)) {
2512                 rc = lu_object_start(env, dev, o, conf);
2513                 if (rc) {
2514                         lu_object_free(env, o);
2515                         return ERR_PTR(rc);
2516                 }
2517         }
2518
2519         return o;
2520 }
2521 EXPORT_SYMBOL(lu_object_anon);
2522
2523 struct lu_buf LU_BUF_NULL = {
2524         .lb_buf = NULL,
2525         .lb_len = 0
2526 };
2527 EXPORT_SYMBOL(LU_BUF_NULL);
2528
2529 void lu_buf_free(struct lu_buf *buf)
2530 {
2531         LASSERT(buf);
2532         if (buf->lb_buf) {
2533                 LASSERT(buf->lb_len > 0);
2534                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2535                 buf->lb_buf = NULL;
2536                 buf->lb_len = 0;
2537         }
2538 }
2539 EXPORT_SYMBOL(lu_buf_free);
2540
2541 void lu_buf_alloc(struct lu_buf *buf, size_t size)
2542 {
2543         LASSERT(buf);
2544         LASSERT(buf->lb_buf == NULL);
2545         LASSERT(buf->lb_len == 0);
2546         OBD_ALLOC_LARGE(buf->lb_buf, size);
2547         if (likely(buf->lb_buf))
2548                 buf->lb_len = size;
2549 }
2550 EXPORT_SYMBOL(lu_buf_alloc);
2551
2552 void lu_buf_realloc(struct lu_buf *buf, size_t size)
2553 {
2554         lu_buf_free(buf);
2555         lu_buf_alloc(buf, size);
2556 }
2557 EXPORT_SYMBOL(lu_buf_realloc);
2558
2559 struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
2560 {
2561         if (buf->lb_buf == NULL && buf->lb_len == 0)
2562                 lu_buf_alloc(buf, len);
2563
2564         if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2565                 lu_buf_realloc(buf, len);
2566
2567         return buf;
2568 }
2569 EXPORT_SYMBOL(lu_buf_check_and_alloc);
2570
2571 /**
2572  * Increase the size of the \a buf.
2573  * Preserves the old data in the buffer; the old buffer
2574  * remains unchanged on error.
2575  * \retval 0 or -ENOMEM
2576  */
2577 int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
2578 {
2579         char *ptr;
2580
2581         if (len <= buf->lb_len)
2582                 return 0;
2583
2584         OBD_ALLOC_LARGE(ptr, len);
2585         if (ptr == NULL)
2586                 return -ENOMEM;
2587
2588         /* Copy the old data over, then free the old buffer */
2589         if (buf->lb_buf != NULL) {
2590                 memcpy(ptr, buf->lb_buf, buf->lb_len);
2591                 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2592         }
2593
2594         buf->lb_buf = ptr;
2595         buf->lb_len = len;
2596         return 0;
2597 }
2598 EXPORT_SYMBOL(lu_buf_check_and_grow);
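/*
 * Usage sketch (illustrative only, sizes are arbitrary): a lu_buf starts
 * out empty, is sized on demand and must be freed by its owner when done.
 * lu_buf_check_and_grow() keeps the existing contents, while
 * lu_buf_realloc() does not.
 *
 *	struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *	int rc;
 *
 *	lu_buf_check_and_alloc(&buf, 4096);
 *	if (buf.lb_buf == NULL)
 *		return -ENOMEM;
 *	...
 *	rc = lu_buf_check_and_grow(&buf, 8192);
 *	...
 *	lu_buf_free(&buf);
 */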