Whamcloud - gitweb
357e73f566c04d5d76f2d54ad2f7d76fdb66d7cf
[fs/lustre-release.git] / lustre / obdclass / cl_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Object.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
40  */
41
42 /*
43  * Locking.
44  *
45  *  i_mutex
46  *      PG_locked
47  *          ->coh_lock_guard
48  *          ->coh_attr_guard
49  *          ->ls_guard
50  */
51
52 #define DEBUG_SUBSYSTEM S_CLASS
53
54 #include <libcfs/libcfs.h>
55 /* class_put_type() */
56 #include <obd_class.h>
57 #include <obd_support.h>
58 #include <lustre_fid.h>
59 #include <libcfs/list.h>
60 #include <libcfs/libcfs_hash.h> /* for cfs_hash stuff */
61 #include <cl_object.h>
62 #include "cl_internal.h"
63
64 static struct kmem_cache *cl_env_kmem;
65
66 /** Lock class of cl_object_header::coh_lock_guard */
67 static struct lock_class_key cl_lock_guard_class;
68 /** Lock class of cl_object_header::coh_attr_guard */
69 static struct lock_class_key cl_attr_guard_class;
70
71 extern __u32 lu_context_tags_default;
72 extern __u32 lu_session_tags_default;
73 /**
74  * Initialize cl_object_header.
75  */
76 int cl_object_header_init(struct cl_object_header *h)
77 {
78         int result;
79
80         ENTRY;
81         result = lu_object_header_init(&h->coh_lu);
82         if (result == 0) {
83                 spin_lock_init(&h->coh_lock_guard);
84                 spin_lock_init(&h->coh_attr_guard);
85                 lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
86                 lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
87                 INIT_LIST_HEAD(&h->coh_locks);
88                 h->coh_page_bufsize = 0;
89         }
90         RETURN(result);
91 }
92 EXPORT_SYMBOL(cl_object_header_init);
93
94 /**
95  * Finalize cl_object_header.
96  */
97 void cl_object_header_fini(struct cl_object_header *h)
98 {
99         LASSERT(list_empty(&h->coh_locks));
100         lu_object_header_fini(&h->coh_lu);
101 }
102 EXPORT_SYMBOL(cl_object_header_fini);
103
104 /**
105  * Returns a cl_object with a given \a fid.
106  *
107  * Returns either cached or newly created object. Additional reference on the
108  * returned object is acquired.
109  *
110  * \see lu_object_find(), cl_page_find(), cl_lock_find()
111  */
112 struct cl_object *cl_object_find(const struct lu_env *env,
113                                  struct cl_device *cd, const struct lu_fid *fid,
114                                  const struct cl_object_conf *c)
115 {
116         might_sleep();
117         return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
118 }
119 EXPORT_SYMBOL(cl_object_find);
120
121 /**
122  * Releases a reference on \a o.
123  *
124  * When last reference is released object is returned to the cache, unless
125  * lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
126  *
127  * \see cl_page_put(), cl_lock_put().
128  */
129 void cl_object_put(const struct lu_env *env, struct cl_object *o)
130 {
131         lu_object_put(env, &o->co_lu);
132 }
133 EXPORT_SYMBOL(cl_object_put);
134
135 /**
136  * Acquire an additional reference to the object \a o.
137  *
138  * This can only be used to acquire _additional_ reference, i.e., caller
139  * already has to possess at least one reference to \a o before calling this.
140  *
141  * \see cl_page_get(), cl_lock_get().
142  */
143 void cl_object_get(struct cl_object *o)
144 {
145         lu_object_get(&o->co_lu);
146 }
147 EXPORT_SYMBOL(cl_object_get);
148
149 /**
150  * Returns the top-object for a given \a o.
151  *
152  * \see cl_io_top()
153  */
154 struct cl_object *cl_object_top(struct cl_object *o)
155 {
156         struct cl_object_header *hdr = cl_object_header(o);
157         struct cl_object *top;
158
159         while (hdr->coh_parent != NULL)
160                 hdr = hdr->coh_parent;
161
162         top = lu2cl(lu_object_top(&hdr->coh_lu));
163         CDEBUG(D_TRACE, "%p -> %p\n", o, top);
164         return top;
165 }
166 EXPORT_SYMBOL(cl_object_top);
167
168 /**
169  * Returns pointer to the lock protecting data-attributes for the given object
170  * \a o.
171  *
172  * Data-attributes are protected by the cl_object_header::coh_attr_guard
173  * spin-lock in the top-object.
174  *
175  * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
176  */
177 static spinlock_t *cl_object_attr_guard(struct cl_object *o)
178 {
179         return &cl_object_header(cl_object_top(o))->coh_attr_guard;
180 }
181
182 /**
183  * Locks data-attributes.
184  *
185  * Prevents data-attributes from changing, until lock is released by
186  * cl_object_attr_unlock(). This has to be called before calls to
187  * cl_object_attr_get(), cl_object_attr_set().
188  */
189 void cl_object_attr_lock(struct cl_object *o)
190 {
191         spin_lock(cl_object_attr_guard(o));
192 }
193 EXPORT_SYMBOL(cl_object_attr_lock);
194
195 /**
196  * Releases data-attributes lock, acquired by cl_object_attr_lock().
197  */
198 void cl_object_attr_unlock(struct cl_object *o)
199 {
200         spin_unlock(cl_object_attr_guard(o));
201 }
202 EXPORT_SYMBOL(cl_object_attr_unlock);
203
204 /**
205  * Returns data-attributes of an object \a obj.
206  *
207  * Every layer is asked (by calling cl_object_operations::coo_attr_get())
208  * top-to-bottom to fill in parts of \a attr that this layer is responsible
209  * for.
210  */
211 int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
212                        struct cl_attr *attr)
213 {
214         struct lu_object_header *top;
215         int result;
216
217         assert_spin_locked(cl_object_attr_guard(obj));
218         ENTRY;
219
220         top = obj->co_lu.lo_header;
221         result = 0;
222         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
223                 if (obj->co_ops->coo_attr_get != NULL) {
224                         result = obj->co_ops->coo_attr_get(env, obj, attr);
225                         if (result != 0) {
226                                 if (result > 0)
227                                         result = 0;
228                                 break;
229                         }
230                 }
231         }
232         RETURN(result);
233 }
234 EXPORT_SYMBOL(cl_object_attr_get);
235
236 /**
237  * Updates data-attributes of an object \a obj.
238  *
239  * Only attributes, mentioned in a validness bit-mask \a v are
240  * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
241  * to top.
242  */
243 int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
244                        const struct cl_attr *attr, unsigned v)
245 {
246         struct lu_object_header *top;
247         int result;
248
249         assert_spin_locked(cl_object_attr_guard(obj));
250         ENTRY;
251
252         top = obj->co_lu.lo_header;
253         result = 0;
254         list_for_each_entry_reverse(obj, &top->loh_layers, co_lu.lo_linkage) {
255                 if (obj->co_ops->coo_attr_set != NULL) {
256                         result = obj->co_ops->coo_attr_set(env, obj, attr, v);
257                         if (result != 0) {
258                                 if (result > 0)
259                                         result = 0;
260                                 break;
261                         }
262                 }
263         }
264         RETURN(result);
265 }
266 EXPORT_SYMBOL(cl_object_attr_set);
267
268 /**
269  * Notifies layers (bottom-to-top) that glimpse AST was received.
270  *
271  * Layers have to fill \a lvb fields with information that will be shipped
272  * back to glimpse issuer.
273  *
274  * \see cl_lock_operations::clo_glimpse()
275  */
276 int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
277                       struct ost_lvb *lvb)
278 {
279         struct lu_object_header *top;
280         int result;
281
282         ENTRY;
283         top = obj->co_lu.lo_header;
284         result = 0;
285         list_for_each_entry_reverse(obj, &top->loh_layers, co_lu.lo_linkage) {
286                 if (obj->co_ops->coo_glimpse != NULL) {
287                         result = obj->co_ops->coo_glimpse(env, obj, lvb);
288                         if (result != 0)
289                                 break;
290                 }
291         }
292         LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
293                          "size: "LPU64" mtime: "LPU64" atime: "LPU64" "
294                          "ctime: "LPU64" blocks: "LPU64"\n",
295                          lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
296                          lvb->lvb_ctime, lvb->lvb_blocks);
297         RETURN(result);
298 }
299 EXPORT_SYMBOL(cl_object_glimpse);
300
301 /**
302  * Updates a configuration of an object \a obj.
303  */
304 int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
305                 const struct cl_object_conf *conf)
306 {
307         struct lu_object_header *top;
308         int result;
309
310         ENTRY;
311         top = obj->co_lu.lo_header;
312         result = 0;
313         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
314                 if (obj->co_ops->coo_conf_set != NULL) {
315                         result = obj->co_ops->coo_conf_set(env, obj, conf);
316                         if (result != 0)
317                                 break;
318                 }
319         }
320         RETURN(result);
321 }
322 EXPORT_SYMBOL(cl_conf_set);
323
324 /**
325  * Prunes caches of pages and locks for this object.
326  */
327 void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
328 {
329         struct lu_object_header *top;
330         struct cl_object *o;
331         int result;
332         ENTRY;
333
334         top = obj->co_lu.lo_header;
335         result = 0;
336         list_for_each_entry(o, &top->loh_layers, co_lu.lo_linkage) {
337                 if (o->co_ops->coo_prune != NULL) {
338                         result = o->co_ops->coo_prune(env, o);
339                         if (result != 0)
340                                 break;
341                 }
342         }
343
344         /* TODO: pruning locks will be moved into layers after cl_lock
345          * simplification is done */
346         cl_locks_prune(env, obj, 1);
347         EXIT;
348 }
349 EXPORT_SYMBOL(cl_object_prune);
350
351 /**
352  * Helper function removing all object locks, and marking object for
353  * deletion. All object pages must have been deleted at this point.
354  *
355  * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
356  * and sub- objects respectively.
357  */
358 void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
359 {
360         struct cl_object_header *hdr;
361
362         hdr = cl_object_header(obj);
363
364         set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
365         /*
366          * Destroy all locks. Object destruction (including cl_inode_fini())
367          * cannot cancel the locks, because in the case of a local client,
368          * where client and server share the same thread running
369          * prune_icache(), this can dead-lock with ldlm_cancel_handler()
370          * waiting on __wait_on_freeing_inode().
371          */
372         cl_locks_prune(env, obj, 0);
373 }
374 EXPORT_SYMBOL(cl_object_kill);
375
376 /**
377  * Check if the object has locks.
378  */
379 int cl_object_has_locks(struct cl_object *obj)
380 {
381         struct cl_object_header *head = cl_object_header(obj);
382         int has;
383
384         spin_lock(&head->coh_lock_guard);
385         has = list_empty(&head->coh_locks);
386         spin_unlock(&head->coh_lock_guard);
387
388         return (has == 0);
389 }
390 EXPORT_SYMBOL(cl_object_has_locks);
391
392 void cache_stats_init(struct cache_stats *cs, const char *name)
393 {
394         int i;
395
396         cs->cs_name = name;
397         for (i = 0; i < CS_NR; i++)
398                 atomic_set(&cs->cs_stats[i], 0);
399 }
400
401 int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
402 {
403         int i;
404
405         /*
406          *   lookup    hit    total  cached create
407          * env: ...... ...... ...... ...... ......
408          */
409         if (h) {
410                 const char *names[CS_NR] = CS_NAMES;
411
412                 seq_printf(m, "%6s", " ");
413                 for (i = 0; i < CS_NR; i++)
414                         seq_printf(m, "%8s", names[i]);
415                 seq_printf(m, "\n");
416         }
417
418         seq_printf(m, "%5.5s:", cs->cs_name);
419         for (i = 0; i < CS_NR; i++)
420                 seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
421         return 0;
422 }
423
424 static void cl_env_percpu_refill(void);
425
426 /**
427  * Initialize client site.
428  *
429  * Perform common initialization (lu_site_init()), and initialize statistical
430  * counters. Also perform global initializations on the first call.
431  */
432 int cl_site_init(struct cl_site *s, struct cl_device *d)
433 {
434         int i;
435         int result;
436
437         result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
438         if (result == 0) {
439                 cache_stats_init(&s->cs_pages, "pages");
440                 cache_stats_init(&s->cs_locks, "locks");
441                 for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
442                         atomic_set(&s->cs_pages_state[0], 0);
443                 for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
444                         atomic_set(&s->cs_locks_state[i], 0);
445                 cl_env_percpu_refill();
446         }
447         return result;
448 }
449 EXPORT_SYMBOL(cl_site_init);
450
451 /**
452  * Finalize client site. Dual to cl_site_init().
453  */
454 void cl_site_fini(struct cl_site *s)
455 {
456         lu_site_fini(&s->cs_lu);
457 }
458 EXPORT_SYMBOL(cl_site_fini);
459
460 static struct cache_stats cl_env_stats = {
461         .cs_name    = "envs",
462         .cs_stats = { ATOMIC_INIT(0), }
463 };
464
465 /**
466  * Outputs client site statistical counters into a buffer. Suitable for
467  * ll_rd_*()-style functions.
468  */
469 int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
470 {
471         static const char *pstate[] = {
472                 [CPS_CACHED]    = "c",
473                 [CPS_OWNED]     = "o",
474                 [CPS_PAGEOUT]   = "w",
475                 [CPS_PAGEIN]    = "r",
476                 [CPS_FREEING]   = "f"
477         };
478         static const char *lstate[] = {
479                 [CLS_NEW]       = "n",
480                 [CLS_QUEUING]   = "q",
481                 [CLS_ENQUEUED]  = "e",
482                 [CLS_HELD]      = "h",
483                 [CLS_INTRANSIT] = "t",
484                 [CLS_CACHED]    = "c",
485                 [CLS_FREEING]   = "f"
486         };
487         int i;
488
489 /*
490        lookup    hit  total   busy create
491 pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
492 locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
493   env: ...... ...... ...... ...... ......
494  */
495         lu_site_stats_seq_print(&site->cs_lu, m);
496         cache_stats_print(&site->cs_pages, m, 1);
497         seq_printf(m, " [");
498         for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
499                 seq_printf(m, "%s: %u ", pstate[i],
500                            atomic_read(&site->cs_pages_state[i]));
501         seq_printf(m, "]\n");
502         cache_stats_print(&site->cs_locks, m, 0);
503         seq_printf(m, " [");
504         for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
505                 seq_printf(m, "%s: %u ", lstate[i],
506                            atomic_read(&site->cs_locks_state[i]));
507         seq_printf(m, "]\n");
508         cache_stats_print(&cl_env_stats, m, 0);
509         seq_printf(m, "\n");
510         return 0;
511 }
512 EXPORT_SYMBOL(cl_site_stats_print);
513
514 /*****************************************************************************
515  *
516  * lu_env handling on client.
517  *
518  */
519
520 /**
521  * The most efficient way is to store cl_env pointer in task specific
522  * structures. On Linux, it wont' be easy to use task_struct->journal_info
523  * because Lustre code may call into other fs which has certain assumptions
524  * about journal_info. Currently following fields in task_struct are identified
525  * can be used for this purpose:
526  *  - cl_env: for liblustre.
527  *  - tux_info: ony on RedHat kernel.
528  *  - ...
529  * \note As long as we use task_struct to store cl_env, we assume that once
530  * called into Lustre, we'll never call into the other part of the kernel
531  * which will use those fields in task_struct without explicitly exiting
532  * Lustre.
533  *
534  * If there's no space in task_struct is available, hash will be used.
535  * bz20044, bz22683.
536  */
537
538 static struct list_head cl_envs;
539 static unsigned cl_envs_cached_nr  = 0;
540 static unsigned cl_envs_cached_max = 128; /* XXX: prototype: arbitrary limit
541                                            * for now. */
542 static DEFINE_SPINLOCK(cl_envs_guard);
543
544 struct cl_env {
545         void             *ce_magic;
546         struct lu_env     ce_lu;
547         struct lu_context ce_ses;
548
549 #ifdef LL_TASK_CL_ENV
550         void             *ce_prev;
551 #else
552         /**
553          * This allows cl_env to be entered into cl_env_hash which implements
554          * the current thread -> client environment lookup.
555          */
556         struct hlist_node  ce_node;
557 #endif
558         /**
559          * Owner for the current cl_env.
560          *
561          * If LL_TASK_CL_ENV is defined, this point to the owning current,
562          * only for debugging purpose ;
563          * Otherwise hash is used, and this is the key for cfs_hash.
564          * Now current thread pid is stored. Note using thread pointer would
565          * lead to unbalanced hash because of its specific allocation locality
566          * and could be varied for different platforms and OSes, even different
567          * OS versions.
568          */
569         void             *ce_owner;
570
571         /*
572          * Linkage into global list of all client environments. Used for
573          * garbage collection.
574          */
575         struct list_head  ce_linkage;
576         /*
577          *
578          */
579         int               ce_ref;
580         /*
581          * Debugging field: address of the caller who made original
582          * allocation.
583          */
584         void             *ce_debug;
585 };
586
587 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
588 #define CL_ENV_INC(counter) atomic_inc(&cl_env_stats.cs_stats[CS_##counter])
589
590 #define CL_ENV_DEC(counter) do {                                              \
591         LASSERT(atomic_read(&cl_env_stats.cs_stats[CS_##counter]) > 0);   \
592         atomic_dec(&cl_env_stats.cs_stats[CS_##counter]);                 \
593 } while (0)
594 #else
595 #define CL_ENV_INC(counter)
596 #define CL_ENV_DEC(counter)
597 #endif
598
599 static void cl_env_init0(struct cl_env *cle, void *debug)
600 {
601         LASSERT(cle->ce_ref == 0);
602         LASSERT(cle->ce_magic == &cl_env_init0);
603         LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);
604
605         cle->ce_ref = 1;
606         cle->ce_debug = debug;
607         CL_ENV_INC(busy);
608 }
609
610
611 #ifndef LL_TASK_CL_ENV
612 /*
613  * The implementation of using hash table to connect cl_env and thread
614  */
615
616 static cfs_hash_t *cl_env_hash;
617
618 static unsigned cl_env_hops_hash(cfs_hash_t *lh,
619                                  const void *key, unsigned mask)
620 {
621 #if BITS_PER_LONG == 64
622         return cfs_hash_u64_hash((__u64)key, mask);
623 #else
624         return cfs_hash_u32_hash((__u32)key, mask);
625 #endif
626 }
627
628 static void *cl_env_hops_obj(struct hlist_node *hn)
629 {
630         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
631
632         LASSERT(cle->ce_magic == &cl_env_init0);
633         return (void *)cle;
634 }
635
636 static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
637 {
638         struct cl_env *cle = cl_env_hops_obj(hn);
639
640         LASSERT(cle->ce_owner != NULL);
641         return (key == cle->ce_owner);
642 }
643
644 static void cl_env_hops_noop(cfs_hash_t *hs, struct hlist_node *hn)
645 {
646         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
647         LASSERT(cle->ce_magic == &cl_env_init0);
648 }
649
650 static cfs_hash_ops_t cl_env_hops = {
651         .hs_hash        = cl_env_hops_hash,
652         .hs_key         = cl_env_hops_obj,
653         .hs_keycmp      = cl_env_hops_keycmp,
654         .hs_object      = cl_env_hops_obj,
655         .hs_get         = cl_env_hops_noop,
656         .hs_put_locked  = cl_env_hops_noop,
657 };
658
659 static inline struct cl_env *cl_env_fetch(void)
660 {
661         struct cl_env *cle;
662
663         cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
664         LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
665         return cle;
666 }
667
668 static inline void cl_env_attach(struct cl_env *cle)
669 {
670         if (cle) {
671                 int rc;
672
673                 LASSERT(cle->ce_owner == NULL);
674                 cle->ce_owner = (void *) (long) current->pid;
675                 rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
676                                          &cle->ce_node);
677                 LASSERT(rc == 0);
678         }
679 }
680
681 static inline void cl_env_do_detach(struct cl_env *cle)
682 {
683         void *cookie;
684
685         LASSERT(cle->ce_owner == (void *) (long) current->pid);
686         cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
687                               &cle->ce_node);
688         LASSERT(cookie == cle);
689         cle->ce_owner = NULL;
690 }
691
692 static int cl_env_store_init(void) {
693         cl_env_hash = cfs_hash_create("cl_env",
694                                       HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
695                                       HASH_CL_ENV_BKT_BITS, 0,
696                                       CFS_HASH_MIN_THETA,
697                                       CFS_HASH_MAX_THETA,
698                                       &cl_env_hops,
699                                       CFS_HASH_RW_BKTLOCK);
700         return cl_env_hash != NULL ? 0 :-ENOMEM;
701 }
702
703 static void cl_env_store_fini(void) {
704         cfs_hash_putref(cl_env_hash);
705 }
706
707 #else /* LL_TASK_CL_ENV */
708 /*
709  * The implementation of store cl_env directly in thread structure.
710  */
711
712 static inline struct cl_env *cl_env_fetch(void)
713 {
714         struct cl_env *cle;
715
716         cle = current->LL_TASK_CL_ENV;
717         if (cle && cle->ce_magic != &cl_env_init0)
718                 cle = NULL;
719         return cle;
720 }
721
722 static inline void cl_env_attach(struct cl_env *cle)
723 {
724         if (cle) {
725                 LASSERT(cle->ce_owner == NULL);
726                 cle->ce_owner = current;
727                 cle->ce_prev = current->LL_TASK_CL_ENV;
728                 current->LL_TASK_CL_ENV = cle;
729         }
730 }
731
732 static inline void cl_env_do_detach(struct cl_env *cle)
733 {
734         LASSERT(cle->ce_owner == current);
735         LASSERT(current->LL_TASK_CL_ENV == cle);
736         current->LL_TASK_CL_ENV = cle->ce_prev;
737         cle->ce_owner = NULL;
738 }
739
740 static int cl_env_store_init(void) { return 0; }
741 static void cl_env_store_fini(void) { }
742
743 #endif /* LL_TASK_CL_ENV */
744
745 static inline struct cl_env *cl_env_detach(struct cl_env *cle)
746 {
747         if (cle == NULL)
748                 cle = cl_env_fetch();
749
750         if (cle && cle->ce_owner)
751                 cl_env_do_detach(cle);
752
753         return cle;
754 }
755
756 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
757 {
758         struct lu_env *env;
759         struct cl_env *cle;
760
761         OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, GFP_NOFS);
762         if (cle != NULL) {
763                 int rc;
764
765                 INIT_LIST_HEAD(&cle->ce_linkage);
766                 cle->ce_magic = &cl_env_init0;
767                 env = &cle->ce_lu;
768                 rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
769                 if (rc == 0) {
770                         rc = lu_context_init(&cle->ce_ses,
771                                              LCT_SESSION | ses_tags);
772                         if (rc == 0) {
773                                 lu_context_enter(&cle->ce_ses);
774                                 env->le_ses = &cle->ce_ses;
775                                 cl_env_init0(cle, debug);
776                         } else
777                                 lu_env_fini(env);
778                 }
779                 if (rc != 0) {
780                         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
781                         env = ERR_PTR(rc);
782                 } else {
783                         CL_ENV_INC(create);
784                         CL_ENV_INC(total);
785                 }
786         } else
787                 env = ERR_PTR(-ENOMEM);
788         return env;
789 }
790
791 static void cl_env_fini(struct cl_env *cle)
792 {
793         CL_ENV_DEC(total);
794         lu_context_fini(&cle->ce_lu.le_ctx);
795         lu_context_fini(&cle->ce_ses);
796         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
797 }
798
799 static struct lu_env *cl_env_obtain(void *debug)
800 {
801         struct cl_env *cle;
802         struct lu_env *env;
803
804         ENTRY;
805         spin_lock(&cl_envs_guard);
806         LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
807         if (cl_envs_cached_nr > 0) {
808                 int rc;
809
810                 cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
811                 list_del_init(&cle->ce_linkage);
812                 cl_envs_cached_nr--;
813                 spin_unlock(&cl_envs_guard);
814
815                 env = &cle->ce_lu;
816                 rc = lu_env_refill(env);
817                 if (rc == 0) {
818                         cl_env_init0(cle, debug);
819                         lu_context_enter(&env->le_ctx);
820                         lu_context_enter(&cle->ce_ses);
821                 } else {
822                         cl_env_fini(cle);
823                         env = ERR_PTR(rc);
824                 }
825         } else {
826                 spin_unlock(&cl_envs_guard);
827                 env = cl_env_new(lu_context_tags_default,
828                                  lu_session_tags_default, debug);
829         }
830         RETURN(env);
831 }
832
833 static inline struct cl_env *cl_env_container(struct lu_env *env)
834 {
835         return container_of(env, struct cl_env, ce_lu);
836 }
837
838 struct lu_env *cl_env_peek(int *refcheck)
839 {
840         struct lu_env *env;
841         struct cl_env *cle;
842
843         CL_ENV_INC(lookup);
844
845         /* check that we don't go far from untrusted pointer */
846         CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
847
848         env = NULL;
849         cle = cl_env_fetch();
850         if (cle != NULL) {
851                 CL_ENV_INC(hit);
852                 env = &cle->ce_lu;
853                 *refcheck = ++cle->ce_ref;
854         }
855         CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
856         return env;
857 }
858 EXPORT_SYMBOL(cl_env_peek);
859
860 /**
861  * Returns lu_env: if there already is an environment associated with the
862  * current thread, it is returned, otherwise, new environment is allocated.
863  *
864  * Allocations are amortized through the global cache of environments.
865  *
866  * \param refcheck pointer to a counter used to detect environment leaks. In
867  * the usual case cl_env_get() and cl_env_put() are called in the same lexical
868  * scope and pointer to the same integer is passed as \a refcheck. This is
869  * used to detect missed cl_env_put().
870  *
871  * \see cl_env_put()
872  */
873 struct lu_env *cl_env_get(int *refcheck)
874 {
875         struct lu_env *env;
876
877         env = cl_env_peek(refcheck);
878         if (env == NULL) {
879                 env = cl_env_obtain(__builtin_return_address(0));
880                 if (!IS_ERR(env)) {
881                         struct cl_env *cle;
882
883                         cle = cl_env_container(env);
884                         cl_env_attach(cle);
885                         *refcheck = cle->ce_ref;
886                         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
887                 }
888         }
889         return env;
890 }
891 EXPORT_SYMBOL(cl_env_get);
892
893 /**
894  * Forces an allocation of a fresh environment with given tags.
895  *
896  * \see cl_env_get()
897  */
898 struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
899 {
900         struct lu_env *env;
901
902         LASSERT(cl_env_peek(refcheck) == NULL);
903         env = cl_env_new(tags, tags, __builtin_return_address(0));
904         if (!IS_ERR(env)) {
905                 struct cl_env *cle;
906
907                 cle = cl_env_container(env);
908                 *refcheck = cle->ce_ref;
909                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
910         }
911         return env;
912 }
913 EXPORT_SYMBOL(cl_env_alloc);
914
915 static void cl_env_exit(struct cl_env *cle)
916 {
917         LASSERT(cle->ce_owner == NULL);
918         lu_context_exit(&cle->ce_lu.le_ctx);
919         lu_context_exit(&cle->ce_ses);
920 }
921
922 /**
923  * Finalizes and frees a given number of cached environments. This is done to
924  * (1) free some memory (not currently hooked into VM), or (2) release
925  * references to modules.
926  */
927 unsigned cl_env_cache_purge(unsigned nr)
928 {
929         struct cl_env *cle;
930
931         ENTRY;
932         spin_lock(&cl_envs_guard);
933         for (; !list_empty(&cl_envs) && nr > 0; --nr) {
934                 cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
935                 list_del_init(&cle->ce_linkage);
936                 LASSERT(cl_envs_cached_nr > 0);
937                 cl_envs_cached_nr--;
938                 spin_unlock(&cl_envs_guard);
939
940                 cl_env_fini(cle);
941                 spin_lock(&cl_envs_guard);
942         }
943         LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
944         spin_unlock(&cl_envs_guard);
945         RETURN(nr);
946 }
947 EXPORT_SYMBOL(cl_env_cache_purge);
948
949 /**
950  * Release an environment.
951  *
952  * Decrement \a env reference counter. When counter drops to 0, nothing in
953  * this thread is using environment and it is returned to the allocation
954  * cache, or freed straight away, if cache is large enough.
955  */
956 void cl_env_put(struct lu_env *env, int *refcheck)
957 {
958         struct cl_env *cle;
959
960         cle = cl_env_container(env);
961
962         LASSERT(cle->ce_ref > 0);
963         LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
964
965         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
966         if (--cle->ce_ref == 0) {
967                 CL_ENV_DEC(busy);
968                 cl_env_detach(cle);
969                 cle->ce_debug = NULL;
970                 cl_env_exit(cle);
971                 /*
972                  * Don't bother to take a lock here.
973                  *
974                  * Return environment to the cache only when it was allocated
975                  * with the standard tags.
976                  */
977                 if (cl_envs_cached_nr < cl_envs_cached_max &&
978                     (env->le_ctx.lc_tags & ~LCT_HAS_EXIT) == LCT_CL_THREAD &&
979                     (env->le_ses->lc_tags & ~LCT_HAS_EXIT) == LCT_SESSION) {
980                         spin_lock(&cl_envs_guard);
981                         list_add(&cle->ce_linkage, &cl_envs);
982                         cl_envs_cached_nr++;
983                         spin_unlock(&cl_envs_guard);
984                 } else
985                         cl_env_fini(cle);
986         }
987 }
988 EXPORT_SYMBOL(cl_env_put);
989
990 /**
991  * Declares a point of re-entrancy.
992  *
993  * \see cl_env_reexit()
994  */
995 void *cl_env_reenter(void)
996 {
997         return cl_env_detach(NULL);
998 }
999 EXPORT_SYMBOL(cl_env_reenter);
1000
1001 /**
1002  * Exits re-entrancy.
1003  */
1004 void cl_env_reexit(void *cookie)
1005 {
1006         cl_env_detach(NULL);
1007         cl_env_attach(cookie);
1008 }
1009 EXPORT_SYMBOL(cl_env_reexit);
1010
1011 /**
1012  * Setup user-supplied \a env as a current environment. This is to be used to
1013  * guaranteed that environment exists even when cl_env_get() fails. It is up
1014  * to user to ensure proper concurrency control.
1015  *
1016  * \see cl_env_unplant()
1017  */
1018 void cl_env_implant(struct lu_env *env, int *refcheck)
1019 {
1020         struct cl_env *cle = cl_env_container(env);
1021
1022         LASSERT(cle->ce_ref > 0);
1023
1024         cl_env_attach(cle);
1025         cl_env_get(refcheck);
1026         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
1027 }
1028 EXPORT_SYMBOL(cl_env_implant);
1029
1030 /**
1031  * Detach environment installed earlier by cl_env_implant().
1032  */
1033 void cl_env_unplant(struct lu_env *env, int *refcheck)
1034 {
1035         struct cl_env *cle = cl_env_container(env);
1036
1037         LASSERT(cle->ce_ref > 1);
1038
1039         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
1040
1041         cl_env_detach(cle);
1042         cl_env_put(env, refcheck);
1043 }
1044 EXPORT_SYMBOL(cl_env_unplant);
1045
1046 struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
1047 {
1048         struct lu_env *env;
1049
1050         nest->cen_cookie = NULL;
1051         env = cl_env_peek(&nest->cen_refcheck);
1052         if (env != NULL) {
1053                 if (!cl_io_is_going(env))
1054                         return env;
1055                 else {
1056                         cl_env_put(env, &nest->cen_refcheck);
1057                         nest->cen_cookie = cl_env_reenter();
1058                 }
1059         }
1060         env = cl_env_get(&nest->cen_refcheck);
1061         if (IS_ERR(env)) {
1062                 cl_env_reexit(nest->cen_cookie);
1063                 return env;
1064         }
1065
1066         LASSERT(!cl_io_is_going(env));
1067         return env;
1068 }
1069 EXPORT_SYMBOL(cl_env_nested_get);
1070
1071 void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
1072 {
1073         cl_env_put(env, &nest->cen_refcheck);
1074         cl_env_reexit(nest->cen_cookie);
1075 }
1076 EXPORT_SYMBOL(cl_env_nested_put);
1077
1078 /**
1079  * Converts struct cl_attr to struct ost_lvb.
1080  *
1081  * \see cl_lvb2attr
1082  */
1083 void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
1084 {
1085         ENTRY;
1086         lvb->lvb_size   = attr->cat_size;
1087         lvb->lvb_mtime  = attr->cat_mtime;
1088         lvb->lvb_atime  = attr->cat_atime;
1089         lvb->lvb_ctime  = attr->cat_ctime;
1090         lvb->lvb_blocks = attr->cat_blocks;
1091         EXIT;
1092 }
1093 EXPORT_SYMBOL(cl_attr2lvb);
1094
1095 /**
1096  * Converts struct ost_lvb to struct cl_attr.
1097  *
1098  * \see cl_attr2lvb
1099  */
1100 void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
1101 {
1102         ENTRY;
1103         attr->cat_size   = lvb->lvb_size;
1104         attr->cat_mtime  = lvb->lvb_mtime;
1105         attr->cat_atime  = lvb->lvb_atime;
1106         attr->cat_ctime  = lvb->lvb_ctime;
1107         attr->cat_blocks = lvb->lvb_blocks;
1108         EXIT;
1109 }
1110 EXPORT_SYMBOL(cl_lvb2attr);
1111
1112 static struct cl_env cl_env_percpu[NR_CPUS];
1113
1114 static int cl_env_percpu_init(void)
1115 {
1116         struct cl_env *cle;
1117         int tags = LCT_REMEMBER | LCT_NOREF;
1118         int i, j;
1119         int rc = 0;
1120
1121         for_each_possible_cpu(i) {
1122                 struct lu_env *env;
1123
1124                 cle = &cl_env_percpu[i];
1125                 env = &cle->ce_lu;
1126
1127                 INIT_LIST_HEAD(&cle->ce_linkage);
1128                 cle->ce_magic = &cl_env_init0;
1129                 rc = lu_env_init(env, LCT_CL_THREAD | tags);
1130                 if (rc == 0) {
1131                         rc = lu_context_init(&cle->ce_ses, LCT_SESSION | tags);
1132                         if (rc == 0) {
1133                                 lu_context_enter(&cle->ce_ses);
1134                                 env->le_ses = &cle->ce_ses;
1135                         } else {
1136                                 lu_env_fini(env);
1137                         }
1138                 }
1139                 if (rc != 0)
1140                         break;
1141         }
1142         if (rc != 0) {
1143                 /* Indices 0 to i (excluding i) were correctly initialized,
1144                  * thus we must uninitialize up to i, the rest are undefined. */
1145                 for (j = 0; j < i; j++) {
1146                         cle = &cl_env_percpu[i];
1147                         lu_context_exit(&cle->ce_ses);
1148                         lu_context_fini(&cle->ce_ses);
1149                         lu_env_fini(&cle->ce_lu);
1150                 }
1151         }
1152
1153         return rc;
1154 }
1155
1156 static void cl_env_percpu_fini(void)
1157 {
1158         int i;
1159
1160         for_each_possible_cpu(i) {
1161                 struct cl_env *cle = &cl_env_percpu[i];
1162
1163                 lu_context_exit(&cle->ce_ses);
1164                 lu_context_fini(&cle->ce_ses);
1165                 lu_env_fini(&cle->ce_lu);
1166         }
1167 }
1168
1169 static void cl_env_percpu_refill(void)
1170 {
1171         int i;
1172
1173         for_each_possible_cpu(i)
1174                 lu_env_refill(&cl_env_percpu[i].ce_lu);
1175 }
1176
1177 void cl_env_percpu_put(struct lu_env *env)
1178 {
1179         struct cl_env *cle;
1180         int cpu;
1181
1182         cpu = smp_processor_id();
1183         cle = cl_env_container(env);
1184         LASSERT(cle == &cl_env_percpu[cpu]);
1185
1186         cle->ce_ref--;
1187         LASSERT(cle->ce_ref == 0);
1188
1189         CL_ENV_DEC(busy);
1190         cl_env_detach(cle);
1191         cle->ce_debug = NULL;
1192
1193         put_cpu();
1194 }
1195 EXPORT_SYMBOL(cl_env_percpu_put);
1196
1197 struct lu_env *cl_env_percpu_get()
1198 {
1199         struct cl_env *cle;
1200
1201         cle = &cl_env_percpu[get_cpu()];
1202         cl_env_init0(cle, __builtin_return_address(0));
1203
1204         cl_env_attach(cle);
1205         return &cle->ce_lu;
1206 }
1207 EXPORT_SYMBOL(cl_env_percpu_get);
1208
1209 /*****************************************************************************
1210  *
1211  * Temporary prototype thing: mirror obd-devices into cl devices.
1212  *
1213  */
1214
1215 struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
1216                                 struct lu_device_type *ldt,
1217                                 struct lu_device *next)
1218 {
1219         const char       *typename;
1220         struct lu_device *d;
1221
1222         LASSERT(ldt != NULL);
1223
1224         typename = ldt->ldt_name;
1225         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
1226         if (!IS_ERR(d)) {
1227                 int rc;
1228
1229                 if (site != NULL)
1230                         d->ld_site = site;
1231                 rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
1232                 if (rc == 0) {
1233                         lu_device_get(d);
1234                         lu_ref_add(&d->ld_reference,
1235                                    "lu-stack", &lu_site_init);
1236                 } else {
1237                         ldt->ldt_ops->ldto_device_free(env, d);
1238                         CERROR("can't init device '%s', %d\n", typename, rc);
1239                         d = ERR_PTR(rc);
1240                 }
1241         } else
1242                 CERROR("Cannot allocate device: '%s'\n", typename);
1243         return lu2cl_dev(d);
1244 }
1245 EXPORT_SYMBOL(cl_type_setup);
1246
1247 /**
1248  * Finalize device stack by calling lu_stack_fini().
1249  */
1250 void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1251 {
1252         lu_stack_fini(env, cl2lu_dev(cl));
1253 }
1254 EXPORT_SYMBOL(cl_stack_fini);
1255
1256 int  cl_lock_init(void);
1257 void cl_lock_fini(void);
1258
1259 int  cl_page_init(void);
1260 void cl_page_fini(void);
1261
1262 static struct lu_context_key cl_key;
1263
1264 struct cl_thread_info *cl_env_info(const struct lu_env *env)
1265 {
1266         return lu_context_key_get(&env->le_ctx, &cl_key);
1267 }
1268
1269 /* defines cl0_key_{init,fini}() */
1270 LU_KEY_INIT_FINI(cl0, struct cl_thread_info);
1271
1272 static void *cl_key_init(const struct lu_context *ctx,
1273                          struct lu_context_key *key)
1274 {
1275         struct cl_thread_info *info;
1276
1277         info = cl0_key_init(ctx, key);
1278         if (!IS_ERR(info)) {
1279                 int i;
1280
1281                 for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1282                         lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1283         }
1284         return info;
1285 }
1286
1287 static void cl_key_fini(const struct lu_context *ctx,
1288                         struct lu_context_key *key, void *data)
1289 {
1290         struct cl_thread_info *info;
1291         int i;
1292
1293         info = data;
1294         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1295                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1296         cl0_key_fini(ctx, key, data);
1297 }
1298
1299 static void cl_key_exit(const struct lu_context *ctx,
1300                         struct lu_context_key *key, void *data)
1301 {
1302         struct cl_thread_info *info = data;
1303         int i;
1304
1305         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
1306                 LASSERT(info->clt_counters[i].ctc_nr_held == 0);
1307                 LASSERT(info->clt_counters[i].ctc_nr_used == 0);
1308                 LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
1309                 LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
1310                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1311                 lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1312         }
1313 }
1314
1315 static struct lu_context_key cl_key = {
1316         .lct_tags = LCT_CL_THREAD,
1317         .lct_init = cl_key_init,
1318         .lct_fini = cl_key_fini,
1319         .lct_exit = cl_key_exit
1320 };
1321
1322 static struct lu_kmem_descr cl_object_caches[] = {
1323         {
1324                 .ckd_cache = &cl_env_kmem,
1325                 .ckd_name  = "cl_env_kmem",
1326                 .ckd_size  = sizeof (struct cl_env)
1327         },
1328         {
1329                 .ckd_cache = NULL
1330         }
1331 };
1332
1333 /**
1334  * Global initialization of cl-data. Create kmem caches, register
1335  * lu_context_key's, etc.
1336  *
1337  * \see cl_global_fini()
1338  */
1339 int cl_global_init(void)
1340 {
1341         int result;
1342
1343         INIT_LIST_HEAD(&cl_envs);
1344
1345         result = cl_env_store_init();
1346         if (result)
1347                 return result;
1348
1349         result = lu_kmem_init(cl_object_caches);
1350         if (result)
1351                 goto out_store;
1352
1353         LU_CONTEXT_KEY_INIT(&cl_key);
1354         result = lu_context_key_register(&cl_key);
1355         if (result)
1356                 goto out_kmem;
1357
1358         result = cl_lock_init();
1359         if (result)
1360                 goto out_context;
1361
1362         result = cl_page_init();
1363         if (result)
1364                 goto out_lock;
1365
1366         result = cl_env_percpu_init();
1367         if (result)
1368                 /* no cl_env_percpu_fini on error */
1369                 goto out_lock;
1370
1371         return 0;
1372 out_lock:
1373         cl_lock_fini();
1374 out_context:
1375         lu_context_key_degister(&cl_key);
1376 out_kmem:
1377         lu_kmem_fini(cl_object_caches);
1378 out_store:
1379         cl_env_store_fini();
1380         return result;
1381 }
1382
1383 /**
1384  * Finalization of global cl-data. Dual to cl_global_init().
1385  */
1386 void cl_global_fini(void)
1387 {
1388         cl_env_percpu_fini();
1389         cl_lock_fini();
1390         cl_page_fini();
1391         lu_context_key_degister(&cl_key);
1392         lu_kmem_fini(cl_object_caches);
1393         cl_env_store_fini();
1394 }